1 /*-------------------------------------------------------------------------
2 *
3 * execParallel.c
4 * Support routines for parallel execution.
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * This file contains routines that are intended to support setting up,
10 * using, and tearing down a ParallelContext from within the PostgreSQL
11 * executor. The ParallelContext machinery will handle starting the
12 * workers and ensuring that their state generally matches that of the
13 * leader; see src/backend/access/transam/README.parallel for details.
14 * However, we must save and restore relevant executor state, such as
15 * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 * the actual plan to be passed down to the worker.
17 *
18 * IDENTIFICATION
19 * src/backend/executor/execParallel.c
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAgg.h"
29 #include "executor/nodeAppend.h"
30 #include "executor/nodeBitmapHeapscan.h"
31 #include "executor/nodeCustom.h"
32 #include "executor/nodeForeignscan.h"
33 #include "executor/nodeHash.h"
34 #include "executor/nodeHashjoin.h"
35 #include "executor/nodeIncrementalSort.h"
36 #include "executor/nodeIndexonlyscan.h"
37 #include "executor/nodeIndexscan.h"
38 #include "executor/nodeSeqscan.h"
39 #include "executor/nodeSort.h"
40 #include "executor/nodeSubplan.h"
41 #include "executor/tqueue.h"
42 #include "jit/jit.h"
43 #include "nodes/nodeFuncs.h"
44 #include "pgstat.h"
45 #include "storage/spin.h"
46 #include "tcop/tcopprot.h"
47 #include "utils/datum.h"
48 #include "utils/dsa.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/snapmgr.h"
52
/*
 * Magic numbers for parallel executor communication.  We use constants
 * greater than any 32-bit integer here so that values < 2^32 can be used
 * by individual parallel nodes to store their own state.
 *
 * Each key identifies one chunk that the leader inserts into the parallel
 * context's shm_toc.
 */

/* FixedParallelExecutorState */
#define PARALLEL_KEY_EXECUTOR_FIXED			UINT64CONST(0xE000000000000001)
/* serialized PlannedStmt */
#define PARALLEL_KEY_PLANNEDSTMT			UINT64CONST(0xE000000000000002)
/* serialized ParamListInfo */
#define PARALLEL_KEY_PARAMLISTINFO			UINT64CONST(0xE000000000000003)
/* array of per-worker BufferUsage */
#define PARALLEL_KEY_BUFFER_USAGE			UINT64CONST(0xE000000000000004)
/* space for the per-worker tuple queues */
#define PARALLEL_KEY_TUPLE_QUEUE			UINT64CONST(0xE000000000000005)
/* SharedExecutorInstrumentation */
#define PARALLEL_KEY_INSTRUMENTATION		UINT64CONST(0xE000000000000006)
/* space for the in-place DSA area */
#define PARALLEL_KEY_DSA					UINT64CONST(0xE000000000000007)
/* NUL-terminated query text */
#define PARALLEL_KEY_QUERY_TEXT				UINT64CONST(0xE000000000000008)
/* SharedJitInstrumentation */
#define PARALLEL_KEY_JIT_INSTRUMENTATION	UINT64CONST(0xE000000000000009)
/* array of per-worker WalUsage */
#define PARALLEL_KEY_WAL_USAGE				UINT64CONST(0xE00000000000000A)

