1 /*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2017, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "postgres_fdw.h"
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44
/* Magic block identifying this library as a PostgreSQL loadable module. */
PG_MODULE_MAGIC;
46
/* Default CPU cost charged for starting up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost of processing one row, in addition to cpu_tuple_cost. */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* Without remote estimates, a sort is assumed to add 20% to the cost. */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55
/*
 * Positions of the FDW-private items kept in fdw_private lists.
 *
 * Each enumerator of FdwScanPrivateIndex is the list position of one item,
 * so an item is retrieved with list_nth().  The SELECT statement, for
 * instance, is obtained with:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
	/* String node: text of the SQL statement to run remotely */
	FdwScanPrivateSelectSql = 0,
	/* Integer list: attribute numbers the SELECT retrieves */
	FdwScanPrivateRetrievedAttrs,
	/* Integer: the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing the join, i.e. the names of the joined relations
	 * and the join types; added only when the scan is a join.
	 */
	FdwScanPrivateRelations
};
78
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a postgres_fdw foreign table.  In list order:
 *
 * 1) text of the INSERT/UPDATE/DELETE statement sent to the remote server
 * 2) integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) boolean flag telling whether the remote query has a RETURNING clause
 * 4) integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* String node: text of the SQL statement to run remotely */
	FdwModifyPrivateUpdateSql = 0,
	/* Integer list: target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* Integer Value node: the has-returning flag */
	FdwModifyPrivateHasReturning,
	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
