1 /*-------------------------------------------------------------------------
2 *
3 * file_fdw.c
4 * foreign-data wrapper for server-side flat files (or programs).
5 *
6 * Copyright (c) 2010-2020, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/file_fdw/file_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include <sys/stat.h>
16 #include <unistd.h>
17
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "access/table.h"
22 #include "catalog/pg_authid.h"
23 #include "catalog/pg_foreign_table.h"
24 #include "commands/copy.h"
25 #include "commands/defrem.h"
26 #include "commands/explain.h"
27 #include "commands/vacuum.h"
28 #include "foreign/fdwapi.h"
29 #include "foreign/foreign.h"
30 #include "miscadmin.h"
31 #include "nodes/makefuncs.h"
32 #include "optimizer/optimizer.h"
33 #include "optimizer/pathnode.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "utils/acl.h"
37 #include "utils/memutils.h"
38 #include "utils/rel.h"
39 #include "utils/sampling.h"
40
41 PG_MODULE_MAGIC;
42
/*
 * Describes the valid options for objects that use this wrapper.
 *
 * Each entry pairs an option name with the catalog in which that option is
 * accepted; see valid_options[] for the actual table.
 */
struct FileFdwOption
{
	const char *optname;		/* name of the option */
	Oid			optcontext;		/* Oid of catalog in which option may appear */
};
51
52 /*
53 * Valid options for file_fdw.
54 * These options are based on the options for the COPY FROM command.
55 * But note that force_not_null and force_null are handled as boolean options
56 * attached to a column, not as table options.
57 *
58 * Note: If you are adding new option for user mapping, you need to modify
59 * fileGetOptions(), which currently doesn't bother to look at user mappings.
60 */
61 static const struct FileFdwOption valid_options[] = {
62 /* Data source options */
63 {"filename", ForeignTableRelationId},
64 {"program", ForeignTableRelationId},
65
66 /* Format options */
67 /* oids option is not supported */
68 {"format", ForeignTableRelationId},
69 {"header", ForeignTableRelationId},
70 {"delimiter", ForeignTableRelationId},
71 {"quote", ForeignTableRelationId},
72 {"escape", ForeignTableRelationId},
73 {"null", ForeignTableRelationId},
74 {"encoding", ForeignTableRelationId},
75 {"force_not_null", AttributeRelationId},
76 {"force_null", AttributeRelationId},
77
78 /*
79 * force_quote is not supported by file_fdw because it's for COPY TO.
80 */
81
82 /* Sentinel */
83 {NULL, InvalidOid}
84 };
85
/*
 * FDW-specific information for RelOptInfo.fdw_private.
 *
 * Allocated and filled in by fileGetForeignRelSize(): the option fields come
 * from fileGetOptions(), and pages/ntuples are set by estimate_size().
 */
typedef struct FileFdwPlanState
{
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	BlockNumber pages;			/* estimate of file's physical size */
	double		ntuples;		/* estimate of number of data rows */
} FileFdwPlanState;
98
/*
 * FDW-specific information for ForeignScanState.fdw_state.
 *
 * Built by fileBeginForeignScan().  filename, is_program and options are
 * kept so that fileReScanForeignScan() can call BeginCopyFrom() again with
 * the same arguments.
 */
typedef struct FileFdwExecutionState
{
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	CopyState	cstate;			/* COPY execution state */
} FileFdwExecutionState;
110
/*
 * SQL functions
 *
 * file_fdw_handler returns the FdwRoutine callback table; file_fdw_validator
 * checks the generic options given for objects that use this wrapper.
 */
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
116
/*
 * FDW callback routines
 *
 * These are installed into the FdwRoutine node built by file_fdw_handler().
 */

/* Planner callbacks */
static void fileGetForeignRelSize(PlannerInfo *root,
								  RelOptInfo *baserel,
								  Oid foreigntableid);
static void fileGetForeignPaths(PlannerInfo *root,
								RelOptInfo *baserel,
								Oid foreigntableid);
static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
									   RelOptInfo *baserel,
									   Oid foreigntableid,
									   ForeignPath *best_path,
									   List *tlist,
									   List *scan_clauses,
									   Plan *outer_plan);

/* EXPLAIN and executor callbacks */
static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
static void fileReScanForeignScan(ForeignScanState *node);
static void fileEndForeignScan(ForeignScanState *node);

/* ANALYZE and parallel-safety callbacks */
static bool fileAnalyzeForeignTable(Relation relation,
									AcquireSampleRowsFunc *func,
									BlockNumber *totalpages);
static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
										  RangeTblEntry *rte);
143
/*
 * Helper functions
 */

/* Option lookup and retrieval */
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
						   char **filename,
						   bool *is_program,
						   List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);

/* Planning support */
static bool check_selective_binary_conversion(RelOptInfo *baserel,
											  Oid foreigntableid,
											  List **columns);
