1 /*-------------------------------------------------------------------------
2  *
3  * file_fdw.c
4  *		  foreign-data wrapper for server-side flat files (or programs).
5  *
6  * Copyright (c) 2010-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/file_fdw/file_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <sys/stat.h>
16 #include <unistd.h>
17 
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "access/table.h"
22 #include "catalog/pg_authid.h"
23 #include "catalog/pg_foreign_table.h"
24 #include "commands/copy.h"
25 #include "commands/defrem.h"
26 #include "commands/explain.h"
27 #include "commands/vacuum.h"
28 #include "foreign/fdwapi.h"
29 #include "foreign/foreign.h"
30 #include "miscadmin.h"
31 #include "nodes/makefuncs.h"
32 #include "optimizer/optimizer.h"
33 #include "optimizer/pathnode.h"
34 #include "optimizer/planmain.h"
35 #include "optimizer/restrictinfo.h"
36 #include "utils/acl.h"
37 #include "utils/memutils.h"
38 #include "utils/rel.h"
39 #include "utils/sampling.h"
40 
41 PG_MODULE_MAGIC;
42 
/*
 * Describes one option accepted by objects that use this wrapper: the option
 * name plus the catalog (i.e. the kind of object) on which it may appear.
 * The full set of such entries is the valid_options[] table.
 */
struct FileFdwOption
{
	const char *optname;		/* option name, compared against DefElem.defname */
	Oid			optcontext;		/* Oid of catalog in which option may appear */
};
51 
52 /*
53  * Valid options for file_fdw.
54  * These options are based on the options for the COPY FROM command.
55  * But note that force_not_null and force_null are handled as boolean options
56  * attached to a column, not as table options.
57  *
58  * Note: If you are adding new option for user mapping, you need to modify
59  * fileGetOptions(), which currently doesn't bother to look at user mappings.
60  */
61 static const struct FileFdwOption valid_options[] = {
62 	/* Data source options */
63 	{"filename", ForeignTableRelationId},
64 	{"program", ForeignTableRelationId},
65 
66 	/* Format options */
67 	/* oids option is not supported */
68 	{"format", ForeignTableRelationId},
69 	{"header", ForeignTableRelationId},
70 	{"delimiter", ForeignTableRelationId},
71 	{"quote", ForeignTableRelationId},
72 	{"escape", ForeignTableRelationId},
73 	{"null", ForeignTableRelationId},
74 	{"encoding", ForeignTableRelationId},
75 	{"force_not_null", AttributeRelationId},
76 	{"force_null", AttributeRelationId},
77 
78 	/*
79 	 * force_quote is not supported by file_fdw because it's for COPY TO.
80 	 */
81 
82 	/* Sentinel */
83 	{NULL, InvalidOid}
84 };
85 
/*
 * FDW-specific information for RelOptInfo.fdw_private.
 *
 * Allocated and filled by fileGetForeignRelSize(); pages and ntuples are set
 * by estimate_size() and later used for cost estimation.
 */
typedef struct FileFdwPlanState
{
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	BlockNumber pages;			/* estimate of file's physical size, in
								 * BLCKSZ-sized pages */
	double		ntuples;		/* estimate of number of data rows */
} FileFdwPlanState;
98 
/*
 * FDW-specific information for ForeignScanState.fdw_state.
 *
 * Created by fileBeginForeignScan() (which leaves fdw_state NULL for plain
 * EXPLAIN).  filename, is_program and options are retained so that
 * fileReScanForeignScan() can call BeginCopyFrom() again.
 */
typedef struct FileFdwExecutionState
{
	char	   *filename;		/* file or program to read from */
	bool		is_program;		/* true if filename represents an OS command */
	List	   *options;		/* merged COPY options, excluding filename and
								 * is_program */
	CopyState	cstate;			/* COPY execution state */
} FileFdwExecutionState;
110 
/*
 * SQL-callable functions: the FDW handler (returns the FdwRoutine callback
 * table) and the option validator.
 */
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
116 
/*
 * FDW callback routines (installed into the FdwRoutine by file_fdw_handler)
 */
static void fileGetForeignRelSize(PlannerInfo *root,
								  RelOptInfo *baserel,
								  Oid foreigntableid);
static void fileGetForeignPaths(PlannerInfo *root,
								RelOptInfo *baserel,
								Oid foreigntableid);
static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
									   RelOptInfo *baserel,
									   Oid foreigntableid,
									   ForeignPath *best_path,
									   List *tlist,
									   List *scan_clauses,
									   Plan *outer_plan);
static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
static void fileReScanForeignScan(ForeignScanState *node);
static void fileEndForeignScan(ForeignScanState *node);
static bool fileAnalyzeForeignTable(Relation relation,
									AcquireSampleRowsFunc *func,
									BlockNumber *totalpages);
static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
										  RangeTblEntry *rte);

/*
 * Helper functions (file_acquire_sample_rows is the row sampler handed to
 * ANALYZE by fileAnalyzeForeignTable)
 */
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
						   char **filename,
						   bool *is_program,
						   List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);
static bool check_selective_binary_conversion(RelOptInfo *baserel,
											  Oid foreigntableid,
											  List **columns);