100
/*
 * Positions of the items kept in the fdw_private list of a ForeignScan node
 * that modifies a foreign table directly.  In list order:
 *
 * 1) text of the UPDATE/DELETE statement sent to the remote server
 * 2) boolean flag telling whether the remote query has a RETURNING clause
 * 3) integer list of attribute numbers retrieved by RETURNING, if any
 * 4) boolean flag telling whether we set the command's es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	/* String node: text of the SQL statement to run remotely */
	FdwDirectModifyPrivateUpdateSql = 0,
	/* Integer Value node: the has-returning flag */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list: attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* Integer Value node: the set-processed flag */
	FdwDirectModifyPrivateSetProcessed
};
121
122 /*
123 * Execution state of a foreign scan using postgres_fdw.
124 */
125 typedef struct PgFdwScanState
126 {
127 Relation rel; /* relcache entry for the foreign table. NULL
128 * for a foreign join scan. */
129 TupleDesc tupdesc; /* tuple descriptor of scan */
130 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
131
132 /* extracted fdw_private data */
133 char *query; /* text of SELECT command */
134 List *retrieved_attrs; /* list of retrieved attribute numbers */
135
136 /* for remote query execution */
137 PGconn *conn; /* connection for the scan */
138 unsigned int cursor_number; /* quasi-unique ID for my cursor */
139 bool cursor_exists; /* have we created the cursor? */
140 int numParams; /* number of parameters passed to query */
141 FmgrInfo *param_flinfo; /* output conversion functions for them */
142 List *param_exprs; /* executable expressions for param values */
143 const char **param_values; /* textual values of query parameters */
144
145 /* for storing result tuples */
146 HeapTuple *tuples; /* array of currently-retrieved tuples */
147 int num_tuples; /* # of tuples in array */
148 int next_tuple; /* index of next one to return */
149
150 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
151 int fetch_ct_2; /* Min(# of fetches done, 2) */
152 bool eof_reached; /* true if last fetch reached EOF */
153
154 /* working memory contexts */
155 MemoryContext batch_cxt; /* context holding current batch of tuples */
156 MemoryContext temp_cxt; /* context for per-tuple temporary data */
157
158 int fetch_size; /* number of tuples per fetch */
159 } PgFdwScanState;
160
161 /*
162 * Execution state of a foreign insert/update/delete operation.
163 */
164 typedef struct PgFdwModifyState
165 {
166 Relation rel; /* relcache entry for the foreign table */
167 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
168
169 /* for remote query execution */
170 PGconn *conn; /* connection for the scan */
171 char *p_name; /* name of prepared statement, if created */
172
173 /* extracted fdw_private data */
174 char *query; /* text of INSERT/UPDATE/DELETE command */
175 List *target_attrs; /* list of target attribute numbers */
176 bool has_returning; /* is there a RETURNING clause? */
177 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
178
179 /* info about parameters for prepared statement */
180 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181 int p_nums; /* number of parameters to transmit */
182 FmgrInfo *p_flinfo; /* output conversion functions for them */
183
184 /* working memory context */
185 MemoryContext temp_cxt; /* context for per-tuple temporary data */
186 } PgFdwModifyState;
187
188 /*
189 * Execution state of a foreign scan that modifies a foreign table directly.
190 */
191 typedef struct PgFdwDirectModifyState
192 {
193 Relation rel; /* relcache entry for the foreign table */
194 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195
196 /* extracted fdw_private data */
197 char *query; /* text of UPDATE/DELETE command */
198 bool has_returning; /* is there a RETURNING clause? */
199 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200 bool set_processed; /* do we set the command es_processed? */
201
202 /* for remote query execution */
203 PGconn *conn; /* connection for the update */
204 int numParams; /* number of parameters passed to query */
205 FmgrInfo *param_flinfo; /* output conversion functions for them */
206 List *param_exprs; /* executable expressions for param values */
207 const char **param_values; /* textual values of query parameters */
208
209 /* for storing result tuples */
210 PGresult *result; /* result for query */
211 int num_tuples; /* # of result tuples */
212 int next_tuple; /* index of next one to return */
213
214 /* working memory context */
215 MemoryContext temp_cxt; /* context for per-tuple temporary data */
216 } PgFdwDirectModifyState;
217
218 /*
219 * Workspace for analyzing a foreign table.
220 */
221 typedef struct PgFdwAnalyzeState
222 {
223 Relation rel; /* relcache entry for the foreign table */
224 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
225 List *retrieved_attrs; /* attr numbers retrieved by query */
226
227 /* collected sample rows */
228 HeapTuple *rows; /* array of size targrows */
229 int targrows; /* target # of sample rows */
230 int numrows; /* # of sample rows collected */
231
232 /* for random sampling */
233 double samplerows; /* # of rows fetched */
234 double rowstoskip; /* # of rows to skip before next sample */
235 ReservoirStateData rstate; /* state for reservoir sampling */
236
237 /* working memory contexts */
238 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
239 MemoryContext temp_cxt; /* context for per-tuple temporary data */
240 } PgFdwAnalyzeState;
241
242 /*
243 * Identify the attribute where data conversion fails.
244 */
245 typedef struct ConversionLocation
246 {
247 AttrNumber cur_attno; /* attribute number being processed, or 0 */
248 Relation rel; /* foreign table being processed, or NULL */
249 ForeignScanState *fsstate; /* plan node being processed, or NULL */
250 } ConversionLocation;
251
252 /* Callback argument for ec_member_matches_foreign */
253 typedef struct
254 {
255 Expr *current; /* current expr, or NULL if not yet found */
256 List *already_used; /* expressions already dealt with */
257 } ec_member_foreign_arg;
258
259 /*
260 * SQL functions
261 */
262 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
263
264 /*
265 * FDW callback routines
266 */
267 static void postgresGetForeignRelSize(PlannerInfo *root,
268 RelOptInfo *baserel,
269 Oid foreigntableid);
270 static void postgresGetForeignPaths(PlannerInfo *root,
271 RelOptInfo *baserel,
272 Oid foreigntableid);
273 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
274 RelOptInfo *baserel,
275 Oid foreigntableid,
276 ForeignPath *best_path,
277 List *tlist,
278 List *scan_clauses,
279 Plan *outer_plan);
280 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
281 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
282 static void postgresReScanForeignScan(ForeignScanState *node);
283 static void postgresEndForeignScan(ForeignScanState *node);
284 static void postgresAddForeignUpdateTargets(Query *parsetree,
285 RangeTblEntry *target_rte,
286 Relation target_relation);
287 static List *postgresPlanForeignModify(PlannerInfo *root,
288 ModifyTable *plan,
289 Index resultRelation,
290 int subplan_index);
291 static void postgresBeginForeignModify(ModifyTableState *mtstate,
292 ResultRelInfo *resultRelInfo,
293 List *fdw_private,
294 int subplan_index,
295 int eflags);
296 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
297 ResultRelInfo *resultRelInfo,
298 TupleTableSlot *slot,
299 TupleTableSlot *planSlot);
300 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
301 ResultRelInfo *resultRelInfo,
302 TupleTableSlot *slot,
303 TupleTableSlot *planSlot);
304 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
305 ResultRelInfo *resultRelInfo,
306 TupleTableSlot *slot,
307 TupleTableSlot *planSlot);
308 static void postgresEndForeignModify(EState *estate,
309 ResultRelInfo *resultRelInfo);
310 static int postgresIsForeignRelUpdatable(Relation rel);
311 static bool postgresPlanDirectModify(PlannerInfo *root,
312 ModifyTable *plan,
313 Index resultRelation,
314 int subplan_index);
315 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
316 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
317 static void postgresEndDirectModify(ForeignScanState *node);
318 static void postgresExplainForeignScan(ForeignScanState *node,
319 ExplainState *es);
320 static void postgresExplainForeignModify(ModifyTableState *mtstate,
321 ResultRelInfo *rinfo,
322 List *fdw_private,
323 int subplan_index,
324 ExplainState *es);
325 static void postgresExplainDirectModify(ForeignScanState *node,
326 ExplainState *es);
327 static bool postgresAnalyzeForeignTable(Relation relation,
328 AcquireSampleRowsFunc *func,
329 BlockNumber *totalpages);
330 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
331 Oid serverOid);
332 static void postgresGetForeignJoinPaths(PlannerInfo *root,
333 RelOptInfo *joinrel,
334 RelOptInfo *outerrel,
335 RelOptInfo *innerrel,
336 JoinType jointype,
337 JoinPathExtraData *extra);
338 static bool postgresRecheckForeignScan(ForeignScanState *node,
339 TupleTableSlot *slot);
340 static void postgresGetForeignUpperPaths(PlannerInfo *root,
341 UpperRelationKind stage,
342 RelOptInfo *input_rel,
343 RelOptInfo *output_rel);
344
345 /*
346 * Helper functions
347 */
348 static void estimate_path_cost_size(PlannerInfo *root,
349 RelOptInfo *baserel,
350 List *join_conds,
351 List *pathkeys,
352 double *p_rows, int *p_width,
353 Cost *p_startup_cost, Cost *p_total_cost);
354 static void get_remote_estimate(const char *sql,
355 PGconn *conn,
356 double *rows,
357 int *width,
358 Cost *startup_cost,
359 Cost *total_cost);
360 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
361 EquivalenceClass *ec, EquivalenceMember *em,
362 void *arg);
363 static void create_cursor(ForeignScanState *node);
364 static void fetch_more_data(ForeignScanState *node);
365 static void close_cursor(PGconn *conn, unsigned int cursor_number);
366 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
367 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
368 ItemPointer tupleid,
369 TupleTableSlot *slot);
370 static void store_returning_result(PgFdwModifyState *fmstate,
371 TupleTableSlot *slot, PGresult *res);
372 static void execute_dml_stmt(ForeignScanState *node);
373 static TupleTableSlot *get_returning_data(ForeignScanState *node);
374 static void prepare_query_params(PlanState *node,
375 List *fdw_exprs,
376 int numParams,
377 FmgrInfo **param_flinfo,
378 List **param_exprs,
379 const char ***param_values);
380 static void process_query_params(ExprContext *econtext,
381 FmgrInfo *param_flinfo,
382 List *param_exprs,
383 const char **param_values);
384 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
385 HeapTuple *rows, int targrows,
386 double *totalrows,
387 double *totaldeadrows);
388 static void analyze_row_processor(PGresult *res, int row,
389 PgFdwAnalyzeState *astate);
390 static HeapTuple make_tuple_from_result_row(PGresult *res,
391 int row,
392 Relation rel,
393 AttInMetadata *attinmeta,
394 List *retrieved_attrs,
395 ForeignScanState *fsstate,
396 MemoryContext temp_context);
397 static void conversion_error_callback(void *arg);
398 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
399 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
400 JoinPathExtraData *extra);
401 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
402 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
403 RelOptInfo *rel);
404 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
405 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
406 Path *epq_path);
407 static void add_foreign_grouping_paths(PlannerInfo *root,
408 RelOptInfo *input_rel,
409 RelOptInfo *grouped_rel);
410 static void apply_server_options(PgFdwRelationInfo *fpinfo);
411 static void apply_table_options(PgFdwRelationInfo *fpinfo);
412 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
413 const PgFdwRelationInfo *fpinfo_o,
414 const PgFdwRelationInfo *fpinfo_i);
415
416
417 /*
418 * Foreign-data wrapper handler function: return a struct with pointers
419 * to my callback routines.
420 */
421 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)422 postgres_fdw_handler(PG_FUNCTION_ARGS)
423 {
424 FdwRoutine *routine = makeNode(FdwRoutine);
425
426 /* Functions for scanning foreign tables */
427 routine->GetForeignRelSize = postgresGetForeignRelSize;
428 routine->GetForeignPaths = postgresGetForeignPaths;
429 routine->GetForeignPlan = postgresGetForeignPlan;
430 routine->BeginForeignScan = postgresBeginForeignScan;
431 routine->IterateForeignScan = postgresIterateForeignScan;
432 routine->ReScanForeignScan = postgresReScanForeignScan;
433 routine->EndForeignScan = postgresEndForeignScan;
434
435 /* Functions for updating foreign tables */
436 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
437 routine->PlanForeignModify = postgresPlanForeignModify;
438 routine->BeginForeignModify = postgresBeginForeignModify;
439 routine->ExecForeignInsert = postgresExecForeignInsert;
440 routine->ExecForeignUpdate = postgresExecForeignUpdate;
441 routine->ExecForeignDelete = postgresExecForeignDelete;
442 routine->EndForeignModify = postgresEndForeignModify;
443 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
444 routine->PlanDirectModify = postgresPlanDirectModify;
445 routine->BeginDirectModify = postgresBeginDirectModify;
446 routine->IterateDirectModify = postgresIterateDirectModify;
447 routine->EndDirectModify = postgresEndDirectModify;
448
449 /* Function for EvalPlanQual rechecks */
450 routine->RecheckForeignScan = postgresRecheckForeignScan;
451 /* Support functions for EXPLAIN */
452 routine->ExplainForeignScan = postgresExplainForeignScan;
453 routine->ExplainForeignModify = postgresExplainForeignModify;
454 routine->ExplainDirectModify = postgresExplainDirectModify;
455
456 /* Support functions for ANALYZE */
457 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
458
459 /* Support functions for IMPORT FOREIGN SCHEMA */
460 routine->ImportForeignSchema = postgresImportForeignSchema;
461
462 /* Support functions for join push-down */
463 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
464
465 /* Support functions for upper relation push-down */
466 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
467
468 PG_RETURN_POINTER(routine);
469 }
470
471 /*
472 * postgresGetForeignRelSize
473 * Estimate # of rows and width of the result of the scan
474 *
475 * We should consider the effect of all baserestrictinfo clauses here, but
476 * not any join clauses.
477 */
478 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)479 postgresGetForeignRelSize(PlannerInfo *root,
480 RelOptInfo *baserel,
481 Oid foreigntableid)
482 {
483 PgFdwRelationInfo *fpinfo;
484 ListCell *lc;
485 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
486 const char *namespace;
487 const char *relname;
488 const char *refname;
489
490 /*
491 * We use PgFdwRelationInfo to pass various information to subsequent
492 * functions.
493 */
494 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
495 baserel->fdw_private = (void *) fpinfo;
496
497 /* Base foreign tables need to be pushed down always. */
498 fpinfo->pushdown_safe = true;
499
500 /* Look up foreign-table catalog info. */
501 fpinfo->table = GetForeignTable(foreigntableid);
502 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
503
504 /*
505 * Extract user-settable option values. Note that per-table settings of
506 * use_remote_estimate and fetch_size override per-server settings of
507 * them, respectively.
508 */
509 fpinfo->use_remote_estimate = false;
510 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
511 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
512 fpinfo->shippable_extensions = NIL;
513 fpinfo->fetch_size = 100;
514
515 apply_server_options(fpinfo);
516 apply_table_options(fpinfo);
517
518 /*
519 * If the table or the server is configured to use remote estimates,
520 * identify which user to do remote access as during planning. This
521 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
522 * permissions, the query would have failed at runtime anyway.
523 */
524 if (fpinfo->use_remote_estimate)
525 {
526 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
527
528 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
529 }
530 else
531 fpinfo->user = NULL;
532
533 /*
534 * Identify which baserestrictinfo clauses can be sent to the remote
535 * server and which can't.
536 */
537 classifyConditions(root, baserel, baserel->baserestrictinfo,
538 &fpinfo->remote_conds, &fpinfo->local_conds);
539
540 /*
541 * Identify which attributes will need to be retrieved from the remote
542 * server. These include all attrs needed for joins or final output, plus
543 * all attrs used in the local_conds. (Note: if we end up using a
544 * parameterized scan, it's possible that some of the join clauses will be
545 * sent to the remote and thus we wouldn't really need to retrieve the
546 * columns used in them. Doesn't seem worth detecting that case though.)
547 */
548 fpinfo->attrs_used = NULL;
549 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
550 &fpinfo->attrs_used);
551 foreach(lc, fpinfo->local_conds)
552 {
553 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
554
555 pull_varattnos((Node *) rinfo->clause, baserel->relid,
556 &fpinfo->attrs_used);
557 }
558
559 /*
560 * Compute the selectivity and cost of the local_conds, so we don't have
561 * to do it over again for each path. The best we can do for these
562 * conditions is to estimate selectivity on the basis of local statistics.
563 */
564 fpinfo->local_conds_sel = clauselist_selectivity(root,
565 fpinfo->local_conds,
566 baserel->relid,
567 JOIN_INNER,
568 NULL);
569
570 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
571
572 /*
573 * Set cached relation costs to some negative value, so that we can detect
574 * when they are set to some sensible costs during one (usually the first)
575 * of the calls to estimate_path_cost_size().
576 */
577 fpinfo->rel_startup_cost = -1;
578 fpinfo->rel_total_cost = -1;
579
580 /*
581 * If the table or the server is configured to use remote estimates,
582 * connect to the foreign server and execute EXPLAIN to estimate the
583 * number of rows selected by the restriction clauses, as well as the
584 * average row width. Otherwise, estimate using whatever statistics we
585 * have locally, in a way similar to ordinary tables.
586 */
587 if (fpinfo->use_remote_estimate)
588 {
589 /*
590 * Get cost/size estimates with help of remote server. Save the
591 * values in fpinfo so we don't need to do it again to generate the
592 * basic foreign path.
593 */
594 estimate_path_cost_size(root, baserel, NIL, NIL,
595 &fpinfo->rows, &fpinfo->width,
596 &fpinfo->startup_cost, &fpinfo->total_cost);
597
598 /* Report estimated baserel size to planner. */
599 baserel->rows = fpinfo->rows;
600 baserel->reltarget->width = fpinfo->width;
601 }
602 else
603 {
604 /*
605 * If the foreign table has never been ANALYZEd, it will have relpages
606 * and reltuples equal to zero, which most likely has nothing to do
607 * with reality. We can't do a whole lot about that if we're not
608 * allowed to consult the remote server, but we can use a hack similar
609 * to plancat.c's treatment of empty relations: use a minimum size
610 * estimate of 10 pages, and divide by the column-datatype-based width
611 * estimate to get the corresponding number of tuples.
612 */
613 if (baserel->pages == 0 && baserel->tuples == 0)
614 {
615 baserel->pages = 10;
616 baserel->tuples =
617 (10 * BLCKSZ) / (baserel->reltarget->width +
618 MAXALIGN(SizeofHeapTupleHeader));
619 }
620
621 /* Estimate baserel size as best we can with local statistics. */
622 set_baserel_size_estimates(root, baserel);
623
624 /* Fill in basically-bogus cost estimates for use later. */
625 estimate_path_cost_size(root, baserel, NIL, NIL,
626 &fpinfo->rows, &fpinfo->width,
627 &fpinfo->startup_cost, &fpinfo->total_cost);
628 }
629
630 /*
631 * Set the name of relation in fpinfo, while we are constructing it here.
632 * It will be used to build the string describing the join relation in
633 * EXPLAIN output. We can't know whether VERBOSE option is specified or
634 * not, so always schema-qualify the foreign table name.
635 */
636 fpinfo->relation_name = makeStringInfo();
637 namespace = get_namespace_name(get_rel_namespace(foreigntableid));
638 relname = get_rel_name(foreigntableid);
639 refname = rte->eref->aliasname;
640 appendStringInfo(fpinfo->relation_name, "%s.%s",
641 quote_identifier(namespace),
642 quote_identifier(relname));
643 if (*refname && strcmp(refname, relname) != 0)
644 appendStringInfo(fpinfo->relation_name, " %s",
645 quote_identifier(rte->eref->aliasname));
646
647 /* No outer and inner relations. */
648 fpinfo->make_outerrel_subquery = false;
649 fpinfo->make_innerrel_subquery = false;
650 fpinfo->lower_subquery_rels = NULL;
651 /* Set the relation index. */
652 fpinfo->relation_index = baserel->relid;
653 }
654
655 /*
656 * get_useful_ecs_for_relation
657 * Determine which EquivalenceClasses might be involved in useful
658 * orderings of this relation.
659 *
660 * This function is in some respects a mirror image of the core function
661 * pathkeys_useful_for_merging: for a regular table, we know what indexes
662 * we have and want to test whether any of them are useful. For a foreign
663 * table, we don't know what indexes are present on the remote side but
664 * want to speculate about which ones we'd like to use if they existed.
665 *
666 * This function returns a list of potentially-useful equivalence classes,
667 * but it does not guarantee that an EquivalenceMember exists which contains
668 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
669 * ft1.x + t1.x = 0, this function will say that the equivalence class
670 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
671 * t1 is local (or on a different server), it will turn out that no useful
672 * ORDER BY clause can be generated. It's not our job to figure that out
673 * here; we're only interested in identifying relevant ECs.
674 */
675 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)676 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
677 {
678 List *useful_eclass_list = NIL;
679 ListCell *lc;
680 Relids relids;
681
682 /*
683 * First, consider whether any active EC is potentially useful for a merge
684 * join against this relation.
685 */
686 if (rel->has_eclass_joins)
687 {
688 foreach(lc, root->eq_classes)
689 {
690 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
691
692 if (eclass_useful_for_merging(root, cur_ec, rel))
693 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
694 }
695 }
696
697 /*
698 * Next, consider whether there are any non-EC derivable join clauses that
699 * are merge-joinable. If the joininfo list is empty, we can exit
700 * quickly.
701 */
702 if (rel->joininfo == NIL)
703 return useful_eclass_list;
704
705 /* If this is a child rel, we must use the topmost parent rel to search. */
706 if (IS_OTHER_REL(rel))
707 {
708 Assert(!bms_is_empty(rel->top_parent_relids));
709 relids = rel->top_parent_relids;
710 }
711 else
712 relids = rel->relids;
713
714 /* Check each join clause in turn. */
715 foreach(lc, rel->joininfo)
716 {
717 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
718
719 /* Consider only mergejoinable clauses */
720 if (restrictinfo->mergeopfamilies == NIL)
721 continue;
722
723 /* Make sure we've got canonical ECs. */
724 update_mergeclause_eclasses(root, restrictinfo);
725
726 /*
727 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
728 * that left_ec and right_ec will be initialized, per comments in
729 * distribute_qual_to_rels.
730 *
731 * We want to identify which side of this merge-joinable clause
732 * contains columns from the relation produced by this RelOptInfo. We
733 * test for overlap, not containment, because there could be extra
734 * relations on either side. For example, suppose we've got something
735 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
736 * A.y = D.y. The input rel might be the joinrel between A and B, and
737 * we'll consider the join clause A.y = D.y. relids contains a
738 * relation not involved in the join class (B) and the equivalence
739 * class for the left-hand side of the clause contains a relation not
740 * involved in the input rel (C). Despite the fact that we have only
741 * overlap and not containment in either direction, A.y is potentially
742 * useful as a sort column.
743 *
744 * Note that it's even possible that relids overlaps neither side of
745 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
746 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
747 * but overlaps neither side of B. In that case, we just skip this
748 * join clause, since it doesn't suggest a useful sort order for this
749 * relation.
750 */
751 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
752 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
753 restrictinfo->right_ec);
754 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
755 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
756 restrictinfo->left_ec);
757 }
758
759 return useful_eclass_list;
760 }
761
762 /*
763 * get_useful_pathkeys_for_relation
764 * Determine which orderings of a relation might be useful.
765 *
766 * Getting data in sorted order can be useful either because the requested
767 * order matches the final output ordering for the overall query we're
768 * planning, or because it enables an efficient merge join. Here, we try
769 * to figure out which pathkeys to consider.
770 */
771 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)772 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
773 {
774 List *useful_pathkeys_list = NIL;
775 List *useful_eclass_list;
776 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
777 EquivalenceClass *query_ec = NULL;
778 ListCell *lc;
779
780 /*
781 * Pushing the query_pathkeys to the remote server is always worth
782 * considering, because it might let us avoid a local sort.
783 */
784 if (root->query_pathkeys)
785 {
786 bool query_pathkeys_ok = true;
787
788 foreach(lc, root->query_pathkeys)
789 {
790 PathKey *pathkey = (PathKey *) lfirst(lc);
791 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
792 Expr *em_expr;
793
794 /*
795 * The planner and executor don't have any clever strategy for
796 * taking data sorted by a prefix of the query's pathkeys and
797 * getting it to be sorted by all of those pathkeys. We'll just
798 * end up resorting the entire data set. So, unless we can push
799 * down all of the query pathkeys, forget it.
800 *
801 * is_foreign_expr would detect volatile expressions as well, but
802 * checking ec_has_volatile here saves some cycles.
803 */
804 if (pathkey_ec->ec_has_volatile ||
805 !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
806 !is_foreign_expr(root, rel, em_expr))
807 {
808 query_pathkeys_ok = false;
809 break;
810 }
811 }
812
813 if (query_pathkeys_ok)
814 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
815 }
816
817 /*
818 * Even if we're not using remote estimates, having the remote side do the
819 * sort generally won't be any worse than doing it locally, and it might
820 * be much better if the remote side can generate data in the right order
821 * without needing a sort at all. However, what we're going to do next is
822 * try to generate pathkeys that seem promising for possible merge joins,
823 * and that's more speculative. A wrong choice might hurt quite a bit, so
824 * bail out if we can't use remote estimates.
825 */
826 if (!fpinfo->use_remote_estimate)
827 return useful_pathkeys_list;
828
829 /* Get the list of interesting EquivalenceClasses. */
830 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
831
832 /* Extract unique EC for query, if any, so we don't consider it again. */
833 if (list_length(root->query_pathkeys) == 1)
834 {
835 PathKey *query_pathkey = linitial(root->query_pathkeys);
836
837 query_ec = query_pathkey->pk_eclass;
838 }
839
840 /*
841 * As a heuristic, the only pathkeys we consider here are those of length
842 * one. It's surely possible to consider more, but since each one we
843 * choose to consider will generate a round-trip to the remote side, we
844 * need to be a bit cautious here. It would sure be nice to have a local
845 * cache of information about remote index definitions...
846 */
847 foreach(lc, useful_eclass_list)
848 {
849 EquivalenceClass *cur_ec = lfirst(lc);
850 Expr *em_expr;
851 PathKey *pathkey;
852
853 /* If redundant with what we did above, skip it. */
854 if (cur_ec == query_ec)
855 continue;
856
857 /* If no pushable expression for this rel, skip it. */
858 em_expr = find_em_expr_for_rel(cur_ec, rel);
859 if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
860 continue;
861
862 /* Looks like we can generate a pathkey, so let's do it. */
863 pathkey = make_canonical_pathkey(root, cur_ec,
864 linitial_oid(cur_ec->ec_opfamilies),
865 BTLessStrategyNumber,
866 false);
867 useful_pathkeys_list = lappend(useful_pathkeys_list,
868 list_make1(pathkey));
869 }
870
871 return useful_pathkeys_list;
872 }
873
874 /*
875 * postgresGetForeignPaths
876 * Create possible scan paths for a scan on the foreign table
877 */
878 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)879 postgresGetForeignPaths(PlannerInfo *root,
880 RelOptInfo *baserel,
881 Oid foreigntableid)
882 {
883 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
884 ForeignPath *path;
885 List *ppi_list;
886 ListCell *lc;
887
888 /*
889 * Create simplest ForeignScan path node and add it to baserel. This path
890 * corresponds to SeqScan path of regular tables (though depending on what
891 * baserestrict conditions we were able to send to remote, there might
892 * actually be an indexscan happening there). We already did all the work
893 * to estimate cost and size of this path.
894 *
895 * Although this path uses no join clauses, it could still have required
896 * parameterization due to LATERAL refs in its tlist.
897 */
898 path = create_foreignscan_path(root, baserel,
899 NULL, /* default pathtarget */
900 fpinfo->rows,
901 fpinfo->startup_cost,
902 fpinfo->total_cost,
903 NIL, /* no pathkeys */
904 baserel->lateral_relids,
905 NULL, /* no extra plan */
906 NIL); /* no fdw_private list */
907 add_path(baserel, (Path *) path);
908
909 /* Add paths with pathkeys */
910 add_paths_with_pathkeys_for_rel(root, baserel, NULL);
911
912 /*
913 * If we're not using remote estimates, stop here. We have no way to
914 * estimate whether any join clauses would be worth sending across, so
915 * don't bother building parameterized paths.
916 */
917 if (!fpinfo->use_remote_estimate)
918 return;
919
920 /*
921 * Thumb through all join clauses for the rel to identify which outer
922 * relations could supply one or more safe-to-send-to-remote join clauses.
923 * We'll build a parameterized path for each such outer relation.
924 *
925 * It's convenient to manage this by representing each candidate outer
926 * relation by the ParamPathInfo node for it. We can then use the
927 * ppi_clauses list in the ParamPathInfo node directly as a list of the
928 * interesting join clauses for that rel. This takes care of the
929 * possibility that there are multiple safe join clauses for such a rel,
930 * and also ensures that we account for unsafe join clauses that we'll
931 * still have to enforce locally (since the parameterized-path machinery
932 * insists that we handle all movable clauses).
933 */
934 ppi_list = NIL;
935 foreach(lc, baserel->joininfo)
936 {
937 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
938 Relids required_outer;
939 ParamPathInfo *param_info;
940
941 /* Check if clause can be moved to this rel */
942 if (!join_clause_is_movable_to(rinfo, baserel))
943 continue;
944
945 /* See if it is safe to send to remote */
946 if (!is_foreign_expr(root, baserel, rinfo->clause))
947 continue;
948
949 /* Calculate required outer rels for the resulting path */
950 required_outer = bms_union(rinfo->clause_relids,
951 baserel->lateral_relids);
952 /* We do not want the foreign rel itself listed in required_outer */
953 required_outer = bms_del_member(required_outer, baserel->relid);
954
955 /*
956 * required_outer probably can't be empty here, but if it were, we
957 * couldn't make a parameterized path.
958 */
959 if (bms_is_empty(required_outer))
960 continue;
961
962 /* Get the ParamPathInfo */
963 param_info = get_baserel_parampathinfo(root, baserel,
964 required_outer);
965 Assert(param_info != NULL);
966
967 /*
968 * Add it to list unless we already have it. Testing pointer equality
969 * is OK since get_baserel_parampathinfo won't make duplicates.
970 */
971 ppi_list = list_append_unique_ptr(ppi_list, param_info);
972 }
973
974 /*
975 * The above scan examined only "generic" join clauses, not those that
976 * were absorbed into EquivalenceClauses. See if we can make anything out
977 * of EquivalenceClauses.
978 */
979 if (baserel->has_eclass_joins)
980 {
981 /*
982 * We repeatedly scan the eclass list looking for column references
983 * (or expressions) belonging to the foreign rel. Each time we find
984 * one, we generate a list of equivalence joinclauses for it, and then
985 * see if any are safe to send to the remote. Repeat till there are
986 * no more candidate EC members.
987 */
988 ec_member_foreign_arg arg;
989
990 arg.already_used = NIL;
991 for (;;)
992 {
993 List *clauses;
994
995 /* Make clauses, skipping any that join to lateral_referencers */
996 arg.current = NULL;
997 clauses = generate_implied_equalities_for_column(root,
998 baserel,
999 ec_member_matches_foreign,
1000 (void *) &arg,
1001 baserel->lateral_referencers);
1002
1003 /* Done if there are no more expressions in the foreign rel */
1004 if (arg.current == NULL)
1005 {
1006 Assert(clauses == NIL);
1007 break;
1008 }
1009
1010 /* Scan the extracted join clauses */
1011 foreach(lc, clauses)
1012 {
1013 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1014 Relids required_outer;
1015 ParamPathInfo *param_info;
1016
1017 /* Check if clause can be moved to this rel */
1018 if (!join_clause_is_movable_to(rinfo, baserel))
1019 continue;
1020
1021 /* See if it is safe to send to remote */
1022 if (!is_foreign_expr(root, baserel, rinfo->clause))
1023 continue;
1024
1025 /* Calculate required outer rels for the resulting path */
1026 required_outer = bms_union(rinfo->clause_relids,
1027 baserel->lateral_relids);
1028 required_outer = bms_del_member(required_outer, baserel->relid);
1029 if (bms_is_empty(required_outer))
1030 continue;
1031
1032 /* Get the ParamPathInfo */
1033 param_info = get_baserel_parampathinfo(root, baserel,
1034 required_outer);
1035 Assert(param_info != NULL);
1036
1037 /* Add it to list unless we already have it */
1038 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1039 }
1040
1041 /* Try again, now ignoring the expression we found this time */
1042 arg.already_used = lappend(arg.already_used, arg.current);
1043 }
1044 }
1045
1046 /*
1047 * Now build a path for each useful outer relation.
1048 */
1049 foreach(lc, ppi_list)
1050 {
1051 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1052 double rows;
1053 int width;
1054 Cost startup_cost;
1055 Cost total_cost;
1056
1057 /* Get a cost estimate from the remote */
1058 estimate_path_cost_size(root, baserel,
1059 param_info->ppi_clauses, NIL,
1060 &rows, &width,
1061 &startup_cost, &total_cost);
1062
1063 /*
1064 * ppi_rows currently won't get looked at by anything, but still we
1065 * may as well ensure that it matches our idea of the rowcount.
1066 */
1067 param_info->ppi_rows = rows;
1068
1069 /* Make the path */
1070 path = create_foreignscan_path(root, baserel,
1071 NULL, /* default pathtarget */
1072 rows,
1073 startup_cost,
1074 total_cost,
1075 NIL, /* no pathkeys */
1076 param_info->ppi_req_outer,
1077 NULL,
1078 NIL); /* no fdw_private list */
1079 add_path(baserel, (Path *) path);
1080 }
1081 }
1082
1083 /*
1084 * postgresGetForeignPlan
1085 * Create ForeignScan plan node which implements selected best path
1086 */
1087 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1088 postgresGetForeignPlan(PlannerInfo *root,
1089 RelOptInfo *foreignrel,
1090 Oid foreigntableid,
1091 ForeignPath *best_path,
1092 List *tlist,
1093 List *scan_clauses,
1094 Plan *outer_plan)
1095 {
1096 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1097 Index scan_relid;
1098 List *fdw_private;
1099 List *remote_exprs = NIL;
1100 List *local_exprs = NIL;
1101 List *params_list = NIL;
1102 List *fdw_scan_tlist = NIL;
1103 List *fdw_recheck_quals = NIL;
1104 List *retrieved_attrs;
1105 StringInfoData sql;
1106 ListCell *lc;
1107
1108 if (IS_SIMPLE_REL(foreignrel))
1109 {
1110 /*
1111 * For base relations, set scan_relid as the relid of the relation.
1112 */
1113 scan_relid = foreignrel->relid;
1114
1115 /*
1116 * In a base-relation scan, we must apply the given scan_clauses.
1117 *
1118 * Separate the scan_clauses into those that can be executed remotely
1119 * and those that can't. baserestrictinfo clauses that were
1120 * previously determined to be safe or unsafe by classifyConditions
1121 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1122 * else in the scan_clauses list will be a join clause, which we have
1123 * to check for remote-safety.
1124 *
1125 * Note: the join clauses we see here should be the exact same ones
1126 * previously examined by postgresGetForeignPaths. Possibly it'd be
1127 * worth passing forward the classification work done then, rather
1128 * than repeating it here.
1129 *
1130 * This code must match "extract_actual_clauses(scan_clauses, false)"
1131 * except for the additional decision about remote versus local
1132 * execution.
1133 */
1134 foreach(lc, scan_clauses)
1135 {
1136 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1137
1138 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1139 if (rinfo->pseudoconstant)
1140 continue;
1141
1142 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1143 remote_exprs = lappend(remote_exprs, rinfo->clause);
1144 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1145 local_exprs = lappend(local_exprs, rinfo->clause);
1146 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1147 remote_exprs = lappend(remote_exprs, rinfo->clause);
1148 else
1149 local_exprs = lappend(local_exprs, rinfo->clause);
1150 }
1151
1152 /*
1153 * For a base-relation scan, we have to support EPQ recheck, which
1154 * should recheck all the remote quals.
1155 */
1156 fdw_recheck_quals = remote_exprs;
1157 }
1158 else
1159 {
1160 /*
1161 * Join relation or upper relation - set scan_relid to 0.
1162 */
1163 scan_relid = 0;
1164
1165 /*
1166 * For a join rel, baserestrictinfo is NIL and we are not considering
1167 * parameterization right now, so there should be no scan_clauses for
1168 * a joinrel or an upper rel either.
1169 */
1170 Assert(!scan_clauses);
1171
1172 /*
1173 * Instead we get the conditions to apply from the fdw_private
1174 * structure.
1175 */
1176 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1177 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1178
1179 /*
1180 * We leave fdw_recheck_quals empty in this case, since we never need
1181 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1182 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1183 * If we're planning an upperrel (ie, remote grouping or aggregation)
1184 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1185 * allowed, and indeed we *can't* put the remote clauses into
1186 * fdw_recheck_quals because the unaggregated Vars won't be available
1187 * locally.
1188 */
1189
1190 /* Build the list of columns to be fetched from the foreign server. */
1191 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1192
1193 /*
1194 * Ensure that the outer plan produces a tuple whose descriptor
1195 * matches our scan tuple slot. Also, remove the local conditions
1196 * from outer plan's quals, lest they be evaluated twice, once by the
1197 * local plan and once by the scan.
1198 */
1199 if (outer_plan)
1200 {
1201 ListCell *lc;
1202
1203 /*
1204 * Right now, we only consider grouping and aggregation beyond
1205 * joins. Queries involving aggregates or grouping do not require
1206 * EPQ mechanism, hence should not have an outer plan here.
1207 */
1208 Assert(!IS_UPPER_REL(foreignrel));
1209
1210 /*
1211 * First, update the plan's qual list if possible. In some cases
1212 * the quals might be enforced below the topmost plan level, in
1213 * which case we'll fail to remove them; it's not worth working
1214 * harder than this.
1215 */
1216 foreach(lc, local_exprs)
1217 {
1218 Node *qual = lfirst(lc);
1219
1220 outer_plan->qual = list_delete(outer_plan->qual, qual);
1221
1222 /*
1223 * For an inner join the local conditions of foreign scan plan
1224 * can be part of the joinquals as well. (They might also be
1225 * in the mergequals or hashquals, but we can't touch those
1226 * without breaking the plan.)
1227 */
1228 if (IsA(outer_plan, NestLoop) ||
1229 IsA(outer_plan, MergeJoin) ||
1230 IsA(outer_plan, HashJoin))
1231 {
1232 Join *join_plan = (Join *) outer_plan;
1233
1234 if (join_plan->jointype == JOIN_INNER)
1235 join_plan->joinqual = list_delete(join_plan->joinqual,
1236 qual);
1237 }
1238 }
1239
1240 /*
1241 * Now fix the subplan's tlist --- this might result in inserting
1242 * a Result node atop the plan tree.
1243 */
1244 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1245 best_path->path.parallel_safe);
1246 }
1247 }
1248
1249 /*
1250 * Build the query string to be sent for execution, and identify
1251 * expressions to be sent as parameters.
1252 */
1253 initStringInfo(&sql);
1254 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1255 remote_exprs, best_path->path.pathkeys,
1256 false, &retrieved_attrs, ¶ms_list);
1257
1258 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1259 fpinfo->final_remote_exprs = remote_exprs;
1260
1261 /*
1262 * Build the fdw_private list that will be available to the executor.
1263 * Items in the list must match order in enum FdwScanPrivateIndex.
1264 */
1265 fdw_private = list_make3(makeString(sql.data),
1266 retrieved_attrs,
1267 makeInteger(fpinfo->fetch_size));
1268 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1269 fdw_private = lappend(fdw_private,
1270 makeString(fpinfo->relation_name->data));
1271
1272 /*
1273 * Create the ForeignScan node for the given relation.
1274 *
1275 * Note that the remote parameter expressions are stored in the fdw_exprs
1276 * field of the finished plan node; we can't keep them in private state
1277 * because then they wouldn't be subject to later planner processing.
1278 */
1279 return make_foreignscan(tlist,
1280 local_exprs,
1281 scan_relid,
1282 params_list,
1283 fdw_private,
1284 fdw_scan_tlist,
1285 fdw_recheck_quals,
1286 outer_plan);
1287 }
1288
1289 /*
1290 * postgresBeginForeignScan
1291 * Initiate an executor scan of a foreign PostgreSQL table.
1292 */
1293 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1294 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1295 {
1296 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1297 EState *estate = node->ss.ps.state;
1298 PgFdwScanState *fsstate;
1299 RangeTblEntry *rte;
1300 Oid userid;
1301 ForeignTable *table;
1302 UserMapping *user;
1303 int rtindex;
1304 int numParams;
1305
1306 /*
1307 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1308 */
1309 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1310 return;
1311
1312 /*
1313 * We'll save private state in node->fdw_state.
1314 */
1315 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1316 node->fdw_state = (void *) fsstate;
1317
1318 /*
1319 * Identify which user to do the remote access as. This should match what
1320 * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1321 * lowest-numbered member RTE as a representative; we would get the same
1322 * result from any.
1323 */
1324 if (fsplan->scan.scanrelid > 0)
1325 rtindex = fsplan->scan.scanrelid;
1326 else
1327 rtindex = bms_next_member(fsplan->fs_relids, -1);
1328 rte = rt_fetch(rtindex, estate->es_range_table);
1329 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1330
1331 /* Get info about foreign table. */
1332 table = GetForeignTable(rte->relid);
1333 user = GetUserMapping(userid, table->serverid);
1334
1335 /*
1336 * Get connection to the foreign server. Connection manager will
1337 * establish new connection if necessary.
1338 */
1339 fsstate->conn = GetConnection(user, false);
1340
1341 /* Assign a unique ID for my cursor */
1342 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1343 fsstate->cursor_exists = false;
1344
1345 /* Get private info created by planner functions. */
1346 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1347 FdwScanPrivateSelectSql));
1348 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1349 FdwScanPrivateRetrievedAttrs);
1350 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1351 FdwScanPrivateFetchSize));
1352
1353 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1354 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1355 "postgres_fdw tuple data",
1356 ALLOCSET_DEFAULT_SIZES);
1357 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1358 "postgres_fdw temporary data",
1359 ALLOCSET_SMALL_SIZES);
1360
1361 /*
1362 * Get info we'll need for converting data fetched from the foreign server
1363 * into local representation and error reporting during that process.
1364 */
1365 if (fsplan->scan.scanrelid > 0)
1366 {
1367 fsstate->rel = node->ss.ss_currentRelation;
1368 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1369 }
1370 else
1371 {
1372 fsstate->rel = NULL;
1373 fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1374 }
1375
1376 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1377
1378 /*
1379 * Prepare for processing of parameters used in remote query, if any.
1380 */
1381 numParams = list_length(fsplan->fdw_exprs);
1382 fsstate->numParams = numParams;
1383 if (numParams > 0)
1384 prepare_query_params((PlanState *) node,
1385 fsplan->fdw_exprs,
1386 numParams,
1387 &fsstate->param_flinfo,
1388 &fsstate->param_exprs,
1389 &fsstate->param_values);
1390 }
1391
1392 /*
1393 * postgresIterateForeignScan
1394 * Retrieve next row from the result set, or clear tuple slot to indicate
1395 * EOF.
1396 */
1397 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1398 postgresIterateForeignScan(ForeignScanState *node)
1399 {
1400 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1401 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1402
1403 /*
1404 * If this is the first call after Begin or ReScan, we need to create the
1405 * cursor on the remote side.
1406 */
1407 if (!fsstate->cursor_exists)
1408 create_cursor(node);
1409
1410 /*
1411 * Get some more tuples, if we've run out.
1412 */
1413 if (fsstate->next_tuple >= fsstate->num_tuples)
1414 {
1415 /* No point in another fetch if we already detected EOF, though. */
1416 if (!fsstate->eof_reached)
1417 fetch_more_data(node);
1418 /* If we didn't get any tuples, must be end of data. */
1419 if (fsstate->next_tuple >= fsstate->num_tuples)
1420 return ExecClearTuple(slot);
1421 }
1422
1423 /*
1424 * Return the next tuple.
1425 */
1426 ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1427 slot,
1428 InvalidBuffer,
1429 false);
1430
1431 return slot;
1432 }
1433
1434 /*
1435 * postgresReScanForeignScan
1436 * Restart the scan.
1437 */
1438 static void
postgresReScanForeignScan(ForeignScanState * node)1439 postgresReScanForeignScan(ForeignScanState *node)
1440 {
1441 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1442 char sql[64];
1443 PGresult *res;
1444
1445 /* If we haven't created the cursor yet, nothing to do. */
1446 if (!fsstate->cursor_exists)
1447 return;
1448
1449 /*
1450 * If any internal parameters affecting this node have changed, we'd
1451 * better destroy and recreate the cursor. Otherwise, rewinding it should
1452 * be good enough. If we've only fetched zero or one batch, we needn't
1453 * even rewind the cursor, just rescan what we have.
1454 */
1455 if (node->ss.ps.chgParam != NULL)
1456 {
1457 fsstate->cursor_exists = false;
1458 snprintf(sql, sizeof(sql), "CLOSE c%u",
1459 fsstate->cursor_number);
1460 }
1461 else if (fsstate->fetch_ct_2 > 1)
1462 {
1463 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1464 fsstate->cursor_number);
1465 }
1466 else
1467 {
1468 /* Easy: just rescan what we already have in memory, if anything */
1469 fsstate->next_tuple = 0;
1470 return;
1471 }
1472
1473 /*
1474 * We don't use a PG_TRY block here, so be careful not to throw error
1475 * without releasing the PGresult.
1476 */
1477 res = pgfdw_exec_query(fsstate->conn, sql);
1478 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1479 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1480 PQclear(res);
1481
1482 /* Now force a fresh FETCH. */
1483 fsstate->tuples = NULL;
1484 fsstate->num_tuples = 0;
1485 fsstate->next_tuple = 0;
1486 fsstate->fetch_ct_2 = 0;
1487 fsstate->eof_reached = false;
1488 }
1489
1490 /*
1491 * postgresEndForeignScan
1492 * Finish scanning foreign table and dispose objects used for this scan
1493 */
1494 static void
postgresEndForeignScan(ForeignScanState * node)1495 postgresEndForeignScan(ForeignScanState *node)
1496 {
1497 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1498
1499 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1500 if (fsstate == NULL)
1501 return;
1502
1503 /* Close the cursor if open, to prevent accumulation of cursors */
1504 if (fsstate->cursor_exists)
1505 close_cursor(fsstate->conn, fsstate->cursor_number);
1506
1507 /* Release remote connection */
1508 ReleaseConnection(fsstate->conn);
1509 fsstate->conn = NULL;
1510
1511 /* MemoryContexts will be deleted automatically. */
1512 }
1513
1514 /*
1515 * postgresAddForeignUpdateTargets
1516 * Add resjunk column(s) needed for update/delete on a foreign table
1517 */
1518 static void
postgresAddForeignUpdateTargets(Query * parsetree,RangeTblEntry * target_rte,Relation target_relation)1519 postgresAddForeignUpdateTargets(Query *parsetree,
1520 RangeTblEntry *target_rte,
1521 Relation target_relation)
1522 {
1523 Var *var;
1524 const char *attrname;
1525 TargetEntry *tle;
1526
1527 /*
1528 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1529 */
1530
1531 /* Make a Var representing the desired value */
1532 var = makeVar(parsetree->resultRelation,
1533 SelfItemPointerAttributeNumber,
1534 TIDOID,
1535 -1,
1536 InvalidOid,
1537 0);
1538
1539 /* Wrap it in a resjunk TLE with the right name ... */
1540 attrname = "ctid";
1541
1542 tle = makeTargetEntry((Expr *) var,
1543 list_length(parsetree->targetList) + 1,
1544 pstrdup(attrname),
1545 true);
1546
1547 /* ... and add it to the query's targetlist */
1548 parsetree->targetList = lappend(parsetree->targetList, tle);
1549 }
1550
1551 /*
1552 * postgresPlanForeignModify
1553 * Plan an insert/update/delete operation on a foreign table
1554 */
1555 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1556 postgresPlanForeignModify(PlannerInfo *root,
1557 ModifyTable *plan,
1558 Index resultRelation,
1559 int subplan_index)
1560 {
1561 CmdType operation = plan->operation;
1562 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1563 Relation rel;
1564 StringInfoData sql;
1565 List *targetAttrs = NIL;
1566 List *returningList = NIL;
1567 List *retrieved_attrs = NIL;
1568 bool doNothing = false;
1569
1570 initStringInfo(&sql);
1571
1572 /*
1573 * Core code already has some lock on each rel being planned, so we can
1574 * use NoLock here.
1575 */
1576 rel = heap_open(rte->relid, NoLock);
1577
1578 /*
1579 * In an INSERT, we transmit all columns that are defined in the foreign
1580 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1581 * foreign table, we transmit all columns like INSERT; else we transmit
1582 * only columns that were explicitly targets of the UPDATE, so as to avoid
1583 * unnecessary data transmission. (We can't do that for INSERT since we
1584 * would miss sending default values for columns not listed in the source
1585 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1586 * those triggers might change values for non-target columns, in which
1587 * case we would miss sending changed values for those columns.)
1588 */
1589 if (operation == CMD_INSERT ||
1590 (operation == CMD_UPDATE &&
1591 rel->trigdesc &&
1592 rel->trigdesc->trig_update_before_row))
1593 {
1594 TupleDesc tupdesc = RelationGetDescr(rel);
1595 int attnum;
1596
1597 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1598 {
1599 Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1600
1601 if (!attr->attisdropped)
1602 targetAttrs = lappend_int(targetAttrs, attnum);
1603 }
1604 }
1605 else if (operation == CMD_UPDATE)
1606 {
1607 int col;
1608
1609 col = -1;
1610 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1611 {
1612 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1613 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1614
1615 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1616 elog(ERROR, "system-column update is not supported");
1617 targetAttrs = lappend_int(targetAttrs, attno);
1618 }
1619 }
1620
1621 /*
1622 * Extract the relevant RETURNING list if any.
1623 */
1624 if (plan->returningLists)
1625 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1626
1627 /*
1628 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1629 * should have already been rejected in the optimizer, as presently there
1630 * is no way to recognize an arbiter index on a foreign table. Only DO
1631 * NOTHING is supported without an inference specification.
1632 */
1633 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1634 doNothing = true;
1635 else if (plan->onConflictAction != ONCONFLICT_NONE)
1636 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1637 (int) plan->onConflictAction);
1638
1639 /*
1640 * Construct the SQL command string.
1641 */
1642 switch (operation)
1643 {
1644 case CMD_INSERT:
1645 deparseInsertSql(&sql, root, resultRelation, rel,
1646 targetAttrs, doNothing, returningList,
1647 &retrieved_attrs);
1648 break;
1649 case CMD_UPDATE:
1650 deparseUpdateSql(&sql, root, resultRelation, rel,
1651 targetAttrs, returningList,
1652 &retrieved_attrs);
1653 break;
1654 case CMD_DELETE:
1655 deparseDeleteSql(&sql, root, resultRelation, rel,
1656 returningList,
1657 &retrieved_attrs);
1658 break;
1659 default:
1660 elog(ERROR, "unexpected operation: %d", (int) operation);
1661 break;
1662 }
1663
1664 heap_close(rel, NoLock);
1665
1666 /*
1667 * Build the fdw_private list that will be available to the executor.
1668 * Items in the list must match enum FdwModifyPrivateIndex, above.
1669 */
1670 return list_make4(makeString(sql.data),
1671 targetAttrs,
1672 makeInteger((retrieved_attrs != NIL)),
1673 retrieved_attrs);
1674 }
1675
1676 /*
1677 * postgresBeginForeignModify
1678 * Begin an insert/update/delete operation on a foreign table
1679 */
1680 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1681 postgresBeginForeignModify(ModifyTableState *mtstate,
1682 ResultRelInfo *resultRelInfo,
1683 List *fdw_private,
1684 int subplan_index,
1685 int eflags)
1686 {
1687 PgFdwModifyState *fmstate;
1688 EState *estate = mtstate->ps.state;
1689 CmdType operation = mtstate->operation;
1690 Relation rel = resultRelInfo->ri_RelationDesc;
1691 RangeTblEntry *rte;
1692 Oid userid;
1693 ForeignTable *table;
1694 UserMapping *user;
1695 AttrNumber n_params;
1696 Oid typefnoid;
1697 bool isvarlena;
1698 ListCell *lc;
1699
1700 /*
1701 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1702 * stays NULL.
1703 */
1704 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1705 return;
1706
1707 /* Begin constructing PgFdwModifyState. */
1708 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1709 fmstate->rel = rel;
1710
1711 /*
1712 * Identify which user to do the remote access as. This should match what
1713 * ExecCheckRTEPerms() does.
1714 */
1715 rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1716 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1717
1718 /* Get info about foreign table. */
1719 table = GetForeignTable(RelationGetRelid(rel));
1720 user = GetUserMapping(userid, table->serverid);
1721
1722 /* Open connection; report that we'll create a prepared statement. */
1723 fmstate->conn = GetConnection(user, true);
1724 fmstate->p_name = NULL; /* prepared statement not made yet */
1725
1726 /* Deconstruct fdw_private data. */
1727 fmstate->query = strVal(list_nth(fdw_private,
1728 FdwModifyPrivateUpdateSql));
1729 fmstate->target_attrs = (List *) list_nth(fdw_private,
1730 FdwModifyPrivateTargetAttnums);
1731 fmstate->has_returning = intVal(list_nth(fdw_private,
1732 FdwModifyPrivateHasReturning));
1733 fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1734 FdwModifyPrivateRetrievedAttrs);
1735
1736 /* Create context for per-tuple temp workspace. */
1737 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1738 "postgres_fdw temporary data",
1739 ALLOCSET_SMALL_SIZES);
1740
1741 /* Prepare for input conversion of RETURNING results. */
1742 if (fmstate->has_returning)
1743 fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
1744
1745 /* Prepare for output conversion of parameters used in prepared stmt. */
1746 n_params = list_length(fmstate->target_attrs) + 1;
1747 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1748 fmstate->p_nums = 0;
1749
1750 if (operation == CMD_UPDATE || operation == CMD_DELETE)
1751 {
1752 /* Find the ctid resjunk column in the subplan's result */
1753 Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1754
1755 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1756 "ctid");
1757 if (!AttributeNumberIsValid(fmstate->ctidAttno))
1758 elog(ERROR, "could not find junk ctid column");
1759
1760 /* First transmittable parameter will be ctid */
1761 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1762 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1763 fmstate->p_nums++;
1764 }
1765
1766 if (operation == CMD_INSERT || operation == CMD_UPDATE)
1767 {
1768 /* Set up for remaining transmittable parameters */
1769 foreach(lc, fmstate->target_attrs)
1770 {
1771 int attnum = lfirst_int(lc);
1772 Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1773
1774 Assert(!attr->attisdropped);
1775
1776 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1777 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1778 fmstate->p_nums++;
1779 }
1780 }
1781
1782 Assert(fmstate->p_nums <= n_params);
1783
1784 resultRelInfo->ri_FdwState = fmstate;
1785 }
1786
1787 /*
1788 * postgresExecForeignInsert
1789 * Insert one row into a foreign table
1790 */
1791 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1792 postgresExecForeignInsert(EState *estate,
1793 ResultRelInfo *resultRelInfo,
1794 TupleTableSlot *slot,
1795 TupleTableSlot *planSlot)
1796 {
1797 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1798 const char **p_values;
1799 PGresult *res;
1800 int n_rows;
1801
1802 /* Set up the prepared statement on the remote server, if we didn't yet */
1803 if (!fmstate->p_name)
1804 prepare_foreign_modify(fmstate);
1805
1806 /* Convert parameters needed by prepared statement to text form */
1807 p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1808
1809 /*
1810 * Execute the prepared statement.
1811 */
1812 if (!PQsendQueryPrepared(fmstate->conn,
1813 fmstate->p_name,
1814 fmstate->p_nums,
1815 p_values,
1816 NULL,
1817 NULL,
1818 0))
1819 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1820
1821 /*
1822 * Get the result, and check for success.
1823 *
1824 * We don't use a PG_TRY block here, so be careful not to throw error
1825 * without releasing the PGresult.
1826 */
1827 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1828 if (PQresultStatus(res) !=
1829 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1830 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1831
1832 /* Check number of rows affected, and fetch RETURNING tuple if any */
1833 if (fmstate->has_returning)
1834 {
1835 n_rows = PQntuples(res);
1836 if (n_rows > 0)
1837 store_returning_result(fmstate, slot, res);
1838 }
1839 else
1840 n_rows = atoi(PQcmdTuples(res));
1841
1842 /* And clean up */
1843 PQclear(res);
1844
1845 MemoryContextReset(fmstate->temp_cxt);
1846
1847 /* Return NULL if nothing was inserted on the remote end */
1848 return (n_rows > 0) ? slot : NULL;
1849 }
1850
1851 /*
1852 * postgresExecForeignUpdate
1853 * Update one row in a foreign table
1854 */
1855 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1856 postgresExecForeignUpdate(EState *estate,
1857 ResultRelInfo *resultRelInfo,
1858 TupleTableSlot *slot,
1859 TupleTableSlot *planSlot)
1860 {
1861 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1862 Datum datum;
1863 bool isNull;
1864 const char **p_values;
1865 PGresult *res;
1866 int n_rows;
1867
1868 /* Set up the prepared statement on the remote server, if we didn't yet */
1869 if (!fmstate->p_name)
1870 prepare_foreign_modify(fmstate);
1871
1872 /* Get the ctid that was passed up as a resjunk column */
1873 datum = ExecGetJunkAttribute(planSlot,
1874 fmstate->ctidAttno,
1875 &isNull);
1876 /* shouldn't ever get a null result... */
1877 if (isNull)
1878 elog(ERROR, "ctid is NULL");
1879
1880 /* Convert parameters needed by prepared statement to text form */
1881 p_values = convert_prep_stmt_params(fmstate,
1882 (ItemPointer) DatumGetPointer(datum),
1883 slot);
1884
1885 /*
1886 * Execute the prepared statement.
1887 */
1888 if (!PQsendQueryPrepared(fmstate->conn,
1889 fmstate->p_name,
1890 fmstate->p_nums,
1891 p_values,
1892 NULL,
1893 NULL,
1894 0))
1895 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1896
1897 /*
1898 * Get the result, and check for success.
1899 *
1900 * We don't use a PG_TRY block here, so be careful not to throw error
1901 * without releasing the PGresult.
1902 */
1903 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1904 if (PQresultStatus(res) !=
1905 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1906 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1907
1908 /* Check number of rows affected, and fetch RETURNING tuple if any */
1909 if (fmstate->has_returning)
1910 {
1911 n_rows = PQntuples(res);
1912 if (n_rows > 0)
1913 store_returning_result(fmstate, slot, res);
1914 }
1915 else
1916 n_rows = atoi(PQcmdTuples(res));
1917
1918 /* And clean up */
1919 PQclear(res);
1920
1921 MemoryContextReset(fmstate->temp_cxt);
1922
1923 /* Return NULL if nothing was updated on the remote end */
1924 return (n_rows > 0) ? slot : NULL;
1925 }
1926
1927 /*
1928 * postgresExecForeignDelete
1929 * Delete one row from a foreign table
1930 */
1931 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1932 postgresExecForeignDelete(EState *estate,
1933 ResultRelInfo *resultRelInfo,
1934 TupleTableSlot *slot,
1935 TupleTableSlot *planSlot)
1936 {
1937 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1938 Datum datum;
1939 bool isNull;
1940 const char **p_values;
1941 PGresult *res;
1942 int n_rows;
1943
1944 /* Set up the prepared statement on the remote server, if we didn't yet */
1945 if (!fmstate->p_name)
1946 prepare_foreign_modify(fmstate);
1947
1948 /* Get the ctid that was passed up as a resjunk column */
1949 datum = ExecGetJunkAttribute(planSlot,
1950 fmstate->ctidAttno,
1951 &isNull);
1952 /* shouldn't ever get a null result... */
1953 if (isNull)
1954 elog(ERROR, "ctid is NULL");
1955
1956 /* Convert parameters needed by prepared statement to text form */
1957 p_values = convert_prep_stmt_params(fmstate,
1958 (ItemPointer) DatumGetPointer(datum),
1959 NULL);
1960
1961 /*
1962 * Execute the prepared statement.
1963 */
1964 if (!PQsendQueryPrepared(fmstate->conn,
1965 fmstate->p_name,
1966 fmstate->p_nums,
1967 p_values,
1968 NULL,
1969 NULL,
1970 0))
1971 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1972
1973 /*
1974 * Get the result, and check for success.
1975 *
1976 * We don't use a PG_TRY block here, so be careful not to throw error
1977 * without releasing the PGresult.
1978 */
1979 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1980 if (PQresultStatus(res) !=
1981 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1982 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1983
1984 /* Check number of rows affected, and fetch RETURNING tuple if any */
1985 if (fmstate->has_returning)
1986 {
1987 n_rows = PQntuples(res);
1988 if (n_rows > 0)
1989 store_returning_result(fmstate, slot, res);
1990 }
1991 else
1992 n_rows = atoi(PQcmdTuples(res));
1993
1994 /* And clean up */
1995 PQclear(res);
1996
1997 MemoryContextReset(fmstate->temp_cxt);
1998
1999 /* Return NULL if nothing was deleted on the remote end */
2000 return (n_rows > 0) ? slot : NULL;
2001 }
2002
2003 /*
2004 * postgresEndForeignModify
2005 * Finish an insert/update/delete operation on a foreign table
2006 */
2007 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)2008 postgresEndForeignModify(EState *estate,
2009 ResultRelInfo *resultRelInfo)
2010 {
2011 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2012
2013 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2014 if (fmstate == NULL)
2015 return;
2016
2017 /* If we created a prepared statement, destroy it */
2018 if (fmstate->p_name)
2019 {
2020 char sql[64];
2021 PGresult *res;
2022
2023 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2024
2025 /*
2026 * We don't use a PG_TRY block here, so be careful not to throw error
2027 * without releasing the PGresult.
2028 */
2029 res = pgfdw_exec_query(fmstate->conn, sql);
2030 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2031 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2032 PQclear(res);
2033 fmstate->p_name = NULL;
2034 }
2035
2036 /* Release remote connection */
2037 ReleaseConnection(fmstate->conn);
2038 fmstate->conn = NULL;
2039 }
2040
2041 /*
2042 * postgresIsForeignRelUpdatable
2043 * Determine whether a foreign table supports INSERT, UPDATE and/or
2044 * DELETE.
2045 */
2046 static int
postgresIsForeignRelUpdatable(Relation rel)2047 postgresIsForeignRelUpdatable(Relation rel)
2048 {
2049 bool updatable;
2050 ForeignTable *table;
2051 ForeignServer *server;
2052 ListCell *lc;
2053
2054 /*
2055 * By default, all postgres_fdw foreign tables are assumed updatable. This
2056 * can be overridden by a per-server setting, which in turn can be
2057 * overridden by a per-table setting.
2058 */
2059 updatable = true;
2060
2061 table = GetForeignTable(RelationGetRelid(rel));
2062 server = GetForeignServer(table->serverid);
2063
2064 foreach(lc, server->options)
2065 {
2066 DefElem *def = (DefElem *) lfirst(lc);
2067
2068 if (strcmp(def->defname, "updatable") == 0)
2069 updatable = defGetBoolean(def);
2070 }
2071 foreach(lc, table->options)
2072 {
2073 DefElem *def = (DefElem *) lfirst(lc);
2074
2075 if (strcmp(def->defname, "updatable") == 0)
2076 updatable = defGetBoolean(def);
2077 }
2078
2079 /*
2080 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2081 */
2082 return updatable ?
2083 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2084 }
2085
2086 /*
2087 * postgresRecheckForeignScan
2088 * Execute a local join execution plan for a foreign join
2089 */
2090 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2091 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2092 {
2093 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2094 PlanState *outerPlan = outerPlanState(node);
2095 TupleTableSlot *result;
2096
2097 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2098 if (scanrelid > 0)
2099 return true;
2100
2101 Assert(outerPlan != NULL);
2102
2103 /* Execute a local join execution plan */
2104 result = ExecProcNode(outerPlan);
2105 if (TupIsNull(result))
2106 return false;
2107
2108 /* Store result in the given slot */
2109 ExecCopySlot(slot, result);
2110
2111 return true;
2112 }
2113
2114 /*
2115 * postgresPlanDirectModify
2116 * Consider a direct foreign table modification
2117 *
2118 * Decide whether it is safe to modify a foreign table directly, and if so,
2119 * rewrite subplan accordingly.
2120 */
2121 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2122 postgresPlanDirectModify(PlannerInfo *root,
2123 ModifyTable *plan,
2124 Index resultRelation,
2125 int subplan_index)
2126 {
2127 CmdType operation = plan->operation;
2128 Plan *subplan;
2129 RelOptInfo *foreignrel;
2130 RangeTblEntry *rte;
2131 PgFdwRelationInfo *fpinfo;
2132 Relation rel;
2133 StringInfoData sql;
2134 ForeignScan *fscan;
2135 List *targetAttrs = NIL;
2136 List *remote_exprs;
2137 List *params_list = NIL;
2138 List *returningList = NIL;
2139 List *retrieved_attrs = NIL;
2140
2141 /*
2142 * Decide whether it is safe to modify a foreign table directly.
2143 */
2144
2145 /*
2146 * The table modification must be an UPDATE or DELETE.
2147 */
2148 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2149 return false;
2150
2151 /*
2152 * It's unsafe to modify a foreign table directly if there are any local
2153 * joins needed.
2154 */
2155 subplan = (Plan *) list_nth(plan->plans, subplan_index);
2156 if (!IsA(subplan, ForeignScan))
2157 return false;
2158 fscan = (ForeignScan *) subplan;
2159
2160 /*
2161 * It's unsafe to modify a foreign table directly if there are any quals
2162 * that should be evaluated locally.
2163 */
2164 if (subplan->qual != NIL)
2165 return false;
2166
2167 /*
2168 * We can't handle an UPDATE or DELETE on a foreign join for now.
2169 */
2170 if (fscan->scan.scanrelid == 0)
2171 return false;
2172
2173 /* Safe to fetch data about the target foreign rel */
2174 foreignrel = root->simple_rel_array[resultRelation];
2175 rte = root->simple_rte_array[resultRelation];
2176 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2177
2178 /*
2179 * It's unsafe to update a foreign table directly, if any expressions to
2180 * assign to the target columns are unsafe to evaluate remotely.
2181 */
2182 if (operation == CMD_UPDATE)
2183 {
2184 int col;
2185
2186 /*
2187 * We transmit only columns that were explicitly targets of the
2188 * UPDATE, so as to avoid unnecessary data transmission.
2189 */
2190 col = -1;
2191 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2192 {
2193 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2194 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
2195 TargetEntry *tle;
2196
2197 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2198 elog(ERROR, "system-column update is not supported");
2199
2200 tle = get_tle_by_resno(subplan->targetlist, attno);
2201
2202 if (!tle)
2203 elog(ERROR, "attribute number %d not found in subplan targetlist",
2204 attno);
2205
2206 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2207 return false;
2208
2209 targetAttrs = lappend_int(targetAttrs, attno);
2210 }
2211 }
2212
2213 /*
2214 * Ok, rewrite subplan so as to modify the foreign table directly.
2215 */
2216 initStringInfo(&sql);
2217
2218 /*
2219 * Core code already has some lock on each rel being planned, so we can
2220 * use NoLock here.
2221 */
2222 rel = heap_open(rte->relid, NoLock);
2223
2224 /*
2225 * Recall the qual clauses that must be evaluated remotely. (These are
2226 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2227 * doesn't care.)
2228 */
2229 remote_exprs = fpinfo->final_remote_exprs;
2230
2231 /*
2232 * Extract the relevant RETURNING list if any.
2233 */
2234 if (plan->returningLists)
2235 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2236
2237 /*
2238 * Construct the SQL command string.
2239 */
2240 switch (operation)
2241 {
2242 case CMD_UPDATE:
2243 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2244 ((Plan *) fscan)->targetlist,
2245 targetAttrs,
2246 remote_exprs, ¶ms_list,
2247 returningList, &retrieved_attrs);
2248 break;
2249 case CMD_DELETE:
2250 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2251 remote_exprs, ¶ms_list,
2252 returningList, &retrieved_attrs);
2253 break;
2254 default:
2255 elog(ERROR, "unexpected operation: %d", (int) operation);
2256 break;
2257 }
2258
2259 /*
2260 * Update the operation info.
2261 */
2262 fscan->operation = operation;
2263
2264 /*
2265 * Update the fdw_exprs list that will be available to the executor.
2266 */
2267 fscan->fdw_exprs = params_list;
2268
2269 /*
2270 * Update the fdw_private list that will be available to the executor.
2271 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2272 */
2273 fscan->fdw_private = list_make4(makeString(sql.data),
2274 makeInteger((retrieved_attrs != NIL)),
2275 retrieved_attrs,
2276 makeInteger(plan->canSetTag));
2277
2278 heap_close(rel, NoLock);
2279 return true;
2280 }
2281
2282 /*
2283 * postgresBeginDirectModify
2284 * Prepare a direct foreign table modification
2285 */
2286 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2287 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2288 {
2289 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2290 EState *estate = node->ss.ps.state;
2291 PgFdwDirectModifyState *dmstate;
2292 RangeTblEntry *rte;
2293 Oid userid;
2294 ForeignTable *table;
2295 UserMapping *user;
2296 int numParams;
2297
2298 /*
2299 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2300 */
2301 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2302 return;
2303
2304 /*
2305 * We'll save private state in node->fdw_state.
2306 */
2307 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2308 node->fdw_state = (void *) dmstate;
2309
2310 /*
2311 * Identify which user to do the remote access as. This should match what
2312 * ExecCheckRTEPerms() does.
2313 */
2314 rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2315 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2316
2317 /* Get info about foreign table. */
2318 dmstate->rel = node->ss.ss_currentRelation;
2319 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2320 user = GetUserMapping(userid, table->serverid);
2321
2322 /*
2323 * Get connection to the foreign server. Connection manager will
2324 * establish new connection if necessary.
2325 */
2326 dmstate->conn = GetConnection(user, false);
2327
2328 /* Initialize state variable */
2329 dmstate->num_tuples = -1; /* -1 means not set yet */
2330
2331 /* Get private info created by planner functions. */
2332 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2333 FdwDirectModifyPrivateUpdateSql));
2334 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2335 FdwDirectModifyPrivateHasReturning));
2336 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2337 FdwDirectModifyPrivateRetrievedAttrs);
2338 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2339 FdwDirectModifyPrivateSetProcessed));
2340
2341 /* Create context for per-tuple temp workspace. */
2342 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2343 "postgres_fdw temporary data",
2344 ALLOCSET_SMALL_SIZES);
2345
2346 /* Prepare for input conversion of RETURNING results. */
2347 if (dmstate->has_returning)
2348 dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2349
2350 /*
2351 * Prepare for processing of parameters used in remote query, if any.
2352 */
2353 numParams = list_length(fsplan->fdw_exprs);
2354 dmstate->numParams = numParams;
2355 if (numParams > 0)
2356 prepare_query_params((PlanState *) node,
2357 fsplan->fdw_exprs,
2358 numParams,
2359 &dmstate->param_flinfo,
2360 &dmstate->param_exprs,
2361 &dmstate->param_values);
2362 }
2363
2364 /*
2365 * postgresIterateDirectModify
2366 * Execute a direct foreign table modification
2367 */
2368 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2369 postgresIterateDirectModify(ForeignScanState *node)
2370 {
2371 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2372 EState *estate = node->ss.ps.state;
2373 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2374
2375 /*
2376 * If this is the first call after Begin, execute the statement.
2377 */
2378 if (dmstate->num_tuples == -1)
2379 execute_dml_stmt(node);
2380
2381 /*
2382 * If the local query doesn't specify RETURNING, just clear tuple slot.
2383 */
2384 if (!resultRelInfo->ri_projectReturning)
2385 {
2386 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2387 Instrumentation *instr = node->ss.ps.instrument;
2388
2389 Assert(!dmstate->has_returning);
2390
2391 /* Increment the command es_processed count if necessary. */
2392 if (dmstate->set_processed)
2393 estate->es_processed += dmstate->num_tuples;
2394
2395 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2396 if (instr)
2397 instr->tuplecount += dmstate->num_tuples;
2398
2399 return ExecClearTuple(slot);
2400 }
2401
2402 /*
2403 * Get the next RETURNING tuple.
2404 */
2405 return get_returning_data(node);
2406 }
2407
2408 /*
2409 * postgresEndDirectModify
2410 * Finish a direct foreign table modification
2411 */
2412 static void
postgresEndDirectModify(ForeignScanState * node)2413 postgresEndDirectModify(ForeignScanState *node)
2414 {
2415 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2416
2417 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2418 if (dmstate == NULL)
2419 return;
2420
2421 /* Release PGresult */
2422 if (dmstate->result)
2423 PQclear(dmstate->result);
2424
2425 /* Release remote connection */
2426 ReleaseConnection(dmstate->conn);
2427 dmstate->conn = NULL;
2428
2429 /* MemoryContext will be deleted automatically. */
2430 }
2431
2432 /*
2433 * postgresExplainForeignScan
2434 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2435 */
2436 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2437 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2438 {
2439 List *fdw_private;
2440 char *sql;
2441 char *relations;
2442
2443 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2444
2445 /*
2446 * Add names of relation handled by the foreign scan when the scan is a
2447 * join
2448 */
2449 if (list_length(fdw_private) > FdwScanPrivateRelations)
2450 {
2451 relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2452 ExplainPropertyText("Relations", relations, es);
2453 }
2454
2455 /*
2456 * Add remote query, when VERBOSE option is specified.
2457 */
2458 if (es->verbose)
2459 {
2460 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2461 ExplainPropertyText("Remote SQL", sql, es);
2462 }
2463 }
2464
2465 /*
2466 * postgresExplainForeignModify
2467 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2468 */
2469 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2470 postgresExplainForeignModify(ModifyTableState *mtstate,
2471 ResultRelInfo *rinfo,
2472 List *fdw_private,
2473 int subplan_index,
2474 ExplainState *es)
2475 {
2476 if (es->verbose)
2477 {
2478 char *sql = strVal(list_nth(fdw_private,
2479 FdwModifyPrivateUpdateSql));
2480
2481 ExplainPropertyText("Remote SQL", sql, es);
2482 }
2483 }
2484
2485 /*
2486 * postgresExplainDirectModify
2487 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2488 * foreign table directly
2489 */
2490 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2491 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2492 {
2493 List *fdw_private;
2494 char *sql;
2495
2496 if (es->verbose)
2497 {
2498 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2499 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2500 ExplainPropertyText("Remote SQL", sql, es);
2501 }
2502 }
2503
2504
2505 /*
2506 * estimate_path_cost_size
2507 * Get cost and size estimates for a foreign scan on given foreign relation
2508 * either a base relation or a join between foreign relations or an upper
2509 * relation containing foreign relations.
2510 *
2511 * param_join_conds are the parameterization clauses with outer relations.
2512 * pathkeys specify the expected sort order if any for given path being costed.
2513 *
2514 * The function returns the cost and size estimates in p_rows, p_width,
2515 * p_startup_cost and p_total_cost variables.
2516 */
2517 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)2518 estimate_path_cost_size(PlannerInfo *root,
2519 RelOptInfo *foreignrel,
2520 List *param_join_conds,
2521 List *pathkeys,
2522 double *p_rows, int *p_width,
2523 Cost *p_startup_cost, Cost *p_total_cost)
2524 {
2525 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2526 double rows;
2527 double retrieved_rows;
2528 int width;
2529 Cost startup_cost;
2530 Cost total_cost;
2531 Cost cpu_per_tuple;
2532
2533 /*
2534 * If the table or the server is configured to use remote estimates,
2535 * connect to the foreign server and execute EXPLAIN to estimate the
2536 * number of rows selected by the restriction+join clauses. Otherwise,
2537 * estimate rows using whatever statistics we have locally, in a way
2538 * similar to ordinary tables.
2539 */
2540 if (fpinfo->use_remote_estimate)
2541 {
2542 List *remote_param_join_conds;
2543 List *local_param_join_conds;
2544 StringInfoData sql;
2545 PGconn *conn;
2546 Selectivity local_sel;
2547 QualCost local_cost;
2548 List *fdw_scan_tlist = NIL;
2549 List *remote_conds;
2550
2551 /* Required only to be passed to deparseSelectStmtForRel */
2552 List *retrieved_attrs;
2553
2554 /*
2555 * param_join_conds might contain both clauses that are safe to send
2556 * across, and clauses that aren't.
2557 */
2558 classifyConditions(root, foreignrel, param_join_conds,
2559 &remote_param_join_conds, &local_param_join_conds);
2560
2561 /* Build the list of columns to be fetched from the foreign server. */
2562 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2563 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2564 else
2565 fdw_scan_tlist = NIL;
2566
2567 /*
2568 * The complete list of remote conditions includes everything from
2569 * baserestrictinfo plus any extra join_conds relevant to this
2570 * particular path.
2571 */
2572 remote_conds = list_concat(list_copy(remote_param_join_conds),
2573 fpinfo->remote_conds);
2574
2575 /*
2576 * Construct EXPLAIN query including the desired SELECT, FROM, and
2577 * WHERE clauses. Params and other-relation Vars are replaced by dummy
2578 * values, so don't request params_list.
2579 */
2580 initStringInfo(&sql);
2581 appendStringInfoString(&sql, "EXPLAIN ");
2582 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2583 remote_conds, pathkeys, false,
2584 &retrieved_attrs, NULL);
2585
2586 /* Get the remote estimate */
2587 conn = GetConnection(fpinfo->user, false);
2588 get_remote_estimate(sql.data, conn, &rows, &width,
2589 &startup_cost, &total_cost);
2590 ReleaseConnection(conn);
2591
2592 retrieved_rows = rows;
2593
2594 /* Factor in the selectivity of the locally-checked quals */
2595 local_sel = clauselist_selectivity(root,
2596 local_param_join_conds,
2597 foreignrel->relid,
2598 JOIN_INNER,
2599 NULL);
2600 local_sel *= fpinfo->local_conds_sel;
2601
2602 rows = clamp_row_est(rows * local_sel);
2603
2604 /* Add in the eval cost of the locally-checked quals */
2605 startup_cost += fpinfo->local_conds_cost.startup;
2606 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2607 cost_qual_eval(&local_cost, local_param_join_conds, root);
2608 startup_cost += local_cost.startup;
2609 total_cost += local_cost.per_tuple * retrieved_rows;
2610 }
2611 else
2612 {
2613 Cost run_cost = 0;
2614
2615 /*
2616 * We don't support join conditions in this mode (hence, no
2617 * parameterized paths can be made).
2618 */
2619 Assert(param_join_conds == NIL);
2620
2621 /*
2622 * Use rows/width estimates made by set_baserel_size_estimates() for
2623 * base foreign relations and set_joinrel_size_estimates() for join
2624 * between foreign relations.
2625 */
2626 rows = foreignrel->rows;
2627 width = foreignrel->reltarget->width;
2628
2629 /* Back into an estimate of the number of retrieved rows. */
2630 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2631
2632 /*
2633 * We will come here again and again with different set of pathkeys
2634 * that caller wants to cost. We don't need to calculate the cost of
2635 * bare scan each time. Instead, use the costs if we have cached them
2636 * already.
2637 */
2638 if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2639 {
2640 startup_cost = fpinfo->rel_startup_cost;
2641 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2642 }
2643 else if (IS_JOIN_REL(foreignrel))
2644 {
2645 PgFdwRelationInfo *fpinfo_i;
2646 PgFdwRelationInfo *fpinfo_o;
2647 QualCost join_cost;
2648 QualCost remote_conds_cost;
2649 double nrows;
2650
2651 /* For join we expect inner and outer relations set */
2652 Assert(fpinfo->innerrel && fpinfo->outerrel);
2653
2654 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2655 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2656
2657 /* Estimate of number of rows in cross product */
2658 nrows = fpinfo_i->rows * fpinfo_o->rows;
2659 /* Clamp retrieved rows estimate to at most size of cross product */
2660 retrieved_rows = Min(retrieved_rows, nrows);
2661
2662 /*
2663 * The cost of foreign join is estimated as cost of generating
2664 * rows for the joining relations + cost for applying quals on the
2665 * rows.
2666 */
2667
2668 /*
2669 * Calculate the cost of clauses pushed down to the foreign server
2670 */
2671 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2672 /* Calculate the cost of applying join clauses */
2673 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2674
2675 /*
2676 * Startup cost includes startup cost of joining relations and the
2677 * startup cost for join and other clauses. We do not include the
2678 * startup cost specific to join strategy (e.g. setting up hash
2679 * tables) since we do not know what strategy the foreign server
2680 * is going to use.
2681 */
2682 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2683 startup_cost += join_cost.startup;
2684 startup_cost += remote_conds_cost.startup;
2685 startup_cost += fpinfo->local_conds_cost.startup;
2686
2687 /*
2688 * Run time cost includes:
2689 *
2690 * 1. Run time cost (total_cost - startup_cost) of relations being
2691 * joined
2692 *
2693 * 2. Run time cost of applying join clauses on the cross product
2694 * of the joining relations.
2695 *
2696 * 3. Run time cost of applying pushed down other clauses on the
2697 * result of join
2698 *
2699 * 4. Run time cost of applying nonpushable other clauses locally
2700 * on the result fetched from the foreign server.
2701 */
2702 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2703 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2704 run_cost += nrows * join_cost.per_tuple;
2705 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2706 run_cost += nrows * remote_conds_cost.per_tuple;
2707 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2708 }
2709 else if (IS_UPPER_REL(foreignrel))
2710 {
2711 PgFdwRelationInfo *ofpinfo;
2712 PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
2713 AggClauseCosts aggcosts;
2714 double input_rows;
2715 int numGroupCols;
2716 double numGroups = 1;
2717
2718 /*
2719 * This cost model is mixture of costing done for sorted and
2720 * hashed aggregates in cost_agg(). We are not sure which
2721 * strategy will be considered at remote side, thus for
2722 * simplicity, we put all startup related costs in startup_cost
2723 * and all finalization and run cost are added in total_cost.
2724 *
2725 * Also, core does not care about costing HAVING expressions and
2726 * adding that to the costs. So similarly, here too we are not
2727 * considering remote and local conditions for costing.
2728 */
2729
2730 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2731
2732 /* Get rows and width from input rel */
2733 input_rows = ofpinfo->rows;
2734 width = ofpinfo->width;
2735
2736 /* Collect statistics about aggregates for estimating costs. */
2737 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2738 if (root->parse->hasAggs)
2739 {
2740 get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2741 AGGSPLIT_SIMPLE, &aggcosts);
2742 get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2743 AGGSPLIT_SIMPLE, &aggcosts);
2744 }
2745
2746 /* Get number of grouping columns and possible number of groups */
2747 numGroupCols = list_length(root->parse->groupClause);
2748 numGroups = estimate_num_groups(root,
2749 get_sortgrouplist_exprs(root->parse->groupClause,
2750 fpinfo->grouped_tlist),
2751 input_rows, NULL);
2752
2753 /*
2754 * Number of rows expected from foreign server will be same as
2755 * that of number of groups.
2756 */
2757 rows = retrieved_rows = numGroups;
2758
2759 /*-----
2760 * Startup cost includes:
2761 * 1. Startup cost for underneath input relation
2762 * 2. Cost of performing aggregation, per cost_agg()
2763 * 3. Startup cost for PathTarget eval
2764 *-----
2765 */
2766 startup_cost = ofpinfo->rel_startup_cost;
2767 startup_cost += aggcosts.transCost.startup;
2768 startup_cost += aggcosts.transCost.per_tuple * input_rows;
2769 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2770 startup_cost += ptarget->cost.startup;
2771
2772 /*-----
2773 * Run time cost includes:
2774 * 1. Run time cost of underneath input relation
2775 * 2. Run time cost of performing aggregation, per cost_agg()
2776 * 3. PathTarget eval cost for each output row
2777 *-----
2778 */
2779 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2780 run_cost += aggcosts.finalCost * numGroups;
2781 run_cost += cpu_tuple_cost * numGroups;
2782 run_cost += ptarget->cost.per_tuple * numGroups;
2783 }
2784 else
2785 {
2786 /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2787 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2788
2789 /*
2790 * Cost as though this were a seqscan, which is pessimistic. We
2791 * effectively imagine the local_conds are being evaluated
2792 * remotely, too.
2793 */
2794 startup_cost = 0;
2795 run_cost = 0;
2796 run_cost += seq_page_cost * foreignrel->pages;
2797
2798 startup_cost += foreignrel->baserestrictcost.startup;
2799 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2800 run_cost += cpu_per_tuple * foreignrel->tuples;
2801 }
2802
2803 /*
2804 * Without remote estimates, we have no real way to estimate the cost
2805 * of generating sorted output. It could be free if the query plan
2806 * the remote side would have chosen generates properly-sorted output
2807 * anyway, but in most cases it will cost something. Estimate a value
2808 * high enough that we won't pick the sorted path when the ordering
2809 * isn't locally useful, but low enough that we'll err on the side of
2810 * pushing down the ORDER BY clause when it's useful to do so.
2811 */
2812 if (pathkeys != NIL)
2813 {
2814 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2815 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2816 }
2817
2818 total_cost = startup_cost + run_cost;
2819 }
2820
2821 /*
2822 * Cache the costs for scans without any pathkeys or parameterization
2823 * before adding the costs for transferring data from the foreign server.
2824 * These costs are useful for costing the join between this relation and
2825 * another foreign relation or to calculate the costs of paths with
2826 * pathkeys for this relation, when the costs can not be obtained from the
2827 * foreign server. This function will be called at least once for every
2828 * foreign relation without pathkeys and parameterization.
2829 */
2830 if (pathkeys == NIL && param_join_conds == NIL)
2831 {
2832 fpinfo->rel_startup_cost = startup_cost;
2833 fpinfo->rel_total_cost = total_cost;
2834 }
2835
2836 /*
2837 * Add some additional cost factors to account for connection overhead
2838 * (fdw_startup_cost), transferring data across the network
2839 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2840 * (cpu_tuple_cost per retrieved row).
2841 */
2842 startup_cost += fpinfo->fdw_startup_cost;
2843 total_cost += fpinfo->fdw_startup_cost;
2844 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2845 total_cost += cpu_tuple_cost * retrieved_rows;
2846
2847 /* Return results. */
2848 *p_rows = rows;
2849 *p_width = width;
2850 *p_startup_cost = startup_cost;
2851 *p_total_cost = total_cost;
2852 }
2853
2854 /*
2855 * Estimate costs of executing a SQL statement remotely.
2856 * The given "sql" must be an EXPLAIN command.
2857 */
2858 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)2859 get_remote_estimate(const char *sql, PGconn *conn,
2860 double *rows, int *width,
2861 Cost *startup_cost, Cost *total_cost)
2862 {
2863 PGresult *volatile res = NULL;
2864
2865 /* PGresult must be released before leaving this function. */
2866 PG_TRY();
2867 {
2868 char *line;
2869 char *p;
2870 int n;
2871
2872 /*
2873 * Execute EXPLAIN remotely.
2874 */
2875 res = pgfdw_exec_query(conn, sql);
2876 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2877 pgfdw_report_error(ERROR, res, conn, false, sql);
2878
2879 /*
2880 * Extract cost numbers for topmost plan node. Note we search for a
2881 * left paren from the end of the line to avoid being confused by
2882 * other uses of parentheses.
2883 */
2884 line = PQgetvalue(res, 0, 0);
2885 p = strrchr(line, '(');
2886 if (p == NULL)
2887 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2888 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2889 startup_cost, total_cost, rows, width);
2890 if (n != 4)
2891 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2892
2893 PQclear(res);
2894 res = NULL;
2895 }
2896 PG_CATCH();
2897 {
2898 if (res)
2899 PQclear(res);
2900 PG_RE_THROW();
2901 }
2902 PG_END_TRY();
2903 }
2904
2905 /*
2906 * Detect whether we want to process an EquivalenceClass member.
2907 *
2908 * This is a callback for use by generate_implied_equalities_for_column.
2909 */
2910 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)2911 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
2912 EquivalenceClass *ec, EquivalenceMember *em,
2913 void *arg)
2914 {
2915 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
2916 Expr *expr = em->em_expr;
2917
2918 /*
2919 * If we've identified what we're processing in the current scan, we only
2920 * want to match that expression.
2921 */
2922 if (state->current != NULL)
2923 return equal(expr, state->current);
2924
2925 /*
2926 * Otherwise, ignore anything we've already processed.
2927 */
2928 if (list_member(state->already_used, expr))
2929 return false;
2930
2931 /* This is the new target to process. */
2932 state->current = expr;
2933 return true;
2934 }
2935
2936 /*
2937 * Create cursor for node's query with current parameter values.
2938 */
2939 static void
create_cursor(ForeignScanState * node)2940 create_cursor(ForeignScanState *node)
2941 {
2942 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2943 ExprContext *econtext = node->ss.ps.ps_ExprContext;
2944 int numParams = fsstate->numParams;
2945 const char **values = fsstate->param_values;
2946 PGconn *conn = fsstate->conn;
2947 StringInfoData buf;
2948 PGresult *res;
2949
2950 /*
2951 * Construct array of query parameter values in text format. We do the
2952 * conversions in the short-lived per-tuple context, so as not to cause a
2953 * memory leak over repeated scans.
2954 */
2955 if (numParams > 0)
2956 {
2957 MemoryContext oldcontext;
2958
2959 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2960
2961 process_query_params(econtext,
2962 fsstate->param_flinfo,
2963 fsstate->param_exprs,
2964 values);
2965
2966 MemoryContextSwitchTo(oldcontext);
2967 }
2968
2969 /* Construct the DECLARE CURSOR command */
2970 initStringInfo(&buf);
2971 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2972 fsstate->cursor_number, fsstate->query);
2973
2974 /*
2975 * Notice that we pass NULL for paramTypes, thus forcing the remote server
2976 * to infer types for all parameters. Since we explicitly cast every
2977 * parameter (see deparse.c), the "inference" is trivial and will produce
2978 * the desired result. This allows us to avoid assuming that the remote
2979 * server has the same OIDs we do for the parameters' types.
2980 */
2981 if (!PQsendQueryParams(conn, buf.data, numParams,
2982 NULL, values, NULL, NULL, 0))
2983 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2984
2985 /*
2986 * Get the result, and check for success.
2987 *
2988 * We don't use a PG_TRY block here, so be careful not to throw error
2989 * without releasing the PGresult.
2990 */
2991 res = pgfdw_get_result(conn, buf.data);
2992 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2993 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2994 PQclear(res);
2995
2996 /* Mark the cursor as created, and show no tuples have been retrieved */
2997 fsstate->cursor_exists = true;
2998 fsstate->tuples = NULL;
2999 fsstate->num_tuples = 0;
3000 fsstate->next_tuple = 0;
3001 fsstate->fetch_ct_2 = 0;
3002 fsstate->eof_reached = false;
3003
3004 /* Clean up */
3005 pfree(buf.data);
3006 }
3007
3008 /*
3009 * Fetch some more rows from the node's cursor.
3010 */
3011 static void
fetch_more_data(ForeignScanState * node)3012 fetch_more_data(ForeignScanState *node)
3013 {
3014 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3015 PGresult *volatile res = NULL;
3016 MemoryContext oldcontext;
3017
3018 /*
3019 * We'll store the tuples in the batch_cxt. First, flush the previous
3020 * batch.
3021 */
3022 fsstate->tuples = NULL;
3023 MemoryContextReset(fsstate->batch_cxt);
3024 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3025
3026 /* PGresult must be released before leaving this function. */
3027 PG_TRY();
3028 {
3029 PGconn *conn = fsstate->conn;
3030 char sql[64];
3031 int numrows;
3032 int i;
3033
3034 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3035 fsstate->fetch_size, fsstate->cursor_number);
3036
3037 res = pgfdw_exec_query(conn, sql);
3038 /* On error, report the original query, not the FETCH. */
3039 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3040 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3041
3042 /* Convert the data into HeapTuples */
3043 numrows = PQntuples(res);
3044 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3045 fsstate->num_tuples = numrows;
3046 fsstate->next_tuple = 0;
3047
3048 for (i = 0; i < numrows; i++)
3049 {
3050 Assert(IsA(node->ss.ps.plan, ForeignScan));
3051
3052 fsstate->tuples[i] =
3053 make_tuple_from_result_row(res, i,
3054 fsstate->rel,
3055 fsstate->attinmeta,
3056 fsstate->retrieved_attrs,
3057 node,
3058 fsstate->temp_cxt);
3059 }
3060
3061 /* Update fetch_ct_2 */
3062 if (fsstate->fetch_ct_2 < 2)
3063 fsstate->fetch_ct_2++;
3064
3065 /* Must be EOF if we didn't get as many tuples as we asked for. */
3066 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3067
3068 PQclear(res);
3069 res = NULL;
3070 }
3071 PG_CATCH();
3072 {
3073 if (res)
3074 PQclear(res);
3075 PG_RE_THROW();
3076 }
3077 PG_END_TRY();
3078
3079 MemoryContextSwitchTo(oldcontext);
3080 }
3081
3082 /*
3083 * Force assorted GUC parameters to settings that ensure that we'll output
3084 * data values in a form that is unambiguous to the remote server.
3085 *
3086 * This is rather expensive and annoying to do once per row, but there's
3087 * little choice if we want to be sure values are transmitted accurately;
3088 * we can't leave the settings in place between rows for fear of affecting
3089 * user-visible computations.
3090 *
3091 * We use the equivalent of a function SET option to allow the settings to
3092 * persist only until the caller calls reset_transmission_modes(). If an
3093 * error is thrown in between, guc.c will take care of undoing the settings.
3094 *
3095 * The return value is the nestlevel that must be passed to
3096 * reset_transmission_modes() to undo things.
3097 */
3098 int
set_transmission_modes(void)3099 set_transmission_modes(void)
3100 {
3101 int nestlevel = NewGUCNestLevel();
3102
3103 /*
3104 * The values set here should match what pg_dump does. See also
3105 * configure_remote_session in connection.c.
3106 */
3107 if (DateStyle != USE_ISO_DATES)
3108 (void) set_config_option("datestyle", "ISO",
3109 PGC_USERSET, PGC_S_SESSION,
3110 GUC_ACTION_SAVE, true, 0, false);
3111 if (IntervalStyle != INTSTYLE_POSTGRES)
3112 (void) set_config_option("intervalstyle", "postgres",
3113 PGC_USERSET, PGC_S_SESSION,
3114 GUC_ACTION_SAVE, true, 0, false);
3115 if (extra_float_digits < 3)
3116 (void) set_config_option("extra_float_digits", "3",
3117 PGC_USERSET, PGC_S_SESSION,
3118 GUC_ACTION_SAVE, true, 0, false);
3119
3120 return nestlevel;
3121 }
3122
/*
 * Revert the GUC changes made by set_transmission_modes().
 *
 * "nestlevel" is the value that set_transmission_modes() returned.
 */
