1 /*-------------------------------------------------------------------------
2  *
3  * execParallel.c
4  *	  Support routines for parallel execution.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * This file contains routines that are intended to support setting up,
10  * using, and tearing down a ParallelContext from within the PostgreSQL
11  * executor.  The ParallelContext machinery will handle starting the
12  * workers and ensuring that their state generally matches that of the
13  * leader; see src/backend/access/transam/README.parallel for details.
14  * However, we must save and restore relevant executor state, such as
15  * any ParamListInfo associated with the query, buffer usage info, and
16  * the actual plan to be passed down to the worker.
17  *
18  * IDENTIFICATION
19  *	  src/backend/executor/execParallel.c
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAppend.h"
29 #include "executor/nodeBitmapHeapscan.h"
30 #include "executor/nodeCustom.h"
31 #include "executor/nodeForeignscan.h"
32 #include "executor/nodeHash.h"
33 #include "executor/nodeHashjoin.h"
34 #include "executor/nodeIndexscan.h"
35 #include "executor/nodeIndexonlyscan.h"
36 #include "executor/nodeSeqscan.h"
37 #include "executor/nodeSort.h"
38 #include "executor/nodeSubplan.h"
39 #include "executor/tqueue.h"
40 #include "jit/jit.h"
41 #include "nodes/nodeFuncs.h"
42 #include "storage/spin.h"
43 #include "tcop/tcopprot.h"
44 #include "utils/datum.h"
45 #include "utils/dsa.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/snapmgr.h"
49 #include "pgstat.h"
50 
51 /*
52  * Magic numbers for parallel executor communication.  We use constants
53  * greater than any 32-bit integer here so that values < 2^32 can be used
54  * by individual parallel nodes to store their own state.
55  */
56 #define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
57 #define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
58 #define PARALLEL_KEY_PARAMLISTINFO		UINT64CONST(0xE000000000000003)
59 #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
60 #define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
61 #define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
62 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
63 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
64 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
65 
66 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
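
/*
 * Individual parallel-aware nodes generally key their own shm_toc entries by
 * their plan_node_id, a small integer assigned by the planner, which is why
 * the executor-wide keys above are kept outside the 32-bit range.
 */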
67 
68 /*
69  * Fixed-size random stuff that we need to pass to parallel workers.
70  */
71 typedef struct FixedParallelExecutorState
72 {
73 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
74 	dsa_pointer param_exec;
75 	int			eflags;
76 	int			jit_flags;
77 } FixedParallelExecutorState;
78 
79 /*
80  * DSM structure for accumulating per-PlanState instrumentation.
81  *
82  * instrument_options: Same meaning here as in instrument.c.
83  *
84  * instrument_offset: Offset, relative to the start of this structure,
85  * of the first Instrumentation object.  This will depend on the length of
86  * the plan_node_id array.
87  *
88  * num_workers: Number of workers.
89  *
90  * num_plan_nodes: Number of plan nodes.
91  *
92  * plan_node_id: Array of IDs of the plan nodes for which we are gathering
93  * instrumentation from parallel workers.  Its length is num_plan_nodes.
94  */
95 struct SharedExecutorInstrumentation
96 {
97 	int			instrument_options;
98 	int			instrument_offset;
99 	int			num_workers;
100 	int			num_plan_nodes;
101 	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];
102 	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
103 };
104 #define GetInstrumentationArray(sei) \
105 	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
106 	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
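
/*
 * The Instrumentation entry for plan node i (i.e. plan_node_id[i]) and worker
 * w lives at GetInstrumentationArray(sei)[i * sei->num_workers + w]; see
 * ExecParallelRetrieveInstrumentation and ExecParallelReportInstrumentation.
 */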
107 
108 /* Context object for ExecParallelEstimate. */
109 typedef struct ExecParallelEstimateContext
110 {
111 	ParallelContext *pcxt;
112 	int			nnodes;
113 } ExecParallelEstimateContext;
114 
115 /* Context object for ExecParallelInitializeDSM. */
116 typedef struct ExecParallelInitializeDSMContext
117 {
118 	ParallelContext *pcxt;
119 	SharedExecutorInstrumentation *instrumentation;
120 	int			nnodes;
121 } ExecParallelInitializeDSMContext;
122 
123 /* Helper functions that run in the parallel leader. */
124 static char *ExecSerializePlan(Plan *plan, EState *estate);
125 static bool ExecParallelEstimate(PlanState *node,
126 								 ExecParallelEstimateContext *e);
127 static bool ExecParallelInitializeDSM(PlanState *node,
128 									  ExecParallelInitializeDSMContext *d);
129 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
130 													bool reinitialize);
131 static bool ExecParallelReInitializeDSM(PlanState *planstate,
132 										ParallelContext *pcxt);
133 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
134 												SharedExecutorInstrumentation *instrumentation);
135 
136 /* Helper function that runs in the parallel worker. */
137 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
138 
139 /*
140  * Create a serialized representation of the plan to be sent to each worker.
141  */
142 static char *
143 ExecSerializePlan(Plan *plan, EState *estate)
144 {
145 	PlannedStmt *pstmt;
146 	ListCell   *lc;
147 
148 	/* We can't scribble on the original plan, so make a copy. */
149 	plan = copyObject(plan);
150 
151 	/*
152 	 * The worker will start its own copy of the executor, and that copy will
153 	 * insert a junk filter if the toplevel node has any resjunk entries. We
154 	 * don't want that to happen, because while resjunk columns shouldn't be
155 	 * sent back to the user, here the tuples are coming back to another
156 	 * backend which may very well need them.  So mutate the target list
157 	 * accordingly.  This is sort of a hack; there might be better ways to do
158 	 * this...
159 	 */
160 	foreach(lc, plan->targetlist)
161 	{
162 		TargetEntry *tle = lfirst_node(TargetEntry, lc);
163 
164 		tle->resjunk = false;
165 	}
166 
167 	/*
168 	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
169 	 * for our purposes, but the worker will need at least a minimal
170 	 * PlannedStmt to start the executor.
171 	 */
172 	pstmt = makeNode(PlannedStmt);
173 	pstmt->commandType = CMD_SELECT;
174 	pstmt->queryId = UINT64CONST(0);
175 	pstmt->hasReturning = false;
176 	pstmt->hasModifyingCTE = false;
177 	pstmt->canSetTag = true;
178 	pstmt->transientPlan = false;
179 	pstmt->dependsOnRole = false;
180 	pstmt->parallelModeNeeded = false;
181 	pstmt->planTree = plan;
182 	pstmt->rtable = estate->es_range_table;
183 	pstmt->resultRelations = NIL;
184 
185 	/*
186 	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
187 	 * for unsafe ones (so that the list indexes of the safe ones are
188 	 * preserved).  This positively ensures that the worker won't try to run,
189 	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
190 	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
191 	 */
192 	pstmt->subplans = NIL;
193 	foreach(lc, estate->es_plannedstmt->subplans)
194 	{
195 		Plan	   *subplan = (Plan *) lfirst(lc);
196 
197 		if (subplan && !subplan->parallel_safe)
198 			subplan = NULL;
199 		pstmt->subplans = lappend(pstmt->subplans, subplan);
200 	}
201 
202 	pstmt->rewindPlanIDs = NULL;
203 	pstmt->rowMarks = NIL;
204 	pstmt->relationOids = NIL;
205 	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
206 	pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
207 	pstmt->utilityStmt = NULL;
208 	pstmt->stmt_location = -1;
209 	pstmt->stmt_len = -1;
210 
211 	/* Return serialized copy of our dummy PlannedStmt. */
212 	return nodeToString(pstmt);
213 }
214 
215 /*
216  * Parallel-aware plan nodes (and occasionally others) may need some state
217  * which is shared across all parallel workers.  Before we size the DSM, give
218  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
219  * &pcxt->estimator.
220  *
221  * While we're at it, count the number of PlanState nodes in the tree, so
222  * we know how many Instrumentation structures we need.
223  */
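/*
 * As a rough sketch, a per-node estimator for a parallel-aware scan typically
 * does something like this for its target relation rel (modeled loosely on
 * nodeSeqscan.c):
 *
 *		node->pscan_len = table_parallelscan_estimate(rel, estate->es_snapshot);
 *		shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
 *		shm_toc_estimate_keys(&pcxt->estimator, 1);
 */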
224 static bool
225 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
226 {
227 	if (planstate == NULL)
228 		return false;
229 
230 	/* Count this node. */
231 	e->nnodes++;
232 
233 	switch (nodeTag(planstate))
234 	{
235 		case T_SeqScanState:
236 			if (planstate->plan->parallel_aware)
237 				ExecSeqScanEstimate((SeqScanState *) planstate,
238 									e->pcxt);
239 			break;
240 		case T_IndexScanState:
241 			if (planstate->plan->parallel_aware)
242 				ExecIndexScanEstimate((IndexScanState *) planstate,
243 									  e->pcxt);
244 			break;
245 		case T_IndexOnlyScanState:
246 			if (planstate->plan->parallel_aware)
247 				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
248 										  e->pcxt);
249 			break;
250 		case T_ForeignScanState:
251 			if (planstate->plan->parallel_aware)
252 				ExecForeignScanEstimate((ForeignScanState *) planstate,
253 										e->pcxt);
254 			break;
255 		case T_AppendState:
256 			if (planstate->plan->parallel_aware)
257 				ExecAppendEstimate((AppendState *) planstate,
258 								   e->pcxt);
259 			break;
260 		case T_CustomScanState:
261 			if (planstate->plan->parallel_aware)
262 				ExecCustomScanEstimate((CustomScanState *) planstate,
263 									   e->pcxt);
264 			break;
265 		case T_BitmapHeapScanState:
266 			if (planstate->plan->parallel_aware)
267 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
268 									   e->pcxt);
269 			break;
270 		case T_HashJoinState:
271 			if (planstate->plan->parallel_aware)
272 				ExecHashJoinEstimate((HashJoinState *) planstate,
273 									 e->pcxt);
274 			break;
275 		case T_HashState:
276 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
277 			ExecHashEstimate((HashState *) planstate, e->pcxt);
278 			break;
279 		case T_SortState:
280 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
281 			ExecSortEstimate((SortState *) planstate, e->pcxt);
282 			break;
283 
284 		default:
285 			break;
286 	}
287 
288 	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
289 }
290 
291 /*
292  * Estimate the amount of space required to serialize the indicated parameters.
293  */
294 static Size
295 EstimateParamExecSpace(EState *estate, Bitmapset *params)
296 {
297 	int			paramid;
298 	Size		sz = sizeof(int);
299 
300 	paramid = -1;
301 	while ((paramid = bms_next_member(params, paramid)) >= 0)
302 	{
303 		Oid			typeOid;
304 		int16		typLen;
305 		bool		typByVal;
306 		ParamExecData *prm;
307 
308 		prm = &(estate->es_param_exec_vals[paramid]);
309 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
310 							   paramid);
311 
312 		sz = add_size(sz, sizeof(int)); /* space for paramid */
313 
314 		/* space for datum/isnull */
315 		if (OidIsValid(typeOid))
316 			get_typlenbyval(typeOid, &typLen, &typByVal);
317 		else
318 		{
319 			/* If no type OID, assume by-value, like copyParamList does. */
320 			typLen = sizeof(Datum);
321 			typByVal = true;
322 		}
323 		sz = add_size(sz,
324 					  datumEstimateSpace(prm->value, prm->isnull,
325 										 typByVal, typLen));
326 	}
327 	return sz;
328 }
329 
330 /*
331  * Serialize specified PARAM_EXEC parameters.
332  *
333  * We write the number of parameters first, as a 4-byte integer, and then
334  * write details for each parameter in turn.  The details for each parameter
335  * consist of a 4-byte paramid (location of param in execution time internal
336  * parameter array) and then the datum as serialized by datumSerialize().
337  */
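/*
 * Schematically, with two parameters the serialized image in DSA memory is:
 *
 *		int32 nparams (= 2)
 *		int32 paramid of first param, then its datumSerialize() output
 *		int32 paramid of second param, then its datumSerialize() output
 */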
338 static dsa_pointer
339 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
340 {
341 	Size		size;
342 	int			nparams;
343 	int			paramid;
344 	ParamExecData *prm;
345 	dsa_pointer handle;
346 	char	   *start_address;
347 
348 	/* Allocate enough space for the current parameter values. */
349 	size = EstimateParamExecSpace(estate, params);
350 	handle = dsa_allocate(area, size);
351 	start_address = dsa_get_address(area, handle);
352 
353 	/* First write the number of parameters as a 4-byte integer. */
354 	nparams = bms_num_members(params);
355 	memcpy(start_address, &nparams, sizeof(int));
356 	start_address += sizeof(int);
357 
358 	/* Write details for each parameter in turn. */
359 	paramid = -1;
360 	while ((paramid = bms_next_member(params, paramid)) >= 0)
361 	{
362 		Oid			typeOid;
363 		int16		typLen;
364 		bool		typByVal;
365 
366 		prm = &(estate->es_param_exec_vals[paramid]);
367 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
368 							   paramid);
369 
370 		/* Write paramid. */
371 		memcpy(start_address, &paramid, sizeof(int));
372 		start_address += sizeof(int);
373 
374 		/* Write datum/isnull */
375 		if (OidIsValid(typeOid))
376 			get_typlenbyval(typeOid, &typLen, &typByVal);
377 		else
378 		{
379 			/* If no type OID, assume by-value, like copyParamList does. */
380 			typLen = sizeof(Datum);
381 			typByVal = true;
382 		}
383 		datumSerialize(prm->value, prm->isnull, typByVal, typLen,
384 					   &start_address);
385 	}
386 
387 	return handle;
388 }
389 
390 /*
391  * Restore specified PARAM_EXEC parameters.
392  */
393 static void
394 RestoreParamExecParams(char *start_address, EState *estate)
395 {
396 	int			nparams;
397 	int			i;
398 	int			paramid;
399 
400 	memcpy(&nparams, start_address, sizeof(int));
401 	start_address += sizeof(int);
402 
403 	for (i = 0; i < nparams; i++)
404 	{
405 		ParamExecData *prm;
406 
407 		/* Read paramid */
408 		memcpy(&paramid, start_address, sizeof(int));
409 		start_address += sizeof(int);
410 		prm = &(estate->es_param_exec_vals[paramid]);
411 
412 		/* Read datum/isnull. */
413 		prm->value = datumRestore(&start_address, &prm->isnull);
414 		prm->execPlan = NULL;
415 	}
416 }
417 
418 /*
419  * Initialize the dynamic shared memory segment that will be used to control
420  * parallel execution.
421  */
422 static bool
423 ExecParallelInitializeDSM(PlanState *planstate,
424 						  ExecParallelInitializeDSMContext *d)
425 {
426 	if (planstate == NULL)
427 		return false;
428 
429 	/* If instrumentation is enabled, initialize slot for this node. */
430 	if (d->instrumentation != NULL)
431 		d->instrumentation->plan_node_id[d->nnodes] =
432 			planstate->plan->plan_node_id;
433 
434 	/* Count this node. */
435 	d->nnodes++;
436 
437 	/*
438 	 * Call initializers for DSM-using plan nodes.
439 	 *
440 	 * Most plan nodes won't do anything here, but plan nodes that allocated
441 	 * DSM may need to initialize shared state in the DSM before parallel
442 	 * workers are launched.  They can allocate the space they previously
443 	 * estimated using shm_toc_allocate, and add the keys they previously
444 	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
445 	 */
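	/*
	 * Sketch of a typical initializer, again modeled loosely on the parallel
	 * sequential scan case (nodeSeqscan.c), with rel and pscan as in the
	 * estimation sketch above:
	 *
	 *		pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
	 *		table_parallelscan_initialize(rel, pscan, estate->es_snapshot);
	 *		shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
	 */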
446 	switch (nodeTag(planstate))
447 	{
448 		case T_SeqScanState:
449 			if (planstate->plan->parallel_aware)
450 				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
451 										 d->pcxt);
452 			break;
453 		case T_IndexScanState:
454 			if (planstate->plan->parallel_aware)
455 				ExecIndexScanInitializeDSM((IndexScanState *) planstate,
456 										   d->pcxt);
457 			break;
458 		case T_IndexOnlyScanState:
459 			if (planstate->plan->parallel_aware)
460 				ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
461 											   d->pcxt);
462 			break;
463 		case T_ForeignScanState:
464 			if (planstate->plan->parallel_aware)
465 				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
466 											 d->pcxt);
467 			break;
468 		case T_AppendState:
469 			if (planstate->plan->parallel_aware)
470 				ExecAppendInitializeDSM((AppendState *) planstate,
471 										d->pcxt);
472 			break;
473 		case T_CustomScanState:
474 			if (planstate->plan->parallel_aware)
475 				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
476 											d->pcxt);
477 			break;
478 		case T_BitmapHeapScanState:
479 			if (planstate->plan->parallel_aware)
480 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
481 											d->pcxt);
482 			break;
483 		case T_HashJoinState:
484 			if (planstate->plan->parallel_aware)
485 				ExecHashJoinInitializeDSM((HashJoinState *) planstate,
486 										  d->pcxt);
487 			break;
488 		case T_HashState:
489 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
490 			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
491 			break;
492 		case T_SortState:
493 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
494 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
495 			break;
496 
497 		default:
498 			break;
499 	}
500 
501 	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
502 }
503 
504 /*
505  * Set up the response queues that parallel workers will use to return
506  * tuples to the main backend.
507  */
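/*
 * Each worker later attaches to its own queue as the sender, locating it at
 * tqueuespace + ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; see
 * ExecParallelGetReceiver below.
 */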
508 static shm_mq_handle **
509 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
510 {
511 	shm_mq_handle **responseq;
512 	char	   *tqueuespace;
513 	int			i;
514 
515 	/* Skip this if no workers. */
516 	if (pcxt->nworkers == 0)
517 		return NULL;
518 
519 	/* Allocate memory for shared memory queue handles. */
520 	responseq = (shm_mq_handle **)
521 		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
522 
523 	/*
524 	 * If not reinitializing, allocate space from the DSM for the queues;
525 	 * otherwise, find the already allocated space.
526 	 */
527 	if (!reinitialize)
528 		tqueuespace =
529 			shm_toc_allocate(pcxt->toc,
530 							 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
531 									  pcxt->nworkers));
532 	else
533 		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
534 
535 	/* Create the queues, and become the receiver for each. */
536 	for (i = 0; i < pcxt->nworkers; ++i)
537 	{
538 		shm_mq	   *mq;
539 
540 		mq = shm_mq_create(tqueuespace +
541 						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
542 						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
543 
544 		shm_mq_set_receiver(mq, MyProc);
545 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
546 	}
547 
548 	/* Add array of queues to shm_toc, so others can find it. */
549 	if (!reinitialize)
550 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
551 
552 	/* Return array of handles. */
553 	return responseq;
554 }
555 
556 /*
557  * Sets up the required infrastructure for backend workers to perform
558  * execution and return results to the main backend.
559  */
560 ParallelExecutorInfo *
561 ExecInitParallelPlan(PlanState *planstate, EState *estate,
562 					 Bitmapset *sendParams, int nworkers,
563 					 int64 tuples_needed)
564 {
565 	ParallelExecutorInfo *pei;
566 	ParallelContext *pcxt;
567 	ExecParallelEstimateContext e;
568 	ExecParallelInitializeDSMContext d;
569 	FixedParallelExecutorState *fpes;
570 	char	   *pstmt_data;
571 	char	   *pstmt_space;
572 	char	   *paramlistinfo_space;
573 	BufferUsage *bufusage_space;
574 	SharedExecutorInstrumentation *instrumentation = NULL;
575 	SharedJitInstrumentation *jit_instrumentation = NULL;
576 	int			pstmt_len;
577 	int			paramlistinfo_len;
578 	int			instrumentation_len = 0;
579 	int			jit_instrumentation_len = 0;
580 	int			instrument_offset = 0;
581 	Size		dsa_minsize = dsa_minimum_size();
582 	char	   *query_string;
583 	int			query_len;
584 
585 	/*
586 	 * Force any initplan outputs that we're going to pass to workers to be
587 	 * evaluated, if they weren't already.
588 	 *
589 	 * For simplicity, we use the EState's per-output-tuple ExprContext here.
590 	 * That risks intra-query memory leakage, since we might pass through here
591 	 * many times before that ExprContext gets reset; but ExecSetParamPlan
592 	 * doesn't normally leak any memory in the context (see its comments), so
593 	 * it doesn't seem worth complicating this function's API to pass it a
594 	 * shorter-lived ExprContext.  This might need to change someday.
595 	 */
596 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
597 
598 	/* Allocate object for return value. */
599 	pei = palloc0(sizeof(ParallelExecutorInfo));
600 	pei->finished = false;
601 	pei->planstate = planstate;
602 
603 	/* Fix up and serialize plan to be sent to workers. */
604 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
605 
606 	/* Create a parallel context. */
607 	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
608 	pei->pcxt = pcxt;
609 
610 	/*
611 	 * Before telling the parallel context to create a dynamic shared memory
612 	 * segment, we need to figure out how big it should be.  Estimate space
613 	 * for the various things we need to store.
614 	 */
615 
616 	/* Estimate space for fixed-size state. */
617 	shm_toc_estimate_chunk(&pcxt->estimator,
618 						   sizeof(FixedParallelExecutorState));
619 	shm_toc_estimate_keys(&pcxt->estimator, 1);
620 
621 	/* Estimate space for query text. */
622 	query_len = strlen(estate->es_sourceText);
623 	shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
624 	shm_toc_estimate_keys(&pcxt->estimator, 1);
625 
626 	/* Estimate space for serialized PlannedStmt. */
627 	pstmt_len = strlen(pstmt_data) + 1;
628 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
629 	shm_toc_estimate_keys(&pcxt->estimator, 1);
630 
631 	/* Estimate space for serialized ParamListInfo. */
632 	paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
633 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
634 	shm_toc_estimate_keys(&pcxt->estimator, 1);
635 
636 	/*
637 	 * Estimate space for BufferUsage.
638 	 *
639 	 * If EXPLAIN is not in use and there are no extensions loaded that care,
640 	 * we could skip this.  But we have no way of knowing whether anyone's
641 	 * looking at pgBufferUsage, so do it unconditionally.
642 	 */
643 	shm_toc_estimate_chunk(&pcxt->estimator,
644 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
645 	shm_toc_estimate_keys(&pcxt->estimator, 1);
646 
647 	/* Estimate space for tuple queues. */
648 	shm_toc_estimate_chunk(&pcxt->estimator,
649 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
650 	shm_toc_estimate_keys(&pcxt->estimator, 1);
651 
652 	/*
653 	 * Give parallel-aware nodes a chance to add to the estimates, and get a
654 	 * count of how many PlanState nodes there are.
655 	 */
656 	e.pcxt = pcxt;
657 	e.nnodes = 0;
658 	ExecParallelEstimate(planstate, &e);
659 
660 	/* Estimate space for instrumentation, if required. */
661 	if (estate->es_instrument)
662 	{
663 		instrumentation_len =
664 			offsetof(SharedExecutorInstrumentation, plan_node_id) +
665 			sizeof(int) * e.nnodes;
666 		instrumentation_len = MAXALIGN(instrumentation_len);
667 		instrument_offset = instrumentation_len;
668 		instrumentation_len +=
669 			mul_size(sizeof(Instrumentation),
670 					 mul_size(e.nnodes, nworkers));
671 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
672 		shm_toc_estimate_keys(&pcxt->estimator, 1);
673 
674 		/* Estimate space for JIT instrumentation, if required. */
675 		if (estate->es_jit_flags != PGJIT_NONE)
676 		{
677 			jit_instrumentation_len =
678 				offsetof(SharedJitInstrumentation, jit_instr) +
679 				sizeof(JitInstrumentation) * nworkers;
680 			shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
681 			shm_toc_estimate_keys(&pcxt->estimator, 1);
682 		}
683 	}
684 
685 	/* Estimate space for DSA area. */
686 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
687 	shm_toc_estimate_keys(&pcxt->estimator, 1);
688 
689 	/* Everyone's had a chance to ask for space, so now create the DSM. */
690 	InitializeParallelDSM(pcxt);
691 
692 	/*
693 	 * OK, now we have a dynamic shared memory segment, and it should be big
694 	 * enough to store all of the data we estimated we would want to put into
695 	 * it, plus whatever general stuff (not specifically executor-related) the
696 	 * ParallelContext itself needs to store there.  None of the space we
697 	 * asked for has been allocated or initialized yet, though, so do that.
698 	 */
699 
700 	/* Store fixed-size state. */
701 	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
702 	fpes->tuples_needed = tuples_needed;
703 	fpes->param_exec = InvalidDsaPointer;
704 	fpes->eflags = estate->es_top_eflags;
705 	fpes->jit_flags = estate->es_jit_flags;
706 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
707 
708 	/* Store query string */
709 	query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
710 	memcpy(query_string, estate->es_sourceText, query_len + 1);
711 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
712 
713 	/* Store serialized PlannedStmt. */
714 	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
715 	memcpy(pstmt_space, pstmt_data, pstmt_len);
716 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
717 
718 	/* Store serialized ParamListInfo. */
719 	paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
720 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
721 	SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
722 
723 	/* Allocate space for each worker's BufferUsage; no need to initialize. */
724 	bufusage_space = shm_toc_allocate(pcxt->toc,
725 									  mul_size(sizeof(BufferUsage), pcxt->nworkers));
726 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
727 	pei->buffer_usage = bufusage_space;
728 
729 	/* Set up the tuple queues that the workers will write into. */
730 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
731 
732 	/* We don't need the TupleQueueReaders yet, though. */
733 	pei->reader = NULL;
734 
735 	/*
736 	 * If instrumentation options were supplied, allocate space for the data.
737 	 * It only gets partially initialized here; the rest happens during
738 	 * ExecParallelInitializeDSM.
739 	 */
740 	if (estate->es_instrument)
741 	{
742 		Instrumentation *instrument;
743 		int			i;
744 
745 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
746 		instrumentation->instrument_options = estate->es_instrument;
747 		instrumentation->instrument_offset = instrument_offset;
748 		instrumentation->num_workers = nworkers;
749 		instrumentation->num_plan_nodes = e.nnodes;
750 		instrument = GetInstrumentationArray(instrumentation);
751 		for (i = 0; i < nworkers * e.nnodes; ++i)
752 			InstrInit(&instrument[i], estate->es_instrument);
753 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
754 					   instrumentation);
755 		pei->instrumentation = instrumentation;
756 
757 		if (estate->es_jit_flags != PGJIT_NONE)
758 		{
759 			jit_instrumentation = shm_toc_allocate(pcxt->toc,
760 												   jit_instrumentation_len);
761 			jit_instrumentation->num_workers = nworkers;
762 			memset(jit_instrumentation->jit_instr, 0,
763 				   sizeof(JitInstrumentation) * nworkers);
764 			shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
765 						   jit_instrumentation);
766 			pei->jit_instrumentation = jit_instrumentation;
767 		}
768 	}
769 
770 	/*
771 	 * Create a DSA area that can be used by the leader and all workers.
772 	 * (However, if we failed to create a DSM and are using private memory
773 	 * instead, then skip this.)
774 	 */
775 	if (pcxt->seg != NULL)
776 	{
777 		char	   *area_space;
778 
779 		area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
780 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
781 		pei->area = dsa_create_in_place(area_space, dsa_minsize,
782 										LWTRANCHE_PARALLEL_QUERY_DSA,
783 										pcxt->seg);
784 
785 		/*
786 		 * Serialize parameters, if any, using DSA storage.  We don't dare use
787 		 * the main parallel query DSM for this because we might relaunch
788 		 * workers after the values have changed (and thus the amount of
789 		 * storage required has changed).
790 		 */
791 		if (!bms_is_empty(sendParams))
792 		{
793 			pei->param_exec = SerializeParamExecParams(estate, sendParams,
794 													   pei->area);
795 			fpes->param_exec = pei->param_exec;
796 		}
797 	}
798 
799 	/*
800 	 * Give parallel-aware nodes a chance to initialize their shared data.
801 	 * This also fills in the plan_node_id array of *instrumentation, if
802 	 * instrumentation is in use.
803 	 */
804 	d.pcxt = pcxt;
805 	d.instrumentation = instrumentation;
806 	d.nnodes = 0;
807 
808 	/* Install our DSA area while initializing the plan. */
809 	estate->es_query_dsa = pei->area;
810 	ExecParallelInitializeDSM(planstate, &d);
811 	estate->es_query_dsa = NULL;
812 
813 	/*
814 	 * Make sure that the world hasn't shifted under our feet.  This could
815 	 * probably just be an Assert(), but let's be conservative for now.
816 	 */
817 	if (e.nnodes != d.nnodes)
818 		elog(ERROR, "inconsistent count of PlanState nodes");
819 
820 	/* OK, we're ready to rock and roll. */
821 	return pei;
822 }
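
/*
 * Roughly how a Gather-like leader drives the routines in this file (the
 * details live in nodeGather.c and nodeGatherMerge.c; names below are only
 * illustrative):
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate,
 *								   sendParams, nworkers, tuples_needed);
 *		LaunchParallelWorkers(pei->pcxt);
 *		ExecParallelCreateReaders(pei);
 *		... read tuples via pei->reader[] until the workers are done ...
 *		ExecParallelFinish(pei);
 *		ExecParallelCleanup(pei);
 */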
823 
824 /*
825  * Set up tuple queue readers to read the results of a parallel subplan.
826  *
827  * This is separate from ExecInitParallelPlan() because we can launch the
828  * worker processes and let them start doing something before we do this.
829  */
830 void
831 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
832 {
833 	int			nworkers = pei->pcxt->nworkers_launched;
834 	int			i;
835 
836 	Assert(pei->reader == NULL);
837 
838 	if (nworkers > 0)
839 	{
840 		pei->reader = (TupleQueueReader **)
841 			palloc(nworkers * sizeof(TupleQueueReader *));
842 
843 		for (i = 0; i < nworkers; i++)
844 		{
845 			shm_mq_set_handle(pei->tqueue[i],
846 							  pei->pcxt->worker[i].bgwhandle);
847 			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
848 		}
849 	}
850 }
851 
852 /*
853  * Re-initialize the parallel executor shared memory state before launching
854  * a fresh batch of workers.
855  */
856 void
857 ExecParallelReinitialize(PlanState *planstate,
858 						 ParallelExecutorInfo *pei,
859 						 Bitmapset *sendParams)
860 {
861 	EState	   *estate = planstate->state;
862 	FixedParallelExecutorState *fpes;
863 
864 	/* Old workers must already be shut down */
865 	Assert(pei->finished);
866 
867 	/*
868 	 * Force any initplan outputs that we're going to pass to workers to be
869 	 * evaluated, if they weren't already (see comments in
870 	 * ExecInitParallelPlan).
871 	 */
872 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
873 
874 	ReinitializeParallelDSM(pei->pcxt);
875 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
876 	pei->reader = NULL;
877 	pei->finished = false;
878 
879 	fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
880 
881 	/* Free any serialized parameters from the last round. */
882 	if (DsaPointerIsValid(fpes->param_exec))
883 	{
884 		dsa_free(pei->area, fpes->param_exec);
885 		fpes->param_exec = InvalidDsaPointer;
886 	}
887 
888 	/* Serialize current parameter values if required. */
889 	if (!bms_is_empty(sendParams))
890 	{
891 		pei->param_exec = SerializeParamExecParams(estate, sendParams,
892 												   pei->area);
893 		fpes->param_exec = pei->param_exec;
894 	}
895 
896 	/* Traverse plan tree and let each child node reset associated state. */
897 	estate->es_query_dsa = pei->area;
898 	ExecParallelReInitializeDSM(planstate, pei->pcxt);
899 	estate->es_query_dsa = NULL;
900 }
901 
902 /*
903  * Traverse plan tree to reinitialize per-node dynamic shared memory state
904  */
905 static bool
906 ExecParallelReInitializeDSM(PlanState *planstate,
907 							ParallelContext *pcxt)
908 {
909 	if (planstate == NULL)
910 		return false;
911 
912 	/*
913 	 * Call reinitializers for DSM-using plan nodes.
914 	 */
915 	switch (nodeTag(planstate))
916 	{
917 		case T_SeqScanState:
918 			if (planstate->plan->parallel_aware)
919 				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
920 										   pcxt);
921 			break;
922 		case T_IndexScanState:
923 			if (planstate->plan->parallel_aware)
924 				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
925 											 pcxt);
926 			break;
927 		case T_IndexOnlyScanState:
928 			if (planstate->plan->parallel_aware)
929 				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
930 												 pcxt);
931 			break;
932 		case T_ForeignScanState:
933 			if (planstate->plan->parallel_aware)
934 				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
935 											   pcxt);
936 			break;
937 		case T_AppendState:
938 			if (planstate->plan->parallel_aware)
939 				ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
940 			break;
941 		case T_CustomScanState:
942 			if (planstate->plan->parallel_aware)
943 				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
944 											  pcxt);
945 			break;
946 		case T_BitmapHeapScanState:
947 			if (planstate->plan->parallel_aware)
948 				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
949 											  pcxt);
950 			break;
951 		case T_HashJoinState:
952 			if (planstate->plan->parallel_aware)
953 				ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
954 											pcxt);
955 			break;
956 		case T_HashState:
957 		case T_SortState:
958 			/* these nodes have DSM state, but no reinitialization is required */
959 			break;
960 
961 		default:
962 			break;
963 	}
964 
965 	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
966 }
967 
968 /*
969  * Copy instrumentation information about this node and its descendants from
970  * dynamic shared memory.
971  */
972 static bool
973 ExecParallelRetrieveInstrumentation(PlanState *planstate,
974 									SharedExecutorInstrumentation *instrumentation)
975 {
976 	Instrumentation *instrument;
977 	int			i;
978 	int			n;
979 	int			ibytes;
980 	int			plan_node_id = planstate->plan->plan_node_id;
981 	MemoryContext oldcontext;
982 
983 	/* Find the instrumentation for this node. */
984 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
985 		if (instrumentation->plan_node_id[i] == plan_node_id)
986 			break;
987 	if (i >= instrumentation->num_plan_nodes)
988 		elog(ERROR, "plan node %d not found", plan_node_id);
989 
990 	/* Accumulate the statistics from all workers. */
991 	instrument = GetInstrumentationArray(instrumentation);
992 	instrument += i * instrumentation->num_workers;
993 	for (n = 0; n < instrumentation->num_workers; ++n)
994 		InstrAggNode(planstate->instrument, &instrument[n]);
995 
996 	/*
997 	 * Also store the per-worker detail.
998 	 *
999 	 * Worker instrumentation should be allocated in the same context as the
1000 	 * regular instrumentation information, which is the per-query context.
1001 	 * Switch into per-query memory context.
1002 	 */
1003 	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1004 	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1005 	planstate->worker_instrument =
1006 		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1007 	MemoryContextSwitchTo(oldcontext);
1008 
1009 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
1010 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1011 
1012 	/* Perform any node-type-specific work that needs to be done. */
1013 	switch (nodeTag(planstate))
1014 	{
1015 		case T_SortState:
1016 			ExecSortRetrieveInstrumentation((SortState *) planstate);
1017 			break;
1018 		case T_HashState:
1019 			ExecHashRetrieveInstrumentation((HashState *) planstate);
1020 			break;
1021 		default:
1022 			break;
1023 	}
1024 
1025 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1026 								 instrumentation);
1027 }
1028 
1029 /*
1030  * Add up the workers' JIT instrumentation from dynamic shared memory.
1031  */
1032 static void
1033 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1034 									   SharedJitInstrumentation *shared_jit)
1035 {
1036 	JitInstrumentation *combined;
1037 	int			ibytes;
1038 
1039 	int			n;
1040 
1041 	/*
1042 	 * Accumulate worker JIT instrumentation into the combined JIT
1043 	 * instrumentation, allocating it if required.
1044 	 */
1045 	if (!planstate->state->es_jit_worker_instr)
1046 		planstate->state->es_jit_worker_instr =
1047 			MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1048 	combined = planstate->state->es_jit_worker_instr;
1049 
1050 	/* Accumulate all the workers' instrumentations. */
1051 	for (n = 0; n < shared_jit->num_workers; ++n)
1052 		InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1053 
1054 	/*
1055 	 * Store the per-worker detail.
1056 	 *
1057 	 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1058 	 * instrumentation in per-query context.
1059 	 */
1060 	ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1061 		+ mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1062 	planstate->worker_jit_instrument =
1063 		MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1064 
1065 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1066 }
1067 
1068 /*
1069  * Finish parallel execution.  We wait for parallel workers to finish, and
1070  * accumulate their buffer usage.
1071  */
1072 void
1073 ExecParallelFinish(ParallelExecutorInfo *pei)
1074 {
1075 	int			nworkers = pei->pcxt->nworkers_launched;
1076 	int			i;
1077 
1078 	/* Make this be a no-op if called twice in a row. */
1079 	if (pei->finished)
1080 		return;
1081 
1082 	/*
1083 	 * Detach from tuple queues ASAP, so that any still-active workers will
1084 	 * notice that no further results are wanted.
1085 	 */
1086 	if (pei->tqueue != NULL)
1087 	{
1088 		for (i = 0; i < nworkers; i++)
1089 			shm_mq_detach(pei->tqueue[i]);
1090 		pfree(pei->tqueue);
1091 		pei->tqueue = NULL;
1092 	}
1093 
1094 	/*
1095 	 * While we're waiting for the workers to finish, let's get rid of the
1096 	 * tuple queue readers.  (Any other local cleanup could be done here too.)
1097 	 */
1098 	if (pei->reader != NULL)
1099 	{
1100 		for (i = 0; i < nworkers; i++)
1101 			DestroyTupleQueueReader(pei->reader[i]);
1102 		pfree(pei->reader);
1103 		pei->reader = NULL;
1104 	}
1105 
1106 	/* Now wait for the workers to finish. */
1107 	WaitForParallelWorkersToFinish(pei->pcxt);
1108 
1109 	/*
1110 	 * Next, accumulate buffer usage.  (This must wait for the workers to
1111 	 * finish, or we might get incomplete data.)
1112 	 */
1113 	for (i = 0; i < nworkers; i++)
1114 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
1115 
1116 	pei->finished = true;
1117 }
1118 
1119 /*
1120  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1121  * resources still exist after ExecParallelFinish.  We separate these
1122  * routines because someone might want to examine the contents of the DSM
1123  * after ExecParallelFinish and before calling this routine.
1124  */
1125 void
1126 ExecParallelCleanup(ParallelExecutorInfo *pei)
1127 {
1128 	/* Accumulate instrumentation, if any. */
1129 	if (pei->instrumentation)
1130 		ExecParallelRetrieveInstrumentation(pei->planstate,
1131 											pei->instrumentation);
1132 
1133 	/* Accumulate JIT instrumentation, if any. */
1134 	if (pei->jit_instrumentation)
1135 		ExecParallelRetrieveJitInstrumentation(pei->planstate,
1136 											   pei->jit_instrumentation);
1137 
1138 	/* Free any serialized parameters. */
1139 	if (DsaPointerIsValid(pei->param_exec))
1140 	{
1141 		dsa_free(pei->area, pei->param_exec);
1142 		pei->param_exec = InvalidDsaPointer;
1143 	}
1144 	if (pei->area != NULL)
1145 	{
1146 		dsa_detach(pei->area);
1147 		pei->area = NULL;
1148 	}
1149 	if (pei->pcxt != NULL)
1150 	{
1151 		DestroyParallelContext(pei->pcxt);
1152 		pei->pcxt = NULL;
1153 	}
1154 	pfree(pei);
1155 }
1156 
1157 /*
1158  * Create a DestReceiver to write tuples we produce to the shm_mq designated
1159  * for that purpose.
1160  */
1161 static DestReceiver *
1162 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1163 {
1164 	char	   *mqspace;
1165 	shm_mq	   *mq;
1166 
1167 	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1168 	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1169 	mq = (shm_mq *) mqspace;
1170 	shm_mq_set_sender(mq, MyProc);
1171 	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1172 }
1173 
1174 /*
1175  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1176  */
1177 static QueryDesc *
1178 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1179 						 int instrument_options)
1180 {
1181 	char	   *pstmtspace;
1182 	char	   *paramspace;
1183 	PlannedStmt *pstmt;
1184 	ParamListInfo paramLI;
1185 	char	   *queryString;
1186 
1187 	/* Get the query string from shared memory */
1188 	queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1189 
1190 	/* Reconstruct leader-supplied PlannedStmt. */
1191 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1192 	pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1193 
1194 	/* Reconstruct ParamListInfo. */
1195 	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1196 	paramLI = RestoreParamList(&paramspace);
1197 
1198 	/* Create a QueryDesc for the query. */
1199 	return CreateQueryDesc(pstmt,
1200 						   queryString,
1201 						   GetActiveSnapshot(), InvalidSnapshot,
1202 						   receiver, paramLI, NULL, instrument_options);
1203 }
1204 
1205 /*
1206  * Copy instrumentation information from this node and its descendants into
1207  * dynamic shared memory, so that the parallel leader can retrieve it.
1208  */
1209 static bool
1210 ExecParallelReportInstrumentation(PlanState *planstate,
1211 								  SharedExecutorInstrumentation *instrumentation)
1212 {
1213 	int			i;
1214 	int			plan_node_id = planstate->plan->plan_node_id;
1215 	Instrumentation *instrument;
1216 
1217 	InstrEndLoop(planstate->instrument);
1218 
1219 	/*
1220 	 * If we shuffled the plan_node_id values in ps_instrument into sorted
1221 	 * order, we could use binary search here.  This might matter someday if
1222 	 * we're pushing down sufficiently large plan trees.  For now, do it the
1223 	 * slow, dumb way.
1224 	 */
1225 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1226 		if (instrumentation->plan_node_id[i] == plan_node_id)
1227 			break;
1228 	if (i >= instrumentation->num_plan_nodes)
1229 		elog(ERROR, "plan node %d not found", plan_node_id);
1230 
1231 	/*
1232 	 * Add our statistics to the per-node, per-worker totals.  It's possible
1233 	 * that this could happen more than once if we relaunched workers.
1234 	 */
1235 	instrument = GetInstrumentationArray(instrumentation);
1236 	instrument += i * instrumentation->num_workers;
1237 	Assert(IsParallelWorker());
1238 	Assert(ParallelWorkerNumber < instrumentation->num_workers);
1239 	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1240 
1241 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1242 								 instrumentation);
1243 }
1244 
1245 /*
1246  * Initialize the PlanState and its descendants with the information
1247  * retrieved from shared memory.  This has to be done once the PlanState
1248  * is allocated and initialized by the executor, i.e. after ExecutorStart().
1249  */
1250 static bool
1251 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1252 {
1253 	if (planstate == NULL)
1254 		return false;
1255 
1256 	switch (nodeTag(planstate))
1257 	{
1258 		case T_SeqScanState:
1259 			if (planstate->plan->parallel_aware)
1260 				ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1261 			break;
1262 		case T_IndexScanState:
1263 			if (planstate->plan->parallel_aware)
1264 				ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1265 											  pwcxt);
1266 			break;
1267 		case T_IndexOnlyScanState:
1268 			if (planstate->plan->parallel_aware)
1269 				ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1270 												  pwcxt);
1271 			break;
1272 		case T_ForeignScanState:
1273 			if (planstate->plan->parallel_aware)
1274 				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1275 												pwcxt);
1276 			break;
1277 		case T_AppendState:
1278 			if (planstate->plan->parallel_aware)
1279 				ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1280 			break;
1281 		case T_CustomScanState:
1282 			if (planstate->plan->parallel_aware)
1283 				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1284 											   pwcxt);
1285 			break;
1286 		case T_BitmapHeapScanState:
1287 			if (planstate->plan->parallel_aware)
1288 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1289 											   pwcxt);
1290 			break;
1291 		case T_HashJoinState:
1292 			if (planstate->plan->parallel_aware)
1293 				ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1294 											 pwcxt);
1295 			break;
1296 		case T_HashState:
1297 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1298 			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1299 			break;
1300 		case T_SortState:
1301 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1302 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1303 			break;
1304 
1305 		default:
1306 			break;
1307 	}
1308 
1309 	return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1310 								 pwcxt);
1311 }
1312 
1313 /*
1314  * Main entrypoint for parallel query worker processes.
1315  *
1316  * We reach this function from ParallelWorkerMain, so the setup necessary to
1317  * create a sensible parallel environment has already been done;
1318  * ParallelWorkerMain worries about stuff like the transaction state, combo
1319  * CID mappings, and GUC values, so we don't need to deal with any of that
1320  * here.
1321  *
1322  * Our job is to deal with concerns specific to the executor.  The parallel
1323  * group leader will have stored a serialized PlannedStmt, and it's our job
1324  * to execute that plan and write the resulting tuples to the appropriate
1325  * tuple queue.  Various bits of supporting information that we need in order
1326  * to do this are also stored in the dsm_segment and can be accessed through
1327  * the shm_toc.
1328  */
1329 void
1330 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1331 {
1332 	FixedParallelExecutorState *fpes;
1333 	BufferUsage *buffer_usage;
1334 	DestReceiver *receiver;
1335 	QueryDesc  *queryDesc;
1336 	SharedExecutorInstrumentation *instrumentation;
1337 	SharedJitInstrumentation *jit_instrumentation;
1338 	int			instrument_options = 0;
1339 	void	   *area_space;
1340 	dsa_area   *area;
1341 	ParallelWorkerContext pwcxt;
1342 
1343 	/* Get fixed-size state. */
1344 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1345 
1346 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1347 	receiver = ExecParallelGetReceiver(seg, toc);
1348 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1349 	if (instrumentation != NULL)
1350 		instrument_options = instrumentation->instrument_options;
1351 	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1352 										 true);
1353 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1354 
1355 	/* Setting debug_query_string for individual workers */
1356 	debug_query_string = queryDesc->sourceText;
1357 
1358 	/* Report workers' query for monitoring purposes */
1359 	pgstat_report_activity(STATE_RUNNING, debug_query_string);
1360 
1361 	/* Attach to the dynamic shared memory area. */
1362 	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1363 	area = dsa_attach_in_place(area_space, seg);
1364 
1365 	/* Start up the executor */
1366 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1367 	ExecutorStart(queryDesc, fpes->eflags);
1368 
1369 	/* Special executor initialization steps for parallel workers */
1370 	queryDesc->planstate->state->es_query_dsa = area;
1371 	if (DsaPointerIsValid(fpes->param_exec))
1372 	{
1373 		char	   *paramexec_space;
1374 
1375 		paramexec_space = dsa_get_address(area, fpes->param_exec);
1376 		RestoreParamExecParams(paramexec_space, queryDesc->estate);
1377 
1378 	}
1379 	pwcxt.toc = toc;
1380 	pwcxt.seg = seg;
1381 	ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1382 
1383 	/* Pass down any tuple bound */
1384 	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1385 
1386 	/*
1387 	 * Prepare to track buffer usage during query execution.
1388 	 *
1389 	 * We do this after starting up the executor to match what happens in the
1390 	 * leader, which also doesn't count buffer accesses that occur during
1391 	 * executor startup.
1392 	 */
1393 	InstrStartParallelQuery();
1394 
1395 	/*
1396 	 * Run the plan.  If we specified a tuple bound, be careful not to demand
1397 	 * more tuples than that.
1398 	 */
1399 	ExecutorRun(queryDesc,
1400 				ForwardScanDirection,
1401 				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1402 				true);
1403 
1404 	/* Shut down the executor */
1405 	ExecutorFinish(queryDesc);
1406 
1407 	/* Report buffer usage during parallel execution. */
1408 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1409 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
1410 
1411 	/* Report instrumentation data if any instrumentation options are set. */
1412 	if (instrumentation != NULL)
1413 		ExecParallelReportInstrumentation(queryDesc->planstate,
1414 										  instrumentation);
1415 
1416 	/* Report JIT instrumentation data if any */
1417 	if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1418 	{
1419 		Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1420 		jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1421 			queryDesc->estate->es_jit->instr;
1422 	}
1423 
1424 	/* Must do this after capturing instrumentation. */
1425 	ExecutorEnd(queryDesc);
1426 
1427 	/* Cleanup. */
1428 	dsa_detach(area);
1429 	FreeQueryDesc(queryDesc);
1430 	receiver->rDestroy(receiver);
1431 }
1432