/*-------------------------------------------------------------------------
 *
 * file_fdw.c
 *        foreign-data wrapper for server-side flat files (or programs).
 *
 * Copyright (c) 2010-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *        contrib/file_fdw/file_fdw.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

/* stat() is used to size the data file for planning, EXPLAIN and ANALYZE */
#include <sys/stat.h>
#include <unistd.h>

#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_foreign_table.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"

PG_MODULE_MAGIC;

/*
 * Describes the valid options for objects that use this wrapper.
 *
 * Each entry pairs an option name with the catalog (i.e. the kind of object)
 * on which that option may be specified.
 */
struct FileFdwOption
{
    const char *optname;        /* option name as written in OPTIONS (...) */
    Oid         optcontext;     /* Oid of catalog in which option may appear */
};

/*
 * Valid options for file_fdw.
 * These options are based on the options for the COPY FROM command.
 * But note that force_not_null and force_null are handled as boolean options
 * attached to a column, not as table options.
 *
 * Note: If you are adding a new option for user mappings, you need to modify
 * fileGetOptions(), which currently doesn't bother to look at user mappings.
*/
static const struct FileFdwOption valid_options[] = {
    /* Data source options: exactly one of these names the input */
    {"filename", ForeignTableRelationId},
    {"program", ForeignTableRelationId},

    /* Format options */
    /* oids option is not supported */
    {"format", ForeignTableRelationId},
    {"header", ForeignTableRelationId},
    {"delimiter", ForeignTableRelationId},
    {"quote", ForeignTableRelationId},
    {"escape", ForeignTableRelationId},
    {"null", ForeignTableRelationId},
    {"encoding", ForeignTableRelationId},
    /* Per-column options (attached to attributes, not to the table) */
    {"force_not_null", AttributeRelationId},
    {"force_null", AttributeRelationId},

    /*
     * force_quote is not supported by file_fdw because it's for COPY TO.
     */

    /* Sentinel: loops over this array stop at the entry with optname NULL */
    {NULL, InvalidOid}
};

/*
 * FDW-specific information for RelOptInfo.fdw_private.
 *
 * Allocated and filled in by fileGetForeignRelSize(); pages and ntuples are
 * set by estimate_size() and read back by estimate_costs().
 */
typedef struct FileFdwPlanState
{
    char       *filename;       /* file or program to read from */
    bool        is_program;     /* true if filename represents an OS command */
    List       *options;        /* merged COPY options, excluding filename and
                                 * is_program */
    BlockNumber pages;          /* estimate of file's physical size */
    double      ntuples;        /* estimate of number of data rows */
} FileFdwPlanState;