void
reset_transmission_modes(int nestlevel)
{
	/* Roll back everything saved at or above the given nesting level. */
	AtEOXact_GUC(true, nestlevel);
}
3131
3132 /*
3133 * Utility routine to close a cursor.
3134 */
3135 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3136 close_cursor(PGconn *conn, unsigned int cursor_number)
3137 {
3138 char sql[64];
3139 PGresult *res;
3140
3141 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3142
3143 /*
3144 * We don't use a PG_TRY block here, so be careful not to throw error
3145 * without releasing the PGresult.
3146 */
3147 res = pgfdw_exec_query(conn, sql);
3148 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3149 pgfdw_report_error(ERROR, res, conn, true, sql);
3150 PQclear(res);
3151 }
3152
3153 /*
3154 * prepare_foreign_modify
3155 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3156 */
3157 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3158 prepare_foreign_modify(PgFdwModifyState *fmstate)
3159 {
3160 char prep_name[NAMEDATALEN];
3161 char *p_name;
3162 PGresult *res;
3163
3164 /* Construct name we'll use for the prepared statement. */
3165 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3166 GetPrepStmtNumber(fmstate->conn));
3167 p_name = pstrdup(prep_name);
3168
3169 /*
3170 * We intentionally do not specify parameter types here, but leave the
3171 * remote server to derive them by default. This avoids possible problems
3172 * with the remote server using different type OIDs than we do. All of
3173 * the prepared statements we use in this module are simple enough that
3174 * the remote server will make the right choices.
3175 */
3176 if (!PQsendPrepare(fmstate->conn,
3177 p_name,
3178 fmstate->query,
3179 0,
3180 NULL))
3181 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3182
3183 /*
3184 * Get the result, and check for success.
3185 *
3186 * We don't use a PG_TRY block here, so be careful not to throw error
3187 * without releasing the PGresult.
3188 */
3189 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3190 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3191 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3192 PQclear(res);
3193
3194 /* This action shows that the prepare has been done. */
3195 fmstate->p_name = p_name;
3196 }
3197
3198 /*
3199 * convert_prep_stmt_params
3200 * Create array of text strings representing parameter values
3201 *
3202 * tupleid is ctid to send, or NULL if none
3203 * slot is slot to get remaining parameters from, or NULL if none
3204 *
3205 * Data is constructed in temp_cxt; caller should reset that after use.
3206 */
3207 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3208 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3209 ItemPointer tupleid,
3210 TupleTableSlot *slot)
3211 {
3212 const char **p_values;
3213 int pindex = 0;
3214 MemoryContext oldcontext;
3215
3216 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3217
3218 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3219
3220 /* 1st parameter should be ctid, if it's in use */
3221 if (tupleid != NULL)
3222 {
3223 /* don't need set_transmission_modes for TID output */
3224 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3225 PointerGetDatum(tupleid));
3226 pindex++;
3227 }
3228
3229 /* get following parameters from slot */
3230 if (slot != NULL && fmstate->target_attrs != NIL)
3231 {
3232 int nestlevel;
3233 ListCell *lc;
3234
3235 nestlevel = set_transmission_modes();
3236
3237 foreach(lc, fmstate->target_attrs)
3238 {
3239 int attnum = lfirst_int(lc);
3240 Datum value;
3241 bool isnull;
3242
3243 value = slot_getattr(slot, attnum, &isnull);
3244 if (isnull)
3245 p_values[pindex] = NULL;
3246 else
3247 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3248 value);
3249 pindex++;
3250 }
3251
3252 reset_transmission_modes(nestlevel);
3253 }
3254
3255 Assert(pindex == fmstate->p_nums);
3256
3257 MemoryContextSwitchTo(oldcontext);
3258
3259 return p_values;
3260 }
3261
3262 /*
3263 * store_returning_result
3264 * Store the result of a RETURNING clause
3265 *
3266 * On error, be sure to release the PGresult on the way out. Callers do not
3267 * have PG_TRY blocks to ensure this happens.
3268 */
3269 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3270 store_returning_result(PgFdwModifyState *fmstate,
3271 TupleTableSlot *slot, PGresult *res)
3272 {
3273 PG_TRY();
3274 {
3275 HeapTuple newtup;
3276
3277 newtup = make_tuple_from_result_row(res, 0,
3278 fmstate->rel,
3279 fmstate->attinmeta,
3280 fmstate->retrieved_attrs,
3281 NULL,
3282 fmstate->temp_cxt);
3283 /* tuple will be deleted when it is cleared from the slot */
3284 ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3285 }
3286 PG_CATCH();
3287 {
3288 if (res)
3289 PQclear(res);
3290 PG_RE_THROW();
3291 }
3292 PG_END_TRY();
3293 }
3294
3295 /*
3296 * Execute a direct UPDATE/DELETE statement.
3297 */
3298 static void
execute_dml_stmt(ForeignScanState * node)3299 execute_dml_stmt(ForeignScanState *node)
3300 {
3301 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3302 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3303 int numParams = dmstate->numParams;
3304 const char **values = dmstate->param_values;
3305
3306 /*
3307 * Construct array of query parameter values in text format.
3308 */
3309 if (numParams > 0)
3310 process_query_params(econtext,
3311 dmstate->param_flinfo,
3312 dmstate->param_exprs,
3313 values);
3314
3315 /*
3316 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3317 * to infer types for all parameters. Since we explicitly cast every
3318 * parameter (see deparse.c), the "inference" is trivial and will produce
3319 * the desired result. This allows us to avoid assuming that the remote
3320 * server has the same OIDs we do for the parameters' types.
3321 */
3322 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3323 NULL, values, NULL, NULL, 0))
3324 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3325
3326 /*
3327 * Get the result, and check for success.
3328 *
3329 * We don't use a PG_TRY block here, so be careful not to throw error
3330 * without releasing the PGresult.
3331 */
3332 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3333 if (PQresultStatus(dmstate->result) !=
3334 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3335 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3336 dmstate->query);
3337
3338 /* Get the number of rows affected. */
3339 if (dmstate->has_returning)
3340 dmstate->num_tuples = PQntuples(dmstate->result);
3341 else
3342 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3343 }
3344
3345 /*
3346 * Get the result of a RETURNING clause.
3347 */
3348 static TupleTableSlot *
get_returning_data(ForeignScanState * node)3349 get_returning_data(ForeignScanState *node)
3350 {
3351 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3352 EState *estate = node->ss.ps.state;
3353 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3354 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3355
3356 Assert(resultRelInfo->ri_projectReturning);
3357
3358 /* If we didn't get any tuples, must be end of data. */
3359 if (dmstate->next_tuple >= dmstate->num_tuples)
3360 return ExecClearTuple(slot);
3361
3362 /* Increment the command es_processed count if necessary. */
3363 if (dmstate->set_processed)
3364 estate->es_processed += 1;
3365
3366 /*
3367 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3368 * tuple. (has_returning is false when the local query is of the form
3369 * "UPDATE/DELETE .. RETURNING 1" for example.)
3370 */
3371 if (!dmstate->has_returning)
3372 ExecStoreAllNullTuple(slot);
3373 else
3374 {
3375 /*
3376 * On error, be sure to release the PGresult on the way out. Callers
3377 * do not have PG_TRY blocks to ensure this happens.
3378 */
3379 PG_TRY();
3380 {
3381 HeapTuple newtup;
3382
3383 newtup = make_tuple_from_result_row(dmstate->result,
3384 dmstate->next_tuple,
3385 dmstate->rel,
3386 dmstate->attinmeta,
3387 dmstate->retrieved_attrs,
3388 NULL,
3389 dmstate->temp_cxt);
3390 ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3391 }
3392 PG_CATCH();
3393 {
3394 if (dmstate->result)
3395 PQclear(dmstate->result);
3396 PG_RE_THROW();
3397 }
3398 PG_END_TRY();
3399 }
3400 dmstate->next_tuple++;
3401
3402 /* Make slot available for evaluation of the local query RETURNING list. */
3403 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3404
3405 return slot;
3406 }
3407
3408 /*
3409 * Prepare for processing of parameters used in remote query.
3410 */
3411 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)3412 prepare_query_params(PlanState *node,
3413 List *fdw_exprs,
3414 int numParams,
3415 FmgrInfo **param_flinfo,
3416 List **param_exprs,
3417 const char ***param_values)
3418 {
3419 int i;
3420 ListCell *lc;
3421
3422 Assert(numParams > 0);
3423
3424 /* Prepare for output conversion of parameters used in remote query. */
3425 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3426
3427 i = 0;
3428 foreach(lc, fdw_exprs)
3429 {
3430 Node *param_expr = (Node *) lfirst(lc);
3431 Oid typefnoid;
3432 bool isvarlena;
3433
3434 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3435 fmgr_info(typefnoid, &(*param_flinfo)[i]);
3436 i++;
3437 }
3438
3439 /*
3440 * Prepare remote-parameter expressions for evaluation. (Note: in
3441 * practice, we expect that all these expressions will be just Params, so
3442 * we could possibly do something more efficient than using the full
3443 * expression-eval machinery for this. But probably there would be little
3444 * benefit, and it'd require postgres_fdw to know more than is desirable
3445 * about Param evaluation.)
3446 */
3447 *param_exprs = ExecInitExprList(fdw_exprs, node);
3448
3449 /* Allocate buffer for text form of query parameters. */
3450 *param_values = (const char **) palloc0(numParams * sizeof(char *));
3451 }
3452
3453 /*
3454 * Construct array of query parameter values in text format.
3455 */
3456 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)3457 process_query_params(ExprContext *econtext,
3458 FmgrInfo *param_flinfo,
3459 List *param_exprs,
3460 const char **param_values)
3461 {
3462 int nestlevel;
3463 int i;
3464 ListCell *lc;
3465
3466 nestlevel = set_transmission_modes();
3467
3468 i = 0;
3469 foreach(lc, param_exprs)
3470 {
3471 ExprState *expr_state = (ExprState *) lfirst(lc);
3472 Datum expr_value;
3473 bool isNull;
3474
3475 /* Evaluate the parameter expression */
3476 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
3477
3478 /*
3479 * Get string representation of each parameter value by invoking
3480 * type-specific output function, unless the value is null.
3481 */
3482 if (isNull)
3483 param_values[i] = NULL;
3484 else
3485 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
3486
3487 i++;
3488 }
3489
3490 reset_transmission_modes(nestlevel);
3491 }
3492
3493 /*
3494 * postgresAnalyzeForeignTable
3495 * Test whether analyzing this foreign table is supported
3496 */
3497 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)3498 postgresAnalyzeForeignTable(Relation relation,
3499 AcquireSampleRowsFunc *func,
3500 BlockNumber *totalpages)
3501 {
3502 ForeignTable *table;
3503 UserMapping *user;
3504 PGconn *conn;
3505 StringInfoData sql;
3506 PGresult *volatile res = NULL;
3507
3508 /* Return the row-analysis function pointer */
3509 *func = postgresAcquireSampleRowsFunc;
3510
3511 /*
3512 * Now we have to get the number of pages. It's annoying that the ANALYZE
3513 * API requires us to return that now, because it forces some duplication
3514 * of effort between this routine and postgresAcquireSampleRowsFunc. But
3515 * it's probably not worth redefining that API at this point.
3516 */
3517
3518 /*
3519 * Get the connection to use. We do the remote access as the table's
3520 * owner, even if the ANALYZE was started by some other user.
3521 */
3522 table = GetForeignTable(RelationGetRelid(relation));
3523 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3524 conn = GetConnection(user, false);
3525
3526 /*
3527 * Construct command to get page count for relation.
3528 */
3529 initStringInfo(&sql);
3530 deparseAnalyzeSizeSql(&sql, relation);
3531
3532 /* In what follows, do not risk leaking any PGresults. */
3533 PG_TRY();
3534 {
3535 res = pgfdw_exec_query(conn, sql.data);
3536 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3537 pgfdw_report_error(ERROR, res, conn, false, sql.data);
3538
3539 if (PQntuples(res) != 1 || PQnfields(res) != 1)
3540 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3541 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3542
3543 PQclear(res);
3544 res = NULL;
3545 }
3546 PG_CATCH();
3547 {
3548 if (res)
3549 PQclear(res);
3550 PG_RE_THROW();
3551 }
3552 PG_END_TRY();
3553
3554 ReleaseConnection(conn);
3555
3556 return true;
3557 }
3558
3559 /*
3560 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3561 *
3562 * We fetch the whole table from the remote side and pick out some sample rows.
3563 *
3564 * Selected rows are returned in the caller-allocated array rows[],
3565 * which must have at least targrows entries.
3566 * The actual number of rows selected is returned as the function result.
3567 * We also count the total number of rows in the table and return it into
3568 * *totalrows. Note that *totaldeadrows is always set to 0.
3569 *
3570 * Note that the returned list of rows is not always in order by physical
3571 * position in the table. Therefore, correlation estimates derived later
3572 * may be meaningless, but it's OK because we don't use the estimates
3573 * currently (the planner only pays attention to correlation for indexscans).
3574 */
3575 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)3576 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
3577 HeapTuple *rows, int targrows,
3578 double *totalrows,
3579 double *totaldeadrows)
3580 {
3581 PgFdwAnalyzeState astate;
3582 ForeignTable *table;
3583 ForeignServer *server;
3584 UserMapping *user;
3585 PGconn *conn;
3586 unsigned int cursor_number;
3587 StringInfoData sql;
3588 PGresult *volatile res = NULL;
3589
3590 /* Initialize workspace state */
3591 astate.rel = relation;
3592 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
3593
3594 astate.rows = rows;
3595 astate.targrows = targrows;
3596 astate.numrows = 0;
3597 astate.samplerows = 0;
3598 astate.rowstoskip = -1; /* -1 means not set yet */
3599 reservoir_init_selection_state(&astate.rstate, targrows);
3600
3601 /* Remember ANALYZE context, and create a per-tuple temp context */
3602 astate.anl_cxt = CurrentMemoryContext;
3603 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
3604 "postgres_fdw temporary data",
3605 ALLOCSET_SMALL_SIZES);
3606
3607 /*
3608 * Get the connection to use. We do the remote access as the table's
3609 * owner, even if the ANALYZE was started by some other user.
3610 */
3611 table = GetForeignTable(RelationGetRelid(relation));
3612 server = GetForeignServer(table->serverid);
3613 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3614 conn = GetConnection(user, false);
3615
3616 /*
3617 * Construct cursor that retrieves whole rows from remote.
3618 */
3619 cursor_number = GetCursorNumber(conn);
3620 initStringInfo(&sql);
3621 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3622 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3623
3624 /* In what follows, do not risk leaking any PGresults. */
3625 PG_TRY();
3626 {
3627 res = pgfdw_exec_query(conn, sql.data);
3628 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3629 pgfdw_report_error(ERROR, res, conn, false, sql.data);
3630 PQclear(res);
3631 res = NULL;
3632
3633 /* Retrieve and process rows a batch at a time. */
3634 for (;;)
3635 {
3636 char fetch_sql[64];
3637 int fetch_size;
3638 int numrows;
3639 int i;
3640 ListCell *lc;
3641
3642 /* Allow users to cancel long query */
3643 CHECK_FOR_INTERRUPTS();
3644
3645 /*
3646 * XXX possible future improvement: if rowstoskip is large, we
3647 * could issue a MOVE rather than physically fetching the rows,
3648 * then just adjust rowstoskip and samplerows appropriately.
3649 */
3650
3651 /* The fetch size is arbitrary, but shouldn't be enormous. */
3652 fetch_size = 100;
3653 foreach(lc, server->options)
3654 {
3655 DefElem *def = (DefElem *) lfirst(lc);
3656
3657 if (strcmp(def->defname, "fetch_size") == 0)
3658 {
3659 fetch_size = strtol(defGetString(def), NULL, 10);
3660 break;
3661 }
3662 }
3663 foreach(lc, table->options)
3664 {
3665 DefElem *def = (DefElem *) lfirst(lc);
3666
3667 if (strcmp(def->defname, "fetch_size") == 0)
3668 {
3669 fetch_size = strtol(defGetString(def), NULL, 10);
3670 break;
3671 }
3672 }
3673
3674 /* Fetch some rows */
3675 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3676 fetch_size, cursor_number);
3677
3678 res = pgfdw_exec_query(conn, fetch_sql);
3679 /* On error, report the original query, not the FETCH. */
3680 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3681 pgfdw_report_error(ERROR, res, conn, false, sql.data);
3682
3683 /* Process whatever we got. */
3684 numrows = PQntuples(res);
3685 for (i = 0; i < numrows; i++)
3686 analyze_row_processor(res, i, &astate);
3687
3688 PQclear(res);
3689 res = NULL;
3690
3691 /* Must be EOF if we didn't get all the rows requested. */
3692 if (numrows < fetch_size)
3693 break;
3694 }
3695
3696 /* Close the cursor, just to be tidy. */
3697 close_cursor(conn, cursor_number);
3698 }
3699 PG_CATCH();
3700 {
3701 if (res)
3702 PQclear(res);
3703 PG_RE_THROW();
3704 }
3705 PG_END_TRY();
3706
3707 ReleaseConnection(conn);
3708
3709 /* We assume that we have no dead tuple. */
3710 *totaldeadrows = 0.0;
3711
3712 /* We've retrieved all living tuples from foreign server. */
3713 *totalrows = astate.samplerows;
3714
3715 /*
3716 * Emit some interesting relation info
3717 */
3718 ereport(elevel,
3719 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3720 RelationGetRelationName(relation),
3721 astate.samplerows, astate.numrows)));
3722
3723 return astate.numrows;
3724 }
3725
3726 /*
3727 * Collect sample rows from the result of query.
3728 * - Use all tuples in sample until target # of samples are collected.
3729 * - Subsequently, replace already-sampled tuples randomly.
3730 */
3731 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)3732 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
3733 {
3734 int targrows = astate->targrows;
3735 int pos; /* array index to store tuple in */
3736 MemoryContext oldcontext;
3737
3738 /* Always increment sample row counter. */
3739 astate->samplerows += 1;
3740
3741 /*
3742 * Determine the slot where this sample row should be stored. Set pos to
3743 * negative value to indicate the row should be skipped.
3744 */
3745 if (astate->numrows < targrows)
3746 {
3747 /* First targrows rows are always included into the sample */
3748 pos = astate->numrows++;
3749 }
3750 else
3751 {
3752 /*
3753 * Now we start replacing tuples in the sample until we reach the end
3754 * of the relation. Same algorithm as in acquire_sample_rows in
3755 * analyze.c; see Jeff Vitter's paper.
3756 */
3757 if (astate->rowstoskip < 0)
3758 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3759
3760 if (astate->rowstoskip <= 0)
3761 {
3762 /* Choose a random reservoir element to replace. */
3763 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3764 Assert(pos >= 0 && pos < targrows);
3765 heap_freetuple(astate->rows[pos]);
3766 }
3767 else
3768 {
3769 /* Skip this tuple. */
3770 pos = -1;
3771 }
3772
3773 astate->rowstoskip -= 1;
3774 }
3775
3776 if (pos >= 0)
3777 {
3778 /*
3779 * Create sample tuple from current result row, and store it in the
3780 * position determined above. The tuple has to be created in anl_cxt.
3781 */
3782 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3783
3784 astate->rows[pos] = make_tuple_from_result_row(res, row,
3785 astate->rel,
3786 astate->attinmeta,
3787 astate->retrieved_attrs,
3788 NULL,
3789 astate->temp_cxt);
3790
3791 MemoryContextSwitchTo(oldcontext);
3792 }
3793 }
3794
3795 /*
3796 * Import a foreign schema
3797 */
3798 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)3799 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
3800 {
3801 List *commands = NIL;
3802 bool import_collate = true;
3803 bool import_default = false;
3804 bool import_not_null = true;
3805 ForeignServer *server;
3806 UserMapping *mapping;
3807 PGconn *conn;
3808 StringInfoData buf;
3809 PGresult *volatile res = NULL;
3810 int numrows,
3811 i;
3812 ListCell *lc;
3813
3814 /* Parse statement options */
3815 foreach(lc, stmt->options)
3816 {
3817 DefElem *def = (DefElem *) lfirst(lc);
3818
3819 if (strcmp(def->defname, "import_collate") == 0)
3820 import_collate = defGetBoolean(def);
3821 else if (strcmp(def->defname, "import_default") == 0)
3822 import_default = defGetBoolean(def);
3823 else if (strcmp(def->defname, "import_not_null") == 0)
3824 import_not_null = defGetBoolean(def);
3825 else
3826 ereport(ERROR,
3827 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3828 errmsg("invalid option \"%s\"", def->defname)));
3829 }
3830
3831 /*
3832 * Get connection to the foreign server. Connection manager will
3833 * establish new connection if necessary.
3834 */
3835 server = GetForeignServer(serverOid);
3836 mapping = GetUserMapping(GetUserId(), server->serverid);
3837 conn = GetConnection(mapping, false);
3838
3839 /* Don't attempt to import collation if remote server hasn't got it */
3840 if (PQserverVersion(conn) < 90100)
3841 import_collate = false;
3842
3843 /* Create workspace for strings */
3844 initStringInfo(&buf);
3845
3846 /* In what follows, do not risk leaking any PGresults. */
3847 PG_TRY();
3848 {
3849 /* Check that the schema really exists */
3850 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3851 deparseStringLiteral(&buf, stmt->remote_schema);
3852
3853 res = pgfdw_exec_query(conn, buf.data);
3854 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3855 pgfdw_report_error(ERROR, res, conn, false, buf.data);
3856
3857 if (PQntuples(res) != 1)
3858 ereport(ERROR,
3859 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3860 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3861 stmt->remote_schema, server->servername)));
3862
3863 PQclear(res);
3864 res = NULL;
3865 resetStringInfo(&buf);
3866
3867 /*
3868 * Fetch all table data from this schema, possibly restricted by
3869 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
3870 * to EXCEPT/LIMIT TO here, because the core code will filter the
3871 * statements we return according to those lists anyway. But it
3872 * should save a few cycles to not process excluded tables in the
3873 * first place.)
3874 *
3875 * Ignore table data for partitions and only include the definitions
3876 * of the root partitioned tables to allow access to the complete
3877 * remote data set locally in the schema imported.
3878 *
3879 * Note: because we run the connection with search_path restricted to
3880 * pg_catalog, the format_type() and pg_get_expr() outputs will always
3881 * include a schema name for types/functions in other schemas, which
3882 * is what we want.
3883 */
3884 if (import_collate)
3885 appendStringInfoString(&buf,
3886 "SELECT relname, "
3887 " attname, "
3888 " format_type(atttypid, atttypmod), "
3889 " attnotnull, "
3890 " pg_get_expr(adbin, adrelid), "
3891 " collname, "
3892 " collnsp.nspname "
3893 "FROM pg_class c "
3894 " JOIN pg_namespace n ON "
3895 " relnamespace = n.oid "
3896 " LEFT JOIN pg_attribute a ON "
3897 " attrelid = c.oid AND attnum > 0 "
3898 " AND NOT attisdropped "
3899 " LEFT JOIN pg_attrdef ad ON "
3900 " adrelid = c.oid AND adnum = attnum "
3901 " LEFT JOIN pg_collation coll ON "
3902 " coll.oid = attcollation "
3903 " LEFT JOIN pg_namespace collnsp ON "
3904 " collnsp.oid = collnamespace ");
3905 else
3906 appendStringInfoString(&buf,
3907 "SELECT relname, "
3908 " attname, "
3909 " format_type(atttypid, atttypmod), "
3910 " attnotnull, "
3911 " pg_get_expr(adbin, adrelid), "
3912 " NULL, NULL "
3913 "FROM pg_class c "
3914 " JOIN pg_namespace n ON "
3915 " relnamespace = n.oid "
3916 " LEFT JOIN pg_attribute a ON "
3917 " attrelid = c.oid AND attnum > 0 "
3918 " AND NOT attisdropped "
3919 " LEFT JOIN pg_attrdef ad ON "
3920 " adrelid = c.oid AND adnum = attnum ");
3921
3922 appendStringInfoString(&buf,
3923 "WHERE c.relkind IN ("
3924 CppAsString2(RELKIND_RELATION) ","
3925 CppAsString2(RELKIND_VIEW) ","
3926 CppAsString2(RELKIND_FOREIGN_TABLE) ","
3927 CppAsString2(RELKIND_MATVIEW) ","
3928 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
3929 " AND n.nspname = ");
3930 deparseStringLiteral(&buf, stmt->remote_schema);
3931
3932 /* Partitions are supported since Postgres 10 */
3933 if (PQserverVersion(conn) >= 100000)
3934 appendStringInfoString(&buf, " AND NOT c.relispartition ");
3935
3936 /* Apply restrictions for LIMIT TO and EXCEPT */
3937 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3938 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3939 {
3940 bool first_item = true;
3941
3942 appendStringInfoString(&buf, " AND c.relname ");
3943 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3944 appendStringInfoString(&buf, "NOT ");
3945 appendStringInfoString(&buf, "IN (");
3946
3947 /* Append list of table names within IN clause */
3948 foreach(lc, stmt->table_list)
3949 {
3950 RangeVar *rv = (RangeVar *) lfirst(lc);
3951
3952 if (first_item)
3953 first_item = false;
3954 else
3955 appendStringInfoString(&buf, ", ");
3956 deparseStringLiteral(&buf, rv->relname);
3957 }
3958 appendStringInfoChar(&buf, ')');
3959 }
3960
3961 /* Append ORDER BY at the end of query to ensure output ordering */
3962 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3963
3964 /* Fetch the data */
3965 res = pgfdw_exec_query(conn, buf.data);
3966 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3967 pgfdw_report_error(ERROR, res, conn, false, buf.data);
3968
3969 /* Process results */
3970 numrows = PQntuples(res);
3971 /* note: incrementation of i happens in inner loop's while() test */
3972 for (i = 0; i < numrows;)
3973 {
3974 char *tablename = PQgetvalue(res, i, 0);
3975 bool first_item = true;
3976
3977 resetStringInfo(&buf);
3978 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3979 quote_identifier(tablename));
3980
3981 /* Scan all rows for this table */
3982 do
3983 {
3984 char *attname;
3985 char *typename;
3986 char *attnotnull;
3987 char *attdefault;
3988 char *collname;
3989 char *collnamespace;
3990
3991 /* If table has no columns, we'll see nulls here */
3992 if (PQgetisnull(res, i, 1))
3993 continue;
3994
3995 attname = PQgetvalue(res, i, 1);
3996 typename = PQgetvalue(res, i, 2);
3997 attnotnull = PQgetvalue(res, i, 3);
3998 attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3999 PQgetvalue(res, i, 4);
4000 collname = PQgetisnull(res, i, 5) ? (char *) NULL :
4001 PQgetvalue(res, i, 5);
4002 collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
4003 PQgetvalue(res, i, 6);
4004
4005 if (first_item)
4006 first_item = false;
4007 else
4008 appendStringInfoString(&buf, ",\n");
4009
4010 /* Print column name and type */
4011 appendStringInfo(&buf, " %s %s",
4012 quote_identifier(attname),
4013 typename);
4014
4015 /*
4016 * Add column_name option so that renaming the foreign table's
4017 * column doesn't break the association to the underlying
4018 * column.
4019 */
4020 appendStringInfoString(&buf, " OPTIONS (column_name ");
4021 deparseStringLiteral(&buf, attname);
4022 appendStringInfoChar(&buf, ')');
4023
4024 /* Add COLLATE if needed */
4025 if (import_collate && collname != NULL && collnamespace != NULL)
4026 appendStringInfo(&buf, " COLLATE %s.%s",
4027 quote_identifier(collnamespace),
4028 quote_identifier(collname));
4029
4030 /* Add DEFAULT if needed */
4031 if (import_default && attdefault != NULL)
4032 appendStringInfo(&buf, " DEFAULT %s", attdefault);
4033
4034 /* Add NOT NULL if needed */
4035 if (import_not_null && attnotnull[0] == 't')
4036 appendStringInfoString(&buf, " NOT NULL");
4037 }
4038 while (++i < numrows &&
4039 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4040
4041 /*
4042 * Add server name and table-level options. We specify remote
4043 * schema and table name as options (the latter to ensure that
4044 * renaming the foreign table doesn't break the association).
4045 */
4046 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4047 quote_identifier(server->servername));
4048
4049 appendStringInfoString(&buf, "schema_name ");
4050 deparseStringLiteral(&buf, stmt->remote_schema);
4051 appendStringInfoString(&buf, ", table_name ");
4052 deparseStringLiteral(&buf, tablename);
4053
4054 appendStringInfoString(&buf, ");");
4055
4056 commands = lappend(commands, pstrdup(buf.data));
4057 }
4058
4059 /* Clean up */
4060 PQclear(res);
4061 res = NULL;
4062 }
4063 PG_CATCH();
4064 {
4065 if (res)
4066 PQclear(res);
4067 PG_RE_THROW();
4068 }
4069 PG_END_TRY();
4070
4071 ReleaseConnection(conn);
4072
4073 return commands;
4074 }
4075
4076 /*
4077 * Assess whether the join between inner and outer relations can be pushed down
4078 * to the foreign server. As a side effect, save information we obtain in this
4079 * function to PgFdwRelationInfo passed in.
4080 */
4081 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)4082 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
4083 RelOptInfo *outerrel, RelOptInfo *innerrel,
4084 JoinPathExtraData *extra)
4085 {
4086 PgFdwRelationInfo *fpinfo;
4087 PgFdwRelationInfo *fpinfo_o;
4088 PgFdwRelationInfo *fpinfo_i;
4089 ListCell *lc;
4090 List *joinclauses;
4091
4092 /*
4093 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4094 * Constructing queries representing SEMI and ANTI joins is hard, hence
4095 * not considered right now.
4096 */
4097 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4098 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4099 return false;
4100
4101 /*
4102 * If either of the joining relations is marked as unsafe to pushdown, the
4103 * join can not be pushed down.
4104 */
4105 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4106 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4107 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4108 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4109 !fpinfo_i || !fpinfo_i->pushdown_safe)
4110 return false;
4111
4112 /*
4113 * If joining relations have local conditions, those conditions are
4114 * required to be applied before joining the relations. Hence the join can
4115 * not be pushed down.
4116 */
4117 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4118 return false;
4119
4120 /*
4121 * Merge FDW options. We might be tempted to do this after we have deemed
4122 * the foreign join to be OK. But we must do this beforehand so that we
4123 * know which quals can be evaluated on the foreign server, which might
4124 * depend on shippable_extensions.
4125 */
4126 fpinfo->server = fpinfo_o->server;
4127 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4128
4129 /*
4130 * Separate restrict list into join quals and pushed-down (other) quals.
4131 *
4132 * Join quals belonging to an outer join must all be shippable, else we
4133 * cannot execute the join remotely. Add such quals to 'joinclauses'.
4134 *
4135 * Add other quals to fpinfo->remote_conds if they are shippable, else to
4136 * fpinfo->local_conds. In an inner join it's okay to execute conditions
4137 * either locally or remotely; the same is true for pushed-down conditions
4138 * at an outer join.
4139 *
4140 * Note we might return failure after having already scribbled on
4141 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
4142 * won't consult those lists again if we deem the join unshippable.
4143 */
4144 joinclauses = NIL;
4145 foreach(lc, extra->restrictlist)
4146 {
4147 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4148 bool is_remote_clause = is_foreign_expr(root, joinrel,
4149 rinfo->clause);
4150
4151 if (IS_OUTER_JOIN(jointype) &&
4152 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
4153 {
4154 if (!is_remote_clause)
4155 return false;
4156 joinclauses = lappend(joinclauses, rinfo);
4157 }
4158 else
4159 {
4160 if (is_remote_clause)
4161 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4162 else
4163 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4164 }
4165 }
4166
4167 /*
4168 * deparseExplicitTargetList() isn't smart enough to handle anything other
4169 * than a Var. In particular, if there's some PlaceHolderVar that would
4170 * need to be evaluated within this join tree (because there's an upper
4171 * reference to a quantity that may go to NULL as a result of an outer
4172 * join), then we can't try to push the join down because we'll fail when
4173 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4174 * needs to be evaluated *at the top* of this join tree is OK, because we
4175 * can do that locally after fetching the results from the remote side.
4176 */
4177 foreach(lc, root->placeholder_list)
4178 {
4179 PlaceHolderInfo *phinfo = lfirst(lc);
4180 Relids relids = joinrel->relids;
4181
4182 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4183 bms_nonempty_difference(relids, phinfo->ph_eval_at))
4184 return false;
4185 }
4186
4187 /* Save the join clauses, for later use. */
4188 fpinfo->joinclauses = joinclauses;
4189
4190 fpinfo->outerrel = outerrel;
4191 fpinfo->innerrel = innerrel;
4192 fpinfo->jointype = jointype;
4193
4194 /*
4195 * By default, both the input relations are not required to be deparsed as
4196 * subqueries, but there might be some relations covered by the input
4197 * relations that are required to be deparsed as subqueries, so save the
4198 * relids of those relations for later use by the deparser.
4199 */
4200 fpinfo->make_outerrel_subquery = false;
4201 fpinfo->make_innerrel_subquery = false;
4202 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4203 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4204 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
4205 fpinfo_i->lower_subquery_rels);
4206
4207 /*
4208 * Pull the other remote conditions from the joining relations into join
4209 * clauses or other remote clauses (remote_conds) of this relation
4210 * wherever possible. This avoids building subqueries at every join step.
4211 *
4212 * For an inner join, clauses from both the relations are added to the
4213 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4214 * the outer side are added to remote_conds since those can be evaluated
4215 * after the join is evaluated. The clauses from inner side are added to
4216 * the joinclauses, since they need to be evaluated while constructing the
4217 * join.
4218 *
4219 * For a FULL OUTER JOIN, the other clauses from either relation can not
4220 * be added to the joinclauses or remote_conds, since each relation acts
4221 * as an outer relation for the other.
4222 *
4223 * The joining sides can not have local conditions, thus no need to test
4224 * shippability of the clauses being pulled up.
4225 */
4226 switch (jointype)
4227 {
4228 case JOIN_INNER:
4229 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4230 list_copy(fpinfo_i->remote_conds));
4231 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4232 list_copy(fpinfo_o->remote_conds));
4233 break;
4234
4235 case JOIN_LEFT:
4236 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4237 list_copy(fpinfo_i->remote_conds));
4238 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4239 list_copy(fpinfo_o->remote_conds));
4240 break;
4241
4242 case JOIN_RIGHT:
4243 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4244 list_copy(fpinfo_o->remote_conds));
4245 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4246 list_copy(fpinfo_i->remote_conds));
4247 break;
4248
4249 case JOIN_FULL:
4250
4251 /*
4252 * In this case, if any of the input relations has conditions, we
4253 * need to deparse that relation as a subquery so that the
4254 * conditions can be evaluated before the join. Remember it in
4255 * the fpinfo of this relation so that the deparser can take
4256 * appropriate action. Also, save the relids of base relations
4257 * covered by that relation for later use by the deparser.
4258 */
4259 if (fpinfo_o->remote_conds)
4260 {
4261 fpinfo->make_outerrel_subquery = true;
4262 fpinfo->lower_subquery_rels =
4263 bms_add_members(fpinfo->lower_subquery_rels,
4264 outerrel->relids);
4265 }
4266 if (fpinfo_i->remote_conds)
4267 {
4268 fpinfo->make_innerrel_subquery = true;
4269 fpinfo->lower_subquery_rels =
4270 bms_add_members(fpinfo->lower_subquery_rels,
4271 innerrel->relids);
4272 }
4273 break;
4274
4275 default:
4276 /* Should not happen, we have just checked this above */
4277 elog(ERROR, "unsupported join type %d", jointype);
4278 }
4279
4280 /*
4281 * For an inner join, all restrictions can be treated alike. Treating the
4282 * pushed down conditions as join conditions allows a top level full outer
4283 * join to be deparsed without requiring subqueries.
4284 */
4285 if (jointype == JOIN_INNER)
4286 {
4287 Assert(!fpinfo->joinclauses);
4288 fpinfo->joinclauses = fpinfo->remote_conds;
4289 fpinfo->remote_conds = NIL;
4290 }
4291
4292 /* Mark that this join can be pushed down safely */
4293 fpinfo->pushdown_safe = true;
4294
4295 /* Get user mapping */
4296 if (fpinfo->use_remote_estimate)
4297 {
4298 if (fpinfo_o->use_remote_estimate)
4299 fpinfo->user = fpinfo_o->user;
4300 else
4301 fpinfo->user = fpinfo_i->user;
4302 }
4303 else
4304 fpinfo->user = NULL;
4305
4306 /*
4307 * Set cached relation costs to some negative value, so that we can detect
4308 * when they are set to some sensible costs, during one (usually the
4309 * first) of the calls to estimate_path_cost_size().
4310 */
4311 fpinfo->rel_startup_cost = -1;
4312 fpinfo->rel_total_cost = -1;
4313
4314 /*
4315 * Set the string describing this join relation to be used in EXPLAIN
4316 * output of corresponding ForeignScan.
4317 */
4318 fpinfo->relation_name = makeStringInfo();
4319 appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4320 fpinfo_o->relation_name->data,
4321 get_jointype_name(fpinfo->jointype),
4322 fpinfo_i->relation_name->data);
4323
4324 /*
4325 * Set the relation index. This is defined as the position of this
4326 * joinrel in the join_rel_list list plus the length of the rtable list.
4327 * Note that since this joinrel is at the end of the join_rel_list list
4328 * when we are called, we can get the position by list_length.
4329 */
4330 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4331 fpinfo->relation_index =
4332 list_length(root->parse->rtable) + list_length(root->join_rel_list);
4333
4334 return true;
4335 }
4336
4337 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)4338 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
4339 Path *epq_path)
4340 {
4341 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4342 ListCell *lc;
4343
4344 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4345
4346 /* Create one path for each set of pathkeys we found above. */
4347 foreach(lc, useful_pathkeys_list)
4348 {
4349 double rows;
4350 int width;
4351 Cost startup_cost;
4352 Cost total_cost;
4353 List *useful_pathkeys = lfirst(lc);
4354 Path *sorted_epq_path;
4355
4356 estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4357 &rows, &width, &startup_cost, &total_cost);
4358
4359 /*
4360 * The EPQ path must be at least as well sorted as the path itself,
4361 * in case it gets used as input to a mergejoin.
4362 */
4363 sorted_epq_path = epq_path;
4364 if (sorted_epq_path != NULL &&
4365 !pathkeys_contained_in(useful_pathkeys,
4366 sorted_epq_path->pathkeys))
4367 sorted_epq_path = (Path *)
4368 create_sort_path(root,
4369 rel,
4370 sorted_epq_path,
4371 useful_pathkeys,
4372 -1.0);
4373
4374 add_path(rel, (Path *)
4375 create_foreignscan_path(root, rel,
4376 NULL,
4377 rows,
4378 startup_cost,
4379 total_cost,
4380 useful_pathkeys,
4381 rel->lateral_relids,
4382 sorted_epq_path,
4383 NIL));
4384 }
4385 }
4386
4387 /*
4388 * Parse options from foreign server and apply them to fpinfo.
4389 *
4390 * New options might also require tweaking merge_fdw_options().
4391 */
4392 static void
apply_server_options(PgFdwRelationInfo * fpinfo)4393 apply_server_options(PgFdwRelationInfo *fpinfo)
4394 {
4395 ListCell *lc;
4396
4397 foreach(lc, fpinfo->server->options)
4398 {
4399 DefElem *def = (DefElem *) lfirst(lc);
4400
4401 if (strcmp(def->defname, "use_remote_estimate") == 0)
4402 fpinfo->use_remote_estimate = defGetBoolean(def);
4403 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
4404 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
4405 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
4406 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
4407 else if (strcmp(def->defname, "extensions") == 0)
4408 fpinfo->shippable_extensions =
4409 ExtractExtensionList(defGetString(def), false);
4410 else if (strcmp(def->defname, "fetch_size") == 0)
4411 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4412 }
4413 }
4414
4415 /*
4416 * Parse options from foreign table and apply them to fpinfo.
4417 *
4418 * New options might also require tweaking merge_fdw_options().
4419 */
4420 static void
apply_table_options(PgFdwRelationInfo * fpinfo)4421 apply_table_options(PgFdwRelationInfo *fpinfo)
4422 {
4423 ListCell *lc;
4424
4425 foreach(lc, fpinfo->table->options)
4426 {
4427 DefElem *def = (DefElem *) lfirst(lc);
4428
4429 if (strcmp(def->defname, "use_remote_estimate") == 0)
4430 fpinfo->use_remote_estimate = defGetBoolean(def);
4431 else if (strcmp(def->defname, "fetch_size") == 0)
4432 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4433 }
4434 }
4435
4436 /*
4437 * Merge FDW options from input relations into a new set of options for a join
4438 * or an upper rel.
4439 *
4440 * For a join relation, FDW-specific information about the inner and outer
4441 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
4442 * fpinfo_o provides the information for the input relation; fpinfo_i is
4443 * expected to NULL.
4444 */
4445 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)4446 merge_fdw_options(PgFdwRelationInfo *fpinfo,
4447 const PgFdwRelationInfo *fpinfo_o,
4448 const PgFdwRelationInfo *fpinfo_i)
4449 {
4450 /* We must always have fpinfo_o. */
4451 Assert(fpinfo_o);
4452
4453 /* fpinfo_i may be NULL, but if present the servers must both match. */
4454 Assert(!fpinfo_i ||
4455 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
4456
4457 /*
4458 * Copy the server specific FDW options. (For a join, both relations come
4459 * from the same server, so the server options should have the same value
4460 * for both relations.)
4461 */
4462 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4463 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4464 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
4465 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
4466 fpinfo->fetch_size = fpinfo_o->fetch_size;
4467
4468 /* Merge the table level options from either side of the join. */
4469 if (fpinfo_i)
4470 {
4471 /*
4472 * We'll prefer to use remote estimates for this join if any table
4473 * from either side of the join is using remote estimates. This is
4474 * most likely going to be preferred since they're already willing to
4475 * pay the price of a round trip to get the remote EXPLAIN. In any
4476 * case it's not entirely clear how we might otherwise handle this
4477 * best.
4478 */
4479 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4480 fpinfo_i->use_remote_estimate;
4481
4482 /*
4483 * Set fetch size to maximum of the joining sides, since we are
4484 * expecting the rows returned by the join to be proportional to the
4485 * relation sizes.
4486 */
4487 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
4488 }
4489 }
4490
4491 /*
4492 * postgresGetForeignJoinPaths
4493 * Add possible ForeignPath to joinrel, if join is safe to push down.
4494 */
4495 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)4496 postgresGetForeignJoinPaths(PlannerInfo *root,
4497 RelOptInfo *joinrel,
4498 RelOptInfo *outerrel,
4499 RelOptInfo *innerrel,
4500 JoinType jointype,
4501 JoinPathExtraData *extra)
4502 {
4503 PgFdwRelationInfo *fpinfo;
4504 ForeignPath *joinpath;
4505 double rows;
4506 int width;
4507 Cost startup_cost;
4508 Cost total_cost;
4509 Path *epq_path; /* Path to create plan to be executed when
4510 * EvalPlanQual gets triggered. */
4511
4512 /*
4513 * Skip if this join combination has been considered already.
4514 */
4515 if (joinrel->fdw_private)
4516 return;
4517
4518 /*
4519 * This code does not work for joins with lateral references, since those
4520 * must have parameterized paths, which we don't generate yet.
4521 */
4522 if (!bms_is_empty(joinrel->lateral_relids))
4523 return;
4524
4525 /*
4526 * Create unfinished PgFdwRelationInfo entry which is used to indicate
4527 * that the join relation is already considered, so that we won't waste
4528 * time in judging safety of join pushdown and adding the same paths again
4529 * if found safe. Once we know that this join can be pushed down, we fill
4530 * the entry.
4531 */
4532 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4533 fpinfo->pushdown_safe = false;
4534 joinrel->fdw_private = fpinfo;
4535 /* attrs_used is only for base relations. */
4536 fpinfo->attrs_used = NULL;
4537
4538 /*
4539 * If there is a possibility that EvalPlanQual will be executed, we need
4540 * to be able to reconstruct the row using scans of the base relations.
4541 * GetExistingLocalJoinPath will find a suitable path for this purpose in
4542 * the path list of the joinrel, if one exists. We must be careful to
4543 * call it before adding any ForeignPath, since the ForeignPath might
4544 * dominate the only suitable local path available. We also do it before
4545 * calling foreign_join_ok(), since that function updates fpinfo and marks
4546 * it as pushable if the join is found to be pushable.
4547 */
4548 if (root->parse->commandType == CMD_DELETE ||
4549 root->parse->commandType == CMD_UPDATE ||
4550 root->rowMarks)
4551 {
4552 epq_path = GetExistingLocalJoinPath(joinrel);
4553 if (!epq_path)
4554 {
4555 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4556 return;
4557 }
4558 }
4559 else
4560 epq_path = NULL;
4561
4562 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4563 {
4564 /* Free path required for EPQ if we copied one; we don't need it now */
4565 if (epq_path)
4566 pfree(epq_path);
4567 return;
4568 }
4569
4570 /*
4571 * Compute the selectivity and cost of the local_conds, so we don't have
4572 * to do it over again for each path. The best we can do for these
4573 * conditions is to estimate selectivity on the basis of local statistics.
4574 * The local conditions are applied after the join has been computed on
4575 * the remote side like quals in WHERE clause, so pass jointype as
4576 * JOIN_INNER.
4577 */
4578 fpinfo->local_conds_sel = clauselist_selectivity(root,
4579 fpinfo->local_conds,
4580 0,
4581 JOIN_INNER,
4582 NULL);
4583 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4584
4585 /*
4586 * If we are going to estimate costs locally, estimate the join clause
4587 * selectivity here while we have special join info.
4588 */
4589 if (!fpinfo->use_remote_estimate)
4590 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4591 0, fpinfo->jointype,
4592 extra->sjinfo);
4593
4594 /* Estimate costs for bare join relation */
4595 estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4596 &width, &startup_cost, &total_cost);
4597 /* Now update this information in the joinrel */
4598 joinrel->rows = rows;
4599 joinrel->reltarget->width = width;
4600 fpinfo->rows = rows;
4601 fpinfo->width = width;
4602 fpinfo->startup_cost = startup_cost;
4603 fpinfo->total_cost = total_cost;
4604
4605 /*
4606 * Create a new join path and add it to the joinrel which represents a
4607 * join between foreign tables.
4608 */
4609 joinpath = create_foreignscan_path(root,
4610 joinrel,
4611 NULL, /* default pathtarget */
4612 rows,
4613 startup_cost,
4614 total_cost,
4615 NIL, /* no pathkeys */
4616 joinrel->lateral_relids,
4617 epq_path,
4618 NIL); /* no fdw_private */
4619
4620 /* Add generated path into joinrel by add_path(). */
4621 add_path(joinrel, (Path *) joinpath);
4622
4623 /* Consider pathkeys for the join relation */
4624 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4625
4626 /* XXX Consider parameterized paths for the join relation */
4627 }
4628
4629 /*
4630 * Assess whether the aggregation, grouping and having operations can be pushed
4631 * down to the foreign server. As a side effect, save information we obtain in
4632 * this function to PgFdwRelationInfo of the input relation.
4633 */
4634 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel)4635 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel)
4636 {
4637 Query *query = root->parse;
4638 PathTarget *grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4639 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
4640 PgFdwRelationInfo *ofpinfo;
4641 ListCell *lc;
4642 int i;
4643 List *tlist = NIL;
4644
4645 /* We currently don't support pushing Grouping Sets. */
4646 if (query->groupingSets)
4647 return false;
4648
4649 /* Get the fpinfo of the underlying scan relation. */
4650 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
4651
4652 /*
4653 * If underlying scan relation has any local conditions, those conditions
4654 * are required to be applied before performing aggregation. Hence the
4655 * aggregate cannot be pushed down.
4656 */
4657 if (ofpinfo->local_conds)
4658 return false;
4659
4660 /*
4661 * Examine grouping expressions, as well as other expressions we'd need to
4662 * compute, and check whether they are safe to push down to the foreign
4663 * server. All GROUP BY expressions will be part of the grouping target
4664 * and thus there is no need to search for them separately. Add grouping
4665 * expressions into target list which will be passed to foreign server.
4666 *
4667 * A tricky fine point is that we must not put any expression into the
4668 * target list that is just a foreign param (that is, something that
4669 * deparse.c would conclude has to be sent to the foreign server). If we
4670 * do, the expression will also appear in the fdw_exprs list of the plan
4671 * node, and setrefs.c will get confused and decide that the fdw_exprs
4672 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
4673 * a broken plan. Somewhat oddly, it's OK if the expression contains such
4674 * a node, as long as it's not at top level; then no match is possible.
4675 */
4676 i = 0;
4677 foreach(lc, grouping_target->exprs)
4678 {
4679 Expr *expr = (Expr *) lfirst(lc);
4680 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
4681 ListCell *l;
4682
4683 /* Check whether this expression is part of GROUP BY clause */
4684 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
4685 {
4686 TargetEntry *tle;
4687
4688 /*
4689 * If any GROUP BY expression is not shippable, then we cannot
4690 * push down aggregation to the foreign server.
4691 */
4692 if (!is_foreign_expr(root, grouped_rel, expr))
4693 return false;
4694
4695 /*
4696 * If it would be a foreign param, we can't put it into the tlist,
4697 * so we have to fail.
4698 */
4699 if (is_foreign_param(root, grouped_rel, expr))
4700 return false;
4701
4702 /*
4703 * Pushable, so add to tlist. We need to create a TLE for this
4704 * expression and apply the sortgroupref to it. We cannot use
4705 * add_to_flat_tlist() here because that avoids making duplicate
4706 * entries in the tlist. If there are duplicate entries with
4707 * distinct sortgrouprefs, we have to duplicate that situation in
4708 * the output tlist.
4709 */
4710 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
4711 tle->ressortgroupref = sgref;
4712 tlist = lappend(tlist, tle);
4713 }
4714 else
4715 {
4716 /*
4717 * Non-grouping expression we need to compute. Can we ship it
4718 * as-is to the foreign server?
4719 */
4720 if (is_foreign_expr(root, grouped_rel, expr) &&
4721 !is_foreign_param(root, grouped_rel, expr))
4722 {
4723 /* Yes, so add to tlist as-is; OK to suppress duplicates */
4724 tlist = add_to_flat_tlist(tlist, list_make1(expr));
4725 }
4726 else
4727 {
4728 /* Not pushable as a whole; extract its Vars and aggregates */
4729 List *aggvars;
4730
4731 aggvars = pull_var_clause((Node *) expr,
4732 PVC_INCLUDE_AGGREGATES);
4733
4734 /*
4735 * If any aggregate expression is not shippable, then we
4736 * cannot push down aggregation to the foreign server. (We
4737 * don't have to check is_foreign_param, since that certainly
4738 * won't return true for any such expression.)
4739 */
4740 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
4741 return false;
4742
4743 /*
4744 * Add aggregates, if any, into the targetlist. Plain Vars
4745 * outside an aggregate can be ignored, because they should be
4746 * either same as some GROUP BY column or part of some GROUP
4747 * BY expression. In either case, they are already part of
4748 * the targetlist and thus no need to add them again. In fact
4749 * including plain Vars in the tlist when they do not match a
4750 * GROUP BY column would cause the foreign server to complain
4751 * that the shipped query is invalid.
4752 */
4753 foreach(l, aggvars)
4754 {
4755 Expr *expr = (Expr *) lfirst(l);
4756
4757 if (IsA(expr, Aggref))
4758 tlist = add_to_flat_tlist(tlist, list_make1(expr));
4759 }
4760 }
4761 }
4762
4763 i++;
4764 }
4765
4766 /*
4767 * Classify the pushable and non-pushable HAVING clauses and save them in
4768 * remote_conds and local_conds of the grouped rel's fpinfo.
4769 */
4770 if (root->hasHavingQual && query->havingQual)
4771 {
4772 ListCell *lc;
4773
4774 foreach(lc, (List *) query->havingQual)
4775 {
4776 Expr *expr = (Expr *) lfirst(lc);
4777 RestrictInfo *rinfo;
4778
4779 /*
4780 * Currently, the core code doesn't wrap havingQuals in
4781 * RestrictInfos, so we must make our own.
4782 */
4783 Assert(!IsA(expr, RestrictInfo));
4784 rinfo = make_restrictinfo(expr,
4785 true,
4786 false,
4787 false,
4788 root->qual_security_level,
4789 grouped_rel->relids,
4790 NULL,
4791 NULL);
4792 if (is_foreign_expr(root, grouped_rel, expr))
4793 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4794 else
4795 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4796 }
4797 }
4798
4799 /*
4800 * If there are any local conditions, pull Vars and aggregates from it and
4801 * check whether they are safe to pushdown or not.
4802 */
4803 if (fpinfo->local_conds)
4804 {
4805 List *aggvars = NIL;
4806 ListCell *lc;
4807
4808 foreach(lc, fpinfo->local_conds)
4809 {
4810 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4811
4812 aggvars = list_concat(aggvars,
4813 pull_var_clause((Node *) rinfo->clause,
4814 PVC_INCLUDE_AGGREGATES));
4815 }
4816
4817 foreach(lc, aggvars)
4818 {
4819 Expr *expr = (Expr *) lfirst(lc);
4820
4821 /*
4822 * If aggregates within local conditions are not safe to push
4823 * down, then we cannot push down the query. Vars are already
4824 * part of GROUP BY clause which are checked above, so no need to
4825 * access them again here. Again, we need not check
4826 * is_foreign_param for a foreign aggregate.
4827 */
4828 if (IsA(expr, Aggref))
4829 {
4830 if (!is_foreign_expr(root, grouped_rel, expr))
4831 return false;
4832
4833 tlist = add_to_flat_tlist(tlist, list_make1(expr));
4834 }
4835 }
4836 }
4837
4838 /* Store generated targetlist */
4839 fpinfo->grouped_tlist = tlist;
4840
4841 /* Safe to pushdown */
4842 fpinfo->pushdown_safe = true;
4843
4844 /*
4845 * Set cached relation costs to some negative value, so that we can detect
4846 * when they are set to some sensible costs, during one (usually the
4847 * first) of the calls to estimate_path_cost_size().
4848 */
4849 fpinfo->rel_startup_cost = -1;
4850 fpinfo->rel_total_cost = -1;
4851
4852 /*
4853 * Set the string describing this grouped relation to be used in EXPLAIN
4854 * output of corresponding ForeignScan.
4855 */
4856 fpinfo->relation_name = makeStringInfo();
4857 appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
4858 ofpinfo->relation_name->data);
4859
4860 return true;
4861 }
4862
4863 /*
4864 * postgresGetForeignUpperPaths
4865 * Add paths for post-join operations like aggregation, grouping etc. if
4866 * corresponding operations are safe to push down.
4867 *
4868 * Right now, we only support aggregate, grouping and having clause pushdown.
4869 */
4870 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel)4871 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
4872 RelOptInfo *input_rel, RelOptInfo *output_rel)
4873 {
4874 PgFdwRelationInfo *fpinfo;
4875
4876 /*
4877 * If input rel is not safe to pushdown, then simply return as we cannot
4878 * perform any post-join operations on the foreign server.
4879 */
4880 if (!input_rel->fdw_private ||
4881 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
4882 return;
4883
4884 /* Ignore stages we don't support; and skip any duplicate calls. */
4885 if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
4886 return;
4887
4888 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4889 fpinfo->pushdown_safe = false;
4890 output_rel->fdw_private = fpinfo;
4891
4892 add_foreign_grouping_paths(root, input_rel, output_rel);
4893 }
4894
4895 /*
4896 * add_foreign_grouping_paths
4897 * Add foreign path for grouping and/or aggregation.
4898 *
4899 * Given input_rel represents the underlying scan. The paths are added to the
4900 * given grouped_rel.
4901 */
4902 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel)4903 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
4904 RelOptInfo *grouped_rel)
4905 {
4906 Query *parse = root->parse;
4907 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
4908 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
4909 ForeignPath *grouppath;
4910 PathTarget *grouping_target;
4911 double rows;
4912 int width;
4913 Cost startup_cost;
4914 Cost total_cost;
4915
4916 /* Nothing to be done, if there is no grouping or aggregation required. */
4917 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
4918 !root->hasHavingQual)
4919 return;
4920
4921 grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4922
4923 /* save the input_rel as outerrel in fpinfo */
4924 fpinfo->outerrel = input_rel;
4925
4926 /*
4927 * Copy foreign table, foreign server, user mapping, FDW options etc.
4928 * details from the input relation's fpinfo.
4929 */
4930 fpinfo->table = ifpinfo->table;
4931 fpinfo->server = ifpinfo->server;
4932 fpinfo->user = ifpinfo->user;
4933 merge_fdw_options(fpinfo, ifpinfo, NULL);
4934
4935 /* Assess if it is safe to push down aggregation and grouping. */
4936 if (!foreign_grouping_ok(root, grouped_rel))
4937 return;
4938
4939 /* Estimate the cost of push down */
4940 estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
4941 &width, &startup_cost, &total_cost);
4942
4943 /* Now update this information in the fpinfo */
4944 fpinfo->rows = rows;
4945 fpinfo->width = width;
4946 fpinfo->startup_cost = startup_cost;
4947 fpinfo->total_cost = total_cost;
4948
4949 /* Create and add foreign path to the grouping relation. */
4950 grouppath = create_foreignscan_path(root,
4951 grouped_rel,
4952 grouping_target,
4953 rows,
4954 startup_cost,
4955 total_cost,
4956 NIL, /* no pathkeys */
4957 grouped_rel->lateral_relids,
4958 NULL,
4959 NIL); /* no fdw_private */
4960
4961 /* Add generated path into grouped_rel by add_path(). */
4962 add_path(grouped_rel, (Path *) grouppath);
4963 }
4964
4965 /*
4966 * Create a tuple from the specified row of the PGresult.
4967 *
4968 * rel is the local representation of the foreign table, attinmeta is
4969 * conversion data for the rel's tupdesc, and retrieved_attrs is an
4970 * integer list of the table column numbers present in the PGresult.
4971 * fsstate is the ForeignScan plan node's execution state.
4972 * temp_context is a working context that can be reset after each tuple.
4973 *
4974 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
4975 * if we're processing a remote join, while fsstate is NULL in a non-query
4976 * context such as ANALYZE, or if we're processing a non-scan query node.
4977 */
4978 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)4979 make_tuple_from_result_row(PGresult *res,
4980 int row,
4981 Relation rel,
4982 AttInMetadata *attinmeta,
4983 List *retrieved_attrs,
4984 ForeignScanState *fsstate,
4985 MemoryContext temp_context)
4986 {
4987 HeapTuple tuple;
4988 TupleDesc tupdesc;
4989 Datum *values;
4990 bool *nulls;
4991 ItemPointer ctid = NULL;
4992 Oid oid = InvalidOid;
4993 ConversionLocation errpos;
4994 ErrorContextCallback errcallback;
4995 MemoryContext oldcontext;
4996 ListCell *lc;
4997 int j;
4998
4999 Assert(row < PQntuples(res));
5000
5001 /*
5002 * Do the following work in a temp context that we reset after each tuple.
5003 * This cleans up not only the data we have direct access to, but any
5004 * cruft the I/O functions might leak.
5005 */
5006 oldcontext = MemoryContextSwitchTo(temp_context);
5007
5008 /*
5009 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
5010 * provided, otherwise look to the scan node's ScanTupleSlot.
5011 */
5012 if (rel)
5013 tupdesc = RelationGetDescr(rel);
5014 else
5015 {
5016 PgFdwScanState *fdw_sstate;
5017
5018 Assert(fsstate);
5019 fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
5020 tupdesc = fdw_sstate->tupdesc;
5021 }
5022
5023 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
5024 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
5025 /* Initialize to nulls for any columns not present in result */
5026 memset(nulls, true, tupdesc->natts * sizeof(bool));
5027
5028 /*
5029 * Set up and install callback to report where conversion error occurs.
5030 */
5031 errpos.cur_attno = 0;
5032 errpos.rel = rel;
5033 errpos.fsstate = fsstate;
5034 errcallback.callback = conversion_error_callback;
5035 errcallback.arg = (void *) &errpos;
5036 errcallback.previous = error_context_stack;
5037 error_context_stack = &errcallback;
5038
5039 /*
5040 * i indexes columns in the relation, j indexes columns in the PGresult.
5041 */
5042 j = 0;
5043 foreach(lc, retrieved_attrs)
5044 {
5045 int i = lfirst_int(lc);
5046 char *valstr;
5047
5048 /* fetch next column's textual value */
5049 if (PQgetisnull(res, row, j))
5050 valstr = NULL;
5051 else
5052 valstr = PQgetvalue(res, row, j);
5053
5054 /*
5055 * convert value to internal representation
5056 *
5057 * Note: we ignore system columns other than ctid and oid in result
5058 */
5059 errpos.cur_attno = i;
5060 if (i > 0)
5061 {
5062 /* ordinary column */
5063 Assert(i <= tupdesc->natts);
5064 nulls[i - 1] = (valstr == NULL);
5065 /* Apply the input function even to nulls, to support domains */
5066 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
5067 valstr,
5068 attinmeta->attioparams[i - 1],
5069 attinmeta->atttypmods[i - 1]);
5070 }
5071 else if (i == SelfItemPointerAttributeNumber)
5072 {
5073 /* ctid */
5074 if (valstr != NULL)
5075 {
5076 Datum datum;
5077
5078 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5079 ctid = (ItemPointer) DatumGetPointer(datum);
5080 }
5081 }
5082 else if (i == ObjectIdAttributeNumber)
5083 {
5084 /* oid */
5085 if (valstr != NULL)
5086 {
5087 Datum datum;
5088
5089 datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5090 oid = DatumGetObjectId(datum);
5091 }
5092 }
5093 errpos.cur_attno = 0;
5094
5095 j++;
5096 }
5097
5098 /* Uninstall error context callback. */
5099 error_context_stack = errcallback.previous;
5100
5101 /*
5102 * Check we got the expected number of columns. Note: j == 0 and
5103 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5104 */
5105 if (j > 0 && j != PQnfields(res))
5106 elog(ERROR, "remote query result does not match the foreign table");
5107
5108 /*
5109 * Build the result tuple in caller's memory context.
5110 */
5111 MemoryContextSwitchTo(oldcontext);
5112
5113 tuple = heap_form_tuple(tupdesc, values, nulls);
5114
5115 /*
5116 * If we have a CTID to return, install it in both t_self and t_ctid.
5117 * t_self is the normal place, but if the tuple is converted to a
5118 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5119 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5120 */
5121 if (ctid)
5122 tuple->t_self = tuple->t_data->t_ctid = *ctid;
5123
5124 /*
5125 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5126 * heap_form_tuple. heap_form_tuple actually creates the tuple with
5127 * DatumTupleFields, not HeapTupleFields, but the executor expects
5128 * HeapTupleFields and will happily extract system columns on that
5129 * assumption. If we don't do this then, for example, the tuple length
5130 * ends up in the xmin field, which isn't what we want.
5131 */
5132 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
5133 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
5134 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
5135
5136 /*
5137 * If we have an OID to return, install it.
5138 */
5139 if (OidIsValid(oid))
5140 HeapTupleSetOid(tuple, oid);
5141
5142 /* Clean up */
5143 MemoryContextReset(temp_context);
5144
5145 return tuple;
5146 }
5147
5148 /*
5149 * Callback function which is called when error occurs during column value
5150 * conversion. Print names of column and relation.
5151 *
5152 * Note that this function mustn't do any catalog lookups, since we are in
5153 * an already-failed transaction. Fortunately, we can get the needed info
5154 * from the relation or the query's rangetable instead.
5155 */
5156 static void
conversion_error_callback(void * arg)5157 conversion_error_callback(void *arg)
5158 {
5159 ConversionLocation *errpos = (ConversionLocation *) arg;
5160 Relation rel = errpos->rel;
5161 ForeignScanState *fsstate = errpos->fsstate;
5162 const char *attname = NULL;
5163 const char *relname = NULL;
5164 bool is_wholerow = false;
5165
5166 /*
5167 * If we're in a scan node, always use aliases from the rangetable, for
5168 * consistency between the simple-relation and remote-join cases. Look at
5169 * the relation's tupdesc only if we're not in a scan node.
5170 */
5171 if (fsstate)
5172 {
5173 /* ForeignScan case */
5174 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5175 int varno = 0;
5176 AttrNumber colno = 0;
5177
5178 if (fsplan->scan.scanrelid > 0)
5179 {
5180 /* error occurred in a scan against a foreign table */
5181 varno = fsplan->scan.scanrelid;
5182 colno = errpos->cur_attno;
5183 }
5184 else
5185 {
5186 /* error occurred in a scan against a foreign join */
5187 TargetEntry *tle;
5188
5189 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
5190 errpos->cur_attno - 1);
5191
5192 /*
5193 * Target list can have Vars and expressions. For Vars, we can
5194 * get some information, however for expressions we can't. Thus
5195 * for expressions, just show generic context message.
5196 */
5197 if (IsA(tle->expr, Var))
5198 {
5199 Var *var = (Var *) tle->expr;
5200
5201 varno = var->varno;
5202 colno = var->varattno;
5203 }
5204 }
5205
5206 if (varno > 0)
5207 {
5208 EState *estate = fsstate->ss.ps.state;
5209 RangeTblEntry *rte = rt_fetch(varno, estate->es_range_table);
5210
5211 relname = rte->eref->aliasname;
5212
5213 if (colno == 0)
5214 is_wholerow = true;
5215 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
5216 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
5217 else if (colno == SelfItemPointerAttributeNumber)
5218 attname = "ctid";
5219 else if (colno == ObjectIdAttributeNumber)
5220 attname = "oid";
5221 }
5222 }
5223 else if (rel)
5224 {
5225 /* Non-ForeignScan case (we should always have a rel here) */
5226 TupleDesc tupdesc = RelationGetDescr(rel);
5227
5228 relname = RelationGetRelationName(rel);
5229 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5230 {
5231 Form_pg_attribute attr = TupleDescAttr(tupdesc,
5232 errpos->cur_attno - 1);
5233
5234 attname = NameStr(attr->attname);
5235 }
5236 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5237 attname = "ctid";
5238 }
5239
5240 if (relname && is_wholerow)
5241 errcontext("whole-row reference to foreign table \"%s\"", relname);
5242 else if (relname && attname)
5243 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5244 else
5245 errcontext("processing expression at position %d in select list",
5246 errpos->cur_attno);
5247 }
5248
5249 /*
5250 * Find an equivalence class member expression, all of whose Vars, come from
5251 * the indicated relation.
5252 */
5253 extern Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)5254 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
5255 {
5256 ListCell *lc_em;
5257
5258 foreach(lc_em, ec->ec_members)
5259 {
5260 EquivalenceMember *em = lfirst(lc_em);
5261
5262 if (bms_is_subset(em->em_relids, rel->relids) &&
5263 !bms_is_empty(em->em_relids))
5264 {
5265 /*
5266 * If there is more than one equivalence member whose Vars are
5267 * taken entirely from this relation, we'll be content to choose
5268 * any one of those.
5269 */
5270 return em->em_expr;
5271 }
5272 }
5273
5274 /* We didn't find any suitable equivalence class expression */
5275 return NULL;
5276 }
5277