1 /*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include <limits.h>
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "commands/defrem.h"
22 #include "commands/explain.h"
23 #include "commands/vacuum.h"
24 #include "executor/execAsync.h"
25 #include "foreign/fdwapi.h"
26 #include "funcapi.h"
27 #include "miscadmin.h"
28 #include "nodes/makefuncs.h"
29 #include "nodes/nodeFuncs.h"
30 #include "optimizer/appendinfo.h"
31 #include "optimizer/clauses.h"
32 #include "optimizer/cost.h"
33 #include "optimizer/optimizer.h"
34 #include "optimizer/pathnode.h"
35 #include "optimizer/paths.h"
36 #include "optimizer/planmain.h"
37 #include "optimizer/prep.h"
38 #include "optimizer/restrictinfo.h"
39 #include "optimizer/tlist.h"
40 #include "parser/parsetree.h"
41 #include "postgres_fdw.h"
42 #include "storage/latch.h"
43 #include "utils/builtins.h"
44 #include "utils/float.h"
45 #include "utils/guc.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/rel.h"
49 #include "utils/sampling.h"
50 #include "utils/selfuncs.h"
51
52 PG_MODULE_MAGIC;
53
/* CPU cost charged by default for starting up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default per-row CPU cost, charged over and above cpu_tuple_cost. */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* Without remote estimates, a sort is assumed to cost 20% extra. */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
62
/*
 * Positions of the FDW-private items stored in fdw_private lists.
 *
 * Each enumerator is the list index of one item, so an item is fetched with
 * list_nth().  For example, the SELECT statement is obtained by:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
	/* Text of the SQL statement to run remotely (a String node) */
	FdwScanPrivateSelectSql = 0,
	/* Attribute numbers retrieved by the SELECT (an integer list) */
	FdwScanPrivateRetrievedAttrs,
	/* The desired fetch_size (an integer) */
	FdwScanPrivateFetchSize,

	/*
	 * String describing a join, i.e. the names of the relations being
	 * joined and the join types; added only when the scan is a join.
	 */
	FdwScanPrivateRelations
};
85
/*
 * Positions of the items kept in the fdw_private list of a ModifyTable node
 * that references a postgres_fdw foreign table.  The list holds, in order:
 *
 * 1) the INSERT/UPDATE/DELETE statement text sent to the remote server
 * 2) the integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) the length up to the end of the VALUES clause for INSERT
 *	  (-1 for a DELETE/UPDATE)
 * 4) a boolean flag telling whether the remote query has a RETURNING clause
 * 5) the integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* Text of the SQL statement to run remotely (a String node) */
	FdwModifyPrivateUpdateSql = 0,
	/* Target attribute numbers for INSERT/UPDATE (an integer list) */
	FdwModifyPrivateTargetAttnums,
	/* Length up to the end of the VALUES clause (an integer Value node) */
	FdwModifyPrivateLen,
	/* Has-RETURNING flag (an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Attribute numbers retrieved by RETURNING (an integer list) */
	FdwModifyPrivateRetrievedAttrs
};
111
/*
 * Positions of the items kept in the fdw_private list of a ForeignScan node
 * that modifies a foreign table directly.  The list holds, in order:
 *
 * 1) the UPDATE/DELETE statement text sent to the remote server
 * 2) a boolean flag telling whether the remote query has a RETURNING clause
 * 3) the integer list of attribute numbers retrieved by RETURNING, if any
 * 4) a boolean flag telling whether we set the command es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	/* Text of the SQL statement to run remotely (a String node) */
	FdwDirectModifyPrivateUpdateSql = 0,
	/* Has-RETURNING flag (an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Attribute numbers retrieved by RETURNING (an integer list) */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* Set-processed flag (an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
132
133 /*
134 * Execution state of a foreign scan using postgres_fdw.
135 */
136 typedef struct PgFdwScanState
137 {
138 Relation rel; /* relcache entry for the foreign table. NULL
139 * for a foreign join scan. */
140 TupleDesc tupdesc; /* tuple descriptor of scan */
141 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
142
143 /* extracted fdw_private data */
144 char *query; /* text of SELECT command */
145 List *retrieved_attrs; /* list of retrieved attribute numbers */
146
147 /* for remote query execution */
148 PGconn *conn; /* connection for the scan */
149 PgFdwConnState *conn_state; /* extra per-connection state */
150 unsigned int cursor_number; /* quasi-unique ID for my cursor */
151 bool cursor_exists; /* have we created the cursor? */
152 int numParams; /* number of parameters passed to query */
153 FmgrInfo *param_flinfo; /* output conversion functions for them */
154 List *param_exprs; /* executable expressions for param values */
155 const char **param_values; /* textual values of query parameters */
156
157 /* for storing result tuples */
158 HeapTuple *tuples; /* array of currently-retrieved tuples */
159 int num_tuples; /* # of tuples in array */
160 int next_tuple; /* index of next one to return */
161
162 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
163 int fetch_ct_2; /* Min(# of fetches done, 2) */
164 bool eof_reached; /* true if last fetch reached EOF */
165
166 /* for asynchronous execution */
167 bool async_capable; /* engage asynchronous-capable logic? */
168
169 /* working memory contexts */
170 MemoryContext batch_cxt; /* context holding current batch of tuples */
171 MemoryContext temp_cxt; /* context for per-tuple temporary data */
172
173 int fetch_size; /* number of tuples per fetch */
174 } PgFdwScanState;
175
176 /*
177 * Execution state of a foreign insert/update/delete operation.
178 */
179 typedef struct PgFdwModifyState
180 {
181 Relation rel; /* relcache entry for the foreign table */
182 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
183
184 /* for remote query execution */
185 PGconn *conn; /* connection for the scan */
186 PgFdwConnState *conn_state; /* extra per-connection state */
187 char *p_name; /* name of prepared statement, if created */
188
189 /* extracted fdw_private data */
190 char *query; /* text of INSERT/UPDATE/DELETE command */
191 char *orig_query; /* original text of INSERT command */
192 List *target_attrs; /* list of target attribute numbers */
193 int values_end; /* length up to the end of VALUES */
194 int batch_size; /* value of FDW option "batch_size" */
195 bool has_returning; /* is there a RETURNING clause? */
196 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
197
198 /* info about parameters for prepared statement */
199 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
200 int p_nums; /* number of parameters to transmit */
201 FmgrInfo *p_flinfo; /* output conversion functions for them */
202
203 /* batch operation stuff */
204 int num_slots; /* number of slots to insert */
205
206 /* working memory context */
207 MemoryContext temp_cxt; /* context for per-tuple temporary data */
208
209 /* for update row movement if subplan result rel */
210 struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
211 * created */
212 } PgFdwModifyState;
213
214 /*
215 * Execution state of a foreign scan that modifies a foreign table directly.
216 */
217 typedef struct PgFdwDirectModifyState
218 {
219 Relation rel; /* relcache entry for the foreign table */
220 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
221
222 /* extracted fdw_private data */
223 char *query; /* text of UPDATE/DELETE command */
224 bool has_returning; /* is there a RETURNING clause? */
225 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
226 bool set_processed; /* do we set the command es_processed? */
227
228 /* for remote query execution */
229 PGconn *conn; /* connection for the update */
230 PgFdwConnState *conn_state; /* extra per-connection state */
231 int numParams; /* number of parameters passed to query */
232 FmgrInfo *param_flinfo; /* output conversion functions for them */
233 List *param_exprs; /* executable expressions for param values */
234 const char **param_values; /* textual values of query parameters */
235
236 /* for storing result tuples */
237 PGresult *result; /* result for query */
238 int num_tuples; /* # of result tuples */
239 int next_tuple; /* index of next one to return */
240 Relation resultRel; /* relcache entry for the target relation */
241 AttrNumber *attnoMap; /* array of attnums of input user columns */
242 AttrNumber ctidAttno; /* attnum of input ctid column */
243 AttrNumber oidAttno; /* attnum of input oid column */
244 bool hasSystemCols; /* are there system columns of resultRel? */
245
246 /* working memory context */
247 MemoryContext temp_cxt; /* context for per-tuple temporary data */
248 } PgFdwDirectModifyState;
249
250 /*
251 * Workspace for analyzing a foreign table.
252 */
253 typedef struct PgFdwAnalyzeState
254 {
255 Relation rel; /* relcache entry for the foreign table */
256 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
257 List *retrieved_attrs; /* attr numbers retrieved by query */
258
259 /* collected sample rows */
260 HeapTuple *rows; /* array of size targrows */
261 int targrows; /* target # of sample rows */
262 int numrows; /* # of sample rows collected */
263
264 /* for random sampling */
265 double samplerows; /* # of rows fetched */
266 double rowstoskip; /* # of rows to skip before next sample */
267 ReservoirStateData rstate; /* state for reservoir sampling */
268
269 /* working memory contexts */
270 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
271 MemoryContext temp_cxt; /* context for per-tuple temporary data */
272 } PgFdwAnalyzeState;
273
/*
 * Positions of the items kept in the fdw_private list of a ForeignPath.
 * The list holds, in order:
 *
 * 1) a boolean flag telling whether the remote query has the final sort
 * 2) a boolean flag telling whether the remote query has the LIMIT clause
 */
enum FdwPathPrivateIndex
{
	/* Has-final-sort flag (an integer Value node) */
	FdwPathPrivateHasFinalSort = 0,
	/* Has-limit flag (an integer Value node) */
	FdwPathPrivateHasLimit
};
288
289 /* Struct for extra information passed to estimate_path_cost_size() */
290 typedef struct
291 {
292 PathTarget *target;
293 bool has_final_sort;
294 bool has_limit;
295 double limit_tuples;
296 int64 count_est;
297 int64 offset_est;
298 } PgFdwPathExtraData;
299
300 /*
301 * Identify the attribute where data conversion fails.
302 */
303 typedef struct ConversionLocation
304 {
305 AttrNumber cur_attno; /* attribute number being processed, or 0 */
306 Relation rel; /* foreign table being processed, or NULL */
307 ForeignScanState *fsstate; /* plan node being processed, or NULL */
308 } ConversionLocation;
309
310 /* Callback argument for ec_member_matches_foreign */
311 typedef struct
312 {
313 Expr *current; /* current expr, or NULL if not yet found */
314 List *already_used; /* expressions already dealt with */
315 } ec_member_foreign_arg;
316
317 /*
318 * SQL functions
319 */
320 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
321
322 /*
323 * FDW callback routines
324 */
325 static void postgresGetForeignRelSize(PlannerInfo *root,
326 RelOptInfo *baserel,
327 Oid foreigntableid);
328 static void postgresGetForeignPaths(PlannerInfo *root,
329 RelOptInfo *baserel,
330 Oid foreigntableid);
331 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
332 RelOptInfo *foreignrel,
333 Oid foreigntableid,
334 ForeignPath *best_path,
335 List *tlist,
336 List *scan_clauses,
337 Plan *outer_plan);
338 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
339 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
340 static void postgresReScanForeignScan(ForeignScanState *node);
341 static void postgresEndForeignScan(ForeignScanState *node);
342 static void postgresAddForeignUpdateTargets(PlannerInfo *root,
343 Index rtindex,
344 RangeTblEntry *target_rte,
345 Relation target_relation);
346 static List *postgresPlanForeignModify(PlannerInfo *root,
347 ModifyTable *plan,
348 Index resultRelation,
349 int subplan_index);
350 static void postgresBeginForeignModify(ModifyTableState *mtstate,
351 ResultRelInfo *resultRelInfo,
352 List *fdw_private,
353 int subplan_index,
354 int eflags);
355 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
356 ResultRelInfo *resultRelInfo,
357 TupleTableSlot *slot,
358 TupleTableSlot *planSlot);
359 static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
360 ResultRelInfo *resultRelInfo,
361 TupleTableSlot **slots,
362 TupleTableSlot **planSlots,
363 int *numSlots);
364 static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
365 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
366 ResultRelInfo *resultRelInfo,
367 TupleTableSlot *slot,
368 TupleTableSlot *planSlot);
369 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
370 ResultRelInfo *resultRelInfo,
371 TupleTableSlot *slot,
372 TupleTableSlot *planSlot);
373 static void postgresEndForeignModify(EState *estate,
374 ResultRelInfo *resultRelInfo);
375 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
376 ResultRelInfo *resultRelInfo);
377 static void postgresEndForeignInsert(EState *estate,
378 ResultRelInfo *resultRelInfo);
379 static int postgresIsForeignRelUpdatable(Relation rel);
380 static bool postgresPlanDirectModify(PlannerInfo *root,
381 ModifyTable *plan,
382 Index resultRelation,
383 int subplan_index);
384 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
385 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
386 static void postgresEndDirectModify(ForeignScanState *node);
387 static void postgresExplainForeignScan(ForeignScanState *node,
388 ExplainState *es);
389 static void postgresExplainForeignModify(ModifyTableState *mtstate,
390 ResultRelInfo *rinfo,
391 List *fdw_private,
392 int subplan_index,
393 ExplainState *es);
394 static void postgresExplainDirectModify(ForeignScanState *node,
395 ExplainState *es);
396 static void postgresExecForeignTruncate(List *rels,
397 DropBehavior behavior,
398 bool restart_seqs);
399 static bool postgresAnalyzeForeignTable(Relation relation,
400 AcquireSampleRowsFunc *func,
401 BlockNumber *totalpages);
402 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
403 Oid serverOid);
404 static void postgresGetForeignJoinPaths(PlannerInfo *root,
405 RelOptInfo *joinrel,
406 RelOptInfo *outerrel,
407 RelOptInfo *innerrel,
408 JoinType jointype,
409 JoinPathExtraData *extra);
410 static bool postgresRecheckForeignScan(ForeignScanState *node,
411 TupleTableSlot *slot);
412 static void postgresGetForeignUpperPaths(PlannerInfo *root,
413 UpperRelationKind stage,
414 RelOptInfo *input_rel,
415 RelOptInfo *output_rel,
416 void *extra);
417 static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
418 static void postgresForeignAsyncRequest(AsyncRequest *areq);
419 static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
420 static void postgresForeignAsyncNotify(AsyncRequest *areq);
421
422 /*
423 * Helper functions
424 */
425 static void estimate_path_cost_size(PlannerInfo *root,
426 RelOptInfo *foreignrel,
427 List *param_join_conds,
428 List *pathkeys,
429 PgFdwPathExtraData *fpextra,
430 double *p_rows, int *p_width,
431 Cost *p_startup_cost, Cost *p_total_cost);
432 static void get_remote_estimate(const char *sql,
433 PGconn *conn,
434 double *rows,
435 int *width,
436 Cost *startup_cost,
437 Cost *total_cost);
438 static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
439 List *pathkeys,
440 double retrieved_rows,
441 double width,
442 double limit_tuples,
443 Cost *p_startup_cost,
444 Cost *p_run_cost);
445 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
446 EquivalenceClass *ec, EquivalenceMember *em,
447 void *arg);
448 static void create_cursor(ForeignScanState *node);
449 static void fetch_more_data(ForeignScanState *node);
450 static void close_cursor(PGconn *conn, unsigned int cursor_number,
451 PgFdwConnState *conn_state);
452 static PgFdwModifyState *create_foreign_modify(EState *estate,
453 RangeTblEntry *rte,
454 ResultRelInfo *resultRelInfo,
455 CmdType operation,
456 Plan *subplan,
457 char *query,
458 List *target_attrs,
459 int len,
460 bool has_returning,
461 List *retrieved_attrs);
462 static TupleTableSlot **execute_foreign_modify(EState *estate,
463 ResultRelInfo *resultRelInfo,
464 CmdType operation,
465 TupleTableSlot **slots,
466 TupleTableSlot **planSlots,
467 int *numSlots);
468 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
469 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
470 ItemPointer tupleid,
471 TupleTableSlot **slots,
472 int numSlots);
473 static void store_returning_result(PgFdwModifyState *fmstate,
474 TupleTableSlot *slot, PGresult *res);
475 static void finish_foreign_modify(PgFdwModifyState *fmstate);
476 static void deallocate_query(PgFdwModifyState *fmstate);
477 static List *build_remote_returning(Index rtindex, Relation rel,
478 List *returningList);
479 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
480 static void execute_dml_stmt(ForeignScanState *node);
481 static TupleTableSlot *get_returning_data(ForeignScanState *node);
482 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
483 List *fdw_scan_tlist,
484 Index rtindex);
485 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
486 ResultRelInfo *resultRelInfo,
487 TupleTableSlot *slot,
488 EState *estate);
489 static void prepare_query_params(PlanState *node,
490 List *fdw_exprs,
491 int numParams,
492 FmgrInfo **param_flinfo,
493 List **param_exprs,
494 const char ***param_values);
495 static void process_query_params(ExprContext *econtext,
496 FmgrInfo *param_flinfo,
497 List *param_exprs,
498 const char **param_values);
499 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
500 HeapTuple *rows, int targrows,
501 double *totalrows,
502 double *totaldeadrows);
503 static void analyze_row_processor(PGresult *res, int row,
504 PgFdwAnalyzeState *astate);
505 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
506 static void fetch_more_data_begin(AsyncRequest *areq);
507 static void complete_pending_request(AsyncRequest *areq);
508 static HeapTuple make_tuple_from_result_row(PGresult *res,
509 int row,
510 Relation rel,
511 AttInMetadata *attinmeta,
512 List *retrieved_attrs,
513 ForeignScanState *fsstate,
514 MemoryContext temp_context);
515 static void conversion_error_callback(void *arg);
516 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
517 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
518 JoinPathExtraData *extra);
519 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
520 Node *havingQual);
521 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
522 RelOptInfo *rel);
523 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
524 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
525 Path *epq_path);
526 static void add_foreign_grouping_paths(PlannerInfo *root,
527 RelOptInfo *input_rel,
528 RelOptInfo *grouped_rel,
529 GroupPathExtraData *extra);
530 static void add_foreign_ordered_paths(PlannerInfo *root,
531 RelOptInfo *input_rel,
532 RelOptInfo *ordered_rel);
533 static void add_foreign_final_paths(PlannerInfo *root,
534 RelOptInfo *input_rel,
535 RelOptInfo *final_rel,
536 FinalPathExtraData *extra);
537 static void apply_server_options(PgFdwRelationInfo *fpinfo);
538 static void apply_table_options(PgFdwRelationInfo *fpinfo);
539 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
540 const PgFdwRelationInfo *fpinfo_o,
541 const PgFdwRelationInfo *fpinfo_i);
542 static int get_batch_size_option(Relation rel);
543
544
545 /*
546 * Foreign-data wrapper handler function: return a struct with pointers
547 * to my callback routines.
548 */
549 Datum
postgres_fdw_handler(PG_FUNCTION_ARGS)550 postgres_fdw_handler(PG_FUNCTION_ARGS)
551 {
552 FdwRoutine *routine = makeNode(FdwRoutine);
553
554 /* Functions for scanning foreign tables */
555 routine->GetForeignRelSize = postgresGetForeignRelSize;
556 routine->GetForeignPaths = postgresGetForeignPaths;
557 routine->GetForeignPlan = postgresGetForeignPlan;
558 routine->BeginForeignScan = postgresBeginForeignScan;
559 routine->IterateForeignScan = postgresIterateForeignScan;
560 routine->ReScanForeignScan = postgresReScanForeignScan;
561 routine->EndForeignScan = postgresEndForeignScan;
562
563 /* Functions for updating foreign tables */
564 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
565 routine->PlanForeignModify = postgresPlanForeignModify;
566 routine->BeginForeignModify = postgresBeginForeignModify;
567 routine->ExecForeignInsert = postgresExecForeignInsert;
568 routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
569 routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize;
570 routine->ExecForeignUpdate = postgresExecForeignUpdate;
571 routine->ExecForeignDelete = postgresExecForeignDelete;
572 routine->EndForeignModify = postgresEndForeignModify;
573 routine->BeginForeignInsert = postgresBeginForeignInsert;
574 routine->EndForeignInsert = postgresEndForeignInsert;
575 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
576 routine->PlanDirectModify = postgresPlanDirectModify;
577 routine->BeginDirectModify = postgresBeginDirectModify;
578 routine->IterateDirectModify = postgresIterateDirectModify;
579 routine->EndDirectModify = postgresEndDirectModify;
580
581 /* Function for EvalPlanQual rechecks */
582 routine->RecheckForeignScan = postgresRecheckForeignScan;
583 /* Support functions for EXPLAIN */
584 routine->ExplainForeignScan = postgresExplainForeignScan;
585 routine->ExplainForeignModify = postgresExplainForeignModify;
586 routine->ExplainDirectModify = postgresExplainDirectModify;
587
588 /* Support function for TRUNCATE */
589 routine->ExecForeignTruncate = postgresExecForeignTruncate;
590
591 /* Support functions for ANALYZE */
592 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
593
594 /* Support functions for IMPORT FOREIGN SCHEMA */
595 routine->ImportForeignSchema = postgresImportForeignSchema;
596
597 /* Support functions for join push-down */
598 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
599
600 /* Support functions for upper relation push-down */
601 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
602
603 /* Support functions for asynchronous execution */
604 routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
605 routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
606 routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
607 routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
608
609 PG_RETURN_POINTER(routine);
610 }
611
612 /*
613 * postgresGetForeignRelSize
614 * Estimate # of rows and width of the result of the scan
615 *
616 * We should consider the effect of all baserestrictinfo clauses here, but
617 * not any join clauses.
618 */
619 static void
postgresGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)620 postgresGetForeignRelSize(PlannerInfo *root,
621 RelOptInfo *baserel,
622 Oid foreigntableid)
623 {
624 PgFdwRelationInfo *fpinfo;
625 ListCell *lc;
626 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
627
628 /*
629 * We use PgFdwRelationInfo to pass various information to subsequent
630 * functions.
631 */
632 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
633 baserel->fdw_private = (void *) fpinfo;
634
635 /* Base foreign tables need to be pushed down always. */
636 fpinfo->pushdown_safe = true;
637
638 /* Look up foreign-table catalog info. */
639 fpinfo->table = GetForeignTable(foreigntableid);
640 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
641
642 /*
643 * Extract user-settable option values. Note that per-table settings of
644 * use_remote_estimate, fetch_size and async_capable override per-server
645 * settings of them, respectively.
646 */
647 fpinfo->use_remote_estimate = false;
648 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
649 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
650 fpinfo->shippable_extensions = NIL;
651 fpinfo->fetch_size = 100;
652 fpinfo->async_capable = false;
653
654 apply_server_options(fpinfo);
655 apply_table_options(fpinfo);
656
657 /*
658 * If the table or the server is configured to use remote estimates,
659 * identify which user to do remote access as during planning. This
660 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
661 * permissions, the query would have failed at runtime anyway.
662 */
663 if (fpinfo->use_remote_estimate)
664 {
665 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
666
667 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
668 }
669 else
670 fpinfo->user = NULL;
671
672 /*
673 * Identify which baserestrictinfo clauses can be sent to the remote
674 * server and which can't.
675 */
676 classifyConditions(root, baserel, baserel->baserestrictinfo,
677 &fpinfo->remote_conds, &fpinfo->local_conds);
678
679 /*
680 * Identify which attributes will need to be retrieved from the remote
681 * server. These include all attrs needed for joins or final output, plus
682 * all attrs used in the local_conds. (Note: if we end up using a
683 * parameterized scan, it's possible that some of the join clauses will be
684 * sent to the remote and thus we wouldn't really need to retrieve the
685 * columns used in them. Doesn't seem worth detecting that case though.)
686 */
687 fpinfo->attrs_used = NULL;
688 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
689 &fpinfo->attrs_used);
690 foreach(lc, fpinfo->local_conds)
691 {
692 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
693
694 pull_varattnos((Node *) rinfo->clause, baserel->relid,
695 &fpinfo->attrs_used);
696 }
697
698 /*
699 * Compute the selectivity and cost of the local_conds, so we don't have
700 * to do it over again for each path. The best we can do for these
701 * conditions is to estimate selectivity on the basis of local statistics.
702 */
703 fpinfo->local_conds_sel = clauselist_selectivity(root,
704 fpinfo->local_conds,
705 baserel->relid,
706 JOIN_INNER,
707 NULL);
708
709 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
710
711 /*
712 * Set # of retrieved rows and cached relation costs to some negative
713 * value, so that we can detect when they are set to some sensible values,
714 * during one (usually the first) of the calls to estimate_path_cost_size.
715 */
716 fpinfo->retrieved_rows = -1;
717 fpinfo->rel_startup_cost = -1;
718 fpinfo->rel_total_cost = -1;
719
720 /*
721 * If the table or the server is configured to use remote estimates,
722 * connect to the foreign server and execute EXPLAIN to estimate the
723 * number of rows selected by the restriction clauses, as well as the
724 * average row width. Otherwise, estimate using whatever statistics we
725 * have locally, in a way similar to ordinary tables.
726 */
727 if (fpinfo->use_remote_estimate)
728 {
729 /*
730 * Get cost/size estimates with help of remote server. Save the
731 * values in fpinfo so we don't need to do it again to generate the
732 * basic foreign path.
733 */
734 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
735 &fpinfo->rows, &fpinfo->width,
736 &fpinfo->startup_cost, &fpinfo->total_cost);
737
738 /* Report estimated baserel size to planner. */
739 baserel->rows = fpinfo->rows;
740 baserel->reltarget->width = fpinfo->width;
741 }
742 else
743 {
744 /*
745 * If the foreign table has never been ANALYZEd, it will have
746 * reltuples < 0, meaning "unknown". We can't do much if we're not
747 * allowed to consult the remote server, but we can use a hack similar
748 * to plancat.c's treatment of empty relations: use a minimum size
749 * estimate of 10 pages, and divide by the column-datatype-based width
750 * estimate to get the corresponding number of tuples.
751 */
752 if (baserel->tuples < 0)
753 {
754 baserel->pages = 10;
755 baserel->tuples =
756 (10 * BLCKSZ) / (baserel->reltarget->width +
757 MAXALIGN(SizeofHeapTupleHeader));
758 }
759
760 /* Estimate baserel size as best we can with local statistics. */
761 set_baserel_size_estimates(root, baserel);
762
763 /* Fill in basically-bogus cost estimates for use later. */
764 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
765 &fpinfo->rows, &fpinfo->width,
766 &fpinfo->startup_cost, &fpinfo->total_cost);
767 }
768
769 /*
770 * fpinfo->relation_name gets the numeric rangetable index of the foreign
771 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
772 * human-readable string at that time.)
773 */
774 fpinfo->relation_name = psprintf("%u", baserel->relid);
775
776 /* No outer and inner relations. */
777 fpinfo->make_outerrel_subquery = false;
778 fpinfo->make_innerrel_subquery = false;
779 fpinfo->lower_subquery_rels = NULL;
780 /* Set the relation index. */
781 fpinfo->relation_index = baserel->relid;
782 }
783
784 /*
785 * get_useful_ecs_for_relation
786 * Determine which EquivalenceClasses might be involved in useful
787 * orderings of this relation.
788 *
789 * This function is in some respects a mirror image of the core function
790 * pathkeys_useful_for_merging: for a regular table, we know what indexes
791 * we have and want to test whether any of them are useful. For a foreign
792 * table, we don't know what indexes are present on the remote side but
793 * want to speculate about which ones we'd like to use if they existed.
794 *
795 * This function returns a list of potentially-useful equivalence classes,
796 * but it does not guarantee that an EquivalenceMember exists which contains
797 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
798 * ft1.x + t1.x = 0, this function will say that the equivalence class
799 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
800 * t1 is local (or on a different server), it will turn out that no useful
801 * ORDER BY clause can be generated. It's not our job to figure that out
802 * here; we're only interested in identifying relevant ECs.
803 */
804 static List *
get_useful_ecs_for_relation(PlannerInfo * root,RelOptInfo * rel)805 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
806 {
807 List *useful_eclass_list = NIL;
808 ListCell *lc;
809 Relids relids;
810
811 /*
812 * First, consider whether any active EC is potentially useful for a merge
813 * join against this relation.
814 */
815 if (rel->has_eclass_joins)
816 {
817 foreach(lc, root->eq_classes)
818 {
819 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
820
821 if (eclass_useful_for_merging(root, cur_ec, rel))
822 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
823 }
824 }
825
826 /*
827 * Next, consider whether there are any non-EC derivable join clauses that
828 * are merge-joinable. If the joininfo list is empty, we can exit
829 * quickly.
830 */
831 if (rel->joininfo == NIL)
832 return useful_eclass_list;
833
834 /* If this is a child rel, we must use the topmost parent rel to search. */
835 if (IS_OTHER_REL(rel))
836 {
837 Assert(!bms_is_empty(rel->top_parent_relids));
838 relids = rel->top_parent_relids;
839 }
840 else
841 relids = rel->relids;
842
843 /* Check each join clause in turn. */
844 foreach(lc, rel->joininfo)
845 {
846 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
847
848 /* Consider only mergejoinable clauses */
849 if (restrictinfo->mergeopfamilies == NIL)
850 continue;
851
852 /* Make sure we've got canonical ECs. */
853 update_mergeclause_eclasses(root, restrictinfo);
854
855 /*
856 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
857 * that left_ec and right_ec will be initialized, per comments in
858 * distribute_qual_to_rels.
859 *
860 * We want to identify which side of this merge-joinable clause
861 * contains columns from the relation produced by this RelOptInfo. We
862 * test for overlap, not containment, because there could be extra
863 * relations on either side. For example, suppose we've got something
864 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
865 * A.y = D.y. The input rel might be the joinrel between A and B, and
866 * we'll consider the join clause A.y = D.y. relids contains a
867 * relation not involved in the join class (B) and the equivalence
868 * class for the left-hand side of the clause contains a relation not
869 * involved in the input rel (C). Despite the fact that we have only
870 * overlap and not containment in either direction, A.y is potentially
871 * useful as a sort column.
872 *
873 * Note that it's even possible that relids overlaps neither side of
874 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
875 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
876 * but overlaps neither side of B. In that case, we just skip this
877 * join clause, since it doesn't suggest a useful sort order for this
878 * relation.
879 */
880 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
881 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
882 restrictinfo->right_ec);
883 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
884 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
885 restrictinfo->left_ec);
886 }
887
888 return useful_eclass_list;
889 }
890
891 /*
892 * get_useful_pathkeys_for_relation
893 * Determine which orderings of a relation might be useful.
894 *
895 * Getting data in sorted order can be useful either because the requested
896 * order matches the final output ordering for the overall query we're
897 * planning, or because it enables an efficient merge join. Here, we try
898 * to figure out which pathkeys to consider.
899 */
900 static List *
get_useful_pathkeys_for_relation(PlannerInfo * root,RelOptInfo * rel)901 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
902 {
903 List *useful_pathkeys_list = NIL;
904 List *useful_eclass_list;
905 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
906 EquivalenceClass *query_ec = NULL;
907 ListCell *lc;
908
909 /*
910 * Pushing the query_pathkeys to the remote server is always worth
911 * considering, because it might let us avoid a local sort.
912 */
913 fpinfo->qp_is_pushdown_safe = false;
914 if (root->query_pathkeys)
915 {
916 bool query_pathkeys_ok = true;
917
918 foreach(lc, root->query_pathkeys)
919 {
920 PathKey *pathkey = (PathKey *) lfirst(lc);
921 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
922 Expr *em_expr;
923
924 /*
925 * The planner and executor don't have any clever strategy for
926 * taking data sorted by a prefix of the query's pathkeys and
927 * getting it to be sorted by all of those pathkeys. We'll just
928 * end up resorting the entire data set. So, unless we can push
929 * down all of the query pathkeys, forget it.
930 *
931 * is_foreign_expr would detect volatile expressions as well, but
932 * checking ec_has_volatile here saves some cycles.
933 */
934 if (pathkey_ec->ec_has_volatile ||
935 !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
936 !is_foreign_expr(root, rel, em_expr))
937 {
938 query_pathkeys_ok = false;
939 break;
940 }
941 }
942
943 if (query_pathkeys_ok)
944 {
945 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
946 fpinfo->qp_is_pushdown_safe = true;
947 }
948 }
949
950 /*
951 * Even if we're not using remote estimates, having the remote side do the
952 * sort generally won't be any worse than doing it locally, and it might
953 * be much better if the remote side can generate data in the right order
954 * without needing a sort at all. However, what we're going to do next is
955 * try to generate pathkeys that seem promising for possible merge joins,
956 * and that's more speculative. A wrong choice might hurt quite a bit, so
957 * bail out if we can't use remote estimates.
958 */
959 if (!fpinfo->use_remote_estimate)
960 return useful_pathkeys_list;
961
962 /* Get the list of interesting EquivalenceClasses. */
963 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
964
965 /* Extract unique EC for query, if any, so we don't consider it again. */
966 if (list_length(root->query_pathkeys) == 1)
967 {
968 PathKey *query_pathkey = linitial(root->query_pathkeys);
969
970 query_ec = query_pathkey->pk_eclass;
971 }
972
973 /*
974 * As a heuristic, the only pathkeys we consider here are those of length
975 * one. It's surely possible to consider more, but since each one we
976 * choose to consider will generate a round-trip to the remote side, we
977 * need to be a bit cautious here. It would sure be nice to have a local
978 * cache of information about remote index definitions...
979 */
980 foreach(lc, useful_eclass_list)
981 {
982 EquivalenceClass *cur_ec = lfirst(lc);
983 Expr *em_expr;
984 PathKey *pathkey;
985
986 /* If redundant with what we did above, skip it. */
987 if (cur_ec == query_ec)
988 continue;
989
990 /* If no pushable expression for this rel, skip it. */
991 em_expr = find_em_expr_for_rel(cur_ec, rel);
992 if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
993 continue;
994
995 /* Looks like we can generate a pathkey, so let's do it. */
996 pathkey = make_canonical_pathkey(root, cur_ec,
997 linitial_oid(cur_ec->ec_opfamilies),
998 BTLessStrategyNumber,
999 false);
1000 useful_pathkeys_list = lappend(useful_pathkeys_list,
1001 list_make1(pathkey));
1002 }
1003
1004 return useful_pathkeys_list;
1005 }
1006
1007 /*
1008 * postgresGetForeignPaths
1009 * Create possible scan paths for a scan on the foreign table
1010 */
1011 static void
postgresGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)1012 postgresGetForeignPaths(PlannerInfo *root,
1013 RelOptInfo *baserel,
1014 Oid foreigntableid)
1015 {
1016 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1017 ForeignPath *path;
1018 List *ppi_list;
1019 ListCell *lc;
1020
1021 /*
1022 * Create simplest ForeignScan path node and add it to baserel. This path
1023 * corresponds to SeqScan path of regular tables (though depending on what
1024 * baserestrict conditions we were able to send to remote, there might
1025 * actually be an indexscan happening there). We already did all the work
1026 * to estimate cost and size of this path.
1027 *
1028 * Although this path uses no join clauses, it could still have required
1029 * parameterization due to LATERAL refs in its tlist.
1030 */
1031 path = create_foreignscan_path(root, baserel,
1032 NULL, /* default pathtarget */
1033 fpinfo->rows,
1034 fpinfo->startup_cost,
1035 fpinfo->total_cost,
1036 NIL, /* no pathkeys */
1037 baserel->lateral_relids,
1038 NULL, /* no extra plan */
1039 NIL); /* no fdw_private list */
1040 add_path(baserel, (Path *) path);
1041
1042 /* Add paths with pathkeys */
1043 add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1044
1045 /*
1046 * If we're not using remote estimates, stop here. We have no way to
1047 * estimate whether any join clauses would be worth sending across, so
1048 * don't bother building parameterized paths.
1049 */
1050 if (!fpinfo->use_remote_estimate)
1051 return;
1052
1053 /*
1054 * Thumb through all join clauses for the rel to identify which outer
1055 * relations could supply one or more safe-to-send-to-remote join clauses.
1056 * We'll build a parameterized path for each such outer relation.
1057 *
1058 * It's convenient to manage this by representing each candidate outer
1059 * relation by the ParamPathInfo node for it. We can then use the
1060 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1061 * interesting join clauses for that rel. This takes care of the
1062 * possibility that there are multiple safe join clauses for such a rel,
1063 * and also ensures that we account for unsafe join clauses that we'll
1064 * still have to enforce locally (since the parameterized-path machinery
1065 * insists that we handle all movable clauses).
1066 */
1067 ppi_list = NIL;
1068 foreach(lc, baserel->joininfo)
1069 {
1070 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1071 Relids required_outer;
1072 ParamPathInfo *param_info;
1073
1074 /* Check if clause can be moved to this rel */
1075 if (!join_clause_is_movable_to(rinfo, baserel))
1076 continue;
1077
1078 /* See if it is safe to send to remote */
1079 if (!is_foreign_expr(root, baserel, rinfo->clause))
1080 continue;
1081
1082 /* Calculate required outer rels for the resulting path */
1083 required_outer = bms_union(rinfo->clause_relids,
1084 baserel->lateral_relids);
1085 /* We do not want the foreign rel itself listed in required_outer */
1086 required_outer = bms_del_member(required_outer, baserel->relid);
1087
1088 /*
1089 * required_outer probably can't be empty here, but if it were, we
1090 * couldn't make a parameterized path.
1091 */
1092 if (bms_is_empty(required_outer))
1093 continue;
1094
1095 /* Get the ParamPathInfo */
1096 param_info = get_baserel_parampathinfo(root, baserel,
1097 required_outer);
1098 Assert(param_info != NULL);
1099
1100 /*
1101 * Add it to list unless we already have it. Testing pointer equality
1102 * is OK since get_baserel_parampathinfo won't make duplicates.
1103 */
1104 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1105 }
1106
1107 /*
1108 * The above scan examined only "generic" join clauses, not those that
1109 * were absorbed into EquivalenceClauses. See if we can make anything out
1110 * of EquivalenceClauses.
1111 */
1112 if (baserel->has_eclass_joins)
1113 {
1114 /*
1115 * We repeatedly scan the eclass list looking for column references
1116 * (or expressions) belonging to the foreign rel. Each time we find
1117 * one, we generate a list of equivalence joinclauses for it, and then
1118 * see if any are safe to send to the remote. Repeat till there are
1119 * no more candidate EC members.
1120 */
1121 ec_member_foreign_arg arg;
1122
1123 arg.already_used = NIL;
1124 for (;;)
1125 {
1126 List *clauses;
1127
1128 /* Make clauses, skipping any that join to lateral_referencers */
1129 arg.current = NULL;
1130 clauses = generate_implied_equalities_for_column(root,
1131 baserel,
1132 ec_member_matches_foreign,
1133 (void *) &arg,
1134 baserel->lateral_referencers);
1135
1136 /* Done if there are no more expressions in the foreign rel */
1137 if (arg.current == NULL)
1138 {
1139 Assert(clauses == NIL);
1140 break;
1141 }
1142
1143 /* Scan the extracted join clauses */
1144 foreach(lc, clauses)
1145 {
1146 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1147 Relids required_outer;
1148 ParamPathInfo *param_info;
1149
1150 /* Check if clause can be moved to this rel */
1151 if (!join_clause_is_movable_to(rinfo, baserel))
1152 continue;
1153
1154 /* See if it is safe to send to remote */
1155 if (!is_foreign_expr(root, baserel, rinfo->clause))
1156 continue;
1157
1158 /* Calculate required outer rels for the resulting path */
1159 required_outer = bms_union(rinfo->clause_relids,
1160 baserel->lateral_relids);
1161 required_outer = bms_del_member(required_outer, baserel->relid);
1162 if (bms_is_empty(required_outer))
1163 continue;
1164
1165 /* Get the ParamPathInfo */
1166 param_info = get_baserel_parampathinfo(root, baserel,
1167 required_outer);
1168 Assert(param_info != NULL);
1169
1170 /* Add it to list unless we already have it */
1171 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1172 }
1173
1174 /* Try again, now ignoring the expression we found this time */
1175 arg.already_used = lappend(arg.already_used, arg.current);
1176 }
1177 }
1178
1179 /*
1180 * Now build a path for each useful outer relation.
1181 */
1182 foreach(lc, ppi_list)
1183 {
1184 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1185 double rows;
1186 int width;
1187 Cost startup_cost;
1188 Cost total_cost;
1189
1190 /* Get a cost estimate from the remote */
1191 estimate_path_cost_size(root, baserel,
1192 param_info->ppi_clauses, NIL, NULL,
1193 &rows, &width,
1194 &startup_cost, &total_cost);
1195
1196 /*
1197 * ppi_rows currently won't get looked at by anything, but still we
1198 * may as well ensure that it matches our idea of the rowcount.
1199 */
1200 param_info->ppi_rows = rows;
1201
1202 /* Make the path */
1203 path = create_foreignscan_path(root, baserel,
1204 NULL, /* default pathtarget */
1205 rows,
1206 startup_cost,
1207 total_cost,
1208 NIL, /* no pathkeys */
1209 param_info->ppi_req_outer,
1210 NULL,
1211 NIL); /* no fdw_private list */
1212 add_path(baserel, (Path *) path);
1213 }
1214 }
1215
1216 /*
1217 * postgresGetForeignPlan
1218 * Create ForeignScan plan node which implements selected best path
1219 */
1220 static ForeignScan *
postgresGetForeignPlan(PlannerInfo * root,RelOptInfo * foreignrel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)1221 postgresGetForeignPlan(PlannerInfo *root,
1222 RelOptInfo *foreignrel,
1223 Oid foreigntableid,
1224 ForeignPath *best_path,
1225 List *tlist,
1226 List *scan_clauses,
1227 Plan *outer_plan)
1228 {
1229 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1230 Index scan_relid;
1231 List *fdw_private;
1232 List *remote_exprs = NIL;
1233 List *local_exprs = NIL;
1234 List *params_list = NIL;
1235 List *fdw_scan_tlist = NIL;
1236 List *fdw_recheck_quals = NIL;
1237 List *retrieved_attrs;
1238 StringInfoData sql;
1239 bool has_final_sort = false;
1240 bool has_limit = false;
1241 ListCell *lc;
1242
1243 /*
1244 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1245 */
1246 if (best_path->fdw_private)
1247 {
1248 has_final_sort = intVal(list_nth(best_path->fdw_private,
1249 FdwPathPrivateHasFinalSort));
1250 has_limit = intVal(list_nth(best_path->fdw_private,
1251 FdwPathPrivateHasLimit));
1252 }
1253
1254 if (IS_SIMPLE_REL(foreignrel))
1255 {
1256 /*
1257 * For base relations, set scan_relid as the relid of the relation.
1258 */
1259 scan_relid = foreignrel->relid;
1260
1261 /*
1262 * In a base-relation scan, we must apply the given scan_clauses.
1263 *
1264 * Separate the scan_clauses into those that can be executed remotely
1265 * and those that can't. baserestrictinfo clauses that were
1266 * previously determined to be safe or unsafe by classifyConditions
1267 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1268 * else in the scan_clauses list will be a join clause, which we have
1269 * to check for remote-safety.
1270 *
1271 * Note: the join clauses we see here should be the exact same ones
1272 * previously examined by postgresGetForeignPaths. Possibly it'd be
1273 * worth passing forward the classification work done then, rather
1274 * than repeating it here.
1275 *
1276 * This code must match "extract_actual_clauses(scan_clauses, false)"
1277 * except for the additional decision about remote versus local
1278 * execution.
1279 */
1280 foreach(lc, scan_clauses)
1281 {
1282 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1283
1284 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1285 if (rinfo->pseudoconstant)
1286 continue;
1287
1288 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1289 remote_exprs = lappend(remote_exprs, rinfo->clause);
1290 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1291 local_exprs = lappend(local_exprs, rinfo->clause);
1292 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1293 remote_exprs = lappend(remote_exprs, rinfo->clause);
1294 else
1295 local_exprs = lappend(local_exprs, rinfo->clause);
1296 }
1297
1298 /*
1299 * For a base-relation scan, we have to support EPQ recheck, which
1300 * should recheck all the remote quals.
1301 */
1302 fdw_recheck_quals = remote_exprs;
1303 }
1304 else
1305 {
1306 /*
1307 * Join relation or upper relation - set scan_relid to 0.
1308 */
1309 scan_relid = 0;
1310
1311 /*
1312 * For a join rel, baserestrictinfo is NIL and we are not considering
1313 * parameterization right now, so there should be no scan_clauses for
1314 * a joinrel or an upper rel either.
1315 */
1316 Assert(!scan_clauses);
1317
1318 /*
1319 * Instead we get the conditions to apply from the fdw_private
1320 * structure.
1321 */
1322 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1323 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1324
1325 /*
1326 * We leave fdw_recheck_quals empty in this case, since we never need
1327 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1328 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1329 * If we're planning an upperrel (ie, remote grouping or aggregation)
1330 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1331 * allowed, and indeed we *can't* put the remote clauses into
1332 * fdw_recheck_quals because the unaggregated Vars won't be available
1333 * locally.
1334 */
1335
1336 /* Build the list of columns to be fetched from the foreign server. */
1337 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1338
1339 /*
1340 * Ensure that the outer plan produces a tuple whose descriptor
1341 * matches our scan tuple slot. Also, remove the local conditions
1342 * from outer plan's quals, lest they be evaluated twice, once by the
1343 * local plan and once by the scan.
1344 */
1345 if (outer_plan)
1346 {
1347 ListCell *lc;
1348
1349 /*
1350 * Right now, we only consider grouping and aggregation beyond
1351 * joins. Queries involving aggregates or grouping do not require
1352 * EPQ mechanism, hence should not have an outer plan here.
1353 */
1354 Assert(!IS_UPPER_REL(foreignrel));
1355
1356 /*
1357 * First, update the plan's qual list if possible. In some cases
1358 * the quals might be enforced below the topmost plan level, in
1359 * which case we'll fail to remove them; it's not worth working
1360 * harder than this.
1361 */
1362 foreach(lc, local_exprs)
1363 {
1364 Node *qual = lfirst(lc);
1365
1366 outer_plan->qual = list_delete(outer_plan->qual, qual);
1367
1368 /*
1369 * For an inner join the local conditions of foreign scan plan
1370 * can be part of the joinquals as well. (They might also be
1371 * in the mergequals or hashquals, but we can't touch those
1372 * without breaking the plan.)
1373 */
1374 if (IsA(outer_plan, NestLoop) ||
1375 IsA(outer_plan, MergeJoin) ||
1376 IsA(outer_plan, HashJoin))
1377 {
1378 Join *join_plan = (Join *) outer_plan;
1379
1380 if (join_plan->jointype == JOIN_INNER)
1381 join_plan->joinqual = list_delete(join_plan->joinqual,
1382 qual);
1383 }
1384 }
1385
1386 /*
1387 * Now fix the subplan's tlist --- this might result in inserting
1388 * a Result node atop the plan tree.
1389 */
1390 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1391 best_path->path.parallel_safe);
1392 }
1393 }
1394
1395 /*
1396 * Build the query string to be sent for execution, and identify
1397 * expressions to be sent as parameters.
1398 */
1399 initStringInfo(&sql);
1400 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1401 remote_exprs, best_path->path.pathkeys,
1402 has_final_sort, has_limit, false,
1403 &retrieved_attrs, ¶ms_list);
1404
1405 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1406 fpinfo->final_remote_exprs = remote_exprs;
1407
1408 /*
1409 * Build the fdw_private list that will be available to the executor.
1410 * Items in the list must match order in enum FdwScanPrivateIndex.
1411 */
1412 fdw_private = list_make3(makeString(sql.data),
1413 retrieved_attrs,
1414 makeInteger(fpinfo->fetch_size));
1415 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1416 fdw_private = lappend(fdw_private,
1417 makeString(fpinfo->relation_name));
1418
1419 /*
1420 * Create the ForeignScan node for the given relation.
1421 *
1422 * Note that the remote parameter expressions are stored in the fdw_exprs
1423 * field of the finished plan node; we can't keep them in private state
1424 * because then they wouldn't be subject to later planner processing.
1425 */
1426 return make_foreignscan(tlist,
1427 local_exprs,
1428 scan_relid,
1429 params_list,
1430 fdw_private,
1431 fdw_scan_tlist,
1432 fdw_recheck_quals,
1433 outer_plan);
1434 }
1435
1436 /*
1437 * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1438 */
1439 static TupleDesc
get_tupdesc_for_join_scan_tuples(ForeignScanState * node)1440 get_tupdesc_for_join_scan_tuples(ForeignScanState *node)
1441 {
1442 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1443 EState *estate = node->ss.ps.state;
1444 TupleDesc tupdesc;
1445
1446 /*
1447 * The core code has already set up a scan tuple slot based on
1448 * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1449 * but there's one case where it isn't. If we have any whole-row row
1450 * identifier Vars, they may have vartype RECORD, and we need to replace
1451 * that with the associated table's actual composite type. This ensures
1452 * that when we read those ROW() expression values from the remote server,
1453 * we can convert them to a composite type the local server knows.
1454 */
1455 tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1456 for (int i = 0; i < tupdesc->natts; i++)
1457 {
1458 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1459 Var *var;
1460 RangeTblEntry *rte;
1461 Oid reltype;
1462
1463 /* Nothing to do if it's not a generic RECORD attribute */
1464 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1465 continue;
1466
1467 /*
1468 * If we can't identify the referenced table, do nothing. This'll
1469 * likely lead to failure later, but perhaps we can muddle through.
1470 */
1471 var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1472 i)->expr;
1473 if (!IsA(var, Var) || var->varattno != 0)
1474 continue;
1475 rte = list_nth(estate->es_range_table, var->varno - 1);
1476 if (rte->rtekind != RTE_RELATION)
1477 continue;
1478 reltype = get_rel_type_id(rte->relid);
1479 if (!OidIsValid(reltype))
1480 continue;
1481 att->atttypid = reltype;
1482 /* shouldn't need to change anything else */
1483 }
1484 return tupdesc;
1485 }
1486
1487 /*
1488 * postgresBeginForeignScan
1489 * Initiate an executor scan of a foreign PostgreSQL table.
1490 */
1491 static void
postgresBeginForeignScan(ForeignScanState * node,int eflags)1492 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1493 {
1494 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1495 EState *estate = node->ss.ps.state;
1496 PgFdwScanState *fsstate;
1497 RangeTblEntry *rte;
1498 Oid userid;
1499 ForeignTable *table;
1500 UserMapping *user;
1501 int rtindex;
1502 int numParams;
1503
1504 /*
1505 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1506 */
1507 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1508 return;
1509
1510 /*
1511 * We'll save private state in node->fdw_state.
1512 */
1513 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1514 node->fdw_state = (void *) fsstate;
1515
1516 /*
1517 * Identify which user to do the remote access as. This should match what
1518 * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1519 * lowest-numbered member RTE as a representative; we would get the same
1520 * result from any.
1521 */
1522 if (fsplan->scan.scanrelid > 0)
1523 rtindex = fsplan->scan.scanrelid;
1524 else
1525 rtindex = bms_next_member(fsplan->fs_relids, -1);
1526 rte = exec_rt_fetch(rtindex, estate);
1527 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1528
1529 /* Get info about foreign table. */
1530 table = GetForeignTable(rte->relid);
1531 user = GetUserMapping(userid, table->serverid);
1532
1533 /*
1534 * Get connection to the foreign server. Connection manager will
1535 * establish new connection if necessary.
1536 */
1537 fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1538
1539 /* Assign a unique ID for my cursor */
1540 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1541 fsstate->cursor_exists = false;
1542
1543 /* Get private info created by planner functions. */
1544 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1545 FdwScanPrivateSelectSql));
1546 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1547 FdwScanPrivateRetrievedAttrs);
1548 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1549 FdwScanPrivateFetchSize));
1550
1551 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1552 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1553 "postgres_fdw tuple data",
1554 ALLOCSET_DEFAULT_SIZES);
1555 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1556 "postgres_fdw temporary data",
1557 ALLOCSET_SMALL_SIZES);
1558
1559 /*
1560 * Get info we'll need for converting data fetched from the foreign server
1561 * into local representation and error reporting during that process.
1562 */
1563 if (fsplan->scan.scanrelid > 0)
1564 {
1565 fsstate->rel = node->ss.ss_currentRelation;
1566 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1567 }
1568 else
1569 {
1570 fsstate->rel = NULL;
1571 fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1572 }
1573
1574 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1575
1576 /*
1577 * Prepare for processing of parameters used in remote query, if any.
1578 */
1579 numParams = list_length(fsplan->fdw_exprs);
1580 fsstate->numParams = numParams;
1581 if (numParams > 0)
1582 prepare_query_params((PlanState *) node,
1583 fsplan->fdw_exprs,
1584 numParams,
1585 &fsstate->param_flinfo,
1586 &fsstate->param_exprs,
1587 &fsstate->param_values);
1588
1589 /* Set the async-capable flag */
1590 fsstate->async_capable = node->ss.ps.async_capable;
1591 }
1592
1593 /*
1594 * postgresIterateForeignScan
1595 * Retrieve next row from the result set, or clear tuple slot to indicate
1596 * EOF.
1597 */
1598 static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState * node)1599 postgresIterateForeignScan(ForeignScanState *node)
1600 {
1601 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1602 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1603
1604 /*
1605 * In sync mode, if this is the first call after Begin or ReScan, we need
1606 * to create the cursor on the remote side. In async mode, we would have
1607 * already created the cursor before we get here, even if this is the
1608 * first call after Begin or ReScan.
1609 */
1610 if (!fsstate->cursor_exists)
1611 create_cursor(node);
1612
1613 /*
1614 * Get some more tuples, if we've run out.
1615 */
1616 if (fsstate->next_tuple >= fsstate->num_tuples)
1617 {
1618 /* In async mode, just clear tuple slot. */
1619 if (fsstate->async_capable)
1620 return ExecClearTuple(slot);
1621 /* No point in another fetch if we already detected EOF, though. */
1622 if (!fsstate->eof_reached)
1623 fetch_more_data(node);
1624 /* If we didn't get any tuples, must be end of data. */
1625 if (fsstate->next_tuple >= fsstate->num_tuples)
1626 return ExecClearTuple(slot);
1627 }
1628
1629 /*
1630 * Return the next tuple.
1631 */
1632 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1633 slot,
1634 false);
1635
1636 return slot;
1637 }
1638
1639 /*
1640 * postgresReScanForeignScan
1641 * Restart the scan.
1642 */
1643 static void
postgresReScanForeignScan(ForeignScanState * node)1644 postgresReScanForeignScan(ForeignScanState *node)
1645 {
1646 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1647 char sql[64];
1648 PGresult *res;
1649
1650 /* If we haven't created the cursor yet, nothing to do. */
1651 if (!fsstate->cursor_exists)
1652 return;
1653
1654 /*
1655 * If any internal parameters affecting this node have changed, we'd
1656 * better destroy and recreate the cursor. Otherwise, rewinding it should
1657 * be good enough. If we've only fetched zero or one batch, we needn't
1658 * even rewind the cursor, just rescan what we have.
1659 */
1660 if (node->ss.ps.chgParam != NULL)
1661 {
1662 fsstate->cursor_exists = false;
1663 snprintf(sql, sizeof(sql), "CLOSE c%u",
1664 fsstate->cursor_number);
1665 }
1666 else if (fsstate->fetch_ct_2 > 1)
1667 {
1668 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1669 fsstate->cursor_number);
1670 }
1671 else
1672 {
1673 /* Easy: just rescan what we already have in memory, if anything */
1674 fsstate->next_tuple = 0;
1675 return;
1676 }
1677
1678 /*
1679 * We don't use a PG_TRY block here, so be careful not to throw error
1680 * without releasing the PGresult.
1681 */
1682 res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1683 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1684 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1685 PQclear(res);
1686
1687 /* Now force a fresh FETCH. */
1688 fsstate->tuples = NULL;
1689 fsstate->num_tuples = 0;
1690 fsstate->next_tuple = 0;
1691 fsstate->fetch_ct_2 = 0;
1692 fsstate->eof_reached = false;
1693 }
1694
1695 /*
1696 * postgresEndForeignScan
1697 * Finish scanning foreign table and dispose objects used for this scan
1698 */
1699 static void
postgresEndForeignScan(ForeignScanState * node)1700 postgresEndForeignScan(ForeignScanState *node)
1701 {
1702 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1703
1704 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1705 if (fsstate == NULL)
1706 return;
1707
1708 /* Close the cursor if open, to prevent accumulation of cursors */
1709 if (fsstate->cursor_exists)
1710 close_cursor(fsstate->conn, fsstate->cursor_number,
1711 fsstate->conn_state);
1712
1713 /* Release remote connection */
1714 ReleaseConnection(fsstate->conn);
1715 fsstate->conn = NULL;
1716
1717 /* MemoryContexts will be deleted automatically. */
1718 }
1719
1720 /*
1721 * postgresAddForeignUpdateTargets
1722 * Add resjunk column(s) needed for update/delete on a foreign table
1723 */
1724 static void
postgresAddForeignUpdateTargets(PlannerInfo * root,Index rtindex,RangeTblEntry * target_rte,Relation target_relation)1725 postgresAddForeignUpdateTargets(PlannerInfo *root,
1726 Index rtindex,
1727 RangeTblEntry *target_rte,
1728 Relation target_relation)
1729 {
1730 Var *var;
1731
1732 /*
1733 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1734 */
1735
1736 /* Make a Var representing the desired value */
1737 var = makeVar(rtindex,
1738 SelfItemPointerAttributeNumber,
1739 TIDOID,
1740 -1,
1741 InvalidOid,
1742 0);
1743
1744 /* Register it as a row-identity column needed by this target rel */
1745 add_row_identity_var(root, var, rtindex, "ctid");
1746 }
1747
1748 /*
1749 * postgresPlanForeignModify
1750 * Plan an insert/update/delete operation on a foreign table
1751 */
1752 static List *
postgresPlanForeignModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)1753 postgresPlanForeignModify(PlannerInfo *root,
1754 ModifyTable *plan,
1755 Index resultRelation,
1756 int subplan_index)
1757 {
1758 CmdType operation = plan->operation;
1759 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1760 Relation rel;
1761 StringInfoData sql;
1762 List *targetAttrs = NIL;
1763 List *withCheckOptionList = NIL;
1764 List *returningList = NIL;
1765 List *retrieved_attrs = NIL;
1766 bool doNothing = false;
1767 int values_end_len = -1;
1768
1769 initStringInfo(&sql);
1770
1771 /*
1772 * Core code already has some lock on each rel being planned, so we can
1773 * use NoLock here.
1774 */
1775 rel = table_open(rte->relid, NoLock);
1776
1777 /*
1778 * In an INSERT, we transmit all columns that are defined in the foreign
1779 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1780 * foreign table, we transmit all columns like INSERT; else we transmit
1781 * only columns that were explicitly targets of the UPDATE, so as to avoid
1782 * unnecessary data transmission. (We can't do that for INSERT since we
1783 * would miss sending default values for columns not listed in the source
1784 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1785 * those triggers might change values for non-target columns, in which
1786 * case we would miss sending changed values for those columns.)
1787 */
1788 if (operation == CMD_INSERT ||
1789 (operation == CMD_UPDATE &&
1790 rel->trigdesc &&
1791 rel->trigdesc->trig_update_before_row))
1792 {
1793 TupleDesc tupdesc = RelationGetDescr(rel);
1794 int attnum;
1795
1796 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1797 {
1798 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1799
1800 if (!attr->attisdropped)
1801 targetAttrs = lappend_int(targetAttrs, attnum);
1802 }
1803 }
1804 else if (operation == CMD_UPDATE)
1805 {
1806 int col;
1807 Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1808
1809 col = -1;
1810 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1811 {
1812 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1813 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1814
1815 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1816 elog(ERROR, "system-column update is not supported");
1817 targetAttrs = lappend_int(targetAttrs, attno);
1818 }
1819 }
1820
1821 /*
1822 * Extract the relevant WITH CHECK OPTION list if any.
1823 */
1824 if (plan->withCheckOptionLists)
1825 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1826 subplan_index);
1827
1828 /*
1829 * Extract the relevant RETURNING list if any.
1830 */
1831 if (plan->returningLists)
1832 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1833
1834 /*
1835 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1836 * should have already been rejected in the optimizer, as presently there
1837 * is no way to recognize an arbiter index on a foreign table. Only DO
1838 * NOTHING is supported without an inference specification.
1839 */
1840 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1841 doNothing = true;
1842 else if (plan->onConflictAction != ONCONFLICT_NONE)
1843 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1844 (int) plan->onConflictAction);
1845
1846 /*
1847 * Construct the SQL command string.
1848 */
1849 switch (operation)
1850 {
1851 case CMD_INSERT:
1852 deparseInsertSql(&sql, rte, resultRelation, rel,
1853 targetAttrs, doNothing,
1854 withCheckOptionList, returningList,
1855 &retrieved_attrs, &values_end_len);
1856 break;
1857 case CMD_UPDATE:
1858 deparseUpdateSql(&sql, rte, resultRelation, rel,
1859 targetAttrs,
1860 withCheckOptionList, returningList,
1861 &retrieved_attrs);
1862 break;
1863 case CMD_DELETE:
1864 deparseDeleteSql(&sql, rte, resultRelation, rel,
1865 returningList,
1866 &retrieved_attrs);
1867 break;
1868 default:
1869 elog(ERROR, "unexpected operation: %d", (int) operation);
1870 break;
1871 }
1872
1873 table_close(rel, NoLock);
1874
1875 /*
1876 * Build the fdw_private list that will be available to the executor.
1877 * Items in the list must match enum FdwModifyPrivateIndex, above.
1878 */
1879 return list_make5(makeString(sql.data),
1880 targetAttrs,
1881 makeInteger(values_end_len),
1882 makeInteger((retrieved_attrs != NIL)),
1883 retrieved_attrs);
1884 }
1885
1886 /*
1887 * postgresBeginForeignModify
1888 * Begin an insert/update/delete operation on a foreign table
1889 */
1890 static void
postgresBeginForeignModify(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo,List * fdw_private,int subplan_index,int eflags)1891 postgresBeginForeignModify(ModifyTableState *mtstate,
1892 ResultRelInfo *resultRelInfo,
1893 List *fdw_private,
1894 int subplan_index,
1895 int eflags)
1896 {
1897 PgFdwModifyState *fmstate;
1898 char *query;
1899 List *target_attrs;
1900 bool has_returning;
1901 int values_end_len;
1902 List *retrieved_attrs;
1903 RangeTblEntry *rte;
1904
1905 /*
1906 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1907 * stays NULL.
1908 */
1909 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1910 return;
1911
1912 /* Deconstruct fdw_private data. */
1913 query = strVal(list_nth(fdw_private,
1914 FdwModifyPrivateUpdateSql));
1915 target_attrs = (List *) list_nth(fdw_private,
1916 FdwModifyPrivateTargetAttnums);
1917 values_end_len = intVal(list_nth(fdw_private,
1918 FdwModifyPrivateLen));
1919 has_returning = intVal(list_nth(fdw_private,
1920 FdwModifyPrivateHasReturning));
1921 retrieved_attrs = (List *) list_nth(fdw_private,
1922 FdwModifyPrivateRetrievedAttrs);
1923
1924 /* Find RTE. */
1925 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1926 mtstate->ps.state);
1927
1928 /* Construct an execution state. */
1929 fmstate = create_foreign_modify(mtstate->ps.state,
1930 rte,
1931 resultRelInfo,
1932 mtstate->operation,
1933 outerPlanState(mtstate)->plan,
1934 query,
1935 target_attrs,
1936 values_end_len,
1937 has_returning,
1938 retrieved_attrs);
1939
1940 resultRelInfo->ri_FdwState = fmstate;
1941 }
1942
1943 /*
1944 * postgresExecForeignInsert
1945 * Insert one row into a foreign table
1946 */
1947 static TupleTableSlot *
postgresExecForeignInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)1948 postgresExecForeignInsert(EState *estate,
1949 ResultRelInfo *resultRelInfo,
1950 TupleTableSlot *slot,
1951 TupleTableSlot *planSlot)
1952 {
1953 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1954 TupleTableSlot **rslot;
1955 int numSlots = 1;
1956
1957 /*
1958 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1959 * postgresBeginForeignInsert())
1960 */
1961 if (fmstate->aux_fmstate)
1962 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1963 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1964 &slot, &planSlot, &numSlots);
1965 /* Revert that change */
1966 if (fmstate->aux_fmstate)
1967 resultRelInfo->ri_FdwState = fmstate;
1968
1969 return rslot ? *rslot : NULL;
1970 }
1971
1972 /*
1973 * postgresExecForeignBatchInsert
1974 * Insert multiple rows into a foreign table
1975 */
1976 static TupleTableSlot **
postgresExecForeignBatchInsert(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot ** slots,TupleTableSlot ** planSlots,int * numSlots)1977 postgresExecForeignBatchInsert(EState *estate,
1978 ResultRelInfo *resultRelInfo,
1979 TupleTableSlot **slots,
1980 TupleTableSlot **planSlots,
1981 int *numSlots)
1982 {
1983 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1984 TupleTableSlot **rslot;
1985
1986 /*
1987 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1988 * postgresBeginForeignInsert())
1989 */
1990 if (fmstate->aux_fmstate)
1991 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1992 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1993 slots, planSlots, numSlots);
1994 /* Revert that change */
1995 if (fmstate->aux_fmstate)
1996 resultRelInfo->ri_FdwState = fmstate;
1997
1998 return rslot;
1999 }
2000
2001 /*
2002 * postgresGetForeignModifyBatchSize
2003 * Determine the maximum number of tuples that can be inserted in bulk
2004 *
2005 * Returns the batch size specified for server or table. When batching is not
2006 * allowed (e.g. for tables with AFTER ROW triggers or with RETURNING clause),
2007 * returns 1.
2008 */
2009 static int
postgresGetForeignModifyBatchSize(ResultRelInfo * resultRelInfo)2010 postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
2011 {
2012 int batch_size;
2013 PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState ?
2014 (PgFdwModifyState *) resultRelInfo->ri_FdwState :
2015 NULL;
2016
2017 /* should be called only once */
2018 Assert(resultRelInfo->ri_BatchSize == 0);
2019
2020 /*
2021 * Should never get called when the insert is being performed as part of a
2022 * row movement operation.
2023 */
2024 Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2025
2026 /*
2027 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2028 * the option directly in server/table options. Otherwise just use the
2029 * value we determined earlier.
2030 */
2031 if (fmstate)
2032 batch_size = fmstate->batch_size;
2033 else
2034 batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2035
2036 /* Disable batching when we have to use RETURNING. */
2037 if (resultRelInfo->ri_projectReturning != NULL ||
2038 (resultRelInfo->ri_TrigDesc &&
2039 resultRelInfo->ri_TrigDesc->trig_insert_after_row))
2040 return 1;
2041
2042 /*
2043 * Otherwise use the batch size specified for server/table. The number of
2044 * parameters in a batch is limited to 65535 (uint16), so make sure we
2045 * don't exceed this limit by using the maximum batch_size possible.
2046 */
2047 if (fmstate && fmstate->p_nums > 0)
2048 batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2049
2050 return batch_size;
2051 }
2052
2053 /*
2054 * postgresExecForeignUpdate
2055 * Update one row in a foreign table
2056 */
2057 static TupleTableSlot *
postgresExecForeignUpdate(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)2058 postgresExecForeignUpdate(EState *estate,
2059 ResultRelInfo *resultRelInfo,
2060 TupleTableSlot *slot,
2061 TupleTableSlot *planSlot)
2062 {
2063 TupleTableSlot **rslot;
2064 int numSlots = 1;
2065
2066 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2067 &slot, &planSlot, &numSlots);
2068
2069 return rslot ? rslot[0] : NULL;
2070 }
2071
2072 /*
2073 * postgresExecForeignDelete
2074 * Delete one row from a foreign table
2075 */
2076 static TupleTableSlot *
postgresExecForeignDelete(EState * estate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,TupleTableSlot * planSlot)2077 postgresExecForeignDelete(EState *estate,
2078 ResultRelInfo *resultRelInfo,
2079 TupleTableSlot *slot,
2080 TupleTableSlot *planSlot)
2081 {
2082 TupleTableSlot **rslot;
2083 int numSlots = 1;
2084
2085 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2086 &slot, &planSlot, &numSlots);
2087
2088 return rslot ? rslot[0] : NULL;
2089 }
2090
2091 /*
2092 * postgresEndForeignModify
2093 * Finish an insert/update/delete operation on a foreign table
2094 */
2095 static void
postgresEndForeignModify(EState * estate,ResultRelInfo * resultRelInfo)2096 postgresEndForeignModify(EState *estate,
2097 ResultRelInfo *resultRelInfo)
2098 {
2099 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2100
2101 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2102 if (fmstate == NULL)
2103 return;
2104
2105 /* Destroy the execution state */
2106 finish_foreign_modify(fmstate);
2107 }
2108
2109 /*
2110 * postgresBeginForeignInsert
2111 * Begin an insert operation on a foreign table
2112 */
2113 static void
postgresBeginForeignInsert(ModifyTableState * mtstate,ResultRelInfo * resultRelInfo)2114 postgresBeginForeignInsert(ModifyTableState *mtstate,
2115 ResultRelInfo *resultRelInfo)
2116 {
2117 PgFdwModifyState *fmstate;
2118 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2119 EState *estate = mtstate->ps.state;
2120 Index resultRelation;
2121 Relation rel = resultRelInfo->ri_RelationDesc;
2122 RangeTblEntry *rte;
2123 TupleDesc tupdesc = RelationGetDescr(rel);
2124 int attnum;
2125 int values_end_len;
2126 StringInfoData sql;
2127 List *targetAttrs = NIL;
2128 List *retrieved_attrs = NIL;
2129 bool doNothing = false;
2130
2131 /*
2132 * If the foreign table we are about to insert routed rows into is also an
2133 * UPDATE subplan result rel that will be updated later, proceeding with
2134 * the INSERT will result in the later UPDATE incorrectly modifying those
2135 * routed rows, so prevent the INSERT --- it would be nice if we could
2136 * handle this case; but for now, throw an error for safety.
2137 */
2138 if (plan && plan->operation == CMD_UPDATE &&
2139 (resultRelInfo->ri_usesFdwDirectModify ||
2140 resultRelInfo->ri_FdwState))
2141 ereport(ERROR,
2142 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2143 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2144 RelationGetRelationName(rel))));
2145
2146 initStringInfo(&sql);
2147
2148 /* We transmit all columns that are defined in the foreign table. */
2149 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2150 {
2151 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2152
2153 if (!attr->attisdropped)
2154 targetAttrs = lappend_int(targetAttrs, attnum);
2155 }
2156
2157 /* Check if we add the ON CONFLICT clause to the remote query. */
2158 if (plan)
2159 {
2160 OnConflictAction onConflictAction = plan->onConflictAction;
2161
2162 /* We only support DO NOTHING without an inference specification. */
2163 if (onConflictAction == ONCONFLICT_NOTHING)
2164 doNothing = true;
2165 else if (onConflictAction != ONCONFLICT_NONE)
2166 elog(ERROR, "unexpected ON CONFLICT specification: %d",
2167 (int) onConflictAction);
2168 }
2169
2170 /*
2171 * If the foreign table is a partition that doesn't have a corresponding
2172 * RTE entry, we need to create a new RTE describing the foreign table for
2173 * use by deparseInsertSql and create_foreign_modify() below, after first
2174 * copying the parent's RTE and modifying some fields to describe the
2175 * foreign partition to work on. However, if this is invoked by UPDATE,
2176 * the existing RTE may already correspond to this partition if it is one
2177 * of the UPDATE subplan target rels; in that case, we can just use the
2178 * existing RTE as-is.
2179 */
2180 if (resultRelInfo->ri_RangeTableIndex == 0)
2181 {
2182 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2183
2184 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2185 rte = copyObject(rte);
2186 rte->relid = RelationGetRelid(rel);
2187 rte->relkind = RELKIND_FOREIGN_TABLE;
2188
2189 /*
2190 * For UPDATE, we must use the RT index of the first subplan target
2191 * rel's RTE, because the core code would have built expressions for
2192 * the partition, such as RETURNING, using that RT index as varno of
2193 * Vars contained in those expressions.
2194 */
2195 if (plan && plan->operation == CMD_UPDATE &&
2196 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2197 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2198 else
2199 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2200 }
2201 else
2202 {
2203 resultRelation = resultRelInfo->ri_RangeTableIndex;
2204 rte = exec_rt_fetch(resultRelation, estate);
2205 }
2206
2207 /* Construct the SQL command string. */
2208 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2209 resultRelInfo->ri_WithCheckOptions,
2210 resultRelInfo->ri_returningList,
2211 &retrieved_attrs, &values_end_len);
2212
2213 /* Construct an execution state. */
2214 fmstate = create_foreign_modify(mtstate->ps.state,
2215 rte,
2216 resultRelInfo,
2217 CMD_INSERT,
2218 NULL,
2219 sql.data,
2220 targetAttrs,
2221 values_end_len,
2222 retrieved_attrs != NIL,
2223 retrieved_attrs);
2224
2225 /*
2226 * If the given resultRelInfo already has PgFdwModifyState set, it means
2227 * the foreign table is an UPDATE subplan result rel; in which case, store
2228 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2229 */
2230 if (resultRelInfo->ri_FdwState)
2231 {
2232 Assert(plan && plan->operation == CMD_UPDATE);
2233 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2234 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2235 }
2236 else
2237 resultRelInfo->ri_FdwState = fmstate;
2238 }
2239
2240 /*
2241 * postgresEndForeignInsert
2242 * Finish an insert operation on a foreign table
2243 */
2244 static void
postgresEndForeignInsert(EState * estate,ResultRelInfo * resultRelInfo)2245 postgresEndForeignInsert(EState *estate,
2246 ResultRelInfo *resultRelInfo)
2247 {
2248 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2249
2250 Assert(fmstate != NULL);
2251
2252 /*
2253 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2254 * postgresBeginForeignInsert())
2255 */
2256 if (fmstate->aux_fmstate)
2257 fmstate = fmstate->aux_fmstate;
2258
2259 /* Destroy the execution state */
2260 finish_foreign_modify(fmstate);
2261 }
2262
2263 /*
2264 * postgresIsForeignRelUpdatable
2265 * Determine whether a foreign table supports INSERT, UPDATE and/or
2266 * DELETE.
2267 */
2268 static int
postgresIsForeignRelUpdatable(Relation rel)2269 postgresIsForeignRelUpdatable(Relation rel)
2270 {
2271 bool updatable;
2272 ForeignTable *table;
2273 ForeignServer *server;
2274 ListCell *lc;
2275
2276 /*
2277 * By default, all postgres_fdw foreign tables are assumed updatable. This
2278 * can be overridden by a per-server setting, which in turn can be
2279 * overridden by a per-table setting.
2280 */
2281 updatable = true;
2282
2283 table = GetForeignTable(RelationGetRelid(rel));
2284 server = GetForeignServer(table->serverid);
2285
2286 foreach(lc, server->options)
2287 {
2288 DefElem *def = (DefElem *) lfirst(lc);
2289
2290 if (strcmp(def->defname, "updatable") == 0)
2291 updatable = defGetBoolean(def);
2292 }
2293 foreach(lc, table->options)
2294 {
2295 DefElem *def = (DefElem *) lfirst(lc);
2296
2297 if (strcmp(def->defname, "updatable") == 0)
2298 updatable = defGetBoolean(def);
2299 }
2300
2301 /*
2302 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2303 */
2304 return updatable ?
2305 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2306 }
2307
2308 /*
2309 * postgresRecheckForeignScan
2310 * Execute a local join execution plan for a foreign join
2311 */
2312 static bool
postgresRecheckForeignScan(ForeignScanState * node,TupleTableSlot * slot)2313 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2314 {
2315 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2316 PlanState *outerPlan = outerPlanState(node);
2317 TupleTableSlot *result;
2318
2319 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2320 if (scanrelid > 0)
2321 return true;
2322
2323 Assert(outerPlan != NULL);
2324
2325 /* Execute a local join execution plan */
2326 result = ExecProcNode(outerPlan);
2327 if (TupIsNull(result))
2328 return false;
2329
2330 /* Store result in the given slot */
2331 ExecCopySlot(slot, result);
2332
2333 return true;
2334 }
2335
2336 /*
2337 * find_modifytable_subplan
2338 * Helper routine for postgresPlanDirectModify to find the
2339 * ModifyTable subplan node that scans the specified RTI.
2340 *
2341 * Returns NULL if the subplan couldn't be identified. That's not a fatal
2342 * error condition, we just abandon trying to do the update directly.
2343 */
2344 static ForeignScan *
find_modifytable_subplan(PlannerInfo * root,ModifyTable * plan,Index rtindex,int subplan_index)2345 find_modifytable_subplan(PlannerInfo *root,
2346 ModifyTable *plan,
2347 Index rtindex,
2348 int subplan_index)
2349 {
2350 Plan *subplan = outerPlan(plan);
2351
2352 /*
2353 * The cases we support are (1) the desired ForeignScan is the immediate
2354 * child of ModifyTable, or (2) it is the subplan_index'th child of an
2355 * Append node that is the immediate child of ModifyTable. There is no
2356 * point in looking further down, as that would mean that local joins are
2357 * involved, so we can't do the update directly.
2358 *
2359 * There could be a Result atop the Append too, acting to compute the
2360 * UPDATE targetlist values. We ignore that here; the tlist will be
2361 * checked by our caller.
2362 *
2363 * In principle we could examine all the children of the Append, but it's
2364 * currently unlikely that the core planner would generate such a plan
2365 * with the children out-of-order. Moreover, such a search risks costing
2366 * O(N^2) time when there are a lot of children.
2367 */
2368 if (IsA(subplan, Append))
2369 {
2370 Append *appendplan = (Append *) subplan;
2371
2372 if (subplan_index < list_length(appendplan->appendplans))
2373 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2374 }
2375 else if (IsA(subplan, Result) &&
2376 outerPlan(subplan) != NULL &&
2377 IsA(outerPlan(subplan), Append))
2378 {
2379 Append *appendplan = (Append *) outerPlan(subplan);
2380
2381 if (subplan_index < list_length(appendplan->appendplans))
2382 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2383 }
2384
2385 /* Now, have we got a ForeignScan on the desired rel? */
2386 if (IsA(subplan, ForeignScan))
2387 {
2388 ForeignScan *fscan = (ForeignScan *) subplan;
2389
2390 if (bms_is_member(rtindex, fscan->fs_relids))
2391 return fscan;
2392 }
2393
2394 return NULL;
2395 }
2396
2397 /*
2398 * postgresPlanDirectModify
2399 * Consider a direct foreign table modification
2400 *
2401 * Decide whether it is safe to modify a foreign table directly, and if so,
2402 * rewrite subplan accordingly.
2403 */
2404 static bool
postgresPlanDirectModify(PlannerInfo * root,ModifyTable * plan,Index resultRelation,int subplan_index)2405 postgresPlanDirectModify(PlannerInfo *root,
2406 ModifyTable *plan,
2407 Index resultRelation,
2408 int subplan_index)
2409 {
2410 CmdType operation = plan->operation;
2411 RelOptInfo *foreignrel;
2412 RangeTblEntry *rte;
2413 PgFdwRelationInfo *fpinfo;
2414 Relation rel;
2415 StringInfoData sql;
2416 ForeignScan *fscan;
2417 List *processed_tlist = NIL;
2418 List *targetAttrs = NIL;
2419 List *remote_exprs;
2420 List *params_list = NIL;
2421 List *returningList = NIL;
2422 List *retrieved_attrs = NIL;
2423
2424 /*
2425 * Decide whether it is safe to modify a foreign table directly.
2426 */
2427
2428 /*
2429 * The table modification must be an UPDATE or DELETE.
2430 */
2431 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2432 return false;
2433
2434 /*
2435 * Try to locate the ForeignScan subplan that's scanning resultRelation.
2436 */
2437 fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2438 if (!fscan)
2439 return false;
2440
2441 /*
2442 * It's unsafe to modify a foreign table directly if there are any quals
2443 * that should be evaluated locally.
2444 */
2445 if (fscan->scan.plan.qual != NIL)
2446 return false;
2447
2448 /* Safe to fetch data about the target foreign rel */
2449 if (fscan->scan.scanrelid == 0)
2450 {
2451 foreignrel = find_join_rel(root, fscan->fs_relids);
2452 /* We should have a rel for this foreign join. */
2453 Assert(foreignrel);
2454 }
2455 else
2456 foreignrel = root->simple_rel_array[resultRelation];
2457 rte = root->simple_rte_array[resultRelation];
2458 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2459
2460 /*
2461 * It's unsafe to update a foreign table directly, if any expressions to
2462 * assign to the target columns are unsafe to evaluate remotely.
2463 */
2464 if (operation == CMD_UPDATE)
2465 {
2466 ListCell *lc,
2467 *lc2;
2468
2469 /*
2470 * The expressions of concern are the first N columns of the processed
2471 * targetlist, where N is the length of the rel's update_colnos.
2472 */
2473 get_translated_update_targetlist(root, resultRelation,
2474 &processed_tlist, &targetAttrs);
2475 forboth(lc, processed_tlist, lc2, targetAttrs)
2476 {
2477 TargetEntry *tle = lfirst_node(TargetEntry, lc);
2478 AttrNumber attno = lfirst_int(lc2);
2479
2480 /* update's new-value expressions shouldn't be resjunk */
2481 Assert(!tle->resjunk);
2482
2483 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2484 elog(ERROR, "system-column update is not supported");
2485
2486 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2487 return false;
2488 }
2489 }
2490
2491 /*
2492 * Ok, rewrite subplan so as to modify the foreign table directly.
2493 */
2494 initStringInfo(&sql);
2495
2496 /*
2497 * Core code already has some lock on each rel being planned, so we can
2498 * use NoLock here.
2499 */
2500 rel = table_open(rte->relid, NoLock);
2501
2502 /*
2503 * Recall the qual clauses that must be evaluated remotely. (These are
2504 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2505 * doesn't care.)
2506 */
2507 remote_exprs = fpinfo->final_remote_exprs;
2508
2509 /*
2510 * Extract the relevant RETURNING list if any.
2511 */
2512 if (plan->returningLists)
2513 {
2514 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2515
2516 /*
2517 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2518 * we fetch from the foreign server any Vars specified in RETURNING
2519 * that refer not only to the target relation but to non-target
2520 * relations. So we'll deparse them into the RETURNING clause of the
2521 * remote query; use a targetlist consisting of them instead, which
2522 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2523 * node below.
2524 */
2525 if (fscan->scan.scanrelid == 0)
2526 returningList = build_remote_returning(resultRelation, rel,
2527 returningList);
2528 }
2529
2530 /*
2531 * Construct the SQL command string.
2532 */
2533 switch (operation)
2534 {
2535 case CMD_UPDATE:
2536 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2537 foreignrel,
2538 processed_tlist,
2539 targetAttrs,
2540 remote_exprs, ¶ms_list,
2541 returningList, &retrieved_attrs);
2542 break;
2543 case CMD_DELETE:
2544 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2545 foreignrel,
2546 remote_exprs, ¶ms_list,
2547 returningList, &retrieved_attrs);
2548 break;
2549 default:
2550 elog(ERROR, "unexpected operation: %d", (int) operation);
2551 break;
2552 }
2553
2554 /*
2555 * Update the operation and target relation info.
2556 */
2557 fscan->operation = operation;
2558 fscan->resultRelation = resultRelation;
2559
2560 /*
2561 * Update the fdw_exprs list that will be available to the executor.
2562 */
2563 fscan->fdw_exprs = params_list;
2564
2565 /*
2566 * Update the fdw_private list that will be available to the executor.
2567 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2568 */
2569 fscan->fdw_private = list_make4(makeString(sql.data),
2570 makeInteger((retrieved_attrs != NIL)),
2571 retrieved_attrs,
2572 makeInteger(plan->canSetTag));
2573
2574 /*
2575 * Update the foreign-join-related fields.
2576 */
2577 if (fscan->scan.scanrelid == 0)
2578 {
2579 /* No need for the outer subplan. */
2580 fscan->scan.plan.lefttree = NULL;
2581
2582 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2583 if (returningList)
2584 rebuild_fdw_scan_tlist(fscan, returningList);
2585 }
2586
2587 /*
2588 * Finally, unset the async-capable flag if it is set, as we currently
2589 * don't support asynchronous execution of direct modifications.
2590 */
2591 if (fscan->scan.plan.async_capable)
2592 fscan->scan.plan.async_capable = false;
2593
2594 table_close(rel, NoLock);
2595 return true;
2596 }
2597
2598 /*
2599 * postgresBeginDirectModify
2600 * Prepare a direct foreign table modification
2601 */
2602 static void
postgresBeginDirectModify(ForeignScanState * node,int eflags)2603 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2604 {
2605 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2606 EState *estate = node->ss.ps.state;
2607 PgFdwDirectModifyState *dmstate;
2608 Index rtindex;
2609 RangeTblEntry *rte;
2610 Oid userid;
2611 ForeignTable *table;
2612 UserMapping *user;
2613 int numParams;
2614
2615 /*
2616 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2617 */
2618 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2619 return;
2620
2621 /*
2622 * We'll save private state in node->fdw_state.
2623 */
2624 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2625 node->fdw_state = (void *) dmstate;
2626
2627 /*
2628 * Identify which user to do the remote access as. This should match what
2629 * ExecCheckRTEPerms() does.
2630 */
2631 rtindex = node->resultRelInfo->ri_RangeTableIndex;
2632 rte = exec_rt_fetch(rtindex, estate);
2633 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2634
2635 /* Get info about foreign table. */
2636 if (fsplan->scan.scanrelid == 0)
2637 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2638 else
2639 dmstate->rel = node->ss.ss_currentRelation;
2640 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2641 user = GetUserMapping(userid, table->serverid);
2642
2643 /*
2644 * Get connection to the foreign server. Connection manager will
2645 * establish new connection if necessary.
2646 */
2647 dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2648
2649 /* Update the foreign-join-related fields. */
2650 if (fsplan->scan.scanrelid == 0)
2651 {
2652 /* Save info about foreign table. */
2653 dmstate->resultRel = dmstate->rel;
2654
2655 /*
2656 * Set dmstate->rel to NULL to teach get_returning_data() and
2657 * make_tuple_from_result_row() that columns fetched from the remote
2658 * server are described by fdw_scan_tlist of the foreign-scan plan
2659 * node, not the tuple descriptor for the target relation.
2660 */
2661 dmstate->rel = NULL;
2662 }
2663
2664 /* Initialize state variable */
2665 dmstate->num_tuples = -1; /* -1 means not set yet */
2666
2667 /* Get private info created by planner functions. */
2668 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2669 FdwDirectModifyPrivateUpdateSql));
2670 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2671 FdwDirectModifyPrivateHasReturning));
2672 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2673 FdwDirectModifyPrivateRetrievedAttrs);
2674 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2675 FdwDirectModifyPrivateSetProcessed));
2676
2677 /* Create context for per-tuple temp workspace. */
2678 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2679 "postgres_fdw temporary data",
2680 ALLOCSET_SMALL_SIZES);
2681
2682 /* Prepare for input conversion of RETURNING results. */
2683 if (dmstate->has_returning)
2684 {
2685 TupleDesc tupdesc;
2686
2687 if (fsplan->scan.scanrelid == 0)
2688 tupdesc = get_tupdesc_for_join_scan_tuples(node);
2689 else
2690 tupdesc = RelationGetDescr(dmstate->rel);
2691
2692 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2693
2694 /*
2695 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2696 * initialize a filter to extract an updated/deleted tuple from a scan
2697 * tuple.
2698 */
2699 if (fsplan->scan.scanrelid == 0)
2700 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2701 }
2702
2703 /*
2704 * Prepare for processing of parameters used in remote query, if any.
2705 */
2706 numParams = list_length(fsplan->fdw_exprs);
2707 dmstate->numParams = numParams;
2708 if (numParams > 0)
2709 prepare_query_params((PlanState *) node,
2710 fsplan->fdw_exprs,
2711 numParams,
2712 &dmstate->param_flinfo,
2713 &dmstate->param_exprs,
2714 &dmstate->param_values);
2715 }
2716
2717 /*
2718 * postgresIterateDirectModify
2719 * Execute a direct foreign table modification
2720 */
2721 static TupleTableSlot *
postgresIterateDirectModify(ForeignScanState * node)2722 postgresIterateDirectModify(ForeignScanState *node)
2723 {
2724 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2725 EState *estate = node->ss.ps.state;
2726 ResultRelInfo *resultRelInfo = node->resultRelInfo;
2727
2728 /*
2729 * If this is the first call after Begin, execute the statement.
2730 */
2731 if (dmstate->num_tuples == -1)
2732 execute_dml_stmt(node);
2733
2734 /*
2735 * If the local query doesn't specify RETURNING, just clear tuple slot.
2736 */
2737 if (!resultRelInfo->ri_projectReturning)
2738 {
2739 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2740 Instrumentation *instr = node->ss.ps.instrument;
2741
2742 Assert(!dmstate->has_returning);
2743
2744 /* Increment the command es_processed count if necessary. */
2745 if (dmstate->set_processed)
2746 estate->es_processed += dmstate->num_tuples;
2747
2748 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2749 if (instr)
2750 instr->tuplecount += dmstate->num_tuples;
2751
2752 return ExecClearTuple(slot);
2753 }
2754
2755 /*
2756 * Get the next RETURNING tuple.
2757 */
2758 return get_returning_data(node);
2759 }
2760
2761 /*
2762 * postgresEndDirectModify
2763 * Finish a direct foreign table modification
2764 */
2765 static void
postgresEndDirectModify(ForeignScanState * node)2766 postgresEndDirectModify(ForeignScanState *node)
2767 {
2768 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2769
2770 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2771 if (dmstate == NULL)
2772 return;
2773
2774 /* Release PGresult */
2775 if (dmstate->result)
2776 PQclear(dmstate->result);
2777
2778 /* Release remote connection */
2779 ReleaseConnection(dmstate->conn);
2780 dmstate->conn = NULL;
2781
2782 /* MemoryContext will be deleted automatically. */
2783 }
2784
2785 /*
2786 * postgresExplainForeignScan
2787 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2788 */
2789 static void
postgresExplainForeignScan(ForeignScanState * node,ExplainState * es)2790 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2791 {
2792 ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2793 List *fdw_private = plan->fdw_private;
2794
2795 /*
2796 * Identify foreign scans that are really joins or upper relations. The
2797 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2798 * digit string(s), which are RT indexes, with the correct relation names.
2799 * We do that here, not when the plan is created, because we can't know
2800 * what aliases ruleutils.c will assign at plan creation time.
2801 */
2802 if (list_length(fdw_private) > FdwScanPrivateRelations)
2803 {
2804 StringInfo relations;
2805 char *rawrelations;
2806 char *ptr;
2807 int minrti,
2808 rtoffset;
2809
2810 rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2811
2812 /*
2813 * A difficulty with using a string representation of RT indexes is
2814 * that setrefs.c won't update the string when flattening the
2815 * rangetable. To find out what rtoffset was applied, identify the
2816 * minimum RT index appearing in the string and compare it to the
2817 * minimum member of plan->fs_relids. (We expect all the relids in
2818 * the join will have been offset by the same amount; the Asserts
2819 * below should catch it if that ever changes.)
2820 */
2821 minrti = INT_MAX;
2822 ptr = rawrelations;
2823 while (*ptr)
2824 {
2825 if (isdigit((unsigned char) *ptr))
2826 {
2827 int rti = strtol(ptr, &ptr, 10);
2828
2829 if (rti < minrti)
2830 minrti = rti;
2831 }
2832 else
2833 ptr++;
2834 }
2835 rtoffset = bms_next_member(plan->fs_relids, -1) - minrti;
2836
2837 /* Now we can translate the string */
2838 relations = makeStringInfo();
2839 ptr = rawrelations;
2840 while (*ptr)
2841 {
2842 if (isdigit((unsigned char) *ptr))
2843 {
2844 int rti = strtol(ptr, &ptr, 10);
2845 RangeTblEntry *rte;
2846 char *relname;
2847 char *refname;
2848
2849 rti += rtoffset;
2850 Assert(bms_is_member(rti, plan->fs_relids));
2851 rte = rt_fetch(rti, es->rtable);
2852 Assert(rte->rtekind == RTE_RELATION);
2853 /* This logic should agree with explain.c's ExplainTargetRel */
2854 relname = get_rel_name(rte->relid);
2855 if (es->verbose)
2856 {
2857 char *namespace;
2858
2859 namespace = get_namespace_name(get_rel_namespace(rte->relid));
2860 appendStringInfo(relations, "%s.%s",
2861 quote_identifier(namespace),
2862 quote_identifier(relname));
2863 }
2864 else
2865 appendStringInfoString(relations,
2866 quote_identifier(relname));
2867 refname = (char *) list_nth(es->rtable_names, rti - 1);
2868 if (refname == NULL)
2869 refname = rte->eref->aliasname;
2870 if (strcmp(refname, relname) != 0)
2871 appendStringInfo(relations, " %s",
2872 quote_identifier(refname));
2873 }
2874 else
2875 appendStringInfoChar(relations, *ptr++);
2876 }
2877 ExplainPropertyText("Relations", relations->data, es);
2878 }
2879
2880 /*
2881 * Add remote query, when VERBOSE option is specified.
2882 */
2883 if (es->verbose)
2884 {
2885 char *sql;
2886
2887 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2888 ExplainPropertyText("Remote SQL", sql, es);
2889 }
2890 }
2891
2892 /*
2893 * postgresExplainForeignModify
2894 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2895 */
2896 static void
postgresExplainForeignModify(ModifyTableState * mtstate,ResultRelInfo * rinfo,List * fdw_private,int subplan_index,ExplainState * es)2897 postgresExplainForeignModify(ModifyTableState *mtstate,
2898 ResultRelInfo *rinfo,
2899 List *fdw_private,
2900 int subplan_index,
2901 ExplainState *es)
2902 {
2903 if (es->verbose)
2904 {
2905 char *sql = strVal(list_nth(fdw_private,
2906 FdwModifyPrivateUpdateSql));
2907
2908 ExplainPropertyText("Remote SQL", sql, es);
2909
2910 /*
2911 * For INSERT we should always have batch size >= 1, but UPDATE and
2912 * DELETE don't support batching so don't show the property.
2913 */
2914 if (rinfo->ri_BatchSize > 0)
2915 ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2916 }
2917 }
2918
2919 /*
2920 * postgresExplainDirectModify
2921 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2922 * foreign table directly
2923 */
2924 static void
postgresExplainDirectModify(ForeignScanState * node,ExplainState * es)2925 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2926 {
2927 List *fdw_private;
2928 char *sql;
2929
2930 if (es->verbose)
2931 {
2932 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2933 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2934 ExplainPropertyText("Remote SQL", sql, es);
2935 }
2936 }
2937
2938 /*
2939 * postgresExecForeignTruncate
2940 * Truncate one or more foreign tables
2941 */
2942 static void
postgresExecForeignTruncate(List * rels,DropBehavior behavior,bool restart_seqs)2943 postgresExecForeignTruncate(List *rels,
2944 DropBehavior behavior,
2945 bool restart_seqs)
2946 {
2947 Oid serverid = InvalidOid;
2948 UserMapping *user = NULL;
2949 PGconn *conn = NULL;
2950 StringInfoData sql;
2951 ListCell *lc;
2952 bool server_truncatable = true;
2953
2954 /*
2955 * By default, all postgres_fdw foreign tables are assumed truncatable.
2956 * This can be overridden by a per-server setting, which in turn can be
2957 * overridden by a per-table setting.
2958 */
2959 foreach(lc, rels)
2960 {
2961 ForeignServer *server = NULL;
2962 Relation rel = lfirst(lc);
2963 ForeignTable *table = GetForeignTable(RelationGetRelid(rel));
2964 ListCell *cell;
2965 bool truncatable;
2966
2967 /*
2968 * First time through, determine whether the foreign server allows
2969 * truncates. Since all specified foreign tables are assumed to belong
2970 * to the same foreign server, this result can be used for other
2971 * foreign tables.
2972 */
2973 if (!OidIsValid(serverid))
2974 {
2975 serverid = table->serverid;
2976 server = GetForeignServer(serverid);
2977
2978 foreach(cell, server->options)
2979 {
2980 DefElem *defel = (DefElem *) lfirst(cell);
2981
2982 if (strcmp(defel->defname, "truncatable") == 0)
2983 {
2984 server_truncatable = defGetBoolean(defel);
2985 break;
2986 }
2987 }
2988 }
2989
2990 /*
2991 * Confirm that all specified foreign tables belong to the same
2992 * foreign server.
2993 */
2994 Assert(table->serverid == serverid);
2995
2996 /* Determine whether this foreign table allows truncations */
2997 truncatable = server_truncatable;
2998 foreach(cell, table->options)
2999 {
3000 DefElem *defel = (DefElem *) lfirst(cell);
3001
3002 if (strcmp(defel->defname, "truncatable") == 0)
3003 {
3004 truncatable = defGetBoolean(defel);
3005 break;
3006 }
3007 }
3008
3009 if (!truncatable)
3010 ereport(ERROR,
3011 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3012 errmsg("foreign table \"%s\" does not allow truncates",
3013 RelationGetRelationName(rel))));
3014 }
3015 Assert(OidIsValid(serverid));
3016
3017 /*
3018 * Get connection to the foreign server. Connection manager will
3019 * establish new connection if necessary.
3020 */
3021 user = GetUserMapping(GetUserId(), serverid);
3022 conn = GetConnection(user, false, NULL);
3023
3024 /* Construct the TRUNCATE command string */
3025 initStringInfo(&sql);
3026 deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3027
3028 /* Issue the TRUNCATE command to remote server */
3029 do_sql_command(conn, sql.data);
3030
3031 pfree(sql.data);
3032 }
3033
3034 /*
3035 * estimate_path_cost_size
3036 * Get cost and size estimates for a foreign scan on given foreign relation
3037 * either a base relation or a join between foreign relations or an upper
3038 * relation containing foreign relations.
3039 *
3040 * param_join_conds are the parameterization clauses with outer relations.
3041 * pathkeys specify the expected sort order if any for given path being costed.
3042 * fpextra specifies additional post-scan/join-processing steps such as the
3043 * final sort and the LIMIT restriction.
3044 *
3045 * The function returns the cost and size estimates in p_rows, p_width,
3046 * p_startup_cost and p_total_cost variables.
3047 */
3048 static void
estimate_path_cost_size(PlannerInfo * root,RelOptInfo * foreignrel,List * param_join_conds,List * pathkeys,PgFdwPathExtraData * fpextra,double * p_rows,int * p_width,Cost * p_startup_cost,Cost * p_total_cost)3049 estimate_path_cost_size(PlannerInfo *root,
3050 RelOptInfo *foreignrel,
3051 List *param_join_conds,
3052 List *pathkeys,
3053 PgFdwPathExtraData *fpextra,
3054 double *p_rows, int *p_width,
3055 Cost *p_startup_cost, Cost *p_total_cost)
3056 {
3057 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3058 double rows;
3059 double retrieved_rows;
3060 int width;
3061 Cost startup_cost;
3062 Cost total_cost;
3063
3064 /* Make sure the core code has set up the relation's reltarget */
3065 Assert(foreignrel->reltarget);
3066
3067 /*
3068 * If the table or the server is configured to use remote estimates,
3069 * connect to the foreign server and execute EXPLAIN to estimate the
3070 * number of rows selected by the restriction+join clauses. Otherwise,
3071 * estimate rows using whatever statistics we have locally, in a way
3072 * similar to ordinary tables.
3073 */
3074 if (fpinfo->use_remote_estimate)
3075 {
3076 List *remote_param_join_conds;
3077 List *local_param_join_conds;
3078 StringInfoData sql;
3079 PGconn *conn;
3080 Selectivity local_sel;
3081 QualCost local_cost;
3082 List *fdw_scan_tlist = NIL;
3083 List *remote_conds;
3084
3085 /* Required only to be passed to deparseSelectStmtForRel */
3086 List *retrieved_attrs;
3087
3088 /*
3089 * param_join_conds might contain both clauses that are safe to send
3090 * across, and clauses that aren't.
3091 */
3092 classifyConditions(root, foreignrel, param_join_conds,
3093 &remote_param_join_conds, &local_param_join_conds);
3094
3095 /* Build the list of columns to be fetched from the foreign server. */
3096 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3097 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3098 else
3099 fdw_scan_tlist = NIL;
3100
3101 /*
3102 * The complete list of remote conditions includes everything from
3103 * baserestrictinfo plus any extra join_conds relevant to this
3104 * particular path.
3105 */
3106 remote_conds = list_concat(remote_param_join_conds,
3107 fpinfo->remote_conds);
3108
3109 /*
3110 * Construct EXPLAIN query including the desired SELECT, FROM, and
3111 * WHERE clauses. Params and other-relation Vars are replaced by dummy
3112 * values, so don't request params_list.
3113 */
3114 initStringInfo(&sql);
3115 appendStringInfoString(&sql, "EXPLAIN ");
3116 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3117 remote_conds, pathkeys,
3118 fpextra ? fpextra->has_final_sort : false,
3119 fpextra ? fpextra->has_limit : false,
3120 false, &retrieved_attrs, NULL);
3121
3122 /* Get the remote estimate */
3123 conn = GetConnection(fpinfo->user, false, NULL);
3124 get_remote_estimate(sql.data, conn, &rows, &width,
3125 &startup_cost, &total_cost);
3126 ReleaseConnection(conn);
3127
3128 retrieved_rows = rows;
3129
3130 /* Factor in the selectivity of the locally-checked quals */
3131 local_sel = clauselist_selectivity(root,
3132 local_param_join_conds,
3133 foreignrel->relid,
3134 JOIN_INNER,
3135 NULL);
3136 local_sel *= fpinfo->local_conds_sel;
3137
3138 rows = clamp_row_est(rows * local_sel);
3139
3140 /* Add in the eval cost of the locally-checked quals */
3141 startup_cost += fpinfo->local_conds_cost.startup;
3142 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3143 cost_qual_eval(&local_cost, local_param_join_conds, root);
3144 startup_cost += local_cost.startup;
3145 total_cost += local_cost.per_tuple * retrieved_rows;
3146
3147 /*
3148 * Add in tlist eval cost for each output row. In case of an
3149 * aggregate, some of the tlist expressions such as grouping
3150 * expressions will be evaluated remotely, so adjust the costs.
3151 */
3152 startup_cost += foreignrel->reltarget->cost.startup;
3153 total_cost += foreignrel->reltarget->cost.startup;
3154 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3155 if (IS_UPPER_REL(foreignrel))
3156 {
3157 QualCost tlist_cost;
3158
3159 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3160 startup_cost -= tlist_cost.startup;
3161 total_cost -= tlist_cost.startup;
3162 total_cost -= tlist_cost.per_tuple * rows;
3163 }
3164 }
3165 else
3166 {
3167 Cost run_cost = 0;
3168
3169 /*
3170 * We don't support join conditions in this mode (hence, no
3171 * parameterized paths can be made).
3172 */
3173 Assert(param_join_conds == NIL);
3174
3175 /*
3176 * We will come here again and again with different set of pathkeys or
3177 * additional post-scan/join-processing steps that caller wants to
3178 * cost. We don't need to calculate the cost/size estimates for the
3179 * underlying scan, join, or grouping each time. Instead, use those
3180 * estimates if we have cached them already.
3181 */
3182 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3183 {
3184 Assert(fpinfo->retrieved_rows >= 0);
3185
3186 rows = fpinfo->rows;
3187 retrieved_rows = fpinfo->retrieved_rows;
3188 width = fpinfo->width;
3189 startup_cost = fpinfo->rel_startup_cost;
3190 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3191
3192 /*
3193 * If we estimate the costs of a foreign scan or a foreign join
3194 * with additional post-scan/join-processing steps, the scan or
3195 * join costs obtained from the cache wouldn't yet contain the
3196 * eval costs for the final scan/join target, which would've been
3197 * updated by apply_scanjoin_target_to_paths(); add the eval costs
3198 * now.
3199 */
3200 if (fpextra && !IS_UPPER_REL(foreignrel))
3201 {
3202 /* Shouldn't get here unless we have LIMIT */
3203 Assert(fpextra->has_limit);
3204 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3205 foreignrel->reloptkind == RELOPT_JOINREL);
3206 startup_cost += foreignrel->reltarget->cost.startup;
3207 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3208 }
3209 }
3210 else if (IS_JOIN_REL(foreignrel))
3211 {
3212 PgFdwRelationInfo *fpinfo_i;
3213 PgFdwRelationInfo *fpinfo_o;
3214 QualCost join_cost;
3215 QualCost remote_conds_cost;
3216 double nrows;
3217
3218 /* Use rows/width estimates made by the core code. */
3219 rows = foreignrel->rows;
3220 width = foreignrel->reltarget->width;
3221
3222 /* For join we expect inner and outer relations set */
3223 Assert(fpinfo->innerrel && fpinfo->outerrel);
3224
3225 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3226 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3227
3228 /* Estimate of number of rows in cross product */
3229 nrows = fpinfo_i->rows * fpinfo_o->rows;
3230
3231 /*
3232 * Back into an estimate of the number of retrieved rows. Just in
3233 * case this is nuts, clamp to at most nrows.
3234 */
3235 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3236 retrieved_rows = Min(retrieved_rows, nrows);
3237
3238 /*
3239 * The cost of foreign join is estimated as cost of generating
3240 * rows for the joining relations + cost for applying quals on the
3241 * rows.
3242 */
3243
3244 /*
3245 * Calculate the cost of clauses pushed down to the foreign server
3246 */
3247 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3248 /* Calculate the cost of applying join clauses */
3249 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3250
3251 /*
3252 * Startup cost includes startup cost of joining relations and the
3253 * startup cost for join and other clauses. We do not include the
3254 * startup cost specific to join strategy (e.g. setting up hash
3255 * tables) since we do not know what strategy the foreign server
3256 * is going to use.
3257 */
3258 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3259 startup_cost += join_cost.startup;
3260 startup_cost += remote_conds_cost.startup;
3261 startup_cost += fpinfo->local_conds_cost.startup;
3262
3263 /*
3264 * Run time cost includes:
3265 *
3266 * 1. Run time cost (total_cost - startup_cost) of relations being
3267 * joined
3268 *
3269 * 2. Run time cost of applying join clauses on the cross product
3270 * of the joining relations.
3271 *
3272 * 3. Run time cost of applying pushed down other clauses on the
3273 * result of join
3274 *
3275 * 4. Run time cost of applying nonpushable other clauses locally
3276 * on the result fetched from the foreign server.
3277 */
3278 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3279 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3280 run_cost += nrows * join_cost.per_tuple;
3281 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3282 run_cost += nrows * remote_conds_cost.per_tuple;
3283 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3284
3285 /* Add in tlist eval cost for each output row */
3286 startup_cost += foreignrel->reltarget->cost.startup;
3287 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3288 }
3289 else if (IS_UPPER_REL(foreignrel))
3290 {
3291 RelOptInfo *outerrel = fpinfo->outerrel;
3292 PgFdwRelationInfo *ofpinfo;
3293 AggClauseCosts aggcosts;
3294 double input_rows;
3295 int numGroupCols;
3296 double numGroups = 1;
3297
3298 /* The upper relation should have its outer relation set */
3299 Assert(outerrel);
3300 /* and that outer relation should have its reltarget set */
3301 Assert(outerrel->reltarget);
3302
3303 /*
3304 * This cost model is mixture of costing done for sorted and
3305 * hashed aggregates in cost_agg(). We are not sure which
3306 * strategy will be considered at remote side, thus for
3307 * simplicity, we put all startup related costs in startup_cost
3308 * and all finalization and run cost are added in total_cost.
3309 */
3310
3311 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3312
3313 /* Get rows from input rel */
3314 input_rows = ofpinfo->rows;
3315
3316 /* Collect statistics about aggregates for estimating costs. */
3317 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3318 if (root->parse->hasAggs)
3319 {
3320 get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
3321 }
3322
3323 /* Get number of grouping columns and possible number of groups */
3324 numGroupCols = list_length(root->parse->groupClause);
3325 numGroups = estimate_num_groups(root,
3326 get_sortgrouplist_exprs(root->parse->groupClause,
3327 fpinfo->grouped_tlist),
3328 input_rows, NULL, NULL);
3329
3330 /*
3331 * Get the retrieved_rows and rows estimates. If there are HAVING
3332 * quals, account for their selectivity.
3333 */
3334 if (root->parse->havingQual)
3335 {
3336 /* Factor in the selectivity of the remotely-checked quals */
3337 retrieved_rows =
3338 clamp_row_est(numGroups *
3339 clauselist_selectivity(root,
3340 fpinfo->remote_conds,
3341 0,
3342 JOIN_INNER,
3343 NULL));
3344 /* Factor in the selectivity of the locally-checked quals */
3345 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3346 }
3347 else
3348 {
3349 rows = retrieved_rows = numGroups;
3350 }
3351
3352 /* Use width estimate made by the core code. */
3353 width = foreignrel->reltarget->width;
3354
3355 /*-----
3356 * Startup cost includes:
3357 * 1. Startup cost for underneath input relation, adjusted for
3358 * tlist replacement by apply_scanjoin_target_to_paths()
3359 * 2. Cost of performing aggregation, per cost_agg()
3360 *-----
3361 */
3362 startup_cost = ofpinfo->rel_startup_cost;
3363 startup_cost += outerrel->reltarget->cost.startup;
3364 startup_cost += aggcosts.transCost.startup;
3365 startup_cost += aggcosts.transCost.per_tuple * input_rows;
3366 startup_cost += aggcosts.finalCost.startup;
3367 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3368
3369 /*-----
3370 * Run time cost includes:
3371 * 1. Run time cost of underneath input relation, adjusted for
3372 * tlist replacement by apply_scanjoin_target_to_paths()
3373 * 2. Run time cost of performing aggregation, per cost_agg()
3374 *-----
3375 */
3376 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3377 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3378 run_cost += aggcosts.finalCost.per_tuple * numGroups;
3379 run_cost += cpu_tuple_cost * numGroups;
3380
3381 /* Account for the eval cost of HAVING quals, if any */
3382 if (root->parse->havingQual)
3383 {
3384 QualCost remote_cost;
3385
3386 /* Add in the eval cost of the remotely-checked quals */
3387 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3388 startup_cost += remote_cost.startup;
3389 run_cost += remote_cost.per_tuple * numGroups;
3390 /* Add in the eval cost of the locally-checked quals */
3391 startup_cost += fpinfo->local_conds_cost.startup;
3392 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3393 }
3394
3395 /* Add in tlist eval cost for each output row */
3396 startup_cost += foreignrel->reltarget->cost.startup;
3397 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3398 }
3399 else
3400 {
3401 Cost cpu_per_tuple;
3402
3403 /* Use rows/width estimates made by set_baserel_size_estimates. */
3404 rows = foreignrel->rows;
3405 width = foreignrel->reltarget->width;
3406
3407 /*
3408 * Back into an estimate of the number of retrieved rows. Just in
3409 * case this is nuts, clamp to at most foreignrel->tuples.
3410 */
3411 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3412 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3413
3414 /*
3415 * Cost as though this were a seqscan, which is pessimistic. We
3416 * effectively imagine the local_conds are being evaluated
3417 * remotely, too.
3418 */
3419 startup_cost = 0;
3420 run_cost = 0;
3421 run_cost += seq_page_cost * foreignrel->pages;
3422
3423 startup_cost += foreignrel->baserestrictcost.startup;
3424 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3425 run_cost += cpu_per_tuple * foreignrel->tuples;
3426
3427 /* Add in tlist eval cost for each output row */
3428 startup_cost += foreignrel->reltarget->cost.startup;
3429 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3430 }
3431
3432 /*
3433 * Without remote estimates, we have no real way to estimate the cost
3434 * of generating sorted output. It could be free if the query plan
3435 * the remote side would have chosen generates properly-sorted output
3436 * anyway, but in most cases it will cost something. Estimate a value
3437 * high enough that we won't pick the sorted path when the ordering
3438 * isn't locally useful, but low enough that we'll err on the side of
3439 * pushing down the ORDER BY clause when it's useful to do so.
3440 */
3441 if (pathkeys != NIL)
3442 {
3443 if (IS_UPPER_REL(foreignrel))
3444 {
3445 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3446 fpinfo->stage == UPPERREL_GROUP_AGG);
3447 adjust_foreign_grouping_path_cost(root, pathkeys,
3448 retrieved_rows, width,
3449 fpextra->limit_tuples,
3450 &startup_cost, &run_cost);
3451 }
3452 else
3453 {
3454 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3455 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3456 }
3457 }
3458
3459 total_cost = startup_cost + run_cost;
3460
3461 /* Adjust the cost estimates if we have LIMIT */
3462 if (fpextra && fpextra->has_limit)
3463 {
3464 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3465 fpextra->offset_est, fpextra->count_est);
3466 retrieved_rows = rows;
3467 }
3468 }
3469
3470 /*
3471 * If this includes the final sort step, the given target, which will be
3472 * applied to the resulting path, might have different expressions from
3473 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3474 * eval costs.
3475 */
3476 if (fpextra && fpextra->has_final_sort &&
3477 fpextra->target != foreignrel->reltarget)
3478 {
3479 QualCost oldcost = foreignrel->reltarget->cost;
3480 QualCost newcost = fpextra->target->cost;
3481
3482 startup_cost += newcost.startup - oldcost.startup;
3483 total_cost += newcost.startup - oldcost.startup;
3484 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3485 }
3486
3487 /*
3488 * Cache the retrieved rows and cost estimates for scans, joins, or
3489 * groupings without any parameterization, pathkeys, or additional
3490 * post-scan/join-processing steps, before adding the costs for
3491 * transferring data from the foreign server. These estimates are useful
3492 * for costing remote joins involving this relation or costing other
3493 * remote operations on this relation such as remote sorts and remote
3494 * LIMIT restrictions, when the costs can not be obtained from the foreign
3495 * server. This function will be called at least once for every foreign
3496 * relation without any parameterization, pathkeys, or additional
3497 * post-scan/join-processing steps.
3498 */
3499 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3500 {
3501 fpinfo->retrieved_rows = retrieved_rows;
3502 fpinfo->rel_startup_cost = startup_cost;
3503 fpinfo->rel_total_cost = total_cost;
3504 }
3505
3506 /*
3507 * Add some additional cost factors to account for connection overhead
3508 * (fdw_startup_cost), transferring data across the network
3509 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3510 * (cpu_tuple_cost per retrieved row).
3511 */
3512 startup_cost += fpinfo->fdw_startup_cost;
3513 total_cost += fpinfo->fdw_startup_cost;
3514 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3515 total_cost += cpu_tuple_cost * retrieved_rows;
3516
3517 /*
3518 * If we have LIMIT, we should prefer performing the restriction remotely
3519 * rather than locally, as the former avoids extra row fetches from the
3520 * remote that the latter might cause. But since the core code doesn't
3521 * account for such fetches when estimating the costs of the local
3522 * restriction (see create_limit_path()), there would be no difference
3523 * between the costs of the local restriction and the costs of the remote
3524 * restriction estimated above if we don't use remote estimates (except
3525 * for the case where the foreignrel is a grouping relation, the given
3526 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3527 * accounted for in costing the remote restriction). Tweak the costs of
3528 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3529 * one.
3530 */
3531 if (!fpinfo->use_remote_estimate &&
3532 fpextra && fpextra->has_limit &&
3533 fpextra->limit_tuples > 0 &&
3534 fpextra->limit_tuples < fpinfo->rows)
3535 {
3536 Assert(fpinfo->rows > 0);
3537 total_cost -= (total_cost - startup_cost) * 0.05 *
3538 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3539 }
3540
3541 /* Return results. */
3542 *p_rows = rows;
3543 *p_width = width;
3544 *p_startup_cost = startup_cost;
3545 *p_total_cost = total_cost;
3546 }
3547
3548 /*
3549 * Estimate costs of executing a SQL statement remotely.
3550 * The given "sql" must be an EXPLAIN command.
3551 */
3552 static void
get_remote_estimate(const char * sql,PGconn * conn,double * rows,int * width,Cost * startup_cost,Cost * total_cost)3553 get_remote_estimate(const char *sql, PGconn *conn,
3554 double *rows, int *width,
3555 Cost *startup_cost, Cost *total_cost)
3556 {
3557 PGresult *volatile res = NULL;
3558
3559 /* PGresult must be released before leaving this function. */
3560 PG_TRY();
3561 {
3562 char *line;
3563 char *p;
3564 int n;
3565
3566 /*
3567 * Execute EXPLAIN remotely.
3568 */
3569 res = pgfdw_exec_query(conn, sql, NULL);
3570 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3571 pgfdw_report_error(ERROR, res, conn, false, sql);
3572
3573 /*
3574 * Extract cost numbers for topmost plan node. Note we search for a
3575 * left paren from the end of the line to avoid being confused by
3576 * other uses of parentheses.
3577 */
3578 line = PQgetvalue(res, 0, 0);
3579 p = strrchr(line, '(');
3580 if (p == NULL)
3581 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3582 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3583 startup_cost, total_cost, rows, width);
3584 if (n != 4)
3585 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3586 }
3587 PG_FINALLY();
3588 {
3589 if (res)
3590 PQclear(res);
3591 }
3592 PG_END_TRY();
3593 }
3594
3595 /*
3596 * Adjust the cost estimates of a foreign grouping path to include the cost of
3597 * generating properly-sorted output.
3598 */
3599 static void
adjust_foreign_grouping_path_cost(PlannerInfo * root,List * pathkeys,double retrieved_rows,double width,double limit_tuples,Cost * p_startup_cost,Cost * p_run_cost)3600 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3601 List *pathkeys,
3602 double retrieved_rows,
3603 double width,
3604 double limit_tuples,
3605 Cost *p_startup_cost,
3606 Cost *p_run_cost)
3607 {
3608 /*
3609 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3610 * side is unlikely to generate properly-sorted output, so it would need
3611 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3612 * if the GROUP BY clause is sort-able but isn't a superset of the given
3613 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3614 * costs by applying the same heuristic as for the scan or join case.
3615 */
3616 if (!grouping_is_sortable(root->parse->groupClause) ||
3617 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3618 {
3619 Path sort_path; /* dummy for result of cost_sort */
3620
3621 cost_sort(&sort_path,
3622 root,
3623 pathkeys,
3624 *p_startup_cost + *p_run_cost,
3625 retrieved_rows,
3626 width,
3627 0.0,
3628 work_mem,
3629 limit_tuples);
3630
3631 *p_startup_cost = sort_path.startup_cost;
3632 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3633 }
3634 else
3635 {
3636 /*
3637 * The default extra cost seems too large for foreign-grouping cases;
3638 * add 1/4th of that default.
3639 */
3640 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3641 - 1.0) * 0.25;
3642
3643 *p_startup_cost *= sort_multiplier;
3644 *p_run_cost *= sort_multiplier;
3645 }
3646 }
3647
3648 /*
3649 * Detect whether we want to process an EquivalenceClass member.
3650 *
3651 * This is a callback for use by generate_implied_equalities_for_column.
3652 */
3653 static bool
ec_member_matches_foreign(PlannerInfo * root,RelOptInfo * rel,EquivalenceClass * ec,EquivalenceMember * em,void * arg)3654 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3655 EquivalenceClass *ec, EquivalenceMember *em,
3656 void *arg)
3657 {
3658 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3659 Expr *expr = em->em_expr;
3660
3661 /*
3662 * If we've identified what we're processing in the current scan, we only
3663 * want to match that expression.
3664 */
3665 if (state->current != NULL)
3666 return equal(expr, state->current);
3667
3668 /*
3669 * Otherwise, ignore anything we've already processed.
3670 */
3671 if (list_member(state->already_used, expr))
3672 return false;
3673
3674 /* This is the new target to process. */
3675 state->current = expr;
3676 return true;
3677 }
3678
3679 /*
3680 * Create cursor for node's query with current parameter values.
3681 */
3682 static void
create_cursor(ForeignScanState * node)3683 create_cursor(ForeignScanState *node)
3684 {
3685 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3686 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3687 int numParams = fsstate->numParams;
3688 const char **values = fsstate->param_values;
3689 PGconn *conn = fsstate->conn;
3690 StringInfoData buf;
3691 PGresult *res;
3692
3693 /* First, process a pending asynchronous request, if any. */
3694 if (fsstate->conn_state->pendingAreq)
3695 process_pending_request(fsstate->conn_state->pendingAreq);
3696
3697 /*
3698 * Construct array of query parameter values in text format. We do the
3699 * conversions in the short-lived per-tuple context, so as not to cause a
3700 * memory leak over repeated scans.
3701 */
3702 if (numParams > 0)
3703 {
3704 MemoryContext oldcontext;
3705
3706 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3707
3708 process_query_params(econtext,
3709 fsstate->param_flinfo,
3710 fsstate->param_exprs,
3711 values);
3712
3713 MemoryContextSwitchTo(oldcontext);
3714 }
3715
3716 /* Construct the DECLARE CURSOR command */
3717 initStringInfo(&buf);
3718 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3719 fsstate->cursor_number, fsstate->query);
3720
3721 /*
3722 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3723 * to infer types for all parameters. Since we explicitly cast every
3724 * parameter (see deparse.c), the "inference" is trivial and will produce
3725 * the desired result. This allows us to avoid assuming that the remote
3726 * server has the same OIDs we do for the parameters' types.
3727 */
3728 if (!PQsendQueryParams(conn, buf.data, numParams,
3729 NULL, values, NULL, NULL, 0))
3730 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3731
3732 /*
3733 * Get the result, and check for success.
3734 *
3735 * We don't use a PG_TRY block here, so be careful not to throw error
3736 * without releasing the PGresult.
3737 */
3738 res = pgfdw_get_result(conn, buf.data);
3739 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3740 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3741 PQclear(res);
3742
3743 /* Mark the cursor as created, and show no tuples have been retrieved */
3744 fsstate->cursor_exists = true;
3745 fsstate->tuples = NULL;
3746 fsstate->num_tuples = 0;
3747 fsstate->next_tuple = 0;
3748 fsstate->fetch_ct_2 = 0;
3749 fsstate->eof_reached = false;
3750
3751 /* Clean up */
3752 pfree(buf.data);
3753 }
3754
3755 /*
3756 * Fetch some more rows from the node's cursor.
3757 */
3758 static void
fetch_more_data(ForeignScanState * node)3759 fetch_more_data(ForeignScanState *node)
3760 {
3761 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3762 PGresult *volatile res = NULL;
3763 MemoryContext oldcontext;
3764
3765 /*
3766 * We'll store the tuples in the batch_cxt. First, flush the previous
3767 * batch.
3768 */
3769 fsstate->tuples = NULL;
3770 MemoryContextReset(fsstate->batch_cxt);
3771 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3772
3773 /* PGresult must be released before leaving this function. */
3774 PG_TRY();
3775 {
3776 PGconn *conn = fsstate->conn;
3777 int numrows;
3778 int i;
3779
3780 if (fsstate->async_capable)
3781 {
3782 Assert(fsstate->conn_state->pendingAreq);
3783
3784 /*
3785 * The query was already sent by an earlier call to
3786 * fetch_more_data_begin. So now we just fetch the result.
3787 */
3788 res = pgfdw_get_result(conn, fsstate->query);
3789 /* On error, report the original query, not the FETCH. */
3790 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3791 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3792
3793 /* Reset per-connection state */
3794 fsstate->conn_state->pendingAreq = NULL;
3795 }
3796 else
3797 {
3798 char sql[64];
3799
3800 /* This is a regular synchronous fetch. */
3801 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3802 fsstate->fetch_size, fsstate->cursor_number);
3803
3804 res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3805 /* On error, report the original query, not the FETCH. */
3806 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3807 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3808 }
3809
3810 /* Convert the data into HeapTuples */
3811 numrows = PQntuples(res);
3812 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3813 fsstate->num_tuples = numrows;
3814 fsstate->next_tuple = 0;
3815
3816 for (i = 0; i < numrows; i++)
3817 {
3818 Assert(IsA(node->ss.ps.plan, ForeignScan));
3819
3820 fsstate->tuples[i] =
3821 make_tuple_from_result_row(res, i,
3822 fsstate->rel,
3823 fsstate->attinmeta,
3824 fsstate->retrieved_attrs,
3825 node,
3826 fsstate->temp_cxt);
3827 }
3828
3829 /* Update fetch_ct_2 */
3830 if (fsstate->fetch_ct_2 < 2)
3831 fsstate->fetch_ct_2++;
3832
3833 /* Must be EOF if we didn't get as many tuples as we asked for. */
3834 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3835 }
3836 PG_FINALLY();
3837 {
3838 if (res)
3839 PQclear(res);
3840 }
3841 PG_END_TRY();
3842
3843 MemoryContextSwitchTo(oldcontext);
3844 }
3845
3846 /*
3847 * Force assorted GUC parameters to settings that ensure that we'll output
3848 * data values in a form that is unambiguous to the remote server.
3849 *
3850 * This is rather expensive and annoying to do once per row, but there's
3851 * little choice if we want to be sure values are transmitted accurately;
3852 * we can't leave the settings in place between rows for fear of affecting
3853 * user-visible computations.
3854 *
3855 * We use the equivalent of a function SET option to allow the settings to
3856 * persist only until the caller calls reset_transmission_modes(). If an
3857 * error is thrown in between, guc.c will take care of undoing the settings.
3858 *
3859 * The return value is the nestlevel that must be passed to
3860 * reset_transmission_modes() to undo things.
3861 */
3862 int
set_transmission_modes(void)3863 set_transmission_modes(void)
3864 {
3865 int nestlevel = NewGUCNestLevel();
3866
3867 /*
3868 * The values set here should match what pg_dump does. See also
3869 * configure_remote_session in connection.c.
3870 */
3871 if (DateStyle != USE_ISO_DATES)
3872 (void) set_config_option("datestyle", "ISO",
3873 PGC_USERSET, PGC_S_SESSION,
3874 GUC_ACTION_SAVE, true, 0, false);
3875 if (IntervalStyle != INTSTYLE_POSTGRES)
3876 (void) set_config_option("intervalstyle", "postgres",
3877 PGC_USERSET, PGC_S_SESSION,
3878 GUC_ACTION_SAVE, true, 0, false);
3879 if (extra_float_digits < 3)
3880 (void) set_config_option("extra_float_digits", "3",
3881 PGC_USERSET, PGC_S_SESSION,
3882 GUC_ACTION_SAVE, true, 0, false);
3883
3884 return nestlevel;
3885 }
3886
3887 /*
3888 * Undo the effects of set_transmission_modes().
3889 */
3890 void
reset_transmission_modes(int nestlevel)3891 reset_transmission_modes(int nestlevel)
3892 {
3893 AtEOXact_GUC(true, nestlevel);
3894 }
3895
3896 /*
3897 * Utility routine to close a cursor.
3898 */
3899 static void
close_cursor(PGconn * conn,unsigned int cursor_number,PgFdwConnState * conn_state)3900 close_cursor(PGconn *conn, unsigned int cursor_number,
3901 PgFdwConnState *conn_state)
3902 {
3903 char sql[64];
3904 PGresult *res;
3905
3906 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3907
3908 /*
3909 * We don't use a PG_TRY block here, so be careful not to throw error
3910 * without releasing the PGresult.
3911 */
3912 res = pgfdw_exec_query(conn, sql, conn_state);
3913 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3914 pgfdw_report_error(ERROR, res, conn, true, sql);
3915 PQclear(res);
3916 }
3917
3918 /*
3919 * create_foreign_modify
3920 * Construct an execution state of a foreign insert/update/delete
3921 * operation
3922 */
3923 static PgFdwModifyState *
create_foreign_modify(EState * estate,RangeTblEntry * rte,ResultRelInfo * resultRelInfo,CmdType operation,Plan * subplan,char * query,List * target_attrs,int values_end,bool has_returning,List * retrieved_attrs)3924 create_foreign_modify(EState *estate,
3925 RangeTblEntry *rte,
3926 ResultRelInfo *resultRelInfo,
3927 CmdType operation,
3928 Plan *subplan,
3929 char *query,
3930 List *target_attrs,
3931 int values_end,
3932 bool has_returning,
3933 List *retrieved_attrs)
3934 {
3935 PgFdwModifyState *fmstate;
3936 Relation rel = resultRelInfo->ri_RelationDesc;
3937 TupleDesc tupdesc = RelationGetDescr(rel);
3938 Oid userid;
3939 ForeignTable *table;
3940 UserMapping *user;
3941 AttrNumber n_params;
3942 Oid typefnoid;
3943 bool isvarlena;
3944 ListCell *lc;
3945
3946 /* Begin constructing PgFdwModifyState. */
3947 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3948 fmstate->rel = rel;
3949
3950 /*
3951 * Identify which user to do the remote access as. This should match what
3952 * ExecCheckRTEPerms() does.
3953 */
3954 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3955
3956 /* Get info about foreign table. */
3957 table = GetForeignTable(RelationGetRelid(rel));
3958 user = GetUserMapping(userid, table->serverid);
3959
3960 /* Open connection; report that we'll create a prepared statement. */
3961 fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
3962 fmstate->p_name = NULL; /* prepared statement not made yet */
3963
3964 /* Set up remote query information. */
3965 fmstate->query = query;
3966 if (operation == CMD_INSERT)
3967 {
3968 fmstate->query = pstrdup(fmstate->query);
3969 fmstate->orig_query = pstrdup(fmstate->query);
3970 }
3971 fmstate->target_attrs = target_attrs;
3972 fmstate->values_end = values_end;
3973 fmstate->has_returning = has_returning;
3974 fmstate->retrieved_attrs = retrieved_attrs;
3975
3976 /* Create context for per-tuple temp workspace. */
3977 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3978 "postgres_fdw temporary data",
3979 ALLOCSET_SMALL_SIZES);
3980
3981 /* Prepare for input conversion of RETURNING results. */
3982 if (fmstate->has_returning)
3983 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3984
3985 /* Prepare for output conversion of parameters used in prepared stmt. */
3986 n_params = list_length(fmstate->target_attrs) + 1;
3987 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3988 fmstate->p_nums = 0;
3989
3990 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3991 {
3992 Assert(subplan != NULL);
3993
3994 /* Find the ctid resjunk column in the subplan's result */
3995 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3996 "ctid");
3997 if (!AttributeNumberIsValid(fmstate->ctidAttno))
3998 elog(ERROR, "could not find junk ctid column");
3999
4000 /* First transmittable parameter will be ctid */
4001 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4002 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4003 fmstate->p_nums++;
4004 }
4005
4006 if (operation == CMD_INSERT || operation == CMD_UPDATE)
4007 {
4008 /* Set up for remaining transmittable parameters */
4009 foreach(lc, fmstate->target_attrs)
4010 {
4011 int attnum = lfirst_int(lc);
4012 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4013
4014 Assert(!attr->attisdropped);
4015
4016 /* Ignore generated columns; they are set to DEFAULT */
4017 if (attr->attgenerated)
4018 continue;
4019 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4020 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4021 fmstate->p_nums++;
4022 }
4023 }
4024
4025 Assert(fmstate->p_nums <= n_params);
4026
4027 /* Set batch_size from foreign server/table options. */
4028 if (operation == CMD_INSERT)
4029 fmstate->batch_size = get_batch_size_option(rel);
4030
4031 fmstate->num_slots = 1;
4032
4033 /* Initialize auxiliary state */
4034 fmstate->aux_fmstate = NULL;
4035
4036 return fmstate;
4037 }
4038
4039 /*
4040 * execute_foreign_modify
4041 * Perform foreign-table modification as required, and fetch RETURNING
4042 * result if any. (This is the shared guts of postgresExecForeignInsert,
4043 * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4044 * postgresExecForeignDelete.)
4045 */
4046 static TupleTableSlot **
execute_foreign_modify(EState * estate,ResultRelInfo * resultRelInfo,CmdType operation,TupleTableSlot ** slots,TupleTableSlot ** planSlots,int * numSlots)4047 execute_foreign_modify(EState *estate,
4048 ResultRelInfo *resultRelInfo,
4049 CmdType operation,
4050 TupleTableSlot **slots,
4051 TupleTableSlot **planSlots,
4052 int *numSlots)
4053 {
4054 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4055 ItemPointer ctid = NULL;
4056 const char **p_values;
4057 PGresult *res;
4058 int n_rows;
4059 StringInfoData sql;
4060
4061 /* The operation should be INSERT, UPDATE, or DELETE */
4062 Assert(operation == CMD_INSERT ||
4063 operation == CMD_UPDATE ||
4064 operation == CMD_DELETE);
4065
4066 /* First, process a pending asynchronous request, if any. */
4067 if (fmstate->conn_state->pendingAreq)
4068 process_pending_request(fmstate->conn_state->pendingAreq);
4069
4070 /*
4071 * If the existing query was deparsed and prepared for a different number
4072 * of rows, rebuild it for the proper number.
4073 */
4074 if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4075 {
4076 /* Destroy the prepared statement created previously */
4077 if (fmstate->p_name)
4078 deallocate_query(fmstate);
4079
4080 /* Build INSERT string with numSlots records in its VALUES clause. */
4081 initStringInfo(&sql);
4082 rebuildInsertSql(&sql, fmstate->rel,
4083 fmstate->orig_query, fmstate->target_attrs,
4084 fmstate->values_end, fmstate->p_nums,
4085 *numSlots - 1);
4086 pfree(fmstate->query);
4087 fmstate->query = sql.data;
4088 fmstate->num_slots = *numSlots;
4089 }
4090
4091 /* Set up the prepared statement on the remote server, if we didn't yet */
4092 if (!fmstate->p_name)
4093 prepare_foreign_modify(fmstate);
4094
4095 /*
4096 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4097 */
4098 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4099 {
4100 Datum datum;
4101 bool isNull;
4102
4103 datum = ExecGetJunkAttribute(planSlots[0],
4104 fmstate->ctidAttno,
4105 &isNull);
4106 /* shouldn't ever get a null result... */
4107 if (isNull)
4108 elog(ERROR, "ctid is NULL");
4109 ctid = (ItemPointer) DatumGetPointer(datum);
4110 }
4111
4112 /* Convert parameters needed by prepared statement to text form */
4113 p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4114
4115 /*
4116 * Execute the prepared statement.
4117 */
4118 if (!PQsendQueryPrepared(fmstate->conn,
4119 fmstate->p_name,
4120 fmstate->p_nums * (*numSlots),
4121 p_values,
4122 NULL,
4123 NULL,
4124 0))
4125 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4126
4127 /*
4128 * Get the result, and check for success.
4129 *
4130 * We don't use a PG_TRY block here, so be careful not to throw error
4131 * without releasing the PGresult.
4132 */
4133 res = pgfdw_get_result(fmstate->conn, fmstate->query);
4134 if (PQresultStatus(res) !=
4135 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4136 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4137
4138 /* Check number of rows affected, and fetch RETURNING tuple if any */
4139 if (fmstate->has_returning)
4140 {
4141 Assert(*numSlots == 1);
4142 n_rows = PQntuples(res);
4143 if (n_rows > 0)
4144 store_returning_result(fmstate, slots[0], res);
4145 }
4146 else
4147 n_rows = atoi(PQcmdTuples(res));
4148
4149 /* And clean up */
4150 PQclear(res);
4151
4152 MemoryContextReset(fmstate->temp_cxt);
4153
4154 *numSlots = n_rows;
4155
4156 /*
4157 * Return NULL if nothing was inserted/updated/deleted on the remote end
4158 */
4159 return (n_rows > 0) ? slots : NULL;
4160 }
4161
4162 /*
4163 * prepare_foreign_modify
4164 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4165 */
4166 static void
prepare_foreign_modify(PgFdwModifyState * fmstate)4167 prepare_foreign_modify(PgFdwModifyState *fmstate)
4168 {
4169 char prep_name[NAMEDATALEN];
4170 char *p_name;
4171 PGresult *res;
4172
4173 /*
4174 * The caller would already have processed a pending asynchronous request
4175 * if any, so no need to do it here.
4176 */
4177
4178 /* Construct name we'll use for the prepared statement. */
4179 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4180 GetPrepStmtNumber(fmstate->conn));
4181 p_name = pstrdup(prep_name);
4182
4183 /*
4184 * We intentionally do not specify parameter types here, but leave the
4185 * remote server to derive them by default. This avoids possible problems
4186 * with the remote server using different type OIDs than we do. All of
4187 * the prepared statements we use in this module are simple enough that
4188 * the remote server will make the right choices.
4189 */
4190 if (!PQsendPrepare(fmstate->conn,
4191 p_name,
4192 fmstate->query,
4193 0,
4194 NULL))
4195 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4196
4197 /*
4198 * Get the result, and check for success.
4199 *
4200 * We don't use a PG_TRY block here, so be careful not to throw error
4201 * without releasing the PGresult.
4202 */
4203 res = pgfdw_get_result(fmstate->conn, fmstate->query);
4204 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4205 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4206 PQclear(res);
4207
4208 /* This action shows that the prepare has been done. */
4209 fmstate->p_name = p_name;
4210 }
4211
4212 /*
4213 * convert_prep_stmt_params
4214 * Create array of text strings representing parameter values
4215 *
4216 * tupleid is ctid to send, or NULL if none
4217 * slot is slot to get remaining parameters from, or NULL if none
4218 *
4219 * Data is constructed in temp_cxt; caller should reset that after use.
4220 */
4221 static const char **
convert_prep_stmt_params(PgFdwModifyState * fmstate,ItemPointer tupleid,TupleTableSlot ** slots,int numSlots)4222 convert_prep_stmt_params(PgFdwModifyState *fmstate,
4223 ItemPointer tupleid,
4224 TupleTableSlot **slots,
4225 int numSlots)
4226 {
4227 const char **p_values;
4228 int i;
4229 int j;
4230 int pindex = 0;
4231 MemoryContext oldcontext;
4232
4233 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4234
4235 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4236
4237 /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4238 Assert(!(tupleid != NULL && numSlots > 1));
4239
4240 /* 1st parameter should be ctid, if it's in use */
4241 if (tupleid != NULL)
4242 {
4243 Assert(numSlots == 1);
4244 /* don't need set_transmission_modes for TID output */
4245 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4246 PointerGetDatum(tupleid));
4247 pindex++;
4248 }
4249
4250 /* get following parameters from slots */
4251 if (slots != NULL && fmstate->target_attrs != NIL)
4252 {
4253 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4254 int nestlevel;
4255 ListCell *lc;
4256
4257 nestlevel = set_transmission_modes();
4258
4259 for (i = 0; i < numSlots; i++)
4260 {
4261 j = (tupleid != NULL) ? 1 : 0;
4262 foreach(lc, fmstate->target_attrs)
4263 {
4264 int attnum = lfirst_int(lc);
4265 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4266 Datum value;
4267 bool isnull;
4268
4269 /* Ignore generated columns; they are set to DEFAULT */
4270 if (attr->attgenerated)
4271 continue;
4272 value = slot_getattr(slots[i], attnum, &isnull);
4273 if (isnull)
4274 p_values[pindex] = NULL;
4275 else
4276 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4277 value);
4278 pindex++;
4279 j++;
4280 }
4281 }
4282
4283 reset_transmission_modes(nestlevel);
4284 }
4285
4286 Assert(pindex == fmstate->p_nums * numSlots);
4287
4288 MemoryContextSwitchTo(oldcontext);
4289
4290 return p_values;
4291 }
4292
4293 /*
4294 * store_returning_result
4295 * Store the result of a RETURNING clause
4296 *
4297 * On error, be sure to release the PGresult on the way out. Callers do not
4298 * have PG_TRY blocks to ensure this happens.
4299 */
4300 static void
store_returning_result(PgFdwModifyState * fmstate,TupleTableSlot * slot,PGresult * res)4301 store_returning_result(PgFdwModifyState *fmstate,
4302 TupleTableSlot *slot, PGresult *res)
4303 {
4304 PG_TRY();
4305 {
4306 HeapTuple newtup;
4307
4308 newtup = make_tuple_from_result_row(res, 0,
4309 fmstate->rel,
4310 fmstate->attinmeta,
4311 fmstate->retrieved_attrs,
4312 NULL,
4313 fmstate->temp_cxt);
4314
4315 /*
4316 * The returning slot will not necessarily be suitable to store
4317 * heaptuples directly, so allow for conversion.
4318 */
4319 ExecForceStoreHeapTuple(newtup, slot, true);
4320 }
4321 PG_CATCH();
4322 {
4323 if (res)
4324 PQclear(res);
4325 PG_RE_THROW();
4326 }
4327 PG_END_TRY();
4328 }
4329
4330 /*
4331 * finish_foreign_modify
4332 * Release resources for a foreign insert/update/delete operation
4333 */
4334 static void
finish_foreign_modify(PgFdwModifyState * fmstate)4335 finish_foreign_modify(PgFdwModifyState *fmstate)
4336 {
4337 Assert(fmstate != NULL);
4338
4339 /* If we created a prepared statement, destroy it */
4340 deallocate_query(fmstate);
4341
4342 /* Release remote connection */
4343 ReleaseConnection(fmstate->conn);
4344 fmstate->conn = NULL;
4345 }
4346
4347 /*
4348 * deallocate_query
4349 * Deallocate a prepared statement for a foreign insert/update/delete
4350 * operation
4351 */
4352 static void
deallocate_query(PgFdwModifyState * fmstate)4353 deallocate_query(PgFdwModifyState *fmstate)
4354 {
4355 char sql[64];
4356 PGresult *res;
4357
4358 /* do nothing if the query is not allocated */
4359 if (!fmstate->p_name)
4360 return;
4361
4362 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4363
4364 /*
4365 * We don't use a PG_TRY block here, so be careful not to throw error
4366 * without releasing the PGresult.
4367 */
4368 res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4369 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4370 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4371 PQclear(res);
4372 pfree(fmstate->p_name);
4373 fmstate->p_name = NULL;
4374 }
4375
4376 /*
4377 * build_remote_returning
4378 * Build a RETURNING targetlist of a remote query for performing an
4379 * UPDATE/DELETE .. RETURNING on a join directly
4380 */
4381 static List *
build_remote_returning(Index rtindex,Relation rel,List * returningList)4382 build_remote_returning(Index rtindex, Relation rel, List *returningList)
4383 {
4384 bool have_wholerow = false;
4385 List *tlist = NIL;
4386 List *vars;
4387 ListCell *lc;
4388
4389 Assert(returningList);
4390
4391 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4392
4393 /*
4394 * If there's a whole-row reference to the target relation, then we'll
4395 * need all the columns of the relation.
4396 */
4397 foreach(lc, vars)
4398 {
4399 Var *var = (Var *) lfirst(lc);
4400
4401 if (IsA(var, Var) &&
4402 var->varno == rtindex &&
4403 var->varattno == InvalidAttrNumber)
4404 {
4405 have_wholerow = true;
4406 break;
4407 }
4408 }
4409
4410 if (have_wholerow)
4411 {
4412 TupleDesc tupdesc = RelationGetDescr(rel);
4413 int i;
4414
4415 for (i = 1; i <= tupdesc->natts; i++)
4416 {
4417 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4418 Var *var;
4419
4420 /* Ignore dropped attributes. */
4421 if (attr->attisdropped)
4422 continue;
4423
4424 var = makeVar(rtindex,
4425 i,
4426 attr->atttypid,
4427 attr->atttypmod,
4428 attr->attcollation,
4429 0);
4430
4431 tlist = lappend(tlist,
4432 makeTargetEntry((Expr *) var,
4433 list_length(tlist) + 1,
4434 NULL,
4435 false));
4436 }
4437 }
4438
4439 /* Now add any remaining columns to tlist. */
4440 foreach(lc, vars)
4441 {
4442 Var *var = (Var *) lfirst(lc);
4443
4444 /*
4445 * No need for whole-row references to the target relation. We don't
4446 * need system columns other than ctid and oid either, since those are
4447 * set locally.
4448 */
4449 if (IsA(var, Var) &&
4450 var->varno == rtindex &&
4451 var->varattno <= InvalidAttrNumber &&
4452 var->varattno != SelfItemPointerAttributeNumber)
4453 continue; /* don't need it */
4454
4455 if (tlist_member((Expr *) var, tlist))
4456 continue; /* already got it */
4457
4458 tlist = lappend(tlist,
4459 makeTargetEntry((Expr *) var,
4460 list_length(tlist) + 1,
4461 NULL,
4462 false));
4463 }
4464
4465 list_free(vars);
4466
4467 return tlist;
4468 }
4469
4470 /*
4471 * rebuild_fdw_scan_tlist
4472 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4473 * tlist
4474 *
4475 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4476 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4477 * have contained resjunk columns such as 'ctid' of the target relation and
4478 * 'wholerow' of non-target relations, but the tlist might not contain them,
4479 * for example. So, adjust the tlist so it contains all the columns specified
4480 * in the fdw_scan_tlist; else setrefs.c will get confused.
4481 */
4482 static void
rebuild_fdw_scan_tlist(ForeignScan * fscan,List * tlist)4483 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
4484 {
4485 List *new_tlist = tlist;
4486 List *old_tlist = fscan->fdw_scan_tlist;
4487 ListCell *lc;
4488
4489 foreach(lc, old_tlist)
4490 {
4491 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4492
4493 if (tlist_member(tle->expr, new_tlist))
4494 continue; /* already got it */
4495
4496 new_tlist = lappend(new_tlist,
4497 makeTargetEntry(tle->expr,
4498 list_length(new_tlist) + 1,
4499 NULL,
4500 false));
4501 }
4502 fscan->fdw_scan_tlist = new_tlist;
4503 }
4504
4505 /*
4506 * Execute a direct UPDATE/DELETE statement.
4507 */
4508 static void
execute_dml_stmt(ForeignScanState * node)4509 execute_dml_stmt(ForeignScanState *node)
4510 {
4511 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4512 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4513 int numParams = dmstate->numParams;
4514 const char **values = dmstate->param_values;
4515
4516 /* First, process a pending asynchronous request, if any. */
4517 if (dmstate->conn_state->pendingAreq)
4518 process_pending_request(dmstate->conn_state->pendingAreq);
4519
4520 /*
4521 * Construct array of query parameter values in text format.
4522 */
4523 if (numParams > 0)
4524 process_query_params(econtext,
4525 dmstate->param_flinfo,
4526 dmstate->param_exprs,
4527 values);
4528
4529 /*
4530 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4531 * to infer types for all parameters. Since we explicitly cast every
4532 * parameter (see deparse.c), the "inference" is trivial and will produce
4533 * the desired result. This allows us to avoid assuming that the remote
4534 * server has the same OIDs we do for the parameters' types.
4535 */
4536 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4537 NULL, values, NULL, NULL, 0))
4538 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4539
4540 /*
4541 * Get the result, and check for success.
4542 *
4543 * We don't use a PG_TRY block here, so be careful not to throw error
4544 * without releasing the PGresult.
4545 */
4546 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4547 if (PQresultStatus(dmstate->result) !=
4548 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4549 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4550 dmstate->query);
4551
4552 /* Get the number of rows affected. */
4553 if (dmstate->has_returning)
4554 dmstate->num_tuples = PQntuples(dmstate->result);
4555 else
4556 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4557 }
4558
4559 /*
4560 * Get the result of a RETURNING clause.
4561 */
4562 static TupleTableSlot *
get_returning_data(ForeignScanState * node)4563 get_returning_data(ForeignScanState *node)
4564 {
4565 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4566 EState *estate = node->ss.ps.state;
4567 ResultRelInfo *resultRelInfo = node->resultRelInfo;
4568 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4569 TupleTableSlot *resultSlot;
4570
4571 Assert(resultRelInfo->ri_projectReturning);
4572
4573 /* If we didn't get any tuples, must be end of data. */
4574 if (dmstate->next_tuple >= dmstate->num_tuples)
4575 return ExecClearTuple(slot);
4576
4577 /* Increment the command es_processed count if necessary. */
4578 if (dmstate->set_processed)
4579 estate->es_processed += 1;
4580
4581 /*
4582 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4583 * tuple. (has_returning is false when the local query is of the form
4584 * "UPDATE/DELETE .. RETURNING 1" for example.)
4585 */
4586 if (!dmstate->has_returning)
4587 {
4588 ExecStoreAllNullTuple(slot);
4589 resultSlot = slot;
4590 }
4591 else
4592 {
4593 /*
4594 * On error, be sure to release the PGresult on the way out. Callers
4595 * do not have PG_TRY blocks to ensure this happens.
4596 */
4597 PG_TRY();
4598 {
4599 HeapTuple newtup;
4600
4601 newtup = make_tuple_from_result_row(dmstate->result,
4602 dmstate->next_tuple,
4603 dmstate->rel,
4604 dmstate->attinmeta,
4605 dmstate->retrieved_attrs,
4606 node,
4607 dmstate->temp_cxt);
4608 ExecStoreHeapTuple(newtup, slot, false);
4609 }
4610 PG_CATCH();
4611 {
4612 if (dmstate->result)
4613 PQclear(dmstate->result);
4614 PG_RE_THROW();
4615 }
4616 PG_END_TRY();
4617
4618 /* Get the updated/deleted tuple. */
4619 if (dmstate->rel)
4620 resultSlot = slot;
4621 else
4622 resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4623 }
4624 dmstate->next_tuple++;
4625
4626 /* Make slot available for evaluation of the local query RETURNING list. */
4627 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4628 resultSlot;
4629
4630 return slot;
4631 }
4632
4633 /*
4634 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4635 */
4636 static void
init_returning_filter(PgFdwDirectModifyState * dmstate,List * fdw_scan_tlist,Index rtindex)4637 init_returning_filter(PgFdwDirectModifyState *dmstate,
4638 List *fdw_scan_tlist,
4639 Index rtindex)
4640 {
4641 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4642 ListCell *lc;
4643 int i;
4644
4645 /*
4646 * Calculate the mapping between the fdw_scan_tlist's entries and the
4647 * result tuple's attributes.
4648 *
4649 * The "map" is an array of indexes of the result tuple's attributes in
4650 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4651 * tuple. We store zero for any attributes that don't have the
4652 * corresponding entries in that list, marking that a NULL is needed in
4653 * the result tuple.
4654 *
4655 * Also get the indexes of the entries for ctid and oid if any.
4656 */
4657 dmstate->attnoMap = (AttrNumber *)
4658 palloc0(resultTupType->natts * sizeof(AttrNumber));
4659
4660 dmstate->ctidAttno = dmstate->oidAttno = 0;
4661
4662 i = 1;
4663 dmstate->hasSystemCols = false;
4664 foreach(lc, fdw_scan_tlist)
4665 {
4666 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4667 Var *var = (Var *) tle->expr;
4668
4669 Assert(IsA(var, Var));
4670
4671 /*
4672 * If the Var is a column of the target relation to be retrieved from
4673 * the foreign server, get the index of the entry.
4674 */
4675 if (var->varno == rtindex &&
4676 list_member_int(dmstate->retrieved_attrs, i))
4677 {
4678 int attrno = var->varattno;
4679
4680 if (attrno < 0)
4681 {
4682 /*
4683 * We don't retrieve system columns other than ctid and oid.
4684 */
4685 if (attrno == SelfItemPointerAttributeNumber)
4686 dmstate->ctidAttno = i;
4687 else
4688 Assert(false);
4689 dmstate->hasSystemCols = true;
4690 }
4691 else
4692 {
4693 /*
4694 * We don't retrieve whole-row references to the target
4695 * relation either.
4696 */
4697 Assert(attrno > 0);
4698
4699 dmstate->attnoMap[attrno - 1] = i;
4700 }
4701 }
4702 i++;
4703 }
4704 }
4705
4706 /*
4707 * Extract and return an updated/deleted tuple from a scan tuple.
4708 */
4709 static TupleTableSlot *
apply_returning_filter(PgFdwDirectModifyState * dmstate,ResultRelInfo * resultRelInfo,TupleTableSlot * slot,EState * estate)4710 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4711 ResultRelInfo *resultRelInfo,
4712 TupleTableSlot *slot,
4713 EState *estate)
4714 {
4715 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4716 TupleTableSlot *resultSlot;
4717 Datum *values;
4718 bool *isnull;
4719 Datum *old_values;
4720 bool *old_isnull;
4721 int i;
4722
4723 /*
4724 * Use the return tuple slot as a place to store the result tuple.
4725 */
4726 resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4727
4728 /*
4729 * Extract all the values of the scan tuple.
4730 */
4731 slot_getallattrs(slot);
4732 old_values = slot->tts_values;
4733 old_isnull = slot->tts_isnull;
4734
4735 /*
4736 * Prepare to build the result tuple.
4737 */
4738 ExecClearTuple(resultSlot);
4739 values = resultSlot->tts_values;
4740 isnull = resultSlot->tts_isnull;
4741
4742 /*
4743 * Transpose data into proper fields of the result tuple.
4744 */
4745 for (i = 0; i < resultTupType->natts; i++)
4746 {
4747 int j = dmstate->attnoMap[i];
4748
4749 if (j == 0)
4750 {
4751 values[i] = (Datum) 0;
4752 isnull[i] = true;
4753 }
4754 else
4755 {
4756 values[i] = old_values[j - 1];
4757 isnull[i] = old_isnull[j - 1];
4758 }
4759 }
4760
4761 /*
4762 * Build the virtual tuple.
4763 */
4764 ExecStoreVirtualTuple(resultSlot);
4765
4766 /*
4767 * If we have any system columns to return, materialize a heap tuple in
4768 * the slot from column values set above and install system columns in
4769 * that tuple.
4770 */
4771 if (dmstate->hasSystemCols)
4772 {
4773 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4774
4775 /* ctid */
4776 if (dmstate->ctidAttno)
4777 {
4778 ItemPointer ctid = NULL;
4779
4780 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4781 resultTup->t_self = *ctid;
4782 }
4783
4784 /*
4785 * And remaining columns
4786 *
4787 * Note: since we currently don't allow the target relation to appear
4788 * on the nullable side of an outer join, any system columns wouldn't
4789 * go to NULL.
4790 *
4791 * Note: no need to care about tableoid here because it will be
4792 * initialized in ExecProcessReturning().
4793 */
4794 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4795 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4796 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4797 }
4798
4799 /*
4800 * And return the result tuple.
4801 */
4802 return resultSlot;
4803 }
4804
4805 /*
4806 * Prepare for processing of parameters used in remote query.
4807 */
4808 static void
prepare_query_params(PlanState * node,List * fdw_exprs,int numParams,FmgrInfo ** param_flinfo,List ** param_exprs,const char *** param_values)4809 prepare_query_params(PlanState *node,
4810 List *fdw_exprs,
4811 int numParams,
4812 FmgrInfo **param_flinfo,
4813 List **param_exprs,
4814 const char ***param_values)
4815 {
4816 int i;
4817 ListCell *lc;
4818
4819 Assert(numParams > 0);
4820
4821 /* Prepare for output conversion of parameters used in remote query. */
4822 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4823
4824 i = 0;
4825 foreach(lc, fdw_exprs)
4826 {
4827 Node *param_expr = (Node *) lfirst(lc);
4828 Oid typefnoid;
4829 bool isvarlena;
4830
4831 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4832 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4833 i++;
4834 }
4835
4836 /*
4837 * Prepare remote-parameter expressions for evaluation. (Note: in
4838 * practice, we expect that all these expressions will be just Params, so
4839 * we could possibly do something more efficient than using the full
4840 * expression-eval machinery for this. But probably there would be little
4841 * benefit, and it'd require postgres_fdw to know more than is desirable
4842 * about Param evaluation.)
4843 */
4844 *param_exprs = ExecInitExprList(fdw_exprs, node);
4845
4846 /* Allocate buffer for text form of query parameters. */
4847 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4848 }
4849
4850 /*
4851 * Construct array of query parameter values in text format.
4852 */
4853 static void
process_query_params(ExprContext * econtext,FmgrInfo * param_flinfo,List * param_exprs,const char ** param_values)4854 process_query_params(ExprContext *econtext,
4855 FmgrInfo *param_flinfo,
4856 List *param_exprs,
4857 const char **param_values)
4858 {
4859 int nestlevel;
4860 int i;
4861 ListCell *lc;
4862
4863 nestlevel = set_transmission_modes();
4864
4865 i = 0;
4866 foreach(lc, param_exprs)
4867 {
4868 ExprState *expr_state = (ExprState *) lfirst(lc);
4869 Datum expr_value;
4870 bool isNull;
4871
4872 /* Evaluate the parameter expression */
4873 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4874
4875 /*
4876 * Get string representation of each parameter value by invoking
4877 * type-specific output function, unless the value is null.
4878 */
4879 if (isNull)
4880 param_values[i] = NULL;
4881 else
4882 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
4883
4884 i++;
4885 }
4886
4887 reset_transmission_modes(nestlevel);
4888 }
4889
4890 /*
4891 * postgresAnalyzeForeignTable
4892 * Test whether analyzing this foreign table is supported
4893 */
4894 static bool
postgresAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)4895 postgresAnalyzeForeignTable(Relation relation,
4896 AcquireSampleRowsFunc *func,
4897 BlockNumber *totalpages)
4898 {
4899 ForeignTable *table;
4900 UserMapping *user;
4901 PGconn *conn;
4902 StringInfoData sql;
4903 PGresult *volatile res = NULL;
4904
4905 /* Return the row-analysis function pointer */
4906 *func = postgresAcquireSampleRowsFunc;
4907
4908 /*
4909 * Now we have to get the number of pages. It's annoying that the ANALYZE
4910 * API requires us to return that now, because it forces some duplication
4911 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4912 * it's probably not worth redefining that API at this point.
4913 */
4914
4915 /*
4916 * Get the connection to use. We do the remote access as the table's
4917 * owner, even if the ANALYZE was started by some other user.
4918 */
4919 table = GetForeignTable(RelationGetRelid(relation));
4920 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4921 conn = GetConnection(user, false, NULL);
4922
4923 /*
4924 * Construct command to get page count for relation.
4925 */
4926 initStringInfo(&sql);
4927 deparseAnalyzeSizeSql(&sql, relation);
4928
4929 /* In what follows, do not risk leaking any PGresults. */
4930 PG_TRY();
4931 {
4932 res = pgfdw_exec_query(conn, sql.data, NULL);
4933 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4934 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4935
4936 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4937 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4938 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4939 }
4940 PG_FINALLY();
4941 {
4942 if (res)
4943 PQclear(res);
4944 }
4945 PG_END_TRY();
4946
4947 ReleaseConnection(conn);
4948
4949 return true;
4950 }
4951
4952 /*
4953 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4954 *
4955 * We fetch the whole table from the remote side and pick out some sample rows.
4956 *
4957 * Selected rows are returned in the caller-allocated array rows[],
4958 * which must have at least targrows entries.
4959 * The actual number of rows selected is returned as the function result.
4960 * We also count the total number of rows in the table and return it into
4961 * *totalrows. Note that *totaldeadrows is always set to 0.
4962 *
4963 * Note that the returned list of rows is not always in order by physical
4964 * position in the table. Therefore, correlation estimates derived later
4965 * may be meaningless, but it's OK because we don't use the estimates
4966 * currently (the planner only pays attention to correlation for indexscans).
4967 */
4968 static int
postgresAcquireSampleRowsFunc(Relation relation,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)4969 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
4970 HeapTuple *rows, int targrows,
4971 double *totalrows,
4972 double *totaldeadrows)
4973 {
4974 PgFdwAnalyzeState astate;
4975 ForeignTable *table;
4976 ForeignServer *server;
4977 UserMapping *user;
4978 PGconn *conn;
4979 unsigned int cursor_number;
4980 StringInfoData sql;
4981 PGresult *volatile res = NULL;
4982
4983 /* Initialize workspace state */
4984 astate.rel = relation;
4985 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
4986
4987 astate.rows = rows;
4988 astate.targrows = targrows;
4989 astate.numrows = 0;
4990 astate.samplerows = 0;
4991 astate.rowstoskip = -1; /* -1 means not set yet */
4992 reservoir_init_selection_state(&astate.rstate, targrows);
4993
4994 /* Remember ANALYZE context, and create a per-tuple temp context */
4995 astate.anl_cxt = CurrentMemoryContext;
4996 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
4997 "postgres_fdw temporary data",
4998 ALLOCSET_SMALL_SIZES);
4999
5000 /*
5001 * Get the connection to use. We do the remote access as the table's
5002 * owner, even if the ANALYZE was started by some other user.
5003 */
5004 table = GetForeignTable(RelationGetRelid(relation));
5005 server = GetForeignServer(table->serverid);
5006 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5007 conn = GetConnection(user, false, NULL);
5008
5009 /*
5010 * Construct cursor that retrieves whole rows from remote.
5011 */
5012 cursor_number = GetCursorNumber(conn);
5013 initStringInfo(&sql);
5014 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5015 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
5016
5017 /* In what follows, do not risk leaking any PGresults. */
5018 PG_TRY();
5019 {
5020 char fetch_sql[64];
5021 int fetch_size;
5022 ListCell *lc;
5023
5024 res = pgfdw_exec_query(conn, sql.data, NULL);
5025 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5026 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5027 PQclear(res);
5028 res = NULL;
5029
5030 /*
5031 * Determine the fetch size. The default is arbitrary, but shouldn't
5032 * be enormous.
5033 */
5034 fetch_size = 100;
5035 foreach(lc, server->options)
5036 {
5037 DefElem *def = (DefElem *) lfirst(lc);
5038
5039 if (strcmp(def->defname, "fetch_size") == 0)
5040 {
5041 (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5042 break;
5043 }
5044 }
5045 foreach(lc, table->options)
5046 {
5047 DefElem *def = (DefElem *) lfirst(lc);
5048
5049 if (strcmp(def->defname, "fetch_size") == 0)
5050 {
5051 (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5052 break;
5053 }
5054 }
5055
5056 /* Construct command to fetch rows from remote. */
5057 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5058 fetch_size, cursor_number);
5059
5060 /* Retrieve and process rows a batch at a time. */
5061 for (;;)
5062 {
5063 int numrows;
5064 int i;
5065
5066 /* Allow users to cancel long query */
5067 CHECK_FOR_INTERRUPTS();
5068
5069 /*
5070 * XXX possible future improvement: if rowstoskip is large, we
5071 * could issue a MOVE rather than physically fetching the rows,
5072 * then just adjust rowstoskip and samplerows appropriately.
5073 */
5074
5075 /* Fetch some rows */
5076 res = pgfdw_exec_query(conn, fetch_sql, NULL);
5077 /* On error, report the original query, not the FETCH. */
5078 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5079 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5080
5081 /* Process whatever we got. */
5082 numrows = PQntuples(res);
5083 for (i = 0; i < numrows; i++)
5084 analyze_row_processor(res, i, &astate);
5085
5086 PQclear(res);
5087 res = NULL;
5088
5089 /* Must be EOF if we didn't get all the rows requested. */
5090 if (numrows < fetch_size)
5091 break;
5092 }
5093
5094 /* Close the cursor, just to be tidy. */
5095 close_cursor(conn, cursor_number, NULL);
5096 }
5097 PG_CATCH();
5098 {
5099 if (res)
5100 PQclear(res);
5101 PG_RE_THROW();
5102 }
5103 PG_END_TRY();
5104
5105 ReleaseConnection(conn);
5106
5107 /* We assume that we have no dead tuple. */
5108 *totaldeadrows = 0.0;
5109
5110 /* We've retrieved all living tuples from foreign server. */
5111 *totalrows = astate.samplerows;
5112
5113 /*
5114 * Emit some interesting relation info
5115 */
5116 ereport(elevel,
5117 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5118 RelationGetRelationName(relation),
5119 astate.samplerows, astate.numrows)));
5120
5121 return astate.numrows;
5122 }
5123
5124 /*
5125 * Collect sample rows from the result of query.
5126 * - Use all tuples in sample until target # of samples are collected.
5127 * - Subsequently, replace already-sampled tuples randomly.
5128 */
5129 static void
analyze_row_processor(PGresult * res,int row,PgFdwAnalyzeState * astate)5130 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
5131 {
5132 int targrows = astate->targrows;
5133 int pos; /* array index to store tuple in */
5134 MemoryContext oldcontext;
5135
5136 /* Always increment sample row counter. */
5137 astate->samplerows += 1;
5138
5139 /*
5140 * Determine the slot where this sample row should be stored. Set pos to
5141 * negative value to indicate the row should be skipped.
5142 */
5143 if (astate->numrows < targrows)
5144 {
5145 /* First targrows rows are always included into the sample */
5146 pos = astate->numrows++;
5147 }
5148 else
5149 {
5150 /*
5151 * Now we start replacing tuples in the sample until we reach the end
5152 * of the relation. Same algorithm as in acquire_sample_rows in
5153 * analyze.c; see Jeff Vitter's paper.
5154 */
5155 if (astate->rowstoskip < 0)
5156 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5157
5158 if (astate->rowstoskip <= 0)
5159 {
5160 /* Choose a random reservoir element to replace. */
5161 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
5162 Assert(pos >= 0 && pos < targrows);
5163 heap_freetuple(astate->rows[pos]);
5164 }
5165 else
5166 {
5167 /* Skip this tuple. */
5168 pos = -1;
5169 }
5170
5171 astate->rowstoskip -= 1;
5172 }
5173
5174 if (pos >= 0)
5175 {
5176 /*
5177 * Create sample tuple from current result row, and store it in the
5178 * position determined above. The tuple has to be created in anl_cxt.
5179 */
5180 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5181
5182 astate->rows[pos] = make_tuple_from_result_row(res, row,
5183 astate->rel,
5184 astate->attinmeta,
5185 astate->retrieved_attrs,
5186 NULL,
5187 astate->temp_cxt);
5188
5189 MemoryContextSwitchTo(oldcontext);
5190 }
5191 }
5192
5193 /*
5194 * Import a foreign schema
5195 */
5196 static List *
postgresImportForeignSchema(ImportForeignSchemaStmt * stmt,Oid serverOid)5197 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
5198 {
5199 List *commands = NIL;
5200 bool import_collate = true;
5201 bool import_default = false;
5202 bool import_generated = true;
5203 bool import_not_null = true;
5204 ForeignServer *server;
5205 UserMapping *mapping;
5206 PGconn *conn;
5207 StringInfoData buf;
5208 PGresult *volatile res = NULL;
5209 int numrows,
5210 i;
5211 ListCell *lc;
5212
5213 /* Parse statement options */
5214 foreach(lc, stmt->options)
5215 {
5216 DefElem *def = (DefElem *) lfirst(lc);
5217
5218 if (strcmp(def->defname, "import_collate") == 0)
5219 import_collate = defGetBoolean(def);
5220 else if (strcmp(def->defname, "import_default") == 0)
5221 import_default = defGetBoolean(def);
5222 else if (strcmp(def->defname, "import_generated") == 0)
5223 import_generated = defGetBoolean(def);
5224 else if (strcmp(def->defname, "import_not_null") == 0)
5225 import_not_null = defGetBoolean(def);
5226 else
5227 ereport(ERROR,
5228 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5229 errmsg("invalid option \"%s\"", def->defname)));
5230 }
5231
5232 /*
5233 * Get connection to the foreign server. Connection manager will
5234 * establish new connection if necessary.
5235 */
5236 server = GetForeignServer(serverOid);
5237 mapping = GetUserMapping(GetUserId(), server->serverid);
5238 conn = GetConnection(mapping, false, NULL);
5239
5240 /* Don't attempt to import collation if remote server hasn't got it */
5241 if (PQserverVersion(conn) < 90100)
5242 import_collate = false;
5243
5244 /* Create workspace for strings */
5245 initStringInfo(&buf);
5246
5247 /* In what follows, do not risk leaking any PGresults. */
5248 PG_TRY();
5249 {
5250 /* Check that the schema really exists */
5251 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5252 deparseStringLiteral(&buf, stmt->remote_schema);
5253
5254 res = pgfdw_exec_query(conn, buf.data, NULL);
5255 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5256 pgfdw_report_error(ERROR, res, conn, false, buf.data);
5257
5258 if (PQntuples(res) != 1)
5259 ereport(ERROR,
5260 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5261 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5262 stmt->remote_schema, server->servername)));
5263
5264 PQclear(res);
5265 res = NULL;
5266 resetStringInfo(&buf);
5267
5268 /*
5269 * Fetch all table data from this schema, possibly restricted by
5270 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5271 * to EXCEPT/LIMIT TO here, because the core code will filter the
5272 * statements we return according to those lists anyway. But it
5273 * should save a few cycles to not process excluded tables in the
5274 * first place.)
5275 *
5276 * Import table data for partitions only when they are explicitly
5277 * specified in LIMIT TO clause. Otherwise ignore them and only
5278 * include the definitions of the root partitioned tables to allow
5279 * access to the complete remote data set locally in the schema
5280 * imported.
5281 *
5282 * Note: because we run the connection with search_path restricted to
5283 * pg_catalog, the format_type() and pg_get_expr() outputs will always
5284 * include a schema name for types/functions in other schemas, which
5285 * is what we want.
5286 */
5287 appendStringInfoString(&buf,
5288 "SELECT relname, "
5289 " attname, "
5290 " format_type(atttypid, atttypmod), "
5291 " attnotnull, ");
5292
5293 /* Generated columns are supported since Postgres 12 */
5294 if (PQserverVersion(conn) >= 120000)
5295 appendStringInfoString(&buf,
5296 " attgenerated, "
5297 " pg_get_expr(adbin, adrelid), ");
5298 else
5299 appendStringInfoString(&buf,
5300 " NULL, "
5301 " pg_get_expr(adbin, adrelid), ");
5302
5303 if (import_collate)
5304 appendStringInfoString(&buf,
5305 " collname, "
5306 " collnsp.nspname "
5307 "FROM pg_class c "
5308 " JOIN pg_namespace n ON "
5309 " relnamespace = n.oid "
5310 " LEFT JOIN pg_attribute a ON "
5311 " attrelid = c.oid AND attnum > 0 "
5312 " AND NOT attisdropped "
5313 " LEFT JOIN pg_attrdef ad ON "
5314 " adrelid = c.oid AND adnum = attnum "
5315 " LEFT JOIN pg_collation coll ON "
5316 " coll.oid = attcollation "
5317 " LEFT JOIN pg_namespace collnsp ON "
5318 " collnsp.oid = collnamespace ");
5319 else
5320 appendStringInfoString(&buf,
5321 " NULL, NULL "
5322 "FROM pg_class c "
5323 " JOIN pg_namespace n ON "
5324 " relnamespace = n.oid "
5325 " LEFT JOIN pg_attribute a ON "
5326 " attrelid = c.oid AND attnum > 0 "
5327 " AND NOT attisdropped "
5328 " LEFT JOIN pg_attrdef ad ON "
5329 " adrelid = c.oid AND adnum = attnum ");
5330
5331 appendStringInfoString(&buf,
5332 "WHERE c.relkind IN ("
5333 CppAsString2(RELKIND_RELATION) ","
5334 CppAsString2(RELKIND_VIEW) ","
5335 CppAsString2(RELKIND_FOREIGN_TABLE) ","
5336 CppAsString2(RELKIND_MATVIEW) ","
5337 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
5338 " AND n.nspname = ");
5339 deparseStringLiteral(&buf, stmt->remote_schema);
5340
5341 /* Partitions are supported since Postgres 10 */
5342 if (PQserverVersion(conn) >= 100000 &&
5343 stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
5344 appendStringInfoString(&buf, " AND NOT c.relispartition ");
5345
5346 /* Apply restrictions for LIMIT TO and EXCEPT */
5347 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
5348 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5349 {
5350 bool first_item = true;
5351
5352 appendStringInfoString(&buf, " AND c.relname ");
5353 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5354 appendStringInfoString(&buf, "NOT ");
5355 appendStringInfoString(&buf, "IN (");
5356
5357 /* Append list of table names within IN clause */
5358 foreach(lc, stmt->table_list)
5359 {
5360 RangeVar *rv = (RangeVar *) lfirst(lc);
5361
5362 if (first_item)
5363 first_item = false;
5364 else
5365 appendStringInfoString(&buf, ", ");
5366 deparseStringLiteral(&buf, rv->relname);
5367 }
5368 appendStringInfoChar(&buf, ')');
5369 }
5370
5371 /* Append ORDER BY at the end of query to ensure output ordering */
5372 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
5373
5374 /* Fetch the data */
5375 res = pgfdw_exec_query(conn, buf.data, NULL);
5376 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5377 pgfdw_report_error(ERROR, res, conn, false, buf.data);
5378
5379 /* Process results */
5380 numrows = PQntuples(res);
5381 /* note: incrementation of i happens in inner loop's while() test */
5382 for (i = 0; i < numrows;)
5383 {
5384 char *tablename = PQgetvalue(res, i, 0);
5385 bool first_item = true;
5386
5387 resetStringInfo(&buf);
5388 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
5389 quote_identifier(tablename));
5390
5391 /* Scan all rows for this table */
5392 do
5393 {
5394 char *attname;
5395 char *typename;
5396 char *attnotnull;
5397 char *attgenerated;
5398 char *attdefault;
5399 char *collname;
5400 char *collnamespace;
5401
5402 /* If table has no columns, we'll see nulls here */
5403 if (PQgetisnull(res, i, 1))
5404 continue;
5405
5406 attname = PQgetvalue(res, i, 1);
5407 typename = PQgetvalue(res, i, 2);
5408 attnotnull = PQgetvalue(res, i, 3);
5409 attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL :
5410 PQgetvalue(res, i, 4);
5411 attdefault = PQgetisnull(res, i, 5) ? (char *) NULL :
5412 PQgetvalue(res, i, 5);
5413 collname = PQgetisnull(res, i, 6) ? (char *) NULL :
5414 PQgetvalue(res, i, 6);
5415 collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
5416 PQgetvalue(res, i, 7);
5417
5418 if (first_item)
5419 first_item = false;
5420 else
5421 appendStringInfoString(&buf, ",\n");
5422
5423 /* Print column name and type */
5424 appendStringInfo(&buf, " %s %s",
5425 quote_identifier(attname),
5426 typename);
5427
5428 /*
5429 * Add column_name option so that renaming the foreign table's
5430 * column doesn't break the association to the underlying
5431 * column.
5432 */
5433 appendStringInfoString(&buf, " OPTIONS (column_name ");
5434 deparseStringLiteral(&buf, attname);
5435 appendStringInfoChar(&buf, ')');
5436
5437 /* Add COLLATE if needed */
5438 if (import_collate && collname != NULL && collnamespace != NULL)
5439 appendStringInfo(&buf, " COLLATE %s.%s",
5440 quote_identifier(collnamespace),
5441 quote_identifier(collname));
5442
5443 /* Add DEFAULT if needed */
5444 if (import_default && attdefault != NULL &&
5445 (!attgenerated || !attgenerated[0]))
5446 appendStringInfo(&buf, " DEFAULT %s", attdefault);
5447
5448 /* Add GENERATED if needed */
5449 if (import_generated && attgenerated != NULL &&
5450 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
5451 {
5452 Assert(attdefault != NULL);
5453 appendStringInfo(&buf,
5454 " GENERATED ALWAYS AS (%s) STORED",
5455 attdefault);
5456 }
5457
5458 /* Add NOT NULL if needed */
5459 if (import_not_null && attnotnull[0] == 't')
5460 appendStringInfoString(&buf, " NOT NULL");
5461 }
5462 while (++i < numrows &&
5463 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
5464
5465 /*
5466 * Add server name and table-level options. We specify remote
5467 * schema and table name as options (the latter to ensure that
5468 * renaming the foreign table doesn't break the association).
5469 */
5470 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5471 quote_identifier(server->servername));
5472
5473 appendStringInfoString(&buf, "schema_name ");
5474 deparseStringLiteral(&buf, stmt->remote_schema);
5475 appendStringInfoString(&buf, ", table_name ");
5476 deparseStringLiteral(&buf, tablename);
5477
5478 appendStringInfoString(&buf, ");");
5479
5480 commands = lappend(commands, pstrdup(buf.data));
5481 }
5482 }
5483 PG_FINALLY();
5484 {
5485 if (res)
5486 PQclear(res);
5487 }
5488 PG_END_TRY();
5489
5490 ReleaseConnection(conn);
5491
5492 return commands;
5493 }
5494
5495 /*
5496 * Assess whether the join between inner and outer relations can be pushed down
5497 * to the foreign server. As a side effect, save information we obtain in this
5498 * function to PgFdwRelationInfo passed in.
5499 */
5500 static bool
foreign_join_ok(PlannerInfo * root,RelOptInfo * joinrel,JoinType jointype,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinPathExtraData * extra)5501 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
5502 RelOptInfo *outerrel, RelOptInfo *innerrel,
5503 JoinPathExtraData *extra)
5504 {
5505 PgFdwRelationInfo *fpinfo;
5506 PgFdwRelationInfo *fpinfo_o;
5507 PgFdwRelationInfo *fpinfo_i;
5508 ListCell *lc;
5509 List *joinclauses;
5510
5511 /*
5512 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
5513 * Constructing queries representing SEMI and ANTI joins is hard, hence
5514 * not considered right now.
5515 */
5516 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5517 jointype != JOIN_RIGHT && jointype != JOIN_FULL)
5518 return false;
5519
5520 /*
5521 * If either of the joining relations is marked as unsafe to pushdown, the
5522 * join can not be pushed down.
5523 */
5524 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5525 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5526 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5527 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5528 !fpinfo_i || !fpinfo_i->pushdown_safe)
5529 return false;
5530
5531 /*
5532 * If joining relations have local conditions, those conditions are
5533 * required to be applied before joining the relations. Hence the join can
5534 * not be pushed down.
5535 */
5536 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5537 return false;
5538
5539 /*
5540 * Merge FDW options. We might be tempted to do this after we have deemed
5541 * the foreign join to be OK. But we must do this beforehand so that we
5542 * know which quals can be evaluated on the foreign server, which might
5543 * depend on shippable_extensions.
5544 */
5545 fpinfo->server = fpinfo_o->server;
5546 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5547
5548 /*
5549 * Separate restrict list into join quals and pushed-down (other) quals.
5550 *
5551 * Join quals belonging to an outer join must all be shippable, else we
5552 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5553 *
5554 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5555 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5556 * either locally or remotely; the same is true for pushed-down conditions
5557 * at an outer join.
5558 *
5559 * Note we might return failure after having already scribbled on
5560 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5561 * won't consult those lists again if we deem the join unshippable.
5562 */
5563 joinclauses = NIL;
5564 foreach(lc, extra->restrictlist)
5565 {
5566 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5567 bool is_remote_clause = is_foreign_expr(root, joinrel,
5568 rinfo->clause);
5569
5570 if (IS_OUTER_JOIN(jointype) &&
5571 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5572 {
5573 if (!is_remote_clause)
5574 return false;
5575 joinclauses = lappend(joinclauses, rinfo);
5576 }
5577 else
5578 {
5579 if (is_remote_clause)
5580 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5581 else
5582 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5583 }
5584 }
5585
5586 /*
5587 * deparseExplicitTargetList() isn't smart enough to handle anything other
5588 * than a Var. In particular, if there's some PlaceHolderVar that would
5589 * need to be evaluated within this join tree (because there's an upper
5590 * reference to a quantity that may go to NULL as a result of an outer
5591 * join), then we can't try to push the join down because we'll fail when
5592 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5593 * needs to be evaluated *at the top* of this join tree is OK, because we
5594 * can do that locally after fetching the results from the remote side.
5595 */
5596 foreach(lc, root->placeholder_list)
5597 {
5598 PlaceHolderInfo *phinfo = lfirst(lc);
5599 Relids relids;
5600
5601 /* PlaceHolderInfo refers to parent relids, not child relids. */
5602 relids = IS_OTHER_REL(joinrel) ?
5603 joinrel->top_parent_relids : joinrel->relids;
5604
5605 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5606 bms_nonempty_difference(relids, phinfo->ph_eval_at))
5607 return false;
5608 }
5609
5610 /* Save the join clauses, for later use. */
5611 fpinfo->joinclauses = joinclauses;
5612
5613 fpinfo->outerrel = outerrel;
5614 fpinfo->innerrel = innerrel;
5615 fpinfo->jointype = jointype;
5616
5617 /*
5618 * By default, both the input relations are not required to be deparsed as
5619 * subqueries, but there might be some relations covered by the input
5620 * relations that are required to be deparsed as subqueries, so save the
5621 * relids of those relations for later use by the deparser.
5622 */
5623 fpinfo->make_outerrel_subquery = false;
5624 fpinfo->make_innerrel_subquery = false;
5625 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5626 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5627 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5628 fpinfo_i->lower_subquery_rels);
5629
5630 /*
5631 * Pull the other remote conditions from the joining relations into join
5632 * clauses or other remote clauses (remote_conds) of this relation
5633 * wherever possible. This avoids building subqueries at every join step.
5634 *
5635 * For an inner join, clauses from both the relations are added to the
5636 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5637 * the outer side are added to remote_conds since those can be evaluated
5638 * after the join is evaluated. The clauses from inner side are added to
5639 * the joinclauses, since they need to be evaluated while constructing the
5640 * join.
5641 *
5642 * For a FULL OUTER JOIN, the other clauses from either relation can not
5643 * be added to the joinclauses or remote_conds, since each relation acts
5644 * as an outer relation for the other.
5645 *
5646 * The joining sides can not have local conditions, thus no need to test
5647 * shippability of the clauses being pulled up.
5648 */
5649 switch (jointype)
5650 {
5651 case JOIN_INNER:
5652 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5653 fpinfo_i->remote_conds);
5654 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5655 fpinfo_o->remote_conds);
5656 break;
5657
5658 case JOIN_LEFT:
5659 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5660 fpinfo_i->remote_conds);
5661 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5662 fpinfo_o->remote_conds);
5663 break;
5664
5665 case JOIN_RIGHT:
5666 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5667 fpinfo_o->remote_conds);
5668 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5669 fpinfo_i->remote_conds);
5670 break;
5671
5672 case JOIN_FULL:
5673
5674 /*
5675 * In this case, if any of the input relations has conditions, we
5676 * need to deparse that relation as a subquery so that the
5677 * conditions can be evaluated before the join. Remember it in
5678 * the fpinfo of this relation so that the deparser can take
5679 * appropriate action. Also, save the relids of base relations
5680 * covered by that relation for later use by the deparser.
5681 */
5682 if (fpinfo_o->remote_conds)
5683 {
5684 fpinfo->make_outerrel_subquery = true;
5685 fpinfo->lower_subquery_rels =
5686 bms_add_members(fpinfo->lower_subquery_rels,
5687 outerrel->relids);
5688 }
5689 if (fpinfo_i->remote_conds)
5690 {
5691 fpinfo->make_innerrel_subquery = true;
5692 fpinfo->lower_subquery_rels =
5693 bms_add_members(fpinfo->lower_subquery_rels,
5694 innerrel->relids);
5695 }
5696 break;
5697
5698 default:
5699 /* Should not happen, we have just checked this above */
5700 elog(ERROR, "unsupported join type %d", jointype);
5701 }
5702
5703 /*
5704 * For an inner join, all restrictions can be treated alike. Treating the
5705 * pushed down conditions as join conditions allows a top level full outer
5706 * join to be deparsed without requiring subqueries.
5707 */
5708 if (jointype == JOIN_INNER)
5709 {
5710 Assert(!fpinfo->joinclauses);
5711 fpinfo->joinclauses = fpinfo->remote_conds;
5712 fpinfo->remote_conds = NIL;
5713 }
5714
5715 /* Mark that this join can be pushed down safely */
5716 fpinfo->pushdown_safe = true;
5717
5718 /* Get user mapping */
5719 if (fpinfo->use_remote_estimate)
5720 {
5721 if (fpinfo_o->use_remote_estimate)
5722 fpinfo->user = fpinfo_o->user;
5723 else
5724 fpinfo->user = fpinfo_i->user;
5725 }
5726 else
5727 fpinfo->user = NULL;
5728
5729 /*
5730 * Set # of retrieved rows and cached relation costs to some negative
5731 * value, so that we can detect when they are set to some sensible values,
5732 * during one (usually the first) of the calls to estimate_path_cost_size.
5733 */
5734 fpinfo->retrieved_rows = -1;
5735 fpinfo->rel_startup_cost = -1;
5736 fpinfo->rel_total_cost = -1;
5737
5738 /*
5739 * Set the string describing this join relation to be used in EXPLAIN
5740 * output of corresponding ForeignScan. Note that the decoration we add
5741 * to the base relation names mustn't include any digits, or it'll confuse
5742 * postgresExplainForeignScan.
5743 */
5744 fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
5745 fpinfo_o->relation_name,
5746 get_jointype_name(fpinfo->jointype),
5747 fpinfo_i->relation_name);
5748
5749 /*
5750 * Set the relation index. This is defined as the position of this
5751 * joinrel in the join_rel_list list plus the length of the rtable list.
5752 * Note that since this joinrel is at the end of the join_rel_list list
5753 * when we are called, we can get the position by list_length.
5754 */
5755 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
5756 fpinfo->relation_index =
5757 list_length(root->parse->rtable) + list_length(root->join_rel_list);
5758
5759 return true;
5760 }
5761
5762 static void
add_paths_with_pathkeys_for_rel(PlannerInfo * root,RelOptInfo * rel,Path * epq_path)5763 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
5764 Path *epq_path)
5765 {
5766 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
5767 ListCell *lc;
5768
5769 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
5770
5771 /* Create one path for each set of pathkeys we found above. */
5772 foreach(lc, useful_pathkeys_list)
5773 {
5774 double rows;
5775 int width;
5776 Cost startup_cost;
5777 Cost total_cost;
5778 List *useful_pathkeys = lfirst(lc);
5779 Path *sorted_epq_path;
5780
5781 estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
5782 &rows, &width, &startup_cost, &total_cost);
5783
5784 /*
5785 * The EPQ path must be at least as well sorted as the path itself, in
5786 * case it gets used as input to a mergejoin.
5787 */
5788 sorted_epq_path = epq_path;
5789 if (sorted_epq_path != NULL &&
5790 !pathkeys_contained_in(useful_pathkeys,
5791 sorted_epq_path->pathkeys))
5792 sorted_epq_path = (Path *)
5793 create_sort_path(root,
5794 rel,
5795 sorted_epq_path,
5796 useful_pathkeys,
5797 -1.0);
5798
5799 if (IS_SIMPLE_REL(rel))
5800 add_path(rel, (Path *)
5801 create_foreignscan_path(root, rel,
5802 NULL,
5803 rows,
5804 startup_cost,
5805 total_cost,
5806 useful_pathkeys,
5807 rel->lateral_relids,
5808 sorted_epq_path,
5809 NIL));
5810 else
5811 add_path(rel, (Path *)
5812 create_foreign_join_path(root, rel,
5813 NULL,
5814 rows,
5815 startup_cost,
5816 total_cost,
5817 useful_pathkeys,
5818 rel->lateral_relids,
5819 sorted_epq_path,
5820 NIL));
5821 }
5822 }
5823
5824 /*
5825 * Parse options from foreign server and apply them to fpinfo.
5826 *
5827 * New options might also require tweaking merge_fdw_options().
5828 */
5829 static void
apply_server_options(PgFdwRelationInfo * fpinfo)5830 apply_server_options(PgFdwRelationInfo *fpinfo)
5831 {
5832 ListCell *lc;
5833
5834 foreach(lc, fpinfo->server->options)
5835 {
5836 DefElem *def = (DefElem *) lfirst(lc);
5837
5838 if (strcmp(def->defname, "use_remote_estimate") == 0)
5839 fpinfo->use_remote_estimate = defGetBoolean(def);
5840 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
5841 (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0,
5842 NULL);
5843 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
5844 (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0,
5845 NULL);
5846 else if (strcmp(def->defname, "extensions") == 0)
5847 fpinfo->shippable_extensions =
5848 ExtractExtensionList(defGetString(def), false);
5849 else if (strcmp(def->defname, "fetch_size") == 0)
5850 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
5851 else if (strcmp(def->defname, "async_capable") == 0)
5852 fpinfo->async_capable = defGetBoolean(def);
5853 }
5854 }
5855
5856 /*
5857 * Parse options from foreign table and apply them to fpinfo.
5858 *
5859 * New options might also require tweaking merge_fdw_options().
5860 */
5861 static void
apply_table_options(PgFdwRelationInfo * fpinfo)5862 apply_table_options(PgFdwRelationInfo *fpinfo)
5863 {
5864 ListCell *lc;
5865
5866 foreach(lc, fpinfo->table->options)
5867 {
5868 DefElem *def = (DefElem *) lfirst(lc);
5869
5870 if (strcmp(def->defname, "use_remote_estimate") == 0)
5871 fpinfo->use_remote_estimate = defGetBoolean(def);
5872 else if (strcmp(def->defname, "fetch_size") == 0)
5873 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
5874 else if (strcmp(def->defname, "async_capable") == 0)
5875 fpinfo->async_capable = defGetBoolean(def);
5876 }
5877 }
5878
5879 /*
5880 * Merge FDW options from input relations into a new set of options for a join
5881 * or an upper rel.
5882 *
5883 * For a join relation, FDW-specific information about the inner and outer
5884 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
5885 * fpinfo_o provides the information for the input relation; fpinfo_i is
5886 * expected to NULL.
5887 */
5888 static void
merge_fdw_options(PgFdwRelationInfo * fpinfo,const PgFdwRelationInfo * fpinfo_o,const PgFdwRelationInfo * fpinfo_i)5889 merge_fdw_options(PgFdwRelationInfo *fpinfo,
5890 const PgFdwRelationInfo *fpinfo_o,
5891 const PgFdwRelationInfo *fpinfo_i)
5892 {
5893 /* We must always have fpinfo_o. */
5894 Assert(fpinfo_o);
5895
5896 /* fpinfo_i may be NULL, but if present the servers must both match. */
5897 Assert(!fpinfo_i ||
5898 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
5899
5900 /*
5901 * Copy the server specific FDW options. (For a join, both relations come
5902 * from the same server, so the server options should have the same value
5903 * for both relations.)
5904 */
5905 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
5906 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
5907 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
5908 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
5909 fpinfo->fetch_size = fpinfo_o->fetch_size;
5910 fpinfo->async_capable = fpinfo_o->async_capable;
5911
5912 /* Merge the table level options from either side of the join. */
5913 if (fpinfo_i)
5914 {
5915 /*
5916 * We'll prefer to use remote estimates for this join if any table
5917 * from either side of the join is using remote estimates. This is
5918 * most likely going to be preferred since they're already willing to
5919 * pay the price of a round trip to get the remote EXPLAIN. In any
5920 * case it's not entirely clear how we might otherwise handle this
5921 * best.
5922 */
5923 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
5924 fpinfo_i->use_remote_estimate;
5925
5926 /*
5927 * Set fetch size to maximum of the joining sides, since we are
5928 * expecting the rows returned by the join to be proportional to the
5929 * relation sizes.
5930 */
5931 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
5932
5933 /*
5934 * We'll prefer to consider this join async-capable if any table from
5935 * either side of the join is considered async-capable. This would be
5936 * reasonable because in that case the foreign server would have its
5937 * own resources to scan that table asynchronously, and the join could
5938 * also be computed asynchronously using the resources.
5939 */
5940 fpinfo->async_capable = fpinfo_o->async_capable ||
5941 fpinfo_i->async_capable;
5942 }
5943 }
5944
5945 /*
5946 * postgresGetForeignJoinPaths
5947 * Add possible ForeignPath to joinrel, if join is safe to push down.
5948 */
5949 static void
postgresGetForeignJoinPaths(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)5950 postgresGetForeignJoinPaths(PlannerInfo *root,
5951 RelOptInfo *joinrel,
5952 RelOptInfo *outerrel,
5953 RelOptInfo *innerrel,
5954 JoinType jointype,
5955 JoinPathExtraData *extra)
5956 {
5957 PgFdwRelationInfo *fpinfo;
5958 ForeignPath *joinpath;
5959 double rows;
5960 int width;
5961 Cost startup_cost;
5962 Cost total_cost;
5963 Path *epq_path; /* Path to create plan to be executed when
5964 * EvalPlanQual gets triggered. */
5965
5966 /*
5967 * Skip if this join combination has been considered already.
5968 */
5969 if (joinrel->fdw_private)
5970 return;
5971
5972 /*
5973 * This code does not work for joins with lateral references, since those
5974 * must have parameterized paths, which we don't generate yet.
5975 */
5976 if (!bms_is_empty(joinrel->lateral_relids))
5977 return;
5978
5979 /*
5980 * Create unfinished PgFdwRelationInfo entry which is used to indicate
5981 * that the join relation is already considered, so that we won't waste
5982 * time in judging safety of join pushdown and adding the same paths again
5983 * if found safe. Once we know that this join can be pushed down, we fill
5984 * the entry.
5985 */
5986 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
5987 fpinfo->pushdown_safe = false;
5988 joinrel->fdw_private = fpinfo;
5989 /* attrs_used is only for base relations. */
5990 fpinfo->attrs_used = NULL;
5991
5992 /*
5993 * If there is a possibility that EvalPlanQual will be executed, we need
5994 * to be able to reconstruct the row using scans of the base relations.
5995 * GetExistingLocalJoinPath will find a suitable path for this purpose in
5996 * the path list of the joinrel, if one exists. We must be careful to
5997 * call it before adding any ForeignPath, since the ForeignPath might
5998 * dominate the only suitable local path available. We also do it before
5999 * calling foreign_join_ok(), since that function updates fpinfo and marks
6000 * it as pushable if the join is found to be pushable.
6001 */
6002 if (root->parse->commandType == CMD_DELETE ||
6003 root->parse->commandType == CMD_UPDATE ||
6004 root->rowMarks)
6005 {
6006 epq_path = GetExistingLocalJoinPath(joinrel);
6007 if (!epq_path)
6008 {
6009 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
6010 return;
6011 }
6012 }
6013 else
6014 epq_path = NULL;
6015
6016 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
6017 {
6018 /* Free path required for EPQ if we copied one; we don't need it now */
6019 if (epq_path)
6020 pfree(epq_path);
6021 return;
6022 }
6023
6024 /*
6025 * Compute the selectivity and cost of the local_conds, so we don't have
6026 * to do it over again for each path. The best we can do for these
6027 * conditions is to estimate selectivity on the basis of local statistics.
6028 * The local conditions are applied after the join has been computed on
6029 * the remote side like quals in WHERE clause, so pass jointype as
6030 * JOIN_INNER.
6031 */
6032 fpinfo->local_conds_sel = clauselist_selectivity(root,
6033 fpinfo->local_conds,
6034 0,
6035 JOIN_INNER,
6036 NULL);
6037 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6038
6039 /*
6040 * If we are going to estimate costs locally, estimate the join clause
6041 * selectivity here while we have special join info.
6042 */
6043 if (!fpinfo->use_remote_estimate)
6044 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
6045 0, fpinfo->jointype,
6046 extra->sjinfo);
6047
6048 /* Estimate costs for bare join relation */
6049 estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
6050 &rows, &width, &startup_cost, &total_cost);
6051 /* Now update this information in the joinrel */
6052 joinrel->rows = rows;
6053 joinrel->reltarget->width = width;
6054 fpinfo->rows = rows;
6055 fpinfo->width = width;
6056 fpinfo->startup_cost = startup_cost;
6057 fpinfo->total_cost = total_cost;
6058
6059 /*
6060 * Create a new join path and add it to the joinrel which represents a
6061 * join between foreign tables.
6062 */
6063 joinpath = create_foreign_join_path(root,
6064 joinrel,
6065 NULL, /* default pathtarget */
6066 rows,
6067 startup_cost,
6068 total_cost,
6069 NIL, /* no pathkeys */
6070 joinrel->lateral_relids,
6071 epq_path,
6072 NIL); /* no fdw_private */
6073
6074 /* Add generated path into joinrel by add_path(). */
6075 add_path(joinrel, (Path *) joinpath);
6076
6077 /* Consider pathkeys for the join relation */
6078 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
6079
6080 /* XXX Consider parameterized paths for the join relation */
6081 }
6082
6083 /*
6084 * Assess whether the aggregation, grouping and having operations can be pushed
6085 * down to the foreign server. As a side effect, save information we obtain in
6086 * this function to PgFdwRelationInfo of the input relation.
6087 */
6088 static bool
foreign_grouping_ok(PlannerInfo * root,RelOptInfo * grouped_rel,Node * havingQual)6089 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
6090 Node *havingQual)
6091 {
6092 Query *query = root->parse;
6093 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
6094 PathTarget *grouping_target = grouped_rel->reltarget;
6095 PgFdwRelationInfo *ofpinfo;
6096 ListCell *lc;
6097 int i;
6098 List *tlist = NIL;
6099
6100 /* We currently don't support pushing Grouping Sets. */
6101 if (query->groupingSets)
6102 return false;
6103
6104 /* Get the fpinfo of the underlying scan relation. */
6105 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
6106
6107 /*
6108 * If underlying scan relation has any local conditions, those conditions
6109 * are required to be applied before performing aggregation. Hence the
6110 * aggregate cannot be pushed down.
6111 */
6112 if (ofpinfo->local_conds)
6113 return false;
6114
6115 /*
6116 * Examine grouping expressions, as well as other expressions we'd need to
6117 * compute, and check whether they are safe to push down to the foreign
6118 * server. All GROUP BY expressions will be part of the grouping target
6119 * and thus there is no need to search for them separately. Add grouping
6120 * expressions into target list which will be passed to foreign server.
6121 *
6122 * A tricky fine point is that we must not put any expression into the
6123 * target list that is just a foreign param (that is, something that
6124 * deparse.c would conclude has to be sent to the foreign server). If we
6125 * do, the expression will also appear in the fdw_exprs list of the plan
6126 * node, and setrefs.c will get confused and decide that the fdw_exprs
6127 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
6128 * a broken plan. Somewhat oddly, it's OK if the expression contains such
6129 * a node, as long as it's not at top level; then no match is possible.
6130 */
6131 i = 0;
6132 foreach(lc, grouping_target->exprs)
6133 {
6134 Expr *expr = (Expr *) lfirst(lc);
6135 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
6136 ListCell *l;
6137
6138 /* Check whether this expression is part of GROUP BY clause */
6139 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
6140 {
6141 TargetEntry *tle;
6142
6143 /*
6144 * If any GROUP BY expression is not shippable, then we cannot
6145 * push down aggregation to the foreign server.
6146 */
6147 if (!is_foreign_expr(root, grouped_rel, expr))
6148 return false;
6149
6150 /*
6151 * If it would be a foreign param, we can't put it into the tlist,
6152 * so we have to fail.
6153 */
6154 if (is_foreign_param(root, grouped_rel, expr))
6155 return false;
6156
6157 /*
6158 * Pushable, so add to tlist. We need to create a TLE for this
6159 * expression and apply the sortgroupref to it. We cannot use
6160 * add_to_flat_tlist() here because that avoids making duplicate
6161 * entries in the tlist. If there are duplicate entries with
6162 * distinct sortgrouprefs, we have to duplicate that situation in
6163 * the output tlist.
6164 */
6165 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
6166 tle->ressortgroupref = sgref;
6167 tlist = lappend(tlist, tle);
6168 }
6169 else
6170 {
6171 /*
6172 * Non-grouping expression we need to compute. Can we ship it
6173 * as-is to the foreign server?
6174 */
6175 if (is_foreign_expr(root, grouped_rel, expr) &&
6176 !is_foreign_param(root, grouped_rel, expr))
6177 {
6178 /* Yes, so add to tlist as-is; OK to suppress duplicates */
6179 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6180 }
6181 else
6182 {
6183 /* Not pushable as a whole; extract its Vars and aggregates */
6184 List *aggvars;
6185
6186 aggvars = pull_var_clause((Node *) expr,
6187 PVC_INCLUDE_AGGREGATES);
6188
6189 /*
6190 * If any aggregate expression is not shippable, then we
6191 * cannot push down aggregation to the foreign server. (We
6192 * don't have to check is_foreign_param, since that certainly
6193 * won't return true for any such expression.)
6194 */
6195 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
6196 return false;
6197
6198 /*
6199 * Add aggregates, if any, into the targetlist. Plain Vars
6200 * outside an aggregate can be ignored, because they should be
6201 * either same as some GROUP BY column or part of some GROUP
6202 * BY expression. In either case, they are already part of
6203 * the targetlist and thus no need to add them again. In fact
6204 * including plain Vars in the tlist when they do not match a
6205 * GROUP BY column would cause the foreign server to complain
6206 * that the shipped query is invalid.
6207 */
6208 foreach(l, aggvars)
6209 {
6210 Expr *expr = (Expr *) lfirst(l);
6211
6212 if (IsA(expr, Aggref))
6213 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6214 }
6215 }
6216 }
6217
6218 i++;
6219 }
6220
6221 /*
6222 * Classify the pushable and non-pushable HAVING clauses and save them in
6223 * remote_conds and local_conds of the grouped rel's fpinfo.
6224 */
6225 if (havingQual)
6226 {
6227 ListCell *lc;
6228
6229 foreach(lc, (List *) havingQual)
6230 {
6231 Expr *expr = (Expr *) lfirst(lc);
6232 RestrictInfo *rinfo;
6233
6234 /*
6235 * Currently, the core code doesn't wrap havingQuals in
6236 * RestrictInfos, so we must make our own.
6237 */
6238 Assert(!IsA(expr, RestrictInfo));
6239 rinfo = make_restrictinfo(root,
6240 expr,
6241 true,
6242 false,
6243 false,
6244 root->qual_security_level,
6245 grouped_rel->relids,
6246 NULL,
6247 NULL);
6248 if (is_foreign_expr(root, grouped_rel, expr))
6249 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
6250 else
6251 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
6252 }
6253 }
6254
6255 /*
6256 * If there are any local conditions, pull Vars and aggregates from it and
6257 * check whether they are safe to pushdown or not.
6258 */
6259 if (fpinfo->local_conds)
6260 {
6261 List *aggvars = NIL;
6262 ListCell *lc;
6263
6264 foreach(lc, fpinfo->local_conds)
6265 {
6266 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
6267
6268 aggvars = list_concat(aggvars,
6269 pull_var_clause((Node *) rinfo->clause,
6270 PVC_INCLUDE_AGGREGATES));
6271 }
6272
6273 foreach(lc, aggvars)
6274 {
6275 Expr *expr = (Expr *) lfirst(lc);
6276
6277 /*
6278 * If aggregates within local conditions are not safe to push
6279 * down, then we cannot push down the query. Vars are already
6280 * part of GROUP BY clause which are checked above, so no need to
6281 * access them again here. Again, we need not check
6282 * is_foreign_param for a foreign aggregate.
6283 */
6284 if (IsA(expr, Aggref))
6285 {
6286 if (!is_foreign_expr(root, grouped_rel, expr))
6287 return false;
6288
6289 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6290 }
6291 }
6292 }
6293
6294 /* Store generated targetlist */
6295 fpinfo->grouped_tlist = tlist;
6296
6297 /* Safe to pushdown */
6298 fpinfo->pushdown_safe = true;
6299
6300 /*
6301 * Set # of retrieved rows and cached relation costs to some negative
6302 * value, so that we can detect when they are set to some sensible values,
6303 * during one (usually the first) of the calls to estimate_path_cost_size.
6304 */
6305 fpinfo->retrieved_rows = -1;
6306 fpinfo->rel_startup_cost = -1;
6307 fpinfo->rel_total_cost = -1;
6308
6309 /*
6310 * Set the string describing this grouped relation to be used in EXPLAIN
6311 * output of corresponding ForeignScan. Note that the decoration we add
6312 * to the base relation name mustn't include any digits, or it'll confuse
6313 * postgresExplainForeignScan.
6314 */
6315 fpinfo->relation_name = psprintf("Aggregate on (%s)",
6316 ofpinfo->relation_name);
6317
6318 return true;
6319 }
6320
6321 /*
6322 * postgresGetForeignUpperPaths
6323 * Add paths for post-join operations like aggregation, grouping etc. if
6324 * corresponding operations are safe to push down.
6325 */
6326 static void
postgresGetForeignUpperPaths(PlannerInfo * root,UpperRelationKind stage,RelOptInfo * input_rel,RelOptInfo * output_rel,void * extra)6327 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
6328 RelOptInfo *input_rel, RelOptInfo *output_rel,
6329 void *extra)
6330 {
6331 PgFdwRelationInfo *fpinfo;
6332
6333 /*
6334 * If input rel is not safe to pushdown, then simply return as we cannot
6335 * perform any post-join operations on the foreign server.
6336 */
6337 if (!input_rel->fdw_private ||
6338 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
6339 return;
6340
6341 /* Ignore stages we don't support; and skip any duplicate calls. */
6342 if ((stage != UPPERREL_GROUP_AGG &&
6343 stage != UPPERREL_ORDERED &&
6344 stage != UPPERREL_FINAL) ||
6345 output_rel->fdw_private)
6346 return;
6347
6348 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
6349 fpinfo->pushdown_safe = false;
6350 fpinfo->stage = stage;
6351 output_rel->fdw_private = fpinfo;
6352
6353 switch (stage)
6354 {
6355 case UPPERREL_GROUP_AGG:
6356 add_foreign_grouping_paths(root, input_rel, output_rel,
6357 (GroupPathExtraData *) extra);
6358 break;
6359 case UPPERREL_ORDERED:
6360 add_foreign_ordered_paths(root, input_rel, output_rel);
6361 break;
6362 case UPPERREL_FINAL:
6363 add_foreign_final_paths(root, input_rel, output_rel,
6364 (FinalPathExtraData *) extra);
6365 break;
6366 default:
6367 elog(ERROR, "unexpected upper relation: %d", (int) stage);
6368 break;
6369 }
6370 }
6371
6372 /*
6373 * add_foreign_grouping_paths
6374 * Add foreign path for grouping and/or aggregation.
6375 *
6376 * Given input_rel represents the underlying scan. The paths are added to the
6377 * given grouped_rel.
6378 */
6379 static void
add_foreign_grouping_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * grouped_rel,GroupPathExtraData * extra)6380 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
6381 RelOptInfo *grouped_rel,
6382 GroupPathExtraData *extra)
6383 {
6384 Query *parse = root->parse;
6385 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6386 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
6387 ForeignPath *grouppath;
6388 double rows;
6389 int width;
6390 Cost startup_cost;
6391 Cost total_cost;
6392
6393 /* Nothing to be done, if there is no grouping or aggregation required. */
6394 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
6395 !root->hasHavingQual)
6396 return;
6397
6398 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
6399 extra->patype == PARTITIONWISE_AGGREGATE_FULL);
6400
6401 /* save the input_rel as outerrel in fpinfo */
6402 fpinfo->outerrel = input_rel;
6403
6404 /*
6405 * Copy foreign table, foreign server, user mapping, FDW options etc.
6406 * details from the input relation's fpinfo.
6407 */
6408 fpinfo->table = ifpinfo->table;
6409 fpinfo->server = ifpinfo->server;
6410 fpinfo->user = ifpinfo->user;
6411 merge_fdw_options(fpinfo, ifpinfo, NULL);
6412
6413 /*
6414 * Assess if it is safe to push down aggregation and grouping.
6415 *
6416 * Use HAVING qual from extra. In case of child partition, it will have
6417 * translated Vars.
6418 */
6419 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
6420 return;
6421
6422 /*
6423 * Compute the selectivity and cost of the local_conds, so we don't have
6424 * to do it over again for each path. (Currently we create just a single
6425 * path here, but in future it would be possible that we build more paths
6426 * such as pre-sorted paths as in postgresGetForeignPaths and
6427 * postgresGetForeignJoinPaths.) The best we can do for these conditions
6428 * is to estimate selectivity on the basis of local statistics.
6429 */
6430 fpinfo->local_conds_sel = clauselist_selectivity(root,
6431 fpinfo->local_conds,
6432 0,
6433 JOIN_INNER,
6434 NULL);
6435
6436 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6437
6438 /* Estimate the cost of push down */
6439 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
6440 &rows, &width, &startup_cost, &total_cost);
6441
6442 /* Now update this information in the fpinfo */
6443 fpinfo->rows = rows;
6444 fpinfo->width = width;
6445 fpinfo->startup_cost = startup_cost;
6446 fpinfo->total_cost = total_cost;
6447
6448 /* Create and add foreign path to the grouping relation. */
6449 grouppath = create_foreign_upper_path(root,
6450 grouped_rel,
6451 grouped_rel->reltarget,
6452 rows,
6453 startup_cost,
6454 total_cost,
6455 NIL, /* no pathkeys */
6456 NULL,
6457 NIL); /* no fdw_private */
6458
6459 /* Add generated path into grouped_rel by add_path(). */
6460 add_path(grouped_rel, (Path *) grouppath);
6461 }
6462
6463 /*
6464 * add_foreign_ordered_paths
6465 * Add foreign paths for performing the final sort remotely.
6466 *
6467 * Given input_rel contains the source-data Paths. The paths are added to the
6468 * given ordered_rel.
6469 */
6470 static void
add_foreign_ordered_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * ordered_rel)6471 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
6472 RelOptInfo *ordered_rel)
6473 {
6474 Query *parse = root->parse;
6475 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6476 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
6477 PgFdwPathExtraData *fpextra;
6478 double rows;
6479 int width;
6480 Cost startup_cost;
6481 Cost total_cost;
6482 List *fdw_private;
6483 ForeignPath *ordered_path;
6484 ListCell *lc;
6485
6486 /* Shouldn't get here unless the query has ORDER BY */
6487 Assert(parse->sortClause);
6488
6489 /* We don't support cases where there are any SRFs in the targetlist */
6490 if (parse->hasTargetSRFs)
6491 return;
6492
6493 /* Save the input_rel as outerrel in fpinfo */
6494 fpinfo->outerrel = input_rel;
6495
6496 /*
6497 * Copy foreign table, foreign server, user mapping, FDW options etc.
6498 * details from the input relation's fpinfo.
6499 */
6500 fpinfo->table = ifpinfo->table;
6501 fpinfo->server = ifpinfo->server;
6502 fpinfo->user = ifpinfo->user;
6503 merge_fdw_options(fpinfo, ifpinfo, NULL);
6504
6505 /*
6506 * If the input_rel is a base or join relation, we would already have
6507 * considered pushing down the final sort to the remote server when
6508 * creating pre-sorted foreign paths for that relation, because the
6509 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6510 * standard_qp_callback()).
6511 */
6512 if (input_rel->reloptkind == RELOPT_BASEREL ||
6513 input_rel->reloptkind == RELOPT_JOINREL)
6514 {
6515 Assert(root->query_pathkeys == root->sort_pathkeys);
6516
6517 /* Safe to push down if the query_pathkeys is safe to push down */
6518 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
6519
6520 return;
6521 }
6522
6523 /* The input_rel should be a grouping relation */
6524 Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
6525 ifpinfo->stage == UPPERREL_GROUP_AGG);
6526
6527 /*
6528 * We try to create a path below by extending a simple foreign path for
6529 * the underlying grouping relation to perform the final sort remotely,
6530 * which is stored into the fdw_private list of the resulting path.
6531 */
6532
6533 /* Assess if it is safe to push down the final sort */
6534 foreach(lc, root->sort_pathkeys)
6535 {
6536 PathKey *pathkey = (PathKey *) lfirst(lc);
6537 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6538 Expr *sort_expr;
6539
6540 /*
6541 * is_foreign_expr would detect volatile expressions as well, but
6542 * checking ec_has_volatile here saves some cycles.
6543 */
6544 if (pathkey_ec->ec_has_volatile)
6545 return;
6546
6547 /* Get the sort expression for the pathkey_ec */
6548 sort_expr = find_em_expr_for_input_target(root,
6549 pathkey_ec,
6550 input_rel->reltarget);
6551
6552 /* If it's unsafe to remote, we cannot push down the final sort */
6553 if (!is_foreign_expr(root, input_rel, sort_expr))
6554 return;
6555 }
6556
6557 /* Safe to push down */
6558 fpinfo->pushdown_safe = true;
6559
6560 /* Construct PgFdwPathExtraData */
6561 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6562 fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6563 fpextra->has_final_sort = true;
6564
6565 /* Estimate the costs of performing the final sort remotely */
6566 estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6567 &rows, &width, &startup_cost, &total_cost);
6568
6569 /*
6570 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6571 * Items in the list must match order in enum FdwPathPrivateIndex.
6572 */
6573 fdw_private = list_make2(makeInteger(true), makeInteger(false));
6574
6575 /* Create foreign ordering path */
6576 ordered_path = create_foreign_upper_path(root,
6577 input_rel,
6578 root->upper_targets[UPPERREL_ORDERED],
6579 rows,
6580 startup_cost,
6581 total_cost,
6582 root->sort_pathkeys,
6583 NULL, /* no extra plan */
6584 fdw_private);
6585
6586 /* and add it to the ordered_rel */
6587 add_path(ordered_rel, (Path *) ordered_path);
6588 }
6589
6590 /*
6591 * add_foreign_final_paths
6592 * Add foreign paths for performing the final processing remotely.
6593 *
6594 * Given input_rel contains the source-data Paths. The paths are added to the
6595 * given final_rel.
6596 */
6597 static void
add_foreign_final_paths(PlannerInfo * root,RelOptInfo * input_rel,RelOptInfo * final_rel,FinalPathExtraData * extra)6598 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6599 RelOptInfo *final_rel,
6600 FinalPathExtraData *extra)
6601 {
6602 Query *parse = root->parse;
6603 PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6604 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6605 bool has_final_sort = false;
6606 List *pathkeys = NIL;
6607 PgFdwPathExtraData *fpextra;
6608 bool save_use_remote_estimate = false;
6609 double rows;
6610 int width;
6611 Cost startup_cost;
6612 Cost total_cost;
6613 List *fdw_private;
6614 ForeignPath *final_path;
6615
6616 /*
6617 * Currently, we only support this for SELECT commands
6618 */
6619 if (parse->commandType != CMD_SELECT)
6620 return;
6621
6622 /*
6623 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
6624 * to add a LIMIT node
6625 */
6626 if (!parse->rowMarks && !extra->limit_needed)
6627 return;
6628
6629 /* We don't support cases where there are any SRFs in the targetlist */
6630 if (parse->hasTargetSRFs)
6631 return;
6632
6633 /* Save the input_rel as outerrel in fpinfo */
6634 fpinfo->outerrel = input_rel;
6635
6636 /*
6637 * Copy foreign table, foreign server, user mapping, FDW options etc.
6638 * details from the input relation's fpinfo.
6639 */
6640 fpinfo->table = ifpinfo->table;
6641 fpinfo->server = ifpinfo->server;
6642 fpinfo->user = ifpinfo->user;
6643 merge_fdw_options(fpinfo, ifpinfo, NULL);
6644
6645 /*
6646 * If there is no need to add a LIMIT node, there might be a ForeignPath
6647 * in the input_rel's pathlist that implements all behavior of the query.
6648 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
6649 * (if any) before we get here.
6650 */
6651 if (!extra->limit_needed)
6652 {
6653 ListCell *lc;
6654
6655 Assert(parse->rowMarks);
6656
6657 /*
6658 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
6659 * so the input_rel should be a base, join, or ordered relation; and
6660 * if it's an ordered relation, its input relation should be a base or
6661 * join relation.
6662 */
6663 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6664 input_rel->reloptkind == RELOPT_JOINREL ||
6665 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6666 ifpinfo->stage == UPPERREL_ORDERED &&
6667 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
6668 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
6669
6670 foreach(lc, input_rel->pathlist)
6671 {
6672 Path *path = (Path *) lfirst(lc);
6673
6674 /*
6675 * apply_scanjoin_target_to_paths() uses create_projection_path()
6676 * to adjust each of its input paths if needed, whereas
6677 * create_ordered_paths() uses apply_projection_to_path() to do
6678 * that. So the former might have put a ProjectionPath on top of
6679 * the ForeignPath; look through ProjectionPath and see if the
6680 * path underneath it is ForeignPath.
6681 */
6682 if (IsA(path, ForeignPath) ||
6683 (IsA(path, ProjectionPath) &&
6684 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
6685 {
6686 /*
6687 * Create foreign final path; this gets rid of a
6688 * no-longer-needed outer plan (if any), which makes the
6689 * EXPLAIN output look cleaner
6690 */
6691 final_path = create_foreign_upper_path(root,
6692 path->parent,
6693 path->pathtarget,
6694 path->rows,
6695 path->startup_cost,
6696 path->total_cost,
6697 path->pathkeys,
6698 NULL, /* no extra plan */
6699 NULL); /* no fdw_private */
6700
6701 /* and add it to the final_rel */
6702 add_path(final_rel, (Path *) final_path);
6703
6704 /* Safe to push down */
6705 fpinfo->pushdown_safe = true;
6706
6707 return;
6708 }
6709 }
6710
6711 /*
6712 * If we get here it means no ForeignPaths; since we would already
6713 * have considered pushing down all operations for the query to the
6714 * remote server, give up on it.
6715 */
6716 return;
6717 }
6718
6719 Assert(extra->limit_needed);
6720
6721 /*
6722 * If the input_rel is an ordered relation, replace the input_rel with its
6723 * input relation
6724 */
6725 if (input_rel->reloptkind == RELOPT_UPPER_REL &&
6726 ifpinfo->stage == UPPERREL_ORDERED)
6727 {
6728 input_rel = ifpinfo->outerrel;
6729 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6730 has_final_sort = true;
6731 pathkeys = root->sort_pathkeys;
6732 }
6733
6734 /* The input_rel should be a base, join, or grouping relation */
6735 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
6736 input_rel->reloptkind == RELOPT_JOINREL ||
6737 (input_rel->reloptkind == RELOPT_UPPER_REL &&
6738 ifpinfo->stage == UPPERREL_GROUP_AGG));
6739
6740 /*
6741 * We try to create a path below by extending a simple foreign path for
6742 * the underlying base, join, or grouping relation to perform the final
6743 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
6744 * stored into the fdw_private list of the resulting path. (We
6745 * re-estimate the costs of sorting the underlying relation, if
6746 * has_final_sort.)
6747 */
6748
6749 /*
6750 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
6751 * server
6752 */
6753
6754 /*
6755 * If the underlying relation has any local conditions, the LIMIT/OFFSET
6756 * cannot be pushed down.
6757 */
6758 if (ifpinfo->local_conds)
6759 return;
6760
6761 /*
6762 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
6763 * not safe to remote.
6764 */
6765 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
6766 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
6767 return;
6768
6769 /* Safe to push down */
6770 fpinfo->pushdown_safe = true;
6771
6772 /* Construct PgFdwPathExtraData */
6773 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6774 fpextra->target = root->upper_targets[UPPERREL_FINAL];
6775 fpextra->has_final_sort = has_final_sort;
6776 fpextra->has_limit = extra->limit_needed;
6777 fpextra->limit_tuples = extra->limit_tuples;
6778 fpextra->count_est = extra->count_est;
6779 fpextra->offset_est = extra->offset_est;
6780
6781 /*
6782 * Estimate the costs of performing the final sort and the LIMIT
6783 * restriction remotely. If has_final_sort is false, we wouldn't need to
6784 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
6785 * roughly estimated using the costs we already have for the underlying
6786 * relation, in the same way as when use_remote_estimate is false. Since
6787 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
6788 * false in that case.
6789 */
6790 if (!fpextra->has_final_sort)
6791 {
6792 save_use_remote_estimate = ifpinfo->use_remote_estimate;
6793 ifpinfo->use_remote_estimate = false;
6794 }
6795 estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
6796 &rows, &width, &startup_cost, &total_cost);
6797 if (!fpextra->has_final_sort)
6798 ifpinfo->use_remote_estimate = save_use_remote_estimate;
6799
6800 /*
6801 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6802 * Items in the list must match order in enum FdwPathPrivateIndex.
6803 */
6804 fdw_private = list_make2(makeInteger(has_final_sort),
6805 makeInteger(extra->limit_needed));
6806
6807 /*
6808 * Create foreign final path; this gets rid of a no-longer-needed outer
6809 * plan (if any), which makes the EXPLAIN output look cleaner
6810 */
6811 final_path = create_foreign_upper_path(root,
6812 input_rel,
6813 root->upper_targets[UPPERREL_FINAL],
6814 rows,
6815 startup_cost,
6816 total_cost,
6817 pathkeys,
6818 NULL, /* no extra plan */
6819 fdw_private);
6820
6821 /* and add it to the final_rel */
6822 add_path(final_rel, (Path *) final_path);
6823 }
6824
6825 /*
6826 * postgresIsForeignPathAsyncCapable
6827 * Check whether a given ForeignPath node is async-capable.
6828 */
6829 static bool
postgresIsForeignPathAsyncCapable(ForeignPath * path)6830 postgresIsForeignPathAsyncCapable(ForeignPath *path)
6831 {
6832 RelOptInfo *rel = ((Path *) path)->parent;
6833 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
6834
6835 return fpinfo->async_capable;
6836 }
6837
6838 /*
6839 * postgresForeignAsyncRequest
6840 * Asynchronously request next tuple from a foreign PostgreSQL table.
6841 */
6842 static void
postgresForeignAsyncRequest(AsyncRequest * areq)6843 postgresForeignAsyncRequest(AsyncRequest *areq)
6844 {
6845 produce_tuple_asynchronously(areq, true);
6846 }
6847
6848 /*
6849 * postgresForeignAsyncConfigureWait
6850 * Configure a file descriptor event for which we wish to wait.
6851 */
6852 static void
postgresForeignAsyncConfigureWait(AsyncRequest * areq)6853 postgresForeignAsyncConfigureWait(AsyncRequest *areq)
6854 {
6855 ForeignScanState *node = (ForeignScanState *) areq->requestee;
6856 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
6857 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
6858 AppendState *requestor = (AppendState *) areq->requestor;
6859 WaitEventSet *set = requestor->as_eventset;
6860
6861 /* This should not be called unless callback_pending */
6862 Assert(areq->callback_pending);
6863
6864 /*
6865 * If process_pending_request() has been invoked on the given request
6866 * before we get here, we might have some tuples already; in which case
6867 * complete the request
6868 */
6869 if (fsstate->next_tuple < fsstate->num_tuples)
6870 {
6871 complete_pending_request(areq);
6872 if (areq->request_complete)
6873 return;
6874 Assert(areq->callback_pending);
6875 }
6876
6877 /* We must have run out of tuples */
6878 Assert(fsstate->next_tuple >= fsstate->num_tuples);
6879
6880 /* The core code would have registered postmaster death event */
6881 Assert(GetNumRegisteredWaitEvents(set) >= 1);
6882
6883 /* Begin an asynchronous data fetch if not already done */
6884 if (!pendingAreq)
6885 fetch_more_data_begin(areq);
6886 else if (pendingAreq->requestor != areq->requestor)
6887 {
6888 /*
6889 * This is the case when the in-process request was made by another
6890 * Append. Note that it might be useless to process the request,
6891 * because the query might not need tuples from that Append anymore.
6892 * If there are any child subplans of the same parent that are ready
6893 * for new requests, skip the given request. Likewise, if there are
6894 * any configured events other than the postmaster death event, skip
6895 * it. Otherwise, process the in-process request, then begin a fetch
6896 * to configure the event below, because we might otherwise end up
6897 * with no configured events other than the postmaster death event.
6898 */
6899 if (!bms_is_empty(requestor->as_needrequest))
6900 return;
6901 if (GetNumRegisteredWaitEvents(set) > 1)
6902 return;
6903 process_pending_request(pendingAreq);
6904 fetch_more_data_begin(areq);
6905 }
6906 else if (pendingAreq->requestee != areq->requestee)
6907 {
6908 /*
6909 * This is the case when the in-process request was made by the same
6910 * parent but for a different child. Since we configure only the
6911 * event for the request made for that child, skip the given request.
6912 */
6913 return;
6914 }
6915 else
6916 Assert(pendingAreq == areq);
6917
6918 AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
6919 NULL, areq);
6920 }
6921
6922 /*
6923 * postgresForeignAsyncNotify
6924 * Fetch some more tuples from a file descriptor that becomes ready,
6925 * requesting next tuple.
6926 */
6927 static void
postgresForeignAsyncNotify(AsyncRequest * areq)6928 postgresForeignAsyncNotify(AsyncRequest *areq)
6929 {
6930 ForeignScanState *node = (ForeignScanState *) areq->requestee;
6931 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
6932
6933 /* The core code would have initialized the callback_pending flag */
6934 Assert(!areq->callback_pending);
6935
6936 /*
6937 * If process_pending_request() has been invoked on the given request
6938 * before we get here, we might have some tuples already; in which case
6939 * produce the next tuple
6940 */
6941 if (fsstate->next_tuple < fsstate->num_tuples)
6942 {
6943 produce_tuple_asynchronously(areq, true);
6944 return;
6945 }
6946
6947 /* We must have run out of tuples */
6948 Assert(fsstate->next_tuple >= fsstate->num_tuples);
6949
6950 /* The request should be currently in-process */
6951 Assert(fsstate->conn_state->pendingAreq == areq);
6952
6953 /* On error, report the original query, not the FETCH. */
6954 if (!PQconsumeInput(fsstate->conn))
6955 pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
6956
6957 fetch_more_data(node);
6958
6959 produce_tuple_asynchronously(areq, true);
6960 }
6961
6962 /*
6963 * Asynchronously produce next tuple from a foreign PostgreSQL table.
6964 */
6965 static void
produce_tuple_asynchronously(AsyncRequest * areq,bool fetch)6966 produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
6967 {
6968 ForeignScanState *node = (ForeignScanState *) areq->requestee;
6969 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
6970 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
6971 TupleTableSlot *result;
6972
6973 /* This should not be called if the request is currently in-process */
6974 Assert(areq != pendingAreq);
6975
6976 /* Fetch some more tuples, if we've run out */
6977 if (fsstate->next_tuple >= fsstate->num_tuples)
6978 {
6979 /* No point in another fetch if we already detected EOF, though */
6980 if (!fsstate->eof_reached)
6981 {
6982 /* Mark the request as pending for a callback */
6983 ExecAsyncRequestPending(areq);
6984 /* Begin another fetch if requested and if no pending request */
6985 if (fetch && !pendingAreq)
6986 fetch_more_data_begin(areq);
6987 }
6988 else
6989 {
6990 /* There's nothing more to do; just return a NULL pointer */
6991 result = NULL;
6992 /* Mark the request as complete */
6993 ExecAsyncRequestDone(areq, result);
6994 }
6995 return;
6996 }
6997
6998 /* Get a tuple from the ForeignScan node */
6999 result = areq->requestee->ExecProcNodeReal(areq->requestee);
7000 if (!TupIsNull(result))
7001 {
7002 /* Mark the request as complete */
7003 ExecAsyncRequestDone(areq, result);
7004 return;
7005 }
7006 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7007
7008 /* Fetch some more tuples, if we've not detected EOF yet */
7009 if (!fsstate->eof_reached)
7010 {
7011 /* Mark the request as pending for a callback */
7012 ExecAsyncRequestPending(areq);
7013 /* Begin another fetch if requested and if no pending request */
7014 if (fetch && !pendingAreq)
7015 fetch_more_data_begin(areq);
7016 }
7017 else
7018 {
7019 /* There's nothing more to do; just return a NULL pointer */
7020 result = NULL;
7021 /* Mark the request as complete */
7022 ExecAsyncRequestDone(areq, result);
7023 }
7024 }
7025
7026 /*
7027 * Begin an asynchronous data fetch.
7028 *
7029 * Note: this function assumes there is no currently-in-progress asynchronous
7030 * data fetch.
7031 *
7032 * Note: fetch_more_data must be called to fetch the result.
7033 */
7034 static void
fetch_more_data_begin(AsyncRequest * areq)7035 fetch_more_data_begin(AsyncRequest *areq)
7036 {
7037 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7038 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7039 char sql[64];
7040
7041 Assert(!fsstate->conn_state->pendingAreq);
7042
7043 /* Create the cursor synchronously. */
7044 if (!fsstate->cursor_exists)
7045 create_cursor(node);
7046
7047 /* We will send this query, but not wait for the response. */
7048 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
7049 fsstate->fetch_size, fsstate->cursor_number);
7050
7051 if (PQsendQuery(fsstate->conn, sql) < 0)
7052 pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
7053
7054 /* Remember that the request is in process */
7055 fsstate->conn_state->pendingAreq = areq;
7056 }
7057
7058 /*
7059 * Process a pending asynchronous request.
7060 */
7061 void
process_pending_request(AsyncRequest * areq)7062 process_pending_request(AsyncRequest *areq)
7063 {
7064 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7065 PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state;
7066
7067 /* The request would have been pending for a callback */
7068 Assert(areq->callback_pending);
7069
7070 /* The request should be currently in-process */
7071 Assert(fsstate->conn_state->pendingAreq == areq);
7072
7073 fetch_more_data(node);
7074
7075 /*
7076 * If we didn't get any tuples, must be end of data; complete the request
7077 * now. Otherwise, we postpone completing the request until we are called
7078 * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
7079 */
7080 if (fsstate->next_tuple >= fsstate->num_tuples)
7081 {
7082 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7083 areq->callback_pending = false;
7084 /* Mark the request as complete */
7085 ExecAsyncRequestDone(areq, NULL);
7086 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7087 ExecAsyncResponse(areq);
7088 }
7089 }
7090
7091 /*
7092 * Complete a pending asynchronous request.
7093 */
7094 static void
complete_pending_request(AsyncRequest * areq)7095 complete_pending_request(AsyncRequest *areq)
7096 {
7097 /* The request would have been pending for a callback */
7098 Assert(areq->callback_pending);
7099
7100 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7101 areq->callback_pending = false;
7102
7103 /* We begin a fetch afterwards if necessary; don't fetch */
7104 produce_tuple_asynchronously(areq, false);
7105
7106 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7107 ExecAsyncResponse(areq);
7108
7109 /* Also, we do instrumentation ourselves, if required */
7110 if (areq->requestee->instrument)
7111 InstrUpdateTupleCount(areq->requestee->instrument,
7112 TupIsNull(areq->result) ? 0.0 : 1.0);
7113 }
7114
7115 /*
7116 * Create a tuple from the specified row of the PGresult.
7117 *
7118 * rel is the local representation of the foreign table, attinmeta is
7119 * conversion data for the rel's tupdesc, and retrieved_attrs is an
7120 * integer list of the table column numbers present in the PGresult.
7121 * fsstate is the ForeignScan plan node's execution state.
7122 * temp_context is a working context that can be reset after each tuple.
7123 *
7124 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
7125 * if we're processing a remote join, while fsstate is NULL in a non-query
7126 * context such as ANALYZE, or if we're processing a non-scan query node.
7127 */
7128 static HeapTuple
make_tuple_from_result_row(PGresult * res,int row,Relation rel,AttInMetadata * attinmeta,List * retrieved_attrs,ForeignScanState * fsstate,MemoryContext temp_context)7129 make_tuple_from_result_row(PGresult *res,
7130 int row,
7131 Relation rel,
7132 AttInMetadata *attinmeta,
7133 List *retrieved_attrs,
7134 ForeignScanState *fsstate,
7135 MemoryContext temp_context)
7136 {
7137 HeapTuple tuple;
7138 TupleDesc tupdesc;
7139 Datum *values;
7140 bool *nulls;
7141 ItemPointer ctid = NULL;
7142 ConversionLocation errpos;
7143 ErrorContextCallback errcallback;
7144 MemoryContext oldcontext;
7145 ListCell *lc;
7146 int j;
7147
7148 Assert(row < PQntuples(res));
7149
7150 /*
7151 * Do the following work in a temp context that we reset after each tuple.
7152 * This cleans up not only the data we have direct access to, but any
7153 * cruft the I/O functions might leak.
7154 */
7155 oldcontext = MemoryContextSwitchTo(temp_context);
7156
7157 /*
7158 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
7159 * provided, otherwise look to the scan node's ScanTupleSlot.
7160 */
7161 if (rel)
7162 tupdesc = RelationGetDescr(rel);
7163 else
7164 {
7165 Assert(fsstate);
7166 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
7167 }
7168
7169 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
7170 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
7171 /* Initialize to nulls for any columns not present in result */
7172 memset(nulls, true, tupdesc->natts * sizeof(bool));
7173
7174 /*
7175 * Set up and install callback to report where conversion error occurs.
7176 */
7177 errpos.cur_attno = 0;
7178 errpos.rel = rel;
7179 errpos.fsstate = fsstate;
7180 errcallback.callback = conversion_error_callback;
7181 errcallback.arg = (void *) &errpos;
7182 errcallback.previous = error_context_stack;
7183 error_context_stack = &errcallback;
7184
7185 /*
7186 * i indexes columns in the relation, j indexes columns in the PGresult.
7187 */
7188 j = 0;
7189 foreach(lc, retrieved_attrs)
7190 {
7191 int i = lfirst_int(lc);
7192 char *valstr;
7193
7194 /* fetch next column's textual value */
7195 if (PQgetisnull(res, row, j))
7196 valstr = NULL;
7197 else
7198 valstr = PQgetvalue(res, row, j);
7199
7200 /*
7201 * convert value to internal representation
7202 *
7203 * Note: we ignore system columns other than ctid and oid in result
7204 */
7205 errpos.cur_attno = i;
7206 if (i > 0)
7207 {
7208 /* ordinary column */
7209 Assert(i <= tupdesc->natts);
7210 nulls[i - 1] = (valstr == NULL);
7211 /* Apply the input function even to nulls, to support domains */
7212 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
7213 valstr,
7214 attinmeta->attioparams[i - 1],
7215 attinmeta->atttypmods[i - 1]);
7216 }
7217 else if (i == SelfItemPointerAttributeNumber)
7218 {
7219 /* ctid */
7220 if (valstr != NULL)
7221 {
7222 Datum datum;
7223
7224 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
7225 ctid = (ItemPointer) DatumGetPointer(datum);
7226 }
7227 }
7228 errpos.cur_attno = 0;
7229
7230 j++;
7231 }
7232
7233 /* Uninstall error context callback. */
7234 error_context_stack = errcallback.previous;
7235
7236 /*
7237 * Check we got the expected number of columns. Note: j == 0 and
7238 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
7239 */
7240 if (j > 0 && j != PQnfields(res))
7241 elog(ERROR, "remote query result does not match the foreign table");
7242
7243 /*
7244 * Build the result tuple in caller's memory context.
7245 */
7246 MemoryContextSwitchTo(oldcontext);
7247
7248 tuple = heap_form_tuple(tupdesc, values, nulls);
7249
7250 /*
7251 * If we have a CTID to return, install it in both t_self and t_ctid.
7252 * t_self is the normal place, but if the tuple is converted to a
7253 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
7254 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
7255 */
7256 if (ctid)
7257 tuple->t_self = tuple->t_data->t_ctid = *ctid;
7258
7259 /*
7260 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
7261 * heap_form_tuple. heap_form_tuple actually creates the tuple with
7262 * DatumTupleFields, not HeapTupleFields, but the executor expects
7263 * HeapTupleFields and will happily extract system columns on that
7264 * assumption. If we don't do this then, for example, the tuple length
7265 * ends up in the xmin field, which isn't what we want.
7266 */
7267 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
7268 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
7269 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
7270
7271 /* Clean up */
7272 MemoryContextReset(temp_context);
7273
7274 return tuple;
7275 }
7276
7277 /*
7278 * Callback function which is called when error occurs during column value
7279 * conversion. Print names of column and relation.
7280 *
7281 * Note that this function mustn't do any catalog lookups, since we are in
7282 * an already-failed transaction. Fortunately, we can get the needed info
7283 * from the relation or the query's rangetable instead.
7284 */
7285 static void
conversion_error_callback(void * arg)7286 conversion_error_callback(void *arg)
7287 {
7288 ConversionLocation *errpos = (ConversionLocation *) arg;
7289 Relation rel = errpos->rel;
7290 ForeignScanState *fsstate = errpos->fsstate;
7291 const char *attname = NULL;
7292 const char *relname = NULL;
7293 bool is_wholerow = false;
7294
7295 /*
7296 * If we're in a scan node, always use aliases from the rangetable, for
7297 * consistency between the simple-relation and remote-join cases. Look at
7298 * the relation's tupdesc only if we're not in a scan node.
7299 */
7300 if (fsstate)
7301 {
7302 /* ForeignScan case */
7303 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
7304 int varno = 0;
7305 AttrNumber colno = 0;
7306
7307 if (fsplan->scan.scanrelid > 0)
7308 {
7309 /* error occurred in a scan against a foreign table */
7310 varno = fsplan->scan.scanrelid;
7311 colno = errpos->cur_attno;
7312 }
7313 else
7314 {
7315 /* error occurred in a scan against a foreign join */
7316 TargetEntry *tle;
7317
7318 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
7319 errpos->cur_attno - 1);
7320
7321 /*
7322 * Target list can have Vars and expressions. For Vars, we can
7323 * get some information, however for expressions we can't. Thus
7324 * for expressions, just show generic context message.
7325 */
7326 if (IsA(tle->expr, Var))
7327 {
7328 Var *var = (Var *) tle->expr;
7329
7330 varno = var->varno;
7331 colno = var->varattno;
7332 }
7333 }
7334
7335 if (varno > 0)
7336 {
7337 EState *estate = fsstate->ss.ps.state;
7338 RangeTblEntry *rte = exec_rt_fetch(varno, estate);
7339
7340 relname = rte->eref->aliasname;
7341
7342 if (colno == 0)
7343 is_wholerow = true;
7344 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
7345 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
7346 else if (colno == SelfItemPointerAttributeNumber)
7347 attname = "ctid";
7348 }
7349 }
7350 else if (rel)
7351 {
7352 /* Non-ForeignScan case (we should always have a rel here) */
7353 TupleDesc tupdesc = RelationGetDescr(rel);
7354
7355 relname = RelationGetRelationName(rel);
7356 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
7357 {
7358 Form_pg_attribute attr = TupleDescAttr(tupdesc,
7359 errpos->cur_attno - 1);
7360
7361 attname = NameStr(attr->attname);
7362 }
7363 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
7364 attname = "ctid";
7365 }
7366
7367 if (relname && is_wholerow)
7368 errcontext("whole-row reference to foreign table \"%s\"", relname);
7369 else if (relname && attname)
7370 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
7371 else
7372 errcontext("processing expression at position %d in select list",
7373 errpos->cur_attno);
7374 }
7375
7376 /*
7377 * Find an equivalence class member expression to be computed as a sort column
7378 * in the given target.
7379 */
7380 Expr *
find_em_expr_for_input_target(PlannerInfo * root,EquivalenceClass * ec,PathTarget * target)7381 find_em_expr_for_input_target(PlannerInfo *root,
7382 EquivalenceClass *ec,
7383 PathTarget *target)
7384 {
7385 ListCell *lc1;
7386 int i;
7387
7388 i = 0;
7389 foreach(lc1, target->exprs)
7390 {
7391 Expr *expr = (Expr *) lfirst(lc1);
7392 Index sgref = get_pathtarget_sortgroupref(target, i);
7393 ListCell *lc2;
7394
7395 /* Ignore non-sort expressions */
7396 if (sgref == 0 ||
7397 get_sortgroupref_clause_noerr(sgref,
7398 root->parse->sortClause) == NULL)
7399 {
7400 i++;
7401 continue;
7402 }
7403
7404 /* We ignore binary-compatible relabeling on both ends */
7405 while (expr && IsA(expr, RelabelType))
7406 expr = ((RelabelType *) expr)->arg;
7407
7408 /* Locate an EquivalenceClass member matching this expr, if any */
7409 foreach(lc2, ec->ec_members)
7410 {
7411 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
7412 Expr *em_expr;
7413
7414 /* Don't match constants */
7415 if (em->em_is_const)
7416 continue;
7417
7418 /* Ignore child members */
7419 if (em->em_is_child)
7420 continue;
7421
7422 /* Match if same expression (after stripping relabel) */
7423 em_expr = em->em_expr;
7424 while (em_expr && IsA(em_expr, RelabelType))
7425 em_expr = ((RelabelType *) em_expr)->arg;
7426
7427 if (equal(em_expr, expr))
7428 return em->em_expr;
7429 }
7430
7431 i++;
7432 }
7433
7434 elog(ERROR, "could not find pathkey item to sort");
7435 return NULL; /* keep compiler quiet */
7436 }
7437
7438 /*
7439 * Determine batch size for a given foreign table. The option specified for
7440 * a table has precedence.
7441 */
7442 static int
get_batch_size_option(Relation rel)7443 get_batch_size_option(Relation rel)
7444 {
7445 Oid foreigntableid = RelationGetRelid(rel);
7446 ForeignTable *table;
7447 ForeignServer *server;
7448 List *options;
7449 ListCell *lc;
7450
7451 /* we use 1 by default, which means "no batching" */
7452 int batch_size = 1;
7453
7454 /*
7455 * Load options for table and server. We append server options after table
7456 * options, because table options take precedence.
7457 */
7458 table = GetForeignTable(foreigntableid);
7459 server = GetForeignServer(table->serverid);
7460
7461 options = NIL;
7462 options = list_concat(options, table->options);
7463 options = list_concat(options, server->options);
7464
7465 /* See if either table or server specifies batch_size. */
7466 foreach(lc, options)
7467 {
7468 DefElem *def = (DefElem *) lfirst(lc);
7469
7470 if (strcmp(def->defname, "batch_size") == 0)
7471 {
7472 (void) parse_int(defGetString(def), &batch_size, 0, NULL);
7473 break;
7474 }
7475 }
7476
7477 return batch_size;
7478 }
7479