1 /*-------------------------------------------------------------------------
2 *
3 * nodeForeignscan.c
4 * Routines to support scans of foreign tables
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeForeignscan.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 /*
16 * INTERFACE ROUTINES
17 *
18 * ExecForeignScan scans a foreign table.
19 * ExecInitForeignScan creates and initializes state info.
20 * ExecReScanForeignScan rescans the foreign relation.
21 * ExecEndForeignScan releases any resources allocated.
22 */
23 #include "postgres.h"
24
25 #include "executor/executor.h"
26 #include "executor/nodeForeignscan.h"
27 #include "foreign/fdwapi.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30
31 static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33
34
35 /* ----------------------------------------------------------------
36 * ForeignNext
37 *
38 * This is a workhorse for ExecForeignScan
39 * ----------------------------------------------------------------
40 */
41 static TupleTableSlot *
ForeignNext(ForeignScanState * node)42 ForeignNext(ForeignScanState *node)
43 {
44 TupleTableSlot *slot;
45 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 EState *estate = node->ss.ps.state;
48 MemoryContext oldcontext;
49
50 /* Call the Iterate function in short-lived context */
51 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
52 if (plan->operation != CMD_SELECT)
53 {
54 /*
55 * direct modifications cannot be re-evaluated, so shouldn't get here
56 * during EvalPlanQual processing
57 */
58 if (estate->es_epq_active != NULL)
59 elog(ERROR, "cannot re-evaluate a Foreign Update or Delete during EvalPlanQual");
60
61 slot = node->fdwroutine->IterateDirectModify(node);
62 }
63 else
64 slot = node->fdwroutine->IterateForeignScan(node);
65 MemoryContextSwitchTo(oldcontext);
66
67 /*
68 * Insert valid value into tableoid, the only actually-useful system
69 * column.
70 */
71 if (plan->fsSystemCol && !TupIsNull(slot))
72 slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
73
74 return slot;
75 }
76
77 /*
78 * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
79 */
80 static bool
ForeignRecheck(ForeignScanState * node,TupleTableSlot * slot)81 ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
82 {
83 FdwRoutine *fdwroutine = node->fdwroutine;
84 ExprContext *econtext;
85
86 /*
87 * extract necessary information from foreign scan node
88 */
89 econtext = node->ss.ps.ps_ExprContext;
90
91 /* Does the tuple meet the remote qual condition? */
92 econtext->ecxt_scantuple = slot;
93
94 ResetExprContext(econtext);
95
96 /*
97 * If an outer join is pushed down, RecheckForeignScan may need to store a
98 * different tuple in the slot, because a different set of columns may go
99 * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
100 * contents, just return true or false to indicate whether the quals still
101 * pass. For simple cases, setting fdw_recheck_quals may be easier than
102 * providing this callback.
103 */
104 if (fdwroutine->RecheckForeignScan &&
105 !fdwroutine->RecheckForeignScan(node, slot))
106 return false;
107
108 return ExecQual(node->fdw_recheck_quals, econtext);
109 }
110
111 /* ----------------------------------------------------------------
112 * ExecForeignScan(node)
113 *
114 * Fetches the next tuple from the FDW, checks local quals, and
115 * returns it.
116 * We call the ExecScan() routine and pass it the appropriate
117 * access method functions.
118 * ----------------------------------------------------------------
119 */
120 static TupleTableSlot *
ExecForeignScan(PlanState * pstate)121 ExecForeignScan(PlanState *pstate)
122 {
123 ForeignScanState *node = castNode(ForeignScanState, pstate);
124
125 return ExecScan(&node->ss,
126 (ExecScanAccessMtd) ForeignNext,
127 (ExecScanRecheckMtd) ForeignRecheck);
128 }
129
130
131 /* ----------------------------------------------------------------
132 * ExecInitForeignScan
133 * ----------------------------------------------------------------
134 */
135 ForeignScanState *
ExecInitForeignScan(ForeignScan * node,EState * estate,int eflags)136 ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
137 {
138 ForeignScanState *scanstate;
139 Relation currentRelation = NULL;
140 Index scanrelid = node->scan.scanrelid;
141 Index tlistvarno;
142 FdwRoutine *fdwroutine;
143
144 /* check for unsupported flags */
145 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
146
147 /*
148 * create state structure
149 */
150 scanstate = makeNode(ForeignScanState);
151 scanstate->ss.ps.plan = (Plan *) node;
152 scanstate->ss.ps.state = estate;
153 scanstate->ss.ps.ExecProcNode = ExecForeignScan;
154
155 /*
156 * Miscellaneous initialization
157 *
158 * create expression context for node
159 */
160 ExecAssignExprContext(estate, &scanstate->ss.ps);
161
162 /*
163 * open the scan relation, if any; also acquire function pointers from the
164 * FDW's handler
165 */
166 if (scanrelid > 0)
167 {
168 currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
169 scanstate->ss.ss_currentRelation = currentRelation;
170 fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
171 }
172 else
173 {
174 /* We can't use the relcache, so get fdwroutine the hard way */
175 fdwroutine = GetFdwRoutineByServerId(node->fs_server);
176 }
177
178 /*
179 * Determine the scan tuple type. If the FDW provided a targetlist
180 * describing the scan tuples, use that; else use base relation's rowtype.
181 */
182 if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
183 {
184 TupleDesc scan_tupdesc;
185
186 scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
187 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
188 &TTSOpsHeapTuple);
189 /* Node's targetlist will contain Vars with varno = INDEX_VAR */
190 tlistvarno = INDEX_VAR;
191 }
192 else
193 {
194 TupleDesc scan_tupdesc;
195
196 /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
197 scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
198 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
199 &TTSOpsHeapTuple);
200 /* Node's targetlist will contain Vars with varno = scanrelid */
201 tlistvarno = scanrelid;
202 }
203
204 /* Don't know what an FDW might return */
205 scanstate->ss.ps.scanopsfixed = false;
206 scanstate->ss.ps.scanopsset = true;
207
208 /*
209 * Initialize result slot, type and projection.
210 */
211 ExecInitResultTypeTL(&scanstate->ss.ps);
212 ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
213
214 /*
215 * initialize child expressions
216 */
217 scanstate->ss.ps.qual =
218 ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
219 scanstate->fdw_recheck_quals =
220 ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
221
222 /*
223 * Determine whether to scan the foreign relation asynchronously or not;
224 * this has to be kept in sync with the code in ExecInitAppend().
225 */
226 scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
227 estate->es_epq_active == NULL);
228
229 /*
230 * Initialize FDW-related state.
231 */
232 scanstate->fdwroutine = fdwroutine;
233 scanstate->fdw_state = NULL;
234
235 /*
236 * For the FDW's convenience, look up the modification target relation's
237 * ResultRelInfo. The ModifyTable node should have initialized it for us,
238 * see ExecInitModifyTable.
239 *
240 * Don't try to look up the ResultRelInfo when EvalPlanQual is active,
241 * though. Direct modififications cannot be re-evaluated as part of
242 * EvalPlanQual. The lookup wouldn't work anyway because during
243 * EvalPlanQual processing, EvalPlanQual only initializes the subtree
244 * under the ModifyTable, and doesn't run ExecInitModifyTable.
245 */
246 if (node->resultRelation > 0 && estate->es_epq_active == NULL)
247 {
248 if (estate->es_result_relations == NULL ||
249 estate->es_result_relations[node->resultRelation - 1] == NULL)
250 {
251 elog(ERROR, "result relation not initialized");
252 }
253 scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
254 }
255
256 /* Initialize any outer plan. */
257 if (outerPlan(node))
258 outerPlanState(scanstate) =
259 ExecInitNode(outerPlan(node), estate, eflags);
260
261 /*
262 * Tell the FDW to initialize the scan.
263 */
264 if (node->operation != CMD_SELECT)
265 {
266 /*
267 * Direct modifications cannot be re-evaluated by EvalPlanQual, so
268 * don't bother preparing the FDW. There can be ForeignScan nodes in
269 * the EvalPlanQual subtree, but ExecForeignScan should never be
270 * called on them when EvalPlanQual is active.
271 */
272 if (estate->es_epq_active == NULL)
273 fdwroutine->BeginDirectModify(scanstate, eflags);
274 }
275 else
276 fdwroutine->BeginForeignScan(scanstate, eflags);
277
278 return scanstate;
279 }
280
281 /* ----------------------------------------------------------------
282 * ExecEndForeignScan
283 *
284 * frees any storage allocated through C routines.
285 * ----------------------------------------------------------------
286 */
287 void
ExecEndForeignScan(ForeignScanState * node)288 ExecEndForeignScan(ForeignScanState *node)
289 {
290 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
291 EState *estate = node->ss.ps.state;
292
293 /* Let the FDW shut down */
294 if (plan->operation != CMD_SELECT)
295 {
296 if (estate->es_epq_active == NULL)
297 node->fdwroutine->EndDirectModify(node);
298 }
299 else
300 node->fdwroutine->EndForeignScan(node);
301
302 /* Shut down any outer plan. */
303 if (outerPlanState(node))
304 ExecEndNode(outerPlanState(node));
305
306 /* Free the exprcontext */
307 ExecFreeExprContext(&node->ss.ps);
308
309 /* clean out the tuple table */
310 if (node->ss.ps.ps_ResultTupleSlot)
311 ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
312 ExecClearTuple(node->ss.ss_ScanTupleSlot);
313 }
314
315 /* ----------------------------------------------------------------
316 * ExecReScanForeignScan
317 *
318 * Rescans the relation.
319 * ----------------------------------------------------------------
320 */
321 void
ExecReScanForeignScan(ForeignScanState * node)322 ExecReScanForeignScan(ForeignScanState *node)
323 {
324 PlanState *outerPlan = outerPlanState(node);
325
326 node->fdwroutine->ReScanForeignScan(node);
327
328 /*
329 * If chgParam of subnode is not null then plan will be re-scanned by
330 * first ExecProcNode. outerPlan may also be NULL, in which case there is
331 * nothing to rescan at all.
332 */
333 if (outerPlan != NULL && outerPlan->chgParam == NULL)
334 ExecReScan(outerPlan);
335
336 ExecScanReScan(&node->ss);
337 }
338
339 /* ----------------------------------------------------------------
340 * ExecForeignScanEstimate
341 *
342 * Informs size of the parallel coordination information, if any
343 * ----------------------------------------------------------------
344 */
345 void
ExecForeignScanEstimate(ForeignScanState * node,ParallelContext * pcxt)346 ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
347 {
348 FdwRoutine *fdwroutine = node->fdwroutine;
349
350 if (fdwroutine->EstimateDSMForeignScan)
351 {
352 node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
353 shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
354 shm_toc_estimate_keys(&pcxt->estimator, 1);
355 }
356 }
357
358 /* ----------------------------------------------------------------
359 * ExecForeignScanInitializeDSM
360 *
361 * Initialize the parallel coordination information
362 * ----------------------------------------------------------------
363 */
364 void
ExecForeignScanInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)365 ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
366 {
367 FdwRoutine *fdwroutine = node->fdwroutine;
368
369 if (fdwroutine->InitializeDSMForeignScan)
370 {
371 int plan_node_id = node->ss.ps.plan->plan_node_id;
372 void *coordinate;
373
374 coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
375 fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
376 shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
377 }
378 }
379
380 /* ----------------------------------------------------------------
381 * ExecForeignScanReInitializeDSM
382 *
383 * Reset shared state before beginning a fresh scan.
384 * ----------------------------------------------------------------
385 */
386 void
ExecForeignScanReInitializeDSM(ForeignScanState * node,ParallelContext * pcxt)387 ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
388 {
389 FdwRoutine *fdwroutine = node->fdwroutine;
390
391 if (fdwroutine->ReInitializeDSMForeignScan)
392 {
393 int plan_node_id = node->ss.ps.plan->plan_node_id;
394 void *coordinate;
395
396 coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
397 fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
398 }
399 }
400
401 /* ----------------------------------------------------------------
402 * ExecForeignScanInitializeWorker
403 *
404 * Initialization according to the parallel coordination information
405 * ----------------------------------------------------------------
406 */
407 void
ExecForeignScanInitializeWorker(ForeignScanState * node,ParallelWorkerContext * pwcxt)408 ExecForeignScanInitializeWorker(ForeignScanState *node,
409 ParallelWorkerContext *pwcxt)
410 {
411 FdwRoutine *fdwroutine = node->fdwroutine;
412
413 if (fdwroutine->InitializeWorkerForeignScan)
414 {
415 int plan_node_id = node->ss.ps.plan->plan_node_id;
416 void *coordinate;
417
418 coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
419 fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
420 }
421 }
422
423 /* ----------------------------------------------------------------
424 * ExecShutdownForeignScan
425 *
426 * Gives FDW chance to stop asynchronous resource consumption
427 * and release any resources still held.
428 * ----------------------------------------------------------------
429 */
430 void
ExecShutdownForeignScan(ForeignScanState * node)431 ExecShutdownForeignScan(ForeignScanState *node)
432 {
433 FdwRoutine *fdwroutine = node->fdwroutine;
434
435 if (fdwroutine->ShutdownForeignScan)
436 fdwroutine->ShutdownForeignScan(node);
437 }
438
439 /* ----------------------------------------------------------------
440 * ExecAsyncForeignScanRequest
441 *
442 * Asynchronously request a tuple from a designed async-capable node
443 * ----------------------------------------------------------------
444 */
445 void
ExecAsyncForeignScanRequest(AsyncRequest * areq)446 ExecAsyncForeignScanRequest(AsyncRequest *areq)
447 {
448 ForeignScanState *node = (ForeignScanState *) areq->requestee;
449 FdwRoutine *fdwroutine = node->fdwroutine;
450
451 Assert(fdwroutine->ForeignAsyncRequest != NULL);
452 fdwroutine->ForeignAsyncRequest(areq);
453 }
454
455 /* ----------------------------------------------------------------
456 * ExecAsyncForeignScanConfigureWait
457 *
458 * In async mode, configure for a wait
459 * ----------------------------------------------------------------
460 */
461 void
ExecAsyncForeignScanConfigureWait(AsyncRequest * areq)462 ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
463 {
464 ForeignScanState *node = (ForeignScanState *) areq->requestee;
465 FdwRoutine *fdwroutine = node->fdwroutine;
466
467 Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
468 fdwroutine->ForeignAsyncConfigureWait(areq);
469 }
470
471 /* ----------------------------------------------------------------
472 * ExecAsyncForeignScanNotify
473 *
474 * Callback invoked when a relevant event has occurred
475 * ----------------------------------------------------------------
476 */
477 void
ExecAsyncForeignScanNotify(AsyncRequest * areq)478 ExecAsyncForeignScanNotify(AsyncRequest *areq)
479 {
480 ForeignScanState *node = (ForeignScanState *) areq->requestee;
481 FdwRoutine *fdwroutine = node->fdwroutine;
482
483 Assert(fdwroutine->ForeignAsyncNotify != NULL);
484 fdwroutine->ForeignAsyncNotify(areq);
485 }
486