1 /*------------------------------------------------------------------------- 2 * 3 * file_fdw.c 4 * foreign-data wrapper for server-side flat files (or programs). 5 * 6 * Copyright (c) 2010-2021, PostgreSQL Global Development Group 7 * 8 * IDENTIFICATION 9 * contrib/file_fdw/file_fdw.c 10 * 11 *------------------------------------------------------------------------- 12 */ 13 #include "postgres.h" 14 15 #include <sys/stat.h> 16 #include <unistd.h> 17 18 #include "access/htup_details.h" 19 #include "access/reloptions.h" 20 #include "access/sysattr.h" 21 #include "access/table.h" 22 #include "catalog/pg_authid.h" 23 #include "catalog/pg_foreign_table.h" 24 #include "commands/copy.h" 25 #include "commands/defrem.h" 26 #include "commands/explain.h" 27 #include "commands/vacuum.h" 28 #include "foreign/fdwapi.h" 29 #include "foreign/foreign.h" 30 #include "miscadmin.h" 31 #include "nodes/makefuncs.h" 32 #include "optimizer/optimizer.h" 33 #include "optimizer/pathnode.h" 34 #include "optimizer/planmain.h" 35 #include "optimizer/restrictinfo.h" 36 #include "utils/acl.h" 37 #include "utils/memutils.h" 38 #include "utils/rel.h" 39 #include "utils/sampling.h" 40 41 PG_MODULE_MAGIC; 42 43 /* 44 * Describes the valid options for objects that use this wrapper. 45 */ 46 struct FileFdwOption 47 { 48 const char *optname; 49 Oid optcontext; /* Oid of catalog in which option may appear */ 50 }; 51 52 /* 53 * Valid options for file_fdw. 54 * These options are based on the options for the COPY FROM command. 55 * But note that force_not_null and force_null are handled as boolean options 56 * attached to a column, not as table options. 57 * 58 * Note: If you are adding new option for user mapping, you need to modify 59 * fileGetOptions(), which currently doesn't bother to look at user mappings. 60 */ 61 static const struct FileFdwOption valid_options[] = { 62 /* Data source options */ 63 {"filename", ForeignTableRelationId}, 64 {"program", ForeignTableRelationId}, 65 66 /* Format options */ 67 /* oids option is not supported */ 68 {"format", ForeignTableRelationId}, 69 {"header", ForeignTableRelationId}, 70 {"delimiter", ForeignTableRelationId}, 71 {"quote", ForeignTableRelationId}, 72 {"escape", ForeignTableRelationId}, 73 {"null", ForeignTableRelationId}, 74 {"encoding", ForeignTableRelationId}, 75 {"force_not_null", AttributeRelationId}, 76 {"force_null", AttributeRelationId}, 77 78 /* 79 * force_quote is not supported by file_fdw because it's for COPY TO. 80 */ 81 82 /* Sentinel */ 83 {NULL, InvalidOid} 84 }; 85 86 /* 87 * FDW-specific information for RelOptInfo.fdw_private. 88 */ 89 typedef struct FileFdwPlanState 90 { 91 char *filename; /* file or program to read from */ 92 bool is_program; /* true if filename represents an OS command */ 93 List *options; /* merged COPY options, excluding filename and 94 * is_program */ 95 BlockNumber pages; /* estimate of file's physical size */ 96 double ntuples; /* estimate of number of data rows */ 97 } FileFdwPlanState; 98 99 /* 100 * FDW-specific information for ForeignScanState.fdw_state. 101 */ 102 typedef struct FileFdwExecutionState 103 { 104 char *filename; /* file or program to read from */ 105 bool is_program; /* true if filename represents an OS command */ 106 List *options; /* merged COPY options, excluding filename and 107 * is_program */ 108 CopyFromState cstate; /* COPY execution state */ 109 } FileFdwExecutionState; 110 111 /* 112 * SQL functions 113 */ 114 PG_FUNCTION_INFO_V1(file_fdw_handler); 115 PG_FUNCTION_INFO_V1(file_fdw_validator); 116 117 /* 118 * FDW callback routines 119 */ 120 static void fileGetForeignRelSize(PlannerInfo *root, 121 RelOptInfo *baserel, 122 Oid foreigntableid); 123 static void fileGetForeignPaths(PlannerInfo *root, 124 RelOptInfo *baserel, 125 Oid foreigntableid); 126 static ForeignScan *fileGetForeignPlan(PlannerInfo *root, 127 RelOptInfo *baserel, 128 Oid foreigntableid, 129 ForeignPath *best_path, 130 List *tlist, 131 List *scan_clauses, 132 Plan *outer_plan); 133 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es); 134 static void fileBeginForeignScan(ForeignScanState *node, int eflags); 135 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node); 136 static void fileReScanForeignScan(ForeignScanState *node); 137 static void fileEndForeignScan(ForeignScanState *node); 138 static bool fileAnalyzeForeignTable(Relation relation, 139 AcquireSampleRowsFunc *func, 140 BlockNumber *totalpages); 141 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, 142 RangeTblEntry *rte); 143 144 /* 145 * Helper functions 146 */ 147 static bool is_valid_option(const char *option, Oid context); 148 static void fileGetOptions(Oid foreigntableid, 149 char **filename, 150 bool *is_program, 151 List **other_options); 152 static List *get_file_fdw_attribute_options(Oid relid); 153 static bool check_selective_binary_conversion(RelOptInfo *baserel, 154 Oid foreigntableid, 155 List **columns); 156 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel, 157 FileFdwPlanState *fdw_private); 158 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel, 159 FileFdwPlanState *fdw_private, 160 Cost *startup_cost, Cost *total_cost); 161 static int file_acquire_sample_rows(Relation onerel, int elevel, 162 HeapTuple *rows, int targrows, 163 double *totalrows, double *totaldeadrows); 164 165 166 /* 167 * Foreign-data wrapper handler function: return a struct with pointers 168 * to my callback routines. 169 */ 170 Datum 171 file_fdw_handler(PG_FUNCTION_ARGS) 172 { 173 FdwRoutine *fdwroutine = makeNode(FdwRoutine); 174 175 fdwroutine->GetForeignRelSize = fileGetForeignRelSize; 176 fdwroutine->GetForeignPaths = fileGetForeignPaths; 177 fdwroutine->GetForeignPlan = fileGetForeignPlan; 178 fdwroutine->ExplainForeignScan = fileExplainForeignScan; 179 fdwroutine->BeginForeignScan = fileBeginForeignScan; 180 fdwroutine->IterateForeignScan = fileIterateForeignScan; 181 fdwroutine->ReScanForeignScan = fileReScanForeignScan; 182 fdwroutine->EndForeignScan = fileEndForeignScan; 183 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable; 184 fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe; 185 186 PG_RETURN_POINTER(fdwroutine); 187 } 188 189 /* 190 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, 191 * USER MAPPING or FOREIGN TABLE that uses file_fdw. 192 * 193 * Raise an ERROR if the option or its value is considered invalid. 194 */ 195 Datum 196 file_fdw_validator(PG_FUNCTION_ARGS) 197 { 198 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); 199 Oid catalog = PG_GETARG_OID(1); 200 char *filename = NULL; 201 DefElem *force_not_null = NULL; 202 DefElem *force_null = NULL; 203 List *other_options = NIL; 204 ListCell *cell; 205 206 /* 207 * Check that only options supported by file_fdw, and allowed for the 208 * current object type, are given. 209 */ 210 foreach(cell, options_list) 211 { 212 DefElem *def = (DefElem *) lfirst(cell); 213 214 if (!is_valid_option(def->defname, catalog)) 215 { 216 const struct FileFdwOption *opt; 217 StringInfoData buf; 218 219 /* 220 * Unknown option specified, complain about it. Provide a hint 221 * with list of valid options for the object. 222 */ 223 initStringInfo(&buf); 224 for (opt = valid_options; opt->optname; opt++) 225 { 226 if (catalog == opt->optcontext) 227 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", 228 opt->optname); 229 } 230 231 ereport(ERROR, 232 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), 233 errmsg("invalid option \"%s\"", def->defname), 234 buf.len > 0 235 ? errhint("Valid options in this context are: %s", 236 buf.data) 237 : errhint("There are no valid options in this context."))); 238 } 239 240 /* 241 * Separate out filename, program, and column-specific options, since 242 * ProcessCopyOptions won't accept them. 243 */ 244 if (strcmp(def->defname, "filename") == 0 || 245 strcmp(def->defname, "program") == 0) 246 { 247 if (filename) 248 ereport(ERROR, 249 (errcode(ERRCODE_SYNTAX_ERROR), 250 errmsg("conflicting or redundant options"))); 251 252 /* 253 * Check permissions for changing which file or program is used by 254 * the file_fdw. 255 * 256 * Only members of the role 'pg_read_server_files' are allowed to 257 * set the 'filename' option of a file_fdw foreign table, while 258 * only members of the role 'pg_execute_server_program' are 259 * allowed to set the 'program' option. This is because we don't 260 * want regular users to be able to control which file gets read 261 * or which program gets executed. 262 * 263 * Putting this sort of permissions check in a validator is a bit 264 * of a crock, but there doesn't seem to be any other place that 265 * can enforce the check more cleanly. 266 * 267 * Note that the valid_options[] array disallows setting filename 268 * and program at any options level other than foreign table --- 269 * otherwise there'd still be a security hole. 270 */ 271 if (strcmp(def->defname, "filename") == 0 && 272 !is_member_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES)) 273 ereport(ERROR, 274 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 275 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table"))); 276 277 if (strcmp(def->defname, "program") == 0 && 278 !is_member_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM)) 279 ereport(ERROR, 280 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 281 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table"))); 282 283 filename = defGetString(def); 284 } 285 286 /* 287 * force_not_null is a boolean option; after validation we can discard 288 * it - it will be retrieved later in get_file_fdw_attribute_options() 289 */ 290 else if (strcmp(def->defname, "force_not_null") == 0) 291 { 292 if (force_not_null) 293 ereport(ERROR, 294 (errcode(ERRCODE_SYNTAX_ERROR), 295 errmsg("conflicting or redundant options"), 296 errhint("Option \"force_not_null\" supplied more than once for a column."))); 297 force_not_null = def; 298 /* Don't care what the value is, as long as it's a legal boolean */ 299 (void) defGetBoolean(def); 300 } 301 /* See comments for force_not_null above */ 302 else if (strcmp(def->defname, "force_null") == 0) 303 { 304 if (force_null) 305 ereport(ERROR, 306 (errcode(ERRCODE_SYNTAX_ERROR), 307 errmsg("conflicting or redundant options"), 308 errhint("Option \"force_null\" supplied more than once for a column."))); 309 force_null = def; 310 (void) defGetBoolean(def); 311 } 312 else 313 other_options = lappend(other_options, def); 314 } 315 316 /* 317 * Now apply the core COPY code's validation logic for more checks. 318 */ 319 ProcessCopyOptions(NULL, NULL, true, other_options); 320 321 /* 322 * Either filename or program option is required for file_fdw foreign 323 * tables. 324 */ 325 if (catalog == ForeignTableRelationId && filename == NULL) 326 ereport(ERROR, 327 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED), 328 errmsg("either filename or program is required for file_fdw foreign tables"))); 329 330 PG_RETURN_VOID(); 331 } 332 333 /* 334 * Check if the provided option is one of the valid options. 335 * context is the Oid of the catalog holding the object the option is for. 336 */ 337 static bool 338 is_valid_option(const char *option, Oid context) 339 { 340 const struct FileFdwOption *opt; 341 342 for (opt = valid_options; opt->optname; opt++) 343 { 344 if (context == opt->optcontext && strcmp(opt->optname, option) == 0) 345 return true; 346 } 347 return false; 348 } 349 350 /* 351 * Fetch the options for a file_fdw foreign table. 352 * 353 * We have to separate out filename/program from the other options because 354 * those must not appear in the options list passed to the core COPY code. 355 */ 356 static void 357 fileGetOptions(Oid foreigntableid, 358 char **filename, bool *is_program, List **other_options) 359 { 360 ForeignTable *table; 361 ForeignServer *server; 362 ForeignDataWrapper *wrapper; 363 List *options; 364 ListCell *lc; 365 366 /* 367 * Extract options from FDW objects. We ignore user mappings because 368 * file_fdw doesn't have any options that can be specified there. 369 * 370 * (XXX Actually, given the current contents of valid_options[], there's 371 * no point in examining anything except the foreign table's own options. 372 * Simplify?) 373 */ 374 table = GetForeignTable(foreigntableid); 375 server = GetForeignServer(table->serverid); 376 wrapper = GetForeignDataWrapper(server->fdwid); 377 378 options = NIL; 379 options = list_concat(options, wrapper->options); 380 options = list_concat(options, server->options); 381 options = list_concat(options, table->options); 382 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid)); 383 384 /* 385 * Separate out the filename or program option (we assume there is only 386 * one). 387 */ 388 *filename = NULL; 389 *is_program = false; 390 foreach(lc, options) 391 { 392 DefElem *def = (DefElem *) lfirst(lc); 393 394 if (strcmp(def->defname, "filename") == 0) 395 { 396 *filename = defGetString(def); 397 options = foreach_delete_current(options, lc); 398 break; 399 } 400 else if (strcmp(def->defname, "program") == 0) 401 { 402 *filename = defGetString(def); 403 *is_program = true; 404 options = foreach_delete_current(options, lc); 405 break; 406 } 407 } 408 409 /* 410 * The validator should have checked that filename or program was included 411 * in the options, but check again, just in case. 412 */ 413 if (*filename == NULL) 414 elog(ERROR, "either filename or program is required for file_fdw foreign tables"); 415 416 *other_options = options; 417 } 418 419 /* 420 * Retrieve per-column generic options from pg_attribute and construct a list 421 * of DefElems representing them. 422 * 423 * At the moment we only have "force_not_null", and "force_null", 424 * which should each be combined into a single DefElem listing all such 425 * columns, since that's what COPY expects. 426 */ 427 static List * 428 get_file_fdw_attribute_options(Oid relid) 429 { 430 Relation rel; 431 TupleDesc tupleDesc; 432 AttrNumber natts; 433 AttrNumber attnum; 434 List *fnncolumns = NIL; 435 List *fncolumns = NIL; 436 437 List *options = NIL; 438 439 rel = table_open(relid, AccessShareLock); 440 tupleDesc = RelationGetDescr(rel); 441 natts = tupleDesc->natts; 442 443 /* Retrieve FDW options for all user-defined attributes. */ 444 for (attnum = 1; attnum <= natts; attnum++) 445 { 446 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); 447 List *options; 448 ListCell *lc; 449 450 /* Skip dropped attributes. */ 451 if (attr->attisdropped) 452 continue; 453 454 options = GetForeignColumnOptions(relid, attnum); 455 foreach(lc, options) 456 { 457 DefElem *def = (DefElem *) lfirst(lc); 458 459 if (strcmp(def->defname, "force_not_null") == 0) 460 { 461 if (defGetBoolean(def)) 462 { 463 char *attname = pstrdup(NameStr(attr->attname)); 464 465 fnncolumns = lappend(fnncolumns, makeString(attname)); 466 } 467 } 468 else if (strcmp(def->defname, "force_null") == 0) 469 { 470 if (defGetBoolean(def)) 471 { 472 char *attname = pstrdup(NameStr(attr->attname)); 473 474 fncolumns = lappend(fncolumns, makeString(attname)); 475 } 476 } 477 /* maybe in future handle other options here */ 478 } 479 } 480 481 table_close(rel, AccessShareLock); 482 483 /* 484 * Return DefElem only when some column(s) have force_not_null / 485 * force_null options set 486 */ 487 if (fnncolumns != NIL) 488 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1)); 489 490 if (fncolumns != NIL) 491 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1)); 492 493 return options; 494 } 495 496 /* 497 * fileGetForeignRelSize 498 * Obtain relation size estimates for a foreign table 499 */ 500 static void 501 fileGetForeignRelSize(PlannerInfo *root, 502 RelOptInfo *baserel, 503 Oid foreigntableid) 504 { 505 FileFdwPlanState *fdw_private; 506 507 /* 508 * Fetch options. We only need filename (or program) at this point, but 509 * we might as well get everything and not need to re-fetch it later in 510 * planning. 511 */ 512 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState)); 513 fileGetOptions(foreigntableid, 514 &fdw_private->filename, 515 &fdw_private->is_program, 516 &fdw_private->options); 517 baserel->fdw_private = (void *) fdw_private; 518 519 /* Estimate relation size */ 520 estimate_size(root, baserel, fdw_private); 521 } 522 523 /* 524 * fileGetForeignPaths 525 * Create possible access paths for a scan on the foreign table 526 * 527 * Currently we don't support any push-down feature, so there is only one 528 * possible access path, which simply returns all records in the order in 529 * the data file. 530 */ 531 static void 532 fileGetForeignPaths(PlannerInfo *root, 533 RelOptInfo *baserel, 534 Oid foreigntableid) 535 { 536 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private; 537 Cost startup_cost; 538 Cost total_cost; 539 List *columns; 540 List *coptions = NIL; 541 542 /* Decide whether to selectively perform binary conversion */ 543 if (check_selective_binary_conversion(baserel, 544 foreigntableid, 545 &columns)) 546 coptions = list_make1(makeDefElem("convert_selectively", 547 (Node *) columns, -1)); 548 549 /* Estimate costs */ 550 estimate_costs(root, baserel, fdw_private, 551 &startup_cost, &total_cost); 552 553 /* 554 * Create a ForeignPath node and add it as only possible path. We use the 555 * fdw_private list of the path to carry the convert_selectively option; 556 * it will be propagated into the fdw_private list of the Plan node. 557 * 558 * We don't support pushing join clauses into the quals of this path, but 559 * it could still have required parameterization due to LATERAL refs in 560 * its tlist. 561 */ 562 add_path(baserel, (Path *) 563 create_foreignscan_path(root, baserel, 564 NULL, /* default pathtarget */ 565 baserel->rows, 566 startup_cost, 567 total_cost, 568 NIL, /* no pathkeys */ 569 baserel->lateral_relids, 570 NULL, /* no extra plan */ 571 coptions)); 572 573 /* 574 * If data file was sorted, and we knew it somehow, we could insert 575 * appropriate pathkeys into the ForeignPath node to tell the planner 576 * that. 577 */ 578 } 579 580 /* 581 * fileGetForeignPlan 582 * Create a ForeignScan plan node for scanning the foreign table 583 */ 584 static ForeignScan * 585 fileGetForeignPlan(PlannerInfo *root, 586 RelOptInfo *baserel, 587 Oid foreigntableid, 588 ForeignPath *best_path, 589 List *tlist, 590 List *scan_clauses, 591 Plan *outer_plan) 592 { 593 Index scan_relid = baserel->relid; 594 595 /* 596 * We have no native ability to evaluate restriction clauses, so we just 597 * put all the scan_clauses into the plan node's qual list for the 598 * executor to check. So all we have to do here is strip RestrictInfo 599 * nodes from the clauses and ignore pseudoconstants (which will be 600 * handled elsewhere). 601 */ 602 scan_clauses = extract_actual_clauses(scan_clauses, false); 603 604 /* Create the ForeignScan node */ 605 return make_foreignscan(tlist, 606 scan_clauses, 607 scan_relid, 608 NIL, /* no expressions to evaluate */ 609 best_path->fdw_private, 610 NIL, /* no custom tlist */ 611 NIL, /* no remote quals */ 612 outer_plan); 613 } 614 615 /* 616 * fileExplainForeignScan 617 * Produce extra output for EXPLAIN 618 */ 619 static void 620 fileExplainForeignScan(ForeignScanState *node, ExplainState *es) 621 { 622 char *filename; 623 bool is_program; 624 List *options; 625 626 /* Fetch options --- we only need filename and is_program at this point */ 627 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), 628 &filename, &is_program, &options); 629 630 if (is_program) 631 ExplainPropertyText("Foreign Program", filename, es); 632 else 633 ExplainPropertyText("Foreign File", filename, es); 634 635 /* Suppress file size if we're not showing cost details */ 636 if (es->costs) 637 { 638 struct stat stat_buf; 639 640 if (!is_program && 641 stat(filename, &stat_buf) == 0) 642 ExplainPropertyInteger("Foreign File Size", "b", 643 (int64) stat_buf.st_size, es); 644 } 645 } 646 647 /* 648 * fileBeginForeignScan 649 * Initiate access to the file by creating CopyState 650 */ 651 static void 652 fileBeginForeignScan(ForeignScanState *node, int eflags) 653 { 654 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; 655 char *filename; 656 bool is_program; 657 List *options; 658 CopyFromState cstate; 659 FileFdwExecutionState *festate; 660 661 /* 662 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. 663 */ 664 if (eflags & EXEC_FLAG_EXPLAIN_ONLY) 665 return; 666 667 /* Fetch options of foreign table */ 668 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), 669 &filename, &is_program, &options); 670 671 /* Add any options from the plan (currently only convert_selectively) */ 672 options = list_concat(options, plan->fdw_private); 673 674 /* 675 * Create CopyState from FDW options. We always acquire all columns, so 676 * as to match the expected ScanTupleSlot signature. 677 */ 678 cstate = BeginCopyFrom(NULL, 679 node->ss.ss_currentRelation, 680 NULL, 681 filename, 682 is_program, 683 NULL, 684 NIL, 685 options); 686 687 /* 688 * Save state in node->fdw_state. We must save enough information to call 689 * BeginCopyFrom() again. 690 */ 691 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState)); 692 festate->filename = filename; 693 festate->is_program = is_program; 694 festate->options = options; 695 festate->cstate = cstate; 696 697 node->fdw_state = (void *) festate; 698 } 699 700 /* 701 * fileIterateForeignScan 702 * Read next record from the data file and store it into the 703 * ScanTupleSlot as a virtual tuple 704 */ 705 static TupleTableSlot * 706 fileIterateForeignScan(ForeignScanState *node) 707 { 708 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 709 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; 710 bool found; 711 ErrorContextCallback errcallback; 712 713 /* Set up callback to identify error line number. */ 714 errcallback.callback = CopyFromErrorCallback; 715 errcallback.arg = (void *) festate->cstate; 716 errcallback.previous = error_context_stack; 717 error_context_stack = &errcallback; 718 719 /* 720 * The protocol for loading a virtual tuple into a slot is first 721 * ExecClearTuple, then fill the values/isnull arrays, then 722 * ExecStoreVirtualTuple. If we don't find another row in the file, we 723 * just skip the last step, leaving the slot empty as required. 724 * 725 * We can pass ExprContext = NULL because we read all columns from the 726 * file, so no need to evaluate default expressions. 727 */ 728 ExecClearTuple(slot); 729 found = NextCopyFrom(festate->cstate, NULL, 730 slot->tts_values, slot->tts_isnull); 731 if (found) 732 ExecStoreVirtualTuple(slot); 733 734 /* Remove error callback. */ 735 error_context_stack = errcallback.previous; 736 737 return slot; 738 } 739 740 /* 741 * fileReScanForeignScan 742 * Rescan table, possibly with new parameters 743 */ 744 static void 745 fileReScanForeignScan(ForeignScanState *node) 746 { 747 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 748 749 EndCopyFrom(festate->cstate); 750 751 festate->cstate = BeginCopyFrom(NULL, 752 node->ss.ss_currentRelation, 753 NULL, 754 festate->filename, 755 festate->is_program, 756 NULL, 757 NIL, 758 festate->options); 759 } 760 761 /* 762 * fileEndForeignScan 763 * Finish scanning foreign table and dispose objects used for this scan 764 */ 765 static void 766 fileEndForeignScan(ForeignScanState *node) 767 { 768 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 769 770 /* if festate is NULL, we are in EXPLAIN; nothing to do */ 771 if (festate) 772 EndCopyFrom(festate->cstate); 773 } 774 775 /* 776 * fileAnalyzeForeignTable 777 * Test whether analyzing this foreign table is supported 778 */ 779 static bool 780 fileAnalyzeForeignTable(Relation relation, 781 AcquireSampleRowsFunc *func, 782 BlockNumber *totalpages) 783 { 784 char *filename; 785 bool is_program; 786 List *options; 787 struct stat stat_buf; 788 789 /* Fetch options of foreign table */ 790 fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options); 791 792 /* 793 * If this is a program instead of a file, just return false to skip 794 * analyzing the table. We could run the program and collect stats on 795 * whatever it currently returns, but it seems likely that in such cases 796 * the output would be too volatile for the stats to be useful. Maybe 797 * there should be an option to enable doing this? 798 */ 799 if (is_program) 800 return false; 801 802 /* 803 * Get size of the file. (XXX if we fail here, would it be better to just 804 * return false to skip analyzing the table?) 805 */ 806 if (stat(filename, &stat_buf) < 0) 807 ereport(ERROR, 808 (errcode_for_file_access(), 809 errmsg("could not stat file \"%s\": %m", 810 filename))); 811 812 /* 813 * Convert size to pages. Must return at least 1 so that we can tell 814 * later on that pg_class.relpages is not default. 815 */ 816 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; 817 if (*totalpages < 1) 818 *totalpages = 1; 819 820 *func = file_acquire_sample_rows; 821 822 return true; 823 } 824 825 /* 826 * fileIsForeignScanParallelSafe 827 * Reading a file, or external program, in a parallel worker should work 828 * just the same as reading it in the leader, so mark scans safe. 829 */ 830 static bool 831 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, 832 RangeTblEntry *rte) 833 { 834 return true; 835 } 836 837 /* 838 * check_selective_binary_conversion 839 * 840 * Check to see if it's useful to convert only a subset of the file's columns 841 * to binary. If so, construct a list of the column names to be converted, 842 * return that at *columns, and return true. (Note that it's possible to 843 * determine that no columns need be converted, for instance with a COUNT(*) 844 * query. So we can't use returning a NIL list to indicate failure.) 845 */ 846 static bool 847 check_selective_binary_conversion(RelOptInfo *baserel, 848 Oid foreigntableid, 849 List **columns) 850 { 851 ForeignTable *table; 852 ListCell *lc; 853 Relation rel; 854 TupleDesc tupleDesc; 855 AttrNumber attnum; 856 Bitmapset *attrs_used = NULL; 857 bool has_wholerow = false; 858 int numattrs; 859 int i; 860 861 *columns = NIL; /* default result */ 862 863 /* 864 * Check format of the file. If binary format, this is irrelevant. 865 */ 866 table = GetForeignTable(foreigntableid); 867 foreach(lc, table->options) 868 { 869 DefElem *def = (DefElem *) lfirst(lc); 870 871 if (strcmp(def->defname, "format") == 0) 872 { 873 char *format = defGetString(def); 874 875 if (strcmp(format, "binary") == 0) 876 return false; 877 break; 878 } 879 } 880 881 /* Collect all the attributes needed for joins or final output. */ 882 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, 883 &attrs_used); 884 885 /* Add all the attributes used by restriction clauses. */ 886 foreach(lc, baserel->baserestrictinfo) 887 { 888 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); 889 890 pull_varattnos((Node *) rinfo->clause, baserel->relid, 891 &attrs_used); 892 } 893 894 /* Convert attribute numbers to column names. */ 895 rel = table_open(foreigntableid, AccessShareLock); 896 tupleDesc = RelationGetDescr(rel); 897 898 while ((attnum = bms_first_member(attrs_used)) >= 0) 899 { 900 /* Adjust for system attributes. */ 901 attnum += FirstLowInvalidHeapAttributeNumber; 902 903 if (attnum == 0) 904 { 905 has_wholerow = true; 906 break; 907 } 908 909 /* Ignore system attributes. */ 910 if (attnum < 0) 911 continue; 912 913 /* Get user attributes. */ 914 if (attnum > 0) 915 { 916 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); 917 char *attname = NameStr(attr->attname); 918 919 /* Skip dropped attributes (probably shouldn't see any here). */ 920 if (attr->attisdropped) 921 continue; 922 923 /* 924 * Skip generated columns (COPY won't accept them in the column 925 * list) 926 */ 927 if (attr->attgenerated) 928 continue; 929 *columns = lappend(*columns, makeString(pstrdup(attname))); 930 } 931 } 932 933 /* Count non-dropped user attributes while we have the tupdesc. */ 934 numattrs = 0; 935 for (i = 0; i < tupleDesc->natts; i++) 936 { 937 Form_pg_attribute attr = TupleDescAttr(tupleDesc, i); 938 939 if (attr->attisdropped) 940 continue; 941 numattrs++; 942 } 943 944 table_close(rel, AccessShareLock); 945 946 /* If there's a whole-row reference, fail: we need all the columns. */ 947 if (has_wholerow) 948 { 949 *columns = NIL; 950 return false; 951 } 952 953 /* If all the user attributes are needed, fail. */ 954 if (numattrs == list_length(*columns)) 955 { 956 *columns = NIL; 957 return false; 958 } 959 960 return true; 961 } 962 963 /* 964 * Estimate size of a foreign table. 965 * 966 * The main result is returned in baserel->rows. We also set 967 * fdw_private->pages and fdw_private->ntuples for later use in the cost 968 * calculation. 969 */ 970 static void 971 estimate_size(PlannerInfo *root, RelOptInfo *baserel, 972 FileFdwPlanState *fdw_private) 973 { 974 struct stat stat_buf; 975 BlockNumber pages; 976 double ntuples; 977 double nrows; 978 979 /* 980 * Get size of the file. It might not be there at plan time, though, in 981 * which case we have to use a default estimate. We also have to fall 982 * back to the default if using a program as the input. 983 */ 984 if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0) 985 stat_buf.st_size = 10 * BLCKSZ; 986 987 /* 988 * Convert size to pages for use in I/O cost estimate later. 989 */ 990 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; 991 if (pages < 1) 992 pages = 1; 993 fdw_private->pages = pages; 994 995 /* 996 * Estimate the number of tuples in the file. 997 */ 998 if (baserel->tuples >= 0 && baserel->pages > 0) 999 { 1000 /* 1001 * We have # of pages and # of tuples from pg_class (that is, from a 1002 * previous ANALYZE), so compute a tuples-per-page estimate and scale 1003 * that by the current file size. 1004 */ 1005 double density; 1006 1007 density = baserel->tuples / (double) baserel->pages; 1008 ntuples = clamp_row_est(density * (double) pages); 1009 } 1010 else 1011 { 1012 /* 1013 * Otherwise we have to fake it. We back into this estimate using the 1014 * planner's idea of the relation width; which is bogus if not all 1015 * columns are being read, not to mention that the text representation 1016 * of a row probably isn't the same size as its internal 1017 * representation. Possibly we could do something better, but the 1018 * real answer to anyone who complains is "ANALYZE" ... 1019 */ 1020 int tuple_width; 1021 1022 tuple_width = MAXALIGN(baserel->reltarget->width) + 1023 MAXALIGN(SizeofHeapTupleHeader); 1024 ntuples = clamp_row_est((double) stat_buf.st_size / 1025 (double) tuple_width); 1026 } 1027 fdw_private->ntuples = ntuples; 1028 1029 /* 1030 * Now estimate the number of rows returned by the scan after applying the 1031 * baserestrictinfo quals. 1032 */ 1033 nrows = ntuples * 1034 clauselist_selectivity(root, 1035 baserel->baserestrictinfo, 1036 0, 1037 JOIN_INNER, 1038 NULL); 1039 1040 nrows = clamp_row_est(nrows); 1041 1042 /* Save the output-rows estimate for the planner */ 1043 baserel->rows = nrows; 1044 } 1045 1046 /* 1047 * Estimate costs of scanning a foreign table. 1048 * 1049 * Results are returned in *startup_cost and *total_cost. 1050 */ 1051 static void 1052 estimate_costs(PlannerInfo *root, RelOptInfo *baserel, 1053 FileFdwPlanState *fdw_private, 1054 Cost *startup_cost, Cost *total_cost) 1055 { 1056 BlockNumber pages = fdw_private->pages; 1057 double ntuples = fdw_private->ntuples; 1058 Cost run_cost = 0; 1059 Cost cpu_per_tuple; 1060 1061 /* 1062 * We estimate costs almost the same way as cost_seqscan(), thus assuming 1063 * that I/O costs are equivalent to a regular table file of the same size. 1064 * However, we take per-tuple CPU costs as 10x of a seqscan, to account 1065 * for the cost of parsing records. 1066 * 1067 * In the case of a program source, this calculation is even more divorced 1068 * from reality, but we have no good alternative; and it's not clear that 1069 * the numbers we produce here matter much anyway, since there's only one 1070 * access path for the rel. 1071 */ 1072 run_cost += seq_page_cost * pages; 1073 1074 *startup_cost = baserel->baserestrictcost.startup; 1075 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple; 1076 run_cost += cpu_per_tuple * ntuples; 1077 *total_cost = *startup_cost + run_cost; 1078 } 1079 1080 /* 1081 * file_acquire_sample_rows -- acquire a random sample of rows from the table 1082 * 1083 * Selected rows are returned in the caller-allocated array rows[], 1084 * which must have at least targrows entries. 1085 * The actual number of rows selected is returned as the function result. 1086 * We also count the total number of rows in the file and return it into 1087 * *totalrows. Note that *totaldeadrows is always set to 0. 1088 * 1089 * Note that the returned list of rows is not always in order by physical 1090 * position in the file. Therefore, correlation estimates derived later 1091 * may be meaningless, but it's OK because we don't use the estimates 1092 * currently (the planner only pays attention to correlation for indexscans). 1093 */ 1094 static int 1095 file_acquire_sample_rows(Relation onerel, int elevel, 1096 HeapTuple *rows, int targrows, 1097 double *totalrows, double *totaldeadrows) 1098 { 1099 int numrows = 0; 1100 double rowstoskip = -1; /* -1 means not set yet */ 1101 ReservoirStateData rstate; 1102 TupleDesc tupDesc; 1103 Datum *values; 1104 bool *nulls; 1105 bool found; 1106 char *filename; 1107 bool is_program; 1108 List *options; 1109 CopyFromState cstate; 1110 ErrorContextCallback errcallback; 1111 MemoryContext oldcontext = CurrentMemoryContext; 1112 MemoryContext tupcontext; 1113 1114 Assert(onerel); 1115 Assert(targrows > 0); 1116 1117 tupDesc = RelationGetDescr(onerel); 1118 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); 1119 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); 1120 1121 /* Fetch options of foreign table */ 1122 fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options); 1123 1124 /* 1125 * Create CopyState from FDW options. 1126 */ 1127 cstate = BeginCopyFrom(NULL, onerel, NULL, filename, is_program, NULL, NIL, 1128 options); 1129 1130 /* 1131 * Use per-tuple memory context to prevent leak of memory used to read 1132 * rows from the file with Copy routines. 1133 */ 1134 tupcontext = AllocSetContextCreate(CurrentMemoryContext, 1135 "file_fdw temporary context", 1136 ALLOCSET_DEFAULT_SIZES); 1137 1138 /* Prepare for sampling rows */ 1139 reservoir_init_selection_state(&rstate, targrows); 1140 1141 /* Set up callback to identify error line number. */ 1142 errcallback.callback = CopyFromErrorCallback; 1143 errcallback.arg = (void *) cstate; 1144 errcallback.previous = error_context_stack; 1145 error_context_stack = &errcallback; 1146 1147 *totalrows = 0; 1148 *totaldeadrows = 0; 1149 for (;;) 1150 { 1151 /* Check for user-requested abort or sleep */ 1152 vacuum_delay_point(); 1153 1154 /* Fetch next row */ 1155 MemoryContextReset(tupcontext); 1156 MemoryContextSwitchTo(tupcontext); 1157 1158 found = NextCopyFrom(cstate, NULL, values, nulls); 1159 1160 MemoryContextSwitchTo(oldcontext); 1161 1162 if (!found) 1163 break; 1164 1165 /* 1166 * The first targrows sample rows are simply copied into the 1167 * reservoir. Then we start replacing tuples in the sample until we 1168 * reach the end of the relation. This algorithm is from Jeff Vitter's 1169 * paper (see more info in commands/analyze.c). 1170 */ 1171 if (numrows < targrows) 1172 { 1173 rows[numrows++] = heap_form_tuple(tupDesc, values, nulls); 1174 } 1175 else 1176 { 1177 /* 1178 * t in Vitter's paper is the number of records already processed. 1179 * If we need to compute a new S value, we must use the 1180 * not-yet-incremented value of totalrows as t. 1181 */ 1182 if (rowstoskip < 0) 1183 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows); 1184 1185 if (rowstoskip <= 0) 1186 { 1187 /* 1188 * Found a suitable tuple, so save it, replacing one old tuple 1189 * at random 1190 */ 1191 int k = (int) (targrows * sampler_random_fract(rstate.randstate)); 1192 1193 Assert(k >= 0 && k < targrows); 1194 heap_freetuple(rows[k]); 1195 rows[k] = heap_form_tuple(tupDesc, values, nulls); 1196 } 1197 1198 rowstoskip -= 1; 1199 } 1200 1201 *totalrows += 1; 1202 } 1203 1204 /* Remove error callback. */ 1205 error_context_stack = errcallback.previous; 1206 1207 /* Clean up. */ 1208 MemoryContextDelete(tupcontext); 1209 1210 EndCopyFrom(cstate); 1211 1212 pfree(values); 1213 pfree(nulls); 1214 1215 /* 1216 * Emit some interesting relation info 1217 */ 1218 ereport(elevel, 1219 (errmsg("\"%s\": file contains %.0f rows; " 1220 "%d rows in sample", 1221 RelationGetRelationName(onerel), 1222 *totalrows, numrows))); 1223 1224 return numrows; 1225 } 1226