1 /*-------------------------------------------------------------------------
2 *
3 * execParallel.c
4 * Support routines for parallel execution.
5 *
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * This file contains routines that are intended to support setting up,
10 * using, and tearing down a ParallelContext from within the PostgreSQL
11 * executor. The ParallelContext machinery will handle starting the
12 * workers and ensuring that their state generally matches that of the
13 * leader; see src/backend/access/transam/README.parallel for details.
14 * However, we must save and restore relevant executor state, such as
15 * any ParamListInfo associated with the query, buffer usage info, and
16 * the actual plan to be passed down to the worker.
17 *
18 * IDENTIFICATION
19 * src/backend/executor/execParallel.c
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAppend.h"
29 #include "executor/nodeBitmapHeapscan.h"
30 #include "executor/nodeCustom.h"
31 #include "executor/nodeForeignscan.h"
32 #include "executor/nodeHash.h"
33 #include "executor/nodeHashjoin.h"
34 #include "executor/nodeIndexscan.h"
35 #include "executor/nodeIndexonlyscan.h"
36 #include "executor/nodeSeqscan.h"
37 #include "executor/nodeSort.h"
38 #include "executor/nodeSubplan.h"
39 #include "executor/tqueue.h"
40 #include "jit/jit.h"
41 #include "nodes/nodeFuncs.h"
42 #include "optimizer/planmain.h"
43 #include "optimizer/planner.h"
44 #include "storage/spin.h"
45 #include "tcop/tcopprot.h"
46 #include "utils/datum.h"
47 #include "utils/dsa.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/snapmgr.h"
51 #include "pgstat.h"
52
53 /*
54 * Magic numbers for parallel executor communication. We use constants
55 * greater than any 32-bit integer here so that values < 2^32 can be used
56 * by individual parallel nodes to store their own state.
57 */
58 #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
59 #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
60 #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
61 #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
62 #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
63 #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
64 #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
65 #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
66 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
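/*
 * As an illustration (not something enforced here): parallel-aware plan
 * nodes usually key their own per-node TOC entries with values below 2^32,
 * most commonly the node's plan_node_id, along the lines of
 *
 *     shm_toc_insert(pcxt->toc, planstate->plan->plan_node_id, node_shared);
 *
 * where node_shared stands in for whatever per-node state the node has
 * allocated.  That is why the executor-level keys above deliberately live
 * in the >= 2^32 range.
 */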
67
68 #define PARALLEL_TUPLE_QUEUE_SIZE 65536
69
70 /*
71 * Fixed-size state that we need to pass to parallel workers.
72 */
73 typedef struct FixedParallelExecutorState
74 {
75 int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
76 dsa_pointer param_exec; /* serialized PARAM_EXEC values, if any */
77 int eflags; /* executor flags for ExecutorStart in each worker */
78 int jit_flags; /* leader's es_jit_flags, passed to each worker */
79 } FixedParallelExecutorState;
80
81 /*
82 * DSM structure for accumulating per-PlanState instrumentation.
83 *
84 * instrument_options: Same meaning here as in instrument.c.
85 *
86 * instrument_offset: Offset, relative to the start of this structure,
87 * of the first Instrumentation object. This will depend on the length of
88 * the plan_node_id array.
89 *
90 * num_workers: Number of workers.
91 *
92 * num_plan_nodes: Number of plan nodes.
93 *
94 * plan_node_id: Array of plan node IDs for which we are gathering instrumentation
95 * from parallel workers. The length of this array is given by num_plan_nodes.
96 */
97 struct SharedExecutorInstrumentation
98 {
99 int instrument_options;
100 int instrument_offset;
101 int num_workers;
102 int num_plan_nodes;
103 int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
104 /* array of num_plan_nodes * num_workers Instrumentation objects follows */
105 };
106 #define GetInstrumentationArray(sei) \
107 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
108 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
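/*
 * A layout sketch for the trailing array: the Instrumentation for the plan
 * node stored at index i of plan_node_id[], as reported by worker w, lives
 * at
 *
 *     GetInstrumentationArray(sei)[i * sei->num_workers + w]
 *
 * which is how ExecParallelRetrieveInstrumentation() and
 * ExecParallelReportInstrumentation() below walk the array.
 */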
109
110 /* Context object for ExecParallelEstimate. */
111 typedef struct ExecParallelEstimateContext
112 {
113 ParallelContext *pcxt;
114 int nnodes;
115 } ExecParallelEstimateContext;
116
117 /* Context object for ExecParallelInitializeDSM. */
118 typedef struct ExecParallelInitializeDSMContext
119 {
120 ParallelContext *pcxt;
121 SharedExecutorInstrumentation *instrumentation;
122 int nnodes;
123 } ExecParallelInitializeDSMContext;
124
125 /* Helper functions that run in the parallel leader. */
126 static char *ExecSerializePlan(Plan *plan, EState *estate);
127 static bool ExecParallelEstimate(PlanState *node,
128 ExecParallelEstimateContext *e);
129 static bool ExecParallelInitializeDSM(PlanState *node,
130 ExecParallelInitializeDSMContext *d);
131 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
132 bool reinitialize);
133 static bool ExecParallelReInitializeDSM(PlanState *planstate,
134 ParallelContext *pcxt);
135 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
136 SharedExecutorInstrumentation *instrumentation);
137
138 /* Helper function that runs in the parallel worker. */
139 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
140
141 /*
142 * Create a serialized representation of the plan to be sent to each worker.
143 */
144 static char *
145 ExecSerializePlan(Plan *plan, EState *estate)
146 {
147 PlannedStmt *pstmt;
148 ListCell *lc;
149
150 /* We can't scribble on the original plan, so make a copy. */
151 plan = copyObject(plan);
152
153 /*
154 * The worker will start its own copy of the executor, and that copy will
155 * insert a junk filter if the toplevel node has any resjunk entries. We
156 * don't want that to happen, because while resjunk columns shouldn't be
157 * sent back to the user, here the tuples are coming back to another
158 * backend which may very well need them. So mutate the target list
159 * accordingly. This is sort of a hack; there might be better ways to do
160 * this...
161 */
162 foreach(lc, plan->targetlist)
163 {
164 TargetEntry *tle = lfirst_node(TargetEntry, lc);
165
166 tle->resjunk = false;
167 }
168
169 /*
170 * Create a dummy PlannedStmt. Most of the fields don't need to be valid
171 * for our purposes, but the worker will need at least a minimal
172 * PlannedStmt to start the executor.
173 */
174 pstmt = makeNode(PlannedStmt);
175 pstmt->commandType = CMD_SELECT;
176 pstmt->queryId = UINT64CONST(0);
177 pstmt->hasReturning = false;
178 pstmt->hasModifyingCTE = false;
179 pstmt->canSetTag = true;
180 pstmt->transientPlan = false;
181 pstmt->dependsOnRole = false;
182 pstmt->parallelModeNeeded = false;
183 pstmt->planTree = plan;
184 pstmt->rtable = estate->es_range_table;
185 pstmt->resultRelations = NIL;
186 pstmt->nonleafResultRelations = NIL;
187
188 /*
189 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
190 * for unsafe ones (so that the list indexes of the safe ones are
191 * preserved). This positively ensures that the worker won't try to run,
192 * or even do ExecInitNode on, an unsafe subplan. That's important to
193 * protect, eg, non-parallel-aware FDWs from getting into trouble.
194 */
195 pstmt->subplans = NIL;
196 foreach(lc, estate->es_plannedstmt->subplans)
197 {
198 Plan *subplan = (Plan *) lfirst(lc);
199
200 if (subplan && !subplan->parallel_safe)
201 subplan = NULL;
202 pstmt->subplans = lappend(pstmt->subplans, subplan);
203 }
204
205 pstmt->rewindPlanIDs = NULL;
206 pstmt->rowMarks = NIL;
207 pstmt->relationOids = NIL;
208 pstmt->invalItems = NIL; /* workers can't replan anyway... */
209 pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
210 pstmt->utilityStmt = NULL;
211 pstmt->stmt_location = -1;
212 pstmt->stmt_len = -1;
213
214 /* Return serialized copy of our dummy PlannedStmt. */
215 return nodeToString(pstmt);
216 }
217
218 /*
219 * Parallel-aware plan nodes (and occasionally others) may need some state
220 * which is shared across all parallel workers. Before we size the DSM, give
221 * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
222 * &pcxt->estimator.
223 *
224 * While we're at it, count the number of PlanState nodes in the tree, so
225 * we know how many Instrumentation structures we need.
226 */
227 static bool
228 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
229 {
230 if (planstate == NULL)
231 return false;
232
233 /* Count this node. */
234 e->nnodes++;
235
236 switch (nodeTag(planstate))
237 {
238 case T_SeqScanState:
239 if (planstate->plan->parallel_aware)
240 ExecSeqScanEstimate((SeqScanState *) planstate,
241 e->pcxt);
242 break;
243 case T_IndexScanState:
244 if (planstate->plan->parallel_aware)
245 ExecIndexScanEstimate((IndexScanState *) planstate,
246 e->pcxt);
247 break;
248 case T_IndexOnlyScanState:
249 if (planstate->plan->parallel_aware)
250 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
251 e->pcxt);
252 break;
253 case T_ForeignScanState:
254 if (planstate->plan->parallel_aware)
255 ExecForeignScanEstimate((ForeignScanState *) planstate,
256 e->pcxt);
257 break;
258 case T_AppendState:
259 if (planstate->plan->parallel_aware)
260 ExecAppendEstimate((AppendState *) planstate,
261 e->pcxt);
262 break;
263 case T_CustomScanState:
264 if (planstate->plan->parallel_aware)
265 ExecCustomScanEstimate((CustomScanState *) planstate,
266 e->pcxt);
267 break;
268 case T_BitmapHeapScanState:
269 if (planstate->plan->parallel_aware)
270 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
271 e->pcxt);
272 break;
273 case T_HashJoinState:
274 if (planstate->plan->parallel_aware)
275 ExecHashJoinEstimate((HashJoinState *) planstate,
276 e->pcxt);
277 break;
278 case T_HashState:
279 /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 ExecHashEstimate((HashState *) planstate, e->pcxt);
281 break;
282 case T_SortState:
283 /* even when not parallel-aware, for EXPLAIN ANALYZE */
284 ExecSortEstimate((SortState *) planstate, e->pcxt);
285 break;
286
287 default:
288 break;
289 }
290
291 return planstate_tree_walker(planstate, ExecParallelEstimate, e);
292 }
293
294 /*
295 * Estimate the amount of space required to serialize the indicated parameters.
296 */
297 static Size
298 EstimateParamExecSpace(EState *estate, Bitmapset *params)
299 {
300 int paramid;
301 Size sz = sizeof(int);
302
303 paramid = -1;
304 while ((paramid = bms_next_member(params, paramid)) >= 0)
305 {
306 Oid typeOid;
307 int16 typLen;
308 bool typByVal;
309 ParamExecData *prm;
310
311 prm = &(estate->es_param_exec_vals[paramid]);
312 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
313 paramid);
314
315 sz = add_size(sz, sizeof(int)); /* space for paramid */
316
317 /* space for datum/isnull */
318 if (OidIsValid(typeOid))
319 get_typlenbyval(typeOid, &typLen, &typByVal);
320 else
321 {
322 /* If no type OID, assume by-value, like copyParamList does. */
323 typLen = sizeof(Datum);
324 typByVal = true;
325 }
326 sz = add_size(sz,
327 datumEstimateSpace(prm->value, prm->isnull,
328 typByVal, typLen));
329 }
330 return sz;
331 }
332
333 /*
334 * Serialize specified PARAM_EXEC parameters.
335 *
336 * We write the number of parameters first, as a 4-byte integer, and then
337 * write details for each parameter in turn. The details for each parameter
338 * consist of a 4-byte paramid (location of param in execution time internal
339 * parameter array) and then the datum as serialized by datumSerialize().
340 */
341 static dsa_pointer
342 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
343 {
344 Size size;
345 int nparams;
346 int paramid;
347 ParamExecData *prm;
348 dsa_pointer handle;
349 char *start_address;
350
351 /* Allocate enough space for the current parameter values. */
352 size = EstimateParamExecSpace(estate, params);
353 handle = dsa_allocate(area, size);
354 start_address = dsa_get_address(area, handle);
355
356 /* First write the number of parameters as a 4-byte integer. */
357 nparams = bms_num_members(params);
358 memcpy(start_address, &nparams, sizeof(int));
359 start_address += sizeof(int);
360
361 /* Write details for each parameter in turn. */
362 paramid = -1;
363 while ((paramid = bms_next_member(params, paramid)) >= 0)
364 {
365 Oid typeOid;
366 int16 typLen;
367 bool typByVal;
368
369 prm = &(estate->es_param_exec_vals[paramid]);
370 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
371 paramid);
372
373 /* Write paramid. */
374 memcpy(start_address, &paramid, sizeof(int));
375 start_address += sizeof(int);
376
377 /* Write datum/isnull */
378 if (OidIsValid(typeOid))
379 get_typlenbyval(typeOid, &typLen, &typByVal);
380 else
381 {
382 /* If no type OID, assume by-value, like copyParamList does. */
383 typLen = sizeof(Datum);
384 typByVal = true;
385 }
386 datumSerialize(prm->value, prm->isnull, typByVal, typLen,
387 &start_address);
388 }
389
390 return handle;
391 }
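/*
 * For reference, the DSA chunk written above has this layout (a sketch):
 *
 *     int nparams;
 *     nparams repetitions of { int paramid; datum as per datumSerialize() }
 *
 * RestoreParamExecParams() below reads it back in exactly this order.
 */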
392
393 /*
394 * Restore specified PARAM_EXEC parameters.
395 */
396 static void
397 RestoreParamExecParams(char *start_address, EState *estate)
398 {
399 int nparams;
400 int i;
401 int paramid;
402
403 memcpy(&nparams, start_address, sizeof(int));
404 start_address += sizeof(int);
405
406 for (i = 0; i < nparams; i++)
407 {
408 ParamExecData *prm;
409
410 /* Read paramid */
411 memcpy(&paramid, start_address, sizeof(int));
412 start_address += sizeof(int);
413 prm = &(estate->es_param_exec_vals[paramid]);
414
415 /* Read datum/isnull. */
416 prm->value = datumRestore(&start_address, &prm->isnull);
417 prm->execPlan = NULL;
418 }
419 }
420
421 /*
422 * Initialize the dynamic shared memory segment that will be used to control
423 * parallel execution.
424 */
425 static bool
426 ExecParallelInitializeDSM(PlanState *planstate,
427 ExecParallelInitializeDSMContext *d)
428 {
429 if (planstate == NULL)
430 return false;
431
432 /* If instrumentation is enabled, initialize slot for this node. */
433 if (d->instrumentation != NULL)
434 d->instrumentation->plan_node_id[d->nnodes] =
435 planstate->plan->plan_node_id;
436
437 /* Count this node. */
438 d->nnodes++;
439
440 /*
441 * Call initializers for DSM-using plan nodes.
442 *
443 * Most plan nodes won't do anything here, but plan nodes that allocated
444 * DSM may need to initialize shared state in the DSM before parallel
445 * workers are launched. They can allocate the space they previously
446 * estimated using shm_toc_allocate, and add the keys they previously
447 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
448 */
449 switch (nodeTag(planstate))
450 {
451 case T_SeqScanState:
452 if (planstate->plan->parallel_aware)
453 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
454 d->pcxt);
455 break;
456 case T_IndexScanState:
457 if (planstate->plan->parallel_aware)
458 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
459 d->pcxt);
460 break;
461 case T_IndexOnlyScanState:
462 if (planstate->plan->parallel_aware)
463 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
464 d->pcxt);
465 break;
466 case T_ForeignScanState:
467 if (planstate->plan->parallel_aware)
468 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
469 d->pcxt);
470 break;
471 case T_AppendState:
472 if (planstate->plan->parallel_aware)
473 ExecAppendInitializeDSM((AppendState *) planstate,
474 d->pcxt);
475 break;
476 case T_CustomScanState:
477 if (planstate->plan->parallel_aware)
478 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
479 d->pcxt);
480 break;
481 case T_BitmapHeapScanState:
482 if (planstate->plan->parallel_aware)
483 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
484 d->pcxt);
485 break;
486 case T_HashJoinState:
487 if (planstate->plan->parallel_aware)
488 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
489 d->pcxt);
490 break;
491 case T_HashState:
492 /* even when not parallel-aware, for EXPLAIN ANALYZE */
493 ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
494 break;
495 case T_SortState:
496 /* even when not parallel-aware, for EXPLAIN ANALYZE */
497 ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
498 break;
499
500 default:
501 break;
502 }
503
504 return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
505 }
506
507 /*
508 * Set up the response queues that parallel workers will use to return
509 * tuples to the leader backend.
510 */
511 static shm_mq_handle **
512 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
513 {
514 shm_mq_handle **responseq;
515 char *tqueuespace;
516 int i;
517
518 /* Skip this if no workers. */
519 if (pcxt->nworkers == 0)
520 return NULL;
521
522 /* Allocate memory for shared memory queue handles. */
523 responseq = (shm_mq_handle **)
524 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
525
526 /*
527 * If not reinitializing, allocate space from the DSM for the queues;
528 * otherwise, find the already allocated space.
529 */
530 if (!reinitialize)
531 tqueuespace =
532 shm_toc_allocate(pcxt->toc,
533 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
534 pcxt->nworkers));
535 else
536 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
537
538 /* Create the queues, and become the receiver for each. */
539 for (i = 0; i < pcxt->nworkers; ++i)
540 {
541 shm_mq *mq;
542
543 mq = shm_mq_create(tqueuespace +
544 ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
545 (Size) PARALLEL_TUPLE_QUEUE_SIZE);
546
547 shm_mq_set_receiver(mq, MyProc);
548 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
549 }
550
551 /* Add array of queues to shm_toc, so others can find it. */
552 if (!reinitialize)
553 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
554
555 /* Return array of handles. */
556 return responseq;
557 }
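/*
 * A sketch of the resulting geometry: the PARALLEL_KEY_TUPLE_QUEUE chunk
 * holds pcxt->nworkers queues of PARALLEL_TUPLE_QUEUE_SIZE bytes laid out
 * back to back, so worker N finds its own queue at
 *
 *     tqueuespace + N * PARALLEL_TUPLE_QUEUE_SIZE
 *
 * as ExecParallelGetReceiver() below does when attaching as the sender.
 */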
558
559 /*
560 * Sets up the required infrastructure for backend workers to perform
561 * execution and return results to the main backend.
562 */
563 ParallelExecutorInfo *
564 ExecInitParallelPlan(PlanState *planstate, EState *estate,
565 Bitmapset *sendParams, int nworkers,
566 int64 tuples_needed)
567 {
568 ParallelExecutorInfo *pei;
569 ParallelContext *pcxt;
570 ExecParallelEstimateContext e;
571 ExecParallelInitializeDSMContext d;
572 FixedParallelExecutorState *fpes;
573 char *pstmt_data;
574 char *pstmt_space;
575 char *paramlistinfo_space;
576 BufferUsage *bufusage_space;
577 SharedExecutorInstrumentation *instrumentation = NULL;
578 SharedJitInstrumentation *jit_instrumentation = NULL;
579 int pstmt_len;
580 int paramlistinfo_len;
581 int instrumentation_len = 0;
582 int jit_instrumentation_len = 0;
583 int instrument_offset = 0;
584 Size dsa_minsize = dsa_minimum_size();
585 char *query_string;
586 int query_len;
587
588 /*
589 * Force any initplan outputs that we're going to pass to workers to be
590 * evaluated, if they weren't already.
591 *
592 * For simplicity, we use the EState's per-output-tuple ExprContext here.
593 * That risks intra-query memory leakage, since we might pass through here
594 * many times before that ExprContext gets reset; but ExecSetParamPlan
595 * doesn't normally leak any memory in the context (see its comments), so
596 * it doesn't seem worth complicating this function's API to pass it a
597 * shorter-lived ExprContext. This might need to change someday.
598 */
599 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
600
601 /* Allocate object for return value. */
602 pei = palloc0(sizeof(ParallelExecutorInfo));
603 pei->finished = false;
604 pei->planstate = planstate;
605
606 /* Fix up and serialize plan to be sent to workers. */
607 pstmt_data = ExecSerializePlan(planstate->plan, estate);
608
609 /* Create a parallel context. */
610 pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
611 pei->pcxt = pcxt;
612
613 /*
614 * Before telling the parallel context to create a dynamic shared memory
615 * segment, we need to figure out how big it should be. Estimate space
616 * for the various things we need to store.
617 */
618
619 /* Estimate space for fixed-size state. */
620 shm_toc_estimate_chunk(&pcxt->estimator,
621 sizeof(FixedParallelExecutorState));
622 shm_toc_estimate_keys(&pcxt->estimator, 1);
623
624 /* Estimate space for query text. */
625 query_len = strlen(estate->es_sourceText);
626 shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
627 shm_toc_estimate_keys(&pcxt->estimator, 1);
628
629 /* Estimate space for serialized PlannedStmt. */
630 pstmt_len = strlen(pstmt_data) + 1;
631 shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
632 shm_toc_estimate_keys(&pcxt->estimator, 1);
633
634 /* Estimate space for serialized ParamListInfo. */
635 paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
636 shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
637 shm_toc_estimate_keys(&pcxt->estimator, 1);
638
639 /*
640 * Estimate space for BufferUsage.
641 *
642 * If EXPLAIN is not in use and there are no extensions loaded that care,
643 * we could skip this. But we have no way of knowing whether anyone's
644 * looking at pgBufferUsage, so do it unconditionally.
645 */
646 shm_toc_estimate_chunk(&pcxt->estimator,
647 mul_size(sizeof(BufferUsage), pcxt->nworkers));
648 shm_toc_estimate_keys(&pcxt->estimator, 1);
649
650 /* Estimate space for tuple queues. */
651 shm_toc_estimate_chunk(&pcxt->estimator,
652 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
653 shm_toc_estimate_keys(&pcxt->estimator, 1);
654
655 /*
656 * Give parallel-aware nodes a chance to add to the estimates, and get a
657 * count of how many PlanState nodes there are.
658 */
659 e.pcxt = pcxt;
660 e.nnodes = 0;
661 ExecParallelEstimate(planstate, &e);
662
663 /* Estimate space for instrumentation, if required. */
664 if (estate->es_instrument)
665 {
666 instrumentation_len =
667 offsetof(SharedExecutorInstrumentation, plan_node_id) +
668 sizeof(int) * e.nnodes;
669 instrumentation_len = MAXALIGN(instrumentation_len);
670 instrument_offset = instrumentation_len;
671 instrumentation_len +=
672 mul_size(sizeof(Instrumentation),
673 mul_size(e.nnodes, nworkers));
674 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
675 shm_toc_estimate_keys(&pcxt->estimator, 1);
676
677 /* Estimate space for JIT instrumentation, if required. */
678 if (estate->es_jit_flags != PGJIT_NONE)
679 {
680 jit_instrumentation_len =
681 offsetof(SharedJitInstrumentation, jit_instr) +
682 sizeof(JitInstrumentation) * nworkers;
683 shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
684 shm_toc_estimate_keys(&pcxt->estimator, 1);
685 }
686 }
687
688 /* Estimate space for DSA area. */
689 shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
690 shm_toc_estimate_keys(&pcxt->estimator, 1);
691
692 /* Everyone's had a chance to ask for space, so now create the DSM. */
693 InitializeParallelDSM(pcxt);
694
695 /*
696 * OK, now we have a dynamic shared memory segment, and it should be big
697 * enough to store all of the data we estimated we would want to put into
698 * it, plus whatever general stuff (not specifically executor-related) the
699 * ParallelContext itself needs to store there. None of the space we
700 * asked for has been allocated or initialized yet, though, so do that.
701 */
702
703 /* Store fixed-size state. */
704 fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
705 fpes->tuples_needed = tuples_needed;
706 fpes->param_exec = InvalidDsaPointer;
707 fpes->eflags = estate->es_top_eflags;
708 fpes->jit_flags = estate->es_jit_flags;
709 shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
710
711 /* Store query string */
712 query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
713 memcpy(query_string, estate->es_sourceText, query_len + 1);
714 shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
715
716 /* Store serialized PlannedStmt. */
717 pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
718 memcpy(pstmt_space, pstmt_data, pstmt_len);
719 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
720
721 /* Store serialized ParamListInfo. */
722 paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
723 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
724 SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
725
726 /* Allocate space for each worker's BufferUsage; no need to initialize. */
727 bufusage_space = shm_toc_allocate(pcxt->toc,
728 mul_size(sizeof(BufferUsage), pcxt->nworkers));
729 shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
730 pei->buffer_usage = bufusage_space;
731
732 /* Set up the tuple queues that the workers will write into. */
733 pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
734
735 /* We don't need the TupleQueueReaders yet, though. */
736 pei->reader = NULL;
737
738 /*
739 * If instrumentation options were supplied, allocate space for the data.
740 * It only gets partially initialized here; the rest happens during
741 * ExecParallelInitializeDSM.
742 */
743 if (estate->es_instrument)
744 {
745 Instrumentation *instrument;
746 int i;
747
748 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
749 instrumentation->instrument_options = estate->es_instrument;
750 instrumentation->instrument_offset = instrument_offset;
751 instrumentation->num_workers = nworkers;
752 instrumentation->num_plan_nodes = e.nnodes;
753 instrument = GetInstrumentationArray(instrumentation);
754 for (i = 0; i < nworkers * e.nnodes; ++i)
755 InstrInit(&instrument[i], estate->es_instrument);
756 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
757 instrumentation);
758 pei->instrumentation = instrumentation;
759
760 if (estate->es_jit_flags != PGJIT_NONE)
761 {
762 jit_instrumentation = shm_toc_allocate(pcxt->toc,
763 jit_instrumentation_len);
764 jit_instrumentation->num_workers = nworkers;
765 memset(jit_instrumentation->jit_instr, 0,
766 sizeof(JitInstrumentation) * nworkers);
767 shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
768 jit_instrumentation);
769 pei->jit_instrumentation = jit_instrumentation;
770 }
771 }
772
773 /*
774 * Create a DSA area that can be used by the leader and all workers.
775 * (However, if we failed to create a DSM and are using private memory
776 * instead, then skip this.)
777 */
778 if (pcxt->seg != NULL)
779 {
780 char *area_space;
781
782 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
783 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
784 pei->area = dsa_create_in_place(area_space, dsa_minsize,
785 LWTRANCHE_PARALLEL_QUERY_DSA,
786 pcxt->seg);
787
788 /*
789 * Serialize parameters, if any, using DSA storage. We don't dare use
790 * the main parallel query DSM for this because we might relaunch
791 * workers after the values have changed (and thus the amount of
792 * storage required has changed).
793 */
794 if (!bms_is_empty(sendParams))
795 {
796 pei->param_exec = SerializeParamExecParams(estate, sendParams,
797 pei->area);
798 fpes->param_exec = pei->param_exec;
799 }
800 }
801
802 /*
803 * Give parallel-aware nodes a chance to initialize their shared data.
804 * This also fills in each plan node's entry in instrumentation->plan_node_id,
805 * if instrumentation is in use.
806 */
807 d.pcxt = pcxt;
808 d.instrumentation = instrumentation;
809 d.nnodes = 0;
810
811 /* Install our DSA area while initializing the plan. */
812 estate->es_query_dsa = pei->area;
813 ExecParallelInitializeDSM(planstate, &d);
814 estate->es_query_dsa = NULL;
815
816 /*
817 * Make sure that the world hasn't shifted under our feet. This could
818 * probably just be an Assert(), but let's be conservative for now.
819 */
820 if (e.nnodes != d.nnodes)
821 elog(ERROR, "inconsistent count of PlanState nodes");
822
823 /* OK, we're ready to rock and roll. */
824 return pei;
825 }
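/*
 * A typical leader-side call sequence (a sketch of how Gather-style nodes
 * drive this module, not an API contract):
 *
 *     pei = ExecInitParallelPlan(ps, estate, sendParams, nworkers, bound);
 *     LaunchParallelWorkers(pei->pcxt);
 *     ExecParallelCreateReaders(pei);
 *     ... read tuples from pei->reader[] ...
 *     ExecParallelFinish(pei);           -- wait for workers, collect stats
 *     ExecParallelReinitialize(...);     -- only when rescanning
 *     ExecParallelCleanup(pei);          -- once the plan is shut down
 */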
826
827 /*
828 * Set up tuple queue readers to read the results of a parallel subplan.
829 *
830 * This is separate from ExecInitParallelPlan() because we can launch the
831 * worker processes and let them start doing something before we do this.
832 */
833 void
834 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
835 {
836 int nworkers = pei->pcxt->nworkers_launched;
837 int i;
838
839 Assert(pei->reader == NULL);
840
841 if (nworkers > 0)
842 {
843 pei->reader = (TupleQueueReader **)
844 palloc(nworkers * sizeof(TupleQueueReader *));
845
846 for (i = 0; i < nworkers; i++)
847 {
848 shm_mq_set_handle(pei->tqueue[i],
849 pei->pcxt->worker[i].bgwhandle);
850 pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
851 }
852 }
853 }
854
855 /*
856 * Re-initialize the parallel executor shared memory state before launching
857 * a fresh batch of workers.
858 */
859 void
860 ExecParallelReinitialize(PlanState *planstate,
861 ParallelExecutorInfo *pei,
862 Bitmapset *sendParams)
863 {
864 EState *estate = planstate->state;
865 FixedParallelExecutorState *fpes;
866
867 /* Old workers must already be shut down */
868 Assert(pei->finished);
869
870 /*
871 * Force any initplan outputs that we're going to pass to workers to be
872 * evaluated, if they weren't already (see comments in
873 * ExecInitParallelPlan).
874 */
875 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
876
877 ReinitializeParallelDSM(pei->pcxt);
878 pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
879 pei->reader = NULL;
880 pei->finished = false;
881
882 fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
883
884 /* Free any serialized parameters from the last round. */
885 if (DsaPointerIsValid(fpes->param_exec))
886 {
887 dsa_free(pei->area, fpes->param_exec);
888 fpes->param_exec = InvalidDsaPointer;
889 }
890
891 /* Serialize current parameter values if required. */
892 if (!bms_is_empty(sendParams))
893 {
894 pei->param_exec = SerializeParamExecParams(estate, sendParams,
895 pei->area);
896 fpes->param_exec = pei->param_exec;
897 }
898
899 /* Traverse plan tree and let each child node reset associated state. */
900 estate->es_query_dsa = pei->area;
901 ExecParallelReInitializeDSM(planstate, pei->pcxt);
902 estate->es_query_dsa = NULL;
903 }
904
905 /*
906 * Traverse plan tree to reinitialize per-node dynamic shared memory state
907 */
908 static bool
909 ExecParallelReInitializeDSM(PlanState *planstate,
910 ParallelContext *pcxt)
911 {
912 if (planstate == NULL)
913 return false;
914
915 /*
916 * Call reinitializers for DSM-using plan nodes.
917 */
918 switch (nodeTag(planstate))
919 {
920 case T_SeqScanState:
921 if (planstate->plan->parallel_aware)
922 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
923 pcxt);
924 break;
925 case T_IndexScanState:
926 if (planstate->plan->parallel_aware)
927 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
928 pcxt);
929 break;
930 case T_IndexOnlyScanState:
931 if (planstate->plan->parallel_aware)
932 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
933 pcxt);
934 break;
935 case T_ForeignScanState:
936 if (planstate->plan->parallel_aware)
937 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
938 pcxt);
939 break;
940 case T_AppendState:
941 if (planstate->plan->parallel_aware)
942 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
943 break;
944 case T_CustomScanState:
945 if (planstate->plan->parallel_aware)
946 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
947 pcxt);
948 break;
949 case T_BitmapHeapScanState:
950 if (planstate->plan->parallel_aware)
951 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
952 pcxt);
953 break;
954 case T_HashJoinState:
955 if (planstate->plan->parallel_aware)
956 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
957 pcxt);
958 break;
959 case T_HashState:
960 case T_SortState:
961 /* these nodes have DSM state, but no reinitialization is required */
962 break;
963
964 default:
965 break;
966 }
967
968 return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
969 }
970
971 /*
972 * Copy instrumentation information about this node and its descendants from
973 * dynamic shared memory.
974 */
975 static bool
976 ExecParallelRetrieveInstrumentation(PlanState *planstate,
977 SharedExecutorInstrumentation *instrumentation)
978 {
979 Instrumentation *instrument;
980 int i;
981 int n;
982 int ibytes;
983 int plan_node_id = planstate->plan->plan_node_id;
984 MemoryContext oldcontext;
985
986 /* Find the instrumentation for this node. */
987 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
988 if (instrumentation->plan_node_id[i] == plan_node_id)
989 break;
990 if (i >= instrumentation->num_plan_nodes)
991 elog(ERROR, "plan node %d not found", plan_node_id);
992
993 /* Accumulate the statistics from all workers. */
994 instrument = GetInstrumentationArray(instrumentation);
995 instrument += i * instrumentation->num_workers;
996 for (n = 0; n < instrumentation->num_workers; ++n)
997 InstrAggNode(planstate->instrument, &instrument[n]);
998
999 /*
1000 * Also store the per-worker detail.
1001 *
1002 * Worker instrumentation should be allocated in the same context as the
1003 * regular instrumentation information, which is the per-query context.
1004 * Switch into per-query memory context.
1005 */
1006 oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1007 ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1008 planstate->worker_instrument =
1009 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1010 MemoryContextSwitchTo(oldcontext);
1011
1012 planstate->worker_instrument->num_workers = instrumentation->num_workers;
1013 memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1014
1015 /* Perform any node-type-specific work that needs to be done. */
1016 switch (nodeTag(planstate))
1017 {
1018 case T_SortState:
1019 ExecSortRetrieveInstrumentation((SortState *) planstate);
1020 break;
1021 case T_HashState:
1022 ExecHashRetrieveInstrumentation((HashState *) planstate);
1023 break;
1024 default:
1025 break;
1026 }
1027
1028 return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1029 instrumentation);
1030 }
1031
1032 /*
1033 * Add up the workers' JIT instrumentation from dynamic shared memory.
1034 */
1035 static void
1036 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1037 SharedJitInstrumentation *shared_jit)
1038 {
1039 JitInstrumentation *combined;
1040 int ibytes;
1041
1042 int n;
1043
1044 /*
1045 * Accumulate worker JIT instrumentation into the combined JIT
1046 * instrumentation, allocating it if required.
1047 */
1048 if (!planstate->state->es_jit_worker_instr)
1049 planstate->state->es_jit_worker_instr =
1050 MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1051 combined = planstate->state->es_jit_worker_instr;
1052
1053 /* Accumulate all the workers' instrumentations. */
1054 for (n = 0; n < shared_jit->num_workers; ++n)
1055 InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1056
1057 /*
1058 * Store the per-worker detail.
1059 *
1060 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1061 * instrumentation in per-query context.
1062 */
1063 ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1064 + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1065 planstate->worker_jit_instrument =
1066 MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1067
1068 memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1069 }
1070
1071 /*
1072 * Finish parallel execution. We wait for parallel workers to finish, and
1073 * accumulate their buffer usage.
1074 */
1075 void
1076 ExecParallelFinish(ParallelExecutorInfo *pei)
1077 {
1078 int nworkers = pei->pcxt->nworkers_launched;
1079 int i;
1080
1081 /* Make this be a no-op if called twice in a row. */
1082 if (pei->finished)
1083 return;
1084
1085 /*
1086 * Detach from tuple queues ASAP, so that any still-active workers will
1087 * notice that no further results are wanted.
1088 */
1089 if (pei->tqueue != NULL)
1090 {
1091 for (i = 0; i < nworkers; i++)
1092 shm_mq_detach(pei->tqueue[i]);
1093 pfree(pei->tqueue);
1094 pei->tqueue = NULL;
1095 }
1096
1097 /*
1098 * While we're waiting for the workers to finish, let's get rid of the
1099 * tuple queue readers. (Any other local cleanup could be done here too.)
1100 */
1101 if (pei->reader != NULL)
1102 {
1103 for (i = 0; i < nworkers; i++)
1104 DestroyTupleQueueReader(pei->reader[i]);
1105 pfree(pei->reader);
1106 pei->reader = NULL;
1107 }
1108
1109 /* Now wait for the workers to finish. */
1110 WaitForParallelWorkersToFinish(pei->pcxt);
1111
1112 /*
1113 * Next, accumulate buffer usage. (This must wait for the workers to
1114 * finish, or we might get incomplete data.)
1115 */
1116 for (i = 0; i < nworkers; i++)
1117 InstrAccumParallelQuery(&pei->buffer_usage[i]);
1118
1119 pei->finished = true;
1120 }
1121
1122 /*
1123 * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1124 * resources still exist after ExecParallelFinish. We separate these
1125 * routines because someone might want to examine the contents of the DSM
1126 * after ExecParallelFinish and before calling this routine.
1127 */
1128 void
1129 ExecParallelCleanup(ParallelExecutorInfo *pei)
1130 {
1131 /* Accumulate instrumentation, if any. */
1132 if (pei->instrumentation)
1133 ExecParallelRetrieveInstrumentation(pei->planstate,
1134 pei->instrumentation);
1135
1136 /* Accumulate JIT instrumentation, if any. */
1137 if (pei->jit_instrumentation)
1138 ExecParallelRetrieveJitInstrumentation(pei->planstate,
1139 pei->jit_instrumentation);
1140
1141 /* Free any serialized parameters. */
1142 if (DsaPointerIsValid(pei->param_exec))
1143 {
1144 dsa_free(pei->area, pei->param_exec);
1145 pei->param_exec = InvalidDsaPointer;
1146 }
1147 if (pei->area != NULL)
1148 {
1149 dsa_detach(pei->area);
1150 pei->area = NULL;
1151 }
1152 if (pei->pcxt != NULL)
1153 {
1154 DestroyParallelContext(pei->pcxt);
1155 pei->pcxt = NULL;
1156 }
1157 pfree(pei);
1158 }
1159
1160 /*
1161 * Create a DestReceiver to write tuples we produce to the shm_mq designated
1162 * for that purpose.
1163 */
1164 static DestReceiver *
1165 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1166 {
1167 char *mqspace;
1168 shm_mq *mq;
1169
1170 mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1171 mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1172 mq = (shm_mq *) mqspace;
1173 shm_mq_set_sender(mq, MyProc);
1174 return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1175 }
1176
1177 /*
1178 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1179 */
1180 static QueryDesc *
1181 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1182 int instrument_options)
1183 {
1184 char *pstmtspace;
1185 char *paramspace;
1186 PlannedStmt *pstmt;
1187 ParamListInfo paramLI;
1188 char *queryString;
1189
1190 /* Get the query string from shared memory */
1191 queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1192
1193 /* Reconstruct leader-supplied PlannedStmt. */
1194 pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1195 pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1196
1197 /* Reconstruct ParamListInfo. */
1198 paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1199 paramLI = RestoreParamList(&paramspace);
1200
1201 /* Create a QueryDesc for the query. */
1202 return CreateQueryDesc(pstmt,
1203 queryString,
1204 GetActiveSnapshot(), InvalidSnapshot,
1205 receiver, paramLI, NULL, instrument_options);
1206 }
1207
1208 /*
1209 * Copy instrumentation information from this node and its descendants into
1210 * dynamic shared memory, so that the parallel leader can retrieve it.
1211 */
1212 static bool
1213 ExecParallelReportInstrumentation(PlanState *planstate,
1214 SharedExecutorInstrumentation *instrumentation)
1215 {
1216 int i;
1217 int plan_node_id = planstate->plan->plan_node_id;
1218 Instrumentation *instrument;
1219
1220 InstrEndLoop(planstate->instrument);
1221
1222 /*
1223 * If we shuffled the plan_node_id values into sorted
1224 * order, we could use binary search here. This might matter someday if
1225 * we're pushing down sufficiently large plan trees. For now, do it the
1226 * slow, dumb way.
1227 */
1228 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1229 if (instrumentation->plan_node_id[i] == plan_node_id)
1230 break;
1231 if (i >= instrumentation->num_plan_nodes)
1232 elog(ERROR, "plan node %d not found", plan_node_id);
1233
1234 /*
1235 * Add our statistics to the per-node, per-worker totals. It's possible
1236 * that this could happen more than once if we relaunched workers.
1237 */
1238 instrument = GetInstrumentationArray(instrumentation);
1239 instrument += i * instrumentation->num_workers;
1240 Assert(IsParallelWorker());
1241 Assert(ParallelWorkerNumber < instrumentation->num_workers);
1242 InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1243
1244 return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1245 instrumentation);
1246 }
1247
1248 /*
1249 * Initialize the PlanState and its descendants with the information
1250 * retrieved from shared memory. This has to be done once the PlanState
1251 * is allocated and initialized by the executor; that is, after ExecutorStart().
1252 */
1253 static bool
1254 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1255 {
1256 if (planstate == NULL)
1257 return false;
1258
1259 switch (nodeTag(planstate))
1260 {
1261 case T_SeqScanState:
1262 if (planstate->plan->parallel_aware)
1263 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1264 break;
1265 case T_IndexScanState:
1266 if (planstate->plan->parallel_aware)
1267 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1268 pwcxt);
1269 break;
1270 case T_IndexOnlyScanState:
1271 if (planstate->plan->parallel_aware)
1272 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1273 pwcxt);
1274 break;
1275 case T_ForeignScanState:
1276 if (planstate->plan->parallel_aware)
1277 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1278 pwcxt);
1279 break;
1280 case T_AppendState:
1281 if (planstate->plan->parallel_aware)
1282 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1283 break;
1284 case T_CustomScanState:
1285 if (planstate->plan->parallel_aware)
1286 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1287 pwcxt);
1288 break;
1289 case T_BitmapHeapScanState:
1290 if (planstate->plan->parallel_aware)
1291 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1292 pwcxt);
1293 break;
1294 case T_HashJoinState:
1295 if (planstate->plan->parallel_aware)
1296 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1297 pwcxt);
1298 break;
1299 case T_HashState:
1300 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1301 ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1302 break;
1303 case T_SortState:
1304 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1305 ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1306 break;
1307
1308 default:
1309 break;
1310 }
1311
1312 return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1313 pwcxt);
1314 }
1315
1316 /*
1317 * Main entrypoint for parallel query worker processes.
1318 *
1319 * We reach this function from ParallelWorkerMain, so the setup necessary to
1320 * create a sensible parallel environment has already been done;
1321 * ParallelWorkerMain worries about stuff like the transaction state, combo
1322 * CID mappings, and GUC values, so we don't need to deal with any of that
1323 * here.
1324 *
1325 * Our job is to deal with concerns specific to the executor. The parallel
1326 * group leader will have stored a serialized PlannedStmt, and it's our job
1327 * to execute that plan and write the resulting tuples to the appropriate
1328 * tuple queue. Various bits of supporting information that we need in order
1329 * to do this are also stored in the dsm_segment and can be accessed through
1330 * the shm_toc.
1331 */
1332 void
1333 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1334 {
1335 FixedParallelExecutorState *fpes;
1336 BufferUsage *buffer_usage;
1337 DestReceiver *receiver;
1338 QueryDesc *queryDesc;
1339 SharedExecutorInstrumentation *instrumentation;
1340 SharedJitInstrumentation *jit_instrumentation;
1341 int instrument_options = 0;
1342 void *area_space;
1343 dsa_area *area;
1344 ParallelWorkerContext pwcxt;
1345
1346 /* Get fixed-size state. */
1347 fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1348
1349 /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1350 receiver = ExecParallelGetReceiver(seg, toc);
1351 instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1352 if (instrumentation != NULL)
1353 instrument_options = instrumentation->instrument_options;
1354 jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1355 true);
1356 queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1357
1358 /* Set debug_query_string for this worker */
1359 debug_query_string = queryDesc->sourceText;
1360
1361 /* Report workers' query for monitoring purposes */
1362 pgstat_report_activity(STATE_RUNNING, debug_query_string);
1363
1364 /* Attach to the dynamic shared memory area. */
1365 area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1366 area = dsa_attach_in_place(area_space, seg);
1367
1368 /* Start up the executor */
1369 queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1370 ExecutorStart(queryDesc, fpes->eflags);
1371
1372 /* Special executor initialization steps for parallel workers */
1373 queryDesc->planstate->state->es_query_dsa = area;
1374 if (DsaPointerIsValid(fpes->param_exec))
1375 {
1376 char *paramexec_space;
1377
1378 paramexec_space = dsa_get_address(area, fpes->param_exec);
1379 RestoreParamExecParams(paramexec_space, queryDesc->estate);
1380
1381 }
1382 pwcxt.toc = toc;
1383 pwcxt.seg = seg;
1384 ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1385
1386 /* Pass down any tuple bound */
1387 ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1388
1389 /*
1390 * Prepare to track buffer usage during query execution.
1391 *
1392 * We do this after starting up the executor to match what happens in the
1393 * leader, which also doesn't count buffer accesses that occur during
1394 * executor startup.
1395 */
1396 InstrStartParallelQuery();
1397
1398 /*
1399 * Run the plan. If we specified a tuple bound, be careful not to demand
1400 * more tuples than that.
1401 */
1402 ExecutorRun(queryDesc,
1403 ForwardScanDirection,
1404 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1405 true);
1406
1407 /* Shut down the executor */
1408 ExecutorFinish(queryDesc);
1409
1410 /* Report buffer usage during parallel execution. */
1411 buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1412 InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
1413
1414 /* Report instrumentation data if any instrumentation options are set. */
1415 if (instrumentation != NULL)
1416 ExecParallelReportInstrumentation(queryDesc->planstate,
1417 instrumentation);
1418
1419 /* Report JIT instrumentation data if any */
1420 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1421 {
1422 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1423 jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1424 queryDesc->estate->es_jit->instr;
1425 }
1426
1427 /* Must do this after capturing instrumentation. */
1428 ExecutorEnd(queryDesc);
1429
1430 /* Cleanup. */
1431 dsa_detach(area);
1432 FreeQueryDesc(queryDesc);
1433 receiver->rDestroy(receiver);
1434 }
1435