1 /*------------------------------------------------------------------------- 2 * 3 * nodeForeignscan.c 4 * Routines to support scans of foreign tables 5 * 6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group 7 * Portions Copyright (c) 1994, Regents of the University of California 8 * 9 * 10 * IDENTIFICATION 11 * src/backend/executor/nodeForeignscan.c 12 * 13 *------------------------------------------------------------------------- 14 */ 15 /* 16 * INTERFACE ROUTINES 17 * 18 * ExecForeignScan scans a foreign table. 19 * ExecInitForeignScan creates and initializes state info. 20 * ExecReScanForeignScan rescans the foreign relation. 21 * ExecEndForeignScan releases any resources allocated. 22 */ 23 #include "postgres.h" 24 25 #include "executor/executor.h" 26 #include "executor/nodeForeignscan.h" 27 #include "foreign/fdwapi.h" 28 #include "utils/memutils.h" 29 #include "utils/rel.h" 30 31 static TupleTableSlot *ForeignNext(ForeignScanState *node); 32 static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot); 33 34 35 /* ---------------------------------------------------------------- 36 * ForeignNext 37 * 38 * This is a workhorse for ExecForeignScan 39 * ---------------------------------------------------------------- 40 */ 41 static TupleTableSlot * 42 ForeignNext(ForeignScanState *node) 43 { 44 TupleTableSlot *slot; 45 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; 46 ExprContext *econtext = node->ss.ps.ps_ExprContext; 47 MemoryContext oldcontext; 48 49 /* Call the Iterate function in short-lived context */ 50 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 51 if (plan->operation != CMD_SELECT) 52 slot = node->fdwroutine->IterateDirectModify(node); 53 else 54 slot = node->fdwroutine->IterateForeignScan(node); 55 MemoryContextSwitchTo(oldcontext); 56 57 /* 58 * Insert valid value into tableoid, the only actually-useful system 59 * column. 60 */ 61 if (plan->fsSystemCol && !TupIsNull(slot)) 62 slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation); 63 64 return slot; 65 } 66 67 /* 68 * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual 69 */ 70 static bool 71 ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot) 72 { 73 FdwRoutine *fdwroutine = node->fdwroutine; 74 ExprContext *econtext; 75 76 /* 77 * extract necessary information from foreign scan node 78 */ 79 econtext = node->ss.ps.ps_ExprContext; 80 81 /* Does the tuple meet the remote qual condition? */ 82 econtext->ecxt_scantuple = slot; 83 84 ResetExprContext(econtext); 85 86 /* 87 * If an outer join is pushed down, RecheckForeignScan may need to store a 88 * different tuple in the slot, because a different set of columns may go 89 * to NULL upon recheck. Otherwise, it shouldn't need to change the slot 90 * contents, just return true or false to indicate whether the quals still 91 * pass. For simple cases, setting fdw_recheck_quals may be easier than 92 * providing this callback. 93 */ 94 if (fdwroutine->RecheckForeignScan && 95 !fdwroutine->RecheckForeignScan(node, slot)) 96 return false; 97 98 return ExecQual(node->fdw_recheck_quals, econtext); 99 } 100 101 /* ---------------------------------------------------------------- 102 * ExecForeignScan(node) 103 * 104 * Fetches the next tuple from the FDW, checks local quals, and 105 * returns it. 106 * We call the ExecScan() routine and pass it the appropriate 107 * access method functions. 108 * ---------------------------------------------------------------- 109 */ 110 static TupleTableSlot * 111 ExecForeignScan(PlanState *pstate) 112 { 113 ForeignScanState *node = castNode(ForeignScanState, pstate); 114 115 return ExecScan(&node->ss, 116 (ExecScanAccessMtd) ForeignNext, 117 (ExecScanRecheckMtd) ForeignRecheck); 118 } 119 120 121 /* ---------------------------------------------------------------- 122 * ExecInitForeignScan 123 * ---------------------------------------------------------------- 124 */ 125 ForeignScanState * 126 ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) 127 { 128 ForeignScanState *scanstate; 129 Relation currentRelation = NULL; 130 Index scanrelid = node->scan.scanrelid; 131 Index tlistvarno; 132 FdwRoutine *fdwroutine; 133 134 /* check for unsupported flags */ 135 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); 136 137 /* 138 * create state structure 139 */ 140 scanstate = makeNode(ForeignScanState); 141 scanstate->ss.ps.plan = (Plan *) node; 142 scanstate->ss.ps.state = estate; 143 scanstate->ss.ps.ExecProcNode = ExecForeignScan; 144 145 /* 146 * Miscellaneous initialization 147 * 148 * create expression context for node 149 */ 150 ExecAssignExprContext(estate, &scanstate->ss.ps); 151 152 /* 153 * open the scan relation, if any; also acquire function pointers from the 154 * FDW's handler 155 */ 156 if (scanrelid > 0) 157 { 158 currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags); 159 scanstate->ss.ss_currentRelation = currentRelation; 160 fdwroutine = GetFdwRoutineForRelation(currentRelation, true); 161 } 162 else 163 { 164 /* We can't use the relcache, so get fdwroutine the hard way */ 165 fdwroutine = GetFdwRoutineByServerId(node->fs_server); 166 } 167 168 /* 169 * Determine the scan tuple type. If the FDW provided a targetlist 170 * describing the scan tuples, use that; else use base relation's rowtype. 171 */ 172 if (node->fdw_scan_tlist != NIL || currentRelation == NULL) 173 { 174 TupleDesc scan_tupdesc; 175 176 scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist); 177 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, 178 &TTSOpsHeapTuple); 179 /* Node's targetlist will contain Vars with varno = INDEX_VAR */ 180 tlistvarno = INDEX_VAR; 181 } 182 else 183 { 184 TupleDesc scan_tupdesc; 185 186 /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */ 187 scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation)); 188 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, 189 &TTSOpsHeapTuple); 190 /* Node's targetlist will contain Vars with varno = scanrelid */ 191 tlistvarno = scanrelid; 192 } 193 194 /* Don't know what an FDW might return */ 195 scanstate->ss.ps.scanopsfixed = false; 196 scanstate->ss.ps.scanopsset = true; 197 198 /* 199 * Initialize result slot, type and projection. 200 */ 201 ExecInitResultTypeTL(&scanstate->ss.ps); 202 ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno); 203 204 /* 205 * initialize child expressions 206 */ 207 scanstate->ss.ps.qual = 208 ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate); 209 scanstate->fdw_recheck_quals = 210 ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); 211 212 /* 213 * Initialize FDW-related state. 214 */ 215 scanstate->fdwroutine = fdwroutine; 216 scanstate->fdw_state = NULL; 217 218 /* Initialize any outer plan. */ 219 if (outerPlan(node)) 220 outerPlanState(scanstate) = 221 ExecInitNode(outerPlan(node), estate, eflags); 222 223 /* 224 * Tell the FDW to initialize the scan. 225 */ 226 if (node->operation != CMD_SELECT) 227 fdwroutine->BeginDirectModify(scanstate, eflags); 228 else 229 fdwroutine->BeginForeignScan(scanstate, eflags); 230 231 return scanstate; 232 } 233 234 /* ---------------------------------------------------------------- 235 * ExecEndForeignScan 236 * 237 * frees any storage allocated through C routines. 238 * ---------------------------------------------------------------- 239 */ 240 void 241 ExecEndForeignScan(ForeignScanState *node) 242 { 243 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; 244 245 /* Let the FDW shut down */ 246 if (plan->operation != CMD_SELECT) 247 node->fdwroutine->EndDirectModify(node); 248 else 249 node->fdwroutine->EndForeignScan(node); 250 251 /* Shut down any outer plan. */ 252 if (outerPlanState(node)) 253 ExecEndNode(outerPlanState(node)); 254 255 /* Free the exprcontext */ 256 ExecFreeExprContext(&node->ss.ps); 257 258 /* clean out the tuple table */ 259 if (node->ss.ps.ps_ResultTupleSlot) 260 ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); 261 ExecClearTuple(node->ss.ss_ScanTupleSlot); 262 } 263 264 /* ---------------------------------------------------------------- 265 * ExecReScanForeignScan 266 * 267 * Rescans the relation. 268 * ---------------------------------------------------------------- 269 */ 270 void 271 ExecReScanForeignScan(ForeignScanState *node) 272 { 273 PlanState *outerPlan = outerPlanState(node); 274 275 node->fdwroutine->ReScanForeignScan(node); 276 277 /* 278 * If chgParam of subnode is not null then plan will be re-scanned by 279 * first ExecProcNode. outerPlan may also be NULL, in which case there is 280 * nothing to rescan at all. 281 */ 282 if (outerPlan != NULL && outerPlan->chgParam == NULL) 283 ExecReScan(outerPlan); 284 285 ExecScanReScan(&node->ss); 286 } 287 288 /* ---------------------------------------------------------------- 289 * ExecForeignScanEstimate 290 * 291 * Informs size of the parallel coordination information, if any 292 * ---------------------------------------------------------------- 293 */ 294 void 295 ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) 296 { 297 FdwRoutine *fdwroutine = node->fdwroutine; 298 299 if (fdwroutine->EstimateDSMForeignScan) 300 { 301 node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); 302 shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); 303 shm_toc_estimate_keys(&pcxt->estimator, 1); 304 } 305 } 306 307 /* ---------------------------------------------------------------- 308 * ExecForeignScanInitializeDSM 309 * 310 * Initialize the parallel coordination information 311 * ---------------------------------------------------------------- 312 */ 313 void 314 ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) 315 { 316 FdwRoutine *fdwroutine = node->fdwroutine; 317 318 if (fdwroutine->InitializeDSMForeignScan) 319 { 320 int plan_node_id = node->ss.ps.plan->plan_node_id; 321 void *coordinate; 322 323 coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); 324 fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); 325 shm_toc_insert(pcxt->toc, plan_node_id, coordinate); 326 } 327 } 328 329 /* ---------------------------------------------------------------- 330 * ExecForeignScanReInitializeDSM 331 * 332 * Reset shared state before beginning a fresh scan. 333 * ---------------------------------------------------------------- 334 */ 335 void 336 ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) 337 { 338 FdwRoutine *fdwroutine = node->fdwroutine; 339 340 if (fdwroutine->ReInitializeDSMForeignScan) 341 { 342 int plan_node_id = node->ss.ps.plan->plan_node_id; 343 void *coordinate; 344 345 coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); 346 fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate); 347 } 348 } 349 350 /* ---------------------------------------------------------------- 351 * ExecForeignScanInitializeWorker 352 * 353 * Initialization according to the parallel coordination information 354 * ---------------------------------------------------------------- 355 */ 356 void 357 ExecForeignScanInitializeWorker(ForeignScanState *node, 358 ParallelWorkerContext *pwcxt) 359 { 360 FdwRoutine *fdwroutine = node->fdwroutine; 361 362 if (fdwroutine->InitializeWorkerForeignScan) 363 { 364 int plan_node_id = node->ss.ps.plan->plan_node_id; 365 void *coordinate; 366 367 coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); 368 fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate); 369 } 370 } 371 372 /* ---------------------------------------------------------------- 373 * ExecShutdownForeignScan 374 * 375 * Gives FDW chance to stop asynchronous resource consumption 376 * and release any resources still held. 377 * ---------------------------------------------------------------- 378 */ 379 void 380 ExecShutdownForeignScan(ForeignScanState *node) 381 { 382 FdwRoutine *fdwroutine = node->fdwroutine; 383 384 if (fdwroutine->ShutdownForeignScan) 385 fdwroutine->ShutdownForeignScan(node); 386 } 387