1 /*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2018, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "postgres_fdw.h"
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44
45 PG_MODULE_MAGIC;
46
/*
 * Default CPU cost to start up a foreign query.  postgresGetForeignRelSize
 * stores this in fdw_startup_cost before it applies the server and table
 * options.
 */
#define DEFAULT_FDW_STARTUP_COST	100.0

/*
 * Default CPU cost to process 1 row (above and beyond cpu_tuple_cost).
 * postgresGetForeignRelSize stores this in fdw_tuple_cost before it applies
 * the server and table options.
 */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55
/*
 * Positions of the items that a scan's fdw_private list carries.
 *
 * Each enumerator is the list position of one item, so an item is fetched
 * with list_nth().  For example, the SELECT statement is obtained with:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * The values are spelled out so the list positions are visible at a glance;
 * they are the same values the compiler would assign implicitly.
 */
enum FdwScanPrivateIndex
{
	/* String node: SQL statement to execute remotely */
	FdwScanPrivateSelectSql = 0,
	/* Integer list: attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs = 1,
	/* Integer: the desired fetch_size */
	FdwScanPrivateFetchSize = 2,

	/*
	 * String describing the join, i.e. the names of the relations being
	 * joined and the join types.  Added only when the scan is a join.
	 */
	FdwScanPrivateRelations = 3
};
78
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a postgres_fdw foreign table.  In list order, we store:
 *
 * 1) the INSERT/UPDATE/DELETE statement text to send to the remote server
 * 2) an integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) a boolean flag telling whether the remote query has a RETURNING clause
 * 4) an integer list of attribute numbers retrieved by RETURNING, if any
 *
 * The values are spelled out explicitly; they match what the compiler would
 * assign implicitly.
 */
enum FdwModifyPrivateIndex
{
	/* String node: SQL statement to execute remotely */
	FdwModifyPrivateUpdateSql = 0,
	/* Integer list: target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums = 1,
	/* Integer Value node: has-returning flag */
	FdwModifyPrivateHasReturning = 2,
	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs = 3
};
100
/*
 * Positions of the items kept in the fdw_private list of a ForeignScan node
 * that modifies a foreign table directly.  In list order, we store:
 *
 * 1) the UPDATE/DELETE statement text to send to the remote server
 * 2) a boolean flag telling whether the remote query has a RETURNING clause
 * 3) an integer list of attribute numbers retrieved by RETURNING, if any
 * 4) a boolean flag telling whether we set the command's es_processed
 *
 * The values are spelled out explicitly; they match what the compiler would
 * assign implicitly.
 */
enum FdwDirectModifyPrivateIndex
{
	/* String node: SQL statement to execute remotely */
	FdwDirectModifyPrivateUpdateSql = 0,
	/* Integer Value node: has-returning flag */
	FdwDirectModifyPrivateHasReturning = 1,
	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs = 2,
	/* Integer Value node: set-processed flag */
	FdwDirectModifyPrivateSetProcessed = 3
};
121
/*
 * Execution state of a foreign scan using postgres_fdw.
 *
 * The fields fall into groups: identification of what is being scanned,
 * values extracted from the plan's fdw_private list (see
 * FdwScanPrivateIndex), remote cursor and query-parameter state, the
 * currently buffered batch of result tuples, and working memory contexts.
 */
typedef struct PgFdwScanState
{
	Relation	rel;			/* relcache entry for the foreign table. NULL
								 * for a foreign join scan. */
	TupleDesc	tupdesc;		/* tuple descriptor of scan */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of SELECT command */
	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the scan */
	unsigned int cursor_number; /* quasi-unique ID for my cursor */
	bool		cursor_exists;	/* have we created the cursor? */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
	int			num_tuples;		/* # of tuples in array */
	int			next_tuple;		/* index of next one to return */

	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
	int			fetch_ct_2;		/* Min(# of fetches done, 2), i.e. the count
								 * saturates at 2 */
	bool		eof_reached;	/* true if last fetch reached EOF */

	/* working memory contexts */
	MemoryContext batch_cxt;	/* context holding current batch of tuples */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	int			fetch_size;		/* number of tuples per fetch */
} PgFdwScanState;
160
/*
 * Execution state of a foreign insert/update/delete operation.
 *
 * The "extracted fdw_private data" fields correspond to the items described
 * by FdwModifyPrivateIndex.
 */
typedef struct PgFdwModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* for remote query execution */
	PGconn	   *conn;			/* connection used for the remote statement */
	char	   *p_name;			/* name of prepared statement, if created */

	/* extracted fdw_private data */
	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
	List	   *target_attrs;	/* list of target attribute numbers */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */

	/* info about parameters for prepared statement */
	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
	int			p_nums;			/* number of parameters to transmit */
	FmgrInfo   *p_flinfo;		/* output conversion functions for them */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	/* for update row movement if subplan result rel */
	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
											 * created */
} PgFdwModifyState;
191
/*
 * Execution state of a foreign scan that modifies a foreign table directly.
 *
 * The "extracted fdw_private data" fields correspond to the items described
 * by FdwDirectModifyPrivateIndex.
 */
typedef struct PgFdwDirectModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of UPDATE/DELETE command */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
	bool		set_processed;	/* do we set the command es_processed? */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the update */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	PGresult   *result;			/* result for query */
	int			num_tuples;		/* # of result tuples */
	int			next_tuple;		/* index of next one to return */
	Relation	resultRel;		/* relcache entry for the target relation */
	AttrNumber *attnoMap;		/* array of attnums of input user columns */
	AttrNumber	ctidAttno;		/* attnum of input ctid column */
	AttrNumber	oidAttno;		/* attnum of input oid column */
	bool		hasSystemCols;	/* are there system columns of resultRel? */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwDirectModifyState;
226
/*
 * Workspace for analyzing a foreign table.
 *
 * This is the state passed to analyze_row_processor for each result row.
 */
typedef struct PgFdwAnalyzeState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
	List	   *retrieved_attrs;	/* attr numbers retrieved by query */

	/* collected sample rows */
	HeapTuple  *rows;			/* array of size targrows */
	int			targrows;		/* target # of sample rows */
	int			numrows;		/* # of sample rows collected */

	/* for random sampling */
	double		samplerows;		/* # of rows fetched */
	double		rowstoskip;		/* # of rows to skip before next sample */
	ReservoirStateData rstate;	/* state for reservoir sampling */

	/* working memory contexts */
	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwAnalyzeState;
250
/*
 * Identify the attribute where data conversion fails.
 *
 * Each field has a "nothing" value (0 or NULL), as noted per field.
 * NOTE(review): presumably this is the argument handed to
 * conversion_error_callback -- confirm at the point where the callback is
 * installed.
 */
typedef struct ConversionLocation
{
	AttrNumber	cur_attno;		/* attribute number being processed, or 0 */
	Relation	rel;			/* foreign table being processed, or NULL */
	ForeignScanState *fsstate;	/* plan node being processed, or NULL */
} ConversionLocation;
260
/*
 * Callback argument for ec_member_matches_foreign, which receives it
 * through its "void *arg" parameter.
 */
typedef struct
{
	Expr	   *current;		/* current expr, or NULL if not yet found */
	List	   *already_used;	/* expressions already dealt with */
} ec_member_foreign_arg;
267
/*
 * SQL functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);

/*
 * FDW callback routines
 *
 * These are the functions that postgres_fdw_handler installs into the
 * FdwRoutine struct it returns.  The grouping comments below follow the
 * grouping used in that function.
 */

/* Functions for scanning foreign tables */
static void postgresGetForeignRelSize(PlannerInfo *root,
						  RelOptInfo *baserel,
						  Oid foreigntableid);
static void postgresGetForeignPaths(PlannerInfo *root,
						RelOptInfo *baserel,
						Oid foreigntableid);
static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
					   RelOptInfo *foreignrel,
					   Oid foreigntableid,
					   ForeignPath *best_path,
					   List *tlist,
					   List *scan_clauses,
					   Plan *outer_plan);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);

/* Functions for updating foreign tables */
static void postgresAddForeignUpdateTargets(Query *parsetree,
								RangeTblEntry *target_rte,
								Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
						  ModifyTable *plan,
						  Index resultRelation,
						  int subplan_index);
static void postgresBeginForeignModify(ModifyTableState *mtstate,
						   ResultRelInfo *resultRelInfo,
						   List *fdw_private,
						   int subplan_index,
						   int eflags);
static TupleTableSlot *postgresExecForeignInsert(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static TupleTableSlot *postgresExecForeignDelete(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot);
static void postgresEndForeignModify(EState *estate,
						 ResultRelInfo *resultRelInfo);
static void postgresBeginForeignInsert(ModifyTableState *mtstate,
						   ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
						 ResultRelInfo *resultRelInfo);
static int	postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
						 ModifyTable *plan,
						 Index resultRelation,
						 int subplan_index);
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
static void postgresEndDirectModify(ForeignScanState *node);

/* Support functions for EXPLAIN */
static void postgresExplainForeignScan(ForeignScanState *node,
						   ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
							 ResultRelInfo *rinfo,
							 List *fdw_private,
							 int subplan_index,
							 ExplainState *es);
static void postgresExplainDirectModify(ForeignScanState *node,
							ExplainState *es);

/* Support functions for ANALYZE */
static bool postgresAnalyzeForeignTable(Relation relation,
							AcquireSampleRowsFunc *func,
							BlockNumber *totalpages);

/* Support functions for IMPORT FOREIGN SCHEMA */
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
							Oid serverOid);

/* Support functions for join push-down */
static void postgresGetForeignJoinPaths(PlannerInfo *root,
							RelOptInfo *joinrel,
							RelOptInfo *outerrel,
							RelOptInfo *innerrel,
							JoinType jointype,
							JoinPathExtraData *extra);

/* Function for EvalPlanQual rechecks */
static bool postgresRecheckForeignScan(ForeignScanState *node,
						   TupleTableSlot *slot);

/* Support functions for upper relation push-down */
static void postgresGetForeignUpperPaths(PlannerInfo *root,
							 UpperRelationKind stage,
							 RelOptInfo *input_rel,
							 RelOptInfo *output_rel,
							 void *extra);
358
/*
 * Helper functions
 *
 * File-local helpers used by the callback routines.  Brief notes are given
 * only where the usage is visible elsewhere in this file.
 */

/*
 * postgresGetForeignRelSize calls estimate_path_cost_size to fill in the
 * rows, width, startup_cost and total_cost it stores for a base relation.
 */
static void estimate_path_cost_size(PlannerInfo *root,
						RelOptInfo *foreignrel,
						List *param_join_conds,
						List *pathkeys,
						double *p_rows, int *p_width,
						Cost *p_startup_cost, Cost *p_total_cost);
static void get_remote_estimate(const char *sql,
					PGconn *conn,
					double *rows,
					int *width,
					Cost *startup_cost,
					Cost *total_cost);

/* "arg" is the callback argument described by ec_member_foreign_arg */
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
						  EquivalenceClass *ec, EquivalenceMember *em,
						  void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
					  RangeTblEntry *rte,
					  ResultRelInfo *resultRelInfo,
					  CmdType operation,
					  Plan *subplan,
					  char *query,
					  List *target_attrs,
					  bool has_returning,
					  List *retrieved_attrs);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
						 ItemPointer tupleid,
						 TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
					   TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
					   List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);
static void init_returning_filter(PgFdwDirectModifyState *dmstate,
					  List *fdw_scan_tlist,
					  Index rtindex);
static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
					   TupleTableSlot *slot,
					   EState *estate);
static void prepare_query_params(PlanState *node,
					 List *fdw_exprs,
					 int numParams,
					 FmgrInfo **param_flinfo,
					 List **param_exprs,
					 const char ***param_values);
static void process_query_params(ExprContext *econtext,
					 FmgrInfo *param_flinfo,
					 List *param_exprs,
					 const char **param_values);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
							  HeapTuple *rows, int targrows,
							  double *totalrows,
							  double *totaldeadrows);

/* Receives the PgFdwAnalyzeState workspace along with one result row */
static void analyze_row_processor(PGresult *res, int row,
					  PgFdwAnalyzeState *astate);
static HeapTuple make_tuple_from_result_row(PGresult *res,
						   int row,
						   Relation rel,
						   AttInMetadata *attinmeta,
						   List *retrieved_attrs,
						   ForeignScanState *fsstate,
						   MemoryContext temp_context);
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
				JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
				JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
					Node *havingQual);

/* Determine which orderings of a relation might be useful */
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
								 RelOptInfo *rel);

/* Determine which EquivalenceClasses might be involved in useful orderings */
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
								Path *epq_path);
static void add_foreign_grouping_paths(PlannerInfo *root,
						   RelOptInfo *input_rel,
						   RelOptInfo *grouped_rel,
						   GroupPathExtraData *extra);

/*
 * postgresGetForeignRelSize calls these two, server first and then table,
 * right after storing the built-in defaults in fpinfo.
 */
static void apply_server_options(PgFdwRelationInfo *fpinfo);
static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
				  const PgFdwRelationInfo *fpinfo_o,
				  const PgFdwRelationInfo *fpinfo_i);
