1 /*-------------------------------------------------------------------------
2  *
3  * nodeForeignscan.c
4  *	  Routines to support scans of foreign tables
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeForeignscan.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *
18  *		ExecForeignScan			scans a foreign table.
19  *		ExecInitForeignScan		creates and initializes state info.
20  *		ExecReScanForeignScan	rescans the foreign relation.
21  *		ExecEndForeignScan		releases any resources allocated.
22  */
23 #include "postgres.h"
24 
25 #include "executor/executor.h"
26 #include "executor/nodeForeignscan.h"
27 #include "foreign/fdwapi.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 
/*
 * File-local helpers.  Both are handed to ExecScan() by ExecForeignScan():
 * ForeignNext as the access method and ForeignRecheck as the recheck method.
 */
static TupleTableSlot *ForeignNext(ForeignScanState *node);
static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33 
34 
35 /* ----------------------------------------------------------------
36  *		ForeignNext
37  *
38  *		This is a workhorse for ExecForeignScan
39  * ----------------------------------------------------------------
40  */
41 static TupleTableSlot *
42 ForeignNext(ForeignScanState *node)
43 {
44 	TupleTableSlot *slot;
45 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 	MemoryContext oldcontext;
48 
49 	/* Call the Iterate function in short-lived context */
50 	oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
51 	if (plan->operation != CMD_SELECT)
52 		slot = node->fdwroutine->IterateDirectModify(node);
53 	else
54 		slot = node->fdwroutine->IterateForeignScan(node);
55 	MemoryContextSwitchTo(oldcontext);
56 
57 	/*
58 	 * Insert valid value into tableoid, the only actually-useful system
59 	 * column.
60 	 */
61 	if (plan->fsSystemCol && !TupIsNull(slot))
62 		slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
63 
64 	return slot;
65 }
66 
67 /*
68  * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
69  */
70 static bool
71 ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
72 {
73 	FdwRoutine *fdwroutine = node->fdwroutine;
74 	ExprContext *econtext;
75 
76 	/*
77 	 * extract necessary information from foreign scan node
78 	 */
79 	econtext = node->ss.ps.ps_ExprContext;
80 
81 	/* Does the tuple meet the remote qual condition? */
82 	econtext->ecxt_scantuple = slot;
83 
84 	ResetExprContext(econtext);
85 
86 	/*
87 	 * If an outer join is pushed down, RecheckForeignScan may need to store a
88 	 * different tuple in the slot, because a different set of columns may go
89 	 * to NULL upon recheck.  Otherwise, it shouldn't need to change the slot
90 	 * contents, just return true or false to indicate whether the quals still
91 	 * pass.  For simple cases, setting fdw_recheck_quals may be easier than
92 	 * providing this callback.
93 	 */
94 	if (fdwroutine->RecheckForeignScan &&
95 		!fdwroutine->RecheckForeignScan(node, slot))
96 		return false;
97 
98 	return ExecQual(node->fdw_recheck_quals, econtext);
99 }
100 
101 /* ----------------------------------------------------------------
102  *		ExecForeignScan(node)
103  *
104  *		Fetches the next tuple from the FDW, checks local quals, and
105  *		returns it.
106  *		We call the ExecScan() routine and pass it the appropriate
107  *		access method functions.
108  * ----------------------------------------------------------------
109  */
110 static TupleTableSlot *
111 ExecForeignScan(PlanState *pstate)
112 {
113 	ForeignScanState *node = castNode(ForeignScanState, pstate);
114 
115 	return ExecScan(&node->ss,
116 					(ExecScanAccessMtd) ForeignNext,
117 					(ExecScanRecheckMtd) ForeignRecheck);
118 }
119 
120 
121 /* ----------------------------------------------------------------
122  *		ExecInitForeignScan
123  * ----------------------------------------------------------------
124  */
125 ForeignScanState *
126 ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
127 {
128 	ForeignScanState *scanstate;
129 	Relation	currentRelation = NULL;
130 	Index		scanrelid = node->scan.scanrelid;
131 	Index		tlistvarno;
132 	FdwRoutine *fdwroutine;
133 
134 	/* check for unsupported flags */
135 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
136 
137 	/*
138 	 * create state structure
139 	 */
140 	scanstate = makeNode(ForeignScanState);
141 	scanstate->ss.ps.plan = (Plan *) node;
142 	scanstate->ss.ps.state = estate;
143 	scanstate->ss.ps.ExecProcNode = ExecForeignScan;
144 
145 	/*
146 	 * Miscellaneous initialization
147 	 *
148 	 * create expression context for node
149 	 */
150 	ExecAssignExprContext(estate, &scanstate->ss.ps);
151 
152 	/*
153 	 * open the scan relation, if any; also acquire function pointers from the
154 	 * FDW's handler
155 	 */
156 	if (scanrelid > 0)
157 	{
158 		currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
159 		scanstate->ss.ss_currentRelation = currentRelation;
160 		fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
161 	}
162 	else
163 	{
164 		/* We can't use the relcache, so get fdwroutine the hard way */
165 		fdwroutine = GetFdwRoutineByServerId(node->fs_server);
166 	}
167 
168 	/*
169 	 * Determine the scan tuple type.  If the FDW provided a targetlist
170 	 * describing the scan tuples, use that; else use base relation's rowtype.
171 	 */
172 	if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
173 	{
174 		TupleDesc	scan_tupdesc;
175 
176 		scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
177 		ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
178 							  &TTSOpsHeapTuple);
179 		/* Node's targetlist will contain Vars with varno = INDEX_VAR */
180 		tlistvarno = INDEX_VAR;
181 	}
182 	else
183 	{
184 		TupleDesc	scan_tupdesc;
185 
186 		/* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
187 		scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
188 		ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
189 							  &TTSOpsHeapTuple);
190 		/* Node's targetlist will contain Vars with varno = scanrelid */
191 		tlistvarno = scanrelid;
192 	}
193 
194 	/* Don't know what an FDW might return */
195 	scanstate->ss.ps.scanopsfixed = false;
196 	scanstate->ss.ps.scanopsset = true;
197 
198 	/*
199 	 * Initialize result slot, type and projection.
200 	 */
201 	ExecInitResultTypeTL(&scanstate->ss.ps);
202 	ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
203 
204 	/*
205 	 * initialize child expressions
206 	 */
207 	scanstate->ss.ps.qual =
208 		ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
209 	scanstate->fdw_recheck_quals =
210 		ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
211 
212 	/*
213 	 * Initialize FDW-related state.
214 	 */
215 	scanstate->fdwroutine = fdwroutine;
216 	scanstate->fdw_state = NULL;
217 
218 	/* Initialize any outer plan. */
219 	if (outerPlan(node))
220 		outerPlanState(scanstate) =
221 			ExecInitNode(outerPlan(node), estate, eflags);
222 
223 	/*
224 	 * Tell the FDW to initialize the scan.
225 	 */
226 	if (node->operation != CMD_SELECT)
227 		fdwroutine->BeginDirectModify(scanstate, eflags);
228 	else
229 		fdwroutine->BeginForeignScan(scanstate, eflags);
230 
231 	return scanstate;
232 }
233 
234 /* ----------------------------------------------------------------
235  *		ExecEndForeignScan
236  *
237  *		frees any storage allocated through C routines.
238  * ----------------------------------------------------------------
239  */
240 void
241 ExecEndForeignScan(ForeignScanState *node)
242 {
243 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
244 
245 	/* Let the FDW shut down */
246 	if (plan->operation != CMD_SELECT)
247 		node->fdwroutine->EndDirectModify(node);
248 	else
249 		node->fdwroutine->EndForeignScan(node);
250 
251 	/* Shut down any outer plan. */
252 	if (outerPlanState(node))
253 		ExecEndNode(outerPlanState(node));
254 
255 	/* Free the exprcontext */
256 	ExecFreeExprContext(&node->ss.ps);
257 
258 	/* clean out the tuple table */
259 	if (node->ss.ps.ps_ResultTupleSlot)
260 		ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
261 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
262 }
263 
264 /* ----------------------------------------------------------------
265  *		ExecReScanForeignScan
266  *
267  *		Rescans the relation.
268  * ----------------------------------------------------------------
269  */
270 void
271 ExecReScanForeignScan(ForeignScanState *node)
272 {
273 	PlanState  *outerPlan = outerPlanState(node);
274 
275 	node->fdwroutine->ReScanForeignScan(node);
276 
277 	/*
278 	 * If chgParam of subnode is not null then plan will be re-scanned by
279 	 * first ExecProcNode.  outerPlan may also be NULL, in which case there is
280 	 * nothing to rescan at all.
281 	 */
282 	if (outerPlan != NULL && outerPlan->chgParam == NULL)
283 		ExecReScan(outerPlan);
284 
285 	ExecScanReScan(&node->ss);
286 }
287 
288 /* ----------------------------------------------------------------
289  *		ExecForeignScanEstimate
290  *
291  *		Informs size of the parallel coordination information, if any
292  * ----------------------------------------------------------------
293  */
294 void
295 ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
296 {
297 	FdwRoutine *fdwroutine = node->fdwroutine;
298 
299 	if (fdwroutine->EstimateDSMForeignScan)
300 	{
301 		node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
302 		shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
303 		shm_toc_estimate_keys(&pcxt->estimator, 1);
304 	}
305 }
306 
307 /* ----------------------------------------------------------------
308  *		ExecForeignScanInitializeDSM
309  *
310  *		Initialize the parallel coordination information
311  * ----------------------------------------------------------------
312  */
313 void
314 ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
315 {
316 	FdwRoutine *fdwroutine = node->fdwroutine;
317 
318 	if (fdwroutine->InitializeDSMForeignScan)
319 	{
320 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
321 		void	   *coordinate;
322 
323 		coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
324 		fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
325 		shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
326 	}
327 }
328 
329 /* ----------------------------------------------------------------
330  *		ExecForeignScanReInitializeDSM
331  *
332  *		Reset shared state before beginning a fresh scan.
333  * ----------------------------------------------------------------
334  */
335 void
336 ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
337 {
338 	FdwRoutine *fdwroutine = node->fdwroutine;
339 
340 	if (fdwroutine->ReInitializeDSMForeignScan)
341 	{
342 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
343 		void	   *coordinate;
344 
345 		coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
346 		fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
347 	}
348 }
349 
350 /* ----------------------------------------------------------------
351  *		ExecForeignScanInitializeWorker
352  *
353  *		Initialization according to the parallel coordination information
354  * ----------------------------------------------------------------
355  */
356 void
357 ExecForeignScanInitializeWorker(ForeignScanState *node,
358 								ParallelWorkerContext *pwcxt)
359 {
360 	FdwRoutine *fdwroutine = node->fdwroutine;
361 
362 	if (fdwroutine->InitializeWorkerForeignScan)
363 	{
364 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
365 		void	   *coordinate;
366 
367 		coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
368 		fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
369 	}
370 }
371 
372 /* ----------------------------------------------------------------
373  *		ExecShutdownForeignScan
374  *
375  *		Gives FDW chance to stop asynchronous resource consumption
376  *		and release any resources still held.
377  * ----------------------------------------------------------------
378  */
379 void
380 ExecShutdownForeignScan(ForeignScanState *node)
381 {
382 	FdwRoutine *fdwroutine = node->fdwroutine;
383 
384 	if (fdwroutine->ShutdownForeignScan)
385 		fdwroutine->ShutdownForeignScan(node);
386 }
387