1 /*-------------------------------------------------------------------------
2  *
3  * file_fdw.c
4  *		  foreign-data wrapper for server-side flat files (or programs).
5  *
6  * Copyright (c) 2010-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/file_fdw/file_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <sys/stat.h>
16 #include <unistd.h>
17 
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "catalog/pg_foreign_table.h"
22 #include "commands/copy.h"
23 #include "commands/defrem.h"
24 #include "commands/explain.h"
25 #include "commands/vacuum.h"
26 #include "foreign/fdwapi.h"
27 #include "foreign/foreign.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/pathnode.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 #include "utils/sampling.h"
38 
/* Magic block expected in every dynamically loadable PostgreSQL module. */
PG_MODULE_MAGIC;
40 
41 /*
42  * Describes the valid options for objects that use this wrapper.
43  */
struct FileFdwOption
{
	/* Option name; matched with strcmp() by is_valid_option(). */
	const char *optname;
	Oid			optcontext;		/* Oid of catalog in which option may appear */
};
49 
50 /*
51  * Valid options for file_fdw.
52  * These options are based on the options for the COPY FROM command.
53  * But note that force_not_null and force_null are handled as boolean options
54  * attached to a column, not as table options.
55  *
 * Note: If you are adding a new option for user mappings, you need to modify
 * fileGetOptions(), which currently doesn't bother to look at user mappings.
58  */
static const struct FileFdwOption valid_options[] = {
	/* Data source options */
	/* (the validator treats these two as mutually exclusive) */
	{"filename", ForeignTableRelationId},
	{"program", ForeignTableRelationId},

	/* Format options */
	/* oids option is not supported */
	{"format", ForeignTableRelationId},
	{"header", ForeignTableRelationId},
	{"delimiter", ForeignTableRelationId},
	{"quote", ForeignTableRelationId},
	{"escape", ForeignTableRelationId},
	{"null", ForeignTableRelationId},
	{"encoding", ForeignTableRelationId},
	/* Per-column options: these are the only entries allowed on attributes */
	{"force_not_null", AttributeRelationId},
	{"force_null", AttributeRelationId},

	/*
	 * force_quote is not supported by file_fdw because it's for COPY TO.
	 */

	/* Sentinel */
	/* (loops over this table stop when optname is NULL) */
	{NULL, InvalidOid}
};
83 
84 /*
85  * FDW-specific information for RelOptInfo.fdw_private.
86  */
typedef struct FileFdwPlanState
{
	/*
	 * filename, is_program and options are filled in by fileGetOptions() when
	 * fileGetForeignRelSize() builds this struct; pages and ntuples are set
	 * afterwards by estimate_size().
	 */
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	BlockNumber pages;			/* estimate of file's physical size */
	double		ntuples;		/* estimate of number of data rows */
} FileFdwPlanState;
96 
97 /*
98  * FDW-specific information for ForeignScanState.fdw_state.
99  */
typedef struct FileFdwExecutionState
{
	/*
	 * filename, is_program and options are kept so that
	 * fileReScanForeignScan() can call BeginCopyFrom() again with the same
	 * arguments.
	 */
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	CopyState	cstate;			/* COPY execution state */
} FileFdwExecutionState;
108 
109 /*
110  * SQL functions
111  */
/* One declaration per SQL-callable entry point defined in this file. */
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
114 
115 /*
116  * FDW callback routines
117  */
/* Planning callbacks */
static void fileGetForeignRelSize(PlannerInfo *root,
					  RelOptInfo *baserel,
					  Oid foreigntableid);
static void fileGetForeignPaths(PlannerInfo *root,
					RelOptInfo *baserel,
					Oid foreigntableid);
static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
				   RelOptInfo *baserel,
				   Oid foreigntableid,
				   ForeignPath *best_path,
				   List *tlist,
				   List *scan_clauses,
				   Plan *outer_plan);
/* EXPLAIN and scan-execution callbacks */
static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
static void fileReScanForeignScan(ForeignScanState *node);
static void fileEndForeignScan(ForeignScanState *node);
/* ANALYZE and parallel-safety callbacks */
static bool fileAnalyzeForeignTable(Relation relation,
						AcquireSampleRowsFunc *func,
						BlockNumber *totalpages);
static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
							  RangeTblEntry *rte);
141 
142 /*
143  * Helper functions
144  */
/* Option handling */
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
			   char **filename,
			   bool *is_program,
			   List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);
/* Planner support */
static bool check_selective_binary_conversion(RelOptInfo *baserel,
								  Oid foreigntableid,
								  List **columns);
static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
			  FileFdwPlanState *fdw_private);
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
			   FileFdwPlanState *fdw_private,
			   Cost *startup_cost, Cost *total_cost);
/* Row sampler handed back by fileAnalyzeForeignTable() */
static int file_acquire_sample_rows(Relation onerel, int elevel,
						 HeapTuple *rows, int targrows,
						 double *totalrows, double *totaldeadrows);
