1 /*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2016, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "postgres_fdw.h"
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
23 #include "funcapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/pathnode.h"
29 #include "optimizer/paths.h"
30 #include "optimizer/planmain.h"
31 #include "optimizer/restrictinfo.h"
32 #include "optimizer/var.h"
33 #include "optimizer/tlist.h"
34 #include "parser/parsetree.h"
35 #include "utils/builtins.h"
36 #include "utils/guc.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
39 #include "utils/rel.h"
40 #include "utils/sampling.h"
41
/* Module magic block for this loadable module (macro provided by fmgr.h). */
PG_MODULE_MAGIC;
43
/*
 * Built-in cost defaults.  The first two can be overridden per server with
 * the fdw_startup_cost / fdw_tuple_cost options.
 */

/* CPU cost charged once for starting up a remote query */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* CPU cost per row, charged in addition to cpu_tuple_cost */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* Without remote estimates, a remote sort is assumed to cost 20% more */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
52
/*
 * Positions of the items in a foreign scan's fdw_private list.
 *
 * Each item sits at the list index given by enum FdwScanPrivateIndex, so it
 * can be fetched with list_nth().  For instance, the remote SELECT text is
 * obtained as
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
	FdwScanPrivateSelectSql = 0,	/* remote SQL statement (String node) */
	FdwScanPrivateRemoteConds,		/* restriction clauses run remotely */
	FdwScanPrivateRetrievedAttrs,	/* integer list of attnums fetched by the
									 * SELECT */
	FdwScanPrivateFetchSize,		/* integer holding the desired fetch_size */
	FdwScanPrivateRelations			/* join scans only: string naming the joined
									 * relations and the join types */
};
77
/*
 * Positions of the items in the fdw_private list that a ModifyTable node
 * carries for a postgres_fdw foreign table:
 *
 *	- text of the INSERT/UPDATE/DELETE sent to the remote server
 *	- integer list of target attnums (INSERT/UPDATE; NIL for DELETE)
 *	- flag telling whether the remote statement has a RETURNING clause
 *	- integer list of attnums retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	FdwModifyPrivateUpdateSql = 0,	/* remote SQL statement (String node) */
	FdwModifyPrivateTargetAttnums,	/* integer list of target attnums */
	FdwModifyPrivateHasReturning,	/* has-returning flag (integer Value node) */
	FdwModifyPrivateRetrievedAttrs	/* integer list of RETURNING attnums */
};
99
/*
 * Positions of the items in the fdw_private list of a ForeignScan node that
 * modifies a foreign table directly:
 *
 *	- text of the UPDATE/DELETE sent to the remote server
 *	- flag telling whether the remote statement has a RETURNING clause
 *	- integer list of attnums retrieved by RETURNING, if any
 *	- flag telling whether we set the command's es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	FdwDirectModifyPrivateUpdateSql = 0,	/* remote SQL statement (String
											 * node) */
	FdwDirectModifyPrivateHasReturning, /* has-returning flag (integer Value
										 * node) */
	FdwDirectModifyPrivateRetrievedAttrs,	/* integer list of RETURNING
											 * attnums */
	FdwDirectModifyPrivateSetProcessed	/* set-processed flag (integer Value
										 * node) */
};
120
121 /*
122 * Execution state of a foreign scan using postgres_fdw.
123 */
124 typedef struct PgFdwScanState
125 {
126 Relation rel; /* relcache entry for the foreign table. NULL
127 * for a foreign join scan. */
128 TupleDesc tupdesc; /* tuple descriptor of scan */
129 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
130
131 /* extracted fdw_private data */
132 char *query; /* text of SELECT command */
133 List *retrieved_attrs; /* list of retrieved attribute numbers */
134
135 /* for remote query execution */
136 PGconn *conn; /* connection for the scan */
137 unsigned int cursor_number; /* quasi-unique ID for my cursor */
138 bool cursor_exists; /* have we created the cursor? */
139 int numParams; /* number of parameters passed to query */
140 FmgrInfo *param_flinfo; /* output conversion functions for them */
141 List *param_exprs; /* executable expressions for param values */
142 const char **param_values; /* textual values of query parameters */
143
144 /* for storing result tuples */
145 HeapTuple *tuples; /* array of currently-retrieved tuples */
146 int num_tuples; /* # of tuples in array */
147 int next_tuple; /* index of next one to return */
148
149 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
150 int fetch_ct_2; /* Min(# of fetches done, 2) */
151 bool eof_reached; /* true if last fetch reached EOF */
152
153 /* working memory contexts */
154 MemoryContext batch_cxt; /* context holding current batch of tuples */
155 MemoryContext temp_cxt; /* context for per-tuple temporary data */
156
157 int fetch_size; /* number of tuples per fetch */
158 } PgFdwScanState;
159
160 /*
161 * Execution state of a foreign insert/update/delete operation.
162 */
163 typedef struct PgFdwModifyState
164 {
165 Relation rel; /* relcache entry for the foreign table */
166 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
167
168 /* for remote query execution */
169 PGconn *conn; /* connection for the scan */
170 char *p_name; /* name of prepared statement, if created */
171
172 /* extracted fdw_private data */
173 char *query; /* text of INSERT/UPDATE/DELETE command */
174 List *target_attrs; /* list of target attribute numbers */
175 bool has_returning; /* is there a RETURNING clause? */
176 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
177
178 /* info about parameters for prepared statement */
179 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
180 int p_nums; /* number of parameters to transmit */
181 FmgrInfo *p_flinfo; /* output conversion functions for them */
182
183 /* working memory context */
184 MemoryContext temp_cxt; /* context for per-tuple temporary data */
185 } PgFdwModifyState;
186
187 /*
188 * Execution state of a foreign scan that modifies a foreign table directly.
189 */
190 typedef struct PgFdwDirectModifyState
191 {
192 Relation rel; /* relcache entry for the foreign table */
193 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
194
195 /* extracted fdw_private data */
196 char *query; /* text of UPDATE/DELETE command */
197 bool has_returning; /* is there a RETURNING clause? */
198 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
199 bool set_processed; /* do we set the command es_processed? */
200
201 /* for remote query execution */
202 PGconn *conn; /* connection for the update */
203 int numParams; /* number of parameters passed to query */
204 FmgrInfo *param_flinfo; /* output conversion functions for them */
205 List *param_exprs; /* executable expressions for param values */
206 const char **param_values; /* textual values of query parameters */
207
208 /* for storing result tuples */
209 PGresult *result; /* result for query */
210 int num_tuples; /* # of result tuples */
211 int next_tuple; /* index of next one to return */
212
213 /* working memory context */
214 MemoryContext temp_cxt; /* context for per-tuple temporary data */
215 } PgFdwDirectModifyState;
216
217 /*
218 * Workspace for analyzing a foreign table.
219 */
220 typedef struct PgFdwAnalyzeState
221 {
222 Relation rel; /* relcache entry for the foreign table */
223 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
224 List *retrieved_attrs; /* attr numbers retrieved by query */
225
226 /* collected sample rows */
227 HeapTuple *rows; /* array of size targrows */
228 int targrows; /* target # of sample rows */
229 int numrows; /* # of sample rows collected */
230
231 /* for random sampling */
232 double samplerows; /* # of rows fetched */
233 double rowstoskip; /* # of rows to skip before next sample */
234 ReservoirStateData rstate; /* state for reservoir sampling */
235
236 /* working memory contexts */
237 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
238 MemoryContext temp_cxt; /* context for per-tuple temporary data */
239 } PgFdwAnalyzeState;
240
241 /*
242 * Identify the attribute where data conversion fails.
243 */
244 typedef struct ConversionLocation
245 {
246 AttrNumber cur_attno; /* attribute number being processed, or 0 */
247 Relation rel; /* foreign table being processed, or NULL */
248 ForeignScanState *fsstate; /* plan node being processed, or NULL */
249 } ConversionLocation;
250
251 /* Callback argument for ec_member_matches_foreign */
252 typedef struct
253 {
254 Expr *current; /* current expr, or NULL if not yet found */
255 List *already_used; /* expressions already dealt with */
256 } ec_member_foreign_arg;
257
258 /*
259 * SQL functions
260 */
261 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
262
263 /*
264 * FDW callback routines
265 */
266 static void postgresGetForeignRelSize(PlannerInfo *root,
267 RelOptInfo *baserel,
268 Oid foreigntableid);
269 static void postgresGetForeignPaths(PlannerInfo *root,
270 RelOptInfo *baserel,
271 Oid foreigntableid);
272 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
273 RelOptInfo *baserel,
274 Oid foreigntableid,
275 ForeignPath *best_path,
276 List *tlist,
277 List *scan_clauses,
278 Plan *outer_plan);
279 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
280 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
281 static void postgresReScanForeignScan(ForeignScanState *node);
282 static void postgresEndForeignScan(ForeignScanState *node);
283 static void postgresAddForeignUpdateTargets(Query *parsetree,
284 RangeTblEntry *target_rte,
285 Relation target_relation);
286 static List *postgresPlanForeignModify(PlannerInfo *root,
287 ModifyTable *plan,
288 Index resultRelation,
289 int subplan_index);
290 static void postgresBeginForeignModify(ModifyTableState *mtstate,
291 ResultRelInfo *resultRelInfo,
292 List *fdw_private,
293 int subplan_index,
294 int eflags);
295 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
296 ResultRelInfo *resultRelInfo,
297 TupleTableSlot *slot,
298 TupleTableSlot *planSlot);
299 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
300 ResultRelInfo *resultRelInfo,
301 TupleTableSlot *slot,
302 TupleTableSlot *planSlot);
303 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
304 ResultRelInfo *resultRelInfo,
305 TupleTableSlot *slot,
306 TupleTableSlot *planSlot);
307 static void postgresEndForeignModify(EState *estate,
308 ResultRelInfo *resultRelInfo);
309 static int postgresIsForeignRelUpdatable(Relation rel);
310 static bool postgresPlanDirectModify(PlannerInfo *root,
311 ModifyTable *plan,
312 Index resultRelation,
313 int subplan_index);
314 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
315 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
316 static void postgresEndDirectModify(ForeignScanState *node);
317 static void postgresExplainForeignScan(ForeignScanState *node,
318 ExplainState *es);
319 static void postgresExplainForeignModify(ModifyTableState *mtstate,
320 ResultRelInfo *rinfo,
321 List *fdw_private,
322 int subplan_index,
323 ExplainState *es);
324 static void postgresExplainDirectModify(ForeignScanState *node,
325 ExplainState *es);
326 static bool postgresAnalyzeForeignTable(Relation relation,
327 AcquireSampleRowsFunc *func,
328 BlockNumber *totalpages);
329 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
330 Oid serverOid);
331 static void postgresGetForeignJoinPaths(PlannerInfo *root,
332 RelOptInfo *joinrel,
333 RelOptInfo *outerrel,
334 RelOptInfo *innerrel,
335 JoinType jointype,
336 JoinPathExtraData *extra);
337 static bool postgresRecheckForeignScan(ForeignScanState *node,
338 TupleTableSlot *slot);
339
340 /*
341 * Helper functions
342 */
343 static void estimate_path_cost_size(PlannerInfo *root,
344 RelOptInfo *baserel,
345 List *join_conds,
346 List *pathkeys,
347 double *p_rows, int *p_width,
348 Cost *p_startup_cost, Cost *p_total_cost);
349 static void get_remote_estimate(const char *sql,
350 PGconn *conn,
351 double *rows,
352 int *width,
353 Cost *startup_cost,
354 Cost *total_cost);
355 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
356 EquivalenceClass *ec, EquivalenceMember *em,
357 void *arg);
358 static void create_cursor(ForeignScanState *node);
359 static void fetch_more_data(ForeignScanState *node);
360 static void close_cursor(PGconn *conn, unsigned int cursor_number);
361 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
362 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
363 ItemPointer tupleid,
364 TupleTableSlot *slot);
365 static void store_returning_result(PgFdwModifyState *fmstate,
366 TupleTableSlot *slot, PGresult *res);
367 static void execute_dml_stmt(ForeignScanState *node);
368 static TupleTableSlot *get_returning_data(ForeignScanState *node);
369 static void prepare_query_params(PlanState *node,
370 List *fdw_exprs,
371 int numParams,
372 FmgrInfo **param_flinfo,
373 List **param_exprs,
374 const char ***param_values);
375 static void process_query_params(ExprContext *econtext,
376 FmgrInfo *param_flinfo,
377 List *param_exprs,
378 const char **param_values);
379 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
380 HeapTuple *rows, int targrows,
381 double *totalrows,
382 double *totaldeadrows);
383 static void analyze_row_processor(PGresult *res, int row,
384 PgFdwAnalyzeState *astate);
385 static HeapTuple make_tuple_from_result_row(PGresult *res,
386 int row,
387 Relation rel,
388 AttInMetadata *attinmeta,
389 List *retrieved_attrs,
390 ForeignScanState *fsstate,
391 MemoryContext temp_context);
392 static void conversion_error_callback(void *arg);
393 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
394 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
395 JoinPathExtraData *extra);
396 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
397 RelOptInfo *rel);
398 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
399 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
400 Path *epq_path);
401
402
403 /*
404 * Foreign-data wrapper handler function: return a struct with pointers
405 * to my callback routines.
406 */
407 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)408 postgres_fdw_handler(PG_FUNCTION_ARGS)
409 {
410 FdwRoutine *routine = makeNode(FdwRoutine);
411
412 /* Functions for scanning foreign tables */
413 routine->GetForeignRelSize = postgresGetForeignRelSize;
414 routine->GetForeignPaths = postgresGetForeignPaths;
415 routine->GetForeignPlan = postgresGetForeignPlan;
416 routine->BeginForeignScan = postgresBeginForeignScan;
417 routine->IterateForeignScan = postgresIterateForeignScan;
418 routine->ReScanForeignScan = postgresReScanForeignScan;
419 routine->EndForeignScan = postgresEndForeignScan;
420
421 /* Functions for updating foreign tables */
422 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
423 routine->PlanForeignModify = postgresPlanForeignModify;
424 routine->BeginForeignModify = postgresBeginForeignModify;
425 routine->ExecForeignInsert = postgresExecForeignInsert;
426 routine->ExecForeignUpdate = postgresExecForeignUpdate;
427 routine->ExecForeignDelete = postgresExecForeignDelete;
428 routine->EndForeignModify = postgresEndForeignModify;
429 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
430 routine->PlanDirectModify = postgresPlanDirectModify;
431 routine->BeginDirectModify = postgresBeginDirectModify;
432 routine->IterateDirectModify = postgresIterateDirectModify;
433 routine->EndDirectModify = postgresEndDirectModify;
434
435 /* Function for EvalPlanQual rechecks */
436 routine->RecheckForeignScan = postgresRecheckForeignScan;
437 /* Support functions for EXPLAIN */
438 routine->ExplainForeignScan = postgresExplainForeignScan;
439 routine->ExplainForeignModify = postgresExplainForeignModify;
440 routine->ExplainDirectModify = postgresExplainDirectModify;
441
442 /* Support functions for ANALYZE */
443 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
444
445 /* Support functions for IMPORT FOREIGN SCHEMA */
446 routine->ImportForeignSchema = postgresImportForeignSchema;
447
448 /* Support functions for join push-down */
449 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
450
451 PG_RETURN_POINTER(routine);
452 }
453
454 /*
455 * postgresGetForeignRelSize
456 * Estimate # of rows and width of the result of the scan
457 *
458 * We should consider the effect of all baserestrictinfo clauses here, but
459 * not any join clauses.
460 */
461 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)462 postgresGetForeignRelSize(PlannerInfo *root,
463 RelOptInfo *baserel,
464 Oid foreigntableid)
465 {
466 PgFdwRelationInfo *fpinfo;
467 ListCell *lc;
468 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
469 const char *namespace;
470 const char *relname;
471 const char *refname;
472
473 /*
474 * We use PgFdwRelationInfo to pass various information to subsequent
475 * functions.
476 */
477 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
478 baserel->fdw_private = (void *) fpinfo;
479
480 /* Base foreign tables need to be push down always. */
481 fpinfo->pushdown_safe = true;
482
483 /* Look up foreign-table catalog info. */
484 fpinfo->table = GetForeignTable(foreigntableid);
485 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
486
487 /*
488 * Extract user-settable option values. Note that per-table settings of
489 * use_remote_estimate and fetch_size override per-server settings of
490 * them, respectively.
491 */
492 fpinfo->use_remote_estimate = false;
493 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
494 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
495 fpinfo->shippable_extensions = NIL;
496 fpinfo->fetch_size = 100;
497
498 foreach(lc, fpinfo->server->options)
499 {
500 DefElem *def = (DefElem *) lfirst(lc);
501
502 if (strcmp(def->defname, "use_remote_estimate") == 0)
503 fpinfo->use_remote_estimate = defGetBoolean(def);
504 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
505 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
506 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
507 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
508 else if (strcmp(def->defname, "extensions") == 0)
509 fpinfo->shippable_extensions =
510 ExtractExtensionList(defGetString(def), false);
511 else if (strcmp(def->defname, "fetch_size") == 0)
512 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
513 }
514 foreach(lc, fpinfo->table->options)
515 {
516 DefElem *def = (DefElem *) lfirst(lc);
517
518 if (strcmp(def->defname, "use_remote_estimate") == 0)
519 fpinfo->use_remote_estimate = defGetBoolean(def);
520 else if (strcmp(def->defname, "fetch_size") == 0)
521 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
522 }
523
524 /*
525 * If the table or the server is configured to use remote estimates,
526 * identify which user to do remote access as during planning. This
527 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
528 * permissions, the query would have failed at runtime anyway.
529 */
530 if (fpinfo->use_remote_estimate)
531 {
532 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
533
534 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
535 }
536 else
537 fpinfo->user = NULL;
538
539 /*
540 * Identify which baserestrictinfo clauses can be sent to the remote
541 * server and which can't.
542 */
543 classifyConditions(root, baserel, baserel->baserestrictinfo,
544 &fpinfo->remote_conds, &fpinfo->local_conds);
545
546 /*
547 * Identify which attributes will need to be retrieved from the remote
548 * server. These include all attrs needed for joins or final output, plus
549 * all attrs used in the local_conds. (Note: if we end up using a
550 * parameterized scan, it's possible that some of the join clauses will be
551 * sent to the remote and thus we wouldn't really need to retrieve the
552 * columns used in them. Doesn't seem worth detecting that case though.)
553 */
554 fpinfo->attrs_used = NULL;
555 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
556 &fpinfo->attrs_used);
557 foreach(lc, fpinfo->local_conds)
558 {
559 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
560
561 pull_varattnos((Node *) rinfo->clause, baserel->relid,
562 &fpinfo->attrs_used);
563 }
564
565 /*
566 * Compute the selectivity and cost of the local_conds, so we don't have
567 * to do it over again for each path. The best we can do for these
568 * conditions is to estimate selectivity on the basis of local statistics.
569 */
570 fpinfo->local_conds_sel = clauselist_selectivity(root,
571 fpinfo->local_conds,
572 baserel->relid,
573 JOIN_INNER,
574 NULL);
575
576 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
577
578 /*
579 * Set cached relation costs to some negative value, so that we can detect
580 * when they are set to some sensible costs during one (usually the first)
581 * of the calls to estimate_path_cost_size().
582 */
583 fpinfo->rel_startup_cost = -1;
584 fpinfo->rel_total_cost = -1;
585
586 /*
587 * If the table or the server is configured to use remote estimates,
588 * connect to the foreign server and execute EXPLAIN to estimate the
589 * number of rows selected by the restriction clauses, as well as the
590 * average row width. Otherwise, estimate using whatever statistics we
591 * have locally, in a way similar to ordinary tables.
592 */
593 if (fpinfo->use_remote_estimate)
594 {
595 /*
596 * Get cost/size estimates with help of remote server. Save the
597 * values in fpinfo so we don't need to do it again to generate the
598 * basic foreign path.
599 */
600 estimate_path_cost_size(root, baserel, NIL, NIL,
601 &fpinfo->rows, &fpinfo->width,
602 &fpinfo->startup_cost, &fpinfo->total_cost);
603
604 /* Report estimated baserel size to planner. */
605 baserel->rows = fpinfo->rows;
606 baserel->reltarget->width = fpinfo->width;
607 }
608 else
609 {
610 /*
611 * If the foreign table has never been ANALYZEd, it will have relpages
612 * and reltuples equal to zero, which most likely has nothing to do
613 * with reality. We can't do a whole lot about that if we're not
614 * allowed to consult the remote server, but we can use a hack similar
615 * to plancat.c's treatment of empty relations: use a minimum size
616 * estimate of 10 pages, and divide by the column-datatype-based width
617 * estimate to get the corresponding number of tuples.
618 */
619 if (baserel->pages == 0 && baserel->tuples == 0)
620 {
621 baserel->pages = 10;
622 baserel->tuples =
623 (10 * BLCKSZ) / (baserel->reltarget->width +
624 MAXALIGN(SizeofHeapTupleHeader));
625 }
626
627 /* Estimate baserel size as best we can with local statistics. */
628 set_baserel_size_estimates(root, baserel);
629
630 /* Fill in basically-bogus cost estimates for use later. */
631 estimate_path_cost_size(root, baserel, NIL, NIL,
632 &fpinfo->rows, &fpinfo->width,
633 &fpinfo->startup_cost, &fpinfo->total_cost);
634 }
635
636 /*
637 * Set the name of relation in fpinfo, while we are constructing it here.
638 * It will be used to build the string describing the join relation in
639 * EXPLAIN output. We can't know whether VERBOSE option is specified or
640 * not, so always schema-qualify the foreign table name.
641 */
642 fpinfo->relation_name = makeStringInfo();
643 namespace = get_namespace_name(get_rel_namespace(foreigntableid));
644 relname = get_rel_name(foreigntableid);
645 refname = rte->eref->aliasname;
646 appendStringInfo(fpinfo->relation_name, "%s.%s",
647 quote_identifier(namespace),
648 quote_identifier(relname));
649 if (*refname && strcmp(refname, relname) != 0)
650 appendStringInfo(fpinfo->relation_name, " %s",
651 quote_identifier(rte->eref->aliasname));
652 }
653
654 /*
655 * get_useful_ecs_for_relation
656 * Determine which EquivalenceClasses might be involved in useful
657 * orderings of this relation.
658 *
659 * This function is in some respects a mirror image of the core function
660 * pathkeys_useful_for_merging: for a regular table, we know what indexes
661 * we have and want to test whether any of them are useful. For a foreign
662 * table, we don't know what indexes are present on the remote side but
663 * want to speculate about which ones we'd like to use if they existed.
664 *
665 * This function returns a list of potentially-useful equivalence classes,
666 * but it does not guarantee that an EquivalenceMember exists which contains
667 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
668 * ft1.x + t1.x = 0, this function will say that the equivalence class
669 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
670 * t1 is local (or on a different server), it will turn out that no useful
671 * ORDER BY clause can be generated. It's not our job to figure that out
672 * here; we're only interested in identifying relevant ECs.
673 */
674 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)675 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
676 {
677 List *useful_eclass_list = NIL;
678 ListCell *lc;
679 Relids relids;
680
681 /*
682 * First, consider whether any active EC is potentially useful for a merge
683 * join against this relation.
684 */
685 if (rel->has_eclass_joins)
686 {
687 foreach(lc, root->eq_classes)
688 {
689 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
690
691 if (eclass_useful_for_merging(root, cur_ec, rel))
692 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
693 }
694 }
695
696 /*
697 * Next, consider whether there are any non-EC derivable join clauses that
698 * are merge-joinable. If the joininfo list is empty, we can exit
699 * quickly.
700 */
701 if (rel->joininfo == NIL)
702 return useful_eclass_list;
703
704 /* If this is a child rel, we must use the topmost parent rel to search. */
705 if (rel->reloptkind == RELOPT_OTHER_MEMBER_REL)
706 relids = find_childrel_top_parent(root, rel)->relids;
707 else
708 relids = rel->relids;
709
710 /* Check each join clause in turn. */
711 foreach(lc, rel->joininfo)
712 {
713 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
714
715 /* Consider only mergejoinable clauses */
716 if (restrictinfo->mergeopfamilies == NIL)
717 continue;
718
719 /* Make sure we've got canonical ECs. */
720 update_mergeclause_eclasses(root, restrictinfo);
721
722 /*
723 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
724 * that left_ec and right_ec will be initialized, per comments in
725 * distribute_qual_to_rels.
726 *
727 * We want to identify which side of this merge-joinable clause
728 * contains columns from the relation produced by this RelOptInfo. We
729 * test for overlap, not containment, because there could be extra
730 * relations on either side. For example, suppose we've got something
731 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
732 * A.y = D.y. The input rel might be the joinrel between A and B, and
733 * we'll consider the join clause A.y = D.y. relids contains a
734 * relation not involved in the join class (B) and the equivalence
735 * class for the left-hand side of the clause contains a relation not
736 * involved in the input rel (C). Despite the fact that we have only
737 * overlap and not containment in either direction, A.y is potentially
738 * useful as a sort column.
739 *
740 * Note that it's even possible that relids overlaps neither side of
741 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
742 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
743 * but overlaps neither side of B. In that case, we just skip this
744 * join clause, since it doesn't suggest a useful sort order for this
745 * relation.
746 */
747 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
748 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
749 restrictinfo->right_ec);
750 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
751 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
752 restrictinfo->left_ec);
753 }
754
755 return useful_eclass_list;
756 }
757
758 /*
759 * get_useful_pathkeys_for_relation
760 * Determine which orderings of a relation might be useful.
761 *
762 * Getting data in sorted order can be useful either because the requested
763 * order matches the final output ordering for the overall query we're
764 * planning, or because it enables an efficient merge join. Here, we try
765 * to figure out which pathkeys to consider.
766 */
767 static List *
get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
{
	/*
	 * Return a list of candidate pathkey lists for which a remotely-sorted
	 * scan of "rel" might be worth costing.
	 *
	 * The first candidate (if any) is a copy of the whole query_pathkeys
	 * list.  When use_remote_estimate is enabled, it is followed by
	 * single-element pathkeys built from the relation's useful
	 * EquivalenceClasses, which are aimed at possible merge joins.
	 */
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
	List	   *result = NIL;
	List	   *candidate_ecs;
	EquivalenceClass *skip_ec = NULL;
	ListCell   *cell;

	/*
	 * Shipping the query's own ordering is always worth considering, since it
	 * may avoid a local sort.  Sorting by only a prefix would not help (the
	 * whole data set would be resorted locally anyway), so every pathkey has
	 * to be pushable or this candidate is dropped.  ec_has_volatile is tested
	 * first only because it is cheaper than is_foreign_expr, which would
	 * reject volatile expressions too.
	 */
	if (root->query_pathkeys != NIL)
	{
		bool		all_shippable = true;

		foreach(cell, root->query_pathkeys)
		{
			PathKey    *pk = (PathKey *) lfirst(cell);
			EquivalenceClass *ec = pk->pk_eclass;
			Expr	   *expr;

			if (ec->ec_has_volatile)
			{
				all_shippable = false;
				break;
			}

			expr = find_em_expr_for_rel(ec, rel);
			if (expr == NULL || !is_foreign_expr(root, rel, expr))
			{
				all_shippable = false;
				break;
			}
		}

		if (all_shippable)
			result = list_make1(list_copy(root->query_pathkeys));
	}

	/*
	 * Everything after this point is speculative (merge-join orderings), and
	 * a poor guess can hurt noticeably, so only continue when remote
	 * estimates are available to judge the candidates.
	 */
	if (!fpinfo->use_remote_estimate)
		return result;

	candidate_ecs = get_useful_ecs_for_relation(root, rel);

	/* A single-key query ordering was already considered above. */
	if (list_length(root->query_pathkeys) == 1)
		skip_ec = ((PathKey *) linitial(root->query_pathkeys))->pk_eclass;

	/*
	 * Heuristic: only length-one pathkeys are generated here, because each
	 * candidate costs a round trip to the remote server.
	 */
	foreach(cell, candidate_ecs)
	{
		EquivalenceClass *ec = (EquivalenceClass *) lfirst(cell);
		Expr	   *expr;

		if (ec == skip_ec)
			continue;

		expr = find_em_expr_for_rel(ec, rel);
		if (expr != NULL && is_foreign_expr(root, rel, expr))
		{
			PathKey    *pk = make_canonical_pathkey(root, ec,
											linitial_oid(ec->ec_opfamilies),
													BTLessStrategyNumber,
													false);

			result = lappend(result, list_make1(pk));
		}
	}

	return result;
}
869
870 /*
871 * postgresGetForeignPaths
872 * Create possible scan paths for a scan on the foreign table
873 */
874 static void
postgresGetForeignPaths(PlannerInfo *root,
						RelOptInfo *baserel,
						Oid foreigntableid)
{
	/*
	 * Generate access paths for a foreign base relation:
	 *	 - one unparameterized, unsorted path built from the estimates already
	 *	   stored in fpinfo;
	 *	 - sorted variants, via add_paths_with_pathkeys_for_rel;
	 *	 - when use_remote_estimate is on, one parameterized path for each
	 *	   distinct set of outer relations that could supply a join clause
	 *	   safe to evaluate on the remote server.
	 */
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
	List	   *ppi_list = NIL;
	ListCell   *lc;

	/*
	 * The plain path is the foreign-table analog of a SeqScan (the remote
	 * side may still use an index for the conditions we ship).  Its cost and
	 * size were estimated earlier.  It uses no join clauses, but LATERAL
	 * references in its tlist can still force a parameterization.
	 */
	add_path(baserel, (Path *)
			 create_foreignscan_path(root, baserel,
									 NULL,	/* default pathtarget */
									 fpinfo->rows,
									 fpinfo->startup_cost,
									 fpinfo->total_cost,
									 NIL,	/* no pathkeys */
									 baserel->lateral_relids,
									 NULL,	/* no extra plan */
									 NIL));	/* no fdw_private list */

	/* Sorted versions of the same scan. */
	add_paths_with_pathkeys_for_rel(root, baserel, NULL);

	/*
	 * Without remote estimates there is no way to tell whether sending join
	 * clauses across would pay off, so parameterized paths are not built.
	 */
	if (!fpinfo->use_remote_estimate)
		return;

	/*
	 * Each candidate outer relation set is represented by its ParamPathInfo.
	 * The ppi_clauses of that node then serve directly as the join clauses
	 * for the path.  This copes with several safe clauses for the same outer
	 * rels, and it also accounts for unsafe movable clauses that must still
	 * be enforced locally.
	 *
	 * Start with the ordinary ("generic") join clauses.
	 */
	foreach(lc, baserel->joininfo)
	{
		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
		Relids		outer_relids;
		ParamPathInfo *ppi;

		/* Usable only if movable to this rel and safe to ship. */
		if (!join_clause_is_movable_to(rinfo, baserel) ||
			!is_foreign_expr(root, baserel, rinfo->clause))
			continue;

		/* Required outer rels, excluding the foreign rel itself. */
		outer_relids = bms_union(rinfo->clause_relids,
								 baserel->lateral_relids);
		outer_relids = bms_del_member(outer_relids, baserel->relid);

		/* Presumably never empty, but then we could not parameterize. */
		if (bms_is_empty(outer_relids))
			continue;

		ppi = get_baserel_parampathinfo(root, baserel, outer_relids);
		Assert(ppi != NULL);

		/* get_baserel_parampathinfo never makes duplicates: compare pointers. */
		ppi_list = list_append_unique_ptr(ppi_list, ppi);
	}

	/*
	 * Join clauses absorbed into EquivalenceClasses were not seen above.
	 * Repeatedly ask for implied equalities for one not-yet-used member
	 * expression of this rel, and examine each batch the same way, until no
	 * candidate members remain.
	 */
	if (baserel->has_eclass_joins)
	{
		ec_member_foreign_arg arg;

		arg.already_used = NIL;
		for (;;)
		{
			List	   *clauses;

			/* Skip clauses that join to lateral_referencers. */
			arg.current = NULL;
			clauses = generate_implied_equalities_for_column(root,
															 baserel,
												   ec_member_matches_foreign,
															 (void *) &arg,
											   baserel->lateral_referencers);

			/* No member expression was found: we're done. */
			if (arg.current == NULL)
			{
				Assert(clauses == NIL);
				break;
			}

			foreach(lc, clauses)
			{
				RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
				Relids		outer_relids;
				ParamPathInfo *ppi;

				if (!join_clause_is_movable_to(rinfo, baserel) ||
					!is_foreign_expr(root, baserel, rinfo->clause))
					continue;

				outer_relids = bms_union(rinfo->clause_relids,
										 baserel->lateral_relids);
				outer_relids = bms_del_member(outer_relids, baserel->relid);
				if (bms_is_empty(outer_relids))
					continue;

				ppi = get_baserel_parampathinfo(root, baserel, outer_relids);
				Assert(ppi != NULL);

				ppi_list = list_append_unique_ptr(ppi_list, ppi);
			}

			/* Exclude this member expression from the next round. */
			arg.already_used = lappend(arg.already_used, arg.current);
		}
	}

	/* Cost and add one parameterized path per collected outer-rel set. */
	foreach(lc, ppi_list)
	{
		ParamPathInfo *ppi = (ParamPathInfo *) lfirst(lc);
		double		rows;
		int			width;
		Cost		startup_cost;
		Cost		total_cost;

		estimate_path_cost_size(root, baserel,
								ppi->ppi_clauses, NIL,
								&rows, &width,
								&startup_cost, &total_cost);

		/* Keep ppi_rows in agreement with our own row estimate. */
		ppi->ppi_rows = rows;

		add_path(baserel, (Path *)
				 create_foreignscan_path(root, baserel,
										 NULL,	/* default pathtarget */
										 rows,
										 startup_cost,
										 total_cost,
										 NIL,	/* no pathkeys */
										 ppi->ppi_req_outer,
										 NULL,
										 NIL));	/* no fdw_private list */
	}
}
1078
1079 /*
1080 * postgresGetForeignPlan
1081 * Create ForeignScan plan node which implements selected best path
1082 */
1083 static ForeignScan *
postgresGetForeignPlan(PlannerInfo *root,
					   RelOptInfo *foreignrel,
					   Oid foreigntableid,
					   ForeignPath *best_path,
					   List *tlist,
					   List *scan_clauses,
					   Plan *outer_plan)
{
	/*
	 * Build the ForeignScan plan node for the selected path.
	 *
	 * For a base relation, scan_clauses are split into conditions sent to the
	 * remote server and conditions checked locally.  For a join relation the
	 * split was made earlier and is read from fdw_private.  In that case a
	 * targetlist for the remote query is also built, and any outer_plan is
	 * adjusted to produce matching tuples.  The remote SELECT is then
	 * deparsed.  Its text, the remote conditions, the retrieved attribute
	 * numbers and the fetch size go into the plan's fdw_private list, in the
	 * order of enum FdwScanPrivateIndex.  The relation name is appended for
	 * joins.
	 */
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
	Index		scan_relid;
	List	   *fdw_private;
	List	   *remote_conds = NIL;
	List	   *remote_exprs = NIL;
	List	   *local_exprs = NIL;
	List	   *params_list = NIL;
	List	   *retrieved_attrs;
	StringInfoData sql;
	ListCell   *lc;
	List	   *fdw_scan_tlist = NIL;

	/*
	 * For base relations, set scan_relid as the relid of the relation. For
	 * other kinds of relations set it to 0.
	 */
	if (foreignrel->reloptkind == RELOPT_BASEREL ||
		foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
		scan_relid = foreignrel->relid;
	else
	{
		scan_relid = 0;

		/*
		 * create_scan_plan() and create_foreignscan_plan() pass
		 * rel->baserestrictinfo + parameterization clauses through
		 * scan_clauses. For a join rel->baserestrictinfo is NIL and we are
		 * not considering parameterization right now, so there should be no
		 * scan_clauses for a joinrel.
		 */
		Assert(!scan_clauses);
	}

	/*
	 * Separate the scan_clauses into those that can be executed remotely and
	 * those that can't.  baserestrictinfo clauses that were previously
	 * determined to be safe or unsafe by classifyConditions are shown in
	 * fpinfo->remote_conds and fpinfo->local_conds.  Anything else in the
	 * scan_clauses list will be a join clause, which we have to check for
	 * remote-safety.
	 *
	 * Note: the join clauses we see here should be the exact same ones
	 * previously examined by postgresGetForeignPaths.  Possibly it'd be worth
	 * passing forward the classification work done then, rather than
	 * repeating it here.
	 *
	 * This code must match "extract_actual_clauses(scan_clauses, false)"
	 * except for the additional decision about remote versus local execution.
	 * Note however that we don't strip the RestrictInfo nodes from the
	 * remote_conds list, since appendWhereClause expects a list of
	 * RestrictInfos.
	 */
	foreach(lc, scan_clauses)
	{
		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);

		Assert(IsA(rinfo, RestrictInfo));

		/* Ignore any pseudoconstants, they're dealt with elsewhere */
		if (rinfo->pseudoconstant)
			continue;

		if (list_member_ptr(fpinfo->remote_conds, rinfo))
		{
			remote_conds = lappend(remote_conds, rinfo);
			remote_exprs = lappend(remote_exprs, rinfo->clause);
		}
		else if (list_member_ptr(fpinfo->local_conds, rinfo))
			local_exprs = lappend(local_exprs, rinfo->clause);
		else if (is_foreign_expr(root, foreignrel, rinfo->clause))
		{
			remote_conds = lappend(remote_conds, rinfo);
			remote_exprs = lappend(remote_exprs, rinfo->clause);
		}
		else
			local_exprs = lappend(local_exprs, rinfo->clause);
	}

	if (foreignrel->reloptkind == RELOPT_JOINREL)
	{
		/* For a join relation, get the conditions from fdw_private structure */
		remote_conds = fpinfo->remote_conds;
		local_exprs = fpinfo->local_conds;

		/* Build the list of columns to be fetched from the foreign server. */
		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);

		/*
		 * Ensure that the outer plan produces a tuple whose descriptor
		 * matches our scan tuple slot.  Also, remove the local conditions
		 * from outer plan's quals, lest they be evaluated twice, once by the
		 * local plan and once by the scan.
		 */
		if (outer_plan)
		{
			/*
			 * First, update the plan's qual list if possible.  In some cases
			 * the quals might be enforced below the topmost plan level, in
			 * which case we'll fail to remove them; it's not worth working
			 * harder than this.
			 *
			 * The function-level "lc" is reused here; the scan_clauses loop
			 * above is finished, so no separate (shadowing) cursor is needed.
			 */
			foreach(lc, local_exprs)
			{
				Node	   *qual = lfirst(lc);

				outer_plan->qual = list_delete(outer_plan->qual, qual);

				/*
				 * For an inner join the local conditions of foreign scan plan
				 * can be part of the joinquals as well.  (They might also be
				 * in the mergequals or hashquals, but we can't touch those
				 * without breaking the plan.)
				 */
				if (IsA(outer_plan, NestLoop) ||
					IsA(outer_plan, MergeJoin) ||
					IsA(outer_plan, HashJoin))
				{
					Join	   *join_plan = (Join *) outer_plan;

					if (join_plan->jointype == JOIN_INNER)
						join_plan->joinqual = list_delete(join_plan->joinqual,
														  qual);
				}
			}

			/*
			 * Now fix the subplan's tlist --- this might result in inserting
			 * a Result node atop the plan tree.
			 */
			outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
												best_path->path.parallel_safe);
		}
	}

	/*
	 * Build the query string to be sent for execution, and identify
	 * expressions to be sent as parameters.
	 */
	initStringInfo(&sql);
	deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
							remote_conds, best_path->path.pathkeys,
							&retrieved_attrs, &params_list);

	/*
	 * Build the fdw_private list that will be available to the executor.
	 * Items in the list must match order in enum FdwScanPrivateIndex.
	 */
	fdw_private = list_make4(makeString(sql.data),
							 remote_conds,
							 retrieved_attrs,
							 makeInteger(fpinfo->fetch_size));
	if (foreignrel->reloptkind == RELOPT_JOINREL)
		fdw_private = lappend(fdw_private,
							  makeString(fpinfo->relation_name->data));

	/*
	 * Create the ForeignScan node for the given relation.
	 *
	 * Note that the remote parameter expressions are stored in the fdw_exprs
	 * field of the finished plan node; we can't keep them in private state
	 * because then they wouldn't be subject to later planner processing.
	 */
	return make_foreignscan(tlist,
							local_exprs,
							scan_relid,
							params_list,
							fdw_private,
							fdw_scan_tlist,
							remote_exprs,
							outer_plan);
}
1265
1266 /*
1267 * postgresBeginForeignScan
1268 * Initiate an executor scan of a foreign PostgreSQL table.
1269 */
1270 static void
postgresBeginForeignScan(ForeignScanState *node, int eflags)
{
	/*
	 * Set up executor state for a scan of a foreign table, or of a join
	 * pushed down to the remote server.
	 *
	 * In EXPLAIN-only mode nothing happens and node->fdw_state stays NULL.
	 * Otherwise a PgFdwScanState is allocated and attached to the node.  A
	 * connection and cursor number are obtained, the planner's fdw_private
	 * items are unpacked, and conversion metadata and parameter evaluation
	 * are prepared.  The remote cursor itself is created later (see
	 * cursor_exists).
	 */
	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
	EState	   *estate = node->ss.ps.state;
	PgFdwScanState *state;
	RangeTblEntry *rte;
	ForeignTable *table;
	UserMapping *user;
	Oid			userid;
	int			rtindex;
	int			numParams;

	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
		return;

	state = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
	node->fdw_state = (void *) state;

	/*
	 * Pick the RTE that decides which user accesses the remote server; this
	 * should agree with ExecCheckRTEPerms().  For a join (scanrelid == 0)
	 * the lowest-numbered member RTE stands in for all of them, since any
	 * would give the same answer.
	 */
	if (plan->scan.scanrelid > 0)
		rtindex = plan->scan.scanrelid;
	else
		rtindex = bms_next_member(plan->fs_relids, -1);
	rte = rt_fetch(rtindex, estate->es_range_table);
	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();

	table = GetForeignTable(rte->relid);
	user = GetUserMapping(userid, table->serverid);

	/* The connection manager opens a new connection only if needed. */
	state->conn = GetConnection(user, false);

	/* Reserve a cursor name; the cursor is opened on first fetch. */
	state->cursor_number = GetCursorNumber(state->conn);
	state->cursor_exists = false;

	/* Unpack what postgresGetForeignPlan stored for us. */
	state->query = strVal(list_nth(plan->fdw_private,
								   FdwScanPrivateSelectSql));
	state->retrieved_attrs = (List *) list_nth(plan->fdw_private,
											   FdwScanPrivateRetrievedAttrs);
	state->fetch_size = intVal(list_nth(plan->fdw_private,
										FdwScanPrivateFetchSize));

	/* One context for tuple batches, one for per-tuple scratch work. */
	state->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
											 "postgres_fdw tuple data",
											 ALLOCSET_DEFAULT_SIZES);
	state->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
											"postgres_fdw temporary data",
											ALLOCSET_SMALL_SIZES);

	/*
	 * Descriptor used to convert remote rows to local form (and for error
	 * reporting): the relation's own for a base scan, the scan slot's for a
	 * join.
	 */
	if (plan->scan.scanrelid == 0)
	{
		state->rel = NULL;
		state->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
	}
	else
	{
		state->rel = node->ss.ss_currentRelation;
		state->tupdesc = RelationGetDescr(state->rel);
	}
	state->attinmeta = TupleDescGetAttInMetadata(state->tupdesc);

	/* Parameters of the remote query, if there are any. */
	numParams = list_length(plan->fdw_exprs);
	state->numParams = numParams;
	if (numParams <= 0)
		return;

	prepare_query_params((PlanState *) node,
						 plan->fdw_exprs,
						 numParams,
						 &state->param_flinfo,
						 &state->param_exprs,
						 &state->param_values);
}
1367
1368 /*
1369 * postgresIterateForeignScan
1370 * Retrieve next row from the result set, or clear tuple slot to indicate
1371 * EOF.
1372 */
1373 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState *node)
{
	/*
	 * Store the next remote row in the scan slot and return the slot, or
	 * return a cleared slot once no rows remain.  The remote cursor is
	 * created on the first call after Begin or ReScan.  Rows are buffered in
	 * batches, and the buffer is refilled whenever it runs dry.
	 */
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;

	if (!fsstate->cursor_exists)
		create_cursor(node);

	if (fsstate->next_tuple >= fsstate->num_tuples)
	{
		/* Buffer exhausted: refill it unless the remote already hit EOF. */
		if (!fsstate->eof_reached)
			fetch_more_data(node);

		/* Still nothing buffered, so the scan is over. */
		if (fsstate->next_tuple >= fsstate->num_tuples)
			return ExecClearTuple(slot);
	}

	/* Advance past the row first, then hand that row to the slot. */
	fsstate->next_tuple++;
	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple - 1],
				   slot,
				   InvalidBuffer,
				   false);

	return slot;
}
1409
1410 /*
1411 * postgresReScanForeignScan
1412 * Restart the scan.
1413 */
1414 static void
postgresReScanForeignScan(ForeignScanState *node)
{
	/*
	 * Restart the scan.
	 *
	 * If no cursor exists yet there is nothing to do.  If none of the node's
	 * parameters changed and at most one batch was fetched, the rows held
	 * in memory are simply replayed.  Otherwise the remote cursor is either
	 * closed (parameters changed, so the next fetch recreates it) or rewound
	 * with MOVE BACKWARD ALL.  In both of those cases the local buffer is
	 * discarded so a fresh FETCH follows.
	 */
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
	bool		params_changed = (node->ss.ps.chgParam != NULL);
	char		command[64];
	PGresult   *res;

	if (!fsstate->cursor_exists)
		return;

	/* Cheap case: just rescan whatever we already have in memory. */
	if (!params_changed && fsstate->fetch_ct_2 <= 1)
	{
		fsstate->next_tuple = 0;
		return;
	}

	if (params_changed)
	{
		fsstate->cursor_exists = false;
		snprintf(command, sizeof(command), "CLOSE c%u",
				 fsstate->cursor_number);
	}
	else
		snprintf(command, sizeof(command), "MOVE BACKWARD ALL IN c%u",
				 fsstate->cursor_number);

	/*
	 * No PG_TRY here: pgfdw_report_error is told to release the PGresult
	 * (clear = true) before raising the error.
	 */
	res = pgfdw_exec_query(fsstate->conn, command);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
		pgfdw_report_error(ERROR, res, fsstate->conn, true, command);
	PQclear(res);

	/* Forget the buffered batch so that the next call FETCHes anew. */
	fsstate->tuples = NULL;
	fsstate->num_tuples = 0;
	fsstate->next_tuple = 0;
	fsstate->fetch_ct_2 = 0;
	fsstate->eof_reached = false;
}
1465
1466 /*
1467 * postgresEndForeignScan
1468 * Finish scanning foreign table and dispose objects used for this scan
1469 */
1470 static void
postgresEndForeignScan(ForeignScanState *node)
{
	/*
	 * Finish the scan: close the remote cursor if one was opened and release
	 * the connection.  A NULL fdw_state means BeginForeignScan ran in
	 * EXPLAIN-only mode and set nothing up.  The memory contexts created in
	 * BeginForeignScan are children of the query context and are not freed
	 * here.
	 */
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;

	if (fsstate != NULL)
	{
		/* Avoid leaving cursors piling up on the remote connection. */
		if (fsstate->cursor_exists)
			close_cursor(fsstate->conn, fsstate->cursor_number);

		ReleaseConnection(fsstate->conn);
		fsstate->conn = NULL;
	}
}
1489
1490 /*
1491 * postgresAddForeignUpdateTargets
1492 * Add resjunk column(s) needed for update/delete on a foreign table
1493 */
1494 static void
postgresAddForeignUpdateTargets(Query *parsetree,
								RangeTblEntry *target_rte,
								Relation target_relation)
{
	/*
	 * Append the resjunk column needed to identify target rows for
	 * UPDATE/DELETE.  As with a regular table, postgres_fdw uses the ctid.
	 * A Var for the result relation's ctid is wrapped in a resjunk
	 * TargetEntry named "ctid" and placed at the end of the target list.
	 */
	Var		   *ctid_var = makeVar(parsetree->resultRelation,
								   SelfItemPointerAttributeNumber,
								   TIDOID,
								   -1,
								   InvalidOid,
								   0);
	TargetEntry *junk_tle = makeTargetEntry((Expr *) ctid_var,
									  list_length(parsetree->targetList) + 1,
											pstrdup("ctid"),
											true);

	parsetree->targetList = lappend(parsetree->targetList, junk_tle);
}
1526
1527 /*
1528 * postgresPlanForeignModify
1529 * Plan an insert/update/delete operation on a foreign table
1530 */
1531 static List *
postgresPlanForeignModify(PlannerInfo *root,
						  ModifyTable *plan,
						  Index resultRelation,
						  int subplan_index)
{
	/*
	 * Plan an INSERT, UPDATE or DELETE on a foreign table.  The steps are:
	 * choose the columns to transmit, pick this subplan's RETURNING list,
	 * check the ON CONFLICT action, and deparse the remote command.  The
	 * result is the fdw_private list laid out per enum
	 * FdwModifyPrivateIndex: SQL text, target attnums, has-RETURNING flag
	 * and retrieved attrs.
	 */
	CmdType		operation = plan->operation;
	RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
	Relation	rel;
	StringInfoData sql;
	List	   *targetAttrs = NIL;
	List	   *returningList = NIL;
	List	   *retrieved_attrs = NIL;
	bool		doNothing = false;
	bool		transmit_all_columns;

	initStringInfo(&sql);

	/* Core code already holds a lock on each rel being planned. */
	rel = heap_open(rte->relid, NoLock);

	/*
	 * INSERT transmits every live column; otherwise default values for
	 * columns absent from the source statement would never be sent.  UPDATE
	 * does the same when BEFORE ROW UPDATE triggers exist, because they may
	 * alter non-target columns.  Plain UPDATE sends only the columns it
	 * explicitly targets, to save transmission.
	 */
	transmit_all_columns = (operation == CMD_INSERT ||
							(operation == CMD_UPDATE &&
							 rel->trigdesc &&
							 rel->trigdesc->trig_update_before_row));

	if (transmit_all_columns)
	{
		TupleDesc	tupdesc = RelationGetDescr(rel);
		int			attnum;

		for (attnum = 1; attnum <= tupdesc->natts; attnum++)
		{
			if (!tupdesc->attrs[attnum - 1]->attisdropped)
				targetAttrs = lappend_int(targetAttrs, attnum);
		}
	}
	else if (operation == CMD_UPDATE)
	{
		int			bit = -1;

		while ((bit = bms_next_member(rte->updatedCols, bit)) >= 0)
		{
			/* updatedCols bits are offset by FirstLowInvalidHeapAttributeNumber */
			AttrNumber	attno = bit + FirstLowInvalidHeapAttributeNumber;

			if (attno <= InvalidAttrNumber)		/* shouldn't happen */
				elog(ERROR, "system-column update is not supported");
			targetAttrs = lappend_int(targetAttrs, attno);
		}
	}

	/* This subplan's RETURNING list, when the statement has any. */
	if (plan->returningLists)
		returningList = (List *) list_nth(plan->returningLists, subplan_index);

	/*
	 * Only DO NOTHING without an inference specification can reach here; an
	 * arbiter index can't be identified on a foreign table, so anything else
	 * should have been rejected by the optimizer.
	 */
	switch (plan->onConflictAction)
	{
		case ONCONFLICT_NONE:
			break;
		case ONCONFLICT_NOTHING:
			doNothing = true;
			break;
		default:
			elog(ERROR, "unexpected ON CONFLICT specification: %d",
				 (int) plan->onConflictAction);
			break;
	}

	/* Deparse the remote command text. */
	if (operation == CMD_INSERT)
		deparseInsertSql(&sql, root, resultRelation, rel,
						 targetAttrs, doNothing, returningList,
						 &retrieved_attrs);
	else if (operation == CMD_UPDATE)
		deparseUpdateSql(&sql, root, resultRelation, rel,
						 targetAttrs, returningList,
						 &retrieved_attrs);
	else if (operation == CMD_DELETE)
		deparseDeleteSql(&sql, root, resultRelation, rel,
						 returningList,
						 &retrieved_attrs);
	else
		elog(ERROR, "unexpected operation: %d", (int) operation);

	heap_close(rel, NoLock);

	/* Order must match enum FdwModifyPrivateIndex. */
	return list_make4(makeString(sql.data),
					  targetAttrs,
					  makeInteger((retrieved_attrs != NIL)),
					  retrieved_attrs);
}
1651
1652 /*
1653 * postgresBeginForeignModify
1654 * Begin an insert/update/delete operation on a foreign table
1655 */
1656 static void
postgresBeginForeignModify(ModifyTableState *mtstate,
						   ResultRelInfo *resultRelInfo,
						   List *fdw_private,
						   int subplan_index,
						   int eflags)
{
	/*
	 * Prepare executor state for an INSERT/UPDATE/DELETE on a foreign table.
	 *
	 * In EXPLAIN-only mode nothing is done and ri_FdwState stays NULL.
	 * Otherwise a PgFdwModifyState is built and stored in ri_FdwState.  The
	 * prepared statement itself is created later (p_name starts NULL).
	 * p_flinfo receives output functions in transmit order: the ctid first
	 * (UPDATE/DELETE), then each target attribute (INSERT/UPDATE).
	 */
	EState	   *estate = mtstate->ps.state;
	CmdType		operation = mtstate->operation;
	Relation	rel = resultRelInfo->ri_RelationDesc;
	PgFdwModifyState *fmstate;
	RangeTblEntry *rte;
	ForeignTable *table;
	UserMapping *user;
	Oid			userid;
	AttrNumber	n_params;
	Oid			typefnoid;
	bool		isvarlena;
	ListCell   *lc;

	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
		return;

	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
	fmstate->rel = rel;

	/* Remote user selection should agree with ExecCheckRTEPerms(). */
	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();

	table = GetForeignTable(RelationGetRelid(rel));
	user = GetUserMapping(userid, table->serverid);

	/* Say up front that a prepared statement will be created. */
	fmstate->conn = GetConnection(user, true);
	fmstate->p_name = NULL;		/* not prepared yet */

	/* Unpack the list produced by postgresPlanForeignModify. */
	fmstate->query = strVal(list_nth(fdw_private,
									 FdwModifyPrivateUpdateSql));
	fmstate->target_attrs = (List *) list_nth(fdw_private,
											  FdwModifyPrivateTargetAttnums);
	fmstate->has_returning = intVal(list_nth(fdw_private,
											 FdwModifyPrivateHasReturning));
	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
											  FdwModifyPrivateRetrievedAttrs);

	/* Scratch space reused for each tuple. */
	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
											  "postgres_fdw temporary data",
											  ALLOCSET_SMALL_SIZES);

	/* Input conversion is needed only for RETURNING rows. */
	if (fmstate->has_returning)
		fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));

	/* Room for every target attribute plus a possible ctid. */
	n_params = list_length(fmstate->target_attrs) + 1;
	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
	fmstate->p_nums = 0;

	if (operation == CMD_UPDATE || operation == CMD_DELETE)
	{
		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;

		/* The subplan must carry the resjunk ctid column. */
		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
														  "ctid");
		if (!AttributeNumberIsValid(fmstate->ctidAttno))
			elog(ERROR, "could not find junk ctid column");

		/* ctid is the first transmitted parameter. */
		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
		fmstate->p_nums++;
	}

	if (operation == CMD_INSERT || operation == CMD_UPDATE)
	{
		/* Then one parameter per target attribute, in list order. */
		foreach(lc, fmstate->target_attrs)
		{
			Form_pg_attribute attr =
				RelationGetDescr(rel)->attrs[lfirst_int(lc) - 1];

			Assert(!attr->attisdropped);

			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
			fmstate->p_nums++;
		}
	}

	Assert(fmstate->p_nums <= n_params);

	resultRelInfo->ri_FdwState = fmstate;
}
1762
1763 /*
1764 * postgresExecForeignInsert
1765 * Insert one row into a foreign table
1766 */
1767 static TupleTableSlot *
postgresExecForeignInsert(EState *estate,
						  ResultRelInfo *resultRelInfo,
						  TupleTableSlot *slot,
						  TupleTableSlot *planSlot)
{
	/*
	 * Insert the row in "slot" on the remote server using the prepared
	 * statement, which is created on first use.  When RETURNING data comes
	 * back it is stored into "slot".  Returns "slot", or NULL if the remote
	 * side reports that no row was inserted.
	 */
	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
	const char **p_values;
	PGresult   *res;
	int			n_rows;

	if (fmstate->p_name == NULL)
		prepare_foreign_modify(fmstate);

	/* Text form of every parameter the prepared statement needs. */
	p_values = convert_prep_stmt_params(fmstate, NULL, slot);

	if (!PQsendQueryPrepared(fmstate->conn,
							 fmstate->p_name,
							 fmstate->p_nums,
							 p_values,
							 NULL,
							 NULL,
							 0))
		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);

	/*
	 * There is no PG_TRY here, so errors are raised via pgfdw_report_error
	 * with clear = true, which releases the PGresult first.
	 */
	res = pgfdw_get_result(fmstate->conn, fmstate->query);

	if (fmstate->has_returning)
	{
		/* RETURNING: success means a tuple set; pick up the row, if any. */
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
		n_rows = PQntuples(res);
		if (n_rows > 0)
			store_returning_result(fmstate, slot, res);
	}
	else
	{
		/* Plain INSERT: the command tag carries the affected-row count. */
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
			pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
		n_rows = atoi(PQcmdTuples(res));
	}

	PQclear(res);
	MemoryContextReset(fmstate->temp_cxt);

	/* NULL tells the caller that nothing was inserted remotely. */
	if (n_rows <= 0)
		return NULL;
	return slot;
}
1826
1827 /*
1828 * postgresExecForeignUpdate
1829 * Update one row in a foreign table
1830 */
1831 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1832 postgresExecForeignUpdate(EState *estate,
1833 ResultRelInfo *resultRelInfo,
1834 TupleTableSlot *slot,
1835 TupleTableSlot *planSlot)
1836 {
1837 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1838 Datum datum;
1839 bool isNull;
1840 const char **p_values;
1841 PGresult *res;
1842 int n_rows;
1843
1844 /* Set up the prepared statement on the remote server, if we didn't yet */
1845 if (!fmstate->p_name)
1846 prepare_foreign_modify(fmstate);
1847
1848 /* Get the ctid that was passed up as a resjunk column */
1849 datum = ExecGetJunkAttribute(planSlot,
1850 fmstate->ctidAttno,
1851 &isNull);
1852 /* shouldn't ever get a null result... */
1853 if (isNull)
1854 elog(ERROR, "ctid is NULL");
1855
1856 /* Convert parameters needed by prepared statement to text form */
1857 p_values = convert_prep_stmt_params(fmstate,
1858 (ItemPointer) DatumGetPointer(datum),
1859 slot);
1860
1861 /*
1862 * Execute the prepared statement.
1863 */
1864 if (!PQsendQueryPrepared(fmstate->conn,
1865 fmstate->p_name,
1866 fmstate->p_nums,
1867 p_values,
1868 NULL,
1869 NULL,
1870 0))
1871 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1872
1873 /*
1874 * Get the result, and check for success.
1875 *
1876 * We don't use a PG_TRY block here, so be careful not to throw error
1877 * without releasing the PGresult.
1878 */
1879 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1880 if (PQresultStatus(res) !=
1881 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1882 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1883
1884 /* Check number of rows affected, and fetch RETURNING tuple if any */
1885 if (fmstate->has_returning)
1886 {
1887 n_rows = PQntuples(res);
1888 if (n_rows > 0)
1889 store_returning_result(fmstate, slot, res);
1890 }
1891 else
1892 n_rows = atoi(PQcmdTuples(res));
1893
1894 /* And clean up */
1895 PQclear(res);
1896
1897 MemoryContextReset(fmstate->temp_cxt);
1898
1899 /* Return NULL if nothing was updated on the remote end */
1900 return (n_rows > 0) ? slot : NULL;
1901 }
1902
1903 /*
1904 * postgresExecForeignDelete
1905 * Delete one row from a foreign table
1906 */
1907 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1908 postgresExecForeignDelete(EState *estate,
1909 ResultRelInfo *resultRelInfo,
1910 TupleTableSlot *slot,
1911 TupleTableSlot *planSlot)
1912 {
1913 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1914 Datum datum;
1915 bool isNull;
1916 const char **p_values;
1917 PGresult *res;
1918 int n_rows;
1919
1920 /* Set up the prepared statement on the remote server, if we didn't yet */
1921 if (!fmstate->p_name)
1922 prepare_foreign_modify(fmstate);
1923
1924 /* Get the ctid that was passed up as a resjunk column */
1925 datum = ExecGetJunkAttribute(planSlot,
1926 fmstate->ctidAttno,
1927 &isNull);
1928 /* shouldn't ever get a null result... */
1929 if (isNull)
1930 elog(ERROR, "ctid is NULL");
1931
1932 /* Convert parameters needed by prepared statement to text form */
1933 p_values = convert_prep_stmt_params(fmstate,
1934 (ItemPointer) DatumGetPointer(datum),
1935 NULL);
1936
1937 /*
1938 * Execute the prepared statement.
1939 */
1940 if (!PQsendQueryPrepared(fmstate->conn,
1941 fmstate->p_name,
1942 fmstate->p_nums,
1943 p_values,
1944 NULL,
1945 NULL,
1946 0))
1947 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1948
1949 /*
1950 * Get the result, and check for success.
1951 *
1952 * We don't use a PG_TRY block here, so be careful not to throw error
1953 * without releasing the PGresult.
1954 */
1955 res = pgfdw_get_result(fmstate->conn, fmstate->query);
1956 if (PQresultStatus(res) !=
1957 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1958 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1959
1960 /* Check number of rows affected, and fetch RETURNING tuple if any */
1961 if (fmstate->has_returning)
1962 {
1963 n_rows = PQntuples(res);
1964 if (n_rows > 0)
1965 store_returning_result(fmstate, slot, res);
1966 }
1967 else
1968 n_rows = atoi(PQcmdTuples(res));
1969
1970 /* And clean up */
1971 PQclear(res);
1972
1973 MemoryContextReset(fmstate->temp_cxt);
1974
1975 /* Return NULL if nothing was deleted on the remote end */
1976 return (n_rows > 0) ? slot : NULL;
1977 }
1978
1979 /*
1980 * postgresEndForeignModify
1981 * Finish an insert/update/delete operation on a foreign table
1982 */
1983 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)1984 postgresEndForeignModify(EState *estate,
1985 ResultRelInfo *resultRelInfo)
1986 {
1987 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1988
1989 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1990 if (fmstate == NULL)
1991 return;
1992
1993 /* If we created a prepared statement, destroy it */
1994 if (fmstate->p_name)
1995 {
1996 char sql[64];
1997 PGresult *res;
1998
1999 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2000
2001 /*
2002 * We don't use a PG_TRY block here, so be careful not to throw error
2003 * without releasing the PGresult.
2004 */
2005 res = pgfdw_exec_query(fmstate->conn, sql);
2006 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2007 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2008 PQclear(res);
2009 fmstate->p_name = NULL;
2010 }
2011
2012 /* Release remote connection */
2013 ReleaseConnection(fmstate->conn);
2014 fmstate->conn = NULL;
2015 }
2016
2017 /*
2018 * postgresIsForeignRelUpdatable
2019 * Determine whether a foreign table supports INSERT, UPDATE and/or
2020 * DELETE.
2021 */
2022 static int
postgresIsForeignRelUpdatable(Relation rel)2023 postgresIsForeignRelUpdatable(Relation rel)
2024 {
2025 bool updatable;
2026 ForeignTable *table;
2027 ForeignServer *server;
2028 ListCell *lc;
2029
2030 /*
2031 * By default, all postgres_fdw foreign tables are assumed updatable. This
2032 * can be overridden by a per-server setting, which in turn can be
2033 * overridden by a per-table setting.
2034 */
2035 updatable = true;
2036
2037 table = GetForeignTable(RelationGetRelid(rel));
2038 server = GetForeignServer(table->serverid);
2039
2040 foreach(lc, server->options)
2041 {
2042 DefElem *def = (DefElem *) lfirst(lc);
2043
2044 if (strcmp(def->defname, "updatable") == 0)
2045 updatable = defGetBoolean(def);
2046 }
2047 foreach(lc, table->options)
2048 {
2049 DefElem *def = (DefElem *) lfirst(lc);
2050
2051 if (strcmp(def->defname, "updatable") == 0)
2052 updatable = defGetBoolean(def);
2053 }
2054
2055 /*
2056 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2057 */
2058 return updatable ?
2059 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2060 }
2061
2062 /*
2063 * postgresRecheckForeignScan
2064 * Execute a local join execution plan for a foreign join
2065 */
2066 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2067 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2068 {
2069 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2070 PlanState *outerPlan = outerPlanState(node);
2071 TupleTableSlot *result;
2072
2073 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2074 if (scanrelid > 0)
2075 return true;
2076
2077 Assert(outerPlan != NULL);
2078
2079 /* Execute a local join execution plan */
2080 result = ExecProcNode(outerPlan);
2081 if (TupIsNull(result))
2082 return false;
2083
2084 /* Store result in the given slot */
2085 ExecCopySlot(slot, result);
2086
2087 return true;
2088 }
2089
2090 /*
2091 * postgresPlanDirectModify
2092 * Consider a direct foreign table modification
2093 *
2094 * Decide whether it is safe to modify a foreign table directly, and if so,
2095 * rewrite subplan accordingly.
2096 */
2097 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2098 postgresPlanDirectModify(PlannerInfo *root,
2099 ModifyTable *plan,
2100 Index resultRelation,
2101 int subplan_index)
2102 {
2103 CmdType operation = plan->operation;
2104 Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
2105 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
2106 Relation rel;
2107 StringInfoData sql;
2108 ForeignScan *fscan;
2109 List *targetAttrs = NIL;
2110 List *remote_conds;
2111 List *params_list = NIL;
2112 List *returningList = NIL;
2113 List *retrieved_attrs = NIL;
2114
2115 /*
2116 * Decide whether it is safe to modify a foreign table directly.
2117 */
2118
2119 /*
2120 * The table modification must be an UPDATE or DELETE.
2121 */
2122 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2123 return false;
2124
2125 /*
2126 * It's unsafe to modify a foreign table directly if there are any local
2127 * joins needed.
2128 */
2129 if (!IsA(subplan, ForeignScan))
2130 return false;
2131
2132 /*
2133 * It's unsafe to modify a foreign table directly if there are any quals
2134 * that should be evaluated locally.
2135 */
2136 if (subplan->qual != NIL)
2137 return false;
2138
2139 /*
2140 * We can't handle an UPDATE or DELETE on a foreign join for now.
2141 */
2142 fscan = (ForeignScan *) subplan;
2143 if (fscan->scan.scanrelid == 0)
2144 return false;
2145
2146 /*
2147 * It's unsafe to update a foreign table directly, if any expressions to
2148 * assign to the target columns are unsafe to evaluate remotely.
2149 */
2150 if (operation == CMD_UPDATE)
2151 {
2152 RelOptInfo *baserel = root->simple_rel_array[resultRelation];
2153 int col;
2154
2155 /*
2156 * We transmit only columns that were explicitly targets of the
2157 * UPDATE, so as to avoid unnecessary data transmission.
2158 */
2159 col = -1;
2160 while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2161 {
2162 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2163 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
2164 TargetEntry *tle;
2165
2166 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2167 elog(ERROR, "system-column update is not supported");
2168
2169 tle = get_tle_by_resno(subplan->targetlist, attno);
2170
2171 if (!tle)
2172 elog(ERROR, "attribute number %d not found in subplan targetlist",
2173 attno);
2174
2175 if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
2176 return false;
2177
2178 targetAttrs = lappend_int(targetAttrs, attno);
2179 }
2180 }
2181
2182 /*
2183 * Ok, rewrite subplan so as to modify the foreign table directly.
2184 */
2185 initStringInfo(&sql);
2186
2187 /*
2188 * Core code already has some lock on each rel being planned, so we can
2189 * use NoLock here.
2190 */
2191 rel = heap_open(rte->relid, NoLock);
2192
2193 /*
2194 * Extract the baserestrictinfo clauses that can be evaluated remotely.
2195 */
2196 remote_conds = (List *) list_nth(fscan->fdw_private,
2197 FdwScanPrivateRemoteConds);
2198
2199 /*
2200 * Extract the relevant RETURNING list if any.
2201 */
2202 if (plan->returningLists)
2203 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2204
2205 /*
2206 * Construct the SQL command string.
2207 */
2208 switch (operation)
2209 {
2210 case CMD_UPDATE:
2211 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2212 ((Plan *) fscan)->targetlist,
2213 targetAttrs,
2214 remote_conds, ¶ms_list,
2215 returningList, &retrieved_attrs);
2216 break;
2217 case CMD_DELETE:
2218 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2219 remote_conds, ¶ms_list,
2220 returningList, &retrieved_attrs);
2221 break;
2222 default:
2223 elog(ERROR, "unexpected operation: %d", (int) operation);
2224 break;
2225 }
2226
2227 /*
2228 * Update the operation info.
2229 */
2230 fscan->operation = operation;
2231
2232 /*
2233 * Update the fdw_exprs list that will be available to the executor.
2234 */
2235 fscan->fdw_exprs = params_list;
2236
2237 /*
2238 * Update the fdw_private list that will be available to the executor.
2239 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2240 */
2241 fscan->fdw_private = list_make4(makeString(sql.data),
2242 makeInteger((retrieved_attrs != NIL)),
2243 retrieved_attrs,
2244 makeInteger(plan->canSetTag));
2245
2246 heap_close(rel, NoLock);
2247 return true;
2248 }
2249
2250 /*
2251 * postgresBeginDirectModify
2252 * Prepare a direct foreign table modification
2253 */
2254 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2255 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2256 {
2257 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2258 EState *estate = node->ss.ps.state;
2259 PgFdwDirectModifyState *dmstate;
2260 RangeTblEntry *rte;
2261 Oid userid;
2262 ForeignTable *table;
2263 UserMapping *user;
2264 int numParams;
2265
2266 /*
2267 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2268 */
2269 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2270 return;
2271
2272 /*
2273 * We'll save private state in node->fdw_state.
2274 */
2275 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2276 node->fdw_state = (void *) dmstate;
2277
2278 /*
2279 * Identify which user to do the remote access as. This should match what
2280 * ExecCheckRTEPerms() does.
2281 */
2282 rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2283 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2284
2285 /* Get info about foreign table. */
2286 dmstate->rel = node->ss.ss_currentRelation;
2287 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2288 user = GetUserMapping(userid, table->serverid);
2289
2290 /*
2291 * Get connection to the foreign server. Connection manager will
2292 * establish new connection if necessary.
2293 */
2294 dmstate->conn = GetConnection(user, false);
2295
2296 /* Initialize state variable */
2297 dmstate->num_tuples = -1; /* -1 means not set yet */
2298
2299 /* Get private info created by planner functions. */
2300 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2301 FdwDirectModifyPrivateUpdateSql));
2302 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2303 FdwDirectModifyPrivateHasReturning));
2304 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2305 FdwDirectModifyPrivateRetrievedAttrs);
2306 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2307 FdwDirectModifyPrivateSetProcessed));
2308
2309 /* Create context for per-tuple temp workspace. */
2310 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2311 "postgres_fdw temporary data",
2312 ALLOCSET_SMALL_SIZES);
2313
2314 /* Prepare for input conversion of RETURNING results. */
2315 if (dmstate->has_returning)
2316 dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2317
2318 /*
2319 * Prepare for processing of parameters used in remote query, if any.
2320 */
2321 numParams = list_length(fsplan->fdw_exprs);
2322 dmstate->numParams = numParams;
2323 if (numParams > 0)
2324 prepare_query_params((PlanState *) node,
2325 fsplan->fdw_exprs,
2326 numParams,
2327 &dmstate->param_flinfo,
2328 &dmstate->param_exprs,
2329 &dmstate->param_values);
2330 }
2331
2332 /*
2333 * postgresIterateDirectModify
2334 * Execute a direct foreign table modification
2335 */
2336 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2337 postgresIterateDirectModify(ForeignScanState *node)
2338 {
2339 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2340 EState *estate = node->ss.ps.state;
2341 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2342
2343 /*
2344 * If this is the first call after Begin, execute the statement.
2345 */
2346 if (dmstate->num_tuples == -1)
2347 execute_dml_stmt(node);
2348
2349 /*
2350 * If the local query doesn't specify RETURNING, just clear tuple slot.
2351 */
2352 if (!resultRelInfo->ri_projectReturning)
2353 {
2354 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2355 Instrumentation *instr = node->ss.ps.instrument;
2356
2357 Assert(!dmstate->has_returning);
2358
2359 /* Increment the command es_processed count if necessary. */
2360 if (dmstate->set_processed)
2361 estate->es_processed += dmstate->num_tuples;
2362
2363 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2364 if (instr)
2365 instr->tuplecount += dmstate->num_tuples;
2366
2367 return ExecClearTuple(slot);
2368 }
2369
2370 /*
2371 * Get the next RETURNING tuple.
2372 */
2373 return get_returning_data(node);
2374 }
2375
2376 /*
2377 * postgresEndDirectModify
2378 * Finish a direct foreign table modification
2379 */
2380 static void
postgresEndDirectModify(ForeignScanState * node)2381 postgresEndDirectModify(ForeignScanState *node)
2382 {
2383 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2384
2385 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2386 if (dmstate == NULL)
2387 return;
2388
2389 /* Release PGresult */
2390 if (dmstate->result)
2391 PQclear(dmstate->result);
2392
2393 /* Release remote connection */
2394 ReleaseConnection(dmstate->conn);
2395 dmstate->conn = NULL;
2396
2397 /* MemoryContext will be deleted automatically. */
2398 }
2399
2400 /*
2401 * postgresExplainForeignScan
2402 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2403 */
2404 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2405 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2406 {
2407 List *fdw_private;
2408 char *sql;
2409 char *relations;
2410
2411 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2412
2413 /*
2414 * Add names of relation handled by the foreign scan when the scan is a
2415 * join
2416 */
2417 if (list_length(fdw_private) > FdwScanPrivateRelations)
2418 {
2419 relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2420 ExplainPropertyText("Relations", relations, es);
2421 }
2422
2423 /*
2424 * Add remote query, when VERBOSE option is specified.
2425 */
2426 if (es->verbose)
2427 {
2428 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2429 ExplainPropertyText("Remote SQL", sql, es);
2430 }
2431 }
2432
2433 /*
2434 * postgresExplainForeignModify
2435 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2436 */
2437 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2438 postgresExplainForeignModify(ModifyTableState *mtstate,
2439 ResultRelInfo *rinfo,
2440 List *fdw_private,
2441 int subplan_index,
2442 ExplainState *es)
2443 {
2444 if (es->verbose)
2445 {
2446 char *sql = strVal(list_nth(fdw_private,
2447 FdwModifyPrivateUpdateSql));
2448
2449 ExplainPropertyText("Remote SQL", sql, es);
2450 }
2451 }
2452
2453 /*
2454 * postgresExplainDirectModify
2455 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2456 * foreign table directly
2457 */
2458 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2459 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2460 {
2461 List *fdw_private;
2462 char *sql;
2463
2464 if (es->verbose)
2465 {
2466 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2467 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2468 ExplainPropertyText("Remote SQL", sql, es);
2469 }
2470 }
2471
2472
/*
 * estimate_path_cost_size
 *		Get cost and size estimates for a foreign scan on given foreign relation
 *		either a base relation or a join between foreign relations.
 *
 * param_join_conds are the parameterization clauses with outer relations.
 * pathkeys specify the expected sort order if any for given path being costed.
 *
 * The function returns the cost and size estimates in p_rows, p_width,
 * p_startup_cost and p_total_cost variables.
 *
 * Two strategies are used:
 *	- use_remote_estimate: an EXPLAIN of the would-be remote query is run on
 *	  the foreign server, and its top-node rows/width/costs are adjusted for
 *	  the quals that must be checked locally.
 *	- otherwise: rows/width come from the planner's own size estimates on
 *	  foreignrel.  Costs come from a seqscan-like model (base relation) or a
 *	  join model built from the inner/outer relations' cached costs (join
 *	  relation), with a fixed multiplier applied when pathkeys are requested.
 *
 * Two row counts are tracked: "rows" is the number of rows the path outputs
 * after all quals, while "retrieved_rows" is the number fetched from the
 * remote server before locally-checked quals are applied.  Per-row transfer
 * costs are charged on retrieved_rows.
 *
 * When neither pathkeys nor parameterization are requested, the "bare" costs
 * are cached in fpinfo->rel_startup_cost / rel_total_cost before the
 * connection and transfer overheads (fdw_startup_cost, fdw_tuple_cost,
 * cpu_tuple_cost) are added.
 *
 * Side effects: in remote-estimate mode, this obtains a connection and runs
 * EXPLAIN on the foreign server.  It may also update fpinfo's cached costs.
 */