static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
						  FileFdwPlanState *fdw_private);
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
						   FileFdwPlanState *fdw_private,
						   Cost *startup_cost, Cost *total_cost);
static int	file_acquire_sample_rows(Relation onerel, int elevel,
									 HeapTuple *rows, int targrows,
									 double *totalrows, double *totaldeadrows);
164 
165 
166 /*
167  * Foreign-data wrapper handler function: return a struct with pointers
168  * to my callback routines.
169  */
170 Datum
file_fdw_handler(PG_FUNCTION_ARGS)171 file_fdw_handler(PG_FUNCTION_ARGS)
172 {
173 	FdwRoutine *fdwroutine = makeNode(FdwRoutine);
174 
175 	fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
176 	fdwroutine->GetForeignPaths = fileGetForeignPaths;
177 	fdwroutine->GetForeignPlan = fileGetForeignPlan;
178 	fdwroutine->ExplainForeignScan = fileExplainForeignScan;
179 	fdwroutine->BeginForeignScan = fileBeginForeignScan;
180 	fdwroutine->IterateForeignScan = fileIterateForeignScan;
181 	fdwroutine->ReScanForeignScan = fileReScanForeignScan;
182 	fdwroutine->EndForeignScan = fileEndForeignScan;
183 	fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
184 	fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
185 
186 	PG_RETURN_POINTER(fdwroutine);
187 }
188 
189 /*
190  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
191  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
192  *
193  * Raise an ERROR if the option or its value is considered invalid.
194  */
195 Datum
file_fdw_validator(PG_FUNCTION_ARGS)196 file_fdw_validator(PG_FUNCTION_ARGS)
197 {
198 	List	   *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
199 	Oid			catalog = PG_GETARG_OID(1);
200 	char	   *filename = NULL;
201 	DefElem    *force_not_null = NULL;
202 	DefElem    *force_null = NULL;
203 	List	   *other_options = NIL;
204 	ListCell   *cell;
205 
206 	/*
207 	 * Check that only options supported by file_fdw, and allowed for the
208 	 * current object type, are given.
209 	 */
210 	foreach(cell, options_list)
211 	{
212 		DefElem    *def = (DefElem *) lfirst(cell);
213 
214 		if (!is_valid_option(def->defname, catalog))
215 		{
216 			const struct FileFdwOption *opt;
217 			StringInfoData buf;
218 
219 			/*
220 			 * Unknown option specified, complain about it. Provide a hint
221 			 * with list of valid options for the object.
222 			 */
223 			initStringInfo(&buf);
224 			for (opt = valid_options; opt->optname; opt++)
225 			{
226 				if (catalog == opt->optcontext)
227 					appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
228 									 opt->optname);
229 			}
230 
231 			ereport(ERROR,
232 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
233 					 errmsg("invalid option \"%s\"", def->defname),
234 					 buf.len > 0
235 					 ? errhint("Valid options in this context are: %s",
236 							   buf.data)
237 					 : errhint("There are no valid options in this context.")));
238 		}
239 
240 		/*
241 		 * Separate out filename, program, and column-specific options, since
242 		 * ProcessCopyOptions won't accept them.
243 		 */
244 		if (strcmp(def->defname, "filename") == 0 ||
245 			strcmp(def->defname, "program") == 0)
246 		{
247 			if (filename)
248 				ereport(ERROR,
249 						(errcode(ERRCODE_SYNTAX_ERROR),
250 						 errmsg("conflicting or redundant options")));
251 
252 			/*
253 			 * Check permissions for changing which file or program is used by
254 			 * the file_fdw.
255 			 *
256 			 * Only members of the role 'pg_read_server_files' are allowed to
257 			 * set the 'filename' option of a file_fdw foreign table, while
258 			 * only members of the role 'pg_execute_server_program' are
259 			 * allowed to set the 'program' option.  This is because we don't
260 			 * want regular users to be able to control which file gets read
261 			 * or which program gets executed.
262 			 *
263 			 * Putting this sort of permissions check in a validator is a bit
264 			 * of a crock, but there doesn't seem to be any other place that
265 			 * can enforce the check more cleanly.
266 			 *
267 			 * Note that the valid_options[] array disallows setting filename
268 			 * and program at any options level other than foreign table ---
269 			 * otherwise there'd still be a security hole.
270 			 */
271 			if (strcmp(def->defname, "filename") == 0 &&
272 				!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
273 				ereport(ERROR,
274 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
275 						 errmsg("only superuser or a member of the pg_read_server_files role may specify the filename option of a file_fdw foreign table")));
276 
277 			if (strcmp(def->defname, "program") == 0 &&
278 				!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
279 				ereport(ERROR,
280 						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
281 						 errmsg("only superuser or a member of the pg_execute_server_program role may specify the program option of a file_fdw foreign table")));
282 
283 			filename = defGetString(def);
284 		}
285 
286 		/*
287 		 * force_not_null is a boolean option; after validation we can discard
288 		 * it - it will be retrieved later in get_file_fdw_attribute_options()
289 		 */
290 		else if (strcmp(def->defname, "force_not_null") == 0)
291 		{
292 			if (force_not_null)
293 				ereport(ERROR,
294 						(errcode(ERRCODE_SYNTAX_ERROR),
295 						 errmsg("conflicting or redundant options"),
296 						 errhint("Option \"force_not_null\" supplied more than once for a column.")));
297 			force_not_null = def;
298 			/* Don't care what the value is, as long as it's a legal boolean */
299 			(void) defGetBoolean(def);
300 		}
301 		/* See comments for force_not_null above */
302 		else if (strcmp(def->defname, "force_null") == 0)
303 		{
304 			if (force_null)
305 				ereport(ERROR,
306 						(errcode(ERRCODE_SYNTAX_ERROR),
307 						 errmsg("conflicting or redundant options"),
308 						 errhint("Option \"force_null\" supplied more than once for a column.")));
309 			force_null = def;
310 			(void) defGetBoolean(def);
311 		}
312 		else
313 			other_options = lappend(other_options, def);
314 	}
315 
316 	/*
317 	 * Now apply the core COPY code's validation logic for more checks.
318 	 */
319 	ProcessCopyOptions(NULL, NULL, true, other_options);
320 
321 	/*
322 	 * Either filename or program option is required for file_fdw foreign
323 	 * tables.
324 	 */
325 	if (catalog == ForeignTableRelationId && filename == NULL)
326 		ereport(ERROR,
327 				(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
328 				 errmsg("either filename or program is required for file_fdw foreign tables")));
329 
330 	PG_RETURN_VOID();
331 }
332 
333 /*
334  * Check if the provided option is one of the valid options.
335  * context is the Oid of the catalog holding the object the option is for.
336  */
337 static bool
is_valid_option(const char * option,Oid context)338 is_valid_option(const char *option, Oid context)
339 {
340 	const struct FileFdwOption *opt;
341 
342 	for (opt = valid_options; opt->optname; opt++)
343 	{
344 		if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
345 			return true;
346 	}
347 	return false;
348 }
349 
350 /*
351  * Fetch the options for a file_fdw foreign table.
352  *
353  * We have to separate out filename/program from the other options because
354  * those must not appear in the options list passed to the core COPY code.
355  */
356 static void
fileGetOptions(Oid foreigntableid,char ** filename,bool * is_program,List ** other_options)357 fileGetOptions(Oid foreigntableid,
358 			   char **filename, bool *is_program, List **other_options)
359 {
360 	ForeignTable *table;
361 	ForeignServer *server;
362 	ForeignDataWrapper *wrapper;
363 	List	   *options;
364 	ListCell   *lc;
365 
366 	/*
367 	 * Extract options from FDW objects.  We ignore user mappings because
368 	 * file_fdw doesn't have any options that can be specified there.
369 	 *
370 	 * (XXX Actually, given the current contents of valid_options[], there's
371 	 * no point in examining anything except the foreign table's own options.
372 	 * Simplify?)
373 	 */
374 	table = GetForeignTable(foreigntableid);
375 	server = GetForeignServer(table->serverid);
376 	wrapper = GetForeignDataWrapper(server->fdwid);
377 
378 	options = NIL;
379 	options = list_concat(options, wrapper->options);
380 	options = list_concat(options, server->options);
381 	options = list_concat(options, table->options);
382 	options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
383 
384 	/*
385 	 * Separate out the filename or program option (we assume there is only
386 	 * one).
387 	 */
388 	*filename = NULL;
389 	*is_program = false;
390 	foreach(lc, options)
391 	{
392 		DefElem    *def = (DefElem *) lfirst(lc);
393 
394 		if (strcmp(def->defname, "filename") == 0)
395 		{
396 			*filename = defGetString(def);
397 			options = foreach_delete_current(options, lc);
398 			break;
399 		}
400 		else if (strcmp(def->defname, "program") == 0)
401 		{
402 			*filename = defGetString(def);
403 			*is_program = true;
404 			options = foreach_delete_current(options, lc);
405 			break;
406 		}
407 	}
408 
409 	/*
410 	 * The validator should have checked that filename or program was included
411 	 * in the options, but check again, just in case.
412 	 */
413 	if (*filename == NULL)
414 		elog(ERROR, "either filename or program is required for file_fdw foreign tables");
415 
416 	*other_options = options;
417 }
418 
419 /*
420  * Retrieve per-column generic options from pg_attribute and construct a list
421  * of DefElems representing them.
422  *
423  * At the moment we only have "force_not_null", and "force_null",
424  * which should each be combined into a single DefElem listing all such
425  * columns, since that's what COPY expects.
426  */
427 static List *
get_file_fdw_attribute_options(Oid relid)428 get_file_fdw_attribute_options(Oid relid)
429 {
430 	Relation	rel;
431 	TupleDesc	tupleDesc;
432 	AttrNumber	natts;
433 	AttrNumber	attnum;
434 	List	   *fnncolumns = NIL;
435 	List	   *fncolumns = NIL;
436 
437 	List	   *options = NIL;
438 
439 	rel = table_open(relid, AccessShareLock);
440 	tupleDesc = RelationGetDescr(rel);
441 	natts = tupleDesc->natts;
442 
443 	/* Retrieve FDW options for all user-defined attributes. */
444 	for (attnum = 1; attnum <= natts; attnum++)
445 	{
446 		Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
447 		List	   *options;
448 		ListCell   *lc;
449 
450 		/* Skip dropped attributes. */
451 		if (attr->attisdropped)
452 			continue;
453 
454 		options = GetForeignColumnOptions(relid, attnum);
455 		foreach(lc, options)
456 		{
457 			DefElem    *def = (DefElem *) lfirst(lc);
458 
459 			if (strcmp(def->defname, "force_not_null") == 0)
460 			{
461 				if (defGetBoolean(def))
462 				{
463 					char	   *attname = pstrdup(NameStr(attr->attname));
464 
465 					fnncolumns = lappend(fnncolumns, makeString(attname));
466 				}
467 			}
468 			else if (strcmp(def->defname, "force_null") == 0)
469 			{
470 				if (defGetBoolean(def))
471 				{
472 					char	   *attname = pstrdup(NameStr(attr->attname));
473 
474 					fncolumns = lappend(fncolumns, makeString(attname));
475 				}
476 			}
477 			/* maybe in future handle other options here */
478 		}
479 	}
480 
481 	table_close(rel, AccessShareLock);
482 
483 	/*
484 	 * Return DefElem only when some column(s) have force_not_null /
485 	 * force_null options set
486 	 */
487 	if (fnncolumns != NIL)
488 		options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
489 
490 	if (fncolumns != NIL)
491 		options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
492 
493 	return options;
494 }
495 
496 /*
497  * fileGetForeignRelSize
498  *		Obtain relation size estimates for a foreign table
499  */
500 static void
fileGetForeignRelSize(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)501 fileGetForeignRelSize(PlannerInfo *root,
502 					  RelOptInfo *baserel,
503 					  Oid foreigntableid)
504 {
505 	FileFdwPlanState *fdw_private;
506 
507 	/*
508 	 * Fetch options.  We only need filename (or program) at this point, but
509 	 * we might as well get everything and not need to re-fetch it later in
510 	 * planning.
511 	 */
512 	fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
513 	fileGetOptions(foreigntableid,
514 				   &fdw_private->filename,
515 				   &fdw_private->is_program,
516 				   &fdw_private->options);
517 	baserel->fdw_private = (void *) fdw_private;
518 
519 	/* Estimate relation size */
520 	estimate_size(root, baserel, fdw_private);
521 }
522 
523 /*
524  * fileGetForeignPaths
525  *		Create possible access paths for a scan on the foreign table
526  *
527  *		Currently we don't support any push-down feature, so there is only one
528  *		possible access path, which simply returns all records in the order in
529  *		the data file.
530  */
531 static void
fileGetForeignPaths(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid)532 fileGetForeignPaths(PlannerInfo *root,
533 					RelOptInfo *baserel,
534 					Oid foreigntableid)
535 {
536 	FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
537 	Cost		startup_cost;
538 	Cost		total_cost;
539 	List	   *columns;
540 	List	   *coptions = NIL;
541 
542 	/* Decide whether to selectively perform binary conversion */
543 	if (check_selective_binary_conversion(baserel,
544 										  foreigntableid,
545 										  &columns))
546 		coptions = list_make1(makeDefElem("convert_selectively",
547 										  (Node *) columns, -1));
548 
549 	/* Estimate costs */
550 	estimate_costs(root, baserel, fdw_private,
551 				   &startup_cost, &total_cost);
552 
553 	/*
554 	 * Create a ForeignPath node and add it as only possible path.  We use the
555 	 * fdw_private list of the path to carry the convert_selectively option;
556 	 * it will be propagated into the fdw_private list of the Plan node.
557 	 *
558 	 * We don't support pushing join clauses into the quals of this path, but
559 	 * it could still have required parameterization due to LATERAL refs in
560 	 * its tlist.
561 	 */
562 	add_path(baserel, (Path *)
563 			 create_foreignscan_path(root, baserel,
564 									 NULL,	/* default pathtarget */
565 									 baserel->rows,
566 									 startup_cost,
567 									 total_cost,
568 									 NIL,	/* no pathkeys */
569 									 baserel->lateral_relids,
570 									 NULL,	/* no extra plan */
571 									 coptions));
572 
573 	/*
574 	 * If data file was sorted, and we knew it somehow, we could insert
575 	 * appropriate pathkeys into the ForeignPath node to tell the planner
576 	 * that.
577 	 */
578 }
579 
580 /*
581  * fileGetForeignPlan
582  *		Create a ForeignScan plan node for scanning the foreign table
583  */
584 static ForeignScan *
fileGetForeignPlan(PlannerInfo * root,RelOptInfo * baserel,Oid foreigntableid,ForeignPath * best_path,List * tlist,List * scan_clauses,Plan * outer_plan)585 fileGetForeignPlan(PlannerInfo *root,
586 				   RelOptInfo *baserel,
587 				   Oid foreigntableid,
588 				   ForeignPath *best_path,
589 				   List *tlist,
590 				   List *scan_clauses,
591 				   Plan *outer_plan)
592 {
593 	Index		scan_relid = baserel->relid;
594 
595 	/*
596 	 * We have no native ability to evaluate restriction clauses, so we just
597 	 * put all the scan_clauses into the plan node's qual list for the
598 	 * executor to check.  So all we have to do here is strip RestrictInfo
599 	 * nodes from the clauses and ignore pseudoconstants (which will be
600 	 * handled elsewhere).
601 	 */
602 	scan_clauses = extract_actual_clauses(scan_clauses, false);
603 
604 	/* Create the ForeignScan node */
605 	return make_foreignscan(tlist,
606 							scan_clauses,
607 							scan_relid,
608 							NIL,	/* no expressions to evaluate */
609 							best_path->fdw_private,
610 							NIL,	/* no custom tlist */
611 							NIL,	/* no remote quals */
612 							outer_plan);
613 }
614 
615 /*
616  * fileExplainForeignScan
617  *		Produce extra output for EXPLAIN
618  */
619 static void
fileExplainForeignScan(ForeignScanState * node,ExplainState * es)620 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
621 {
622 	char	   *filename;
623 	bool		is_program;
624 	List	   *options;
625 
626 	/* Fetch options --- we only need filename and is_program at this point */
627 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
628 				   &filename, &is_program, &options);
629 
630 	if (is_program)
631 		ExplainPropertyText("Foreign Program", filename, es);
632 	else
633 		ExplainPropertyText("Foreign File", filename, es);
634 
635 	/* Suppress file size if we're not showing cost details */
636 	if (es->costs)
637 	{
638 		struct stat stat_buf;
639 
640 		if (!is_program &&
641 			stat(filename, &stat_buf) == 0)
642 			ExplainPropertyInteger("Foreign File Size", "b",
643 								   (int64) stat_buf.st_size, es);
644 	}
645 }
646 
647 /*
648  * fileBeginForeignScan
649  *		Initiate access to the file by creating CopyState
650  */
651 static void
fileBeginForeignScan(ForeignScanState * node,int eflags)652 fileBeginForeignScan(ForeignScanState *node, int eflags)
653 {
654 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
655 	char	   *filename;
656 	bool		is_program;
657 	List	   *options;
658 	CopyState	cstate;
659 	FileFdwExecutionState *festate;
660 
661 	/*
662 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
663 	 */
664 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
665 		return;
666 
667 	/* Fetch options of foreign table */
668 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
669 				   &filename, &is_program, &options);
670 
671 	/* Add any options from the plan (currently only convert_selectively) */
672 	options = list_concat(options, plan->fdw_private);
673 
674 	/*
675 	 * Create CopyState from FDW options.  We always acquire all columns, so
676 	 * as to match the expected ScanTupleSlot signature.
677 	 */
678 	cstate = BeginCopyFrom(NULL,
679 						   node->ss.ss_currentRelation,
680 						   filename,
681 						   is_program,
682 						   NULL,
683 						   NIL,
684 						   options);
685 
686 	/*
687 	 * Save state in node->fdw_state.  We must save enough information to call
688 	 * BeginCopyFrom() again.
689 	 */
690 	festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
691 	festate->filename = filename;
692 	festate->is_program = is_program;
693 	festate->options = options;
694 	festate->cstate = cstate;
695 
696 	node->fdw_state = (void *) festate;
697 }
698 
699 /*
700  * fileIterateForeignScan
701  *		Read next record from the data file and store it into the
702  *		ScanTupleSlot as a virtual tuple
703  */
704 static TupleTableSlot *
fileIterateForeignScan(ForeignScanState * node)705 fileIterateForeignScan(ForeignScanState *node)
706 {
707 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
708 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
709 	bool		found;
710 	ErrorContextCallback errcallback;
711 
712 	/* Set up callback to identify error line number. */
713 	errcallback.callback = CopyFromErrorCallback;
714 	errcallback.arg = (void *) festate->cstate;
715 	errcallback.previous = error_context_stack;
716 	error_context_stack = &errcallback;
717 
718 	/*
719 	 * The protocol for loading a virtual tuple into a slot is first
720 	 * ExecClearTuple, then fill the values/isnull arrays, then
721 	 * ExecStoreVirtualTuple.  If we don't find another row in the file, we
722 	 * just skip the last step, leaving the slot empty as required.
723 	 *
724 	 * We can pass ExprContext = NULL because we read all columns from the
725 	 * file, so no need to evaluate default expressions.
726 	 */
727 	ExecClearTuple(slot);
728 	found = NextCopyFrom(festate->cstate, NULL,
729 						 slot->tts_values, slot->tts_isnull);
730 	if (found)
731 		ExecStoreVirtualTuple(slot);
732 
733 	/* Remove error callback. */
734 	error_context_stack = errcallback.previous;
735 
736 	return slot;
737 }
738 
739 /*
740  * fileReScanForeignScan
741  *		Rescan table, possibly with new parameters
742  */
743 static void
fileReScanForeignScan(ForeignScanState * node)744 fileReScanForeignScan(ForeignScanState *node)
745 {
746 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
747 
748 	EndCopyFrom(festate->cstate);
749 
750 	festate->cstate = BeginCopyFrom(NULL,
751 									node->ss.ss_currentRelation,
752 									festate->filename,
753 									festate->is_program,
754 									NULL,
755 									NIL,
756 									festate->options);
757 }
758 
759 /*
760  * fileEndForeignScan
761  *		Finish scanning foreign table and dispose objects used for this scan
762  */
763 static void
fileEndForeignScan(ForeignScanState * node)764 fileEndForeignScan(ForeignScanState *node)
765 {
766 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
767 
768 	/* if festate is NULL, we are in EXPLAIN; nothing to do */
769 	if (festate)
770 		EndCopyFrom(festate->cstate);
771 }
772 
773 /*
774  * fileAnalyzeForeignTable
775  *		Test whether analyzing this foreign table is supported
776  */
777 static bool
fileAnalyzeForeignTable(Relation relation,AcquireSampleRowsFunc * func,BlockNumber * totalpages)778 fileAnalyzeForeignTable(Relation relation,
779 						AcquireSampleRowsFunc *func,
780 						BlockNumber *totalpages)
781 {
782 	char	   *filename;
783 	bool		is_program;
784 	List	   *options;
785 	struct stat stat_buf;
786 
787 	/* Fetch options of foreign table */
788 	fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
789 
790 	/*
791 	 * If this is a program instead of a file, just return false to skip
792 	 * analyzing the table.  We could run the program and collect stats on
793 	 * whatever it currently returns, but it seems likely that in such cases
794 	 * the output would be too volatile for the stats to be useful.  Maybe
795 	 * there should be an option to enable doing this?
796 	 */
797 	if (is_program)
798 		return false;
799 
800 	/*
801 	 * Get size of the file.  (XXX if we fail here, would it be better to just
802 	 * return false to skip analyzing the table?)
803 	 */
804 	if (stat(filename, &stat_buf) < 0)
805 		ereport(ERROR,
806 				(errcode_for_file_access(),
807 				 errmsg("could not stat file \"%s\": %m",
808 						filename)));
809 
810 	/*
811 	 * Convert size to pages.  Must return at least 1 so that we can tell
812 	 * later on that pg_class.relpages is not default.
813 	 */
814 	*totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
815 	if (*totalpages < 1)
816 		*totalpages = 1;
817 
818 	*func = file_acquire_sample_rows;
819 
820 	return true;
821 }
822 
/*
 * fileIsForeignScanParallelSafe
 *		Report whether scans of this foreign table may run in a parallel
 *		worker.
 *
 *		Reading a file, or the output of an external program, in a worker
 *		should work just the same as reading it in the leader, so every scan
 *		is marked safe; none of the arguments are consulted.
 */
