1 /*-------------------------------------------------------------------------
2  *
3  * execParallel.c
4  *	  Support routines for parallel execution.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * This file contains routines that are intended to support setting up,
10  * using, and tearing down a ParallelContext from within the PostgreSQL
11  * executor.  The ParallelContext machinery will handle starting the
12  * workers and ensuring that their state generally matches that of the
13  * leader; see src/backend/access/transam/README.parallel for details.
14  * However, we must save and restore relevant executor state, such as
15  * any ParamListInfo associated with the query, buffer/WAL usage info, and
16  * the actual plan to be passed down to the worker.
17  *
18  * IDENTIFICATION
19  *	  src/backend/executor/execParallel.c
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeAgg.h"
29 #include "executor/nodeAppend.h"
30 #include "executor/nodeBitmapHeapscan.h"
31 #include "executor/nodeCustom.h"
32 #include "executor/nodeForeignscan.h"
33 #include "executor/nodeHash.h"
34 #include "executor/nodeHashjoin.h"
35 #include "executor/nodeIncrementalSort.h"
36 #include "executor/nodeIndexonlyscan.h"
37 #include "executor/nodeIndexscan.h"
38 #include "executor/nodeMemoize.h"
39 #include "executor/nodeSeqscan.h"
40 #include "executor/nodeSort.h"
41 #include "executor/nodeSubplan.h"
42 #include "executor/tqueue.h"
43 #include "jit/jit.h"
44 #include "nodes/nodeFuncs.h"
45 #include "pgstat.h"
46 #include "storage/spin.h"
47 #include "tcop/tcopprot.h"
48 #include "utils/datum.h"
49 #include "utils/dsa.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/snapmgr.h"
53 
54 /*
55  * Magic numbers for parallel executor communication.  We use constants
56  * greater than any 32-bit integer here so that values < 2^32 can be used
57  * by individual parallel nodes to store their own state.
58  */
59 #define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
60 #define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
61 #define PARALLEL_KEY_PARAMLISTINFO		UINT64CONST(0xE000000000000003)
62 #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
63 #define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
64 #define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
65 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
66 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
67 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
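
/*
 * Individual parallel-aware plan nodes store their own shared state under
 * keys smaller than 2^32; by convention they use their plan_node_id, which
 * is unique within the PlannedStmt.  As a minimal sketch, a hypothetical
 * parallel-aware node "Foo" might publish its state like this:
 *
 *		ParallelFooState *shared = shm_toc_allocate(pcxt->toc, size);
 *		... fill in *shared ...
 *		shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, shared);
 */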
69 
70 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
71 
72 /*
73  * Fixed-size random stuff that we need to pass to parallel workers.
74  */
75 typedef struct FixedParallelExecutorState
76 {
77 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
78 	dsa_pointer param_exec;
79 	int			eflags;
80 	int			jit_flags;
81 } FixedParallelExecutorState;
82 
83 /*
84  * DSM structure for accumulating per-PlanState instrumentation.
85  *
86  * instrument_options: Same meaning here as in instrument.c.
87  *
88  * instrument_offset: Offset, relative to the start of this structure,
89  * of the first Instrumentation object.  This will depend on the length of
90  * the plan_node_id array.
91  *
92  * num_workers: Number of workers.
93  *
94  * num_plan_nodes: Number of plan nodes.
95  *
96  * plan_node_id: Array of plan nodes for which we are gathering instrumentation
97  * from parallel workers.  The length of this array is given by num_plan_nodes.
98  */
99 struct SharedExecutorInstrumentation
100 {
101 	int			instrument_options;
102 	int			instrument_offset;
103 	int			num_workers;
104 	int			num_plan_nodes;
105 	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 };
108 #define GetInstrumentationArray(sei) \
109 	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
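
/*
 * For example, the Instrumentation reported by worker w for the plan node
 * stored at index i of plan_node_id[] lives at
 *
 *		GetInstrumentationArray(sei)[i * sei->num_workers + w]
 *
 * which is the indexing arithmetic used by
 * ExecParallelRetrieveInstrumentation and ExecParallelReportInstrumentation
 * below.
 */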
111 
112 /* Context object for ExecParallelEstimate. */
113 typedef struct ExecParallelEstimateContext
114 {
115 	ParallelContext *pcxt;
116 	int			nnodes;
117 } ExecParallelEstimateContext;
118 
119 /* Context object for ExecParallelInitializeDSM. */
120 typedef struct ExecParallelInitializeDSMContext
121 {
122 	ParallelContext *pcxt;
123 	SharedExecutorInstrumentation *instrumentation;
124 	int			nnodes;
125 } ExecParallelInitializeDSMContext;
126 
127 /* Helper functions that run in the parallel leader. */
128 static char *ExecSerializePlan(Plan *plan, EState *estate);
129 static bool ExecParallelEstimate(PlanState *node,
130 								 ExecParallelEstimateContext *e);
131 static bool ExecParallelInitializeDSM(PlanState *node,
132 									  ExecParallelInitializeDSMContext *d);
133 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 													bool reinitialize);
135 static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 										ParallelContext *pcxt);
137 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 												SharedExecutorInstrumentation *instrumentation);
139 
140 /* Helper function that runs in the parallel worker. */
141 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142 
143 /*
144  * Create a serialized representation of the plan to be sent to each worker.
145  */
146 static char *
147 ExecSerializePlan(Plan *plan, EState *estate)
148 {
149 	PlannedStmt *pstmt;
150 	ListCell   *lc;
151 
152 	/* We can't scribble on the original plan, so make a copy. */
153 	plan = copyObject(plan);
154 
155 	/*
156 	 * The worker will start its own copy of the executor, and that copy will
157 	 * insert a junk filter if the toplevel node has any resjunk entries. We
158 	 * don't want that to happen, because while resjunk columns shouldn't be
159 	 * sent back to the user, here the tuples are coming back to another
160 	 * backend which may very well need them.  So mutate the target list
161 	 * accordingly.  This is sort of a hack; there might be better ways to do
162 	 * this...
163 	 */
164 	foreach(lc, plan->targetlist)
165 	{
166 		TargetEntry *tle = lfirst_node(TargetEntry, lc);
167 
168 		tle->resjunk = false;
169 	}
170 
171 	/*
172 	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
173 	 * for our purposes, but the worker will need at least a minimal
174 	 * PlannedStmt to start the executor.
175 	 */
176 	pstmt = makeNode(PlannedStmt);
177 	pstmt->commandType = CMD_SELECT;
178 	pstmt->queryId = pgstat_get_my_query_id();
179 	pstmt->hasReturning = false;
180 	pstmt->hasModifyingCTE = false;
181 	pstmt->canSetTag = true;
182 	pstmt->transientPlan = false;
183 	pstmt->dependsOnRole = false;
184 	pstmt->parallelModeNeeded = false;
185 	pstmt->planTree = plan;
186 	pstmt->rtable = estate->es_range_table;
187 	pstmt->resultRelations = NIL;
188 	pstmt->appendRelations = NIL;
189 
190 	/*
191 	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
192 	 * for unsafe ones (so that the list indexes of the safe ones are
193 	 * preserved).  This positively ensures that the worker won't try to run,
194 	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
195 	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
196 	 */
197 	pstmt->subplans = NIL;
198 	foreach(lc, estate->es_plannedstmt->subplans)
199 	{
200 		Plan	   *subplan = (Plan *) lfirst(lc);
201 
202 		if (subplan && !subplan->parallel_safe)
203 			subplan = NULL;
204 		pstmt->subplans = lappend(pstmt->subplans, subplan);
205 	}
206 
207 	pstmt->rewindPlanIDs = NULL;
208 	pstmt->rowMarks = NIL;
209 	pstmt->relationOids = NIL;
210 	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
211 	pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
212 	pstmt->utilityStmt = NULL;
213 	pstmt->stmt_location = -1;
214 	pstmt->stmt_len = -1;
215 
216 	/* Return serialized copy of our dummy PlannedStmt. */
217 	return nodeToString(pstmt);
218 }
219 
220 /*
221  * Parallel-aware plan nodes (and occasionally others) may need some state
222  * which is shared across all parallel workers.  Before we size the DSM, give
223  * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
224  * &pcxt->estimator.
225  *
226  * While we're at it, count the number of PlanState nodes in the tree, so
227  * we know how many Instrumentation structures we need.
228  */
229 static bool
230 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
231 {
232 	if (planstate == NULL)
233 		return false;
234 
235 	/* Count this node. */
236 	e->nnodes++;
237 
238 	switch (nodeTag(planstate))
239 	{
240 		case T_SeqScanState:
241 			if (planstate->plan->parallel_aware)
242 				ExecSeqScanEstimate((SeqScanState *) planstate,
243 									e->pcxt);
244 			break;
245 		case T_IndexScanState:
246 			if (planstate->plan->parallel_aware)
247 				ExecIndexScanEstimate((IndexScanState *) planstate,
248 									  e->pcxt);
249 			break;
250 		case T_IndexOnlyScanState:
251 			if (planstate->plan->parallel_aware)
252 				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
253 										  e->pcxt);
254 			break;
255 		case T_ForeignScanState:
256 			if (planstate->plan->parallel_aware)
257 				ExecForeignScanEstimate((ForeignScanState *) planstate,
258 										e->pcxt);
259 			break;
260 		case T_AppendState:
261 			if (planstate->plan->parallel_aware)
262 				ExecAppendEstimate((AppendState *) planstate,
263 								   e->pcxt);
264 			break;
265 		case T_CustomScanState:
266 			if (planstate->plan->parallel_aware)
267 				ExecCustomScanEstimate((CustomScanState *) planstate,
268 									   e->pcxt);
269 			break;
270 		case T_BitmapHeapScanState:
271 			if (planstate->plan->parallel_aware)
272 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
273 									   e->pcxt);
274 			break;
275 		case T_HashJoinState:
276 			if (planstate->plan->parallel_aware)
277 				ExecHashJoinEstimate((HashJoinState *) planstate,
278 									 e->pcxt);
279 			break;
280 		case T_HashState:
281 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
282 			ExecHashEstimate((HashState *) planstate, e->pcxt);
283 			break;
284 		case T_SortState:
285 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
286 			ExecSortEstimate((SortState *) planstate, e->pcxt);
287 			break;
288 		case T_IncrementalSortState:
289 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
290 			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
291 			break;
292 		case T_AggState:
293 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
294 			ExecAggEstimate((AggState *) planstate, e->pcxt);
295 			break;
296 		case T_MemoizeState:
297 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
298 			ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
299 			break;
300 		default:
301 			break;
302 	}
303 
304 	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
305 }
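
/*
 * As a minimal sketch (for a hypothetical parallel-aware node "Foo"), an
 * estimate callback usually just sizes the node's shared state and reserves
 * one key in the table of contents:
 *
 *		void
 *		ExecFooEstimate(FooState *node, ParallelContext *pcxt)
 *		{
 *			node->pscan_len = sizeof(ParallelFooState);
 *			shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
 *			shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		}
 */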
306 
307 /*
308  * Estimate the amount of space required to serialize the indicated parameters.
309  */
310 static Size
311 EstimateParamExecSpace(EState *estate, Bitmapset *params)
312 {
313 	int			paramid;
314 	Size		sz = sizeof(int);
315 
316 	paramid = -1;
317 	while ((paramid = bms_next_member(params, paramid)) >= 0)
318 	{
319 		Oid			typeOid;
320 		int16		typLen;
321 		bool		typByVal;
322 		ParamExecData *prm;
323 
324 		prm = &(estate->es_param_exec_vals[paramid]);
325 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
326 							   paramid);
327 
328 		sz = add_size(sz, sizeof(int)); /* space for paramid */
329 
330 		/* space for datum/isnull */
331 		if (OidIsValid(typeOid))
332 			get_typlenbyval(typeOid, &typLen, &typByVal);
333 		else
334 		{
335 			/* If no type OID, assume by-value, like copyParamList does. */
336 			typLen = sizeof(Datum);
337 			typByVal = true;
338 		}
339 		sz = add_size(sz,
340 					  datumEstimateSpace(prm->value, prm->isnull,
341 										 typByVal, typLen));
342 	}
343 	return sz;
344 }
345 
346 /*
347  * Serialize specified PARAM_EXEC parameters.
348  *
349  * We write the number of parameters first, as a 4-byte integer, and then
350  * write details for each parameter in turn.  The details for each parameter
351  * consist of a 4-byte paramid (location of param in execution time internal
352  * parameter array) and then the datum as serialized by datumSerialize().
353  */
354 static dsa_pointer
355 SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
356 {
357 	Size		size;
358 	int			nparams;
359 	int			paramid;
360 	ParamExecData *prm;
361 	dsa_pointer handle;
362 	char	   *start_address;
363 
364 	/* Allocate enough space for the current parameter values. */
365 	size = EstimateParamExecSpace(estate, params);
366 	handle = dsa_allocate(area, size);
367 	start_address = dsa_get_address(area, handle);
368 
369 	/* First write the number of parameters as a 4-byte integer. */
370 	nparams = bms_num_members(params);
371 	memcpy(start_address, &nparams, sizeof(int));
372 	start_address += sizeof(int);
373 
374 	/* Write details for each parameter in turn. */
375 	paramid = -1;
376 	while ((paramid = bms_next_member(params, paramid)) >= 0)
377 	{
378 		Oid			typeOid;
379 		int16		typLen;
380 		bool		typByVal;
381 
382 		prm = &(estate->es_param_exec_vals[paramid]);
383 		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
384 							   paramid);
385 
386 		/* Write paramid. */
387 		memcpy(start_address, &paramid, sizeof(int));
388 		start_address += sizeof(int);
389 
390 		/* Write datum/isnull */
391 		if (OidIsValid(typeOid))
392 			get_typlenbyval(typeOid, &typLen, &typByVal);
393 		else
394 		{
395 			/* If no type OID, assume by-value, like copyParamList does. */
396 			typLen = sizeof(Datum);
397 			typByVal = true;
398 		}
399 		datumSerialize(prm->value, prm->isnull, typByVal, typLen,
400 					   &start_address);
401 	}
402 
403 	return handle;
404 }
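
/*
 * For example, serializing PARAM_EXEC parameters 3 and 7 produces a DSA
 * chunk laid out as
 *
 *		int nparams = 2
 *		int paramid = 3, then datumSerialize() output for parameter 3
 *		int paramid = 7, then datumSerialize() output for parameter 7
 *
 * RestoreParamExecParams() below simply walks this layout in order.
 */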
405 
406 /*
407  * Restore specified PARAM_EXEC parameters.
408  */
409 static void
410 RestoreParamExecParams(char *start_address, EState *estate)
411 {
412 	int			nparams;
413 	int			i;
414 	int			paramid;
415 
416 	memcpy(&nparams, start_address, sizeof(int));
417 	start_address += sizeof(int);
418 
419 	for (i = 0; i < nparams; i++)
420 	{
421 		ParamExecData *prm;
422 
423 		/* Read paramid */
424 		memcpy(&paramid, start_address, sizeof(int));
425 		start_address += sizeof(int);
426 		prm = &(estate->es_param_exec_vals[paramid]);
427 
428 		/* Read datum/isnull. */
429 		prm->value = datumRestore(&start_address, &prm->isnull);
430 		prm->execPlan = NULL;
431 	}
432 }
433 
434 /*
435  * Initialize the dynamic shared memory segment that will be used to control
436  * parallel execution.
437  */
438 static bool
439 ExecParallelInitializeDSM(PlanState *planstate,
440 						  ExecParallelInitializeDSMContext *d)
441 {
442 	if (planstate == NULL)
443 		return false;
444 
445 	/* If instrumentation is enabled, initialize slot for this node. */
446 	if (d->instrumentation != NULL)
447 		d->instrumentation->plan_node_id[d->nnodes] =
448 			planstate->plan->plan_node_id;
449 
450 	/* Count this node. */
451 	d->nnodes++;
452 
453 	/*
454 	 * Call initializers for DSM-using plan nodes.
455 	 *
456 	 * Most plan nodes won't do anything here, but plan nodes that allocated
457 	 * DSM may need to initialize shared state in the DSM before parallel
458 	 * workers are launched.  They can allocate the space they previously
459 	 * estimated using shm_toc_allocate, and add the keys they previously
460 	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
461 	 */
462 	switch (nodeTag(planstate))
463 	{
464 		case T_SeqScanState:
465 			if (planstate->plan->parallel_aware)
466 				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
467 										 d->pcxt);
468 			break;
469 		case T_IndexScanState:
470 			if (planstate->plan->parallel_aware)
471 				ExecIndexScanInitializeDSM((IndexScanState *) planstate,
472 										   d->pcxt);
473 			break;
474 		case T_IndexOnlyScanState:
475 			if (planstate->plan->parallel_aware)
476 				ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
477 											   d->pcxt);
478 			break;
479 		case T_ForeignScanState:
480 			if (planstate->plan->parallel_aware)
481 				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
482 											 d->pcxt);
483 			break;
484 		case T_AppendState:
485 			if (planstate->plan->parallel_aware)
486 				ExecAppendInitializeDSM((AppendState *) planstate,
487 										d->pcxt);
488 			break;
489 		case T_CustomScanState:
490 			if (planstate->plan->parallel_aware)
491 				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
492 											d->pcxt);
493 			break;
494 		case T_BitmapHeapScanState:
495 			if (planstate->plan->parallel_aware)
496 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
497 											d->pcxt);
498 			break;
499 		case T_HashJoinState:
500 			if (planstate->plan->parallel_aware)
501 				ExecHashJoinInitializeDSM((HashJoinState *) planstate,
502 										  d->pcxt);
503 			break;
504 		case T_HashState:
505 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
506 			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
507 			break;
508 		case T_SortState:
509 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
510 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
511 			break;
512 		case T_IncrementalSortState:
513 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
514 			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
515 			break;
516 		case T_AggState:
517 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
518 			ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
519 			break;
520 		case T_MemoizeState:
521 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
522 			ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
523 			break;
524 		default:
525 			break;
526 	}
527 
528 	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
529 }
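
/*
 * As a minimal sketch (again for a hypothetical node "Foo"), a typical
 * DSM initializer allocates the space it previously estimated, fills it in,
 * and publishes it under the node's plan_node_id:
 *
 *		void
 *		ExecFooInitializeDSM(FooState *node, ParallelContext *pcxt)
 *		{
 *			ParallelFooState *shared;
 *
 *			shared = shm_toc_allocate(pcxt->toc, node->pscan_len);
 *			... initialize *shared ...
 *			shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, shared);
 *			node->shared_state = shared;
 *		}
 */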
530 
531 /*
532  * Set up the response queues that parallel workers will use to return
533  * tuples to the leader backend.
534  */
535 static shm_mq_handle **
536 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
537 {
538 	shm_mq_handle **responseq;
539 	char	   *tqueuespace;
540 	int			i;
541 
542 	/* Skip this if no workers. */
543 	if (pcxt->nworkers == 0)
544 		return NULL;
545 
546 	/* Allocate memory for shared memory queue handles. */
547 	responseq = (shm_mq_handle **)
548 		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
549 
550 	/*
551 	 * If not reinitializing, allocate space from the DSM for the queues;
552 	 * otherwise, find the already allocated space.
553 	 */
554 	if (!reinitialize)
555 		tqueuespace =
556 			shm_toc_allocate(pcxt->toc,
557 							 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
558 									  pcxt->nworkers));
559 	else
560 		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
561 
562 	/* Create the queues, and become the receiver for each. */
563 	for (i = 0; i < pcxt->nworkers; ++i)
564 	{
565 		shm_mq	   *mq;
566 
567 		mq = shm_mq_create(tqueuespace +
568 						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
569 						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
570 
571 		shm_mq_set_receiver(mq, MyProc);
572 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
573 	}
574 
575 	/* Add array of queues to shm_toc, so others can find it. */
576 	if (!reinitialize)
577 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
578 
579 	/* Return array of handles. */
580 	return responseq;
581 }
582 
583 /*
584  * Sets up the required infrastructure for backend workers to perform
585  * execution and return results to the main backend.
586  */
587 ParallelExecutorInfo *
588 ExecInitParallelPlan(PlanState *planstate, EState *estate,
589 					 Bitmapset *sendParams, int nworkers,
590 					 int64 tuples_needed)
591 {
592 	ParallelExecutorInfo *pei;
593 	ParallelContext *pcxt;
594 	ExecParallelEstimateContext e;
595 	ExecParallelInitializeDSMContext d;
596 	FixedParallelExecutorState *fpes;
597 	char	   *pstmt_data;
598 	char	   *pstmt_space;
599 	char	   *paramlistinfo_space;
600 	BufferUsage *bufusage_space;
601 	WalUsage   *walusage_space;
602 	SharedExecutorInstrumentation *instrumentation = NULL;
603 	SharedJitInstrumentation *jit_instrumentation = NULL;
604 	int			pstmt_len;
605 	int			paramlistinfo_len;
606 	int			instrumentation_len = 0;
607 	int			jit_instrumentation_len = 0;
608 	int			instrument_offset = 0;
609 	Size		dsa_minsize = dsa_minimum_size();
610 	char	   *query_string;
611 	int			query_len;
612 
613 	/*
614 	 * Force any initplan outputs that we're going to pass to workers to be
615 	 * evaluated, if they weren't already.
616 	 *
617 	 * For simplicity, we use the EState's per-output-tuple ExprContext here.
618 	 * That risks intra-query memory leakage, since we might pass through here
619 	 * many times before that ExprContext gets reset; but ExecSetParamPlan
620 	 * doesn't normally leak any memory in the context (see its comments), so
621 	 * it doesn't seem worth complicating this function's API to pass it a
622 	 * shorter-lived ExprContext.  This might need to change someday.
623 	 */
624 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
625 
626 	/* Allocate object for return value. */
627 	pei = palloc0(sizeof(ParallelExecutorInfo));
628 	pei->finished = false;
629 	pei->planstate = planstate;
630 
631 	/* Fix up and serialize plan to be sent to workers. */
632 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
633 
634 	/* Create a parallel context. */
635 	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
636 	pei->pcxt = pcxt;
637 
638 	/*
639 	 * Before telling the parallel context to create a dynamic shared memory
640 	 * segment, we need to figure out how big it should be.  Estimate space
641 	 * for the various things we need to store.
642 	 */
643 
644 	/* Estimate space for fixed-size state. */
645 	shm_toc_estimate_chunk(&pcxt->estimator,
646 						   sizeof(FixedParallelExecutorState));
647 	shm_toc_estimate_keys(&pcxt->estimator, 1);
648 
649 	/* Estimate space for query text. */
650 	query_len = strlen(estate->es_sourceText);
651 	shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
652 	shm_toc_estimate_keys(&pcxt->estimator, 1);
653 
654 	/* Estimate space for serialized PlannedStmt. */
655 	pstmt_len = strlen(pstmt_data) + 1;
656 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
657 	shm_toc_estimate_keys(&pcxt->estimator, 1);
658 
659 	/* Estimate space for serialized ParamListInfo. */
660 	paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
661 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
662 	shm_toc_estimate_keys(&pcxt->estimator, 1);
663 
664 	/*
665 	 * Estimate space for BufferUsage.
666 	 *
667 	 * If EXPLAIN is not in use and there are no extensions loaded that care,
668 	 * we could skip this.  But we have no way of knowing whether anyone's
669 	 * looking at pgBufferUsage, so do it unconditionally.
670 	 */
671 	shm_toc_estimate_chunk(&pcxt->estimator,
672 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
673 	shm_toc_estimate_keys(&pcxt->estimator, 1);
674 
675 	/*
676 	 * Same thing for WalUsage.
677 	 */
678 	shm_toc_estimate_chunk(&pcxt->estimator,
679 						   mul_size(sizeof(WalUsage), pcxt->nworkers));
680 	shm_toc_estimate_keys(&pcxt->estimator, 1);
681 
682 	/* Estimate space for tuple queues. */
683 	shm_toc_estimate_chunk(&pcxt->estimator,
684 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
685 	shm_toc_estimate_keys(&pcxt->estimator, 1);
686 
687 	/*
688 	 * Give parallel-aware nodes a chance to add to the estimates, and get a
689 	 * count of how many PlanState nodes there are.
690 	 */
691 	e.pcxt = pcxt;
692 	e.nnodes = 0;
693 	ExecParallelEstimate(planstate, &e);
694 
695 	/* Estimate space for instrumentation, if required. */
696 	if (estate->es_instrument)
697 	{
698 		instrumentation_len =
699 			offsetof(SharedExecutorInstrumentation, plan_node_id) +
700 			sizeof(int) * e.nnodes;
701 		instrumentation_len = MAXALIGN(instrumentation_len);
702 		instrument_offset = instrumentation_len;
703 		instrumentation_len +=
704 			mul_size(sizeof(Instrumentation),
705 					 mul_size(e.nnodes, nworkers));
706 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
707 		shm_toc_estimate_keys(&pcxt->estimator, 1);
708 
709 		/* Estimate space for JIT instrumentation, if required. */
710 		if (estate->es_jit_flags != PGJIT_NONE)
711 		{
712 			jit_instrumentation_len =
713 				offsetof(SharedJitInstrumentation, jit_instr) +
714 				sizeof(JitInstrumentation) * nworkers;
715 			shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
716 			shm_toc_estimate_keys(&pcxt->estimator, 1);
717 		}
718 	}
719 
720 	/* Estimate space for DSA area. */
721 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
722 	shm_toc_estimate_keys(&pcxt->estimator, 1);
723 
724 	/* Everyone's had a chance to ask for space, so now create the DSM. */
725 	InitializeParallelDSM(pcxt);
726 
727 	/*
728 	 * OK, now we have a dynamic shared memory segment, and it should be big
729 	 * enough to store all of the data we estimated we would want to put into
730 	 * it, plus whatever general stuff (not specifically executor-related) the
731 	 * ParallelContext itself needs to store there.  None of the space we
732 	 * asked for has been allocated or initialized yet, though, so do that.
733 	 */
734 
735 	/* Store fixed-size state. */
736 	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
737 	fpes->tuples_needed = tuples_needed;
738 	fpes->param_exec = InvalidDsaPointer;
739 	fpes->eflags = estate->es_top_eflags;
740 	fpes->jit_flags = estate->es_jit_flags;
741 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
742 
743 	/* Store query string */
744 	query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
745 	memcpy(query_string, estate->es_sourceText, query_len + 1);
746 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
747 
748 	/* Store serialized PlannedStmt. */
749 	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
750 	memcpy(pstmt_space, pstmt_data, pstmt_len);
751 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
752 
753 	/* Store serialized ParamListInfo. */
754 	paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
755 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
756 	SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
757 
758 	/* Allocate space for each worker's BufferUsage; no need to initialize. */
759 	bufusage_space = shm_toc_allocate(pcxt->toc,
760 									  mul_size(sizeof(BufferUsage), pcxt->nworkers));
761 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
762 	pei->buffer_usage = bufusage_space;
763 
764 	/* Same for WalUsage. */
765 	walusage_space = shm_toc_allocate(pcxt->toc,
766 									  mul_size(sizeof(WalUsage), pcxt->nworkers));
767 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
768 	pei->wal_usage = walusage_space;
769 
770 	/* Set up the tuple queues that the workers will write into. */
771 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
772 
773 	/* We don't need the TupleQueueReaders yet, though. */
774 	pei->reader = NULL;
775 
776 	/*
777 	 * If instrumentation options were supplied, allocate space for the data.
778 	 * It only gets partially initialized here; the rest happens during
779 	 * ExecParallelInitializeDSM.
780 	 */
781 	if (estate->es_instrument)
782 	{
783 		Instrumentation *instrument;
784 		int			i;
785 
786 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
787 		instrumentation->instrument_options = estate->es_instrument;
788 		instrumentation->instrument_offset = instrument_offset;
789 		instrumentation->num_workers = nworkers;
790 		instrumentation->num_plan_nodes = e.nnodes;
791 		instrument = GetInstrumentationArray(instrumentation);
792 		for (i = 0; i < nworkers * e.nnodes; ++i)
793 			InstrInit(&instrument[i], estate->es_instrument);
794 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
795 					   instrumentation);
796 		pei->instrumentation = instrumentation;
797 
798 		if (estate->es_jit_flags != PGJIT_NONE)
799 		{
800 			jit_instrumentation = shm_toc_allocate(pcxt->toc,
801 												   jit_instrumentation_len);
802 			jit_instrumentation->num_workers = nworkers;
803 			memset(jit_instrumentation->jit_instr, 0,
804 				   sizeof(JitInstrumentation) * nworkers);
805 			shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
806 						   jit_instrumentation);
807 			pei->jit_instrumentation = jit_instrumentation;
808 		}
809 	}
810 
811 	/*
812 	 * Create a DSA area that can be used by the leader and all workers.
813 	 * (However, if we failed to create a DSM and are using private memory
814 	 * instead, then skip this.)
815 	 */
816 	if (pcxt->seg != NULL)
817 	{
818 		char	   *area_space;
819 
820 		area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
821 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
822 		pei->area = dsa_create_in_place(area_space, dsa_minsize,
823 										LWTRANCHE_PARALLEL_QUERY_DSA,
824 										pcxt->seg);
825 
826 		/*
827 		 * Serialize parameters, if any, using DSA storage.  We don't dare use
828 		 * the main parallel query DSM for this because we might relaunch
829 		 * workers after the values have changed (and thus the amount of
830 		 * storage required has changed).
831 		 */
832 		if (!bms_is_empty(sendParams))
833 		{
834 			pei->param_exec = SerializeParamExecParams(estate, sendParams,
835 													   pei->area);
836 			fpes->param_exec = pei->param_exec;
837 		}
838 	}
839 
840 	/*
841 	 * Give parallel-aware nodes a chance to initialize their shared data.
842 	 * This also fills in the plan_node_id slots of the shared instrumentation
843 	 * structure, if we have one.
844 	 */
845 	d.pcxt = pcxt;
846 	d.instrumentation = instrumentation;
847 	d.nnodes = 0;
848 
849 	/* Install our DSA area while initializing the plan. */
850 	estate->es_query_dsa = pei->area;
851 	ExecParallelInitializeDSM(planstate, &d);
852 	estate->es_query_dsa = NULL;
853 
854 	/*
855 	 * Make sure that the world hasn't shifted under our feet.  This could
856 	 * probably just be an Assert(), but let's be conservative for now.
857 	 */
858 	if (e.nnodes != d.nnodes)
859 		elog(ERROR, "inconsistent count of PlanState nodes");
860 
861 	/* OK, we're ready to rock and roll. */
862 	return pei;
863 }
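
/*
 * Roughly, a caller such as Gather or Gather Merge drives this machinery as
 * follows (sketch only; error handling and rescan details omitted):
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate,
 *								   sendParams, nworkers, tuples_needed);
 *		LaunchParallelWorkers(pei->pcxt);
 *		ExecParallelCreateReaders(pei);
 *		... read tuples from pei->reader[] ...
 *		ExecParallelFinish(pei);
 *		ExecParallelCleanup(pei);
 *
 * with ExecParallelReinitialize() used instead of a full rebuild when the
 * same plan is rescanned and workers must be relaunched.
 */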
864 
865 /*
866  * Set up tuple queue readers to read the results of a parallel subplan.
867  *
868  * This is separate from ExecInitParallelPlan() because we can launch the
869  * worker processes and let them start doing something before we do this.
870  */
871 void
872 ExecParallelCreateReaders(ParallelExecutorInfo *pei)
873 {
874 	int			nworkers = pei->pcxt->nworkers_launched;
875 	int			i;
876 
877 	Assert(pei->reader == NULL);
878 
879 	if (nworkers > 0)
880 	{
881 		pei->reader = (TupleQueueReader **)
882 			palloc(nworkers * sizeof(TupleQueueReader *));
883 
884 		for (i = 0; i < nworkers; i++)
885 		{
886 			shm_mq_set_handle(pei->tqueue[i],
887 							  pei->pcxt->worker[i].bgwhandle);
888 			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
889 		}
890 	}
891 }
892 
893 /*
894  * Re-initialize the parallel executor shared memory state before launching
895  * a fresh batch of workers.
896  */
897 void
898 ExecParallelReinitialize(PlanState *planstate,
899 						 ParallelExecutorInfo *pei,
900 						 Bitmapset *sendParams)
901 {
902 	EState	   *estate = planstate->state;
903 	FixedParallelExecutorState *fpes;
904 
905 	/* Old workers must already be shut down */
906 	Assert(pei->finished);
907 
908 	/*
909 	 * Force any initplan outputs that we're going to pass to workers to be
910 	 * evaluated, if they weren't already (see comments in
911 	 * ExecInitParallelPlan).
912 	 */
913 	ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
914 
915 	ReinitializeParallelDSM(pei->pcxt);
916 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
917 	pei->reader = NULL;
918 	pei->finished = false;
919 
920 	fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
921 
922 	/* Free any serialized parameters from the last round. */
923 	if (DsaPointerIsValid(fpes->param_exec))
924 	{
925 		dsa_free(pei->area, fpes->param_exec);
926 		fpes->param_exec = InvalidDsaPointer;
927 	}
928 
929 	/* Serialize current parameter values if required. */
930 	if (!bms_is_empty(sendParams))
931 	{
932 		pei->param_exec = SerializeParamExecParams(estate, sendParams,
933 												   pei->area);
934 		fpes->param_exec = pei->param_exec;
935 	}
936 
937 	/* Traverse plan tree and let each child node reset associated state. */
938 	estate->es_query_dsa = pei->area;
939 	ExecParallelReInitializeDSM(planstate, pei->pcxt);
940 	estate->es_query_dsa = NULL;
941 }
942 
943 /*
944  * Traverse plan tree to reinitialize per-node dynamic shared memory state
945  */
946 static bool
947 ExecParallelReInitializeDSM(PlanState *planstate,
948 							ParallelContext *pcxt)
949 {
950 	if (planstate == NULL)
951 		return false;
952 
953 	/*
954 	 * Call reinitializers for DSM-using plan nodes.
955 	 */
956 	switch (nodeTag(planstate))
957 	{
958 		case T_SeqScanState:
959 			if (planstate->plan->parallel_aware)
960 				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
961 										   pcxt);
962 			break;
963 		case T_IndexScanState:
964 			if (planstate->plan->parallel_aware)
965 				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
966 											 pcxt);
967 			break;
968 		case T_IndexOnlyScanState:
969 			if (planstate->plan->parallel_aware)
970 				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
971 												 pcxt);
972 			break;
973 		case T_ForeignScanState:
974 			if (planstate->plan->parallel_aware)
975 				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
976 											   pcxt);
977 			break;
978 		case T_AppendState:
979 			if (planstate->plan->parallel_aware)
980 				ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
981 			break;
982 		case T_CustomScanState:
983 			if (planstate->plan->parallel_aware)
984 				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
985 											  pcxt);
986 			break;
987 		case T_BitmapHeapScanState:
988 			if (planstate->plan->parallel_aware)
989 				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
990 											  pcxt);
991 			break;
992 		case T_HashJoinState:
993 			if (planstate->plan->parallel_aware)
994 				ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
995 											pcxt);
996 			break;
997 		case T_HashState:
998 		case T_SortState:
999 		case T_IncrementalSortState:
1000 		case T_MemoizeState:
1001 			/* these nodes have DSM state, but no reinitialization is required */
1002 			break;
1003 
1004 		default:
1005 			break;
1006 	}
1007 
1008 	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1009 }
1010 
1011 /*
1012  * Copy instrumentation information about this node and its descendants from
1013  * dynamic shared memory.
1014  */
1015 static bool
1016 ExecParallelRetrieveInstrumentation(PlanState *planstate,
1017 									SharedExecutorInstrumentation *instrumentation)
1018 {
1019 	Instrumentation *instrument;
1020 	int			i;
1021 	int			n;
1022 	int			ibytes;
1023 	int			plan_node_id = planstate->plan->plan_node_id;
1024 	MemoryContext oldcontext;
1025 
1026 	/* Find the instrumentation for this node. */
1027 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1028 		if (instrumentation->plan_node_id[i] == plan_node_id)
1029 			break;
1030 	if (i >= instrumentation->num_plan_nodes)
1031 		elog(ERROR, "plan node %d not found", plan_node_id);
1032 
1033 	/* Accumulate the statistics from all workers. */
1034 	instrument = GetInstrumentationArray(instrumentation);
1035 	instrument += i * instrumentation->num_workers;
1036 	for (n = 0; n < instrumentation->num_workers; ++n)
1037 		InstrAggNode(planstate->instrument, &instrument[n]);
1038 
1039 	/*
1040 	 * Also store the per-worker detail.
1041 	 *
1042 	 * Worker instrumentation should be allocated in the same context as the
1043 	 * regular instrumentation information, which is the per-query context.
1044 	 * Switch into per-query memory context.
1045 	 */
1046 	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1047 	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1048 	planstate->worker_instrument =
1049 		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1050 	MemoryContextSwitchTo(oldcontext);
1051 
1052 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
1053 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1054 
1055 	/* Perform any node-type-specific work that needs to be done. */
1056 	switch (nodeTag(planstate))
1057 	{
1058 		case T_SortState:
1059 			ExecSortRetrieveInstrumentation((SortState *) planstate);
1060 			break;
1061 		case T_IncrementalSortState:
1062 			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1063 			break;
1064 		case T_HashState:
1065 			ExecHashRetrieveInstrumentation((HashState *) planstate);
1066 			break;
1067 		case T_AggState:
1068 			ExecAggRetrieveInstrumentation((AggState *) planstate);
1069 			break;
1070 		case T_MemoizeState:
1071 			ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1072 			break;
1073 		default:
1074 			break;
1075 	}
1076 
1077 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1078 								 instrumentation);
1079 }
1080 
1081 /*
1082  * Add up the workers' JIT instrumentation from dynamic shared memory.
1083  */
1084 static void
1085 ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1086 									   SharedJitInstrumentation *shared_jit)
1087 {
1088 	JitInstrumentation *combined;
1089 	int			ibytes;
1090 
1091 	int			n;
1092 
1093 	/*
1094 	 * Accumulate worker JIT instrumentation into the combined JIT
1095 	 * instrumentation, allocating it if required.
1096 	 */
1097 	if (!planstate->state->es_jit_worker_instr)
1098 		planstate->state->es_jit_worker_instr =
1099 			MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
1100 	combined = planstate->state->es_jit_worker_instr;
1101 
1102 	/* Accumulate all the workers' instrumentations. */
1103 	for (n = 0; n < shared_jit->num_workers; ++n)
1104 		InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1105 
1106 	/*
1107 	 * Store the per-worker detail.
1108 	 *
1109 	 * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1110 	 * instrumentation in per-query context.
1111 	 */
1112 	ibytes = offsetof(SharedJitInstrumentation, jit_instr)
1113 		+ mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
1114 	planstate->worker_jit_instrument =
1115 		MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1116 
1117 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1118 }
1119 
1120 /*
1121  * Finish parallel execution.  We wait for parallel workers to finish, and
1122  * accumulate their buffer/WAL usage.
1123  */
1124 void
1125 ExecParallelFinish(ParallelExecutorInfo *pei)
1126 {
1127 	int			nworkers = pei->pcxt->nworkers_launched;
1128 	int			i;
1129 
1130 	/* Make this a no-op if called twice in a row. */
1131 	if (pei->finished)
1132 		return;
1133 
1134 	/*
1135 	 * Detach from tuple queues ASAP, so that any still-active workers will
1136 	 * notice that no further results are wanted.
1137 	 */
1138 	if (pei->tqueue != NULL)
1139 	{
1140 		for (i = 0; i < nworkers; i++)
1141 			shm_mq_detach(pei->tqueue[i]);
1142 		pfree(pei->tqueue);
1143 		pei->tqueue = NULL;
1144 	}
1145 
1146 	/*
1147 	 * While we're waiting for the workers to finish, let's get rid of the
1148 	 * tuple queue readers.  (Any other local cleanup could be done here too.)
1149 	 */
1150 	if (pei->reader != NULL)
1151 	{
1152 		for (i = 0; i < nworkers; i++)
1153 			DestroyTupleQueueReader(pei->reader[i]);
1154 		pfree(pei->reader);
1155 		pei->reader = NULL;
1156 	}
1157 
1158 	/* Now wait for the workers to finish. */
1159 	WaitForParallelWorkersToFinish(pei->pcxt);
1160 
1161 	/*
1162 	 * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
1163 	 * finish, or we might get incomplete data.)
1164 	 */
1165 	for (i = 0; i < nworkers; i++)
1166 		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1167 
1168 	pei->finished = true;
1169 }
1170 
1171 /*
1172  * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1173  * resources still exist after ExecParallelFinish.  We separate these
1174  * routines because someone might want to examine the contents of the DSM
1175  * after ExecParallelFinish and before calling this routine.
1176  */
1177 void
1178 ExecParallelCleanup(ParallelExecutorInfo *pei)
1179 {
1180 	/* Accumulate instrumentation, if any. */
1181 	if (pei->instrumentation)
1182 		ExecParallelRetrieveInstrumentation(pei->planstate,
1183 											pei->instrumentation);
1184 
1185 	/* Accumulate JIT instrumentation, if any. */
1186 	if (pei->jit_instrumentation)
1187 		ExecParallelRetrieveJitInstrumentation(pei->planstate,
1188 											   pei->jit_instrumentation);
1189 
1190 	/* Free any serialized parameters. */
1191 	if (DsaPointerIsValid(pei->param_exec))
1192 	{
1193 		dsa_free(pei->area, pei->param_exec);
1194 		pei->param_exec = InvalidDsaPointer;
1195 	}
1196 	if (pei->area != NULL)
1197 	{
1198 		dsa_detach(pei->area);
1199 		pei->area = NULL;
1200 	}
1201 	if (pei->pcxt != NULL)
1202 	{
1203 		DestroyParallelContext(pei->pcxt);
1204 		pei->pcxt = NULL;
1205 	}
1206 	pfree(pei);
1207 }
1208 
1209 /*
1210  * Create a DestReceiver to write tuples we produce to the shm_mq designated
1211  * for that purpose.
1212  */
1213 static DestReceiver *
1214 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1215 {
1216 	char	   *mqspace;
1217 	shm_mq	   *mq;
1218 
1219 	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
1220 	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1221 	mq = (shm_mq *) mqspace;
1222 	shm_mq_set_sender(mq, MyProc);
1223 	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1224 }
1225 
1226 /*
1227  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1228  */
1229 static QueryDesc *
1230 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1231 						 int instrument_options)
1232 {
1233 	char	   *pstmtspace;
1234 	char	   *paramspace;
1235 	PlannedStmt *pstmt;
1236 	ParamListInfo paramLI;
1237 	char	   *queryString;
1238 
1239 	/* Get the query string from shared memory */
1240 	queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1241 
1242 	/* Reconstruct leader-supplied PlannedStmt. */
1243 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
1244 	pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1245 
1246 	/* Reconstruct ParamListInfo. */
1247 	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
1248 	paramLI = RestoreParamList(&paramspace);
1249 
1250 	/* Create a QueryDesc for the query. */
1251 	return CreateQueryDesc(pstmt,
1252 						   queryString,
1253 						   GetActiveSnapshot(), InvalidSnapshot,
1254 						   receiver, paramLI, NULL, instrument_options);
1255 }
1256 
1257 /*
1258  * Copy instrumentation information from this node and its descendants into
1259  * dynamic shared memory, so that the parallel leader can retrieve it.
1260  */
1261 static bool
1262 ExecParallelReportInstrumentation(PlanState *planstate,
1263 								  SharedExecutorInstrumentation *instrumentation)
1264 {
1265 	int			i;
1266 	int			plan_node_id = planstate->plan->plan_node_id;
1267 	Instrumentation *instrument;
1268 
1269 	InstrEndLoop(planstate->instrument);
1270 
1271 	/*
1272 	 * If we shuffled the plan_node_id values in the shared instrumentation
1273 	 * array into sorted order, we could use binary search here.  This might
1274 	 * matter someday if we're pushing down sufficiently large plan trees.
1275 	 * For now, do it the slow, dumb way.
1276 	 */
1277 	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1278 		if (instrumentation->plan_node_id[i] == plan_node_id)
1279 			break;
1280 	if (i >= instrumentation->num_plan_nodes)
1281 		elog(ERROR, "plan node %d not found", plan_node_id);
1282 
1283 	/*
1284 	 * Add our statistics to the per-node, per-worker totals.  It's possible
1285 	 * that this could happen more than once if we relaunched workers.
1286 	 */
1287 	instrument = GetInstrumentationArray(instrumentation);
1288 	instrument += i * instrumentation->num_workers;
1289 	Assert(IsParallelWorker());
1290 	Assert(ParallelWorkerNumber < instrumentation->num_workers);
1291 	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1292 
1293 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1294 								 instrumentation);
1295 }
1296 
1297 /*
1298  * Initialize the PlanState and its descendants with the information
1299  * retrieved from shared memory.  This has to be done once the PlanState
1300  * is allocated and initialized by the executor; that is, after ExecutorStart().
1301  */
1302 static bool
1303 ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1304 {
1305 	if (planstate == NULL)
1306 		return false;
1307 
1308 	switch (nodeTag(planstate))
1309 	{
1310 		case T_SeqScanState:
1311 			if (planstate->plan->parallel_aware)
1312 				ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1313 			break;
1314 		case T_IndexScanState:
1315 			if (planstate->plan->parallel_aware)
1316 				ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1317 											  pwcxt);
1318 			break;
1319 		case T_IndexOnlyScanState:
1320 			if (planstate->plan->parallel_aware)
1321 				ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1322 												  pwcxt);
1323 			break;
1324 		case T_ForeignScanState:
1325 			if (planstate->plan->parallel_aware)
1326 				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1327 												pwcxt);
1328 			break;
1329 		case T_AppendState:
1330 			if (planstate->plan->parallel_aware)
1331 				ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1332 			break;
1333 		case T_CustomScanState:
1334 			if (planstate->plan->parallel_aware)
1335 				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1336 											   pwcxt);
1337 			break;
1338 		case T_BitmapHeapScanState:
1339 			if (planstate->plan->parallel_aware)
1340 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1341 											   pwcxt);
1342 			break;
1343 		case T_HashJoinState:
1344 			if (planstate->plan->parallel_aware)
1345 				ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1346 											 pwcxt);
1347 			break;
1348 		case T_HashState:
1349 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1350 			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1351 			break;
1352 		case T_SortState:
1353 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1354 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
1355 			break;
1356 		case T_IncrementalSortState:
1357 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1358 			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1359 												pwcxt);
1360 			break;
1361 		case T_AggState:
1362 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1363 			ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1364 			break;
1365 		case T_MemoizeState:
1366 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
1367 			ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1368 			break;
1369 		default:
1370 			break;
1371 	}
1372 
1373 	return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1374 								 pwcxt);
1375 }
1376 
1377 /*
1378  * Main entrypoint for parallel query worker processes.
1379  *
1380  * We reach this function from ParallelWorkerMain, so the setup necessary to
1381  * create a sensible parallel environment has already been done;
1382  * ParallelWorkerMain worries about stuff like the transaction state, combo
1383  * CID mappings, and GUC values, so we don't need to deal with any of that
1384  * here.
1385  *
1386  * Our job is to deal with concerns specific to the executor.  The parallel
1387  * group leader will have stored a serialized PlannedStmt, and it's our job
1388  * to execute that plan and write the resulting tuples to the appropriate
1389  * tuple queue.  Various bits of supporting information that we need in order
1390  * to do this are also stored in the dsm_segment and can be accessed through
1391  * the shm_toc.
1392  */
1393 void
1394 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1395 {
1396 	FixedParallelExecutorState *fpes;
1397 	BufferUsage *buffer_usage;
1398 	WalUsage   *wal_usage;
1399 	DestReceiver *receiver;
1400 	QueryDesc  *queryDesc;
1401 	SharedExecutorInstrumentation *instrumentation;
1402 	SharedJitInstrumentation *jit_instrumentation;
1403 	int			instrument_options = 0;
1404 	void	   *area_space;
1405 	dsa_area   *area;
1406 	ParallelWorkerContext pwcxt;
1407 
1408 	/* Get fixed-size state. */
1409 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1410 
1411 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
1412 	receiver = ExecParallelGetReceiver(seg, toc);
1413 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
1414 	if (instrumentation != NULL)
1415 		instrument_options = instrumentation->instrument_options;
1416 	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1417 										 true);
1418 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1419 
1420 	/* Set debug_query_string for this worker */
1421 	debug_query_string = queryDesc->sourceText;
1422 
1423 	/* Report workers' query and queryId for monitoring purposes */
1424 	pgstat_report_activity(STATE_RUNNING, debug_query_string);
	pgstat_report_query_id(queryDesc->plannedstmt->queryId, false);
1425 
1426 	/* Attach to the dynamic shared memory area. */
1427 	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
1428 	area = dsa_attach_in_place(area_space, seg);
1429 
1430 	/* Start up the executor */
1431 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
1432 	ExecutorStart(queryDesc, fpes->eflags);
1433 
1434 	/* Special executor initialization steps for parallel workers */
1435 	queryDesc->planstate->state->es_query_dsa = area;
1436 	if (DsaPointerIsValid(fpes->param_exec))
1437 	{
1438 		char	   *paramexec_space;
1439 
1440 		paramexec_space = dsa_get_address(area, fpes->param_exec);
1441 		RestoreParamExecParams(paramexec_space, queryDesc->estate);
1442 
1443 	}
1444 	pwcxt.toc = toc;
1445 	pwcxt.seg = seg;
1446 	ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1447 
1448 	/* Pass down any tuple bound */
1449 	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1450 
1451 	/*
1452 	 * Prepare to track buffer/WAL usage during query execution.
1453 	 *
1454 	 * We do this after starting up the executor to match what happens in the
1455 	 * leader, which also doesn't count buffer accesses and WAL activity that
1456 	 * occur during executor startup.
1457 	 */
1458 	InstrStartParallelQuery();
1459 
1460 	/*
1461 	 * Run the plan.  If we specified a tuple bound, be careful not to demand
1462 	 * more tuples than that.
1463 	 */
1464 	ExecutorRun(queryDesc,
1465 				ForwardScanDirection,
1466 				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
1467 				true);
1468 
1469 	/* Shut down the executor */
1470 	ExecutorFinish(queryDesc);
1471 
1472 	/* Report buffer/WAL usage during parallel execution. */
1473 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1474 	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1475 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1476 						  &wal_usage[ParallelWorkerNumber]);
1477 
1478 	/* Report instrumentation data if any instrumentation options are set. */
1479 	if (instrumentation != NULL)
1480 		ExecParallelReportInstrumentation(queryDesc->planstate,
1481 										  instrumentation);
1482 
1483 	/* Report JIT instrumentation data if any */
1484 	if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1485 	{
1486 		Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1487 		jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1488 			queryDesc->estate->es_jit->instr;
1489 	}
1490 
1491 	/* Must do this after capturing instrumentation. */
1492 	ExecutorEnd(queryDesc);
1493 
1494 	/* Cleanup. */
1495 	dsa_detach(area);
1496 	FreeQueryDesc(queryDesc);
1497 	receiver->rDestroy(receiver);
1498 }
1499