1 /*-------------------------------------------------------------------------
2 *
3 * execParallel.c
4 * Support routines for parallel execution.
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * This file contains routines that are intended to support setting up,
10 * using, and tearing down a ParallelContext from within the PostgreSQL
11 * executor. The ParallelContext machinery will handle starting the
12 * workers and ensuring that their state generally matches that of the
13 * leader; see src/backend/access/transam/README.parallel for details.
14 * However, we must save and restore relevant executor state, such as
15 * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 * the actual plan to be passed down to the worker.
17 *
18 * IDENTIFICATION
19 * src/backend/executor/execParallel.c
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAgg.h"
29 #include "executor/nodeAppend.h"
30 #include "executor/nodeBitmapHeapscan.h"
31 #include "executor/nodeCustom.h"
32 #include "executor/nodeForeignscan.h"
33 #include "executor/nodeHash.h"
34 #include "executor/nodeHashjoin.h"
35 #include "executor/nodeIncrementalSort.h"
36 #include "executor/nodeIndexonlyscan.h"
37 #include "executor/nodeIndexscan.h"
38 #include "executor/nodeMemoize.h"
39 #include "executor/nodeSeqscan.h"
40 #include "executor/nodeSort.h"
41 #include "executor/nodeSubplan.h"
42 #include "executor/tqueue.h"
43 #include "jit/jit.h"
44 #include "nodes/nodeFuncs.h"
45 #include "pgstat.h"
46 #include "storage/spin.h"
47 #include "tcop/tcopprot.h"
48 #include "utils/datum.h"
49 #include "utils/dsa.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/snapmgr.h"
53
54 /*
55 * Magic numbers for parallel executor communication. We use constants
56 * greater than any 32-bit integer here so that values < 2^32 can be used
57 * by individual parallel nodes to store their own state.
58 */
59 #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60 #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61 #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62 #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63 #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64 #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65 #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66 #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
69
70 #define PARALLEL_TUPLE_QUEUE_SIZE 65536
71
72 /*
73 * Fixed-size random stuff that we need to pass to parallel workers.
74 */
75 typedef struct FixedParallelExecutorState
76 {
77 int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
78 dsa_pointer param_exec;
79 int eflags;
80 int jit_flags;
81 } FixedParallelExecutorState;
82
83 /*
84 * DSM structure for accumulating per-PlanState instrumentation.
85 *
86 * instrument_options: Same meaning here as in instrument.c.
87 *
88 * instrument_offset: Offset, relative to the start of this structure,
89 * of the first Instrumentation object. This will depend on the length of
90 * the plan_node_id array.
91 *
92 * num_workers: Number of workers.
93 *
94 * num_plan_nodes: Number of plan nodes.
95 *
96 * plan_node_id: Array of plan nodes for which we are gathering instrumentation
97 * from parallel workers. The length of this array is given by num_plan_nodes.
98 */
99 struct SharedExecutorInstrumentation
100 {
101 int instrument_options;
102 int instrument_offset;
103 int num_workers;
104 int num_plan_nodes;
105 int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 /* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 };
108 #define GetInstrumentationArray(sei) \
109 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
111
112 /* Context object for ExecParallelEstimate. */
113 typedef struct ExecParallelEstimateContext
114 {
115 ParallelContext *pcxt;
116 int nnodes;
117 } ExecParallelEstimateContext;
118
119 /* Context object for ExecParallelInitializeDSM. */
120 typedef struct ExecParallelInitializeDSMContext
121 {
122 ParallelContext *pcxt;
123 SharedExecutorInstrumentation *instrumentation;
124 int nnodes;
125 } ExecParallelInitializeDSMContext;
126
127 /* Helper functions that run in the parallel leader. */
128 static char *ExecSerializePlan(Plan *plan, EState *estate);
129 static bool ExecParallelEstimate(PlanState *node,
130 ExecParallelEstimateContext *e);
131 static bool ExecParallelInitializeDSM(PlanState *node,
132 ExecParallelInitializeDSMContext *d);
133 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 bool reinitialize);
135 static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 ParallelContext *pcxt);
137 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 SharedExecutorInstrumentation *instrumentation);
139
140 /* Helper function that runs in the parallel worker. */
141 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142
143 /*
144 * Create a serialized representation of the plan to be sent to each worker.
145 */
146 static char *
ExecSerializePlan(Plan * plan,EState * estate)147 ExecSerializePlan(Plan *plan, EState *estate)
148 {
149 PlannedStmt *pstmt;
150 ListCell *lc;
151
152 /* We can't scribble on the original plan, so make a copy. */
153 plan = copyObject(plan);
154
155 /*
156 * The worker will start its own copy of the executor, and that copy will
157 * insert a junk filter if the toplevel node has any resjunk entries. We
158 * don't want that to happen, because while resjunk columns shouldn't be
159 * sent back to the user, here the tuples are coming back to another
160 * backend which may very well need them. So mutate the target list
161 * accordingly. This is sort of a hack; there might be better ways to do
162 * this...
163 */
164 foreach(lc, plan->targetlist)
165 {
166 TargetEntry *tle = lfirst_node(TargetEntry, lc);
167
168 tle->resjunk = false;
169 }
170
171 /*
172 * Create a dummy PlannedStmt. Most of the fields don't need to be valid
173 * for our purposes, but the worker will need at least a minimal
174 * PlannedStmt to start the executor.
175 */
176 pstmt = makeNode(PlannedStmt);
177 pstmt->commandType = CMD_SELECT;
178 pstmt->queryId = pgstat_get_my_query_id();
179 pstmt->hasReturning = false;
180 pstmt->hasModifyingCTE = false;
181 pstmt->canSetTag = true;
182 pstmt->transientPlan = false;
183 pstmt->dependsOnRole = false;
184 pstmt->parallelModeNeeded = false;
185 pstmt->planTree = plan;
186 pstmt->rtable = estate->es_range_table;
187 pstmt->resultRelations = NIL;
188 pstmt->appendRelations = NIL;
189
190 /*
191 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
192 * for unsafe ones (so that the list indexes of the safe ones are
193 * preserved). This positively ensures that the worker won't try to run,
194 * or even do ExecInitNode on, an unsafe subplan. That's important to
195 * protect, eg, non-parallel-aware FDWs from getting into trouble.
196 */
197 pstmt->subplans = NIL;
198 foreach(lc, estate->es_plannedstmt->subplans)
199 {
200 Plan *subplan = (Plan *) lfirst(lc);
201
202 if (subplan && !subplan->parallel_safe)
203 subplan = NULL;
204 pstmt->subplans = lappend(pstmt->subplans, subplan);
205 }
206
207 pstmt->rewindPlanIDs = NULL;
208 pstmt->rowMarks = NIL;
209 pstmt->relationOids = NIL;
210 pstmt->invalItems = NIL; /* workers can't replan anyway... */
211 pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
212 pstmt->utilityStmt = NULL;
213 pstmt->stmt_location = -1;
214 pstmt->stmt_len = -1;
215
216 /* Return serialized copy of our dummy PlannedStmt. */
217 return nodeToString(pstmt);
218 }
219
220 /*
221 * Parallel-aware plan nodes (and occasionally others) may need some state
222 * which is shared across all parallel workers. Before we size the DSM, give
223 * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
224 * &pcxt->estimator.
225 *
226 * While we're at it, count the number of PlanState nodes in the tree, so
227 * we know how many Instrumentation structures we need.
228 */
229 static bool
ExecParallelEstimate(PlanState * planstate,ExecParallelEstimateContext * e)230 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
231 {
232 if (planstate == NULL)
233 return false;
234
235 /* Count this node. */
236 e->nnodes++;
237
238 switch (nodeTag(planstate))
239 {
240 case T_SeqScanState:
241 if (planstate->plan->parallel_aware)
242 ExecSeqScanEstimate((SeqScanState *) planstate,
243 e->pcxt);
244 break;
245 case T_IndexScanState:
246 if (planstate->plan->parallel_aware)
247 ExecIndexScanEstimate((IndexScanState *) planstate,
248 e->pcxt);
249 break;
250 case T_IndexOnlyScanState:
251 if (planstate->plan->parallel_aware)
252 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
253 e->pcxt);
254 break;
255 case T_ForeignScanState:
256 if (planstate->plan->parallel_aware)
257 ExecForeignScanEstimate((ForeignScanState *) planstate,
258 e->pcxt);
259 break;
260 case T_AppendState:
261 if (planstate->plan->parallel_aware)
262 ExecAppendEstimate((AppendState *) planstate,
263 e->pcxt);
264 break;
265 case T_CustomScanState:
266 if (planstate->plan->parallel_aware)
267 ExecCustomScanEstimate((CustomScanState *) planstate,
268 e->pcxt);
269 break;
270 case T_BitmapHeapScanState:
271 if (planstate->plan->parallel_aware)
272 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
273 e->pcxt);
274 break;
275 case T_HashJoinState:
276 if (planstate->plan->parallel_aware)
277 ExecHashJoinEstimate((HashJoinState *) planstate,
278 e->pcxt);
279 break;
280 case T_HashState:
281 /* even when not parallel-aware, for EXPLAIN ANALYZE */
282 ExecHashEstimate((HashState *) planstate, e->pcxt);
283 break;
284 case T_SortState:
285 /* even when not parallel-aware, for EXPLAIN ANALYZE */
286 ExecSortEstimate((SortState *) planstate, e->pcxt);
287 break;
288 case T_IncrementalSortState:
289 /* even when not parallel-aware, for EXPLAIN ANALYZE */
290 ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
291 break;
292 case T_AggState:
293 /* even when not parallel-aware, for EXPLAIN ANALYZE */
294 ExecAggEstimate((AggState *) planstate, e->pcxt);
295 break;
296 case T_MemoizeState:
297 /* even when not parallel-aware, for EXPLAIN ANALYZE */
298 ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
299 break;
300 default:
301 break;
302 }
303
304 return planstate_tree_walker(planstate, ExecParallelEstimate, e);
305 }
306
307 /*
308 * Estimate the amount of space required to serialize the indicated parameters.
309 */
310 static Size
EstimateParamExecSpace(EState * estate,Bitmapset * params)311 EstimateParamExecSpace(EState *estate, Bitmapset *params)
312 {
313 int paramid;
314 Size sz = sizeof(int);
315
316 paramid = -1;
317 while ((paramid = bms_next_member(params, paramid)) >= 0)
318 {
319 Oid typeOid;
320 int16 typLen;
321 bool typByVal;
322 ParamExecData *prm;
323
324 prm = &(estate->es_param_exec_vals[paramid]);
325 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
326 paramid);
327
328 sz = add_size(sz, sizeof(int)); /* space for paramid */
329
330 /* space for datum/isnull */
331 if (OidIsValid(typeOid))
332 get_typlenbyval(typeOid, &typLen, &typByVal);
333 else
334 {
335 /* If no type OID, assume by-value, like copyParamList does. */
336 typLen = sizeof(Datum);
337 typByVal = true;
338 }
339 sz = add_size(sz,
340 datumEstimateSpace(prm->value, prm->isnull,
341 typByVal, typLen));
342 }
343 return sz;
344 }
345
346 /*
347 * Serialize specified PARAM_EXEC parameters.
348 *
349 * We write the number of parameters first, as a 4-byte integer, and then
350 * write details for each parameter in turn. The details for each parameter
351 * consist of a 4-byte paramid (location of param in execution time internal
352 * parameter array) and then the datum as serialized by datumSerialize().
353 */
354 static dsa_pointer
SerializeParamExecParams(EState * estate,Bitmapset * params,dsa_area * area)355 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
356 {
357 Size size;
358 int nparams;
359 int paramid;
360 ParamExecData *prm;
361 dsa_pointer handle;
362 char *start_address;
363
364 /* Allocate enough space for the current parameter values. */
365 size = EstimateParamExecSpace(estate, params);
366 handle = dsa_allocate(area, size);
367 start_address = dsa_get_address(area, handle);
368
369 /* First write the number of parameters as a 4-byte integer. */
370 nparams = bms_num_members(params);
371 memcpy(start_address, &nparams, sizeof(int));
372 start_address += sizeof(int);
373
374 /* Write details for each parameter in turn. */
375 paramid = -1;
376 while ((paramid = bms_next_member(params, paramid)) >= 0)
377 {
378 Oid typeOid;
379 int16 typLen;
380 bool typByVal;
381
382 prm = &(estate->es_param_exec_vals[paramid]);
383 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
384 paramid);
385
386 /* Write paramid. */
387 memcpy(start_address, ¶mid, sizeof(int));
388 start_address += sizeof(int);
389
390 /* Write datum/isnull */
391 if (OidIsValid(typeOid))
392 get_typlenbyval(typeOid, &typLen, &typByVal);
393 else
394 {
395 /* If no type OID, assume by-value, like copyParamList does. */
396 typLen = sizeof(Datum);
397 typByVal = true;
398 }
399 datumSerialize(prm->value, prm->isnull, typByVal, typLen,
400 &start_address);
401 }
402
403 return handle;
404 }
405
406 /*
407 * Restore specified PARAM_EXEC parameters.
408 */
409 static void
RestoreParamExecParams(char * start_address,EState * estate)410 RestoreParamExecParams(char *start_address, EState *estate)
411 {
412 int nparams;
413 int i;
414 int paramid;
415
416 memcpy(&nparams, start_address, sizeof(int));
417 start_address += sizeof(int);
418
419 for (i = 0; i < nparams; i++)
420 {
421 ParamExecData *prm;
422
423 /* Read paramid */
424 memcpy(¶mid, start_address, sizeof(int));
425 start_address += sizeof(int);
426 prm = &(estate->es_param_exec_vals[paramid]);
427
428 /* Read datum/isnull. */
429 prm->value = datumRestore(&start_address, &prm->isnull);
430 prm->execPlan = NULL;
431 }
432 }
433
434 /*
435 * Initialize the dynamic shared memory segment that will be used to control
436 * parallel execution.
437 */
438 static bool
ExecParallelInitializeDSM(PlanState * planstate,ExecParallelInitializeDSMContext * d)439 ExecParallelInitializeDSM(PlanState *planstate,
440 ExecParallelInitializeDSMContext *d)
441 {
442 if (planstate == NULL)
443 return false;
444
445 /* If instrumentation is enabled, initialize slot for this node. */
446 if (d->instrumentation != NULL)
447 d->instrumentation->plan_node_id[d->nnodes] =
448 planstate->plan->plan_node_id;
449
450 /* Count this node. */
451 d->nnodes++;
452
453 /*
454 * Call initializers for DSM-using plan nodes.
455 *
456 * Most plan nodes won't do anything here, but plan nodes that allocated
457 * DSM may need to initialize shared state in the DSM before parallel
458 * workers are launched. They can allocate the space they previously
459 * estimated using shm_toc_allocate, and add the keys they previously
460 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
461 */
462 switch (nodeTag(planstate))
463 {
464 case T_SeqScanState:
465 if (planstate->plan->parallel_aware)
466 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
467 d->pcxt);
468 break;
469 case T_IndexScanState:
470 if (planstate->plan->parallel_aware)
471 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
472 d->pcxt);
473 break;
474 case T_IndexOnlyScanState:
475 if (planstate->plan->parallel_aware)
476 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
477 d->pcxt);
478 break;
479 case T_ForeignScanState:
480 if (planstate->plan->parallel_aware)
481 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
482 d->pcxt);
483 break;
484 case T_AppendState:
485 if (planstate->plan->parallel_aware)
486 ExecAppendInitializeDSM((AppendState *) planstate,
487 d->pcxt);
488 break;
489 case T_CustomScanState:
490 if (planstate->plan->parallel_aware)
491 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
492 d->pcxt);
493 break;
494 case T_BitmapHeapScanState:
495 if (planstate->plan->parallel_aware)
496 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
497 d->pcxt);
498 break;
499 case T_HashJoinState:
500 if (planstate->plan->parallel_aware)
501 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
502 d->pcxt);
503 break;
504 case T_HashState:
505 /* even when not parallel-aware, for EXPLAIN ANALYZE */
506 ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
507 break;
508 case T_SortState:
509 /* even when not parallel-aware, for EXPLAIN ANALYZE */
510 ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
511 break;
512 case T_IncrementalSortState:
513 /* even when not parallel-aware, for EXPLAIN ANALYZE */
514 ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
515 break;
516 case T_AggState:
517 /* even when not parallel-aware, for EXPLAIN ANALYZE */
518 ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
519 break;
520 case T_MemoizeState:
521 /* even when not parallel-aware, for EXPLAIN ANALYZE */
522 ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
523 break;
524 default:
525 break;
526 }
527
528 return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
529 }
530
531 /*
532 * It sets up the response queues for backend workers to return tuples
533 * to the main backend and start the workers.
534 */
535 static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext * pcxt,bool reinitialize)536 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
537 {
538 shm_mq_handle **responseq;
539 char *tqueuespace;
540 int i;
541
542 /* Skip this if no workers. */
543 if (pcxt->nworkers == 0)
544 return NULL;
545
546 /* Allocate memory for shared memory queue handles. */
547 responseq = (shm_mq_handle **)
548 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
549
550 /*
551 * If not reinitializing, allocate space from the DSM for the queues;
552 * otherwise, find the already allocated space.
553 */
554 if (!reinitialize)
555 tqueuespace =
556 shm_toc_allocate(pcxt->toc,
557 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
558 pcxt->nworkers));
559 else
560 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
561
562 /* Create the queues, and become the receiver for each. */
563 for (i = 0; i < pcxt->nworkers; ++i)
564 {
565 shm_mq *mq;
566
567 mq = shm_mq_create(tqueuespace +
568 ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
569 (Size) PARALLEL_TUPLE_QUEUE_SIZE);
570
571 shm_mq_set_receiver(mq, MyProc);
572 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
573 }
574
575 /* Add array of queues to shm_toc, so others can find it. */
576 if (!reinitialize)
577 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
578
579 /* Return array of handles. */
580 return responseq;
581 }
582
583 /*
584 * Sets up the required infrastructure for backend workers to perform
585 * execution and return results to the main backend.
586 */
587 ParallelExecutorInfo *
ExecInitParallelPlan(PlanState * planstate,EState * estate,Bitmapset * sendParams,int nworkers,int64 tuples_needed)588 ExecInitParallelPlan(PlanState *planstate, EState *estate,
589 Bitmapset *sendParams, int nworkers,
590 int64 tuples_needed)
591 {
592 ParallelExecutorInfo *pei;
593 ParallelContext *pcxt;
594 ExecParallelEstimateContext e;
595 ExecParallelInitializeDSMContext d;
596 FixedParallelExecutorState *fpes;
597 char *pstmt_data;
598 char *pstmt_space;
599 char *paramlistinfo_space;
600 BufferUsage *bufusage_space;
601 WalUsage *walusage_space;
602 SharedExecutorInstrumentation *instrumentation = NULL;
603 SharedJitInstrumentation *jit_instrumentation = NULL;
604 int pstmt_len;
605 int paramlistinfo_len;
606 int instrumentation_len = 0;
607 int jit_instrumentation_len = 0;
608 int instrument_offset = 0;
609 Size dsa_minsize = dsa_minimum_size();
610 char *query_string;
611 int query_len;
612
613 /*
614 * Force any initplan outputs that we're going to pass to workers to be
615 * evaluated, if they weren't already.
616 *
617 * For simplicity, we use the EState's per-output-tuple ExprContext here.
618 * That risks intra-query memory leakage, since we might pass through here
619 * many times before that ExprContext gets reset; but ExecSetParamPlan
620 * doesn't normally leak any memory in the context (see its comments), so
621 * it doesn't seem worth complicating this function's API to pass it a
622 * shorter-lived ExprContext. This might need to change someday.
623 */
624 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
625
626 /* Allocate object for return value. */
627 pei = palloc0(sizeof(ParallelExecutorInfo));
628 pei->finished = false;
629 pei->planstate = planstate;
630
631 /* Fix up and serialize plan to be sent to workers. */
632 pstmt_data = ExecSerializePlan(planstate->plan, estate);
633
634 /* Create a parallel context. */
635 pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
636 pei->pcxt = pcxt;
637
638 /*
639 * Before telling the parallel context to create a dynamic shared memory
640 * segment, we need to figure out how big it should be. Estimate space
641 * for the various things we need to store.
642 */
643
644 /* Estimate space for fixed-size state. */
645 shm_toc_estimate_chunk(&pcxt->estimator,
646 sizeof(FixedParallelExecutorState));
647 shm_toc_estimate_keys(&pcxt->estimator, 1);
648
649 /* Estimate space for query text. */
650 query_len = strlen(estate->es_sourceText);
651 shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
652 shm_toc_estimate_keys(&pcxt->estimator, 1);
653
654 /* Estimate space for serialized PlannedStmt. */
655 pstmt_len = strlen(pstmt_data) + 1;
656 shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
657 shm_toc_estimate_keys(&pcxt->estimator, 1);
658
659 /* Estimate space for serialized ParamListInfo. */
660 paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
661 shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
662 shm_toc_estimate_keys(&pcxt->estimator, 1);
663
664 /*
665 * Estimate space for BufferUsage.
666 *
667 * If EXPLAIN is not in use and there are no extensions loaded that care,
668 * we could skip this. But we have no way of knowing whether anyone's
669 * looking at pgBufferUsage, so do it unconditionally.
670 */
671 shm_toc_estimate_chunk(&pcxt->estimator,
672 mul_size(sizeof(BufferUsage), pcxt->nworkers));
673 shm_toc_estimate_keys(&pcxt->estimator, 1);
674
675 /*
676 * Same thing for WalUsage.
677 */
678 shm_toc_estimate_chunk(&pcxt->estimator,
679 mul_size(sizeof(WalUsage), pcxt->nworkers));
680 shm_toc_estimate_keys(&pcxt->estimator, 1);
681
682 /* Estimate space for tuple queues. */
683 shm_toc_estimate_chunk(&pcxt->estimator,
684 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
685 shm_toc_estimate_keys(&pcxt->estimator, 1);
686
687 /*
688 * Give parallel-aware nodes a chance to add to the estimates, and get a
689 * count of how many PlanState nodes there are.
690 */
691 e.pcxt = pcxt;
692 e.nnodes = 0;
693 ExecParallelEstimate(planstate, &e);
694
695 /* Estimate space for instrumentation, if required. */
696 if (estate->es_instrument)
697 {
698 instrumentation_len =
699 offsetof(SharedExecutorInstrumentation, plan_node_id) +
700 sizeof(int) * e.nnodes;
701 instrumentation_len = MAXALIGN(instrumentation_len);
702 instrument_offset = instrumentation_len;
703 instrumentation_len +=
704 mul_size(sizeof(Instrumentation),
705 mul_size(e.nnodes, nworkers));
706 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
707 shm_toc_estimate_keys(&pcxt->estimator, 1);
708
709 /* Estimate space for JIT instrumentation, if required. */
710 if (estate->es_jit_flags != PGJIT_NONE)
711 {
712 jit_instrumentation_len =
713 offsetof(SharedJitInstrumentation, jit_instr) +
714 sizeof(JitInstrumentation) * nworkers;
715 shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
716 shm_toc_estimate_keys(&pcxt->estimator, 1);
717 }
718 }
719
720 /* Estimate space for DSA area. */
721 shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
722 shm_toc_estimate_keys(&pcxt->estimator, 1);
723
724 /* Everyone's had a chance to ask for space, so now create the DSM. */
725 InitializeParallelDSM(pcxt);
726
727 /*
728 * OK, now we have a dynamic shared memory segment, and it should be big
729 * enough to store all of the data we estimated we would want to put into
730 * it, plus whatever general stuff (not specifically executor-related) the
731 * ParallelContext itself needs to store there. None of the space we
732 * asked for has been allocated or initialized yet, though, so do that.
733 */
734
735 /* Store fixed-size state. */
736 fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
737 fpes->tuples_needed = tuples_needed;
738 fpes->param_exec = InvalidDsaPointer;
739 fpes->eflags = estate->es_top_eflags;
740 fpes->jit_flags = estate->es_jit_flags;
741 shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
742
743 /* Store query string */
744 query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
745 memcpy(query_string, estate->es_sourceText, query_len + 1);
746 shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
747
748 /* Store serialized PlannedStmt. */
749 pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
750 memcpy(pstmt_space, pstmt_data, pstmt_len);
751 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
752
753 /* Store serialized ParamListInfo. */
754 paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
755 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
756 SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
757
758 /* Allocate space for each worker's BufferUsage; no need to initialize. */
759 bufusage_space = shm_toc_allocate(pcxt->toc,
760 mul_size(sizeof(BufferUsage), pcxt->nworkers));
761 shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
762 pei->buffer_usage = bufusage_space;
763
764 /* Same for WalUsage. */
765 walusage_space = shm_toc_allocate(pcxt->toc,
766 mul_size(sizeof(WalUsage), pcxt->nworkers));
767 shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
768 pei->wal_usage = walusage_space;
769
770 /* Set up the tuple queues that the workers will write into. */
771 pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
772
773 /* We don't need the TupleQueueReaders yet, though. */
774 pei->reader = NULL;
775
776 /*
777 * If instrumentation options were supplied, allocate space for the data.
778 * It only gets partially initialized here; the rest happens during
779 * ExecParallelInitializeDSM.
780 */
781 if (estate->es_instrument)
782 {
783 Instrumentation *instrument;
784 int i;
785
786 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
787 instrumentation->instrument_options = estate->es_instrument;
788 instrumentation->instrument_offset = instrument_offset;
789 instrumentation->num_workers = nworkers;
790 instrumentation->num_plan_nodes = e.nnodes;
791 instrument = GetInstrumentationArray(instrumentation);
792 for (i = 0; i < nworkers * e.nnodes; ++i)
793 InstrInit(&instrument[i], estate->es_instrument);
794 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
795 instrumentation);
796 pei->instrumentation = instrumentation;
797
798 if (estate->es_jit_flags != PGJIT_NONE)
799 {
800 jit_instrumentation = shm_toc_allocate(pcxt->toc,
801 jit_instrumentation_len);
802 jit_instrumentation->num_workers = nworkers;
803 memset(jit_instrumentation->jit_instr, 0,
804 sizeof(JitInstrumentation) * nworkers);
805 shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
806 jit_instrumentation);
807 pei->jit_instrumentation = jit_instrumentation;
808 }
809 }
810
811 /*
812 * Create a DSA area that can be used by the leader and all workers.
813 * (However, if we failed to create a DSM and are using private memory
814 * instead, then skip this.)
815 */
816 if (pcxt->seg != NULL)
817 {
818 char *area_space;
819
820 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
821 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
822 pei->area = dsa_create_in_place(area_space, dsa_minsize,
823 LWTRANCHE_PARALLEL_QUERY_DSA,
824 pcxt->seg);
825
826 /*
827 * Serialize parameters, if any, using DSA storage. We don't dare use
828 * the main parallel query DSM for this because we might relaunch
829 * workers after the values have changed (and thus the amount of
830 * storage required has changed).
831 */
832 if (!bms_is_empty(sendParams))
833 {
834 pei->param_exec = SerializeParamExecParams(estate, sendParams,
835 pei->area);
836 fpes->param_exec = pei->param_exec;
837 }
838 }
839
840 /*
841 * Give parallel-aware nodes a chance to initialize their shared data.
842 * This also initializes the elements of instrumentation->ps_instrument,
843 * if it exists.
844 */
845 d.pcxt = pcxt;
846 d.instrumentation = instrumentation;
847 d.nnodes = 0;
848
849 /* Install our DSA area while initializing the plan. */
850 estate->es_query_dsa = pei->area;
851 ExecParallelInitializeDSM(planstate, &d);
852 estate->es_query_dsa = NULL;
853
854 /*
855 * Make sure that the world hasn't shifted under our feet. This could
856 * probably just be an Assert(), but let's be conservative for now.
857 */
858 if (e.nnodes != d.nnodes)
859 elog(ERROR, "inconsistent count of PlanState nodes");
860
861 /* OK, we're ready to rock and roll. */
862 return pei;
863 }
864
865 /*
866 * Set up tuple queue readers to read the results of a parallel subplan.
867 *
868 * This is separate from ExecInitParallelPlan() because we can launch the
869 * worker processes and let them start doing something before we do this.
870 */
871 void
ExecParallelCreateReaders(ParallelExecutorInfo * pei)872 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
873 {
874 int nworkers = pei->pcxt->nworkers_launched;
875 int i;
876
877 Assert(pei->reader == NULL);
878
879 if (nworkers > 0)
880 {
881 pei->reader = (TupleQueueReader **)
882 palloc(nworkers * sizeof(TupleQueueReader *));
883
884 for (i = 0; i < nworkers; i++)
885 {
886 shm_mq_set_handle(pei->tqueue[i],
887 pei->pcxt->worker[i].bgwhandle);
888 pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
889 }
890 }
891 }
892
893 /*
894 * Re-initialize the parallel executor shared memory state before launching
895 * a fresh batch of workers.
896 */
897 void
ExecParallelReinitialize(PlanState * planstate,ParallelExecutorInfo * pei,Bitmapset * sendParams)898 ExecParallelReinitialize(PlanState *planstate,
899 ParallelExecutorInfo *pei,
900 Bitmapset *sendParams)
901 {
902 EState *estate = planstate->state;
903 FixedParallelExecutorState *fpes;
904
905 /* Old workers must already be shut down */
906 Assert(pei->finished);
907
908 /*
909 * Force any initplan outputs that we're going to pass to workers to be
910 * evaluated, if they weren't already (see comments in
911 * ExecInitParallelPlan).
912 */
913 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
914
915 ReinitializeParallelDSM(pei->pcxt);
916 pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
917 pei->reader = NULL;
918 pei->finished = false;
919
920 fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
921
922 /* Free any serialized parameters from the last round. */
923 if (DsaPointerIsValid(fpes->param_exec))
924 {
925 dsa_free(pei->area, fpes->param_exec);
926 fpes->param_exec = InvalidDsaPointer;
927 }
928
929 /* Serialize current parameter values if required. */
930 if (!bms_is_empty(sendParams))
931 {
932 pei->param_exec = SerializeParamExecParams(estate, sendParams,
933 pei->area);
934 fpes->param_exec = pei->param_exec;
935 }
936
937 /* Traverse plan tree and let each child node reset associated state. */
938 estate->es_query_dsa = pei->area;
939 ExecParallelReInitializeDSM(planstate, pei->pcxt);
940 estate->es_query_dsa = NULL;
941 }
942
943 /*
944 * Traverse plan tree to reinitialize per-node dynamic shared memory state
945 */
946 static bool
ExecParallelReInitializeDSM(PlanState * planstate,ParallelContext * pcxt)947 ExecParallelReInitializeDSM(PlanState *planstate,
948 ParallelContext *pcxt)
949 {
950 if (planstate == NULL)
951 return false;
952
953 /*
954 * Call reinitializers for DSM-using plan nodes.
955 */
956 switch (nodeTag(planstate))
957 {
958 case T_SeqScanState:
959 if (planstate->plan->parallel_aware)
960 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
961 pcxt);
962 break;
963 case T_IndexScanState:
964 if (planstate->plan->parallel_aware)
965 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
966 pcxt);
967 break;
968 case T_IndexOnlyScanState:
969 if (planstate->plan->parallel_aware)
970 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
971 pcxt);
972 break;
973 case T_ForeignScanState:
974 if (planstate->plan->parallel_aware)
975 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
976 pcxt);
977 break;
978 case T_AppendState:
979 if (planstate->plan->parallel_aware)
980 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
981 break;
982 case T_CustomScanState:
983 if (planstate->plan->parallel_aware)
984 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
985 pcxt);
986 break;
987 case T_BitmapHeapScanState:
988 if (planstate->plan->parallel_aware)
989 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
990 pcxt);
991 break;
992 case T_HashJoinState:
993 if (planstate->plan->parallel_aware)
994 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
995 pcxt);
996 break;
997 case T_HashState:
998 case T_SortState:
999 case T_IncrementalSortState:
1000 case T_MemoizeState:
1001 /* these nodes have DSM state, but no reinitialization is required */
1002 break;
1003
1004 default:
1005 break;
1006 }
1007
1008 return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1009 }
1010
1011 /*
1012 * Copy instrumentation information about this node and its descendants from
1013 * dynamic shared memory.
1014 */
1015 static bool
ExecParallelRetrieveInstrumentation(PlanState * planstate,SharedExecutorInstrumentation * instrumentation)1016 ExecParallelRetrieveInstrumentation(PlanState *planstate,
1017 SharedExecutorInstrumentation *instrumentation)
1018 {
1019 Instrumentation *instrument;
1020 int i;
1021 int n;
1022 int ibytes;
1023 int plan_node_id = planstate->plan->plan_node_id;
1024 MemoryContext oldcontext;
1025
1026 /* Find the instrumentation for this node. */
1027 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1028 if (instrumentation->plan_node_id[i] == plan_node_id)
1029 break;
1030 if (i >= instrumentation->num_plan_nodes)
1031 elog(ERROR, "plan node %d not found", plan_node_id);
1032
1033 /* Accumulate the statistics from all workers. */
1034 instrument = GetInstrumentationArray(instrumentation);
1035 instrument += i * instrumentation->num_workers;
1036 for (n = 0; n < instrumentation->num_workers; ++n)
1037 InstrAggNode(planstate->instrument, &instrument[n]);
1038
1039 /*
1040 * Also store the per-worker detail.
1041 *
1042 * Worker instrumentation should be allocated in the same context as the
1043 * regular instrumentation information, which is the per-query context.
1044 * Switch into per-query memory context.
1045 */
1046 oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1047 ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1048 planstate->worker_instrument =
1049 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1050 MemoryContextSwitchTo(oldcontext);
1051
1052 planstate->worker_instrument->num_workers = instrumentation->num_workers;
1053 memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1054
1055 /* Perform any node-type-specific work that needs to be done. */
1056 switch (nodeTag(planstate))
1057 {
1058 case T_SortState:
1059 ExecSortRetrieveInstrumentation((SortState *) planstate);
1060 break;
1061 case T_IncrementalSortState:
1062 ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1063 break;
1064 case T_HashState:
1065 ExecHashRetrieveInstrumentation((HashState *) planstate);
1066 break;
1067 case T_AggState:
1068 ExecAggRetrieveInstrumentation((AggState *) planstate);
1069 break;
1070 case T_MemoizeState:
1071 ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1072 break;
1073 default:
1074 break;
1075 }
1076
1077 return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1078 instrumentation);
1079 }
1080
1081 /*
1082 * Add up the workers' JIT instrumentation from dynamic shared memory.
1083 */
1084 static void
ExecParallelRetrieveJitInstrumentation(PlanState * planstate,SharedJitInstrumentation * shared_jit)1085 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1086 SharedJitInstrumentation *shared_jit)
1087 {
1088 JitInstrumentation *combined;
1089 int ibytes;
1090
1091 int n;
1092
1093 /*
1094 * Accumulate worker JIT instrumentation into the combined JIT
1095 * instrumentation, allocating it if required.
1096 */
1097 if (!planstate->state->es_jit_worker_instr)
1098 planstate->state->es_jit_worker_instr =
1099 MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1100 combined = planstate->state->es_jit_worker_instr;
1101
1102 /* Accumulate all the workers' instrumentations. */
1103 for (n = 0; n < shared_jit->num_workers; ++n)
1104 InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1105
1106 /*
1107 * Store the per-worker detail.
1108 *
1109 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1110 * instrumentation in per-query context.
1111 */
1112 ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1113 + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1114 planstate->worker_jit_instrument =
1115 MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1116
1117 memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1118 }
1119
1120 /*
1121 * Finish parallel execution. We wait for parallel workers to finish, and
1122 * accumulate their buffer/WAL usage.
1123 */
1124 void
ExecParallelFinish(ParallelExecutorInfo * pei)1125 ExecParallelFinish(ParallelExecutorInfo *pei)
1126 {
1127 int nworkers = pei->pcxt->nworkers_launched;
1128 int i;
1129
1130 /* Make this be a no-op if called twice in a row. */
1131 if (pei->finished)
1132 return;
1133
1134 /*
1135 * Detach from tuple queues ASAP, so that any still-active workers will
1136 * notice that no further results are wanted.
1137 */
1138 if (pei->tqueue != NULL)
1139 {
1140 for (i = 0; i < nworkers; i++)
1141 shm_mq_detach(pei->tqueue[i]);
1142 pfree(pei->tqueue);
1143 pei->tqueue = NULL;
1144 }
1145
1146 /*
1147 * While we're waiting for the workers to finish, let's get rid of the
1148 * tuple queue readers. (Any other local cleanup could be done here too.)
1149 */
1150 if (pei->reader != NULL)
1151 {
1152 for (i = 0; i < nworkers; i++)
1153 DestroyTupleQueueReader(pei->reader[i]);
1154 pfree(pei->reader);
1155 pei->reader = NULL;
1156 }
1157
1158 /* Now wait for the workers to finish. */
1159 WaitForParallelWorkersToFinish(pei->pcxt);
1160
1161 /*
1162 * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1163 * finish, or we might get incomplete data.)
1164 */
1165 for (i = 0; i < nworkers; i++)
1166 InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1167
1168 pei->finished = true;
1169 }
1170
1171 /*
1172 * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1173 * resources still exist after ExecParallelFinish. We separate these
1174 * routines because someone might want to examine the contents of the DSM
1175 * after ExecParallelFinish and before calling this routine.
1176 */
1177 void
ExecParallelCleanup(ParallelExecutorInfo * pei)1178 ExecParallelCleanup(ParallelExecutorInfo *pei)
1179 {
1180 /* Accumulate instrumentation, if any. */
1181 if (pei->instrumentation)
1182 ExecParallelRetrieveInstrumentation(pei->planstate,
1183 pei->instrumentation);
1184
1185 /* Accumulate JIT instrumentation, if any. */
1186 if (pei->jit_instrumentation)
1187 ExecParallelRetrieveJitInstrumentation(pei->planstate,
1188 pei->jit_instrumentation);
1189
1190 /* Free any serialized parameters. */
1191 if (DsaPointerIsValid(pei->param_exec))
1192 {
1193 dsa_free(pei->area, pei->param_exec);
1194 pei->param_exec = InvalidDsaPointer;
1195 }
1196 if (pei->area != NULL)
1197 {
1198 dsa_detach(pei->area);
1199 pei->area = NULL;
1200 }
1201 if (pei->pcxt != NULL)
1202 {
1203 DestroyParallelContext(pei->pcxt);
1204 pei->pcxt = NULL;
1205 }
1206 pfree(pei);
1207 }
1208
1209 /*
1210 * Create a DestReceiver to write tuples we produce to the shm_mq designated
1211 * for that purpose.
1212 */
1213 static DestReceiver *
ExecParallelGetReceiver(dsm_segment * seg,shm_toc * toc)1214 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1215 {
1216 char *mqspace;
1217 shm_mq *mq;
1218
1219 mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1220 mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1221 mq = (shm_mq *) mqspace;
1222 shm_mq_set_sender(mq, MyProc);
1223 return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1224 }
1225
1226 /*
1227 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1228 */
1229 static QueryDesc *
ExecParallelGetQueryDesc(shm_toc * toc,DestReceiver * receiver,int instrument_options)1230 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1231 int instrument_options)
1232 {
1233 char *pstmtspace;
1234 char *paramspace;
1235 PlannedStmt *pstmt;
1236 ParamListInfo paramLI;
1237 char *queryString;
1238
1239 /* Get the query string from shared memory */
1240 queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1241
1242 /* Reconstruct leader-supplied PlannedStmt. */
1243 pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1244 pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1245
1246 /* Reconstruct ParamListInfo. */
1247 paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1248 paramLI = RestoreParamList(¶mspace);
1249
1250 /* Create a QueryDesc for the query. */
1251 return CreateQueryDesc(pstmt,
1252 queryString,
1253 GetActiveSnapshot(), InvalidSnapshot,
1254 receiver, paramLI, NULL, instrument_options);
1255 }
1256
1257 /*
1258 * Copy instrumentation information from this node and its descendants into
1259 * dynamic shared memory, so that the parallel leader can retrieve it.
1260 */
1261 static bool
ExecParallelReportInstrumentation(PlanState * planstate,SharedExecutorInstrumentation * instrumentation)1262 ExecParallelReportInstrumentation(PlanState *planstate,
1263 SharedExecutorInstrumentation *instrumentation)
1264 {
1265 int i;
1266 int plan_node_id = planstate->plan->plan_node_id;
1267 Instrumentation *instrument;
1268
1269 InstrEndLoop(planstate->instrument);
1270
1271 /*
1272 * If we shuffled the plan_node_id values in ps_instrument into sorted
1273 * order, we could use binary search here. This might matter someday if
1274 * we're pushing down sufficiently large plan trees. For now, do it the
1275 * slow, dumb way.
1276 */
1277 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1278 if (instrumentation->plan_node_id[i] == plan_node_id)
1279 break;
1280 if (i >= instrumentation->num_plan_nodes)
1281 elog(ERROR, "plan node %d not found", plan_node_id);
1282
1283 /*
1284 * Add our statistics to the per-node, per-worker totals. It's possible
1285 * that this could happen more than once if we relaunched workers.
1286 */
1287 instrument = GetInstrumentationArray(instrumentation);
1288 instrument += i * instrumentation->num_workers;
1289 Assert(IsParallelWorker());
1290 Assert(ParallelWorkerNumber < instrumentation->num_workers);
1291 InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1292
1293 return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1294 instrumentation);
1295 }
1296
1297 /*
1298 * Initialize the PlanState and its descendants with the information
1299 * retrieved from shared memory. This has to be done once the PlanState
1300 * is allocated and initialized by executor; that is, after ExecutorStart().
1301 */
1302 static bool
ExecParallelInitializeWorker(PlanState * planstate,ParallelWorkerContext * pwcxt)1303 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1304 {
1305 if (planstate == NULL)
1306 return false;
1307
1308 switch (nodeTag(planstate))
1309 {
1310 case T_SeqScanState:
1311 if (planstate->plan->parallel_aware)
1312 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1313 break;
1314 case T_IndexScanState:
1315 if (planstate->plan->parallel_aware)
1316 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1317 pwcxt);
1318 break;
1319 case T_IndexOnlyScanState:
1320 if (planstate->plan->parallel_aware)
1321 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1322 pwcxt);
1323 break;
1324 case T_ForeignScanState:
1325 if (planstate->plan->parallel_aware)
1326 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1327 pwcxt);
1328 break;
1329 case T_AppendState:
1330 if (planstate->plan->parallel_aware)
1331 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1332 break;
1333 case T_CustomScanState:
1334 if (planstate->plan->parallel_aware)
1335 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1336 pwcxt);
1337 break;
1338 case T_BitmapHeapScanState:
1339 if (planstate->plan->parallel_aware)
1340 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1341 pwcxt);
1342 break;
1343 case T_HashJoinState:
1344 if (planstate->plan->parallel_aware)
1345 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1346 pwcxt);
1347 break;
1348 case T_HashState:
1349 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1350 ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1351 break;
1352 case T_SortState:
1353 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1354 ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1355 break;
1356 case T_IncrementalSortState:
1357 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1358 ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1359 pwcxt);
1360 break;
1361 case T_AggState:
1362 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1363 ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1364 break;
1365 case T_MemoizeState:
1366 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1367 ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1368 break;
1369 default:
1370 break;
1371 }
1372
1373 return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1374 pwcxt);
1375 }
1376
1377 /*
1378 * Main entrypoint for parallel query worker processes.
1379 *
1380 * We reach this function from ParallelWorkerMain, so the setup necessary to
1381 * create a sensible parallel environment has already been done;
1382 * ParallelWorkerMain worries about stuff like the transaction state, combo
1383 * CID mappings, and GUC values, so we don't need to deal with any of that
1384 * here.
1385 *
1386 * Our job is to deal with concerns specific to the executor. The parallel
1387 * group leader will have stored a serialized PlannedStmt, and it's our job
1388 * to execute that plan and write the resulting tuples to the appropriate
1389 * tuple queue. Various bits of supporting information that we need in order
1390 * to do this are also stored in the dsm_segment and can be accessed through
1391 * the shm_toc.
1392 */
1393 void
ParallelQueryMain(dsm_segment * seg,shm_toc * toc)1394 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1395 {
1396 FixedParallelExecutorState *fpes;
1397 BufferUsage *buffer_usage;
1398 WalUsage *wal_usage;
1399 DestReceiver *receiver;
1400 QueryDesc *queryDesc;
1401 SharedExecutorInstrumentation *instrumentation;
1402 SharedJitInstrumentation *jit_instrumentation;
1403 int instrument_options = 0;
1404 void *area_space;
1405 dsa_area *area;
1406 ParallelWorkerContext pwcxt;
1407
1408 /* Get fixed-size state. */
1409 fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1410
1411 /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1412 receiver = ExecParallelGetReceiver(seg, toc);
1413 instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1414 if (instrumentation != NULL)
1415 instrument_options = instrumentation->instrument_options;
1416 jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1417 true);
1418 queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1419
1420 /* Setting debug_query_string for individual workers */
1421 debug_query_string = queryDesc->sourceText;
1422
1423 /* Report workers' query and queryId for monitoring purposes */
1424 pgstat_report_activity(STATE_RUNNING, debug_query_string);
1425
1426 /* Attach to the dynamic shared memory area. */
1427 area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1428 area = dsa_attach_in_place(area_space, seg);
1429
1430 /* Start up the executor */
1431 queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1432 ExecutorStart(queryDesc, fpes->eflags);
1433
1434 /* Special executor initialization steps for parallel workers */
1435 queryDesc->planstate->state->es_query_dsa = area;
1436 if (DsaPointerIsValid(fpes->param_exec))
1437 {
1438 char *paramexec_space;
1439
1440 paramexec_space = dsa_get_address(area, fpes->param_exec);
1441 RestoreParamExecParams(paramexec_space, queryDesc->estate);
1442
1443 }
1444 pwcxt.toc = toc;
1445 pwcxt.seg = seg;
1446 ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1447
1448 /* Pass down any tuple bound */
1449 ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1450
1451 /*
1452 * Prepare to track buffer/WAL usage during query execution.
1453 *
1454 * We do this after starting up the executor to match what happens in the
1455 * leader, which also doesn't count buffer accesses and WAL activity that
1456 * occur during executor startup.
1457 */
1458 InstrStartParallelQuery();
1459
1460 /*
1461 * Run the plan. If we specified a tuple bound, be careful not to demand
1462 * more tuples than that.
1463 */
1464 ExecutorRun(queryDesc,
1465 ForwardScanDirection,
1466 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1467 true);
1468
1469 /* Shut down the executor */
1470 ExecutorFinish(queryDesc);
1471
1472 /* Report buffer/WAL usage during parallel execution. */
1473 buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1474 wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1475 InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1476 &wal_usage[ParallelWorkerNumber]);
1477
1478 /* Report instrumentation data if any instrumentation options are set. */
1479 if (instrumentation != NULL)
1480 ExecParallelReportInstrumentation(queryDesc->planstate,
1481 instrumentation);
1482
1483 /* Report JIT instrumentation data if any */
1484 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1485 {
1486 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1487 jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1488 queryDesc->estate->es_jit->instr;
1489 }
1490
1491 /* Must do this after capturing instrumentation. */
1492 ExecutorEnd(queryDesc);
1493
1494 /* Cleanup. */
1495 dsa_detach(area);
1496 FreeQueryDesc(queryDesc);
1497 receiver->rDestroy(receiver);
1498 }
1499