/*-------------------------------------------------------------------------
 *
 * file_fdw.c
 *		  foreign-data wrapper for server-side flat files (or programs).
 *
 * Copyright (c) 2010-2020, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/file_fdw/file_fdw.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <sys/stat.h>
#include <unistd.h>

#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_foreign_table.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "utils/acl.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"

PG_MODULE_MAGIC;

/*
 * Describes one option name accepted by this wrapper, together with the
 * catalog of the object it may be attached to.  The valid_options[] table
 * is an array of these; is_valid_option() matches on both fields, and
 * file_fdw_validator() uses optcontext to list the options allowed for an
 * object in its error hint.
 */
struct FileFdwOption
{
	const char *optname;		/* option name as written in OPTIONS (...) */
	Oid			optcontext;		/* Oid of catalog in which option may appear */
};

/*
 * Valid options for file_fdw.
 * These options are based on the options for the COPY FROM command.
 * But note that force_not_null and force_null are handled as boolean options
 * attached to a column, not as table options.
 *
 * Note: If you are adding a new option for user mappings, you need to modify
 * fileGetOptions(), which currently doesn't bother to look at user mappings.
 */
static const struct FileFdwOption valid_options[] = {
	/*
	 * Data source options.  These are allowed only on foreign tables;
	 * file_fdw_validator() relies on that for its permission checks.
	 */
	{"filename", ForeignTableRelationId},
	{"program", ForeignTableRelationId},

	/* Format options */
	/* oids option is not supported */
	{"format", ForeignTableRelationId},
	{"header", ForeignTableRelationId},
	{"delimiter", ForeignTableRelationId},
	{"quote", ForeignTableRelationId},
	{"escape", ForeignTableRelationId},
	{"null", ForeignTableRelationId},
	{"encoding", ForeignTableRelationId},
	/* Per-column boolean options, gathered by get_file_fdw_attribute_options() */
	{"force_not_null", AttributeRelationId},
	{"force_null", AttributeRelationId},

	/*
	 * force_quote is not supported by file_fdw because it's for COPY TO.
	 */

	/* Sentinel: loops over this table stop at the first NULL optname */
	{NULL, InvalidOid}
};

/*
 * FDW-specific information for RelOptInfo.fdw_private.
 *
 * Allocated and filled in by fileGetForeignRelSize(); estimate_size() sets
 * pages and ntuples, which estimate_costs() later reads.
 */
typedef struct FileFdwPlanState
{
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	BlockNumber pages;			/* estimate of file's physical size */
	double		ntuples;		/* estimate of number of data rows */
} FileFdwPlanState;