162 
163 
164 /*
165  * Foreign-data wrapper handler function: return a struct with pointers
166  * to my callback routines.
167  */
168 Datum
file_fdw_handler(PG_FUNCTION_ARGS)169 file_fdw_handler(PG_FUNCTION_ARGS)
170 {
171 	FdwRoutine *fdwroutine = makeNode(FdwRoutine);
172 
173 	fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
174 	fdwroutine->GetForeignPaths = fileGetForeignPaths;
175 	fdwroutine->GetForeignPlan = fileGetForeignPlan;
176 	fdwroutine->ExplainForeignScan = fileExplainForeignScan;
177 	fdwroutine->BeginForeignScan = fileBeginForeignScan;
178 	fdwroutine->IterateForeignScan = fileIterateForeignScan;
179 	fdwroutine->ReScanForeignScan = fileReScanForeignScan;
180 	fdwroutine->EndForeignScan = fileEndForeignScan;
181 	fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
182 	fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
183 
184 	PG_RETURN_POINTER(fdwroutine);
185 }
186 
187 /*
188  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
189  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
190  *
191  * Raise an ERROR if the option or its value is considered invalid.
192  */
193 Datum
file_fdw_validator(PG_FUNCTION_ARGS)194 file_fdw_validator(PG_FUNCTION_ARGS)
195 {
196 	List	   *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
197 	Oid			catalog = PG_GETARG_OID(1);
198 	char	   *filename = NULL;
199 	DefElem    *force_not_null = NULL;
200 	DefElem    *force_null = NULL;
201 	List	   *other_options = NIL;
202 	ListCell   *cell;
203 
204 	/*
205 	 * Only superusers are allowed to set options of a file_fdw foreign table.
206 	 * This is because we don't want non-superusers to be able to control
207 	 * which file gets read or which program gets executed.
208 	 *
209 	 * Putting this sort of permissions check in a validator is a bit of a
210 	 * crock, but there doesn't seem to be any other place that can enforce
211 	 * the check more cleanly.
212 	 *
213 	 * Note that the valid_options[] array disallows setting filename and
214 	 * program at any options level other than foreign table --- otherwise
215 	 * there'd still be a security hole.
216 	 */
217 	if (catalog == ForeignTableRelationId && !superuser())
218 		ereport(ERROR,
219 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
220 				 errmsg("only superuser can change options of a file_fdw foreign table")));
221 
222 	/*
223 	 * Check that only options supported by file_fdw, and allowed for the
224 	 * current object type, are given.
225 	 */
226 	foreach(cell, options_list)
227 	{
228 		DefElem    *def = (DefElem *) lfirst(cell);
229 
230 		if (!is_valid_option(def->defname, catalog))
231 		{
232 			const struct FileFdwOption *opt;
233 			StringInfoData buf;
234 
235 			/*
236 			 * Unknown option specified, complain about it. Provide a hint
237 			 * with list of valid options for the object.
238 			 */
239 			initStringInfo(&buf);
240 			for (opt = valid_options; opt->optname; opt++)
241 			{
242 				if (catalog == opt->optcontext)
243 					appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
244 									 opt->optname);
245 			}
246 
247 			ereport(ERROR,
248 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
249 					 errmsg("invalid option \"%s\"", def->defname),
250 					 buf.len > 0
251 					 ? errhint("Valid options in this context are: %s",
252 							   buf.data)
253 					 : errhint("There are no valid options in this context.")));
254 		}
255 
256 		/*
257 		 * Separate out filename, program, and column-specific options, since
258 		 * ProcessCopyOptions won't accept them.
259 		 */
260 		if (strcmp(def->defname, "filename") == 0 ||
261 			strcmp(def->defname, "program") == 0)
262 		{
263 			if (filename)
264 				ereport(ERROR,
265 						(errcode(ERRCODE_SYNTAX_ERROR),
266 						 errmsg("conflicting or redundant options")));
267 			filename = defGetString(def);
268 		}
269 
270 		/*
271 		 * force_not_null is a boolean option; after validation we can discard
272 		 * it - it will be retrieved later in get_file_fdw_attribute_options()
273 		 */
274 		else if (strcmp(def->defname, "force_not_null") == 0)
275 		{
276 			if (force_not_null)
277 				ereport(ERROR,
278 						(errcode(ERRCODE_SYNTAX_ERROR),
279 						 errmsg("conflicting or redundant options"),
280 						 errhint("option \"force_not_null\" supplied more than once for a column")));
281 			force_not_null = def;
282 			/* Don't care what the value is, as long as it's a legal boolean */
283 			(void) defGetBoolean(def);
284 		}
285 		/* See comments for force_not_null above */
286 		else if (strcmp(def->defname, "force_null") == 0)
287 		{
288 			if (force_null)
289 				ereport(ERROR,
290 						(errcode(ERRCODE_SYNTAX_ERROR),
291 						 errmsg("conflicting or redundant options"),
292 						 errhint("option \"force_null\" supplied more than once for a column")));
293 			force_null = def;
294 			(void) defGetBoolean(def);
295 		}
296 		else
297 			other_options = lappend(other_options, def);
298 	}
299 
300 	/*
301 	 * Now apply the core COPY code's validation logic for more checks.
302 	 */
303 	ProcessCopyOptions(NULL, NULL, true, other_options);
304 
305 	/*
306 	 * Either filename or program option is required for file_fdw foreign
307 	 * tables.
308 	 */
309 	if (catalog == ForeignTableRelationId && filename == NULL)
310 		ereport(ERROR,
311 				(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
312 				 errmsg("either filename or program is required for file_fdw foreign tables")));
313 
314 	PG_RETURN_VOID();
315 }
316 
317 /*
318  * Check if the provided option is one of the valid options.
319  * context is the Oid of the catalog holding the object the option is for.
320  */
321 static bool
is_valid_option(const char * option,Oid context)322 is_valid_option(const char *option, Oid context)
323 {
324 	const struct FileFdwOption *opt;
325 
326 	for (opt = valid_options; opt->optname; opt++)
327 	{
328 		if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
329 			return true;
330 	}
331 	return false;
332 }
333 
334 /*
335  * Fetch the options for a file_fdw foreign table.
336  *
337  * We have to separate out filename/program from the other options because
338  * those must not appear in the options list passed to the core COPY code.
339  */
340 static void
fileGetOptions(Oid foreigntableid,char ** filename,bool * is_program,List ** other_options)341 fileGetOptions(Oid foreigntableid,
342 			   char **filename, bool *is_program, List **other_options)
343 {
344 	ForeignTable *table;
345 	ForeignServer *server;
346 	ForeignDataWrapper *wrapper;
347 	List	   *options;
348 	ListCell   *lc,
349 			   *prev;
350 
351 	/*
352 	 * Extract options from FDW objects.  We ignore user mappings because
353 	 * file_fdw doesn't have any options that can be specified there.
354 	 *
355 	 * (XXX Actually, given the current contents of valid_options[], there's
356 	 * no point in examining anything except the foreign table's own options.
357 	 * Simplify?)
358 	 */
359 	table = GetForeignTable(foreigntableid);
360 	server = GetForeignServer(table->serverid);
361 	wrapper = GetForeignDataWrapper(server->fdwid);
362 
363 	options = NIL;
364 	options = list_concat(options, wrapper->options);
365 	options = list_concat(options, server->options);
366 	options = list_concat(options, table->options);
367 	options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
368 
369 	/*
370 	 * Separate out the filename or program option (we assume there is only
371 	 * one).
372 	 */
373 	*filename = NULL;
374 	*is_program = false;
375 	prev = NULL;
376 	foreach(lc, options)
377 	{
378 		DefElem    *def = (DefElem *) lfirst(lc);
379 
380 		if (strcmp(def->defname, "filename") == 0)
381 		{
382 			*filename = defGetString(def);
383 			options = list_delete_cell(options, lc, prev);
384 			break;
385 		}
386 		else if (strcmp(def->defname, "program") == 0)
387 		{
388 			*filename = defGetString(def);
389 			*is_program = true;
390 			options = list_delete_cell(options, lc, prev);
391 			break;
392 		}
393 		prev = lc;
394 	}
395 
396 	/*
397 	 * The validator should have checked that filename or program was included
398 	 * in the options, but check again, just in case.
399 	 */
400 	if (*filename == NULL)
401 		elog(ERROR, "either filename or program is required for file_fdw foreign tables");
402 
403 	*other_options = options;
404 }
405 
406 /*
407  * Retrieve per-column generic options from pg_attribute and construct a list
408  * of DefElems representing them.
409  *
410  * At the moment we only have "force_not_null", and "force_null",
411  * which should each be combined into a single DefElem listing all such
412  * columns, since that's what COPY expects.
413  */
414 static List *
get_file_fdw_attribute_options(Oid relid)415 get_file_fdw_attribute_options(Oid relid)
416 {
417 	Relation	rel;
418 	TupleDesc	tupleDesc;
419 	AttrNumber	natts;
420 	AttrNumber	attnum;
421 	List	   *fnncolumns = NIL;
422 	List	   *fncolumns = NIL;
423 
424 	List	   *options = NIL;
425 
426 	rel = heap_open(relid, AccessShareLock);
427 	tupleDesc = RelationGetDescr(rel);
428 	natts = tupleDesc->natts;
429 
430 	/* Retrieve FDW options for all user-defined attributes. */
431 	for (attnum = 1; attnum <= natts; attnum++)
432 	{
433 		Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
434 		List	   *options;
435 		ListCell   *lc;
436 
437 		/* Skip dropped attributes. */
438 		if (attr->attisdropped)
439 			continue;
440 
441 		options = GetForeignColumnOptions(relid, attnum);
442 		foreach(lc, options)
443 		{
444 			DefElem    *def = (DefElem *) lfirst(lc);
445 
446 			if (strcmp(def->defname, "force_not_null") == 0)
447 			{
448 				if (defGetBoolean(def))
449 				{
450 					char	   *attname = pstrdup(NameStr(attr->attname));
451 
452 					fnncolumns = lappend(fnncolumns, makeString(attname));
453 				}
454 			}
455 			else if (strcmp(def->defname, "force_null") == 0)
456 			{
457 				if (defGetBoolean(def))
458 				{
459 					char	   *attname = pstrdup(NameStr(attr->attname));
460 
461 					fncolumns = lappend(fncolumns, makeString(attname));
462 				}
463 			}
464 			/* maybe in future handle other options here */
465 		}
466 	}
467 
468 	heap_close(rel, AccessShareLock);
469 
470 	/*
471 	 * Return DefElem only when some column(s) have force_not_null /
472 	 * force_null options set
473 	 */
474 	if (fnncolumns != NIL)
475 		options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
476 
477 	if (fncolumns != NIL)
478 		options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
479 
480 	return options;
481 }
482 
483 /*
484  * fileGetForeignRelSize
485  *		Obtain relation size estimates for a foreign table
486  */
487 static void
fileGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)488 fileGetForeignRelSize(PlannerInfo *root,
489 					  RelOptInfo *baserel,
490 					  Oid foreigntableid)
491 {
492 	FileFdwPlanState *fdw_private;
493 
494 	/*
495 	 * Fetch options.  We only need filename (or program) at this point, but
496 	 * we might as well get everything and not need to re-fetch it later in
497 	 * planning.
498 	 */
499 	fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
500 	fileGetOptions(foreigntableid,
501 				   &fdw_private->filename,
502 				   &fdw_private->is_program,
503 				   &fdw_private->options);
504 	baserel->fdw_private = (void *) fdw_private;
505 
506 	/* Estimate relation size */
507 	estimate_size(root, baserel, fdw_private);
508 }
509 
510 /*
511  * fileGetForeignPaths
512  *		Create possible access paths for a scan on the foreign table
513  *
514  *		Currently we don't support any push-down feature, so there is only one
515  *		possible access path, which simply returns all records in the order in
516  *		the data file.
517  */
518 static void
fileGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)519 fileGetForeignPaths(PlannerInfo *root,
520 					RelOptInfo *baserel,
521 					Oid foreigntableid)
522 {
523 	FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
524 	Cost		startup_cost;
525 	Cost		total_cost;
526 	List	   *columns;
527 	List	   *coptions = NIL;
528 
529 	/* Decide whether to selectively perform binary conversion */
530 	if (check_selective_binary_conversion(baserel,
531 										  foreigntableid,
532 										  &columns))
533 		coptions = list_make1(makeDefElem("convert_selectively",
534 										  (Node *) columns, -1));
535 
536 	/* Estimate costs */
537 	estimate_costs(root, baserel, fdw_private,
538 				   &startup_cost, &total_cost);
539 
540 	/*
541 	 * Create a ForeignPath node and add it as only possible path.  We use the
542 	 * fdw_private list of the path to carry the convert_selectively option;
543 	 * it will be propagated into the fdw_private list of the Plan node.
544 	 *
545 	 * We don't support pushing join clauses into the quals of this path, but
546 	 * it could still have required parameterization due to LATERAL refs in
547 	 * its tlist.
548 	 */
549 	add_path(baserel, (Path *)
550 			 create_foreignscan_path(root, baserel,
551 									 NULL,	/* default pathtarget */
552 									 baserel->rows,
553 									 startup_cost,
554 									 total_cost,
555 									 NIL,	/* no pathkeys */
556 									 baserel->lateral_relids,
557 									 NULL,	/* no extra plan */
558 									 coptions));
559 
560 	/*
561 	 * If data file was sorted, and we knew it somehow, we could insert
562 	 * appropriate pathkeys into the ForeignPath node to tell the planner
563 	 * that.
564 	 */
565 }
566 
567 /*
568  * fileGetForeignPlan
569  *		Create a ForeignScan plan node for scanning the foreign table
570  */
571 static ForeignScan *
fileGetForeignPlan(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)572 fileGetForeignPlan(PlannerInfo *root,
573 				   RelOptInfo *baserel,
574 				   Oid foreigntableid,
575 				   ForeignPath *best_path,
576 				   List *tlist,
577 				   List *scan_clauses,
578 				   Plan *outer_plan)
579 {
580 	Index		scan_relid = baserel->relid;
581 
582 	/*
583 	 * We have no native ability to evaluate restriction clauses, so we just
584 	 * put all the scan_clauses into the plan node's qual list for the
585 	 * executor to check.  So all we have to do here is strip RestrictInfo
586 	 * nodes from the clauses and ignore pseudoconstants (which will be
587 	 * handled elsewhere).
588 	 */
589 	scan_clauses = extract_actual_clauses(scan_clauses, false);
590 
591 	/* Create the ForeignScan node */
592 	return make_foreignscan(tlist,
593 							scan_clauses,
594 							scan_relid,
595 							NIL,	/* no expressions to evaluate */
596 							best_path->fdw_private,
597 							NIL,	/* no custom tlist */
598 							NIL,	/* no remote quals */
599 							outer_plan);
600 }
601 
602 /*
603  * fileExplainForeignScan
604  *		Produce extra output for EXPLAIN
605  */
606 static void
fileExplainForeignScan(ForeignScanState * node,ExplainState * es)607 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
608 {
609 	char	   *filename;
610 	bool		is_program;
611 	List	   *options;
612 
613 	/* Fetch options --- we only need filename and is_program at this point */
614 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
615 				   &filename, &is_program, &options);
616 
617 	if (is_program)
618 		ExplainPropertyText("Foreign Program", filename, es);
619 	else
620 		ExplainPropertyText("Foreign File", filename, es);
621 
622 	/* Suppress file size if we're not showing cost details */
623 	if (es->costs)
624 	{
625 		struct stat stat_buf;
626 
627 		if (!is_program &&
628 			stat(filename, &stat_buf) == 0)
629 			ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
630 								es);
631 	}
632 }
633 
634 /*
635  * fileBeginForeignScan
636  *		Initiate access to the file by creating CopyState
637  */
638 static void
fileBeginForeignScan(ForeignScanState * node,int eflags)639 fileBeginForeignScan(ForeignScanState *node, int eflags)
640 {
641 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
642 	char	   *filename;
643 	bool		is_program;
644 	List	   *options;
645 	CopyState	cstate;
646 	FileFdwExecutionState *festate;
647 
648 	/*
649 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
650 	 */
651 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
652 		return;
653 
654 	/* Fetch options of foreign table */
655 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
656 				   &filename, &is_program, &options);
657 
658 	/* Add any options from the plan (currently only convert_selectively) */
659 	options = list_concat(options, plan->fdw_private);
660 
661 	/*
662 	 * Create CopyState from FDW options.  We always acquire all columns, so
663 	 * as to match the expected ScanTupleSlot signature.
664 	 */
665 	cstate = BeginCopyFrom(NULL,
666 						   node->ss.ss_currentRelation,
667 						   filename,
668 						   is_program,
669 						   NULL,
670 						   NIL,
671 						   options);
672 
673 	/*
674 	 * Save state in node->fdw_state.  We must save enough information to call
675 	 * BeginCopyFrom() again.
676 	 */
677 	festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
678 	festate->filename = filename;
679 	festate->is_program = is_program;
680 	festate->options = options;
681 	festate->cstate = cstate;
682 
683 	node->fdw_state = (void *) festate;
684 }
685 
686 /*
687  * fileIterateForeignScan
688  *		Read next record from the data file and store it into the
689  *		ScanTupleSlot as a virtual tuple
690  */
691 static TupleTableSlot *
fileIterateForeignScan(ForeignScanState * node)692 fileIterateForeignScan(ForeignScanState *node)
693 {
694 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
695 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
696 	bool		found;
697 	ErrorContextCallback errcallback;
698 
699 	/* Set up callback to identify error line number. */
700 	errcallback.callback = CopyFromErrorCallback;
701 	errcallback.arg = (void *) festate->cstate;
702 	errcallback.previous = error_context_stack;
703 	error_context_stack = &errcallback;
704 
705 	/*
706 	 * The protocol for loading a virtual tuple into a slot is first
707 	 * ExecClearTuple, then fill the values/isnull arrays, then
708 	 * ExecStoreVirtualTuple.  If we don't find another row in the file, we
709 	 * just skip the last step, leaving the slot empty as required.
710 	 *
711 	 * We can pass ExprContext = NULL because we read all columns from the
712 	 * file, so no need to evaluate default expressions.
713 	 *
714 	 * We can also pass tupleOid = NULL because we don't allow oids for
715 	 * foreign tables.
716 	 */
717 	ExecClearTuple(slot);
718 	found = NextCopyFrom(festate->cstate, NULL,
719 						 slot->tts_values, slot->tts_isnull,
720 						 NULL);
721 	if (found)
722 		ExecStoreVirtualTuple(slot);
723 
724 	/* Remove error callback. */
725 	error_context_stack = errcallback.previous;
726 
727 	return slot;
728 }
729 
730 /*
731  * fileReScanForeignScan
732  *		Rescan table, possibly with new parameters
733  */
734 static void
fileReScanForeignScan(ForeignScanState * node)735 fileReScanForeignScan(ForeignScanState *node)
736 {
737 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
738 
739 	EndCopyFrom(festate->cstate);
740 
741 	festate->cstate = BeginCopyFrom(NULL,
742 									node->ss.ss_currentRelation,
743 									festate->filename,
744 									festate->is_program,
745 									NULL,
746 									NIL,
747 									festate->options);
748 }
749 
750 /*
751  * fileEndForeignScan
752  *		Finish scanning foreign table and dispose objects used for this scan
753  */
754 static void
fileEndForeignScan(ForeignScanState * node)755 fileEndForeignScan(ForeignScanState *node)
756 {
757 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
758 
759 	/* if festate is NULL, we are in EXPLAIN; nothing to do */
760 	if (festate)
761 		EndCopyFrom(festate->cstate);
762 }
763 
764 /*
765  * fileAnalyzeForeignTable
766  *		Test whether analyzing this foreign table is supported
767  */
768 static bool
fileAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)769 fileAnalyzeForeignTable(Relation relation,
770 						AcquireSampleRowsFunc *func,
771 						BlockNumber *totalpages)
772 {
773 	char	   *filename;
774 	bool		is_program;
775 	List	   *options;
776 	struct stat stat_buf;
777 
778 	/* Fetch options of foreign table */
779 	fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
780 
781 	/*
782 	 * If this is a program instead of a file, just return false to skip
783 	 * analyzing the table.  We could run the program and collect stats on
784 	 * whatever it currently returns, but it seems likely that in such cases
785 	 * the output would be too volatile for the stats to be useful.  Maybe
786 	 * there should be an option to enable doing this?
787 	 */
788 	if (is_program)
789 		return false;
790 
791 	/*
792 	 * Get size of the file.  (XXX if we fail here, would it be better to just
793 	 * return false to skip analyzing the table?)
794 	 */
795 	if (stat(filename, &stat_buf) < 0)
796 		ereport(ERROR,
797 				(errcode_for_file_access(),
798 				 errmsg("could not stat file \"%s\": %m",
799 						filename)));
800 
801 	/*
802 	 * Convert size to pages.  Must return at least 1 so that we can tell
803 	 * later on that pg_class.relpages is not default.
804 	 */
805 	*totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
806 	if (*totalpages < 1)
807 		*totalpages = 1;
808 
809 	*func = file_acquire_sample_rows;
810 
811 	return true;
812 }
813 
814 /*
815  * fileIsForeignScanParallelSafe
816  *		Reading a file, or external program, in a parallel worker should work
817  *		just the same as reading it in the leader, so mark scans safe.
818  */
static bool
fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
							  RangeTblEntry *rte)
{
	/* Unconditionally safe; none of the arguments is consulted. */
	return true;
}
825 
826 /*
827  * check_selective_binary_conversion
828  *
829  * Check to see if it's useful to convert only a subset of the file's columns
830  * to binary.  If so, construct a list of the column names to be converted,
831  * return that at *columns, and return TRUE.  (Note that it's possible to
832  * determine that no columns need be converted, for instance with a COUNT(*)
833  * query.  So we can't use returning a NIL list to indicate failure.)
834  */
835 static bool
check_selective_binary_conversion(RelOptInfo * baserel,Oid foreigntableid,List ** columns)836 check_selective_binary_conversion(RelOptInfo *baserel,
837 								  Oid foreigntableid,
838 								  List **columns)
839 {
840 	ForeignTable *table;
841 	ListCell   *lc;
842 	Relation	rel;
843 	TupleDesc	tupleDesc;
844 	AttrNumber	attnum;
845 	Bitmapset  *attrs_used = NULL;
846 	bool		has_wholerow = false;
847 	int			numattrs;
848 	int			i;
849 
850 	*columns = NIL;				/* default result */
851 
852 	/*
853 	 * Check format of the file.  If binary format, this is irrelevant.
854 	 */
855 	table = GetForeignTable(foreigntableid);
856 	foreach(lc, table->options)
857 	{
858 		DefElem    *def = (DefElem *) lfirst(lc);
859 
860 		if (strcmp(def->defname, "format") == 0)
861 		{
862 			char	   *format = defGetString(def);
863 
864 			if (strcmp(format, "binary") == 0)
865 				return false;
866 			break;
867 		}
868 	}
869 
870 	/* Collect all the attributes needed for joins or final output. */
871 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
872 				   &attrs_used);
873 
874 	/* Add all the attributes used by restriction clauses. */
875 	foreach(lc, baserel->baserestrictinfo)
876 	{
877 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
878 
879 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
880 					   &attrs_used);
881 	}
882 
883 	/* Convert attribute numbers to column names. */
884 	rel = heap_open(foreigntableid, AccessShareLock);
885 	tupleDesc = RelationGetDescr(rel);
886 
887 	while ((attnum = bms_first_member(attrs_used)) >= 0)
888 	{
889 		/* Adjust for system attributes. */
890 		attnum += FirstLowInvalidHeapAttributeNumber;
891 
892 		if (attnum == 0)
893 		{
894 			has_wholerow = true;
895 			break;
896 		}
897 
898 		/* Ignore system attributes. */
899 		if (attnum < 0)
900 			continue;
901 
902 		/* Get user attributes. */
903 		if (attnum > 0)
904 		{
905 			Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
906 			char	   *attname = NameStr(attr->attname);
907 
908 			/* Skip dropped attributes (probably shouldn't see any here). */
909 			if (attr->attisdropped)
910 				continue;
911 			*columns = lappend(*columns, makeString(pstrdup(attname)));
912 		}
913 	}
914 
915 	/* Count non-dropped user attributes while we have the tupdesc. */
916 	numattrs = 0;
917 	for (i = 0; i < tupleDesc->natts; i++)
918 	{
919 		Form_pg_attribute attr = tupleDesc->attrs[i];
920 
921 		if (attr->attisdropped)
922 			continue;
923 		numattrs++;
924 	}
925 
926 	heap_close(rel, AccessShareLock);
927 
928 	/* If there's a whole-row reference, fail: we need all the columns. */
929 	if (has_wholerow)
930 	{
931 		*columns = NIL;
932 		return false;
933 	}
934 
935 	/* If all the user attributes are needed, fail. */
936 	if (numattrs == list_length(*columns))
937 	{
938 		*columns = NIL;
939 		return false;
940 	}
941 
942 	return true;
943 }
944 
945 /*
946  * Estimate size of a foreign table.
947  *
948  * The main result is returned in baserel->rows.  We also set
949  * fdw_private->pages and fdw_private->ntuples for later use in the cost
950  * calculation.
951  */
952 static void
estimate_size(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private)953 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
954 			  FileFdwPlanState *fdw_private)
955 {
956 	struct stat stat_buf;
957 	BlockNumber pages;
958 	double		ntuples;
959 	double		nrows;
960 
961 	/*
962 	 * Get size of the file.  It might not be there at plan time, though, in
963 	 * which case we have to use a default estimate.  We also have to fall
964 	 * back to the default if using a program as the input.
965 	 */
966 	if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
967 		stat_buf.st_size = 10 * BLCKSZ;
968 
969 	/*
970 	 * Convert size to pages for use in I/O cost estimate later.
971 	 */
972 	pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
973 	if (pages < 1)
974 		pages = 1;
975 	fdw_private->pages = pages;
976 
977 	/*
978 	 * Estimate the number of tuples in the file.
979 	 */
980 	if (baserel->pages > 0)
981 	{
982 		/*
983 		 * We have # of pages and # of tuples from pg_class (that is, from a
984 		 * previous ANALYZE), so compute a tuples-per-page estimate and scale
985 		 * that by the current file size.
986 		 */
987 		double		density;
988 
989 		density = baserel->tuples / (double) baserel->pages;
990 		ntuples = clamp_row_est(density * (double) pages);
991 	}
992 	else
993 	{
994 		/*
995 		 * Otherwise we have to fake it.  We back into this estimate using the
996 		 * planner's idea of the relation width; which is bogus if not all
997 		 * columns are being read, not to mention that the text representation
998 		 * of a row probably isn't the same size as its internal
999 		 * representation.  Possibly we could do something better, but the
1000 		 * real answer to anyone who complains is "ANALYZE" ...
1001 		 */
1002 		int			tuple_width;
1003 
1004 		tuple_width = MAXALIGN(baserel->reltarget->width) +
1005 			MAXALIGN(SizeofHeapTupleHeader);
1006 		ntuples = clamp_row_est((double) stat_buf.st_size /
1007 								(double) tuple_width);
1008 	}
1009 	fdw_private->ntuples = ntuples;
1010 
1011 	/*
1012 	 * Now estimate the number of rows returned by the scan after applying the
1013 	 * baserestrictinfo quals.
1014 	 */
1015 	nrows = ntuples *
1016 		clauselist_selectivity(root,
1017 							   baserel->baserestrictinfo,
1018 							   0,
1019 							   JOIN_INNER,
1020 							   NULL);
1021 
1022 	nrows = clamp_row_est(nrows);
1023 
1024 	/* Save the output-rows estimate for the planner */
1025 	baserel->rows = nrows;
1026 }
1027 
1028 /*
1029  * Estimate costs of scanning a foreign table.
1030  *
1031  * Results are returned in *startup_cost and *total_cost.
1032  */
1033 static void
estimate_costs(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private,Cost * startup_cost,Cost * total_cost)1034 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1035 			   FileFdwPlanState *fdw_private,
1036 			   Cost *startup_cost, Cost *total_cost)
1037 {
1038 	BlockNumber pages = fdw_private->pages;
1039 	double		ntuples = fdw_private->ntuples;
1040 	Cost		run_cost = 0;
1041 	Cost		cpu_per_tuple;
1042 
1043 	/*
1044 	 * We estimate costs almost the same way as cost_seqscan(), thus assuming
1045 	 * that I/O costs are equivalent to a regular table file of the same size.
1046 	 * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1047 	 * for the cost of parsing records.
1048 	 *
1049 	 * In the case of a program source, this calculation is even more divorced
1050 	 * from reality, but we have no good alternative; and it's not clear that
1051 	 * the numbers we produce here matter much anyway, since there's only one
1052 	 * access path for the rel.
1053 	 */
1054 	run_cost += seq_page_cost * pages;
1055 
1056 	*startup_cost = baserel->baserestrictcost.startup;
1057 	cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1058 	run_cost += cpu_per_tuple * ntuples;
1059 	*total_cost = *startup_cost + run_cost;
1060 }
1061 
1062 /*
1063  * file_acquire_sample_rows -- acquire a random sample of rows from the table
1064  *
1065  * Selected rows are returned in the caller-allocated array rows[],
1066  * which must have at least targrows entries.
1067  * The actual number of rows selected is returned as the function result.
1068  * We also count the total number of rows in the file and return it into
1069  * *totalrows.  Note that *totaldeadrows is always set to 0.
1070  *
1071  * Note that the returned list of rows is not always in order by physical
1072  * position in the file.  Therefore, correlation estimates derived later
1073  * may be meaningless, but it's OK because we don't use the estimates
1074  * currently (the planner only pays attention to correlation for indexscans).
1075  */
1076 static int
file_acquire_sample_rows(Relation onerel,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)1077 file_acquire_sample_rows(Relation onerel, int elevel,
1078 						 HeapTuple *rows, int targrows,
1079 						 double *totalrows, double *totaldeadrows)
1080 {
1081 	int			numrows = 0;
1082 	double		rowstoskip = -1;	/* -1 means not set yet */
1083 	ReservoirStateData rstate;
1084 	TupleDesc	tupDesc;
1085 	Datum	   *values;
1086 	bool	   *nulls;
1087 	bool		found;
1088 	char	   *filename;
1089 	bool		is_program;
1090 	List	   *options;
1091 	CopyState	cstate;
1092 	ErrorContextCallback errcallback;
1093 	MemoryContext oldcontext = CurrentMemoryContext;
1094 	MemoryContext tupcontext;
1095 
1096 	Assert(onerel);
1097 	Assert(targrows > 0);
1098 
1099 	tupDesc = RelationGetDescr(onerel);
1100 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1101 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1102 
1103 	/* Fetch options of foreign table */
1104 	fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);
1105 
1106 	/*
1107 	 * Create CopyState from FDW options.
1108 	 */
1109 	cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
1110 						   options);
1111 
1112 	/*
1113 	 * Use per-tuple memory context to prevent leak of memory used to read
1114 	 * rows from the file with Copy routines.
1115 	 */
1116 	tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1117 									   "file_fdw temporary context",
1118 									   ALLOCSET_DEFAULT_SIZES);
1119 
1120 	/* Prepare for sampling rows */
1121 	reservoir_init_selection_state(&rstate, targrows);
1122 
1123 	/* Set up callback to identify error line number. */
1124 	errcallback.callback = CopyFromErrorCallback;
1125 	errcallback.arg = (void *) cstate;
1126 	errcallback.previous = error_context_stack;
1127 	error_context_stack = &errcallback;
1128 
1129 	*totalrows = 0;
1130 	*totaldeadrows = 0;
1131 	for (;;)
1132 	{
1133 		/* Check for user-requested abort or sleep */
1134 		vacuum_delay_point();
1135 
1136 		/* Fetch next row */
1137 		MemoryContextReset(tupcontext);
1138 		MemoryContextSwitchTo(tupcontext);
1139 
1140 		found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
1141 
1142 		MemoryContextSwitchTo(oldcontext);
1143 
1144 		if (!found)
1145 			break;
1146 
1147 		/*
1148 		 * The first targrows sample rows are simply copied into the
1149 		 * reservoir.  Then we start replacing tuples in the sample until we
1150 		 * reach the end of the relation. This algorithm is from Jeff Vitter's
1151 		 * paper (see more info in commands/analyze.c).
1152 		 */
1153 		if (numrows < targrows)
1154 		{
1155 			rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1156 		}
1157 		else
1158 		{
1159 			/*
1160 			 * t in Vitter's paper is the number of records already processed.
1161 			 * If we need to compute a new S value, we must use the
1162 			 * not-yet-incremented value of totalrows as t.
1163 			 */
1164 			if (rowstoskip < 0)
1165 				rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1166 
1167 			if (rowstoskip <= 0)
1168 			{
1169 				/*
1170 				 * Found a suitable tuple, so save it, replacing one old tuple
1171 				 * at random
1172 				 */
1173 				int			k = (int) (targrows * sampler_random_fract(rstate.randstate));
1174 
1175 				Assert(k >= 0 && k < targrows);
1176 				heap_freetuple(rows[k]);
1177 				rows[k] = heap_form_tuple(tupDesc, values, nulls);
1178 			}
1179 
1180 			rowstoskip -= 1;
1181 		}
1182 
1183 		*totalrows += 1;
1184 	}
1185 
1186 	/* Remove error callback. */
1187 	error_context_stack = errcallback.previous;
1188 
1189 	/* Clean up. */
1190 	MemoryContextDelete(tupcontext);
1191 
1192 	EndCopyFrom(cstate);
1193 
1194 	pfree(values);
1195 	pfree(nulls);
1196 
1197 	/*
1198 	 * Emit some interesting relation info
1199 	 */
1200 	ereport(elevel,
1201 			(errmsg("\"%s\": file contains %.0f rows; "
1202 					"%d rows in sample",
1203 					RelationGetRelationName(onerel),
1204 					*totalrows, numrows)));
1205 
1206 	return numrows;
1207 }
1208