/*-------------------------------------------------------------------------
 *
 * postgres_fdw.c
 *		  Foreign-data wrapper for remote PostgreSQL servers
 *
 * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/postgres_fdw/postgres_fdw.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "postgres_fdw.h"

#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_class.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"
#include "utils/selfuncs.h"

/*
 * source-code-compatibility hacks for pull_varnos() API change
 *
 * Every nine-argument make_restrictinfo() call in this file is redirected to
 * make_restrictinfo_new() with the same arguments.
 */
#define make_restrictinfo(a,b,c,d,e,f,g,h,i) make_restrictinfo_new(a,b,c,d,e,f,g,h,i)

PG_MODULE_MAGIC;

/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2

/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * Because the enumerators double as list positions, their order here is
 * significant.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing the join, i.e. the names of the relations being
	 * joined and the types of join; added only when the scan is a join
	 */
	FdwScanPrivateRelations
};

/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Boolean flag showing if the remote query has a RETURNING clause
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* has-returning flag (as an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};

/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};

/*
 * Execution state of a foreign scan using postgres_fdw.
 */
typedef struct PgFdwScanState
{
	Relation	rel;			/* relcache entry for the foreign table. NULL
								 * for a foreign join scan. */
	TupleDesc	tupdesc;		/* tuple descriptor of scan */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of SELECT command */
	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the scan */
	unsigned int cursor_number; /* quasi-unique ID for my cursor */
	bool		cursor_exists;	/* have we created the cursor? */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
	int			num_tuples;		/* # of tuples in array */
	int			next_tuple;		/* index of next one to return */

	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
	bool		eof_reached;	/* true if last fetch reached EOF */

	/* working memory contexts */
	MemoryContext batch_cxt;	/* context holding current batch of tuples */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	int			fetch_size;		/* number of tuples per fetch */
} PgFdwScanState;

/*
 * Execution state of a foreign insert/update/delete operation.
 */
typedef struct PgFdwModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* for remote query execution */
	PGconn	   *conn;			/* connection used for remote query execution */
	char	   *p_name;			/* name of prepared statement, if created */

	/* extracted fdw_private data */
	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
	List	   *target_attrs;	/* list of target attribute numbers */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */

	/* info about parameters for prepared statement */
	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
	int			p_nums;			/* number of parameters to transmit */
	FmgrInfo   *p_flinfo;		/* output conversion functions for them */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */

	/* for update row movement if subplan result rel */
	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
											 * created */
} PgFdwModifyState;

/*
 * Execution state of a foreign scan that modifies a foreign table directly.
 */
typedef struct PgFdwDirectModifyState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */

	/* extracted fdw_private data */
	char	   *query;			/* text of UPDATE/DELETE command */
	bool		has_returning;	/* is there a RETURNING clause? */
	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
	bool		set_processed;	/* do we set the command es_processed? */

	/* for remote query execution */
	PGconn	   *conn;			/* connection for the update */
	int			numParams;		/* number of parameters passed to query */
	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
	List	   *param_exprs;	/* executable expressions for param values */
	const char **param_values;	/* textual values of query parameters */

	/* for storing result tuples */
	PGresult   *result;			/* result for query */
	int			num_tuples;		/* # of result tuples */
	int			next_tuple;		/* index of next one to return */
	Relation	resultRel;		/* relcache entry for the target relation */
	AttrNumber *attnoMap;		/* array of attnums of input user columns */
	AttrNumber	ctidAttno;		/* attnum of input ctid column */
	AttrNumber	oidAttno;		/* attnum of input oid column */
	bool		hasSystemCols;	/* are there system columns of resultRel? */

	/* working memory context */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwDirectModifyState;

/*
 * Workspace for analyzing a foreign table.
 */
typedef struct PgFdwAnalyzeState
{
	Relation	rel;			/* relcache entry for the foreign table */
	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
	List	   *retrieved_attrs;	/* attr numbers retrieved by query */

	/* collected sample rows */
	HeapTuple  *rows;			/* array of size targrows */
	int			targrows;		/* target # of sample rows */
	int			numrows;		/* # of sample rows collected */

	/* for random sampling */
	double		samplerows;		/* # of rows fetched */
	double		rowstoskip;		/* # of rows to skip before next sample */
	ReservoirStateData rstate;	/* state for reservoir sampling */

	/* working memory contexts */
	MemoryContext anl_cxt;		/* context for per-analyze lifespan data */
	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
} PgFdwAnalyzeState;

/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as an integer Value node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as an integer Value node) */
	FdwPathPrivateHasLimit
};

/*
 * Struct for extra information passed to estimate_path_cost_size()
 *
 * NOTE(review): the individual fields are not described in this part of the
 * file.  has_final_sort and has_limit presumably correspond to the
 * FdwPathPrivateHasFinalSort / FdwPathPrivateHasLimit flags above; confirm
 * against the code that fills in this struct.
 */
typedef struct
{
	PathTarget *target;
	bool		has_final_sort;
	bool		has_limit;
	double		limit_tuples;
	int64		count_est;
	int64		offset_est;
} PgFdwPathExtraData;

