1 /*-------------------------------------------------------------------------
2  *
3  * file_fdw.c
4  *		  foreign-data wrapper for server-side flat files (or programs).
5  *
6  * Copyright (c) 2010-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/file_fdw/file_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <sys/stat.h>
16 #include <unistd.h>
17 
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "access/table.h"
22 #include "catalog/pg_authid.h"
23 #include "catalog/pg_foreign_table.h"
24 #include "commands/copy.h"
25 #include "commands/defrem.h"
26 #include "commands/explain.h"
27 #include "commands/vacuum.h"
28 #include "foreign/fdwapi.h"
29 #include "foreign/foreign.h"
30 #include "miscadmin.h"
31 #include "nodes/makefuncs.h"
32 #include "optimizer/optimizer.h"
33 #include "optimizer/pathnode.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "utils/memutils.h"
37 #include "utils/rel.h"
38 #include "utils/sampling.h"
39 
40 PG_MODULE_MAGIC;
41 
42 /*
43  * Describes the valid options for objects that use this wrapper.
44  */
45 struct FileFdwOption
46 {
47 	const char *optname;
48 	Oid			optcontext;		/* Oid of catalog in which option may appear */
49 };
50 
51 /*
52  * Valid options for file_fdw.
53  * These options are based on the options for the COPY FROM command.
54  * But note that force_not_null and force_null are handled as boolean options
55  * attached to a column, not as table options.
56  *
57  * Note: If you are adding new option for user mapping, you need to modify
58  * fileGetOptions(), which currently doesn't bother to look at user mappings.
59  */
60 static const struct FileFdwOption valid_options[] = {
61 	/* Data source options */
62 	{"filename", ForeignTableRelationId},
63 	{"program", ForeignTableRelationId},
64 
65 	/* Format options */
66 	/* oids option is not supported */
67 	{"format", ForeignTableRelationId},
68 	{"header", ForeignTableRelationId},
69 	{"delimiter", ForeignTableRelationId},
70 	{"quote", ForeignTableRelationId},
71 	{"escape", ForeignTableRelationId},
72 	{"null", ForeignTableRelationId},
73 	{"encoding", ForeignTableRelationId},
74 	{"force_not_null", AttributeRelationId},
75 	{"force_null", AttributeRelationId},
76 
77 	/*
78 	 * force_quote is not supported by file_fdw because it's for COPY TO.
79 	 */
80 
81 	/* Sentinel */
82 	{NULL, InvalidOid}
83 };
84 
85 /*
86  * FDW-specific information for RelOptInfo.fdw_private.
87  */
88 typedef struct FileFdwPlanState
89 {
90 	char	   *filename;		/* file or program to read from */
91 	bool		is_program;		/* true if filename represents an OS command */
92 	List	   *options;		/* merged COPY options, excluding filename and
93 								 * is_program */
94 	BlockNumber pages;			/* estimate of file's physical size */
95 	double		ntuples;		/* estimate of number of data rows */
96 } FileFdwPlanState;
97 
98 /*
99  * FDW-specific information for ForeignScanState.fdw_state.
100  */
101 typedef struct FileFdwExecutionState
102 {
103 	char	   *filename;		/* file or program to read from */
104 	bool		is_program;		/* true if filename represents an OS command */
105 	List	   *options;		/* merged COPY options, excluding filename and
106 								 * is_program */
107 	CopyState	cstate;			/* COPY execution state */
108 } FileFdwExecutionState;
109 
110 /*
111  * SQL functions
112  */
113 PG_FUNCTION_INFO_V1(file_fdw_handler);
114 PG_FUNCTION_INFO_V1(file_fdw_validator);
115 
116 /*
117  * FDW callback routines
118  */
119 static void fileGetForeignRelSize(PlannerInfo *root,
120 								  RelOptInfo *baserel,
121 								  Oid foreigntableid);
122 static void fileGetForeignPaths(PlannerInfo *root,
123 								RelOptInfo *baserel,
124 								Oid foreigntableid);
125 static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
126 									   RelOptInfo *baserel,
127 									   Oid foreigntableid,
128 									   ForeignPath *best_path,
129 									   List *tlist,
130 									   List *scan_clauses,
131 									   Plan *outer_plan);
132 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
133 static void fileBeginForeignScan(ForeignScanState *node, int eflags);
134 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
135 static void fileReScanForeignScan(ForeignScanState *node);
136 static void fileEndForeignScan(ForeignScanState *node);
137 static bool fileAnalyzeForeignTable(Relation relation,
138 									AcquireSampleRowsFunc *func,
139 									BlockNumber *totalpages);
140 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
141 										  RangeTblEntry *rte);
142 
143 /*
144  * Helper functions
145  */
146 static bool is_valid_option(const char *option, Oid context);
147 static void fileGetOptions(Oid foreigntableid,
148 						   char **filename,
149 						   bool *is_program,
150 						   List **other_options);
151 static List *get_file_fdw_attribute_options(Oid relid);
152 static bool check_selective_binary_conversion(RelOptInfo *baserel,
153 											  Oid foreigntableid,
154 											  List **columns);
155 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
156 						  FileFdwPlanState *fdw_private);
157 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
158 						   FileFdwPlanState *fdw_private,
159 						   Cost *startup_cost, Cost *total_cost);
160 static int	file_acquire_sample_rows(Relation onerel, int elevel,
161 									 HeapTuple *rows, int targrows,
162 									 double *totalrows, double *totaldeadrows);
163 
164 
165 /*
166  * Foreign-data wrapper handler function: return a struct with pointers
167  * to my callback routines.
168  */
169 Datum
170 file_fdw_handler(PG_FUNCTION_ARGS)
171 {
172 	FdwRoutine *fdwroutine = makeNode(FdwRoutine);
173 
174 	fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
175 	fdwroutine->GetForeignPaths = fileGetForeignPaths;
176 	fdwroutine->GetForeignPlan = fileGetForeignPlan;
177 	fdwroutine->ExplainForeignScan = fileExplainForeignScan;
178 	fdwroutine->BeginForeignScan = fileBeginForeignScan;
179 	fdwroutine->IterateForeignScan = fileIterateForeignScan;
180 	fdwroutine->ReScanForeignScan = fileReScanForeignScan;
181 	fdwroutine->EndForeignScan = fileEndForeignScan;
182 	fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
183 	fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
184 
185 	PG_RETURN_POINTER(fdwroutine);
186 }
187 
188 /*
189  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
190  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
191  *
192  * Raise an ERROR if the option or its value is considered invalid.
193  */
194 Datum
195 file_fdw_validator(PG_FUNCTION_ARGS)
196 {
197 	List	   *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
198 	Oid			catalog = PG_GETARG_OID(1);
199 	char	   *filename = NULL;
200 	DefElem    *force_not_null = NULL;
201 	DefElem    *force_null = NULL;
202 	List	   *other_options = NIL;
203 	ListCell   *cell;
204 
205 	/*
206 	 * Check that only options supported by file_fdw, and allowed for the
207 	 * current object type, are given.
208 	 */
209 	foreach(cell, options_list)
210 	{
211 		DefElem    *def = (DefElem *) lfirst(cell);
212 
213 		if (!is_valid_option(def->defname, catalog))
214 		{
215 			const struct FileFdwOption *opt;
216 			StringInfoData buf;
217 
218 			/*
219 			 * Unknown option specified, complain about it. Provide a hint
220 			 * with list of valid options for the object.
221 			 */
222 			initStringInfo(&buf);
223 			for (opt = valid_options; opt->optname; opt++)
224 			{
225 				if (catalog == opt->optcontext)
226 					appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
227 									 opt->optname);
228 			}
229 
230 			ereport(ERROR,
231 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
232 					 errmsg("invalid option \"%s\"", def->defname),
233 					 buf.len > 0
234 					 ? errhint("Valid options in this context are: %s",
235 							   buf.data)
236 					 : errhint("There are no valid options in this context.")));
237 		}
238 
239 		/*
240 		 * Separate out filename, program, and column-specific options, since
241 		 * ProcessCopyOptions won't accept them.
242 		 */
243 		if (strcmp(def->defname, "filename") == 0 ||
244 			strcmp(def->defname, "program") == 0)
245 		{
246 			if (filename)
247 				ereport(ERROR,
248 						(errcode(ERRCODE_SYNTAX_ERROR),
249 						 errmsg("conflicting or redundant options")));
250 
251 			/*
252 			 * Check permissions for changing which file or program is used by
253 			 * the file_fdw.
254 			 *
255 			 * Only members of the role 'pg_read_server_files' are allowed to
256 			 * set the 'filename' option of a file_fdw foreign table, while
257 			 * only members of the role 'pg_execute_server_program' are
258 			 * allowed to set the 'program' option.  This is because we don't
259 			 * want regular users to be able to control which file gets read
260 			 * or which program gets executed.
261 			 *
262 			 * Putting this sort of permissions check in a validator is a bit
263 			 * of a crock, but there doesn't seem to be any other place that
264 			 * can enforce the check more cleanly.
265 			 *
266 			 * Note that the valid_options[] array disallows setting filename
267 			 * and program at any options level other than foreign table ---
268 			 * otherwise there'd still be a security hole.
269 			 */
270 			if (strcmp(def->defname, "filename") == 0 &&
271 				!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
272 				ereport(ERROR,
273 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
274 						 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table")));
275 
276 			if (strcmp(def->defname, "program") == 0 &&
277 				!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
278 				ereport(ERROR,
279 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
280 						 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table")));
281 
282 			filename = defGetString(def);
283 		}
284 
285 		/*
286 		 * force_not_null is a boolean option; after validation we can discard
287 		 * it - it will be retrieved later in get_file_fdw_attribute_options()
288 		 */
289 		else if (strcmp(def->defname, "force_not_null") == 0)
290 		{
291 			if (force_not_null)
292 				ereport(ERROR,
293 						(errcode(ERRCODE_SYNTAX_ERROR),
294 						 errmsg("conflicting or redundant options"),
295 						 errhint("Option \"force_not_null\" supplied more than once for a column.")));
296 			force_not_null = def;
297 			/* Don't care what the value is, as long as it's a legal boolean */
298 			(void) defGetBoolean(def);
299 		}
300 		/* See comments for force_not_null above */
301 		else if (strcmp(def->defname, "force_null") == 0)
302 		{
303 			if (force_null)
304 				ereport(ERROR,
305 						(errcode(ERRCODE_SYNTAX_ERROR),
306 						 errmsg("conflicting or redundant options"),
307 						 errhint("Option \"force_null\" supplied more than once for a column.")));
308 			force_null = def;
309 			(void) defGetBoolean(def);
310 		}
311 		else
312 			other_options = lappend(other_options, def);
313 	}
314 
315 	/*
316 	 * Now apply the core COPY code's validation logic for more checks.
317 	 */
318 	ProcessCopyOptions(NULL, NULL, true, other_options);
319 
320 	/*
321 	 * Either filename or program option is required for file_fdw foreign
322 	 * tables.
323 	 */
324 	if (catalog == ForeignTableRelationId && filename == NULL)
325 		ereport(ERROR,
326 				(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
327 				 errmsg("either filename or program is required for file_fdw foreign tables")));
328 
329 	PG_RETURN_VOID();
330 }
331 
332 /*
333  * Check if the provided option is one of the valid options.
334  * context is the Oid of the catalog holding the object the option is for.
335  */
336 static bool
337 is_valid_option(const char *option, Oid context)
338 {
339 	const struct FileFdwOption *opt;
340 
341 	for (opt = valid_options; opt->optname; opt++)
342 	{
343 		if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
344 			return true;
345 	}
346 	return false;
347 }
348 
349 /*
350  * Fetch the options for a file_fdw foreign table.
351  *
352  * We have to separate out filename/program from the other options because
353  * those must not appear in the options list passed to the core COPY code.
354  */
355 static void
356 fileGetOptions(Oid foreigntableid,
357 			   char **filename, bool *is_program, List **other_options)
358 {
359 	ForeignTable *table;
360 	ForeignServer *server;
361 	ForeignDataWrapper *wrapper;
362 	List	   *options;
363 	ListCell   *lc,
364 			   *prev;
365 
366 	/*
367 	 * Extract options from FDW objects.  We ignore user mappings because
368 	 * file_fdw doesn't have any options that can be specified there.
369 	 *
370 	 * (XXX Actually, given the current contents of valid_options[], there's
371 	 * no point in examining anything except the foreign table's own options.
372 	 * Simplify?)
373 	 */
374 	table = GetForeignTable(foreigntableid);
375 	server = GetForeignServer(table->serverid);
376 	wrapper = GetForeignDataWrapper(server->fdwid);
377 
378 	options = NIL;
379 	options = list_concat(options, wrapper->options);
380 	options = list_concat(options, server->options);
381 	options = list_concat(options, table->options);
382 	options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
383 
384 	/*
385 	 * Separate out the filename or program option (we assume there is only
386 	 * one).
387 	 */
388 	*filename = NULL;
389 	*is_program = false;
390 	prev = NULL;
391 	foreach(lc, options)
392 	{
393 		DefElem    *def = (DefElem *) lfirst(lc);
394 
395 		if (strcmp(def->defname, "filename") == 0)
396 		{
397 			*filename = defGetString(def);
398 			options = list_delete_cell(options, lc, prev);
399 			break;
400 		}
401 		else if (strcmp(def->defname, "program") == 0)
402 		{
403 			*filename = defGetString(def);
404 			*is_program = true;
405 			options = list_delete_cell(options, lc, prev);
406 			break;
407 		}
408 		prev = lc;
409 	}
410 
411 	/*
412 	 * The validator should have checked that filename or program was included
413 	 * in the options, but check again, just in case.
414 	 */
415 	if (*filename == NULL)
416 		elog(ERROR, "either filename or program is required for file_fdw foreign tables");
417 
418 	*other_options = options;
419 }
420 
421 /*
422  * Retrieve per-column generic options from pg_attribute and construct a list
423  * of DefElems representing them.
424  *
425  * At the moment we only have "force_not_null", and "force_null",
426  * which should each be combined into a single DefElem listing all such
427  * columns, since that's what COPY expects.
428  */
429 static List *
430 get_file_fdw_attribute_options(Oid relid)
431 {
432 	Relation	rel;
433 	TupleDesc	tupleDesc;
434 	AttrNumber	natts;
435 	AttrNumber	attnum;
436 	List	   *fnncolumns = NIL;
437 	List	   *fncolumns = NIL;
438 
439 	List	   *options = NIL;
440 
441 	rel = table_open(relid, AccessShareLock);
442 	tupleDesc = RelationGetDescr(rel);
443 	natts = tupleDesc->natts;
444 
445 	/* Retrieve FDW options for all user-defined attributes. */
446 	for (attnum = 1; attnum <= natts; attnum++)
447 	{
448 		Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
449 		List	   *options;
450 		ListCell   *lc;
451 
452 		/* Skip dropped attributes. */
453 		if (attr->attisdropped)
454 			continue;
455 
456 		options = GetForeignColumnOptions(relid, attnum);
457 		foreach(lc, options)
458 		{
459 			DefElem    *def = (DefElem *) lfirst(lc);
460 
461 			if (strcmp(def->defname, "force_not_null") == 0)
462 			{
463 				if (defGetBoolean(def))
464 				{
465 					char	   *attname = pstrdup(NameStr(attr->attname));
466 
467 					fnncolumns = lappend(fnncolumns, makeString(attname));
468 				}
469 			}
470 			else if (strcmp(def->defname, "force_null") == 0)
471 			{
472 				if (defGetBoolean(def))
473 				{
474 					char	   *attname = pstrdup(NameStr(attr->attname));
475 
476 					fncolumns = lappend(fncolumns, makeString(attname));
477 				}
478 			}
479 			/* maybe in future handle other options here */
480 		}
481 	}
482 
483 	table_close(rel, AccessShareLock);
484 
485 	/*
486 	 * Return DefElem only when some column(s) have force_not_null /
487 	 * force_null options set
488 	 */
489 	if (fnncolumns != NIL)
490 		options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
491 
492 	if (fncolumns != NIL)
493 		options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
494 
495 	return options;
496 }
497 
498 /*
499  * fileGetForeignRelSize
500  *		Obtain relation size estimates for a foreign table
501  */
502 static void
503 fileGetForeignRelSize(PlannerInfo *root,
504 					  RelOptInfo *baserel,
505 					  Oid foreigntableid)
506 {
507 	FileFdwPlanState *fdw_private;
508 
509 	/*
510 	 * Fetch options.  We only need filename (or program) at this point, but
511 	 * we might as well get everything and not need to re-fetch it later in
512 	 * planning.
513 	 */
514 	fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
515 	fileGetOptions(foreigntableid,
516 				   &fdw_private->filename,
517 				   &fdw_private->is_program,
518 				   &fdw_private->options);
519 	baserel->fdw_private = (void *) fdw_private;
520 
521 	/* Estimate relation size */
522 	estimate_size(root, baserel, fdw_private);
523 }
524 
525 /*
526  * fileGetForeignPaths
527  *		Create possible access paths for a scan on the foreign table
528  *
529  *		Currently we don't support any push-down feature, so there is only one
530  *		possible access path, which simply returns all records in the order in
531  *		the data file.
532  */
533 static void
534 fileGetForeignPaths(PlannerInfo *root,
535 					RelOptInfo *baserel,
536 					Oid foreigntableid)
537 {
538 	FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
539 	Cost		startup_cost;
540 	Cost		total_cost;
541 	List	   *columns;
542 	List	   *coptions = NIL;
543 
544 	/* Decide whether to selectively perform binary conversion */
545 	if (check_selective_binary_conversion(baserel,
546 										  foreigntableid,
547 										  &columns))
548 		coptions = list_make1(makeDefElem("convert_selectively",
549 										  (Node *) columns, -1));
550 
551 	/* Estimate costs */
552 	estimate_costs(root, baserel, fdw_private,
553 				   &startup_cost, &total_cost);
554 
555 	/*
556 	 * Create a ForeignPath node and add it as only possible path.  We use the
557 	 * fdw_private list of the path to carry the convert_selectively option;
558 	 * it will be propagated into the fdw_private list of the Plan node.
559 	 *
560 	 * We don't support pushing join clauses into the quals of this path, but
561 	 * it could still have required parameterization due to LATERAL refs in
562 	 * its tlist.
563 	 */
564 	add_path(baserel, (Path *)
565 			 create_foreignscan_path(root, baserel,
566 									 NULL,	/* default pathtarget */
567 									 baserel->rows,
568 									 startup_cost,
569 									 total_cost,
570 									 NIL,	/* no pathkeys */
571 									 baserel->lateral_relids,
572 									 NULL,	/* no extra plan */
573 									 coptions));
574 
575 	/*
576 	 * If data file was sorted, and we knew it somehow, we could insert
577 	 * appropriate pathkeys into the ForeignPath node to tell the planner
578 	 * that.
579 	 */
580 }
581 
582 /*
583  * fileGetForeignPlan
584  *		Create a ForeignScan plan node for scanning the foreign table
585  */
586 static ForeignScan *
587 fileGetForeignPlan(PlannerInfo *root,
588 				   RelOptInfo *baserel,
589 				   Oid foreigntableid,
590 				   ForeignPath *best_path,
591 				   List *tlist,
592 				   List *scan_clauses,
593 				   Plan *outer_plan)
594 {
595 	Index		scan_relid = baserel->relid;
596 
597 	/*
598 	 * We have no native ability to evaluate restriction clauses, so we just
599 	 * put all the scan_clauses into the plan node's qual list for the
600 	 * executor to check.  So all we have to do here is strip RestrictInfo
601 	 * nodes from the clauses and ignore pseudoconstants (which will be
602 	 * handled elsewhere).
603 	 */
604 	scan_clauses = extract_actual_clauses(scan_clauses, false);
605 
606 	/* Create the ForeignScan node */
607 	return make_foreignscan(tlist,
608 							scan_clauses,
609 							scan_relid,
610 							NIL,	/* no expressions to evaluate */
611 							best_path->fdw_private,
612 							NIL,	/* no custom tlist */
613 							NIL,	/* no remote quals */
614 							outer_plan);
615 }
616 
617 /*
618  * fileExplainForeignScan
619  *		Produce extra output for EXPLAIN
620  */
621 static void
622 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
623 {
624 	char	   *filename;
625 	bool		is_program;
626 	List	   *options;
627 
628 	/* Fetch options --- we only need filename and is_program at this point */
629 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
630 				   &filename, &is_program, &options);
631 
632 	if (is_program)
633 		ExplainPropertyText("Foreign Program", filename, es);
634 	else
635 		ExplainPropertyText("Foreign File", filename, es);
636 
637 	/* Suppress file size if we're not showing cost details */
638 	if (es->costs)
639 	{
640 		struct stat stat_buf;
641 
642 		if (!is_program &&
643 			stat(filename, &stat_buf) == 0)
644 			ExplainPropertyInteger("Foreign File Size", "b",
645 								   (int64) stat_buf.st_size, es);
646 	}
647 }
648 
649 /*
650  * fileBeginForeignScan
651  *		Initiate access to the file by creating CopyState
652  */
653 static void
654 fileBeginForeignScan(ForeignScanState *node, int eflags)
655 {
656 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
657 	char	   *filename;
658 	bool		is_program;
659 	List	   *options;
660 	CopyState	cstate;
661 	FileFdwExecutionState *festate;
662 
663 	/*
664 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
665 	 */
666 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
667 		return;
668 
669 	/* Fetch options of foreign table */
670 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
671 				   &filename, &is_program, &options);
672 
673 	/* Add any options from the plan (currently only convert_selectively) */
674 	options = list_concat(options, plan->fdw_private);
675 
676 	/*
677 	 * Create CopyState from FDW options.  We always acquire all columns, so
678 	 * as to match the expected ScanTupleSlot signature.
679 	 */
680 	cstate = BeginCopyFrom(NULL,
681 						   node->ss.ss_currentRelation,
682 						   filename,
683 						   is_program,
684 						   NULL,
685 						   NIL,
686 						   options);
687 
688 	/*
689 	 * Save state in node->fdw_state.  We must save enough information to call
690 	 * BeginCopyFrom() again.
691 	 */
692 	festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
693 	festate->filename = filename;
694 	festate->is_program = is_program;
695 	festate->options = options;
696 	festate->cstate = cstate;
697 
698 	node->fdw_state = (void *) festate;
699 }
700 
701 /*
702  * fileIterateForeignScan
703  *		Read next record from the data file and store it into the
704  *		ScanTupleSlot as a virtual tuple
705  */
706 static TupleTableSlot *
707 fileIterateForeignScan(ForeignScanState *node)
708 {
709 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
710 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
711 	bool		found;
712 	ErrorContextCallback errcallback;
713 
714 	/* Set up callback to identify error line number. */
715 	errcallback.callback = CopyFromErrorCallback;
716 	errcallback.arg = (void *) festate->cstate;
717 	errcallback.previous = error_context_stack;
718 	error_context_stack = &errcallback;
719 
720 	/*
721 	 * The protocol for loading a virtual tuple into a slot is first
722 	 * ExecClearTuple, then fill the values/isnull arrays, then
723 	 * ExecStoreVirtualTuple.  If we don't find another row in the file, we
724 	 * just skip the last step, leaving the slot empty as required.
725 	 *
726 	 * We can pass ExprContext = NULL because we read all columns from the
727 	 * file, so no need to evaluate default expressions.
728 	 */
729 	ExecClearTuple(slot);
730 	found = NextCopyFrom(festate->cstate, NULL,
731 						 slot->tts_values, slot->tts_isnull);
732 	if (found)
733 		ExecStoreVirtualTuple(slot);
734 
735 	/* Remove error callback. */
736 	error_context_stack = errcallback.previous;
737 
738 	return slot;
739 }
740 
741 /*
742  * fileReScanForeignScan
743  *		Rescan table, possibly with new parameters
744  */
745 static void
746 fileReScanForeignScan(ForeignScanState *node)
747 {
748 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
749 
750 	EndCopyFrom(festate->cstate);
751 
752 	festate->cstate = BeginCopyFrom(NULL,
753 									node->ss.ss_currentRelation,
754 									festate->filename,
755 									festate->is_program,
756 									NULL,
757 									NIL,
758 									festate->options);
759 }
760 
761 /*
762  * fileEndForeignScan
763  *		Finish scanning foreign table and dispose objects used for this scan
764  */
765 static void
766 fileEndForeignScan(ForeignScanState *node)
767 {
768 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
769 
770 	/* if festate is NULL, we are in EXPLAIN; nothing to do */
771 	if (festate)
772 		EndCopyFrom(festate->cstate);
773 }
774 
775 /*
776  * fileAnalyzeForeignTable
777  *		Test whether analyzing this foreign table is supported
778  */
779 static bool
780 fileAnalyzeForeignTable(Relation relation,
781 						AcquireSampleRowsFunc *func,
782 						BlockNumber *totalpages)
783 {
784 	char	   *filename;
785 	bool		is_program;
786 	List	   *options;
787 	struct stat stat_buf;
788 
789 	/* Fetch options of foreign table */
790 	fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
791 
792 	/*
793 	 * If this is a program instead of a file, just return false to skip
794 	 * analyzing the table.  We could run the program and collect stats on
795 	 * whatever it currently returns, but it seems likely that in such cases
796 	 * the output would be too volatile for the stats to be useful.  Maybe
797 	 * there should be an option to enable doing this?
798 	 */
799 	if (is_program)
800 		return false;
801 
802 	/*
803 	 * Get size of the file.  (XXX if we fail here, would it be better to just
804 	 * return false to skip analyzing the table?)
805 	 */
806 	if (stat(filename, &stat_buf) < 0)
807 		ereport(ERROR,
808 				(errcode_for_file_access(),
809 				 errmsg("could not stat file \"%s\": %m",
810 						filename)));
811 
812 	/*
813 	 * Convert size to pages.  Must return at least 1 so that we can tell
814 	 * later on that pg_class.relpages is not default.
815 	 */
816 	*totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
817 	if (*totalpages < 1)
818 		*totalpages = 1;
819 
820 	*func = file_acquire_sample_rows;
821 
822 	return true;
823 }
824 
825 /*
826  * fileIsForeignScanParallelSafe
827  *		Reading a file, or external program, in a parallel worker should work
828  *		just the same as reading it in the leader, so mark scans safe.
829  */
830 static bool
831 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
832 							  RangeTblEntry *rte)
833 {
834 	return true;
835 }
836 
837 /*
838  * check_selective_binary_conversion
839  *
840  * Check to see if it's useful to convert only a subset of the file's columns
841  * to binary.  If so, construct a list of the column names to be converted,
842  * return that at *columns, and return true.  (Note that it's possible to
843  * determine that no columns need be converted, for instance with a COUNT(*)
844  * query.  So we can't use returning a NIL list to indicate failure.)
845  */
846 static bool
847 check_selective_binary_conversion(RelOptInfo *baserel,
848 								  Oid foreigntableid,
849 								  List **columns)
850 {
851 	ForeignTable *table;
852 	ListCell   *lc;
853 	Relation	rel;
854 	TupleDesc	tupleDesc;
855 	AttrNumber	attnum;
856 	Bitmapset  *attrs_used = NULL;
857 	bool		has_wholerow = false;
858 	int			numattrs;
859 	int			i;
860 
861 	*columns = NIL;				/* default result */
862 
863 	/*
864 	 * Check format of the file.  If binary format, this is irrelevant.
865 	 */
866 	table = GetForeignTable(foreigntableid);
867 	foreach(lc, table->options)
868 	{
869 		DefElem    *def = (DefElem *) lfirst(lc);
870 
871 		if (strcmp(def->defname, "format") == 0)
872 		{
873 			char	   *format = defGetString(def);
874 
875 			if (strcmp(format, "binary") == 0)
876 				return false;
877 			break;
878 		}
879 	}
880 
881 	/* Collect all the attributes needed for joins or final output. */
882 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
883 				   &attrs_used);
884 
885 	/* Add all the attributes used by restriction clauses. */
886 	foreach(lc, baserel->baserestrictinfo)
887 	{
888 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
889 
890 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
891 					   &attrs_used);
892 	}
893 
894 	/* Convert attribute numbers to column names. */
895 	rel = table_open(foreigntableid, AccessShareLock);
896 	tupleDesc = RelationGetDescr(rel);
897 
898 	while ((attnum = bms_first_member(attrs_used)) >= 0)
899 	{
900 		/* Adjust for system attributes. */
901 		attnum += FirstLowInvalidHeapAttributeNumber;
902 
903 		if (attnum == 0)
904 		{
905 			has_wholerow = true;
906 			break;
907 		}
908 
909 		/* Ignore system attributes. */
910 		if (attnum < 0)
911 			continue;
912 
913 		/* Get user attributes. */
914 		if (attnum > 0)
915 		{
916 			Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
917 			char	   *attname = NameStr(attr->attname);
918 
919 			/* Skip dropped attributes (probably shouldn't see any here). */
920 			if (attr->attisdropped)
921 				continue;
922 
923 			/*
924 			 * Skip generated columns (COPY won't accept them in the column
925 			 * list)
926 			 */
927 			if (attr->attgenerated)
928 				continue;
929 			*columns = lappend(*columns, makeString(pstrdup(attname)));
930 		}
931 	}
932 
933 	/* Count non-dropped user attributes while we have the tupdesc. */
934 	numattrs = 0;
935 	for (i = 0; i < tupleDesc->natts; i++)
936 	{
937 		Form_pg_attribute attr = TupleDescAttr(tupleDesc, i);
938 
939 		if (attr->attisdropped)
940 			continue;
941 		numattrs++;
942 	}
943 
944 	table_close(rel, AccessShareLock);
945 
946 	/* If there's a whole-row reference, fail: we need all the columns. */
947 	if (has_wholerow)
948 	{
949 		*columns = NIL;
950 		return false;
951 	}
952 
953 	/* If all the user attributes are needed, fail. */
954 	if (numattrs == list_length(*columns))
955 	{
956 		*columns = NIL;
957 		return false;
958 	}
959 
960 	return true;
961 }
962 
963 /*
964  * Estimate size of a foreign table.
965  *
966  * The main result is returned in baserel->rows.  We also set
967  * fdw_private->pages and fdw_private->ntuples for later use in the cost
968  * calculation.
969  */
970 static void
971 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
972 			  FileFdwPlanState *fdw_private)
973 {
974 	struct stat stat_buf;
975 	BlockNumber pages;
976 	double		ntuples;
977 	double		nrows;
978 
979 	/*
980 	 * Get size of the file.  It might not be there at plan time, though, in
981 	 * which case we have to use a default estimate.  We also have to fall
982 	 * back to the default if using a program as the input.
983 	 */
984 	if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
985 		stat_buf.st_size = 10 * BLCKSZ;
986 
987 	/*
988 	 * Convert size to pages for use in I/O cost estimate later.
989 	 */
990 	pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
991 	if (pages < 1)
992 		pages = 1;
993 	fdw_private->pages = pages;
994 
995 	/*
996 	 * Estimate the number of tuples in the file.
997 	 */
998 	if (baserel->pages > 0)
999 	{
1000 		/*
1001 		 * We have # of pages and # of tuples from pg_class (that is, from a
1002 		 * previous ANALYZE), so compute a tuples-per-page estimate and scale
1003 		 * that by the current file size.
1004 		 */
1005 		double		density;
1006 
1007 		density = baserel->tuples / (double) baserel->pages;
1008 		ntuples = clamp_row_est(density * (double) pages);
1009 	}
1010 	else
1011 	{
1012 		/*
1013 		 * Otherwise we have to fake it.  We back into this estimate using the
1014 		 * planner's idea of the relation width; which is bogus if not all
1015 		 * columns are being read, not to mention that the text representation
1016 		 * of a row probably isn't the same size as its internal
1017 		 * representation.  Possibly we could do something better, but the
1018 		 * real answer to anyone who complains is "ANALYZE" ...
1019 		 */
1020 		int			tuple_width;
1021 
1022 		tuple_width = MAXALIGN(baserel->reltarget->width) +
1023 			MAXALIGN(SizeofHeapTupleHeader);
1024 		ntuples = clamp_row_est((double) stat_buf.st_size /
1025 								(double) tuple_width);
1026 	}
1027 	fdw_private->ntuples = ntuples;
1028 
1029 	/*
1030 	 * Now estimate the number of rows returned by the scan after applying the
1031 	 * baserestrictinfo quals.
1032 	 */
1033 	nrows = ntuples *
1034 		clauselist_selectivity(root,
1035 							   baserel->baserestrictinfo,
1036 							   0,
1037 							   JOIN_INNER,
1038 							   NULL);
1039 
1040 	nrows = clamp_row_est(nrows);
1041 
1042 	/* Save the output-rows estimate for the planner */
1043 	baserel->rows = nrows;
1044 }
1045 
1046 /*
1047  * Estimate costs of scanning a foreign table.
1048  *
1049  * Results are returned in *startup_cost and *total_cost.
1050  */
1051 static void
1052 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1053 			   FileFdwPlanState *fdw_private,
1054 			   Cost *startup_cost, Cost *total_cost)
1055 {
1056 	BlockNumber pages = fdw_private->pages;
1057 	double		ntuples = fdw_private->ntuples;
1058 	Cost		run_cost = 0;
1059 	Cost		cpu_per_tuple;
1060 
1061 	/*
1062 	 * We estimate costs almost the same way as cost_seqscan(), thus assuming
1063 	 * that I/O costs are equivalent to a regular table file of the same size.
1064 	 * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1065 	 * for the cost of parsing records.
1066 	 *
1067 	 * In the case of a program source, this calculation is even more divorced
1068 	 * from reality, but we have no good alternative; and it's not clear that
1069 	 * the numbers we produce here matter much anyway, since there's only one
1070 	 * access path for the rel.
1071 	 */
1072 	run_cost += seq_page_cost * pages;
1073 
1074 	*startup_cost = baserel->baserestrictcost.startup;
1075 	cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1076 	run_cost += cpu_per_tuple * ntuples;
1077 	*total_cost = *startup_cost + run_cost;
1078 }
1079 
1080 /*
1081  * file_acquire_sample_rows -- acquire a random sample of rows from the table
1082  *
1083  * Selected rows are returned in the caller-allocated array rows[],
1084  * which must have at least targrows entries.
1085  * The actual number of rows selected is returned as the function result.
1086  * We also count the total number of rows in the file and return it into
1087  * *totalrows.  Note that *totaldeadrows is always set to 0.
1088  *
1089  * Note that the returned list of rows is not always in order by physical
1090  * position in the file.  Therefore, correlation estimates derived later
1091  * may be meaningless, but it's OK because we don't use the estimates
1092  * currently (the planner only pays attention to correlation for indexscans).
1093  */
1094 static int
1095 file_acquire_sample_rows(Relation onerel, int elevel,
1096 						 HeapTuple *rows, int targrows,
1097 						 double *totalrows, double *totaldeadrows)
1098 {
1099 	int			numrows = 0;
1100 	double		rowstoskip = -1;	/* -1 means not set yet */
1101 	ReservoirStateData rstate;
1102 	TupleDesc	tupDesc;
1103 	Datum	   *values;
1104 	bool	   *nulls;
1105 	bool		found;
1106 	char	   *filename;
1107 	bool		is_program;
1108 	List	   *options;
1109 	CopyState	cstate;
1110 	ErrorContextCallback errcallback;
1111 	MemoryContext oldcontext = CurrentMemoryContext;
1112 	MemoryContext tupcontext;
1113 
1114 	Assert(onerel);
1115 	Assert(targrows > 0);
1116 
1117 	tupDesc = RelationGetDescr(onerel);
1118 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1119 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1120 
1121 	/* Fetch options of foreign table */
1122 	fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);
1123 
1124 	/*
1125 	 * Create CopyState from FDW options.
1126 	 */
1127 	cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
1128 						   options);
1129 
1130 	/*
1131 	 * Use per-tuple memory context to prevent leak of memory used to read
1132 	 * rows from the file with Copy routines.
1133 	 */
1134 	tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1135 									   "file_fdw temporary context",
1136 									   ALLOCSET_DEFAULT_SIZES);
1137 
1138 	/* Prepare for sampling rows */
1139 	reservoir_init_selection_state(&rstate, targrows);
1140 
1141 	/* Set up callback to identify error line number. */
1142 	errcallback.callback = CopyFromErrorCallback;
1143 	errcallback.arg = (void *) cstate;
1144 	errcallback.previous = error_context_stack;
1145 	error_context_stack = &errcallback;
1146 
1147 	*totalrows = 0;
1148 	*totaldeadrows = 0;
1149 	for (;;)
1150 	{
1151 		/* Check for user-requested abort or sleep */
1152 		vacuum_delay_point();
1153 
1154 		/* Fetch next row */
1155 		MemoryContextReset(tupcontext);
1156 		MemoryContextSwitchTo(tupcontext);
1157 
1158 		found = NextCopyFrom(cstate, NULL, values, nulls);
1159 
1160 		MemoryContextSwitchTo(oldcontext);
1161 
1162 		if (!found)
1163 			break;
1164 
1165 		/*
1166 		 * The first targrows sample rows are simply copied into the
1167 		 * reservoir.  Then we start replacing tuples in the sample until we
1168 		 * reach the end of the relation. This algorithm is from Jeff Vitter's
1169 		 * paper (see more info in commands/analyze.c).
1170 		 */
1171 		if (numrows < targrows)
1172 		{
1173 			rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1174 		}
1175 		else
1176 		{
1177 			/*
1178 			 * t in Vitter's paper is the number of records already processed.
1179 			 * If we need to compute a new S value, we must use the
1180 			 * not-yet-incremented value of totalrows as t.
1181 			 */
1182 			if (rowstoskip < 0)
1183 				rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1184 
1185 			if (rowstoskip <= 0)
1186 			{
1187 				/*
1188 				 * Found a suitable tuple, so save it, replacing one old tuple
1189 				 * at random
1190 				 */
1191 				int			k = (int) (targrows * sampler_random_fract(rstate.randstate));
1192 
1193 				Assert(k >= 0 && k < targrows);
1194 				heap_freetuple(rows[k]);
1195 				rows[k] = heap_form_tuple(tupDesc, values, nulls);
1196 			}
1197 
1198 			rowstoskip -= 1;
1199 		}
1200 
1201 		*totalrows += 1;
1202 	}
1203 
1204 	/* Remove error callback. */
1205 	error_context_stack = errcallback.previous;
1206 
1207 	/* Clean up. */
1208 	MemoryContextDelete(tupcontext);
1209 
1210 	EndCopyFrom(cstate);
1211 
1212 	pfree(values);
1213 	pfree(nulls);
1214 
1215 	/*
1216 	 * Emit some interesting relation info
1217 	 */
1218 	ereport(elevel,
1219 			(errmsg("\"%s\": file contains %.0f rows; "
1220 					"%d rows in sample",
1221 					RelationGetRelationName(onerel),
1222 					*totalrows, numrows)));
1223 
1224 	return numrows;
1225 }
1226