1 /*-------------------------------------------------------------------------
2  *
3  * nodeForeignscan.c
4  *	  Routines to support scans of foreign tables
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeForeignscan.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *
18  *		ExecForeignScan			scans a foreign table.
19  *		ExecInitForeignScan		creates and initializes state info.
20  *		ExecReScanForeignScan	rescans the foreign relation.
21  *		ExecEndForeignScan		releases any resources allocated.
22  */
23 #include "postgres.h"
24 
25 #include "executor/executor.h"
26 #include "executor/nodeForeignscan.h"
27 #include "foreign/fdwapi.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 
31 static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33 
34 
35 /* ----------------------------------------------------------------
36  *		ForeignNext
37  *
38  *		This is a workhorse for ExecForeignScan
39  * ----------------------------------------------------------------
40  */
41 static TupleTableSlot *
ForeignNext(ForeignScanState * node)42 ForeignNext(ForeignScanState *node)
43 {
44 	TupleTableSlot *slot;
45 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 	MemoryContext oldcontext;
48 
49 	/* Call the Iterate function in short-lived context */
50 	oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
51 	if (plan->operation != CMD_SELECT)
52 		slot = node->fdwroutine->IterateDirectModify(node);
53 	else
54 		slot = node->fdwroutine->IterateForeignScan(node);
55 	MemoryContextSwitchTo(oldcontext);
56 
57 	/*
58 	 * If any system columns are requested, we have to force the tuple into
59 	 * physical-tuple form to avoid "cannot extract system attribute from
60 	 * virtual tuple" errors later.  We also insert a valid value for
61 	 * tableoid, which is the only actually-useful system column.
62 	 */
63 	if (plan->fsSystemCol && !TupIsNull(slot))
64 	{
65 		HeapTuple	tup = ExecMaterializeSlot(slot);
66 
67 		tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
68 	}
69 
70 	return slot;
71 }
72 
73 /*
74  * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
75  */
76 static bool
ForeignRecheck(ForeignScanState * node,TupleTableSlot * slot)77 ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
78 {
79 	FdwRoutine *fdwroutine = node->fdwroutine;
80 	ExprContext *econtext;
81 
82 	/*
83 	 * extract necessary information from foreign scan node
84 	 */
85 	econtext = node->ss.ps.ps_ExprContext;
86 
87 	/* Does the tuple meet the remote qual condition? */
88 	econtext->ecxt_scantuple = slot;
89 
90 	ResetExprContext(econtext);
91 
92 	/*
93 	 * If an outer join is pushed down, RecheckForeignScan may need to store a
94 	 * different tuple in the slot, because a different set of columns may go
95 	 * to NULL upon recheck.  Otherwise, it shouldn't need to change the slot
96 	 * contents, just return true or false to indicate whether the quals still
97 	 * pass.  For simple cases, setting fdw_recheck_quals may be easier than
98 	 * providing this callback.
99 	 */
100 	if (fdwroutine->RecheckForeignScan &&
101 		!fdwroutine->RecheckForeignScan(node, slot))
102 		return false;
103 
104 	return ExecQual(node->fdw_recheck_quals, econtext);
105 }
106 
107 /* ----------------------------------------------------------------
108  *		ExecForeignScan(node)
109  *
110  *		Fetches the next tuple from the FDW, checks local quals, and
111  *		returns it.
112  *		We call the ExecScan() routine and pass it the appropriate
113  *		access method functions.
114  * ----------------------------------------------------------------
115  */
116 static TupleTableSlot *
ExecForeignScan(PlanState * pstate)117 ExecForeignScan(PlanState *pstate)
118 {
119 	ForeignScanState *node = castNode(ForeignScanState, pstate);
120 
121 	return ExecScan(&node->ss,
122 					(ExecScanAccessMtd) ForeignNext,
123 					(ExecScanRecheckMtd) ForeignRecheck);
124 }
125 
126 
127 /* ----------------------------------------------------------------
128  *		ExecInitForeignScan
129  * ----------------------------------------------------------------
130  */
131 ForeignScanState *
ExecInitForeignScan(ForeignScan * node,EState * estate,int eflags)132 ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
133 {
134 	ForeignScanState *scanstate;
135 	Relation	currentRelation = NULL;
136 	Index		scanrelid = node->scan.scanrelid;
137 	Index		tlistvarno;
138 	FdwRoutine *fdwroutine;
139 
140 	/* check for unsupported flags */
141 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
142 
143 	/*
144 	 * create state structure
145 	 */
146 	scanstate = makeNode(ForeignScanState);
147 	scanstate->ss.ps.plan = (Plan *) node;
148 	scanstate->ss.ps.state = estate;
149 	scanstate->ss.ps.ExecProcNode = ExecForeignScan;
150 
151 	/*
152 	 * Miscellaneous initialization
153 	 *
154 	 * create expression context for node
155 	 */
156 	ExecAssignExprContext(estate, &scanstate->ss.ps);
157 
158 	/*
159 	 * open the base relation, if any, and acquire an appropriate lock on it;
160 	 * also acquire function pointers from the FDW's handler
161 	 */
162 	if (scanrelid > 0)
163 	{
164 		currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
165 		scanstate->ss.ss_currentRelation = currentRelation;
166 		fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
167 	}
168 	else
169 	{
170 		/* We can't use the relcache, so get fdwroutine the hard way */
171 		fdwroutine = GetFdwRoutineByServerId(node->fs_server);
172 	}
173 
174 	/*
175 	 * Determine the scan tuple type.  If the FDW provided a targetlist
176 	 * describing the scan tuples, use that; else use base relation's rowtype.
177 	 */
178 	if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
179 	{
180 		TupleDesc	scan_tupdesc;
181 
182 		scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist, false);
183 		ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc);
184 		/* Node's targetlist will contain Vars with varno = INDEX_VAR */
185 		tlistvarno = INDEX_VAR;
186 	}
187 	else
188 	{
189 		TupleDesc	scan_tupdesc;
190 
191 		/* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
192 		scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
193 		ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc);
194 		/* Node's targetlist will contain Vars with varno = scanrelid */
195 		tlistvarno = scanrelid;
196 	}
197 
198 	/*
199 	 * Initialize result slot, type and projection.
200 	 */
201 	ExecInitResultTupleSlotTL(estate, &scanstate->ss.ps);
202 	ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
203 
204 	/*
205 	 * initialize child expressions
206 	 */
207 	scanstate->ss.ps.qual =
208 		ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
209 	scanstate->fdw_recheck_quals =
210 		ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
211 
212 	/*
213 	 * Initialize FDW-related state.
214 	 */
215 	scanstate->fdwroutine = fdwroutine;
216 	scanstate->fdw_state = NULL;
217 
218 	/* Initialize any outer plan. */
219 	if (outerPlan(node))
220 		outerPlanState(scanstate) =
221 			ExecInitNode(outerPlan(node), estate, eflags);
222 
223 	/*
224 	 * Tell the FDW to initialize the scan.
225 	 */
226 	if (node->operation != CMD_SELECT)
227 		fdwroutine->BeginDirectModify(scanstate, eflags);
228 	else
229 		fdwroutine->BeginForeignScan(scanstate, eflags);
230 
231 	return scanstate;
232 }
233 
234 /* ----------------------------------------------------------------
235  *		ExecEndForeignScan
236  *
237  *		frees any storage allocated through C routines.
238  * ----------------------------------------------------------------
239  */
240 void
ExecEndForeignScan(ForeignScanState * node)241 ExecEndForeignScan(ForeignScanState *node)
242 {
243 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
244 
245 	/* Let the FDW shut down */
246 	if (plan->operation != CMD_SELECT)
247 		node->fdwroutine->EndDirectModify(node);
248 	else
249 		node->fdwroutine->EndForeignScan(node);
250 
251 	/* Shut down any outer plan. */
252 	if (outerPlanState(node))
253 		ExecEndNode(outerPlanState(node));
254 
255 	/* Free the exprcontext */
256 	ExecFreeExprContext(&node->ss.ps);
257 
258 	/* clean out the tuple table */
259 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
260 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
261 
262 	/* close the relation. */
263 	if (node->ss.ss_currentRelation)
264 		ExecCloseScanRelation(node->ss.ss_currentRelation);
265 }
266 
267 /* ----------------------------------------------------------------
268  *		ExecReScanForeignScan
269  *
270  *		Rescans the relation.
271  * ----------------------------------------------------------------
272  */
273 void
ExecReScanForeignScan(ForeignScanState * node)274 ExecReScanForeignScan(ForeignScanState *node)
275 {
276 	PlanState  *outerPlan = outerPlanState(node);
277 
278 	node->fdwroutine->ReScanForeignScan(node);
279 
280 	/*
281 	 * If chgParam of subnode is not null then plan will be re-scanned by
282 	 * first ExecProcNode.  outerPlan may also be NULL, in which case there is
283 	 * nothing to rescan at all.
284 	 */
285 	if (outerPlan != NULL && outerPlan->chgParam == NULL)
286 		ExecReScan(outerPlan);
287 
288 	ExecScanReScan(&node->ss);
289 }
290 
291 /* ----------------------------------------------------------------
292  *		ExecForeignScanEstimate
293  *
294  *		Informs size of the parallel coordination information, if any
295  * ----------------------------------------------------------------
296  */
297 void
ExecForeignScanEstimate(ForeignScanState * node,ParallelContext * pcxt)298 ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
299 {
300 	FdwRoutine *fdwroutine = node->fdwroutine;
301 
302 	if (fdwroutine->EstimateDSMForeignScan)
303 	{
304 		node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
305 		shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
306 		shm_toc_estimate_keys(&pcxt->estimator, 1);
307 	}
308 }
309 
310 /* ----------------------------------------------------------------
311  *		ExecForeignScanInitializeDSM
312  *
313  *		Initialize the parallel coordination information
314  * ----------------------------------------------------------------
315  */
316 void
ExecForeignScanInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)317 ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
318 {
319 	FdwRoutine *fdwroutine = node->fdwroutine;
320 
321 	if (fdwroutine->InitializeDSMForeignScan)
322 	{
323 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
324 		void	   *coordinate;
325 
326 		coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
327 		fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
328 		shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
329 	}
330 }
331 
332 /* ----------------------------------------------------------------
333  *		ExecForeignScanReInitializeDSM
334  *
335  *		Reset shared state before beginning a fresh scan.
336  * ----------------------------------------------------------------
337  */
338 void
ExecForeignScanReInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)339 ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
340 {
341 	FdwRoutine *fdwroutine = node->fdwroutine;
342 
343 	if (fdwroutine->ReInitializeDSMForeignScan)
344 	{
345 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
346 		void	   *coordinate;
347 
348 		coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
349 		fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
350 	}
351 }
352 
353 /* ----------------------------------------------------------------
354  *		ExecForeignScanInitializeWorker
355  *
356  *		Initialization according to the parallel coordination information
357  * ----------------------------------------------------------------
358  */
359 void
ExecForeignScanInitializeWorker(ForeignScanState * node,ParallelWorkerContext * pwcxt)360 ExecForeignScanInitializeWorker(ForeignScanState *node,
361 								ParallelWorkerContext *pwcxt)
362 {
363 	FdwRoutine *fdwroutine = node->fdwroutine;
364 
365 	if (fdwroutine->InitializeWorkerForeignScan)
366 	{
367 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
368 		void	   *coordinate;
369 
370 		coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
371 		fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
372 	}
373 }
374 
375 /* ----------------------------------------------------------------
376  *		ExecShutdownForeignScan
377  *
378  *		Gives FDW chance to stop asynchronous resource consumption
379  *		and release any resources still held.
380  * ----------------------------------------------------------------
381  */
382 void
ExecShutdownForeignScan(ForeignScanState * node)383 ExecShutdownForeignScan(ForeignScanState *node)
384 {
385 	FdwRoutine *fdwroutine = node->fdwroutine;
386 
387 	if (fdwroutine->ShutdownForeignScan)
388 		fdwroutine->ShutdownForeignScan(node);
389 }
390