/*
 * Identify the attribute where data conversion fails.
284 */ 285 typedef struct ConversionLocation 286 { 287 AttrNumber cur_attno; /* attribute number being processed, or 0 */ 288 Relation rel; /* foreign table being processed, or NULL */ 289 ForeignScanState *fsstate; /* plan node being processed, or NULL */ 290 } ConversionLocation; 291 292 /* Callback argument for ec_member_matches_foreign */ 293 typedef struct 294 { 295 Expr *current; /* current expr, or NULL if not yet found */ 296 List *already_used; /* expressions already dealt with */ 297 } ec_member_foreign_arg; 298 299 /* 300 * SQL functions 301 */ 302 PG_FUNCTION_INFO_V1(postgres_fdw_handler); 303 304 /* 305 * FDW callback routines 306 */ 307 static void postgresGetForeignRelSize(PlannerInfo *root, 308 RelOptInfo *baserel, 309 Oid foreigntableid); 310 static void postgresGetForeignPaths(PlannerInfo *root, 311 RelOptInfo *baserel, 312 Oid foreigntableid); 313 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, 314 RelOptInfo *foreignrel, 315 Oid foreigntableid, 316 ForeignPath *best_path, 317 List *tlist, 318 List *scan_clauses, 319 Plan *outer_plan); 320 static void postgresBeginForeignScan(ForeignScanState *node, int eflags); 321 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); 322 static void postgresReScanForeignScan(ForeignScanState *node); 323 static void postgresEndForeignScan(ForeignScanState *node); 324 static void postgresAddForeignUpdateTargets(Query *parsetree, 325 RangeTblEntry *target_rte, 326 Relation target_relation); 327 static List *postgresPlanForeignModify(PlannerInfo *root, 328 ModifyTable *plan, 329 Index resultRelation, 330 int subplan_index); 331 static void postgresBeginForeignModify(ModifyTableState *mtstate, 332 ResultRelInfo *resultRelInfo, 333 List *fdw_private, 334 int subplan_index, 335 int eflags); 336 static TupleTableSlot *postgresExecForeignInsert(EState *estate, 337 ResultRelInfo *resultRelInfo, 338 TupleTableSlot *slot, 339 TupleTableSlot *planSlot); 340 static TupleTableSlot 
*postgresExecForeignUpdate(EState *estate, 341 ResultRelInfo *resultRelInfo, 342 TupleTableSlot *slot, 343 TupleTableSlot *planSlot); 344 static TupleTableSlot *postgresExecForeignDelete(EState *estate, 345 ResultRelInfo *resultRelInfo, 346 TupleTableSlot *slot, 347 TupleTableSlot *planSlot); 348 static void postgresEndForeignModify(EState *estate, 349 ResultRelInfo *resultRelInfo); 350 static void postgresBeginForeignInsert(ModifyTableState *mtstate, 351 ResultRelInfo *resultRelInfo); 352 static void postgresEndForeignInsert(EState *estate, 353 ResultRelInfo *resultRelInfo); 354 static int postgresIsForeignRelUpdatable(Relation rel); 355 static bool postgresPlanDirectModify(PlannerInfo *root, 356 ModifyTable *plan, 357 Index resultRelation, 358 int subplan_index); 359 static void postgresBeginDirectModify(ForeignScanState *node, int eflags); 360 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node); 361 static void postgresEndDirectModify(ForeignScanState *node); 362 static void postgresExplainForeignScan(ForeignScanState *node, 363 ExplainState *es); 364 static void postgresExplainForeignModify(ModifyTableState *mtstate, 365 ResultRelInfo *rinfo, 366 List *fdw_private, 367 int subplan_index, 368 ExplainState *es); 369 static void postgresExplainDirectModify(ForeignScanState *node, 370 ExplainState *es); 371 static bool postgresAnalyzeForeignTable(Relation relation, 372 AcquireSampleRowsFunc *func, 373 BlockNumber *totalpages); 374 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, 375 Oid serverOid); 376 static void postgresGetForeignJoinPaths(PlannerInfo *root, 377 RelOptInfo *joinrel, 378 RelOptInfo *outerrel, 379 RelOptInfo *innerrel, 380 JoinType jointype, 381 JoinPathExtraData *extra); 382 static bool postgresRecheckForeignScan(ForeignScanState *node, 383 TupleTableSlot *slot); 384 static void postgresGetForeignUpperPaths(PlannerInfo *root, 385 UpperRelationKind stage, 386 RelOptInfo *input_rel, 387 RelOptInfo 
*output_rel, 388 void *extra); 389 390 /* 391 * Helper functions 392 */ 393 static void estimate_path_cost_size(PlannerInfo *root, 394 RelOptInfo *foreignrel, 395 List *param_join_conds, 396 List *pathkeys, 397 PgFdwPathExtraData *fpextra, 398 double *p_rows, int *p_width, 399 Cost *p_startup_cost, Cost *p_total_cost); 400 static void get_remote_estimate(const char *sql, 401 PGconn *conn, 402 double *rows, 403 int *width, 404 Cost *startup_cost, 405 Cost *total_cost); 406 static void adjust_foreign_grouping_path_cost(PlannerInfo *root, 407 List *pathkeys, 408 double retrieved_rows, 409 double width, 410 double limit_tuples, 411 Cost *p_startup_cost, 412 Cost *p_run_cost); 413 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, 414 EquivalenceClass *ec, EquivalenceMember *em, 415 void *arg); 416 static void create_cursor(ForeignScanState *node); 417 static void fetch_more_data(ForeignScanState *node); 418 static void close_cursor(PGconn *conn, unsigned int cursor_number); 419 static PgFdwModifyState *create_foreign_modify(EState *estate, 420 RangeTblEntry *rte, 421 ResultRelInfo *resultRelInfo, 422 CmdType operation, 423 Plan *subplan, 424 char *query, 425 List *target_attrs, 426 bool has_returning, 427 List *retrieved_attrs); 428 static TupleTableSlot *execute_foreign_modify(EState *estate, 429 ResultRelInfo *resultRelInfo, 430 CmdType operation, 431 TupleTableSlot *slot, 432 TupleTableSlot *planSlot); 433 static void prepare_foreign_modify(PgFdwModifyState *fmstate); 434 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, 435 ItemPointer tupleid, 436 TupleTableSlot *slot); 437 static void store_returning_result(PgFdwModifyState *fmstate, 438 TupleTableSlot *slot, PGresult *res); 439 static void finish_foreign_modify(PgFdwModifyState *fmstate); 440 static List *build_remote_returning(Index rtindex, Relation rel, 441 List *returningList); 442 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); 443 static 
void execute_dml_stmt(ForeignScanState *node); 444 static TupleTableSlot *get_returning_data(ForeignScanState *node); 445 static void init_returning_filter(PgFdwDirectModifyState *dmstate, 446 List *fdw_scan_tlist, 447 Index rtindex); 448 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate, 449 TupleTableSlot *slot, 450 EState *estate); 451 static void prepare_query_params(PlanState *node, 452 List *fdw_exprs, 453 int numParams, 454 FmgrInfo **param_flinfo, 455 List **param_exprs, 456 const char ***param_values); 457 static void process_query_params(ExprContext *econtext, 458 FmgrInfo *param_flinfo, 459 List *param_exprs, 460 const char **param_values); 461 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, 462 HeapTuple *rows, int targrows, 463 double *totalrows, 464 double *totaldeadrows); 465 static void analyze_row_processor(PGresult *res, int row, 466 PgFdwAnalyzeState *astate); 467 static HeapTuple make_tuple_from_result_row(PGresult *res, 468 int row, 469 Relation rel, 470 AttInMetadata *attinmeta, 471 List *retrieved_attrs, 472 ForeignScanState *fsstate, 473 MemoryContext temp_context); 474 static void conversion_error_callback(void *arg); 475 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, 476 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, 477 JoinPathExtraData *extra); 478 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, 479 Node *havingQual); 480 static List *get_useful_pathkeys_for_relation(PlannerInfo *root, 481 RelOptInfo *rel); 482 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); 483 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, 484 Path *epq_path); 485 static void add_foreign_grouping_paths(PlannerInfo *root, 486 RelOptInfo *input_rel, 487 RelOptInfo *grouped_rel, 488 GroupPathExtraData *extra); 489 static void add_foreign_ordered_paths(PlannerInfo *root, 490 RelOptInfo 
*input_rel, 491 RelOptInfo *ordered_rel); 492 static void add_foreign_final_paths(PlannerInfo *root, 493 RelOptInfo *input_rel, 494 RelOptInfo *final_rel, 495 FinalPathExtraData *extra); 496 static void apply_server_options(PgFdwRelationInfo *fpinfo); 497 static void apply_table_options(PgFdwRelationInfo *fpinfo); 498 static void merge_fdw_options(PgFdwRelationInfo *fpinfo, 499 const PgFdwRelationInfo *fpinfo_o, 500 const PgFdwRelationInfo *fpinfo_i); 501 502 503 /* 504 * Foreign-data wrapper handler function: return a struct with pointers 505 * to my callback routines. 506 */ 507 Datum 508 postgres_fdw_handler(PG_FUNCTION_ARGS) 509 { 510 FdwRoutine *routine = makeNode(FdwRoutine); 511 512 /* Functions for scanning foreign tables */ 513 routine->GetForeignRelSize = postgresGetForeignRelSize; 514 routine->GetForeignPaths = postgresGetForeignPaths; 515 routine->GetForeignPlan = postgresGetForeignPlan; 516 routine->BeginForeignScan = postgresBeginForeignScan; 517 routine->IterateForeignScan = postgresIterateForeignScan; 518 routine->ReScanForeignScan = postgresReScanForeignScan; 519 routine->EndForeignScan = postgresEndForeignScan; 520 521 /* Functions for updating foreign tables */ 522 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; 523 routine->PlanForeignModify = postgresPlanForeignModify; 524 routine->BeginForeignModify = postgresBeginForeignModify; 525 routine->ExecForeignInsert = postgresExecForeignInsert; 526 routine->ExecForeignUpdate = postgresExecForeignUpdate; 527 routine->ExecForeignDelete = postgresExecForeignDelete; 528 routine->EndForeignModify = postgresEndForeignModify; 529 routine->BeginForeignInsert = postgresBeginForeignInsert; 530 routine->EndForeignInsert = postgresEndForeignInsert; 531 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; 532 routine->PlanDirectModify = postgresPlanDirectModify; 533 routine->BeginDirectModify = postgresBeginDirectModify; 534 routine->IterateDirectModify = 
postgresIterateDirectModify; 535 routine->EndDirectModify = postgresEndDirectModify; 536 537 /* Function for EvalPlanQual rechecks */ 538 routine->RecheckForeignScan = postgresRecheckForeignScan; 539 /* Support functions for EXPLAIN */ 540 routine->ExplainForeignScan = postgresExplainForeignScan; 541 routine->ExplainForeignModify = postgresExplainForeignModify; 542 routine->ExplainDirectModify = postgresExplainDirectModify; 543 544 /* Support functions for ANALYZE */ 545 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; 546 547 /* Support functions for IMPORT FOREIGN SCHEMA */ 548 routine->ImportForeignSchema = postgresImportForeignSchema; 549 550 /* Support functions for join push-down */ 551 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; 552 553 /* Support functions for upper relation push-down */ 554 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; 555 556 PG_RETURN_POINTER(routine); 557 } 558 559 /* 560 * postgresGetForeignRelSize 561 * Estimate # of rows and width of the result of the scan 562 * 563 * We should consider the effect of all baserestrictinfo clauses here, but 564 * not any join clauses. 565 */ 566 static void 567 postgresGetForeignRelSize(PlannerInfo *root, 568 RelOptInfo *baserel, 569 Oid foreigntableid) 570 { 571 PgFdwRelationInfo *fpinfo; 572 ListCell *lc; 573 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); 574 const char *namespace; 575 const char *relname; 576 const char *refname; 577 578 /* 579 * We use PgFdwRelationInfo to pass various information to subsequent 580 * functions. 581 */ 582 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); 583 baserel->fdw_private = (void *) fpinfo; 584 585 /* Base foreign tables need to be pushed down always. */ 586 fpinfo->pushdown_safe = true; 587 588 /* Look up foreign-table catalog info. 
*/ 589 fpinfo->table = GetForeignTable(foreigntableid); 590 fpinfo->server = GetForeignServer(fpinfo->table->serverid); 591 592 /* 593 * Extract user-settable option values. Note that per-table settings of 594 * use_remote_estimate and fetch_size override per-server settings of 595 * them, respectively. 596 */ 597 fpinfo->use_remote_estimate = false; 598 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; 599 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; 600 fpinfo->shippable_extensions = NIL; 601 fpinfo->fetch_size = 100; 602 603 apply_server_options(fpinfo); 604 apply_table_options(fpinfo); 605 606 /* 607 * If the table or the server is configured to use remote estimates, 608 * identify which user to do remote access as during planning. This 609 * should match what ExecCheckRTEPerms() does. If we fail due to lack of 610 * permissions, the query would have failed at runtime anyway. 611 */ 612 if (fpinfo->use_remote_estimate) 613 { 614 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); 615 616 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); 617 } 618 else 619 fpinfo->user = NULL; 620 621 /* 622 * Identify which baserestrictinfo clauses can be sent to the remote 623 * server and which can't. 624 */ 625 classifyConditions(root, baserel, baserel->baserestrictinfo, 626 &fpinfo->remote_conds, &fpinfo->local_conds); 627 628 /* 629 * Identify which attributes will need to be retrieved from the remote 630 * server. These include all attrs needed for joins or final output, plus 631 * all attrs used in the local_conds. (Note: if we end up using a 632 * parameterized scan, it's possible that some of the join clauses will be 633 * sent to the remote and thus we wouldn't really need to retrieve the 634 * columns used in them. Doesn't seem worth detecting that case though.) 
635 */ 636 fpinfo->attrs_used = NULL; 637 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, 638 &fpinfo->attrs_used); 639 foreach(lc, fpinfo->local_conds) 640 { 641 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); 642 643 pull_varattnos((Node *) rinfo->clause, baserel->relid, 644 &fpinfo->attrs_used); 645 } 646 647 /* 648 * Compute the selectivity and cost of the local_conds, so we don't have 649 * to do it over again for each path. The best we can do for these 650 * conditions is to estimate selectivity on the basis of local statistics. 651 */ 652 fpinfo->local_conds_sel = clauselist_selectivity(root, 653 fpinfo->local_conds, 654 baserel->relid, 655 JOIN_INNER, 656 NULL); 657 658 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); 659 660 /* 661 * Set # of retrieved rows and cached relation costs to some negative 662 * value, so that we can detect when they are set to some sensible values, 663 * during one (usually the first) of the calls to estimate_path_cost_size. 664 */ 665 fpinfo->retrieved_rows = -1; 666 fpinfo->rel_startup_cost = -1; 667 fpinfo->rel_total_cost = -1; 668 669 /* 670 * If the table or the server is configured to use remote estimates, 671 * connect to the foreign server and execute EXPLAIN to estimate the 672 * number of rows selected by the restriction clauses, as well as the 673 * average row width. Otherwise, estimate using whatever statistics we 674 * have locally, in a way similar to ordinary tables. 675 */ 676 if (fpinfo->use_remote_estimate) 677 { 678 /* 679 * Get cost/size estimates with help of remote server. Save the 680 * values in fpinfo so we don't need to do it again to generate the 681 * basic foreign path. 682 */ 683 estimate_path_cost_size(root, baserel, NIL, NIL, NULL, 684 &fpinfo->rows, &fpinfo->width, 685 &fpinfo->startup_cost, &fpinfo->total_cost); 686 687 /* Report estimated baserel size to planner. 
*/ 688 baserel->rows = fpinfo->rows; 689 baserel->reltarget->width = fpinfo->width; 690 } 691 else 692 { 693 /* 694 * If the foreign table has never been ANALYZEd, it will have relpages 695 * and reltuples equal to zero, which most likely has nothing to do 696 * with reality. We can't do a whole lot about that if we're not 697 * allowed to consult the remote server, but we can use a hack similar 698 * to plancat.c's treatment of empty relations: use a minimum size 699 * estimate of 10 pages, and divide by the column-datatype-based width 700 * estimate to get the corresponding number of tuples. 701 */ 702 if (baserel->pages == 0 && baserel->tuples == 0) 703 { 704 baserel->pages = 10; 705 baserel->tuples = 706 (10 * BLCKSZ) / (baserel->reltarget->width + 707 MAXALIGN(SizeofHeapTupleHeader)); 708 } 709 710 /* Estimate baserel size as best we can with local statistics. */ 711 set_baserel_size_estimates(root, baserel); 712 713 /* Fill in basically-bogus cost estimates for use later. */ 714 estimate_path_cost_size(root, baserel, NIL, NIL, NULL, 715 &fpinfo->rows, &fpinfo->width, 716 &fpinfo->startup_cost, &fpinfo->total_cost); 717 } 718 719 /* 720 * Set the name of relation in fpinfo, while we are constructing it here. 721 * It will be used to build the string describing the join relation in 722 * EXPLAIN output. We can't know whether VERBOSE option is specified or 723 * not, so always schema-qualify the foreign table name. 724 */ 725 fpinfo->relation_name = makeStringInfo(); 726 namespace = get_namespace_name(get_rel_namespace(foreigntableid)); 727 relname = get_rel_name(foreigntableid); 728 refname = rte->eref->aliasname; 729 appendStringInfo(fpinfo->relation_name, "%s.%s", 730 quote_identifier(namespace), 731 quote_identifier(relname)); 732 if (*refname && strcmp(refname, relname) != 0) 733 appendStringInfo(fpinfo->relation_name, " %s", 734 quote_identifier(rte->eref->aliasname)); 735 736 /* No outer and inner relations. 
*/ 737 fpinfo->make_outerrel_subquery = false; 738 fpinfo->make_innerrel_subquery = false; 739 fpinfo->lower_subquery_rels = NULL; 740 /* Set the relation index. */ 741 fpinfo->relation_index = baserel->relid; 742 } 743 744 /* 745 * get_useful_ecs_for_relation 746 * Determine which EquivalenceClasses might be involved in useful 747 * orderings of this relation. 748 * 749 * This function is in some respects a mirror image of the core function 750 * pathkeys_useful_for_merging: for a regular table, we know what indexes 751 * we have and want to test whether any of them are useful. For a foreign 752 * table, we don't know what indexes are present on the remote side but 753 * want to speculate about which ones we'd like to use if they existed. 754 * 755 * This function returns a list of potentially-useful equivalence classes, 756 * but it does not guarantee that an EquivalenceMember exists which contains 757 * Vars only from the given relation. For example, given ft1 JOIN t1 ON 758 * ft1.x + t1.x = 0, this function will say that the equivalence class 759 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and 760 * t1 is local (or on a different server), it will turn out that no useful 761 * ORDER BY clause can be generated. It's not our job to figure that out 762 * here; we're only interested in identifying relevant ECs. 763 */ 764 static List * 765 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel) 766 { 767 List *useful_eclass_list = NIL; 768 ListCell *lc; 769 Relids relids; 770 771 /* 772 * First, consider whether any active EC is potentially useful for a merge 773 * join against this relation. 
774 */ 775 if (rel->has_eclass_joins) 776 { 777 foreach(lc, root->eq_classes) 778 { 779 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc); 780 781 if (eclass_useful_for_merging(root, cur_ec, rel)) 782 useful_eclass_list = lappend(useful_eclass_list, cur_ec); 783 } 784 } 785 786 /* 787 * Next, consider whether there are any non-EC derivable join clauses that 788 * are merge-joinable. If the joininfo list is empty, we can exit 789 * quickly. 790 */ 791 if (rel->joininfo == NIL) 792 return useful_eclass_list; 793 794 /* If this is a child rel, we must use the topmost parent rel to search. */ 795 if (IS_OTHER_REL(rel)) 796 { 797 Assert(!bms_is_empty(rel->top_parent_relids)); 798 relids = rel->top_parent_relids; 799 } 800 else 801 relids = rel->relids; 802 803 /* Check each join clause in turn. */ 804 foreach(lc, rel->joininfo) 805 { 806 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc); 807 808 /* Consider only mergejoinable clauses */ 809 if (restrictinfo->mergeopfamilies == NIL) 810 continue; 811 812 /* Make sure we've got canonical ECs. */ 813 update_mergeclause_eclasses(root, restrictinfo); 814 815 /* 816 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee 817 * that left_ec and right_ec will be initialized, per comments in 818 * distribute_qual_to_rels. 819 * 820 * We want to identify which side of this merge-joinable clause 821 * contains columns from the relation produced by this RelOptInfo. We 822 * test for overlap, not containment, because there could be extra 823 * relations on either side. For example, suppose we've got something 824 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON 825 * A.y = D.y. The input rel might be the joinrel between A and B, and 826 * we'll consider the join clause A.y = D.y. relids contains a 827 * relation not involved in the join class (B) and the equivalence 828 * class for the left-hand side of the clause contains a relation not 829 * involved in the input rel (C). 
Despite the fact that we have only 830 * overlap and not containment in either direction, A.y is potentially 831 * useful as a sort column. 832 * 833 * Note that it's even possible that relids overlaps neither side of 834 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x 835 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list, 836 * but overlaps neither side of B. In that case, we just skip this 837 * join clause, since it doesn't suggest a useful sort order for this 838 * relation. 839 */ 840 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids)) 841 useful_eclass_list = list_append_unique_ptr(useful_eclass_list, 842 restrictinfo->right_ec); 843 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids)) 844 useful_eclass_list = list_append_unique_ptr(useful_eclass_list, 845 restrictinfo->left_ec); 846 } 847 848 return useful_eclass_list; 849 } 850 851 /* 852 * get_useful_pathkeys_for_relation 853 * Determine which orderings of a relation might be useful. 854 * 855 * Getting data in sorted order can be useful either because the requested 856 * order matches the final output ordering for the overall query we're 857 * planning, or because it enables an efficient merge join. Here, we try 858 * to figure out which pathkeys to consider. 859 */ 860 static List * 861 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) 862 { 863 List *useful_pathkeys_list = NIL; 864 List *useful_eclass_list; 865 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; 866 EquivalenceClass *query_ec = NULL; 867 ListCell *lc; 868 869 /* 870 * Pushing the query_pathkeys to the remote server is always worth 871 * considering, because it might let us avoid a local sort. 
872 */ 873 fpinfo->qp_is_pushdown_safe = false; 874 if (root->query_pathkeys) 875 { 876 bool query_pathkeys_ok = true; 877 878 foreach(lc, root->query_pathkeys) 879 { 880 PathKey *pathkey = (PathKey *) lfirst(lc); 881 EquivalenceClass *pathkey_ec = pathkey->pk_eclass; 882 Expr *em_expr; 883 884 /* 885 * The planner and executor don't have any clever strategy for 886 * taking data sorted by a prefix of the query's pathkeys and 887 * getting it to be sorted by all of those pathkeys. We'll just 888 * end up resorting the entire data set. So, unless we can push 889 * down all of the query pathkeys, forget it. 890 * 891 * is_foreign_expr would detect volatile expressions as well, but 892 * checking ec_has_volatile here saves some cycles. 893 */ 894 if (pathkey_ec->ec_has_volatile || 895 !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) || 896 !is_foreign_expr(root, rel, em_expr)) 897 { 898 query_pathkeys_ok = false; 899 break; 900 } 901 } 902 903 if (query_pathkeys_ok) 904 { 905 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys)); 906 fpinfo->qp_is_pushdown_safe = true; 907 } 908 } 909 910 /* 911 * Even if we're not using remote estimates, having the remote side do the 912 * sort generally won't be any worse than doing it locally, and it might 913 * be much better if the remote side can generate data in the right order 914 * without needing a sort at all. However, what we're going to do next is 915 * try to generate pathkeys that seem promising for possible merge joins, 916 * and that's more speculative. A wrong choice might hurt quite a bit, so 917 * bail out if we can't use remote estimates. 918 */ 919 if (!fpinfo->use_remote_estimate) 920 return useful_pathkeys_list; 921 922 /* Get the list of interesting EquivalenceClasses. */ 923 useful_eclass_list = get_useful_ecs_for_relation(root, rel); 924 925 /* Extract unique EC for query, if any, so we don't consider it again. 
*/ 926 if (list_length(root->query_pathkeys) == 1) 927 { 928 PathKey *query_pathkey = linitial(root->query_pathkeys); 929 930 query_ec = query_pathkey->pk_eclass; 931 } 932 933 /* 934 * As a heuristic, the only pathkeys we consider here are those of length 935 * one. It's surely possible to consider more, but since each one we 936 * choose to consider will generate a round-trip to the remote side, we 937 * need to be a bit cautious here. It would sure be nice to have a local 938 * cache of information about remote index definitions... 939 */ 940 foreach(lc, useful_eclass_list) 941 { 942 EquivalenceClass *cur_ec = lfirst(lc); 943 Expr *em_expr; 944 PathKey *pathkey; 945 946 /* If redundant with what we did above, skip it. */ 947 if (cur_ec == query_ec) 948 continue; 949 950 /* If no pushable expression for this rel, skip it. */ 951 em_expr = find_em_expr_for_rel(cur_ec, rel); 952 if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr)) 953 continue; 954 955 /* Looks like we can generate a pathkey, so let's do it. */ 956 pathkey = make_canonical_pathkey(root, cur_ec, 957 linitial_oid(cur_ec->ec_opfamilies), 958 BTLessStrategyNumber, 959 false); 960 useful_pathkeys_list = lappend(useful_pathkeys_list, 961 list_make1(pathkey)); 962 } 963 964 return useful_pathkeys_list; 965 } 966 967 /* 968 * postgresGetForeignPaths 969 * Create possible scan paths for a scan on the foreign table 970 */ 971 static void 972 postgresGetForeignPaths(PlannerInfo *root, 973 RelOptInfo *baserel, 974 Oid foreigntableid) 975 { 976 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; 977 ForeignPath *path; 978 List *ppi_list; 979 ListCell *lc; 980 981 /* 982 * Create simplest ForeignScan path node and add it to baserel. This path 983 * corresponds to SeqScan path of regular tables (though depending on what 984 * baserestrict conditions we were able to send to remote, there might 985 * actually be an indexscan happening there). 
We already did all the work 986 * to estimate cost and size of this path. 987 * 988 * Although this path uses no join clauses, it could still have required 989 * parameterization due to LATERAL refs in its tlist. 990 */ 991 path = create_foreignscan_path(root, baserel, 992 NULL, /* default pathtarget */ 993 fpinfo->rows, 994 fpinfo->startup_cost, 995 fpinfo->total_cost, 996 NIL, /* no pathkeys */ 997 baserel->lateral_relids, 998 NULL, /* no extra plan */ 999 NIL); /* no fdw_private list */ 1000 add_path(baserel, (Path *) path); 1001 1002 /* Add paths with pathkeys */ 1003 add_paths_with_pathkeys_for_rel(root, baserel, NULL); 1004 1005 /* 1006 * If we're not using remote estimates, stop here. We have no way to 1007 * estimate whether any join clauses would be worth sending across, so 1008 * don't bother building parameterized paths. 1009 */ 1010 if (!fpinfo->use_remote_estimate) 1011 return; 1012 1013 /* 1014 * Thumb through all join clauses for the rel to identify which outer 1015 * relations could supply one or more safe-to-send-to-remote join clauses. 1016 * We'll build a parameterized path for each such outer relation. 1017 * 1018 * It's convenient to manage this by representing each candidate outer 1019 * relation by the ParamPathInfo node for it. We can then use the 1020 * ppi_clauses list in the ParamPathInfo node directly as a list of the 1021 * interesting join clauses for that rel. This takes care of the 1022 * possibility that there are multiple safe join clauses for such a rel, 1023 * and also ensures that we account for unsafe join clauses that we'll 1024 * still have to enforce locally (since the parameterized-path machinery 1025 * insists that we handle all movable clauses). 
1026 */ 1027 ppi_list = NIL; 1028 foreach(lc, baserel->joininfo) 1029 { 1030 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); 1031 Relids required_outer; 1032 ParamPathInfo *param_info; 1033 1034 /* Check if clause can be moved to this rel */ 1035 if (!join_clause_is_movable_to(rinfo, baserel)) 1036 continue; 1037 1038 /* See if it is safe to send to remote */ 1039 if (!is_foreign_expr(root, baserel, rinfo->clause)) 1040 continue; 1041 1042 /* Calculate required outer rels for the resulting path */ 1043 required_outer = bms_union(rinfo->clause_relids, 1044 baserel->lateral_relids); 1045 /* We do not want the foreign rel itself listed in required_outer */ 1046 required_outer = bms_del_member(required_outer, baserel->relid); 1047 1048 /* 1049 * required_outer probably can't be empty here, but if it were, we 1050 * couldn't make a parameterized path. 1051 */ 1052 if (bms_is_empty(required_outer)) 1053 continue; 1054 1055 /* Get the ParamPathInfo */ 1056 param_info = get_baserel_parampathinfo(root, baserel, 1057 required_outer); 1058 Assert(param_info != NULL); 1059 1060 /* 1061 * Add it to list unless we already have it. Testing pointer equality 1062 * is OK since get_baserel_parampathinfo won't make duplicates. 1063 */ 1064 ppi_list = list_append_unique_ptr(ppi_list, param_info); 1065 } 1066 1067 /* 1068 * The above scan examined only "generic" join clauses, not those that 1069 * were absorbed into EquivalenceClauses. See if we can make anything out 1070 * of EquivalenceClauses. 1071 */ 1072 if (baserel->has_eclass_joins) 1073 { 1074 /* 1075 * We repeatedly scan the eclass list looking for column references 1076 * (or expressions) belonging to the foreign rel. Each time we find 1077 * one, we generate a list of equivalence joinclauses for it, and then 1078 * see if any are safe to send to the remote. Repeat till there are 1079 * no more candidate EC members. 
1080 */ 1081 ec_member_foreign_arg arg; 1082 1083 arg.already_used = NIL; 1084 for (;;) 1085 { 1086 List *clauses; 1087 1088 /* Make clauses, skipping any that join to lateral_referencers */ 1089 arg.current = NULL; 1090 clauses = generate_implied_equalities_for_column(root, 1091 baserel, 1092 ec_member_matches_foreign, 1093 (void *) &arg, 1094 baserel->lateral_referencers); 1095 1096 /* Done if there are no more expressions in the foreign rel */ 1097 if (arg.current == NULL) 1098 { 1099 Assert(clauses == NIL); 1100 break; 1101 } 1102 1103 /* Scan the extracted join clauses */ 1104 foreach(lc, clauses) 1105 { 1106 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); 1107 Relids required_outer; 1108 ParamPathInfo *param_info; 1109 1110 /* Check if clause can be moved to this rel */ 1111 if (!join_clause_is_movable_to(rinfo, baserel)) 1112 continue; 1113 1114 /* See if it is safe to send to remote */ 1115 if (!is_foreign_expr(root, baserel, rinfo->clause)) 1116 continue; 1117 1118 /* Calculate required outer rels for the resulting path */ 1119 required_outer = bms_union(rinfo->clause_relids, 1120 baserel->lateral_relids); 1121 required_outer = bms_del_member(required_outer, baserel->relid); 1122 if (bms_is_empty(required_outer)) 1123 continue; 1124 1125 /* Get the ParamPathInfo */ 1126 param_info = get_baserel_parampathinfo(root, baserel, 1127 required_outer); 1128 Assert(param_info != NULL); 1129 1130 /* Add it to list unless we already have it */ 1131 ppi_list = list_append_unique_ptr(ppi_list, param_info); 1132 } 1133 1134 /* Try again, now ignoring the expression we found this time */ 1135 arg.already_used = lappend(arg.already_used, arg.current); 1136 } 1137 } 1138 1139 /* 1140 * Now build a path for each useful outer relation. 
1141 */ 1142 foreach(lc, ppi_list) 1143 { 1144 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc); 1145 double rows; 1146 int width; 1147 Cost startup_cost; 1148 Cost total_cost; 1149 1150 /* Get a cost estimate from the remote */ 1151 estimate_path_cost_size(root, baserel, 1152 param_info->ppi_clauses, NIL, NULL, 1153 &rows, &width, 1154 &startup_cost, &total_cost); 1155 1156 /* 1157 * ppi_rows currently won't get looked at by anything, but still we 1158 * may as well ensure that it matches our idea of the rowcount. 1159 */ 1160 param_info->ppi_rows = rows; 1161 1162 /* Make the path */ 1163 path = create_foreignscan_path(root, baserel, 1164 NULL, /* default pathtarget */ 1165 rows, 1166 startup_cost, 1167 total_cost, 1168 NIL, /* no pathkeys */ 1169 param_info->ppi_req_outer, 1170 NULL, 1171 NIL); /* no fdw_private list */ 1172 add_path(baserel, (Path *) path); 1173 } 1174 } 1175 1176 /* 1177 * postgresGetForeignPlan 1178 * Create ForeignScan plan node which implements selected best path 1179 */ 1180 static ForeignScan * 1181 postgresGetForeignPlan(PlannerInfo *root, 1182 RelOptInfo *foreignrel, 1183 Oid foreigntableid, 1184 ForeignPath *best_path, 1185 List *tlist, 1186 List *scan_clauses, 1187 Plan *outer_plan) 1188 { 1189 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; 1190 Index scan_relid; 1191 List *fdw_private; 1192 List *remote_exprs = NIL; 1193 List *local_exprs = NIL; 1194 List *params_list = NIL; 1195 List *fdw_scan_tlist = NIL; 1196 List *fdw_recheck_quals = NIL; 1197 List *retrieved_attrs; 1198 StringInfoData sql; 1199 bool has_final_sort = false; 1200 bool has_limit = false; 1201 ListCell *lc; 1202 1203 /* 1204 * Get FDW private data created by postgresGetForeignUpperPaths(), if any. 
1205 */ 1206 if (best_path->fdw_private) 1207 { 1208 has_final_sort = intVal(list_nth(best_path->fdw_private, 1209 FdwPathPrivateHasFinalSort)); 1210 has_limit = intVal(list_nth(best_path->fdw_private, 1211 FdwPathPrivateHasLimit)); 1212 } 1213 1214 if (IS_SIMPLE_REL(foreignrel)) 1215 { 1216 /* 1217 * For base relations, set scan_relid as the relid of the relation. 1218 */ 1219 scan_relid = foreignrel->relid; 1220 1221 /* 1222 * In a base-relation scan, we must apply the given scan_clauses. 1223 * 1224 * Separate the scan_clauses into those that can be executed remotely 1225 * and those that can't. baserestrictinfo clauses that were 1226 * previously determined to be safe or unsafe by classifyConditions 1227 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything 1228 * else in the scan_clauses list will be a join clause, which we have 1229 * to check for remote-safety. 1230 * 1231 * Note: the join clauses we see here should be the exact same ones 1232 * previously examined by postgresGetForeignPaths. Possibly it'd be 1233 * worth passing forward the classification work done then, rather 1234 * than repeating it here. 1235 * 1236 * This code must match "extract_actual_clauses(scan_clauses, false)" 1237 * except for the additional decision about remote versus local 1238 * execution. 
1239 */ 1240 foreach(lc, scan_clauses) 1241 { 1242 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); 1243 1244 /* Ignore any pseudoconstants, they're dealt with elsewhere */ 1245 if (rinfo->pseudoconstant) 1246 continue; 1247 1248 if (list_member_ptr(fpinfo->remote_conds, rinfo)) 1249 remote_exprs = lappend(remote_exprs, rinfo->clause); 1250 else if (list_member_ptr(fpinfo->local_conds, rinfo)) 1251 local_exprs = lappend(local_exprs, rinfo->clause); 1252 else if (is_foreign_expr(root, foreignrel, rinfo->clause)) 1253 remote_exprs = lappend(remote_exprs, rinfo->clause); 1254 else 1255 local_exprs = lappend(local_exprs, rinfo->clause); 1256 } 1257 1258 /* 1259 * For a base-relation scan, we have to support EPQ recheck, which 1260 * should recheck all the remote quals. 1261 */ 1262 fdw_recheck_quals = remote_exprs; 1263 } 1264 else 1265 { 1266 /* 1267 * Join relation or upper relation - set scan_relid to 0. 1268 */ 1269 scan_relid = 0; 1270 1271 /* 1272 * For a join rel, baserestrictinfo is NIL and we are not considering 1273 * parameterization right now, so there should be no scan_clauses for 1274 * a joinrel or an upper rel either. 1275 */ 1276 Assert(!scan_clauses); 1277 1278 /* 1279 * Instead we get the conditions to apply from the fdw_private 1280 * structure. 1281 */ 1282 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false); 1283 local_exprs = extract_actual_clauses(fpinfo->local_conds, false); 1284 1285 /* 1286 * We leave fdw_recheck_quals empty in this case, since we never need 1287 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ 1288 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths(). 1289 * If we're planning an upperrel (ie, remote grouping or aggregation) 1290 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be 1291 * allowed, and indeed we *can't* put the remote clauses into 1292 * fdw_recheck_quals because the unaggregated Vars won't be available 1293 * locally. 
1294 */ 1295 1296 /* Build the list of columns to be fetched from the foreign server. */ 1297 fdw_scan_tlist = build_tlist_to_deparse(foreignrel); 1298 1299 /* 1300 * Ensure that the outer plan produces a tuple whose descriptor 1301 * matches our scan tuple slot. Also, remove the local conditions 1302 * from outer plan's quals, lest they be evaluated twice, once by the 1303 * local plan and once by the scan. 1304 */ 1305 if (outer_plan) 1306 { 1307 ListCell *lc; 1308 1309 /* 1310 * Right now, we only consider grouping and aggregation beyond 1311 * joins. Queries involving aggregates or grouping do not require 1312 * EPQ mechanism, hence should not have an outer plan here. 1313 */ 1314 Assert(!IS_UPPER_REL(foreignrel)); 1315 1316 /* 1317 * First, update the plan's qual list if possible. In some cases 1318 * the quals might be enforced below the topmost plan level, in 1319 * which case we'll fail to remove them; it's not worth working 1320 * harder than this. 1321 */ 1322 foreach(lc, local_exprs) 1323 { 1324 Node *qual = lfirst(lc); 1325 1326 outer_plan->qual = list_delete(outer_plan->qual, qual); 1327 1328 /* 1329 * For an inner join the local conditions of foreign scan plan 1330 * can be part of the joinquals as well. (They might also be 1331 * in the mergequals or hashquals, but we can't touch those 1332 * without breaking the plan.) 1333 */ 1334 if (IsA(outer_plan, NestLoop) || 1335 IsA(outer_plan, MergeJoin) || 1336 IsA(outer_plan, HashJoin)) 1337 { 1338 Join *join_plan = (Join *) outer_plan; 1339 1340 if (join_plan->jointype == JOIN_INNER) 1341 join_plan->joinqual = list_delete(join_plan->joinqual, 1342 qual); 1343 } 1344 } 1345 1346 /* 1347 * Now fix the subplan's tlist --- this might result in inserting 1348 * a Result node atop the plan tree. 
1349 */ 1350 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist, 1351 best_path->path.parallel_safe); 1352 } 1353 } 1354 1355 /* 1356 * Build the query string to be sent for execution, and identify 1357 * expressions to be sent as parameters. 1358 */ 1359 initStringInfo(&sql); 1360 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, 1361 remote_exprs, best_path->path.pathkeys, 1362 has_final_sort, has_limit, false, 1363 &retrieved_attrs, ¶ms_list); 1364 1365 /* Remember remote_exprs for possible use by postgresPlanDirectModify */ 1366 fpinfo->final_remote_exprs = remote_exprs; 1367 1368 /* 1369 * Build the fdw_private list that will be available to the executor. 1370 * Items in the list must match order in enum FdwScanPrivateIndex. 1371 */ 1372 fdw_private = list_make3(makeString(sql.data), 1373 retrieved_attrs, 1374 makeInteger(fpinfo->fetch_size)); 1375 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) 1376 fdw_private = lappend(fdw_private, 1377 makeString(fpinfo->relation_name->data)); 1378 1379 /* 1380 * Create the ForeignScan node for the given relation. 1381 * 1382 * Note that the remote parameter expressions are stored in the fdw_exprs 1383 * field of the finished plan node; we can't keep them in private state 1384 * because then they wouldn't be subject to later planner processing. 1385 */ 1386 return make_foreignscan(tlist, 1387 local_exprs, 1388 scan_relid, 1389 params_list, 1390 fdw_private, 1391 fdw_scan_tlist, 1392 fdw_recheck_quals, 1393 outer_plan); 1394 } 1395 1396 /* 1397 * postgresBeginForeignScan 1398 * Initiate an executor scan of a foreign PostgreSQL table. 
1399 */ 1400 static void 1401 postgresBeginForeignScan(ForeignScanState *node, int eflags) 1402 { 1403 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; 1404 EState *estate = node->ss.ps.state; 1405 PgFdwScanState *fsstate; 1406 RangeTblEntry *rte; 1407 Oid userid; 1408 ForeignTable *table; 1409 UserMapping *user; 1410 int rtindex; 1411 int numParams; 1412 1413 /* 1414 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. 1415 */ 1416 if (eflags & EXEC_FLAG_EXPLAIN_ONLY) 1417 return; 1418 1419 /* 1420 * We'll save private state in node->fdw_state. 1421 */ 1422 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); 1423 node->fdw_state = (void *) fsstate; 1424 1425 /* 1426 * Identify which user to do the remote access as. This should match what 1427 * ExecCheckRTEPerms() does. In case of a join or aggregate, use the 1428 * lowest-numbered member RTE as a representative; we would get the same 1429 * result from any. 1430 */ 1431 if (fsplan->scan.scanrelid > 0) 1432 rtindex = fsplan->scan.scanrelid; 1433 else 1434 rtindex = bms_next_member(fsplan->fs_relids, -1); 1435 rte = exec_rt_fetch(rtindex, estate); 1436 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); 1437 1438 /* Get info about foreign table. */ 1439 table = GetForeignTable(rte->relid); 1440 user = GetUserMapping(userid, table->serverid); 1441 1442 /* 1443 * Get connection to the foreign server. Connection manager will 1444 * establish new connection if necessary. 1445 */ 1446 fsstate->conn = GetConnection(user, false); 1447 1448 /* Assign a unique ID for my cursor */ 1449 fsstate->cursor_number = GetCursorNumber(fsstate->conn); 1450 fsstate->cursor_exists = false; 1451 1452 /* Get private info created by planner functions. 
*/ 1453 fsstate->query = strVal(list_nth(fsplan->fdw_private, 1454 FdwScanPrivateSelectSql)); 1455 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, 1456 FdwScanPrivateRetrievedAttrs); 1457 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, 1458 FdwScanPrivateFetchSize)); 1459 1460 /* Create contexts for batches of tuples and per-tuple temp workspace. */ 1461 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, 1462 "postgres_fdw tuple data", 1463 ALLOCSET_DEFAULT_SIZES); 1464 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, 1465 "postgres_fdw temporary data", 1466 ALLOCSET_SMALL_SIZES); 1467 1468 /* 1469 * Get info we'll need for converting data fetched from the foreign server 1470 * into local representation and error reporting during that process. 1471 */ 1472 if (fsplan->scan.scanrelid > 0) 1473 { 1474 fsstate->rel = node->ss.ss_currentRelation; 1475 fsstate->tupdesc = RelationGetDescr(fsstate->rel); 1476 } 1477 else 1478 { 1479 fsstate->rel = NULL; 1480 fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; 1481 } 1482 1483 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); 1484 1485 /* 1486 * Prepare for processing of parameters used in remote query, if any. 1487 */ 1488 numParams = list_length(fsplan->fdw_exprs); 1489 fsstate->numParams = numParams; 1490 if (numParams > 0) 1491 prepare_query_params((PlanState *) node, 1492 fsplan->fdw_exprs, 1493 numParams, 1494 &fsstate->param_flinfo, 1495 &fsstate->param_exprs, 1496 &fsstate->param_values); 1497 } 1498 1499 /* 1500 * postgresIterateForeignScan 1501 * Retrieve next row from the result set, or clear tuple slot to indicate 1502 * EOF. 
1503 */ 1504 static TupleTableSlot * 1505 postgresIterateForeignScan(ForeignScanState *node) 1506 { 1507 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; 1508 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; 1509 1510 /* 1511 * If this is the first call after Begin or ReScan, we need to create the 1512 * cursor on the remote side. 1513 */ 1514 if (!fsstate->cursor_exists) 1515 create_cursor(node); 1516 1517 /* 1518 * Get some more tuples, if we've run out. 1519 */ 1520 if (fsstate->next_tuple >= fsstate->num_tuples) 1521 { 1522 /* No point in another fetch if we already detected EOF, though. */ 1523 if (!fsstate->eof_reached) 1524 fetch_more_data(node); 1525 /* If we didn't get any tuples, must be end of data. */ 1526 if (fsstate->next_tuple >= fsstate->num_tuples) 1527 return ExecClearTuple(slot); 1528 } 1529 1530 /* 1531 * Return the next tuple. 1532 */ 1533 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], 1534 slot, 1535 false); 1536 1537 return slot; 1538 } 1539 1540 /* 1541 * postgresReScanForeignScan 1542 * Restart the scan. 1543 */ 1544 static void 1545 postgresReScanForeignScan(ForeignScanState *node) 1546 { 1547 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; 1548 char sql[64]; 1549 PGresult *res; 1550 1551 /* If we haven't created the cursor yet, nothing to do. */ 1552 if (!fsstate->cursor_exists) 1553 return; 1554 1555 /* 1556 * If any internal parameters affecting this node have changed, we'd 1557 * better destroy and recreate the cursor. Otherwise, rewinding it should 1558 * be good enough. If we've only fetched zero or one batch, we needn't 1559 * even rewind the cursor, just rescan what we have. 
1560 */ 1561 if (node->ss.ps.chgParam != NULL) 1562 { 1563 fsstate->cursor_exists = false; 1564 snprintf(sql, sizeof(sql), "CLOSE c%u", 1565 fsstate->cursor_number); 1566 } 1567 else if (fsstate->fetch_ct_2 > 1) 1568 { 1569 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", 1570 fsstate->cursor_number); 1571 } 1572 else 1573 { 1574 /* Easy: just rescan what we already have in memory, if anything */ 1575 fsstate->next_tuple = 0; 1576 return; 1577 } 1578 1579 /* 1580 * We don't use a PG_TRY block here, so be careful not to throw error 1581 * without releasing the PGresult. 1582 */ 1583 res = pgfdw_exec_query(fsstate->conn, sql); 1584 if (PQresultStatus(res) != PGRES_COMMAND_OK) 1585 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); 1586 PQclear(res); 1587 1588 /* Now force a fresh FETCH. */ 1589 fsstate->tuples = NULL; 1590 fsstate->num_tuples = 0; 1591 fsstate->next_tuple = 0; 1592 fsstate->fetch_ct_2 = 0; 1593 fsstate->eof_reached = false; 1594 } 1595 1596 /* 1597 * postgresEndForeignScan 1598 * Finish scanning foreign table and dispose objects used for this scan 1599 */ 1600 static void 1601 postgresEndForeignScan(ForeignScanState *node) 1602 { 1603 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; 1604 1605 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ 1606 if (fsstate == NULL) 1607 return; 1608 1609 /* Close the cursor if open, to prevent accumulation of cursors */ 1610 if (fsstate->cursor_exists) 1611 close_cursor(fsstate->conn, fsstate->cursor_number); 1612 1613 /* Release remote connection */ 1614 ReleaseConnection(fsstate->conn); 1615 fsstate->conn = NULL; 1616 1617 /* MemoryContexts will be deleted automatically. 
*/ 1618 } 1619 1620 /* 1621 * postgresAddForeignUpdateTargets 1622 * Add resjunk column(s) needed for update/delete on a foreign table 1623 */ 1624 static void 1625 postgresAddForeignUpdateTargets(Query *parsetree, 1626 RangeTblEntry *target_rte, 1627 Relation target_relation) 1628 { 1629 Var *var; 1630 const char *attrname; 1631 TargetEntry *tle; 1632 1633 /* 1634 * In postgres_fdw, what we need is the ctid, same as for a regular table. 1635 */ 1636 1637 /* Make a Var representing the desired value */ 1638 var = makeVar(parsetree->resultRelation, 1639 SelfItemPointerAttributeNumber, 1640 TIDOID, 1641 -1, 1642 InvalidOid, 1643 0); 1644 1645 /* Wrap it in a resjunk TLE with the right name ... */ 1646 attrname = "ctid"; 1647 1648 tle = makeTargetEntry((Expr *) var, 1649 list_length(parsetree->targetList) + 1, 1650 pstrdup(attrname), 1651 true); 1652 1653 /* ... and add it to the query's targetlist */ 1654 parsetree->targetList = lappend(parsetree->targetList, tle); 1655 } 1656 1657 /* 1658 * postgresPlanForeignModify 1659 * Plan an insert/update/delete operation on a foreign table 1660 */ 1661 static List * 1662 postgresPlanForeignModify(PlannerInfo *root, 1663 ModifyTable *plan, 1664 Index resultRelation, 1665 int subplan_index) 1666 { 1667 CmdType operation = plan->operation; 1668 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); 1669 Relation rel; 1670 StringInfoData sql; 1671 List *targetAttrs = NIL; 1672 List *withCheckOptionList = NIL; 1673 List *returningList = NIL; 1674 List *retrieved_attrs = NIL; 1675 bool doNothing = false; 1676 1677 initStringInfo(&sql); 1678 1679 /* 1680 * Core code already has some lock on each rel being planned, so we can 1681 * use NoLock here. 1682 */ 1683 rel = table_open(rte->relid, NoLock); 1684 1685 /* 1686 * In an INSERT, we transmit all columns that are defined in the foreign 1687 * table. 
In an UPDATE, if there are BEFORE ROW UPDATE triggers on the 1688 * foreign table, we transmit all columns like INSERT; else we transmit 1689 * only columns that were explicitly targets of the UPDATE, so as to avoid 1690 * unnecessary data transmission. (We can't do that for INSERT since we 1691 * would miss sending default values for columns not listed in the source 1692 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since 1693 * those triggers might change values for non-target columns, in which 1694 * case we would miss sending changed values for those columns.) 1695 */ 1696 if (operation == CMD_INSERT || 1697 (operation == CMD_UPDATE && 1698 rel->trigdesc && 1699 rel->trigdesc->trig_update_before_row)) 1700 { 1701 TupleDesc tupdesc = RelationGetDescr(rel); 1702 int attnum; 1703 1704 for (attnum = 1; attnum <= tupdesc->natts; attnum++) 1705 { 1706 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); 1707 1708 if (!attr->attisdropped) 1709 targetAttrs = lappend_int(targetAttrs, attnum); 1710 } 1711 } 1712 else if (operation == CMD_UPDATE) 1713 { 1714 int col; 1715 Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols); 1716 1717 col = -1; 1718 while ((col = bms_next_member(allUpdatedCols, col)) >= 0) 1719 { 1720 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ 1721 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; 1722 1723 if (attno <= InvalidAttrNumber) /* shouldn't happen */ 1724 elog(ERROR, "system-column update is not supported"); 1725 targetAttrs = lappend_int(targetAttrs, attno); 1726 } 1727 } 1728 1729 /* 1730 * Extract the relevant WITH CHECK OPTION list if any. 1731 */ 1732 if (plan->withCheckOptionLists) 1733 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists, 1734 subplan_index); 1735 1736 /* 1737 * Extract the relevant RETURNING list if any. 
1738 */ 1739 if (plan->returningLists) 1740 returningList = (List *) list_nth(plan->returningLists, subplan_index); 1741 1742 /* 1743 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification 1744 * should have already been rejected in the optimizer, as presently there 1745 * is no way to recognize an arbiter index on a foreign table. Only DO 1746 * NOTHING is supported without an inference specification. 1747 */ 1748 if (plan->onConflictAction == ONCONFLICT_NOTHING) 1749 doNothing = true; 1750 else if (plan->onConflictAction != ONCONFLICT_NONE) 1751 elog(ERROR, "unexpected ON CONFLICT specification: %d", 1752 (int) plan->onConflictAction); 1753 1754 /* 1755 * Construct the SQL command string. 1756 */ 1757 switch (operation) 1758 { 1759 case CMD_INSERT: 1760 deparseInsertSql(&sql, rte, resultRelation, rel, 1761 targetAttrs, doNothing, 1762 withCheckOptionList, returningList, 1763 &retrieved_attrs); 1764 break; 1765 case CMD_UPDATE: 1766 deparseUpdateSql(&sql, rte, resultRelation, rel, 1767 targetAttrs, 1768 withCheckOptionList, returningList, 1769 &retrieved_attrs); 1770 break; 1771 case CMD_DELETE: 1772 deparseDeleteSql(&sql, rte, resultRelation, rel, 1773 returningList, 1774 &retrieved_attrs); 1775 break; 1776 default: 1777 elog(ERROR, "unexpected operation: %d", (int) operation); 1778 break; 1779 } 1780 1781 table_close(rel, NoLock); 1782 1783 /* 1784 * Build the fdw_private list that will be available to the executor. 1785 * Items in the list must match enum FdwModifyPrivateIndex, above. 
1786 */ 1787 return list_make4(makeString(sql.data), 1788 targetAttrs, 1789 makeInteger((retrieved_attrs != NIL)), 1790 retrieved_attrs); 1791 } 1792 1793 /* 1794 * postgresBeginForeignModify 1795 * Begin an insert/update/delete operation on a foreign table 1796 */ 1797 static void 1798 postgresBeginForeignModify(ModifyTableState *mtstate, 1799 ResultRelInfo *resultRelInfo, 1800 List *fdw_private, 1801 int subplan_index, 1802 int eflags) 1803 { 1804 PgFdwModifyState *fmstate; 1805 char *query; 1806 List *target_attrs; 1807 bool has_returning; 1808 List *retrieved_attrs; 1809 RangeTblEntry *rte; 1810 1811 /* 1812 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState 1813 * stays NULL. 1814 */ 1815 if (eflags & EXEC_FLAG_EXPLAIN_ONLY) 1816 return; 1817 1818 /* Deconstruct fdw_private data. */ 1819 query = strVal(list_nth(fdw_private, 1820 FdwModifyPrivateUpdateSql)); 1821 target_attrs = (List *) list_nth(fdw_private, 1822 FdwModifyPrivateTargetAttnums); 1823 has_returning = intVal(list_nth(fdw_private, 1824 FdwModifyPrivateHasReturning)); 1825 retrieved_attrs = (List *) list_nth(fdw_private, 1826 FdwModifyPrivateRetrievedAttrs); 1827 1828 /* Find RTE. */ 1829 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, 1830 mtstate->ps.state); 1831 1832 /* Construct an execution state. 
*/ 1833 fmstate = create_foreign_modify(mtstate->ps.state, 1834 rte, 1835 resultRelInfo, 1836 mtstate->operation, 1837 mtstate->mt_plans[subplan_index]->plan, 1838 query, 1839 target_attrs, 1840 has_returning, 1841 retrieved_attrs); 1842 1843 resultRelInfo->ri_FdwState = fmstate; 1844 } 1845 1846 /* 1847 * postgresExecForeignInsert 1848 * Insert one row into a foreign table 1849 */ 1850 static TupleTableSlot * 1851 postgresExecForeignInsert(EState *estate, 1852 ResultRelInfo *resultRelInfo, 1853 TupleTableSlot *slot, 1854 TupleTableSlot *planSlot) 1855 { 1856 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; 1857 TupleTableSlot *rslot; 1858 1859 /* 1860 * If the fmstate has aux_fmstate set, use the aux_fmstate (see 1861 * postgresBeginForeignInsert()) 1862 */ 1863 if (fmstate->aux_fmstate) 1864 resultRelInfo->ri_FdwState = fmstate->aux_fmstate; 1865 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, 1866 slot, planSlot); 1867 /* Revert that change */ 1868 if (fmstate->aux_fmstate) 1869 resultRelInfo->ri_FdwState = fmstate; 1870 1871 return rslot; 1872 } 1873 1874 /* 1875 * postgresExecForeignUpdate 1876 * Update one row in a foreign table 1877 */ 1878 static TupleTableSlot * 1879 postgresExecForeignUpdate(EState *estate, 1880 ResultRelInfo *resultRelInfo, 1881 TupleTableSlot *slot, 1882 TupleTableSlot *planSlot) 1883 { 1884 return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, 1885 slot, planSlot); 1886 } 1887 1888 /* 1889 * postgresExecForeignDelete 1890 * Delete one row from a foreign table 1891 */ 1892 static TupleTableSlot * 1893 postgresExecForeignDelete(EState *estate, 1894 ResultRelInfo *resultRelInfo, 1895 TupleTableSlot *slot, 1896 TupleTableSlot *planSlot) 1897 { 1898 return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, 1899 slot, planSlot); 1900 } 1901 1902 /* 1903 * postgresEndForeignModify 1904 * Finish an insert/update/delete operation on a foreign table 1905 */ 1906 static void 1907 
postgresEndForeignModify(EState *estate, 1908 ResultRelInfo *resultRelInfo) 1909 { 1910 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; 1911 1912 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ 1913 if (fmstate == NULL) 1914 return; 1915 1916 /* Destroy the execution state */ 1917 finish_foreign_modify(fmstate); 1918 } 1919 1920 /* 1921 * postgresBeginForeignInsert 1922 * Begin an insert operation on a foreign table 1923 */ 1924 static void 1925 postgresBeginForeignInsert(ModifyTableState *mtstate, 1926 ResultRelInfo *resultRelInfo) 1927 { 1928 PgFdwModifyState *fmstate; 1929 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan); 1930 EState *estate = mtstate->ps.state; 1931 Index resultRelation; 1932 Relation rel = resultRelInfo->ri_RelationDesc; 1933 RangeTblEntry *rte; 1934 TupleDesc tupdesc = RelationGetDescr(rel); 1935 int attnum; 1936 StringInfoData sql; 1937 List *targetAttrs = NIL; 1938 List *retrieved_attrs = NIL; 1939 bool doNothing = false; 1940 1941 /* 1942 * If the foreign table we are about to insert routed rows into is also an 1943 * UPDATE subplan result rel that will be updated later, proceeding with 1944 * the INSERT will result in the later UPDATE incorrectly modifying those 1945 * routed rows, so prevent the INSERT --- it would be nice if we could 1946 * handle this case; but for now, throw an error for safety. 1947 */ 1948 if (plan && plan->operation == CMD_UPDATE && 1949 (resultRelInfo->ri_usesFdwDirectModify || 1950 resultRelInfo->ri_FdwState) && 1951 resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan) 1952 ereport(ERROR, 1953 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 1954 errmsg("cannot route tuples into foreign table to be updated \"%s\"", 1955 RelationGetRelationName(rel)))); 1956 1957 initStringInfo(&sql); 1958 1959 /* We transmit all columns that are defined in the foreign table. 
*/ 1960 for (attnum = 1; attnum <= tupdesc->natts; attnum++) 1961 { 1962 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); 1963 1964 if (!attr->attisdropped) 1965 targetAttrs = lappend_int(targetAttrs, attnum); 1966 } 1967 1968 /* Check if we add the ON CONFLICT clause to the remote query. */ 1969 if (plan) 1970 { 1971 OnConflictAction onConflictAction = plan->onConflictAction; 1972 1973 /* We only support DO NOTHING without an inference specification. */ 1974 if (onConflictAction == ONCONFLICT_NOTHING) 1975 doNothing = true; 1976 else if (onConflictAction != ONCONFLICT_NONE) 1977 elog(ERROR, "unexpected ON CONFLICT specification: %d", 1978 (int) onConflictAction); 1979 } 1980 1981 /* 1982 * If the foreign table is a partition that doesn't have a corresponding 1983 * RTE entry, we need to create a new RTE 1984 * describing the foreign table for use by deparseInsertSql and 1985 * create_foreign_modify() below, after first copying the parent's RTE and 1986 * modifying some fields to describe the foreign partition to work on. 1987 * However, if this is invoked by UPDATE, the existing RTE may already 1988 * correspond to this partition if it is one of the UPDATE subplan target 1989 * rels; in that case, we can just use the existing RTE as-is. 1990 */ 1991 if (resultRelInfo->ri_RangeTableIndex == 0) 1992 { 1993 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo; 1994 1995 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate); 1996 rte = copyObject(rte); 1997 rte->relid = RelationGetRelid(rel); 1998 rte->relkind = RELKIND_FOREIGN_TABLE; 1999 2000 /* 2001 * For UPDATE, we must use the RT index of the first subplan target 2002 * rel's RTE, because the core code would have built expressions for 2003 * the partition, such as RETURNING, using that RT index as varno of 2004 * Vars contained in those expressions. 
2005 */ 2006 if (plan && plan->operation == CMD_UPDATE && 2007 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation) 2008 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex; 2009 else 2010 resultRelation = rootResultRelInfo->ri_RangeTableIndex; 2011 } 2012 else 2013 { 2014 resultRelation = resultRelInfo->ri_RangeTableIndex; 2015 rte = exec_rt_fetch(resultRelation, estate); 2016 } 2017 2018 /* Construct the SQL command string. */ 2019 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, 2020 resultRelInfo->ri_WithCheckOptions, 2021 resultRelInfo->ri_returningList, 2022 &retrieved_attrs); 2023 2024 /* Construct an execution state. */ 2025 fmstate = create_foreign_modify(mtstate->ps.state, 2026 rte, 2027 resultRelInfo, 2028 CMD_INSERT, 2029 NULL, 2030 sql.data, 2031 targetAttrs, 2032 retrieved_attrs != NIL, 2033 retrieved_attrs); 2034 2035 /* 2036 * If the given resultRelInfo already has PgFdwModifyState set, it means 2037 * the foreign table is an UPDATE subplan result rel; in which case, store 2038 * the resulting state into the aux_fmstate of the PgFdwModifyState. 
2039 */ 2040 if (resultRelInfo->ri_FdwState) 2041 { 2042 Assert(plan && plan->operation == CMD_UPDATE); 2043 Assert(resultRelInfo->ri_usesFdwDirectModify == false); 2044 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate; 2045 } 2046 else 2047 resultRelInfo->ri_FdwState = fmstate; 2048 } 2049 2050 /* 2051 * postgresEndForeignInsert 2052 * Finish an insert operation on a foreign table 2053 */ 2054 static void 2055 postgresEndForeignInsert(EState *estate, 2056 ResultRelInfo *resultRelInfo) 2057 { 2058 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; 2059 2060 Assert(fmstate != NULL); 2061 2062 /* 2063 * If the fmstate has aux_fmstate set, get the aux_fmstate (see 2064 * postgresBeginForeignInsert()) 2065 */ 2066 if (fmstate->aux_fmstate) 2067 fmstate = fmstate->aux_fmstate; 2068 2069 /* Destroy the execution state */ 2070 finish_foreign_modify(fmstate); 2071 } 2072 2073 /* 2074 * postgresIsForeignRelUpdatable 2075 * Determine whether a foreign table supports INSERT, UPDATE and/or 2076 * DELETE. 2077 */ 2078 static int 2079 postgresIsForeignRelUpdatable(Relation rel) 2080 { 2081 bool updatable; 2082 ForeignTable *table; 2083 ForeignServer *server; 2084 ListCell *lc; 2085 2086 /* 2087 * By default, all postgres_fdw foreign tables are assumed updatable. This 2088 * can be overridden by a per-server setting, which in turn can be 2089 * overridden by a per-table setting. 
2090 */ 2091 updatable = true; 2092 2093 table = GetForeignTable(RelationGetRelid(rel)); 2094 server = GetForeignServer(table->serverid); 2095 2096 foreach(lc, server->options) 2097 { 2098 DefElem *def = (DefElem *) lfirst(lc); 2099 2100 if (strcmp(def->defname, "updatable") == 0) 2101 updatable = defGetBoolean(def); 2102 } 2103 foreach(lc, table->options) 2104 { 2105 DefElem *def = (DefElem *) lfirst(lc); 2106 2107 if (strcmp(def->defname, "updatable") == 0) 2108 updatable = defGetBoolean(def); 2109 } 2110 2111 /* 2112 * Currently "updatable" means support for INSERT, UPDATE and DELETE. 2113 */ 2114 return updatable ? 2115 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0; 2116 } 2117 2118 /* 2119 * postgresRecheckForeignScan 2120 * Execute a local join execution plan for a foreign join 2121 */ 2122 static bool 2123 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) 2124 { 2125 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid; 2126 PlanState *outerPlan = outerPlanState(node); 2127 TupleTableSlot *result; 2128 2129 /* For base foreign relations, it suffices to set fdw_recheck_quals */ 2130 if (scanrelid > 0) 2131 return true; 2132 2133 Assert(outerPlan != NULL); 2134 2135 /* Execute a local join execution plan */ 2136 result = ExecProcNode(outerPlan); 2137 if (TupIsNull(result)) 2138 return false; 2139 2140 /* Store result in the given slot */ 2141 ExecCopySlot(slot, result); 2142 2143 return true; 2144 } 2145 2146 /* 2147 * postgresPlanDirectModify 2148 * Consider a direct foreign table modification 2149 * 2150 * Decide whether it is safe to modify a foreign table directly, and if so, 2151 * rewrite subplan accordingly. 
2152 */ 2153 static bool 2154 postgresPlanDirectModify(PlannerInfo *root, 2155 ModifyTable *plan, 2156 Index resultRelation, 2157 int subplan_index) 2158 { 2159 CmdType operation = plan->operation; 2160 Plan *subplan; 2161 RelOptInfo *foreignrel; 2162 RangeTblEntry *rte; 2163 PgFdwRelationInfo *fpinfo; 2164 Relation rel; 2165 StringInfoData sql; 2166 ForeignScan *fscan; 2167 List *targetAttrs = NIL; 2168 List *remote_exprs; 2169 List *params_list = NIL; 2170 List *returningList = NIL; 2171 List *retrieved_attrs = NIL; 2172 2173 /* 2174 * Decide whether it is safe to modify a foreign table directly. 2175 */ 2176 2177 /* 2178 * The table modification must be an UPDATE or DELETE. 2179 */ 2180 if (operation != CMD_UPDATE && operation != CMD_DELETE) 2181 return false; 2182 2183 /* 2184 * It's unsafe to modify a foreign table directly if there are any local 2185 * joins needed. 2186 */ 2187 subplan = (Plan *) list_nth(plan->plans, subplan_index); 2188 if (!IsA(subplan, ForeignScan)) 2189 return false; 2190 fscan = (ForeignScan *) subplan; 2191 2192 /* 2193 * It's unsafe to modify a foreign table directly if there are any quals 2194 * that should be evaluated locally. 2195 */ 2196 if (subplan->qual != NIL) 2197 return false; 2198 2199 /* Safe to fetch data about the target foreign rel */ 2200 if (fscan->scan.scanrelid == 0) 2201 { 2202 foreignrel = find_join_rel(root, fscan->fs_relids); 2203 /* We should have a rel for this foreign join. */ 2204 Assert(foreignrel); 2205 } 2206 else 2207 foreignrel = root->simple_rel_array[resultRelation]; 2208 rte = root->simple_rte_array[resultRelation]; 2209 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; 2210 2211 /* 2212 * It's unsafe to update a foreign table directly, if any expressions to 2213 * assign to the target columns are unsafe to evaluate remotely. 
2214 */ 2215 if (operation == CMD_UPDATE) 2216 { 2217 int col; 2218 2219 /* 2220 * We transmit only columns that were explicitly targets of the 2221 * UPDATE, so as to avoid unnecessary data transmission. 2222 */ 2223 col = -1; 2224 while ((col = bms_next_member(rte->updatedCols, col)) >= 0) 2225 { 2226 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ 2227 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; 2228 TargetEntry *tle; 2229 2230 if (attno <= InvalidAttrNumber) /* shouldn't happen */ 2231 elog(ERROR, "system-column update is not supported"); 2232 2233 tle = get_tle_by_resno(subplan->targetlist, attno); 2234 2235 if (!tle) 2236 elog(ERROR, "attribute number %d not found in subplan targetlist", 2237 attno); 2238 2239 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr)) 2240 return false; 2241 2242 targetAttrs = lappend_int(targetAttrs, attno); 2243 } 2244 } 2245 2246 /* 2247 * Ok, rewrite subplan so as to modify the foreign table directly. 2248 */ 2249 initStringInfo(&sql); 2250 2251 /* 2252 * Core code already has some lock on each rel being planned, so we can 2253 * use NoLock here. 2254 */ 2255 rel = table_open(rte->relid, NoLock); 2256 2257 /* 2258 * Recall the qual clauses that must be evaluated remotely. (These are 2259 * bare clauses not RestrictInfos, but deparse.c's appendConditions() 2260 * doesn't care.) 2261 */ 2262 remote_exprs = fpinfo->final_remote_exprs; 2263 2264 /* 2265 * Extract the relevant RETURNING list if any. 2266 */ 2267 if (plan->returningLists) 2268 { 2269 returningList = (List *) list_nth(plan->returningLists, subplan_index); 2270 2271 /* 2272 * When performing an UPDATE/DELETE .. RETURNING on a join directly, 2273 * we fetch from the foreign server any Vars specified in RETURNING 2274 * that refer not only to the target relation but to non-target 2275 * relations. 
So we'll deparse them into the RETURNING clause of the 2276 * remote query; use a targetlist consisting of them instead, which 2277 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan 2278 * node below. 2279 */ 2280 if (fscan->scan.scanrelid == 0) 2281 returningList = build_remote_returning(resultRelation, rel, 2282 returningList); 2283 } 2284 2285 /* 2286 * Construct the SQL command string. 2287 */ 2288 switch (operation) 2289 { 2290 case CMD_UPDATE: 2291 deparseDirectUpdateSql(&sql, root, resultRelation, rel, 2292 foreignrel, 2293 ((Plan *) fscan)->targetlist, 2294 targetAttrs, 2295 remote_exprs, ¶ms_list, 2296 returningList, &retrieved_attrs); 2297 break; 2298 case CMD_DELETE: 2299 deparseDirectDeleteSql(&sql, root, resultRelation, rel, 2300 foreignrel, 2301 remote_exprs, ¶ms_list, 2302 returningList, &retrieved_attrs); 2303 break; 2304 default: 2305 elog(ERROR, "unexpected operation: %d", (int) operation); 2306 break; 2307 } 2308 2309 /* 2310 * Update the operation info. 2311 */ 2312 fscan->operation = operation; 2313 2314 /* 2315 * Update the fdw_exprs list that will be available to the executor. 2316 */ 2317 fscan->fdw_exprs = params_list; 2318 2319 /* 2320 * Update the fdw_private list that will be available to the executor. 2321 * Items in the list must match enum FdwDirectModifyPrivateIndex, above. 2322 */ 2323 fscan->fdw_private = list_make4(makeString(sql.data), 2324 makeInteger((retrieved_attrs != NIL)), 2325 retrieved_attrs, 2326 makeInteger(plan->canSetTag)); 2327 2328 /* 2329 * Update the foreign-join-related fields. 2330 */ 2331 if (fscan->scan.scanrelid == 0) 2332 { 2333 /* No need for the outer subplan. */ 2334 fscan->scan.plan.lefttree = NULL; 2335 2336 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. 
*/ 2337 if (returningList) 2338 rebuild_fdw_scan_tlist(fscan, returningList); 2339 } 2340 2341 table_close(rel, NoLock); 2342 return true; 2343 } 2344 2345 /* 2346 * postgresBeginDirectModify 2347 * Prepare a direct foreign table modification 2348 */ 2349 static void 2350 postgresBeginDirectModify(ForeignScanState *node, int eflags) 2351 { 2352 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; 2353 EState *estate = node->ss.ps.state; 2354 PgFdwDirectModifyState *dmstate; 2355 Index rtindex; 2356 RangeTblEntry *rte; 2357 Oid userid; 2358 ForeignTable *table; 2359 UserMapping *user; 2360 int numParams; 2361 2362 /* 2363 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. 2364 */ 2365 if (eflags & EXEC_FLAG_EXPLAIN_ONLY) 2366 return; 2367 2368 /* 2369 * We'll save private state in node->fdw_state. 2370 */ 2371 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState)); 2372 node->fdw_state = (void *) dmstate; 2373 2374 /* 2375 * Identify which user to do the remote access as. This should match what 2376 * ExecCheckRTEPerms() does. 2377 */ 2378 rtindex = estate->es_result_relation_info->ri_RangeTableIndex; 2379 rte = exec_rt_fetch(rtindex, estate); 2380 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); 2381 2382 /* Get info about foreign table. */ 2383 if (fsplan->scan.scanrelid == 0) 2384 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags); 2385 else 2386 dmstate->rel = node->ss.ss_currentRelation; 2387 table = GetForeignTable(RelationGetRelid(dmstate->rel)); 2388 user = GetUserMapping(userid, table->serverid); 2389 2390 /* 2391 * Get connection to the foreign server. Connection manager will 2392 * establish new connection if necessary. 2393 */ 2394 dmstate->conn = GetConnection(user, false); 2395 2396 /* Update the foreign-join-related fields. */ 2397 if (fsplan->scan.scanrelid == 0) 2398 { 2399 /* Save info about foreign table. 
*/ 2400 dmstate->resultRel = dmstate->rel; 2401 2402 /* 2403 * Set dmstate->rel to NULL to teach get_returning_data() and 2404 * make_tuple_from_result_row() that columns fetched from the remote 2405 * server are described by fdw_scan_tlist of the foreign-scan plan 2406 * node, not the tuple descriptor for the target relation. 2407 */ 2408 dmstate->rel = NULL; 2409 } 2410 2411 /* Initialize state variable */ 2412 dmstate->num_tuples = -1; /* -1 means not set yet */ 2413 2414 /* Get private info created by planner functions. */ 2415 dmstate->query = strVal(list_nth(fsplan->fdw_private, 2416 FdwDirectModifyPrivateUpdateSql)); 2417 dmstate->has_returning = intVal(list_nth(fsplan->fdw_private, 2418 FdwDirectModifyPrivateHasReturning)); 2419 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, 2420 FdwDirectModifyPrivateRetrievedAttrs); 2421 dmstate->set_processed = intVal(list_nth(fsplan->fdw_private, 2422 FdwDirectModifyPrivateSetProcessed)); 2423 2424 /* Create context for per-tuple temp workspace. */ 2425 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, 2426 "postgres_fdw temporary data", 2427 ALLOCSET_SMALL_SIZES); 2428 2429 /* Prepare for input conversion of RETURNING results. */ 2430 if (dmstate->has_returning) 2431 { 2432 TupleDesc tupdesc; 2433 2434 if (fsplan->scan.scanrelid == 0) 2435 tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; 2436 else 2437 tupdesc = RelationGetDescr(dmstate->rel); 2438 2439 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); 2440 2441 /* 2442 * When performing an UPDATE/DELETE .. RETURNING on a join directly, 2443 * initialize a filter to extract an updated/deleted tuple from a scan 2444 * tuple. 2445 */ 2446 if (fsplan->scan.scanrelid == 0) 2447 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex); 2448 } 2449 2450 /* 2451 * Prepare for processing of parameters used in remote query, if any. 
2452 */ 2453 numParams = list_length(fsplan->fdw_exprs); 2454 dmstate->numParams = numParams; 2455 if (numParams > 0) 2456 prepare_query_params((PlanState *) node, 2457 fsplan->fdw_exprs, 2458 numParams, 2459 &dmstate->param_flinfo, 2460 &dmstate->param_exprs, 2461 &dmstate->param_values); 2462 } 2463 2464 /* 2465 * postgresIterateDirectModify 2466 * Execute a direct foreign table modification 2467 */ 2468 static TupleTableSlot * 2469 postgresIterateDirectModify(ForeignScanState *node) 2470 { 2471 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; 2472 EState *estate = node->ss.ps.state; 2473 ResultRelInfo *resultRelInfo = estate->es_result_relation_info; 2474 2475 /* 2476 * If this is the first call after Begin, execute the statement. 2477 */ 2478 if (dmstate->num_tuples == -1) 2479 execute_dml_stmt(node); 2480 2481 /* 2482 * If the local query doesn't specify RETURNING, just clear tuple slot. 2483 */ 2484 if (!resultRelInfo->ri_projectReturning) 2485 { 2486 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; 2487 Instrumentation *instr = node->ss.ps.instrument; 2488 2489 Assert(!dmstate->has_returning); 2490 2491 /* Increment the command es_processed count if necessary. */ 2492 if (dmstate->set_processed) 2493 estate->es_processed += dmstate->num_tuples; 2494 2495 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */ 2496 if (instr) 2497 instr->tuplecount += dmstate->num_tuples; 2498 2499 return ExecClearTuple(slot); 2500 } 2501 2502 /* 2503 * Get the next RETURNING tuple. 
2504 */ 2505 return get_returning_data(node); 2506 } 2507 2508 /* 2509 * postgresEndDirectModify 2510 * Finish a direct foreign table modification 2511 */ 2512 static void 2513 postgresEndDirectModify(ForeignScanState *node) 2514 { 2515 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; 2516 2517 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */ 2518 if (dmstate == NULL) 2519 return; 2520 2521 /* Release PGresult */ 2522 if (dmstate->result) 2523 PQclear(dmstate->result); 2524 2525 /* Release remote connection */ 2526 ReleaseConnection(dmstate->conn); 2527 dmstate->conn = NULL; 2528 2529 /* MemoryContext will be deleted automatically. */ 2530 } 2531 2532 /* 2533 * postgresExplainForeignScan 2534 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table 2535 */ 2536 static void 2537 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) 2538 { 2539 List *fdw_private; 2540 char *sql; 2541 char *relations; 2542 2543 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; 2544 2545 /* 2546 * Add names of relation handled by the foreign scan when the scan is a 2547 * join 2548 */ 2549 if (list_length(fdw_private) > FdwScanPrivateRelations) 2550 { 2551 relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); 2552 ExplainPropertyText("Relations", relations, es); 2553 } 2554 2555 /* 2556 * Add remote query, when VERBOSE option is specified. 
2557 */ 2558 if (es->verbose) 2559 { 2560 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); 2561 ExplainPropertyText("Remote SQL", sql, es); 2562 } 2563 } 2564 2565 /* 2566 * postgresExplainForeignModify 2567 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table 2568 */ 2569 static void 2570 postgresExplainForeignModify(ModifyTableState *mtstate, 2571 ResultRelInfo *rinfo, 2572 List *fdw_private, 2573 int subplan_index, 2574 ExplainState *es) 2575 { 2576 if (es->verbose) 2577 { 2578 char *sql = strVal(list_nth(fdw_private, 2579 FdwModifyPrivateUpdateSql)); 2580 2581 ExplainPropertyText("Remote SQL", sql, es); 2582 } 2583 } 2584 2585 /* 2586 * postgresExplainDirectModify 2587 * Produce extra output for EXPLAIN of a ForeignScan that modifies a 2588 * foreign table directly 2589 */ 2590 static void 2591 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) 2592 { 2593 List *fdw_private; 2594 char *sql; 2595 2596 if (es->verbose) 2597 { 2598 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; 2599 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql)); 2600 ExplainPropertyText("Remote SQL", sql, es); 2601 } 2602 } 2603 2604 2605 /* 2606 * estimate_path_cost_size 2607 * Get cost and size estimates for a foreign scan on given foreign relation 2608 * either a base relation or a join between foreign relations or an upper 2609 * relation containing foreign relations. 2610 * 2611 * param_join_conds are the parameterization clauses with outer relations. 2612 * pathkeys specify the expected sort order if any for given path being costed. 2613 * fpextra specifies additional post-scan/join-processing steps such as the 2614 * final sort and the LIMIT restriction. 2615 * 2616 * The function returns the cost and size estimates in p_rows, p_width, 2617 * p_startup_cost and p_total_cost variables. 
2618 */ 2619 static void 2620 estimate_path_cost_size(PlannerInfo *root, 2621 RelOptInfo *foreignrel, 2622 List *param_join_conds, 2623 List *pathkeys, 2624 PgFdwPathExtraData *fpextra, 2625 double *p_rows, int *p_width, 2626 Cost *p_startup_cost, Cost *p_total_cost) 2627 { 2628 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; 2629 double rows; 2630 double retrieved_rows; 2631 int width; 2632 Cost startup_cost; 2633 Cost total_cost; 2634 2635 /* Make sure the core code has set up the relation's reltarget */ 2636 Assert(foreignrel->reltarget); 2637 2638 /* 2639 * If the table or the server is configured to use remote estimates, 2640 * connect to the foreign server and execute EXPLAIN to estimate the 2641 * number of rows selected by the restriction+join clauses. Otherwise, 2642 * estimate rows using whatever statistics we have locally, in a way 2643 * similar to ordinary tables. 2644 */ 2645 if (fpinfo->use_remote_estimate) 2646 { 2647 List *remote_param_join_conds; 2648 List *local_param_join_conds; 2649 StringInfoData sql; 2650 PGconn *conn; 2651 Selectivity local_sel; 2652 QualCost local_cost; 2653 List *fdw_scan_tlist = NIL; 2654 List *remote_conds; 2655 2656 /* Required only to be passed to deparseSelectStmtForRel */ 2657 List *retrieved_attrs; 2658 2659 /* 2660 * param_join_conds might contain both clauses that are safe to send 2661 * across, and clauses that aren't. 2662 */ 2663 classifyConditions(root, foreignrel, param_join_conds, 2664 &remote_param_join_conds, &local_param_join_conds); 2665 2666 /* Build the list of columns to be fetched from the foreign server. */ 2667 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) 2668 fdw_scan_tlist = build_tlist_to_deparse(foreignrel); 2669 else 2670 fdw_scan_tlist = NIL; 2671 2672 /* 2673 * The complete list of remote conditions includes everything from 2674 * baserestrictinfo plus any extra join_conds relevant to this 2675 * particular path. 
2676 */ 2677 remote_conds = list_concat(list_copy(remote_param_join_conds), 2678 fpinfo->remote_conds); 2679 2680 /* 2681 * Construct EXPLAIN query including the desired SELECT, FROM, and 2682 * WHERE clauses. Params and other-relation Vars are replaced by dummy 2683 * values, so don't request params_list. 2684 */ 2685 initStringInfo(&sql); 2686 appendStringInfoString(&sql, "EXPLAIN "); 2687 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, 2688 remote_conds, pathkeys, 2689 fpextra ? fpextra->has_final_sort : false, 2690 fpextra ? fpextra->has_limit : false, 2691 false, &retrieved_attrs, NULL); 2692 2693 /* Get the remote estimate */ 2694 conn = GetConnection(fpinfo->user, false); 2695 get_remote_estimate(sql.data, conn, &rows, &width, 2696 &startup_cost, &total_cost); 2697 ReleaseConnection(conn); 2698 2699 retrieved_rows = rows; 2700 2701 /* Factor in the selectivity of the locally-checked quals */ 2702 local_sel = clauselist_selectivity(root, 2703 local_param_join_conds, 2704 foreignrel->relid, 2705 JOIN_INNER, 2706 NULL); 2707 local_sel *= fpinfo->local_conds_sel; 2708 2709 rows = clamp_row_est(rows * local_sel); 2710 2711 /* Add in the eval cost of the locally-checked quals */ 2712 startup_cost += fpinfo->local_conds_cost.startup; 2713 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; 2714 cost_qual_eval(&local_cost, local_param_join_conds, root); 2715 startup_cost += local_cost.startup; 2716 total_cost += local_cost.per_tuple * retrieved_rows; 2717 2718 /* 2719 * Add in tlist eval cost for each output row. In case of an 2720 * aggregate, some of the tlist expressions such as grouping 2721 * expressions will be evaluated remotely, so adjust the costs. 
2722 */ 2723 startup_cost += foreignrel->reltarget->cost.startup; 2724 total_cost += foreignrel->reltarget->cost.startup; 2725 total_cost += foreignrel->reltarget->cost.per_tuple * rows; 2726 if (IS_UPPER_REL(foreignrel)) 2727 { 2728 QualCost tlist_cost; 2729 2730 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root); 2731 startup_cost -= tlist_cost.startup; 2732 total_cost -= tlist_cost.startup; 2733 total_cost -= tlist_cost.per_tuple * rows; 2734 } 2735 } 2736 else 2737 { 2738 Cost run_cost = 0; 2739 2740 /* 2741 * We don't support join conditions in this mode (hence, no 2742 * parameterized paths can be made). 2743 */ 2744 Assert(param_join_conds == NIL); 2745 2746 /* 2747 * We will come here again and again with different set of pathkeys or 2748 * additional post-scan/join-processing steps that caller wants to 2749 * cost. We don't need to calculate the cost/size estimates for the 2750 * underlying scan, join, or grouping each time. Instead, use those 2751 * estimates if we have cached them already. 2752 */ 2753 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0) 2754 { 2755 Assert(fpinfo->retrieved_rows >= 1); 2756 2757 rows = fpinfo->rows; 2758 retrieved_rows = fpinfo->retrieved_rows; 2759 width = fpinfo->width; 2760 startup_cost = fpinfo->rel_startup_cost; 2761 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; 2762 2763 /* 2764 * If we estimate the costs of a foreign scan or a foreign join 2765 * with additional post-scan/join-processing steps, the scan or 2766 * join costs obtained from the cache wouldn't yet contain the 2767 * eval costs for the final scan/join target, which would've been 2768 * updated by apply_scanjoin_target_to_paths(); add the eval costs 2769 * now. 
2770 */ 2771 if (fpextra && !IS_UPPER_REL(foreignrel)) 2772 { 2773 /* Shouldn't get here unless we have LIMIT */ 2774 Assert(fpextra->has_limit); 2775 Assert(foreignrel->reloptkind == RELOPT_BASEREL || 2776 foreignrel->reloptkind == RELOPT_JOINREL); 2777 startup_cost += foreignrel->reltarget->cost.startup; 2778 run_cost += foreignrel->reltarget->cost.per_tuple * rows; 2779 } 2780 } 2781 else if (IS_JOIN_REL(foreignrel)) 2782 { 2783 PgFdwRelationInfo *fpinfo_i; 2784 PgFdwRelationInfo *fpinfo_o; 2785 QualCost join_cost; 2786 QualCost remote_conds_cost; 2787 double nrows; 2788 2789 /* Use rows/width estimates made by the core code. */ 2790 rows = foreignrel->rows; 2791 width = foreignrel->reltarget->width; 2792 2793 /* For join we expect inner and outer relations set */ 2794 Assert(fpinfo->innerrel && fpinfo->outerrel); 2795 2796 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private; 2797 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; 2798 2799 /* Estimate of number of rows in cross product */ 2800 nrows = fpinfo_i->rows * fpinfo_o->rows; 2801 2802 /* 2803 * Back into an estimate of the number of retrieved rows. Just in 2804 * case this is nuts, clamp to at most nrow. 2805 */ 2806 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); 2807 retrieved_rows = Min(retrieved_rows, nrows); 2808 2809 /* 2810 * The cost of foreign join is estimated as cost of generating 2811 * rows for the joining relations + cost for applying quals on the 2812 * rows. 2813 */ 2814 2815 /* 2816 * Calculate the cost of clauses pushed down to the foreign server 2817 */ 2818 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); 2819 /* Calculate the cost of applying join clauses */ 2820 cost_qual_eval(&join_cost, fpinfo->joinclauses, root); 2821 2822 /* 2823 * Startup cost includes startup cost of joining relations and the 2824 * startup cost for join and other clauses. We do not include the 2825 * startup cost specific to join strategy (e.g. 
setting up hash 2826 * tables) since we do not know what strategy the foreign server 2827 * is going to use. 2828 */ 2829 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; 2830 startup_cost += join_cost.startup; 2831 startup_cost += remote_conds_cost.startup; 2832 startup_cost += fpinfo->local_conds_cost.startup; 2833 2834 /* 2835 * Run time cost includes: 2836 * 2837 * 1. Run time cost (total_cost - startup_cost) of relations being 2838 * joined 2839 * 2840 * 2. Run time cost of applying join clauses on the cross product 2841 * of the joining relations. 2842 * 2843 * 3. Run time cost of applying pushed down other clauses on the 2844 * result of join 2845 * 2846 * 4. Run time cost of applying nonpushable other clauses locally 2847 * on the result fetched from the foreign server. 2848 */ 2849 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; 2850 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; 2851 run_cost += nrows * join_cost.per_tuple; 2852 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); 2853 run_cost += nrows * remote_conds_cost.per_tuple; 2854 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; 2855 2856 /* Add in tlist eval cost for each output row */ 2857 startup_cost += foreignrel->reltarget->cost.startup; 2858 run_cost += foreignrel->reltarget->cost.per_tuple * rows; 2859 } 2860 else if (IS_UPPER_REL(foreignrel)) 2861 { 2862 RelOptInfo *outerrel = fpinfo->outerrel; 2863 PgFdwRelationInfo *ofpinfo; 2864 AggClauseCosts aggcosts; 2865 double input_rows; 2866 int numGroupCols; 2867 double numGroups = 1; 2868 2869 /* The upper relation should have its outer relation set */ 2870 Assert(outerrel); 2871 /* and that outer relation should have its reltarget set */ 2872 Assert(outerrel->reltarget); 2873 2874 /* 2875 * This cost model is mixture of costing done for sorted and 2876 * hashed aggregates in cost_agg(). 
We are not sure which 2877 * strategy will be considered at remote side, thus for 2878 * simplicity, we put all startup related costs in startup_cost 2879 * and all finalization and run cost are added in total_cost. 2880 */ 2881 2882 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private; 2883 2884 /* Get rows from input rel */ 2885 input_rows = ofpinfo->rows; 2886 2887 /* Collect statistics about aggregates for estimating costs. */ 2888 MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); 2889 if (root->parse->hasAggs) 2890 { 2891 get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist, 2892 AGGSPLIT_SIMPLE, &aggcosts); 2893 2894 /* 2895 * The cost of aggregates in the HAVING qual will be the same 2896 * for each child as it is for the parent, so there's no need 2897 * to use a translated version of havingQual. 2898 */ 2899 get_agg_clause_costs(root, (Node *) root->parse->havingQual, 2900 AGGSPLIT_SIMPLE, &aggcosts); 2901 } 2902 2903 /* Get number of grouping columns and possible number of groups */ 2904 numGroupCols = list_length(root->parse->groupClause); 2905 numGroups = estimate_num_groups(root, 2906 get_sortgrouplist_exprs(root->parse->groupClause, 2907 fpinfo->grouped_tlist), 2908 input_rows, NULL); 2909 2910 /* 2911 * Get the retrieved_rows and rows estimates. If there are HAVING 2912 * quals, account for their selectivity. 2913 */ 2914 if (root->parse->havingQual) 2915 { 2916 /* Factor in the selectivity of the remotely-checked quals */ 2917 retrieved_rows = 2918 clamp_row_est(numGroups * 2919 clauselist_selectivity(root, 2920 fpinfo->remote_conds, 2921 0, 2922 JOIN_INNER, 2923 NULL)); 2924 /* Factor in the selectivity of the locally-checked quals */ 2925 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel); 2926 } 2927 else 2928 { 2929 rows = retrieved_rows = numGroups; 2930 } 2931 2932 /* Use width estimate made by the core code. */ 2933 width = foreignrel->reltarget->width; 2934 2935 /*----- 2936 * Startup cost includes: 2937 * 1. 
Startup cost for underneath input relation, adjusted for 2938 * tlist replacement by apply_scanjoin_target_to_paths() 2939 * 2. Cost of performing aggregation, per cost_agg() 2940 *----- 2941 */ 2942 startup_cost = ofpinfo->rel_startup_cost; 2943 startup_cost += outerrel->reltarget->cost.startup; 2944 startup_cost += aggcosts.transCost.startup; 2945 startup_cost += aggcosts.transCost.per_tuple * input_rows; 2946 startup_cost += aggcosts.finalCost.startup; 2947 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows; 2948 2949 /*----- 2950 * Run time cost includes: 2951 * 1. Run time cost of underneath input relation, adjusted for 2952 * tlist replacement by apply_scanjoin_target_to_paths() 2953 * 2. Run time cost of performing aggregation, per cost_agg() 2954 *----- 2955 */ 2956 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; 2957 run_cost += outerrel->reltarget->cost.per_tuple * input_rows; 2958 run_cost += aggcosts.finalCost.per_tuple * numGroups; 2959 run_cost += cpu_tuple_cost * numGroups; 2960 2961 /* Account for the eval cost of HAVING quals, if any */ 2962 if (root->parse->havingQual) 2963 { 2964 QualCost remote_cost; 2965 2966 /* Add in the eval cost of the remotely-checked quals */ 2967 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root); 2968 startup_cost += remote_cost.startup; 2969 run_cost += remote_cost.per_tuple * numGroups; 2970 /* Add in the eval cost of the locally-checked quals */ 2971 startup_cost += fpinfo->local_conds_cost.startup; 2972 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; 2973 } 2974 2975 /* Add in tlist eval cost for each output row */ 2976 startup_cost += foreignrel->reltarget->cost.startup; 2977 run_cost += foreignrel->reltarget->cost.per_tuple * rows; 2978 } 2979 else 2980 { 2981 Cost cpu_per_tuple; 2982 2983 /* Use rows/width estimates made by set_baserel_size_estimates. 
*/ 2984 rows = foreignrel->rows; 2985 width = foreignrel->reltarget->width; 2986 2987 /* 2988 * Back into an estimate of the number of retrieved rows. Just in 2989 * case this is nuts, clamp to at most foreignrel->tuples. 2990 */ 2991 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); 2992 retrieved_rows = Min(retrieved_rows, foreignrel->tuples); 2993 2994 /* 2995 * Cost as though this were a seqscan, which is pessimistic. We 2996 * effectively imagine the local_conds are being evaluated 2997 * remotely, too. 2998 */ 2999 startup_cost = 0; 3000 run_cost = 0; 3001 run_cost += seq_page_cost * foreignrel->pages; 3002 3003 startup_cost += foreignrel->baserestrictcost.startup; 3004 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; 3005 run_cost += cpu_per_tuple * foreignrel->tuples; 3006 3007 /* Add in tlist eval cost for each output row */ 3008 startup_cost += foreignrel->reltarget->cost.startup; 3009 run_cost += foreignrel->reltarget->cost.per_tuple * rows; 3010 } 3011 3012 /* 3013 * Without remote estimates, we have no real way to estimate the cost 3014 * of generating sorted output. It could be free if the query plan 3015 * the remote side would have chosen generates properly-sorted output 3016 * anyway, but in most cases it will cost something. Estimate a value 3017 * high enough that we won't pick the sorted path when the ordering 3018 * isn't locally useful, but low enough that we'll err on the side of 3019 * pushing down the ORDER BY clause when it's useful to do so. 
3020 */ 3021 if (pathkeys != NIL) 3022 { 3023 if (IS_UPPER_REL(foreignrel)) 3024 { 3025 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL && 3026 fpinfo->stage == UPPERREL_GROUP_AGG); 3027 adjust_foreign_grouping_path_cost(root, pathkeys, 3028 retrieved_rows, width, 3029 fpextra->limit_tuples, 3030 &startup_cost, &run_cost); 3031 } 3032 else 3033 { 3034 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER; 3035 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER; 3036 } 3037 } 3038 3039 total_cost = startup_cost + run_cost; 3040 3041 /* Adjust the cost estimates if we have LIMIT */ 3042 if (fpextra && fpextra->has_limit) 3043 { 3044 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost, 3045 fpextra->offset_est, fpextra->count_est); 3046 retrieved_rows = rows; 3047 } 3048 } 3049 3050 /* 3051 * If this includes the final sort step, the given target, which will be 3052 * applied to the resulting path, might have different expressions from 3053 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist 3054 * eval costs. 3055 */ 3056 if (fpextra && fpextra->has_final_sort && 3057 fpextra->target != foreignrel->reltarget) 3058 { 3059 QualCost oldcost = foreignrel->reltarget->cost; 3060 QualCost newcost = fpextra->target->cost; 3061 3062 startup_cost += newcost.startup - oldcost.startup; 3063 total_cost += newcost.startup - oldcost.startup; 3064 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows; 3065 } 3066 3067 /* 3068 * Cache the retrieved rows and cost estimates for scans, joins, or 3069 * groupings without any parameterization, pathkeys, or additional 3070 * post-scan/join-processing steps, before adding the costs for 3071 * transferring data from the foreign server. These estimates are useful 3072 * for costing remote joins involving this relation or costing other 3073 * remote operations on this relation such as remote sorts and remote 3074 * LIMIT restrictions, when the costs can not be obtained from the foreign 3075 * server. 
This function will be called at least once for every foreign 3076 * relation without any parameterization, pathkeys, or additional 3077 * post-scan/join-processing steps. 3078 */ 3079 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL) 3080 { 3081 fpinfo->retrieved_rows = retrieved_rows; 3082 fpinfo->rel_startup_cost = startup_cost; 3083 fpinfo->rel_total_cost = total_cost; 3084 } 3085 3086 /* 3087 * Add some additional cost factors to account for connection overhead 3088 * (fdw_startup_cost), transferring data across the network 3089 * (fdw_tuple_cost per retrieved row), and local manipulation of the data 3090 * (cpu_tuple_cost per retrieved row). 3091 */ 3092 startup_cost += fpinfo->fdw_startup_cost; 3093 total_cost += fpinfo->fdw_startup_cost; 3094 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows; 3095 total_cost += cpu_tuple_cost * retrieved_rows; 3096 3097 /* 3098 * If we have LIMIT, we should prefer performing the restriction remotely 3099 * rather than locally, as the former avoids extra row fetches from the 3100 * remote that the latter might cause. But since the core code doesn't 3101 * account for such fetches when estimating the costs of the local 3102 * restriction (see create_limit_path()), there would be no difference 3103 * between the costs of the local restriction and the costs of the remote 3104 * restriction estimated above if we don't use remote estimates (except 3105 * for the case where the foreignrel is a grouping relation, the given 3106 * pathkeys is not NIL, and the effects of a bounded sort for that rel is 3107 * accounted for in costing the remote restriction). Tweak the costs of 3108 * the remote restriction to ensure we'll prefer it if LIMIT is a useful 3109 * one. 
3110 */ 3111 if (!fpinfo->use_remote_estimate && 3112 fpextra && fpextra->has_limit && 3113 fpextra->limit_tuples > 0 && 3114 fpextra->limit_tuples < fpinfo->rows) 3115 { 3116 Assert(fpinfo->rows > 0); 3117 total_cost -= (total_cost - startup_cost) * 0.05 * 3118 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows; 3119 } 3120 3121 /* Return results. */ 3122 *p_rows = rows; 3123 *p_width = width; 3124 *p_startup_cost = startup_cost; 3125 *p_total_cost = total_cost; 3126 } 3127 3128 /* 3129 * Estimate costs of executing a SQL statement remotely. 3130 * The given "sql" must be an EXPLAIN command. 3131 */ 3132 static void 3133 get_remote_estimate(const char *sql, PGconn *conn, 3134 double *rows, int *width, 3135 Cost *startup_cost, Cost *total_cost) 3136 { 3137 PGresult *volatile res = NULL; 3138 3139 /* PGresult must be released before leaving this function. */ 3140 PG_TRY(); 3141 { 3142 char *line; 3143 char *p; 3144 int n; 3145 3146 /* 3147 * Execute EXPLAIN remotely. 3148 */ 3149 res = pgfdw_exec_query(conn, sql); 3150 if (PQresultStatus(res) != PGRES_TUPLES_OK) 3151 pgfdw_report_error(ERROR, res, conn, false, sql); 3152 3153 /* 3154 * Extract cost numbers for topmost plan node. Note we search for a 3155 * left paren from the end of the line to avoid being confused by 3156 * other uses of parentheses. 3157 */ 3158 line = PQgetvalue(res, 0, 0); 3159 p = strrchr(line, '('); 3160 if (p == NULL) 3161 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); 3162 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)", 3163 startup_cost, total_cost, rows, width); 3164 if (n != 4) 3165 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); 3166 3167 PQclear(res); 3168 res = NULL; 3169 } 3170 PG_CATCH(); 3171 { 3172 if (res) 3173 PQclear(res); 3174 PG_RE_THROW(); 3175 } 3176 PG_END_TRY(); 3177 } 3178 3179 /* 3180 * Adjust the cost estimates of a foreign grouping path to include the cost of 3181 * generating properly-sorted output. 
3182 */ 3183 static void 3184 adjust_foreign_grouping_path_cost(PlannerInfo *root, 3185 List *pathkeys, 3186 double retrieved_rows, 3187 double width, 3188 double limit_tuples, 3189 Cost *p_startup_cost, 3190 Cost *p_run_cost) 3191 { 3192 /* 3193 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote 3194 * side is unlikely to generate properly-sorted output, so it would need 3195 * an explicit sort; adjust the given costs with cost_sort(). Likewise, 3196 * if the GROUP BY clause is sort-able but isn't a superset of the given 3197 * pathkeys, adjust the costs with that function. Otherwise, adjust the 3198 * costs by applying the same heuristic as for the scan or join case. 3199 */ 3200 if (!grouping_is_sortable(root->parse->groupClause) || 3201 !pathkeys_contained_in(pathkeys, root->group_pathkeys)) 3202 { 3203 Path sort_path; /* dummy for result of cost_sort */ 3204 3205 cost_sort(&sort_path, 3206 root, 3207 pathkeys, 3208 *p_startup_cost + *p_run_cost, 3209 retrieved_rows, 3210 width, 3211 0.0, 3212 work_mem, 3213 limit_tuples); 3214 3215 *p_startup_cost = sort_path.startup_cost; 3216 *p_run_cost = sort_path.total_cost - sort_path.startup_cost; 3217 } 3218 else 3219 { 3220 /* 3221 * The default extra cost seems too large for foreign-grouping cases; 3222 * add 1/4th of that default. 3223 */ 3224 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER 3225 - 1.0) * 0.25; 3226 3227 *p_startup_cost *= sort_multiplier; 3228 *p_run_cost *= sort_multiplier; 3229 } 3230 } 3231 3232 /* 3233 * Detect whether we want to process an EquivalenceClass member. 3234 * 3235 * This is a callback for use by generate_implied_equalities_for_column. 
3236 */ 3237 static bool 3238 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, 3239 EquivalenceClass *ec, EquivalenceMember *em, 3240 void *arg) 3241 { 3242 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg; 3243 Expr *expr = em->em_expr; 3244 3245 /* 3246 * If we've identified what we're processing in the current scan, we only 3247 * want to match that expression. 3248 */ 3249 if (state->current != NULL) 3250 return equal(expr, state->current); 3251 3252 /* 3253 * Otherwise, ignore anything we've already processed. 3254 */ 3255 if (list_member(state->already_used, expr)) 3256 return false; 3257 3258 /* This is the new target to process. */ 3259 state->current = expr; 3260 return true; 3261 } 3262 3263 /* 3264 * Create cursor for node's query with current parameter values. 3265 */ 3266 static void 3267 create_cursor(ForeignScanState *node) 3268 { 3269 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; 3270 ExprContext *econtext = node->ss.ps.ps_ExprContext; 3271 int numParams = fsstate->numParams; 3272 const char **values = fsstate->param_values; 3273 PGconn *conn = fsstate->conn; 3274 StringInfoData buf; 3275 PGresult *res; 3276 3277 /* 3278 * Construct array of query parameter values in text format. We do the 3279 * conversions in the short-lived per-tuple context, so as not to cause a 3280 * memory leak over repeated scans. 
3281 */ 3282 if (numParams > 0) 3283 { 3284 MemoryContext oldcontext; 3285 3286 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 3287 3288 process_query_params(econtext, 3289 fsstate->param_flinfo, 3290 fsstate->param_exprs, 3291 values); 3292 3293 MemoryContextSwitchTo(oldcontext); 3294 } 3295 3296 /* Construct the DECLARE CURSOR command */ 3297 initStringInfo(&buf); 3298 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", 3299 fsstate->cursor_number, fsstate->query); 3300 3301 /* 3302 * Notice that we pass NULL for paramTypes, thus forcing the remote server 3303 * to infer types for all parameters. Since we explicitly cast every 3304 * parameter (see deparse.c), the "inference" is trivial and will produce 3305 * the desired result. This allows us to avoid assuming that the remote 3306 * server has the same OIDs we do for the parameters' types. 3307 */ 3308 if (!PQsendQueryParams(conn, buf.data, numParams, 3309 NULL, values, NULL, NULL, 0)) 3310 pgfdw_report_error(ERROR, NULL, conn, false, buf.data); 3311 3312 /* 3313 * Get the result, and check for success. 3314 * 3315 * We don't use a PG_TRY block here, so be careful not to throw error 3316 * without releasing the PGresult. 3317 */ 3318 res = pgfdw_get_result(conn, buf.data); 3319 if (PQresultStatus(res) != PGRES_COMMAND_OK) 3320 pgfdw_report_error(ERROR, res, conn, true, fsstate->query); 3321 PQclear(res); 3322 3323 /* Mark the cursor as created, and show no tuples have been retrieved */ 3324 fsstate->cursor_exists = true; 3325 fsstate->tuples = NULL; 3326 fsstate->num_tuples = 0; 3327 fsstate->next_tuple = 0; 3328 fsstate->fetch_ct_2 = 0; 3329 fsstate->eof_reached = false; 3330 3331 /* Clean up */ 3332 pfree(buf.data); 3333 } 3334 3335 /* 3336 * Fetch some more rows from the node's cursor. 
3337 */ 3338 static void 3339 fetch_more_data(ForeignScanState *node) 3340 { 3341 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; 3342 PGresult *volatile res = NULL; 3343 MemoryContext oldcontext; 3344 3345 /* 3346 * We'll store the tuples in the batch_cxt. First, flush the previous 3347 * batch. 3348 */ 3349 fsstate->tuples = NULL; 3350 MemoryContextReset(fsstate->batch_cxt); 3351 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); 3352 3353 /* PGresult must be released before leaving this function. */ 3354 PG_TRY(); 3355 { 3356 PGconn *conn = fsstate->conn; 3357 char sql[64]; 3358 int numrows; 3359 int i; 3360 3361 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", 3362 fsstate->fetch_size, fsstate->cursor_number); 3363 3364 res = pgfdw_exec_query(conn, sql); 3365 /* On error, report the original query, not the FETCH. */ 3366 if (PQresultStatus(res) != PGRES_TUPLES_OK) 3367 pgfdw_report_error(ERROR, res, conn, false, fsstate->query); 3368 3369 /* Convert the data into HeapTuples */ 3370 numrows = PQntuples(res); 3371 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); 3372 fsstate->num_tuples = numrows; 3373 fsstate->next_tuple = 0; 3374 3375 for (i = 0; i < numrows; i++) 3376 { 3377 Assert(IsA(node->ss.ps.plan, ForeignScan)); 3378 3379 fsstate->tuples[i] = 3380 make_tuple_from_result_row(res, i, 3381 fsstate->rel, 3382 fsstate->attinmeta, 3383 fsstate->retrieved_attrs, 3384 node, 3385 fsstate->temp_cxt); 3386 } 3387 3388 /* Update fetch_ct_2 */ 3389 if (fsstate->fetch_ct_2 < 2) 3390 fsstate->fetch_ct_2++; 3391 3392 /* Must be EOF if we didn't get as many tuples as we asked for. 
*/ 3393 fsstate->eof_reached = (numrows < fsstate->fetch_size); 3394 3395 PQclear(res); 3396 res = NULL; 3397 } 3398 PG_CATCH(); 3399 { 3400 if (res) 3401 PQclear(res); 3402 PG_RE_THROW(); 3403 } 3404 PG_END_TRY(); 3405 3406 MemoryContextSwitchTo(oldcontext); 3407 } 3408 3409 /* 3410 * Force assorted GUC parameters to settings that ensure that we'll output 3411 * data values in a form that is unambiguous to the remote server. 3412 * 3413 * This is rather expensive and annoying to do once per row, but there's 3414 * little choice if we want to be sure values are transmitted accurately; 3415 * we can't leave the settings in place between rows for fear of affecting 3416 * user-visible computations. 3417 * 3418 * We use the equivalent of a function SET option to allow the settings to 3419 * persist only until the caller calls reset_transmission_modes(). If an 3420 * error is thrown in between, guc.c will take care of undoing the settings. 3421 * 3422 * The return value is the nestlevel that must be passed to 3423 * reset_transmission_modes() to undo things. 3424 */ 3425 int 3426 set_transmission_modes(void) 3427 { 3428 int nestlevel = NewGUCNestLevel(); 3429 3430 /* 3431 * The values set here should match what pg_dump does. See also 3432 * configure_remote_session in connection.c. 3433 */ 3434 if (DateStyle != USE_ISO_DATES) 3435 (void) set_config_option("datestyle", "ISO", 3436 PGC_USERSET, PGC_S_SESSION, 3437 GUC_ACTION_SAVE, true, 0, false); 3438 if (IntervalStyle != INTSTYLE_POSTGRES) 3439 (void) set_config_option("intervalstyle", "postgres", 3440 PGC_USERSET, PGC_S_SESSION, 3441 GUC_ACTION_SAVE, true, 0, false); 3442 if (extra_float_digits < 3) 3443 (void) set_config_option("extra_float_digits", "3", 3444 PGC_USERSET, PGC_S_SESSION, 3445 GUC_ACTION_SAVE, true, 0, false); 3446 3447 return nestlevel; 3448 } 3449 3450 /* 3451 * Undo the effects of set_transmission_modes(). 
3452 */ 3453 void 3454 reset_transmission_modes(int nestlevel) 3455 { 3456 AtEOXact_GUC(true, nestlevel); 3457 } 3458 3459 /* 3460 * Utility routine to close a cursor. 3461 */ 3462 static void 3463 close_cursor(PGconn *conn, unsigned int cursor_number) 3464 { 3465 char sql[64]; 3466 PGresult *res; 3467 3468 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number); 3469 3470 /* 3471 * We don't use a PG_TRY block here, so be careful not to throw error 3472 * without releasing the PGresult. 3473 */ 3474 res = pgfdw_exec_query(conn, sql); 3475 if (PQresultStatus(res) != PGRES_COMMAND_OK) 3476 pgfdw_report_error(ERROR, res, conn, true, sql); 3477 PQclear(res); 3478 } 3479 3480 /* 3481 * create_foreign_modify 3482 * Construct an execution state of a foreign insert/update/delete 3483 * operation 3484 */ 3485 static PgFdwModifyState * 3486 create_foreign_modify(EState *estate, 3487 RangeTblEntry *rte, 3488 ResultRelInfo *resultRelInfo, 3489 CmdType operation, 3490 Plan *subplan, 3491 char *query, 3492 List *target_attrs, 3493 bool has_returning, 3494 List *retrieved_attrs) 3495 { 3496 PgFdwModifyState *fmstate; 3497 Relation rel = resultRelInfo->ri_RelationDesc; 3498 TupleDesc tupdesc = RelationGetDescr(rel); 3499 Oid userid; 3500 ForeignTable *table; 3501 UserMapping *user; 3502 AttrNumber n_params; 3503 Oid typefnoid; 3504 bool isvarlena; 3505 ListCell *lc; 3506 3507 /* Begin constructing PgFdwModifyState. */ 3508 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); 3509 fmstate->rel = rel; 3510 3511 /* 3512 * Identify which user to do the remote access as. This should match what 3513 * ExecCheckRTEPerms() does. 3514 */ 3515 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); 3516 3517 /* Get info about foreign table. */ 3518 table = GetForeignTable(RelationGetRelid(rel)); 3519 user = GetUserMapping(userid, table->serverid); 3520 3521 /* Open connection; report that we'll create a prepared statement. 
*/ 3522 fmstate->conn = GetConnection(user, true); 3523 fmstate->p_name = NULL; /* prepared statement not made yet */ 3524 3525 /* Set up remote query information. */ 3526 fmstate->query = query; 3527 fmstate->target_attrs = target_attrs; 3528 fmstate->has_returning = has_returning; 3529 fmstate->retrieved_attrs = retrieved_attrs; 3530 3531 /* Create context for per-tuple temp workspace. */ 3532 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, 3533 "postgres_fdw temporary data", 3534 ALLOCSET_SMALL_SIZES); 3535 3536 /* Prepare for input conversion of RETURNING results. */ 3537 if (fmstate->has_returning) 3538 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); 3539 3540 /* Prepare for output conversion of parameters used in prepared stmt. */ 3541 n_params = list_length(fmstate->target_attrs) + 1; 3542 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); 3543 fmstate->p_nums = 0; 3544 3545 if (operation == CMD_UPDATE || operation == CMD_DELETE) 3546 { 3547 Assert(subplan != NULL); 3548 3549 /* Find the ctid resjunk column in the subplan's result */ 3550 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, 3551 "ctid"); 3552 if (!AttributeNumberIsValid(fmstate->ctidAttno)) 3553 elog(ERROR, "could not find junk ctid column"); 3554 3555 /* First transmittable parameter will be ctid */ 3556 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); 3557 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); 3558 fmstate->p_nums++; 3559 } 3560 3561 if (operation == CMD_INSERT || operation == CMD_UPDATE) 3562 { 3563 /* Set up for remaining transmittable parameters */ 3564 foreach(lc, fmstate->target_attrs) 3565 { 3566 int attnum = lfirst_int(lc); 3567 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); 3568 3569 Assert(!attr->attisdropped); 3570 3571 /* Ignore generated columns; they are set to DEFAULT */ 3572 if (attr->attgenerated) 3573 continue; 3574 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); 
3575 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); 3576 fmstate->p_nums++; 3577 } 3578 } 3579 3580 Assert(fmstate->p_nums <= n_params); 3581 3582 /* Initialize auxiliary state */ 3583 fmstate->aux_fmstate = NULL; 3584 3585 return fmstate; 3586 } 3587 3588 /* 3589 * execute_foreign_modify 3590 * Perform foreign-table modification as required, and fetch RETURNING 3591 * result if any. (This is the shared guts of postgresExecForeignInsert, 3592 * postgresExecForeignUpdate, and postgresExecForeignDelete.) 3593 */ 3594 static TupleTableSlot * 3595 execute_foreign_modify(EState *estate, 3596 ResultRelInfo *resultRelInfo, 3597 CmdType operation, 3598 TupleTableSlot *slot, 3599 TupleTableSlot *planSlot) 3600 { 3601 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; 3602 ItemPointer ctid = NULL; 3603 const char **p_values; 3604 PGresult *res; 3605 int n_rows; 3606 3607 /* The operation should be INSERT, UPDATE, or DELETE */ 3608 Assert(operation == CMD_INSERT || 3609 operation == CMD_UPDATE || 3610 operation == CMD_DELETE); 3611 3612 /* Set up the prepared statement on the remote server, if we didn't yet */ 3613 if (!fmstate->p_name) 3614 prepare_foreign_modify(fmstate); 3615 3616 /* 3617 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column 3618 */ 3619 if (operation == CMD_UPDATE || operation == CMD_DELETE) 3620 { 3621 Datum datum; 3622 bool isNull; 3623 3624 datum = ExecGetJunkAttribute(planSlot, 3625 fmstate->ctidAttno, 3626 &isNull); 3627 /* shouldn't ever get a null result... */ 3628 if (isNull) 3629 elog(ERROR, "ctid is NULL"); 3630 ctid = (ItemPointer) DatumGetPointer(datum); 3631 } 3632 3633 /* Convert parameters needed by prepared statement to text form */ 3634 p_values = convert_prep_stmt_params(fmstate, ctid, slot); 3635 3636 /* 3637 * Execute the prepared statement. 
3638 */ 3639 if (!PQsendQueryPrepared(fmstate->conn, 3640 fmstate->p_name, 3641 fmstate->p_nums, 3642 p_values, 3643 NULL, 3644 NULL, 3645 0)) 3646 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); 3647 3648 /* 3649 * Get the result, and check for success. 3650 * 3651 * We don't use a PG_TRY block here, so be careful not to throw error 3652 * without releasing the PGresult. 3653 */ 3654 res = pgfdw_get_result(fmstate->conn, fmstate->query); 3655 if (PQresultStatus(res) != 3656 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) 3657 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); 3658 3659 /* Check number of rows affected, and fetch RETURNING tuple if any */ 3660 if (fmstate->has_returning) 3661 { 3662 n_rows = PQntuples(res); 3663 if (n_rows > 0) 3664 store_returning_result(fmstate, slot, res); 3665 } 3666 else 3667 n_rows = atoi(PQcmdTuples(res)); 3668 3669 /* And clean up */ 3670 PQclear(res); 3671 3672 MemoryContextReset(fmstate->temp_cxt); 3673 3674 /* 3675 * Return NULL if nothing was inserted/updated/deleted on the remote end 3676 */ 3677 return (n_rows > 0) ? slot : NULL; 3678 } 3679 3680 /* 3681 * prepare_foreign_modify 3682 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE 3683 */ 3684 static void 3685 prepare_foreign_modify(PgFdwModifyState *fmstate) 3686 { 3687 char prep_name[NAMEDATALEN]; 3688 char *p_name; 3689 PGresult *res; 3690 3691 /* Construct name we'll use for the prepared statement. */ 3692 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", 3693 GetPrepStmtNumber(fmstate->conn)); 3694 p_name = pstrdup(prep_name); 3695 3696 /* 3697 * We intentionally do not specify parameter types here, but leave the 3698 * remote server to derive them by default. This avoids possible problems 3699 * with the remote server using different type OIDs than we do. 
All of 3700 * the prepared statements we use in this module are simple enough that 3701 * the remote server will make the right choices. 3702 */ 3703 if (!PQsendPrepare(fmstate->conn, 3704 p_name, 3705 fmstate->query, 3706 0, 3707 NULL)) 3708 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); 3709 3710 /* 3711 * Get the result, and check for success. 3712 * 3713 * We don't use a PG_TRY block here, so be careful not to throw error 3714 * without releasing the PGresult. 3715 */ 3716 res = pgfdw_get_result(fmstate->conn, fmstate->query); 3717 if (PQresultStatus(res) != PGRES_COMMAND_OK) 3718 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); 3719 PQclear(res); 3720 3721 /* This action shows that the prepare has been done. */ 3722 fmstate->p_name = p_name; 3723 } 3724 3725 /* 3726 * convert_prep_stmt_params 3727 * Create array of text strings representing parameter values 3728 * 3729 * tupleid is ctid to send, or NULL if none 3730 * slot is slot to get remaining parameters from, or NULL if none 3731 * 3732 * Data is constructed in temp_cxt; caller should reset that after use. 
3733 */ 3734 static const char ** 3735 convert_prep_stmt_params(PgFdwModifyState *fmstate, 3736 ItemPointer tupleid, 3737 TupleTableSlot *slot) 3738 { 3739 const char **p_values; 3740 int pindex = 0; 3741 MemoryContext oldcontext; 3742 3743 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); 3744 3745 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); 3746 3747 /* 1st parameter should be ctid, if it's in use */ 3748 if (tupleid != NULL) 3749 { 3750 /* don't need set_transmission_modes for TID output */ 3751 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], 3752 PointerGetDatum(tupleid)); 3753 pindex++; 3754 } 3755 3756 /* get following parameters from slot */ 3757 if (slot != NULL && fmstate->target_attrs != NIL) 3758 { 3759 TupleDesc tupdesc = RelationGetDescr(fmstate->rel); 3760 int nestlevel; 3761 ListCell *lc; 3762 3763 nestlevel = set_transmission_modes(); 3764 3765 foreach(lc, fmstate->target_attrs) 3766 { 3767 int attnum = lfirst_int(lc); 3768 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); 3769 Datum value; 3770 bool isnull; 3771 3772 /* Ignore generated columns; they are set to DEFAULT */ 3773 if (attr->attgenerated) 3774 continue; 3775 value = slot_getattr(slot, attnum, &isnull); 3776 if (isnull) 3777 p_values[pindex] = NULL; 3778 else 3779 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], 3780 value); 3781 pindex++; 3782 } 3783 3784 reset_transmission_modes(nestlevel); 3785 } 3786 3787 Assert(pindex == fmstate->p_nums); 3788 3789 MemoryContextSwitchTo(oldcontext); 3790 3791 return p_values; 3792 } 3793 3794 /* 3795 * store_returning_result 3796 * Store the result of a RETURNING clause 3797 * 3798 * On error, be sure to release the PGresult on the way out. Callers do not 3799 * have PG_TRY blocks to ensure this happens. 
3800 */ 3801 static void 3802 store_returning_result(PgFdwModifyState *fmstate, 3803 TupleTableSlot *slot, PGresult *res) 3804 { 3805 PG_TRY(); 3806 { 3807 HeapTuple newtup; 3808 3809 newtup = make_tuple_from_result_row(res, 0, 3810 fmstate->rel, 3811 fmstate->attinmeta, 3812 fmstate->retrieved_attrs, 3813 NULL, 3814 fmstate->temp_cxt); 3815 3816 /* 3817 * The returning slot will not necessarily be suitable to store 3818 * heaptuples directly, so allow for conversion. 3819 */ 3820 ExecForceStoreHeapTuple(newtup, slot, true); 3821 } 3822 PG_CATCH(); 3823 { 3824 if (res) 3825 PQclear(res); 3826 PG_RE_THROW(); 3827 } 3828 PG_END_TRY(); 3829 } 3830 3831 /* 3832 * finish_foreign_modify 3833 * Release resources for a foreign insert/update/delete operation 3834 */ 3835 static void 3836 finish_foreign_modify(PgFdwModifyState *fmstate) 3837 { 3838 Assert(fmstate != NULL); 3839 3840 /* If we created a prepared statement, destroy it */ 3841 if (fmstate->p_name) 3842 { 3843 char sql[64]; 3844 PGresult *res; 3845 3846 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); 3847 3848 /* 3849 * We don't use a PG_TRY block here, so be careful not to throw error 3850 * without releasing the PGresult. 3851 */ 3852 res = pgfdw_exec_query(fmstate->conn, sql); 3853 if (PQresultStatus(res) != PGRES_COMMAND_OK) 3854 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); 3855 PQclear(res); 3856 fmstate->p_name = NULL; 3857 } 3858 3859 /* Release remote connection */ 3860 ReleaseConnection(fmstate->conn); 3861 fmstate->conn = NULL; 3862 } 3863 3864 /* 3865 * build_remote_returning 3866 * Build a RETURNING targetlist of a remote query for performing an 3867 * UPDATE/DELETE .. 
RETURNING on a join directly 3868 */ 3869 static List * 3870 build_remote_returning(Index rtindex, Relation rel, List *returningList) 3871 { 3872 bool have_wholerow = false; 3873 List *tlist = NIL; 3874 List *vars; 3875 ListCell *lc; 3876 3877 Assert(returningList); 3878 3879 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS); 3880 3881 /* 3882 * If there's a whole-row reference to the target relation, then we'll 3883 * need all the columns of the relation. 3884 */ 3885 foreach(lc, vars) 3886 { 3887 Var *var = (Var *) lfirst(lc); 3888 3889 if (IsA(var, Var) && 3890 var->varno == rtindex && 3891 var->varattno == InvalidAttrNumber) 3892 { 3893 have_wholerow = true; 3894 break; 3895 } 3896 } 3897 3898 if (have_wholerow) 3899 { 3900 TupleDesc tupdesc = RelationGetDescr(rel); 3901 int i; 3902 3903 for (i = 1; i <= tupdesc->natts; i++) 3904 { 3905 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1); 3906 Var *var; 3907 3908 /* Ignore dropped attributes. */ 3909 if (attr->attisdropped) 3910 continue; 3911 3912 var = makeVar(rtindex, 3913 i, 3914 attr->atttypid, 3915 attr->atttypmod, 3916 attr->attcollation, 3917 0); 3918 3919 tlist = lappend(tlist, 3920 makeTargetEntry((Expr *) var, 3921 list_length(tlist) + 1, 3922 NULL, 3923 false)); 3924 } 3925 } 3926 3927 /* Now add any remaining columns to tlist. */ 3928 foreach(lc, vars) 3929 { 3930 Var *var = (Var *) lfirst(lc); 3931 3932 /* 3933 * No need for whole-row references to the target relation. We don't 3934 * need system columns other than ctid and oid either, since those are 3935 * set locally. 
3936 */ 3937 if (IsA(var, Var) && 3938 var->varno == rtindex && 3939 var->varattno <= InvalidAttrNumber && 3940 var->varattno != SelfItemPointerAttributeNumber) 3941 continue; /* don't need it */ 3942 3943 if (tlist_member((Expr *) var, tlist)) 3944 continue; /* already got it */ 3945 3946 tlist = lappend(tlist, 3947 makeTargetEntry((Expr *) var, 3948 list_length(tlist) + 1, 3949 NULL, 3950 false)); 3951 } 3952 3953 list_free(vars); 3954 3955 return tlist; 3956 } 3957 3958 /* 3959 * rebuild_fdw_scan_tlist 3960 * Build new fdw_scan_tlist of given foreign-scan plan node from given 3961 * tlist 3962 * 3963 * There might be columns that the fdw_scan_tlist of the given foreign-scan 3964 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would 3965 * have contained resjunk columns such as 'ctid' of the target relation and 3966 * 'wholerow' of non-target relations, but the tlist might not contain them, 3967 * for example. So, adjust the tlist so it contains all the columns specified 3968 * in the fdw_scan_tlist; else setrefs.c will get confused. 3969 */ 3970 static void 3971 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist) 3972 { 3973 List *new_tlist = tlist; 3974 List *old_tlist = fscan->fdw_scan_tlist; 3975 ListCell *lc; 3976 3977 foreach(lc, old_tlist) 3978 { 3979 TargetEntry *tle = (TargetEntry *) lfirst(lc); 3980 3981 if (tlist_member(tle->expr, new_tlist)) 3982 continue; /* already got it */ 3983 3984 new_tlist = lappend(new_tlist, 3985 makeTargetEntry(tle->expr, 3986 list_length(new_tlist) + 1, 3987 NULL, 3988 false)); 3989 } 3990 fscan->fdw_scan_tlist = new_tlist; 3991 } 3992 3993 /* 3994 * Execute a direct UPDATE/DELETE statement. 
3995 */ 3996 static void 3997 execute_dml_stmt(ForeignScanState *node) 3998 { 3999 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; 4000 ExprContext *econtext = node->ss.ps.ps_ExprContext; 4001 int numParams = dmstate->numParams; 4002 const char **values = dmstate->param_values; 4003 4004 /* 4005 * Construct array of query parameter values in text format. 4006 */ 4007 if (numParams > 0) 4008 process_query_params(econtext, 4009 dmstate->param_flinfo, 4010 dmstate->param_exprs, 4011 values); 4012 4013 /* 4014 * Notice that we pass NULL for paramTypes, thus forcing the remote server 4015 * to infer types for all parameters. Since we explicitly cast every 4016 * parameter (see deparse.c), the "inference" is trivial and will produce 4017 * the desired result. This allows us to avoid assuming that the remote 4018 * server has the same OIDs we do for the parameters' types. 4019 */ 4020 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, 4021 NULL, values, NULL, NULL, 0)) 4022 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); 4023 4024 /* 4025 * Get the result, and check for success. 4026 * 4027 * We don't use a PG_TRY block here, so be careful not to throw error 4028 * without releasing the PGresult. 4029 */ 4030 dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); 4031 if (PQresultStatus(dmstate->result) != 4032 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) 4033 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, 4034 dmstate->query); 4035 4036 /* Get the number of rows affected. */ 4037 if (dmstate->has_returning) 4038 dmstate->num_tuples = PQntuples(dmstate->result); 4039 else 4040 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result)); 4041 } 4042 4043 /* 4044 * Get the result of a RETURNING clause. 
4045 */ 4046 static TupleTableSlot * 4047 get_returning_data(ForeignScanState *node) 4048 { 4049 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; 4050 EState *estate = node->ss.ps.state; 4051 ResultRelInfo *resultRelInfo = estate->es_result_relation_info; 4052 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; 4053 TupleTableSlot *resultSlot; 4054 4055 Assert(resultRelInfo->ri_projectReturning); 4056 4057 /* If we didn't get any tuples, must be end of data. */ 4058 if (dmstate->next_tuple >= dmstate->num_tuples) 4059 return ExecClearTuple(slot); 4060 4061 /* Increment the command es_processed count if necessary. */ 4062 if (dmstate->set_processed) 4063 estate->es_processed += 1; 4064 4065 /* 4066 * Store a RETURNING tuple. If has_returning is false, just emit a dummy 4067 * tuple. (has_returning is false when the local query is of the form 4068 * "UPDATE/DELETE .. RETURNING 1" for example.) 4069 */ 4070 if (!dmstate->has_returning) 4071 { 4072 ExecStoreAllNullTuple(slot); 4073 resultSlot = slot; 4074 } 4075 else 4076 { 4077 /* 4078 * On error, be sure to release the PGresult on the way out. Callers 4079 * do not have PG_TRY blocks to ensure this happens. 4080 */ 4081 PG_TRY(); 4082 { 4083 HeapTuple newtup; 4084 4085 newtup = make_tuple_from_result_row(dmstate->result, 4086 dmstate->next_tuple, 4087 dmstate->rel, 4088 dmstate->attinmeta, 4089 dmstate->retrieved_attrs, 4090 node, 4091 dmstate->temp_cxt); 4092 ExecStoreHeapTuple(newtup, slot, false); 4093 } 4094 PG_CATCH(); 4095 { 4096 if (dmstate->result) 4097 PQclear(dmstate->result); 4098 PG_RE_THROW(); 4099 } 4100 PG_END_TRY(); 4101 4102 /* Get the updated/deleted tuple. */ 4103 if (dmstate->rel) 4104 resultSlot = slot; 4105 else 4106 resultSlot = apply_returning_filter(dmstate, slot, estate); 4107 } 4108 dmstate->next_tuple++; 4109 4110 /* Make slot available for evaluation of the local query RETURNING list. 
*/ 4111 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = 4112 resultSlot; 4113 4114 return slot; 4115 } 4116 4117 /* 4118 * Initialize a filter to extract an updated/deleted tuple from a scan tuple. 4119 */ 4120 static void 4121 init_returning_filter(PgFdwDirectModifyState *dmstate, 4122 List *fdw_scan_tlist, 4123 Index rtindex) 4124 { 4125 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); 4126 ListCell *lc; 4127 int i; 4128 4129 /* 4130 * Calculate the mapping between the fdw_scan_tlist's entries and the 4131 * result tuple's attributes. 4132 * 4133 * The "map" is an array of indexes of the result tuple's attributes in 4134 * fdw_scan_tlist, i.e., one entry for every attribute of the result 4135 * tuple. We store zero for any attributes that don't have the 4136 * corresponding entries in that list, marking that a NULL is needed in 4137 * the result tuple. 4138 * 4139 * Also get the indexes of the entries for ctid and oid if any. 4140 */ 4141 dmstate->attnoMap = (AttrNumber *) 4142 palloc0(resultTupType->natts * sizeof(AttrNumber)); 4143 4144 dmstate->ctidAttno = dmstate->oidAttno = 0; 4145 4146 i = 1; 4147 dmstate->hasSystemCols = false; 4148 foreach(lc, fdw_scan_tlist) 4149 { 4150 TargetEntry *tle = (TargetEntry *) lfirst(lc); 4151 Var *var = (Var *) tle->expr; 4152 4153 Assert(IsA(var, Var)); 4154 4155 /* 4156 * If the Var is a column of the target relation to be retrieved from 4157 * the foreign server, get the index of the entry. 4158 */ 4159 if (var->varno == rtindex && 4160 list_member_int(dmstate->retrieved_attrs, i)) 4161 { 4162 int attrno = var->varattno; 4163 4164 if (attrno < 0) 4165 { 4166 /* 4167 * We don't retrieve system columns other than ctid and oid. 
4168 */ 4169 if (attrno == SelfItemPointerAttributeNumber) 4170 dmstate->ctidAttno = i; 4171 else 4172 Assert(false); 4173 dmstate->hasSystemCols = true; 4174 } 4175 else 4176 { 4177 /* 4178 * We don't retrieve whole-row references to the target 4179 * relation either. 4180 */ 4181 Assert(attrno > 0); 4182 4183 dmstate->attnoMap[attrno - 1] = i; 4184 } 4185 } 4186 i++; 4187 } 4188 } 4189 4190 /* 4191 * Extract and return an updated/deleted tuple from a scan tuple. 4192 */ 4193 static TupleTableSlot * 4194 apply_returning_filter(PgFdwDirectModifyState *dmstate, 4195 TupleTableSlot *slot, 4196 EState *estate) 4197 { 4198 ResultRelInfo *relInfo = estate->es_result_relation_info; 4199 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); 4200 TupleTableSlot *resultSlot; 4201 Datum *values; 4202 bool *isnull; 4203 Datum *old_values; 4204 bool *old_isnull; 4205 int i; 4206 4207 /* 4208 * Use the return tuple slot as a place to store the result tuple. 4209 */ 4210 resultSlot = ExecGetReturningSlot(estate, relInfo); 4211 4212 /* 4213 * Extract all the values of the scan tuple. 4214 */ 4215 slot_getallattrs(slot); 4216 old_values = slot->tts_values; 4217 old_isnull = slot->tts_isnull; 4218 4219 /* 4220 * Prepare to build the result tuple. 4221 */ 4222 ExecClearTuple(resultSlot); 4223 values = resultSlot->tts_values; 4224 isnull = resultSlot->tts_isnull; 4225 4226 /* 4227 * Transpose data into proper fields of the result tuple. 4228 */ 4229 for (i = 0; i < resultTupType->natts; i++) 4230 { 4231 int j = dmstate->attnoMap[i]; 4232 4233 if (j == 0) 4234 { 4235 values[i] = (Datum) 0; 4236 isnull[i] = true; 4237 } 4238 else 4239 { 4240 values[i] = old_values[j - 1]; 4241 isnull[i] = old_isnull[j - 1]; 4242 } 4243 } 4244 4245 /* 4246 * Build the virtual tuple. 
4247 */ 4248 ExecStoreVirtualTuple(resultSlot); 4249 4250 /* 4251 * If we have any system columns to return, materialize a heap tuple in 4252 * the slot from column values set above and install system columns in 4253 * that tuple. 4254 */ 4255 if (dmstate->hasSystemCols) 4256 { 4257 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL); 4258 4259 /* ctid */ 4260 if (dmstate->ctidAttno) 4261 { 4262 ItemPointer ctid = NULL; 4263 4264 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]); 4265 resultTup->t_self = *ctid; 4266 } 4267 4268 /* 4269 * And remaining columns 4270 * 4271 * Note: since we currently don't allow the target relation to appear 4272 * on the nullable side of an outer join, any system columns wouldn't 4273 * go to NULL. 4274 * 4275 * Note: no need to care about tableoid here because it will be 4276 * initialized in ExecProcessReturning(). 4277 */ 4278 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId); 4279 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId); 4280 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId); 4281 } 4282 4283 /* 4284 * And return the result tuple. 4285 */ 4286 return resultSlot; 4287 } 4288 4289 /* 4290 * Prepare for processing of parameters used in remote query. 4291 */ 4292 static void 4293 prepare_query_params(PlanState *node, 4294 List *fdw_exprs, 4295 int numParams, 4296 FmgrInfo **param_flinfo, 4297 List **param_exprs, 4298 const char ***param_values) 4299 { 4300 int i; 4301 ListCell *lc; 4302 4303 Assert(numParams > 0); 4304 4305 /* Prepare for output conversion of parameters used in remote query. 
*/ 4306 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); 4307 4308 i = 0; 4309 foreach(lc, fdw_exprs) 4310 { 4311 Node *param_expr = (Node *) lfirst(lc); 4312 Oid typefnoid; 4313 bool isvarlena; 4314 4315 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); 4316 fmgr_info(typefnoid, &(*param_flinfo)[i]); 4317 i++; 4318 } 4319 4320 /* 4321 * Prepare remote-parameter expressions for evaluation. (Note: in 4322 * practice, we expect that all these expressions will be just Params, so 4323 * we could possibly do something more efficient than using the full 4324 * expression-eval machinery for this. But probably there would be little 4325 * benefit, and it'd require postgres_fdw to know more than is desirable 4326 * about Param evaluation.) 4327 */ 4328 *param_exprs = ExecInitExprList(fdw_exprs, node); 4329 4330 /* Allocate buffer for text form of query parameters. */ 4331 *param_values = (const char **) palloc0(numParams * sizeof(char *)); 4332 } 4333 4334 /* 4335 * Construct array of query parameter values in text format. 4336 */ 4337 static void 4338 process_query_params(ExprContext *econtext, 4339 FmgrInfo *param_flinfo, 4340 List *param_exprs, 4341 const char **param_values) 4342 { 4343 int nestlevel; 4344 int i; 4345 ListCell *lc; 4346 4347 nestlevel = set_transmission_modes(); 4348 4349 i = 0; 4350 foreach(lc, param_exprs) 4351 { 4352 ExprState *expr_state = (ExprState *) lfirst(lc); 4353 Datum expr_value; 4354 bool isNull; 4355 4356 /* Evaluate the parameter expression */ 4357 expr_value = ExecEvalExpr(expr_state, econtext, &isNull); 4358 4359 /* 4360 * Get string representation of each parameter value by invoking 4361 * type-specific output function, unless the value is null. 
4362 */ 4363 if (isNull) 4364 param_values[i] = NULL; 4365 else 4366 param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); 4367 4368 i++; 4369 } 4370 4371 reset_transmission_modes(nestlevel); 4372 } 4373 4374 /* 4375 * postgresAnalyzeForeignTable 4376 * Test whether analyzing this foreign table is supported 4377 */ 4378 static bool 4379 postgresAnalyzeForeignTable(Relation relation, 4380 AcquireSampleRowsFunc *func, 4381 BlockNumber *totalpages) 4382 { 4383 ForeignTable *table; 4384 UserMapping *user; 4385 PGconn *conn; 4386 StringInfoData sql; 4387 PGresult *volatile res = NULL; 4388 4389 /* Return the row-analysis function pointer */ 4390 *func = postgresAcquireSampleRowsFunc; 4391 4392 /* 4393 * Now we have to get the number of pages. It's annoying that the ANALYZE 4394 * API requires us to return that now, because it forces some duplication 4395 * of effort between this routine and postgresAcquireSampleRowsFunc. But 4396 * it's probably not worth redefining that API at this point. 4397 */ 4398 4399 /* 4400 * Get the connection to use. We do the remote access as the table's 4401 * owner, even if the ANALYZE was started by some other user. 4402 */ 4403 table = GetForeignTable(RelationGetRelid(relation)); 4404 user = GetUserMapping(relation->rd_rel->relowner, table->serverid); 4405 conn = GetConnection(user, false); 4406 4407 /* 4408 * Construct command to get page count for relation. 4409 */ 4410 initStringInfo(&sql); 4411 deparseAnalyzeSizeSql(&sql, relation); 4412 4413 /* In what follows, do not risk leaking any PGresults. 
*/ 4414 PG_TRY(); 4415 { 4416 res = pgfdw_exec_query(conn, sql.data); 4417 if (PQresultStatus(res) != PGRES_TUPLES_OK) 4418 pgfdw_report_error(ERROR, res, conn, false, sql.data); 4419 4420 if (PQntuples(res) != 1 || PQnfields(res) != 1) 4421 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); 4422 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); 4423 4424 PQclear(res); 4425 res = NULL; 4426 } 4427 PG_CATCH(); 4428 { 4429 if (res) 4430 PQclear(res); 4431 PG_RE_THROW(); 4432 } 4433 PG_END_TRY(); 4434 4435 ReleaseConnection(conn); 4436 4437 return true; 4438 } 4439 4440 /* 4441 * Acquire a random sample of rows from foreign table managed by postgres_fdw. 4442 * 4443 * We fetch the whole table from the remote side and pick out some sample rows. 4444 * 4445 * Selected rows are returned in the caller-allocated array rows[], 4446 * which must have at least targrows entries. 4447 * The actual number of rows selected is returned as the function result. 4448 * We also count the total number of rows in the table and return it into 4449 * *totalrows. Note that *totaldeadrows is always set to 0. 4450 * 4451 * Note that the returned list of rows is not always in order by physical 4452 * position in the table. Therefore, correlation estimates derived later 4453 * may be meaningless, but it's OK because we don't use the estimates 4454 * currently (the planner only pays attention to correlation for indexscans). 
4455 */ 4456 static int 4457 postgresAcquireSampleRowsFunc(Relation relation, int elevel, 4458 HeapTuple *rows, int targrows, 4459 double *totalrows, 4460 double *totaldeadrows) 4461 { 4462 PgFdwAnalyzeState astate; 4463 ForeignTable *table; 4464 ForeignServer *server; 4465 UserMapping *user; 4466 PGconn *conn; 4467 unsigned int cursor_number; 4468 StringInfoData sql; 4469 PGresult *volatile res = NULL; 4470 4471 /* Initialize workspace state */ 4472 astate.rel = relation; 4473 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation)); 4474 4475 astate.rows = rows; 4476 astate.targrows = targrows; 4477 astate.numrows = 0; 4478 astate.samplerows = 0; 4479 astate.rowstoskip = -1; /* -1 means not set yet */ 4480 reservoir_init_selection_state(&astate.rstate, targrows); 4481 4482 /* Remember ANALYZE context, and create a per-tuple temp context */ 4483 astate.anl_cxt = CurrentMemoryContext; 4484 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext, 4485 "postgres_fdw temporary data", 4486 ALLOCSET_SMALL_SIZES); 4487 4488 /* 4489 * Get the connection to use. We do the remote access as the table's 4490 * owner, even if the ANALYZE was started by some other user. 4491 */ 4492 table = GetForeignTable(RelationGetRelid(relation)); 4493 server = GetForeignServer(table->serverid); 4494 user = GetUserMapping(relation->rd_rel->relowner, table->serverid); 4495 conn = GetConnection(user, false); 4496 4497 /* 4498 * Construct cursor that retrieves whole rows from remote. 4499 */ 4500 cursor_number = GetCursorNumber(conn); 4501 initStringInfo(&sql); 4502 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); 4503 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs); 4504 4505 /* In what follows, do not risk leaking any PGresults. 
*/ 4506 PG_TRY(); 4507 { 4508 res = pgfdw_exec_query(conn, sql.data); 4509 if (PQresultStatus(res) != PGRES_COMMAND_OK) 4510 pgfdw_report_error(ERROR, res, conn, false, sql.data); 4511 PQclear(res); 4512 res = NULL; 4513 4514 /* Retrieve and process rows a batch at a time. */ 4515 for (;;) 4516 { 4517 char fetch_sql[64]; 4518 int fetch_size; 4519 int numrows; 4520 int i; 4521 ListCell *lc; 4522 4523 /* Allow users to cancel long query */ 4524 CHECK_FOR_INTERRUPTS(); 4525 4526 /* 4527 * XXX possible future improvement: if rowstoskip is large, we 4528 * could issue a MOVE rather than physically fetching the rows, 4529 * then just adjust rowstoskip and samplerows appropriately. 4530 */ 4531 4532 /* The fetch size is arbitrary, but shouldn't be enormous. */ 4533 fetch_size = 100; 4534 foreach(lc, server->options) 4535 { 4536 DefElem *def = (DefElem *) lfirst(lc); 4537 4538 if (strcmp(def->defname, "fetch_size") == 0) 4539 { 4540 fetch_size = strtol(defGetString(def), NULL, 10); 4541 break; 4542 } 4543 } 4544 foreach(lc, table->options) 4545 { 4546 DefElem *def = (DefElem *) lfirst(lc); 4547 4548 if (strcmp(def->defname, "fetch_size") == 0) 4549 { 4550 fetch_size = strtol(defGetString(def), NULL, 10); 4551 break; 4552 } 4553 } 4554 4555 /* Fetch some rows */ 4556 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", 4557 fetch_size, cursor_number); 4558 4559 res = pgfdw_exec_query(conn, fetch_sql); 4560 /* On error, report the original query, not the FETCH. */ 4561 if (PQresultStatus(res) != PGRES_TUPLES_OK) 4562 pgfdw_report_error(ERROR, res, conn, false, sql.data); 4563 4564 /* Process whatever we got. */ 4565 numrows = PQntuples(res); 4566 for (i = 0; i < numrows; i++) 4567 analyze_row_processor(res, i, &astate); 4568 4569 PQclear(res); 4570 res = NULL; 4571 4572 /* Must be EOF if we didn't get all the rows requested. */ 4573 if (numrows < fetch_size) 4574 break; 4575 } 4576 4577 /* Close the cursor, just to be tidy. 
*/ 4578 close_cursor(conn, cursor_number); 4579 } 4580 PG_CATCH(); 4581 { 4582 if (res) 4583 PQclear(res); 4584 PG_RE_THROW(); 4585 } 4586 PG_END_TRY(); 4587 4588 ReleaseConnection(conn); 4589 4590 /* We assume that we have no dead tuple. */ 4591 *totaldeadrows = 0.0; 4592 4593 /* We've retrieved all living tuples from foreign server. */ 4594 *totalrows = astate.samplerows; 4595 4596 /* 4597 * Emit some interesting relation info 4598 */ 4599 ereport(elevel, 4600 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample", 4601 RelationGetRelationName(relation), 4602 astate.samplerows, astate.numrows))); 4603 4604 return astate.numrows; 4605 } 4606 4607 /* 4608 * Collect sample rows from the result of query. 4609 * - Use all tuples in sample until target # of samples are collected. 4610 * - Subsequently, replace already-sampled tuples randomly. 4611 */ 4612 static void 4613 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) 4614 { 4615 int targrows = astate->targrows; 4616 int pos; /* array index to store tuple in */ 4617 MemoryContext oldcontext; 4618 4619 /* Always increment sample row counter. */ 4620 astate->samplerows += 1; 4621 4622 /* 4623 * Determine the slot where this sample row should be stored. Set pos to 4624 * negative value to indicate the row should be skipped. 4625 */ 4626 if (astate->numrows < targrows) 4627 { 4628 /* First targrows rows are always included into the sample */ 4629 pos = astate->numrows++; 4630 } 4631 else 4632 { 4633 /* 4634 * Now we start replacing tuples in the sample until we reach the end 4635 * of the relation. Same algorithm as in acquire_sample_rows in 4636 * analyze.c; see Jeff Vitter's paper. 4637 */ 4638 if (astate->rowstoskip < 0) 4639 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows); 4640 4641 if (astate->rowstoskip <= 0) 4642 { 4643 /* Choose a random reservoir element to replace. 
*/ 4644 pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate)); 4645 Assert(pos >= 0 && pos < targrows); 4646 heap_freetuple(astate->rows[pos]); 4647 } 4648 else 4649 { 4650 /* Skip this tuple. */ 4651 pos = -1; 4652 } 4653 4654 astate->rowstoskip -= 1; 4655 } 4656 4657 if (pos >= 0) 4658 { 4659 /* 4660 * Create sample tuple from current result row, and store it in the 4661 * position determined above. The tuple has to be created in anl_cxt. 4662 */ 4663 oldcontext = MemoryContextSwitchTo(astate->anl_cxt); 4664 4665 astate->rows[pos] = make_tuple_from_result_row(res, row, 4666 astate->rel, 4667 astate->attinmeta, 4668 astate->retrieved_attrs, 4669 NULL, 4670 astate->temp_cxt); 4671 4672 MemoryContextSwitchTo(oldcontext); 4673 } 4674 } 4675 4676 /* 4677 * Import a foreign schema 4678 */ 4679 static List * 4680 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) 4681 { 4682 List *commands = NIL; 4683 bool import_collate = true; 4684 bool import_default = false; 4685 bool import_generated = true; 4686 bool import_not_null = true; 4687 ForeignServer *server; 4688 UserMapping *mapping; 4689 PGconn *conn; 4690 StringInfoData buf; 4691 PGresult *volatile res = NULL; 4692 int numrows, 4693 i; 4694 ListCell *lc; 4695 4696 /* Parse statement options */ 4697 foreach(lc, stmt->options) 4698 { 4699 DefElem *def = (DefElem *) lfirst(lc); 4700 4701 if (strcmp(def->defname, "import_collate") == 0) 4702 import_collate = defGetBoolean(def); 4703 else if (strcmp(def->defname, "import_default") == 0) 4704 import_default = defGetBoolean(def); 4705 else if (strcmp(def->defname, "import_generated") == 0) 4706 import_generated = defGetBoolean(def); 4707 else if (strcmp(def->defname, "import_not_null") == 0) 4708 import_not_null = defGetBoolean(def); 4709 else 4710 ereport(ERROR, 4711 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), 4712 errmsg("invalid option \"%s\"", def->defname))); 4713 } 4714 4715 /* 4716 * Get connection to the foreign server. 
Connection manager will 4717 * establish new connection if necessary. 4718 */ 4719 server = GetForeignServer(serverOid); 4720 mapping = GetUserMapping(GetUserId(), server->serverid); 4721 conn = GetConnection(mapping, false); 4722 4723 /* Don't attempt to import collation if remote server hasn't got it */ 4724 if (PQserverVersion(conn) < 90100) 4725 import_collate = false; 4726 4727 /* Create workspace for strings */ 4728 initStringInfo(&buf); 4729 4730 /* In what follows, do not risk leaking any PGresults. */ 4731 PG_TRY(); 4732 { 4733 /* Check that the schema really exists */ 4734 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); 4735 deparseStringLiteral(&buf, stmt->remote_schema); 4736 4737 res = pgfdw_exec_query(conn, buf.data); 4738 if (PQresultStatus(res) != PGRES_TUPLES_OK) 4739 pgfdw_report_error(ERROR, res, conn, false, buf.data); 4740 4741 if (PQntuples(res) != 1) 4742 ereport(ERROR, 4743 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), 4744 errmsg("schema \"%s\" is not present on foreign server \"%s\"", 4745 stmt->remote_schema, server->servername))); 4746 4747 PQclear(res); 4748 res = NULL; 4749 resetStringInfo(&buf); 4750 4751 /* 4752 * Fetch all table data from this schema, possibly restricted by 4753 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention 4754 * to EXCEPT/LIMIT TO here, because the core code will filter the 4755 * statements we return according to those lists anyway. But it 4756 * should save a few cycles to not process excluded tables in the 4757 * first place.) 4758 * 4759 * Ignore table data for partitions and only include the definitions 4760 * of the root partitioned tables to allow access to the complete 4761 * remote data set locally in the schema imported. 
4762 * 4763 * Note: because we run the connection with search_path restricted to 4764 * pg_catalog, the format_type() and pg_get_expr() outputs will always 4765 * include a schema name for types/functions in other schemas, which 4766 * is what we want. 4767 */ 4768 appendStringInfoString(&buf, 4769 "SELECT relname, " 4770 " attname, " 4771 " format_type(atttypid, atttypmod), " 4772 " attnotnull, "); 4773 4774 /* Generated columns are supported since Postgres 12 */ 4775 if (PQserverVersion(conn) >= 120000) 4776 appendStringInfoString(&buf, 4777 " attgenerated, " 4778 " pg_get_expr(adbin, adrelid), "); 4779 else 4780 appendStringInfoString(&buf, 4781 " NULL, " 4782 " pg_get_expr(adbin, adrelid), "); 4783 4784 if (import_collate) 4785 appendStringInfoString(&buf, 4786 " collname, " 4787 " collnsp.nspname " 4788 "FROM pg_class c " 4789 " JOIN pg_namespace n ON " 4790 " relnamespace = n.oid " 4791 " LEFT JOIN pg_attribute a ON " 4792 " attrelid = c.oid AND attnum > 0 " 4793 " AND NOT attisdropped " 4794 " LEFT JOIN pg_attrdef ad ON " 4795 " adrelid = c.oid AND adnum = attnum " 4796 " LEFT JOIN pg_collation coll ON " 4797 " coll.oid = attcollation " 4798 " LEFT JOIN pg_namespace collnsp ON " 4799 " collnsp.oid = collnamespace "); 4800 else 4801 appendStringInfoString(&buf, 4802 " NULL, NULL " 4803 "FROM pg_class c " 4804 " JOIN pg_namespace n ON " 4805 " relnamespace = n.oid " 4806 " LEFT JOIN pg_attribute a ON " 4807 " attrelid = c.oid AND attnum > 0 " 4808 " AND NOT attisdropped " 4809 " LEFT JOIN pg_attrdef ad ON " 4810 " adrelid = c.oid AND adnum = attnum "); 4811 4812 appendStringInfoString(&buf, 4813 "WHERE c.relkind IN (" 4814 CppAsString2(RELKIND_RELATION) "," 4815 CppAsString2(RELKIND_VIEW) "," 4816 CppAsString2(RELKIND_FOREIGN_TABLE) "," 4817 CppAsString2(RELKIND_MATVIEW) "," 4818 CppAsString2(RELKIND_PARTITIONED_TABLE) ") " 4819 " AND n.nspname = "); 4820 deparseStringLiteral(&buf, stmt->remote_schema); 4821 4822 /* Partitions are supported since Postgres 10 
*/ 4823 if (PQserverVersion(conn) >= 100000) 4824 appendStringInfoString(&buf, " AND NOT c.relispartition "); 4825 4826 /* Apply restrictions for LIMIT TO and EXCEPT */ 4827 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || 4828 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) 4829 { 4830 bool first_item = true; 4831 4832 appendStringInfoString(&buf, " AND c.relname "); 4833 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) 4834 appendStringInfoString(&buf, "NOT "); 4835 appendStringInfoString(&buf, "IN ("); 4836 4837 /* Append list of table names within IN clause */ 4838 foreach(lc, stmt->table_list) 4839 { 4840 RangeVar *rv = (RangeVar *) lfirst(lc); 4841 4842 if (first_item) 4843 first_item = false; 4844 else 4845 appendStringInfoString(&buf, ", "); 4846 deparseStringLiteral(&buf, rv->relname); 4847 } 4848 appendStringInfoChar(&buf, ')'); 4849 } 4850 4851 /* Append ORDER BY at the end of query to ensure output ordering */ 4852 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); 4853 4854 /* Fetch the data */ 4855 res = pgfdw_exec_query(conn, buf.data); 4856 if (PQresultStatus(res) != PGRES_TUPLES_OK) 4857 pgfdw_report_error(ERROR, res, conn, false, buf.data); 4858 4859 /* Process results */ 4860 numrows = PQntuples(res); 4861 /* note: incrementation of i happens in inner loop's while() test */ 4862 for (i = 0; i < numrows;) 4863 { 4864 char *tablename = PQgetvalue(res, i, 0); 4865 bool first_item = true; 4866 4867 resetStringInfo(&buf); 4868 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", 4869 quote_identifier(tablename)); 4870 4871 /* Scan all rows for this table */ 4872 do 4873 { 4874 char *attname; 4875 char *typename; 4876 char *attnotnull; 4877 char *attgenerated; 4878 char *attdefault; 4879 char *collname; 4880 char *collnamespace; 4881 4882 /* If table has no columns, we'll see nulls here */ 4883 if (PQgetisnull(res, i, 1)) 4884 continue; 4885 4886 attname = PQgetvalue(res, i, 1); 4887 typename = PQgetvalue(res, i, 2); 4888 attnotnull = 
PQgetvalue(res, i, 3); 4889 attgenerated = PQgetisnull(res, i, 4) ? (char *) NULL : 4890 PQgetvalue(res, i, 4); 4891 attdefault = PQgetisnull(res, i, 5) ? (char *) NULL : 4892 PQgetvalue(res, i, 5); 4893 collname = PQgetisnull(res, i, 6) ? (char *) NULL : 4894 PQgetvalue(res, i, 6); 4895 collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL : 4896 PQgetvalue(res, i, 7); 4897 4898 if (first_item) 4899 first_item = false; 4900 else 4901 appendStringInfoString(&buf, ",\n"); 4902 4903 /* Print column name and type */ 4904 appendStringInfo(&buf, " %s %s", 4905 quote_identifier(attname), 4906 typename); 4907 4908 /* 4909 * Add column_name option so that renaming the foreign table's 4910 * column doesn't break the association to the underlying 4911 * column. 4912 */ 4913 appendStringInfoString(&buf, " OPTIONS (column_name "); 4914 deparseStringLiteral(&buf, attname); 4915 appendStringInfoChar(&buf, ')'); 4916 4917 /* Add COLLATE if needed */ 4918 if (import_collate && collname != NULL && collnamespace != NULL) 4919 appendStringInfo(&buf, " COLLATE %s.%s", 4920 quote_identifier(collnamespace), 4921 quote_identifier(collname)); 4922 4923 /* Add DEFAULT if needed */ 4924 if (import_default && attdefault != NULL && 4925 (!attgenerated || !attgenerated[0])) 4926 appendStringInfo(&buf, " DEFAULT %s", attdefault); 4927 4928 /* Add GENERATED if needed */ 4929 if (import_generated && attgenerated != NULL && 4930 attgenerated[0] == ATTRIBUTE_GENERATED_STORED) 4931 { 4932 Assert(attdefault != NULL); 4933 appendStringInfo(&buf, 4934 " GENERATED ALWAYS AS (%s) STORED", 4935 attdefault); 4936 } 4937 4938 /* Add NOT NULL if needed */ 4939 if (import_not_null && attnotnull[0] == 't') 4940 appendStringInfoString(&buf, " NOT NULL"); 4941 } 4942 while (++i < numrows && 4943 strcmp(PQgetvalue(res, i, 0), tablename) == 0); 4944 4945 /* 4946 * Add server name and table-level options. 
We specify remote 4947 * schema and table name as options (the latter to ensure that 4948 * renaming the foreign table doesn't break the association). 4949 */ 4950 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (", 4951 quote_identifier(server->servername)); 4952 4953 appendStringInfoString(&buf, "schema_name "); 4954 deparseStringLiteral(&buf, stmt->remote_schema); 4955 appendStringInfoString(&buf, ", table_name "); 4956 deparseStringLiteral(&buf, tablename); 4957 4958 appendStringInfoString(&buf, ");"); 4959 4960 commands = lappend(commands, pstrdup(buf.data)); 4961 } 4962 4963 /* Clean up */ 4964 PQclear(res); 4965 res = NULL; 4966 } 4967 PG_CATCH(); 4968 { 4969 if (res) 4970 PQclear(res); 4971 PG_RE_THROW(); 4972 } 4973 PG_END_TRY(); 4974 4975 ReleaseConnection(conn); 4976 4977 return commands; 4978 } 4979 4980 /* 4981 * Assess whether the join between inner and outer relations can be pushed down 4982 * to the foreign server. As a side effect, save information we obtain in this 4983 * function to PgFdwRelationInfo passed in. 4984 */ 4985 static bool 4986 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, 4987 RelOptInfo *outerrel, RelOptInfo *innerrel, 4988 JoinPathExtraData *extra) 4989 { 4990 PgFdwRelationInfo *fpinfo; 4991 PgFdwRelationInfo *fpinfo_o; 4992 PgFdwRelationInfo *fpinfo_i; 4993 ListCell *lc; 4994 List *joinclauses; 4995 4996 /* 4997 * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins. 4998 * Constructing queries representing SEMI and ANTI joins is hard, hence 4999 * not considered right now. 5000 */ 5001 if (jointype != JOIN_INNER && jointype != JOIN_LEFT && 5002 jointype != JOIN_RIGHT && jointype != JOIN_FULL) 5003 return false; 5004 5005 /* 5006 * If either of the joining relations is marked as unsafe to pushdown, the 5007 * join can not be pushed down. 
5008 */ 5009 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private; 5010 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private; 5011 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private; 5012 if (!fpinfo_o || !fpinfo_o->pushdown_safe || 5013 !fpinfo_i || !fpinfo_i->pushdown_safe) 5014 return false; 5015 5016 /* 5017 * If joining relations have local conditions, those conditions are 5018 * required to be applied before joining the relations. Hence the join can 5019 * not be pushed down. 5020 */ 5021 if (fpinfo_o->local_conds || fpinfo_i->local_conds) 5022 return false; 5023 5024 /* 5025 * Merge FDW options. We might be tempted to do this after we have deemed 5026 * the foreign join to be OK. But we must do this beforehand so that we 5027 * know which quals can be evaluated on the foreign server, which might 5028 * depend on shippable_extensions. 5029 */ 5030 fpinfo->server = fpinfo_o->server; 5031 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i); 5032 5033 /* 5034 * Separate restrict list into join quals and pushed-down (other) quals. 5035 * 5036 * Join quals belonging to an outer join must all be shippable, else we 5037 * cannot execute the join remotely. Add such quals to 'joinclauses'. 5038 * 5039 * Add other quals to fpinfo->remote_conds if they are shippable, else to 5040 * fpinfo->local_conds. In an inner join it's okay to execute conditions 5041 * either locally or remotely; the same is true for pushed-down conditions 5042 * at an outer join. 5043 * 5044 * Note we might return failure after having already scribbled on 5045 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we 5046 * won't consult those lists again if we deem the join unshippable. 
5047 */ 5048 joinclauses = NIL; 5049 foreach(lc, extra->restrictlist) 5050 { 5051 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); 5052 bool is_remote_clause = is_foreign_expr(root, joinrel, 5053 rinfo->clause); 5054 5055 if (IS_OUTER_JOIN(jointype) && 5056 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids)) 5057 { 5058 if (!is_remote_clause) 5059 return false; 5060 joinclauses = lappend(joinclauses, rinfo); 5061 } 5062 else 5063 { 5064 if (is_remote_clause) 5065 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); 5066 else 5067 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); 5068 } 5069 } 5070 5071 /* 5072 * deparseExplicitTargetList() isn't smart enough to handle anything other 5073 * than a Var. In particular, if there's some PlaceHolderVar that would 5074 * need to be evaluated within this join tree (because there's an upper 5075 * reference to a quantity that may go to NULL as a result of an outer 5076 * join), then we can't try to push the join down because we'll fail when 5077 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that 5078 * needs to be evaluated *at the top* of this join tree is OK, because we 5079 * can do that locally after fetching the results from the remote side. 5080 */ 5081 foreach(lc, root->placeholder_list) 5082 { 5083 PlaceHolderInfo *phinfo = lfirst(lc); 5084 Relids relids; 5085 5086 /* PlaceHolderInfo refers to parent relids, not child relids. */ 5087 relids = IS_OTHER_REL(joinrel) ? 5088 joinrel->top_parent_relids : joinrel->relids; 5089 5090 if (bms_is_subset(phinfo->ph_eval_at, relids) && 5091 bms_nonempty_difference(relids, phinfo->ph_eval_at)) 5092 return false; 5093 } 5094 5095 /* Save the join clauses, for later use. 
*/ 5096 fpinfo->joinclauses = joinclauses; 5097 5098 fpinfo->outerrel = outerrel; 5099 fpinfo->innerrel = innerrel; 5100 fpinfo->jointype = jointype; 5101 5102 /* 5103 * By default, both the input relations are not required to be deparsed as 5104 * subqueries, but there might be some relations covered by the input 5105 * relations that are required to be deparsed as subqueries, so save the 5106 * relids of those relations for later use by the deparser. 5107 */ 5108 fpinfo->make_outerrel_subquery = false; 5109 fpinfo->make_innerrel_subquery = false; 5110 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids)); 5111 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids)); 5112 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels, 5113 fpinfo_i->lower_subquery_rels); 5114 5115 /* 5116 * Pull the other remote conditions from the joining relations into join 5117 * clauses or other remote clauses (remote_conds) of this relation 5118 * wherever possible. This avoids building subqueries at every join step. 5119 * 5120 * For an inner join, clauses from both the relations are added to the 5121 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from 5122 * the outer side are added to remote_conds since those can be evaluated 5123 * after the join is evaluated. The clauses from inner side are added to 5124 * the joinclauses, since they need to be evaluated while constructing the 5125 * join. 5126 * 5127 * For a FULL OUTER JOIN, the other clauses from either relation can not 5128 * be added to the joinclauses or remote_conds, since each relation acts 5129 * as an outer relation for the other. 5130 * 5131 * The joining sides can not have local conditions, thus no need to test 5132 * shippability of the clauses being pulled up. 
5133 */ 5134 switch (jointype) 5135 { 5136 case JOIN_INNER: 5137 fpinfo->remote_conds = list_concat(fpinfo->remote_conds, 5138 list_copy(fpinfo_i->remote_conds)); 5139 fpinfo->remote_conds = list_concat(fpinfo->remote_conds, 5140 list_copy(fpinfo_o->remote_conds)); 5141 break; 5142 5143 case JOIN_LEFT: 5144 fpinfo->joinclauses = list_concat(fpinfo->joinclauses, 5145 list_copy(fpinfo_i->remote_conds)); 5146 fpinfo->remote_conds = list_concat(fpinfo->remote_conds, 5147 list_copy(fpinfo_o->remote_conds)); 5148 break; 5149 5150 case JOIN_RIGHT: 5151 fpinfo->joinclauses = list_concat(fpinfo->joinclauses, 5152 list_copy(fpinfo_o->remote_conds)); 5153 fpinfo->remote_conds = list_concat(fpinfo->remote_conds, 5154 list_copy(fpinfo_i->remote_conds)); 5155 break; 5156 5157 case JOIN_FULL: 5158 5159 /* 5160 * In this case, if any of the input relations has conditions, we 5161 * need to deparse that relation as a subquery so that the 5162 * conditions can be evaluated before the join. Remember it in 5163 * the fpinfo of this relation so that the deparser can take 5164 * appropriate action. Also, save the relids of base relations 5165 * covered by that relation for later use by the deparser. 5166 */ 5167 if (fpinfo_o->remote_conds) 5168 { 5169 fpinfo->make_outerrel_subquery = true; 5170 fpinfo->lower_subquery_rels = 5171 bms_add_members(fpinfo->lower_subquery_rels, 5172 outerrel->relids); 5173 } 5174 if (fpinfo_i->remote_conds) 5175 { 5176 fpinfo->make_innerrel_subquery = true; 5177 fpinfo->lower_subquery_rels = 5178 bms_add_members(fpinfo->lower_subquery_rels, 5179 innerrel->relids); 5180 } 5181 break; 5182 5183 default: 5184 /* Should not happen, we have just checked this above */ 5185 elog(ERROR, "unsupported join type %d", jointype); 5186 } 5187 5188 /* 5189 * For an inner join, all restrictions can be treated alike. Treating the 5190 * pushed down conditions as join conditions allows a top level full outer 5191 * join to be deparsed without requiring subqueries. 
5192 */ 5193 if (jointype == JOIN_INNER) 5194 { 5195 Assert(!fpinfo->joinclauses); 5196 fpinfo->joinclauses = fpinfo->remote_conds; 5197 fpinfo->remote_conds = NIL; 5198 } 5199 5200 /* Mark that this join can be pushed down safely */ 5201 fpinfo->pushdown_safe = true; 5202 5203 /* Get user mapping */ 5204 if (fpinfo->use_remote_estimate) 5205 { 5206 if (fpinfo_o->use_remote_estimate) 5207 fpinfo->user = fpinfo_o->user; 5208 else 5209 fpinfo->user = fpinfo_i->user; 5210 } 5211 else 5212 fpinfo->user = NULL; 5213 5214 /* 5215 * Set # of retrieved rows and cached relation costs to some negative 5216 * value, so that we can detect when they are set to some sensible values, 5217 * during one (usually the first) of the calls to estimate_path_cost_size. 5218 */ 5219 fpinfo->retrieved_rows = -1; 5220 fpinfo->rel_startup_cost = -1; 5221 fpinfo->rel_total_cost = -1; 5222 5223 /* 5224 * Set the string describing this join relation to be used in EXPLAIN 5225 * output of corresponding ForeignScan. 5226 */ 5227 fpinfo->relation_name = makeStringInfo(); 5228 appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)", 5229 fpinfo_o->relation_name->data, 5230 get_jointype_name(fpinfo->jointype), 5231 fpinfo_i->relation_name->data); 5232 5233 /* 5234 * Set the relation index. This is defined as the position of this 5235 * joinrel in the join_rel_list list plus the length of the rtable list. 5236 * Note that since this joinrel is at the end of the join_rel_list list 5237 * when we are called, we can get the position by list_length. 
5238 */ 5239 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */ 5240 fpinfo->relation_index = 5241 list_length(root->parse->rtable) + list_length(root->join_rel_list); 5242 5243 return true; 5244 } 5245 5246 static void 5247 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, 5248 Path *epq_path) 5249 { 5250 List *useful_pathkeys_list = NIL; /* List of all pathkeys */ 5251 ListCell *lc; 5252 5253 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel); 5254 5255 /* Create one path for each set of pathkeys we found above. */ 5256 foreach(lc, useful_pathkeys_list) 5257 { 5258 double rows; 5259 int width; 5260 Cost startup_cost; 5261 Cost total_cost; 5262 List *useful_pathkeys = lfirst(lc); 5263 Path *sorted_epq_path; 5264 5265 estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL, 5266 &rows, &width, &startup_cost, &total_cost); 5267 5268 /* 5269 * The EPQ path must be at least as well sorted as the path itself, in 5270 * case it gets used as input to a mergejoin. 5271 */ 5272 sorted_epq_path = epq_path; 5273 if (sorted_epq_path != NULL && 5274 !pathkeys_contained_in(useful_pathkeys, 5275 sorted_epq_path->pathkeys)) 5276 sorted_epq_path = (Path *) 5277 create_sort_path(root, 5278 rel, 5279 sorted_epq_path, 5280 useful_pathkeys, 5281 -1.0); 5282 5283 if (IS_SIMPLE_REL(rel)) 5284 add_path(rel, (Path *) 5285 create_foreignscan_path(root, rel, 5286 NULL, 5287 rows, 5288 startup_cost, 5289 total_cost, 5290 useful_pathkeys, 5291 rel->lateral_relids, 5292 sorted_epq_path, 5293 NIL)); 5294 else 5295 add_path(rel, (Path *) 5296 create_foreign_join_path(root, rel, 5297 NULL, 5298 rows, 5299 startup_cost, 5300 total_cost, 5301 useful_pathkeys, 5302 rel->lateral_relids, 5303 sorted_epq_path, 5304 NIL)); 5305 } 5306 } 5307 5308 /* 5309 * Parse options from foreign server and apply them to fpinfo. 5310 * 5311 * New options might also require tweaking merge_fdw_options(). 
5312 */ 5313 static void 5314 apply_server_options(PgFdwRelationInfo *fpinfo) 5315 { 5316 ListCell *lc; 5317 5318 foreach(lc, fpinfo->server->options) 5319 { 5320 DefElem *def = (DefElem *) lfirst(lc); 5321 5322 if (strcmp(def->defname, "use_remote_estimate") == 0) 5323 fpinfo->use_remote_estimate = defGetBoolean(def); 5324 else if (strcmp(def->defname, "fdw_startup_cost") == 0) 5325 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL); 5326 else if (strcmp(def->defname, "fdw_tuple_cost") == 0) 5327 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL); 5328 else if (strcmp(def->defname, "extensions") == 0) 5329 fpinfo->shippable_extensions = 5330 ExtractExtensionList(defGetString(def), false); 5331 else if (strcmp(def->defname, "fetch_size") == 0) 5332 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); 5333 } 5334 } 5335 5336 /* 5337 * Parse options from foreign table and apply them to fpinfo. 5338 * 5339 * New options might also require tweaking merge_fdw_options(). 5340 */ 5341 static void 5342 apply_table_options(PgFdwRelationInfo *fpinfo) 5343 { 5344 ListCell *lc; 5345 5346 foreach(lc, fpinfo->table->options) 5347 { 5348 DefElem *def = (DefElem *) lfirst(lc); 5349 5350 if (strcmp(def->defname, "use_remote_estimate") == 0) 5351 fpinfo->use_remote_estimate = defGetBoolean(def); 5352 else if (strcmp(def->defname, "fetch_size") == 0) 5353 fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); 5354 } 5355 } 5356 5357 /* 5358 * Merge FDW options from input relations into a new set of options for a join 5359 * or an upper rel. 5360 * 5361 * For a join relation, FDW-specific information about the inner and outer 5362 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, 5363 * fpinfo_o provides the information for the input relation; fpinfo_i is 5364 * expected to NULL. 
5365 */ 5366 static void 5367 merge_fdw_options(PgFdwRelationInfo *fpinfo, 5368 const PgFdwRelationInfo *fpinfo_o, 5369 const PgFdwRelationInfo *fpinfo_i) 5370 { 5371 /* We must always have fpinfo_o. */ 5372 Assert(fpinfo_o); 5373 5374 /* fpinfo_i may be NULL, but if present the servers must both match. */ 5375 Assert(!fpinfo_i || 5376 fpinfo_i->server->serverid == fpinfo_o->server->serverid); 5377 5378 /* 5379 * Copy the server specific FDW options. (For a join, both relations come 5380 * from the same server, so the server options should have the same value 5381 * for both relations.) 5382 */ 5383 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; 5384 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; 5385 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; 5386 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; 5387 fpinfo->fetch_size = fpinfo_o->fetch_size; 5388 5389 /* Merge the table level options from either side of the join. */ 5390 if (fpinfo_i) 5391 { 5392 /* 5393 * We'll prefer to use remote estimates for this join if any table 5394 * from either side of the join is using remote estimates. This is 5395 * most likely going to be preferred since they're already willing to 5396 * pay the price of a round trip to get the remote EXPLAIN. In any 5397 * case it's not entirely clear how we might otherwise handle this 5398 * best. 5399 */ 5400 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || 5401 fpinfo_i->use_remote_estimate; 5402 5403 /* 5404 * Set fetch size to maximum of the joining sides, since we are 5405 * expecting the rows returned by the join to be proportional to the 5406 * relation sizes. 5407 */ 5408 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); 5409 } 5410 } 5411 5412 /* 5413 * postgresGetForeignJoinPaths 5414 * Add possible ForeignPath to joinrel, if join is safe to push down. 
5415 */ 5416 static void 5417 postgresGetForeignJoinPaths(PlannerInfo *root, 5418 RelOptInfo *joinrel, 5419 RelOptInfo *outerrel, 5420 RelOptInfo *innerrel, 5421 JoinType jointype, 5422 JoinPathExtraData *extra) 5423 { 5424 PgFdwRelationInfo *fpinfo; 5425 ForeignPath *joinpath; 5426 double rows; 5427 int width; 5428 Cost startup_cost; 5429 Cost total_cost; 5430 Path *epq_path; /* Path to create plan to be executed when 5431 * EvalPlanQual gets triggered. */ 5432 5433 /* 5434 * Skip if this join combination has been considered already. 5435 */ 5436 if (joinrel->fdw_private) 5437 return; 5438 5439 /* 5440 * This code does not work for joins with lateral references, since those 5441 * must have parameterized paths, which we don't generate yet. 5442 */ 5443 if (!bms_is_empty(joinrel->lateral_relids)) 5444 return; 5445 5446 /* 5447 * Create unfinished PgFdwRelationInfo entry which is used to indicate 5448 * that the join relation is already considered, so that we won't waste 5449 * time in judging safety of join pushdown and adding the same paths again 5450 * if found safe. Once we know that this join can be pushed down, we fill 5451 * the entry. 5452 */ 5453 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); 5454 fpinfo->pushdown_safe = false; 5455 joinrel->fdw_private = fpinfo; 5456 /* attrs_used is only for base relations. */ 5457 fpinfo->attrs_used = NULL; 5458 5459 /* 5460 * If there is a possibility that EvalPlanQual will be executed, we need 5461 * to be able to reconstruct the row using scans of the base relations. 5462 * GetExistingLocalJoinPath will find a suitable path for this purpose in 5463 * the path list of the joinrel, if one exists. We must be careful to 5464 * call it before adding any ForeignPath, since the ForeignPath might 5465 * dominate the only suitable local path available. 
We also do it before 5466 * calling foreign_join_ok(), since that function updates fpinfo and marks 5467 * it as pushable if the join is found to be pushable. 5468 */ 5469 if (root->parse->commandType == CMD_DELETE || 5470 root->parse->commandType == CMD_UPDATE || 5471 root->rowMarks) 5472 { 5473 epq_path = GetExistingLocalJoinPath(joinrel); 5474 if (!epq_path) 5475 { 5476 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found"); 5477 return; 5478 } 5479 } 5480 else 5481 epq_path = NULL; 5482 5483 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra)) 5484 { 5485 /* Free path required for EPQ if we copied one; we don't need it now */ 5486 if (epq_path) 5487 pfree(epq_path); 5488 return; 5489 } 5490 5491 /* 5492 * Compute the selectivity and cost of the local_conds, so we don't have 5493 * to do it over again for each path. The best we can do for these 5494 * conditions is to estimate selectivity on the basis of local statistics. 5495 * The local conditions are applied after the join has been computed on 5496 * the remote side like quals in WHERE clause, so pass jointype as 5497 * JOIN_INNER. 5498 */ 5499 fpinfo->local_conds_sel = clauselist_selectivity(root, 5500 fpinfo->local_conds, 5501 0, 5502 JOIN_INNER, 5503 NULL); 5504 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); 5505 5506 /* 5507 * If we are going to estimate costs locally, estimate the join clause 5508 * selectivity here while we have special join info. 
5509 */ 5510 if (!fpinfo->use_remote_estimate) 5511 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses, 5512 0, fpinfo->jointype, 5513 extra->sjinfo); 5514 5515 /* Estimate costs for bare join relation */ 5516 estimate_path_cost_size(root, joinrel, NIL, NIL, NULL, 5517 &rows, &width, &startup_cost, &total_cost); 5518 /* Now update this information in the joinrel */ 5519 joinrel->rows = rows; 5520 joinrel->reltarget->width = width; 5521 fpinfo->rows = rows; 5522 fpinfo->width = width; 5523 fpinfo->startup_cost = startup_cost; 5524 fpinfo->total_cost = total_cost; 5525 5526 /* 5527 * Create a new join path and add it to the joinrel which represents a 5528 * join between foreign tables. 5529 */ 5530 joinpath = create_foreign_join_path(root, 5531 joinrel, 5532 NULL, /* default pathtarget */ 5533 rows, 5534 startup_cost, 5535 total_cost, 5536 NIL, /* no pathkeys */ 5537 joinrel->lateral_relids, 5538 epq_path, 5539 NIL); /* no fdw_private */ 5540 5541 /* Add generated path into joinrel by add_path(). */ 5542 add_path(joinrel, (Path *) joinpath); 5543 5544 /* Consider pathkeys for the join relation */ 5545 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path); 5546 5547 /* XXX Consider parameterized paths for the join relation */ 5548 } 5549 5550 /* 5551 * Assess whether the aggregation, grouping and having operations can be pushed 5552 * down to the foreign server. As a side effect, save information we obtain in 5553 * this function to PgFdwRelationInfo of the input relation. 5554 */ 5555 static bool 5556 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, 5557 Node *havingQual) 5558 { 5559 Query *query = root->parse; 5560 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; 5561 PathTarget *grouping_target = grouped_rel->reltarget; 5562 PgFdwRelationInfo *ofpinfo; 5563 ListCell *lc; 5564 int i; 5565 List *tlist = NIL; 5566 5567 /* We currently don't support pushing Grouping Sets. 
*/ 5568 if (query->groupingSets) 5569 return false; 5570 5571 /* Get the fpinfo of the underlying scan relation. */ 5572 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; 5573 5574 /* 5575 * If underlying scan relation has any local conditions, those conditions 5576 * are required to be applied before performing aggregation. Hence the 5577 * aggregate cannot be pushed down. 5578 */ 5579 if (ofpinfo->local_conds) 5580 return false; 5581 5582 /* 5583 * Examine grouping expressions, as well as other expressions we'd need to 5584 * compute, and check whether they are safe to push down to the foreign 5585 * server. All GROUP BY expressions will be part of the grouping target 5586 * and thus there is no need to search for them separately. Add grouping 5587 * expressions into target list which will be passed to foreign server. 5588 * 5589 * A tricky fine point is that we must not put any expression into the 5590 * target list that is just a foreign param (that is, something that 5591 * deparse.c would conclude has to be sent to the foreign server). If we 5592 * do, the expression will also appear in the fdw_exprs list of the plan 5593 * node, and setrefs.c will get confused and decide that the fdw_exprs 5594 * entry is actually a reference to the fdw_scan_tlist entry, resulting in 5595 * a broken plan. Somewhat oddly, it's OK if the expression contains such 5596 * a node, as long as it's not at top level; then no match is possible. 5597 */ 5598 i = 0; 5599 foreach(lc, grouping_target->exprs) 5600 { 5601 Expr *expr = (Expr *) lfirst(lc); 5602 Index sgref = get_pathtarget_sortgroupref(grouping_target, i); 5603 ListCell *l; 5604 5605 /* Check whether this expression is part of GROUP BY clause */ 5606 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) 5607 { 5608 TargetEntry *tle; 5609 5610 /* 5611 * If any GROUP BY expression is not shippable, then we cannot 5612 * push down aggregation to the foreign server. 
5613 */ 5614 if (!is_foreign_expr(root, grouped_rel, expr)) 5615 return false; 5616 5617 /* 5618 * If it would be a foreign param, we can't put it into the tlist, 5619 * so we have to fail. 5620 */ 5621 if (is_foreign_param(root, grouped_rel, expr)) 5622 return false; 5623 5624 /* 5625 * Pushable, so add to tlist. We need to create a TLE for this 5626 * expression and apply the sortgroupref to it. We cannot use 5627 * add_to_flat_tlist() here because that avoids making duplicate 5628 * entries in the tlist. If there are duplicate entries with 5629 * distinct sortgrouprefs, we have to duplicate that situation in 5630 * the output tlist. 5631 */ 5632 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); 5633 tle->ressortgroupref = sgref; 5634 tlist = lappend(tlist, tle); 5635 } 5636 else 5637 { 5638 /* 5639 * Non-grouping expression we need to compute. Can we ship it 5640 * as-is to the foreign server? 5641 */ 5642 if (is_foreign_expr(root, grouped_rel, expr) && 5643 !is_foreign_param(root, grouped_rel, expr)) 5644 { 5645 /* Yes, so add to tlist as-is; OK to suppress duplicates */ 5646 tlist = add_to_flat_tlist(tlist, list_make1(expr)); 5647 } 5648 else 5649 { 5650 /* Not pushable as a whole; extract its Vars and aggregates */ 5651 List *aggvars; 5652 5653 aggvars = pull_var_clause((Node *) expr, 5654 PVC_INCLUDE_AGGREGATES); 5655 5656 /* 5657 * If any aggregate expression is not shippable, then we 5658 * cannot push down aggregation to the foreign server. (We 5659 * don't have to check is_foreign_param, since that certainly 5660 * won't return true for any such expression.) 5661 */ 5662 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) 5663 return false; 5664 5665 /* 5666 * Add aggregates, if any, into the targetlist. Plain Vars 5667 * outside an aggregate can be ignored, because they should be 5668 * either same as some GROUP BY column or part of some GROUP 5669 * BY expression. 
In either case, they are already part of 5670 * the targetlist and thus no need to add them again. In fact 5671 * including plain Vars in the tlist when they do not match a 5672 * GROUP BY column would cause the foreign server to complain 5673 * that the shipped query is invalid. 5674 */ 5675 foreach(l, aggvars) 5676 { 5677 Expr *expr = (Expr *) lfirst(l); 5678 5679 if (IsA(expr, Aggref)) 5680 tlist = add_to_flat_tlist(tlist, list_make1(expr)); 5681 } 5682 } 5683 } 5684 5685 i++; 5686 } 5687 5688 /* 5689 * Classify the pushable and non-pushable HAVING clauses and save them in 5690 * remote_conds and local_conds of the grouped rel's fpinfo. 5691 */ 5692 if (havingQual) 5693 { 5694 ListCell *lc; 5695 5696 foreach(lc, (List *) havingQual) 5697 { 5698 Expr *expr = (Expr *) lfirst(lc); 5699 RestrictInfo *rinfo; 5700 5701 /* 5702 * Currently, the core code doesn't wrap havingQuals in 5703 * RestrictInfos, so we must make our own. 5704 */ 5705 Assert(!IsA(expr, RestrictInfo)); 5706 rinfo = make_restrictinfo(root, 5707 expr, 5708 true, 5709 false, 5710 false, 5711 root->qual_security_level, 5712 grouped_rel->relids, 5713 NULL, 5714 NULL); 5715 if (is_foreign_expr(root, grouped_rel, expr)) 5716 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); 5717 else 5718 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); 5719 } 5720 } 5721 5722 /* 5723 * If there are any local conditions, pull Vars and aggregates from it and 5724 * check whether they are safe to pushdown or not. 
5725 */ 5726 if (fpinfo->local_conds) 5727 { 5728 List *aggvars = NIL; 5729 ListCell *lc; 5730 5731 foreach(lc, fpinfo->local_conds) 5732 { 5733 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); 5734 5735 aggvars = list_concat(aggvars, 5736 pull_var_clause((Node *) rinfo->clause, 5737 PVC_INCLUDE_AGGREGATES)); 5738 } 5739 5740 foreach(lc, aggvars) 5741 { 5742 Expr *expr = (Expr *) lfirst(lc); 5743 5744 /* 5745 * If aggregates within local conditions are not safe to push 5746 * down, then we cannot push down the query. Vars are already 5747 * part of GROUP BY clause which are checked above, so no need to 5748 * access them again here. Again, we need not check 5749 * is_foreign_param for a foreign aggregate. 5750 */ 5751 if (IsA(expr, Aggref)) 5752 { 5753 if (!is_foreign_expr(root, grouped_rel, expr)) 5754 return false; 5755 5756 tlist = add_to_flat_tlist(tlist, list_make1(expr)); 5757 } 5758 } 5759 } 5760 5761 /* Store generated targetlist */ 5762 fpinfo->grouped_tlist = tlist; 5763 5764 /* Safe to pushdown */ 5765 fpinfo->pushdown_safe = true; 5766 5767 /* 5768 * Set # of retrieved rows and cached relation costs to some negative 5769 * value, so that we can detect when they are set to some sensible values, 5770 * during one (usually the first) of the calls to estimate_path_cost_size. 5771 */ 5772 fpinfo->retrieved_rows = -1; 5773 fpinfo->rel_startup_cost = -1; 5774 fpinfo->rel_total_cost = -1; 5775 5776 /* 5777 * Set the string describing this grouped relation to be used in EXPLAIN 5778 * output of corresponding ForeignScan. 5779 */ 5780 fpinfo->relation_name = makeStringInfo(); 5781 appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)", 5782 ofpinfo->relation_name->data); 5783 5784 return true; 5785 } 5786 5787 /* 5788 * postgresGetForeignUpperPaths 5789 * Add paths for post-join operations like aggregation, grouping etc. if 5790 * corresponding operations are safe to push down. 
5791 */ 5792 static void 5793 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, 5794 RelOptInfo *input_rel, RelOptInfo *output_rel, 5795 void *extra) 5796 { 5797 PgFdwRelationInfo *fpinfo; 5798 5799 /* 5800 * If input rel is not safe to pushdown, then simply return as we cannot 5801 * perform any post-join operations on the foreign server. 5802 */ 5803 if (!input_rel->fdw_private || 5804 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) 5805 return; 5806 5807 /* Ignore stages we don't support; and skip any duplicate calls. */ 5808 if ((stage != UPPERREL_GROUP_AGG && 5809 stage != UPPERREL_ORDERED && 5810 stage != UPPERREL_FINAL) || 5811 output_rel->fdw_private) 5812 return; 5813 5814 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); 5815 fpinfo->pushdown_safe = false; 5816 fpinfo->stage = stage; 5817 output_rel->fdw_private = fpinfo; 5818 5819 switch (stage) 5820 { 5821 case UPPERREL_GROUP_AGG: 5822 add_foreign_grouping_paths(root, input_rel, output_rel, 5823 (GroupPathExtraData *) extra); 5824 break; 5825 case UPPERREL_ORDERED: 5826 add_foreign_ordered_paths(root, input_rel, output_rel); 5827 break; 5828 case UPPERREL_FINAL: 5829 add_foreign_final_paths(root, input_rel, output_rel, 5830 (FinalPathExtraData *) extra); 5831 break; 5832 default: 5833 elog(ERROR, "unexpected upper relation: %d", (int) stage); 5834 break; 5835 } 5836 } 5837 5838 /* 5839 * add_foreign_grouping_paths 5840 * Add foreign path for grouping and/or aggregation. 5841 * 5842 * Given input_rel represents the underlying scan. The paths are added to the 5843 * given grouped_rel. 
5844 */ 5845 static void 5846 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, 5847 RelOptInfo *grouped_rel, 5848 GroupPathExtraData *extra) 5849 { 5850 Query *parse = root->parse; 5851 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; 5852 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; 5853 ForeignPath *grouppath; 5854 double rows; 5855 int width; 5856 Cost startup_cost; 5857 Cost total_cost; 5858 5859 /* Nothing to be done, if there is no grouping or aggregation required. */ 5860 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && 5861 !root->hasHavingQual) 5862 return; 5863 5864 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE || 5865 extra->patype == PARTITIONWISE_AGGREGATE_FULL); 5866 5867 /* save the input_rel as outerrel in fpinfo */ 5868 fpinfo->outerrel = input_rel; 5869 5870 /* 5871 * Copy foreign table, foreign server, user mapping, FDW options etc. 5872 * details from the input relation's fpinfo. 5873 */ 5874 fpinfo->table = ifpinfo->table; 5875 fpinfo->server = ifpinfo->server; 5876 fpinfo->user = ifpinfo->user; 5877 merge_fdw_options(fpinfo, ifpinfo, NULL); 5878 5879 /* 5880 * Assess if it is safe to push down aggregation and grouping. 5881 * 5882 * Use HAVING qual from extra. In case of child partition, it will have 5883 * translated Vars. 5884 */ 5885 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual)) 5886 return; 5887 5888 /* 5889 * Compute the selectivity and cost of the local_conds, so we don't have 5890 * to do it over again for each path. (Currently we create just a single 5891 * path here, but in future it would be possible that we build more paths 5892 * such as pre-sorted paths as in postgresGetForeignPaths and 5893 * postgresGetForeignJoinPaths.) The best we can do for these conditions 5894 * is to estimate selectivity on the basis of local statistics. 
5895 */ 5896 fpinfo->local_conds_sel = clauselist_selectivity(root, 5897 fpinfo->local_conds, 5898 0, 5899 JOIN_INNER, 5900 NULL); 5901 5902 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); 5903 5904 /* Estimate the cost of push down */ 5905 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL, 5906 &rows, &width, &startup_cost, &total_cost); 5907 5908 /* Now update this information in the fpinfo */ 5909 fpinfo->rows = rows; 5910 fpinfo->width = width; 5911 fpinfo->startup_cost = startup_cost; 5912 fpinfo->total_cost = total_cost; 5913 5914 /* Create and add foreign path to the grouping relation. */ 5915 grouppath = create_foreign_upper_path(root, 5916 grouped_rel, 5917 grouped_rel->reltarget, 5918 rows, 5919 startup_cost, 5920 total_cost, 5921 NIL, /* no pathkeys */ 5922 NULL, 5923 NIL); /* no fdw_private */ 5924 5925 /* Add generated path into grouped_rel by add_path(). */ 5926 add_path(grouped_rel, (Path *) grouppath); 5927 } 5928 5929 /* 5930 * add_foreign_ordered_paths 5931 * Add foreign paths for performing the final sort remotely. 5932 * 5933 * Given input_rel contains the source-data Paths. The paths are added to the 5934 * given ordered_rel. 
5935 */ 5936 static void 5937 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, 5938 RelOptInfo *ordered_rel) 5939 { 5940 Query *parse = root->parse; 5941 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; 5942 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private; 5943 PgFdwPathExtraData *fpextra; 5944 double rows; 5945 int width; 5946 Cost startup_cost; 5947 Cost total_cost; 5948 List *fdw_private; 5949 ForeignPath *ordered_path; 5950 ListCell *lc; 5951 5952 /* Shouldn't get here unless the query has ORDER BY */ 5953 Assert(parse->sortClause); 5954 5955 /* We don't support cases where there are any SRFs in the targetlist */ 5956 if (parse->hasTargetSRFs) 5957 return; 5958 5959 /* Save the input_rel as outerrel in fpinfo */ 5960 fpinfo->outerrel = input_rel; 5961 5962 /* 5963 * Copy foreign table, foreign server, user mapping, FDW options etc. 5964 * details from the input relation's fpinfo. 5965 */ 5966 fpinfo->table = ifpinfo->table; 5967 fpinfo->server = ifpinfo->server; 5968 fpinfo->user = ifpinfo->user; 5969 merge_fdw_options(fpinfo, ifpinfo, NULL); 5970 5971 /* 5972 * If the input_rel is a base or join relation, we would already have 5973 * considered pushing down the final sort to the remote server when 5974 * creating pre-sorted foreign paths for that relation, because the 5975 * query_pathkeys is set to the root->sort_pathkeys in that case (see 5976 * standard_qp_callback()). 
5977 */ 5978 if (input_rel->reloptkind == RELOPT_BASEREL || 5979 input_rel->reloptkind == RELOPT_JOINREL) 5980 { 5981 Assert(root->query_pathkeys == root->sort_pathkeys); 5982 5983 /* Safe to push down if the query_pathkeys is safe to push down */ 5984 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe; 5985 5986 return; 5987 } 5988 5989 /* The input_rel should be a grouping relation */ 5990 Assert(input_rel->reloptkind == RELOPT_UPPER_REL && 5991 ifpinfo->stage == UPPERREL_GROUP_AGG); 5992 5993 /* 5994 * We try to create a path below by extending a simple foreign path for 5995 * the underlying grouping relation to perform the final sort remotely, 5996 * which is stored into the fdw_private list of the resulting path. 5997 */ 5998 5999 /* Assess if it is safe to push down the final sort */ 6000 foreach(lc, root->sort_pathkeys) 6001 { 6002 PathKey *pathkey = (PathKey *) lfirst(lc); 6003 EquivalenceClass *pathkey_ec = pathkey->pk_eclass; 6004 Expr *sort_expr; 6005 6006 /* 6007 * is_foreign_expr would detect volatile expressions as well, but 6008 * checking ec_has_volatile here saves some cycles. 
6009 */ 6010 if (pathkey_ec->ec_has_volatile) 6011 return; 6012 6013 /* Get the sort expression for the pathkey_ec */ 6014 sort_expr = find_em_expr_for_input_target(root, 6015 pathkey_ec, 6016 input_rel->reltarget); 6017 6018 /* If it's unsafe to remote, we cannot push down the final sort */ 6019 if (!is_foreign_expr(root, input_rel, sort_expr)) 6020 return; 6021 } 6022 6023 /* Safe to push down */ 6024 fpinfo->pushdown_safe = true; 6025 6026 /* Construct PgFdwPathExtraData */ 6027 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); 6028 fpextra->target = root->upper_targets[UPPERREL_ORDERED]; 6029 fpextra->has_final_sort = true; 6030 6031 /* Estimate the costs of performing the final sort remotely */ 6032 estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra, 6033 &rows, &width, &startup_cost, &total_cost); 6034 6035 /* 6036 * Build the fdw_private list that will be used by postgresGetForeignPlan. 6037 * Items in the list must match order in enum FdwPathPrivateIndex. 6038 */ 6039 fdw_private = list_make2(makeInteger(true), makeInteger(false)); 6040 6041 /* Create foreign ordering path */ 6042 ordered_path = create_foreign_upper_path(root, 6043 input_rel, 6044 root->upper_targets[UPPERREL_ORDERED], 6045 rows, 6046 startup_cost, 6047 total_cost, 6048 root->sort_pathkeys, 6049 NULL, /* no extra plan */ 6050 fdw_private); 6051 6052 /* and add it to the ordered_rel */ 6053 add_path(ordered_rel, (Path *) ordered_path); 6054 } 6055 6056 /* 6057 * add_foreign_final_paths 6058 * Add foreign paths for performing the final processing remotely. 6059 * 6060 * Given input_rel contains the source-data Paths. The paths are added to the 6061 * given final_rel. 
6062 */ 6063 static void 6064 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, 6065 RelOptInfo *final_rel, 6066 FinalPathExtraData *extra) 6067 { 6068 Query *parse = root->parse; 6069 PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; 6070 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private; 6071 bool has_final_sort = false; 6072 List *pathkeys = NIL; 6073 PgFdwPathExtraData *fpextra; 6074 bool save_use_remote_estimate = false; 6075 double rows; 6076 int width; 6077 Cost startup_cost; 6078 Cost total_cost; 6079 List *fdw_private; 6080 ForeignPath *final_path; 6081 6082 /* 6083 * Currently, we only support this for SELECT commands 6084 */ 6085 if (parse->commandType != CMD_SELECT) 6086 return; 6087 6088 /* 6089 * No work if there is no FOR UPDATE/SHARE clause and if there is no need 6090 * to add a LIMIT node 6091 */ 6092 if (!parse->rowMarks && !extra->limit_needed) 6093 return; 6094 6095 /* We don't support cases where there are any SRFs in the targetlist */ 6096 if (parse->hasTargetSRFs) 6097 return; 6098 6099 /* Save the input_rel as outerrel in fpinfo */ 6100 fpinfo->outerrel = input_rel; 6101 6102 /* 6103 * Copy foreign table, foreign server, user mapping, FDW options etc. 6104 * details from the input relation's fpinfo. 6105 */ 6106 fpinfo->table = ifpinfo->table; 6107 fpinfo->server = ifpinfo->server; 6108 fpinfo->user = ifpinfo->user; 6109 merge_fdw_options(fpinfo, ifpinfo, NULL); 6110 6111 /* 6112 * If there is no need to add a LIMIT node, there might be a ForeignPath 6113 * in the input_rel's pathlist that implements all behavior of the query. 6114 * Note: we would already have accounted for the query's FOR UPDATE/SHARE 6115 * (if any) before we get here. 
6116 */ 6117 if (!extra->limit_needed) 6118 { 6119 ListCell *lc; 6120 6121 Assert(parse->rowMarks); 6122 6123 /* 6124 * Grouping and aggregation are not supported with FOR UPDATE/SHARE, 6125 * so the input_rel should be a base, join, or ordered relation; and 6126 * if it's an ordered relation, its input relation should be a base or 6127 * join relation. 6128 */ 6129 Assert(input_rel->reloptkind == RELOPT_BASEREL || 6130 input_rel->reloptkind == RELOPT_JOINREL || 6131 (input_rel->reloptkind == RELOPT_UPPER_REL && 6132 ifpinfo->stage == UPPERREL_ORDERED && 6133 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL || 6134 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL))); 6135 6136 foreach(lc, input_rel->pathlist) 6137 { 6138 Path *path = (Path *) lfirst(lc); 6139 6140 /* 6141 * apply_scanjoin_target_to_paths() uses create_projection_path() 6142 * to adjust each of its input paths if needed, whereas 6143 * create_ordered_paths() uses apply_projection_to_path() to do 6144 * that. So the former might have put a ProjectionPath on top of 6145 * the ForeignPath; look through ProjectionPath and see if the 6146 * path underneath it is ForeignPath. 
6147 */ 6148 if (IsA(path, ForeignPath) || 6149 (IsA(path, ProjectionPath) && 6150 IsA(((ProjectionPath *) path)->subpath, ForeignPath))) 6151 { 6152 /* 6153 * Create foreign final path; this gets rid of a 6154 * no-longer-needed outer plan (if any), which makes the 6155 * EXPLAIN output look cleaner 6156 */ 6157 final_path = create_foreign_upper_path(root, 6158 path->parent, 6159 path->pathtarget, 6160 path->rows, 6161 path->startup_cost, 6162 path->total_cost, 6163 path->pathkeys, 6164 NULL, /* no extra plan */ 6165 NULL); /* no fdw_private */ 6166 6167 /* and add it to the final_rel */ 6168 add_path(final_rel, (Path *) final_path); 6169 6170 /* Safe to push down */ 6171 fpinfo->pushdown_safe = true; 6172 6173 return; 6174 } 6175 } 6176 6177 /* 6178 * If we get here it means no ForeignPaths; since we would already 6179 * have considered pushing down all operations for the query to the 6180 * remote server, give up on it. 6181 */ 6182 return; 6183 } 6184 6185 Assert(extra->limit_needed); 6186 6187 /* 6188 * If the input_rel is an ordered relation, replace the input_rel with its 6189 * input relation 6190 */ 6191 if (input_rel->reloptkind == RELOPT_UPPER_REL && 6192 ifpinfo->stage == UPPERREL_ORDERED) 6193 { 6194 input_rel = ifpinfo->outerrel; 6195 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; 6196 has_final_sort = true; 6197 pathkeys = root->sort_pathkeys; 6198 } 6199 6200 /* The input_rel should be a base, join, or grouping relation */ 6201 Assert(input_rel->reloptkind == RELOPT_BASEREL || 6202 input_rel->reloptkind == RELOPT_JOINREL || 6203 (input_rel->reloptkind == RELOPT_UPPER_REL && 6204 ifpinfo->stage == UPPERREL_GROUP_AGG)); 6205 6206 /* 6207 * We try to create a path below by extending a simple foreign path for 6208 * the underlying base, join, or grouping relation to perform the final 6209 * sort (if has_final_sort) and the LIMIT restriction remotely, which is 6210 * stored into the fdw_private list of the resulting path. 
(We 6211 * re-estimate the costs of sorting the underlying relation, if 6212 * has_final_sort.) 6213 */ 6214 6215 /* 6216 * Assess if it is safe to push down the LIMIT and OFFSET to the remote 6217 * server 6218 */ 6219 6220 /* 6221 * If the underlying relation has any local conditions, the LIMIT/OFFSET 6222 * cannot be pushed down. 6223 */ 6224 if (ifpinfo->local_conds) 6225 return; 6226 6227 /* 6228 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are 6229 * not safe to remote. 6230 */ 6231 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) || 6232 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount)) 6233 return; 6234 6235 /* Safe to push down */ 6236 fpinfo->pushdown_safe = true; 6237 6238 /* Construct PgFdwPathExtraData */ 6239 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); 6240 fpextra->target = root->upper_targets[UPPERREL_FINAL]; 6241 fpextra->has_final_sort = has_final_sort; 6242 fpextra->has_limit = extra->limit_needed; 6243 fpextra->limit_tuples = extra->limit_tuples; 6244 fpextra->count_est = extra->count_est; 6245 fpextra->offset_est = extra->offset_est; 6246 6247 /* 6248 * Estimate the costs of performing the final sort and the LIMIT 6249 * restriction remotely. If has_final_sort is false, we wouldn't need to 6250 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be 6251 * roughly estimated using the costs we already have for the underlying 6252 * relation, in the same way as when use_remote_estimate is false. Since 6253 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to 6254 * false in that case. 
6255 */ 6256 if (!fpextra->has_final_sort) 6257 { 6258 save_use_remote_estimate = ifpinfo->use_remote_estimate; 6259 ifpinfo->use_remote_estimate = false; 6260 } 6261 estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra, 6262 &rows, &width, &startup_cost, &total_cost); 6263 if (!fpextra->has_final_sort) 6264 ifpinfo->use_remote_estimate = save_use_remote_estimate; 6265 6266 /* 6267 * Build the fdw_private list that will be used by postgresGetForeignPlan. 6268 * Items in the list must match order in enum FdwPathPrivateIndex. 6269 */ 6270 fdw_private = list_make2(makeInteger(has_final_sort), 6271 makeInteger(extra->limit_needed)); 6272 6273 /* 6274 * Create foreign final path; this gets rid of a no-longer-needed outer 6275 * plan (if any), which makes the EXPLAIN output look cleaner 6276 */ 6277 final_path = create_foreign_upper_path(root, 6278 input_rel, 6279 root->upper_targets[UPPERREL_FINAL], 6280 rows, 6281 startup_cost, 6282 total_cost, 6283 pathkeys, 6284 NULL, /* no extra plan */ 6285 fdw_private); 6286 6287 /* and add it to the final_rel */ 6288 add_path(final_rel, (Path *) final_path); 6289 } 6290 6291 /* 6292 * Create a tuple from the specified row of the PGresult. 6293 * 6294 * rel is the local representation of the foreign table, attinmeta is 6295 * conversion data for the rel's tupdesc, and retrieved_attrs is an 6296 * integer list of the table column numbers present in the PGresult. 6297 * fsstate is the ForeignScan plan node's execution state. 6298 * temp_context is a working context that can be reset after each tuple. 6299 * 6300 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL 6301 * if we're processing a remote join, while fsstate is NULL in a non-query 6302 * context such as ANALYZE, or if we're processing a non-scan query node. 
6303 */ 6304 static HeapTuple 6305 make_tuple_from_result_row(PGresult *res, 6306 int row, 6307 Relation rel, 6308 AttInMetadata *attinmeta, 6309 List *retrieved_attrs, 6310 ForeignScanState *fsstate, 6311 MemoryContext temp_context) 6312 { 6313 HeapTuple tuple; 6314 TupleDesc tupdesc; 6315 Datum *values; 6316 bool *nulls; 6317 ItemPointer ctid = NULL; 6318 ConversionLocation errpos; 6319 ErrorContextCallback errcallback; 6320 MemoryContext oldcontext; 6321 ListCell *lc; 6322 int j; 6323 6324 Assert(row < PQntuples(res)); 6325 6326 /* 6327 * Do the following work in a temp context that we reset after each tuple. 6328 * This cleans up not only the data we have direct access to, but any 6329 * cruft the I/O functions might leak. 6330 */ 6331 oldcontext = MemoryContextSwitchTo(temp_context); 6332 6333 /* 6334 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is 6335 * provided, otherwise look to the scan node's ScanTupleSlot. 6336 */ 6337 if (rel) 6338 tupdesc = RelationGetDescr(rel); 6339 else 6340 { 6341 Assert(fsstate); 6342 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; 6343 } 6344 6345 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); 6346 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); 6347 /* Initialize to nulls for any columns not present in result */ 6348 memset(nulls, true, tupdesc->natts * sizeof(bool)); 6349 6350 /* 6351 * Set up and install callback to report where conversion error occurs. 6352 */ 6353 errpos.cur_attno = 0; 6354 errpos.rel = rel; 6355 errpos.fsstate = fsstate; 6356 errcallback.callback = conversion_error_callback; 6357 errcallback.arg = (void *) &errpos; 6358 errcallback.previous = error_context_stack; 6359 error_context_stack = &errcallback; 6360 6361 /* 6362 * i indexes columns in the relation, j indexes columns in the PGresult. 
6363 */ 6364 j = 0; 6365 foreach(lc, retrieved_attrs) 6366 { 6367 int i = lfirst_int(lc); 6368 char *valstr; 6369 6370 /* fetch next column's textual value */ 6371 if (PQgetisnull(res, row, j)) 6372 valstr = NULL; 6373 else 6374 valstr = PQgetvalue(res, row, j); 6375 6376 /* 6377 * convert value to internal representation 6378 * 6379 * Note: we ignore system columns other than ctid and oid in result 6380 */ 6381 errpos.cur_attno = i; 6382 if (i > 0) 6383 { 6384 /* ordinary column */ 6385 Assert(i <= tupdesc->natts); 6386 nulls[i - 1] = (valstr == NULL); 6387 /* Apply the input function even to nulls, to support domains */ 6388 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], 6389 valstr, 6390 attinmeta->attioparams[i - 1], 6391 attinmeta->atttypmods[i - 1]); 6392 } 6393 else if (i == SelfItemPointerAttributeNumber) 6394 { 6395 /* ctid */ 6396 if (valstr != NULL) 6397 { 6398 Datum datum; 6399 6400 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); 6401 ctid = (ItemPointer) DatumGetPointer(datum); 6402 } 6403 } 6404 errpos.cur_attno = 0; 6405 6406 j++; 6407 } 6408 6409 /* Uninstall error context callback. */ 6410 error_context_stack = errcallback.previous; 6411 6412 /* 6413 * Check we got the expected number of columns. Note: j == 0 and 6414 * PQnfields == 1 is expected, since deparse emits a NULL if no columns. 6415 */ 6416 if (j > 0 && j != PQnfields(res)) 6417 elog(ERROR, "remote query result does not match the foreign table"); 6418 6419 /* 6420 * Build the result tuple in caller's memory context. 6421 */ 6422 MemoryContextSwitchTo(oldcontext); 6423 6424 tuple = heap_form_tuple(tupdesc, values, nulls); 6425 6426 /* 6427 * If we have a CTID to return, install it in both t_self and t_ctid. 6428 * t_self is the normal place, but if the tuple is converted to a 6429 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be 6430 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code). 
6431 */ 6432 if (ctid) 6433 tuple->t_self = tuple->t_data->t_ctid = *ctid; 6434 6435 /* 6436 * Stomp on the xmin, xmax, and cmin fields from the tuple created by 6437 * heap_form_tuple. heap_form_tuple actually creates the tuple with 6438 * DatumTupleFields, not HeapTupleFields, but the executor expects 6439 * HeapTupleFields and will happily extract system columns on that 6440 * assumption. If we don't do this then, for example, the tuple length 6441 * ends up in the xmin field, which isn't what we want. 6442 */ 6443 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId); 6444 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId); 6445 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId); 6446 6447 /* Clean up */ 6448 MemoryContextReset(temp_context); 6449 6450 return tuple; 6451 } 6452 6453 /* 6454 * Callback function which is called when error occurs during column value 6455 * conversion. Print names of column and relation. 6456 * 6457 * Note that this function mustn't do any catalog lookups, since we are in 6458 * an already-failed transaction. Fortunately, we can get the needed info 6459 * from the relation or the query's rangetable instead. 6460 */ 6461 static void 6462 conversion_error_callback(void *arg) 6463 { 6464 ConversionLocation *errpos = (ConversionLocation *) arg; 6465 Relation rel = errpos->rel; 6466 ForeignScanState *fsstate = errpos->fsstate; 6467 const char *attname = NULL; 6468 const char *relname = NULL; 6469 bool is_wholerow = false; 6470 6471 /* 6472 * If we're in a scan node, always use aliases from the rangetable, for 6473 * consistency between the simple-relation and remote-join cases. Look at 6474 * the relation's tupdesc only if we're not in a scan node. 
6475 */ 6476 if (fsstate) 6477 { 6478 /* ForeignScan case */ 6479 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan); 6480 int varno = 0; 6481 AttrNumber colno = 0; 6482 6483 if (fsplan->scan.scanrelid > 0) 6484 { 6485 /* error occurred in a scan against a foreign table */ 6486 varno = fsplan->scan.scanrelid; 6487 colno = errpos->cur_attno; 6488 } 6489 else 6490 { 6491 /* error occurred in a scan against a foreign join */ 6492 TargetEntry *tle; 6493 6494 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, 6495 errpos->cur_attno - 1); 6496 6497 /* 6498 * Target list can have Vars and expressions. For Vars, we can 6499 * get some information, however for expressions we can't. Thus 6500 * for expressions, just show generic context message. 6501 */ 6502 if (IsA(tle->expr, Var)) 6503 { 6504 Var *var = (Var *) tle->expr; 6505 6506 varno = var->varno; 6507 colno = var->varattno; 6508 } 6509 } 6510 6511 if (varno > 0) 6512 { 6513 EState *estate = fsstate->ss.ps.state; 6514 RangeTblEntry *rte = exec_rt_fetch(varno, estate); 6515 6516 relname = rte->eref->aliasname; 6517 6518 if (colno == 0) 6519 is_wholerow = true; 6520 else if (colno > 0 && colno <= list_length(rte->eref->colnames)) 6521 attname = strVal(list_nth(rte->eref->colnames, colno - 1)); 6522 else if (colno == SelfItemPointerAttributeNumber) 6523 attname = "ctid"; 6524 } 6525 } 6526 else if (rel) 6527 { 6528 /* Non-ForeignScan case (we should always have a rel here) */ 6529 TupleDesc tupdesc = RelationGetDescr(rel); 6530 6531 relname = RelationGetRelationName(rel); 6532 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) 6533 { 6534 Form_pg_attribute attr = TupleDescAttr(tupdesc, 6535 errpos->cur_attno - 1); 6536 6537 attname = NameStr(attr->attname); 6538 } 6539 else if (errpos->cur_attno == SelfItemPointerAttributeNumber) 6540 attname = "ctid"; 6541 } 6542 6543 if (relname && is_wholerow) 6544 errcontext("whole-row reference to foreign table \"%s\"", relname); 6545 else if 
(relname && attname) 6546 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); 6547 else 6548 errcontext("processing expression at position %d in select list", 6549 errpos->cur_attno); 6550 } 6551 6552 /* 6553 * Find an equivalence class member expression, all of whose Vars, come from 6554 * the indicated relation. 6555 */ 6556 Expr * 6557 find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel) 6558 { 6559 ListCell *lc_em; 6560 6561 foreach(lc_em, ec->ec_members) 6562 { 6563 EquivalenceMember *em = lfirst(lc_em); 6564 6565 if (bms_is_subset(em->em_relids, rel->relids) && 6566 !bms_is_empty(em->em_relids)) 6567 { 6568 /* 6569 * If there is more than one equivalence member whose Vars are 6570 * taken entirely from this relation, we'll be content to choose 6571 * any one of those. 6572 */ 6573 return em->em_expr; 6574 } 6575 } 6576 6577 /* We didn't find any suitable equivalence class expression */ 6578 return NULL; 6579 } 6580 6581 /* 6582 * Find an equivalence class member expression to be computed as a sort column 6583 * in the given target. 
6584 */ 6585 Expr * 6586 find_em_expr_for_input_target(PlannerInfo *root, 6587 EquivalenceClass *ec, 6588 PathTarget *target) 6589 { 6590 ListCell *lc1; 6591 int i; 6592 6593 i = 0; 6594 foreach(lc1, target->exprs) 6595 { 6596 Expr *expr = (Expr *) lfirst(lc1); 6597 Index sgref = get_pathtarget_sortgroupref(target, i); 6598 ListCell *lc2; 6599 6600 /* Ignore non-sort expressions */ 6601 if (sgref == 0 || 6602 get_sortgroupref_clause_noerr(sgref, 6603 root->parse->sortClause) == NULL) 6604 { 6605 i++; 6606 continue; 6607 } 6608 6609 /* We ignore binary-compatible relabeling on both ends */ 6610 while (expr && IsA(expr, RelabelType)) 6611 expr = ((RelabelType *) expr)->arg; 6612 6613 /* Locate an EquivalenceClass member matching this expr, if any */ 6614 foreach(lc2, ec->ec_members) 6615 { 6616 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2); 6617 Expr *em_expr; 6618 6619 /* Don't match constants */ 6620 if (em->em_is_const) 6621 continue; 6622 6623 /* Ignore child members */ 6624 if (em->em_is_child) 6625 continue; 6626 6627 /* Match if same expression (after stripping relabel) */ 6628 em_expr = em->em_expr; 6629 while (em_expr && IsA(em_expr, RelabelType)) 6630 em_expr = ((RelabelType *) em_expr)->arg; 6631 6632 if (equal(em_expr, expr)) 6633 return em->em_expr; 6634 } 6635 6636 i++; 6637 } 6638 6639 elog(ERROR, "could not find pathkey item to sort"); 6640 return NULL; /* keep compiler quiet */ 6641 } 6642