static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
						  FileFdwPlanState *fdw_private);
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
						   FileFdwPlanState *fdw_private,
						   Cost *startup_cost, Cost *total_cost);

/* Row sampler handed back to the caller by fileAnalyzeForeignTable() */
static int	file_acquire_sample_rows(Relation onerel, int elevel,
									 HeapTuple *rows, int targrows,
									 double *totalrows, double *totaldeadrows);
164
165
166 /*
167 * Foreign-data wrapper handler function: return a struct with pointers
168 * to my callback routines.
169 */
170 Datum
file_fdw_handler(PG_FUNCTION_ARGS)171 file_fdw_handler(PG_FUNCTION_ARGS)
172 {
173 FdwRoutine *fdwroutine = makeNode(FdwRoutine);
174
175 fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
176 fdwroutine->GetForeignPaths = fileGetForeignPaths;
177 fdwroutine->GetForeignPlan = fileGetForeignPlan;
178 fdwroutine->ExplainForeignScan = fileExplainForeignScan;
179 fdwroutine->BeginForeignScan = fileBeginForeignScan;
180 fdwroutine->IterateForeignScan = fileIterateForeignScan;
181 fdwroutine->ReScanForeignScan = fileReScanForeignScan;
182 fdwroutine->EndForeignScan = fileEndForeignScan;
183 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
184 fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
185
186 PG_RETURN_POINTER(fdwroutine);
187 }
188
189 /*
190 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
191 * USER MAPPING or FOREIGN TABLE that uses file_fdw.
192 *
193 * Raise an ERROR if the option or its value is considered invalid.
194 */
195 Datum
file_fdw_validator(PG_FUNCTION_ARGS)196 file_fdw_validator(PG_FUNCTION_ARGS)
197 {
198 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
199 Oid catalog = PG_GETARG_OID(1);
200 char *filename = NULL;
201 DefElem *force_not_null = NULL;
202 DefElem *force_null = NULL;
203 List *other_options = NIL;
204 ListCell *cell;
205
206 /*
207 * Check that only options supported by file_fdw, and allowed for the
208 * current object type, are given.
209 */
210 foreach(cell, options_list)
211 {
212 DefElem *def = (DefElem *) lfirst(cell);
213
214 if (!is_valid_option(def->defname, catalog))
215 {
216 const struct FileFdwOption *opt;
217 StringInfoData buf;
218
219 /*
220 * Unknown option specified, complain about it. Provide a hint
221 * with list of valid options for the object.
222 */
223 initStringInfo(&buf);
224 for (opt = valid_options; opt->optname; opt++)
225 {
226 if (catalog == opt->optcontext)
227 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
228 opt->optname);
229 }
230
231 ereport(ERROR,
232 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
233 errmsg("invalid option \"%s\"", def->defname),
234 buf.len > 0
235 ? errhint("Valid options in this context are: %s",
236 buf.data)
237 : errhint("There are no valid options in this context.")));
238 }
239
240 /*
241 * Separate out filename, program, and column-specific options, since
242 * ProcessCopyOptions won't accept them.
243 */
244 if (strcmp(def->defname, "filename") == 0 ||
245 strcmp(def->defname, "program") == 0)
246 {
247 if (filename)
248 ereport(ERROR,
249 (errcode(ERRCODE_SYNTAX_ERROR),
250 errmsg("conflicting or redundant options")));
251
252 /*
253 * Check permissions for changing which file or program is used by
254 * the file_fdw.
255 *
256 * Only members of the role 'pg_read_server_files' are allowed to
257 * set the 'filename' option of a file_fdw foreign table, while
258 * only members of the role 'pg_execute_server_program' are
259 * allowed to set the 'program' option. This is because we don't
260 * want regular users to be able to control which file gets read
261 * or which program gets executed.
262 *
263 * Putting this sort of permissions check in a validator is a bit
264 * of a crock, but there doesn't seem to be any other place that
265 * can enforce the check more cleanly.
266 *
267 * Note that the valid_options[] array disallows setting filename
268 * and program at any options level other than foreign table ---
269 * otherwise there'd still be a security hole.
270 */
271 if (strcmp(def->defname, "filename") == 0 &&
272 !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
273 ereport(ERROR,
274 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
275 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table")));
276
277 if (strcmp(def->defname, "program") == 0 &&
278 !is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
279 ereport(ERROR,
280 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
281 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table")));
282
283 filename = defGetString(def);
284 }
285
286 /*
287 * force_not_null is a boolean option; after validation we can discard
288 * it - it will be retrieved later in get_file_fdw_attribute_options()
289 */
290 else if (strcmp(def->defname, "force_not_null") == 0)
291 {
292 if (force_not_null)
293 ereport(ERROR,
294 (errcode(ERRCODE_SYNTAX_ERROR),
295 errmsg("conflicting or redundant options"),
296 errhint("Option \"force_not_null\" supplied more than once for a column.")));
297 force_not_null = def;
298 /* Don't care what the value is, as long as it's a legal boolean */
299 (void) defGetBoolean(def);
300 }
301 /* See comments for force_not_null above */
302 else if (strcmp(def->defname, "force_null") == 0)
303 {
304 if (force_null)
305 ereport(ERROR,
306 (errcode(ERRCODE_SYNTAX_ERROR),
307 errmsg("conflicting or redundant options"),
308 errhint("Option \"force_null\" supplied more than once for a column.")));
309 force_null = def;
310 (void) defGetBoolean(def);
311 }
312 else
313 other_options = lappend(other_options, def);
314 }
315
316 /*
317 * Now apply the core COPY code's validation logic for more checks.
318 */
319 ProcessCopyOptions(NULL, NULL, true, other_options);
320
321 /*
322 * Either filename or program option is required for file_fdw foreign
323 * tables.
324 */
325 if (catalog == ForeignTableRelationId && filename == NULL)
326 ereport(ERROR,
327 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
328 errmsg("either filename or program is required for file_fdw foreign tables")));
329
330 PG_RETURN_VOID();
331 }
332
333 /*
334 * Check if the provided option is one of the valid options.
335 * context is the Oid of the catalog holding the object the option is for.
336 */
337 static bool
is_valid_option(const char * option,Oid context)338 is_valid_option(const char *option, Oid context)
339 {
340 const struct FileFdwOption *opt;
341
342 for (opt = valid_options; opt->optname; opt++)
343 {
344 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
345 return true;
346 }
347 return false;
348 }
349
350 /*
351 * Fetch the options for a file_fdw foreign table.
352 *
353 * We have to separate out filename/program from the other options because
354 * those must not appear in the options list passed to the core COPY code.
355 */
356 static void
fileGetOptions(Oid foreigntableid,char ** filename,bool * is_program,List ** other_options)357 fileGetOptions(Oid foreigntableid,
358 char **filename, bool *is_program, List **other_options)
359 {
360 ForeignTable *table;
361 ForeignServer *server;
362 ForeignDataWrapper *wrapper;
363 List *options;
364 ListCell *lc;
365
366 /*
367 * Extract options from FDW objects. We ignore user mappings because
368 * file_fdw doesn't have any options that can be specified there.
369 *
370 * (XXX Actually, given the current contents of valid_options[], there's
371 * no point in examining anything except the foreign table's own options.
372 * Simplify?)
373 */
374 table = GetForeignTable(foreigntableid);
375 server = GetForeignServer(table->serverid);
376 wrapper = GetForeignDataWrapper(server->fdwid);
377
378 options = NIL;
379 options = list_concat(options, wrapper->options);
380 options = list_concat(options, server->options);
381 options = list_concat(options, table->options);
382 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
383
384 /*
385 * Separate out the filename or program option (we assume there is only
386 * one).
387 */
388 *filename = NULL;
389 *is_program = false;
390 foreach(lc, options)
391 {
392 DefElem *def = (DefElem *) lfirst(lc);
393
394 if (strcmp(def->defname, "filename") == 0)
395 {
396 *filename = defGetString(def);
397 options = foreach_delete_current(options, lc);
398 break;
399 }
400 else if (strcmp(def->defname, "program") == 0)
401 {
402 *filename = defGetString(def);
403 *is_program = true;
404 options = foreach_delete_current(options, lc);
405 break;
406 }
407 }
408
409 /*
410 * The validator should have checked that filename or program was included
411 * in the options, but check again, just in case.
412 */
413 if (*filename == NULL)
414 elog(ERROR, "either filename or program is required for file_fdw foreign tables");
415
416 *other_options = options;
417 }
418
419 /*
420 * Retrieve per-column generic options from pg_attribute and construct a list
421 * of DefElems representing them.
422 *
423 * At the moment we only have "force_not_null", and "force_null",
424 * which should each be combined into a single DefElem listing all such
425 * columns, since that's what COPY expects.
426 */
427 static List *
get_file_fdw_attribute_options(Oid relid)428 get_file_fdw_attribute_options(Oid relid)
429 {
430 Relation rel;
431 TupleDesc tupleDesc;
432 AttrNumber natts;
433 AttrNumber attnum;
434 List *fnncolumns = NIL;
435 List *fncolumns = NIL;
436
437 List *options = NIL;
438
439 rel = table_open(relid, AccessShareLock);
440 tupleDesc = RelationGetDescr(rel);
441 natts = tupleDesc->natts;
442
443 /* Retrieve FDW options for all user-defined attributes. */
444 for (attnum = 1; attnum <= natts; attnum++)
445 {
446 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
447 List *options;
448 ListCell *lc;
449
450 /* Skip dropped attributes. */
451 if (attr->attisdropped)
452 continue;
453
454 options = GetForeignColumnOptions(relid, attnum);
455 foreach(lc, options)
456 {
457 DefElem *def = (DefElem *) lfirst(lc);
458
459 if (strcmp(def->defname, "force_not_null") == 0)
460 {
461 if (defGetBoolean(def))
462 {
463 char *attname = pstrdup(NameStr(attr->attname));
464
465 fnncolumns = lappend(fnncolumns, makeString(attname));
466 }
467 }
468 else if (strcmp(def->defname, "force_null") == 0)
469 {
470 if (defGetBoolean(def))
471 {
472 char *attname = pstrdup(NameStr(attr->attname));
473
474 fncolumns = lappend(fncolumns, makeString(attname));
475 }
476 }
477 /* maybe in future handle other options here */
478 }
479 }
480
481 table_close(rel, AccessShareLock);
482
483 /*
484 * Return DefElem only when some column(s) have force_not_null /
485 * force_null options set
486 */
487 if (fnncolumns != NIL)
488 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
489
490 if (fncolumns != NIL)
491 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
492
493 return options;
494 }
495
496 /*
497 * fileGetForeignRelSize
498 * Obtain relation size estimates for a foreign table
499 */
500 static void
fileGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)501 fileGetForeignRelSize(PlannerInfo *root,
502 RelOptInfo *baserel,
503 Oid foreigntableid)
504 {
505 FileFdwPlanState *fdw_private;
506
507 /*
508 * Fetch options. We only need filename (or program) at this point, but
509 * we might as well get everything and not need to re-fetch it later in
510 * planning.
511 */
512 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
513 fileGetOptions(foreigntableid,
514 &fdw_private->filename,
515 &fdw_private->is_program,
516 &fdw_private->options);
517 baserel->fdw_private = (void *) fdw_private;
518
519 /* Estimate relation size */
520 estimate_size(root, baserel, fdw_private);
521 }
522
523 /*
524 * fileGetForeignPaths
525 * Create possible access paths for a scan on the foreign table
526 *
527 * Currently we don't support any push-down feature, so there is only one
528 * possible access path, which simply returns all records in the order in
529 * the data file.
530 */
531 static void
fileGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)532 fileGetForeignPaths(PlannerInfo *root,
533 RelOptInfo *baserel,
534 Oid foreigntableid)
535 {
536 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
537 Cost startup_cost;
538 Cost total_cost;
539 List *columns;
540 List *coptions = NIL;
541
542 /* Decide whether to selectively perform binary conversion */
543 if (check_selective_binary_conversion(baserel,
544 foreigntableid,
545 &columns))
546 coptions = list_make1(makeDefElem("convert_selectively",
547 (Node *) columns, -1));
548
549 /* Estimate costs */
550 estimate_costs(root, baserel, fdw_private,
551 &startup_cost, &total_cost);
552
553 /*
554 * Create a ForeignPath node and add it as only possible path. We use the
555 * fdw_private list of the path to carry the convert_selectively option;
556 * it will be propagated into the fdw_private list of the Plan node.
557 *
558 * We don't support pushing join clauses into the quals of this path, but
559 * it could still have required parameterization due to LATERAL refs in
560 * its tlist.
561 */
562 add_path(baserel, (Path *)
563 create_foreignscan_path(root, baserel,
564 NULL, /* default pathtarget */
565 baserel->rows,
566 startup_cost,
567 total_cost,
568 NIL, /* no pathkeys */
569 baserel->lateral_relids,
570 NULL, /* no extra plan */
571 coptions));
572
573 /*
574 * If data file was sorted, and we knew it somehow, we could insert
575 * appropriate pathkeys into the ForeignPath node to tell the planner
576 * that.
577 */
578 }
579
580 /*
581 * fileGetForeignPlan
582 * Create a ForeignScan plan node for scanning the foreign table
583 */
584 static ForeignScan *
fileGetForeignPlan(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)585 fileGetForeignPlan(PlannerInfo *root,
586 RelOptInfo *baserel,
587 Oid foreigntableid,
588 ForeignPath *best_path,
589 List *tlist,
590 List *scan_clauses,
591 Plan *outer_plan)
592 {
593 Index scan_relid = baserel->relid;
594
595 /*
596 * We have no native ability to evaluate restriction clauses, so we just
597 * put all the scan_clauses into the plan node's qual list for the
598 * executor to check. So all we have to do here is strip RestrictInfo
599 * nodes from the clauses and ignore pseudoconstants (which will be
600 * handled elsewhere).
601 */
602 scan_clauses = extract_actual_clauses(scan_clauses, false);
603
604 /* Create the ForeignScan node */
605 return make_foreignscan(tlist,
606 scan_clauses,
607 scan_relid,
608 NIL, /* no expressions to evaluate */
609 best_path->fdw_private,
610 NIL, /* no custom tlist */
611 NIL, /* no remote quals */
612 outer_plan);
613 }
614
615 /*
616 * fileExplainForeignScan
617 * Produce extra output for EXPLAIN
618 */
619 static void
fileExplainForeignScan(ForeignScanState * node,ExplainState * es)620 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
621 {
622 char *filename;
623 bool is_program;
624 List *options;
625
626 /* Fetch options --- we only need filename and is_program at this point */
627 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
628 &filename, &is_program, &options);
629
630 if (is_program)
631 ExplainPropertyText("Foreign Program", filename, es);
632 else
633 ExplainPropertyText("Foreign File", filename, es);
634
635 /* Suppress file size if we're not showing cost details */
636 if (es->costs)
637 {
638 struct stat stat_buf;
639
640 if (!is_program &&
641 stat(filename, &stat_buf) == 0)
642 ExplainPropertyInteger("Foreign File Size", "b",
643 (int64) stat_buf.st_size, es);
644 }
645 }
646
647 /*
648 * fileBeginForeignScan
649 * Initiate access to the file by creating CopyState
650 */
651 static void
fileBeginForeignScan(ForeignScanState * node,int eflags)652 fileBeginForeignScan(ForeignScanState *node, int eflags)
653 {
654 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
655 char *filename;
656 bool is_program;
657 List *options;
658 CopyState cstate;
659 FileFdwExecutionState *festate;
660
661 /*
662 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
663 */
664 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
665 return;
666
667 /* Fetch options of foreign table */
668 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
669 &filename, &is_program, &options);
670
671 /* Add any options from the plan (currently only convert_selectively) */
672 options = list_concat(options, plan->fdw_private);
673
674 /*
675 * Create CopyState from FDW options. We always acquire all columns, so
676 * as to match the expected ScanTupleSlot signature.
677 */
678 cstate = BeginCopyFrom(NULL,
679 node->ss.ss_currentRelation,
680 filename,
681 is_program,
682 NULL,
683 NIL,
684 options);
685
686 /*
687 * Save state in node->fdw_state. We must save enough information to call
688 * BeginCopyFrom() again.
689 */
690 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
691 festate->filename = filename;
692 festate->is_program = is_program;
693 festate->options = options;
694 festate->cstate = cstate;
695
696 node->fdw_state = (void *) festate;
697 }
698
699 /*
700 * fileIterateForeignScan
701 * Read next record from the data file and store it into the
702 * ScanTupleSlot as a virtual tuple
703 */
704 static TupleTableSlot *
fileIterateForeignScan(ForeignScanState * node)705 fileIterateForeignScan(ForeignScanState *node)
706 {
707 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
708 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
709 bool found;
710 ErrorContextCallback errcallback;
711
712 /* Set up callback to identify error line number. */
713 errcallback.callback = CopyFromErrorCallback;
714 errcallback.arg = (void *) festate->cstate;
715 errcallback.previous = error_context_stack;
716 error_context_stack = &errcallback;
717
718 /*
719 * The protocol for loading a virtual tuple into a slot is first
720 * ExecClearTuple, then fill the values/isnull arrays, then
721 * ExecStoreVirtualTuple. If we don't find another row in the file, we
722 * just skip the last step, leaving the slot empty as required.
723 *
724 * We can pass ExprContext = NULL because we read all columns from the
725 * file, so no need to evaluate default expressions.
726 */
727 ExecClearTuple(slot);
728 found = NextCopyFrom(festate->cstate, NULL,
729 slot->tts_values, slot->tts_isnull);
730 if (found)
731 ExecStoreVirtualTuple(slot);
732
733 /* Remove error callback. */
734 error_context_stack = errcallback.previous;
735
736 return slot;
737 }
738
739 /*
740 * fileReScanForeignScan
741 * Rescan table, possibly with new parameters
742 */
743 static void
fileReScanForeignScan(ForeignScanState * node)744 fileReScanForeignScan(ForeignScanState *node)
745 {
746 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
747
748 EndCopyFrom(festate->cstate);
749
750 festate->cstate = BeginCopyFrom(NULL,
751 node->ss.ss_currentRelation,
752 festate->filename,
753 festate->is_program,
754 NULL,
755 NIL,
756 festate->options);
757 }
758
759 /*
760 * fileEndForeignScan
761 * Finish scanning foreign table and dispose objects used for this scan
762 */
763 static void
fileEndForeignScan(ForeignScanState * node)764 fileEndForeignScan(ForeignScanState *node)
765 {
766 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
767
768 /* if festate is NULL, we are in EXPLAIN; nothing to do */
769 if (festate)
770 EndCopyFrom(festate->cstate);
771 }
772
773 /*
774 * fileAnalyzeForeignTable
775 * Test whether analyzing this foreign table is supported
776 */
777 static bool
fileAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)778 fileAnalyzeForeignTable(Relation relation,
779 AcquireSampleRowsFunc *func,
780 BlockNumber *totalpages)
781 {
782 char *filename;
783 bool is_program;
784 List *options;
785 struct stat stat_buf;
786
787 /* Fetch options of foreign table */
788 fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
789
790 /*
791 * If this is a program instead of a file, just return false to skip
792 * analyzing the table. We could run the program and collect stats on
793 * whatever it currently returns, but it seems likely that in such cases
794 * the output would be too volatile for the stats to be useful. Maybe
795 * there should be an option to enable doing this?
796 */
797 if (is_program)
798 return false;
799
800 /*
801 * Get size of the file. (XXX if we fail here, would it be better to just
802 * return false to skip analyzing the table?)
803 */
804 if (stat(filename, &stat_buf) < 0)
805 ereport(ERROR,
806 (errcode_for_file_access(),
807 errmsg("could not stat file \"%s\": %m",
808 filename)));
809
810 /*
811 * Convert size to pages. Must return at least 1 so that we can tell
812 * later on that pg_class.relpages is not default.
813 */
814 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
815 if (*totalpages < 1)
816 *totalpages = 1;
817
818 *func = file_acquire_sample_rows;
819
820 return true;
821 }
822
823 /*
824 * fileIsForeignScanParallelSafe
825 * Reading a file, or external program, in a parallel worker should work
826 * just the same as reading it in the leader, so mark scans safe.
827 */
828 static bool
fileIsForeignScanParallelSafe(PlannerInfo * root,RelOptInfo * rel,RangeTblEntry * rte)829 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
830 RangeTblEntry *rte)
831 {
832 return true;
833 }
834
835 /*
836 * check_selective_binary_conversion
837 *
838 * Check to see if it's useful to convert only a subset of the file's columns
839 * to binary. If so, construct a list of the column names to be converted,
840 * return that at *columns, and return true. (Note that it's possible to
841 * determine that no columns need be converted, for instance with a COUNT(*)
842 * query. So we can't use returning a NIL list to indicate failure.)
843 */
844 static bool
check_selective_binary_conversion(RelOptInfo * baserel,Oid foreigntableid,List ** columns)845 check_selective_binary_conversion(RelOptInfo *baserel,
846 Oid foreigntableid,
847 List **columns)
848 {
849 ForeignTable *table;
850 ListCell *lc;
851 Relation rel;
852 TupleDesc tupleDesc;
853 AttrNumber attnum;
854 Bitmapset *attrs_used = NULL;
855 bool has_wholerow = false;
856 int numattrs;
857 int i;
858
859 *columns = NIL; /* default result */
860
861 /*
862 * Check format of the file. If binary format, this is irrelevant.
863 */
864 table = GetForeignTable(foreigntableid);
865 foreach(lc, table->options)
866 {
867 DefElem *def = (DefElem *) lfirst(lc);
868
869 if (strcmp(def->defname, "format") == 0)
870 {
871 char *format = defGetString(def);
872
873 if (strcmp(format, "binary") == 0)
874 return false;
875 break;
876 }
877 }
878
879 /* Collect all the attributes needed for joins or final output. */
880 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
881 &attrs_used);
882
883 /* Add all the attributes used by restriction clauses. */
884 foreach(lc, baserel->baserestrictinfo)
885 {
886 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
887
888 pull_varattnos((Node *) rinfo->clause, baserel->relid,
889 &attrs_used);
890 }
891
892 /* Convert attribute numbers to column names. */
893 rel = table_open(foreigntableid, AccessShareLock);
894 tupleDesc = RelationGetDescr(rel);
895
896 while ((attnum = bms_first_member(attrs_used)) >= 0)
897 {
898 /* Adjust for system attributes. */
899 attnum += FirstLowInvalidHeapAttributeNumber;
900
901 if (attnum == 0)
902 {
903 has_wholerow = true;
904 break;
905 }
906
907 /* Ignore system attributes. */
908 if (attnum < 0)
909 continue;
910
911 /* Get user attributes. */
912 if (attnum > 0)
913 {
914 Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
915 char *attname = NameStr(attr->attname);
916
917 /* Skip dropped attributes (probably shouldn't see any here). */
918 if (attr->attisdropped)
919 continue;
920
921 /*
922 * Skip generated columns (COPY won't accept them in the column
923 * list)
924 */
925 if (attr->attgenerated)
926 continue;
927 *columns = lappend(*columns, makeString(pstrdup(attname)));
928 }
929 }
930
931 /* Count non-dropped user attributes while we have the tupdesc. */
932 numattrs = 0;
933 for (i = 0; i < tupleDesc->natts; i++)
934 {
935 Form_pg_attribute attr = TupleDescAttr(tupleDesc, i);
936
937 if (attr->attisdropped)
938 continue;
939 numattrs++;
940 }
941
942 table_close(rel, AccessShareLock);
943
944 /* If there's a whole-row reference, fail: we need all the columns. */
945 if (has_wholerow)
946 {
947 *columns = NIL;
948 return false;
949 }
950
951 /* If all the user attributes are needed, fail. */
952 if (numattrs == list_length(*columns))
953 {
954 *columns = NIL;
955 return false;
956 }
957
958 return true;
959 }
960
961 /*
962 * Estimate size of a foreign table.
963 *
964 * The main result is returned in baserel->rows. We also set
965 * fdw_private->pages and fdw_private->ntuples for later use in the cost
966 * calculation.
967 */
968 static void
estimate_size(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private)969 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
970 FileFdwPlanState *fdw_private)
971 {
972 struct stat stat_buf;
973 BlockNumber pages;
974 double ntuples;
975 double nrows;
976
977 /*
978 * Get size of the file. It might not be there at plan time, though, in
979 * which case we have to use a default estimate. We also have to fall
980 * back to the default if using a program as the input.
981 */
982 if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
983 stat_buf.st_size = 10 * BLCKSZ;
984
985 /*
986 * Convert size to pages for use in I/O cost estimate later.
987 */
988 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
989 if (pages < 1)
990 pages = 1;
991 fdw_private->pages = pages;
992
993 /*
994 * Estimate the number of tuples in the file.
995 */
996 if (baserel->pages > 0)
997 {
998 /*
999 * We have # of pages and # of tuples from pg_class (that is, from a
1000 * previous ANALYZE), so compute a tuples-per-page estimate and scale
1001 * that by the current file size.
1002 */
1003 double density;
1004
1005 density = baserel->tuples / (double) baserel->pages;
1006 ntuples = clamp_row_est(density * (double) pages);
1007 }
1008 else
1009 {
1010 /*
1011 * Otherwise we have to fake it. We back into this estimate using the
1012 * planner's idea of the relation width; which is bogus if not all
1013 * columns are being read, not to mention that the text representation
1014 * of a row probably isn't the same size as its internal
1015 * representation. Possibly we could do something better, but the
1016 * real answer to anyone who complains is "ANALYZE" ...
1017 */
1018 int tuple_width;
1019
1020 tuple_width = MAXALIGN(baserel->reltarget->width) +
1021 MAXALIGN(SizeofHeapTupleHeader);
1022 ntuples = clamp_row_est((double) stat_buf.st_size /
1023 (double) tuple_width);
1024 }
1025 fdw_private->ntuples = ntuples;
1026
1027 /*
1028 * Now estimate the number of rows returned by the scan after applying the
1029 * baserestrictinfo quals.
1030 */
1031 nrows = ntuples *
1032 clauselist_selectivity(root,
1033 baserel->baserestrictinfo,
1034 0,
1035 JOIN_INNER,
1036 NULL);
1037
1038 nrows = clamp_row_est(nrows);
1039
1040 /* Save the output-rows estimate for the planner */
1041 baserel->rows = nrows;
1042 }
1043
1044 /*
1045 * Estimate costs of scanning a foreign table.
1046 *
1047 * Results are returned in *startup_cost and *total_cost.
1048 */
1049 static void
estimate_costs(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private,Cost * startup_cost,Cost * total_cost)1050 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1051 FileFdwPlanState *fdw_private,
1052 Cost *startup_cost, Cost *total_cost)
1053 {
1054 BlockNumber pages = fdw_private->pages;
1055 double ntuples = fdw_private->ntuples;
1056 Cost run_cost = 0;
1057 Cost cpu_per_tuple;
1058
1059 /*
1060 * We estimate costs almost the same way as cost_seqscan(), thus assuming
1061 * that I/O costs are equivalent to a regular table file of the same size.
1062 * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1063 * for the cost of parsing records.
1064 *
1065 * In the case of a program source, this calculation is even more divorced
1066 * from reality, but we have no good alternative; and it's not clear that
1067 * the numbers we produce here matter much anyway, since there's only one
1068 * access path for the rel.
1069 */
1070 run_cost += seq_page_cost * pages;
1071
1072 *startup_cost = baserel->baserestrictcost.startup;
1073 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1074 run_cost += cpu_per_tuple * ntuples;
1075 *total_cost = *startup_cost + run_cost;
1076 }
1077
/*
 * file_acquire_sample_rows -- acquire a random sample of rows from the table
 *
 * Parameters:
 *	onerel			foreign table to sample; must not be NULL
 *	elevel			message level of the summary report emitted at the end
 *	rows			caller-allocated output array of sampled tuples
 *	targrows		maximum number of rows to sample; must be > 0
 *	totalrows		output: number of rows read from the input
 *	totaldeadrows	output: always set to 0
 *
 * Selected rows are returned in the caller-allocated array rows[],
 * which must have at least targrows entries.
 * The actual number of rows selected is returned as the function result.
 * We also count the total number of rows in the file and return it into
 * *totalrows.  Note that *totaldeadrows is always set to 0.
 *
 * Note that the returned list of rows is not always in order by physical
 * position in the file.  Therefore, correlation estimates derived later
 * may be meaningless, but it's OK because we don't use the estimates
 * currently (the planner only pays attention to correlation for indexscans).
 */
