/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In that case, the Gather node runs
 * the plan in one worker and does not execute the plan itself; it simply
 * returns whatever tuples the worker returns.  If a worker cannot be
 * obtained, then it will run the plan itself and return the results.
 * Therefore, a plan used with a single-copy Gather node need not be
 * parallel-aware.
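 *
 * For illustration only (a schematic plan shape, not verbatim EXPLAIN
 * output, and the table name is made up), a non-single-copy Gather
 * typically sits on top of a parallel-aware scan like this:
 *
 *		Gather
 *		  Workers Planned: 2
 *		  ->  Parallel Seq Scan on some_table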
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

	gatherstate->initialized = false;
	gatherstate->need_to_scan_locally =
		!node->single_copy && parallel_leader_participation;
	gatherstate->tuples_needed = -1;	/* -1 means "read all tuples" */

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
	tupDesc = ExecGetResultType(outerPlanState(gatherstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(estate, &gatherstate->ps);
	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution.  We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if
		 * parallel mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->initParam,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei,
										 gather->initParam);

			/*
			 * Register backend workers.  We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?  Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/*
		 * Run the plan directly in the leader if we got no workers, or if
		 * leader participation is enabled and this isn't a single-copy
		 * Gather.
		 */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| (!gather->single_copy && parallel_leader_participation);
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;
	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	HeapTuple	tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			tup = gather_readnext(gatherstate);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreTuple(tup, /* tuple to store */
							   fslot,	/* slot in which to store the tuple */
							   InvalidBuffer,	/* buffer associated with this
												 * tuple */
							   true);	/* pfree tuple when done with it */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			EState	   *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		HeapTuple	tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Attempt to read a tuple, but don't block if none is available.
		 *
		 * Note that TupleQueueReaderNext will just return NULL for a worker
		 * which fails to initialize.  We'll treat that worker as having
		 * produced no tuples; WaitForParallelWorkersToFinish will error out
		 * when we get there.
		 */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);
		/*
		 * If this reader is done, remove it from our working array of active
		 * readers.  If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
			{
				ExecShutdownGatherWorkers(gatherstate);
				return NULL;
			}
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion.  Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker.  We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/*
			 * Nothing to do except wait for developments; a worker that
			 * writes to its tuple queue (or detaches from it) will set our
			 * latch.
			 */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *						Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver
	 * a different set of rows within the leader process.
	 * (The overall rowset shouldn't change, but the leader process's subset
	 * might; hence nodes between here and the parallel table scan node
	 * mustn't optimize on the assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child
	 * nodes will not see a ReScan call until after they get a
	 * ReInitializeDSM call.  That ordering might not be something to rely
	 * on, though.  A good rule of thumb is that ReInitializeDSM should reset
	 * only shared state, ReScan should reset only local state, and anything
	 * that depends on both of those steps being finished must wait until the
	 * first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}