static void
estimate_path_cost_size(PlannerInfo *root,
						RelOptInfo *foreignrel,
						List *param_join_conds,
						List *pathkeys,
						double *p_rows, int *p_width,
						Cost *p_startup_cost, Cost *p_total_cost)
{
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
	double		rows;
	double		retrieved_rows;
	int			width;
	Cost		startup_cost;
	Cost		total_cost;
	Cost		cpu_per_tuple;

	/*
	 * If the table or the server is configured to use remote estimates,
	 * connect to the foreign server and execute EXPLAIN to estimate the
	 * number of rows selected by the restriction+join clauses.  Otherwise,
	 * estimate rows using whatever statistics we have locally, in a way
	 * similar to ordinary tables.
	 */
	if (fpinfo->use_remote_estimate)
	{
		List	   *remote_param_join_conds;
		List	   *local_param_join_conds;
		StringInfoData sql;
		PGconn	   *conn;
		Selectivity local_sel;
		QualCost	local_cost;
		List	   *fdw_scan_tlist = NIL;
		List	   *remote_conds;

		/* Required only to be passed to deparseSelectStmtForRel */
		List	   *retrieved_attrs;

		/*
		 * param_join_conds might contain both clauses that are safe to send
		 * across, and clauses that aren't.
		 */
		classifyConditions(root, foreignrel, param_join_conds,
						   &remote_param_join_conds, &local_param_join_conds);

		/* Build the list of columns to be fetched from the foreign server. */
		if (foreignrel->reloptkind == RELOPT_JOINREL)
			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
		else
			fdw_scan_tlist = NIL;

		/*
		 * The complete list of remote conditions includes everything from
		 * baserestrictinfo plus any extra join_conds relevant to this
		 * particular path.
		 */
		remote_conds = list_concat(list_copy(remote_param_join_conds),
								   fpinfo->remote_conds);

		/*
		 * Construct EXPLAIN query including the desired SELECT, FROM, and
		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
		 * values, so don't request params_list.
		 */
		initStringInfo(&sql);
		appendStringInfoString(&sql, "EXPLAIN ");
		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
								remote_conds, pathkeys, &retrieved_attrs,
								NULL);

		/* Get the remote estimate */
		conn = GetConnection(fpinfo->user, false);
		get_remote_estimate(sql.data, conn, &rows, &width,
							&startup_cost, &total_cost);
		ReleaseConnection(conn);

		/*
		 * The remote estimate counts rows that pass the remote quals only,
		 * i.e. the rows that will actually be fetched.
		 */
		retrieved_rows = rows;

		/* Factor in the selectivity of the locally-checked quals */
		local_sel = clauselist_selectivity(root,
										   local_param_join_conds,
										   foreignrel->relid,
										   JOIN_INNER,
										   NULL);
		local_sel *= fpinfo->local_conds_sel;

		rows = clamp_row_est(rows * local_sel);

		/*
		 * Add in the eval cost of the locally-checked quals.  Both the
		 * relation's own local_conds and the non-pushable parameterization
		 * clauses are evaluated on every retrieved row.
		 */
		startup_cost += fpinfo->local_conds_cost.startup;
		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
		cost_qual_eval(&local_cost, local_param_join_conds, root);
		startup_cost += local_cost.startup;
		total_cost += local_cost.per_tuple * retrieved_rows;
	}
	else
	{
		Cost		run_cost = 0;

		/*
		 * We don't support join conditions in this mode (hence, no
		 * parameterized paths can be made).
		 */
		Assert(param_join_conds == NIL);

		/*
		 * Use rows/width estimates made by set_baserel_size_estimates() for
		 * base foreign relations and set_joinrel_size_estimates() for join
		 * between foreign relations.
		 */
		rows = foreignrel->rows;
		width = foreignrel->reltarget->width;

		/*
		 * Back into an estimate of the number of retrieved rows: undo the
		 * filtering of the locally-checked quals.
		 */
		retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);

		/*
		 * We will come here again and again with different set of pathkeys
		 * that caller wants to cost. We don't need to calculate the cost of
		 * bare scan each time. Instead, use the costs if we have cached them
		 * already.
		 *
		 * NOTE(review): "> 0" doubles as the "not cached yet" test, so a
		 * cached startup cost of exactly 0 (e.g. a base rel whose
		 * baserestrictcost.startup is 0, see below) is never reused; the
		 * bare costs are then simply recomputed.  The result is the same,
		 * only the shortcut is missed.
		 */
		if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
		{
			startup_cost = fpinfo->rel_startup_cost;
			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
		}
		else if (foreignrel->reloptkind != RELOPT_JOINREL)
		{
			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);

			/*
			 * Cost as though this were a seqscan, which is pessimistic. We
			 * effectively imagine the local_conds are being evaluated
			 * remotely, too.
			 */
			startup_cost = 0;
			run_cost = 0;
			run_cost += seq_page_cost * foreignrel->pages;

			startup_cost += foreignrel->baserestrictcost.startup;
			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
			run_cost += cpu_per_tuple * foreignrel->tuples;
		}
		else
		{
			PgFdwRelationInfo *fpinfo_i;
			PgFdwRelationInfo *fpinfo_o;
			QualCost	join_cost;
			QualCost	remote_conds_cost;
			double		nrows;

			/* For join we expect inner and outer relations set */
			Assert(fpinfo->innerrel && fpinfo->outerrel);

			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;

			/* Estimate of number of rows in cross product */
			nrows = fpinfo_i->rows * fpinfo_o->rows;
			/* Clamp retrieved rows estimate to at most size of cross product */
			retrieved_rows = Min(retrieved_rows, nrows);

			/*
			 * The cost of foreign join is estimated as cost of generating
			 * rows for the joining relations + cost for applying quals on the
			 * rows.
			 */

			/* Calculate the cost of clauses pushed down the foreign server */
			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
			/* Calculate the cost of applying join clauses */
			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);

			/*
			 * Startup cost includes startup cost of joining relations and the
			 * startup cost for join and other clauses. We do not include the
			 * startup cost specific to join strategy (e.g. setting up hash
			 * tables) since we do not know what strategy the foreign server
			 * is going to use.
			 */
			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
			startup_cost += join_cost.startup;
			startup_cost += remote_conds_cost.startup;
			startup_cost += fpinfo->local_conds_cost.startup;

			/*
			 * Run time cost includes:
			 *
			 * 1. Run time cost (total_cost - startup_cost) of relations being
			 * joined
			 *
			 * 2. Run time cost of applying join clauses on the cross product
			 * of the joining relations.
			 *
			 * 3. Run time cost of applying pushed down other clauses on the
			 * result of join
			 *
			 * 4. Run time cost of applying nonpushable other clauses locally
			 * on the result fetched from the foreign server.
			 */
			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
			run_cost += nrows * join_cost.per_tuple;
			/* from here on, nrows is the join output size */
			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
			run_cost += nrows * remote_conds_cost.per_tuple;
			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
		}

		/*
		 * Without remote estimates, we have no real way to estimate the cost
		 * of generating sorted output. It could be free if the query plan
		 * the remote side would have chosen generates properly-sorted output
		 * anyway, but in most cases it will cost something. Estimate a value
		 * high enough that we won't pick the sorted path when the ordering
		 * isn't locally useful, but low enough that we'll err on the side of
		 * pushing down the ORDER BY clause when it's useful to do so.
		 */
		if (pathkeys != NIL)
		{
			startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
			run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
		}

		total_cost = startup_cost + run_cost;
	}

	/*
	 * Cache the costs for scans without any pathkeys or parameterization
	 * before adding the costs for transferring data from the foreign server.
	 * These costs are useful for costing the join between this relation and
	 * another foreign relation or to calculate the costs of paths with
	 * pathkeys for this relation, when the costs can not be obtained from the
	 * foreign server. This function will be called at least once for every
	 * foreign relation without pathkeys and parameterization.
	 */
	if (pathkeys == NIL && param_join_conds == NIL)
	{
		fpinfo->rel_startup_cost = startup_cost;
		fpinfo->rel_total_cost = total_cost;
	}

	/*
	 * Add some additional cost factors to account for connection overhead
	 * (fdw_startup_cost), transferring data across the network
	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
	 * (cpu_tuple_cost per retrieved row).
	 */
	startup_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
	total_cost += cpu_tuple_cost * retrieved_rows;

	/* Return results. */
	*p_rows = rows;
	*p_width = width;
	*p_startup_cost = startup_cost;
	*p_total_cost = total_cost;
}
2743
2744 /*
2745 * Estimate costs of executing a SQL statement remotely.
2746 * The given "sql" must be an EXPLAIN command.
2747 */
2748 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)2749 get_remote_estimate(const char *sql, PGconn *conn,
2750 double *rows, int *width,
2751 Cost *startup_cost, Cost *total_cost)
2752 {
2753 PGresult *volatile res = NULL;
2754
2755 /* PGresult must be released before leaving this function. */
2756 PG_TRY();
2757 {
2758 char *line;
2759 char *p;
2760 int n;
2761
2762 /*
2763 * Execute EXPLAIN remotely.
2764 */
2765 res = pgfdw_exec_query(conn, sql);
2766 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2767 pgfdw_report_error(ERROR, res, conn, false, sql);
2768
2769 /*
2770 * Extract cost numbers for topmost plan node. Note we search for a
2771 * left paren from the end of the line to avoid being confused by
2772 * other uses of parentheses.
2773 */
2774 line = PQgetvalue(res, 0, 0);
2775 p = strrchr(line, '(');
2776 if (p == NULL)
2777 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2778 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2779 startup_cost, total_cost, rows, width);
2780 if (n != 4)
2781 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2782
2783 PQclear(res);
2784 res = NULL;
2785 }
2786 PG_CATCH();
2787 {
2788 if (res)
2789 PQclear(res);
2790 PG_RE_THROW();
2791 }
2792 PG_END_TRY();
2793 }
2794
2795 /*
2796 * Detect whether we want to process an EquivalenceClass member.
2797 *
2798 * This is a callback for use by generate_implied_equalities_for_column.
2799 */
2800 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)2801 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
2802 EquivalenceClass *ec, EquivalenceMember *em,
2803 void *arg)
2804 {
2805 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
2806 Expr *expr = em->em_expr;
2807
2808 /*
2809 * If we've identified what we're processing in the current scan, we only
2810 * want to match that expression.
2811 */
2812 if (state->current != NULL)
2813 return equal(expr, state->current);
2814
2815 /*
2816 * Otherwise, ignore anything we've already processed.
2817 */
2818 if (list_member(state->already_used, expr))
2819 return false;
2820
2821 /* This is the new target to process. */
2822 state->current = expr;
2823 return true;
2824 }
2825
2826 /*
2827 * Create cursor for node's query with current parameter values.
2828 */
2829 static void
create_cursor(ForeignScanState * node)2830 create_cursor(ForeignScanState *node)
2831 {
2832 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2833 ExprContext *econtext = node->ss.ps.ps_ExprContext;
2834 int numParams = fsstate->numParams;
2835 const char **values = fsstate->param_values;
2836 PGconn *conn = fsstate->conn;
2837 StringInfoData buf;
2838 PGresult *res;
2839
2840 /*
2841 * Construct array of query parameter values in text format. We do the
2842 * conversions in the short-lived per-tuple context, so as not to cause a
2843 * memory leak over repeated scans.
2844 */
2845 if (numParams > 0)
2846 {
2847 MemoryContext oldcontext;
2848
2849 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2850
2851 process_query_params(econtext,
2852 fsstate->param_flinfo,
2853 fsstate->param_exprs,
2854 values);
2855
2856 MemoryContextSwitchTo(oldcontext);
2857 }
2858
2859 /* Construct the DECLARE CURSOR command */
2860 initStringInfo(&buf);
2861 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2862 fsstate->cursor_number, fsstate->query);
2863
2864 /*
2865 * Notice that we pass NULL for paramTypes, thus forcing the remote server
2866 * to infer types for all parameters. Since we explicitly cast every
2867 * parameter (see deparse.c), the "inference" is trivial and will produce
2868 * the desired result. This allows us to avoid assuming that the remote
2869 * server has the same OIDs we do for the parameters' types.
2870 */
2871 if (!PQsendQueryParams(conn, buf.data, numParams,
2872 NULL, values, NULL, NULL, 0))
2873 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2874
2875 /*
2876 * Get the result, and check for success.
2877 *
2878 * We don't use a PG_TRY block here, so be careful not to throw error
2879 * without releasing the PGresult.
2880 */
2881 res = pgfdw_get_result(conn, buf.data);
2882 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2883 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2884 PQclear(res);
2885
2886 /* Mark the cursor as created, and show no tuples have been retrieved */
2887 fsstate->cursor_exists = true;
2888 fsstate->tuples = NULL;
2889 fsstate->num_tuples = 0;
2890 fsstate->next_tuple = 0;
2891 fsstate->fetch_ct_2 = 0;
2892 fsstate->eof_reached = false;
2893
2894 /* Clean up */
2895 pfree(buf.data);
2896 }
2897
2898 /*
2899 * Fetch some more rows from the node's cursor.
2900 */
2901 static void
fetch_more_data(ForeignScanState * node)2902 fetch_more_data(ForeignScanState *node)
2903 {
2904 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2905 PGresult *volatile res = NULL;
2906 MemoryContext oldcontext;
2907
2908 /*
2909 * We'll store the tuples in the batch_cxt. First, flush the previous
2910 * batch.
2911 */
2912 fsstate->tuples = NULL;
2913 MemoryContextReset(fsstate->batch_cxt);
2914 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
2915
2916 /* PGresult must be released before leaving this function. */
2917 PG_TRY();
2918 {
2919 PGconn *conn = fsstate->conn;
2920 char sql[64];
2921 int numrows;
2922 int i;
2923
2924 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
2925 fsstate->fetch_size, fsstate->cursor_number);
2926
2927 res = pgfdw_exec_query(conn, sql);
2928 /* On error, report the original query, not the FETCH. */
2929 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2930 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
2931
2932 /* Convert the data into HeapTuples */
2933 numrows = PQntuples(res);
2934 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
2935 fsstate->num_tuples = numrows;
2936 fsstate->next_tuple = 0;
2937
2938 for (i = 0; i < numrows; i++)
2939 {
2940 Assert(IsA(node->ss.ps.plan, ForeignScan));
2941
2942 fsstate->tuples[i] =
2943 make_tuple_from_result_row(res, i,
2944 fsstate->rel,
2945 fsstate->attinmeta,
2946 fsstate->retrieved_attrs,
2947 node,
2948 fsstate->temp_cxt);
2949 }
2950
2951 /* Update fetch_ct_2 */
2952 if (fsstate->fetch_ct_2 < 2)
2953 fsstate->fetch_ct_2++;
2954
2955 /* Must be EOF if we didn't get as many tuples as we asked for. */
2956 fsstate->eof_reached = (numrows < fsstate->fetch_size);
2957
2958 PQclear(res);
2959 res = NULL;
2960 }
2961 PG_CATCH();
2962 {
2963 if (res)
2964 PQclear(res);
2965 PG_RE_THROW();
2966 }
2967 PG_END_TRY();
2968
2969 MemoryContextSwitchTo(oldcontext);
2970 }
2971
2972 /*
2973 * Force assorted GUC parameters to settings that ensure that we'll output
2974 * data values in a form that is unambiguous to the remote server.
2975 *
2976 * This is rather expensive and annoying to do once per row, but there's
2977 * little choice if we want to be sure values are transmitted accurately;
2978 * we can't leave the settings in place between rows for fear of affecting
2979 * user-visible computations.
2980 *
2981 * We use the equivalent of a function SET option to allow the settings to
2982 * persist only until the caller calls reset_transmission_modes(). If an
2983 * error is thrown in between, guc.c will take care of undoing the settings.
2984 *
2985 * The return value is the nestlevel that must be passed to
2986 * reset_transmission_modes() to undo things.
2987 */
2988 int
set_transmission_modes(void)2989 set_transmission_modes(void)
2990 {
2991 int nestlevel = NewGUCNestLevel();
2992
2993 /*
2994 * The values set here should match what pg_dump does. See also
2995 * configure_remote_session in connection.c.
2996 */
2997 if (DateStyle != USE_ISO_DATES)
2998 (void) set_config_option("datestyle", "ISO",
2999 PGC_USERSET, PGC_S_SESSION,
3000 GUC_ACTION_SAVE, true, 0, false);
3001 if (IntervalStyle != INTSTYLE_POSTGRES)
3002 (void) set_config_option("intervalstyle", "postgres",
3003 PGC_USERSET, PGC_S_SESSION,
3004 GUC_ACTION_SAVE, true, 0, false);
3005 if (extra_float_digits < 3)
3006 (void) set_config_option("extra_float_digits", "3",
3007 PGC_USERSET, PGC_S_SESSION,
3008 GUC_ACTION_SAVE, true, 0, false);
3009
3010 return nestlevel;
3011 }
3012
3013 /*
3014 * Undo the effects of set_transmission_modes().
3015 */
3016 void
reset_transmission_modes(int nestlevel)3017 reset_transmission_modes(int nestlevel)
3018 {
3019 AtEOXact_GUC(true, nestlevel);
3020 }
3021
3022 /*
3023 * Utility routine to close a cursor.
3024 */
3025 static void
close_cursor(PGconn * conn,unsigned int cursor_number)3026 close_cursor(PGconn *conn, unsigned int cursor_number)
3027 {
3028 char sql[64];
3029 PGresult *res;
3030
3031 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3032
3033 /*
3034 * We don't use a PG_TRY block here, so be careful not to throw error
3035 * without releasing the PGresult.
3036 */
3037 res = pgfdw_exec_query(conn, sql);
3038 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3039 pgfdw_report_error(ERROR, res, conn, true, sql);
3040 PQclear(res);
3041 }
3042
3043 /*
3044 * prepare_foreign_modify
3045 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3046 */
3047 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)3048 prepare_foreign_modify(PgFdwModifyState *fmstate)
3049 {
3050 char prep_name[NAMEDATALEN];
3051 char *p_name;
3052 PGresult *res;
3053
3054 /* Construct name we'll use for the prepared statement. */
3055 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3056 GetPrepStmtNumber(fmstate->conn));
3057 p_name = pstrdup(prep_name);
3058
3059 /*
3060 * We intentionally do not specify parameter types here, but leave the
3061 * remote server to derive them by default. This avoids possible problems
3062 * with the remote server using different type OIDs than we do. All of
3063 * the prepared statements we use in this module are simple enough that
3064 * the remote server will make the right choices.
3065 */
3066 if (!PQsendPrepare(fmstate->conn,
3067 p_name,
3068 fmstate->query,
3069 0,
3070 NULL))
3071 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3072
3073 /*
3074 * Get the result, and check for success.
3075 *
3076 * We don't use a PG_TRY block here, so be careful not to throw error
3077 * without releasing the PGresult.
3078 */
3079 res = pgfdw_get_result(fmstate->conn, fmstate->query);
3080 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3081 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3082 PQclear(res);
3083
3084 /* This action shows that the prepare has been done. */
3085 fmstate->p_name = p_name;
3086 }
3087
3088 /*
3089 * convert_prep_stmt_params
3090 * Create array of text strings representing parameter values
3091 *
3092 * tupleid is ctid to send, or NULL if none
3093 * slot is slot to get remaining parameters from, or NULL if none
3094 *
3095 * Data is constructed in temp_cxt; caller should reset that after use.
3096 */
3097 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot * slot)3098 convert_prep_stmt_params(PgFdwModifyState *fmstate,
3099 ItemPointer tupleid,
3100 TupleTableSlot *slot)
3101 {
3102 const char **p_values;
3103 int pindex = 0;
3104 MemoryContext oldcontext;
3105
3106 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3107
3108 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3109
3110 /* 1st parameter should be ctid, if it's in use */
3111 if (tupleid != NULL)
3112 {
3113 /* don't need set_transmission_modes for TID output */
3114 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3115 PointerGetDatum(tupleid));
3116 pindex++;
3117 }
3118
3119 /* get following parameters from slot */
3120 if (slot != NULL && fmstate->target_attrs != NIL)
3121 {
3122 int nestlevel;
3123 ListCell *lc;
3124
3125 nestlevel = set_transmission_modes();
3126
3127 foreach(lc, fmstate->target_attrs)
3128 {
3129 int attnum = lfirst_int(lc);
3130 Datum value;
3131 bool isnull;
3132
3133 value = slot_getattr(slot, attnum, &isnull);
3134 if (isnull)
3135 p_values[pindex] = NULL;
3136 else
3137 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3138 value);
3139 pindex++;
3140 }
3141
3142 reset_transmission_modes(nestlevel);
3143 }
3144
3145 Assert(pindex == fmstate->p_nums);
3146
3147 MemoryContextSwitchTo(oldcontext);
3148
3149 return p_values;
3150 }
3151
3152 /*
3153 * store_returning_result
3154 * Store the result of a RETURNING clause
3155 *
3156 * On error, be sure to release the PGresult on the way out. Callers do not
3157 * have PG_TRY blocks to ensure this happens.
3158 */
3159 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)3160 store_returning_result(PgFdwModifyState *fmstate,
3161 TupleTableSlot *slot, PGresult *res)
3162 {
3163 PG_TRY();
3164 {
3165 HeapTuple newtup;
3166
3167 newtup = make_tuple_from_result_row(res, 0,
3168 fmstate->rel,
3169 fmstate->attinmeta,
3170 fmstate->retrieved_attrs,
3171 NULL,
3172 fmstate->temp_cxt);
3173 /* tuple will be deleted when it is cleared from the slot */
3174 ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3175 }
3176 PG_CATCH();
3177 {
3178 if (res)
3179 PQclear(res);
3180 PG_RE_THROW();
3181 }
3182 PG_END_TRY();
3183 }
3184
3185 /*
3186 * Execute a direct UPDATE/DELETE statement.
3187 */
3188 static void
execute_dml_stmt(ForeignScanState * node)3189 execute_dml_stmt(ForeignScanState *node)
3190 {
3191 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3192 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3193 int numParams = dmstate->numParams;
3194 const char **values = dmstate->param_values;
3195
3196 /*
3197 * Construct array of query parameter values in text format.
3198 */
3199 if (numParams > 0)
3200 process_query_params(econtext,
3201 dmstate->param_flinfo,
3202 dmstate->param_exprs,
3203 values);
3204
3205 /*
3206 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3207 * to infer types for all parameters. Since we explicitly cast every
3208 * parameter (see deparse.c), the "inference" is trivial and will produce
3209 * the desired result. This allows us to avoid assuming that the remote
3210 * server has the same OIDs we do for the parameters' types.
3211 */
3212 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3213 NULL, values, NULL, NULL, 0))
3214 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3215
3216 /*
3217 * Get the result, and check for success.
3218 *
3219 * We don't use a PG_TRY block here, so be careful not to throw error
3220 * without releasing the PGresult.
3221 */
3222 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3223 if (PQresultStatus(dmstate->result) !=
3224 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
3225 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3226 dmstate->query);
3227
3228 /* Get the number of rows affected. */
3229 if (dmstate->has_returning)
3230 dmstate->num_tuples = PQntuples(dmstate->result);
3231 else
3232 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3233 }
3234
3235 /*
3236 * Get the result of a RETURNING clause.
3237 */
3238 static TupleTableSlot *
get_returning_data(ForeignScanState * node)3239 get_returning_data(ForeignScanState *node)
3240 {
3241 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
3242 EState *estate = node->ss.ps.state;
3243 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3244 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3245
3246 Assert(resultRelInfo->ri_projectReturning);
3247
3248 /* If we didn't get any tuples, must be end of data. */
3249 if (dmstate->next_tuple >= dmstate->num_tuples)
3250 return ExecClearTuple(slot);
3251
3252 /* Increment the command es_processed count if necessary. */
3253 if (dmstate->set_processed)
3254 estate->es_processed += 1;
3255
3256 /*
3257 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3258 * tuple. (has_returning is false when the local query is of the form
3259 * "UPDATE/DELETE .. RETURNING 1" for example.)
3260 */
3261 if (!dmstate->has_returning)
3262 ExecStoreAllNullTuple(slot);
3263 else
3264 {
3265 /*
3266 * On error, be sure to release the PGresult on the way out. Callers
3267 * do not have PG_TRY blocks to ensure this happens.
3268 */
3269 PG_TRY();
3270 {
3271 HeapTuple newtup;
3272
3273 newtup = make_tuple_from_result_row(dmstate->result,
3274 dmstate->next_tuple,
3275 dmstate->rel,
3276 dmstate->attinmeta,
3277 dmstate->retrieved_attrs,
3278 NULL,
3279 dmstate->temp_cxt);
3280 ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3281 }
3282 PG_CATCH();
3283 {
3284 if (dmstate->result)
3285 PQclear(dmstate->result);
3286 PG_RE_THROW();
3287 }
3288 PG_END_TRY();
3289 }
3290 dmstate->next_tuple++;
3291
3292 /* Make slot available for evaluation of the local query RETURNING list. */
3293 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3294
3295 return slot;
3296 }
3297
3298 /*
3299 * Prepare for processing of parameters used in remote query.
3300 */
3301 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)3302 prepare_query_params(PlanState *node,
3303 List *fdw_exprs,
3304 int numParams,
3305 FmgrInfo **param_flinfo,
3306 List **param_exprs,
3307 const char ***param_values)
3308 {
3309 int i;
3310 ListCell *lc;
3311
3312 Assert(numParams > 0);
3313
3314 /* Prepare for output conversion of parameters used in remote query. */
3315 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3316
3317 i = 0;
3318 foreach(lc, fdw_exprs)
3319 {
3320 Node *param_expr = (Node *) lfirst(lc);
3321 Oid typefnoid;
3322 bool isvarlena;
3323
3324 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3325 fmgr_info(typefnoid, &(*param_flinfo)[i]);
3326 i++;
3327 }
3328
3329 /*
3330 * Prepare remote-parameter expressions for evaluation. (Note: in
3331 * practice, we expect that all these expressions will be just Params, so
3332 * we could possibly do something more efficient than using the full
3333 * expression-eval machinery for this. But probably there would be little
3334 * benefit, and it'd require postgres_fdw to know more than is desirable
3335 * about Param evaluation.)
3336 */
3337 *param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
3338
3339 /* Allocate buffer for text form of query parameters. */
3340 *param_values = (const char **) palloc0(numParams * sizeof(char *));
3341 }
3342
3343 /*
3344 * Construct array of query parameter values in text format.
3345 */
3346 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)3347 process_query_params(ExprContext *econtext,
3348 FmgrInfo *param_flinfo,
3349 List *param_exprs,
3350 const char **param_values)
3351 {
3352 int nestlevel;
3353 int i;
3354 ListCell *lc;
3355
3356 nestlevel = set_transmission_modes();
3357
3358 i = 0;
3359 foreach(lc, param_exprs)
3360 {
3361 ExprState *expr_state = (ExprState *) lfirst(lc);
3362 Datum expr_value;
3363 bool isNull;
3364
3365 /* Evaluate the parameter expression */
3366 expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
3367
3368 /*
3369 * Get string representation of each parameter value by invoking
3370 * type-specific output function, unless the value is null.
3371 */
3372 if (isNull)
3373 param_values[i] = NULL;
3374 else
3375 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
3376
3377 i++;
3378 }
3379
3380 reset_transmission_modes(nestlevel);
3381 }
3382
3383 /*
3384 * postgresAnalyzeForeignTable
3385 * Test whether analyzing this foreign table is supported
3386 */
3387 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)3388 postgresAnalyzeForeignTable(Relation relation,
3389 AcquireSampleRowsFunc *func,
3390 BlockNumber *totalpages)
3391 {
3392 ForeignTable *table;
3393 UserMapping *user;
3394 PGconn *conn;
3395 StringInfoData sql;
3396 PGresult *volatile res = NULL;
3397
3398 /* Return the row-analysis function pointer */
3399 *func = postgresAcquireSampleRowsFunc;
3400
3401 /*
3402 * Now we have to get the number of pages. It's annoying that the ANALYZE
3403 * API requires us to return that now, because it forces some duplication
3404 * of effort between this routine and postgresAcquireSampleRowsFunc. But
3405 * it's probably not worth redefining that API at this point.
3406 */
3407
3408 /*
3409 * Get the connection to use. We do the remote access as the table's
3410 * owner, even if the ANALYZE was started by some other user.
3411 */
3412 table = GetForeignTable(RelationGetRelid(relation));
3413 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3414 conn = GetConnection(user, false);
3415
3416 /*
3417 * Construct command to get page count for relation.
3418 */
3419 initStringInfo(&sql);
3420 deparseAnalyzeSizeSql(&sql, relation);
3421
3422 /* In what follows, do not risk leaking any PGresults. */
3423 PG_TRY();
3424 {
3425 res = pgfdw_exec_query(conn, sql.data);
3426 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3427 pgfdw_report_error(ERROR, res, conn, false, sql.data);
3428
3429 if (PQntuples(res) != 1 || PQnfields(res) != 1)
3430 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3431 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3432
3433 PQclear(res);
3434 res = NULL;
3435 }
3436 PG_CATCH();
3437 {
3438 if (res)
3439 PQclear(res);
3440 PG_RE_THROW();
3441 }
3442 PG_END_TRY();
3443
3444 ReleaseConnection(conn);
3445
3446 return true;
3447 }
3448
3449 /*
3450 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3451 *
3452 * We fetch the whole table from the remote side and pick out some sample rows.
3453 *
3454 * Selected rows are returned in the caller-allocated array rows[],
3455 * which must have at least targrows entries.
3456 * The actual number of rows selected is returned as the function result.
3457 * We also count the total number of rows in the table and return it into
3458 * *totalrows. Note that *totaldeadrows is always set to 0.
3459 *
3460 * Note that the returned list of rows is not always in order by physical
3461 * position in the table. Therefore, correlation estimates derived later
3462 * may be meaningless, but it's OK because we don't use the estimates
3463 * currently (the planner only pays attention to correlation for indexscans).
3464 */
3465 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)3466 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
3467 HeapTuple *rows, int targrows,
3468 double *totalrows,
3469 double *totaldeadrows)
3470 {
3471 PgFdwAnalyzeState astate;
3472 ForeignTable *table;
3473 ForeignServer *server;
3474 UserMapping *user;
3475 PGconn *conn;
3476 unsigned int cursor_number;
3477 StringInfoData sql;
3478 PGresult *volatile res = NULL;
3479
3480 /* Initialize workspace state */
3481 astate.rel = relation;
3482 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
3483
3484 astate.rows = rows;
3485 astate.targrows = targrows;
3486 astate.numrows = 0;
3487 astate.samplerows = 0;
3488 astate.rowstoskip = -1; /* -1 means not set yet */
3489 reservoir_init_selection_state(&astate.rstate, targrows);
3490
3491 /* Remember ANALYZE context, and create a per-tuple temp context */
3492 astate.anl_cxt = CurrentMemoryContext;
3493 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
3494 "postgres_fdw temporary data",
3495 ALLOCSET_SMALL_SIZES);
3496
3497 /*
3498 * Get the connection to use. We do the remote access as the table's
3499 * owner, even if the ANALYZE was started by some other user.
3500 */
3501 table = GetForeignTable(RelationGetRelid(relation));
3502 server = GetForeignServer(table->serverid);
3503 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3504 conn = GetConnection(user, false);
3505
3506 /*
3507 * Construct cursor that retrieves whole rows from remote.
3508 */
3509 cursor_number = GetCursorNumber(conn);
3510 initStringInfo(&sql);
3511 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3512 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3513
3514 /* In what follows, do not risk leaking any PGresults. */
3515 PG_TRY();
3516 {
3517 res = pgfdw_exec_query(conn, sql.data);
3518 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3519 pgfdw_report_error(ERROR, res, conn, false, sql.data);
3520 PQclear(res);
3521 res = NULL;
3522
3523 /* Retrieve and process rows a batch at a time. */
3524 for (;;)
3525 {
3526 char fetch_sql[64];
3527 int fetch_size;
3528 int numrows;
3529 int i;
3530 ListCell *lc;
3531
3532 /* Allow users to cancel long query */
3533 CHECK_FOR_INTERRUPTS();
3534
3535 /*
3536 * XXX possible future improvement: if rowstoskip is large, we
3537 * could issue a MOVE rather than physically fetching the rows,
3538 * then just adjust rowstoskip and samplerows appropriately.
3539 */
3540
3541 /* The fetch size is arbitrary, but shouldn't be enormous. */
3542 fetch_size = 100;
3543 foreach(lc, server->options)
3544 {
3545 DefElem *def = (DefElem *) lfirst(lc);
3546
3547 if (strcmp(def->defname, "fetch_size") == 0)
3548 {
3549 fetch_size = strtol(defGetString(def), NULL, 10);
3550 break;
3551 }
3552 }
3553 foreach(lc, table->options)
3554 {
3555 DefElem *def = (DefElem *) lfirst(lc);
3556
3557 if (strcmp(def->defname, "fetch_size") == 0)
3558 {
3559 fetch_size = strtol(defGetString(def), NULL, 10);
3560 break;
3561 }
3562 }
3563
3564 /* Fetch some rows */
3565 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3566 fetch_size, cursor_number);
3567
3568 res = pgfdw_exec_query(conn, fetch_sql);
3569 /* On error, report the original query, not the FETCH. */
3570 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3571 pgfdw_report_error(ERROR, res, conn, false, sql.data);
3572
3573 /* Process whatever we got. */
3574 numrows = PQntuples(res);
3575 for (i = 0; i < numrows; i++)
3576 analyze_row_processor(res, i, &astate);
3577
3578 PQclear(res);
3579 res = NULL;
3580
3581 /* Must be EOF if we didn't get all the rows requested. */
3582 if (numrows < fetch_size)
3583 break;
3584 }
3585
3586 /* Close the cursor, just to be tidy. */
3587 close_cursor(conn, cursor_number);
3588 }
3589 PG_CATCH();
3590 {
3591 if (res)
3592 PQclear(res);
3593 PG_RE_THROW();
3594 }
3595 PG_END_TRY();
3596
3597 ReleaseConnection(conn);
3598
3599 /* We assume that we have no dead tuple. */
3600 *totaldeadrows = 0.0;
3601
3602 /* We've retrieved all living tuples from foreign server. */
3603 *totalrows = astate.samplerows;
3604
3605 /*
3606 * Emit some interesting relation info
3607 */
3608 ereport(elevel,
3609 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3610 RelationGetRelationName(relation),
3611 astate.samplerows, astate.numrows)));
3612
3613 return astate.numrows;
3614 }
3615
3616 /*
3617 * Collect sample rows from the result of query.
3618 * - Use all tuples in sample until target # of samples are collected.
3619 * - Subsequently, replace already-sampled tuples randomly.
3620 */
3621 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)3622 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
3623 {
3624 int targrows = astate->targrows;
3625 int pos; /* array index to store tuple in */
3626 MemoryContext oldcontext;
3627
3628 /* Always increment sample row counter. */
3629 astate->samplerows += 1;
3630
3631 /*
3632 * Determine the slot where this sample row should be stored. Set pos to
3633 * negative value to indicate the row should be skipped.
3634 */
3635 if (astate->numrows < targrows)
3636 {
3637 /* First targrows rows are always included into the sample */
3638 pos = astate->numrows++;
3639 }
3640 else
3641 {
3642 /*
3643 * Now we start replacing tuples in the sample until we reach the end
3644 * of the relation. Same algorithm as in acquire_sample_rows in
3645 * analyze.c; see Jeff Vitter's paper.
3646 */
3647 if (astate->rowstoskip < 0)
3648 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3649
3650 if (astate->rowstoskip <= 0)
3651 {
3652 /* Choose a random reservoir element to replace. */
3653 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3654 Assert(pos >= 0 && pos < targrows);
3655 heap_freetuple(astate->rows[pos]);
3656 }
3657 else
3658 {
3659 /* Skip this tuple. */
3660 pos = -1;
3661 }
3662
3663 astate->rowstoskip -= 1;
3664 }
3665
3666 if (pos >= 0)
3667 {
3668 /*
3669 * Create sample tuple from current result row, and store it in the
3670 * position determined above. The tuple has to be created in anl_cxt.
3671 */
3672 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3673
3674 astate->rows[pos] = make_tuple_from_result_row(res, row,
3675 astate->rel,
3676 astate->attinmeta,
3677 astate->retrieved_attrs,
3678 NULL,
3679 astate->temp_cxt);
3680
3681 MemoryContextSwitchTo(oldcontext);
3682 }
3683 }
3684
3685 /*
3686 * Import a foreign schema
3687 */
3688 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)3689 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
3690 {
3691 List *commands = NIL;
3692 bool import_collate = true;
3693 bool import_default = false;
3694 bool import_not_null = true;
3695 ForeignServer *server;
3696 UserMapping *mapping;
3697 PGconn *conn;
3698 StringInfoData buf;
3699 PGresult *volatile res = NULL;
3700 int numrows,
3701 i;
3702 ListCell *lc;
3703
3704 /* Parse statement options */
3705 foreach(lc, stmt->options)
3706 {
3707 DefElem *def = (DefElem *) lfirst(lc);
3708
3709 if (strcmp(def->defname, "import_collate") == 0)
3710 import_collate = defGetBoolean(def);
3711 else if (strcmp(def->defname, "import_default") == 0)
3712 import_default = defGetBoolean(def);
3713 else if (strcmp(def->defname, "import_not_null") == 0)
3714 import_not_null = defGetBoolean(def);
3715 else
3716 ereport(ERROR,
3717 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3718 errmsg("invalid option \"%s\"", def->defname)));
3719 }
3720
3721 /*
3722 * Get connection to the foreign server. Connection manager will
3723 * establish new connection if necessary.
3724 */
3725 server = GetForeignServer(serverOid);
3726 mapping = GetUserMapping(GetUserId(), server->serverid);
3727 conn = GetConnection(mapping, false);
3728
3729 /* Don't attempt to import collation if remote server hasn't got it */
3730 if (PQserverVersion(conn) < 90100)
3731 import_collate = false;
3732
3733 /* Create workspace for strings */
3734 initStringInfo(&buf);
3735
3736 /* In what follows, do not risk leaking any PGresults. */
3737 PG_TRY();
3738 {
3739 /* Check that the schema really exists */
3740 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3741 deparseStringLiteral(&buf, stmt->remote_schema);
3742
3743 res = pgfdw_exec_query(conn, buf.data);
3744 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3745 pgfdw_report_error(ERROR, res, conn, false, buf.data);
3746
3747 if (PQntuples(res) != 1)
3748 ereport(ERROR,
3749 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3750 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3751 stmt->remote_schema, server->servername)));
3752
3753 PQclear(res);
3754 res = NULL;
3755 resetStringInfo(&buf);
3756
3757 /*
3758 * Fetch all table data from this schema, possibly restricted by
3759 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
3760 * to EXCEPT/LIMIT TO here, because the core code will filter the
3761 * statements we return according to those lists anyway. But it
3762 * should save a few cycles to not process excluded tables in the
3763 * first place.)
3764 *
3765 * Note: because we run the connection with search_path restricted to
3766 * pg_catalog, the format_type() and pg_get_expr() outputs will always
3767 * include a schema name for types/functions in other schemas, which
3768 * is what we want.
3769 */
3770 if (import_collate)
3771 appendStringInfoString(&buf,
3772 "SELECT relname, "
3773 " attname, "
3774 " format_type(atttypid, atttypmod), "
3775 " attnotnull, "
3776 " pg_get_expr(adbin, adrelid), "
3777 " collname, "
3778 " collnsp.nspname "
3779 "FROM pg_class c "
3780 " JOIN pg_namespace n ON "
3781 " relnamespace = n.oid "
3782 " LEFT JOIN pg_attribute a ON "
3783 " attrelid = c.oid AND attnum > 0 "
3784 " AND NOT attisdropped "
3785 " LEFT JOIN pg_attrdef ad ON "
3786 " adrelid = c.oid AND adnum = attnum "
3787 " LEFT JOIN pg_collation coll ON "
3788 " coll.oid = attcollation "
3789 " LEFT JOIN pg_namespace collnsp ON "
3790 " collnsp.oid = collnamespace ");
3791 else
3792 appendStringInfoString(&buf,
3793 "SELECT relname, "
3794 " attname, "
3795 " format_type(atttypid, atttypmod), "
3796 " attnotnull, "
3797 " pg_get_expr(adbin, adrelid), "
3798 " NULL, NULL "
3799 "FROM pg_class c "
3800 " JOIN pg_namespace n ON "
3801 " relnamespace = n.oid "
3802 " LEFT JOIN pg_attribute a ON "
3803 " attrelid = c.oid AND attnum > 0 "
3804 " AND NOT attisdropped "
3805 " LEFT JOIN pg_attrdef ad ON "
3806 " adrelid = c.oid AND adnum = attnum ");
3807
3808 appendStringInfoString(&buf,
3809 "WHERE c.relkind IN ('r', 'v', 'f', 'm') "
3810 " AND n.nspname = ");
3811 deparseStringLiteral(&buf, stmt->remote_schema);
3812
3813 /* Apply restrictions for LIMIT TO and EXCEPT */
3814 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3815 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3816 {
3817 bool first_item = true;
3818
3819 appendStringInfoString(&buf, " AND c.relname ");
3820 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3821 appendStringInfoString(&buf, "NOT ");
3822 appendStringInfoString(&buf, "IN (");
3823
3824 /* Append list of table names within IN clause */
3825 foreach(lc, stmt->table_list)
3826 {
3827 RangeVar *rv = (RangeVar *) lfirst(lc);
3828
3829 if (first_item)
3830 first_item = false;
3831 else
3832 appendStringInfoString(&buf, ", ");
3833 deparseStringLiteral(&buf, rv->relname);
3834 }
3835 appendStringInfoChar(&buf, ')');
3836 }
3837
3838 /* Append ORDER BY at the end of query to ensure output ordering */
3839 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3840
3841 /* Fetch the data */
3842 res = pgfdw_exec_query(conn, buf.data);
3843 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3844 pgfdw_report_error(ERROR, res, conn, false, buf.data);
3845
3846 /* Process results */
3847 numrows = PQntuples(res);
3848 /* note: incrementation of i happens in inner loop's while() test */
3849 for (i = 0; i < numrows;)
3850 {
3851 char *tablename = PQgetvalue(res, i, 0);
3852 bool first_item = true;
3853
3854 resetStringInfo(&buf);
3855 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3856 quote_identifier(tablename));
3857
3858 /* Scan all rows for this table */
3859 do
3860 {
3861 char *attname;
3862 char *typename;
3863 char *attnotnull;
3864 char *attdefault;
3865 char *collname;
3866 char *collnamespace;
3867
3868 /* If table has no columns, we'll see nulls here */
3869 if (PQgetisnull(res, i, 1))
3870 continue;
3871
3872 attname = PQgetvalue(res, i, 1);
3873 typename = PQgetvalue(res, i, 2);
3874 attnotnull = PQgetvalue(res, i, 3);
3875 attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3876 PQgetvalue(res, i, 4);
3877 collname = PQgetisnull(res, i, 5) ? (char *) NULL :
3878 PQgetvalue(res, i, 5);
3879 collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
3880 PQgetvalue(res, i, 6);
3881
3882 if (first_item)
3883 first_item = false;
3884 else
3885 appendStringInfoString(&buf, ",\n");
3886
3887 /* Print column name and type */
3888 appendStringInfo(&buf, " %s %s",
3889 quote_identifier(attname),
3890 typename);
3891
3892 /*
3893 * Add column_name option so that renaming the foreign table's
3894 * column doesn't break the association to the underlying
3895 * column.
3896 */
3897 appendStringInfoString(&buf, " OPTIONS (column_name ");
3898 deparseStringLiteral(&buf, attname);
3899 appendStringInfoChar(&buf, ')');
3900
3901 /* Add COLLATE if needed */
3902 if (import_collate && collname != NULL && collnamespace != NULL)
3903 appendStringInfo(&buf, " COLLATE %s.%s",
3904 quote_identifier(collnamespace),
3905 quote_identifier(collname));
3906
3907 /* Add DEFAULT if needed */
3908 if (import_default && attdefault != NULL)
3909 appendStringInfo(&buf, " DEFAULT %s", attdefault);
3910
3911 /* Add NOT NULL if needed */
3912 if (import_not_null && attnotnull[0] == 't')
3913 appendStringInfoString(&buf, " NOT NULL");
3914 }
3915 while (++i < numrows &&
3916 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
3917
3918 /*
3919 * Add server name and table-level options. We specify remote
3920 * schema and table name as options (the latter to ensure that
3921 * renaming the foreign table doesn't break the association).
3922 */
3923 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
3924 quote_identifier(server->servername));
3925
3926 appendStringInfoString(&buf, "schema_name ");
3927 deparseStringLiteral(&buf, stmt->remote_schema);
3928 appendStringInfoString(&buf, ", table_name ");
3929 deparseStringLiteral(&buf, tablename);
3930
3931 appendStringInfoString(&buf, ");");
3932
3933 commands = lappend(commands, pstrdup(buf.data));
3934 }
3935
3936 /* Clean up */
3937 PQclear(res);
3938 res = NULL;
3939 }
3940 PG_CATCH();
3941 {
3942 if (res)
3943 PQclear(res);
3944 PG_RE_THROW();
3945 }
3946 PG_END_TRY();
3947
3948 ReleaseConnection(conn);
3949
3950 return commands;
3951 }
3952
3953 /*
3954 * Assess whether the join between inner and outer relations can be pushed down
3955 * to the foreign server. As a side effect, save information we obtain in this
3956 * function to PgFdwRelationInfo passed in.
3957 */
3958 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)3959 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
3960 RelOptInfo *outerrel, RelOptInfo *innerrel,
3961 JoinPathExtraData *extra)
3962 {
3963 PgFdwRelationInfo *fpinfo;
3964 PgFdwRelationInfo *fpinfo_o;
3965 PgFdwRelationInfo *fpinfo_i;
3966 ListCell *lc;
3967 List *joinclauses;
3968 List *otherclauses;
3969
3970 /*
3971 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
3972 * Constructing queries representing SEMI and ANTI joins is hard, hence
3973 * not considered right now.
3974 */
3975 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
3976 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
3977 return false;
3978
3979 /*
3980 * If either of the joining relations is marked as unsafe to pushdown, the
3981 * join can not be pushed down.
3982 */
3983 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
3984 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
3985 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
3986 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
3987 !fpinfo_i || !fpinfo_i->pushdown_safe)
3988 return false;
3989
3990 /*
3991 * If joining relations have local conditions, those conditions are
3992 * required to be applied before joining the relations. Hence the join can
3993 * not be pushed down.
3994 */
3995 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
3996 return false;
3997
3998 /* Separate restrict list into join quals and quals on join relation */
3999 if (IS_OUTER_JOIN(jointype))
4000 extract_actual_join_clauses(extra->restrictlist,
4001 joinrel->relids,
4002 &joinclauses, &otherclauses);
4003 else
4004 {
4005 /*
4006 * Unlike an outer join, for inner join, the join result contains only
4007 * the rows which satisfy join clauses, similar to the other clause.
4008 * Hence all clauses can be treated as other quals. This helps to push
4009 * a join down to the foreign server even if some of its join quals
4010 * are not safe to pushdown.
4011 */
4012 otherclauses = extract_actual_clauses(extra->restrictlist, false);
4013 joinclauses = NIL;
4014 }
4015
4016 /* Get foreign server */
4017 fpinfo->server = fpinfo_o->server;
4018
4019 /*
4020 * Copy shippable_extensions before checking whether the foreign join is
4021 * OK, so that we know which quals can be evaluated on the foreign server.
4022 */
4023 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
4024
4025 /* Join quals must be safe to push down. */
4026 foreach(lc, joinclauses)
4027 {
4028 Expr *expr = (Expr *) lfirst(lc);
4029
4030 if (!is_foreign_expr(root, joinrel, expr))
4031 return false;
4032 }
4033
4034 /*
4035 * deparseExplicitTargetList() isn't smart enough to handle anything other
4036 * than a Var. In particular, if there's some PlaceHolderVar that would
4037 * need to be evaluated within this join tree (because there's an upper
4038 * reference to a quantity that may go to NULL as a result of an outer
4039 * join), then we can't try to push the join down because we'll fail when
4040 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4041 * needs to be evaluated *at the top* of this join tree is OK, because we
4042 * can do that locally after fetching the results from the remote side.
4043 */
4044 foreach(lc, root->placeholder_list)
4045 {
4046 PlaceHolderInfo *phinfo = lfirst(lc);
4047 Relids relids = joinrel->relids;
4048
4049 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4050 bms_nonempty_difference(relids, phinfo->ph_eval_at))
4051 return false;
4052 }
4053
4054 /* Save the join clauses, for later use. */
4055 fpinfo->joinclauses = joinclauses;
4056
4057 /*
4058 * Other clauses are applied after the join has been performed and thus
4059 * need not be all pushable. We will push those which can be pushed to
4060 * reduce the number of rows fetched from the foreign server. Rest of them
4061 * will be applied locally after fetching join result. Add them to fpinfo
4062 * so that other joins involving this joinrel will know that this joinrel
4063 * has local clauses.
4064 */
4065 foreach(lc, otherclauses)
4066 {
4067 Expr *expr = (Expr *) lfirst(lc);
4068
4069 if (!is_foreign_expr(root, joinrel, expr))
4070 fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4071 else
4072 fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4073 }
4074
4075 fpinfo->outerrel = outerrel;
4076 fpinfo->innerrel = innerrel;
4077 fpinfo->jointype = jointype;
4078
4079 /*
4080 * Pull the other remote conditions from the joining relations into join
4081 * clauses or other remote clauses (remote_conds) of this relation
4082 * wherever possible. This avoids building subqueries at every join step,
4083 * which is not currently supported by the deparser logic.
4084 *
4085 * For an inner join, clauses from both the relations are added to the
4086 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4087 * the outer side are added to remote_conds since those can be evaluated
4088 * after the join is evaluated. The clauses from inner side are added to
4089 * the joinclauses, since they need to evaluated while constructing the
4090 * join.
4091 *
4092 * For a FULL OUTER JOIN, the other clauses from either relation can not
4093 * be added to the joinclauses or remote_conds, since each relation acts
4094 * as an outer relation for the other. Consider such full outer join as
4095 * unshippable because of the reasons mentioned above in this comment.
4096 *
4097 * The joining sides can not have local conditions, thus no need to test
4098 * shippability of the clauses being pulled up.
4099 */
4100 switch (jointype)
4101 {
4102 case JOIN_INNER:
4103 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4104 list_copy(fpinfo_i->remote_conds));
4105 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4106 list_copy(fpinfo_o->remote_conds));
4107 break;
4108
4109 case JOIN_LEFT:
4110 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4111 list_copy(fpinfo_i->remote_conds));
4112 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4113 list_copy(fpinfo_o->remote_conds));
4114 break;
4115
4116 case JOIN_RIGHT:
4117 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4118 list_copy(fpinfo_o->remote_conds));
4119 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4120 list_copy(fpinfo_i->remote_conds));
4121 break;
4122
4123 case JOIN_FULL:
4124 if (fpinfo_i->remote_conds || fpinfo_o->remote_conds)
4125 return false;
4126 break;
4127
4128 default:
4129 /* Should not happen, we have just check this above */
4130 elog(ERROR, "unsupported join type %d", jointype);
4131 }
4132
4133 /*
4134 * For an inner join, all restrictions can be treated alike. Treating the
4135 * pushed down conditions as join conditions allows a top level full outer
4136 * join to be deparsed without requiring subqueries.
4137 */
4138 if (jointype == JOIN_INNER)
4139 {
4140 Assert(!fpinfo->joinclauses);
4141 fpinfo->joinclauses = fpinfo->remote_conds;
4142 fpinfo->remote_conds = NIL;
4143 }
4144
4145 /* Mark that this join can be pushed down safely */
4146 fpinfo->pushdown_safe = true;
4147
4148 /*
4149 * If user is willing to estimate cost for a scan of either of the joining
4150 * relations using EXPLAIN, he intends to estimate scans on that relation
4151 * more accurately. Then, it makes sense to estimate the cost the join
4152 * with that relation more accurately using EXPLAIN.
4153 */
4154 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4155 fpinfo_i->use_remote_estimate;
4156
4157 /* Get user mapping */
4158 if (fpinfo->use_remote_estimate)
4159 {
4160 if (fpinfo_o->use_remote_estimate)
4161 fpinfo->user = fpinfo_o->user;
4162 else
4163 fpinfo->user = fpinfo_i->user;
4164 }
4165 else
4166 fpinfo->user = NULL;
4167
4168 /*
4169 * Since both the joining relations come from the same server, the server
4170 * level options should have same value for both the relations. Pick from
4171 * any side.
4172 */
4173 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4174 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4175
4176 /*
4177 * Set cached relation costs to some negative value, so that we can detect
4178 * when they are set to some sensible costs, during one (usually the
4179 * first) of the calls to estimate_path_cost_size().
4180 */
4181 fpinfo->rel_startup_cost = -1;
4182 fpinfo->rel_total_cost = -1;
4183
4184 /*
4185 * Set fetch size to maximum of the joining sides, since we are expecting
4186 * the rows returned by the join to be proportional to the relation sizes.
4187 */
4188 if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
4189 fpinfo->fetch_size = fpinfo_o->fetch_size;
4190 else
4191 fpinfo->fetch_size = fpinfo_i->fetch_size;
4192
4193 /*
4194 * Set the string describing this join relation to be used in EXPLAIN
4195 * output of corresponding ForeignScan.
4196 */
4197 fpinfo->relation_name = makeStringInfo();
4198 appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4199 fpinfo_o->relation_name->data,
4200 get_jointype_name(fpinfo->jointype),
4201 fpinfo_i->relation_name->data);
4202
4203 return true;
4204 }
4205
4206 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)4207 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
4208 Path *epq_path)
4209 {
4210 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4211 ListCell *lc;
4212
4213 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4214
4215 /* Create one path for each set of pathkeys we found above. */
4216 foreach(lc, useful_pathkeys_list)
4217 {
4218 double rows;
4219 int width;
4220 Cost startup_cost;
4221 Cost total_cost;
4222 List *useful_pathkeys = lfirst(lc);
4223 Path *sorted_epq_path;
4224
4225 estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4226 &rows, &width, &startup_cost, &total_cost);
4227
4228 /*
4229 * The EPQ path must be at least as well sorted as the path itself,
4230 * in case it gets used as input to a mergejoin.
4231 */
4232 sorted_epq_path = epq_path;
4233 if (sorted_epq_path != NULL &&
4234 !pathkeys_contained_in(useful_pathkeys,
4235 sorted_epq_path->pathkeys))
4236 sorted_epq_path = (Path *)
4237 create_sort_path(root,
4238 rel,
4239 sorted_epq_path,
4240 useful_pathkeys,
4241 -1.0);
4242
4243 add_path(rel, (Path *)
4244 create_foreignscan_path(root, rel,
4245 NULL,
4246 rows,
4247 startup_cost,
4248 total_cost,
4249 useful_pathkeys,
4250 rel->lateral_relids,
4251 sorted_epq_path,
4252 NIL));
4253 }
4254 }
4255
4256 /*
4257 * postgresGetForeignJoinPaths
4258 * Add possible ForeignPath to joinrel, if join is safe to push down.
4259 */
4260 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)4261 postgresGetForeignJoinPaths(PlannerInfo *root,
4262 RelOptInfo *joinrel,
4263 RelOptInfo *outerrel,
4264 RelOptInfo *innerrel,
4265 JoinType jointype,
4266 JoinPathExtraData *extra)
4267 {
4268 PgFdwRelationInfo *fpinfo;
4269 ForeignPath *joinpath;
4270 double rows;
4271 int width;
4272 Cost startup_cost;
4273 Cost total_cost;
4274 Path *epq_path; /* Path to create plan to be executed when
4275 * EvalPlanQual gets triggered. */
4276
4277 /*
4278 * Skip if this join combination has been considered already.
4279 */
4280 if (joinrel->fdw_private)
4281 return;
4282
4283 /*
4284 * This code does not work for joins with lateral references, since those
4285 * must have parameterized paths, which we don't generate yet.
4286 */
4287 if (!bms_is_empty(joinrel->lateral_relids))
4288 return;
4289
4290 /*
4291 * Create unfinished PgFdwRelationInfo entry which is used to indicate
4292 * that the join relation is already considered, so that we won't waste
4293 * time in judging safety of join pushdown and adding the same paths again
4294 * if found safe. Once we know that this join can be pushed down, we fill
4295 * the entry.
4296 */
4297 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4298 fpinfo->pushdown_safe = false;
4299 joinrel->fdw_private = fpinfo;
4300 /* attrs_used is only for base relations. */
4301 fpinfo->attrs_used = NULL;
4302
4303 /*
4304 * If there is a possibility that EvalPlanQual will be executed, we need
4305 * to be able to reconstruct the row using scans of the base relations.
4306 * GetExistingLocalJoinPath will find a suitable path for this purpose in
4307 * the path list of the joinrel, if one exists. We must be careful to
4308 * call it before adding any ForeignPath, since the ForeignPath might
4309 * dominate the only suitable local path available. We also do it before
4310 * calling foreign_join_ok(), since that function updates fpinfo and marks
4311 * it as pushable if the join is found to be pushable.
4312 */
4313 if (root->parse->commandType == CMD_DELETE ||
4314 root->parse->commandType == CMD_UPDATE ||
4315 root->rowMarks)
4316 {
4317 epq_path = GetExistingLocalJoinPath(joinrel);
4318 if (!epq_path)
4319 {
4320 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4321 return;
4322 }
4323 }
4324 else
4325 epq_path = NULL;
4326
4327 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4328 {
4329 /* Free path required for EPQ if we copied one; we don't need it now */
4330 if (epq_path)
4331 pfree(epq_path);
4332 return;
4333 }
4334
4335 /*
4336 * Compute the selectivity and cost of the local_conds, so we don't have
4337 * to do it over again for each path. The best we can do for these
4338 * conditions is to estimate selectivity on the basis of local statistics.
4339 * The local conditions are applied after the join has been computed on
4340 * the remote side like quals in WHERE clause, so pass jointype as
4341 * JOIN_INNER.
4342 */
4343 fpinfo->local_conds_sel = clauselist_selectivity(root,
4344 fpinfo->local_conds,
4345 0,
4346 JOIN_INNER,
4347 NULL);
4348 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4349
4350 /*
4351 * If we are going to estimate costs locally, estimate the join clause
4352 * selectivity here while we have special join info.
4353 */
4354 if (!fpinfo->use_remote_estimate)
4355 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4356 0, fpinfo->jointype,
4357 extra->sjinfo);
4358
4359 /* Estimate costs for bare join relation */
4360 estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4361 &width, &startup_cost, &total_cost);
4362 /* Now update this information in the joinrel */
4363 joinrel->rows = rows;
4364 joinrel->reltarget->width = width;
4365 fpinfo->rows = rows;
4366 fpinfo->width = width;
4367 fpinfo->startup_cost = startup_cost;
4368 fpinfo->total_cost = total_cost;
4369
4370 /*
4371 * Create a new join path and add it to the joinrel which represents a
4372 * join between foreign tables.
4373 */
4374 joinpath = create_foreignscan_path(root,
4375 joinrel,
4376 NULL, /* default pathtarget */
4377 rows,
4378 startup_cost,
4379 total_cost,
4380 NIL, /* no pathkeys */
4381 joinrel->lateral_relids,
4382 epq_path,
4383 NULL); /* no fdw_private */
4384
4385 /* Add generated path into joinrel by add_path(). */
4386 add_path(joinrel, (Path *) joinpath);
4387
4388 /* Consider pathkeys for the join relation */
4389 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4390
4391 /* XXX Consider parameterized paths for the join relation */
4392 }
4393
4394 /*
4395 * Create a tuple from the specified row of the PGresult.
4396 *
4397 * rel is the local representation of the foreign table, attinmeta is
4398 * conversion data for the rel's tupdesc, and retrieved_attrs is an
4399 * integer list of the table column numbers present in the PGresult.
4400 * fsstate is the ForeignScan plan node's execution state.
4401 * temp_context is a working context that can be reset after each tuple.
4402 *
4403 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
4404 * if we're processing a remote join, while fsstate is NULL in a non-query
4405 * context such as ANALYZE, or if we're processing a non-scan query node.
4406 */
4407 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)4408 make_tuple_from_result_row(PGresult *res,
4409 int row,
4410 Relation rel,
4411 AttInMetadata *attinmeta,
4412 List *retrieved_attrs,
4413 ForeignScanState *fsstate,
4414 MemoryContext temp_context)
4415 {
4416 HeapTuple tuple;
4417 TupleDesc tupdesc;
4418 Datum *values;
4419 bool *nulls;
4420 ItemPointer ctid = NULL;
4421 ConversionLocation errpos;
4422 ErrorContextCallback errcallback;
4423 MemoryContext oldcontext;
4424 ListCell *lc;
4425 int j;
4426
4427 Assert(row < PQntuples(res));
4428
4429 /*
4430 * Do the following work in a temp context that we reset after each tuple.
4431 * This cleans up not only the data we have direct access to, but any
4432 * cruft the I/O functions might leak.
4433 */
4434 oldcontext = MemoryContextSwitchTo(temp_context);
4435
4436 /*
4437 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
4438 * provided, otherwise look to the scan node's ScanTupleSlot.
4439 */
4440 if (rel)
4441 tupdesc = RelationGetDescr(rel);
4442 else
4443 {
4444 PgFdwScanState *fdw_sstate;
4445
4446 Assert(fsstate);
4447 fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4448 tupdesc = fdw_sstate->tupdesc;
4449 }
4450
4451 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4452 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4453 /* Initialize to nulls for any columns not present in result */
4454 memset(nulls, true, tupdesc->natts * sizeof(bool));
4455
4456 /*
4457 * Set up and install callback to report where conversion error occurs.
4458 */
4459 errpos.cur_attno = 0;
4460 errpos.rel = rel;
4461 errpos.fsstate = fsstate;
4462 errcallback.callback = conversion_error_callback;
4463 errcallback.arg = (void *) &errpos;
4464 errcallback.previous = error_context_stack;
4465 error_context_stack = &errcallback;
4466
4467 /*
4468 * i indexes columns in the relation, j indexes columns in the PGresult.
4469 */
4470 j = 0;
4471 foreach(lc, retrieved_attrs)
4472 {
4473 int i = lfirst_int(lc);
4474 char *valstr;
4475
4476 /* fetch next column's textual value */
4477 if (PQgetisnull(res, row, j))
4478 valstr = NULL;
4479 else
4480 valstr = PQgetvalue(res, row, j);
4481
4482 /* convert value to internal representation */
4483 errpos.cur_attno = i;
4484 if (i > 0)
4485 {
4486 /* ordinary column */
4487 Assert(i <= tupdesc->natts);
4488 nulls[i - 1] = (valstr == NULL);
4489 /* Apply the input function even to nulls, to support domains */
4490 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4491 valstr,
4492 attinmeta->attioparams[i - 1],
4493 attinmeta->atttypmods[i - 1]);
4494 }
4495 else if (i == SelfItemPointerAttributeNumber)
4496 {
4497 /* ctid --- note we ignore any other system column in result */
4498 if (valstr != NULL)
4499 {
4500 Datum datum;
4501
4502 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
4503 ctid = (ItemPointer) DatumGetPointer(datum);
4504 }
4505 }
4506 errpos.cur_attno = 0;
4507
4508 j++;
4509 }
4510
4511 /* Uninstall error context callback. */
4512 error_context_stack = errcallback.previous;
4513
4514 /*
4515 * Check we got the expected number of columns. Note: j == 0 and
4516 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
4517 */
4518 if (j > 0 && j != PQnfields(res))
4519 elog(ERROR, "remote query result does not match the foreign table");
4520
4521 /*
4522 * Build the result tuple in caller's memory context.
4523 */
4524 MemoryContextSwitchTo(oldcontext);
4525
4526 tuple = heap_form_tuple(tupdesc, values, nulls);
4527
4528 /*
4529 * If we have a CTID to return, install it in both t_self and t_ctid.
4530 * t_self is the normal place, but if the tuple is converted to a
4531 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
4532 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
4533 */
4534 if (ctid)
4535 tuple->t_self = tuple->t_data->t_ctid = *ctid;
4536
4537 /*
4538 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
4539 * heap_form_tuple. heap_form_tuple actually creates the tuple with
4540 * DatumTupleFields, not HeapTupleFields, but the executor expects
4541 * HeapTupleFields and will happily extract system columns on that
4542 * assumption. If we don't do this then, for example, the tuple length
4543 * ends up in the xmin field, which isn't what we want.
4544 */
4545 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
4546 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
4547 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
4548
4549 /* Clean up */
4550 MemoryContextReset(temp_context);
4551
4552 return tuple;
4553 }
4554
4555 /*
4556 * Callback function which is called when error occurs during column value
4557 * conversion. Print names of column and relation.
4558 *
4559 * Note that this function mustn't do any catalog lookups, since we are in
4560 * an already-failed transaction. Fortunately, we can get the needed info
4561 * from the relation or the query's rangetable instead.
4562 */
4563 static void
conversion_error_callback(void * arg)4564 conversion_error_callback(void *arg)
4565 {
4566 ConversionLocation *errpos = (ConversionLocation *) arg;
4567 Relation rel = errpos->rel;
4568 ForeignScanState *fsstate = errpos->fsstate;
4569 const char *attname = NULL;
4570 const char *relname = NULL;
4571 bool is_wholerow = false;
4572
4573 /*
4574 * If we're in a scan node, always use aliases from the rangetable, for
4575 * consistency between the simple-relation and remote-join cases. Look at
4576 * the relation's tupdesc only if we're not in a scan node.
4577 */
4578 if (fsstate)
4579 {
4580 /* ForeignScan case */
4581 ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
4582 int varno = 0;
4583 AttrNumber colno = 0;
4584
4585 if (fsplan->scan.scanrelid > 0)
4586 {
4587 /* error occurred in a scan against a foreign table */
4588 varno = fsplan->scan.scanrelid;
4589 colno = errpos->cur_attno;
4590 }
4591 else
4592 {
4593 /* error occurred in a scan against a foreign join */
4594 TargetEntry *tle;
4595
4596 Assert(IsA(fsplan, ForeignScan));
4597 tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
4598 errpos->cur_attno - 1);
4599 Assert(IsA(tle, TargetEntry));
4600
4601 /*
4602 * Target list can have Vars and expressions. For Vars, we can
4603 * get some information, however for expressions we can't. Thus
4604 * for expressions, just show generic context message.
4605 */
4606 if (IsA(tle->expr, Var))
4607 {
4608 Var *var = (Var *) tle->expr;
4609
4610 varno = var->varno;
4611 colno = var->varattno;
4612 }
4613 }
4614
4615 if (varno > 0)
4616 {
4617 EState *estate = fsstate->ss.ps.state;
4618 RangeTblEntry *rte = rt_fetch(varno, estate->es_range_table);
4619
4620 relname = rte->eref->aliasname;
4621
4622 if (colno == 0)
4623 is_wholerow = true;
4624 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
4625 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
4626 else if (colno == SelfItemPointerAttributeNumber)
4627 attname = "ctid";
4628 else if (colno == ObjectIdAttributeNumber)
4629 attname = "oid";
4630 }
4631 }
4632 else if (rel)
4633 {
4634 /* Non-ForeignScan case (we should always have a rel here) */
4635 TupleDesc tupdesc = RelationGetDescr(rel);
4636
4637 relname = RelationGetRelationName(rel);
4638 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
4639 {
4640 Form_pg_attribute attr = TupleDescAttr(tupdesc,
4641 errpos->cur_attno - 1);
4642
4643 attname = NameStr(attr->attname);
4644 }
4645 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
4646 attname = "ctid";
4647 }
4648
4649 if (relname && is_wholerow)
4650 errcontext("whole-row reference to foreign table \"%s\"", relname);
4651 else if (relname && attname)
4652 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
4653 else
4654 errcontext("processing expression at position %d in select list",
4655 errpos->cur_attno);
4656 }
4657
4658 /*
4659 * Find an equivalence class member expression, all of whose Vars, come from
4660 * the indicated relation.
4661 */
4662 extern Expr *
find_em_expr_for_rel(EquivalenceClass * ec,RelOptInfo * rel)4663 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
4664 {
4665 ListCell *lc_em;
4666
4667 foreach(lc_em, ec->ec_members)
4668 {
4669 EquivalenceMember *em = lfirst(lc_em);
4670
4671 if (bms_is_subset(em->em_relids, rel->relids) &&
4672 !bms_is_empty(em->em_relids))
4673 {
4674 /*
4675 * If there is more than one equivalence member whose Vars are
4676 * taken entirely from this relation, we'll be content to choose
4677 * any one of those.
4678 */
4679 return em->em_expr;
4680 }
4681 }
4682
4683 /* We didn't find any suitable equivalence class expression */
4684 return NULL;
4685 }
4686