1 /*-------------------------------------------------------------------------
2 *
3 * file_fdw.c
4 * foreign-data wrapper for server-side flat files (or programs).
5 *
6 * Copyright (c) 2010-2017, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/file_fdw/file_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include <sys/stat.h>
16 #include <unistd.h>
17
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "catalog/pg_foreign_table.h"
22 #include "commands/copy.h"
23 #include "commands/defrem.h"
24 #include "commands/explain.h"
25 #include "commands/vacuum.h"
26 #include "foreign/fdwapi.h"
27 #include "foreign/foreign.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/pathnode.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 #include "utils/sampling.h"
38
/* PostgreSQL module magic block (see fmgr.h); must appear once per module. */
PG_MODULE_MAGIC;
40
41 /*
42 * Describes the valid options for objects that use this wrapper.
43 */
44 struct FileFdwOption
45 {
46 const char *optname;
47 Oid optcontext; /* Oid of catalog in which option may appear */
48 };
49
50 /*
51 * Valid options for file_fdw.
52 * These options are based on the options for the COPY FROM command.
53 * But note that force_not_null and force_null are handled as boolean options
54 * attached to a column, not as table options.
55 *
56 * Note: If you are adding new option for user mapping, you need to modify
57 * fileGetOptions(), which currently doesn't bother to look at user mappings.
58 */
59 static const struct FileFdwOption valid_options[] = {
60 /* Data source options */
61 {"filename", ForeignTableRelationId},
62 {"program", ForeignTableRelationId},
63
64 /* Format options */
65 /* oids option is not supported */
66 {"format", ForeignTableRelationId},
67 {"header", ForeignTableRelationId},
68 {"delimiter", ForeignTableRelationId},
69 {"quote", ForeignTableRelationId},
70 {"escape", ForeignTableRelationId},
71 {"null", ForeignTableRelationId},
72 {"encoding", ForeignTableRelationId},
73 {"force_not_null", AttributeRelationId},
74 {"force_null", AttributeRelationId},
75
76 /*
77 * force_quote is not supported by file_fdw because it's for COPY TO.
78 */
79
80 /* Sentinel */
81 {NULL, InvalidOid}
82 };
83
84 /*
85 * FDW-specific information for RelOptInfo.fdw_private.
86 */
87 typedef struct FileFdwPlanState
88 {
89 char *filename; /* file or program to read from */
90 bool is_program; /* true if filename represents an OS command */
91 List *options; /* merged COPY options, excluding filename and
92 * is_program */
93 BlockNumber pages; /* estimate of file's physical size */
94 double ntuples; /* estimate of number of data rows */
95 } FileFdwPlanState;
96
97 /*
98 * FDW-specific information for ForeignScanState.fdw_state.
99 */
100 typedef struct FileFdwExecutionState
101 {
102 char *filename; /* file or program to read from */
103 bool is_program; /* true if filename represents an OS command */
104 List *options; /* merged COPY options, excluding filename and
105 * is_program */
106 CopyState cstate; /* COPY execution state */
107 } FileFdwExecutionState;
108
109 /*
110 * SQL functions
111 */
112 PG_FUNCTION_INFO_V1(file_fdw_handler);
113 PG_FUNCTION_INFO_V1(file_fdw_validator);
114
115 /*
116 * FDW callback routines
117 */
118 static void fileGetForeignRelSize(PlannerInfo *root,
119 RelOptInfo *baserel,
120 Oid foreigntableid);
121 static void fileGetForeignPaths(PlannerInfo *root,
122 RelOptInfo *baserel,
123 Oid foreigntableid);
124 static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
125 RelOptInfo *baserel,
126 Oid foreigntableid,
127 ForeignPath *best_path,
128 List *tlist,
129 List *scan_clauses,
130 Plan *outer_plan);
131 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
132 static void fileBeginForeignScan(ForeignScanState *node, int eflags);
133 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
134 static void fileReScanForeignScan(ForeignScanState *node);
135 static void fileEndForeignScan(ForeignScanState *node);
136 static bool fileAnalyzeForeignTable(Relation relation,
137 AcquireSampleRowsFunc *func,
138 BlockNumber *totalpages);
139 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
140 RangeTblEntry *rte);
141
142 /*
143 * Helper functions
144 */
145 static bool is_valid_option(const char *option, Oid context);
146 static void fileGetOptions(Oid foreigntableid,
147 char **filename,
148 bool *is_program,
149 List **other_options);
150 static List *get_file_fdw_attribute_options(Oid relid);
151 static bool check_selective_binary_conversion(RelOptInfo *baserel,
152 Oid foreigntableid,
153 List **columns);
154 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
155 FileFdwPlanState *fdw_private);
156 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
157 FileFdwPlanState *fdw_private,
158 Cost *startup_cost, Cost *total_cost);
159 static int file_acquire_sample_rows(Relation onerel, int elevel,
160 HeapTuple *rows, int targrows,
161 double *totalrows, double *totaldeadrows);
162
163
164 /*
165 * Foreign-data wrapper handler function: return a struct with pointers
166 * to my callback routines.
167 */
168 Datum
file_fdw_handler(PG_FUNCTION_ARGS)169 file_fdw_handler(PG_FUNCTION_ARGS)
170 {
171 FdwRoutine *fdwroutine = makeNode(FdwRoutine);
172
173 fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
174 fdwroutine->GetForeignPaths = fileGetForeignPaths;
175 fdwroutine->GetForeignPlan = fileGetForeignPlan;
176 fdwroutine->ExplainForeignScan = fileExplainForeignScan;
177 fdwroutine->BeginForeignScan = fileBeginForeignScan;
178 fdwroutine->IterateForeignScan = fileIterateForeignScan;
179 fdwroutine->ReScanForeignScan = fileReScanForeignScan;
180 fdwroutine->EndForeignScan = fileEndForeignScan;
181 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
182 fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
183
184 PG_RETURN_POINTER(fdwroutine);
185 }
186
187 /*
188 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
189 * USER MAPPING or FOREIGN TABLE that uses file_fdw.
190 *
191 * Raise an ERROR if the option or its value is considered invalid.
192 */
193 Datum
file_fdw_validator(PG_FUNCTION_ARGS)194 file_fdw_validator(PG_FUNCTION_ARGS)
195 {
196 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
197 Oid catalog = PG_GETARG_OID(1);
198 char *filename = NULL;
199 DefElem *force_not_null = NULL;
200 DefElem *force_null = NULL;
201 List *other_options = NIL;
202 ListCell *cell;
203
204 /*
205 * Only superusers are allowed to set options of a file_fdw foreign table.
206 * This is because we don't want non-superusers to be able to control
207 * which file gets read or which program gets executed.
208 *
209 * Putting this sort of permissions check in a validator is a bit of a
210 * crock, but there doesn't seem to be any other place that can enforce
211 * the check more cleanly.
212 *
213 * Note that the valid_options[] array disallows setting filename and
214 * program at any options level other than foreign table --- otherwise
215 * there'd still be a security hole.
216 */
217 if (catalog == ForeignTableRelationId && !superuser())
218 ereport(ERROR,
219 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
220 errmsg("only superuser can change options of a file_fdw foreign table")));
221
222 /*
223 * Check that only options supported by file_fdw, and allowed for the
224 * current object type, are given.
225 */
226 foreach(cell, options_list)
227 {
228 DefElem *def = (DefElem *) lfirst(cell);
229
230 if (!is_valid_option(def->defname, catalog))
231 {
232 const struct FileFdwOption *opt;
233 StringInfoData buf;
234
235 /*
236 * Unknown option specified, complain about it. Provide a hint
237 * with list of valid options for the object.
238 */
239 initStringInfo(&buf);
240 for (opt = valid_options; opt->optname; opt++)
241 {
242 if (catalog == opt->optcontext)
243 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
244 opt->optname);
245 }
246
247 ereport(ERROR,
248 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
249 errmsg("invalid option \"%s\"", def->defname),
250 buf.len > 0
251 ? errhint("Valid options in this context are: %s",
252 buf.data)
253 : errhint("There are no valid options in this context.")));
254 }
255
256 /*
257 * Separate out filename, program, and column-specific options, since
258 * ProcessCopyOptions won't accept them.
259 */
260 if (strcmp(def->defname, "filename") == 0 ||
261 strcmp(def->defname, "program") == 0)
262 {
263 if (filename)
264 ereport(ERROR,
265 (errcode(ERRCODE_SYNTAX_ERROR),
266 errmsg("conflicting or redundant options")));
267 filename = defGetString(def);
268 }
269
270 /*
271 * force_not_null is a boolean option; after validation we can discard
272 * it - it will be retrieved later in get_file_fdw_attribute_options()
273 */
274 else if (strcmp(def->defname, "force_not_null") == 0)
275 {
276 if (force_not_null)
277 ereport(ERROR,
278 (errcode(ERRCODE_SYNTAX_ERROR),
279 errmsg("conflicting or redundant options"),
280 errhint("option \"force_not_null\" supplied more than once for a column")));
281 force_not_null = def;
282 /* Don't care what the value is, as long as it's a legal boolean */
283 (void) defGetBoolean(def);
284 }
285 /* See comments for force_not_null above */
286 else if (strcmp(def->defname, "force_null") == 0)
287 {
288 if (force_null)
289 ereport(ERROR,
290 (errcode(ERRCODE_SYNTAX_ERROR),
291 errmsg("conflicting or redundant options"),
292 errhint("option \"force_null\" supplied more than once for a column")));
293 force_null = def;
294 (void) defGetBoolean(def);
295 }
296 else
297 other_options = lappend(other_options, def);
298 }
299
300 /*
301 * Now apply the core COPY code's validation logic for more checks.
302 */
303 ProcessCopyOptions(NULL, NULL, true, other_options);
304
305 /*
306 * Either filename or program option is required for file_fdw foreign
307 * tables.
308 */
309 if (catalog == ForeignTableRelationId && filename == NULL)
310 ereport(ERROR,
311 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
312 errmsg("either filename or program is required for file_fdw foreign tables")));
313
314 PG_RETURN_VOID();
315 }
316
317 /*
318 * Check if the provided option is one of the valid options.
319 * context is the Oid of the catalog holding the object the option is for.
320 */
321 static bool
is_valid_option(const char * option,Oid context)322 is_valid_option(const char *option, Oid context)
323 {
324 const struct FileFdwOption *opt;
325
326 for (opt = valid_options; opt->optname; opt++)
327 {
328 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
329 return true;
330 }
331 return false;
332 }
333
334 /*
335 * Fetch the options for a file_fdw foreign table.
336 *
337 * We have to separate out filename/program from the other options because
338 * those must not appear in the options list passed to the core COPY code.
339 */
340 static void
fileGetOptions(Oid foreigntableid,char ** filename,bool * is_program,List ** other_options)341 fileGetOptions(Oid foreigntableid,
342 char **filename, bool *is_program, List **other_options)
343 {
344 ForeignTable *table;
345 ForeignServer *server;
346 ForeignDataWrapper *wrapper;
347 List *options;
348 ListCell *lc,
349 *prev;
350
351 /*
352 * Extract options from FDW objects. We ignore user mappings because
353 * file_fdw doesn't have any options that can be specified there.
354 *
355 * (XXX Actually, given the current contents of valid_options[], there's
356 * no point in examining anything except the foreign table's own options.
357 * Simplify?)
358 */
359 table = GetForeignTable(foreigntableid);
360 server = GetForeignServer(table->serverid);
361 wrapper = GetForeignDataWrapper(server->fdwid);
362
363 options = NIL;
364 options = list_concat(options, wrapper->options);
365 options = list_concat(options, server->options);
366 options = list_concat(options, table->options);
367 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
368
369 /*
370 * Separate out the filename or program option (we assume there is only
371 * one).
372 */
373 *filename = NULL;
374 *is_program = false;
375 prev = NULL;
376 foreach(lc, options)
377 {
378 DefElem *def = (DefElem *) lfirst(lc);
379
380 if (strcmp(def->defname, "filename") == 0)
381 {
382 *filename = defGetString(def);
383 options = list_delete_cell(options, lc, prev);
384 break;
385 }
386 else if (strcmp(def->defname, "program") == 0)
387 {
388 *filename = defGetString(def);
389 *is_program = true;
390 options = list_delete_cell(options, lc, prev);
391 break;
392 }
393 prev = lc;
394 }
395
396 /*
397 * The validator should have checked that filename or program was included
398 * in the options, but check again, just in case.
399 */
400 if (*filename == NULL)
401 elog(ERROR, "either filename or program is required for file_fdw foreign tables");
402
403 *other_options = options;
404 }
405
406 /*
407 * Retrieve per-column generic options from pg_attribute and construct a list
408 * of DefElems representing them.
409 *
410 * At the moment we only have "force_not_null", and "force_null",
411 * which should each be combined into a single DefElem listing all such
412 * columns, since that's what COPY expects.
413 */
414 static List *
get_file_fdw_attribute_options(Oid relid)415 get_file_fdw_attribute_options(Oid relid)
416 {
417 Relation rel;
418 TupleDesc tupleDesc;
419 AttrNumber natts;
420 AttrNumber attnum;
421 List *fnncolumns = NIL;
422 List *fncolumns = NIL;
423
424 List *options = NIL;
425
426 rel = heap_open(relid, AccessShareLock);
427 tupleDesc = RelationGetDescr(rel);
428 natts = tupleDesc->natts;
429
430 /* Retrieve FDW options for all user-defined attributes. */
431 for (attnum = 1; attnum <= natts; attnum++)
432 {
433 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
434 List *options;
435 ListCell *lc;
436
437 /* Skip dropped attributes. */
438 if (attr->attisdropped)
439 continue;
440
441 options = GetForeignColumnOptions(relid, attnum);
442 foreach(lc, options)
443 {
444 DefElem *def = (DefElem *) lfirst(lc);
445
446 if (strcmp(def->defname, "force_not_null") == 0)
447 {
448 if (defGetBoolean(def))
449 {
450 char *attname = pstrdup(NameStr(attr->attname));
451
452 fnncolumns = lappend(fnncolumns, makeString(attname));
453 }
454 }
455 else if (strcmp(def->defname, "force_null") == 0)
456 {
457 if (defGetBoolean(def))
458 {
459 char *attname = pstrdup(NameStr(attr->attname));
460
461 fncolumns = lappend(fncolumns, makeString(attname));
462 }
463 }
464 /* maybe in future handle other options here */
465 }
466 }
467
468 heap_close(rel, AccessShareLock);
469
470 /*
471 * Return DefElem only when some column(s) have force_not_null /
472 * force_null options set
473 */
474 if (fnncolumns != NIL)
475 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
476
477 if (fncolumns != NIL)
478 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
479
480 return options;
481 }
482
483 /*
484 * fileGetForeignRelSize
485 * Obtain relation size estimates for a foreign table
486 */
487 static void
fileGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)488 fileGetForeignRelSize(PlannerInfo *root,
489 RelOptInfo *baserel,
490 Oid foreigntableid)
491 {
492 FileFdwPlanState *fdw_private;
493
494 /*
495 * Fetch options. We only need filename (or program) at this point, but
496 * we might as well get everything and not need to re-fetch it later in
497 * planning.
498 */
499 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
500 fileGetOptions(foreigntableid,
501 &fdw_private->filename,
502 &fdw_private->is_program,
503 &fdw_private->options);
504 baserel->fdw_private = (void *) fdw_private;
505
506 /* Estimate relation size */
507 estimate_size(root, baserel, fdw_private);
508 }
509
510 /*
511 * fileGetForeignPaths
512 * Create possible access paths for a scan on the foreign table
513 *
514 * Currently we don't support any push-down feature, so there is only one
515 * possible access path, which simply returns all records in the order in
516 * the data file.
517 */
518 static void
fileGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)519 fileGetForeignPaths(PlannerInfo *root,
520 RelOptInfo *baserel,
521 Oid foreigntableid)
522 {
523 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
524 Cost startup_cost;
525 Cost total_cost;
526 List *columns;
527 List *coptions = NIL;
528
529 /* Decide whether to selectively perform binary conversion */
530 if (check_selective_binary_conversion(baserel,
531 foreigntableid,
532 &columns))
533 coptions = list_make1(makeDefElem("convert_selectively",
534 (Node *) columns, -1));
535
536 /* Estimate costs */
537 estimate_costs(root, baserel, fdw_private,
538 &startup_cost, &total_cost);
539
540 /*
541 * Create a ForeignPath node and add it as only possible path. We use the
542 * fdw_private list of the path to carry the convert_selectively option;
543 * it will be propagated into the fdw_private list of the Plan node.
544 *
545 * We don't support pushing join clauses into the quals of this path, but
546 * it could still have required parameterization due to LATERAL refs in
547 * its tlist.
548 */
549 add_path(baserel, (Path *)
550 create_foreignscan_path(root, baserel,
551 NULL, /* default pathtarget */
552 baserel->rows,
553 startup_cost,
554 total_cost,
555 NIL, /* no pathkeys */
556 baserel->lateral_relids,
557 NULL, /* no extra plan */
558 coptions));
559
560 /*
561 * If data file was sorted, and we knew it somehow, we could insert
562 * appropriate pathkeys into the ForeignPath node to tell the planner
563 * that.
564 */
565 }
566
567 /*
568 * fileGetForeignPlan
569 * Create a ForeignScan plan node for scanning the foreign table
570 */
571 static ForeignScan *
fileGetForeignPlan(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)572 fileGetForeignPlan(PlannerInfo *root,
573 RelOptInfo *baserel,
574 Oid foreigntableid,
575 ForeignPath *best_path,
576 List *tlist,
577 List *scan_clauses,
578 Plan *outer_plan)
579 {
580 Index scan_relid = baserel->relid;
581
582 /*
583 * We have no native ability to evaluate restriction clauses, so we just
584 * put all the scan_clauses into the plan node's qual list for the
585 * executor to check. So all we have to do here is strip RestrictInfo
586 * nodes from the clauses and ignore pseudoconstants (which will be
587 * handled elsewhere).
588 */
589 scan_clauses = extract_actual_clauses(scan_clauses, false);
590
591 /* Create the ForeignScan node */
592 return make_foreignscan(tlist,
593 scan_clauses,
594 scan_relid,
595 NIL, /* no expressions to evaluate */
596 best_path->fdw_private,
597 NIL, /* no custom tlist */
598 NIL, /* no remote quals */
599 outer_plan);
600 }
601
602 /*
603 * fileExplainForeignScan
604 * Produce extra output for EXPLAIN
605 */
606 static void
fileExplainForeignScan(ForeignScanState * node,ExplainState * es)607 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
608 {
609 char *filename;
610 bool is_program;
611 List *options;
612
613 /* Fetch options --- we only need filename and is_program at this point */
614 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
615 &filename, &is_program, &options);
616
617 if (is_program)
618 ExplainPropertyText("Foreign Program", filename, es);
619 else
620 ExplainPropertyText("Foreign File", filename, es);
621
622 /* Suppress file size if we're not showing cost details */
623 if (es->costs)
624 {
625 struct stat stat_buf;
626
627 if (!is_program &&
628 stat(filename, &stat_buf) == 0)
629 ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
630 es);
631 }
632 }
633
634 /*
635 * fileBeginForeignScan
636 * Initiate access to the file by creating CopyState
637 */
638 static void
fileBeginForeignScan(ForeignScanState * node,int eflags)639 fileBeginForeignScan(ForeignScanState *node, int eflags)
640 {
641 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
642 char *filename;
643 bool is_program;
644 List *options;
645 CopyState cstate;
646 FileFdwExecutionState *festate;
647
648 /*
649 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
650 */
651 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
652 return;
653
654 /* Fetch options of foreign table */
655 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
656 &filename, &is_program, &options);
657
658 /* Add any options from the plan (currently only convert_selectively) */
659 options = list_concat(options, plan->fdw_private);
660
661 /*
662 * Create CopyState from FDW options. We always acquire all columns, so
663 * as to match the expected ScanTupleSlot signature.
664 */
665 cstate = BeginCopyFrom(NULL,
666 node->ss.ss_currentRelation,
667 filename,
668 is_program,
669 NULL,
670 NIL,
671 options);
672
673 /*
674 * Save state in node->fdw_state. We must save enough information to call
675 * BeginCopyFrom() again.
676 */
677 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
678 festate->filename = filename;
679 festate->is_program = is_program;
680 festate->options = options;
681 festate->cstate = cstate;
682
683 node->fdw_state = (void *) festate;
684 }
685
686 /*
687 * fileIterateForeignScan
688 * Read next record from the data file and store it into the
689 * ScanTupleSlot as a virtual tuple
690 */
691 static TupleTableSlot *
fileIterateForeignScan(ForeignScanState * node)692 fileIterateForeignScan(ForeignScanState *node)
693 {
694 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
695 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
696 bool found;
697 ErrorContextCallback errcallback;
698
699 /* Set up callback to identify error line number. */
700 errcallback.callback = CopyFromErrorCallback;
701 errcallback.arg = (void *) festate->cstate;
702 errcallback.previous = error_context_stack;
703 error_context_stack = &errcallback;
704
705 /*
706 * The protocol for loading a virtual tuple into a slot is first
707 * ExecClearTuple, then fill the values/isnull arrays, then
708 * ExecStoreVirtualTuple. If we don't find another row in the file, we
709 * just skip the last step, leaving the slot empty as required.
710 *
711 * We can pass ExprContext = NULL because we read all columns from the
712 * file, so no need to evaluate default expressions.
713 *
714 * We can also pass tupleOid = NULL because we don't allow oids for
715 * foreign tables.
716 */
717 ExecClearTuple(slot);
718 found = NextCopyFrom(festate->cstate, NULL,
719 slot->tts_values, slot->tts_isnull,
720 NULL);
721 if (found)
722 ExecStoreVirtualTuple(slot);
723
724 /* Remove error callback. */
725 error_context_stack = errcallback.previous;
726
727 return slot;
728 }
729
730 /*
731 * fileReScanForeignScan
732 * Rescan table, possibly with new parameters
733 */
734 static void
fileReScanForeignScan(ForeignScanState * node)735 fileReScanForeignScan(ForeignScanState *node)
736 {
737 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
738
739 EndCopyFrom(festate->cstate);
740
741 festate->cstate = BeginCopyFrom(NULL,
742 node->ss.ss_currentRelation,
743 festate->filename,
744 festate->is_program,
745 NULL,
746 NIL,
747 festate->options);
748 }
749
750 /*
751 * fileEndForeignScan
752 * Finish scanning foreign table and dispose objects used for this scan
753 */
754 static void
fileEndForeignScan(ForeignScanState * node)755 fileEndForeignScan(ForeignScanState *node)
756 {
757 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
758
759 /* if festate is NULL, we are in EXPLAIN; nothing to do */
760 if (festate)
761 EndCopyFrom(festate->cstate);
762 }
763
764 /*
765 * fileAnalyzeForeignTable
766 * Test whether analyzing this foreign table is supported
767 */
768 static bool
fileAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)769 fileAnalyzeForeignTable(Relation relation,
770 AcquireSampleRowsFunc *func,
771 BlockNumber *totalpages)
772 {
773 char *filename;
774 bool is_program;
775 List *options;
776 struct stat stat_buf;
777
778 /* Fetch options of foreign table */
779 fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
780
781 /*
782 * If this is a program instead of a file, just return false to skip
783 * analyzing the table. We could run the program and collect stats on
784 * whatever it currently returns, but it seems likely that in such cases
785 * the output would be too volatile for the stats to be useful. Maybe
786 * there should be an option to enable doing this?
787 */
788 if (is_program)
789 return false;
790
791 /*
792 * Get size of the file. (XXX if we fail here, would it be better to just
793 * return false to skip analyzing the table?)
794 */
795 if (stat(filename, &stat_buf) < 0)
796 ereport(ERROR,
797 (errcode_for_file_access(),
798 errmsg("could not stat file \"%s\": %m",
799 filename)));
800
801 /*
802 * Convert size to pages. Must return at least 1 so that we can tell
803 * later on that pg_class.relpages is not default.
804 */
805 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
806 if (*totalpages < 1)
807 *totalpages = 1;
808
809 *func = file_acquire_sample_rows;
810
811 return true;
812 }
813
814 /*
815 * fileIsForeignScanParallelSafe
816 * Reading a file, or external program, in a parallel worker should work
817 * just the same as reading it in the leader, so mark scans safe.
818 */
819 static bool
fileIsForeignScanParallelSafe(PlannerInfo * root,RelOptInfo * rel,RangeTblEntry * rte)820 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
821 RangeTblEntry *rte)
822 {
823 return true;
824 }
825
826 /*
827 * check_selective_binary_conversion
828 *
829 * Check to see if it's useful to convert only a subset of the file's columns
830 * to binary. If so, construct a list of the column names to be converted,
831 * return that at *columns, and return TRUE. (Note that it's possible to
832 * determine that no columns need be converted, for instance with a COUNT(*)
833 * query. So we can't use returning a NIL list to indicate failure.)
834 */
835 static bool
check_selective_binary_conversion(RelOptInfo * baserel,Oid foreigntableid,List ** columns)836 check_selective_binary_conversion(RelOptInfo *baserel,
837 Oid foreigntableid,
838 List **columns)
839 {
840 ForeignTable *table;
841 ListCell *lc;
842 Relation rel;
843 TupleDesc tupleDesc;
844 AttrNumber attnum;
845 Bitmapset *attrs_used = NULL;
846 bool has_wholerow = false;
847 int numattrs;
848 int i;
849
850 *columns = NIL; /* default result */
851
852 /*
853 * Check format of the file. If binary format, this is irrelevant.
854 */
855 table = GetForeignTable(foreigntableid);
856 foreach(lc, table->options)
857 {
858 DefElem *def = (DefElem *) lfirst(lc);
859
860 if (strcmp(def->defname, "format") == 0)
861 {
862 char *format = defGetString(def);
863
864 if (strcmp(format, "binary") == 0)
865 return false;
866 break;
867 }
868 }
869
870 /* Collect all the attributes needed for joins or final output. */
871 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
872 &attrs_used);
873
874 /* Add all the attributes used by restriction clauses. */
875 foreach(lc, baserel->baserestrictinfo)
876 {
877 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
878
879 pull_varattnos((Node *) rinfo->clause, baserel->relid,
880 &attrs_used);
881 }
882
883 /* Convert attribute numbers to column names. */
884 rel = heap_open(foreigntableid, AccessShareLock);
885 tupleDesc = RelationGetDescr(rel);
886
887 while ((attnum = bms_first_member(attrs_used)) >= 0)
888 {
889 /* Adjust for system attributes. */
890 attnum += FirstLowInvalidHeapAttributeNumber;
891
892 if (attnum == 0)
893 {
894 has_wholerow = true;
895 break;
896 }
897
898 /* Ignore system attributes. */
899 if (attnum < 0)
900 continue;
901
902 /* Get user attributes. */
903 if (attnum > 0)
904 {
905 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
906 char *attname = NameStr(attr->attname);
907
908 /* Skip dropped attributes (probably shouldn't see any here). */
909 if (attr->attisdropped)
910 continue;
911 *columns = lappend(*columns, makeString(pstrdup(attname)));
912 }
913 }
914
915 /* Count non-dropped user attributes while we have the tupdesc. */
916 numattrs = 0;
917 for (i = 0; i < tupleDesc->natts; i++)
918 {
919 Form_pg_attribute attr = tupleDesc->attrs[i];
920
921 if (attr->attisdropped)
922 continue;
923 numattrs++;
924 }
925
926 heap_close(rel, AccessShareLock);
927
928 /* If there's a whole-row reference, fail: we need all the columns. */
929 if (has_wholerow)
930 {
931 *columns = NIL;
932 return false;
933 }
934
935 /* If all the user attributes are needed, fail. */
936 if (numattrs == list_length(*columns))
937 {
938 *columns = NIL;
939 return false;
940 }
941
942 return true;
943 }
944
945 /*
946 * Estimate size of a foreign table.
947 *
948 * The main result is returned in baserel->rows. We also set
949 * fdw_private->pages and fdw_private->ntuples for later use in the cost
950 * calculation.
951 */
952 static void
estimate_size(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private)953 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
954 FileFdwPlanState *fdw_private)
955 {
956 struct stat stat_buf;
957 BlockNumber pages;
958 double ntuples;
959 double nrows;
960
961 /*
962 * Get size of the file. It might not be there at plan time, though, in
963 * which case we have to use a default estimate. We also have to fall
964 * back to the default if using a program as the input.
965 */
966 if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
967 stat_buf.st_size = 10 * BLCKSZ;
968
969 /*
970 * Convert size to pages for use in I/O cost estimate later.
971 */
972 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
973 if (pages < 1)
974 pages = 1;
975 fdw_private->pages = pages;
976
977 /*
978 * Estimate the number of tuples in the file.
979 */
980 if (baserel->pages > 0)
981 {
982 /*
983 * We have # of pages and # of tuples from pg_class (that is, from a
984 * previous ANALYZE), so compute a tuples-per-page estimate and scale
985 * that by the current file size.
986 */
987 double density;
988
989 density = baserel->tuples / (double) baserel->pages;
990 ntuples = clamp_row_est(density * (double) pages);
991 }
992 else
993 {
994 /*
995 * Otherwise we have to fake it. We back into this estimate using the
996 * planner's idea of the relation width; which is bogus if not all
997 * columns are being read, not to mention that the text representation
998 * of a row probably isn't the same size as its internal
999 * representation. Possibly we could do something better, but the
1000 * real answer to anyone who complains is "ANALYZE" ...
1001 */
1002 int tuple_width;
1003
1004 tuple_width = MAXALIGN(baserel->reltarget->width) +
1005 MAXALIGN(SizeofHeapTupleHeader);
1006 ntuples = clamp_row_est((double) stat_buf.st_size /
1007 (double) tuple_width);
1008 }
1009 fdw_private->ntuples = ntuples;
1010
1011 /*
1012 * Now estimate the number of rows returned by the scan after applying the
1013 * baserestrictinfo quals.
1014 */
1015 nrows = ntuples *
1016 clauselist_selectivity(root,
1017 baserel->baserestrictinfo,
1018 0,
1019 JOIN_INNER,
1020 NULL);
1021
1022 nrows = clamp_row_est(nrows);
1023
1024 /* Save the output-rows estimate for the planner */
1025 baserel->rows = nrows;
1026 }
1027
1028 /*
1029 * Estimate costs of scanning a foreign table.
1030 *
1031 * Results are returned in *startup_cost and *total_cost.
1032 */
1033 static void
estimate_costs(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private,Cost * startup_cost,Cost * total_cost)1034 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1035 FileFdwPlanState *fdw_private,
1036 Cost *startup_cost, Cost *total_cost)
1037 {
1038 BlockNumber pages = fdw_private->pages;
1039 double ntuples = fdw_private->ntuples;
1040 Cost run_cost = 0;
1041 Cost cpu_per_tuple;
1042
1043 /*
1044 * We estimate costs almost the same way as cost_seqscan(), thus assuming
1045 * that I/O costs are equivalent to a regular table file of the same size.
1046 * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1047 * for the cost of parsing records.
1048 *
1049 * In the case of a program source, this calculation is even more divorced
1050 * from reality, but we have no good alternative; and it's not clear that
1051 * the numbers we produce here matter much anyway, since there's only one
1052 * access path for the rel.
1053 */
1054 run_cost += seq_page_cost * pages;
1055
1056 *startup_cost = baserel->baserestrictcost.startup;
1057 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1058 run_cost += cpu_per_tuple * ntuples;
1059 *total_cost = *startup_cost + run_cost;
1060 }
1061
1062 /*
1063 * file_acquire_sample_rows -- acquire a random sample of rows from the table
1064 *
1065 * Selected rows are returned in the caller-allocated array rows[],
1066 * which must have at least targrows entries.
1067 * The actual number of rows selected is returned as the function result.
1068 * We also count the total number of rows in the file and return it into
1069 * *totalrows. Note that *totaldeadrows is always set to 0.
1070 *
1071 * Note that the returned list of rows is not always in order by physical
1072 * position in the file. Therefore, correlation estimates derived later
1073 * may be meaningless, but it's OK because we don't use the estimates
1074 * currently (the planner only pays attention to correlation for indexscans).
1075 */
1076 static int
file_acquire_sample_rows(Relation onerel,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)1077 file_acquire_sample_rows(Relation onerel, int elevel,
1078 HeapTuple *rows, int targrows,
1079 double *totalrows, double *totaldeadrows)
1080 {
1081 int numrows = 0;
1082 double rowstoskip = -1; /* -1 means not set yet */
1083 ReservoirStateData rstate;
1084 TupleDesc tupDesc;
1085 Datum *values;
1086 bool *nulls;
1087 bool found;
1088 char *filename;
1089 bool is_program;
1090 List *options;
1091 CopyState cstate;
1092 ErrorContextCallback errcallback;
1093 MemoryContext oldcontext = CurrentMemoryContext;
1094 MemoryContext tupcontext;
1095
1096 Assert(onerel);
1097 Assert(targrows > 0);
1098
1099 tupDesc = RelationGetDescr(onerel);
1100 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1101 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1102
1103 /* Fetch options of foreign table */
1104 fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);
1105
1106 /*
1107 * Create CopyState from FDW options.
1108 */
1109 cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
1110 options);
1111
1112 /*
1113 * Use per-tuple memory context to prevent leak of memory used to read
1114 * rows from the file with Copy routines.
1115 */
1116 tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1117 "file_fdw temporary context",
1118 ALLOCSET_DEFAULT_SIZES);
1119
1120 /* Prepare for sampling rows */
1121 reservoir_init_selection_state(&rstate, targrows);
1122
1123 /* Set up callback to identify error line number. */
1124 errcallback.callback = CopyFromErrorCallback;
1125 errcallback.arg = (void *) cstate;
1126 errcallback.previous = error_context_stack;
1127 error_context_stack = &errcallback;
1128
1129 *totalrows = 0;
1130 *totaldeadrows = 0;
1131 for (;;)
1132 {
1133 /* Check for user-requested abort or sleep */
1134 vacuum_delay_point();
1135
1136 /* Fetch next row */
1137 MemoryContextReset(tupcontext);
1138 MemoryContextSwitchTo(tupcontext);
1139
1140 found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
1141
1142 MemoryContextSwitchTo(oldcontext);
1143
1144 if (!found)
1145 break;
1146
1147 /*
1148 * The first targrows sample rows are simply copied into the
1149 * reservoir. Then we start replacing tuples in the sample until we
1150 * reach the end of the relation. This algorithm is from Jeff Vitter's
1151 * paper (see more info in commands/analyze.c).
1152 */
1153 if (numrows < targrows)
1154 {
1155 rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1156 }
1157 else
1158 {
1159 /*
1160 * t in Vitter's paper is the number of records already processed.
1161 * If we need to compute a new S value, we must use the
1162 * not-yet-incremented value of totalrows as t.
1163 */
1164 if (rowstoskip < 0)
1165 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1166
1167 if (rowstoskip <= 0)
1168 {
1169 /*
1170 * Found a suitable tuple, so save it, replacing one old tuple
1171 * at random
1172 */
1173 int k = (int) (targrows * sampler_random_fract(rstate.randstate));
1174
1175 Assert(k >= 0 && k < targrows);
1176 heap_freetuple(rows[k]);
1177 rows[k] = heap_form_tuple(tupDesc, values, nulls);
1178 }
1179
1180 rowstoskip -= 1;
1181 }
1182
1183 *totalrows += 1;
1184 }
1185
1186 /* Remove error callback. */
1187 error_context_stack = errcallback.previous;
1188
1189 /* Clean up. */
1190 MemoryContextDelete(tupcontext);
1191
1192 EndCopyFrom(cstate);
1193
1194 pfree(values);
1195 pfree(nulls);
1196
1197 /*
1198 * Emit some interesting relation info
1199 */
1200 ereport(elevel,
1201 (errmsg("\"%s\": file contains %.0f rows; "
1202 "%d rows in sample",
1203 RelationGetRelationName(onerel),
1204 *totalrows, numrows)));
1205
1206 return numrows;
1207 }
1208