static bool
fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
							  RangeTblEntry *rte)
{
	return true;
}
834 
835 /*
836  * check_selective_binary_conversion
837  *
838  * Check to see if it's useful to convert only a subset of the file's columns
839  * to binary.  If so, construct a list of the column names to be converted,
840  * return that at *columns, and return true.  (Note that it's possible to
841  * determine that no columns need be converted, for instance with a COUNT(*)
842  * query.  So we can't use returning a NIL list to indicate failure.)
843  */
844 static bool
check_selective_binary_conversion(RelOptInfo * baserel,Oid foreigntableid,List ** columns)845 check_selective_binary_conversion(RelOptInfo *baserel,
846 								  Oid foreigntableid,
847 								  List **columns)
848 {
849 	ForeignTable *table;
850 	ListCell   *lc;
851 	Relation	rel;
852 	TupleDesc	tupleDesc;
853 	AttrNumber	attnum;
854 	Bitmapset  *attrs_used = NULL;
855 	bool		has_wholerow = false;
856 	int			numattrs;
857 	int			i;
858 
859 	*columns = NIL;				/* default result */
860 
861 	/*
862 	 * Check format of the file.  If binary format, this is irrelevant.
863 	 */
864 	table = GetForeignTable(foreigntableid);
865 	foreach(lc, table->options)
866 	{
867 		DefElem    *def = (DefElem *) lfirst(lc);
868 
869 		if (strcmp(def->defname, "format") == 0)
870 		{
871 			char	   *format = defGetString(def);
872 
873 			if (strcmp(format, "binary") == 0)
874 				return false;
875 			break;
876 		}
877 	}
878 
879 	/* Collect all the attributes needed for joins or final output. */
880 	pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
881 				   &attrs_used);
882 
883 	/* Add all the attributes used by restriction clauses. */
884 	foreach(lc, baserel->baserestrictinfo)
885 	{
886 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
887 
888 		pull_varattnos((Node *) rinfo->clause, baserel->relid,
889 					   &attrs_used);
890 	}
891 
892 	/* Convert attribute numbers to column names. */
893 	rel = table_open(foreigntableid, AccessShareLock);
894 	tupleDesc = RelationGetDescr(rel);
895 
896 	while ((attnum = bms_first_member(attrs_used)) >= 0)
897 	{
898 		/* Adjust for system attributes. */
899 		attnum += FirstLowInvalidHeapAttributeNumber;
900 
901 		if (attnum == 0)
902 		{
903 			has_wholerow = true;
904 			break;
905 		}
906 
907 		/* Ignore system attributes. */
908 		if (attnum < 0)
909 			continue;
910 
911 		/* Get user attributes. */
912 		if (attnum > 0)
913 		{
914 			Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
915 			char	   *attname = NameStr(attr->attname);
916 
917 			/* Skip dropped attributes (probably shouldn't see any here). */
918 			if (attr->attisdropped)
919 				continue;
920 
921 			/*
922 			 * Skip generated columns (COPY won't accept them in the column
923 			 * list)
924 			 */
925 			if (attr->attgenerated)
926 				continue;
927 			*columns = lappend(*columns, makeString(pstrdup(attname)));
928 		}
929 	}
930 
931 	/* Count non-dropped user attributes while we have the tupdesc. */
932 	numattrs = 0;
933 	for (i = 0; i < tupleDesc->natts; i++)
934 	{
935 		Form_pg_attribute attr = TupleDescAttr(tupleDesc, i);
936 
937 		if (attr->attisdropped)
938 			continue;
939 		numattrs++;
940 	}
941 
942 	table_close(rel, AccessShareLock);
943 
944 	/* If there's a whole-row reference, fail: we need all the columns. */
945 	if (has_wholerow)
946 	{
947 		*columns = NIL;
948 		return false;
949 	}
950 
951 	/* If all the user attributes are needed, fail. */
952 	if (numattrs == list_length(*columns))
953 	{
954 		*columns = NIL;
955 		return false;
956 	}
957 
958 	return true;
959 }
960 
961 /*
962  * Estimate size of a foreign table.
963  *
964  * The main result is returned in baserel->rows.  We also set
965  * fdw_private->pages and fdw_private->ntuples for later use in the cost
966  * calculation.
967  */
968 static void
estimate_size(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private)969 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
970 			  FileFdwPlanState *fdw_private)
971 {
972 	struct stat stat_buf;
973 	BlockNumber pages;
974 	double		ntuples;
975 	double		nrows;
976 
977 	/*
978 	 * Get size of the file.  It might not be there at plan time, though, in
979 	 * which case we have to use a default estimate.  We also have to fall
980 	 * back to the default if using a program as the input.
981 	 */
982 	if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
983 		stat_buf.st_size = 10 * BLCKSZ;
984 
985 	/*
986 	 * Convert size to pages for use in I/O cost estimate later.
987 	 */
988 	pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
989 	if (pages < 1)
990 		pages = 1;
991 	fdw_private->pages = pages;
992 
993 	/*
994 	 * Estimate the number of tuples in the file.
995 	 */
996 	if (baserel->pages > 0)
997 	{
998 		/*
999 		 * We have # of pages and # of tuples from pg_class (that is, from a
1000 		 * previous ANALYZE), so compute a tuples-per-page estimate and scale
1001 		 * that by the current file size.
1002 		 */
1003 		double		density;
1004 
1005 		density = baserel->tuples / (double) baserel->pages;
1006 		ntuples = clamp_row_est(density * (double) pages);
1007 	}
1008 	else
1009 	{
1010 		/*
1011 		 * Otherwise we have to fake it.  We back into this estimate using the
1012 		 * planner's idea of the relation width; which is bogus if not all
1013 		 * columns are being read, not to mention that the text representation
1014 		 * of a row probably isn't the same size as its internal
1015 		 * representation.  Possibly we could do something better, but the
1016 		 * real answer to anyone who complains is "ANALYZE" ...
1017 		 */
1018 		int			tuple_width;
1019 
1020 		tuple_width = MAXALIGN(baserel->reltarget->width) +
1021 			MAXALIGN(SizeofHeapTupleHeader);
1022 		ntuples = clamp_row_est((double) stat_buf.st_size /
1023 								(double) tuple_width);
1024 	}
1025 	fdw_private->ntuples = ntuples;
1026 
1027 	/*
1028 	 * Now estimate the number of rows returned by the scan after applying the
1029 	 * baserestrictinfo quals.
1030 	 */
1031 	nrows = ntuples *
1032 		clauselist_selectivity(root,
1033 							   baserel->baserestrictinfo,
1034 							   0,
1035 							   JOIN_INNER,
1036 							   NULL);
1037 
1038 	nrows = clamp_row_est(nrows);
1039 
1040 	/* Save the output-rows estimate for the planner */
1041 	baserel->rows = nrows;
1042 }
1043 
1044 /*
1045  * Estimate costs of scanning a foreign table.
1046  *
1047  * Results are returned in *startup_cost and *total_cost.
1048  */
1049 static void
estimate_costs(PlannerInfo * root,RelOptInfo * baserel,FileFdwPlanState * fdw_private,Cost * startup_cost,Cost * total_cost)1050 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1051 			   FileFdwPlanState *fdw_private,
1052 			   Cost *startup_cost, Cost *total_cost)
1053 {
1054 	BlockNumber pages = fdw_private->pages;
1055 	double		ntuples = fdw_private->ntuples;
1056 	Cost		run_cost = 0;
1057 	Cost		cpu_per_tuple;
1058 
1059 	/*
1060 	 * We estimate costs almost the same way as cost_seqscan(), thus assuming
1061 	 * that I/O costs are equivalent to a regular table file of the same size.
1062 	 * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1063 	 * for the cost of parsing records.
1064 	 *
1065 	 * In the case of a program source, this calculation is even more divorced
1066 	 * from reality, but we have no good alternative; and it's not clear that
1067 	 * the numbers we produce here matter much anyway, since there's only one
1068 	 * access path for the rel.
1069 	 */
1070 	run_cost += seq_page_cost * pages;
1071 
1072 	*startup_cost = baserel->baserestrictcost.startup;
1073 	cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1074 	run_cost += cpu_per_tuple * ntuples;
1075 	*total_cost = *startup_cost + run_cost;
1076 }
1077 
1078 /*
1079  * file_acquire_sample_rows -- acquire a random sample of rows from the table
1080  *
1081  * Selected rows are returned in the caller-allocated array rows[],
1082  * which must have at least targrows entries.
1083  * The actual number of rows selected is returned as the function result.
1084  * We also count the total number of rows in the file and return it into
1085  * *totalrows.  Note that *totaldeadrows is always set to 0.
1086  *
1087  * Note that the returned list of rows is not always in order by physical
1088  * position in the file.  Therefore, correlation estimates derived later
1089  * may be meaningless, but it's OK because we don't use the estimates
1090  * currently (the planner only pays attention to correlation for indexscans).
1091  */
1092 static int
file_acquire_sample_rows(Relation onerel,int elevel,HeapTuple * rows,int targrows,double * totalrows,double * totaldeadrows)1093 file_acquire_sample_rows(Relation onerel, int elevel,
1094 						 HeapTuple *rows, int targrows,
1095 						 double *totalrows, double *totaldeadrows)
1096 {
1097 	int			numrows = 0;
1098 	double		rowstoskip = -1;	/* -1 means not set yet */
1099 	ReservoirStateData rstate;
1100 	TupleDesc	tupDesc;
1101 	Datum	   *values;
1102 	bool	   *nulls;
1103 	bool		found;
1104 	char	   *filename;
1105 	bool		is_program;
1106 	List	   *options;
1107 	CopyState	cstate;
1108 	ErrorContextCallback errcallback;
1109 	MemoryContext oldcontext = CurrentMemoryContext;
1110 	MemoryContext tupcontext;
1111 
1112 	Assert(onerel);
1113 	Assert(targrows > 0);
1114 
1115 	tupDesc = RelationGetDescr(onerel);
1116 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1117 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1118 
1119 	/* Fetch options of foreign table */
1120 	fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);
1121 
1122 	/*
1123 	 * Create CopyState from FDW options.
1124 	 */
1125 	cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
1126 						   options);
1127 
1128 	/*
1129 	 * Use per-tuple memory context to prevent leak of memory used to read
1130 	 * rows from the file with Copy routines.
1131 	 */
1132 	tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1133 									   "file_fdw temporary context",
1134 									   ALLOCSET_DEFAULT_SIZES);
1135 
1136 	/* Prepare for sampling rows */
1137 	reservoir_init_selection_state(&rstate, targrows);
1138 
1139 	/* Set up callback to identify error line number. */
1140 	errcallback.callback = CopyFromErrorCallback;
1141 	errcallback.arg = (void *) cstate;
1142 	errcallback.previous = error_context_stack;
1143 	error_context_stack = &errcallback;
1144 
1145 	*totalrows = 0;
1146 	*totaldeadrows = 0;
1147 	for (;;)
1148 	{
1149 		/* Check for user-requested abort or sleep */
1150 		vacuum_delay_point();
1151 
1152 		/* Fetch next row */
1153 		MemoryContextReset(tupcontext);
1154 		MemoryContextSwitchTo(tupcontext);
1155 
1156 		found = NextCopyFrom(cstate, NULL, values, nulls);
1157 
1158 		MemoryContextSwitchTo(oldcontext);
1159 
1160 		if (!found)
1161 			break;
1162 
1163 		/*
1164 		 * The first targrows sample rows are simply copied into the
1165 		 * reservoir.  Then we start replacing tuples in the sample until we
1166 		 * reach the end of the relation. This algorithm is from Jeff Vitter's
1167 		 * paper (see more info in commands/analyze.c).
1168 		 */
1169 		if (numrows < targrows)
1170 		{
1171 			rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1172 		}
1173 		else
1174 		{
1175 			/*
1176 			 * t in Vitter's paper is the number of records already processed.
1177 			 * If we need to compute a new S value, we must use the
1178 			 * not-yet-incremented value of totalrows as t.
1179 			 */
1180 			if (rowstoskip < 0)
1181 				rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1182 
1183 			if (rowstoskip <= 0)
1184 			{
1185 				/*
1186 				 * Found a suitable tuple, so save it, replacing one old tuple
1187 				 * at random
1188 				 */
1189 				int			k = (int) (targrows * sampler_random_fract(rstate.randstate));
1190 
1191 				Assert(k >= 0 && k < targrows);
1192 				heap_freetuple(rows[k]);
1193 				rows[k] = heap_form_tuple(tupDesc, values, nulls);
1194 			}
1195 
1196 			rowstoskip -= 1;
1197 		}
1198 
1199 		*totalrows += 1;
1200 	}
1201 
1202 	/* Remove error callback. */
1203 	error_context_stack = errcallback.previous;
1204 
1205 	/* Clean up. */
1206 	MemoryContextDelete(tupcontext);
1207 
1208 	EndCopyFrom(cstate);
1209 
1210 	pfree(values);
1211 	pfree(nulls);
1212 
1213 	/*
1214 	 * Emit some interesting relation info
1215 	 */
1216 	ereport(elevel,
1217 			(errmsg("\"%s\": file contains %.0f rows; "
1218 					"%d rows in sample",
1219 					RelationGetRelationName(onerel),
1220 					*totalrows, numrows)));
1221 
1222 	return numrows;
1223 }
1224