1 /*-------------------------------------------------------------------------
2 *
3 * file_fdw.c
4 * foreign-data wrapper for server-side flat files (or programs).
5 *
6 * Copyright (c) 2010-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/file_fdw/file_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include <sys/stat.h>
16 #include <unistd.h>
17
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "access/table.h"
22 #include "catalog/pg_authid.h"
23 #include "catalog/pg_foreign_table.h"
24 #include "commands/copy.h"
25 #include "commands/defrem.h"
26 #include "commands/explain.h"
27 #include "commands/vacuum.h"
28 #include "foreign/fdwapi.h"
29 #include "foreign/foreign.h"
30 #include "miscadmin.h"
31 #include "nodes/makefuncs.h"
32 #include "optimizer/optimizer.h"
33 #include "optimizer/pathnode.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "utils/memutils.h"
37 #include "utils/rel.h"
38 #include "utils/sampling.h"
39
/* Module magic block for this loadable library. */
PG_MODULE_MAGIC;
41
42 /*
43 * Describes the valid options for objects that use this wrapper.
44 */
45 struct FileFdwOption
46 {
47 const char *optname;
48 Oid optcontext; /* Oid of catalog in which option may appear */
49 };
50
51 /*
52 * Valid options for file_fdw.
53 * These options are based on the options for the COPY FROM command.
54 * But note that force_not_null and force_null are handled as boolean options
55 * attached to a column, not as table options.
56 *
57 * Note: If you are adding new option for user mapping, you need to modify
58 * fileGetOptions(), which currently doesn't bother to look at user mappings.
59 */
60 static const struct FileFdwOption valid_options[] = {
61 /* Data source options */
62 {"filename", ForeignTableRelationId},
63 {"program", ForeignTableRelationId},
64
65 /* Format options */
66 /* oids option is not supported */
67 {"format", ForeignTableRelationId},
68 {"header", ForeignTableRelationId},
69 {"delimiter", ForeignTableRelationId},
70 {"quote", ForeignTableRelationId},
71 {"escape", ForeignTableRelationId},
72 {"null", ForeignTableRelationId},
73 {"encoding", ForeignTableRelationId},
74 {"force_not_null", AttributeRelationId},
75 {"force_null", AttributeRelationId},
76
77 /*
78 * force_quote is not supported by file_fdw because it's for COPY TO.
79 */
80
81 /* Sentinel */
82 {NULL, InvalidOid}
83 };
84
85 /*
86 * FDW-specific information for RelOptInfo.fdw_private.
87 */
88 typedef struct FileFdwPlanState
89 {
90 char *filename; /* file or program to read from */
91 bool is_program; /* true if filename represents an OS command */
92 List *options; /* merged COPY options, excluding filename and
93 * is_program */
94 BlockNumber pages; /* estimate of file's physical size */
95 double ntuples; /* estimate of number of data rows */
96 } FileFdwPlanState;
97
98 /*
99 * FDW-specific information for ForeignScanState.fdw_state.
100 */
101 typedef struct FileFdwExecutionState
102 {
103 char *filename; /* file or program to read from */
104 bool is_program; /* true if filename represents an OS command */
105 List *options; /* merged COPY options, excluding filename and
106 * is_program */
107 CopyState cstate; /* COPY execution state */
108 } FileFdwExecutionState;
109
/*
 * SQL-callable entry points: the FDW handler and the option validator
 */
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
115
116 /*
117 * FDW callback routines
118 */
119 static void fileGetForeignRelSize(PlannerInfo *root,
120 RelOptInfo *baserel,
121 Oid foreigntableid);
122 static void fileGetForeignPaths(PlannerInfo *root,
123 RelOptInfo *baserel,
124 Oid foreigntableid);
125 static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
126 RelOptInfo *baserel,
127 Oid foreigntableid,
128 ForeignPath *best_path,
129 List *tlist,
130 List *scan_clauses,
131 Plan *outer_plan);
132 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
133 static void fileBeginForeignScan(ForeignScanState *node, int eflags);
134 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
135 static void fileReScanForeignScan(ForeignScanState *node);
136 static void fileEndForeignScan(ForeignScanState *node);
137 static bool fileAnalyzeForeignTable(Relation relation,
138 AcquireSampleRowsFunc *func,
139 BlockNumber *totalpages);
140 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
141 RangeTblEntry *rte);
142
143 /*
144 * Helper functions
145 */
146 static bool is_valid_option(const char *option, Oid context);
147 static void fileGetOptions(Oid foreigntableid,
148 char **filename,
149 bool *is_program,
150 List **other_options);
151 static List *get_file_fdw_attribute_options(Oid relid);
152 static bool check_selective_binary_conversion(RelOptInfo *baserel,
153 Oid foreigntableid,
154 List **columns);
155 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
156 FileFdwPlanState *fdw_private);
157 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
158 FileFdwPlanState *fdw_private,
159 Cost *startup_cost, Cost *total_cost);
160 static int file_acquire_sample_rows(Relation onerel, int elevel,
161 HeapTuple *rows, int targrows,
162 double *totalrows, double *totaldeadrows);
163
164
165 /*
166 * Foreign-data wrapper handler function: return a struct with pointers
167 * to my callback routines.
168 */
169 Datum
file_fdw_handler(PG_FUNCTION_ARGS)170 file_fdw_handler(PG_FUNCTION_ARGS)
171 {
172 FdwRoutine *fdwroutine = makeNode(FdwRoutine);
173
174 fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
175 fdwroutine->GetForeignPaths = fileGetForeignPaths;
176 fdwroutine->GetForeignPlan = fileGetForeignPlan;
177 fdwroutine->ExplainForeignScan = fileExplainForeignScan;
178 fdwroutine->BeginForeignScan = fileBeginForeignScan;
179 fdwroutine->IterateForeignScan = fileIterateForeignScan;
180 fdwroutine->ReScanForeignScan = fileReScanForeignScan;
181 fdwroutine->EndForeignScan = fileEndForeignScan;
182 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
183 fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
184
185 PG_RETURN_POINTER(fdwroutine);
186 }
187
188 /*
189 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
190 * USER MAPPING or FOREIGN TABLE that uses file_fdw.
191 *
192 * Raise an ERROR if the option or its value is considered invalid.
193 */
194 Datum
file_fdw_validator(PG_FUNCTION_ARGS)195 file_fdw_validator(PG_FUNCTION_ARGS)
196 {
197 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
198 Oid catalog = PG_GETARG_OID(1);
199 char *filename = NULL;
200 DefElem *force_not_null = NULL;
201 DefElem *force_null = NULL;
202 List *other_options = NIL;
203 ListCell *cell;
204
205 /*
206 * Check that only options supported by file_fdw, and allowed for the
207 * current object type, are given.
208 */
209 foreach(cell, options_list)
210 {
211 DefElem *def = (DefElem *) lfirst(cell);
212
213 if (!is_valid_option(def->defname, catalog))
214 {
215 const struct FileFdwOption *opt;
216 StringInfoData buf;
217
218 /*
219 * Unknown option specified, complain about it. Provide a hint
220 * with list of valid options for the object.
221 */
222 initStringInfo(&buf);
223 for (opt = valid_options; opt->optname; opt++)
224 {
225 if (catalog == opt->optcontext)
226 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
227 opt->optname);
228 }
229
230 ereport(ERROR,
231 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
232 errmsg("invalid option \"%s\"", def->defname),
233 buf.len > 0
234 ? errhint("Valid options in this context are: %s",
235 buf.data)
236 : errhint("There are no valid options in this context.")));
237 }
238
239 /*
240 * Separate out filename, program, and column-specific options, since
241 * ProcessCopyOptions won't accept them.
242 */
243 if (strcmp(def->defname, "filename") == 0 ||
244 strcmp(def->defname, "program") == 0)
245 {
246 if (filename)
247 ereport(ERROR,
248 (errcode(ERRCODE_SYNTAX_ERROR),
249 errmsg("conflicting or redundant options")));
250
251 /*
252 * Check permissions for changing which file or program is used by
253 * the file_fdw.
254 *
255 * Only members of the role 'pg_read_server_files' are allowed to
256 * set the 'filename' option of a file_fdw foreign table, while
257 * only members of the role 'pg_execute_server_program' are
258 * allowed to set the 'program' option. This is because we don't
259 * want regular users to be able to control which file gets read
260 * or which program gets executed.
261 *
262 * Putting this sort of permissions check in a validator is a bit
263 * of a crock, but there doesn't seem to be any other place that
264 * can enforce the check more cleanly.
265 *
266 * Note that the valid_options[] array disallows setting filename
267 * and program at any options level other than foreign table ---
268 * otherwise there'd still be a security hole.
269 */
270 if (strcmp(def->defname, "filename") == 0 &&
271 !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
272 ereport(ERROR,
273 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
274 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table")));
275
276 if (strcmp(def->defname, "program") == 0 &&
277 !is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
278 ereport(ERROR,
279 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
280 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table")));
281
282 filename = defGetString(def);
283 }
284
285 /*
286 * force_not_null is a boolean option; after validation we can discard
287 * it - it will be retrieved later in get_file_fdw_attribute_options()
288 */
289 else if (strcmp(def->defname, "force_not_null") == 0)
290 {
291 if (force_not_null)
292 ereport(ERROR,
293 (errcode(ERRCODE_SYNTAX_ERROR),
294 errmsg("conflicting or redundant options"),
295 errhint("Option \"force_not_null\" supplied more than once for a column.")));
296 force_not_null = def;
297 /* Don't care what the value is, as long as it's a legal boolean */
298 (void) defGetBoolean(def);
299 }
300 /* See comments for force_not_null above */
301 else if (strcmp(def->defname, "force_null") == 0)
302 {
303 if (force_null)
304 ereport(ERROR,
305 (errcode(ERRCODE_SYNTAX_ERROR),
306 errmsg("conflicting or redundant options"),
307 errhint("Option \"force_null\" supplied more than once for a column.")));
308 force_null = def;
309 (void) defGetBoolean(def);
310 }
311 else
312 other_options = lappend(other_options, def);
313 }
314
315 /*
316 * Now apply the core COPY code's validation logic for more checks.
317 */
318 ProcessCopyOptions(NULL, NULL, true, other_options);
319
320 /*
321 * Either filename or program option is required for file_fdw foreign
322 * tables.
323 */
324 if (catalog == ForeignTableRelationId && filename == NULL)
325 ereport(ERROR,
326 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
327 errmsg("either filename or program is required for file_fdw foreign tables")));
328
329 PG_RETURN_VOID();
330 }
331
332 /*
333 * Check if the provided option is one of the valid options.
334 * context is the Oid of the catalog holding the object the option is for.
335 */
336 static bool
is_valid_option(const char * option,Oid context)337 is_valid_option(const char *option, Oid context)
338 {
339 const struct FileFdwOption *opt;
340
341 for (opt = valid_options; opt->optname; opt++)
342 {
343 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
344 return true;
345 }
346 return false;
347 }
348
349 /*
350 * Fetch the options for a file_fdw foreign table.
351 *
352 * We have to separate out filename/program from the other options because
353 * those must not appear in the options list passed to the core COPY code.
354 */
355 static void
fileGetOptions(Oid foreigntableid,char ** filename,bool * is_program,List ** other_options)356 fileGetOptions(Oid foreigntableid,
357 char **filename, bool *is_program, List **other_options)
358 {
359 ForeignTable *table;
360 ForeignServer *server;
361 ForeignDataWrapper *wrapper;
362 List *options;
363 ListCell *lc,
364 *prev;
365
366 /*
367 * Extract options from FDW objects. We ignore user mappings because
368 * file_fdw doesn't have any options that can be specified there.
369 *
370 * (XXX Actually, given the current contents of valid_options[], there's
371 * no point in examining anything except the foreign table's own options.
372 * Simplify?)
373 */
374 table = GetForeignTable(foreigntableid);
375 server = GetForeignServer(table->serverid);
376 wrapper = GetForeignDataWrapper(server->fdwid);
377
378 options = NIL;
379 options = list_concat(options, wrapper->options);
380 options = list_concat(options, server->options);
381 options = list_concat(options, table->options);
382 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
383
384 /*
385 * Separate out the filename or program option (we assume there is only
386 * one).
387 */
388 *filename = NULL;
389 *is_program = false;
390 prev = NULL;
391 foreach(lc, options)
392 {
393 DefElem *def = (DefElem *) lfirst(lc);
394
395 if (strcmp(def->defname, "filename") == 0)
396 {
397 *filename = defGetString(def);
398 options = list_delete_cell(options, lc, prev);
399 break;
400 }
401 else if (strcmp(def->defname, "program") == 0)
402 {
403 *filename = defGetString(def);
404 *is_program = true;
405 options = list_delete_cell(options, lc, prev);
406 break;
407 }
408 prev = lc;
409 }
410
411 /*
412 * The validator should have checked that filename or program was included
413 * in the options, but check again, just in case.
414 */
415 if (*filename == NULL)
416 elog(ERROR, "either filename or program is required for file_fdw foreign tables");
417
418 *other_options = options;
419 }
420
421 /*
422 * Retrieve per-column generic options from pg_attribute and construct a list
423 * of DefElems representing them.
424 *
425 * At the moment we only have "force_not_null", and "force_null",
426 * which should each be combined into a single DefElem listing all such
427 * columns, since that's what COPY expects.
428 */
429 static List *
get_file_fdw_attribute_options(Oid relid)430 get_file_fdw_attribute_options(Oid relid)
431 {
432 Relation rel;
433 TupleDesc tupleDesc;
434 AttrNumber natts;
435 AttrNumber attnum;
436 List *fnncolumns = NIL;
437 List *fncolumns = NIL;
438
439 List *options = NIL;
440
441 rel = table_open(relid, AccessShareLock);
442 tupleDesc = RelationGetDescr(rel);
443 natts = tupleDesc->natts;
444
445 /* Retrieve FDW options for all user-defined attributes. */
446 for (attnum = 1; attnum <= natts; attnum++)
447 {
448 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
449 List *options;
450 ListCell *lc;
451
452 /* Skip dropped attributes. */
453 if (attr->attisdropped)
454 continue;
455
456 options = GetForeignColumnOptions(relid, attnum);
457 foreach(lc, options)
458 {
459 DefElem *def = (DefElem *) lfirst(lc);
460
461 if (strcmp(def->defname, "force_not_null") == 0)
462 {
463 if (defGetBoolean(def))
464 {
465 char *attname = pstrdup(NameStr(attr->attname));
466
467 fnncolumns = lappend(fnncolumns, makeString(attname));
468 }
469 }
470 else if (strcmp(def->defname, "force_null") == 0)
471 {
472 if (defGetBoolean(def))
473 {
474 char *attname = pstrdup(NameStr(attr->attname));
475
476 fncolumns = lappend(fncolumns, makeString(attname));
477 }
478 }
479 /* maybe in future handle other options here */
480 }
481 }
482
483 table_close(rel, AccessShareLock);
484
485 /*
486 * Return DefElem only when some column(s) have force_not_null /
487 * force_null options set
488 */
489 if (fnncolumns != NIL)
490 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
491
492 if (fncolumns != NIL)
493 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
494
495 return options;
496 }
497
498 /*
499 * fileGetForeignRelSize
500 * Obtain relation size estimates for a foreign table
501 */
502 static void
fileGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)503 fileGetForeignRelSize(PlannerInfo *root,
504 RelOptInfo *baserel,
505 Oid foreigntableid)
506 {
507 FileFdwPlanState *fdw_private;
508
509 /*
510 * Fetch options. We only need filename (or program) at this point, but
511 * we might as well get everything and not need to re-fetch it later in
512 * planning.
513 */
514 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
515 fileGetOptions(foreigntableid,
516 &fdw_private->filename,
517 &fdw_private->is_program,
518 &fdw_private->options);
519 baserel->fdw_private = (void *) fdw_private;
520
521 /* Estimate relation size */
522 estimate_size(root, baserel, fdw_private);
523 }
524
525 /*
526 * fileGetForeignPaths
527 * Create possible access paths for a scan on the foreign table
528 *
529 * Currently we don't support any push-down feature, so there is only one
530 * possible access path, which simply returns all records in the order in
531 * the data file.
532 */
533 static void
fileGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)534 fileGetForeignPaths(PlannerInfo *root,
535 RelOptInfo *baserel,
536 Oid foreigntableid)
537 {
538 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
539 Cost startup_cost;
540 Cost total_cost;
541 List *columns;
542 List *coptions = NIL;
543
544 /* Decide whether to selectively perform binary conversion */
545 if (check_selective_binary_conversion(baserel,
546 foreigntableid,
547 &columns))
548 coptions = list_make1(makeDefElem("convert_selectively",
549 (Node *) columns, -1));
550
551 /* Estimate costs */
552 estimate_costs(root, baserel, fdw_private,
553 &startup_cost, &total_cost);
554
555 /*
556 * Create a ForeignPath node and add it as only possible path. We use the
557 * fdw_private list of the path to carry the convert_selectively option;
558 * it will be propagated into the fdw_private list of the Plan node.
559 *
560 * We don't support pushing join clauses into the quals of this path, but
561 * it could still have required parameterization due to LATERAL refs in
562 * its tlist.
563 */
564 add_path(baserel, (Path *)
565 create_foreignscan_path(root, baserel,
566 NULL, /* default pathtarget */
567 baserel->rows,
568 startup_cost,
569 total_cost,
570 NIL, /* no pathkeys */
571 baserel->lateral_relids,
572 NULL, /* no extra plan */
573 coptions));
574
575 /*
576 * If data file was sorted, and we knew it somehow, we could insert
577 * appropriate pathkeys into the ForeignPath node to tell the planner
578 * that.
579 */
580 }
581
582 /*
583 * fileGetForeignPlan
584 * Create a ForeignScan plan node for scanning the foreign table
585 */
586 static ForeignScan *
fileGetForeignPlan(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)587 fileGetForeignPlan(PlannerInfo *root,
588 RelOptInfo *baserel,
589 Oid foreigntableid,
590 ForeignPath *best_path,
591 List *tlist,
592 List *scan_clauses,
593 Plan *outer_plan)
594 {
595 Index scan_relid = baserel->relid;
596
597 /*
598 * We have no native ability to evaluate restriction clauses, so we just
599 * put all the scan_clauses into the plan node's qual list for the
600 * executor to check. So all we have to do here is strip RestrictInfo
601 * nodes from the clauses and ignore pseudoconstants (which will be
602 * handled elsewhere).
603 */
604 scan_clauses = extract_actual_clauses(scan_clauses, false);
605
606 /* Create the ForeignScan node */
607 return make_foreignscan(tlist,
608 scan_clauses,
609 scan_relid,
610 NIL, /* no expressions to evaluate */
611 best_path->fdw_private,
612 NIL, /* no custom tlist */
613 NIL, /* no remote quals */
614 outer_plan);
615 }
616
617 /*
618 * fileExplainForeignScan
619 * Produce extra output for EXPLAIN
620 */
621 static void
fileExplainForeignScan(ForeignScanState * node,ExplainState * es)622 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
623 {
624 char *filename;
625 bool is_program;
626 List *options;
627
628 /* Fetch options --- we only need filename and is_program at this point */
629 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
630 &filename, &is_program, &options);
631
632 if (is_program)
633 ExplainPropertyText("Foreign Program", filename, es);
634 else
635 ExplainPropertyText("Foreign File", filename, es);
636
637 /* Suppress file size if we're not showing cost details */
638 if (es->costs)
639 {
640 struct stat stat_buf;
641
642 if (!is_program &&
643 stat(filename, &stat_buf) == 0)
644 ExplainPropertyInteger("Foreign File Size", "b",
645 (int64) stat_buf.st_size, es);
646 }
647 }
648
649 /*
650 * fileBeginForeignScan
651 * Initiate access to the file by creating CopyState
652 */
653 static void
fileBeginForeignScan(ForeignScanState * node,int eflags)654 fileBeginForeignScan(ForeignScanState *node, int eflags)
655 {
656 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
657 char *filename;
658 bool is_program;
659 List *options;
660 CopyState cstate;
661 FileFdwExecutionState *festate;
662
663 /*
664 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
665 */
666 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
667 return;
668
669 /* Fetch options of foreign table */
670 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
671 &filename, &is_program, &options);
672
673 /* Add any options from the plan (currently only convert_selectively) */
674 options = list_concat(options, plan->fdw_private);
675
676 /*
677 * Create CopyState from FDW options. We always acquire all columns, so
678 * as to match the expected ScanTupleSlot signature.
679 */
680 cstate = BeginCopyFrom(NULL,
681 node->ss.ss_currentRelation,
682 filename,
683 is_program,
684 NULL,
685 NIL,
686 options);
687
688 /*
689 * Save state in node->fdw_state. We must save enough information to call
690 * BeginCopyFrom() again.
691 */
692 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
693 festate->filename = filename;
694 festate->is_program = is_program;
695 festate->options = options;
696 festate->cstate = cstate;
697
698 node->fdw_state = (void *) festate;
699 }
700
701 /*
702 * fileIterateForeignScan
703 * Read next record from the data file and store it into the
704 * ScanTupleSlot as a virtual tuple
705 */
706 static TupleTableSlot *
fileIterateForeignScan(ForeignScanState * node)707 fileIterateForeignScan(ForeignScanState *node)
708 {
709 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
710 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
711 bool found;
712 ErrorContextCallback errcallback;
713
714 /* Set up callback to identify error line number. */
715 errcallback.callback = CopyFromErrorCallback;
716 errcallback.arg = (void *) festate->cstate;
717 errcallback.previous = error_context_stack;
718 error_context_stack = &errcallback;
719
720 /*
721 * The protocol for loading a virtual tuple into a slot is first
722 * ExecClearTuple, then fill the values/isnull arrays, then
723 * ExecStoreVirtualTuple. If we don't find another row in the file, we
724 * just skip the last step, leaving the slot empty as required.
725 *
726 * We can pass ExprContext = NULL because we read all columns from the
727 * file, so no need to evaluate default expressions.
728 */
729 ExecClearTuple(slot);
730 found = NextCopyFrom(festate->cstate, NULL,
731 slot->tts_values, slot->tts_isnull);
732 if (found)
733 ExecStoreVirtualTuple(slot);
734
735 /* Remove error callback. */
736 error_context_stack = errcallback.previous;
737
738 return slot;
739 }
740
741 /*
742 * fileReScanForeignScan
743 * Rescan table, possibly with new parameters
744 */
745 static void
fileReScanForeignScan(ForeignScanState * node)746 fileReScanForeignScan(ForeignScanState *node)
747 {
748 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
749
750 EndCopyFrom(festate->cstate);
751
752 festate->cstate = BeginCopyFrom(NULL,
753 node->ss.ss_currentRelation,
754 festate->filename,
755 festate->is_program,
756 NULL,
757 NIL,
758 festate->options);
759 }
760
761 /*
762 * fileEndForeignScan
763 * Finish scanning foreign table and dispose objects used for this scan
764 */
765 static void
fileEndForeignScan(ForeignScanState * node)766 fileEndForeignScan(ForeignScanState *node)
767 {
768 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
769
770 /* if festate is NULL, we are in EXPLAIN; nothing to do */
771 if (festate)
772 EndCopyFrom(festate->cstate);
773 }
774
775 /*
776 * fileAnalyzeForeignTable
777 * Test whether analyzing this foreign table is supported
778 */
779 static bool
fileAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)780 fileAnalyzeForeignTable(Relation relation,
781 AcquireSampleRowsFunc *func,
782 BlockNumber *totalpages)
783 {
784 char *filename;
785 bool is_program;
786 List *options;
787 struct stat stat_buf;
788
789 /* Fetch options of foreign table */
790 fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
791
792 /*
793 * If this is a program instead of a file, just return false to skip
794 * analyzing the table. We could run the program and collect stats on
795 * whatever it currently returns, but it seems likely that in such cases
796 * the output would be too volatile for the stats to be useful. Maybe
797 * there should be an option to enable doing this?
798 */
799 if (is_program)
800 return false;
801
802 /*
803 * Get size of the file. (XXX if we fail here, would it be better to just
804 * return false to skip analyzing the table?)
805 */
806 if (stat(filename, &stat_buf) < 0)
807 ereport(ERROR,
808 (errcode_for_file_access(),
809 errmsg("could not stat file \"%s\": %m",
810 filename)));
811
812 /*
813 * Convert size to pages. Must return at least 1 so that we can tell
814 * later on that pg_class.relpages is not default.
815 */
816 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
817 if (*totalpages < 1)
818 *totalpages = 1;
819
820 *func = file_acquire_sample_rows;
821
822 return true;
823 }
824
825 /*
826 * fileIsForeignScanParallelSafe
827 * Reading a file, or external program, in a parallel worker should work
828 * just the same as reading it in the leader, so mark scans safe.
829 */
830 static bool
fileIsForeignScanParallelSafe(PlannerInfo * root,RelOptInfo * rel,RangeTblEntry * rte)831 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
832 RangeTblEntry *rte)
833 {
834 return true;
835 }
836
837 /*
838 * check_selective_binary_conversion
839 *
840 * Check to see if it's useful to convert only a subset of the file's columns
841 * to binary. If so, construct a list of the column names to be converted,
842 * return that at *columns, and return true. (Note that it's possible to
843 * determine that no columns need be converted, for instance with a COUNT(*)
844 * query. So we can't use returning a NIL list to indicate failure.)
845 */
846 static bool
check_selective_binary_conversion(RelOptInfo * baserel,Oid foreigntableid,List ** columns)847 check_selective_binary_conversion(RelOptInfo *baserel,
848 Oid foreigntableid,
849 List **columns)
850 {
851 ForeignTable *table;
852 ListCell *lc;
853 Relation rel;
854 TupleDesc tupleDesc;
855 AttrNumber attnum;
856 Bitmapset *attrs_used = NULL;
857 bool has_wholerow = false;
858 int numattrs;
859 int i;
860
861 *columns = NIL; /* default result */
862
863 /*
864 * Check format of the file. If binary format, this is irrelevant.
865 */
866 table = GetForeignTable(foreigntableid);
867 foreach(lc, table->options)
868 {
869 DefElem *def = (DefElem *) lfirst(lc);
870
871 if (strcmp(def->defname, "format") == 0)
872 {
873 char *format = defGetString(def);
874
875 if (strcmp(format, "binary") == 0)
876 return false;
877 break;
878 }
879 }
880
881 /* Collect all the attributes needed for joins or final output. */
882 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
883 &attrs_used);
884
885 /* Add all the attributes used by restriction clauses. */
886 foreach(lc, baserel->baserestrictinfo)
887 {
888 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
889
890 pull_varattnos((Node *) rinfo->clause, baserel->relid,
891 &attrs_used);
892 }
893
894 /* Convert attribute numbers to column names. */
895 rel = table_open(foreigntableid, AccessShareLock);
896 tupleDesc = RelationGetDescr(rel);
897
898 while ((attnum = bms_first_member(attrs_used)) >= 0)
899 {
900 /* Adjust for system attributes. */
901 attnum += FirstLowInvalidHeapAttributeNumber;
902
903 if (attnum == 0)
904 {
905 has_wholerow = true;
906 break;
907 }
908
909 /* Ignore system attributes. */
910 if (attnum < 0)
911 continue;
912
913 /* Get user attributes. */
914 if (attnum > 0)
915 {
916 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
917 char *attname = NameStr(attr->attname);
918
919 /* Skip dropped attributes (probably shouldn't see any here). */
920 if (attr->attisdropped)
921 continue;
922
923 /*
924 * Skip generated columns (COPY won't accept them in the column
925 * list)
926 */
927 if (attr->attgenerated)
928 continue;
929 *columns = lappend(*columns, makeString(pstrdup(attname)));
930 }
931 }
932
933 /* Count non-dropped user attributes while we have the tupdesc. */
934 numattrs = 0;
935 for (i = 0; i < tupleDesc->natts; i++)
936 {
937 Form_pg_attribute attr = TupleDescAttr(tupleDesc, i);
938
939 if (attr->attisdropped)
940 continue;
941 numattrs++;
942 }
943
944 table_close(rel, AccessShareLock);
945
946 /* If there's a whole-row reference, fail: we need all the columns. */
947 if (has_wholerow)
948 {
949 *columns = NIL;
950 return false;
951 }
952
953 /* If all the user attributes are needed, fail. */
954 if (numattrs == list_length(*columns))
955 {
956 *columns = NIL;
957 return false;
958 }
959
960 return true;
961 }
962
963 /*
964 * Estimate size of a foreign table.
965 *
966 * The main result is returned in baserel->rows. We also set
967 * fdw_private->pages and fdw_private->ntuples for later use in the cost
968 * calculation.
969 */
970 static void
estimate_size(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private)971 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
972 FileFdwPlanState *fdw_private)
973 {
974 struct stat stat_buf;
975 BlockNumber pages;
976 double ntuples;
977 double nrows;
978
979 /*
980 * Get size of the file. It might not be there at plan time, though, in
981 * which case we have to use a default estimate. We also have to fall
982 * back to the default if using a program as the input.
983 */
984 if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
985 stat_buf.st_size = 10 * BLCKSZ;
986
987 /*
988 * Convert size to pages for use in I/O cost estimate later.
989 */
990 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
991 if (pages < 1)
992 pages = 1;
993 fdw_private->pages = pages;
994
995 /*
996 * Estimate the number of tuples in the file.
997 */
998 if (baserel->pages > 0)
999 {
1000 /*
1001 * We have # of pages and # of tuples from pg_class (that is, from a
1002 * previous ANALYZE), so compute a tuples-per-page estimate and scale
1003 * that by the current file size.
1004 */
1005 double density;
1006
1007 density = baserel->tuples / (double) baserel->pages;
1008 ntuples = clamp_row_est(density * (double) pages);
1009 }
1010 else
1011 {
1012 /*
1013 * Otherwise we have to fake it. We back into this estimate using the
1014 * planner's idea of the relation width; which is bogus if not all
1015 * columns are being read, not to mention that the text representation
1016 * of a row probably isn't the same size as its internal
1017 * representation. Possibly we could do something better, but the
1018 * real answer to anyone who complains is "ANALYZE" ...
1019 */
1020 int tuple_width;
1021
1022 tuple_width = MAXALIGN(baserel->reltarget->width) +
1023 MAXALIGN(SizeofHeapTupleHeader);
1024 ntuples = clamp_row_est((double) stat_buf.st_size /
1025 (double) tuple_width);
1026 }
1027 fdw_private->ntuples = ntuples;
1028
1029 /*
1030 * Now estimate the number of rows returned by the scan after applying the
1031 * baserestrictinfo quals.
1032 */
1033 nrows = ntuples *
1034 clauselist_selectivity(root,
1035 baserel->baserestrictinfo,
1036 0,
1037 JOIN_INNER,
1038 NULL);
1039
1040 nrows = clamp_row_est(nrows);
1041
1042 /* Save the output-rows estimate for the planner */
1043 baserel->rows = nrows;
1044 }
1045
1046 /*
1047 * Estimate costs of scanning a foreign table.
1048 *
1049 * Results are returned in *startup_cost and *total_cost.
1050 */
1051 static void
estimate_costs(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private,Cost * startup_cost,Cost * total_cost)1052 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1053 FileFdwPlanState *fdw_private,
1054 Cost *startup_cost, Cost *total_cost)
1055 {
1056 BlockNumber pages = fdw_private->pages;
1057 double ntuples = fdw_private->ntuples;
1058 Cost run_cost = 0;
1059 Cost cpu_per_tuple;
1060
1061 /*
1062 * We estimate costs almost the same way as cost_seqscan(), thus assuming
1063 * that I/O costs are equivalent to a regular table file of the same size.
1064 * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1065 * for the cost of parsing records.
1066 *
1067 * In the case of a program source, this calculation is even more divorced
1068 * from reality, but we have no good alternative; and it's not clear that
1069 * the numbers we produce here matter much anyway, since there's only one
1070 * access path for the rel.
1071 */
1072 run_cost += seq_page_cost * pages;
1073
1074 *startup_cost = baserel->baserestrictcost.startup;
1075 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1076 run_cost += cpu_per_tuple * ntuples;
1077 *total_cost = *startup_cost + run_cost;
1078 }
1079
1080 /*
1081 * file_acquire_sample_rows -- acquire a random sample of rows from the table
1082 *
1083 * Selected rows are returned in the caller-allocated array rows[],
1084 * which must have at least targrows entries.
1085 * The actual number of rows selected is returned as the function result.
1086 * We also count the total number of rows in the file and return it into
1087 * *totalrows. Note that *totaldeadrows is always set to 0.
1088 *
1089 * Note that the returned list of rows is not always in order by physical
1090 * position in the file. Therefore, correlation estimates derived later
1091 * may be meaningless, but it's OK because we don't use the estimates
1092 * currently (the planner only pays attention to correlation for indexscans).
1093 */
1094 static int
file_acquire_sample_rows(Relation onerel,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)1095 file_acquire_sample_rows(Relation onerel, int elevel,
1096 HeapTuple *rows, int targrows,
1097 double *totalrows, double *totaldeadrows)
1098 {
1099 int numrows = 0;
1100 double rowstoskip = -1; /* -1 means not set yet */
1101 ReservoirStateData rstate;
1102 TupleDesc tupDesc;
1103 Datum *values;
1104 bool *nulls;
1105 bool found;
1106 char *filename;
1107 bool is_program;
1108 List *options;
1109 CopyState cstate;
1110 ErrorContextCallback errcallback;
1111 MemoryContext oldcontext = CurrentMemoryContext;
1112 MemoryContext tupcontext;
1113
1114 Assert(onerel);
1115 Assert(targrows > 0);
1116
1117 tupDesc = RelationGetDescr(onerel);
1118 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1119 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1120
1121 /* Fetch options of foreign table */
1122 fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);
1123
1124 /*
1125 * Create CopyState from FDW options.
1126 */
1127 cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
1128 options);
1129
1130 /*
1131 * Use per-tuple memory context to prevent leak of memory used to read
1132 * rows from the file with Copy routines.
1133 */
1134 tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1135 "file_fdw temporary context",
1136 ALLOCSET_DEFAULT_SIZES);
1137
1138 /* Prepare for sampling rows */
1139 reservoir_init_selection_state(&rstate, targrows);
1140
1141 /* Set up callback to identify error line number. */
1142 errcallback.callback = CopyFromErrorCallback;
1143 errcallback.arg = (void *) cstate;
1144 errcallback.previous = error_context_stack;
1145 error_context_stack = &errcallback;
1146
1147 *totalrows = 0;
1148 *totaldeadrows = 0;
1149 for (;;)
1150 {
1151 /* Check for user-requested abort or sleep */
1152 vacuum_delay_point();
1153
1154 /* Fetch next row */
1155 MemoryContextReset(tupcontext);
1156 MemoryContextSwitchTo(tupcontext);
1157
1158 found = NextCopyFrom(cstate, NULL, values, nulls);
1159
1160 MemoryContextSwitchTo(oldcontext);
1161
1162 if (!found)
1163 break;
1164
1165 /*
1166 * The first targrows sample rows are simply copied into the
1167 * reservoir. Then we start replacing tuples in the sample until we
1168 * reach the end of the relation. This algorithm is from Jeff Vitter's
1169 * paper (see more info in commands/analyze.c).
1170 */
1171 if (numrows < targrows)
1172 {
1173 rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1174 }
1175 else
1176 {
1177 /*
1178 * t in Vitter's paper is the number of records already processed.
1179 * If we need to compute a new S value, we must use the
1180 * not-yet-incremented value of totalrows as t.
1181 */
1182 if (rowstoskip < 0)
1183 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1184
1185 if (rowstoskip <= 0)
1186 {
1187 /*
1188 * Found a suitable tuple, so save it, replacing one old tuple
1189 * at random
1190 */
1191 int k = (int) (targrows * sampler_random_fract(rstate.randstate));
1192
1193 Assert(k >= 0 && k < targrows);
1194 heap_freetuple(rows[k]);
1195 rows[k] = heap_form_tuple(tupDesc, values, nulls);
1196 }
1197
1198 rowstoskip -= 1;
1199 }
1200
1201 *totalrows += 1;
1202 }
1203
1204 /* Remove error callback. */
1205 error_context_stack = errcallback.previous;
1206
1207 /* Clean up. */
1208 MemoryContextDelete(tupcontext);
1209
1210 EndCopyFrom(cstate);
1211
1212 pfree(values);
1213 pfree(nulls);
1214
1215 /*
1216 * Emit some interesting relation info
1217 */
1218 ereport(elevel,
1219 (errmsg("\"%s\": file contains %.0f rows; "
1220 "%d rows in sample",
1221 RelationGetRelationName(onerel),
1222 *totalrows, numrows)));
1223
1224 return numrows;
1225 }
1226