1 /*-------------------------------------------------------------------------
2 *
3 * nodeForeignscan.c
4 * Routines to support scans of foreign tables
5 *
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeForeignscan.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 /*
16 * INTERFACE ROUTINES
17 *
18 * ExecForeignScan scans a foreign table.
19 * ExecInitForeignScan creates and initializes state info.
20 * ExecReScanForeignScan rescans the foreign relation.
21 * ExecEndForeignScan releases any resources allocated.
22 */
23 #include "postgres.h"
24
25 #include "executor/executor.h"
26 #include "executor/nodeForeignscan.h"
27 #include "foreign/fdwapi.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30
31 static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33
34
35 /* ----------------------------------------------------------------
36 * ForeignNext
37 *
38 * This is a workhorse for ExecForeignScan
39 * ----------------------------------------------------------------
40 */
41 static TupleTableSlot *
ForeignNext(ForeignScanState * node)42 ForeignNext(ForeignScanState *node)
43 {
44 TupleTableSlot *slot;
45 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 MemoryContext oldcontext;
48
49 /* Call the Iterate function in short-lived context */
50 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
51 if (plan->operation != CMD_SELECT)
52 slot = node->fdwroutine->IterateDirectModify(node);
53 else
54 slot = node->fdwroutine->IterateForeignScan(node);
55 MemoryContextSwitchTo(oldcontext);
56
57 /*
58 * If any system columns are requested, we have to force the tuple into
59 * physical-tuple form to avoid "cannot extract system attribute from
60 * virtual tuple" errors later. We also insert a valid value for
61 * tableoid, which is the only actually-useful system column.
62 */
63 if (plan->fsSystemCol && !TupIsNull(slot))
64 {
65 HeapTuple tup = ExecMaterializeSlot(slot);
66
67 tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
68 }
69
70 return slot;
71 }
72
73 /*
74 * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
75 */
76 static bool
ForeignRecheck(ForeignScanState * node,TupleTableSlot * slot)77 ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
78 {
79 FdwRoutine *fdwroutine = node->fdwroutine;
80 ExprContext *econtext;
81
82 /*
83 * extract necessary information from foreign scan node
84 */
85 econtext = node->ss.ps.ps_ExprContext;
86
87 /* Does the tuple meet the remote qual condition? */
88 econtext->ecxt_scantuple = slot;
89
90 ResetExprContext(econtext);
91
92 /*
93 * If an outer join is pushed down, RecheckForeignScan may need to store a
94 * different tuple in the slot, because a different set of columns may go
95 * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
96 * contents, just return true or false to indicate whether the quals still
97 * pass. For simple cases, setting fdw_recheck_quals may be easier than
98 * providing this callback.
99 */
100 if (fdwroutine->RecheckForeignScan &&
101 !fdwroutine->RecheckForeignScan(node, slot))
102 return false;
103
104 return ExecQual(node->fdw_recheck_quals, econtext);
105 }
106
107 /* ----------------------------------------------------------------
108 * ExecForeignScan(node)
109 *
110 * Fetches the next tuple from the FDW, checks local quals, and
111 * returns it.
112 * We call the ExecScan() routine and pass it the appropriate
113 * access method functions.
114 * ----------------------------------------------------------------
115 */
116 static TupleTableSlot *
ExecForeignScan(PlanState * pstate)117 ExecForeignScan(PlanState *pstate)
118 {
119 ForeignScanState *node = castNode(ForeignScanState, pstate);
120
121 return ExecScan(&node->ss,
122 (ExecScanAccessMtd) ForeignNext,
123 (ExecScanRecheckMtd) ForeignRecheck);
124 }
125
126
127 /* ----------------------------------------------------------------
128 * ExecInitForeignScan
129 * ----------------------------------------------------------------
130 */
131 ForeignScanState *
ExecInitForeignScan(ForeignScan * node,EState * estate,int eflags)132 ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
133 {
134 ForeignScanState *scanstate;
135 Relation currentRelation = NULL;
136 Index scanrelid = node->scan.scanrelid;
137 Index tlistvarno;
138 FdwRoutine *fdwroutine;
139
140 /* check for unsupported flags */
141 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
142
143 /*
144 * create state structure
145 */
146 scanstate = makeNode(ForeignScanState);
147 scanstate->ss.ps.plan = (Plan *) node;
148 scanstate->ss.ps.state = estate;
149 scanstate->ss.ps.ExecProcNode = ExecForeignScan;
150
151 /*
152 * Miscellaneous initialization
153 *
154 * create expression context for node
155 */
156 ExecAssignExprContext(estate, &scanstate->ss.ps);
157
158 /*
159 * open the base relation, if any, and acquire an appropriate lock on it;
160 * also acquire function pointers from the FDW's handler
161 */
162 if (scanrelid > 0)
163 {
164 currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
165 scanstate->ss.ss_currentRelation = currentRelation;
166 fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
167 }
168 else
169 {
170 /* We can't use the relcache, so get fdwroutine the hard way */
171 fdwroutine = GetFdwRoutineByServerId(node->fs_server);
172 }
173
174 /*
175 * Determine the scan tuple type. If the FDW provided a targetlist
176 * describing the scan tuples, use that; else use base relation's rowtype.
177 */
178 if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
179 {
180 TupleDesc scan_tupdesc;
181
182 scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist, false);
183 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc);
184 /* Node's targetlist will contain Vars with varno = INDEX_VAR */
185 tlistvarno = INDEX_VAR;
186 }
187 else
188 {
189 TupleDesc scan_tupdesc;
190
191 /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
192 scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
193 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc);
194 /* Node's targetlist will contain Vars with varno = scanrelid */
195 tlistvarno = scanrelid;
196 }
197
198 /*
199 * Initialize result slot, type and projection.
200 */
201 ExecInitResultTupleSlotTL(estate, &scanstate->ss.ps);
202 ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
203
204 /*
205 * initialize child expressions
206 */
207 scanstate->ss.ps.qual =
208 ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
209 scanstate->fdw_recheck_quals =
210 ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
211
212 /*
213 * Initialize FDW-related state.
214 */
215 scanstate->fdwroutine = fdwroutine;
216 scanstate->fdw_state = NULL;
217
218 /* Initialize any outer plan. */
219 if (outerPlan(node))
220 outerPlanState(scanstate) =
221 ExecInitNode(outerPlan(node), estate, eflags);
222
223 /*
224 * Tell the FDW to initialize the scan.
225 */
226 if (node->operation != CMD_SELECT)
227 fdwroutine->BeginDirectModify(scanstate, eflags);
228 else
229 fdwroutine->BeginForeignScan(scanstate, eflags);
230
231 return scanstate;
232 }
233
234 /* ----------------------------------------------------------------
235 * ExecEndForeignScan
236 *
237 * frees any storage allocated through C routines.
238 * ----------------------------------------------------------------
239 */
240 void
ExecEndForeignScan(ForeignScanState * node)241 ExecEndForeignScan(ForeignScanState *node)
242 {
243 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
244
245 /* Let the FDW shut down */
246 if (plan->operation != CMD_SELECT)
247 node->fdwroutine->EndDirectModify(node);
248 else
249 node->fdwroutine->EndForeignScan(node);
250
251 /* Shut down any outer plan. */
252 if (outerPlanState(node))
253 ExecEndNode(outerPlanState(node));
254
255 /* Free the exprcontext */
256 ExecFreeExprContext(&node->ss.ps);
257
258 /* clean out the tuple table */
259 ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
260 ExecClearTuple(node->ss.ss_ScanTupleSlot);
261
262 /* close the relation. */
263 if (node->ss.ss_currentRelation)
264 ExecCloseScanRelation(node->ss.ss_currentRelation);
265 }
266
267 /* ----------------------------------------------------------------
268 * ExecReScanForeignScan
269 *
270 * Rescans the relation.
271 * ----------------------------------------------------------------
272 */
273 void
ExecReScanForeignScan(ForeignScanState * node)274 ExecReScanForeignScan(ForeignScanState *node)
275 {
276 PlanState *outerPlan = outerPlanState(node);
277
278 node->fdwroutine->ReScanForeignScan(node);
279
280 /*
281 * If chgParam of subnode is not null then plan will be re-scanned by
282 * first ExecProcNode. outerPlan may also be NULL, in which case there is
283 * nothing to rescan at all.
284 */
285 if (outerPlan != NULL && outerPlan->chgParam == NULL)
286 ExecReScan(outerPlan);
287
288 ExecScanReScan(&node->ss);
289 }
290
291 /* ----------------------------------------------------------------
292 * ExecForeignScanEstimate
293 *
294 * Informs size of the parallel coordination information, if any
295 * ----------------------------------------------------------------
296 */
297 void
ExecForeignScanEstimate(ForeignScanState * node,ParallelContext * pcxt)298 ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
299 {
300 FdwRoutine *fdwroutine = node->fdwroutine;
301
302 if (fdwroutine->EstimateDSMForeignScan)
303 {
304 node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
305 shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
306 shm_toc_estimate_keys(&pcxt->estimator, 1);
307 }
308 }
309
310 /* ----------------------------------------------------------------
311 * ExecForeignScanInitializeDSM
312 *
313 * Initialize the parallel coordination information
314 * ----------------------------------------------------------------
315 */
316 void
ExecForeignScanInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)317 ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
318 {
319 FdwRoutine *fdwroutine = node->fdwroutine;
320
321 if (fdwroutine->InitializeDSMForeignScan)
322 {
323 int plan_node_id = node->ss.ps.plan->plan_node_id;
324 void *coordinate;
325
326 coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
327 fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
328 shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
329 }
330 }
331
332 /* ----------------------------------------------------------------
333 * ExecForeignScanReInitializeDSM
334 *
335 * Reset shared state before beginning a fresh scan.
336 * ----------------------------------------------------------------
337 */
338 void
ExecForeignScanReInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)339 ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
340 {
341 FdwRoutine *fdwroutine = node->fdwroutine;
342
343 if (fdwroutine->ReInitializeDSMForeignScan)
344 {
345 int plan_node_id = node->ss.ps.plan->plan_node_id;
346 void *coordinate;
347
348 coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
349 fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
350 }
351 }
352
353 /* ----------------------------------------------------------------
354 * ExecForeignScanInitializeWorker
355 *
356 * Initialization according to the parallel coordination information
357 * ----------------------------------------------------------------
358 */
359 void
ExecForeignScanInitializeWorker(ForeignScanState * node,ParallelWorkerContext * pwcxt)360 ExecForeignScanInitializeWorker(ForeignScanState *node,
361 ParallelWorkerContext *pwcxt)
362 {
363 FdwRoutine *fdwroutine = node->fdwroutine;
364
365 if (fdwroutine->InitializeWorkerForeignScan)
366 {
367 int plan_node_id = node->ss.ps.plan->plan_node_id;
368 void *coordinate;
369
370 coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
371 fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
372 }
373 }
374
375 /* ----------------------------------------------------------------
376 * ExecShutdownForeignScan
377 *
378 * Gives FDW chance to stop asynchronous resource consumption
379 * and release any resources still held.
380 * ----------------------------------------------------------------
381 */
382 void
ExecShutdownForeignScan(ForeignScanState * node)383 ExecShutdownForeignScan(ForeignScanState *node)
384 {
385 FdwRoutine *fdwroutine = node->fdwroutine;
386
387 if (fdwroutine->ShutdownForeignScan)
388 fdwroutine->ShutdownForeignScan(node);
389 }
390