450
451
452 /*
453 * Foreign-data wrapper handler function: return a struct with pointers
454 * to my callback routines.
455 */
456 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)457 postgres_fdw_handler(PG_FUNCTION_ARGS)
458 {
459 FdwRoutine *routine = makeNode(FdwRoutine);
460
461 /* Functions for scanning foreign tables */
462 routine->GetForeignRelSize = postgresGetForeignRelSize;
463 routine->GetForeignPaths = postgresGetForeignPaths;
464 routine->GetForeignPlan = postgresGetForeignPlan;
465 routine->BeginForeignScan = postgresBeginForeignScan;
466 routine->IterateForeignScan = postgresIterateForeignScan;
467 routine->ReScanForeignScan = postgresReScanForeignScan;
468 routine->EndForeignScan = postgresEndForeignScan;
469
470 /* Functions for updating foreign tables */
471 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
472 routine->PlanForeignModify = postgresPlanForeignModify;
473 routine->BeginForeignModify = postgresBeginForeignModify;
474 routine->ExecForeignInsert = postgresExecForeignInsert;
475 routine->ExecForeignUpdate = postgresExecForeignUpdate;
476 routine->ExecForeignDelete = postgresExecForeignDelete;
477 routine->EndForeignModify = postgresEndForeignModify;
478 routine->BeginForeignInsert = postgresBeginForeignInsert;
479 routine->EndForeignInsert = postgresEndForeignInsert;
480 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
481 routine->PlanDirectModify = postgresPlanDirectModify;
482 routine->BeginDirectModify = postgresBeginDirectModify;
483 routine->IterateDirectModify = postgresIterateDirectModify;
484 routine->EndDirectModify = postgresEndDirectModify;
485
486 /* Function for EvalPlanQual rechecks */
487 routine->RecheckForeignScan = postgresRecheckForeignScan;
488 /* Support functions for EXPLAIN */
489 routine->ExplainForeignScan = postgresExplainForeignScan;
490 routine->ExplainForeignModify = postgresExplainForeignModify;
491 routine->ExplainDirectModify = postgresExplainDirectModify;
492
493 /* Support functions for ANALYZE */
494 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
495
496 /* Support functions for IMPORT FOREIGN SCHEMA */
497 routine->ImportForeignSchema = postgresImportForeignSchema;
498
499 /* Support functions for join push-down */
500 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
501
502 /* Support functions for upper relation push-down */
503 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
504
505 PG_RETURN_POINTER(routine);
506 }
507
508 /*
509 * postgresGetForeignRelSize
510 * Estimate # of rows and width of the result of the scan
511 *
512 * We should consider the effect of all baserestrictinfo clauses here, but
513 * not any join clauses.
514 */
515 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)516 postgresGetForeignRelSize(PlannerInfo *root,
517 RelOptInfo *baserel,
518 Oid foreigntableid)
519 {
520 PgFdwRelationInfo *fpinfo;
521 ListCell *lc;
522 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
523 const char *namespace;
524 const char *relname;
525 const char *refname;
526
527 /*
528 * We use PgFdwRelationInfo to pass various information to subsequent
529 * functions.
530 */
531 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
532 baserel->fdw_private = (void *) fpinfo;
533
534 /* Base foreign tables need to be pushed down always. */
535 fpinfo->pushdown_safe = true;
536
537 /* Look up foreign-table catalog info. */
538 fpinfo->table = GetForeignTable(foreigntableid);
539 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
540
541 /*
542 * Extract user-settable option values. Note that per-table settings of
543 * use_remote_estimate and fetch_size override per-server settings of
544 * them, respectively.
545 */
546 fpinfo->use_remote_estimate = false;
547 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
548 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
549 fpinfo->shippable_extensions = NIL;
550 fpinfo->fetch_size = 100;
551
552 apply_server_options(fpinfo);
553 apply_table_options(fpinfo);
554
555 /*
556 * If the table or the server is configured to use remote estimates,
557 * identify which user to do remote access as during planning. This
558 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
559 * permissions, the query would have failed at runtime anyway.
560 */
561 if (fpinfo->use_remote_estimate)
562 {
563 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
564
565 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
566 }
567 else
568 fpinfo->user = NULL;
569
570 /*
571 * Identify which baserestrictinfo clauses can be sent to the remote
572 * server and which can't.
573 */
574 classifyConditions(root, baserel, baserel->baserestrictinfo,
575 &fpinfo->remote_conds, &fpinfo->local_conds);
576
577 /*
578 * Identify which attributes will need to be retrieved from the remote
579 * server. These include all attrs needed for joins or final output, plus
580 * all attrs used in the local_conds. (Note: if we end up using a
581 * parameterized scan, it's possible that some of the join clauses will be
582 * sent to the remote and thus we wouldn't really need to retrieve the
583 * columns used in them. Doesn't seem worth detecting that case though.)
584 */
585 fpinfo->attrs_used = NULL;
586 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
587 &fpinfo->attrs_used);
588 foreach(lc, fpinfo->local_conds)
589 {
590 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
591
592 pull_varattnos((Node *) rinfo->clause, baserel->relid,
593 &fpinfo->attrs_used);
594 }
595
596 /*
597 * Compute the selectivity and cost of the local_conds, so we don't have
598 * to do it over again for each path. The best we can do for these
599 * conditions is to estimate selectivity on the basis of local statistics.
600 */
601 fpinfo->local_conds_sel = clauselist_selectivity(root,
602 fpinfo->local_conds,
603 baserel->relid,
604 JOIN_INNER,
605 NULL);
606
607 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
608
609 /*
610 * Set cached relation costs to some negative value, so that we can detect
611 * when they are set to some sensible costs during one (usually the first)
612 * of the calls to estimate_path_cost_size().
613 */
614 fpinfo->rel_startup_cost = -1;
615 fpinfo->rel_total_cost = -1;
616
617 /*
618 * If the table or the server is configured to use remote estimates,
619 * connect to the foreign server and execute EXPLAIN to estimate the
620 * number of rows selected by the restriction clauses, as well as the
621 * average row width. Otherwise, estimate using whatever statistics we
622 * have locally, in a way similar to ordinary tables.
623 */
624 if (fpinfo->use_remote_estimate)
625 {
626 /*
627 * Get cost/size estimates with help of remote server. Save the
628 * values in fpinfo so we don't need to do it again to generate the
629 * basic foreign path.
630 */
631 estimate_path_cost_size(root, baserel, NIL, NIL,
632 &fpinfo->rows, &fpinfo->width,
633 &fpinfo->startup_cost, &fpinfo->total_cost);
634
635 /* Report estimated baserel size to planner. */
636 baserel->rows = fpinfo->rows;
637 baserel->reltarget->width = fpinfo->width;
638 }
639 else
640 {
641 /*
642 * If the foreign table has never been ANALYZEd, it will have relpages
643 * and reltuples equal to zero, which most likely has nothing to do
644 * with reality. We can't do a whole lot about that if we're not
645 * allowed to consult the remote server, but we can use a hack similar
646 * to plancat.c's treatment of empty relations: use a minimum size
647 * estimate of 10 pages, and divide by the column-datatype-based width
648 * estimate to get the corresponding number of tuples.
649 */
650 if (baserel->pages == 0 && baserel->tuples == 0)
651 {
652 baserel->pages = 10;
653 baserel->tuples =
654 (10 * BLCKSZ) / (baserel->reltarget->width +
655 MAXALIGN(SizeofHeapTupleHeader));
656 }
657
658 /* Estimate baserel size as best we can with local statistics. */
659 set_baserel_size_estimates(root, baserel);
660
661 /* Fill in basically-bogus cost estimates for use later. */
662 estimate_path_cost_size(root, baserel, NIL, NIL,
663 &fpinfo->rows, &fpinfo->width,
664 &fpinfo->startup_cost, &fpinfo->total_cost);
665 }
666
667 /*
668 * Set the name of relation in fpinfo, while we are constructing it here.
669 * It will be used to build the string describing the join relation in
670 * EXPLAIN output. We can't know whether VERBOSE option is specified or
671 * not, so always schema-qualify the foreign table name.
672 */
673 fpinfo->relation_name = makeStringInfo();
674 namespace = get_namespace_name(get_rel_namespace(foreigntableid));
675 relname = get_rel_name(foreigntableid);
676 refname = rte->eref->aliasname;
677 appendStringInfo(fpinfo->relation_name, "%s.%s",
678 quote_identifier(namespace),
679 quote_identifier(relname));
680 if (*refname && strcmp(refname, relname) != 0)
681 appendStringInfo(fpinfo->relation_name, " %s",
682 quote_identifier(rte->eref->aliasname));
683
684 /* No outer and inner relations. */
685 fpinfo->make_outerrel_subquery = false;
686 fpinfo->make_innerrel_subquery = false;
687 fpinfo->lower_subquery_rels = NULL;
688 /* Set the relation index. */
689 fpinfo->relation_index = baserel->relid;
690 }
691
692 /*
693 * get_useful_ecs_for_relation
694 * Determine which EquivalenceClasses might be involved in useful
695 * orderings of this relation.
696 *
697 * This function is in some respects a mirror image of the core function
698 * pathkeys_useful_for_merging: for a regular table, we know what indexes
699 * we have and want to test whether any of them are useful. For a foreign
700 * table, we don't know what indexes are present on the remote side but
701 * want to speculate about which ones we'd like to use if they existed.
702 *
703 * This function returns a list of potentially-useful equivalence classes,
704 * but it does not guarantee that an EquivalenceMember exists which contains
705 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
706 * ft1.x + t1.x = 0, this function will say that the equivalence class
707 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
708 * t1 is local (or on a different server), it will turn out that no useful
709 * ORDER BY clause can be generated. It's not our job to figure that out
710 * here; we're only interested in identifying relevant ECs.
711 */
712 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)713 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
714 {
715 List *useful_eclass_list = NIL;
716 ListCell *lc;
717 Relids relids;
718
719 /*
720 * First, consider whether any active EC is potentially useful for a merge
721 * join against this relation.
722 */
723 if (rel->has_eclass_joins)
724 {
725 foreach(lc, root->eq_classes)
726 {
727 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
728
729 if (eclass_useful_for_merging(root, cur_ec, rel))
730 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
731 }
732 }
733
734 /*
735 * Next, consider whether there are any non-EC derivable join clauses that
736 * are merge-joinable. If the joininfo list is empty, we can exit
737 * quickly.
738 */
739 if (rel->joininfo == NIL)
740 return useful_eclass_list;
741
742 /* If this is a child rel, we must use the topmost parent rel to search. */
743 if (IS_OTHER_REL(rel))
744 {
745 Assert(!bms_is_empty(rel->top_parent_relids));
746 relids = rel->top_parent_relids;
747 }
748 else
749 relids = rel->relids;
750
751 /* Check each join clause in turn. */
752 foreach(lc, rel->joininfo)
753 {
754 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
755
756 /* Consider only mergejoinable clauses */
757 if (restrictinfo->mergeopfamilies == NIL)
758 continue;
759
760 /* Make sure we've got canonical ECs. */
761 update_mergeclause_eclasses(root, restrictinfo);
762
763 /*
764 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
765 * that left_ec and right_ec will be initialized, per comments in
766 * distribute_qual_to_rels.
767 *
768 * We want to identify which side of this merge-joinable clause
769 * contains columns from the relation produced by this RelOptInfo. We
770 * test for overlap, not containment, because there could be extra
771 * relations on either side. For example, suppose we've got something
772 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
773 * A.y = D.y. The input rel might be the joinrel between A and B, and
774 * we'll consider the join clause A.y = D.y. relids contains a
775 * relation not involved in the join class (B) and the equivalence
776 * class for the left-hand side of the clause contains a relation not
777 * involved in the input rel (C). Despite the fact that we have only
778 * overlap and not containment in either direction, A.y is potentially
779 * useful as a sort column.
780 *
781 * Note that it's even possible that relids overlaps neither side of
782 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
783 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
784 * but overlaps neither side of B. In that case, we just skip this
785 * join clause, since it doesn't suggest a useful sort order for this
786 * relation.
787 */
788 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
789 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
790 restrictinfo->right_ec);
791 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
792 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
793 restrictinfo->left_ec);
794 }
795
796 return useful_eclass_list;
797 }
798
799 /*
800 * get_useful_pathkeys_for_relation
801 * Determine which orderings of a relation might be useful.
802 *
803 * Getting data in sorted order can be useful either because the requested
804 * order matches the final output ordering for the overall query we're
805 * planning, or because it enables an efficient merge join. Here, we try
806 * to figure out which pathkeys to consider.
807 */
808 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)809 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
810 {
811 List *useful_pathkeys_list = NIL;
812 List *useful_eclass_list;
813 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
814 EquivalenceClass *query_ec = NULL;
815 ListCell *lc;
816
817 /*
818 * Pushing the query_pathkeys to the remote server is always worth
819 * considering, because it might let us avoid a local sort.
820 */
821 if (root->query_pathkeys)
822 {
823 bool query_pathkeys_ok = true;
824
825 foreach(lc, root->query_pathkeys)
826 {
827 PathKey *pathkey = (PathKey *) lfirst(lc);
828 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
829 Expr *em_expr;
830
831 /*
832 * The planner and executor don't have any clever strategy for
833 * taking data sorted by a prefix of the query's pathkeys and
834 * getting it to be sorted by all of those pathkeys. We'll just
835 * end up resorting the entire data set. So, unless we can push
836 * down all of the query pathkeys, forget it.
837 *
838 * is_foreign_expr would detect volatile expressions as well, but
839 * checking ec_has_volatile here saves some cycles.
840 */
841 if (pathkey_ec->ec_has_volatile ||
842 !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
843 !is_foreign_expr(root, rel, em_expr))
844 {
845 query_pathkeys_ok = false;
846 break;
847 }
848 }
849
850 if (query_pathkeys_ok)
851 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
852 }
853
854 /*
855 * Even if we're not using remote estimates, having the remote side do the
856 * sort generally won't be any worse than doing it locally, and it might
857 * be much better if the remote side can generate data in the right order
858 * without needing a sort at all. However, what we're going to do next is
859 * try to generate pathkeys that seem promising for possible merge joins,
860 * and that's more speculative. A wrong choice might hurt quite a bit, so
861 * bail out if we can't use remote estimates.
862 */
863 if (!fpinfo->use_remote_estimate)
864 return useful_pathkeys_list;
865
866 /* Get the list of interesting EquivalenceClasses. */
867 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
868
869 /* Extract unique EC for query, if any, so we don't consider it again. */
870 if (list_length(root->query_pathkeys) == 1)
871 {
872 PathKey *query_pathkey = linitial(root->query_pathkeys);
873
874 query_ec = query_pathkey->pk_eclass;
875 }
876
877 /*
878 * As a heuristic, the only pathkeys we consider here are those of length
879 * one. It's surely possible to consider more, but since each one we
880 * choose to consider will generate a round-trip to the remote side, we
881 * need to be a bit cautious here. It would sure be nice to have a local
882 * cache of information about remote index definitions...
883 */
884 foreach(lc, useful_eclass_list)
885 {
886 EquivalenceClass *cur_ec = lfirst(lc);
887 Expr *em_expr;
888 PathKey *pathkey;
889
890 /* If redundant with what we did above, skip it. */
891 if (cur_ec == query_ec)
892 continue;
893
894 /* If no pushable expression for this rel, skip it. */
895 em_expr = find_em_expr_for_rel(cur_ec, rel);
896 if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
897 continue;
898
899 /* Looks like we can generate a pathkey, so let's do it. */
900 pathkey = make_canonical_pathkey(root, cur_ec,
901 linitial_oid(cur_ec->ec_opfamilies),
902 BTLessStrategyNumber,
903 false);
904 useful_pathkeys_list = lappend(useful_pathkeys_list,
905 list_make1(pathkey));
906 }
907
908 return useful_pathkeys_list;
909 }
910
911 /*
912 * postgresGetForeignPaths
913 * Create possible scan paths for a scan on the foreign table
914 */
915 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)916 postgresGetForeignPaths(PlannerInfo *root,
917 RelOptInfo *baserel,
918 Oid foreigntableid)
919 {
920 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
921 ForeignPath *path;
922 List *ppi_list;
923 ListCell *lc;
924
925 /*
926 * Create simplest ForeignScan path node and add it to baserel. This path
927 * corresponds to SeqScan path of regular tables (though depending on what
928 * baserestrict conditions we were able to send to remote, there might
929 * actually be an indexscan happening there). We already did all the work
930 * to estimate cost and size of this path.
931 *
932 * Although this path uses no join clauses, it could still have required
933 * parameterization due to LATERAL refs in its tlist.
934 */
935 path = create_foreignscan_path(root, baserel,
936 NULL, /* default pathtarget */
937 fpinfo->rows,
938 fpinfo->startup_cost,
939 fpinfo->total_cost,
940 NIL, /* no pathkeys */
941 baserel->lateral_relids,
942 NULL, /* no extra plan */
943 NIL); /* no fdw_private list */
944 add_path(baserel, (Path *) path);
945
946 /* Add paths with pathkeys */
947 add_paths_with_pathkeys_for_rel(root, baserel, NULL);
948
949 /*
950 * If we're not using remote estimates, stop here. We have no way to
951 * estimate whether any join clauses would be worth sending across, so
952 * don't bother building parameterized paths.
953 */
954 if (!fpinfo->use_remote_estimate)
955 return;
956
957 /*
958 * Thumb through all join clauses for the rel to identify which outer
959 * relations could supply one or more safe-to-send-to-remote join clauses.
960 * We'll build a parameterized path for each such outer relation.
961 *
962 * It's convenient to manage this by representing each candidate outer
963 * relation by the ParamPathInfo node for it. We can then use the
964 * ppi_clauses list in the ParamPathInfo node directly as a list of the
965 * interesting join clauses for that rel. This takes care of the
966 * possibility that there are multiple safe join clauses for such a rel,
967 * and also ensures that we account for unsafe join clauses that we'll
968 * still have to enforce locally (since the parameterized-path machinery
969 * insists that we handle all movable clauses).
970 */
971 ppi_list = NIL;
972 foreach(lc, baserel->joininfo)
973 {
974 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
975 Relids required_outer;
976 ParamPathInfo *param_info;
977
978 /* Check if clause can be moved to this rel */
979 if (!join_clause_is_movable_to(rinfo, baserel))
980 continue;
981
982 /* See if it is safe to send to remote */
983 if (!is_foreign_expr(root, baserel, rinfo->clause))
984 continue;
985
986 /* Calculate required outer rels for the resulting path */
987 required_outer = bms_union(rinfo->clause_relids,
988 baserel->lateral_relids);
989 /* We do not want the foreign rel itself listed in required_outer */
990 required_outer = bms_del_member(required_outer, baserel->relid);
991
992 /*
993 * required_outer probably can't be empty here, but if it were, we
994 * couldn't make a parameterized path.
995 */
996 if (bms_is_empty(required_outer))
997 continue;
998
999 /* Get the ParamPathInfo */
1000 param_info = get_baserel_parampathinfo(root, baserel,
1001 required_outer);
1002 Assert(param_info != NULL);
1003
1004 /*
1005 * Add it to list unless we already have it. Testing pointer equality
1006 * is OK since get_baserel_parampathinfo won't make duplicates.
1007 */
1008 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1009 }
1010
1011 /*
1012 * The above scan examined only "generic" join clauses, not those that
1013 * were absorbed into EquivalenceClauses. See if we can make anything out
1014 * of EquivalenceClauses.
1015 */
1016 if (baserel->has_eclass_joins)
1017 {
1018 /*
1019 * We repeatedly scan the eclass list looking for column references
1020 * (or expressions) belonging to the foreign rel. Each time we find
1021 * one, we generate a list of equivalence joinclauses for it, and then
1022 * see if any are safe to send to the remote. Repeat till there are
1023 * no more candidate EC members.
1024 */
1025 ec_member_foreign_arg arg;
1026
1027 arg.already_used = NIL;
1028 for (;;)
1029 {
1030 List *clauses;
1031
1032 /* Make clauses, skipping any that join to lateral_referencers */
1033 arg.current = NULL;
1034 clauses = generate_implied_equalities_for_column(root,
1035 baserel,
1036 ec_member_matches_foreign,
1037 (void *) &arg,
1038 baserel->lateral_referencers);
1039
1040 /* Done if there are no more expressions in the foreign rel */
1041 if (arg.current == NULL)
1042 {
1043 Assert(clauses == NIL);
1044 break;
1045 }
1046
1047 /* Scan the extracted join clauses */
1048 foreach(lc, clauses)
1049 {
1050 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1051 Relids required_outer;
1052 ParamPathInfo *param_info;
1053
1054 /* Check if clause can be moved to this rel */
1055 if (!join_clause_is_movable_to(rinfo, baserel))
1056 continue;
1057
1058 /* See if it is safe to send to remote */
1059 if (!is_foreign_expr(root, baserel, rinfo->clause))
1060 continue;
1061
1062 /* Calculate required outer rels for the resulting path */
1063 required_outer = bms_union(rinfo->clause_relids,
1064 baserel->lateral_relids);
1065 required_outer = bms_del_member(required_outer, baserel->relid);
1066 if (bms_is_empty(required_outer))
1067 continue;
1068
1069 /* Get the ParamPathInfo */
1070 param_info = get_baserel_parampathinfo(root, baserel,
1071 required_outer);
1072 Assert(param_info != NULL);
1073
1074 /* Add it to list unless we already have it */
1075 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1076 }
1077
1078 /* Try again, now ignoring the expression we found this time */
1079 arg.already_used = lappend(arg.already_used, arg.current);
1080 }
1081 }
1082
1083 /*
1084 * Now build a path for each useful outer relation.
1085 */
1086 foreach(lc, ppi_list)
1087 {
1088 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1089 double rows;
1090 int width;
1091 Cost startup_cost;
1092 Cost total_cost;
1093
1094 /* Get a cost estimate from the remote */
1095 estimate_path_cost_size(root, baserel,
1096 param_info->ppi_clauses, NIL,
1097 &rows, &width,
1098 &startup_cost, &total_cost);
1099
1100 /*
1101 * ppi_rows currently won't get looked at by anything, but still we
1102 * may as well ensure that it matches our idea of the rowcount.
1103 */
1104 param_info->ppi_rows = rows;
1105
1106 /* Make the path */
1107 path = create_foreignscan_path(root, baserel,
1108 NULL, /* default pathtarget */
1109 rows,
1110 startup_cost,
1111 total_cost,
1112 NIL, /* no pathkeys */
1113 param_info->ppi_req_outer,
1114 NULL,
1115 NIL); /* no fdw_private list */
1116 add_path(baserel, (Path *) path);
1117 }
1118 }
1119
1120 /*
1121 * postgresGetForeignPlan
1122 * Create ForeignScan plan node which implements selected best path
1123 */
1124 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1125 postgresGetForeignPlan(PlannerInfo *root,
1126 RelOptInfo *foreignrel,
1127 Oid foreigntableid,
1128 ForeignPath *best_path,
1129 List *tlist,
1130 List *scan_clauses,
1131 Plan *outer_plan)
1132 {
1133 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1134 Index scan_relid;
1135 List *fdw_private;
1136 List *remote_exprs = NIL;
1137 List *local_exprs = NIL;
1138 List *params_list = NIL;
1139 List *fdw_scan_tlist = NIL;
1140 List *fdw_recheck_quals = NIL;
1141 List *retrieved_attrs;
1142 StringInfoData sql;
1143 ListCell *lc;
1144
1145 if (IS_SIMPLE_REL(foreignrel))
1146 {
1147 /*
1148 * For base relations, set scan_relid as the relid of the relation.
1149 */
1150 scan_relid = foreignrel->relid;
1151
1152 /*
1153 * In a base-relation scan, we must apply the given scan_clauses.
1154 *
1155 * Separate the scan_clauses into those that can be executed remotely
1156 * and those that can't. baserestrictinfo clauses that were
1157 * previously determined to be safe or unsafe by classifyConditions
1158 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1159 * else in the scan_clauses list will be a join clause, which we have
1160 * to check for remote-safety.
1161 *
1162 * Note: the join clauses we see here should be the exact same ones
1163 * previously examined by postgresGetForeignPaths. Possibly it'd be
1164 * worth passing forward the classification work done then, rather
1165 * than repeating it here.
1166 *
1167 * This code must match "extract_actual_clauses(scan_clauses, false)"
1168 * except for the additional decision about remote versus local
1169 * execution.
1170 */
1171 foreach(lc, scan_clauses)
1172 {
1173 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1174
1175 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1176 if (rinfo->pseudoconstant)
1177 continue;
1178
1179 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1180 remote_exprs = lappend(remote_exprs, rinfo->clause);
1181 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1182 local_exprs = lappend(local_exprs, rinfo->clause);
1183 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1184 remote_exprs = lappend(remote_exprs, rinfo->clause);
1185 else
1186 local_exprs = lappend(local_exprs, rinfo->clause);
1187 }
1188
1189 /*
1190 * For a base-relation scan, we have to support EPQ recheck, which
1191 * should recheck all the remote quals.
1192 */
1193 fdw_recheck_quals = remote_exprs;
1194 }
1195 else
1196 {
1197 /*
1198 * Join relation or upper relation - set scan_relid to 0.
1199 */
1200 scan_relid = 0;
1201
1202 /*
1203 * For a join rel, baserestrictinfo is NIL and we are not considering
1204 * parameterization right now, so there should be no scan_clauses for
1205 * a joinrel or an upper rel either.
1206 */
1207 Assert(!scan_clauses);
1208
1209 /*
1210 * Instead we get the conditions to apply from the fdw_private
1211 * structure.
1212 */
1213 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1214 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1215
1216 /*
1217 * We leave fdw_recheck_quals empty in this case, since we never need
1218 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1219 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1220 * If we're planning an upperrel (ie, remote grouping or aggregation)
1221 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1222 * allowed, and indeed we *can't* put the remote clauses into
1223 * fdw_recheck_quals because the unaggregated Vars won't be available
1224 * locally.
1225 */
1226
1227 /* Build the list of columns to be fetched from the foreign server. */
1228 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1229
1230 /*
1231 * Ensure that the outer plan produces a tuple whose descriptor
1232 * matches our scan tuple slot. Also, remove the local conditions
1233 * from outer plan's quals, lest they be evaluated twice, once by the
1234 * local plan and once by the scan.
1235 */
1236 if (outer_plan)
1237 {
1238 ListCell *lc;
1239
1240 /*
1241 * Right now, we only consider grouping and aggregation beyond
1242 * joins. Queries involving aggregates or grouping do not require
1243 * EPQ mechanism, hence should not have an outer plan here.
1244 */
1245 Assert(!IS_UPPER_REL(foreignrel));
1246
1247 /*
1248 * First, update the plan's qual list if possible. In some cases
1249 * the quals might be enforced below the topmost plan level, in
1250 * which case we'll fail to remove them; it's not worth working
1251 * harder than this.
1252 */
1253 foreach(lc, local_exprs)
1254 {
1255 Node *qual = lfirst(lc);
1256
1257 outer_plan->qual = list_delete(outer_plan->qual, qual);
1258
1259 /*
1260 * For an inner join the local conditions of foreign scan plan
1261 * can be part of the joinquals as well. (They might also be
1262 * in the mergequals or hashquals, but we can't touch those
1263 * without breaking the plan.)
1264 */
1265 if (IsA(outer_plan, NestLoop) ||
1266 IsA(outer_plan, MergeJoin) ||
1267 IsA(outer_plan, HashJoin))
1268 {
1269 Join *join_plan = (Join *) outer_plan;
1270
1271 if (join_plan->jointype == JOIN_INNER)
1272 join_plan->joinqual = list_delete(join_plan->joinqual,
1273 qual);
1274 }
1275 }
1276
1277 /*
1278 * Now fix the subplan's tlist --- this might result in inserting
1279 * a Result node atop the plan tree.
1280 */
1281 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1282 best_path->path.parallel_safe);
1283 }
1284 }
1285
1286 /*
1287 * Build the query string to be sent for execution, and identify
1288 * expressions to be sent as parameters.
1289 */
1290 initStringInfo(&sql);
1291 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1292 remote_exprs, best_path->path.pathkeys,
1293 false, &retrieved_attrs, ¶ms_list);
1294
1295 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1296 fpinfo->final_remote_exprs = remote_exprs;
1297
1298 /*
1299 * Build the fdw_private list that will be available to the executor.
1300 * Items in the list must match order in enum FdwScanPrivateIndex.
1301 */
1302 fdw_private = list_make3(makeString(sql.data),
1303 retrieved_attrs,
1304 makeInteger(fpinfo->fetch_size));
1305 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1306 fdw_private = lappend(fdw_private,
1307 makeString(fpinfo->relation_name->data));
1308
1309 /*
1310 * Create the ForeignScan node for the given relation.
1311 *
1312 * Note that the remote parameter expressions are stored in the fdw_exprs
1313 * field of the finished plan node; we can't keep them in private state
1314 * because then they wouldn't be subject to later planner processing.
1315 */
1316 return make_foreignscan(tlist,
1317 local_exprs,
1318 scan_relid,
1319 params_list,
1320 fdw_private,
1321 fdw_scan_tlist,
1322 fdw_recheck_quals,
1323 outer_plan);
1324 }
1325
1326 /*
1327 * postgresBeginForeignScan
1328 * Initiate an executor scan of a foreign PostgreSQL table.
1329 */
1330 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1331 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1332 {
1333 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1334 EState *estate = node->ss.ps.state;
1335 PgFdwScanState *fsstate;
1336 RangeTblEntry *rte;
1337 Oid userid;
1338 ForeignTable *table;
1339 UserMapping *user;
1340 int rtindex;
1341 int numParams;
1342
1343 /*
1344 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1345 */
1346 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1347 return;
1348
1349 /*
1350 * We'll save private state in node->fdw_state.
1351 */
1352 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1353 node->fdw_state = (void *) fsstate;
1354
1355 /*
1356 * Identify which user to do the remote access as. This should match what
1357 * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1358 * lowest-numbered member RTE as a representative; we would get the same
1359 * result from any.
1360 */
1361 if (fsplan->scan.scanrelid > 0)
1362 rtindex = fsplan->scan.scanrelid;
1363 else
1364 rtindex = bms_next_member(fsplan->fs_relids, -1);
1365 rte = rt_fetch(rtindex, estate->es_range_table);
1366 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1367
1368 /* Get info about foreign table. */
1369 table = GetForeignTable(rte->relid);
1370 user = GetUserMapping(userid, table->serverid);
1371
1372 /*
1373 * Get connection to the foreign server. Connection manager will
1374 * establish new connection if necessary.
1375 */
1376 fsstate->conn = GetConnection(user, false);
1377
1378 /* Assign a unique ID for my cursor */
1379 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1380 fsstate->cursor_exists = false;
1381
1382 /* Get private info created by planner functions. */
1383 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1384 FdwScanPrivateSelectSql));
1385 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1386 FdwScanPrivateRetrievedAttrs);
1387 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1388 FdwScanPrivateFetchSize));
1389
1390 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1391 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1392 "postgres_fdw tuple data",
1393 ALLOCSET_DEFAULT_SIZES);
1394 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1395 "postgres_fdw temporary data",
1396 ALLOCSET_SMALL_SIZES);
1397
1398 /*
1399 * Get info we'll need for converting data fetched from the foreign server
1400 * into local representation and error reporting during that process.
1401 */
1402 if (fsplan->scan.scanrelid > 0)
1403 {
1404 fsstate->rel = node->ss.ss_currentRelation;
1405 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1406 }
1407 else
1408 {
1409 fsstate->rel = NULL;
1410 fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1411 }
1412
1413 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1414
1415 /*
1416 * Prepare for processing of parameters used in remote query, if any.
1417 */
1418 numParams = list_length(fsplan->fdw_exprs);
1419 fsstate->numParams = numParams;
1420 if (numParams > 0)
1421 prepare_query_params((PlanState *) node,
1422 fsplan->fdw_exprs,
1423 numParams,
1424 &fsstate->param_flinfo,
1425 &fsstate->param_exprs,
1426 &fsstate->param_values);
1427 }
1428
1429 /*
1430 * postgresIterateForeignScan
1431 * Retrieve next row from the result set, or clear tuple slot to indicate
1432 * EOF.
1433 */
1434 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1435 postgresIterateForeignScan(ForeignScanState *node)
1436 {
1437 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1438 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1439
1440 /*
1441 * If this is the first call after Begin or ReScan, we need to create the
1442 * cursor on the remote side.
1443 */
1444 if (!fsstate->cursor_exists)
1445 create_cursor(node);
1446
1447 /*
1448 * Get some more tuples, if we've run out.
1449 */
1450 if (fsstate->next_tuple >= fsstate->num_tuples)
1451 {
1452 /* No point in another fetch if we already detected EOF, though. */
1453 if (!fsstate->eof_reached)
1454 fetch_more_data(node);
1455 /* If we didn't get any tuples, must be end of data. */
1456 if (fsstate->next_tuple >= fsstate->num_tuples)
1457 return ExecClearTuple(slot);
1458 }
1459
1460 /*
1461 * Return the next tuple.
1462 */
1463 ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1464 slot,
1465 InvalidBuffer,
1466 false);
1467
1468 return slot;
1469 }
1470
1471 /*
1472 * postgresReScanForeignScan
1473 * Restart the scan.
1474 */
1475 static void
postgresReScanForeignScan(ForeignScanState * node)1476 postgresReScanForeignScan(ForeignScanState *node)
1477 {
1478 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1479 char sql[64];
1480 PGresult *res;
1481
1482 /* If we haven't created the cursor yet, nothing to do. */
1483 if (!fsstate->cursor_exists)
1484 return;
1485
1486 /*
1487 * If any internal parameters affecting this node have changed, we'd
1488 * better destroy and recreate the cursor. Otherwise, rewinding it should
1489 * be good enough. If we've only fetched zero or one batch, we needn't
1490 * even rewind the cursor, just rescan what we have.
1491 */
1492 if (node->ss.ps.chgParam != NULL)
1493 {
1494 fsstate->cursor_exists = false;
1495 snprintf(sql, sizeof(sql), "CLOSE c%u",
1496 fsstate->cursor_number);
1497 }
1498 else if (fsstate->fetch_ct_2 > 1)
1499 {
1500 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1501 fsstate->cursor_number);
1502 }
1503 else
1504 {
1505 /* Easy: just rescan what we already have in memory, if anything */
1506 fsstate->next_tuple = 0;
1507 return;
1508 }
1509
1510 /*
1511 * We don't use a PG_TRY block here, so be careful not to throw error
1512 * without releasing the PGresult.
1513 */
1514 res = pgfdw_exec_query(fsstate->conn, sql);
1515 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1516 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1517 PQclear(res);
1518
1519 /* Now force a fresh FETCH. */
1520 fsstate->tuples = NULL;
1521 fsstate->num_tuples = 0;
1522 fsstate->next_tuple = 0;
1523 fsstate->fetch_ct_2 = 0;
1524 fsstate->eof_reached = false;
1525 }
1526
1527 /*
1528 * postgresEndForeignScan
1529 * Finish scanning foreign table and dispose objects used for this scan
1530 */
1531 static void
postgresEndForeignScan(ForeignScanState * node)1532 postgresEndForeignScan(ForeignScanState *node)
1533 {
1534 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1535
1536 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1537 if (fsstate == NULL)
1538 return;
1539
1540 /* Close the cursor if open, to prevent accumulation of cursors */
1541 if (fsstate->cursor_exists)
1542 close_cursor(fsstate->conn, fsstate->cursor_number);
1543
1544 /* Release remote connection */
1545 ReleaseConnection(fsstate->conn);
1546 fsstate->conn = NULL;
1547
1548 /* MemoryContexts will be deleted automatically. */
1549 }
1550
1551 /*
1552 * postgresAddForeignUpdateTargets
1553 * Add resjunk column(s) needed for update/delete on a foreign table
1554 */
1555 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1556 postgresAddForeignUpdateTargets(Query *parsetree,
1557 RangeTblEntry *target_rte,
1558 Relation target_relation)
1559 {
1560 Var *var;
1561 const char *attrname;
1562 TargetEntry *tle;
1563
1564 /*
1565 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1566 */
1567
1568 /* Make a Var representing the desired value */
1569 var = makeVar(parsetree->resultRelation,
1570 SelfItemPointerAttributeNumber,
1571 TIDOID,
1572 -1,
1573 InvalidOid,
1574 0);
1575
1576 /* Wrap it in a resjunk TLE with the right name ... */
1577 attrname = "ctid";
1578
1579 tle = makeTargetEntry((Expr *) var,
1580 list_length(parsetree->targetList) + 1,
1581 pstrdup(attrname),
1582 true);
1583
1584 /* ... and add it to the query's targetlist */
1585 parsetree->targetList = lappend(parsetree->targetList, tle);
1586 }
1587
1588 /*
1589 * postgresPlanForeignModify
1590 * Plan an insert/update/delete operation on a foreign table
1591 */
1592 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1593 postgresPlanForeignModify(PlannerInfo *root,
1594 ModifyTable *plan,
1595 Index resultRelation,
1596 int subplan_index)
1597 {
1598 CmdType operation = plan->operation;
1599 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1600 Relation rel;
1601 StringInfoData sql;
1602 List *targetAttrs = NIL;
1603 List *returningList = NIL;
1604 List *retrieved_attrs = NIL;
1605 bool doNothing = false;
1606
1607 initStringInfo(&sql);
1608
1609 /*
1610 * Core code already has some lock on each rel being planned, so we can
1611 * use NoLock here.
1612 */
1613 rel = heap_open(rte->relid, NoLock);
1614
1615 /*
1616 * In an INSERT, we transmit all columns that are defined in the foreign
1617 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1618 * foreign table, we transmit all columns like INSERT; else we transmit
1619 * only columns that were explicitly targets of the UPDATE, so as to avoid
1620 * unnecessary data transmission. (We can't do that for INSERT since we
1621 * would miss sending default values for columns not listed in the source
1622 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1623 * those triggers might change values for non-target columns, in which
1624 * case we would miss sending changed values for those columns.)
1625 */
1626 if (operation == CMD_INSERT ||
1627 (operation == CMD_UPDATE &&
1628 rel->trigdesc &&
1629 rel->trigdesc->trig_update_before_row))
1630 {
1631 TupleDesc tupdesc = RelationGetDescr(rel);
1632 int attnum;
1633
1634 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1635 {
1636 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1637
1638 if (!attr->attisdropped)
1639 targetAttrs = lappend_int(targetAttrs, attnum);
1640 }
1641 }
1642 else if (operation == CMD_UPDATE)
1643 {
1644 int col;
1645
1646 col = -1;
1647 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1648 {
1649 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1650 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1651
1652 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1653 elog(ERROR, "system-column update is not supported");
1654 targetAttrs = lappend_int(targetAttrs, attno);
1655 }
1656 }
1657
1658 /*
1659 * Extract the relevant RETURNING list if any.
1660 */
1661 if (plan->returningLists)
1662 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1663
1664 /*
1665 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1666 * should have already been rejected in the optimizer, as presently there
1667 * is no way to recognize an arbiter index on a foreign table. Only DO
1668 * NOTHING is supported without an inference specification.
1669 */
1670 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1671 doNothing = true;
1672 else if (plan->onConflictAction != ONCONFLICT_NONE)
1673 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1674 (int) plan->onConflictAction);
1675
1676 /*
1677 * Construct the SQL command string.
1678 */
1679 switch (operation)
1680 {
1681 case CMD_INSERT:
1682 deparseInsertSql(&sql, rte, resultRelation, rel,
1683 targetAttrs, doNothing, returningList,
1684 &retrieved_attrs);
1685 break;
1686 case CMD_UPDATE:
1687 deparseUpdateSql(&sql, rte, resultRelation, rel,
1688 targetAttrs, returningList,
1689 &retrieved_attrs);
1690 break;
1691 case CMD_DELETE:
1692 deparseDeleteSql(&sql, rte, resultRelation, rel,
1693 returningList,
1694 &retrieved_attrs);
1695 break;
1696 default:
1697 elog(ERROR, "unexpected operation: %d", (int) operation);
1698 break;
1699 }
1700
1701 heap_close(rel, NoLock);
1702
1703 /*
1704 * Build the fdw_private list that will be available to the executor.
1705 * Items in the list must match enum FdwModifyPrivateIndex, above.
1706 */
1707 return list_make4(makeString(sql.data),
1708 targetAttrs,
1709 makeInteger((retrieved_attrs != NIL)),
1710 retrieved_attrs);
1711 }
1712
1713 /*
1714 * postgresBeginForeignModify
1715 * Begin an insert/update/delete operation on a foreign table
1716 */
1717 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1718 postgresBeginForeignModify(ModifyTableState *mtstate,
1719 ResultRelInfo *resultRelInfo,
1720 List *fdw_private,
1721 int subplan_index,
1722 int eflags)
1723 {
1724 PgFdwModifyState *fmstate;
1725 char *query;
1726 List *target_attrs;
1727 bool has_returning;
1728 List *retrieved_attrs;
1729 RangeTblEntry *rte;
1730
1731 /*
1732 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1733 * stays NULL.
1734 */
1735 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1736 return;
1737
1738 /* Deconstruct fdw_private data. */
1739 query = strVal(list_nth(fdw_private,
1740 FdwModifyPrivateUpdateSql));
1741 target_attrs = (List *) list_nth(fdw_private,
1742 FdwModifyPrivateTargetAttnums);
1743 has_returning = intVal(list_nth(fdw_private,
1744 FdwModifyPrivateHasReturning));
1745 retrieved_attrs = (List *) list_nth(fdw_private,
1746 FdwModifyPrivateRetrievedAttrs);
1747
1748 /* Find RTE. */
1749 rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
1750 mtstate->ps.state->es_range_table);
1751
1752 /* Construct an execution state. */
1753 fmstate = create_foreign_modify(mtstate->ps.state,
1754 rte,
1755 resultRelInfo,
1756 mtstate->operation,
1757 mtstate->mt_plans[subplan_index]->plan,
1758 query,
1759 target_attrs,
1760 has_returning,
1761 retrieved_attrs);
1762
1763 resultRelInfo->ri_FdwState = fmstate;
1764 }
1765
1766 /*
1767 * postgresExecForeignInsert
1768 * Insert one row into a foreign table
1769 */
1770 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1771 postgresExecForeignInsert(EState *estate,
1772 ResultRelInfo *resultRelInfo,
1773 TupleTableSlot *slot,
1774 TupleTableSlot *planSlot)
1775 {
1776 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1777 const char **p_values;
1778 PGresult *res;
1779 int n_rows;
1780
1781 /*
1782 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1783 * postgresBeginForeignInsert())
1784 */
1785 if (fmstate->aux_fmstate)
1786 fmstate = fmstate->aux_fmstate;
1787
1788 /* Set up the prepared statement on the remote server, if we didn't yet */
1789 if (!fmstate->p_name)
1790 prepare_foreign_modify(fmstate);
1791
1792 /* Convert parameters needed by prepared statement to text form */
1793 p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1794
1795 /*
1796 * Execute the prepared statement.
1797 */
1798 if (!PQsendQueryPrepared(fmstate->conn,
1799 fmstate->p_name,
1800 fmstate->p_nums,
1801 p_values,
1802 NULL,
1803 NULL,
1804 0))
1805 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1806
1807 /*
1808 * Get the result, and check for success.
1809 *
1810 * We don't use a PG_TRY block here, so be careful not to throw error
1811 * without releasing the PGresult.
1812 */
1813 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1814 if (PQresultStatus(res) !=
1815 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1816 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1817
1818 /* Check number of rows affected, and fetch RETURNING tuple if any */
1819 if (fmstate->has_returning)
1820 {
1821 n_rows = PQntuples(res);
1822 if (n_rows > 0)
1823 store_returning_result(fmstate, slot, res);
1824 }
1825 else
1826 n_rows = atoi(PQcmdTuples(res));
1827
1828 /* And clean up */
1829 PQclear(res);
1830
1831 MemoryContextReset(fmstate->temp_cxt);
1832
1833 /* Return NULL if nothing was inserted on the remote end */
1834 return (n_rows > 0) ? slot : NULL;
1835 }
1836
1837 /*
1838 * postgresExecForeignUpdate
1839 * Update one row in a foreign table
1840 */
1841 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1842 postgresExecForeignUpdate(EState *estate,
1843 ResultRelInfo *resultRelInfo,
1844 TupleTableSlot *slot,
1845 TupleTableSlot *planSlot)
1846 {
1847 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1848 Datum datum;
1849 bool isNull;
1850 const char **p_values;
1851 PGresult *res;
1852 int n_rows;
1853
1854 /* Set up the prepared statement on the remote server, if we didn't yet */
1855 if (!fmstate->p_name)
1856 prepare_foreign_modify(fmstate);
1857
1858 /* Get the ctid that was passed up as a resjunk column */
1859 datum = ExecGetJunkAttribute(planSlot,
1860 fmstate->ctidAttno,
1861 &isNull);
1862 /* shouldn't ever get a null result... */
1863 if (isNull)
1864 elog(ERROR, "ctid is NULL");
1865
1866 /* Convert parameters needed by prepared statement to text form */
1867 p_values = convert_prep_stmt_params(fmstate,
1868 (ItemPointer) DatumGetPointer(datum),
1869 slot);
1870
1871 /*
1872 * Execute the prepared statement.
1873 */
1874 if (!PQsendQueryPrepared(fmstate->conn,
1875 fmstate->p_name,
1876 fmstate->p_nums,
1877 p_values,
1878 NULL,
1879 NULL,
1880 0))
1881 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1882
1883 /*
1884 * Get the result, and check for success.
1885 *
1886 * We don't use a PG_TRY block here, so be careful not to throw error
1887 * without releasing the PGresult.
1888 */
1889 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1890 if (PQresultStatus(res) !=
1891 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1892 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1893
1894 /* Check number of rows affected, and fetch RETURNING tuple if any */
1895 if (fmstate->has_returning)
1896 {
1897 n_rows = PQntuples(res);
1898 if (n_rows > 0)
1899 store_returning_result(fmstate, slot, res);
1900 }
1901 else
1902 n_rows = atoi(PQcmdTuples(res));
1903
1904 /* And clean up */
1905 PQclear(res);
1906
1907 MemoryContextReset(fmstate->temp_cxt);
1908
1909 /* Return NULL if nothing was updated on the remote end */
1910 return (n_rows > 0) ? slot : NULL;
1911 }
1912
1913 /*
1914 * postgresExecForeignDelete
1915 * Delete one row from a foreign table
1916 */
1917 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1918 postgresExecForeignDelete(EState *estate,
1919 ResultRelInfo *resultRelInfo,
1920 TupleTableSlot *slot,
1921 TupleTableSlot *planSlot)
1922 {
1923 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1924 Datum datum;
1925 bool isNull;
1926 const char **p_values;
1927 PGresult *res;
1928 int n_rows;
1929
1930 /* Set up the prepared statement on the remote server, if we didn't yet */
1931 if (!fmstate->p_name)
1932 prepare_foreign_modify(fmstate);
1933
1934 /* Get the ctid that was passed up as a resjunk column */
1935 datum = ExecGetJunkAttribute(planSlot,
1936 fmstate->ctidAttno,
1937 &isNull);
1938 /* shouldn't ever get a null result... */
1939 if (isNull)
1940 elog(ERROR, "ctid is NULL");
1941
1942 /* Convert parameters needed by prepared statement to text form */
1943 p_values = convert_prep_stmt_params(fmstate,
1944 (ItemPointer) DatumGetPointer(datum),
1945 NULL);
1946
1947 /*
1948 * Execute the prepared statement.
1949 */
1950 if (!PQsendQueryPrepared(fmstate->conn,
1951 fmstate->p_name,
1952 fmstate->p_nums,
1953 p_values,
1954 NULL,
1955 NULL,
1956 0))
1957 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1958
1959 /*
1960 * Get the result, and check for success.
1961 *
1962 * We don't use a PG_TRY block here, so be careful not to throw error
1963 * without releasing the PGresult.
1964 */
1965 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1966 if (PQresultStatus(res) !=
1967 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1968 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1969
1970 /* Check number of rows affected, and fetch RETURNING tuple if any */
1971 if (fmstate->has_returning)
1972 {
1973 n_rows = PQntuples(res);
1974 if (n_rows > 0)
1975 store_returning_result(fmstate, slot, res);
1976 }
1977 else
1978 n_rows = atoi(PQcmdTuples(res));
1979
1980 /* And clean up */
1981 PQclear(res);
1982
1983 MemoryContextReset(fmstate->temp_cxt);
1984
1985 /* Return NULL if nothing was deleted on the remote end */
1986 return (n_rows > 0) ? slot : NULL;
1987 }
1988
1989 /*
1990 * postgresEndForeignModify
1991 * Finish an insert/update/delete operation on a foreign table
1992 */
1993 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1994 postgresEndForeignModify(EState *estate,
1995 ResultRelInfo *resultRelInfo)
1996 {
1997 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1998
1999 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2000 if (fmstate == NULL)
2001 return;
2002
2003 /* Destroy the execution state */
2004 finish_foreign_modify(fmstate);
2005 }
2006
2007 /*
2008 * postgresBeginForeignInsert
2009 * Begin an insert operation on a foreign table
2010 */
2011 static void
postgresBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)2012 postgresBeginForeignInsert(ModifyTableState *mtstate,
2013 ResultRelInfo *resultRelInfo)
2014 {
2015 PgFdwModifyState *fmstate;
2016 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2017 EState *estate = mtstate->ps.state;
2018 Index resultRelation;
2019 Relation rel = resultRelInfo->ri_RelationDesc;
2020 RangeTblEntry *rte;
2021 TupleDesc tupdesc = RelationGetDescr(rel);
2022 int attnum;
2023 StringInfoData sql;
2024 List *targetAttrs = NIL;
2025 List *retrieved_attrs = NIL;
2026 bool doNothing = false;
2027
2028 /*
2029 * If the foreign table we are about to insert routed rows into is also
2030 * an UPDATE subplan result rel that will be updated later, proceeding
2031 * with the INSERT will result in the later UPDATE incorrectly modifying
2032 * those routed rows, so prevent the INSERT --- it would be nice if we
2033 * could handle this case; but for now, throw an error for safety.
2034 */
2035 if (plan && plan->operation == CMD_UPDATE &&
2036 (resultRelInfo->ri_usesFdwDirectModify ||
2037 resultRelInfo->ri_FdwState) &&
2038 resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan)
2039 ereport(ERROR,
2040 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2041 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2042 RelationGetRelationName(rel))));
2043
2044 initStringInfo(&sql);
2045
2046 /* We transmit all columns that are defined in the foreign table. */
2047 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2048 {
2049 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2050
2051 if (!attr->attisdropped)
2052 targetAttrs = lappend_int(targetAttrs, attnum);
2053 }
2054
2055 /* Check if we add the ON CONFLICT clause to the remote query. */
2056 if (plan)
2057 {
2058 OnConflictAction onConflictAction = plan->onConflictAction;
2059
2060 /* We only support DO NOTHING without an inference specification. */
2061 if (onConflictAction == ONCONFLICT_NOTHING)
2062 doNothing = true;
2063 else if (onConflictAction != ONCONFLICT_NONE)
2064 elog(ERROR, "unexpected ON CONFLICT specification: %d",
2065 (int) onConflictAction);
2066 }
2067
2068 /*
2069 * If the foreign table is a partition that doesn't have a corresponding
2070 * RTE entry, we need to create a new RTE
2071 * describing the foreign table for use by deparseInsertSql and
2072 * create_foreign_modify() below, after first copying the parent's RTE and
2073 * modifying some fields to describe the foreign partition to work on.
2074 * However, if this is invoked by UPDATE, the existing RTE may already
2075 * correspond to this partition if it is one of the UPDATE subplan target
2076 * rels; in that case, we can just use the existing RTE as-is.
2077 */
2078 if (resultRelInfo->ri_RangeTableIndex == 0)
2079 {
2080 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2081
2082 rte = list_nth(estate->es_range_table, rootResultRelInfo->ri_RangeTableIndex - 1);
2083 rte = copyObject(rte);
2084 rte->relid = RelationGetRelid(rel);
2085 rte->relkind = RELKIND_FOREIGN_TABLE;
2086
2087 /*
2088 * For UPDATE, we must use the RT index of the first subplan target
2089 * rel's RTE, because the core code would have built expressions for
2090 * the partition, such as RETURNING, using that RT index as varno of
2091 * Vars contained in those expressions.
2092 */
2093 if (plan && plan->operation == CMD_UPDATE &&
2094 rootResultRelInfo->ri_RangeTableIndex == plan->nominalRelation)
2095 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2096 else
2097 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2098 }
2099 else
2100 {
2101 resultRelation = resultRelInfo->ri_RangeTableIndex;
2102 rte = list_nth(estate->es_range_table, resultRelation - 1);
2103 }
2104
2105 /* Construct the SQL command string. */
2106 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2107 resultRelInfo->ri_returningList, &retrieved_attrs);
2108
2109 /* Construct an execution state. */
2110 fmstate = create_foreign_modify(mtstate->ps.state,
2111 rte,
2112 resultRelInfo,
2113 CMD_INSERT,
2114 NULL,
2115 sql.data,
2116 targetAttrs,
2117 retrieved_attrs != NIL,
2118 retrieved_attrs);
2119
2120 /*
2121 * If the given resultRelInfo already has PgFdwModifyState set, it means
2122 * the foreign table is an UPDATE subplan result rel; in which case, store
2123 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2124 */
2125 if (resultRelInfo->ri_FdwState)
2126 {
2127 Assert(plan && plan->operation == CMD_UPDATE);
2128 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2129 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2130 }
2131 else
2132 resultRelInfo->ri_FdwState = fmstate;
2133 }
2134
2135 /*
2136 * postgresEndForeignInsert
2137 * Finish an insert operation on a foreign table
2138 */
2139 static void
postgresEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2140 postgresEndForeignInsert(EState *estate,
2141 ResultRelInfo *resultRelInfo)
2142 {
2143 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2144
2145 Assert(fmstate != NULL);
2146
2147 /*
2148 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2149 * postgresBeginForeignInsert())
2150 */
2151 if (fmstate->aux_fmstate)
2152 fmstate = fmstate->aux_fmstate;
2153
2154 /* Destroy the execution state */
2155 finish_foreign_modify(fmstate);
2156 }
2157
2158 /*
2159 * postgresIsForeignRelUpdatable
2160 * Determine whether a foreign table supports INSERT, UPDATE and/or
2161 * DELETE.
2162 */
2163 static int
postgresIsForeignRelUpdatable(Relation rel)2164 postgresIsForeignRelUpdatable(Relation rel)
2165 {
2166 bool updatable;
2167 ForeignTable *table;
2168 ForeignServer *server;
2169 ListCell *lc;
2170
2171 /*
2172 * By default, all postgres_fdw foreign tables are assumed updatable. This
2173 * can be overridden by a per-server setting, which in turn can be
2174 * overridden by a per-table setting.
2175 */
2176 updatable = true;
2177
2178 table = GetForeignTable(RelationGetRelid(rel));
2179 server = GetForeignServer(table->serverid);
2180
2181 foreach(lc, server->options)
2182 {
2183 DefElem *def = (DefElem *) lfirst(lc);
2184
2185 if (strcmp(def->defname, "updatable") == 0)
2186 updatable = defGetBoolean(def);
2187 }
2188 foreach(lc, table->options)
2189 {
2190 DefElem *def = (DefElem *) lfirst(lc);
2191
2192 if (strcmp(def->defname, "updatable") == 0)
2193 updatable = defGetBoolean(def);
2194 }
2195
2196 /*
2197 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2198 */
2199 return updatable ?
2200 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2201 }
2202
2203 /*
2204 * postgresRecheckForeignScan
2205 * Execute a local join execution plan for a foreign join
2206 */
2207 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2208 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2209 {
2210 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2211 PlanState *outerPlan = outerPlanState(node);
2212 TupleTableSlot *result;
2213
2214 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2215 if (scanrelid > 0)
2216 return true;
2217
2218 Assert(outerPlan != NULL);
2219
2220 /* Execute a local join execution plan */
2221 result = ExecProcNode(outerPlan);
2222 if (TupIsNull(result))
2223 return false;
2224
2225 /* Store result in the given slot */
2226 ExecCopySlot(slot, result);
2227
2228 return true;
2229 }
2230
2231 /*
2232 * postgresPlanDirectModify
2233 * Consider a direct foreign table modification
2234 *
2235 * Decide whether it is safe to modify a foreign table directly, and if so,
2236 * rewrite subplan accordingly.
2237 */
2238 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2239 postgresPlanDirectModify(PlannerInfo *root,
2240 ModifyTable *plan,
2241 Index resultRelation,
2242 int subplan_index)
2243 {
2244 CmdType operation = plan->operation;
2245 Plan *subplan;
2246 RelOptInfo *foreignrel;
2247 RangeTblEntry *rte;
2248 PgFdwRelationInfo *fpinfo;
2249 Relation rel;
2250 StringInfoData sql;
2251 ForeignScan *fscan;
2252 List *targetAttrs = NIL;
2253 List *remote_exprs;
2254 List *params_list = NIL;
2255 List *returningList = NIL;
2256 List *retrieved_attrs = NIL;
2257
2258 /*
2259 * Decide whether it is safe to modify a foreign table directly.
2260 */
2261
2262 /*
2263 * The table modification must be an UPDATE or DELETE.
2264 */
2265 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2266 return false;
2267
2268 /*
2269 * It's unsafe to modify a foreign table directly if there are any local
2270 * joins needed.
2271 */
2272 subplan = (Plan *) list_nth(plan->plans, subplan_index);
2273 if (!IsA(subplan, ForeignScan))
2274 return false;
2275 fscan = (ForeignScan *) subplan;
2276
2277 /*
2278 * It's unsafe to modify a foreign table directly if there are any quals
2279 * that should be evaluated locally.
2280 */
2281 if (subplan->qual != NIL)
2282 return false;
2283
2284 /* Safe to fetch data about the target foreign rel */
2285 if (fscan->scan.scanrelid == 0)
2286 {
2287 foreignrel = find_join_rel(root, fscan->fs_relids);
2288 /* We should have a rel for this foreign join. */
2289 Assert(foreignrel);
2290 }
2291 else
2292 foreignrel = root->simple_rel_array[resultRelation];
2293 rte = root->simple_rte_array[resultRelation];
2294 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2295
2296 /*
2297 * It's unsafe to update a foreign table directly, if any expressions to
2298 * assign to the target columns are unsafe to evaluate remotely.
2299 */
2300 if (operation == CMD_UPDATE)
2301 {
2302 int col;
2303
2304 /*
2305 * We transmit only columns that were explicitly targets of the
2306 * UPDATE, so as to avoid unnecessary data transmission.
2307 */
2308 col = -1;
2309 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2310 {
2311 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2312 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
2313 TargetEntry *tle;
2314
2315 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2316 elog(ERROR, "system-column update is not supported");
2317
2318 tle = get_tle_by_resno(subplan->targetlist, attno);
2319
2320 if (!tle)
2321 elog(ERROR, "attribute number %d not found in subplan targetlist",
2322 attno);
2323
2324 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2325 return false;
2326
2327 targetAttrs = lappend_int(targetAttrs, attno);
2328 }
2329 }
2330
2331 /*
2332 * Ok, rewrite subplan so as to modify the foreign table directly.
2333 */
2334 initStringInfo(&sql);
2335
2336 /*
2337 * Core code already has some lock on each rel being planned, so we can
2338 * use NoLock here.
2339 */
2340 rel = heap_open(rte->relid, NoLock);
2341
2342 /*
2343 * Recall the qual clauses that must be evaluated remotely. (These are
2344 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2345 * doesn't care.)
2346 */
2347 remote_exprs = fpinfo->final_remote_exprs;
2348
2349 /*
2350 * Extract the relevant RETURNING list if any.
2351 */
2352 if (plan->returningLists)
2353 {
2354 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2355
2356 /*
2357 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2358 * we fetch from the foreign server any Vars specified in RETURNING
2359 * that refer not only to the target relation but to non-target
2360 * relations. So we'll deparse them into the RETURNING clause of the
2361 * remote query; use a targetlist consisting of them instead, which
2362 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2363 * node below.
2364 */
2365 if (fscan->scan.scanrelid == 0)
2366 returningList = build_remote_returning(resultRelation, rel,
2367 returningList);
2368 }
2369
2370 /*
2371 * Construct the SQL command string.
2372 */
2373 switch (operation)
2374 {
2375 case CMD_UPDATE:
2376 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2377 foreignrel,
2378 ((Plan *) fscan)->targetlist,
2379 targetAttrs,
2380 remote_exprs, ¶ms_list,
2381 returningList, &retrieved_attrs);
2382 break;
2383 case CMD_DELETE:
2384 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2385 foreignrel,
2386 remote_exprs, ¶ms_list,
2387 returningList, &retrieved_attrs);
2388 break;
2389 default:
2390 elog(ERROR, "unexpected operation: %d", (int) operation);
2391 break;
2392 }
2393
2394 /*
2395 * Update the operation info.
2396 */
2397 fscan->operation = operation;
2398
2399 /*
2400 * Update the fdw_exprs list that will be available to the executor.
2401 */
2402 fscan->fdw_exprs = params_list;
2403
2404 /*
2405 * Update the fdw_private list that will be available to the executor.
2406 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2407 */
2408 fscan->fdw_private = list_make4(makeString(sql.data),
2409 makeInteger((retrieved_attrs != NIL)),
2410 retrieved_attrs,
2411 makeInteger(plan->canSetTag));
2412
2413 /*
2414 * Update the foreign-join-related fields.
2415 */
2416 if (fscan->scan.scanrelid == 0)
2417 {
2418 /* No need for the outer subplan. */
2419 fscan->scan.plan.lefttree = NULL;
2420
2421 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2422 if (returningList)
2423 rebuild_fdw_scan_tlist(fscan, returningList);
2424 }
2425
2426 heap_close(rel, NoLock);
2427 return true;
2428 }
2429
2430 /*
2431 * postgresBeginDirectModify
2432 * Prepare a direct foreign table modification
2433 */
2434 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2435 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2436 {
2437 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2438 EState *estate = node->ss.ps.state;
2439 PgFdwDirectModifyState *dmstate;
2440 Index rtindex;
2441 RangeTblEntry *rte;
2442 Oid userid;
2443 ForeignTable *table;
2444 UserMapping *user;
2445 int numParams;
2446
2447 /*
2448 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2449 */
2450 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2451 return;
2452
2453 /*
2454 * We'll save private state in node->fdw_state.
2455 */
2456 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2457 node->fdw_state = (void *) dmstate;
2458
2459 /*
2460 * Identify which user to do the remote access as. This should match what
2461 * ExecCheckRTEPerms() does.
2462 */
2463 rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
2464 rte = rt_fetch(rtindex, estate->es_range_table);
2465 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2466
2467 /* Get info about foreign table. */
2468 if (fsplan->scan.scanrelid == 0)
2469 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2470 else
2471 dmstate->rel = node->ss.ss_currentRelation;
2472 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2473 user = GetUserMapping(userid, table->serverid);
2474
2475 /*
2476 * Get connection to the foreign server. Connection manager will
2477 * establish new connection if necessary.
2478 */
2479 dmstate->conn = GetConnection(user, false);
2480
2481 /* Update the foreign-join-related fields. */
2482 if (fsplan->scan.scanrelid == 0)
2483 {
2484 /* Save info about foreign table. */
2485 dmstate->resultRel = dmstate->rel;
2486
2487 /*
2488 * Set dmstate->rel to NULL to teach get_returning_data() and
2489 * make_tuple_from_result_row() that columns fetched from the remote
2490 * server are described by fdw_scan_tlist of the foreign-scan plan
2491 * node, not the tuple descriptor for the target relation.
2492 */
2493 dmstate->rel = NULL;
2494 }
2495
2496 /* Initialize state variable */
2497 dmstate->num_tuples = -1; /* -1 means not set yet */
2498
2499 /* Get private info created by planner functions. */
2500 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2501 FdwDirectModifyPrivateUpdateSql));
2502 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2503 FdwDirectModifyPrivateHasReturning));
2504 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2505 FdwDirectModifyPrivateRetrievedAttrs);
2506 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2507 FdwDirectModifyPrivateSetProcessed));
2508
2509 /* Create context for per-tuple temp workspace. */
2510 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2511 "postgres_fdw temporary data",
2512 ALLOCSET_SMALL_SIZES);
2513
2514 /* Prepare for input conversion of RETURNING results. */
2515 if (dmstate->has_returning)
2516 {
2517 TupleDesc tupdesc;
2518
2519 if (fsplan->scan.scanrelid == 0)
2520 tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2521 else
2522 tupdesc = RelationGetDescr(dmstate->rel);
2523
2524 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2525
2526 /*
2527 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2528 * initialize a filter to extract an updated/deleted tuple from a scan
2529 * tuple.
2530 */
2531 if (fsplan->scan.scanrelid == 0)
2532 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2533 }
2534
2535 /*
2536 * Prepare for processing of parameters used in remote query, if any.
2537 */
2538 numParams = list_length(fsplan->fdw_exprs);
2539 dmstate->numParams = numParams;
2540 if (numParams > 0)
2541 prepare_query_params((PlanState *) node,
2542 fsplan->fdw_exprs,
2543 numParams,
2544 &dmstate->param_flinfo,
2545 &dmstate->param_exprs,
2546 &dmstate->param_values);
2547 }
2548
2549 /*
2550 * postgresIterateDirectModify
2551 * Execute a direct foreign table modification
2552 */
2553 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2554 postgresIterateDirectModify(ForeignScanState *node)
2555 {
2556 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2557 EState *estate = node->ss.ps.state;
2558 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2559
2560 /*
2561 * If this is the first call after Begin, execute the statement.
2562 */
2563 if (dmstate->num_tuples == -1)
2564 execute_dml_stmt(node);
2565
2566 /*
2567 * If the local query doesn't specify RETURNING, just clear tuple slot.
2568 */
2569 if (!resultRelInfo->ri_projectReturning)
2570 {
2571 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2572 Instrumentation *instr = node->ss.ps.instrument;
2573
2574 Assert(!dmstate->has_returning);
2575
2576 /* Increment the command es_processed count if necessary. */
2577 if (dmstate->set_processed)
2578 estate->es_processed += dmstate->num_tuples;
2579
2580 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2581 if (instr)
2582 instr->tuplecount += dmstate->num_tuples;
2583
2584 return ExecClearTuple(slot);
2585 }
2586
2587 /*
2588 * Get the next RETURNING tuple.
2589 */
2590 return get_returning_data(node);
2591 }
2592
2593 /*
2594 * postgresEndDirectModify
2595 * Finish a direct foreign table modification
2596 */
2597 static void
postgresEndDirectModify(ForeignScanState * node)2598 postgresEndDirectModify(ForeignScanState *node)
2599 {
2600 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2601
2602 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2603 if (dmstate == NULL)
2604 return;
2605
2606 /* Release PGresult */
2607 if (dmstate->result)
2608 PQclear(dmstate->result);
2609
2610 /* Release remote connection */
2611 ReleaseConnection(dmstate->conn);
2612 dmstate->conn = NULL;
2613
2614 /* close the target relation. */
2615 if (dmstate->resultRel)
2616 ExecCloseScanRelation(dmstate->resultRel);
2617
2618 /* MemoryContext will be deleted automatically. */
2619 }
2620
2621 /*
2622 * postgresExplainForeignScan
2623 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2624 */
2625 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2626 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2627 {
2628 List *fdw_private;
2629 char *sql;
2630 char *relations;
2631
2632 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2633
2634 /*
2635 * Add names of relation handled by the foreign scan when the scan is a
2636 * join
2637 */
2638 if (list_length(fdw_private) > FdwScanPrivateRelations)
2639 {
2640 relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2641 ExplainPropertyText("Relations", relations, es);
2642 }
2643
2644 /*
2645 * Add remote query, when VERBOSE option is specified.
2646 */
2647 if (es->verbose)
2648 {
2649 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2650 ExplainPropertyText("Remote SQL", sql, es);
2651 }
2652 }
2653
2654 /*
2655 * postgresExplainForeignModify
2656 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2657 */
2658 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2659 postgresExplainForeignModify(ModifyTableState *mtstate,
2660 ResultRelInfo *rinfo,
2661 List *fdw_private,
2662 int subplan_index,
2663 ExplainState *es)
2664 {
2665 if (es->verbose)
2666 {
2667 char *sql = strVal(list_nth(fdw_private,
2668 FdwModifyPrivateUpdateSql));
2669
2670 ExplainPropertyText("Remote SQL", sql, es);
2671 }
2672 }
2673
2674 /*
2675 * postgresExplainDirectModify
2676 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2677 * foreign table directly
2678 */
2679 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2680 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2681 {
2682 List *fdw_private;
2683 char *sql;
2684
2685 if (es->verbose)
2686 {
2687 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2688 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2689 ExplainPropertyText("Remote SQL", sql, es);
2690 }
2691 }
2692
2693
2694 /*
2695 * estimate_path_cost_size
2696 * Get cost and size estimates for a foreign scan on given foreign relation
2697 * either a base relation or a join between foreign relations or an upper
2698 * relation containing foreign relations.
2699 *
2700 * param_join_conds are the parameterization clauses with outer relations.
2701 * pathkeys specify the expected sort order if any for given path being costed.
2702 *
2703 * The function returns the cost and size estimates in p_rows, p_width,
2704 * p_startup_cost and p_total_cost variables.
2705 */
2706 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)2707 estimate_path_cost_size(PlannerInfo *root,
2708 RelOptInfo *foreignrel,
2709 List *param_join_conds,
2710 List *pathkeys,
2711 double *p_rows, int *p_width,
2712 Cost *p_startup_cost, Cost *p_total_cost)
2713 {
2714 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2715 double rows;
2716 double retrieved_rows;
2717 int width;
2718 Cost startup_cost;
2719 Cost total_cost;
2720 Cost cpu_per_tuple;
2721
2722 /*
2723 * If the table or the server is configured to use remote estimates,
2724 * connect to the foreign server and execute EXPLAIN to estimate the
2725 * number of rows selected by the restriction+join clauses. Otherwise,
2726 * estimate rows using whatever statistics we have locally, in a way
2727 * similar to ordinary tables.
2728 */
2729 if (fpinfo->use_remote_estimate)
2730 {
2731 List *remote_param_join_conds;
2732 List *local_param_join_conds;
2733 StringInfoData sql;
2734 PGconn *conn;
2735 Selectivity local_sel;
2736 QualCost local_cost;
2737 List *fdw_scan_tlist = NIL;
2738 List *remote_conds;
2739
2740 /* Required only to be passed to deparseSelectStmtForRel */
2741 List *retrieved_attrs;
2742
2743 /*
2744 * param_join_conds might contain both clauses that are safe to send
2745 * across, and clauses that aren't.
2746 */
2747 classifyConditions(root, foreignrel, param_join_conds,
2748 &remote_param_join_conds, &local_param_join_conds);
2749
2750 /* Build the list of columns to be fetched from the foreign server. */
2751 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2752 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2753 else
2754 fdw_scan_tlist = NIL;
2755
2756 /*
2757 * The complete list of remote conditions includes everything from
2758 * baserestrictinfo plus any extra join_conds relevant to this
2759 * particular path.
2760 */
2761 remote_conds = list_concat(list_copy(remote_param_join_conds),
2762 fpinfo->remote_conds);
2763
2764 /*
2765 * Construct EXPLAIN query including the desired SELECT, FROM, and
2766 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2767 * values, so don't request params_list.
2768 */
2769 initStringInfo(&sql);
2770 appendStringInfoString(&sql, "EXPLAIN ");
2771 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2772 remote_conds, pathkeys, false,
2773 &retrieved_attrs, NULL);
2774
2775 /* Get the remote estimate */
2776 conn = GetConnection(fpinfo->user, false);
2777 get_remote_estimate(sql.data, conn, &rows, &width,
2778 &startup_cost, &total_cost);
2779 ReleaseConnection(conn);
2780
2781 retrieved_rows = rows;
2782
2783 /* Factor in the selectivity of the locally-checked quals */
2784 local_sel = clauselist_selectivity(root,
2785 local_param_join_conds,
2786 foreignrel->relid,
2787 JOIN_INNER,
2788 NULL);
2789 local_sel *= fpinfo->local_conds_sel;
2790
2791 rows = clamp_row_est(rows * local_sel);
2792
2793 /* Add in the eval cost of the locally-checked quals */
2794 startup_cost += fpinfo->local_conds_cost.startup;
2795 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2796 cost_qual_eval(&local_cost, local_param_join_conds, root);
2797 startup_cost += local_cost.startup;
2798 total_cost += local_cost.per_tuple * retrieved_rows;
2799 }
2800 else
2801 {
2802 Cost run_cost = 0;
2803
2804 /*
2805 * We don't support join conditions in this mode (hence, no
2806 * parameterized paths can be made).
2807 */
2808 Assert(param_join_conds == NIL);
2809
2810 /*
2811 * Use rows/width estimates made by set_baserel_size_estimates() for
2812 * base foreign relations and set_joinrel_size_estimates() for join
2813 * between foreign relations.
2814 */
2815 rows = foreignrel->rows;
2816 width = foreignrel->reltarget->width;
2817
2818 /* Back into an estimate of the number of retrieved rows. */
2819 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2820
2821 /*
2822 * We will come here again and again with different set of pathkeys
2823 * that caller wants to cost. We don't need to calculate the cost of
2824 * bare scan each time. Instead, use the costs if we have cached them
2825 * already.
2826 */
2827 if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2828 {
2829 startup_cost = fpinfo->rel_startup_cost;
2830 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2831 }
2832 else if (IS_JOIN_REL(foreignrel))
2833 {
2834 PgFdwRelationInfo *fpinfo_i;
2835 PgFdwRelationInfo *fpinfo_o;
2836 QualCost join_cost;
2837 QualCost remote_conds_cost;
2838 double nrows;
2839
2840 /* For join we expect inner and outer relations set */
2841 Assert(fpinfo->innerrel && fpinfo->outerrel);
2842
2843 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2844 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2845
2846 /* Estimate of number of rows in cross product */
2847 nrows = fpinfo_i->rows * fpinfo_o->rows;
2848 /* Clamp retrieved rows estimate to at most size of cross product */
2849 retrieved_rows = Min(retrieved_rows, nrows);
2850
2851 /*
2852 * The cost of foreign join is estimated as cost of generating
2853 * rows for the joining relations + cost for applying quals on the
2854 * rows.
2855 */
2856
2857 /*
2858 * Calculate the cost of clauses pushed down to the foreign server
2859 */
2860 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2861 /* Calculate the cost of applying join clauses */
2862 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2863
2864 /*
2865 * Startup cost includes startup cost of joining relations and the
2866 * startup cost for join and other clauses. We do not include the
2867 * startup cost specific to join strategy (e.g. setting up hash
2868 * tables) since we do not know what strategy the foreign server
2869 * is going to use.
2870 */
2871 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2872 startup_cost += join_cost.startup;
2873 startup_cost += remote_conds_cost.startup;
2874 startup_cost += fpinfo->local_conds_cost.startup;
2875
2876 /*
2877 * Run time cost includes:
2878 *
2879 * 1. Run time cost (total_cost - startup_cost) of relations being
2880 * joined
2881 *
2882 * 2. Run time cost of applying join clauses on the cross product
2883 * of the joining relations.
2884 *
2885 * 3. Run time cost of applying pushed down other clauses on the
2886 * result of join
2887 *
2888 * 4. Run time cost of applying nonpushable other clauses locally
2889 * on the result fetched from the foreign server.
2890 */
2891 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2892 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2893 run_cost += nrows * join_cost.per_tuple;
2894 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2895 run_cost += nrows * remote_conds_cost.per_tuple;
2896 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2897 }
2898 else if (IS_UPPER_REL(foreignrel))
2899 {
2900 PgFdwRelationInfo *ofpinfo;
2901 PathTarget *ptarget = foreignrel->reltarget;
2902 AggClauseCosts aggcosts;
2903 double input_rows;
2904 int numGroupCols;
2905 double numGroups = 1;
2906
2907 /* Make sure the core code set the pathtarget. */
2908 Assert(ptarget != NULL);
2909
2910 /*
2911 * This cost model is mixture of costing done for sorted and
2912 * hashed aggregates in cost_agg(). We are not sure which
2913 * strategy will be considered at remote side, thus for
2914 * simplicity, we put all startup related costs in startup_cost
2915 * and all finalization and run cost are added in total_cost.
2916 *
2917 * Also, core does not care about costing HAVING expressions and
2918 * adding that to the costs. So similarly, here too we are not
2919 * considering remote and local conditions for costing.
2920 */
2921
2922 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2923
2924 /* Get rows and width from input rel */
2925 input_rows = ofpinfo->rows;
2926 width = ofpinfo->width;
2927
2928 /* Collect statistics about aggregates for estimating costs. */
2929 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2930 if (root->parse->hasAggs)
2931 {
2932 get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2933 AGGSPLIT_SIMPLE, &aggcosts);
2934
2935 /*
2936 * The cost of aggregates in the HAVING qual will be the same
2937 * for each child as it is for the parent, so there's no need
2938 * to use a translated version of havingQual.
2939 */
2940 get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2941 AGGSPLIT_SIMPLE, &aggcosts);
2942 }
2943
2944 /* Get number of grouping columns and possible number of groups */
2945 numGroupCols = list_length(root->parse->groupClause);
2946 numGroups = estimate_num_groups(root,
2947 get_sortgrouplist_exprs(root->parse->groupClause,
2948 fpinfo->grouped_tlist),
2949 input_rows, NULL);
2950
2951 /*
2952 * Number of rows expected from foreign server will be same as
2953 * that of number of groups.
2954 */
2955 rows = retrieved_rows = numGroups;
2956
2957 /*-----
2958 * Startup cost includes:
2959 * 1. Startup cost for underneath input relation
2960 * 2. Cost of performing aggregation, per cost_agg()
2961 * 3. Startup cost for PathTarget eval
2962 *-----
2963 */
2964 startup_cost = ofpinfo->rel_startup_cost;
2965 startup_cost += aggcosts.transCost.startup;
2966 startup_cost += aggcosts.transCost.per_tuple * input_rows;
2967 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2968 startup_cost += ptarget->cost.startup;
2969
2970 /*-----
2971 * Run time cost includes:
2972 * 1. Run time cost of underneath input relation
2973 * 2. Run time cost of performing aggregation, per cost_agg()
2974 * 3. PathTarget eval cost for each output row
2975 *-----
2976 */
2977 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2978 run_cost += aggcosts.finalCost * numGroups;
2979 run_cost += cpu_tuple_cost * numGroups;
2980 run_cost += ptarget->cost.per_tuple * numGroups;
2981 }
2982 else
2983 {
2984 /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2985 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2986
2987 /*
2988 * Cost as though this were a seqscan, which is pessimistic. We
2989 * effectively imagine the local_conds are being evaluated
2990 * remotely, too.
2991 */
2992 startup_cost = 0;
2993 run_cost = 0;
2994 run_cost += seq_page_cost * foreignrel->pages;
2995
2996 startup_cost += foreignrel->baserestrictcost.startup;
2997 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2998 run_cost += cpu_per_tuple * foreignrel->tuples;
2999 }
3000
3001 /*
3002 * Without remote estimates, we have no real way to estimate the cost
3003 * of generating sorted output. It could be free if the query plan
3004 * the remote side would have chosen generates properly-sorted output
3005 * anyway, but in most cases it will cost something. Estimate a value
3006 * high enough that we won't pick the sorted path when the ordering
3007 * isn't locally useful, but low enough that we'll err on the side of
3008 * pushing down the ORDER BY clause when it's useful to do so.
3009 */
3010 if (pathkeys != NIL)
3011 {
3012 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3013 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3014 }
3015
3016 total_cost = startup_cost + run_cost;
3017 }
3018
3019 /*
3020 * Cache the costs for scans without any pathkeys or parameterization
3021 * before adding the costs for transferring data from the foreign server.
3022 * These costs are useful for costing the join between this relation and
3023 * another foreign relation or to calculate the costs of paths with
3024 * pathkeys for this relation, when the costs can not be obtained from the
3025 * foreign server. This function will be called at least once for every
3026 * foreign relation without pathkeys and parameterization.
3027 */
3028 if (pathkeys == NIL && param_join_conds == NIL)
3029 {
3030 fpinfo->rel_startup_cost = startup_cost;
3031 fpinfo->rel_total_cost = total_cost;
3032 }
3033
3034 /*
3035 * Add some additional cost factors to account for connection overhead
3036 * (fdw_startup_cost), transferring data across the network
3037 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3038 * (cpu_tuple_cost per retrieved row).
3039 */
3040 startup_cost += fpinfo->fdw_startup_cost;
3041 total_cost += fpinfo->fdw_startup_cost;
3042 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3043 total_cost += cpu_tuple_cost * retrieved_rows;
3044
3045 /* Return results. */
3046 *p_rows = rows;
3047 *p_width = width;
3048 *p_startup_cost = startup_cost;
3049 *p_total_cost = total_cost;
3050 }
3051
3052 /*
3053 * Estimate costs of executing a SQL statement remotely.
3054 * The given "sql" must be an EXPLAIN command.
3055 */
3056 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)3057 get_remote_estimate(const char *sql, PGconn *conn,
3058 double *rows, int *width,
3059 Cost *startup_cost, Cost *total_cost)
3060 {
3061 PGresult *volatile res = NULL;
3062
3063 /* PGresult must be released before leaving this function. */
3064 PG_TRY();
3065 {
3066 char *line;
3067 char *p;
3068 int n;
3069
3070 /*
3071 * Execute EXPLAIN remotely.
3072 */
3073 res = pgfdw_exec_query(conn, sql);
3074 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3075 pgfdw_report_error(ERROR, res, conn, false, sql);
3076
3077 /*
3078 * Extract cost numbers for topmost plan node. Note we search for a
3079 * left paren from the end of the line to avoid being confused by
3080 * other uses of parentheses.
3081 */
3082 line = PQgetvalue(res, 0, 0);
3083 p = strrchr(line, '(');
3084 if (p == NULL)
3085 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3086 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3087 startup_cost, total_cost, rows, width);
3088 if (n != 4)
3089 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3090
3091 PQclear(res);
3092 res = NULL;
3093 }
3094 PG_CATCH();
3095 {
3096 if (res)
3097 PQclear(res);
3098 PG_RE_THROW();
3099 }
3100 PG_END_TRY();
3101 }
3102
3103 /*
3104 * Detect whether we want to process an EquivalenceClass member.
3105 *
3106 * This is a callback for use by generate_implied_equalities_for_column.
3107 */
3108 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)3109 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3110 EquivalenceClass *ec, EquivalenceMember *em,
3111 void *arg)
3112 {
3113 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3114 Expr *expr = em->em_expr;
3115
3116 /*
3117 * If we've identified what we're processing in the current scan, we only
3118 * want to match that expression.
3119 */
3120 if (state->current != NULL)
3121 return equal(expr, state->current);
3122
3123 /*
3124 * Otherwise, ignore anything we've already processed.
3125 */
3126 if (list_member(state->already_used, expr))
3127 return false;
3128
3129 /* This is the new target to process. */
3130 state->current = expr;
3131 return true;
3132 }
3133
3134 /*
3135 * Create cursor for node's query with current parameter values.
3136 */
3137 static void
create_cursor(ForeignScanState * node)3138 create_cursor(ForeignScanState *node)
3139 {
3140 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3141 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3142 int numParams = fsstate->numParams;
3143 const char **values = fsstate->param_values;
3144 PGconn *conn = fsstate->conn;
3145 StringInfoData buf;
3146 PGresult *res;
3147
3148 /*
3149 * Construct array of query parameter values in text format. We do the
3150 * conversions in the short-lived per-tuple context, so as not to cause a
3151 * memory leak over repeated scans.
3152 */
3153 if (numParams > 0)
3154 {
3155 MemoryContext oldcontext;
3156
3157 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3158
3159 process_query_params(econtext,
3160 fsstate->param_flinfo,
3161 fsstate->param_exprs,
3162 values);
3163
3164 MemoryContextSwitchTo(oldcontext);
3165 }
3166
3167 /* Construct the DECLARE CURSOR command */
3168 initStringInfo(&buf);
3169 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3170 fsstate->cursor_number, fsstate->query);
3171
3172 /*
3173 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3174 * to infer types for all parameters. Since we explicitly cast every
3175 * parameter (see deparse.c), the "inference" is trivial and will produce
3176 * the desired result. This allows us to avoid assuming that the remote
3177 * server has the same OIDs we do for the parameters' types.
3178 */
3179 if (!PQsendQueryParams(conn, buf.data, numParams,
3180 NULL, values, NULL, NULL, 0))
3181 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3182
3183 /*
3184 * Get the result, and check for success.
3185 *
3186 * We don't use a PG_TRY block here, so be careful not to throw error
3187 * without releasing the PGresult.
3188 */
3189 res = pgfdw_get_result(conn, buf.data);
3190 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3191 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3192 PQclear(res);
3193
3194 /* Mark the cursor as created, and show no tuples have been retrieved */
3195 fsstate->cursor_exists = true;
3196 fsstate->tuples = NULL;
3197 fsstate->num_tuples = 0;
3198 fsstate->next_tuple = 0;
3199 fsstate->fetch_ct_2 = 0;
3200 fsstate->eof_reached = false;
3201
3202 /* Clean up */
3203 pfree(buf.data);
3204 }
3205
3206 /*
3207 * Fetch some more rows from the node's cursor.
3208 */
3209 static void
fetch_more_data(ForeignScanState * node)3210 fetch_more_data(ForeignScanState *node)
3211 {
3212 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3213 PGresult *volatile res = NULL;
3214 MemoryContext oldcontext;
3215
3216 /*
3217 * We'll store the tuples in the batch_cxt. First, flush the previous
3218 * batch.
3219 */
3220 fsstate->tuples = NULL;
3221 MemoryContextReset(fsstate->batch_cxt);
3222 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3223
3224 /* PGresult must be released before leaving this function. */
3225 PG_TRY();
3226 {
3227 PGconn *conn = fsstate->conn;
3228 char sql[64];
3229 int numrows;
3230 int i;
3231
3232 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3233 fsstate->fetch_size, fsstate->cursor_number);
3234
3235 res = pgfdw_exec_query(conn, sql);
3236 /* On error, report the original query, not the FETCH. */
3237 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3238 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3239
3240 /* Convert the data into HeapTuples */
3241 numrows = PQntuples(res);
3242 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3243 fsstate->num_tuples = numrows;
3244 fsstate->next_tuple = 0;
3245
3246 for (i = 0; i < numrows; i++)
3247 {
3248 Assert(IsA(node->ss.ps.plan, ForeignScan));
3249
3250 fsstate->tuples[i] =
3251 make_tuple_from_result_row(res, i,
3252 fsstate->rel,
3253 fsstate->attinmeta,
3254 fsstate->retrieved_attrs,
3255 node,
3256 fsstate->temp_cxt);
3257 }
3258
3259 /* Update fetch_ct_2 */
3260 if (fsstate->fetch_ct_2 < 2)
3261 fsstate->fetch_ct_2++;
3262
3263 /* Must be EOF if we didn't get as many tuples as we asked for. */
3264 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3265
3266 PQclear(res);
3267 res = NULL;
3268 }
3269 PG_CATCH();
3270 {
3271 if (res)
3272 PQclear(res);
3273 PG_RE_THROW();
3274 }
3275 PG_END_TRY();
3276
3277 MemoryContextSwitchTo(oldcontext);
3278 }
3279
3280 /*
3281 * Force assorted GUC parameters to settings that ensure that we'll output
3282 * data values in a form that is unambiguous to the remote server.
3283 *
3284 * This is rather expensive and annoying to do once per row, but there's
3285 * little choice if we want to be sure values are transmitted accurately;
3286 * we can't leave the settings in place between rows for fear of affecting
3287 * user-visible computations.
3288 *
3289 * We use the equivalent of a function SET option to allow the settings to
3290 * persist only until the caller calls reset_transmission_modes(). If an
3291 * error is thrown in between, guc.c will take care of undoing the settings.
3292 *
3293 * The return value is the nestlevel that must be passed to
3294 * reset_transmission_modes() to undo things.
3295 */
3296 int
set_transmission_modes(void)3297 set_transmission_modes(void)
3298 {
3299 int nestlevel = NewGUCNestLevel();
3300
3301 /*
3302 * The values set here should match what pg_dump does. See also
3303 * configure_remote_session in connection.c.
3304 */
3305 if (DateStyle != USE_ISO_DATES)
3306 (void) set_config_option("datestyle", "ISO",
3307 PGC_USERSET, PGC_S_SESSION,
3308 GUC_ACTION_SAVE, true, 0, false);
3309 if (IntervalStyle != INTSTYLE_POSTGRES)
3310 (void) set_config_option("intervalstyle", "postgres",
3311 PGC_USERSET, PGC_S_SESSION,
3312 GUC_ACTION_SAVE, true, 0, false);
3313 if (extra_float_digits < 3)
3314 (void) set_config_option("extra_float_digits", "3",
3315 PGC_USERSET, PGC_S_SESSION,
3316 GUC_ACTION_SAVE, true, 0, false);
3317
3318 return nestlevel;
3319 }
3320
3321 /*
3322 * Undo the effects of set_transmission_modes().
3323 */
3324 void
reset_transmission_modes(int nestlevel)3325 reset_transmission_modes(int nestlevel)
3326 {
3327 AtEOXact_GUC(true, nestlevel);
3328 }
3329
3330 /*
3331 * Utility routine to close a cursor.
3332 */
3333 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3334 close_cursor(PGconn *conn, unsigned int cursor_number)
3335 {
3336 char sql[64];
3337 PGresult *res;
3338
3339 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3340
3341 /*
3342 * We don't use a PG_TRY block here, so be careful not to throw error
3343 * without releasing the PGresult.
3344 */
3345 res = pgfdw_exec_query(conn, sql);
3346 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3347 pgfdw_report_error(ERROR, res, conn, true, sql);
3348 PQclear(res);
3349 }
3350
3351 /*
3352 * create_foreign_modify
3353 * Construct an execution state of a foreign insert/update/delete
3354 * operation
3355 */
3356 static PgFdwModifyState *
create_foreign_modify(EState * estate,RangeTblEntry * rte,ResultRelInfo * resultRelInfo,CmdType operation,Plan * subplan,char * query,List * target_attrs,bool has_returning,List * retrieved_attrs)3357 create_foreign_modify(EState *estate,
3358 RangeTblEntry *rte,
3359 ResultRelInfo *resultRelInfo,
3360 CmdType operation,
3361 Plan *subplan,
3362 char *query,
3363 List *target_attrs,
3364 bool has_returning,
3365 List *retrieved_attrs)
3366 {
3367 PgFdwModifyState *fmstate;
3368 Relation rel = resultRelInfo->ri_RelationDesc;
3369 TupleDesc tupdesc = RelationGetDescr(rel);
3370 Oid userid;
3371 ForeignTable *table;
3372 UserMapping *user;
3373 AttrNumber n_params;
3374 Oid typefnoid;
3375 bool isvarlena;
3376 ListCell *lc;
3377
3378 /* Begin constructing PgFdwModifyState. */
3379 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3380 fmstate->rel = rel;
3381
3382 /*
3383 * Identify which user to do the remote access as. This should match what
3384 * ExecCheckRTEPerms() does.
3385 */
3386 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3387
3388 /* Get info about foreign table. */
3389 table = GetForeignTable(RelationGetRelid(rel));
3390 user = GetUserMapping(userid, table->serverid);
3391
3392 /* Open connection; report that we'll create a prepared statement. */
3393 fmstate->conn = GetConnection(user, true);
3394 fmstate->p_name = NULL; /* prepared statement not made yet */
3395
3396 /* Set up remote query information. */
3397 fmstate->query = query;
3398 fmstate->target_attrs = target_attrs;
3399 fmstate->has_returning = has_returning;
3400 fmstate->retrieved_attrs = retrieved_attrs;
3401
3402 /* Create context for per-tuple temp workspace. */
3403 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3404 "postgres_fdw temporary data",
3405 ALLOCSET_SMALL_SIZES);
3406
3407 /* Prepare for input conversion of RETURNING results. */
3408 if (fmstate->has_returning)
3409 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3410
3411 /* Prepare for output conversion of parameters used in prepared stmt. */
3412 n_params = list_length(fmstate->target_attrs) + 1;
3413 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3414 fmstate->p_nums = 0;
3415
3416 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3417 {
3418 Assert(subplan != NULL);
3419
3420 /* Find the ctid resjunk column in the subplan's result */
3421 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3422 "ctid");
3423 if (!AttributeNumberIsValid(fmstate->ctidAttno))
3424 elog(ERROR, "could not find junk ctid column");
3425
3426 /* First transmittable parameter will be ctid */
3427 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3428 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3429 fmstate->p_nums++;
3430 }
3431
3432 if (operation == CMD_INSERT || operation == CMD_UPDATE)
3433 {
3434 /* Set up for remaining transmittable parameters */
3435 foreach(lc, fmstate->target_attrs)
3436 {
3437 int attnum = lfirst_int(lc);
3438 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3439
3440 Assert(!attr->attisdropped);
3441
3442 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3443 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3444 fmstate->p_nums++;
3445 }
3446 }
3447
3448 Assert(fmstate->p_nums <= n_params);
3449
3450 /* Initialize auxiliary state */
3451 fmstate->aux_fmstate = NULL;
3452
3453 return fmstate;
3454 }
3455
3456 /*
3457 * prepare_foreign_modify
3458 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3459 */
3460 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3461 prepare_foreign_modify(PgFdwModifyState *fmstate)
3462 {
3463 char prep_name[NAMEDATALEN];
3464 char *p_name;
3465 PGresult *res;
3466
3467 /* Construct name we'll use for the prepared statement. */
3468 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3469 GetPrepStmtNumber(fmstate->conn));
3470 p_name = pstrdup(prep_name);
3471
3472 /*
3473 * We intentionally do not specify parameter types here, but leave the
3474 * remote server to derive them by default. This avoids possible problems
3475 * with the remote server using different type OIDs than we do. All of
3476 * the prepared statements we use in this module are simple enough that
3477 * the remote server will make the right choices.
3478 */
3479 if (!PQsendPrepare(fmstate->conn,
3480 p_name,
3481 fmstate->query,
3482 0,
3483 NULL))
3484 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3485
3486 /*
3487 * Get the result, and check for success.
3488 *
3489 * We don't use a PG_TRY block here, so be careful not to throw error
3490 * without releasing the PGresult.
3491 */
3492 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3493 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3494 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3495 PQclear(res);
3496
3497 /* This action shows that the prepare has been done. */
3498 fmstate->p_name = p_name;
3499 }
3500
3501 /*
3502 * convert_prep_stmt_params
3503 * Create array of text strings representing parameter values
3504 *
3505 * tupleid is ctid to send, or NULL if none
3506 * slot is slot to get remaining parameters from, or NULL if none
3507 *
3508 * Data is constructed in temp_cxt; caller should reset that after use.
3509 */
3510 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3511 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3512 ItemPointer tupleid,
3513 TupleTableSlot *slot)
3514 {
3515 const char **p_values;
3516 int pindex = 0;
3517 MemoryContext oldcontext;
3518
3519 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3520
3521 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3522
3523 /* 1st parameter should be ctid, if it's in use */
3524 if (tupleid != NULL)
3525 {
3526 /* don't need set_transmission_modes for TID output */
3527 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3528 PointerGetDatum(tupleid));
3529 pindex++;
3530 }
3531
3532 /* get following parameters from slot */
3533 if (slot != NULL && fmstate->target_attrs != NIL)
3534 {
3535 int nestlevel;
3536 ListCell *lc;
3537
3538 nestlevel = set_transmission_modes();
3539
3540 foreach(lc, fmstate->target_attrs)
3541 {
3542 int attnum = lfirst_int(lc);
3543 Datum value;
3544 bool isnull;
3545
3546 value = slot_getattr(slot, attnum, &isnull);
3547 if (isnull)
3548 p_values[pindex] = NULL;
3549 else
3550 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3551 value);
3552 pindex++;
3553 }
3554
3555 reset_transmission_modes(nestlevel);
3556 }
3557
3558 Assert(pindex == fmstate->p_nums);
3559
3560 MemoryContextSwitchTo(oldcontext);
3561
3562 return p_values;
3563 }
3564
3565 /*
3566 * store_returning_result
3567 * Store the result of a RETURNING clause
3568 *
3569 * On error, be sure to release the PGresult on the way out. Callers do not
3570 * have PG_TRY blocks to ensure this happens.
3571 */
3572 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3573 store_returning_result(PgFdwModifyState *fmstate,
3574 TupleTableSlot *slot, PGresult *res)
3575 {
3576 PG_TRY();
3577 {
3578 HeapTuple newtup;
3579
3580 newtup = make_tuple_from_result_row(res, 0,
3581 fmstate->rel,
3582 fmstate->attinmeta,
3583 fmstate->retrieved_attrs,
3584 NULL,
3585 fmstate->temp_cxt);
3586 /* tuple will be deleted when it is cleared from the slot */
3587 ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3588 }
3589 PG_CATCH();
3590 {
3591 if (res)
3592 PQclear(res);
3593 PG_RE_THROW();
3594 }
3595 PG_END_TRY();
3596 }
3597
3598 /*
3599 * finish_foreign_modify
3600 * Release resources for a foreign insert/update/delete operation
3601 */
3602 static void
finish_foreign_modify(PgFdwModifyState * fmstate)3603 finish_foreign_modify(PgFdwModifyState *fmstate)
3604 {
3605 Assert(fmstate != NULL);
3606
3607 /* If we created a prepared statement, destroy it */
3608 if (fmstate->p_name)
3609 {
3610 char sql[64];
3611 PGresult *res;
3612
3613 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3614
3615 /*
3616 * We don't use a PG_TRY block here, so be careful not to throw error
3617 * without releasing the PGresult.
3618 */
3619 res = pgfdw_exec_query(fmstate->conn, sql);
3620 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3621 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3622 PQclear(res);
3623 fmstate->p_name = NULL;
3624 }
3625
3626 /* Release remote connection */
3627 ReleaseConnection(fmstate->conn);
3628 fmstate->conn = NULL;
3629 }
3630
3631 /*
3632 * build_remote_returning
3633 * Build a RETURNING targetlist of a remote query for performing an
3634 * UPDATE/DELETE .. RETURNING on a join directly
3635 */
3636 static List *
build_remote_returning(Index rtindex,Relation rel,List * returningList)3637 build_remote_returning(Index rtindex, Relation rel, List *returningList)
3638 {
3639 bool have_wholerow = false;
3640 List *tlist = NIL;
3641 List *vars;
3642 ListCell *lc;
3643
3644 Assert(returningList);
3645
3646 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
3647
3648 /*
3649 * If there's a whole-row reference to the target relation, then we'll
3650 * need all the columns of the relation.
3651 */
3652 foreach(lc, vars)
3653 {
3654 Var *var = (Var *) lfirst(lc);
3655
3656 if (IsA(var, Var) &&
3657 var->varno == rtindex &&
3658 var->varattno == InvalidAttrNumber)
3659 {
3660 have_wholerow = true;
3661 break;
3662 }
3663 }
3664
3665 if (have_wholerow)
3666 {
3667 TupleDesc tupdesc = RelationGetDescr(rel);
3668 int i;
3669
3670 for (i = 1; i <= tupdesc->natts; i++)
3671 {
3672 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
3673 Var *var;
3674
3675 /* Ignore dropped attributes. */
3676 if (attr->attisdropped)
3677 continue;
3678
3679 var = makeVar(rtindex,
3680 i,
3681 attr->atttypid,
3682 attr->atttypmod,
3683 attr->attcollation,
3684 0);
3685
3686 tlist = lappend(tlist,
3687 makeTargetEntry((Expr *) var,
3688 list_length(tlist) + 1,
3689 NULL,
3690 false));
3691 }
3692 }
3693
3694 /* Now add any remaining columns to tlist. */
3695 foreach(lc, vars)
3696 {
3697 Var *var = (Var *) lfirst(lc);
3698
3699 /*
3700 * No need for whole-row references to the target relation. We don't
3701 * need system columns other than ctid and oid either, since those are
3702 * set locally.
3703 */
3704 if (IsA(var, Var) &&
3705 var->varno == rtindex &&
3706 var->varattno <= InvalidAttrNumber &&
3707 var->varattno != SelfItemPointerAttributeNumber &&
3708 var->varattno != ObjectIdAttributeNumber)
3709 continue; /* don't need it */
3710
3711 if (tlist_member((Expr *) var, tlist))
3712 continue; /* already got it */
3713
3714 tlist = lappend(tlist,
3715 makeTargetEntry((Expr *) var,
3716 list_length(tlist) + 1,
3717 NULL,
3718 false));
3719 }
3720
3721 list_free(vars);
3722
3723 return tlist;
3724 }
3725
3726 /*
3727 * rebuild_fdw_scan_tlist
3728 * Build new fdw_scan_tlist of given foreign-scan plan node from given
3729 * tlist
3730 *
3731 * There might be columns that the fdw_scan_tlist of the given foreign-scan
3732 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
3733 * have contained resjunk columns such as 'ctid' of the target relation and
3734 * 'wholerow' of non-target relations, but the tlist might not contain them,
3735 * for example. So, adjust the tlist so it contains all the columns specified
3736 * in the fdw_scan_tlist; else setrefs.c will get confused.
3737 */
3738 static void
rebuild_fdw_scan_tlist(ForeignScan * fscan,List * tlist)3739 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
3740 {
3741 List *new_tlist = tlist;
3742 List *old_tlist = fscan->fdw_scan_tlist;
3743 ListCell *lc;
3744
3745 foreach(lc, old_tlist)
3746 {
3747 TargetEntry *tle = (TargetEntry *) lfirst(lc);
3748
3749 if (tlist_member(tle->expr, new_tlist))
3750 continue; /* already got it */
3751
3752 new_tlist = lappend(new_tlist,
3753 makeTargetEntry(tle->expr,
3754 list_length(new_tlist) + 1,
3755 NULL,
3756 false));
3757 }
3758 fscan->fdw_scan_tlist = new_tlist;
3759 }
3760
3761 /*
3762 * Execute a direct UPDATE/DELETE statement.
3763 */
3764 static void
execute_dml_stmt(ForeignScanState * node)3765 execute_dml_stmt(ForeignScanState *node)
3766 {
3767 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3768 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3769 int numParams = dmstate->numParams;
3770 const char **values = dmstate->param_values;
3771
3772 /*
3773 * Construct array of query parameter values in text format.
3774 */
3775 if (numParams > 0)
3776 process_query_params(econtext,
3777 dmstate->param_flinfo,
3778 dmstate->param_exprs,
3779 values);
3780
3781 /*
3782 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3783 * to infer types for all parameters. Since we explicitly cast every
3784 * parameter (see deparse.c), the "inference" is trivial and will produce
3785 * the desired result. This allows us to avoid assuming that the remote
3786 * server has the same OIDs we do for the parameters' types.
3787 */
3788 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3789 NULL, values, NULL, NULL, 0))
3790 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3791
3792 /*
3793 * Get the result, and check for success.
3794 *
3795 * We don't use a PG_TRY block here, so be careful not to throw error
3796 * without releasing the PGresult.
3797 */
3798 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3799 if (PQresultStatus(dmstate->result) !=
3800 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3801 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3802 dmstate->query);
3803
3804 /* Get the number of rows affected. */
3805 if (dmstate->has_returning)
3806 dmstate->num_tuples = PQntuples(dmstate->result);
3807 else
3808 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3809 }
3810
3811 /*
3812 * Get the result of a RETURNING clause.
3813 */
3814 static TupleTableSlot *
get_returning_data(ForeignScanState * node)3815 get_returning_data(ForeignScanState *node)
3816 {
3817 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3818 EState *estate = node->ss.ps.state;
3819 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3820 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3821 TupleTableSlot *resultSlot;
3822
3823 Assert(resultRelInfo->ri_projectReturning);
3824
3825 /* If we didn't get any tuples, must be end of data. */
3826 if (dmstate->next_tuple >= dmstate->num_tuples)
3827 return ExecClearTuple(slot);
3828
3829 /* Increment the command es_processed count if necessary. */
3830 if (dmstate->set_processed)
3831 estate->es_processed += 1;
3832
3833 /*
3834 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3835 * tuple. (has_returning is false when the local query is of the form
3836 * "UPDATE/DELETE .. RETURNING 1" for example.)
3837 */
3838 if (!dmstate->has_returning)
3839 {
3840 ExecStoreAllNullTuple(slot);
3841 resultSlot = slot;
3842 }
3843 else
3844 {
3845 /*
3846 * On error, be sure to release the PGresult on the way out. Callers
3847 * do not have PG_TRY blocks to ensure this happens.
3848 */
3849 PG_TRY();
3850 {
3851 HeapTuple newtup;
3852
3853 newtup = make_tuple_from_result_row(dmstate->result,
3854 dmstate->next_tuple,
3855 dmstate->rel,
3856 dmstate->attinmeta,
3857 dmstate->retrieved_attrs,
3858 node,
3859 dmstate->temp_cxt);
3860 ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3861 }
3862 PG_CATCH();
3863 {
3864 if (dmstate->result)
3865 PQclear(dmstate->result);
3866 PG_RE_THROW();
3867 }
3868 PG_END_TRY();
3869
3870 /* Get the updated/deleted tuple. */
3871 if (dmstate->rel)
3872 resultSlot = slot;
3873 else
3874 resultSlot = apply_returning_filter(dmstate, slot, estate);
3875 }
3876 dmstate->next_tuple++;
3877
3878 /* Make slot available for evaluation of the local query RETURNING list. */
3879 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
3880 resultSlot;
3881
3882 return slot;
3883 }
3884
3885 /*
3886 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
3887 */
3888 static void
init_returning_filter(PgFdwDirectModifyState * dmstate,List * fdw_scan_tlist,Index rtindex)3889 init_returning_filter(PgFdwDirectModifyState *dmstate,
3890 List *fdw_scan_tlist,
3891 Index rtindex)
3892 {
3893 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3894 ListCell *lc;
3895 int i;
3896
3897 /*
3898 * Calculate the mapping between the fdw_scan_tlist's entries and the
3899 * result tuple's attributes.
3900 *
3901 * The "map" is an array of indexes of the result tuple's attributes in
3902 * fdw_scan_tlist, i.e., one entry for every attribute of the result
3903 * tuple. We store zero for any attributes that don't have the
3904 * corresponding entries in that list, marking that a NULL is needed in
3905 * the result tuple.
3906 *
3907 * Also get the indexes of the entries for ctid and oid if any.
3908 */
3909 dmstate->attnoMap = (AttrNumber *)
3910 palloc0(resultTupType->natts * sizeof(AttrNumber));
3911
3912 dmstate->ctidAttno = dmstate->oidAttno = 0;
3913
3914 i = 1;
3915 dmstate->hasSystemCols = false;
3916 foreach(lc, fdw_scan_tlist)
3917 {
3918 TargetEntry *tle = (TargetEntry *) lfirst(lc);
3919 Var *var = (Var *) tle->expr;
3920
3921 Assert(IsA(var, Var));
3922
3923 /*
3924 * If the Var is a column of the target relation to be retrieved from
3925 * the foreign server, get the index of the entry.
3926 */
3927 if (var->varno == rtindex &&
3928 list_member_int(dmstate->retrieved_attrs, i))
3929 {
3930 int attrno = var->varattno;
3931
3932 if (attrno < 0)
3933 {
3934 /*
3935 * We don't retrieve system columns other than ctid and oid.
3936 */
3937 if (attrno == SelfItemPointerAttributeNumber)
3938 dmstate->ctidAttno = i;
3939 else if (attrno == ObjectIdAttributeNumber)
3940 dmstate->oidAttno = i;
3941 else
3942 Assert(false);
3943 dmstate->hasSystemCols = true;
3944 }
3945 else
3946 {
3947 /*
3948 * We don't retrieve whole-row references to the target
3949 * relation either.
3950 */
3951 Assert(attrno > 0);
3952
3953 dmstate->attnoMap[attrno - 1] = i;
3954 }
3955 }
3956 i++;
3957 }
3958 }
3959
3960 /*
3961 * Extract and return an updated/deleted tuple from a scan tuple.
3962 */
3963 static TupleTableSlot *
apply_returning_filter(PgFdwDirectModifyState * dmstate,TupleTableSlot * slot,EState * estate)3964 apply_returning_filter(PgFdwDirectModifyState *dmstate,
3965 TupleTableSlot *slot,
3966 EState *estate)
3967 {
3968 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
3969 TupleTableSlot *resultSlot;
3970 Datum *values;
3971 bool *isnull;
3972 Datum *old_values;
3973 bool *old_isnull;
3974 int i;
3975
3976 /*
3977 * Use the trigger tuple slot as a place to store the result tuple.
3978 */
3979 resultSlot = estate->es_trig_tuple_slot;
3980 if (resultSlot->tts_tupleDescriptor != resultTupType)
3981 ExecSetSlotDescriptor(resultSlot, resultTupType);
3982
3983 /*
3984 * Extract all the values of the scan tuple.
3985 */
3986 slot_getallattrs(slot);
3987 old_values = slot->tts_values;
3988 old_isnull = slot->tts_isnull;
3989
3990 /*
3991 * Prepare to build the result tuple.
3992 */
3993 ExecClearTuple(resultSlot);
3994 values = resultSlot->tts_values;
3995 isnull = resultSlot->tts_isnull;
3996
3997 /*
3998 * Transpose data into proper fields of the result tuple.
3999 */
4000 for (i = 0; i < resultTupType->natts; i++)
4001 {
4002 int j = dmstate->attnoMap[i];
4003
4004 if (j == 0)
4005 {
4006 values[i] = (Datum) 0;
4007 isnull[i] = true;
4008 }
4009 else
4010 {
4011 values[i] = old_values[j - 1];
4012 isnull[i] = old_isnull[j - 1];
4013 }
4014 }
4015
4016 /*
4017 * Build the virtual tuple.
4018 */
4019 ExecStoreVirtualTuple(resultSlot);
4020
4021 /*
4022 * If we have any system columns to return, install them.
4023 */
4024 if (dmstate->hasSystemCols)
4025 {
4026 HeapTuple resultTup = ExecMaterializeSlot(resultSlot);
4027
4028 /* ctid */
4029 if (dmstate->ctidAttno)
4030 {
4031 ItemPointer ctid = NULL;
4032
4033 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4034 resultTup->t_self = *ctid;
4035 }
4036
4037 /* oid */
4038 if (dmstate->oidAttno)
4039 {
4040 Oid oid = InvalidOid;
4041
4042 oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
4043 HeapTupleSetOid(resultTup, oid);
4044 }
4045
4046 /*
4047 * And remaining columns
4048 *
4049 * Note: since we currently don't allow the target relation to appear
4050 * on the nullable side of an outer join, any system columns wouldn't
4051 * go to NULL.
4052 *
4053 * Note: no need to care about tableoid here because it will be
4054 * initialized in ExecProcessReturning().
4055 */
4056 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4057 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4058 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4059 }
4060
4061 /*
4062 * And return the result tuple.
4063 */
4064 return resultSlot;
4065 }
4066
4067 /*
4068 * Prepare for processing of parameters used in remote query.
4069 */
4070 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)4071 prepare_query_params(PlanState *node,
4072 List *fdw_exprs,
4073 int numParams,
4074 FmgrInfo **param_flinfo,
4075 List **param_exprs,
4076 const char ***param_values)
4077 {
4078 int i;
4079 ListCell *lc;
4080
4081 Assert(numParams > 0);
4082
4083 /* Prepare for output conversion of parameters used in remote query. */
4084 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4085
4086 i = 0;
4087 foreach(lc, fdw_exprs)
4088 {
4089 Node *param_expr = (Node *) lfirst(lc);
4090 Oid typefnoid;
4091 bool isvarlena;
4092
4093 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4094 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4095 i++;
4096 }
4097
4098 /*
4099 * Prepare remote-parameter expressions for evaluation. (Note: in
4100 * practice, we expect that all these expressions will be just Params, so
4101 * we could possibly do something more efficient than using the full
4102 * expression-eval machinery for this. But probably there would be little
4103 * benefit, and it'd require postgres_fdw to know more than is desirable
4104 * about Param evaluation.)
4105 */
4106 *param_exprs = ExecInitExprList(fdw_exprs, node);
4107
4108 /* Allocate buffer for text form of query parameters. */
4109 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4110 }
4111
4112 /*
4113 * Construct array of query parameter values in text format.
4114 */
4115 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)4116 process_query_params(ExprContext *econtext,
4117 FmgrInfo *param_flinfo,
4118 List *param_exprs,
4119 const char **param_values)
4120 {
4121 int nestlevel;
4122 int i;
4123 ListCell *lc;
4124
4125 nestlevel = set_transmission_modes();
4126
4127 i = 0;
4128 foreach(lc, param_exprs)
4129 {
4130 ExprState *expr_state = (ExprState *) lfirst(lc);
4131 Datum expr_value;
4132 bool isNull;
4133
4134 /* Evaluate the parameter expression */
4135 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4136
4137 /*
4138 * Get string representation of each parameter value by invoking
4139 * type-specific output function, unless the value is null.
4140 */
4141 if (isNull)
4142 param_values[i] = NULL;
4143 else
4144 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
4145
4146 i++;
4147 }
4148
4149 reset_transmission_modes(nestlevel);
4150 }
4151
4152 /*
4153 * postgresAnalyzeForeignTable
4154 * Test whether analyzing this foreign table is supported
4155 */
4156 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)4157 postgresAnalyzeForeignTable(Relation relation,
4158 AcquireSampleRowsFunc *func,
4159 BlockNumber *totalpages)
4160 {
4161 ForeignTable *table;
4162 UserMapping *user;
4163 PGconn *conn;
4164 StringInfoData sql;
4165 PGresult *volatile res = NULL;
4166
4167 /* Return the row-analysis function pointer */
4168 *func = postgresAcquireSampleRowsFunc;
4169
4170 /*
4171 * Now we have to get the number of pages. It's annoying that the ANALYZE
4172 * API requires us to return that now, because it forces some duplication
4173 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4174 * it's probably not worth redefining that API at this point.
4175 */
4176
4177 /*
4178 * Get the connection to use. We do the remote access as the table's
4179 * owner, even if the ANALYZE was started by some other user.
4180 */
4181 table = GetForeignTable(RelationGetRelid(relation));
4182 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4183 conn = GetConnection(user, false);
4184
4185 /*
4186 * Construct command to get page count for relation.
4187 */
4188 initStringInfo(&sql);
4189 deparseAnalyzeSizeSql(&sql, relation);
4190
4191 /* In what follows, do not risk leaking any PGresults. */
4192 PG_TRY();
4193 {
4194 res = pgfdw_exec_query(conn, sql.data);
4195 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4196 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4197
4198 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4199 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4200 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4201
4202 PQclear(res);
4203 res = NULL;
4204 }
4205 PG_CATCH();
4206 {
4207 if (res)
4208 PQclear(res);
4209 PG_RE_THROW();
4210 }
4211 PG_END_TRY();
4212
4213 ReleaseConnection(conn);
4214
4215 return true;
4216 }
4217
4218 /*
4219 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4220 *
4221 * We fetch the whole table from the remote side and pick out some sample rows.
4222 *
4223 * Selected rows are returned in the caller-allocated array rows[],
4224 * which must have at least targrows entries.
4225 * The actual number of rows selected is returned as the function result.
4226 * We also count the total number of rows in the table and return it into
4227 * *totalrows. Note that *totaldeadrows is always set to 0.
4228 *
4229 * Note that the returned list of rows is not always in order by physical
4230 * position in the table. Therefore, correlation estimates derived later
4231 * may be meaningless, but it's OK because we don't use the estimates
4232 * currently (the planner only pays attention to correlation for indexscans).
4233 */
4234 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)4235 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4236 HeapTuple *rows, int targrows,
4237 double *totalrows,
4238 double *totaldeadrows)
4239 {
4240 PgFdwAnalyzeState astate;
4241 ForeignTable *table;
4242 ForeignServer *server;
4243 UserMapping *user;
4244 PGconn *conn;
4245 unsigned int cursor_number;
4246 StringInfoData sql;
4247 PGresult *volatile res = NULL;
4248
4249 /* Initialize workspace state */
4250 astate.rel = relation;
4251 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4252
4253 astate.rows = rows;
4254 astate.targrows = targrows;
4255 astate.numrows = 0;
4256 astate.samplerows = 0;
4257 astate.rowstoskip = -1; /* -1 means not set yet */
4258 reservoir_init_selection_state(&astate.rstate, targrows);
4259
4260 /* Remember ANALYZE context, and create a per-tuple temp context */
4261 astate.anl_cxt = CurrentMemoryContext;
4262 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4263 "postgres_fdw temporary data",
4264 ALLOCSET_SMALL_SIZES);
4265
4266 /*
4267 * Get the connection to use. We do the remote access as the table's
4268 * owner, even if the ANALYZE was started by some other user.
4269 */
4270 table = GetForeignTable(RelationGetRelid(relation));
4271 server = GetForeignServer(table->serverid);
4272 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4273 conn = GetConnection(user, false);
4274
4275 /*
4276 * Construct cursor that retrieves whole rows from remote.
4277 */
4278 cursor_number = GetCursorNumber(conn);
4279 initStringInfo(&sql);
4280 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
4281 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
4282
4283 /* In what follows, do not risk leaking any PGresults. */
4284 PG_TRY();
4285 {
4286 res = pgfdw_exec_query(conn, sql.data);
4287 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4288 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4289 PQclear(res);
4290 res = NULL;
4291
4292 /* Retrieve and process rows a batch at a time. */
4293 for (;;)
4294 {
4295 char fetch_sql[64];
4296 int fetch_size;
4297 int numrows;
4298 int i;
4299 ListCell *lc;
4300
4301 /* Allow users to cancel long query */
4302 CHECK_FOR_INTERRUPTS();
4303
4304 /*
4305 * XXX possible future improvement: if rowstoskip is large, we
4306 * could issue a MOVE rather than physically fetching the rows,
4307 * then just adjust rowstoskip and samplerows appropriately.
4308 */
4309
4310 /* The fetch size is arbitrary, but shouldn't be enormous. */
4311 fetch_size = 100;
4312 foreach(lc, server->options)
4313 {
4314 DefElem *def = (DefElem *) lfirst(lc);
4315
4316 if (strcmp(def->defname, "fetch_size") == 0)
4317 {
4318 fetch_size = strtol(defGetString(def), NULL, 10);
4319 break;
4320 }
4321 }
4322 foreach(lc, table->options)
4323 {
4324 DefElem *def = (DefElem *) lfirst(lc);
4325
4326 if (strcmp(def->defname, "fetch_size") == 0)
4327 {
4328 fetch_size = strtol(defGetString(def), NULL, 10);
4329 break;
4330 }
4331 }
4332
4333 /* Fetch some rows */
4334 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
4335 fetch_size, cursor_number);
4336
4337 res = pgfdw_exec_query(conn, fetch_sql);
4338 /* On error, report the original query, not the FETCH. */
4339 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4340 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4341
4342 /* Process whatever we got. */
4343 numrows = PQntuples(res);
4344 for (i = 0; i < numrows; i++)
4345 analyze_row_processor(res, i, &astate);
4346
4347 PQclear(res);
4348 res = NULL;
4349
4350 /* Must be EOF if we didn't get all the rows requested. */
4351 if (numrows < fetch_size)
4352 break;
4353 }
4354
4355 /* Close the cursor, just to be tidy. */
4356 close_cursor(conn, cursor_number);
4357 }
4358 PG_CATCH();
4359 {
4360 if (res)
4361 PQclear(res);
4362 PG_RE_THROW();
4363 }
4364 PG_END_TRY();
4365
4366 ReleaseConnection(conn);
4367
4368 /* We assume that we have no dead tuple. */
4369 *totaldeadrows = 0.0;
4370
4371 /* We've retrieved all living tuples from foreign server. */
4372 *totalrows = astate.samplerows;
4373
4374 /*
4375 * Emit some interesting relation info
4376 */
4377 ereport(elevel,
4378 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
4379 RelationGetRelationName(relation),
4380 astate.samplerows, astate.numrows)));
4381
4382 return astate.numrows;
4383 }
4384
4385 /*
4386 * Collect sample rows from the result of query.
4387 * - Use all tuples in sample until target # of samples are collected.
4388 * - Subsequently, replace already-sampled tuples randomly.
4389 */
4390 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)4391 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
4392 {
4393 int targrows = astate->targrows;
4394 int pos; /* array index to store tuple in */
4395 MemoryContext oldcontext;
4396
4397 /* Always increment sample row counter. */
4398 astate->samplerows += 1;
4399
4400 /*
4401 * Determine the slot where this sample row should be stored. Set pos to
4402 * negative value to indicate the row should be skipped.
4403 */
4404 if (astate->numrows < targrows)
4405 {
4406 /* First targrows rows are always included into the sample */
4407 pos = astate->numrows++;
4408 }
4409 else
4410 {
4411 /*
4412 * Now we start replacing tuples in the sample until we reach the end
4413 * of the relation. Same algorithm as in acquire_sample_rows in
4414 * analyze.c; see Jeff Vitter's paper.
4415 */
4416 if (astate->rowstoskip < 0)
4417 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
4418
4419 if (astate->rowstoskip <= 0)
4420 {
4421 /* Choose a random reservoir element to replace. */
4422 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
4423 Assert(pos >= 0 && pos < targrows);
4424 heap_freetuple(astate->rows[pos]);
4425 }
4426 else
4427 {
4428 /* Skip this tuple. */
4429 pos = -1;
4430 }
4431
4432 astate->rowstoskip -= 1;
4433 }
4434
4435 if (pos >= 0)
4436 {
4437 /*
4438 * Create sample tuple from current result row, and store it in the
4439 * position determined above. The tuple has to be created in anl_cxt.
4440 */
4441 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
4442
4443 astate->rows[pos] = make_tuple_from_result_row(res, row,
4444 astate->rel,
4445 astate->attinmeta,
4446 astate->retrieved_attrs,
4447 NULL,
4448 astate->temp_cxt);
4449
4450 MemoryContextSwitchTo(oldcontext);
4451 }
4452 }
4453
4454 /*
4455 * Import a foreign schema
4456 */
4457 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)4458 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
4459 {
4460 List *commands = NIL;
4461 bool import_collate = true;
4462 bool import_default = false;
4463 bool import_not_null = true;
4464 ForeignServer *server;
4465 UserMapping *mapping;
4466 PGconn *conn;
4467 StringInfoData buf;
4468 PGresult *volatile res = NULL;
4469 int numrows,
4470 i;
4471 ListCell *lc;
4472
4473 /* Parse statement options */
4474 foreach(lc, stmt->options)
4475 {
4476 DefElem *def = (DefElem *) lfirst(lc);
4477
4478 if (strcmp(def->defname, "import_collate") == 0)
4479 import_collate = defGetBoolean(def);
4480 else if (strcmp(def->defname, "import_default") == 0)
4481 import_default = defGetBoolean(def);
4482 else if (strcmp(def->defname, "import_not_null") == 0)
4483 import_not_null = defGetBoolean(def);
4484 else
4485 ereport(ERROR,
4486 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
4487 errmsg("invalid option \"%s\"", def->defname)));
4488 }
4489
4490 /*
4491 * Get connection to the foreign server. Connection manager will
4492 * establish new connection if necessary.
4493 */
4494 server = GetForeignServer(serverOid);
4495 mapping = GetUserMapping(GetUserId(), server->serverid);
4496 conn = GetConnection(mapping, false);
4497
4498 /* Don't attempt to import collation if remote server hasn't got it */
4499 if (PQserverVersion(conn) < 90100)
4500 import_collate = false;
4501
4502 /* Create workspace for strings */
4503 initStringInfo(&buf);
4504
4505 /* In what follows, do not risk leaking any PGresults. */
4506 PG_TRY();
4507 {
4508 /* Check that the schema really exists */
4509 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
4510 deparseStringLiteral(&buf, stmt->remote_schema);
4511
4512 res = pgfdw_exec_query(conn, buf.data);
4513 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4514 pgfdw_report_error(ERROR, res, conn, false, buf.data);
4515
4516 if (PQntuples(res) != 1)
4517 ereport(ERROR,
4518 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
4519 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
4520 stmt->remote_schema, server->servername)));
4521
4522 PQclear(res);
4523 res = NULL;
4524 resetStringInfo(&buf);
4525
4526 /*
4527 * Fetch all table data from this schema, possibly restricted by
4528 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
4529 * to EXCEPT/LIMIT TO here, because the core code will filter the
4530 * statements we return according to those lists anyway. But it
4531 * should save a few cycles to not process excluded tables in the
4532 * first place.)
4533 *
4534 * Ignore table data for partitions and only include the definitions
4535 * of the root partitioned tables to allow access to the complete
4536 * remote data set locally in the schema imported.
4537 *
4538 * Note: because we run the connection with search_path restricted to
4539 * pg_catalog, the format_type() and pg_get_expr() outputs will always
4540 * include a schema name for types/functions in other schemas, which
4541 * is what we want.
4542 */
4543 if (import_collate)
4544 appendStringInfoString(&buf,
4545 "SELECT relname, "
4546 " attname, "
4547 " format_type(atttypid, atttypmod), "
4548 " attnotnull, "
4549 " pg_get_expr(adbin, adrelid), "
4550 " collname, "
4551 " collnsp.nspname "
4552 "FROM pg_class c "
4553 " JOIN pg_namespace n ON "
4554 " relnamespace = n.oid "
4555 " LEFT JOIN pg_attribute a ON "
4556 " attrelid = c.oid AND attnum > 0 "
4557 " AND NOT attisdropped "
4558 " LEFT JOIN pg_attrdef ad ON "
4559 " adrelid = c.oid AND adnum = attnum "
4560 " LEFT JOIN pg_collation coll ON "
4561 " coll.oid = attcollation "
4562 " LEFT JOIN pg_namespace collnsp ON "
4563 " collnsp.oid = collnamespace ");
4564 else
4565 appendStringInfoString(&buf,
4566 "SELECT relname, "
4567 " attname, "
4568 " format_type(atttypid, atttypmod), "
4569 " attnotnull, "
4570 " pg_get_expr(adbin, adrelid), "
4571 " NULL, NULL "
4572 "FROM pg_class c "
4573 " JOIN pg_namespace n ON "
4574 " relnamespace = n.oid "
4575 " LEFT JOIN pg_attribute a ON "
4576 " attrelid = c.oid AND attnum > 0 "
4577 " AND NOT attisdropped "
4578 " LEFT JOIN pg_attrdef ad ON "
4579 " adrelid = c.oid AND adnum = attnum ");
4580
4581 appendStringInfoString(&buf,
4582 "WHERE c.relkind IN ("
4583 CppAsString2(RELKIND_RELATION) ","
4584 CppAsString2(RELKIND_VIEW) ","
4585 CppAsString2(RELKIND_FOREIGN_TABLE) ","
4586 CppAsString2(RELKIND_MATVIEW) ","
4587 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
4588 " AND n.nspname = ");
4589 deparseStringLiteral(&buf, stmt->remote_schema);
4590
4591 /* Partitions are supported since Postgres 10 */
4592 if (PQserverVersion(conn) >= 100000)
4593 appendStringInfoString(&buf, " AND NOT c.relispartition ");
4594
4595 /* Apply restrictions for LIMIT TO and EXCEPT */
4596 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
4597 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4598 {
4599 bool first_item = true;
4600
4601 appendStringInfoString(&buf, " AND c.relname ");
4602 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
4603 appendStringInfoString(&buf, "NOT ");
4604 appendStringInfoString(&buf, "IN (");
4605
4606 /* Append list of table names within IN clause */
4607 foreach(lc, stmt->table_list)
4608 {
4609 RangeVar *rv = (RangeVar *) lfirst(lc);
4610
4611 if (first_item)
4612 first_item = false;
4613 else
4614 appendStringInfoString(&buf, ", ");
4615 deparseStringLiteral(&buf, rv->relname);
4616 }
4617 appendStringInfoChar(&buf, ')');
4618 }
4619
4620 /* Append ORDER BY at the end of query to ensure output ordering */
4621 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
4622
4623 /* Fetch the data */
4624 res = pgfdw_exec_query(conn, buf.data);
4625 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4626 pgfdw_report_error(ERROR, res, conn, false, buf.data);
4627
4628 /* Process results */
4629 numrows = PQntuples(res);
4630 /* note: incrementation of i happens in inner loop's while() test */
4631 for (i = 0; i < numrows;)
4632 {
4633 char *tablename = PQgetvalue(res, i, 0);
4634 bool first_item = true;
4635
4636 resetStringInfo(&buf);
4637 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
4638 quote_identifier(tablename));
4639
4640 /* Scan all rows for this table */
4641 do
4642 {
4643 char *attname;
4644 char *typename;
4645 char *attnotnull;
4646 char *attdefault;
4647 char *collname;
4648 char *collnamespace;
4649
4650 /* If table has no columns, we'll see nulls here */
4651 if (PQgetisnull(res, i, 1))
4652 continue;
4653
4654 attname = PQgetvalue(res, i, 1);
4655 typename = PQgetvalue(res, i, 2);
4656 attnotnull = PQgetvalue(res, i, 3);
4657 attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
4658 PQgetvalue(res, i, 4);
4659 collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4660 PQgetvalue(res, i, 5);
4661 collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4662 PQgetvalue(res, i, 6);
4663
4664 if (first_item)
4665 first_item = false;
4666 else
4667 appendStringInfoString(&buf, ",\n");
4668
4669 /* Print column name and type */
4670 appendStringInfo(&buf, " %s %s",
4671 quote_identifier(attname),
4672 typename);
4673
4674 /*
4675 * Add column_name option so that renaming the foreign table's
4676 * column doesn't break the association to the underlying
4677 * column.
4678 */
4679 appendStringInfoString(&buf, " OPTIONS (column_name ");
4680 deparseStringLiteral(&buf, attname);
4681 appendStringInfoChar(&buf, ')');
4682
4683 /* Add COLLATE if needed */
4684 if (import_collate && collname != NULL && collnamespace != NULL)
4685 appendStringInfo(&buf, " COLLATE %s.%s",
4686 quote_identifier(collnamespace),
4687 quote_identifier(collname));
4688
4689 /* Add DEFAULT if needed */
4690 if (import_default && attdefault != NULL)
4691 appendStringInfo(&buf, " DEFAULT %s", attdefault);
4692
4693 /* Add NOT NULL if needed */
4694 if (import_not_null && attnotnull[0] == 't')
4695 appendStringInfoString(&buf, " NOT NULL");
4696 }
4697 while (++i < numrows &&
4698 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4699
4700 /*
4701 * Add server name and table-level options. We specify remote
4702 * schema and table name as options (the latter to ensure that
4703 * renaming the foreign table doesn't break the association).
4704 */
4705 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4706 quote_identifier(server->servername));
4707
4708 appendStringInfoString(&buf, "schema_name ");
4709 deparseStringLiteral(&buf, stmt->remote_schema);
4710 appendStringInfoString(&buf, ", table_name ");
4711 deparseStringLiteral(&buf, tablename);
4712
4713 appendStringInfoString(&buf, ");");
4714
4715 commands = lappend(commands, pstrdup(buf.data));
4716 }
4717
4718 /* Clean up */
4719 PQclear(res);
4720 res = NULL;
4721 }
4722 PG_CATCH();
4723 {
4724 if (res)
4725 PQclear(res);
4726 PG_RE_THROW();
4727 }
4728 PG_END_TRY();
4729
4730 ReleaseConnection(conn);
4731
4732 return commands;
4733 }
4734
4735 /*
4736 * Assess whether the join between inner and outer relations can be pushed down
4737 * to the foreign server. As a side effect, save information we obtain in this
4738 * function to PgFdwRelationInfo passed in.
4739 */
4740 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)4741 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
4742 RelOptInfo *outerrel, RelOptInfo *innerrel,
4743 JoinPathExtraData *extra)
4744 {
4745 PgFdwRelationInfo *fpinfo;
4746 PgFdwRelationInfo *fpinfo_o;
4747 PgFdwRelationInfo *fpinfo_i;
4748 ListCell *lc;
4749 List *joinclauses;
4750
4751 /*
4752 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4753 * Constructing queries representing SEMI and ANTI joins is hard, hence
4754 * not considered right now.
4755 */
4756 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4757 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4758 return false;
4759
4760 /*
4761 * If either of the joining relations is marked as unsafe to pushdown, the
4762 * join can not be pushed down.
4763 */
4764 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4765 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4766 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4767 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4768 !fpinfo_i || !fpinfo_i->pushdown_safe)
4769 return false;
4770
4771 /*
4772 * If joining relations have local conditions, those conditions are
4773 * required to be applied before joining the relations. Hence the join can
4774 * not be pushed down.
4775 */
4776 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4777 return false;
4778
4779 /*
4780 * Merge FDW options. We might be tempted to do this after we have deemed
4781 * the foreign join to be OK. But we must do this beforehand so that we
4782 * know which quals can be evaluated on the foreign server, which might
4783 * depend on shippable_extensions.
4784 */
4785 fpinfo->server = fpinfo_o->server;
4786 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4787
4788 /*
4789 * Separate restrict list into join quals and pushed-down (other) quals.
4790 *
4791 * Join quals belonging to an outer join must all be shippable, else we
4792 * cannot execute the join remotely. Add such quals to 'joinclauses'.
4793 *
4794 * Add other quals to fpinfo->remote_conds if they are shippable, else to
4795 * fpinfo->local_conds. In an inner join it's okay to execute conditions
4796 * either locally or remotely; the same is true for pushed-down conditions
4797 * at an outer join.
4798 *
4799 * Note we might return failure after having already scribbled on
4800 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
4801 * won't consult those lists again if we deem the join unshippable.
4802 */
4803 joinclauses = NIL;
4804 foreach(lc, extra->restrictlist)
4805 {
4806 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4807 bool is_remote_clause = is_foreign_expr(root, joinrel,
4808 rinfo->clause);
4809
4810 if (IS_OUTER_JOIN(jointype) &&
4811 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
4812 {
4813 if (!is_remote_clause)
4814 return false;
4815 joinclauses = lappend(joinclauses, rinfo);
4816 }
4817 else
4818 {
4819 if (is_remote_clause)
4820 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4821 else
4822 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4823 }
4824 }
4825
4826 /*
4827 * deparseExplicitTargetList() isn't smart enough to handle anything other
4828 * than a Var. In particular, if there's some PlaceHolderVar that would
4829 * need to be evaluated within this join tree (because there's an upper
4830 * reference to a quantity that may go to NULL as a result of an outer
4831 * join), then we can't try to push the join down because we'll fail when
4832 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4833 * needs to be evaluated *at the top* of this join tree is OK, because we
4834 * can do that locally after fetching the results from the remote side.
4835 */
4836 foreach(lc, root->placeholder_list)
4837 {
4838 PlaceHolderInfo *phinfo = lfirst(lc);
4839 Relids relids;
4840
4841 /* PlaceHolderInfo refers to parent relids, not child relids. */
4842 relids = IS_OTHER_REL(joinrel) ?
4843 joinrel->top_parent_relids : joinrel->relids;
4844
4845 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4846 bms_nonempty_difference(relids, phinfo->ph_eval_at))
4847 return false;
4848 }
4849
4850 /* Save the join clauses, for later use. */
4851 fpinfo->joinclauses = joinclauses;
4852
4853 fpinfo->outerrel = outerrel;
4854 fpinfo->innerrel = innerrel;
4855 fpinfo->jointype = jointype;
4856
4857 /*
4858 * By default, both the input relations are not required to be deparsed as
4859 * subqueries, but there might be some relations covered by the input
4860 * relations that are required to be deparsed as subqueries, so save the
4861 * relids of those relations for later use by the deparser.
4862 */
4863 fpinfo->make_outerrel_subquery = false;
4864 fpinfo->make_innerrel_subquery = false;
4865 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4866 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4867 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
4868 fpinfo_i->lower_subquery_rels);
4869
4870 /*
4871 * Pull the other remote conditions from the joining relations into join
4872 * clauses or other remote clauses (remote_conds) of this relation
4873 * wherever possible. This avoids building subqueries at every join step.
4874 *
4875 * For an inner join, clauses from both the relations are added to the
4876 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4877 * the outer side are added to remote_conds since those can be evaluated
4878 * after the join is evaluated. The clauses from inner side are added to
4879 * the joinclauses, since they need to be evaluated while constructing the
4880 * join.
4881 *
4882 * For a FULL OUTER JOIN, the other clauses from either relation can not
4883 * be added to the joinclauses or remote_conds, since each relation acts
4884 * as an outer relation for the other.
4885 *
4886 * The joining sides can not have local conditions, thus no need to test
4887 * shippability of the clauses being pulled up.
4888 */
4889 switch (jointype)
4890 {
4891 case JOIN_INNER:
4892 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4893 list_copy(fpinfo_i->remote_conds));
4894 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4895 list_copy(fpinfo_o->remote_conds));
4896 break;
4897
4898 case JOIN_LEFT:
4899 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4900 list_copy(fpinfo_i->remote_conds));
4901 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4902 list_copy(fpinfo_o->remote_conds));
4903 break;
4904
4905 case JOIN_RIGHT:
4906 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4907 list_copy(fpinfo_o->remote_conds));
4908 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4909 list_copy(fpinfo_i->remote_conds));
4910 break;
4911
4912 case JOIN_FULL:
4913
4914 /*
4915 * In this case, if any of the input relations has conditions, we
4916 * need to deparse that relation as a subquery so that the
4917 * conditions can be evaluated before the join. Remember it in
4918 * the fpinfo of this relation so that the deparser can take
4919 * appropriate action. Also, save the relids of base relations
4920 * covered by that relation for later use by the deparser.
4921 */
4922 if (fpinfo_o->remote_conds)
4923 {
4924 fpinfo->make_outerrel_subquery = true;
4925 fpinfo->lower_subquery_rels =
4926 bms_add_members(fpinfo->lower_subquery_rels,
4927 outerrel->relids);
4928 }
4929 if (fpinfo_i->remote_conds)
4930 {
4931 fpinfo->make_innerrel_subquery = true;
4932 fpinfo->lower_subquery_rels =
4933 bms_add_members(fpinfo->lower_subquery_rels,
4934 innerrel->relids);
4935 }
4936 break;
4937
4938 default:
4939 /* Should not happen, we have just checked this above */
4940 elog(ERROR, "unsupported join type %d", jointype);
4941 }
4942
4943 /*
4944 * For an inner join, all restrictions can be treated alike. Treating the
4945 * pushed down conditions as join conditions allows a top level full outer
4946 * join to be deparsed without requiring subqueries.
4947 */
4948 if (jointype == JOIN_INNER)
4949 {
4950 Assert(!fpinfo->joinclauses);
4951 fpinfo->joinclauses = fpinfo->remote_conds;
4952 fpinfo->remote_conds = NIL;
4953 }
4954
4955 /* Mark that this join can be pushed down safely */
4956 fpinfo->pushdown_safe = true;
4957
4958 /* Get user mapping */
4959 if (fpinfo->use_remote_estimate)
4960 {
4961 if (fpinfo_o->use_remote_estimate)
4962 fpinfo->user = fpinfo_o->user;
4963 else
4964 fpinfo->user = fpinfo_i->user;
4965 }
4966 else
4967 fpinfo->user = NULL;
4968
4969 /*
4970 * Set cached relation costs to some negative value, so that we can detect
4971 * when they are set to some sensible costs, during one (usually the
4972 * first) of the calls to estimate_path_cost_size().
4973 */
4974 fpinfo->rel_startup_cost = -1;
4975 fpinfo->rel_total_cost = -1;
4976
4977 /*
4978 * Set the string describing this join relation to be used in EXPLAIN
4979 * output of corresponding ForeignScan.
4980 */
4981 fpinfo->relation_name = makeStringInfo();
4982 appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4983 fpinfo_o->relation_name->data,
4984 get_jointype_name(fpinfo->jointype),
4985 fpinfo_i->relation_name->data);
4986
4987 /*
4988 * Set the relation index. This is defined as the position of this
4989 * joinrel in the join_rel_list list plus the length of the rtable list.
4990 * Note that since this joinrel is at the end of the join_rel_list list
4991 * when we are called, we can get the position by list_length.
4992 */
4993 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4994 fpinfo->relation_index =
4995 list_length(root->parse->rtable) + list_length(root->join_rel_list);
4996
4997 return true;
4998 }
4999
5000 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)5001 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5002 Path *epq_path)
5003 {
5004 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5005 ListCell *lc;
5006
5007 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5008
5009 /* Create one path for each set of pathkeys we found above. */
5010 foreach(lc, useful_pathkeys_list)
5011 {
5012 double rows;
5013 int width;
5014 Cost startup_cost;
5015 Cost total_cost;
5016 List *useful_pathkeys = lfirst(lc);
5017 Path *sorted_epq_path;
5018
5019 estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
5020 &rows, &width, &startup_cost, &total_cost);
5021
5022 /*
5023 * The EPQ path must be at least as well sorted as the path itself, in
5024 * case it gets used as input to a mergejoin.
5025 */
5026 sorted_epq_path = epq_path;
5027 if (sorted_epq_path != NULL &&
5028 !pathkeys_contained_in(useful_pathkeys,
5029 sorted_epq_path->pathkeys))
5030 sorted_epq_path = (Path *)
5031 create_sort_path(root,
5032 rel,
5033 sorted_epq_path,
5034 useful_pathkeys,
5035 -1.0);
5036
5037 add_path(rel, (Path *)
5038 create_foreignscan_path(root, rel,
5039 NULL,
5040 rows,
5041 startup_cost,
5042 total_cost,
5043 useful_pathkeys,
5044 rel->lateral_relids,
5045 sorted_epq_path,
5046 NIL));
5047 }
5048 }
5049
5050 /*
5051 * Parse options from foreign server and apply them to fpinfo.
5052 *
5053 * New options might also require tweaking merge_fdw_options().
5054 */
5055 static void
apply_server_options(PgFdwRelationInfo * fpinfo)5056 apply_server_options(PgFdwRelationInfo *fpinfo)
5057 {
5058 ListCell *lc;
5059
5060 foreach(lc, fpinfo->server->options)
5061 {
5062 DefElem *def = (DefElem *) lfirst(lc);
5063
5064 if (strcmp(def->defname, "use_remote_estimate") == 0)
5065 fpinfo->use_remote_estimate = defGetBoolean(def);
5066 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5067 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
5068 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5069 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
5070 else if (strcmp(def->defname, "extensions") == 0)
5071 fpinfo->shippable_extensions =
5072 ExtractExtensionList(defGetString(def), false);
5073 else if (strcmp(def->defname, "fetch_size") == 0)
5074 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5075 }
5076 }
5077
5078 /*
5079 * Parse options from foreign table and apply them to fpinfo.
5080 *
5081 * New options might also require tweaking merge_fdw_options().
5082 */
5083 static void
apply_table_options(PgFdwRelationInfo * fpinfo)5084 apply_table_options(PgFdwRelationInfo *fpinfo)
5085 {
5086 ListCell *lc;
5087
5088 foreach(lc, fpinfo->table->options)
5089 {
5090 DefElem *def = (DefElem *) lfirst(lc);
5091
5092 if (strcmp(def->defname, "use_remote_estimate") == 0)
5093 fpinfo->use_remote_estimate = defGetBoolean(def);
5094 else if (strcmp(def->defname, "fetch_size") == 0)
5095 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
5096 }
5097 }
5098
5099 /*
5100 * Merge FDW options from input relations into a new set of options for a join
5101 * or an upper rel.
5102 *
5103 * For a join relation, FDW-specific information about the inner and outer
5104 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5105 * fpinfo_o provides the information for the input relation; fpinfo_i is
5106 * expected to NULL.
5107 */
5108 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)5109 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5110 const PgFdwRelationInfo *fpinfo_o,
5111 const PgFdwRelationInfo *fpinfo_i)
5112 {
5113 /* We must always have fpinfo_o. */
5114 Assert(fpinfo_o);
5115
5116 /* fpinfo_i may be NULL, but if present the servers must both match. */
5117 Assert(!fpinfo_i ||
5118 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5119
5120 /*
5121 * Copy the server specific FDW options. (For a join, both relations come
5122 * from the same server, so the server options should have the same value
5123 * for both relations.)
5124 */
5125 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5126 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5127 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5128 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5129 fpinfo->fetch_size = fpinfo_o->fetch_size;
5130
5131 /* Merge the table level options from either side of the join. */
5132 if (fpinfo_i)
5133 {
5134 /*
5135 * We'll prefer to use remote estimates for this join if any table
5136 * from either side of the join is using remote estimates. This is
5137 * most likely going to be preferred since they're already willing to
5138 * pay the price of a round trip to get the remote EXPLAIN. In any
5139 * case it's not entirely clear how we might otherwise handle this
5140 * best.
5141 */
5142 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5143 fpinfo_i->use_remote_estimate;
5144
5145 /*
5146 * Set fetch size to maximum of the joining sides, since we are
5147 * expecting the rows returned by the join to be proportional to the
5148 * relation sizes.
5149 */
5150 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5151 }
5152 }
5153
5154 /*
5155 * postgresGetForeignJoinPaths
5156 * Add possible ForeignPath to joinrel, if join is safe to push down.
5157 */
5158 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)5159 postgresGetForeignJoinPaths(PlannerInfo *root,
5160 RelOptInfo *joinrel,
5161 RelOptInfo *outerrel,
5162 RelOptInfo *innerrel,
5163 JoinType jointype,
5164 JoinPathExtraData *extra)
5165 {
5166 PgFdwRelationInfo *fpinfo;
5167 ForeignPath *joinpath;
5168 double rows;
5169 int width;
5170 Cost startup_cost;
5171 Cost total_cost;
5172 Path *epq_path; /* Path to create plan to be executed when
5173 * EvalPlanQual gets triggered. */
5174
5175 /*
5176 * Skip if this join combination has been considered already.
5177 */
5178 if (joinrel->fdw_private)
5179 return;
5180
5181 /*
5182 * This code does not work for joins with lateral references, since those
5183 * must have parameterized paths, which we don't generate yet.
5184 */
5185 if (!bms_is_empty(joinrel->lateral_relids))
5186 return;
5187
5188 /*
5189 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5190 * that the join relation is already considered, so that we won't waste
5191 * time in judging safety of join pushdown and adding the same paths again
5192 * if found safe. Once we know that this join can be pushed down, we fill
5193 * the entry.
5194 */
5195 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5196 fpinfo->pushdown_safe = false;
5197 joinrel->fdw_private = fpinfo;
5198 /* attrs_used is only for base relations. */
5199 fpinfo->attrs_used = NULL;
5200
5201 /*
5202 * If there is a possibility that EvalPlanQual will be executed, we need
5203 * to be able to reconstruct the row using scans of the base relations.
5204 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5205 * the path list of the joinrel, if one exists. We must be careful to
5206 * call it before adding any ForeignPath, since the ForeignPath might
5207 * dominate the only suitable local path available. We also do it before
5208 * calling foreign_join_ok(), since that function updates fpinfo and marks
5209 * it as pushable if the join is found to be pushable.
5210 */
5211 if (root->parse->commandType == CMD_DELETE ||
5212 root->parse->commandType == CMD_UPDATE ||
5213 root->rowMarks)
5214 {
5215 epq_path = GetExistingLocalJoinPath(joinrel);
5216 if (!epq_path)
5217 {
5218 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
5219 return;
5220 }
5221 }
5222 else
5223 epq_path = NULL;
5224
5225 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
5226 {
5227 /* Free path required for EPQ if we copied one; we don't need it now */
5228 if (epq_path)
5229 pfree(epq_path);
5230 return;
5231 }
5232
5233 /*
5234 * Compute the selectivity and cost of the local_conds, so we don't have
5235 * to do it over again for each path. The best we can do for these
5236 * conditions is to estimate selectivity on the basis of local statistics.
5237 * The local conditions are applied after the join has been computed on
5238 * the remote side like quals in WHERE clause, so pass jointype as
5239 * JOIN_INNER.
5240 */
5241 fpinfo->local_conds_sel = clauselist_selectivity(root,
5242 fpinfo->local_conds,
5243 0,
5244 JOIN_INNER,
5245 NULL);
5246 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
5247
5248 /*
5249 * If we are going to estimate costs locally, estimate the join clause
5250 * selectivity here while we have special join info.
5251 */
5252 if (!fpinfo->use_remote_estimate)
5253 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
5254 0, fpinfo->jointype,
5255 extra->sjinfo);
5256
5257 /* Estimate costs for bare join relation */
5258 estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
5259 &width, &startup_cost, &total_cost);
5260 /* Now update this information in the joinrel */
5261 joinrel->rows = rows;
5262 joinrel->reltarget->width = width;
5263 fpinfo->rows = rows;
5264 fpinfo->width = width;
5265 fpinfo->startup_cost = startup_cost;
5266 fpinfo->total_cost = total_cost;
5267
5268 /*
5269 * Create a new join path and add it to the joinrel which represents a
5270 * join between foreign tables.
5271 */
5272 joinpath = create_foreignscan_path(root,
5273 joinrel,
5274 NULL, /* default pathtarget */
5275 rows,
5276 startup_cost,
5277 total_cost,
5278 NIL, /* no pathkeys */
5279 joinrel->lateral_relids,
5280 epq_path,
5281 NIL); /* no fdw_private */
5282
5283 /* Add generated path into joinrel by add_path(). */
5284 add_path(joinrel, (Path *) joinpath);
5285
5286 /* Consider pathkeys for the join relation */
5287 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
5288
5289 /* XXX Consider parameterized paths for the join relation */
5290 }
5291
5292 /*
5293 * Assess whether the aggregation, grouping and having operations can be pushed
5294 * down to the foreign server. As a side effect, save information we obtain in
5295 * this function to PgFdwRelationInfo of the input relation.
5296 */
5297 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel,Node * havingQual)5298 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
5299 Node *havingQual)
5300 {
5301 Query *query = root->parse;
5302 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
5303 PathTarget *grouping_target = grouped_rel->reltarget;
5304 PgFdwRelationInfo *ofpinfo;
5305 ListCell *lc;
5306 int i;
5307 List *tlist = NIL;
5308
5309 /* We currently don't support pushing Grouping Sets. */
5310 if (query->groupingSets)
5311 return false;
5312
5313 /* Get the fpinfo of the underlying scan relation. */
5314 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
5315
5316 /*
5317 * If underlying scan relation has any local conditions, those conditions
5318 * are required to be applied before performing aggregation. Hence the
5319 * aggregate cannot be pushed down.
5320 */
5321 if (ofpinfo->local_conds)
5322 return false;
5323
5324 /*
5325 * Examine grouping expressions, as well as other expressions we'd need to
5326 * compute, and check whether they are safe to push down to the foreign
5327 * server. All GROUP BY expressions will be part of the grouping target
5328 * and thus there is no need to search for them separately. Add grouping
5329 * expressions into target list which will be passed to foreign server.
5330 *
5331 * A tricky fine point is that we must not put any expression into the
5332 * target list that is just a foreign param (that is, something that
5333 * deparse.c would conclude has to be sent to the foreign server). If we
5334 * do, the expression will also appear in the fdw_exprs list of the plan
5335 * node, and setrefs.c will get confused and decide that the fdw_exprs
5336 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
5337 * a broken plan. Somewhat oddly, it's OK if the expression contains such
5338 * a node, as long as it's not at top level; then no match is possible.
5339 */
5340 i = 0;
5341 foreach(lc, grouping_target->exprs)
5342 {
5343 Expr *expr = (Expr *) lfirst(lc);
5344 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
5345 ListCell *l;
5346
5347 /* Check whether this expression is part of GROUP BY clause */
5348 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
5349 {
5350 TargetEntry *tle;
5351
5352 /*
5353 * If any GROUP BY expression is not shippable, then we cannot
5354 * push down aggregation to the foreign server.
5355 */
5356 if (!is_foreign_expr(root, grouped_rel, expr))
5357 return false;
5358
5359 /*
5360 * If it would be a foreign param, we can't put it into the tlist,
5361 * so we have to fail.
5362 */
5363 if (is_foreign_param(root, grouped_rel, expr))
5364 return false;
5365
5366 /*
5367 * Pushable, so add to tlist. We need to create a TLE for this
5368 * expression and apply the sortgroupref to it. We cannot use
5369 * add_to_flat_tlist() here because that avoids making duplicate
5370 * entries in the tlist. If there are duplicate entries with
5371 * distinct sortgrouprefs, we have to duplicate that situation in
5372 * the output tlist.
5373 */
5374 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
5375 tle->ressortgroupref = sgref;
5376 tlist = lappend(tlist, tle);
5377 }
5378 else
5379 {
5380 /*
5381 * Non-grouping expression we need to compute. Can we ship it
5382 * as-is to the foreign server?
5383 */
5384 if (is_foreign_expr(root, grouped_rel, expr) &&
5385 !is_foreign_param(root, grouped_rel, expr))
5386 {
5387 /* Yes, so add to tlist as-is; OK to suppress duplicates */
5388 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5389 }
5390 else
5391 {
5392 /* Not pushable as a whole; extract its Vars and aggregates */
5393 List *aggvars;
5394
5395 aggvars = pull_var_clause((Node *) expr,
5396 PVC_INCLUDE_AGGREGATES);
5397
5398 /*
5399 * If any aggregate expression is not shippable, then we
5400 * cannot push down aggregation to the foreign server. (We
5401 * don't have to check is_foreign_param, since that certainly
5402 * won't return true for any such expression.)
5403 */
5404 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
5405 return false;
5406
5407 /*
5408 * Add aggregates, if any, into the targetlist. Plain Vars
5409 * outside an aggregate can be ignored, because they should be
5410 * either same as some GROUP BY column or part of some GROUP
5411 * BY expression. In either case, they are already part of
5412 * the targetlist and thus no need to add them again. In fact
5413 * including plain Vars in the tlist when they do not match a
5414 * GROUP BY column would cause the foreign server to complain
5415 * that the shipped query is invalid.
5416 */
5417 foreach(l, aggvars)
5418 {
5419 Expr *expr = (Expr *) lfirst(l);
5420
5421 if (IsA(expr, Aggref))
5422 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5423 }
5424 }
5425 }
5426
5427 i++;
5428 }
5429
5430 /*
5431 * Classify the pushable and non-pushable HAVING clauses and save them in
5432 * remote_conds and local_conds of the grouped rel's fpinfo.
5433 */
5434 if (havingQual)
5435 {
5436 ListCell *lc;
5437
5438 foreach(lc, (List *) havingQual)
5439 {
5440 Expr *expr = (Expr *) lfirst(lc);
5441 RestrictInfo *rinfo;
5442
5443 /*
5444 * Currently, the core code doesn't wrap havingQuals in
5445 * RestrictInfos, so we must make our own.
5446 */
5447 Assert(!IsA(expr, RestrictInfo));
5448 rinfo = make_restrictinfo(expr,
5449 true,
5450 false,
5451 false,
5452 root->qual_security_level,
5453 grouped_rel->relids,
5454 NULL,
5455 NULL);
5456 if (is_foreign_expr(root, grouped_rel, expr))
5457 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5458 else
5459 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5460 }
5461 }
5462
5463 /*
5464 * If there are any local conditions, pull Vars and aggregates from it and
5465 * check whether they are safe to pushdown or not.
5466 */
5467 if (fpinfo->local_conds)
5468 {
5469 List *aggvars = NIL;
5470 ListCell *lc;
5471
5472 foreach(lc, fpinfo->local_conds)
5473 {
5474 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5475
5476 aggvars = list_concat(aggvars,
5477 pull_var_clause((Node *) rinfo->clause,
5478 PVC_INCLUDE_AGGREGATES));
5479 }
5480
5481 foreach(lc, aggvars)
5482 {
5483 Expr *expr = (Expr *) lfirst(lc);
5484
5485 /*
5486 * If aggregates within local conditions are not safe to push
5487 * down, then we cannot push down the query. Vars are already
5488 * part of GROUP BY clause which are checked above, so no need to
5489 * access them again here. Again, we need not check
5490 * is_foreign_param for a foreign aggregate.
5491 */
5492 if (IsA(expr, Aggref))
5493 {
5494 if (!is_foreign_expr(root, grouped_rel, expr))
5495 return false;
5496
5497 tlist = add_to_flat_tlist(tlist, list_make1(expr));
5498 }
5499 }
5500 }
5501
5502 /* Store generated targetlist */
5503 fpinfo->grouped_tlist = tlist;
5504
5505 /* Safe to pushdown */
5506 fpinfo->pushdown_safe = true;
5507
5508 /*
5509 * Set cached relation costs to some negative value, so that we can detect
5510 * when they are set to some sensible costs, during one (usually the
5511 * first) of the calls to estimate_path_cost_size().
5512 */
5513 fpinfo->rel_startup_cost = -1;
5514 fpinfo->rel_total_cost = -1;
5515
5516 /*
5517 * Set the string describing this grouped relation to be used in EXPLAIN
5518 * output of corresponding ForeignScan.
5519 */
5520 fpinfo->relation_name = makeStringInfo();
5521 appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
5522 ofpinfo->relation_name->data);
5523
5524 return true;
5525 }
5526
5527 /*
5528 * postgresGetForeignUpperPaths
5529 * Add paths for post-join operations like aggregation, grouping etc. if
5530 * corresponding operations are safe to push down.
5531 *
5532 * Right now, we only support aggregate, grouping and having clause pushdown.
5533 */
5534 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel,void * extra)5535 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
5536 RelOptInfo *input_rel, RelOptInfo *output_rel,
5537 void *extra)
5538 {
5539 PgFdwRelationInfo *fpinfo;
5540
5541 /*
5542 * If input rel is not safe to pushdown, then simply return as we cannot
5543 * perform any post-join operations on the foreign server.
5544 */
5545 if (!input_rel->fdw_private ||
5546 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
5547 return;
5548
5549 /* Ignore stages we don't support; and skip any duplicate calls. */
5550 if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
5551 return;
5552
5553 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5554 fpinfo->pushdown_safe = false;
5555 output_rel->fdw_private = fpinfo;
5556
5557 add_foreign_grouping_paths(root, input_rel, output_rel,
5558 (GroupPathExtraData *) extra);
5559 }
5560
5561 /*
5562 * add_foreign_grouping_paths
5563 * Add foreign path for grouping and/or aggregation.
5564 *
5565 * Given input_rel represents the underlying scan. The paths are added to the
5566 * given grouped_rel.
5567 */
5568 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel,GroupPathExtraData * extra)5569 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
5570 RelOptInfo *grouped_rel,
5571 GroupPathExtraData *extra)
5572 {
5573 Query *parse = root->parse;
5574 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
5575 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
5576 ForeignPath *grouppath;
5577 double rows;
5578 int width;
5579 Cost startup_cost;
5580 Cost total_cost;
5581
5582 /* Nothing to be done, if there is no grouping or aggregation required. */
5583 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
5584 !root->hasHavingQual)
5585 return;
5586
5587 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
5588 extra->patype == PARTITIONWISE_AGGREGATE_FULL);
5589
5590 /* save the input_rel as outerrel in fpinfo */
5591 fpinfo->outerrel = input_rel;
5592
5593 /*
5594 * Copy foreign table, foreign server, user mapping, FDW options etc.
5595 * details from the input relation's fpinfo.
5596 */
5597 fpinfo->table = ifpinfo->table;
5598 fpinfo->server = ifpinfo->server;
5599 fpinfo->user = ifpinfo->user;
5600 merge_fdw_options(fpinfo, ifpinfo, NULL);
5601
5602 /*
5603 * Assess if it is safe to push down aggregation and grouping.
5604 *
5605 * Use HAVING qual from extra. In case of child partition, it will have
5606 * translated Vars.
5607 */
5608 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
5609 return;
5610
5611 /* Estimate the cost of push down */
5612 estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
5613 &width, &startup_cost, &total_cost);
5614
5615 /* Now update this information in the fpinfo */
5616 fpinfo->rows = rows;
5617 fpinfo->width = width;
5618 fpinfo->startup_cost = startup_cost;
5619 fpinfo->total_cost = total_cost;
5620
5621 /* Create and add foreign path to the grouping relation. */
5622 grouppath = create_foreignscan_path(root,
5623 grouped_rel,
5624 grouped_rel->reltarget,
5625 rows,
5626 startup_cost,
5627 total_cost,
5628 NIL, /* no pathkeys */
5629 grouped_rel->lateral_relids,
5630 NULL,
5631 NIL); /* no fdw_private */
5632
5633 /* Add generated path into grouped_rel by add_path(). */
5634 add_path(grouped_rel, (Path *) grouppath);
5635 }
5636
5637 /*
5638 * Create a tuple from the specified row of the PGresult.
5639 *
5640 * rel is the local representation of the foreign table, attinmeta is
5641 * conversion data for the rel's tupdesc, and retrieved_attrs is an
5642 * integer list of the table column numbers present in the PGresult.
5643 * fsstate is the ForeignScan plan node's execution state.
5644 * temp_context is a working context that can be reset after each tuple.
5645 *
5646 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
5647 * if we're processing a remote join, while fsstate is NULL in a non-query
5648 * context such as ANALYZE, or if we're processing a non-scan query node.
5649 */
5650 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)5651 make_tuple_from_result_row(PGresult *res,
5652 int row,
5653 Relation rel,
5654 AttInMetadata *attinmeta,
5655 List *retrieved_attrs,
5656 ForeignScanState *fsstate,
5657 MemoryContext temp_context)
5658 {
5659 HeapTuple tuple;
5660 TupleDesc tupdesc;
5661 Datum *values;
5662 bool *nulls;
5663 ItemPointer ctid = NULL;
5664 Oid oid = InvalidOid;
5665 ConversionLocation errpos;
5666 ErrorContextCallback errcallback;
5667 MemoryContext oldcontext;
5668 ListCell *lc;
5669 int j;
5670
5671 Assert(row < PQntuples(res));
5672
5673 /*
5674 * Do the following work in a temp context that we reset after each tuple.
5675 * This cleans up not only the data we have direct access to, but any
5676 * cruft the I/O functions might leak.
5677 */
5678 oldcontext = MemoryContextSwitchTo(temp_context);
5679
5680 /*
5681 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
5682 * provided, otherwise look to the scan node's ScanTupleSlot.
5683 */
5684 if (rel)
5685 tupdesc = RelationGetDescr(rel);
5686 else
5687 {
5688 Assert(fsstate);
5689 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
5690 }
5691
5692 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5693 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5694 /* Initialize to nulls for any columns not present in result */
5695 memset(nulls, true, tupdesc->natts * sizeof(bool));
5696
5697 /*
5698 * Set up and install callback to report where conversion error occurs.
5699 */
5700 errpos.cur_attno = 0;
5701 errpos.rel = rel;
5702 errpos.fsstate = fsstate;
5703 errcallback.callback = conversion_error_callback;
5704 errcallback.arg = (void *) &errpos;
5705 errcallback.previous = error_context_stack;
5706 error_context_stack = &errcallback;
5707
5708 /*
5709 * i indexes columns in the relation, j indexes columns in the PGresult.
5710 */
5711 j = 0;
5712 foreach(lc, retrieved_attrs)
5713 {
5714 int i = lfirst_int(lc);
5715 char *valstr;
5716
5717 /* fetch next column's textual value */
5718 if (PQgetisnull(res, row, j))
5719 valstr = NULL;
5720 else
5721 valstr = PQgetvalue(res, row, j);
5722
5723 /*
5724 * convert value to internal representation
5725 *
5726 * Note: we ignore system columns other than ctid and oid in result
5727 */
5728 errpos.cur_attno = i;
5729 if (i > 0)
5730 {
5731 /* ordinary column */
5732 Assert(i <= tupdesc->natts);
5733 nulls[i - 1] = (valstr == NULL);
5734 /* Apply the input function even to nulls, to support domains */
5735 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5736 valstr,
5737 attinmeta->attioparams[i - 1],
5738 attinmeta->atttypmods[i - 1]);
5739 }
5740 else if (i == SelfItemPointerAttributeNumber)
5741 {
5742 /* ctid */
5743 if (valstr != NULL)
5744 {
5745 Datum datum;
5746
5747 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5748 ctid = (ItemPointer) DatumGetPointer(datum);
5749 }
5750 }
5751 else if (i == ObjectIdAttributeNumber)
5752 {
5753 /* oid */
5754 if (valstr != NULL)
5755 {
5756 Datum datum;
5757
5758 datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5759 oid = DatumGetObjectId(datum);
5760 }
5761 }
5762 errpos.cur_attno = 0;
5763
5764 j++;
5765 }
5766
5767 /* Uninstall error context callback. */
5768 error_context_stack = errcallback.previous;
5769
5770 /*
5771 * Check we got the expected number of columns. Note: j == 0 and
5772 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5773 */
5774 if (j > 0 && j != PQnfields(res))
5775 elog(ERROR, "remote query result does not match the foreign table");
5776
5777 /*
5778 * Build the result tuple in caller's memory context.
5779 */
5780 MemoryContextSwitchTo(oldcontext);
5781
5782 tuple = heap_form_tuple(tupdesc, values, nulls);
5783
5784 /*
5785 * If we have a CTID to return, install it in both t_self and t_ctid.
5786 * t_self is the normal place, but if the tuple is converted to a
5787 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5788 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5789 */
5790 if (ctid)
5791 tuple->t_self = tuple->t_data->t_ctid = *ctid;
5792
5793 /*
5794 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5795 * heap_form_tuple. heap_form_tuple actually creates the tuple with
5796 * DatumTupleFields, not HeapTupleFields, but the executor expects
5797 * HeapTupleFields and will happily extract system columns on that
5798 * assumption. If we don't do this then, for example, the tuple length
5799 * ends up in the xmin field, which isn't what we want.
5800 */
5801 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
5802 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
5803 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
5804
5805 /*
5806 * If we have an OID to return, install it.
5807 */
5808 if (OidIsValid(oid))
5809 HeapTupleSetOid(tuple, oid);
5810
5811 /* Clean up */
5812 MemoryContextReset(temp_context);
5813
5814 return tuple;
5815 }
5816
5817 /*
5818 * Callback function which is called when error occurs during column value
5819 * conversion. Print names of column and relation.
5820 *
5821 * Note that this function mustn't do any catalog lookups, since we are in
5822 * an already-failed transaction. Fortunately, we can get the needed info
5823 * from the relation or the query's rangetable instead.
5824 */
5825 static void
conversion_error_callback(void * arg)5826 conversion_error_callback(void *arg)
5827 {
5828 ConversionLocation *errpos = (ConversionLocation *) arg;
5829 Relation rel = errpos->rel;
5830 ForeignScanState *fsstate = errpos->fsstate;
5831 const char *attname = NULL;
5832 const char *relname = NULL;
5833 bool is_wholerow = false;
5834
5835 /*
5836 * If we're in a scan node, always use aliases from the rangetable, for
5837 * consistency between the simple-relation and remote-join cases. Look at
5838 * the relation's tupdesc only if we're not in a scan node.
5839 */
5840 if (fsstate)
5841 {
5842 /* ForeignScan case */
5843 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5844 int varno = 0;
5845 AttrNumber colno = 0;
5846
5847 if (fsplan->scan.scanrelid > 0)
5848 {
5849 /* error occurred in a scan against a foreign table */
5850 varno = fsplan->scan.scanrelid;
5851 colno = errpos->cur_attno;
5852 }
5853 else
5854 {
5855 /* error occurred in a scan against a foreign join */
5856 TargetEntry *tle;
5857
5858 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
5859 errpos->cur_attno - 1);
5860
5861 /*
5862 * Target list can have Vars and expressions. For Vars, we can
5863 * get some information, however for expressions we can't. Thus
5864 * for expressions, just show generic context message.
5865 */
5866 if (IsA(tle->expr, Var))
5867 {
5868 Var *var = (Var *) tle->expr;
5869
5870 varno = var->varno;
5871 colno = var->varattno;
5872 }
5873 }
5874
5875 if (varno > 0)
5876 {
5877 EState *estate = fsstate->ss.ps.state;
5878 RangeTblEntry *rte = rt_fetch(varno, estate->es_range_table);
5879
5880 relname = rte->eref->aliasname;
5881
5882 if (colno == 0)
5883 is_wholerow = true;
5884 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
5885 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
5886 else if (colno == SelfItemPointerAttributeNumber)
5887 attname = "ctid";
5888 else if (colno == ObjectIdAttributeNumber)
5889 attname = "oid";
5890 }
5891 }
5892 else if (rel)
5893 {
5894 /* Non-ForeignScan case (we should always have a rel here) */
5895 TupleDesc tupdesc = RelationGetDescr(rel);
5896
5897 relname = RelationGetRelationName(rel);
5898 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5899 {
5900 Form_pg_attribute attr = TupleDescAttr(tupdesc,
5901 errpos->cur_attno - 1);
5902
5903 attname = NameStr(attr->attname);
5904 }
5905 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5906 attname = "ctid";
5907 }
5908
5909 if (relname && is_wholerow)
5910 errcontext("whole-row reference to foreign table \"%s\"", relname);
5911 else if (relname && attname)
5912 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5913 else
5914 errcontext("processing expression at position %d in select list",
5915 errpos->cur_attno);
5916 }
5917
5918 /*
5919 * Find an equivalence class member expression, all of whose Vars, come from
5920 * the indicated relation.
5921 */
5922 Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)5923 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
5924 {
5925 ListCell *lc_em;
5926
5927 foreach(lc_em, ec->ec_members)
5928 {
5929 EquivalenceMember *em = lfirst(lc_em);
5930
5931 if (bms_is_subset(em->em_relids, rel->relids) &&
5932 !bms_is_empty(em->em_relids))
5933 {
5934 /*
5935 * If there is more than one equivalence member whose Vars are
5936 * taken entirely from this relation, we'll be content to choose
5937 * any one of those.
5938 */
5939 return em->em_expr;
5940 }
5941 }
5942
5943 /* We didn't find any suitable equivalence class expression */
5944 return NULL;
5945 }
5946