/*
 * FDW-specific information for ForeignScanState.fdw_state.
100 */ 101 typedef struct FileFdwExecutionState 102 { 103 char *filename; /* file or program to read from */ 104 bool is_program; /* true if filename represents an OS command */ 105 List *options; /* merged COPY options, excluding filename and 106 * is_program */ 107 CopyState cstate; /* COPY execution state */ 108 } FileFdwExecutionState; 109 110 /* 111 * SQL functions 112 */ 113 PG_FUNCTION_INFO_V1(file_fdw_handler); 114 PG_FUNCTION_INFO_V1(file_fdw_validator); 115 116 /* 117 * FDW callback routines 118 */ 119 static void fileGetForeignRelSize(PlannerInfo *root, 120 RelOptInfo *baserel, 121 Oid foreigntableid); 122 static void fileGetForeignPaths(PlannerInfo *root, 123 RelOptInfo *baserel, 124 Oid foreigntableid); 125 static ForeignScan *fileGetForeignPlan(PlannerInfo *root, 126 RelOptInfo *baserel, 127 Oid foreigntableid, 128 ForeignPath *best_path, 129 List *tlist, 130 List *scan_clauses, 131 Plan *outer_plan); 132 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es); 133 static void fileBeginForeignScan(ForeignScanState *node, int eflags); 134 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node); 135 static void fileReScanForeignScan(ForeignScanState *node); 136 static void fileEndForeignScan(ForeignScanState *node); 137 static bool fileAnalyzeForeignTable(Relation relation, 138 AcquireSampleRowsFunc *func, 139 BlockNumber *totalpages); 140 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, 141 RangeTblEntry *rte); 142 143 /* 144 * Helper functions 145 */ 146 static bool is_valid_option(const char *option, Oid context); 147 static void fileGetOptions(Oid foreigntableid, 148 char **filename, 149 bool *is_program, 150 List **other_options); 151 static List *get_file_fdw_attribute_options(Oid relid); 152 static bool check_selective_binary_conversion(RelOptInfo *baserel, 153 Oid foreigntableid, 154 List **columns); 155 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel, 156 
FileFdwPlanState *fdw_private); 157 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel, 158 FileFdwPlanState *fdw_private, 159 Cost *startup_cost, Cost *total_cost); 160 static int file_acquire_sample_rows(Relation onerel, int elevel, 161 HeapTuple *rows, int targrows, 162 double *totalrows, double *totaldeadrows); 163 164 165 /* 166 * Foreign-data wrapper handler function: return a struct with pointers 167 * to my callback routines. 168 */ 169 Datum 170 file_fdw_handler(PG_FUNCTION_ARGS) 171 { 172 FdwRoutine *fdwroutine = makeNode(FdwRoutine); 173 174 fdwroutine->GetForeignRelSize = fileGetForeignRelSize; 175 fdwroutine->GetForeignPaths = fileGetForeignPaths; 176 fdwroutine->GetForeignPlan = fileGetForeignPlan; 177 fdwroutine->ExplainForeignScan = fileExplainForeignScan; 178 fdwroutine->BeginForeignScan = fileBeginForeignScan; 179 fdwroutine->IterateForeignScan = fileIterateForeignScan; 180 fdwroutine->ReScanForeignScan = fileReScanForeignScan; 181 fdwroutine->EndForeignScan = fileEndForeignScan; 182 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable; 183 fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe; 184 185 PG_RETURN_POINTER(fdwroutine); 186 } 187 188 /* 189 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, 190 * USER MAPPING or FOREIGN TABLE that uses file_fdw. 191 * 192 * Raise an ERROR if the option or its value is considered invalid. 193 */ 194 Datum 195 file_fdw_validator(PG_FUNCTION_ARGS) 196 { 197 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); 198 Oid catalog = PG_GETARG_OID(1); 199 char *filename = NULL; 200 DefElem *force_not_null = NULL; 201 DefElem *force_null = NULL; 202 List *other_options = NIL; 203 ListCell *cell; 204 205 /* 206 * Check that only options supported by file_fdw, and allowed for the 207 * current object type, are given. 
208 */ 209 foreach(cell, options_list) 210 { 211 DefElem *def = (DefElem *) lfirst(cell); 212 213 if (!is_valid_option(def->defname, catalog)) 214 { 215 const struct FileFdwOption *opt; 216 StringInfoData buf; 217 218 /* 219 * Unknown option specified, complain about it. Provide a hint 220 * with list of valid options for the object. 221 */ 222 initStringInfo(&buf); 223 for (opt = valid_options; opt->optname; opt++) 224 { 225 if (catalog == opt->optcontext) 226 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", 227 opt->optname); 228 } 229 230 ereport(ERROR, 231 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), 232 errmsg("invalid option \"%s\"", def->defname), 233 buf.len > 0 234 ? errhint("Valid options in this context are: %s", 235 buf.data) 236 : errhint("There are no valid options in this context."))); 237 } 238 239 /* 240 * Separate out filename, program, and column-specific options, since 241 * ProcessCopyOptions won't accept them. 242 */ 243 if (strcmp(def->defname, "filename") == 0 || 244 strcmp(def->defname, "program") == 0) 245 { 246 if (filename) 247 ereport(ERROR, 248 (errcode(ERRCODE_SYNTAX_ERROR), 249 errmsg("conflicting or redundant options"))); 250 251 /* 252 * Check permissions for changing which file or program is used by 253 * the file_fdw. 254 * 255 * Only members of the role 'pg_read_server_files' are allowed to 256 * set the 'filename' option of a file_fdw foreign table, while 257 * only members of the role 'pg_execute_server_program' are 258 * allowed to set the 'program' option. This is because we don't 259 * want regular users to be able to control which file gets read 260 * or which program gets executed. 261 * 262 * Putting this sort of permissions check in a validator is a bit 263 * of a crock, but there doesn't seem to be any other place that 264 * can enforce the check more cleanly. 
265 * 266 * Note that the valid_options[] array disallows setting filename 267 * and program at any options level other than foreign table --- 268 * otherwise there'd still be a security hole. 269 */ 270 if (strcmp(def->defname, "filename") == 0 && 271 !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES)) 272 ereport(ERROR, 273 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 274 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table"))); 275 276 if (strcmp(def->defname, "program") == 0 && 277 !is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM)) 278 ereport(ERROR, 279 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 280 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table"))); 281 282 filename = defGetString(def); 283 } 284 285 /* 286 * force_not_null is a boolean option; after validation we can discard 287 * it - it will be retrieved later in get_file_fdw_attribute_options() 288 */ 289 else if (strcmp(def->defname, "force_not_null") == 0) 290 { 291 if (force_not_null) 292 ereport(ERROR, 293 (errcode(ERRCODE_SYNTAX_ERROR), 294 errmsg("conflicting or redundant options"), 295 errhint("Option \"force_not_null\" supplied more than once for a column."))); 296 force_not_null = def; 297 /* Don't care what the value is, as long as it's a legal boolean */ 298 (void) defGetBoolean(def); 299 } 300 /* See comments for force_not_null above */ 301 else if (strcmp(def->defname, "force_null") == 0) 302 { 303 if (force_null) 304 ereport(ERROR, 305 (errcode(ERRCODE_SYNTAX_ERROR), 306 errmsg("conflicting or redundant options"), 307 errhint("Option \"force_null\" supplied more than once for a column."))); 308 force_null = def; 309 (void) defGetBoolean(def); 310 } 311 else 312 other_options = lappend(other_options, def); 313 } 314 315 /* 316 * Now apply the core COPY code's validation logic for more checks. 
317 */ 318 ProcessCopyOptions(NULL, NULL, true, other_options); 319 320 /* 321 * Either filename or program option is required for file_fdw foreign 322 * tables. 323 */ 324 if (catalog == ForeignTableRelationId && filename == NULL) 325 ereport(ERROR, 326 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED), 327 errmsg("either filename or program is required for file_fdw foreign tables"))); 328 329 PG_RETURN_VOID(); 330 } 331 332 /* 333 * Check if the provided option is one of the valid options. 334 * context is the Oid of the catalog holding the object the option is for. 335 */ 336 static bool 337 is_valid_option(const char *option, Oid context) 338 { 339 const struct FileFdwOption *opt; 340 341 for (opt = valid_options; opt->optname; opt++) 342 { 343 if (context == opt->optcontext && strcmp(opt->optname, option) == 0) 344 return true; 345 } 346 return false; 347 } 348 349 /* 350 * Fetch the options for a file_fdw foreign table. 351 * 352 * We have to separate out filename/program from the other options because 353 * those must not appear in the options list passed to the core COPY code. 354 */ 355 static void 356 fileGetOptions(Oid foreigntableid, 357 char **filename, bool *is_program, List **other_options) 358 { 359 ForeignTable *table; 360 ForeignServer *server; 361 ForeignDataWrapper *wrapper; 362 List *options; 363 ListCell *lc, 364 *prev; 365 366 /* 367 * Extract options from FDW objects. We ignore user mappings because 368 * file_fdw doesn't have any options that can be specified there. 369 * 370 * (XXX Actually, given the current contents of valid_options[], there's 371 * no point in examining anything except the foreign table's own options. 372 * Simplify?) 
373 */ 374 table = GetForeignTable(foreigntableid); 375 server = GetForeignServer(table->serverid); 376 wrapper = GetForeignDataWrapper(server->fdwid); 377 378 options = NIL; 379 options = list_concat(options, wrapper->options); 380 options = list_concat(options, server->options); 381 options = list_concat(options, table->options); 382 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid)); 383 384 /* 385 * Separate out the filename or program option (we assume there is only 386 * one). 387 */ 388 *filename = NULL; 389 *is_program = false; 390 prev = NULL; 391 foreach(lc, options) 392 { 393 DefElem *def = (DefElem *) lfirst(lc); 394 395 if (strcmp(def->defname, "filename") == 0) 396 { 397 *filename = defGetString(def); 398 options = list_delete_cell(options, lc, prev); 399 break; 400 } 401 else if (strcmp(def->defname, "program") == 0) 402 { 403 *filename = defGetString(def); 404 *is_program = true; 405 options = list_delete_cell(options, lc, prev); 406 break; 407 } 408 prev = lc; 409 } 410 411 /* 412 * The validator should have checked that filename or program was included 413 * in the options, but check again, just in case. 414 */ 415 if (*filename == NULL) 416 elog(ERROR, "either filename or program is required for file_fdw foreign tables"); 417 418 *other_options = options; 419 } 420 421 /* 422 * Retrieve per-column generic options from pg_attribute and construct a list 423 * of DefElems representing them. 424 * 425 * At the moment we only have "force_not_null", and "force_null", 426 * which should each be combined into a single DefElem listing all such 427 * columns, since that's what COPY expects. 
428 */ 429 static List * 430 get_file_fdw_attribute_options(Oid relid) 431 { 432 Relation rel; 433 TupleDesc tupleDesc; 434 AttrNumber natts; 435 AttrNumber attnum; 436 List *fnncolumns = NIL; 437 List *fncolumns = NIL; 438 439 List *options = NIL; 440 441 rel = table_open(relid, AccessShareLock); 442 tupleDesc = RelationGetDescr(rel); 443 natts = tupleDesc->natts; 444 445 /* Retrieve FDW options for all user-defined attributes. */ 446 for (attnum = 1; attnum <= natts; attnum++) 447 { 448 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); 449 List *options; 450 ListCell *lc; 451 452 /* Skip dropped attributes. */ 453 if (attr->attisdropped) 454 continue; 455 456 options = GetForeignColumnOptions(relid, attnum); 457 foreach(lc, options) 458 { 459 DefElem *def = (DefElem *) lfirst(lc); 460 461 if (strcmp(def->defname, "force_not_null") == 0) 462 { 463 if (defGetBoolean(def)) 464 { 465 char *attname = pstrdup(NameStr(attr->attname)); 466 467 fnncolumns = lappend(fnncolumns, makeString(attname)); 468 } 469 } 470 else if (strcmp(def->defname, "force_null") == 0) 471 { 472 if (defGetBoolean(def)) 473 { 474 char *attname = pstrdup(NameStr(attr->attname)); 475 476 fncolumns = lappend(fncolumns, makeString(attname)); 477 } 478 } 479 /* maybe in future handle other options here */ 480 } 481 } 482 483 table_close(rel, AccessShareLock); 484 485 /* 486 * Return DefElem only when some column(s) have force_not_null / 487 * force_null options set 488 */ 489 if (fnncolumns != NIL) 490 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1)); 491 492 if (fncolumns != NIL) 493 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1)); 494 495 return options; 496 } 497 498 /* 499 * fileGetForeignRelSize 500 * Obtain relation size estimates for a foreign table 501 */ 502 static void 503 fileGetForeignRelSize(PlannerInfo *root, 504 RelOptInfo *baserel, 505 Oid foreigntableid) 506 { 507 FileFdwPlanState *fdw_private; 508 509 /* 
510 * Fetch options. We only need filename (or program) at this point, but 511 * we might as well get everything and not need to re-fetch it later in 512 * planning. 513 */ 514 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState)); 515 fileGetOptions(foreigntableid, 516 &fdw_private->filename, 517 &fdw_private->is_program, 518 &fdw_private->options); 519 baserel->fdw_private = (void *) fdw_private; 520 521 /* Estimate relation size */ 522 estimate_size(root, baserel, fdw_private); 523 } 524 525 /* 526 * fileGetForeignPaths 527 * Create possible access paths for a scan on the foreign table 528 * 529 * Currently we don't support any push-down feature, so there is only one 530 * possible access path, which simply returns all records in the order in 531 * the data file. 532 */ 533 static void 534 fileGetForeignPaths(PlannerInfo *root, 535 RelOptInfo *baserel, 536 Oid foreigntableid) 537 { 538 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private; 539 Cost startup_cost; 540 Cost total_cost; 541 List *columns; 542 List *coptions = NIL; 543 544 /* Decide whether to selectively perform binary conversion */ 545 if (check_selective_binary_conversion(baserel, 546 foreigntableid, 547 &columns)) 548 coptions = list_make1(makeDefElem("convert_selectively", 549 (Node *) columns, -1)); 550 551 /* Estimate costs */ 552 estimate_costs(root, baserel, fdw_private, 553 &startup_cost, &total_cost); 554 555 /* 556 * Create a ForeignPath node and add it as only possible path. We use the 557 * fdw_private list of the path to carry the convert_selectively option; 558 * it will be propagated into the fdw_private list of the Plan node. 559 * 560 * We don't support pushing join clauses into the quals of this path, but 561 * it could still have required parameterization due to LATERAL refs in 562 * its tlist. 
563 */ 564 add_path(baserel, (Path *) 565 create_foreignscan_path(root, baserel, 566 NULL, /* default pathtarget */ 567 baserel->rows, 568 startup_cost, 569 total_cost, 570 NIL, /* no pathkeys */ 571 baserel->lateral_relids, 572 NULL, /* no extra plan */ 573 coptions)); 574 575 /* 576 * If data file was sorted, and we knew it somehow, we could insert 577 * appropriate pathkeys into the ForeignPath node to tell the planner 578 * that. 579 */ 580 } 581 582 /* 583 * fileGetForeignPlan 584 * Create a ForeignScan plan node for scanning the foreign table 585 */ 586 static ForeignScan * 587 fileGetForeignPlan(PlannerInfo *root, 588 RelOptInfo *baserel, 589 Oid foreigntableid, 590 ForeignPath *best_path, 591 List *tlist, 592 List *scan_clauses, 593 Plan *outer_plan) 594 { 595 Index scan_relid = baserel->relid; 596 597 /* 598 * We have no native ability to evaluate restriction clauses, so we just 599 * put all the scan_clauses into the plan node's qual list for the 600 * executor to check. So all we have to do here is strip RestrictInfo 601 * nodes from the clauses and ignore pseudoconstants (which will be 602 * handled elsewhere). 
603 */ 604 scan_clauses = extract_actual_clauses(scan_clauses, false); 605 606 /* Create the ForeignScan node */ 607 return make_foreignscan(tlist, 608 scan_clauses, 609 scan_relid, 610 NIL, /* no expressions to evaluate */ 611 best_path->fdw_private, 612 NIL, /* no custom tlist */ 613 NIL, /* no remote quals */ 614 outer_plan); 615 } 616 617 /* 618 * fileExplainForeignScan 619 * Produce extra output for EXPLAIN 620 */ 621 static void 622 fileExplainForeignScan(ForeignScanState *node, ExplainState *es) 623 { 624 char *filename; 625 bool is_program; 626 List *options; 627 628 /* Fetch options --- we only need filename and is_program at this point */ 629 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), 630 &filename, &is_program, &options); 631 632 if (is_program) 633 ExplainPropertyText("Foreign Program", filename, es); 634 else 635 ExplainPropertyText("Foreign File", filename, es); 636 637 /* Suppress file size if we're not showing cost details */ 638 if (es->costs) 639 { 640 struct stat stat_buf; 641 642 if (!is_program && 643 stat(filename, &stat_buf) == 0) 644 ExplainPropertyInteger("Foreign File Size", "b", 645 (int64) stat_buf.st_size, es); 646 } 647 } 648 649 /* 650 * fileBeginForeignScan 651 * Initiate access to the file by creating CopyState 652 */ 653 static void 654 fileBeginForeignScan(ForeignScanState *node, int eflags) 655 { 656 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; 657 char *filename; 658 bool is_program; 659 List *options; 660 CopyState cstate; 661 FileFdwExecutionState *festate; 662 663 /* 664 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. 
665 */ 666 if (eflags & EXEC_FLAG_EXPLAIN_ONLY) 667 return; 668 669 /* Fetch options of foreign table */ 670 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), 671 &filename, &is_program, &options); 672 673 /* Add any options from the plan (currently only convert_selectively) */ 674 options = list_concat(options, plan->fdw_private); 675 676 /* 677 * Create CopyState from FDW options. We always acquire all columns, so 678 * as to match the expected ScanTupleSlot signature. 679 */ 680 cstate = BeginCopyFrom(NULL, 681 node->ss.ss_currentRelation, 682 filename, 683 is_program, 684 NULL, 685 NIL, 686 options); 687 688 /* 689 * Save state in node->fdw_state. We must save enough information to call 690 * BeginCopyFrom() again. 691 */ 692 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState)); 693 festate->filename = filename; 694 festate->is_program = is_program; 695 festate->options = options; 696 festate->cstate = cstate; 697 698 node->fdw_state = (void *) festate; 699 } 700 701 /* 702 * fileIterateForeignScan 703 * Read next record from the data file and store it into the 704 * ScanTupleSlot as a virtual tuple 705 */ 706 static TupleTableSlot * 707 fileIterateForeignScan(ForeignScanState *node) 708 { 709 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 710 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; 711 bool found; 712 ErrorContextCallback errcallback; 713 714 /* Set up callback to identify error line number. */ 715 errcallback.callback = CopyFromErrorCallback; 716 errcallback.arg = (void *) festate->cstate; 717 errcallback.previous = error_context_stack; 718 error_context_stack = &errcallback; 719 720 /* 721 * The protocol for loading a virtual tuple into a slot is first 722 * ExecClearTuple, then fill the values/isnull arrays, then 723 * ExecStoreVirtualTuple. If we don't find another row in the file, we 724 * just skip the last step, leaving the slot empty as required. 
725 * 726 * We can pass ExprContext = NULL because we read all columns from the 727 * file, so no need to evaluate default expressions. 728 */ 729 ExecClearTuple(slot); 730 found = NextCopyFrom(festate->cstate, NULL, 731 slot->tts_values, slot->tts_isnull); 732 if (found) 733 ExecStoreVirtualTuple(slot); 734 735 /* Remove error callback. */ 736 error_context_stack = errcallback.previous; 737 738 return slot; 739 } 740 741 /* 742 * fileReScanForeignScan 743 * Rescan table, possibly with new parameters 744 */ 745 static void 746 fileReScanForeignScan(ForeignScanState *node) 747 { 748 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 749 750 EndCopyFrom(festate->cstate); 751 752 festate->cstate = BeginCopyFrom(NULL, 753 node->ss.ss_currentRelation, 754 festate->filename, 755 festate->is_program, 756 NULL, 757 NIL, 758 festate->options); 759 } 760 761 /* 762 * fileEndForeignScan 763 * Finish scanning foreign table and dispose objects used for this scan 764 */ 765 static void 766 fileEndForeignScan(ForeignScanState *node) 767 { 768 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 769 770 /* if festate is NULL, we are in EXPLAIN; nothing to do */ 771 if (festate) 772 EndCopyFrom(festate->cstate); 773 } 774 775 /* 776 * fileAnalyzeForeignTable 777 * Test whether analyzing this foreign table is supported 778 */ 779 static bool 780 fileAnalyzeForeignTable(Relation relation, 781 AcquireSampleRowsFunc *func, 782 BlockNumber *totalpages) 783 { 784 char *filename; 785 bool is_program; 786 List *options; 787 struct stat stat_buf; 788 789 /* Fetch options of foreign table */ 790 fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options); 791 792 /* 793 * If this is a program instead of a file, just return false to skip 794 * analyzing the table. 
We could run the program and collect stats on 795 * whatever it currently returns, but it seems likely that in such cases 796 * the output would be too volatile for the stats to be useful. Maybe 797 * there should be an option to enable doing this? 798 */ 799 if (is_program) 800 return false; 801 802 /* 803 * Get size of the file. (XXX if we fail here, would it be better to just 804 * return false to skip analyzing the table?) 805 */ 806 if (stat(filename, &stat_buf) < 0) 807 ereport(ERROR, 808 (errcode_for_file_access(), 809 errmsg("could not stat file \"%s\": %m", 810 filename))); 811 812 /* 813 * Convert size to pages. Must return at least 1 so that we can tell 814 * later on that pg_class.relpages is not default. 815 */ 816 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; 817 if (*totalpages < 1) 818 *totalpages = 1; 819 820 *func = file_acquire_sample_rows; 821 822 return true; 823 } 824 825 /* 826 * fileIsForeignScanParallelSafe 827 * Reading a file, or external program, in a parallel worker should work 828 * just the same as reading it in the leader, so mark scans safe. 829 */ 830 static bool 831 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, 832 RangeTblEntry *rte) 833 { 834 return true; 835 } 836 837 /* 838 * check_selective_binary_conversion 839 * 840 * Check to see if it's useful to convert only a subset of the file's columns 841 * to binary. If so, construct a list of the column names to be converted, 842 * return that at *columns, and return true. (Note that it's possible to 843 * determine that no columns need be converted, for instance with a COUNT(*) 844 * query. So we can't use returning a NIL list to indicate failure.) 
845 */ 846 static bool 847 check_selective_binary_conversion(RelOptInfo *baserel, 848 Oid foreigntableid, 849 List **columns) 850 { 851 ForeignTable *table; 852 ListCell *lc; 853 Relation rel; 854 TupleDesc tupleDesc; 855 AttrNumber attnum; 856 Bitmapset *attrs_used = NULL; 857 bool has_wholerow = false; 858 int numattrs; 859 int i; 860 861 *columns = NIL; /* default result */ 862 863 /* 864 * Check format of the file. If binary format, this is irrelevant. 865 */ 866 table = GetForeignTable(foreigntableid); 867 foreach(lc, table->options) 868 { 869 DefElem *def = (DefElem *) lfirst(lc); 870 871 if (strcmp(def->defname, "format") == 0) 872 { 873 char *format = defGetString(def); 874 875 if (strcmp(format, "binary") == 0) 876 return false; 877 break; 878 } 879 } 880 881 /* Collect all the attributes needed for joins or final output. */ 882 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, 883 &attrs_used); 884 885 /* Add all the attributes used by restriction clauses. */ 886 foreach(lc, baserel->baserestrictinfo) 887 { 888 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); 889 890 pull_varattnos((Node *) rinfo->clause, baserel->relid, 891 &attrs_used); 892 } 893 894 /* Convert attribute numbers to column names. */ 895 rel = table_open(foreigntableid, AccessShareLock); 896 tupleDesc = RelationGetDescr(rel); 897 898 while ((attnum = bms_first_member(attrs_used)) >= 0) 899 { 900 /* Adjust for system attributes. */ 901 attnum += FirstLowInvalidHeapAttributeNumber; 902 903 if (attnum == 0) 904 { 905 has_wholerow = true; 906 break; 907 } 908 909 /* Ignore system attributes. */ 910 if (attnum < 0) 911 continue; 912 913 /* Get user attributes. */ 914 if (attnum > 0) 915 { 916 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); 917 char *attname = NameStr(attr->attname); 918 919 /* Skip dropped attributes (probably shouldn't see any here). 
*/ 920 if (attr->attisdropped) 921 continue; 922 923 /* 924 * Skip generated columns (COPY won't accept them in the column 925 * list) 926 */ 927 if (attr->attgenerated) 928 continue; 929 *columns = lappend(*columns, makeString(pstrdup(attname))); 930 } 931 } 932 933 /* Count non-dropped user attributes while we have the tupdesc. */ 934 numattrs = 0; 935 for (i = 0; i < tupleDesc->natts; i++) 936 { 937 Form_pg_attribute attr = TupleDescAttr(tupleDesc, i); 938 939 if (attr->attisdropped) 940 continue; 941 numattrs++; 942 } 943 944 table_close(rel, AccessShareLock); 945 946 /* If there's a whole-row reference, fail: we need all the columns. */ 947 if (has_wholerow) 948 { 949 *columns = NIL; 950 return false; 951 } 952 953 /* If all the user attributes are needed, fail. */ 954 if (numattrs == list_length(*columns)) 955 { 956 *columns = NIL; 957 return false; 958 } 959 960 return true; 961 } 962 963 /* 964 * Estimate size of a foreign table. 965 * 966 * The main result is returned in baserel->rows. We also set 967 * fdw_private->pages and fdw_private->ntuples for later use in the cost 968 * calculation. 969 */ 970 static void 971 estimate_size(PlannerInfo *root, RelOptInfo *baserel, 972 FileFdwPlanState *fdw_private) 973 { 974 struct stat stat_buf; 975 BlockNumber pages; 976 double ntuples; 977 double nrows; 978 979 /* 980 * Get size of the file. It might not be there at plan time, though, in 981 * which case we have to use a default estimate. We also have to fall 982 * back to the default if using a program as the input. 983 */ 984 if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0) 985 stat_buf.st_size = 10 * BLCKSZ; 986 987 /* 988 * Convert size to pages for use in I/O cost estimate later. 989 */ 990 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; 991 if (pages < 1) 992 pages = 1; 993 fdw_private->pages = pages; 994 995 /* 996 * Estimate the number of tuples in the file. 
997 */ 998 if (baserel->pages > 0) 999 { 1000 /* 1001 * We have # of pages and # of tuples from pg_class (that is, from a 1002 * previous ANALYZE), so compute a tuples-per-page estimate and scale 1003 * that by the current file size. 1004 */ 1005 double density; 1006 1007 density = baserel->tuples / (double) baserel->pages; 1008 ntuples = clamp_row_est(density * (double) pages); 1009 } 1010 else 1011 { 1012 /* 1013 * Otherwise we have to fake it. We back into this estimate using the 1014 * planner's idea of the relation width; which is bogus if not all 1015 * columns are being read, not to mention that the text representation 1016 * of a row probably isn't the same size as its internal 1017 * representation. Possibly we could do something better, but the 1018 * real answer to anyone who complains is "ANALYZE" ... 1019 */ 1020 int tuple_width; 1021 1022 tuple_width = MAXALIGN(baserel->reltarget->width) + 1023 MAXALIGN(SizeofHeapTupleHeader); 1024 ntuples = clamp_row_est((double) stat_buf.st_size / 1025 (double) tuple_width); 1026 } 1027 fdw_private->ntuples = ntuples; 1028 1029 /* 1030 * Now estimate the number of rows returned by the scan after applying the 1031 * baserestrictinfo quals. 1032 */ 1033 nrows = ntuples * 1034 clauselist_selectivity(root, 1035 baserel->baserestrictinfo, 1036 0, 1037 JOIN_INNER, 1038 NULL); 1039 1040 nrows = clamp_row_est(nrows); 1041 1042 /* Save the output-rows estimate for the planner */ 1043 baserel->rows = nrows; 1044 } 1045 1046 /* 1047 * Estimate costs of scanning a foreign table. 1048 * 1049 * Results are returned in *startup_cost and *total_cost. 
1050 */ 1051 static void 1052 estimate_costs(PlannerInfo *root, RelOptInfo *baserel, 1053 FileFdwPlanState *fdw_private, 1054 Cost *startup_cost, Cost *total_cost) 1055 { 1056 BlockNumber pages = fdw_private->pages; 1057 double ntuples = fdw_private->ntuples; 1058 Cost run_cost = 0; 1059 Cost cpu_per_tuple; 1060 1061 /* 1062 * We estimate costs almost the same way as cost_seqscan(), thus assuming 1063 * that I/O costs are equivalent to a regular table file of the same size. 1064 * However, we take per-tuple CPU costs as 10x of a seqscan, to account 1065 * for the cost of parsing records. 1066 * 1067 * In the case of a program source, this calculation is even more divorced 1068 * from reality, but we have no good alternative; and it's not clear that 1069 * the numbers we produce here matter much anyway, since there's only one 1070 * access path for the rel. 1071 */ 1072 run_cost += seq_page_cost * pages; 1073 1074 *startup_cost = baserel->baserestrictcost.startup; 1075 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple; 1076 run_cost += cpu_per_tuple * ntuples; 1077 *total_cost = *startup_cost + run_cost; 1078 } 1079 1080 /* 1081 * file_acquire_sample_rows -- acquire a random sample of rows from the table 1082 * 1083 * Selected rows are returned in the caller-allocated array rows[], 1084 * which must have at least targrows entries. 1085 * The actual number of rows selected is returned as the function result. 1086 * We also count the total number of rows in the file and return it into 1087 * *totalrows. Note that *totaldeadrows is always set to 0. 1088 * 1089 * Note that the returned list of rows is not always in order by physical 1090 * position in the file. Therefore, correlation estimates derived later 1091 * may be meaningless, but it's OK because we don't use the estimates 1092 * currently (the planner only pays attention to correlation for indexscans). 
1093 */ 1094 static int 1095 file_acquire_sample_rows(Relation onerel, int elevel, 1096 HeapTuple *rows, int targrows, 1097 double *totalrows, double *totaldeadrows) 1098 { 1099 int numrows = 0; 1100 double rowstoskip = -1; /* -1 means not set yet */ 1101 ReservoirStateData rstate; 1102 TupleDesc tupDesc; 1103 Datum *values; 1104 bool *nulls; 1105 bool found; 1106 char *filename; 1107 bool is_program; 1108 List *options; 1109 CopyState cstate; 1110 ErrorContextCallback errcallback; 1111 MemoryContext oldcontext = CurrentMemoryContext; 1112 MemoryContext tupcontext; 1113 1114 Assert(onerel); 1115 Assert(targrows > 0); 1116 1117 tupDesc = RelationGetDescr(onerel); 1118 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); 1119 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); 1120 1121 /* Fetch options of foreign table */ 1122 fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options); 1123 1124 /* 1125 * Create CopyState from FDW options. 1126 */ 1127 cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL, 1128 options); 1129 1130 /* 1131 * Use per-tuple memory context to prevent leak of memory used to read 1132 * rows from the file with Copy routines. 1133 */ 1134 tupcontext = AllocSetContextCreate(CurrentMemoryContext, 1135 "file_fdw temporary context", 1136 ALLOCSET_DEFAULT_SIZES); 1137 1138 /* Prepare for sampling rows */ 1139 reservoir_init_selection_state(&rstate, targrows); 1140 1141 /* Set up callback to identify error line number. 
*/ 1142 errcallback.callback = CopyFromErrorCallback; 1143 errcallback.arg = (void *) cstate; 1144 errcallback.previous = error_context_stack; 1145 error_context_stack = &errcallback; 1146 1147 *totalrows = 0; 1148 *totaldeadrows = 0; 1149 for (;;) 1150 { 1151 /* Check for user-requested abort or sleep */ 1152 vacuum_delay_point(); 1153 1154 /* Fetch next row */ 1155 MemoryContextReset(tupcontext); 1156 MemoryContextSwitchTo(tupcontext); 1157 1158 found = NextCopyFrom(cstate, NULL, values, nulls); 1159 1160 MemoryContextSwitchTo(oldcontext); 1161 1162 if (!found) 1163 break; 1164 1165 /* 1166 * The first targrows sample rows are simply copied into the 1167 * reservoir. Then we start replacing tuples in the sample until we 1168 * reach the end of the relation. This algorithm is from Jeff Vitter's 1169 * paper (see more info in commands/analyze.c). 1170 */ 1171 if (numrows < targrows) 1172 { 1173 rows[numrows++] = heap_form_tuple(tupDesc, values, nulls); 1174 } 1175 else 1176 { 1177 /* 1178 * t in Vitter's paper is the number of records already processed. 1179 * If we need to compute a new S value, we must use the 1180 * not-yet-incremented value of totalrows as t. 1181 */ 1182 if (rowstoskip < 0) 1183 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows); 1184 1185 if (rowstoskip <= 0) 1186 { 1187 /* 1188 * Found a suitable tuple, so save it, replacing one old tuple 1189 * at random 1190 */ 1191 int k = (int) (targrows * sampler_random_fract(rstate.randstate)); 1192 1193 Assert(k >= 0 && k < targrows); 1194 heap_freetuple(rows[k]); 1195 rows[k] = heap_form_tuple(tupDesc, values, nulls); 1196 } 1197 1198 rowstoskip -= 1; 1199 } 1200 1201 *totalrows += 1; 1202 } 1203 1204 /* Remove error callback. */ 1205 error_context_stack = errcallback.previous; 1206 1207 /* Clean up. 
*/ 1208 MemoryContextDelete(tupcontext); 1209 1210 EndCopyFrom(cstate); 1211 1212 pfree(values); 1213 pfree(nulls); 1214 1215 /* 1216 * Emit some interesting relation info 1217 */ 1218 ereport(elevel, 1219 (errmsg("\"%s\": file contains %.0f rows; " 1220 "%d rows in sample", 1221 RelationGetRelationName(onerel), 1222 *totalrows, numrows))); 1223 1224 return numrows; 1225 } 1226