/*
 * FDW-specific information for ForeignScanState.fdw_state.
 * Built by fileBeginForeignScan(); it keeps enough information for
 * fileReScanForeignScan() to restart the COPY from the beginning.
101 */ 102 typedef struct FileFdwExecutionState 103 { 104 char *filename; /* file or program to read from */ 105 bool is_program; /* true if filename represents an OS command */ 106 List *options; /* merged COPY options, excluding filename and 107 * is_program */ 108 CopyState cstate; /* COPY execution state */ 109 } FileFdwExecutionState; 110 111 /* 112 * SQL functions 113 */ 114 PG_FUNCTION_INFO_V1(file_fdw_handler); 115 PG_FUNCTION_INFO_V1(file_fdw_validator); 116 117 /* 118 * FDW callback routines 119 */ 120 static void fileGetForeignRelSize(PlannerInfo *root, 121 RelOptInfo *baserel, 122 Oid foreigntableid); 123 static void fileGetForeignPaths(PlannerInfo *root, 124 RelOptInfo *baserel, 125 Oid foreigntableid); 126 static ForeignScan *fileGetForeignPlan(PlannerInfo *root, 127 RelOptInfo *baserel, 128 Oid foreigntableid, 129 ForeignPath *best_path, 130 List *tlist, 131 List *scan_clauses, 132 Plan *outer_plan); 133 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es); 134 static void fileBeginForeignScan(ForeignScanState *node, int eflags); 135 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node); 136 static void fileReScanForeignScan(ForeignScanState *node); 137 static void fileEndForeignScan(ForeignScanState *node); 138 static bool fileAnalyzeForeignTable(Relation relation, 139 AcquireSampleRowsFunc *func, 140 BlockNumber *totalpages); 141 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, 142 RangeTblEntry *rte); 143 144 /* 145 * Helper functions 146 */ 147 static bool is_valid_option(const char *option, Oid context); 148 static void fileGetOptions(Oid foreigntableid, 149 char **filename, 150 bool *is_program, 151 List **other_options); 152 static List *get_file_fdw_attribute_options(Oid relid); 153 static bool check_selective_binary_conversion(RelOptInfo *baserel, 154 Oid foreigntableid, 155 List **columns); 156 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel, 157 
FileFdwPlanState *fdw_private); 158 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel, 159 FileFdwPlanState *fdw_private, 160 Cost *startup_cost, Cost *total_cost); 161 static int file_acquire_sample_rows(Relation onerel, int elevel, 162 HeapTuple *rows, int targrows, 163 double *totalrows, double *totaldeadrows); 164 165 166 /* 167 * Foreign-data wrapper handler function: return a struct with pointers 168 * to my callback routines. 169 */ 170 Datum 171 file_fdw_handler(PG_FUNCTION_ARGS) 172 { 173 FdwRoutine *fdwroutine = makeNode(FdwRoutine); 174 175 fdwroutine->GetForeignRelSize = fileGetForeignRelSize; 176 fdwroutine->GetForeignPaths = fileGetForeignPaths; 177 fdwroutine->GetForeignPlan = fileGetForeignPlan; 178 fdwroutine->ExplainForeignScan = fileExplainForeignScan; 179 fdwroutine->BeginForeignScan = fileBeginForeignScan; 180 fdwroutine->IterateForeignScan = fileIterateForeignScan; 181 fdwroutine->ReScanForeignScan = fileReScanForeignScan; 182 fdwroutine->EndForeignScan = fileEndForeignScan; 183 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable; 184 fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe; 185 186 PG_RETURN_POINTER(fdwroutine); 187 } 188 189 /* 190 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, 191 * USER MAPPING or FOREIGN TABLE that uses file_fdw. 192 * 193 * Raise an ERROR if the option or its value is considered invalid. 194 */ 195 Datum 196 file_fdw_validator(PG_FUNCTION_ARGS) 197 { 198 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); 199 Oid catalog = PG_GETARG_OID(1); 200 char *filename = NULL; 201 DefElem *force_not_null = NULL; 202 DefElem *force_null = NULL; 203 List *other_options = NIL; 204 ListCell *cell; 205 206 /* 207 * Check that only options supported by file_fdw, and allowed for the 208 * current object type, are given. 
209 */ 210 foreach(cell, options_list) 211 { 212 DefElem *def = (DefElem *) lfirst(cell); 213 214 if (!is_valid_option(def->defname, catalog)) 215 { 216 const struct FileFdwOption *opt; 217 StringInfoData buf; 218 219 /* 220 * Unknown option specified, complain about it. Provide a hint 221 * with list of valid options for the object. 222 */ 223 initStringInfo(&buf); 224 for (opt = valid_options; opt->optname; opt++) 225 { 226 if (catalog == opt->optcontext) 227 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", 228 opt->optname); 229 } 230 231 ereport(ERROR, 232 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), 233 errmsg("invalid option \"%s\"", def->defname), 234 buf.len > 0 235 ? errhint("Valid options in this context are: %s", 236 buf.data) 237 : errhint("There are no valid options in this context."))); 238 } 239 240 /* 241 * Separate out filename, program, and column-specific options, since 242 * ProcessCopyOptions won't accept them. 243 */ 244 if (strcmp(def->defname, "filename") == 0 || 245 strcmp(def->defname, "program") == 0) 246 { 247 if (filename) 248 ereport(ERROR, 249 (errcode(ERRCODE_SYNTAX_ERROR), 250 errmsg("conflicting or redundant options"))); 251 252 /* 253 * Check permissions for changing which file or program is used by 254 * the file_fdw. 255 * 256 * Only members of the role 'pg_read_server_files' are allowed to 257 * set the 'filename' option of a file_fdw foreign table, while 258 * only members of the role 'pg_execute_server_program' are 259 * allowed to set the 'program' option. This is because we don't 260 * want regular users to be able to control which file gets read 261 * or which program gets executed. 262 * 263 * Putting this sort of permissions check in a validator is a bit 264 * of a crock, but there doesn't seem to be any other place that 265 * can enforce the check more cleanly. 
266 * 267 * Note that the valid_options[] array disallows setting filename 268 * and program at any options level other than foreign table --- 269 * otherwise there'd still be a security hole. 270 */ 271 if (strcmp(def->defname, "filename") == 0 && 272 !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES)) 273 ereport(ERROR, 274 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 275 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table"))); 276 277 if (strcmp(def->defname, "program") == 0 && 278 !is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM)) 279 ereport(ERROR, 280 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 281 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table"))); 282 283 filename = defGetString(def); 284 } 285 286 /* 287 * force_not_null is a boolean option; after validation we can discard 288 * it - it will be retrieved later in get_file_fdw_attribute_options() 289 */ 290 else if (strcmp(def->defname, "force_not_null") == 0) 291 { 292 if (force_not_null) 293 ereport(ERROR, 294 (errcode(ERRCODE_SYNTAX_ERROR), 295 errmsg("conflicting or redundant options"), 296 errhint("Option \"force_not_null\" supplied more than once for a column."))); 297 force_not_null = def; 298 /* Don't care what the value is, as long as it's a legal boolean */ 299 (void) defGetBoolean(def); 300 } 301 /* See comments for force_not_null above */ 302 else if (strcmp(def->defname, "force_null") == 0) 303 { 304 if (force_null) 305 ereport(ERROR, 306 (errcode(ERRCODE_SYNTAX_ERROR), 307 errmsg("conflicting or redundant options"), 308 errhint("Option \"force_null\" supplied more than once for a column."))); 309 force_null = def; 310 (void) defGetBoolean(def); 311 } 312 else 313 other_options = lappend(other_options, def); 314 } 315 316 /* 317 * Now apply the core COPY code's validation logic for more checks. 
318 */ 319 ProcessCopyOptions(NULL, NULL, true, other_options); 320 321 /* 322 * Either filename or program option is required for file_fdw foreign 323 * tables. 324 */ 325 if (catalog == ForeignTableRelationId && filename == NULL) 326 ereport(ERROR, 327 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED), 328 errmsg("either filename or program is required for file_fdw foreign tables"))); 329 330 PG_RETURN_VOID(); 331 } 332 333 /* 334 * Check if the provided option is one of the valid options. 335 * context is the Oid of the catalog holding the object the option is for. 336 */ 337 static bool 338 is_valid_option(const char *option, Oid context) 339 { 340 const struct FileFdwOption *opt; 341 342 for (opt = valid_options; opt->optname; opt++) 343 { 344 if (context == opt->optcontext && strcmp(opt->optname, option) == 0) 345 return true; 346 } 347 return false; 348 } 349 350 /* 351 * Fetch the options for a file_fdw foreign table. 352 * 353 * We have to separate out filename/program from the other options because 354 * those must not appear in the options list passed to the core COPY code. 355 */ 356 static void 357 fileGetOptions(Oid foreigntableid, 358 char **filename, bool *is_program, List **other_options) 359 { 360 ForeignTable *table; 361 ForeignServer *server; 362 ForeignDataWrapper *wrapper; 363 List *options; 364 ListCell *lc; 365 366 /* 367 * Extract options from FDW objects. We ignore user mappings because 368 * file_fdw doesn't have any options that can be specified there. 369 * 370 * (XXX Actually, given the current contents of valid_options[], there's 371 * no point in examining anything except the foreign table's own options. 372 * Simplify?) 
373 */ 374 table = GetForeignTable(foreigntableid); 375 server = GetForeignServer(table->serverid); 376 wrapper = GetForeignDataWrapper(server->fdwid); 377 378 options = NIL; 379 options = list_concat(options, wrapper->options); 380 options = list_concat(options, server->options); 381 options = list_concat(options, table->options); 382 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid)); 383 384 /* 385 * Separate out the filename or program option (we assume there is only 386 * one). 387 */ 388 *filename = NULL; 389 *is_program = false; 390 foreach(lc, options) 391 { 392 DefElem *def = (DefElem *) lfirst(lc); 393 394 if (strcmp(def->defname, "filename") == 0) 395 { 396 *filename = defGetString(def); 397 options = foreach_delete_current(options, lc); 398 break; 399 } 400 else if (strcmp(def->defname, "program") == 0) 401 { 402 *filename = defGetString(def); 403 *is_program = true; 404 options = foreach_delete_current(options, lc); 405 break; 406 } 407 } 408 409 /* 410 * The validator should have checked that filename or program was included 411 * in the options, but check again, just in case. 412 */ 413 if (*filename == NULL) 414 elog(ERROR, "either filename or program is required for file_fdw foreign tables"); 415 416 *other_options = options; 417 } 418 419 /* 420 * Retrieve per-column generic options from pg_attribute and construct a list 421 * of DefElems representing them. 422 * 423 * At the moment we only have "force_not_null", and "force_null", 424 * which should each be combined into a single DefElem listing all such 425 * columns, since that's what COPY expects. 
426 */ 427 static List * 428 get_file_fdw_attribute_options(Oid relid) 429 { 430 Relation rel; 431 TupleDesc tupleDesc; 432 AttrNumber natts; 433 AttrNumber attnum; 434 List *fnncolumns = NIL; 435 List *fncolumns = NIL; 436 437 List *options = NIL; 438 439 rel = table_open(relid, AccessShareLock); 440 tupleDesc = RelationGetDescr(rel); 441 natts = tupleDesc->natts; 442 443 /* Retrieve FDW options for all user-defined attributes. */ 444 for (attnum = 1; attnum <= natts; attnum++) 445 { 446 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); 447 List *options; 448 ListCell *lc; 449 450 /* Skip dropped attributes. */ 451 if (attr->attisdropped) 452 continue; 453 454 options = GetForeignColumnOptions(relid, attnum); 455 foreach(lc, options) 456 { 457 DefElem *def = (DefElem *) lfirst(lc); 458 459 if (strcmp(def->defname, "force_not_null") == 0) 460 { 461 if (defGetBoolean(def)) 462 { 463 char *attname = pstrdup(NameStr(attr->attname)); 464 465 fnncolumns = lappend(fnncolumns, makeString(attname)); 466 } 467 } 468 else if (strcmp(def->defname, "force_null") == 0) 469 { 470 if (defGetBoolean(def)) 471 { 472 char *attname = pstrdup(NameStr(attr->attname)); 473 474 fncolumns = lappend(fncolumns, makeString(attname)); 475 } 476 } 477 /* maybe in future handle other options here */ 478 } 479 } 480 481 table_close(rel, AccessShareLock); 482 483 /* 484 * Return DefElem only when some column(s) have force_not_null / 485 * force_null options set 486 */ 487 if (fnncolumns != NIL) 488 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1)); 489 490 if (fncolumns != NIL) 491 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1)); 492 493 return options; 494 } 495 496 /* 497 * fileGetForeignRelSize 498 * Obtain relation size estimates for a foreign table 499 */ 500 static void 501 fileGetForeignRelSize(PlannerInfo *root, 502 RelOptInfo *baserel, 503 Oid foreigntableid) 504 { 505 FileFdwPlanState *fdw_private; 506 507 /* 
508 * Fetch options. We only need filename (or program) at this point, but 509 * we might as well get everything and not need to re-fetch it later in 510 * planning. 511 */ 512 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState)); 513 fileGetOptions(foreigntableid, 514 &fdw_private->filename, 515 &fdw_private->is_program, 516 &fdw_private->options); 517 baserel->fdw_private = (void *) fdw_private; 518 519 /* Estimate relation size */ 520 estimate_size(root, baserel, fdw_private); 521 } 522 523 /* 524 * fileGetForeignPaths 525 * Create possible access paths for a scan on the foreign table 526 * 527 * Currently we don't support any push-down feature, so there is only one 528 * possible access path, which simply returns all records in the order in 529 * the data file. 530 */ 531 static void 532 fileGetForeignPaths(PlannerInfo *root, 533 RelOptInfo *baserel, 534 Oid foreigntableid) 535 { 536 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private; 537 Cost startup_cost; 538 Cost total_cost; 539 List *columns; 540 List *coptions = NIL; 541 542 /* Decide whether to selectively perform binary conversion */ 543 if (check_selective_binary_conversion(baserel, 544 foreigntableid, 545 &columns)) 546 coptions = list_make1(makeDefElem("convert_selectively", 547 (Node *) columns, -1)); 548 549 /* Estimate costs */ 550 estimate_costs(root, baserel, fdw_private, 551 &startup_cost, &total_cost); 552 553 /* 554 * Create a ForeignPath node and add it as only possible path. We use the 555 * fdw_private list of the path to carry the convert_selectively option; 556 * it will be propagated into the fdw_private list of the Plan node. 557 * 558 * We don't support pushing join clauses into the quals of this path, but 559 * it could still have required parameterization due to LATERAL refs in 560 * its tlist. 
561 */ 562 add_path(baserel, (Path *) 563 create_foreignscan_path(root, baserel, 564 NULL, /* default pathtarget */ 565 baserel->rows, 566 startup_cost, 567 total_cost, 568 NIL, /* no pathkeys */ 569 baserel->lateral_relids, 570 NULL, /* no extra plan */ 571 coptions)); 572 573 /* 574 * If data file was sorted, and we knew it somehow, we could insert 575 * appropriate pathkeys into the ForeignPath node to tell the planner 576 * that. 577 */ 578 } 579 580 /* 581 * fileGetForeignPlan 582 * Create a ForeignScan plan node for scanning the foreign table 583 */ 584 static ForeignScan * 585 fileGetForeignPlan(PlannerInfo *root, 586 RelOptInfo *baserel, 587 Oid foreigntableid, 588 ForeignPath *best_path, 589 List *tlist, 590 List *scan_clauses, 591 Plan *outer_plan) 592 { 593 Index scan_relid = baserel->relid; 594 595 /* 596 * We have no native ability to evaluate restriction clauses, so we just 597 * put all the scan_clauses into the plan node's qual list for the 598 * executor to check. So all we have to do here is strip RestrictInfo 599 * nodes from the clauses and ignore pseudoconstants (which will be 600 * handled elsewhere). 
601 */ 602 scan_clauses = extract_actual_clauses(scan_clauses, false); 603 604 /* Create the ForeignScan node */ 605 return make_foreignscan(tlist, 606 scan_clauses, 607 scan_relid, 608 NIL, /* no expressions to evaluate */ 609 best_path->fdw_private, 610 NIL, /* no custom tlist */ 611 NIL, /* no remote quals */ 612 outer_plan); 613 } 614 615 /* 616 * fileExplainForeignScan 617 * Produce extra output for EXPLAIN 618 */ 619 static void 620 fileExplainForeignScan(ForeignScanState *node, ExplainState *es) 621 { 622 char *filename; 623 bool is_program; 624 List *options; 625 626 /* Fetch options --- we only need filename and is_program at this point */ 627 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), 628 &filename, &is_program, &options); 629 630 if (is_program) 631 ExplainPropertyText("Foreign Program", filename, es); 632 else 633 ExplainPropertyText("Foreign File", filename, es); 634 635 /* Suppress file size if we're not showing cost details */ 636 if (es->costs) 637 { 638 struct stat stat_buf; 639 640 if (!is_program && 641 stat(filename, &stat_buf) == 0) 642 ExplainPropertyInteger("Foreign File Size", "b", 643 (int64) stat_buf.st_size, es); 644 } 645 } 646 647 /* 648 * fileBeginForeignScan 649 * Initiate access to the file by creating CopyState 650 */ 651 static void 652 fileBeginForeignScan(ForeignScanState *node, int eflags) 653 { 654 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; 655 char *filename; 656 bool is_program; 657 List *options; 658 CopyState cstate; 659 FileFdwExecutionState *festate; 660 661 /* 662 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. 
663 */ 664 if (eflags & EXEC_FLAG_EXPLAIN_ONLY) 665 return; 666 667 /* Fetch options of foreign table */ 668 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), 669 &filename, &is_program, &options); 670 671 /* Add any options from the plan (currently only convert_selectively) */ 672 options = list_concat(options, plan->fdw_private); 673 674 /* 675 * Create CopyState from FDW options. We always acquire all columns, so 676 * as to match the expected ScanTupleSlot signature. 677 */ 678 cstate = BeginCopyFrom(NULL, 679 node->ss.ss_currentRelation, 680 filename, 681 is_program, 682 NULL, 683 NIL, 684 options); 685 686 /* 687 * Save state in node->fdw_state. We must save enough information to call 688 * BeginCopyFrom() again. 689 */ 690 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState)); 691 festate->filename = filename; 692 festate->is_program = is_program; 693 festate->options = options; 694 festate->cstate = cstate; 695 696 node->fdw_state = (void *) festate; 697 } 698 699 /* 700 * fileIterateForeignScan 701 * Read next record from the data file and store it into the 702 * ScanTupleSlot as a virtual tuple 703 */ 704 static TupleTableSlot * 705 fileIterateForeignScan(ForeignScanState *node) 706 { 707 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 708 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; 709 bool found; 710 ErrorContextCallback errcallback; 711 712 /* Set up callback to identify error line number. */ 713 errcallback.callback = CopyFromErrorCallback; 714 errcallback.arg = (void *) festate->cstate; 715 errcallback.previous = error_context_stack; 716 error_context_stack = &errcallback; 717 718 /* 719 * The protocol for loading a virtual tuple into a slot is first 720 * ExecClearTuple, then fill the values/isnull arrays, then 721 * ExecStoreVirtualTuple. If we don't find another row in the file, we 722 * just skip the last step, leaving the slot empty as required. 
723 * 724 * We can pass ExprContext = NULL because we read all columns from the 725 * file, so no need to evaluate default expressions. 726 */ 727 ExecClearTuple(slot); 728 found = NextCopyFrom(festate->cstate, NULL, 729 slot->tts_values, slot->tts_isnull); 730 if (found) 731 ExecStoreVirtualTuple(slot); 732 733 /* Remove error callback. */ 734 error_context_stack = errcallback.previous; 735 736 return slot; 737 } 738 739 /* 740 * fileReScanForeignScan 741 * Rescan table, possibly with new parameters 742 */ 743 static void 744 fileReScanForeignScan(ForeignScanState *node) 745 { 746 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 747 748 EndCopyFrom(festate->cstate); 749 750 festate->cstate = BeginCopyFrom(NULL, 751 node->ss.ss_currentRelation, 752 festate->filename, 753 festate->is_program, 754 NULL, 755 NIL, 756 festate->options); 757 } 758 759 /* 760 * fileEndForeignScan 761 * Finish scanning foreign table and dispose objects used for this scan 762 */ 763 static void 764 fileEndForeignScan(ForeignScanState *node) 765 { 766 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; 767 768 /* if festate is NULL, we are in EXPLAIN; nothing to do */ 769 if (festate) 770 EndCopyFrom(festate->cstate); 771 } 772 773 /* 774 * fileAnalyzeForeignTable 775 * Test whether analyzing this foreign table is supported 776 */ 777 static bool 778 fileAnalyzeForeignTable(Relation relation, 779 AcquireSampleRowsFunc *func, 780 BlockNumber *totalpages) 781 { 782 char *filename; 783 bool is_program; 784 List *options; 785 struct stat stat_buf; 786 787 /* Fetch options of foreign table */ 788 fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options); 789 790 /* 791 * If this is a program instead of a file, just return false to skip 792 * analyzing the table. 
We could run the program and collect stats on 793 * whatever it currently returns, but it seems likely that in such cases 794 * the output would be too volatile for the stats to be useful. Maybe 795 * there should be an option to enable doing this? 796 */ 797 if (is_program) 798 return false; 799 800 /* 801 * Get size of the file. (XXX if we fail here, would it be better to just 802 * return false to skip analyzing the table?) 803 */ 804 if (stat(filename, &stat_buf) < 0) 805 ereport(ERROR, 806 (errcode_for_file_access(), 807 errmsg("could not stat file \"%s\": %m", 808 filename))); 809 810 /* 811 * Convert size to pages. Must return at least 1 so that we can tell 812 * later on that pg_class.relpages is not default. 813 */ 814 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; 815 if (*totalpages < 1) 816 *totalpages = 1; 817 818 *func = file_acquire_sample_rows; 819 820 return true; 821 } 822 823 /* 824 * fileIsForeignScanParallelSafe 825 * Reading a file, or external program, in a parallel worker should work 826 * just the same as reading it in the leader, so mark scans safe. 827 */ 828 static bool 829 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, 830 RangeTblEntry *rte) 831 { 832 return true; 833 } 834 835 /* 836 * check_selective_binary_conversion 837 * 838 * Check to see if it's useful to convert only a subset of the file's columns 839 * to binary. If so, construct a list of the column names to be converted, 840 * return that at *columns, and return true. (Note that it's possible to 841 * determine that no columns need be converted, for instance with a COUNT(*) 842 * query. So we can't use returning a NIL list to indicate failure.) 
843 */ 844 static bool 845 check_selective_binary_conversion(RelOptInfo *baserel, 846 Oid foreigntableid, 847 List **columns) 848 { 849 ForeignTable *table; 850 ListCell *lc; 851 Relation rel; 852 TupleDesc tupleDesc; 853 AttrNumber attnum; 854 Bitmapset *attrs_used = NULL; 855 bool has_wholerow = false; 856 int numattrs; 857 int i; 858 859 *columns = NIL; /* default result */ 860 861 /* 862 * Check format of the file. If binary format, this is irrelevant. 863 */ 864 table = GetForeignTable(foreigntableid); 865 foreach(lc, table->options) 866 { 867 DefElem *def = (DefElem *) lfirst(lc); 868 869 if (strcmp(def->defname, "format") == 0) 870 { 871 char *format = defGetString(def); 872 873 if (strcmp(format, "binary") == 0) 874 return false; 875 break; 876 } 877 } 878 879 /* Collect all the attributes needed for joins or final output. */ 880 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, 881 &attrs_used); 882 883 /* Add all the attributes used by restriction clauses. */ 884 foreach(lc, baserel->baserestrictinfo) 885 { 886 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); 887 888 pull_varattnos((Node *) rinfo->clause, baserel->relid, 889 &attrs_used); 890 } 891 892 /* Convert attribute numbers to column names. */ 893 rel = table_open(foreigntableid, AccessShareLock); 894 tupleDesc = RelationGetDescr(rel); 895 896 while ((attnum = bms_first_member(attrs_used)) >= 0) 897 { 898 /* Adjust for system attributes. */ 899 attnum += FirstLowInvalidHeapAttributeNumber; 900 901 if (attnum == 0) 902 { 903 has_wholerow = true; 904 break; 905 } 906 907 /* Ignore system attributes. */ 908 if (attnum < 0) 909 continue; 910 911 /* Get user attributes. */ 912 if (attnum > 0) 913 { 914 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); 915 char *attname = NameStr(attr->attname); 916 917 /* Skip dropped attributes (probably shouldn't see any here). 
*/ 918 if (attr->attisdropped) 919 continue; 920 921 /* 922 * Skip generated columns (COPY won't accept them in the column 923 * list) 924 */ 925 if (attr->attgenerated) 926 continue; 927 *columns = lappend(*columns, makeString(pstrdup(attname))); 928 } 929 } 930 931 /* Count non-dropped user attributes while we have the tupdesc. */ 932 numattrs = 0; 933 for (i = 0; i < tupleDesc->natts; i++) 934 { 935 Form_pg_attribute attr = TupleDescAttr(tupleDesc, i); 936 937 if (attr->attisdropped) 938 continue; 939 numattrs++; 940 } 941 942 table_close(rel, AccessShareLock); 943 944 /* If there's a whole-row reference, fail: we need all the columns. */ 945 if (has_wholerow) 946 { 947 *columns = NIL; 948 return false; 949 } 950 951 /* If all the user attributes are needed, fail. */ 952 if (numattrs == list_length(*columns)) 953 { 954 *columns = NIL; 955 return false; 956 } 957 958 return true; 959 } 960 961 /* 962 * Estimate size of a foreign table. 963 * 964 * The main result is returned in baserel->rows. We also set 965 * fdw_private->pages and fdw_private->ntuples for later use in the cost 966 * calculation. 967 */ 968 static void 969 estimate_size(PlannerInfo *root, RelOptInfo *baserel, 970 FileFdwPlanState *fdw_private) 971 { 972 struct stat stat_buf; 973 BlockNumber pages; 974 double ntuples; 975 double nrows; 976 977 /* 978 * Get size of the file. It might not be there at plan time, though, in 979 * which case we have to use a default estimate. We also have to fall 980 * back to the default if using a program as the input. 981 */ 982 if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0) 983 stat_buf.st_size = 10 * BLCKSZ; 984 985 /* 986 * Convert size to pages for use in I/O cost estimate later. 987 */ 988 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ; 989 if (pages < 1) 990 pages = 1; 991 fdw_private->pages = pages; 992 993 /* 994 * Estimate the number of tuples in the file. 
995 */ 996 if (baserel->pages > 0) 997 { 998 /* 999 * We have # of pages and # of tuples from pg_class (that is, from a 1000 * previous ANALYZE), so compute a tuples-per-page estimate and scale 1001 * that by the current file size. 1002 */ 1003 double density; 1004 1005 density = baserel->tuples / (double) baserel->pages; 1006 ntuples = clamp_row_est(density * (double) pages); 1007 } 1008 else 1009 { 1010 /* 1011 * Otherwise we have to fake it. We back into this estimate using the 1012 * planner's idea of the relation width; which is bogus if not all 1013 * columns are being read, not to mention that the text representation 1014 * of a row probably isn't the same size as its internal 1015 * representation. Possibly we could do something better, but the 1016 * real answer to anyone who complains is "ANALYZE" ... 1017 */ 1018 int tuple_width; 1019 1020 tuple_width = MAXALIGN(baserel->reltarget->width) + 1021 MAXALIGN(SizeofHeapTupleHeader); 1022 ntuples = clamp_row_est((double) stat_buf.st_size / 1023 (double) tuple_width); 1024 } 1025 fdw_private->ntuples = ntuples; 1026 1027 /* 1028 * Now estimate the number of rows returned by the scan after applying the 1029 * baserestrictinfo quals. 1030 */ 1031 nrows = ntuples * 1032 clauselist_selectivity(root, 1033 baserel->baserestrictinfo, 1034 0, 1035 JOIN_INNER, 1036 NULL); 1037 1038 nrows = clamp_row_est(nrows); 1039 1040 /* Save the output-rows estimate for the planner */ 1041 baserel->rows = nrows; 1042 } 1043 1044 /* 1045 * Estimate costs of scanning a foreign table. 1046 * 1047 * Results are returned in *startup_cost and *total_cost. 
1048 */ 1049 static void 1050 estimate_costs(PlannerInfo *root, RelOptInfo *baserel, 1051 FileFdwPlanState *fdw_private, 1052 Cost *startup_cost, Cost *total_cost) 1053 { 1054 BlockNumber pages = fdw_private->pages; 1055 double ntuples = fdw_private->ntuples; 1056 Cost run_cost = 0; 1057 Cost cpu_per_tuple; 1058 1059 /* 1060 * We estimate costs almost the same way as cost_seqscan(), thus assuming 1061 * that I/O costs are equivalent to a regular table file of the same size. 1062 * However, we take per-tuple CPU costs as 10x of a seqscan, to account 1063 * for the cost of parsing records. 1064 * 1065 * In the case of a program source, this calculation is even more divorced 1066 * from reality, but we have no good alternative; and it's not clear that 1067 * the numbers we produce here matter much anyway, since there's only one 1068 * access path for the rel. 1069 */ 1070 run_cost += seq_page_cost * pages; 1071 1072 *startup_cost = baserel->baserestrictcost.startup; 1073 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple; 1074 run_cost += cpu_per_tuple * ntuples; 1075 *total_cost = *startup_cost + run_cost; 1076 } 1077 1078 /* 1079 * file_acquire_sample_rows -- acquire a random sample of rows from the table 1080 * 1081 * Selected rows are returned in the caller-allocated array rows[], 1082 * which must have at least targrows entries. 1083 * The actual number of rows selected is returned as the function result. 1084 * We also count the total number of rows in the file and return it into 1085 * *totalrows. Note that *totaldeadrows is always set to 0. 1086 * 1087 * Note that the returned list of rows is not always in order by physical 1088 * position in the file. Therefore, correlation estimates derived later 1089 * may be meaningless, but it's OK because we don't use the estimates 1090 * currently (the planner only pays attention to correlation for indexscans). 
1091 */ 1092 static int 1093 file_acquire_sample_rows(Relation onerel, int elevel, 1094 HeapTuple *rows, int targrows, 1095 double *totalrows, double *totaldeadrows) 1096 { 1097 int numrows = 0; 1098 double rowstoskip = -1; /* -1 means not set yet */ 1099 ReservoirStateData rstate; 1100 TupleDesc tupDesc; 1101 Datum *values; 1102 bool *nulls; 1103 bool found; 1104 char *filename; 1105 bool is_program; 1106 List *options; 1107 CopyState cstate; 1108 ErrorContextCallback errcallback; 1109 MemoryContext oldcontext = CurrentMemoryContext; 1110 MemoryContext tupcontext; 1111 1112 Assert(onerel); 1113 Assert(targrows > 0); 1114 1115 tupDesc = RelationGetDescr(onerel); 1116 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); 1117 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); 1118 1119 /* Fetch options of foreign table */ 1120 fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options); 1121 1122 /* 1123 * Create CopyState from FDW options. 1124 */ 1125 cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL, 1126 options); 1127 1128 /* 1129 * Use per-tuple memory context to prevent leak of memory used to read 1130 * rows from the file with Copy routines. 1131 */ 1132 tupcontext = AllocSetContextCreate(CurrentMemoryContext, 1133 "file_fdw temporary context", 1134 ALLOCSET_DEFAULT_SIZES); 1135 1136 /* Prepare for sampling rows */ 1137 reservoir_init_selection_state(&rstate, targrows); 1138 1139 /* Set up callback to identify error line number. 
*/ 1140 errcallback.callback = CopyFromErrorCallback; 1141 errcallback.arg = (void *) cstate; 1142 errcallback.previous = error_context_stack; 1143 error_context_stack = &errcallback; 1144 1145 *totalrows = 0; 1146 *totaldeadrows = 0; 1147 for (;;) 1148 { 1149 /* Check for user-requested abort or sleep */ 1150 vacuum_delay_point(); 1151 1152 /* Fetch next row */ 1153 MemoryContextReset(tupcontext); 1154 MemoryContextSwitchTo(tupcontext); 1155 1156 found = NextCopyFrom(cstate, NULL, values, nulls); 1157 1158 MemoryContextSwitchTo(oldcontext); 1159 1160 if (!found) 1161 break; 1162 1163 /* 1164 * The first targrows sample rows are simply copied into the 1165 * reservoir. Then we start replacing tuples in the sample until we 1166 * reach the end of the relation. This algorithm is from Jeff Vitter's 1167 * paper (see more info in commands/analyze.c). 1168 */ 1169 if (numrows < targrows) 1170 { 1171 rows[numrows++] = heap_form_tuple(tupDesc, values, nulls); 1172 } 1173 else 1174 { 1175 /* 1176 * t in Vitter's paper is the number of records already processed. 1177 * If we need to compute a new S value, we must use the 1178 * not-yet-incremented value of totalrows as t. 1179 */ 1180 if (rowstoskip < 0) 1181 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows); 1182 1183 if (rowstoskip <= 0) 1184 { 1185 /* 1186 * Found a suitable tuple, so save it, replacing one old tuple 1187 * at random 1188 */ 1189 int k = (int) (targrows * sampler_random_fract(rstate.randstate)); 1190 1191 Assert(k >= 0 && k < targrows); 1192 heap_freetuple(rows[k]); 1193 rows[k] = heap_form_tuple(tupDesc, values, nulls); 1194 } 1195 1196 rowstoskip -= 1; 1197 } 1198 1199 *totalrows += 1; 1200 } 1201 1202 /* Remove error callback. */ 1203 error_context_stack = errcallback.previous; 1204 1205 /* Clean up. 
*/ 1206 MemoryContextDelete(tupcontext); 1207 1208 EndCopyFrom(cstate); 1209 1210 pfree(values); 1211 pfree(nulls); 1212 1213 /* 1214 * Emit some interesting relation info 1215 */ 1216 ereport(elevel, 1217 (errmsg("\"%s\": file contains %.0f rows; " 1218 "%d rows in sample", 1219 RelationGetRelationName(onerel), 1220 *totalrows, numrows))); 1221 1222 return numrows; 1223 } 1224