1 /*-------------------------------------------------------------------------
2  *
3  * execParallel.c
4  *	  Support routines for parallel execution.
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * This file contains routines that are intended to support setting up,
10  * using, and tearing down a ParallelContext from within the PostgreSQL
11  * executor.  The ParallelContext machinery will handle starting the
12  * workers and ensuring that their state generally matches that of the
13  * leader; see src/backend/access/transam/README.parallel for details.
14  * However, we must save and restore relevant executor state, such as
15  * any ParamListInfo associated with the query, buffer/WAL usage info, and
16  * the actual plan to be passed down to the worker.
17  *
18  * IDENTIFICATION
19  *	  src/backend/executor/execParallel.c
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAgg.h"
29 #include "executor/nodeAppend.h"
30 #include "executor/nodeBitmapHeapscan.h"
31 #include "executor/nodeCustom.h"
32 #include "executor/nodeForeignscan.h"
33 #include "executor/nodeHash.h"
34 #include "executor/nodeHashjoin.h"
35 #include "executor/nodeIncrementalSort.h"
36 #include "executor/nodeIndexonlyscan.h"
37 #include "executor/nodeIndexscan.h"
38 #include "executor/nodeSeqscan.h"
39 #include "executor/nodeSort.h"
40 #include "executor/nodeSubplan.h"
41 #include "executor/tqueue.h"
42 #include "jit/jit.h"
43 #include "nodes/nodeFuncs.h"
44 #include "pgstat.h"
45 #include "storage/spin.h"
46 #include "tcop/tcopprot.h"
47 #include "utils/datum.h"
48 #include "utils/dsa.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/snapmgr.h"
52 
53 /*
54  * Magic numbers for parallel executor communication.  We use constants
55  * greater than any 32-bit integer here so that values < 2^32 can be used
56  * by individual parallel nodes to store their own state.
57  */
58 #define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
59 #define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
60 #define PARALLEL_KEY_PARAMLISTINFO		UINT64CONST(0xE000000000000003)
61 #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
62 #define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
63 #define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
64 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
65 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
66 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
67 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
68 
69 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
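
/*
 * Each of these keys identifies one entry in the per-query shm_toc.  The
 * usage pattern, as seen throughout this file, is that the leader estimates
 * and allocates a chunk, inserts it under one of these keys, and a worker
 * later looks it up by the same key.  A minimal sketch of that pattern,
 * using the query-text key as the example:
 *
 *		// leader, while sizing the DSM:
 *		shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
 *		shm_toc_estimate_keys(&pcxt->estimator, 1);
 *
 *		// leader, after InitializeParallelDSM():
 *		ptr = shm_toc_allocate(pcxt->toc, query_len + 1);
 *		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, ptr);
 *
 *		// worker, in ParallelQueryMain():
 *		ptr = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
 */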
70 
71 /*
72  * Fixed-size state that we need to pass to parallel workers.
73  */
74 typedef struct FixedParallelExecutorState
75 {
76 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
77 	dsa_pointer param_exec;
78 	int			eflags;
79 	int			jit_flags;
80 } FixedParallelExecutorState;
81 
82 /*
83  * DSM structure for accumulating per-PlanState instrumentation.
84  *
85  * instrument_options: Same meaning here as in instrument.c.
86  *
87  * instrument_offset: Offset, relative to the start of this structure,
88  * of the first Instrumentation object.  This will depend on the length of
89  * the plan_node_id array.
90  *
91  * num_workers: Number of workers.
92  *
93  * num_plan_nodes: Number of plan nodes.
94  *
95  * plan_node_id: Array of the plan node IDs for which we are gathering
96  * instrumentation from parallel workers; its length is num_plan_nodes.
97  */
98 struct SharedExecutorInstrumentation
99 {
100 	int			instrument_options;
101 	int			instrument_offset;
102 	int			num_workers;
103 	int			num_plan_nodes;
104 	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];
105 	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
106 };
107 #define GetInstrumentationArray(sei) \
108 	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
109 	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
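
/*
 * The Instrumentation array is laid out with num_workers consecutive slots
 * per plan node, in plan_node_id[] order.  So, as the retrieval and report
 * functions below assume, the slot for plan-node index i and worker w is:
 *
 *		Instrumentation *instr = GetInstrumentationArray(sei);
 *
 *		instr[i * sei->num_workers + w]
 */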
110 
111 /* Context object for ExecParallelEstimate. */
112 typedef struct ExecParallelEstimateContext
113 {
114 	ParallelContext *pcxt;
115 	int			nnodes;
116 } ExecParallelEstimateContext;
117 
118 /* Context object for ExecParallelInitializeDSM. */
119 typedef struct ExecParallelInitializeDSMContext
120 {
121 	ParallelContext *pcxt;
122 	SharedExecutorInstrumentation *instrumentation;
123 	int			nnodes;
124 } ExecParallelInitializeDSMContext;
125 
126 /* Helper functions that run in the parallel leader. */
127 static char *ExecSerializePlan(Plan *plan, EState *estate);
128 static bool ExecParallelEstimate(PlanState *node,
129 								 ExecParallelEstimateContext *e);
130 static bool ExecParallelInitializeDSM(PlanState *node,
131 									  ExecParallelInitializeDSMContext *d);
132 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
133 													bool reinitialize);
134 static bool ExecParallelReInitializeDSM(PlanState *planstate,
135 										ParallelContext *pcxt);
136 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
137 												SharedExecutorInstrumentation *instrumentation);
138 
139 /* Helper function that runs in the parallel worker. */
140 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
141 
142 /*
143  * Create a serialized representation of the plan to be sent to each worker.
144  */
145 static char *
146 ExecSerializePlan(Plan *plan, EState *estate)
147 {
148 	PlannedStmt *pstmt;
149 	ListCell   *lc;
150 
151 	/* We can't scribble on the original plan, so make a copy. */
152 	plan = copyObject(plan);
153 
154 	/*
155 	 * The worker will start its own copy of the executor, and that copy will
156 	 * insert a junk filter if the toplevel node has any resjunk entries. We
157 	 * don't want that to happen, because while resjunk columns shouldn't be
158 	 * sent back to the user, here the tuples are coming back to another
159 	 * backend which may very well need them.  So mutate the target list
160 	 * accordingly.  This is sort of a hack; there might be better ways to do
161 	 * this...
162 	 */
163 	foreach(lc, plan->targetlist)
164 	{
165 		TargetEntry *tle = lfirst_node(TargetEntry, lc);
166 
167 		tle->resjunk = false;
168 	}
169 
170 	/*
171 	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
172 	 * for our purposes, but the worker will need at least a minimal
173 	 * PlannedStmt to start the executor.
174 	 */
175 	pstmt = makeNode(PlannedStmt);
176 	pstmt->commandType = CMD_SELECT;
177 	pstmt->queryId = UINT64CONST(0);
178 	pstmt->hasReturning = false;
179 	pstmt->hasModifyingCTE = false;
180 	pstmt->canSetTag = true;
181 	pstmt->transientPlan = false;
182 	pstmt->dependsOnRole = false;
183 	pstmt->parallelModeNeeded = false;
184 	pstmt->planTree = plan;
185 	pstmt->rtable = estate->es_range_table;
186 	pstmt->resultRelations = NIL;
187 	pstmt->rootResultRelations = NIL;
188 	pstmt->appendRelations = NIL;
189 
190 	/*
191 	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
192 	 * for unsafe ones (so that the list indexes of the safe ones are
193 	 * preserved).  This positively ensures that the worker won't try to run,
194 	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
195 	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
196 	 */
197 	pstmt->subplans = NIL;
198 	foreach(lc, estate->es_plannedstmt->subplans)
199 	{
200 		Plan	   *subplan = (Plan *) lfirst(lc);
201 
202 		if (subplan && !subplan->parallel_safe)
203 			subplan = NULL;
204 		pstmt->subplans = lappend(pstmt->subplans, subplan);
205 	}
206 
207 	pstmt->rewindPlanIDs = NULL;
208 	pstmt->rowMarks = NIL;
209 	pstmt->relationOids = NIL;
210 	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
211 	pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
212 	pstmt->utilityStmt = NULL;
213 	pstmt->stmt_location = -1;
214 	pstmt->stmt_len = -1;
215 
216 	/* Return serialized copy of our dummy PlannedStmt. */
217 	return nodeToString(pstmt);
218 }
219 
220 /*
221  * Parallel-aware plan nodes (and occasionally others) may need some state
222  * which is shared across all parallel workers.  Before we size the DSM, give
223  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
224  * &pcxt->estimator.
225  *
226  * While we're at it, count the number of PlanState nodes in the tree, so
227  * we know how many Instrumentation structures we need.
228  */
229 static bool
230 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
231 {
232 	if (planstate == NULL)
233 		return false;
234 
235 	/* Count this node. */
236 	e->nnodes++;
237 
238 	switch (nodeTag(planstate))
239 	{
240 		case T_SeqScanState:
241 			if (planstate->plan->parallel_aware)
242 				ExecSeqScanEstimate((SeqScanState *) planstate,
243 									e->pcxt);
244 			break;
245 		case T_IndexScanState:
246 			if (planstate->plan->parallel_aware)
247 				ExecIndexScanEstimate((IndexScanState *) planstate,
248 									  e->pcxt);
249 			break;
250 		case T_IndexOnlyScanState:
251 			if (planstate->plan->parallel_aware)
252 				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
253 										  e->pcxt);
254 			break;
255 		case T_ForeignScanState:
256 			if (planstate->plan->parallel_aware)
257 				ExecForeignScanEstimate((ForeignScanState *) planstate,
258 										e->pcxt);
259 			break;
260 		case T_AppendState:
261 			if (planstate->plan->parallel_aware)
262 				ExecAppendEstimate((AppendState *) planstate,
263 								   e->pcxt);
264 			break;
265 		case T_CustomScanState:
266 			if (planstate->plan->parallel_aware)
267 				ExecCustomScanEstimate((CustomScanState *) planstate,
268 									   e->pcxt);
269 			break;
270 		case T_BitmapHeapScanState:
271 			if (planstate->plan->parallel_aware)
272 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
273 									   e->pcxt);
274 			break;
275 		case T_HashJoinState:
276 			if (planstate->plan->parallel_aware)
277 				ExecHashJoinEstimate((HashJoinState *) planstate,
278 									 e->pcxt);
279 			break;
280 		case T_HashState:
281 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
282 			ExecHashEstimate((HashState *) planstate, e->pcxt);
283 			break;
284 		case T_SortState:
285 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
286 			ExecSortEstimate((SortState *) planstate, e->pcxt);
287 			break;
288 		case T_IncrementalSortState:
289 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
290 			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
291 			break;
292 		case T_AggState:
293 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
294 			ExecAggEstimate((AggState *) planstate, e->pcxt);
295 			break;
296 		default:
297 			break;
298 	}
299 
300 	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
301 }
302 
303 /*
304  * Estimate the amount of space required to serialize the indicated parameters.
305  */
306 static Size
307 EstimateParamExecSpace(EState *estate, Bitmapset *params)
308 {
309 	int			paramid;
310 	Size		sz = sizeof(int);
311 
312 	paramid = -1;
313 	while ((paramid = bms_next_member(params, paramid)) >= 0)
314 	{
315 		Oid			typeOid;
316 		int16		typLen;
317 		bool		typByVal;
318 		ParamExecData *prm;
319 
320 		prm = &(estate->es_param_exec_vals[paramid]);
321 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
322 							   paramid);
323 
324 		sz = add_size(sz, sizeof(int)); /* space for paramid */
325 
326 		/* space for datum/isnull */
327 		if (OidIsValid(typeOid))
328 			get_typlenbyval(typeOid, &typLen, &typByVal);
329 		else
330 		{
331 			/* If no type OID, assume by-value, like copyParamList does. */
332 			typLen = sizeof(Datum);
333 			typByVal = true;
334 		}
335 		sz = add_size(sz,
336 					  datumEstimateSpace(prm->value, prm->isnull,
337 										 typByVal, typLen));
338 	}
339 	return sz;
340 }
341 
342 /*
343  * Serialize specified PARAM_EXEC parameters.
344  *
345  * We write the number of parameters first, as a 4-byte integer, and then
346  * write details for each parameter in turn.  The details for each parameter
347  * consist of a 4-byte paramid (location of param in execution time internal
348  * parameter array) and then the datum as serialized by datumSerialize().
349  */
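
/*
 * To make the preceding description concrete, the buffer produced here is
 * laid out as follows (RestoreParamExecParams simply walks it in the same
 * order):
 *
 *		+---------+-----------+------------------+-----------+-----
 *		| nparams | paramid 0 | datumSerialize'd | paramid 1 | ...
 *		|  (int)  |   (int)   |     value 0      |   (int)   |
 *		+---------+-----------+------------------+-----------+-----
 */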
350 static dsa_pointer
351 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
352 {
353 	Size		size;
354 	int			nparams;
355 	int			paramid;
356 	ParamExecData *prm;
357 	dsa_pointer handle;
358 	char	   *start_address;
359 
360 	/* Allocate enough space for the current parameter values. */
361 	size = EstimateParamExecSpace(estate, params);
362 	handle = dsa_allocate(area, size);
363 	start_address = dsa_get_address(area, handle);
364 
365 	/* First write the number of parameters as a 4-byte integer. */
366 	nparams = bms_num_members(params);
367 	memcpy(start_address, &nparams, sizeof(int));
368 	start_address += sizeof(int);
369 
370 	/* Write details for each parameter in turn. */
371 	paramid = -1;
372 	while ((paramid = bms_next_member(params, paramid)) >= 0)
373 	{
374 		Oid			typeOid;
375 		int16		typLen;
376 		bool		typByVal;
377 
378 		prm = &(estate->es_param_exec_vals[paramid]);
379 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
380 							   paramid);
381 
382 		/* Write paramid. */
383 		memcpy(start_address, &paramid, sizeof(int));
384 		start_address += sizeof(int);
385 
386 		/* Write datum/isnull */
387 		if (OidIsValid(typeOid))
388 			get_typlenbyval(typeOid, &typLen, &typByVal);
389 		else
390 		{
391 			/* If no type OID, assume by-value, like copyParamList does. */
392 			typLen = sizeof(Datum);
393 			typByVal = true;
394 		}
395 		datumSerialize(prm->value, prm->isnull, typByVal, typLen,
396 					   &start_address);
397 	}
398 
399 	return handle;
400 }
401 
402 /*
403  * Restore specified PARAM_EXEC parameters.
404  */
405 static void
406 RestoreParamExecParams(char *start_address, EState *estate)
407 {
408 	int			nparams;
409 	int			i;
410 	int			paramid;
411 
412 	memcpy(&nparams, start_address, sizeof(int));
413 	start_address += sizeof(int);
414 
415 	for (i = 0; i < nparams; i++)
416 	{
417 		ParamExecData *prm;
418 
419 		/* Read paramid */
420 		memcpy(&paramid, start_address, sizeof(int));
421 		start_address += sizeof(int);
422 		prm = &(estate->es_param_exec_vals[paramid]);
423 
424 		/* Read datum/isnull. */
425 		prm->value = datumRestore(&start_address, &prm->isnull);
426 		prm->execPlan = NULL;
427 	}
428 }
429 
430 /*
431  * Initialize the dynamic shared memory segment that will be used to control
432  * parallel execution.
433  */
434 static bool
435 ExecParallelInitializeDSM(PlanState *planstate,
436 						  ExecParallelInitializeDSMContext *d)
437 {
438 	if (planstate == NULL)
439 		return false;
440 
441 	/* If instrumentation is enabled, initialize slot for this node. */
442 	if (d->instrumentation != NULL)
443 		d->instrumentation->plan_node_id[d->nnodes] =
444 			planstate->plan->plan_node_id;
445 
446 	/* Count this node. */
447 	d->nnodes++;
448 
449 	/*
450 	 * Call initializers for DSM-using plan nodes.
451 	 *
452 	 * Most plan nodes won't do anything here, but plan nodes that allocated
453 	 * DSM may need to initialize shared state in the DSM before parallel
454 	 * workers are launched.  They can allocate the space they previously
455 	 * estimated using shm_toc_allocate, and add the keys they previously
456 	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
457 	 */
458 	switch (nodeTag(planstate))
459 	{
460 		case T_SeqScanState:
461 			if (planstate->plan->parallel_aware)
462 				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
463 										 d->pcxt);
464 			break;
465 		case T_IndexScanState:
466 			if (planstate->plan->parallel_aware)
467 				ExecIndexScanInitializeDSM((IndexScanState *) planstate,
468 										   d->pcxt);
469 			break;
470 		case T_IndexOnlyScanState:
471 			if (planstate->plan->parallel_aware)
472 				ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
473 											   d->pcxt);
474 			break;
475 		case T_ForeignScanState:
476 			if (planstate->plan->parallel_aware)
477 				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
478 											 d->pcxt);
479 			break;
480 		case T_AppendState:
481 			if (planstate->plan->parallel_aware)
482 				ExecAppendInitializeDSM((AppendState *) planstate,
483 										d->pcxt);
484 			break;
485 		case T_CustomScanState:
486 			if (planstate->plan->parallel_aware)
487 				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
488 											d->pcxt);
489 			break;
490 		case T_BitmapHeapScanState:
491 			if (planstate->plan->parallel_aware)
492 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
493 											d->pcxt);
494 			break;
495 		case T_HashJoinState:
496 			if (planstate->plan->parallel_aware)
497 				ExecHashJoinInitializeDSM((HashJoinState *) planstate,
498 										  d->pcxt);
499 			break;
500 		case T_HashState:
501 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
502 			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
503 			break;
504 		case T_SortState:
505 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
506 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
507 			break;
508 		case T_IncrementalSortState:
509 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
510 			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
511 			break;
512 		case T_AggState:
513 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
514 			ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
515 			break;
516 		default:
517 			break;
518 	}
519 
520 	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
521 }
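
/*
 * For illustration only: a parallel-aware node's estimate/initialize pair
 * typically mirrors the walkers above.  This hypothetical node reserves one
 * chunk for its shared state while the DSM is being sized, then allocates
 * and publishes it here, keyed (for example) by its plan_node_id -- which is
 * less than 2^32 and so cannot collide with the PARALLEL_KEY_* values above.
 * The worker side fetches it again with shm_toc_lookup.  All names below are
 * made up.
 *
 *		void
 *		ExecFooEstimate(FooState *node, ParallelContext *pcxt)
 *		{
 *			shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelFooState));
 *			shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		}
 *
 *		void
 *		ExecFooInitializeDSM(FooState *node, ParallelContext *pcxt)
 *		{
 *			ParallelFooState *pstate;
 *
 *			pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelFooState));
 *			// ... fill in shared state here ...
 *			shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
 *		}
 *
 *		void
 *		ExecFooInitializeWorker(FooState *node, ParallelWorkerContext *pwcxt)
 *		{
 *			node->pstate = shm_toc_lookup(pwcxt->toc,
 *										  node->ss.ps.plan->plan_node_id,
 *										  false);
 *		}
 */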
522 
523 /*
524  * Set up the response queues that parallel workers will use to return
525  * tuples to the leader backend.
526  */
527 static shm_mq_handle **
528 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
529 {
530 	shm_mq_handle **responseq;
531 	char	   *tqueuespace;
532 	int			i;
533 
534 	/* Skip this if no workers. */
535 	if (pcxt->nworkers == 0)
536 		return NULL;
537 
538 	/* Allocate memory for shared memory queue handles. */
539 	responseq = (shm_mq_handle **)
540 		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
541 
542 	/*
543 	 * If not reinitializing, allocate space from the DSM for the queues;
544 	 * otherwise, find the already allocated space.
545 	 */
546 	if (!reinitialize)
547 		tqueuespace =
548 			shm_toc_allocate(pcxt->toc,
549 							 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
550 									  pcxt->nworkers));
551 	else
552 		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
553 
554 	/* Create the queues, and become the receiver for each. */
555 	for (i = 0; i < pcxt->nworkers; ++i)
556 	{
557 		shm_mq	   *mq;
558 
559 		mq = shm_mq_create(tqueuespace +
560 						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
561 						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
562 
563 		shm_mq_set_receiver(mq, MyProc);
564 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
565 	}
566 
567 	/* Add array of queues to shm_toc, so others can find it. */
568 	if (!reinitialize)
569 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
570 
571 	/* Return array of handles. */
572 	return responseq;
573 }
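
/*
 * Note that only the base address of the queue array goes into the toc;
 * each worker locates its own queue by offset, as ExecParallelGetReceiver
 * does further down:
 *
 *		mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
 */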
574 
575 /*
576  * Sets up the required infrastructure for backend workers to perform
577  * execution and return results to the main backend.
578  */
579 ParallelExecutorInfo *
580 ExecInitParallelPlan(PlanState *planstate, EState *estate,
581 					 Bitmapset *sendParams, int nworkers,
582 					 int64 tuples_needed)
583 {
584 	ParallelExecutorInfo *pei;
585 	ParallelContext *pcxt;
586 	ExecParallelEstimateContext e;
587 	ExecParallelInitializeDSMContext d;
588 	FixedParallelExecutorState *fpes;
589 	char	   *pstmt_data;
590 	char	   *pstmt_space;
591 	char	   *paramlistinfo_space;
592 	BufferUsage *bufusage_space;
593 	WalUsage   *walusage_space;
594 	SharedExecutorInstrumentation *instrumentation = NULL;
595 	SharedJitInstrumentation *jit_instrumentation = NULL;
596 	int			pstmt_len;
597 	int			paramlistinfo_len;
598 	int			instrumentation_len = 0;
599 	int			jit_instrumentation_len = 0;
600 	int			instrument_offset = 0;
601 	Size		dsa_minsize = dsa_minimum_size();
602 	char	   *query_string;
603 	int			query_len;
604 
605 	/*
606 	 * Force any initplan outputs that we're going to pass to workers to be
607 	 * evaluated, if they weren't already.
608 	 *
609 	 * For simplicity, we use the EState's per-output-tuple ExprContext here.
610 	 * That risks intra-query memory leakage, since we might pass through here
611 	 * many times before that ExprContext gets reset; but ExecSetParamPlan
612 	 * doesn't normally leak any memory in the context (see its comments), so
613 	 * it doesn't seem worth complicating this function's API to pass it a
614 	 * shorter-lived ExprContext.  This might need to change someday.
615 	 */
616 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
617 
618 	/* Allocate object for return value. */
619 	pei = palloc0(sizeof(ParallelExecutorInfo));
620 	pei->finished = false;
621 	pei->planstate = planstate;
622 
623 	/* Fix up and serialize plan to be sent to workers. */
624 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
625 
626 	/* Create a parallel context. */
627 	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
628 	pei->pcxt = pcxt;
629 
630 	/*
631 	 * Before telling the parallel context to create a dynamic shared memory
632 	 * segment, we need to figure out how big it should be.  Estimate space
633 	 * for the various things we need to store.
634 	 */
635 
636 	/* Estimate space for fixed-size state. */
637 	shm_toc_estimate_chunk(&pcxt->estimator,
638 						   sizeof(FixedParallelExecutorState));
639 	shm_toc_estimate_keys(&pcxt->estimator, 1);
640 
641 	/* Estimate space for query text. */
642 	query_len = strlen(estate->es_sourceText);
643 	shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
644 	shm_toc_estimate_keys(&pcxt->estimator, 1);
645 
646 	/* Estimate space for serialized PlannedStmt. */
647 	pstmt_len = strlen(pstmt_data) + 1;
648 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
649 	shm_toc_estimate_keys(&pcxt->estimator, 1);
650 
651 	/* Estimate space for serialized ParamListInfo. */
652 	paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
653 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
654 	shm_toc_estimate_keys(&pcxt->estimator, 1);
655 
656 	/*
657 	 * Estimate space for BufferUsage.
658 	 *
659 	 * If EXPLAIN is not in use and there are no extensions loaded that care,
660 	 * we could skip this.  But we have no way of knowing whether anyone's
661 	 * looking at pgBufferUsage, so do it unconditionally.
662 	 */
663 	shm_toc_estimate_chunk(&pcxt->estimator,
664 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
665 	shm_toc_estimate_keys(&pcxt->estimator, 1);
666 
667 	/*
668 	 * Same thing for WalUsage.
669 	 */
670 	shm_toc_estimate_chunk(&pcxt->estimator,
671 						   mul_size(sizeof(WalUsage), pcxt->nworkers));
672 	shm_toc_estimate_keys(&pcxt->estimator, 1);
673 
674 	/* Estimate space for tuple queues. */
675 	shm_toc_estimate_chunk(&pcxt->estimator,
676 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
677 	shm_toc_estimate_keys(&pcxt->estimator, 1);
678 
679 	/*
680 	 * Give parallel-aware nodes a chance to add to the estimates, and get a
681 	 * count of how many PlanState nodes there are.
682 	 */
683 	e.pcxt = pcxt;
684 	e.nnodes = 0;
685 	ExecParallelEstimate(planstate, &e);
686 
687 	/* Estimate space for instrumentation, if required. */
688 	if (estate->es_instrument)
689 	{
690 		instrumentation_len =
691 			offsetof(SharedExecutorInstrumentation, plan_node_id) +
692 			sizeof(int) * e.nnodes;
693 		instrumentation_len = MAXALIGN(instrumentation_len);
694 		instrument_offset = instrumentation_len;
695 		instrumentation_len +=
696 			mul_size(sizeof(Instrumentation),
697 					 mul_size(e.nnodes, nworkers));
698 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
699 		shm_toc_estimate_keys(&pcxt->estimator, 1);
700 
701 		/* Estimate space for JIT instrumentation, if required. */
702 		if (estate->es_jit_flags != PGJIT_NONE)
703 		{
704 			jit_instrumentation_len =
705 				offsetof(SharedJitInstrumentation, jit_instr) +
706 				sizeof(JitInstrumentation) * nworkers;
707 			shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
708 			shm_toc_estimate_keys(&pcxt->estimator, 1);
709 		}
710 	}
711 
712 	/* Estimate space for DSA area. */
713 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
714 	shm_toc_estimate_keys(&pcxt->estimator, 1);
715 
716 	/* Everyone's had a chance to ask for space, so now create the DSM. */
717 	InitializeParallelDSM(pcxt);
718 
719 	/*
720 	 * OK, now we have a dynamic shared memory segment, and it should be big
721 	 * enough to store all of the data we estimated we would want to put into
722 	 * it, plus whatever general stuff (not specifically executor-related) the
723 	 * ParallelContext itself needs to store there.  None of the space we
724 	 * asked for has been allocated or initialized yet, though, so do that.
725 	 */
726 
727 	/* Store fixed-size state. */
728 	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
729 	fpes->tuples_needed = tuples_needed;
730 	fpes->param_exec = InvalidDsaPointer;
731 	fpes->eflags = estate->es_top_eflags;
732 	fpes->jit_flags = estate->es_jit_flags;
733 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
734 
735 	/* Store query string */
736 	query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
737 	memcpy(query_string, estate->es_sourceText, query_len + 1);
738 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
739 
740 	/* Store serialized PlannedStmt. */
741 	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
742 	memcpy(pstmt_space, pstmt_data, pstmt_len);
743 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
744 
745 	/* Store serialized ParamListInfo. */
746 	paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
747 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
748 	SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
749 
750 	/* Allocate space for each worker's BufferUsage; no need to initialize. */
751 	bufusage_space = shm_toc_allocate(pcxt->toc,
752 									  mul_size(sizeof(BufferUsage), pcxt->nworkers));
753 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
754 	pei->buffer_usage = bufusage_space;
755 
756 	/* Same for WalUsage. */
757 	walusage_space = shm_toc_allocate(pcxt->toc,
758 									  mul_size(sizeof(WalUsage), pcxt->nworkers));
759 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
760 	pei->wal_usage = walusage_space;
761 
762 	/* Set up the tuple queues that the workers will write into. */
763 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
764 
765 	/* We don't need the TupleQueueReaders yet, though. */
766 	pei->reader = NULL;
767 
768 	/*
769 	 * If instrumentation options were supplied, allocate space for the data.
770 	 * It only gets partially initialized here; the rest happens during
771 	 * ExecParallelInitializeDSM.
772 	 */
773 	if (estate->es_instrument)
774 	{
775 		Instrumentation *instrument;
776 		int			i;
777 
778 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
779 		instrumentation->instrument_options = estate->es_instrument;
780 		instrumentation->instrument_offset = instrument_offset;
781 		instrumentation->num_workers = nworkers;
782 		instrumentation->num_plan_nodes = e.nnodes;
783 		instrument = GetInstrumentationArray(instrumentation);
784 		for (i = 0; i < nworkers * e.nnodes; ++i)
785 			InstrInit(&instrument[i], estate->es_instrument);
786 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
787 					   instrumentation);
788 		pei->instrumentation = instrumentation;
789 
790 		if (estate->es_jit_flags != PGJIT_NONE)
791 		{
792 			jit_instrumentation = shm_toc_allocate(pcxt->toc,
793 												   jit_instrumentation_len);
794 			jit_instrumentation->num_workers = nworkers;
795 			memset(jit_instrumentation->jit_instr, 0,
796 				   sizeof(JitInstrumentation) * nworkers);
797 			shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
798 						   jit_instrumentation);
799 			pei->jit_instrumentation = jit_instrumentation;
800 		}
801 	}
802 
803 	/*
804 	 * Create a DSA area that can be used by the leader and all workers.
805 	 * (However, if we failed to create a DSM and are using private memory
806 	 * instead, then skip this.)
807 	 */
808 	if (pcxt->seg != NULL)
809 	{
810 		char	   *area_space;
811 
812 		area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
813 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
814 		pei->area = dsa_create_in_place(area_space, dsa_minsize,
815 										LWTRANCHE_PARALLEL_QUERY_DSA,
816 										pcxt->seg);
817 
818 		/*
819 		 * Serialize parameters, if any, using DSA storage.  We don't dare use
820 		 * the main parallel query DSM for this because we might relaunch
821 		 * workers after the values have changed (and thus the amount of
822 		 * storage required has changed).
823 		 */
824 		if (!bms_is_empty(sendParams))
825 		{
826 			pei->param_exec = SerializeParamExecParams(estate, sendParams,
827 													   pei->area);
828 			fpes->param_exec = pei->param_exec;
829 		}
830 	}
831 
832 	/*
833 	 * Give parallel-aware nodes a chance to initialize their shared data.
834 	 * This also fills in the plan_node_id[] entries of *instrumentation,
835 	 * if instrumentation has been requested.
836 	 */
837 	d.pcxt = pcxt;
838 	d.instrumentation = instrumentation;
839 	d.nnodes = 0;
840 
841 	/* Install our DSA area while initializing the plan. */
842 	estate->es_query_dsa = pei->area;
843 	ExecParallelInitializeDSM(planstate, &d);
844 	estate->es_query_dsa = NULL;
845 
846 	/*
847 	 * Make sure that the world hasn't shifted under our feet.  This could
848 	 * probably just be an Assert(), but let's be conservative for now.
849 	 */
850 	if (e.nnodes != d.nnodes)
851 		elog(ERROR, "inconsistent count of PlanState nodes");
852 
853 	/* OK, we're ready to rock and roll. */
854 	return pei;
855 }
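
/*
 * A rough sketch of how a caller (a Gather-like node) is expected to drive
 * the functions in this file; error handling and the details of reading
 * tuples are omitted:
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate,
 *								   sendParams, nworkers, tuples_needed);
 *		LaunchParallelWorkers(pei->pcxt);
 *		ExecParallelCreateReaders(pei);
 *		// ... read tuples from pei->reader[] until done ...
 *		ExecParallelFinish(pei);
 *		ExecParallelCleanup(pei);
 *
 * To rescan, the caller instead calls ExecParallelReinitialize() and then
 * launches a fresh batch of workers.
 */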
856 
857 /*
858  * Set up tuple queue readers to read the results of a parallel subplan.
859  *
860  * This is separate from ExecInitParallelPlan() because we can launch the
861  * worker processes and let them start doing something before we do this.
862  */
863 void
864 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
865 {
866 	int			nworkers = pei->pcxt->nworkers_launched;
867 	int			i;
868 
869 	Assert(pei->reader == NULL);
870 
871 	if (nworkers > 0)
872 	{
873 		pei->reader = (TupleQueueReader **)
874 			palloc(nworkers * sizeof(TupleQueueReader *));
875 
876 		for (i = 0; i < nworkers; i++)
877 		{
878 			shm_mq_set_handle(pei->tqueue[i],
879 							  pei->pcxt->worker[i].bgwhandle);
880 			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
881 		}
882 	}
883 }
884 
885 /*
886  * Re-initialize the parallel executor shared memory state before launching
887  * a fresh batch of workers.
888  */
889 void
890 ExecParallelReinitialize(PlanState *planstate,
891 						 ParallelExecutorInfo *pei,
892 						 Bitmapset *sendParams)
893 {
894 	EState	   *estate = planstate->state;
895 	FixedParallelExecutorState *fpes;
896 
897 	/* Old workers must already be shut down */
898 	Assert(pei->finished);
899 
900 	/*
901 	 * Force any initplan outputs that we're going to pass to workers to be
902 	 * evaluated, if they weren't already (see comments in
903 	 * ExecInitParallelPlan).
904 	 */
905 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
906 
907 	ReinitializeParallelDSM(pei->pcxt);
908 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
909 	pei->reader = NULL;
910 	pei->finished = false;
911 
912 	fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
913 
914 	/* Free any serialized parameters from the last round. */
915 	if (DsaPointerIsValid(fpes->param_exec))
916 	{
917 		dsa_free(pei->area, fpes->param_exec);
918 		fpes->param_exec = InvalidDsaPointer;
919 	}
920 
921 	/* Serialize current parameter values if required. */
922 	if (!bms_is_empty(sendParams))
923 	{
924 		pei->param_exec = SerializeParamExecParams(estate, sendParams,
925 												   pei->area);
926 		fpes->param_exec = pei->param_exec;
927 	}
928 
929 	/* Traverse plan tree and let each child node reset associated state. */
930 	estate->es_query_dsa = pei->area;
931 	ExecParallelReInitializeDSM(planstate, pei->pcxt);
932 	estate->es_query_dsa = NULL;
933 }
934 
935 /*
936  * Traverse plan tree to reinitialize per-node dynamic shared memory state
937  */
938 static bool
939 ExecParallelReInitializeDSM(PlanState *planstate,
940 							ParallelContext *pcxt)
941 {
942 	if (planstate == NULL)
943 		return false;
944 
945 	/*
946 	 * Call reinitializers for DSM-using plan nodes.
947 	 */
948 	switch (nodeTag(planstate))
949 	{
950 		case T_SeqScanState:
951 			if (planstate->plan->parallel_aware)
952 				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
953 										   pcxt);
954 			break;
955 		case T_IndexScanState:
956 			if (planstate->plan->parallel_aware)
957 				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
958 											 pcxt);
959 			break;
960 		case T_IndexOnlyScanState:
961 			if (planstate->plan->parallel_aware)
962 				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
963 												 pcxt);
964 			break;
965 		case T_ForeignScanState:
966 			if (planstate->plan->parallel_aware)
967 				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
968 											   pcxt);
969 			break;
970 		case T_AppendState:
971 			if (planstate->plan->parallel_aware)
972 				ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
973 			break;
974 		case T_CustomScanState:
975 			if (planstate->plan->parallel_aware)
976 				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
977 											  pcxt);
978 			break;
979 		case T_BitmapHeapScanState:
980 			if (planstate->plan->parallel_aware)
981 				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
982 											  pcxt);
983 			break;
984 		case T_HashJoinState:
985 			if (planstate->plan->parallel_aware)
986 				ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
987 											pcxt);
988 			break;
989 		case T_HashState:
990 		case T_SortState:
991 		case T_IncrementalSortState:
992 			/* these nodes have DSM state, but no reinitialization is required */
993 			break;
994 
995 		default:
996 			break;
997 	}
998 
999 	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1000 }
1001 
1002 /*
1003  * Copy instrumentation information about this node and its descendants from
1004  * dynamic shared memory.
1005  */
1006 static bool
1007 ExecParallelRetrieveInstrumentation(PlanState *planstate,
1008 									SharedExecutorInstrumentation *instrumentation)
1009 {
1010 	Instrumentation *instrument;
1011 	int			i;
1012 	int			n;
1013 	int			ibytes;
1014 	int			plan_node_id = planstate->plan->plan_node_id;
1015 	MemoryContext oldcontext;
1016 
1017 	/* Find the instrumentation for this node. */
1018 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1019 		if (instrumentation->plan_node_id[i] == plan_node_id)
1020 			break;
1021 	if (i >= instrumentation->num_plan_nodes)
1022 		elog(ERROR, "plan node %d not found", plan_node_id);
1023 
1024 	/* Accumulate the statistics from all workers. */
1025 	instrument = GetInstrumentationArray(instrumentation);
1026 	instrument += i * instrumentation->num_workers;
1027 	for (n = 0; n < instrumentation->num_workers; ++n)
1028 		InstrAggNode(planstate->instrument, &instrument[n]);
1029 
1030 	/*
1031 	 * Also store the per-worker detail.
1032 	 *
1033 	 * Worker instrumentation should be allocated in the same context as the
1034 	 * regular instrumentation information, which is the per-query context.
1035 	 * Switch into per-query memory context.
1036 	 */
1037 	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1038 	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1039 	planstate->worker_instrument =
1040 		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1041 	MemoryContextSwitchTo(oldcontext);
1042 
1043 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
1044 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1045 
1046 	/* Perform any node-type-specific work that needs to be done. */
1047 	switch (nodeTag(planstate))
1048 	{
1049 		case T_SortState:
1050 			ExecSortRetrieveInstrumentation((SortState *) planstate);
1051 			break;
1052 		case T_IncrementalSortState:
1053 			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1054 			break;
1055 		case T_HashState:
1056 			ExecHashRetrieveInstrumentation((HashState *) planstate);
1057 			break;
1058 		case T_AggState:
1059 			ExecAggRetrieveInstrumentation((AggState *) planstate);
1060 			break;
1061 		default:
1062 			break;
1063 	}
1064 
1065 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1066 								 instrumentation);
1067 }
1068 
1069 /*
1070  * Add up the workers' JIT instrumentation from dynamic shared memory.
1071  */
1072 static void
1073 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1074 									   SharedJitInstrumentation *shared_jit)
1075 {
1076 	JitInstrumentation *combined;
1077 	int			ibytes;
1078 
1079 	int			n;
1080 
1081 	/*
1082 	 * Accumulate worker JIT instrumentation into the combined JIT
1083 	 * instrumentation, allocating it if required.
1084 	 */
1085 	if (!planstate->state->es_jit_worker_instr)
1086 		planstate->state->es_jit_worker_instr =
1087 			MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1088 	combined = planstate->state->es_jit_worker_instr;
1089 
1090 	/* Accumulate all the workers' instrumentations. */
1091 	for (n = 0; n < shared_jit->num_workers; ++n)
1092 		InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1093 
1094 	/*
1095 	 * Store the per-worker detail.
1096 	 *
1097 	 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1098 	 * instrumentation in per-query context.
1099 	 */
1100 	ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1101 		+ mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1102 	planstate->worker_jit_instrument =
1103 		MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1104 
1105 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1106 }
1107 
1108 /*
1109  * Finish parallel execution.  We wait for parallel workers to finish, and
1110  * accumulate their buffer/WAL usage.
1111  */
1112 void
1113 ExecParallelFinish(ParallelExecutorInfo *pei)
1114 {
1115 	int			nworkers = pei->pcxt->nworkers_launched;
1116 	int			i;
1117 
1118 	/* Make this be a no-op if called twice in a row. */
1119 	if (pei->finished)
1120 		return;
1121 
1122 	/*
1123 	 * Detach from tuple queues ASAP, so that any still-active workers will
1124 	 * notice that no further results are wanted.
1125 	 */
1126 	if (pei->tqueue != NULL)
1127 	{
1128 		for (i = 0; i < nworkers; i++)
1129 			shm_mq_detach(pei->tqueue[i]);
1130 		pfree(pei->tqueue);
1131 		pei->tqueue = NULL;
1132 	}
1133 
1134 	/*
1135 	 * While we're waiting for the workers to finish, let's get rid of the
1136 	 * tuple queue readers.  (Any other local cleanup could be done here too.)
1137 	 */
1138 	if (pei->reader != NULL)
1139 	{
1140 		for (i = 0; i < nworkers; i++)
1141 			DestroyTupleQueueReader(pei->reader[i]);
1142 		pfree(pei->reader);
1143 		pei->reader = NULL;
1144 	}
1145 
1146 	/* Now wait for the workers to finish. */
1147 	WaitForParallelWorkersToFinish(pei->pcxt);
1148 
1149 	/*
1150 	 * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
1151 	 * finish, or we might get incomplete data.)
1152 	 */
1153 	for (i = 0; i < nworkers; i++)
1154 		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1155 
1156 	pei->finished = true;
1157 }
1158 
1159 /*
1160  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1161  * resources still exist after ExecParallelFinish.  We separate these
1162  * routines because someone might want to examine the contents of the DSM
1163  * after ExecParallelFinish and before calling this routine.
1164  */
1165 void
1166 ExecParallelCleanup(ParallelExecutorInfo *pei)
1167 {
1168 	/* Accumulate instrumentation, if any. */
1169 	if (pei->instrumentation)
1170 		ExecParallelRetrieveInstrumentation(pei->planstate,
1171 											pei->instrumentation);
1172 
1173 	/* Accumulate JIT instrumentation, if any. */
1174 	if (pei->jit_instrumentation)
1175 		ExecParallelRetrieveJitInstrumentation(pei->planstate,
1176 											   pei->jit_instrumentation);
1177 
1178 	/* Free any serialized parameters. */
1179 	if (DsaPointerIsValid(pei->param_exec))
1180 	{
1181 		dsa_free(pei->area, pei->param_exec);
1182 		pei->param_exec = InvalidDsaPointer;
1183 	}
1184 	if (pei->area != NULL)
1185 	{
1186 		dsa_detach(pei->area);
1187 		pei->area = NULL;
1188 	}
1189 	if (pei->pcxt != NULL)
1190 	{
1191 		DestroyParallelContext(pei->pcxt);
1192 		pei->pcxt = NULL;
1193 	}
1194 	pfree(pei);
1195 }
1196 
1197 /*
1198  * Create a DestReceiver to write tuples we produce to the shm_mq designated
1199  * for that purpose.
1200  */
1201 static DestReceiver *
1202 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1203 {
1204 	char	   *mqspace;
1205 	shm_mq	   *mq;
1206 
1207 	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1208 	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1209 	mq = (shm_mq *) mqspace;
1210 	shm_mq_set_sender(mq, MyProc);
1211 	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1212 }
1213 
1214 /*
1215  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1216  */
1217 static QueryDesc *
1218 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1219 						 int instrument_options)
1220 {
1221 	char	   *pstmtspace;
1222 	char	   *paramspace;
1223 	PlannedStmt *pstmt;
1224 	ParamListInfo paramLI;
1225 	char	   *queryString;
1226 
1227 	/* Get the query string from shared memory */
1228 	queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1229 
1230 	/* Reconstruct leader-supplied PlannedStmt. */
1231 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1232 	pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1233 
1234 	/* Reconstruct ParamListInfo. */
1235 	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1236 	paramLI = RestoreParamList(&paramspace);
1237 
1238 	/* Create a QueryDesc for the query. */
1239 	return CreateQueryDesc(pstmt,
1240 						   queryString,
1241 						   GetActiveSnapshot(), InvalidSnapshot,
1242 						   receiver, paramLI, NULL, instrument_options);
1243 }
1244 
1245 /*
1246  * Copy instrumentation information from this node and its descendants into
1247  * dynamic shared memory, so that the parallel leader can retrieve it.
1248  */
1249 static bool
1250 ExecParallelReportInstrumentation(PlanState *planstate,
1251 								  SharedExecutorInstrumentation *instrumentation)
1252 {
1253 	int			i;
1254 	int			plan_node_id = planstate->plan->plan_node_id;
1255 	Instrumentation *instrument;
1256 
1257 	InstrEndLoop(planstate->instrument);
1258 
1259 	/*
1260 	 * If we shuffled the values in the plan_node_id array into sorted
1261 	 * order, we could use binary search here.  This might matter someday if
1262 	 * we're pushing down sufficiently large plan trees.  For now, do it the
1263 	 * slow, dumb way.
1264 	 */
1265 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1266 		if (instrumentation->plan_node_id[i] == plan_node_id)
1267 			break;
1268 	if (i >= instrumentation->num_plan_nodes)
1269 		elog(ERROR, "plan node %d not found", plan_node_id);
1270 
1271 	/*
1272 	 * Add our statistics to the per-node, per-worker totals.  It's possible
1273 	 * that this could happen more than once if we relaunched workers.
1274 	 */
1275 	instrument = GetInstrumentationArray(instrumentation);
1276 	instrument += i * instrumentation->num_workers;
1277 	Assert(IsParallelWorker());
1278 	Assert(ParallelWorkerNumber < instrumentation->num_workers);
1279 	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1280 
1281 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1282 								 instrumentation);
1283 }
1284 
1285 /*
1286  * Initialize the PlanState and its descendants with the information
1287  * retrieved from shared memory.  This has to be done once the PlanState
1288  * is allocated and initialized by the executor, i.e., after ExecutorStart().
1289  */
1290 static bool
1291 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1292 {
1293 	if (planstate == NULL)
1294 		return false;
1295 
1296 	switch (nodeTag(planstate))
1297 	{
1298 		case T_SeqScanState:
1299 			if (planstate->plan->parallel_aware)
1300 				ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1301 			break;
1302 		case T_IndexScanState:
1303 			if (planstate->plan->parallel_aware)
1304 				ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1305 											  pwcxt);
1306 			break;
1307 		case T_IndexOnlyScanState:
1308 			if (planstate->plan->parallel_aware)
1309 				ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1310 												  pwcxt);
1311 			break;
1312 		case T_ForeignScanState:
1313 			if (planstate->plan->parallel_aware)
1314 				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1315 												pwcxt);
1316 			break;
1317 		case T_AppendState:
1318 			if (planstate->plan->parallel_aware)
1319 				ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1320 			break;
1321 		case T_CustomScanState:
1322 			if (planstate->plan->parallel_aware)
1323 				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1324 											   pwcxt);
1325 			break;
1326 		case T_BitmapHeapScanState:
1327 			if (planstate->plan->parallel_aware)
1328 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1329 											   pwcxt);
1330 			break;
1331 		case T_HashJoinState:
1332 			if (planstate->plan->parallel_aware)
1333 				ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1334 											 pwcxt);
1335 			break;
1336 		case T_HashState:
1337 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1338 			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1339 			break;
1340 		case T_SortState:
1341 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1342 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1343 			break;
1344 		case T_IncrementalSortState:
1345 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1346 			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1347 												pwcxt);
1348 			break;
1349 		case T_AggState:
1350 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1351 			ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1352 			break;
1353 		default:
1354 			break;
1355 	}
1356 
1357 	return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1358 								 pwcxt);
1359 }
1360 
1361 /*
1362  * Main entrypoint for parallel query worker processes.
1363  *
1364  * We reach this function from ParallelWorkerMain, so the setup necessary to
1365  * create a sensible parallel environment has already been done;
1366  * ParallelWorkerMain worries about stuff like the transaction state, combo
1367  * CID mappings, and GUC values, so we don't need to deal with any of that
1368  * here.
1369  *
1370  * Our job is to deal with concerns specific to the executor.  The parallel
1371  * group leader will have stored a serialized PlannedStmt, and it's our job
1372  * to execute that plan and write the resulting tuples to the appropriate
1373  * tuple queue.  Various bits of supporting information that we need in order
1374  * to do this are also stored in the dsm_segment and can be accessed through
1375  * the shm_toc.
1376  */
1377 void
1378 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1379 {
1380 	FixedParallelExecutorState *fpes;
1381 	BufferUsage *buffer_usage;
1382 	WalUsage   *wal_usage;
1383 	DestReceiver *receiver;
1384 	QueryDesc  *queryDesc;
1385 	SharedExecutorInstrumentation *instrumentation;
1386 	SharedJitInstrumentation *jit_instrumentation;
1387 	int			instrument_options = 0;
1388 	void	   *area_space;
1389 	dsa_area   *area;
1390 	ParallelWorkerContext pwcxt;
1391 
1392 	/* Get fixed-size state. */
1393 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1394 
1395 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1396 	receiver = ExecParallelGetReceiver(seg, toc);
1397 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1398 	if (instrumentation != NULL)
1399 		instrument_options = instrumentation->instrument_options;
1400 	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1401 										 true);
1402 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1403 
1404 	/* Set debug_query_string for this worker process. */
1405 	debug_query_string = queryDesc->sourceText;
1406 
1407 	/* Report this worker's query for monitoring purposes */
1408 	pgstat_report_activity(STATE_RUNNING, debug_query_string);
1409 
1410 	/* Attach to the dynamic shared memory area. */
1411 	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1412 	area = dsa_attach_in_place(area_space, seg);
1413 
1414 	/* Start up the executor */
1415 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1416 	ExecutorStart(queryDesc, fpes->eflags);
1417 
1418 	/* Special executor initialization steps for parallel workers */
1419 	queryDesc->planstate->state->es_query_dsa = area;
1420 	if (DsaPointerIsValid(fpes->param_exec))
1421 	{
1422 		char	   *paramexec_space;
1423 
1424 		paramexec_space = dsa_get_address(area, fpes->param_exec);
1425 		RestoreParamExecParams(paramexec_space, queryDesc->estate);
1426 
1427 	}
1428 	pwcxt.toc = toc;
1429 	pwcxt.seg = seg;
1430 	ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1431 
1432 	/* Pass down any tuple bound */
1433 	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1434 
1435 	/*
1436 	 * Prepare to track buffer/WAL usage during query execution.
1437 	 *
1438 	 * We do this after starting up the executor to match what happens in the
1439 	 * leader, which also doesn't count buffer accesses and WAL activity that
1440 	 * occur during executor startup.
1441 	 */
1442 	InstrStartParallelQuery();
1443 
1444 	/*
1445 	 * Run the plan.  If we specified a tuple bound, be careful not to demand
1446 	 * more tuples than that.
1447 	 */
1448 	ExecutorRun(queryDesc,
1449 				ForwardScanDirection,
1450 				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1451 				true);
1452 
1453 	/* Shut down the executor */
1454 	ExecutorFinish(queryDesc);
1455 
1456 	/* Report buffer/WAL usage during parallel execution. */
1457 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1458 	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1459 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1460 						  &wal_usage[ParallelWorkerNumber]);
1461 
1462 	/* Report instrumentation data if any instrumentation options are set. */
1463 	if (instrumentation != NULL)
1464 		ExecParallelReportInstrumentation(queryDesc->planstate,
1465 										  instrumentation);
1466 
1467 	/* Report JIT instrumentation data if any */
1468 	if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1469 	{
1470 		Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1471 		jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1472 			queryDesc->estate->es_jit->instr;
1473 	}
1474 
1475 	/* Must do this after capturing instrumentation. */
1476 	ExecutorEnd(queryDesc);
1477 
1478 	/* Cleanup. */
1479 	dsa_detach(area);
1480 	FreeQueryDesc(queryDesc);
1481 	receiver->rDestroy(receiver);
1482 }
1483