/* Number of bytes of shared memory given to each worker's tuple queue. */
#define PARALLEL_TUPLE_QUEUE_SIZE			65536
70
71 /*
72 * Fixed-size random stuff that we need to pass to parallel workers.
73 */
74 typedef struct FixedParallelExecutorState
75 {
76 int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
77 dsa_pointer param_exec;
78 int eflags;
79 int jit_flags;
80 } FixedParallelExecutorState;
81
82 /*
83 * DSM structure for accumulating per-PlanState instrumentation.
84 *
85 * instrument_options: Same meaning here as in instrument.c.
86 *
87 * instrument_offset: Offset, relative to the start of this structure,
88 * of the first Instrumentation object. This will depend on the length of
89 * the plan_node_id array.
90 *
91 * num_workers: Number of workers.
92 *
93 * num_plan_nodes: Number of plan nodes.
94 *
95 * plan_node_id: Array of plan nodes for which we are gathering instrumentation
96 * from parallel workers. The length of this array is given by num_plan_nodes.
97 */
98 struct SharedExecutorInstrumentation
99 {
100 int instrument_options;
101 int instrument_offset;
102 int num_workers;
103 int num_plan_nodes;
104 int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
105 /* array of num_plan_nodes * num_workers Instrumentation objects follows */
106 };
107 #define GetInstrumentationArray(sei) \
108 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
109 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
110
111 /* Context object for ExecParallelEstimate. */
112 typedef struct ExecParallelEstimateContext
113 {
114 ParallelContext *pcxt;
115 int nnodes;
116 } ExecParallelEstimateContext;
117
118 /* Context object for ExecParallelInitializeDSM. */
119 typedef struct ExecParallelInitializeDSMContext
120 {
121 ParallelContext *pcxt;
122 SharedExecutorInstrumentation *instrumentation;
123 int nnodes;
124 } ExecParallelInitializeDSMContext;
125
126 /* Helper functions that run in the parallel leader. */
127 static char *ExecSerializePlan(Plan *plan, EState *estate);
128 static bool ExecParallelEstimate(PlanState *node,
129 ExecParallelEstimateContext *e);
130 static bool ExecParallelInitializeDSM(PlanState *node,
131 ExecParallelInitializeDSMContext *d);
132 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
133 bool reinitialize);
134 static bool ExecParallelReInitializeDSM(PlanState *planstate,
135 ParallelContext *pcxt);
136 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
137 SharedExecutorInstrumentation *instrumentation);
138
139 /* Helper function that runs in the parallel worker. */
140 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
141
142 /*
143 * Create a serialized representation of the plan to be sent to each worker.
144 */
145 static char *
ExecSerializePlan(Plan * plan,EState * estate)146 ExecSerializePlan(Plan *plan, EState *estate)
147 {
148 PlannedStmt *pstmt;
149 ListCell *lc;
150
151 /* We can't scribble on the original plan, so make a copy. */
152 plan = copyObject(plan);
153
154 /*
155 * The worker will start its own copy of the executor, and that copy will
156 * insert a junk filter if the toplevel node has any resjunk entries. We
157 * don't want that to happen, because while resjunk columns shouldn't be
158 * sent back to the user, here the tuples are coming back to another
159 * backend which may very well need them. So mutate the target list
160 * accordingly. This is sort of a hack; there might be better ways to do
161 * this...
162 */
163 foreach(lc, plan->targetlist)
164 {
165 TargetEntry *tle = lfirst_node(TargetEntry, lc);
166
167 tle->resjunk = false;
168 }
169
170 /*
171 * Create a dummy PlannedStmt. Most of the fields don't need to be valid
172 * for our purposes, but the worker will need at least a minimal
173 * PlannedStmt to start the executor.
174 */
175 pstmt = makeNode(PlannedStmt);
176 pstmt->commandType = CMD_SELECT;
177 pstmt->queryId = UINT64CONST(0);
178 pstmt->hasReturning = false;
179 pstmt->hasModifyingCTE = false;
180 pstmt->canSetTag = true;
181 pstmt->transientPlan = false;
182 pstmt->dependsOnRole = false;
183 pstmt->parallelModeNeeded = false;
184 pstmt->planTree = plan;
185 pstmt->rtable = estate->es_range_table;
186 pstmt->resultRelations = NIL;
187 pstmt->rootResultRelations = NIL;
188 pstmt->appendRelations = NIL;
189
190 /*
191 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
192 * for unsafe ones (so that the list indexes of the safe ones are
193 * preserved). This positively ensures that the worker won't try to run,
194 * or even do ExecInitNode on, an unsafe subplan. That's important to
195 * protect, eg, non-parallel-aware FDWs from getting into trouble.
196 */
197 pstmt->subplans = NIL;
198 foreach(lc, estate->es_plannedstmt->subplans)
199 {
200 Plan *subplan = (Plan *) lfirst(lc);
201
202 if (subplan && !subplan->parallel_safe)
203 subplan = NULL;
204 pstmt->subplans = lappend(pstmt->subplans, subplan);
205 }
206
207 pstmt->rewindPlanIDs = NULL;
208 pstmt->rowMarks = NIL;
209 pstmt->relationOids = NIL;
210 pstmt->invalItems = NIL; /* workers can't replan anyway... */
211 pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
212 pstmt->utilityStmt = NULL;
213 pstmt->stmt_location = -1;
214 pstmt->stmt_len = -1;
215
216 /* Return serialized copy of our dummy PlannedStmt. */
217 return nodeToString(pstmt);
218 }
219
220 /*
221 * Parallel-aware plan nodes (and occasionally others) may need some state
222 * which is shared across all parallel workers. Before we size the DSM, give
223 * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
224 * &pcxt->estimator.
225 *
226 * While we're at it, count the number of PlanState nodes in the tree, so
227 * we know how many Instrumentation structures we need.
228 */
229 static bool
ExecParallelEstimate(PlanState * planstate,ExecParallelEstimateContext * e)230 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
231 {
232 if (planstate == NULL)
233 return false;
234
235 /* Count this node. */
236 e->nnodes++;
237
238 switch (nodeTag(planstate))
239 {
240 case T_SeqScanState:
241 if (planstate->plan->parallel_aware)
242 ExecSeqScanEstimate((SeqScanState *) planstate,
243 e->pcxt);
244 break;
245 case T_IndexScanState:
246 if (planstate->plan->parallel_aware)
247 ExecIndexScanEstimate((IndexScanState *) planstate,
248 e->pcxt);
249 break;
250 case T_IndexOnlyScanState:
251 if (planstate->plan->parallel_aware)
252 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
253 e->pcxt);
254 break;
255 case T_ForeignScanState:
256 if (planstate->plan->parallel_aware)
257 ExecForeignScanEstimate((ForeignScanState *) planstate,
258 e->pcxt);
259 break;
260 case T_AppendState:
261 if (planstate->plan->parallel_aware)
262 ExecAppendEstimate((AppendState *) planstate,
263 e->pcxt);
264 break;
265 case T_CustomScanState:
266 if (planstate->plan->parallel_aware)
267 ExecCustomScanEstimate((CustomScanState *) planstate,
268 e->pcxt);
269 break;
270 case T_BitmapHeapScanState:
271 if (planstate->plan->parallel_aware)
272 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
273 e->pcxt);
274 break;
275 case T_HashJoinState:
276 if (planstate->plan->parallel_aware)
277 ExecHashJoinEstimate((HashJoinState *) planstate,
278 e->pcxt);
279 break;
280 case T_HashState:
281 /* even when not parallel-aware, for EXPLAIN ANALYZE */
282 ExecHashEstimate((HashState *) planstate, e->pcxt);
283 break;
284 case T_SortState:
285 /* even when not parallel-aware, for EXPLAIN ANALYZE */
286 ExecSortEstimate((SortState *) planstate, e->pcxt);
287 break;
288 case T_IncrementalSortState:
289 /* even when not parallel-aware, for EXPLAIN ANALYZE */
290 ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
291 break;
292 case T_AggState:
293 /* even when not parallel-aware, for EXPLAIN ANALYZE */
294 ExecAggEstimate((AggState *) planstate, e->pcxt);
295 break;
296 default:
297 break;
298 }
299
300 return planstate_tree_walker(planstate, ExecParallelEstimate, e);
301 }
302
303 /*
304 * Estimate the amount of space required to serialize the indicated parameters.
305 */
306 static Size
EstimateParamExecSpace(EState * estate,Bitmapset * params)307 EstimateParamExecSpace(EState *estate, Bitmapset *params)
308 {
309 int paramid;
310 Size sz = sizeof(int);
311
312 paramid = -1;
313 while ((paramid = bms_next_member(params, paramid)) >= 0)
314 {
315 Oid typeOid;
316 int16 typLen;
317 bool typByVal;
318 ParamExecData *prm;
319
320 prm = &(estate->es_param_exec_vals[paramid]);
321 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
322 paramid);
323
324 sz = add_size(sz, sizeof(int)); /* space for paramid */
325
326 /* space for datum/isnull */
327 if (OidIsValid(typeOid))
328 get_typlenbyval(typeOid, &typLen, &typByVal);
329 else
330 {
331 /* If no type OID, assume by-value, like copyParamList does. */
332 typLen = sizeof(Datum);
333 typByVal = true;
334 }
335 sz = add_size(sz,
336 datumEstimateSpace(prm->value, prm->isnull,
337 typByVal, typLen));
338 }
339 return sz;
340 }
341
342 /*
343 * Serialize specified PARAM_EXEC parameters.
344 *
345 * We write the number of parameters first, as a 4-byte integer, and then
346 * write details for each parameter in turn. The details for each parameter
347 * consist of a 4-byte paramid (location of param in execution time internal
348 * parameter array) and then the datum as serialized by datumSerialize().
349 */
350 static dsa_pointer
SerializeParamExecParams(EState * estate,Bitmapset * params,dsa_area * area)351 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
352 {
353 Size size;
354 int nparams;
355 int paramid;
356 ParamExecData *prm;
357 dsa_pointer handle;
358 char *start_address;
359
360 /* Allocate enough space for the current parameter values. */
361 size = EstimateParamExecSpace(estate, params);
362 handle = dsa_allocate(area, size);
363 start_address = dsa_get_address(area, handle);
364
365 /* First write the number of parameters as a 4-byte integer. */
366 nparams = bms_num_members(params);
367 memcpy(start_address, &nparams, sizeof(int));
368 start_address += sizeof(int);
369
370 /* Write details for each parameter in turn. */
371 paramid = -1;
372 while ((paramid = bms_next_member(params, paramid)) >= 0)
373 {
374 Oid typeOid;
375 int16 typLen;
376 bool typByVal;
377
378 prm = &(estate->es_param_exec_vals[paramid]);
379 typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
380 paramid);
381
382 /* Write paramid. */
383 memcpy(start_address, ¶mid, sizeof(int));
384 start_address += sizeof(int);
385
386 /* Write datum/isnull */
387 if (OidIsValid(typeOid))
388 get_typlenbyval(typeOid, &typLen, &typByVal);
389 else
390 {
391 /* If no type OID, assume by-value, like copyParamList does. */
392 typLen = sizeof(Datum);
393 typByVal = true;
394 }
395 datumSerialize(prm->value, prm->isnull, typByVal, typLen,
396 &start_address);
397 }
398
399 return handle;
400 }
401
402 /*
403 * Restore specified PARAM_EXEC parameters.
404 */
405 static void
RestoreParamExecParams(char * start_address,EState * estate)406 RestoreParamExecParams(char *start_address, EState *estate)
407 {
408 int nparams;
409 int i;
410 int paramid;
411
412 memcpy(&nparams, start_address, sizeof(int));
413 start_address += sizeof(int);
414
415 for (i = 0; i < nparams; i++)
416 {
417 ParamExecData *prm;
418
419 /* Read paramid */
420 memcpy(¶mid, start_address, sizeof(int));
421 start_address += sizeof(int);
422 prm = &(estate->es_param_exec_vals[paramid]);
423
424 /* Read datum/isnull. */
425 prm->value = datumRestore(&start_address, &prm->isnull);
426 prm->execPlan = NULL;
427 }
428 }
429
430 /*
431 * Initialize the dynamic shared memory segment that will be used to control
432 * parallel execution.
433 */
434 static bool
ExecParallelInitializeDSM(PlanState * planstate,ExecParallelInitializeDSMContext * d)435 ExecParallelInitializeDSM(PlanState *planstate,
436 ExecParallelInitializeDSMContext *d)
437 {
438 if (planstate == NULL)
439 return false;
440
441 /* If instrumentation is enabled, initialize slot for this node. */
442 if (d->instrumentation != NULL)
443 d->instrumentation->plan_node_id[d->nnodes] =
444 planstate->plan->plan_node_id;
445
446 /* Count this node. */
447 d->nnodes++;
448
449 /*
450 * Call initializers for DSM-using plan nodes.
451 *
452 * Most plan nodes won't do anything here, but plan nodes that allocated
453 * DSM may need to initialize shared state in the DSM before parallel
454 * workers are launched. They can allocate the space they previously
455 * estimated using shm_toc_allocate, and add the keys they previously
456 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
457 */
458 switch (nodeTag(planstate))
459 {
460 case T_SeqScanState:
461 if (planstate->plan->parallel_aware)
462 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
463 d->pcxt);
464 break;
465 case T_IndexScanState:
466 if (planstate->plan->parallel_aware)
467 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
468 d->pcxt);
469 break;
470 case T_IndexOnlyScanState:
471 if (planstate->plan->parallel_aware)
472 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
473 d->pcxt);
474 break;
475 case T_ForeignScanState:
476 if (planstate->plan->parallel_aware)
477 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
478 d->pcxt);
479 break;
480 case T_AppendState:
481 if (planstate->plan->parallel_aware)
482 ExecAppendInitializeDSM((AppendState *) planstate,
483 d->pcxt);
484 break;
485 case T_CustomScanState:
486 if (planstate->plan->parallel_aware)
487 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
488 d->pcxt);
489 break;
490 case T_BitmapHeapScanState:
491 if (planstate->plan->parallel_aware)
492 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
493 d->pcxt);
494 break;
495 case T_HashJoinState:
496 if (planstate->plan->parallel_aware)
497 ExecHashJoinInitializeDSM((HashJoinState *) planstate,
498 d->pcxt);
499 break;
500 case T_HashState:
501 /* even when not parallel-aware, for EXPLAIN ANALYZE */
502 ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
503 break;
504 case T_SortState:
505 /* even when not parallel-aware, for EXPLAIN ANALYZE */
506 ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
507 break;
508 case T_IncrementalSortState:
509 /* even when not parallel-aware, for EXPLAIN ANALYZE */
510 ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
511 break;
512 case T_AggState:
513 /* even when not parallel-aware, for EXPLAIN ANALYZE */
514 ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
515 break;
516 default:
517 break;
518 }
519
520 return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
521 }
522
523 /*
524 * It sets up the response queues for backend workers to return tuples
525 * to the main backend and start the workers.
526 */
527 static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext * pcxt,bool reinitialize)528 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
529 {
530 shm_mq_handle **responseq;
531 char *tqueuespace;
532 int i;
533
534 /* Skip this if no workers. */
535 if (pcxt->nworkers == 0)
536 return NULL;
537
538 /* Allocate memory for shared memory queue handles. */
539 responseq = (shm_mq_handle **)
540 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
541
542 /*
543 * If not reinitializing, allocate space from the DSM for the queues;
544 * otherwise, find the already allocated space.
545 */
546 if (!reinitialize)
547 tqueuespace =
548 shm_toc_allocate(pcxt->toc,
549 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
550 pcxt->nworkers));
551 else
552 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
553
554 /* Create the queues, and become the receiver for each. */
555 for (i = 0; i < pcxt->nworkers; ++i)
556 {
557 shm_mq *mq;
558
559 mq = shm_mq_create(tqueuespace +
560 ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
561 (Size) PARALLEL_TUPLE_QUEUE_SIZE);
562
563 shm_mq_set_receiver(mq, MyProc);
564 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
565 }
566
567 /* Add array of queues to shm_toc, so others can find it. */
568 if (!reinitialize)
569 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
570
571 /* Return array of handles. */
572 return responseq;
573 }
574
575 /*
576 * Sets up the required infrastructure for backend workers to perform
577 * execution and return results to the main backend.
578 */
579 ParallelExecutorInfo *
ExecInitParallelPlan(PlanState * planstate,EState * estate,Bitmapset * sendParams,int nworkers,int64 tuples_needed)580 ExecInitParallelPlan(PlanState *planstate, EState *estate,
581 Bitmapset *sendParams, int nworkers,
582 int64 tuples_needed)
583 {
584 ParallelExecutorInfo *pei;
585 ParallelContext *pcxt;
586 ExecParallelEstimateContext e;
587 ExecParallelInitializeDSMContext d;
588 FixedParallelExecutorState *fpes;
589 char *pstmt_data;
590 char *pstmt_space;
591 char *paramlistinfo_space;
592 BufferUsage *bufusage_space;
593 WalUsage *walusage_space;
594 SharedExecutorInstrumentation *instrumentation = NULL;
595 SharedJitInstrumentation *jit_instrumentation = NULL;
596 int pstmt_len;
597 int paramlistinfo_len;
598 int instrumentation_len = 0;
599 int jit_instrumentation_len = 0;
600 int instrument_offset = 0;
601 Size dsa_minsize = dsa_minimum_size();
602 char *query_string;
603 int query_len;
604
605 /*
606 * Force any initplan outputs that we're going to pass to workers to be
607 * evaluated, if they weren't already.
608 *
609 * For simplicity, we use the EState's per-output-tuple ExprContext here.
610 * That risks intra-query memory leakage, since we might pass through here
611 * many times before that ExprContext gets reset; but ExecSetParamPlan
612 * doesn't normally leak any memory in the context (see its comments), so
613 * it doesn't seem worth complicating this function's API to pass it a
614 * shorter-lived ExprContext. This might need to change someday.
615 */
616 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
617
618 /* Allocate object for return value. */
619 pei = palloc0(sizeof(ParallelExecutorInfo));
620 pei->finished = false;
621 pei->planstate = planstate;
622
623 /* Fix up and serialize plan to be sent to workers. */
624 pstmt_data = ExecSerializePlan(planstate->plan, estate);
625
626 /* Create a parallel context. */
627 pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
628 pei->pcxt = pcxt;
629
630 /*
631 * Before telling the parallel context to create a dynamic shared memory
632 * segment, we need to figure out how big it should be. Estimate space
633 * for the various things we need to store.
634 */
635
636 /* Estimate space for fixed-size state. */
637 shm_toc_estimate_chunk(&pcxt->estimator,
638 sizeof(FixedParallelExecutorState));
639 shm_toc_estimate_keys(&pcxt->estimator, 1);
640
641 /* Estimate space for query text. */
642 query_len = strlen(estate->es_sourceText);
643 shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
644 shm_toc_estimate_keys(&pcxt->estimator, 1);
645
646 /* Estimate space for serialized PlannedStmt. */
647 pstmt_len = strlen(pstmt_data) + 1;
648 shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
649 shm_toc_estimate_keys(&pcxt->estimator, 1);
650
651 /* Estimate space for serialized ParamListInfo. */
652 paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
653 shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
654 shm_toc_estimate_keys(&pcxt->estimator, 1);
655
656 /*
657 * Estimate space for BufferUsage.
658 *
659 * If EXPLAIN is not in use and there are no extensions loaded that care,
660 * we could skip this. But we have no way of knowing whether anyone's
661 * looking at pgBufferUsage, so do it unconditionally.
662 */
663 shm_toc_estimate_chunk(&pcxt->estimator,
664 mul_size(sizeof(BufferUsage), pcxt->nworkers));
665 shm_toc_estimate_keys(&pcxt->estimator, 1);
666
667 /*
668 * Same thing for WalUsage.
669 */
670 shm_toc_estimate_chunk(&pcxt->estimator,
671 mul_size(sizeof(WalUsage), pcxt->nworkers));
672 shm_toc_estimate_keys(&pcxt->estimator, 1);
673
674 /* Estimate space for tuple queues. */
675 shm_toc_estimate_chunk(&pcxt->estimator,
676 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
677 shm_toc_estimate_keys(&pcxt->estimator, 1);
678
679 /*
680 * Give parallel-aware nodes a chance to add to the estimates, and get a
681 * count of how many PlanState nodes there are.
682 */
683 e.pcxt = pcxt;
684 e.nnodes = 0;
685 ExecParallelEstimate(planstate, &e);
686
687 /* Estimate space for instrumentation, if required. */
688 if (estate->es_instrument)
689 {
690 instrumentation_len =
691 offsetof(SharedExecutorInstrumentation, plan_node_id) +
692 sizeof(int) * e.nnodes;
693 instrumentation_len = MAXALIGN(instrumentation_len);
694 instrument_offset = instrumentation_len;
695 instrumentation_len +=
696 mul_size(sizeof(Instrumentation),
697 mul_size(e.nnodes, nworkers));
698 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
699 shm_toc_estimate_keys(&pcxt->estimator, 1);
700
701 /* Estimate space for JIT instrumentation, if required. */
702 if (estate->es_jit_flags != PGJIT_NONE)
703 {
704 jit_instrumentation_len =
705 offsetof(SharedJitInstrumentation, jit_instr) +
706 sizeof(JitInstrumentation) * nworkers;
707 shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
708 shm_toc_estimate_keys(&pcxt->estimator, 1);
709 }
710 }
711
712 /* Estimate space for DSA area. */
713 shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
714 shm_toc_estimate_keys(&pcxt->estimator, 1);
715
716 /* Everyone's had a chance to ask for space, so now create the DSM. */
717 InitializeParallelDSM(pcxt);
718
719 /*
720 * OK, now we have a dynamic shared memory segment, and it should be big
721 * enough to store all of the data we estimated we would want to put into
722 * it, plus whatever general stuff (not specifically executor-related) the
723 * ParallelContext itself needs to store there. None of the space we
724 * asked for has been allocated or initialized yet, though, so do that.
725 */
726
727 /* Store fixed-size state. */
728 fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
729 fpes->tuples_needed = tuples_needed;
730 fpes->param_exec = InvalidDsaPointer;
731 fpes->eflags = estate->es_top_eflags;
732 fpes->jit_flags = estate->es_jit_flags;
733 shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
734
735 /* Store query string */
736 query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
737 memcpy(query_string, estate->es_sourceText, query_len + 1);
738 shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
739
740 /* Store serialized PlannedStmt. */
741 pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
742 memcpy(pstmt_space, pstmt_data, pstmt_len);
743 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
744
745 /* Store serialized ParamListInfo. */
746 paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
747 shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
748 SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
749
750 /* Allocate space for each worker's BufferUsage; no need to initialize. */
751 bufusage_space = shm_toc_allocate(pcxt->toc,
752 mul_size(sizeof(BufferUsage), pcxt->nworkers));
753 shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
754 pei->buffer_usage = bufusage_space;
755
756 /* Same for WalUsage. */
757 walusage_space = shm_toc_allocate(pcxt->toc,
758 mul_size(sizeof(WalUsage), pcxt->nworkers));
759 shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
760 pei->wal_usage = walusage_space;
761
762 /* Set up the tuple queues that the workers will write into. */
763 pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
764
765 /* We don't need the TupleQueueReaders yet, though. */
766 pei->reader = NULL;
767
768 /*
769 * If instrumentation options were supplied, allocate space for the data.
770 * It only gets partially initialized here; the rest happens during
771 * ExecParallelInitializeDSM.
772 */
773 if (estate->es_instrument)
774 {
775 Instrumentation *instrument;
776 int i;
777
778 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
779 instrumentation->instrument_options = estate->es_instrument;
780 instrumentation->instrument_offset = instrument_offset;
781 instrumentation->num_workers = nworkers;
782 instrumentation->num_plan_nodes = e.nnodes;
783 instrument = GetInstrumentationArray(instrumentation);
784 for (i = 0; i < nworkers * e.nnodes; ++i)
785 InstrInit(&instrument[i], estate->es_instrument);
786 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
787 instrumentation);
788 pei->instrumentation = instrumentation;
789
790 if (estate->es_jit_flags != PGJIT_NONE)
791 {
792 jit_instrumentation = shm_toc_allocate(pcxt->toc,
793 jit_instrumentation_len);
794 jit_instrumentation->num_workers = nworkers;
795 memset(jit_instrumentation->jit_instr, 0,
796 sizeof(JitInstrumentation) * nworkers);
797 shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
798 jit_instrumentation);
799 pei->jit_instrumentation = jit_instrumentation;
800 }
801 }
802
803 /*
804 * Create a DSA area that can be used by the leader and all workers.
805 * (However, if we failed to create a DSM and are using private memory
806 * instead, then skip this.)
807 */
808 if (pcxt->seg != NULL)
809 {
810 char *area_space;
811
812 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
813 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
814 pei->area = dsa_create_in_place(area_space, dsa_minsize,
815 LWTRANCHE_PARALLEL_QUERY_DSA,
816 pcxt->seg);
817
818 /*
819 * Serialize parameters, if any, using DSA storage. We don't dare use
820 * the main parallel query DSM for this because we might relaunch
821 * workers after the values have changed (and thus the amount of
822 * storage required has changed).
823 */
824 if (!bms_is_empty(sendParams))
825 {
826 pei->param_exec = SerializeParamExecParams(estate, sendParams,
827 pei->area);
828 fpes->param_exec = pei->param_exec;
829 }
830 }
831
832 /*
833 * Give parallel-aware nodes a chance to initialize their shared data.
834 * This also initializes the elements of instrumentation->ps_instrument,
835 * if it exists.
836 */
837 d.pcxt = pcxt;
838 d.instrumentation = instrumentation;
839 d.nnodes = 0;
840
841 /* Install our DSA area while initializing the plan. */
842 estate->es_query_dsa = pei->area;
843 ExecParallelInitializeDSM(planstate, &d);
844 estate->es_query_dsa = NULL;
845
846 /*
847 * Make sure that the world hasn't shifted under our feet. This could
848 * probably just be an Assert(), but let's be conservative for now.
849 */
850 if (e.nnodes != d.nnodes)
851 elog(ERROR, "inconsistent count of PlanState nodes");
852
853 /* OK, we're ready to rock and roll. */
854 return pei;
855 }
856
857 /*
858 * Set up tuple queue readers to read the results of a parallel subplan.
859 *
860 * This is separate from ExecInitParallelPlan() because we can launch the
861 * worker processes and let them start doing something before we do this.
862 */
863 void
ExecParallelCreateReaders(ParallelExecutorInfo * pei)864 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
865 {
866 int nworkers = pei->pcxt->nworkers_launched;
867 int i;
868
869 Assert(pei->reader == NULL);
870
871 if (nworkers > 0)
872 {
873 pei->reader = (TupleQueueReader **)
874 palloc(nworkers * sizeof(TupleQueueReader *));
875
876 for (i = 0; i < nworkers; i++)
877 {
878 shm_mq_set_handle(pei->tqueue[i],
879 pei->pcxt->worker[i].bgwhandle);
880 pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
881 }
882 }
883 }
884
885 /*
886 * Re-initialize the parallel executor shared memory state before launching
887 * a fresh batch of workers.
888 */
889 void
ExecParallelReinitialize(PlanState * planstate,ParallelExecutorInfo * pei,Bitmapset * sendParams)890 ExecParallelReinitialize(PlanState *planstate,
891 ParallelExecutorInfo *pei,
892 Bitmapset *sendParams)
893 {
894 EState *estate = planstate->state;
895 FixedParallelExecutorState *fpes;
896
897 /* Old workers must already be shut down */
898 Assert(pei->finished);
899
900 /*
901 * Force any initplan outputs that we're going to pass to workers to be
902 * evaluated, if they weren't already (see comments in
903 * ExecInitParallelPlan).
904 */
905 ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
906
907 ReinitializeParallelDSM(pei->pcxt);
908 pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
909 pei->reader = NULL;
910 pei->finished = false;
911
912 fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
913
914 /* Free any serialized parameters from the last round. */
915 if (DsaPointerIsValid(fpes->param_exec))
916 {
917 dsa_free(pei->area, fpes->param_exec);
918 fpes->param_exec = InvalidDsaPointer;
919 }
920
921 /* Serialize current parameter values if required. */
922 if (!bms_is_empty(sendParams))
923 {
924 pei->param_exec = SerializeParamExecParams(estate, sendParams,
925 pei->area);
926 fpes->param_exec = pei->param_exec;
927 }
928
929 /* Traverse plan tree and let each child node reset associated state. */
930 estate->es_query_dsa = pei->area;
931 ExecParallelReInitializeDSM(planstate, pei->pcxt);
932 estate->es_query_dsa = NULL;
933 }
934
935 /*
936 * Traverse plan tree to reinitialize per-node dynamic shared memory state
937 */
938 static bool
ExecParallelReInitializeDSM(PlanState * planstate,ParallelContext * pcxt)939 ExecParallelReInitializeDSM(PlanState *planstate,
940 ParallelContext *pcxt)
941 {
942 if (planstate == NULL)
943 return false;
944
945 /*
946 * Call reinitializers for DSM-using plan nodes.
947 */
948 switch (nodeTag(planstate))
949 {
950 case T_SeqScanState:
951 if (planstate->plan->parallel_aware)
952 ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
953 pcxt);
954 break;
955 case T_IndexScanState:
956 if (planstate->plan->parallel_aware)
957 ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
958 pcxt);
959 break;
960 case T_IndexOnlyScanState:
961 if (planstate->plan->parallel_aware)
962 ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
963 pcxt);
964 break;
965 case T_ForeignScanState:
966 if (planstate->plan->parallel_aware)
967 ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
968 pcxt);
969 break;
970 case T_AppendState:
971 if (planstate->plan->parallel_aware)
972 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
973 break;
974 case T_CustomScanState:
975 if (planstate->plan->parallel_aware)
976 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
977 pcxt);
978 break;
979 case T_BitmapHeapScanState:
980 if (planstate->plan->parallel_aware)
981 ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
982 pcxt);
983 break;
984 case T_HashJoinState:
985 if (planstate->plan->parallel_aware)
986 ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
987 pcxt);
988 break;
989 case T_HashState:
990 case T_SortState:
991 case T_IncrementalSortState:
992 /* these nodes have DSM state, but no reinitialization is required */
993 break;
994
995 default:
996 break;
997 }
998
999 return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1000 }
1001
1002 /*
1003 * Copy instrumentation information about this node and its descendants from
1004 * dynamic shared memory.
1005 */
1006 static bool
ExecParallelRetrieveInstrumentation(PlanState * planstate,SharedExecutorInstrumentation * instrumentation)1007 ExecParallelRetrieveInstrumentation(PlanState *planstate,
1008 SharedExecutorInstrumentation *instrumentation)
1009 {
1010 Instrumentation *instrument;
1011 int i;
1012 int n;
1013 int ibytes;
1014 int plan_node_id = planstate->plan->plan_node_id;
1015 MemoryContext oldcontext;
1016
1017 /* Find the instrumentation for this node. */
1018 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1019 if (instrumentation->plan_node_id[i] == plan_node_id)
1020 break;
1021 if (i >= instrumentation->num_plan_nodes)
1022 elog(ERROR, "plan node %d not found", plan_node_id);
1023
1024 /* Accumulate the statistics from all workers. */
1025 instrument = GetInstrumentationArray(instrumentation);
1026 instrument += i * instrumentation->num_workers;
1027 for (n = 0; n < instrumentation->num_workers; ++n)
1028 InstrAggNode(planstate->instrument, &instrument[n]);
1029
1030 /*
1031 * Also store the per-worker detail.
1032 *
1033 * Worker instrumentation should be allocated in the same context as the
1034 * regular instrumentation information, which is the per-query context.
1035 * Switch into per-query memory context.
1036 */
1037 oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1038 ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1039 planstate->worker_instrument =
1040 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1041 MemoryContextSwitchTo(oldcontext);
1042
1043 planstate->worker_instrument->num_workers = instrumentation->num_workers;
1044 memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1045
1046 /* Perform any node-type-specific work that needs to be done. */
1047 switch (nodeTag(planstate))
1048 {
1049 case T_SortState:
1050 ExecSortRetrieveInstrumentation((SortState *) planstate);
1051 break;
1052 case T_IncrementalSortState:
1053 ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1054 break;
1055 case T_HashState:
1056 ExecHashRetrieveInstrumentation((HashState *) planstate);
1057 break;
1058 case T_AggState:
1059 ExecAggRetrieveInstrumentation((AggState *) planstate);
1060 break;
1061 default:
1062 break;
1063 }
1064
1065 return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1066 instrumentation);
1067 }
1068
1069 /*
1070 * Add up the workers' JIT instrumentation from dynamic shared memory.
1071 */
1072 static void
ExecParallelRetrieveJitInstrumentation(PlanState * planstate,SharedJitInstrumentation * shared_jit)1073 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1074 SharedJitInstrumentation *shared_jit)
1075 {
1076 JitInstrumentation *combined;
1077 int ibytes;
1078
1079 int n;
1080
1081 /*
1082 * Accumulate worker JIT instrumentation into the combined JIT
1083 * instrumentation, allocating it if required.
1084 */
1085 if (!planstate->state->es_jit_worker_instr)
1086 planstate->state->es_jit_worker_instr =
1087 MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1088 combined = planstate->state->es_jit_worker_instr;
1089
1090 /* Accumulate all the workers' instrumentations. */
1091 for (n = 0; n < shared_jit->num_workers; ++n)
1092 InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1093
1094 /*
1095 * Store the per-worker detail.
1096 *
1097 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1098 * instrumentation in per-query context.
1099 */
1100 ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1101 + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1102 planstate->worker_jit_instrument =
1103 MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1104
1105 memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1106 }
1107
1108 /*
1109 * Finish parallel execution. We wait for parallel workers to finish, and
1110 * accumulate their buffer/WAL usage.
1111 */
1112 void
ExecParallelFinish(ParallelExecutorInfo * pei)1113 ExecParallelFinish(ParallelExecutorInfo *pei)
1114 {
1115 int nworkers = pei->pcxt->nworkers_launched;
1116 int i;
1117
1118 /* Make this be a no-op if called twice in a row. */
1119 if (pei->finished)
1120 return;
1121
1122 /*
1123 * Detach from tuple queues ASAP, so that any still-active workers will
1124 * notice that no further results are wanted.
1125 */
1126 if (pei->tqueue != NULL)
1127 {
1128 for (i = 0; i < nworkers; i++)
1129 shm_mq_detach(pei->tqueue[i]);
1130 pfree(pei->tqueue);
1131 pei->tqueue = NULL;
1132 }
1133
1134 /*
1135 * While we're waiting for the workers to finish, let's get rid of the
1136 * tuple queue readers. (Any other local cleanup could be done here too.)
1137 */
1138 if (pei->reader != NULL)
1139 {
1140 for (i = 0; i < nworkers; i++)
1141 DestroyTupleQueueReader(pei->reader[i]);
1142 pfree(pei->reader);
1143 pei->reader = NULL;
1144 }
1145
1146 /* Now wait for the workers to finish. */
1147 WaitForParallelWorkersToFinish(pei->pcxt);
1148
1149 /*
1150 * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1151 * finish, or we might get incomplete data.)
1152 */
1153 for (i = 0; i < nworkers; i++)
1154 InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1155
1156 pei->finished = true;
1157 }
1158
1159 /*
1160 * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1161 * resources still exist after ExecParallelFinish. We separate these
1162 * routines because someone might want to examine the contents of the DSM
1163 * after ExecParallelFinish and before calling this routine.
1164 */
1165 void
ExecParallelCleanup(ParallelExecutorInfo * pei)1166 ExecParallelCleanup(ParallelExecutorInfo *pei)
1167 {
1168 /* Accumulate instrumentation, if any. */
1169 if (pei->instrumentation)
1170 ExecParallelRetrieveInstrumentation(pei->planstate,
1171 pei->instrumentation);
1172
1173 /* Accumulate JIT instrumentation, if any. */
1174 if (pei->jit_instrumentation)
1175 ExecParallelRetrieveJitInstrumentation(pei->planstate,
1176 pei->jit_instrumentation);
1177
1178 /* Free any serialized parameters. */
1179 if (DsaPointerIsValid(pei->param_exec))
1180 {
1181 dsa_free(pei->area, pei->param_exec);
1182 pei->param_exec = InvalidDsaPointer;
1183 }
1184 if (pei->area != NULL)
1185 {
1186 dsa_detach(pei->area);
1187 pei->area = NULL;
1188 }
1189 if (pei->pcxt != NULL)
1190 {
1191 DestroyParallelContext(pei->pcxt);
1192 pei->pcxt = NULL;
1193 }
1194 pfree(pei);
1195 }
1196
1197 /*
1198 * Create a DestReceiver to write tuples we produce to the shm_mq designated
1199 * for that purpose.
1200 */
1201 static DestReceiver *
ExecParallelGetReceiver(dsm_segment * seg,shm_toc * toc)1202 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1203 {
1204 char *mqspace;
1205 shm_mq *mq;
1206
1207 mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1208 mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1209 mq = (shm_mq *) mqspace;
1210 shm_mq_set_sender(mq, MyProc);
1211 return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1212 }
1213
1214 /*
1215 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1216 */
1217 static QueryDesc *
ExecParallelGetQueryDesc(shm_toc * toc,DestReceiver * receiver,int instrument_options)1218 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1219 int instrument_options)
1220 {
1221 char *pstmtspace;
1222 char *paramspace;
1223 PlannedStmt *pstmt;
1224 ParamListInfo paramLI;
1225 char *queryString;
1226
1227 /* Get the query string from shared memory */
1228 queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1229
1230 /* Reconstruct leader-supplied PlannedStmt. */
1231 pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1232 pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1233
1234 /* Reconstruct ParamListInfo. */
1235 paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1236 paramLI = RestoreParamList(¶mspace);
1237
1238 /* Create a QueryDesc for the query. */
1239 return CreateQueryDesc(pstmt,
1240 queryString,
1241 GetActiveSnapshot(), InvalidSnapshot,
1242 receiver, paramLI, NULL, instrument_options);
1243 }
1244
1245 /*
1246 * Copy instrumentation information from this node and its descendants into
1247 * dynamic shared memory, so that the parallel leader can retrieve it.
1248 */
1249 static bool
ExecParallelReportInstrumentation(PlanState * planstate,SharedExecutorInstrumentation * instrumentation)1250 ExecParallelReportInstrumentation(PlanState *planstate,
1251 SharedExecutorInstrumentation *instrumentation)
1252 {
1253 int i;
1254 int plan_node_id = planstate->plan->plan_node_id;
1255 Instrumentation *instrument;
1256
1257 InstrEndLoop(planstate->instrument);
1258
1259 /*
1260 * If we shuffled the plan_node_id values in ps_instrument into sorted
1261 * order, we could use binary search here. This might matter someday if
1262 * we're pushing down sufficiently large plan trees. For now, do it the
1263 * slow, dumb way.
1264 */
1265 for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1266 if (instrumentation->plan_node_id[i] == plan_node_id)
1267 break;
1268 if (i >= instrumentation->num_plan_nodes)
1269 elog(ERROR, "plan node %d not found", plan_node_id);
1270
1271 /*
1272 * Add our statistics to the per-node, per-worker totals. It's possible
1273 * that this could happen more than once if we relaunched workers.
1274 */
1275 instrument = GetInstrumentationArray(instrumentation);
1276 instrument += i * instrumentation->num_workers;
1277 Assert(IsParallelWorker());
1278 Assert(ParallelWorkerNumber < instrumentation->num_workers);
1279 InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1280
1281 return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1282 instrumentation);
1283 }
1284
1285 /*
1286 * Initialize the PlanState and its descendants with the information
1287 * retrieved from shared memory. This has to be done once the PlanState
1288 * is allocated and initialized by executor; that is, after ExecutorStart().
1289 */
1290 static bool
ExecParallelInitializeWorker(PlanState * planstate,ParallelWorkerContext * pwcxt)1291 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1292 {
1293 if (planstate == NULL)
1294 return false;
1295
1296 switch (nodeTag(planstate))
1297 {
1298 case T_SeqScanState:
1299 if (planstate->plan->parallel_aware)
1300 ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1301 break;
1302 case T_IndexScanState:
1303 if (planstate->plan->parallel_aware)
1304 ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1305 pwcxt);
1306 break;
1307 case T_IndexOnlyScanState:
1308 if (planstate->plan->parallel_aware)
1309 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1310 pwcxt);
1311 break;
1312 case T_ForeignScanState:
1313 if (planstate->plan->parallel_aware)
1314 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1315 pwcxt);
1316 break;
1317 case T_AppendState:
1318 if (planstate->plan->parallel_aware)
1319 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1320 break;
1321 case T_CustomScanState:
1322 if (planstate->plan->parallel_aware)
1323 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1324 pwcxt);
1325 break;
1326 case T_BitmapHeapScanState:
1327 if (planstate->plan->parallel_aware)
1328 ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1329 pwcxt);
1330 break;
1331 case T_HashJoinState:
1332 if (planstate->plan->parallel_aware)
1333 ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1334 pwcxt);
1335 break;
1336 case T_HashState:
1337 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1338 ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1339 break;
1340 case T_SortState:
1341 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1342 ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1343 break;
1344 case T_IncrementalSortState:
1345 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1346 ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1347 pwcxt);
1348 break;
1349 case T_AggState:
1350 /* even when not parallel-aware, for EXPLAIN ANALYZE */
1351 ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1352 break;
1353 default:
1354 break;
1355 }
1356
1357 return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1358 pwcxt);
1359 }
1360
1361 /*
1362 * Main entrypoint for parallel query worker processes.
1363 *
1364 * We reach this function from ParallelWorkerMain, so the setup necessary to
1365 * create a sensible parallel environment has already been done;
1366 * ParallelWorkerMain worries about stuff like the transaction state, combo
1367 * CID mappings, and GUC values, so we don't need to deal with any of that
1368 * here.
1369 *
1370 * Our job is to deal with concerns specific to the executor. The parallel
1371 * group leader will have stored a serialized PlannedStmt, and it's our job
1372 * to execute that plan and write the resulting tuples to the appropriate
1373 * tuple queue. Various bits of supporting information that we need in order
1374 * to do this are also stored in the dsm_segment and can be accessed through
1375 * the shm_toc.
1376 */
1377 void
ParallelQueryMain(dsm_segment * seg,shm_toc * toc)1378 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1379 {
1380 FixedParallelExecutorState *fpes;
1381 BufferUsage *buffer_usage;
1382 WalUsage *wal_usage;
1383 DestReceiver *receiver;
1384 QueryDesc *queryDesc;
1385 SharedExecutorInstrumentation *instrumentation;
1386 SharedJitInstrumentation *jit_instrumentation;
1387 int instrument_options = 0;
1388 void *area_space;
1389 dsa_area *area;
1390 ParallelWorkerContext pwcxt;
1391
1392 /* Get fixed-size state. */
1393 fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1394
1395 /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1396 receiver = ExecParallelGetReceiver(seg, toc);
1397 instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1398 if (instrumentation != NULL)
1399 instrument_options = instrumentation->instrument_options;
1400 jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1401 true);
1402 queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1403
1404 /* Setting debug_query_string for individual workers */
1405 debug_query_string = queryDesc->sourceText;
1406
1407 /* Report workers' query for monitoring purposes */
1408 pgstat_report_activity(STATE_RUNNING, debug_query_string);
1409
1410 /* Attach to the dynamic shared memory area. */
1411 area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1412 area = dsa_attach_in_place(area_space, seg);
1413
1414 /* Start up the executor */
1415 queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1416 ExecutorStart(queryDesc, fpes->eflags);
1417
1418 /* Special executor initialization steps for parallel workers */
1419 queryDesc->planstate->state->es_query_dsa = area;
1420 if (DsaPointerIsValid(fpes->param_exec))
1421 {
1422 char *paramexec_space;
1423
1424 paramexec_space = dsa_get_address(area, fpes->param_exec);
1425 RestoreParamExecParams(paramexec_space, queryDesc->estate);
1426
1427 }
1428 pwcxt.toc = toc;
1429 pwcxt.seg = seg;
1430 ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1431
1432 /* Pass down any tuple bound */
1433 ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1434
1435 /*
1436 * Prepare to track buffer/WAL usage during query execution.
1437 *
1438 * We do this after starting up the executor to match what happens in the
1439 * leader, which also doesn't count buffer accesses and WAL activity that
1440 * occur during executor startup.
1441 */
1442 InstrStartParallelQuery();
1443
1444 /*
1445 * Run the plan. If we specified a tuple bound, be careful not to demand
1446 * more tuples than that.
1447 */
1448 ExecutorRun(queryDesc,
1449 ForwardScanDirection,
1450 fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1451 true);
1452
1453 /* Shut down the executor */
1454 ExecutorFinish(queryDesc);
1455
1456 /* Report buffer/WAL usage during parallel execution. */
1457 buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1458 wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1459 InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1460 &wal_usage[ParallelWorkerNumber]);
1461
1462 /* Report instrumentation data if any instrumentation options are set. */
1463 if (instrumentation != NULL)
1464 ExecParallelReportInstrumentation(queryDesc->planstate,
1465 instrumentation);
1466
1467 /* Report JIT instrumentation data if any */
1468 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1469 {
1470 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1471 jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1472 queryDesc->estate->es_jit->instr;
1473 }
1474
1475 /* Must do this after capturing instrumentation. */
1476 ExecutorEnd(queryDesc);
1477
1478 /* Cleanup. */
1479 dsa_detach(area);
1480 FreeQueryDesc(queryDesc);
1481 receiver->rDestroy(receiver);
1482 }
1483