1 /*-------------------------------------------------------------------------
2  *
3  * execParallel.c
4  *	  Support routines for parallel execution.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * This file contains routines that are intended to support setting up,
10  * using, and tearing down a ParallelContext from within the PostgreSQL
11  * executor.  The ParallelContext machinery will handle starting the
12  * workers and ensuring that their state generally matches that of the
13  * leader; see src/backend/access/transam/README.parallel for details.
14  * However, we must save and restore relevant executor state, such as
15  * any ParamListInfo associated with the query, buffer usage info, and
16  * the actual plan to be passed down to the worker.
17  *
18  * IDENTIFICATION
19  *	  src/backend/executor/execParallel.c
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAppend.h"
29 #include "executor/nodeBitmapHeapscan.h"
30 #include "executor/nodeCustom.h"
31 #include "executor/nodeForeignscan.h"
32 #include "executor/nodeHash.h"
33 #include "executor/nodeHashjoin.h"
34 #include "executor/nodeIndexscan.h"
35 #include "executor/nodeIndexonlyscan.h"
36 #include "executor/nodeSeqscan.h"
37 #include "executor/nodeSort.h"
38 #include "executor/nodeSubplan.h"
39 #include "executor/tqueue.h"
40 #include "jit/jit.h"
41 #include "nodes/nodeFuncs.h"
42 #include "optimizer/planmain.h"
43 #include "optimizer/planner.h"
44 #include "storage/spin.h"
45 #include "tcop/tcopprot.h"
46 #include "utils/datum.h"
47 #include "utils/dsa.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/snapmgr.h"
51 #include "pgstat.h"
52 
53 /*
54  * Magic numbers for parallel executor communication.  We use constants
55  * greater than any 32-bit integer here so that values < 2^32 can be used
56  * by individual parallel nodes to store their own state.
57  */
58 #define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
59 #define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
60 #define PARALLEL_KEY_PARAMLISTINFO		UINT64CONST(0xE000000000000003)
61 #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
62 #define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
63 #define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
64 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
65 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
66 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
67 
68 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
69 
70 /*
71  * Fixed-size state that we need to pass down to parallel workers.
72  */
73 typedef struct FixedParallelExecutorState
74 {
75 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
76 	dsa_pointer param_exec;
77 	int			eflags;
78 	int			jit_flags;
79 } FixedParallelExecutorState;
80 
81 /*
82  * DSM structure for accumulating per-PlanState instrumentation.
83  *
84  * instrument_options: Same meaning here as in instrument.c.
85  *
86  * instrument_offset: Offset, relative to the start of this structure,
87  * of the first Instrumentation object.  This will depend on the length of
88  * the plan_node_id array.
89  *
90  * num_workers: Number of workers.
91  *
92  * num_plan_nodes: Number of plan nodes.
93  *
94  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
95  * from parallel workers.  The length of this array is given by num_plan_nodes.
96  */
97 struct SharedExecutorInstrumentation
98 {
99 	int			instrument_options;
100 	int			instrument_offset;
101 	int			num_workers;
102 	int			num_plan_nodes;
103 	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];
104 	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
105 };
106 #define GetInstrumentationArray(sei) \
107 	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
108 	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
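/*
 * Layout note, derived from the definitions above: the Instrumentation entry
 * for the i'th element of plan_node_id[] and worker number w is
 *
 *		GetInstrumentationArray(sei)[i * sei->num_workers + w]
 *
 * which is how the retrieval and reporting routines later in this file
 * address the array.
 */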
109 
110 /* Context object for ExecParallelEstimate. */
111 typedef struct ExecParallelEstimateContext
112 {
113 	ParallelContext *pcxt;
114 	int			nnodes;
115 } ExecParallelEstimateContext;
116 
117 /* Context object for ExecParallelInitializeDSM. */
118 typedef struct ExecParallelInitializeDSMContext
119 {
120 	ParallelContext *pcxt;
121 	SharedExecutorInstrumentation *instrumentation;
122 	int			nnodes;
123 } ExecParallelInitializeDSMContext;
124 
125 /* Helper functions that run in the parallel leader. */
126 static char *ExecSerializePlan(Plan *plan, EState *estate);
127 static bool ExecParallelEstimate(PlanState *node,
128 					 ExecParallelEstimateContext *e);
129 static bool ExecParallelInitializeDSM(PlanState *node,
130 						  ExecParallelInitializeDSMContext *d);
131 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
132 							 bool reinitialize);
133 static bool ExecParallelReInitializeDSM(PlanState *planstate,
134 							ParallelContext *pcxt);
135 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
136 									SharedExecutorInstrumentation *instrumentation);
137 
138 /* Helper function that runs in the parallel worker. */
139 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
140 
141 /*
142  * Create a serialized representation of the plan to be sent to each worker.
143  */
144 static char *
145 ExecSerializePlan(Plan *plan, EState *estate)
146 {
147 	PlannedStmt *pstmt;
148 	ListCell   *lc;
149 
150 	/* We can't scribble on the original plan, so make a copy. */
151 	plan = copyObject(plan);
152 
153 	/*
154 	 * The worker will start its own copy of the executor, and that copy will
155 	 * insert a junk filter if the toplevel node has any resjunk entries. We
156 	 * don't want that to happen, because while resjunk columns shouldn't be
157 	 * sent back to the user, here the tuples are coming back to another
158 	 * backend which may very well need them.  So mutate the target list
159 	 * accordingly.  This is sort of a hack; there might be better ways to do
160 	 * this...
161 	 */
162 	foreach(lc, plan->targetlist)
163 	{
164 		TargetEntry *tle = lfirst_node(TargetEntry, lc);
165 
166 		tle->resjunk = false;
167 	}
168 
169 	/*
170 	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
171 	 * for our purposes, but the worker will need at least a minimal
172 	 * PlannedStmt to start the executor.
173 	 */
174 	pstmt = makeNode(PlannedStmt);
175 	pstmt->commandType = CMD_SELECT;
176 	pstmt->queryId = UINT64CONST(0);
177 	pstmt->hasReturning = false;
178 	pstmt->hasModifyingCTE = false;
179 	pstmt->canSetTag = true;
180 	pstmt->transientPlan = false;
181 	pstmt->dependsOnRole = false;
182 	pstmt->parallelModeNeeded = false;
183 	pstmt->planTree = plan;
184 	pstmt->rtable = estate->es_range_table;
185 	pstmt->resultRelations = NIL;
186 	pstmt->nonleafResultRelations = NIL;
187 
188 	/*
189 	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
190 	 * for unsafe ones (so that the list indexes of the safe ones are
191 	 * preserved).  This positively ensures that the worker won't try to run,
192 	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
193 	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
194 	 */
195 	pstmt->subplans = NIL;
196 	foreach(lc, estate->es_plannedstmt->subplans)
197 	{
198 		Plan	   *subplan = (Plan *) lfirst(lc);
199 
200 		if (subplan && !subplan->parallel_safe)
201 			subplan = NULL;
202 		pstmt->subplans = lappend(pstmt->subplans, subplan);
203 	}
204 
205 	pstmt->rewindPlanIDs = NULL;
206 	pstmt->rowMarks = NIL;
207 	pstmt->relationOids = NIL;
208 	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
209 	pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
210 	pstmt->utilityStmt = NULL;
211 	pstmt->stmt_location = -1;
212 	pstmt->stmt_len = -1;
213 
214 	/* Return serialized copy of our dummy PlannedStmt. */
215 	return nodeToString(pstmt);
216 }
217 
218 /*
219  * Parallel-aware plan nodes (and occasionally others) may need some state
220  * which is shared across all parallel workers.  Before we size the DSM, give
221  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
222  * &pcxt->estimator.
223  *
224  * While we're at it, count the number of PlanState nodes in the tree, so
225  * we know how many Instrumentation structures we need.
226  */
227 static bool
228 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
229 {
230 	if (planstate == NULL)
231 		return false;
232 
233 	/* Count this node. */
234 	e->nnodes++;
235 
236 	switch (nodeTag(planstate))
237 	{
238 		case T_SeqScanState:
239 			if (planstate->plan->parallel_aware)
240 				ExecSeqScanEstimate((SeqScanState *) planstate,
241 									e->pcxt);
242 			break;
243 		case T_IndexScanState:
244 			if (planstate->plan->parallel_aware)
245 				ExecIndexScanEstimate((IndexScanState *) planstate,
246 									  e->pcxt);
247 			break;
248 		case T_IndexOnlyScanState:
249 			if (planstate->plan->parallel_aware)
250 				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
251 										  e->pcxt);
252 			break;
253 		case T_ForeignScanState:
254 			if (planstate->plan->parallel_aware)
255 				ExecForeignScanEstimate((ForeignScanState *) planstate,
256 										e->pcxt);
257 			break;
258 		case T_AppendState:
259 			if (planstate->plan->parallel_aware)
260 				ExecAppendEstimate((AppendState *) planstate,
261 								   e->pcxt);
262 			break;
263 		case T_CustomScanState:
264 			if (planstate->plan->parallel_aware)
265 				ExecCustomScanEstimate((CustomScanState *) planstate,
266 									   e->pcxt);
267 			break;
268 		case T_BitmapHeapScanState:
269 			if (planstate->plan->parallel_aware)
270 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
271 									   e->pcxt);
272 			break;
273 		case T_HashJoinState:
274 			if (planstate->plan->parallel_aware)
275 				ExecHashJoinEstimate((HashJoinState *) planstate,
276 									 e->pcxt);
277 			break;
278 		case T_HashState:
279 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
280 			ExecHashEstimate((HashState *) planstate, e->pcxt);
281 			break;
282 		case T_SortState:
283 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
284 			ExecSortEstimate((SortState *) planstate, e->pcxt);
285 			break;
286 
287 		default:
288 			break;
289 	}
290 
291 	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
292 }
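/*
 * For reference, a minimal sketch of a per-node Estimate callback in the
 * style of the ExecFooEstimate() functions dispatched above.  "Foo",
 * SharedFooState, and shared_len are placeholders, not real executor names;
 * the point is simply that each callback reserves one chunk and one toc key
 * for the node's shared state:
 *
 *		void
 *		ExecFooEstimate(FooState *node, ParallelContext *pcxt)
 *		{
 *			node->shared_len = sizeof(SharedFooState);
 *			shm_toc_estimate_chunk(&pcxt->estimator, node->shared_len);
 *			shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		}
 */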
293 
294 /*
295  * Estimate the amount of space required to serialize the indicated parameters.
296  */
297 static Size
298 EstimateParamExecSpace(EState *estate, Bitmapset *params)
299 {
300 	int			paramid;
301 	Size		sz = sizeof(int);
302 
303 	paramid = -1;
304 	while ((paramid = bms_next_member(params, paramid)) >= 0)
305 	{
306 		Oid			typeOid;
307 		int16		typLen;
308 		bool		typByVal;
309 		ParamExecData *prm;
310 
311 		prm = &(estate->es_param_exec_vals[paramid]);
312 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
313 							   paramid);
314 
315 		sz = add_size(sz, sizeof(int)); /* space for paramid */
316 
317 		/* space for datum/isnull */
318 		if (OidIsValid(typeOid))
319 			get_typlenbyval(typeOid, &typLen, &typByVal);
320 		else
321 		{
322 			/* If no type OID, assume by-value, like copyParamList does. */
323 			typLen = sizeof(Datum);
324 			typByVal = true;
325 		}
326 		sz = add_size(sz,
327 					  datumEstimateSpace(prm->value, prm->isnull,
328 										 typByVal, typLen));
329 	}
330 	return sz;
331 }
332 
333 /*
334  * Serialize specified PARAM_EXEC parameters.
335  *
336  * We write the number of parameters first, as a 4-byte integer, and then
337  * write details for each parameter in turn.  The details for each parameter
338  * consist of a 4-byte paramid (location of param in execution time internal
339  * parameter array) and then the datum as serialized by datumSerialize().
340  */
341 static dsa_pointer
342 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
343 {
344 	Size		size;
345 	int			nparams;
346 	int			paramid;
347 	ParamExecData *prm;
348 	dsa_pointer handle;
349 	char	   *start_address;
350 
351 	/* Allocate enough space for the current parameter values. */
352 	size = EstimateParamExecSpace(estate, params);
353 	handle = dsa_allocate(area, size);
354 	start_address = dsa_get_address(area, handle);
355 
356 	/* First write the number of parameters as a 4-byte integer. */
357 	nparams = bms_num_members(params);
358 	memcpy(start_address, &nparams, sizeof(int));
359 	start_address += sizeof(int);
360 
361 	/* Write details for each parameter in turn. */
362 	paramid = -1;
363 	while ((paramid = bms_next_member(params, paramid)) >= 0)
364 	{
365 		Oid			typeOid;
366 		int16		typLen;
367 		bool		typByVal;
368 
369 		prm = &(estate->es_param_exec_vals[paramid]);
370 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
371 							   paramid);
372 
373 		/* Write paramid. */
374 		memcpy(start_address, &paramid, sizeof(int));
375 		start_address += sizeof(int);
376 
377 		/* Write datum/isnull */
378 		if (OidIsValid(typeOid))
379 			get_typlenbyval(typeOid, &typLen, &typByVal);
380 		else
381 		{
382 			/* If no type OID, assume by-value, like copyParamList does. */
383 			typLen = sizeof(Datum);
384 			typByVal = true;
385 		}
386 		datumSerialize(prm->value, prm->isnull, typByVal, typLen,
387 					   &start_address);
388 	}
389 
390 	return handle;
391 }
392 
393 /*
394  * Restore specified PARAM_EXEC parameters.
395  */
396 static void
397 RestoreParamExecParams(char *start_address, EState *estate)
398 {
399 	int			nparams;
400 	int			i;
401 	int			paramid;
402 
403 	memcpy(&nparams, start_address, sizeof(int));
404 	start_address += sizeof(int);
405 
406 	for (i = 0; i < nparams; i++)
407 	{
408 		ParamExecData *prm;
409 
410 		/* Read paramid */
411 		memcpy(&paramid, start_address, sizeof(int));
412 		start_address += sizeof(int);
413 		prm = &(estate->es_param_exec_vals[paramid]);
414 
415 		/* Read datum/isnull. */
416 		prm->value = datumRestore(&start_address, &prm->isnull);
417 		prm->execPlan = NULL;
418 	}
419 }
420 
421 /*
422  * Initialize the dynamic shared memory segment that will be used to control
423  * parallel execution.
424  */
425 static bool
426 ExecParallelInitializeDSM(PlanState *planstate,
427 						  ExecParallelInitializeDSMContext *d)
428 {
429 	if (planstate == NULL)
430 		return false;
431 
432 	/* If instrumentation is enabled, initialize slot for this node. */
433 	if (d->instrumentation != NULL)
434 		d->instrumentation->plan_node_id[d->nnodes] =
435 			planstate->plan->plan_node_id;
436 
437 	/* Count this node. */
438 	d->nnodes++;
439 
440 	/*
441 	 * Call initializers for DSM-using plan nodes.
442 	 *
443 	 * Most plan nodes won't do anything here, but plan nodes that allocated
444 	 * DSM may need to initialize shared state in the DSM before parallel
445 	 * workers are launched.  They can allocate the space they previously
446 	 * estimated using shm_toc_allocate, and add the keys they previously
447 	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
448 	 */
449 	switch (nodeTag(planstate))
450 	{
451 		case T_SeqScanState:
452 			if (planstate->plan->parallel_aware)
453 				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
454 										 d->pcxt);
455 			break;
456 		case T_IndexScanState:
457 			if (planstate->plan->parallel_aware)
458 				ExecIndexScanInitializeDSM((IndexScanState *) planstate,
459 										   d->pcxt);
460 			break;
461 		case T_IndexOnlyScanState:
462 			if (planstate->plan->parallel_aware)
463 				ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
464 											   d->pcxt);
465 			break;
466 		case T_ForeignScanState:
467 			if (planstate->plan->parallel_aware)
468 				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
469 											 d->pcxt);
470 			break;
471 		case T_AppendState:
472 			if (planstate->plan->parallel_aware)
473 				ExecAppendInitializeDSM((AppendState *) planstate,
474 										d->pcxt);
475 			break;
476 		case T_CustomScanState:
477 			if (planstate->plan->parallel_aware)
478 				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
479 											d->pcxt);
480 			break;
481 		case T_BitmapHeapScanState:
482 			if (planstate->plan->parallel_aware)
483 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
484 											d->pcxt);
485 			break;
486 		case T_HashJoinState:
487 			if (planstate->plan->parallel_aware)
488 				ExecHashJoinInitializeDSM((HashJoinState *) planstate,
489 										  d->pcxt);
490 			break;
491 		case T_HashState:
492 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
493 			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
494 			break;
495 		case T_SortState:
496 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
497 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
498 			break;
499 
500 		default:
501 			break;
502 	}
503 
504 	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
505 }
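/*
 * A matching sketch of a per-node InitializeDSM callback (same placeholder
 * "Foo" node as in the sketch after ExecParallelEstimate): allocate the
 * space estimated earlier, initialize it, and publish it under the node's
 * plan_node_id, which can never collide with the PARALLEL_KEY_* constants:
 *
 *		void
 *		ExecFooInitializeDSM(FooState *node, ParallelContext *pcxt)
 *		{
 *			SharedFooState *shared;
 *
 *			shared = shm_toc_allocate(pcxt->toc, node->shared_len);
 *			... initialize *shared here ...
 *			shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, shared);
 *			node->shared = shared;
 *		}
 */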
506 
507 /*
508  * Set up the response queues that parallel workers will use to return
509  * tuples to the leader backend.
510  */
511 static shm_mq_handle **
512 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
513 {
514 	shm_mq_handle **responseq;
515 	char	   *tqueuespace;
516 	int			i;
517 
518 	/* Skip this if no workers. */
519 	if (pcxt->nworkers == 0)
520 		return NULL;
521 
522 	/* Allocate memory for shared memory queue handles. */
523 	responseq = (shm_mq_handle **)
524 		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
525 
526 	/*
527 	 * If not reinitializing, allocate space from the DSM for the queues;
528 	 * otherwise, find the already allocated space.
529 	 */
530 	if (!reinitialize)
531 		tqueuespace =
532 			shm_toc_allocate(pcxt->toc,
533 							 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
534 									  pcxt->nworkers));
535 	else
536 		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
537 
538 	/* Create the queues, and become the receiver for each. */
539 	for (i = 0; i < pcxt->nworkers; ++i)
540 	{
541 		shm_mq	   *mq;
542 
543 		mq = shm_mq_create(tqueuespace +
544 						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
545 						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
546 
547 		shm_mq_set_receiver(mq, MyProc);
548 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
549 	}
550 
551 	/* Add array of queues to shm_toc, so others can find it. */
552 	if (!reinitialize)
553 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
554 
555 	/* Return array of handles. */
556 	return responseq;
557 }
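/*
 * The space laid out above is a single array of pcxt->nworkers fixed-size
 * queues: worker number i's queue begins at byte offset
 * i * PARALLEL_TUPLE_QUEUE_SIZE within the PARALLEL_KEY_TUPLE_QUEUE chunk.
 * ExecParallelGetReceiver() below relies on exactly this layout to locate
 * the queue belonging to ParallelWorkerNumber.
 */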
558 
559 /*
560  * Set up the infrastructure that parallel workers need to execute the
561  * plan and return the resulting tuples to the leader backend.
562  */
563 ParallelExecutorInfo *
564 ExecInitParallelPlan(PlanState *planstate, EState *estate,
565 					 Bitmapset *sendParams, int nworkers,
566 					 int64 tuples_needed)
567 {
568 	ParallelExecutorInfo *pei;
569 	ParallelContext *pcxt;
570 	ExecParallelEstimateContext e;
571 	ExecParallelInitializeDSMContext d;
572 	FixedParallelExecutorState *fpes;
573 	char	   *pstmt_data;
574 	char	   *pstmt_space;
575 	char	   *paramlistinfo_space;
576 	BufferUsage *bufusage_space;
577 	SharedExecutorInstrumentation *instrumentation = NULL;
578 	SharedJitInstrumentation *jit_instrumentation = NULL;
579 	int			pstmt_len;
580 	int			paramlistinfo_len;
581 	int			instrumentation_len = 0;
582 	int			jit_instrumentation_len = 0;
583 	int			instrument_offset = 0;
584 	Size		dsa_minsize = dsa_minimum_size();
585 	char	   *query_string;
586 	int			query_len;
587 
588 	/*
589 	 * Force any initplan outputs that we're going to pass to workers to be
590 	 * evaluated, if they weren't already.
591 	 *
592 	 * For simplicity, we use the EState's per-output-tuple ExprContext here.
593 	 * That risks intra-query memory leakage, since we might pass through here
594 	 * many times before that ExprContext gets reset; but ExecSetParamPlan
595 	 * doesn't normally leak any memory in the context (see its comments), so
596 	 * it doesn't seem worth complicating this function's API to pass it a
597 	 * shorter-lived ExprContext.  This might need to change someday.
598 	 */
599 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
600 
601 	/* Allocate object for return value. */
602 	pei = palloc0(sizeof(ParallelExecutorInfo));
603 	pei->finished = false;
604 	pei->planstate = planstate;
605 
606 	/* Fix up and serialize plan to be sent to workers. */
607 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
608 
609 	/* Create a parallel context. */
610 	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
611 	pei->pcxt = pcxt;
612 
613 	/*
614 	 * Before telling the parallel context to create a dynamic shared memory
615 	 * segment, we need to figure out how big it should be.  Estimate space
616 	 * for the various things we need to store.
617 	 */
618 
619 	/* Estimate space for fixed-size state. */
620 	shm_toc_estimate_chunk(&pcxt->estimator,
621 						   sizeof(FixedParallelExecutorState));
622 	shm_toc_estimate_keys(&pcxt->estimator, 1);
623 
624 	/* Estimate space for query text. */
625 	query_len = strlen(estate->es_sourceText);
626 	shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
627 	shm_toc_estimate_keys(&pcxt->estimator, 1);
628 
629 	/* Estimate space for serialized PlannedStmt. */
630 	pstmt_len = strlen(pstmt_data) + 1;
631 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
632 	shm_toc_estimate_keys(&pcxt->estimator, 1);
633 
634 	/* Estimate space for serialized ParamListInfo. */
635 	paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
636 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
637 	shm_toc_estimate_keys(&pcxt->estimator, 1);
638 
639 	/*
640 	 * Estimate space for BufferUsage.
641 	 *
642 	 * If EXPLAIN is not in use and there are no extensions loaded that care,
643 	 * we could skip this.  But we have no way of knowing whether anyone's
644 	 * looking at pgBufferUsage, so do it unconditionally.
645 	 */
646 	shm_toc_estimate_chunk(&pcxt->estimator,
647 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
648 	shm_toc_estimate_keys(&pcxt->estimator, 1);
649 
650 	/* Estimate space for tuple queues. */
651 	shm_toc_estimate_chunk(&pcxt->estimator,
652 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
653 	shm_toc_estimate_keys(&pcxt->estimator, 1);
654 
655 	/*
656 	 * Give parallel-aware nodes a chance to add to the estimates, and get a
657 	 * count of how many PlanState nodes there are.
658 	 */
659 	e.pcxt = pcxt;
660 	e.nnodes = 0;
661 	ExecParallelEstimate(planstate, &e);
662 
663 	/* Estimate space for instrumentation, if required. */
664 	if (estate->es_instrument)
665 	{
666 		instrumentation_len =
667 			offsetof(SharedExecutorInstrumentation, plan_node_id) +
668 			sizeof(int) * e.nnodes;
669 		instrumentation_len = MAXALIGN(instrumentation_len);
670 		instrument_offset = instrumentation_len;
671 		instrumentation_len +=
672 			mul_size(sizeof(Instrumentation),
673 					 mul_size(e.nnodes, nworkers));
674 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
675 		shm_toc_estimate_keys(&pcxt->estimator, 1);
676 
677 		/* Estimate space for JIT instrumentation, if required. */
678 		if (estate->es_jit_flags != PGJIT_NONE)
679 		{
680 			jit_instrumentation_len =
681 				offsetof(SharedJitInstrumentation, jit_instr) +
682 				sizeof(JitInstrumentation) * nworkers;
683 			shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
684 			shm_toc_estimate_keys(&pcxt->estimator, 1);
685 		}
686 	}
687 
688 	/* Estimate space for DSA area. */
689 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
690 	shm_toc_estimate_keys(&pcxt->estimator, 1);
691 
692 	/* Everyone's had a chance to ask for space, so now create the DSM. */
693 	InitializeParallelDSM(pcxt);
694 
695 	/*
696 	 * OK, now we have a dynamic shared memory segment, and it should be big
697 	 * enough to store all of the data we estimated we would want to put into
698 	 * it, plus whatever general stuff (not specifically executor-related) the
699 	 * ParallelContext itself needs to store there.  None of the space we
700 	 * asked for has been allocated or initialized yet, though, so do that.
701 	 */
702 
703 	/* Store fixed-size state. */
704 	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
705 	fpes->tuples_needed = tuples_needed;
706 	fpes->param_exec = InvalidDsaPointer;
707 	fpes->eflags = estate->es_top_eflags;
708 	fpes->jit_flags = estate->es_jit_flags;
709 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
710 
711 	/* Store query string */
712 	query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
713 	memcpy(query_string, estate->es_sourceText, query_len + 1);
714 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
715 
716 	/* Store serialized PlannedStmt. */
717 	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
718 	memcpy(pstmt_space, pstmt_data, pstmt_len);
719 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
720 
721 	/* Store serialized ParamListInfo. */
722 	paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
723 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
724 	SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
725 
726 	/* Allocate space for each worker's BufferUsage; no need to initialize. */
727 	bufusage_space = shm_toc_allocate(pcxt->toc,
728 									  mul_size(sizeof(BufferUsage), pcxt->nworkers));
729 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
730 	pei->buffer_usage = bufusage_space;
731 
732 	/* Set up the tuple queues that the workers will write into. */
733 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
734 
735 	/* We don't need the TupleQueueReaders yet, though. */
736 	pei->reader = NULL;
737 
738 	/*
739 	 * If instrumentation options were supplied, allocate space for the data.
740 	 * It only gets partially initialized here; the rest happens during
741 	 * ExecParallelInitializeDSM.
742 	 */
743 	if (estate->es_instrument)
744 	{
745 		Instrumentation *instrument;
746 		int			i;
747 
748 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
749 		instrumentation->instrument_options = estate->es_instrument;
750 		instrumentation->instrument_offset = instrument_offset;
751 		instrumentation->num_workers = nworkers;
752 		instrumentation->num_plan_nodes = e.nnodes;
753 		instrument = GetInstrumentationArray(instrumentation);
754 		for (i = 0; i < nworkers * e.nnodes; ++i)
755 			InstrInit(&instrument[i], estate->es_instrument);
756 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
757 					   instrumentation);
758 		pei->instrumentation = instrumentation;
759 
760 		if (estate->es_jit_flags != PGJIT_NONE)
761 		{
762 			jit_instrumentation = shm_toc_allocate(pcxt->toc,
763 												   jit_instrumentation_len);
764 			jit_instrumentation->num_workers = nworkers;
765 			memset(jit_instrumentation->jit_instr, 0,
766 				   sizeof(JitInstrumentation) * nworkers);
767 			shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
768 						   jit_instrumentation);
769 			pei->jit_instrumentation = jit_instrumentation;
770 		}
771 	}
772 
773 	/*
774 	 * Create a DSA area that can be used by the leader and all workers.
775 	 * (However, if we failed to create a DSM and are using private memory
776 	 * instead, then skip this.)
777 	 */
778 	if (pcxt->seg != NULL)
779 	{
780 		char	   *area_space;
781 
782 		area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
783 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
784 		pei->area = dsa_create_in_place(area_space, dsa_minsize,
785 										LWTRANCHE_PARALLEL_QUERY_DSA,
786 										pcxt->seg);
787 
788 		/*
789 		 * Serialize parameters, if any, using DSA storage.  We don't dare use
790 		 * the main parallel query DSM for this because we might relaunch
791 		 * workers after the values have changed (and thus the amount of
792 		 * storage required has changed).
793 		 */
794 		if (!bms_is_empty(sendParams))
795 		{
796 			pei->param_exec = SerializeParamExecParams(estate, sendParams,
797 													   pei->area);
798 			fpes->param_exec = pei->param_exec;
799 		}
800 	}
801 
802 	/*
803 	 * Give parallel-aware nodes a chance to initialize their shared data.
804 	 * This also fills in the plan_node_id array in *instrumentation, if
805 	 * instrumentation is in use.
806 	 */
807 	d.pcxt = pcxt;
808 	d.instrumentation = instrumentation;
809 	d.nnodes = 0;
810 
811 	/* Install our DSA area while initializing the plan. */
812 	estate->es_query_dsa = pei->area;
813 	ExecParallelInitializeDSM(planstate, &d);
814 	estate->es_query_dsa = NULL;
815 
816 	/*
817 	 * Make sure that the world hasn't shifted under our feet.  This could
818 	 * probably just be an Assert(), but let's be conservative for now.
819 	 */
820 	if (e.nnodes != d.nnodes)
821 		elog(ERROR, "inconsistent count of PlanState nodes");
822 
823 	/* OK, we're ready to rock and roll. */
824 	return pei;
825 }
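/*
 * For orientation, a sketch of how a Gather-like caller is expected to drive
 * the routines in this file (this mirrors what nodeGather.c does; leader
 * participation, error paths, and rescan handling are simplified):
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate,
 *								   sendParams, nworkers, tuples_needed);
 *		LaunchParallelWorkers(pei->pcxt);
 *		ExecParallelCreateReaders(pei);
 *		... fetch tuples via pei->reader[] until the scan is done ...
 *		ExecParallelFinish(pei);		(wait for workers, collect stats)
 *		ExecParallelReinitialize(planstate, pei, sendParams);
 *										(only when relaunching for a rescan)
 *		ExecParallelCleanup(pei);		(at executor shutdown time)
 */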
826 
827 /*
828  * Set up tuple queue readers to read the results of a parallel subplan.
829  *
830  * This is separate from ExecInitParallelPlan() because we can launch the
831  * worker processes and let them start doing something before we do this.
832  */
833 void
834 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
835 {
836 	int			nworkers = pei->pcxt->nworkers_launched;
837 	int			i;
838 
839 	Assert(pei->reader == NULL);
840 
841 	if (nworkers > 0)
842 	{
843 		pei->reader = (TupleQueueReader **)
844 			palloc(nworkers * sizeof(TupleQueueReader *));
845 
846 		for (i = 0; i < nworkers; i++)
847 		{
848 			shm_mq_set_handle(pei->tqueue[i],
849 							  pei->pcxt->worker[i].bgwhandle);
850 			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
851 		}
852 	}
853 }
854 
855 /*
856  * Re-initialize the parallel executor shared memory state before launching
857  * a fresh batch of workers.
858  */
859 void
860 ExecParallelReinitialize(PlanState *planstate,
861 						 ParallelExecutorInfo *pei,
862 						 Bitmapset *sendParams)
863 {
864 	EState	   *estate = planstate->state;
865 	FixedParallelExecutorState *fpes;
866 
867 	/* Old workers must already be shut down */
868 	Assert(pei->finished);
869 
870 	/*
871 	 * Force any initplan outputs that we're going to pass to workers to be
872 	 * evaluated, if they weren't already (see comments in
873 	 * ExecInitParallelPlan).
874 	 */
875 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
876 
877 	ReinitializeParallelDSM(pei->pcxt);
878 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
879 	pei->reader = NULL;
880 	pei->finished = false;
881 
882 	fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
883 
884 	/* Free any serialized parameters from the last round. */
885 	if (DsaPointerIsValid(fpes->param_exec))
886 	{
887 		dsa_free(pei->area, fpes->param_exec);
888 		fpes->param_exec = InvalidDsaPointer;
889 	}
890 
891 	/* Serialize current parameter values if required. */
892 	if (!bms_is_empty(sendParams))
893 	{
894 		pei->param_exec = SerializeParamExecParams(estate, sendParams,
895 												   pei->area);
896 		fpes->param_exec = pei->param_exec;
897 	}
898 
899 	/* Traverse plan tree and let each child node reset associated state. */
900 	estate->es_query_dsa = pei->area;
901 	ExecParallelReInitializeDSM(planstate, pei->pcxt);
902 	estate->es_query_dsa = NULL;
903 }
904 
905 /*
906  * Traverse the plan tree to reinitialize per-node dynamic shared memory state.
907  */
908 static bool
909 ExecParallelReInitializeDSM(PlanState *planstate,
910 							ParallelContext *pcxt)
911 {
912 	if (planstate == NULL)
913 		return false;
914 
915 	/*
916 	 * Call reinitializers for DSM-using plan nodes.
917 	 */
918 	switch (nodeTag(planstate))
919 	{
920 		case T_SeqScanState:
921 			if (planstate->plan->parallel_aware)
922 				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
923 										   pcxt);
924 			break;
925 		case T_IndexScanState:
926 			if (planstate->plan->parallel_aware)
927 				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
928 											 pcxt);
929 			break;
930 		case T_IndexOnlyScanState:
931 			if (planstate->plan->parallel_aware)
932 				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
933 												 pcxt);
934 			break;
935 		case T_ForeignScanState:
936 			if (planstate->plan->parallel_aware)
937 				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
938 											   pcxt);
939 			break;
940 		case T_AppendState:
941 			if (planstate->plan->parallel_aware)
942 				ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
943 			break;
944 		case T_CustomScanState:
945 			if (planstate->plan->parallel_aware)
946 				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
947 											  pcxt);
948 			break;
949 		case T_BitmapHeapScanState:
950 			if (planstate->plan->parallel_aware)
951 				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
952 											  pcxt);
953 			break;
954 		case T_HashJoinState:
955 			if (planstate->plan->parallel_aware)
956 				ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
957 											pcxt);
958 			break;
959 		case T_HashState:
960 		case T_SortState:
961 			/* these nodes have DSM state, but no reinitialization is required */
962 			break;
963 
964 		default:
965 			break;
966 	}
967 
968 	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
969 }
970 
971 /*
972  * Copy instrumentation information about this node and its descendants from
973  * dynamic shared memory.
974  */
975 static bool
976 ExecParallelRetrieveInstrumentation(PlanState *planstate,
977 									SharedExecutorInstrumentation *instrumentation)
978 {
979 	Instrumentation *instrument;
980 	int			i;
981 	int			n;
982 	int			ibytes;
983 	int			plan_node_id = planstate->plan->plan_node_id;
984 	MemoryContext oldcontext;
985 
986 	/* Find the instrumentation for this node. */
987 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
988 		if (instrumentation->plan_node_id[i] == plan_node_id)
989 			break;
990 	if (i >= instrumentation->num_plan_nodes)
991 		elog(ERROR, "plan node %d not found", plan_node_id);
992 
993 	/* Accumulate the statistics from all workers. */
994 	instrument = GetInstrumentationArray(instrumentation);
995 	instrument += i * instrumentation->num_workers;
996 	for (n = 0; n < instrumentation->num_workers; ++n)
997 		InstrAggNode(planstate->instrument, &instrument[n]);
998 
999 	/*
1000 	 * Also store the per-worker detail.
1001 	 *
1002 	 * Worker instrumentation should be allocated in the same context as the
1003 	 * regular instrumentation information, which is the per-query context.
1004 	 * Switch into per-query memory context.
1005 	 */
1006 	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1007 	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1008 	planstate->worker_instrument =
1009 		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1010 	MemoryContextSwitchTo(oldcontext);
1011 
1012 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
1013 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1014 
1015 	/* Perform any node-type-specific work that needs to be done. */
1016 	switch (nodeTag(planstate))
1017 	{
1018 		case T_SortState:
1019 			ExecSortRetrieveInstrumentation((SortState *) planstate);
1020 			break;
1021 		case T_HashState:
1022 			ExecHashRetrieveInstrumentation((HashState *) planstate);
1023 			break;
1024 		default:
1025 			break;
1026 	}
1027 
1028 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1029 								 instrumentation);
1030 }
1031 
1032 /*
1033  * Add up the workers' JIT instrumentation from dynamic shared memory.
1034  */
1035 static void
1036 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1037 									   SharedJitInstrumentation *shared_jit)
1038 {
1039 	JitInstrumentation *combined;
1040 	int			ibytes;
1041 
1042 	int			n;
1043 
1044 	/*
1045 	 * Accumulate worker JIT instrumentation into the combined JIT
1046 	 * instrumentation, allocating it if required.
1047 	 */
1048 	if (!planstate->state->es_jit_worker_instr)
1049 		planstate->state->es_jit_worker_instr =
1050 			MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1051 	combined = planstate->state->es_jit_worker_instr;
1052 
1053 	/* Accumulate all the workers' instrumentations. */
1054 	for (n = 0; n < shared_jit->num_workers; ++n)
1055 		InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1056 
1057 	/*
1058 	 * Store the per-worker detail.
1059 	 *
1060 	 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1061 	 * instrumentation in per-query context.
1062 	 */
1063 	ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1064 			 + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1065 	planstate->worker_jit_instrument =
1066 		MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1067 
1068 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1069 }
1070 
1071 /*
1072  * Finish parallel execution.  We wait for parallel workers to finish, and
1073  * accumulate their buffer usage.
1074  */
1075 void
1076 ExecParallelFinish(ParallelExecutorInfo *pei)
1077 {
1078 	int			nworkers = pei->pcxt->nworkers_launched;
1079 	int			i;
1080 
1081 	/* Make this be a no-op if called twice in a row. */
1082 	if (pei->finished)
1083 		return;
1084 
1085 	/*
1086 	 * Detach from tuple queues ASAP, so that any still-active workers will
1087 	 * notice that no further results are wanted.
1088 	 */
1089 	if (pei->tqueue != NULL)
1090 	{
1091 		for (i = 0; i < nworkers; i++)
1092 			shm_mq_detach(pei->tqueue[i]);
1093 		pfree(pei->tqueue);
1094 		pei->tqueue = NULL;
1095 	}
1096 
1097 	/*
1098 	 * While we're waiting for the workers to finish, let's get rid of the
1099 	 * tuple queue readers.  (Any other local cleanup could be done here too.)
1100 	 */
1101 	if (pei->reader != NULL)
1102 	{
1103 		for (i = 0; i < nworkers; i++)
1104 			DestroyTupleQueueReader(pei->reader[i]);
1105 		pfree(pei->reader);
1106 		pei->reader = NULL;
1107 	}
1108 
1109 	/* Now wait for the workers to finish. */
1110 	WaitForParallelWorkersToFinish(pei->pcxt);
1111 
1112 	/*
1113 	 * Next, accumulate buffer usage.  (This must wait for the workers to
1114 	 * finish, or we might get incomplete data.)
1115 	 */
1116 	for (i = 0; i < nworkers; i++)
1117 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
1118 
1119 	pei->finished = true;
1120 }
1121 
1122 /*
1123  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1124  * resources still exist after ExecParallelFinish.  We separate these
1125  * routines because someone might want to examine the contents of the DSM
1126  * after ExecParallelFinish and before calling this routine.
1127  */
1128 void
1129 ExecParallelCleanup(ParallelExecutorInfo *pei)
1130 {
1131 	/* Accumulate instrumentation, if any. */
1132 	if (pei->instrumentation)
1133 		ExecParallelRetrieveInstrumentation(pei->planstate,
1134 											pei->instrumentation);
1135 
1136 	/* Accumulate JIT instrumentation, if any. */
1137 	if (pei->jit_instrumentation)
1138 		ExecParallelRetrieveJitInstrumentation(pei->planstate,
1139 											pei->jit_instrumentation);
1140 
1141 	/* Free any serialized parameters. */
1142 	if (DsaPointerIsValid(pei->param_exec))
1143 	{
1144 		dsa_free(pei->area, pei->param_exec);
1145 		pei->param_exec = InvalidDsaPointer;
1146 	}
1147 	if (pei->area != NULL)
1148 	{
1149 		dsa_detach(pei->area);
1150 		pei->area = NULL;
1151 	}
1152 	if (pei->pcxt != NULL)
1153 	{
1154 		DestroyParallelContext(pei->pcxt);
1155 		pei->pcxt = NULL;
1156 	}
1157 	pfree(pei);
1158 }
1159 
1160 /*
1161  * Create a DestReceiver to write tuples we produce to the shm_mq designated
1162  * for that purpose.
1163  */
1164 static DestReceiver *
1165 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1166 {
1167 	char	   *mqspace;
1168 	shm_mq	   *mq;
1169 
1170 	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1171 	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1172 	mq = (shm_mq *) mqspace;
1173 	shm_mq_set_sender(mq, MyProc);
1174 	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1175 }
1176 
1177 /*
1178  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1179  */
1180 static QueryDesc *
1181 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1182 						 int instrument_options)
1183 {
1184 	char	   *pstmtspace;
1185 	char	   *paramspace;
1186 	PlannedStmt *pstmt;
1187 	ParamListInfo paramLI;
1188 	char	   *queryString;
1189 
1190 	/* Get the query string from shared memory */
1191 	queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1192 
1193 	/* Reconstruct leader-supplied PlannedStmt. */
1194 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1195 	pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1196 
1197 	/* Reconstruct ParamListInfo. */
1198 	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1199 	paramLI = RestoreParamList(&paramspace);
1200 
1201 	/* Create a QueryDesc for the query. */
1202 	return CreateQueryDesc(pstmt,
1203 						   queryString,
1204 						   GetActiveSnapshot(), InvalidSnapshot,
1205 						   receiver, paramLI, NULL, instrument_options);
1206 }
1207 
1208 /*
1209  * Copy instrumentation information from this node and its descendants into
1210  * dynamic shared memory, so that the parallel leader can retrieve it.
1211  */
1212 static bool
1213 ExecParallelReportInstrumentation(PlanState *planstate,
1214 								  SharedExecutorInstrumentation *instrumentation)
1215 {
1216 	int			i;
1217 	int			plan_node_id = planstate->plan->plan_node_id;
1218 	Instrumentation *instrument;
1219 
1220 	InstrEndLoop(planstate->instrument);
1221 
1222 	/*
1223 	 * If we shuffled the plan_node_id values in ps_instrument into sorted
1224 	 * order, we could use binary search here.  This might matter someday if
1225 	 * we're pushing down sufficiently large plan trees.  For now, do it the
1226 	 * slow, dumb way.
1227 	 */
1228 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1229 		if (instrumentation->plan_node_id[i] == plan_node_id)
1230 			break;
1231 	if (i >= instrumentation->num_plan_nodes)
1232 		elog(ERROR, "plan node %d not found", plan_node_id);
1233 
1234 	/*
1235 	 * Add our statistics to the per-node, per-worker totals.  It's possible
1236 	 * that this could happen more than once if we relaunched workers.
1237 	 */
1238 	instrument = GetInstrumentationArray(instrumentation);
1239 	instrument += i * instrumentation->num_workers;
1240 	Assert(IsParallelWorker());
1241 	Assert(ParallelWorkerNumber < instrumentation->num_workers);
1242 	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1243 
1244 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1245 								 instrumentation);
1246 }
1247 
1248 /*
1249  * Initialize the PlanState and its descendants with the information
1250  * retrieved from shared memory.  This has to be done once the PlanState
1251  * is allocated and initialized by the executor; that is, after ExecutorStart().
1252  */
1253 static bool
1254 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1255 {
1256 	if (planstate == NULL)
1257 		return false;
1258 
1259 	switch (nodeTag(planstate))
1260 	{
1261 		case T_SeqScanState:
1262 			if (planstate->plan->parallel_aware)
1263 				ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1264 			break;
1265 		case T_IndexScanState:
1266 			if (planstate->plan->parallel_aware)
1267 				ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1268 											  pwcxt);
1269 			break;
1270 		case T_IndexOnlyScanState:
1271 			if (planstate->plan->parallel_aware)
1272 				ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1273 												  pwcxt);
1274 			break;
1275 		case T_ForeignScanState:
1276 			if (planstate->plan->parallel_aware)
1277 				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1278 												pwcxt);
1279 			break;
1280 		case T_AppendState:
1281 			if (planstate->plan->parallel_aware)
1282 				ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1283 			break;
1284 		case T_CustomScanState:
1285 			if (planstate->plan->parallel_aware)
1286 				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1287 											   pwcxt);
1288 			break;
1289 		case T_BitmapHeapScanState:
1290 			if (planstate->plan->parallel_aware)
1291 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1292 											   pwcxt);
1293 			break;
1294 		case T_HashJoinState:
1295 			if (planstate->plan->parallel_aware)
1296 				ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1297 											 pwcxt);
1298 			break;
1299 		case T_HashState:
1300 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1301 			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1302 			break;
1303 		case T_SortState:
1304 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1305 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1306 			break;
1307 
1308 		default:
1309 			break;
1310 	}
1311 
1312 	return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1313 								 pwcxt);
1314 }
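/*
 * And the worker-side counterpart of the sketches earlier in this file
 * (placeholder "Foo" node again): each worker looks up the state the leader
 * published under this node's plan_node_id and attaches to it:
 *
 *		void
 *		ExecFooInitializeWorker(FooState *node, ParallelWorkerContext *pwcxt)
 *		{
 *			node->shared = shm_toc_lookup(pwcxt->toc,
 *										  node->ps.plan->plan_node_id,
 *										  false);
 *		}
 */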
1315 
1316 /*
1317  * Main entrypoint for parallel query worker processes.
1318  *
1319  * We reach this function from ParallelWorkerMain, so the setup necessary to
1320  * create a sensible parallel environment has already been done;
1321  * ParallelWorkerMain worries about stuff like the transaction state, combo
1322  * CID mappings, and GUC values, so we don't need to deal with any of that
1323  * here.
1324  *
1325  * Our job is to deal with concerns specific to the executor.  The parallel
1326  * group leader will have stored a serialized PlannedStmt, and it's our job
1327  * to execute that plan and write the resulting tuples to the appropriate
1328  * tuple queue.  Various bits of supporting information that we need in order
1329  * to do this are also stored in the dsm_segment and can be accessed through
1330  * the shm_toc.
1331  */
1332 void
1333 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1334 {
1335 	FixedParallelExecutorState *fpes;
1336 	BufferUsage *buffer_usage;
1337 	DestReceiver *receiver;
1338 	QueryDesc  *queryDesc;
1339 	SharedExecutorInstrumentation *instrumentation;
1340 	SharedJitInstrumentation *jit_instrumentation;
1341 	int			instrument_options = 0;
1342 	void	   *area_space;
1343 	dsa_area   *area;
1344 	ParallelWorkerContext pwcxt;
1345 
1346 	/* Get fixed-size state. */
1347 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1348 
1349 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1350 	receiver = ExecParallelGetReceiver(seg, toc);
1351 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1352 	if (instrumentation != NULL)
1353 		instrument_options = instrumentation->instrument_options;
1354 	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1355 										 true);
1356 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1357 
1358 	/* Set debug_query_string for this individual worker */
1359 	debug_query_string = queryDesc->sourceText;
1360 
1361 	/* Report this worker's query for monitoring purposes */
1362 	pgstat_report_activity(STATE_RUNNING, debug_query_string);
1363 
1364 	/* Attach to the dynamic shared memory area. */
1365 	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1366 	area = dsa_attach_in_place(area_space, seg);
1367 
1368 	/* Start up the executor */
1369 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1370 	ExecutorStart(queryDesc, fpes->eflags);
1371 
1372 	/* Special executor initialization steps for parallel workers */
1373 	queryDesc->planstate->state->es_query_dsa = area;
1374 	if (DsaPointerIsValid(fpes->param_exec))
1375 	{
1376 		char	   *paramexec_space;
1377 
1378 		paramexec_space = dsa_get_address(area, fpes->param_exec);
1379 		RestoreParamExecParams(paramexec_space, queryDesc->estate);
1380 
1381 	}
1382 	pwcxt.toc = toc;
1383 	pwcxt.seg = seg;
1384 	ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1385 
1386 	/* Pass down any tuple bound */
1387 	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1388 
1389 	/*
1390 	 * Prepare to track buffer usage during query execution.
1391 	 *
1392 	 * We do this after starting up the executor to match what happens in the
1393 	 * leader, which also doesn't count buffer accesses that occur during
1394 	 * executor startup.
1395 	 */
1396 	InstrStartParallelQuery();
1397 
1398 	/*
1399 	 * Run the plan.  If we specified a tuple bound, be careful not to demand
1400 	 * more tuples than that.
1401 	 */
1402 	ExecutorRun(queryDesc,
1403 				ForwardScanDirection,
1404 				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1405 				true);
1406 
1407 	/* Shut down the executor */
1408 	ExecutorFinish(queryDesc);
1409 
1410 	/* Report buffer usage during parallel execution. */
1411 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1412 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
1413 
1414 	/* Report instrumentation data if any instrumentation options are set. */
1415 	if (instrumentation != NULL)
1416 		ExecParallelReportInstrumentation(queryDesc->planstate,
1417 										  instrumentation);
1418 
1419 	/* Report JIT instrumentation data if any */
1420 	if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1421 	{
1422 		Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1423 		jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1424 			queryDesc->estate->es_jit->instr;
1425 	}
1426 
1427 	/* Must do this after capturing instrumentation. */
1428 	ExecutorEnd(queryDesc);
1429 
1430 	/* Cleanup. */
1431 	dsa_detach(area);
1432 	FreeQueryDesc(queryDesc);
1433 	receiver->rDestroy(receiver);
1434 }
1435