1 /*-------------------------------------------------------------------------
2  *
3  * nodeForeignscan.c
4  *	  Routines to support scans of foreign tables
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeForeignscan.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *
18  *		ExecForeignScan			scans a foreign table.
19  *		ExecInitForeignScan		creates and initializes state info.
20  *		ExecReScanForeignScan	rescans the foreign relation.
21  *		ExecEndForeignScan		releases any resources allocated.
22  */
23 #include "postgres.h"
24 
25 #include "executor/executor.h"
26 #include "executor/nodeForeignscan.h"
27 #include "foreign/fdwapi.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 
31 static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33 
34 
35 /* ----------------------------------------------------------------
36  *		ForeignNext
37  *
38  *		This is a workhorse for ExecForeignScan
39  * ----------------------------------------------------------------
40  */
41 static TupleTableSlot *
ForeignNext(ForeignScanState * node)42 ForeignNext(ForeignScanState *node)
43 {
44 	TupleTableSlot *slot;
45 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 	EState	   *estate = node->ss.ps.state;
48 	MemoryContext oldcontext;
49 
50 	/* Call the Iterate function in short-lived context */
51 	oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
52 	if (plan->operation != CMD_SELECT)
53 	{
54 		/*
55 		 * direct modifications cannot be re-evaluated, so shouldn't get here
56 		 * during EvalPlanQual processing
57 		 */
58 		if (estate->es_epq_active != NULL)
59 			elog(ERROR, "cannot re-evaluate a Foreign Update or Delete during EvalPlanQual");
60 
61 		slot = node->fdwroutine->IterateDirectModify(node);
62 	}
63 	else
64 		slot = node->fdwroutine->IterateForeignScan(node);
65 	MemoryContextSwitchTo(oldcontext);
66 
67 	/*
68 	 * Insert valid value into tableoid, the only actually-useful system
69 	 * column.
70 	 */
71 	if (plan->fsSystemCol && !TupIsNull(slot))
72 		slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
73 
74 	return slot;
75 }
76 
77 /*
78  * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
79  */
80 static bool
ForeignRecheck(ForeignScanState * node,TupleTableSlot * slot)81 ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
82 {
83 	FdwRoutine *fdwroutine = node->fdwroutine;
84 	ExprContext *econtext;
85 
86 	/*
87 	 * extract necessary information from foreign scan node
88 	 */
89 	econtext = node->ss.ps.ps_ExprContext;
90 
91 	/* Does the tuple meet the remote qual condition? */
92 	econtext->ecxt_scantuple = slot;
93 
94 	ResetExprContext(econtext);
95 
96 	/*
97 	 * If an outer join is pushed down, RecheckForeignScan may need to store a
98 	 * different tuple in the slot, because a different set of columns may go
99 	 * to NULL upon recheck.  Otherwise, it shouldn't need to change the slot
100 	 * contents, just return true or false to indicate whether the quals still
101 	 * pass.  For simple cases, setting fdw_recheck_quals may be easier than
102 	 * providing this callback.
103 	 */
104 	if (fdwroutine->RecheckForeignScan &&
105 		!fdwroutine->RecheckForeignScan(node, slot))
106 		return false;
107 
108 	return ExecQual(node->fdw_recheck_quals, econtext);
109 }
110 
111 /* ----------------------------------------------------------------
112  *		ExecForeignScan(node)
113  *
114  *		Fetches the next tuple from the FDW, checks local quals, and
115  *		returns it.
116  *		We call the ExecScan() routine and pass it the appropriate
117  *		access method functions.
118  * ----------------------------------------------------------------
119  */
120 static TupleTableSlot *
ExecForeignScan(PlanState * pstate)121 ExecForeignScan(PlanState *pstate)
122 {
123 	ForeignScanState *node = castNode(ForeignScanState, pstate);
124 
125 	return ExecScan(&node->ss,
126 					(ExecScanAccessMtd) ForeignNext,
127 					(ExecScanRecheckMtd) ForeignRecheck);
128 }
129 
130 
131 /* ----------------------------------------------------------------
132  *		ExecInitForeignScan
133  * ----------------------------------------------------------------
134  */
135 ForeignScanState *
ExecInitForeignScan(ForeignScan * node,EState * estate,int eflags)136 ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
137 {
138 	ForeignScanState *scanstate;
139 	Relation	currentRelation = NULL;
140 	Index		scanrelid = node->scan.scanrelid;
141 	Index		tlistvarno;
142 	FdwRoutine *fdwroutine;
143 
144 	/* check for unsupported flags */
145 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
146 
147 	/*
148 	 * create state structure
149 	 */
150 	scanstate = makeNode(ForeignScanState);
151 	scanstate->ss.ps.plan = (Plan *) node;
152 	scanstate->ss.ps.state = estate;
153 	scanstate->ss.ps.ExecProcNode = ExecForeignScan;
154 
155 	/*
156 	 * Miscellaneous initialization
157 	 *
158 	 * create expression context for node
159 	 */
160 	ExecAssignExprContext(estate, &scanstate->ss.ps);
161 
162 	/*
163 	 * open the scan relation, if any; also acquire function pointers from the
164 	 * FDW's handler
165 	 */
166 	if (scanrelid > 0)
167 	{
168 		currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
169 		scanstate->ss.ss_currentRelation = currentRelation;
170 		fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
171 	}
172 	else
173 	{
174 		/* We can't use the relcache, so get fdwroutine the hard way */
175 		fdwroutine = GetFdwRoutineByServerId(node->fs_server);
176 	}
177 
178 	/*
179 	 * Determine the scan tuple type.  If the FDW provided a targetlist
180 	 * describing the scan tuples, use that; else use base relation's rowtype.
181 	 */
182 	if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
183 	{
184 		TupleDesc	scan_tupdesc;
185 
186 		scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
187 		ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
188 							  &TTSOpsHeapTuple);
189 		/* Node's targetlist will contain Vars with varno = INDEX_VAR */
190 		tlistvarno = INDEX_VAR;
191 	}
192 	else
193 	{
194 		TupleDesc	scan_tupdesc;
195 
196 		/* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
197 		scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
198 		ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
199 							  &TTSOpsHeapTuple);
200 		/* Node's targetlist will contain Vars with varno = scanrelid */
201 		tlistvarno = scanrelid;
202 	}
203 
204 	/* Don't know what an FDW might return */
205 	scanstate->ss.ps.scanopsfixed = false;
206 	scanstate->ss.ps.scanopsset = true;
207 
208 	/*
209 	 * Initialize result slot, type and projection.
210 	 */
211 	ExecInitResultTypeTL(&scanstate->ss.ps);
212 	ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
213 
214 	/*
215 	 * initialize child expressions
216 	 */
217 	scanstate->ss.ps.qual =
218 		ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
219 	scanstate->fdw_recheck_quals =
220 		ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
221 
222 	/*
223 	 * Determine whether to scan the foreign relation asynchronously or not;
224 	 * this has to be kept in sync with the code in ExecInitAppend().
225 	 */
226 	scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
227 									  estate->es_epq_active == NULL);
228 
229 	/*
230 	 * Initialize FDW-related state.
231 	 */
232 	scanstate->fdwroutine = fdwroutine;
233 	scanstate->fdw_state = NULL;
234 
235 	/*
236 	 * For the FDW's convenience, look up the modification target relation's
237 	 * ResultRelInfo.  The ModifyTable node should have initialized it for us,
238 	 * see ExecInitModifyTable.
239 	 *
240 	 * Don't try to look up the ResultRelInfo when EvalPlanQual is active,
241 	 * though.  Direct modififications cannot be re-evaluated as part of
242 	 * EvalPlanQual.  The lookup wouldn't work anyway because during
243 	 * EvalPlanQual processing, EvalPlanQual only initializes the subtree
244 	 * under the ModifyTable, and doesn't run ExecInitModifyTable.
245 	 */
246 	if (node->resultRelation > 0 && estate->es_epq_active == NULL)
247 	{
248 		if (estate->es_result_relations == NULL ||
249 			estate->es_result_relations[node->resultRelation - 1] == NULL)
250 		{
251 			elog(ERROR, "result relation not initialized");
252 		}
253 		scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
254 	}
255 
256 	/* Initialize any outer plan. */
257 	if (outerPlan(node))
258 		outerPlanState(scanstate) =
259 			ExecInitNode(outerPlan(node), estate, eflags);
260 
261 	/*
262 	 * Tell the FDW to initialize the scan.
263 	 */
264 	if (node->operation != CMD_SELECT)
265 	{
266 		/*
267 		 * Direct modifications cannot be re-evaluated by EvalPlanQual, so
268 		 * don't bother preparing the FDW.  There can be ForeignScan nodes in
269 		 * the EvalPlanQual subtree, but ExecForeignScan should never be
270 		 * called on them when EvalPlanQual is active.
271 		 */
272 		if (estate->es_epq_active == NULL)
273 			fdwroutine->BeginDirectModify(scanstate, eflags);
274 	}
275 	else
276 		fdwroutine->BeginForeignScan(scanstate, eflags);
277 
278 	return scanstate;
279 }
280 
281 /* ----------------------------------------------------------------
282  *		ExecEndForeignScan
283  *
284  *		frees any storage allocated through C routines.
285  * ----------------------------------------------------------------
286  */
287 void
ExecEndForeignScan(ForeignScanState * node)288 ExecEndForeignScan(ForeignScanState *node)
289 {
290 	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
291 	EState	   *estate = node->ss.ps.state;
292 
293 	/* Let the FDW shut down */
294 	if (plan->operation != CMD_SELECT)
295 	{
296 		if (estate->es_epq_active == NULL)
297 			node->fdwroutine->EndDirectModify(node);
298 	}
299 	else
300 		node->fdwroutine->EndForeignScan(node);
301 
302 	/* Shut down any outer plan. */
303 	if (outerPlanState(node))
304 		ExecEndNode(outerPlanState(node));
305 
306 	/* Free the exprcontext */
307 	ExecFreeExprContext(&node->ss.ps);
308 
309 	/* clean out the tuple table */
310 	if (node->ss.ps.ps_ResultTupleSlot)
311 		ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
312 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
313 }
314 
315 /* ----------------------------------------------------------------
316  *		ExecReScanForeignScan
317  *
318  *		Rescans the relation.
319  * ----------------------------------------------------------------
320  */
321 void
ExecReScanForeignScan(ForeignScanState * node)322 ExecReScanForeignScan(ForeignScanState *node)
323 {
324 	PlanState  *outerPlan = outerPlanState(node);
325 
326 	node->fdwroutine->ReScanForeignScan(node);
327 
328 	/*
329 	 * If chgParam of subnode is not null then plan will be re-scanned by
330 	 * first ExecProcNode.  outerPlan may also be NULL, in which case there is
331 	 * nothing to rescan at all.
332 	 */
333 	if (outerPlan != NULL && outerPlan->chgParam == NULL)
334 		ExecReScan(outerPlan);
335 
336 	ExecScanReScan(&node->ss);
337 }
338 
339 /* ----------------------------------------------------------------
340  *		ExecForeignScanEstimate
341  *
342  *		Informs size of the parallel coordination information, if any
343  * ----------------------------------------------------------------
344  */
345 void
ExecForeignScanEstimate(ForeignScanState * node,ParallelContext * pcxt)346 ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
347 {
348 	FdwRoutine *fdwroutine = node->fdwroutine;
349 
350 	if (fdwroutine->EstimateDSMForeignScan)
351 	{
352 		node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
353 		shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
354 		shm_toc_estimate_keys(&pcxt->estimator, 1);
355 	}
356 }
357 
358 /* ----------------------------------------------------------------
359  *		ExecForeignScanInitializeDSM
360  *
361  *		Initialize the parallel coordination information
362  * ----------------------------------------------------------------
363  */
364 void
ExecForeignScanInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)365 ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
366 {
367 	FdwRoutine *fdwroutine = node->fdwroutine;
368 
369 	if (fdwroutine->InitializeDSMForeignScan)
370 	{
371 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
372 		void	   *coordinate;
373 
374 		coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
375 		fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
376 		shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
377 	}
378 }
379 
380 /* ----------------------------------------------------------------
381  *		ExecForeignScanReInitializeDSM
382  *
383  *		Reset shared state before beginning a fresh scan.
384  * ----------------------------------------------------------------
385  */
386 void
ExecForeignScanReInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)387 ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
388 {
389 	FdwRoutine *fdwroutine = node->fdwroutine;
390 
391 	if (fdwroutine->ReInitializeDSMForeignScan)
392 	{
393 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
394 		void	   *coordinate;
395 
396 		coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
397 		fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
398 	}
399 }
400 
401 /* ----------------------------------------------------------------
402  *		ExecForeignScanInitializeWorker
403  *
404  *		Initialization according to the parallel coordination information
405  * ----------------------------------------------------------------
406  */
407 void
ExecForeignScanInitializeWorker(ForeignScanState * node,ParallelWorkerContext * pwcxt)408 ExecForeignScanInitializeWorker(ForeignScanState *node,
409 								ParallelWorkerContext *pwcxt)
410 {
411 	FdwRoutine *fdwroutine = node->fdwroutine;
412 
413 	if (fdwroutine->InitializeWorkerForeignScan)
414 	{
415 		int			plan_node_id = node->ss.ps.plan->plan_node_id;
416 		void	   *coordinate;
417 
418 		coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
419 		fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
420 	}
421 }
422 
423 /* ----------------------------------------------------------------
424  *		ExecShutdownForeignScan
425  *
426  *		Gives FDW chance to stop asynchronous resource consumption
427  *		and release any resources still held.
428  * ----------------------------------------------------------------
429  */
430 void
ExecShutdownForeignScan(ForeignScanState * node)431 ExecShutdownForeignScan(ForeignScanState *node)
432 {
433 	FdwRoutine *fdwroutine = node->fdwroutine;
434 
435 	if (fdwroutine->ShutdownForeignScan)
436 		fdwroutine->ShutdownForeignScan(node);
437 }
438 
439 /* ----------------------------------------------------------------
440  *		ExecAsyncForeignScanRequest
441  *
442  *		Asynchronously request a tuple from a designed async-capable node
443  * ----------------------------------------------------------------
444  */
445 void
ExecAsyncForeignScanRequest(AsyncRequest * areq)446 ExecAsyncForeignScanRequest(AsyncRequest *areq)
447 {
448 	ForeignScanState *node = (ForeignScanState *) areq->requestee;
449 	FdwRoutine *fdwroutine = node->fdwroutine;
450 
451 	Assert(fdwroutine->ForeignAsyncRequest != NULL);
452 	fdwroutine->ForeignAsyncRequest(areq);
453 }
454 
455 /* ----------------------------------------------------------------
456  *		ExecAsyncForeignScanConfigureWait
457  *
458  *		In async mode, configure for a wait
459  * ----------------------------------------------------------------
460  */
461 void
ExecAsyncForeignScanConfigureWait(AsyncRequest * areq)462 ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
463 {
464 	ForeignScanState *node = (ForeignScanState *) areq->requestee;
465 	FdwRoutine *fdwroutine = node->fdwroutine;
466 
467 	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
468 	fdwroutine->ForeignAsyncConfigureWait(areq);
469 }
470 
471 /* ----------------------------------------------------------------
472  *		ExecAsyncForeignScanNotify
473  *
474  *		Callback invoked when a relevant event has occurred
475  * ----------------------------------------------------------------
476  */
477 void
ExecAsyncForeignScanNotify(AsyncRequest * areq)478 ExecAsyncForeignScanNotify(AsyncRequest *areq)
479 {
480 	ForeignScanState *node = (ForeignScanState *) areq->requestee;
481 	FdwRoutine *fdwroutine = node->fdwroutine;
482 
483 	Assert(fdwroutine->ForeignAsyncNotify != NULL);
484 	fdwroutine->ForeignAsyncNotify(areq);
485 }
486