/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself.  In
 * this case, it simply returns whatever tuples were returned by the worker.
 * If a worker cannot be obtained, then it will run the plan itself and
 * return the results.  Therefore, a plan used with a single-copy Gather
 * node need not be parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

	gatherstate->initialized = false;
	gatherstate->need_to_scan_locally =
		!node->single_copy && parallel_leader_participation;
	gatherstate->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
	tupDesc = ExecGetResultType(outerPlanState(gatherstate));

	/*
	 * Leader may access ExecProcNode result directly (if
	 * need_to_scan_locally), or from workers via tuple queue.  So we can't
	 * trivially rely on the slot type being fixed for expressions evaluated
	 * within this node.
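	 * (Tuples arriving from workers come out of the shared tuple queues as
	 * MinimalTuples and are stored in the funnel slot, whereas tuples
	 * produced locally use whatever slot type the outer plan returns.)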
	 */
	gatherstate->ps.outeropsset = true;
	gatherstate->ps.outeropsfixed = false;

	/*
	 * Initialize result type and projection.
	 */
	ExecInitResultTypeTL(&gatherstate->ps);
	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

	/*
	 * Without projections result slot type is not trivially known, see
	 * comment above.
	 */
	if (gatherstate->ps.ps_ProjInfo == NULL)
	{
		gatherstate->ps.resultopsset = true;
		gatherstate->ps.resultopsfixed = false;
	}

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
													  &TTSOpsMinimalTuple);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution. We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->initParam,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei,
										 gather->initParam);

			/*
			 * Register backend workers. We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?  Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/* Run plan locally if no workers or enabled and not single-copy. */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| (!gather->single_copy && parallel_leader_participation);
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	if (node->ps.ps_ResultTupleSlot)
		ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	MinimalTuple tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			tup = gather_readnext(gatherstate);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreMinimalTuple(tup,	/* tuple to store */
									  fslot,	/* slot to store the tuple */
									  false);	/* don't pfree tuple */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			EState	   *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		MinimalTuple tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Attempt to read a tuple, but don't block if none is available.
		 *
		 * Note that TupleQueueReaderNext will just return NULL for a worker
		 * which fails to initialize.  We'll treat that worker as having
		 * produced no tuples; WaitForParallelWorkersToFinish will error out
		 * when we get there.
332 */ 333 Assert(gatherstate->nextreader < gatherstate->nreaders); 334 reader = gatherstate->reader[gatherstate->nextreader]; 335 tup = TupleQueueReaderNext(reader, true, &readerdone); 336 337 /* 338 * If this reader is done, remove it from our working array of active 339 * readers. If all readers are done, we're outta here. 340 */ 341 if (readerdone) 342 { 343 Assert(!tup); 344 --gatherstate->nreaders; 345 if (gatherstate->nreaders == 0) 346 { 347 ExecShutdownGatherWorkers(gatherstate); 348 return NULL; 349 } 350 memmove(&gatherstate->reader[gatherstate->nextreader], 351 &gatherstate->reader[gatherstate->nextreader + 1], 352 sizeof(TupleQueueReader *) 353 * (gatherstate->nreaders - gatherstate->nextreader)); 354 if (gatherstate->nextreader >= gatherstate->nreaders) 355 gatherstate->nextreader = 0; 356 continue; 357 } 358 359 /* If we got a tuple, return it. */ 360 if (tup) 361 return tup; 362 363 /* 364 * Advance nextreader pointer in round-robin fashion. Note that we 365 * only reach this code if we weren't able to get a tuple from the 366 * current worker. We used to advance the nextreader pointer after 367 * every tuple, but it turns out to be much more efficient to keep 368 * reading from the same queue until that would require blocking. 369 */ 370 gatherstate->nextreader++; 371 if (gatherstate->nextreader >= gatherstate->nreaders) 372 gatherstate->nextreader = 0; 373 374 /* Have we visited every (surviving) TupleQueueReader? */ 375 nvisited++; 376 if (nvisited >= gatherstate->nreaders) 377 { 378 /* 379 * If (still) running plan locally, return NULL so caller can 380 * generate another tuple from the local copy of the plan. 381 */ 382 if (gatherstate->need_to_scan_locally) 383 return NULL; 384 385 /* Nothing to do except wait for developments. */ 386 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, 387 WAIT_EVENT_EXECUTE_GATHER); 388 ResetLatch(MyLatch); 389 nvisited = 0; 390 } 391 } 392 } 393 394 /* ---------------------------------------------------------------- 395 * ExecShutdownGatherWorkers 396 * 397 * Stop all the parallel workers. 398 * ---------------------------------------------------------------- 399 */ 400 static void 401 ExecShutdownGatherWorkers(GatherState *node) 402 { 403 if (node->pei != NULL) 404 ExecParallelFinish(node->pei); 405 406 /* Flush local copy of reader array */ 407 if (node->reader) 408 pfree(node->reader); 409 node->reader = NULL; 410 } 411 412 /* ---------------------------------------------------------------- 413 * ExecShutdownGather 414 * 415 * Destroy the setup for parallel workers including parallel context. 416 * ---------------------------------------------------------------- 417 */ 418 void 419 ExecShutdownGather(GatherState *node) 420 { 421 ExecShutdownGatherWorkers(node); 422 423 /* Now destroy the parallel context. */ 424 if (node->pei != NULL) 425 { 426 ExecParallelCleanup(node->pei); 427 node->pei = NULL; 428 } 429 } 430 431 /* ---------------------------------------------------------------- 432 * Join Support 433 * ---------------------------------------------------------------- 434 */ 435 436 /* ---------------------------------------------------------------- 437 * ExecReScanGather 438 * 439 * Prepare to re-scan the result of a Gather. 
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}