static int
file_acquire_sample_rows(Relation onerel, int elevel,
						 HeapTuple *rows, int targrows,
						 double *totalrows, double *totaldeadrows)
{
	int			numrows = 0;	/* number of entries of rows[] filled so far */
	double		rowstoskip = -1;	/* -1 means not set yet */
	ReservoirStateData rstate;
	TupleDesc	tupDesc;
	Datum	   *values;
	bool	   *nulls;
	bool		found;
	char	   *filename;
	bool		is_program;
	List	   *options;
	CopyState	cstate;
	ErrorContextCallback errcallback;
	/* context that is current on entry; restored after each row is read */
	MemoryContext oldcontext = CurrentMemoryContext;
	MemoryContext tupcontext;

	Assert(onerel);
	Assert(targrows > 0);

	/* Workspace holding the column values and null flags of one row */
	tupDesc = RelationGetDescr(onerel);
	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));

	/* Fetch options of foreign table */
	fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);

	/*
	 * Create CopyState from FDW options.
	 */
	cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
						   options);

	/*
	 * Use per-tuple memory context to prevent leak of memory used to read
	 * rows from the file with Copy routines.
	 */
	tupcontext = AllocSetContextCreate(CurrentMemoryContext,
									   "file_fdw temporary context",
									   ALLOCSET_DEFAULT_SIZES);

	/* Prepare for sampling rows */
	reservoir_init_selection_state(&rstate, targrows);

	/*
	 * Set up callback to identify error line number.  The previous top of
	 * the error context stack is remembered so it can be put back once the
	 * read loop is done.
	 */
	errcallback.callback = CopyFromErrorCallback;
	errcallback.arg = (void *) cstate;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	*totalrows = 0;
	*totaldeadrows = 0;
	for (;;)
	{
		/* Check for user-requested abort or sleep */
		vacuum_delay_point();

		/*
		 * Fetch next row.  The per-tuple context is emptied first and is
		 * made current only for the duration of the read.
		 */
		MemoryContextReset(tupcontext);
		MemoryContextSwitchTo(tupcontext);

		found = NextCopyFrom(cstate, NULL, values, nulls);

		/*
		 * Back to the entry context, so the tuples formed below are not
		 * built while the per-tuple context (reset on every iteration) is
		 * current.
		 */
		MemoryContextSwitchTo(oldcontext);

		/* No more input */
		if (!found)
			break;

		/*
		 * The first targrows sample rows are simply copied into the
		 * reservoir.  Then we start replacing tuples in the sample until we
		 * reach the end of the relation. This algorithm is from Jeff Vitter's
		 * paper (see more info in commands/analyze.c).
		 */
		if (numrows < targrows)
		{
			rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
		}
		else
		{
			/*
			 * t in Vitter's paper is the number of records already processed.
			 * If we need to compute a new S value, we must use the
			 * not-yet-incremented value of totalrows as t.
			 */
			if (rowstoskip < 0)
				rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);

			if (rowstoskip <= 0)
			{
				/*
				 * Found a suitable tuple, so save it, replacing one old tuple
				 * at random
				 */
				int			k = (int) (targrows * sampler_random_fract(rstate.randstate));

				Assert(k >= 0 && k < targrows);
				/* release the tuple being evicted before overwriting its slot */
				heap_freetuple(rows[k]);
				rows[k] = heap_form_tuple(tupDesc, values, nulls);
			}

			/*
			 * One row fewer to pass over.  After a replacement this drops
			 * below zero, which makes the next iteration compute a fresh
			 * skip count.
			 */
			rowstoskip -= 1;
		}

		*totalrows += 1;
	}

	/* Remove error callback. */
	error_context_stack = errcallback.previous;

	/* Clean up. */
	MemoryContextDelete(tupcontext);

	EndCopyFrom(cstate);

	pfree(values);
	pfree(nulls);

	/*
	 * Emit some interesting relation info
	 */
	ereport(elevel,
			(errmsg("\"%s\": file contains %.0f rows; "
					"%d rows in sample",
					RelationGetRelationName(onerel),
					*totalrows, numrows)));

	return numrows;
}
1224