/*-------------------------------------------------------------------------
 *
 * execParallel.c
 *	  Support routines for parallel execution.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This file contains routines that are intended to support setting up,
 * using, and tearing down a ParallelContext from within the PostgreSQL
 * executor.  The ParallelContext machinery will handle starting the
 * workers and ensuring that their state generally matches that of the
 * leader; see src/backend/access/transam/README.parallel for details.
 * However, we must save and restore relevant executor state, such as
 * any ParamListInfo associated with the query, buffer usage info, and
 * the actual plan to be passed down to the worker.
 *
 * IDENTIFICATION
 *	  src/backend/executor/execParallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/dsa.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "pgstat.h"

/*
 * Magic numbers for parallel executor communication.  We use constants
 * greater than any 32-bit integer here so that values < 2^32 can be used
 * by individual parallel nodes to store their own state.
 */
#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002)
#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xE000000000000007)

#define PARALLEL_TUPLE_QUEUE_SIZE		65536
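
/*
 * As noted above, keys below 2^32 are reserved for the parallel nodes
 * themselves.  In practice a parallel-aware node keys its private shared
 * state by its plan_node_id, along these lines (a sketch, not code from
 * this file):
 *
 *		shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, node_state);
 *		...
 *		node_state = shm_toc_lookup(toc, plan_node_id, false);
 */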

/*
 * DSM structure for accumulating per-PlanState instrumentation.
 *
 * instrument_options: Same meaning here as in instrument.c.
 *
 * instrument_offset: Offset, relative to the start of this structure,
 * of the first Instrumentation object.  This will depend on the length of
 * the plan_node_id array.
 *
 * num_workers: Number of workers.
 *
 * num_plan_nodes: Number of plan nodes.
 *
 * plan_node_id: Array of plan node IDs for which we are gathering
 * instrumentation from parallel workers.  The length of this array is given
 * by num_plan_nodes.
 */
struct SharedExecutorInstrumentation
{
	int			instrument_options;
	int			instrument_offset;
	int			num_workers;
	int			num_plan_nodes;
	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];
	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
};
#define GetInstrumentationArray(sei) \
	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
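
/*
 * The Instrumentation array is laid out with num_workers slots per plan
 * node, so the slot for plan node i and worker w is
 * GetInstrumentationArray(sei)[i * sei->num_workers + w], as the retrieval
 * and reporting code below assumes.
 */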

/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
{
	ParallelContext *pcxt;
	int			nnodes;
} ExecParallelEstimateContext;

/* Context object for ExecParallelInitializeDSM. */
typedef struct ExecParallelInitializeDSMContext
{
	ParallelContext *pcxt;
	SharedExecutorInstrumentation *instrumentation;
	int			nnodes;
} ExecParallelInitializeDSMContext;

/* Helper functions that run in the parallel leader. */
static char *ExecSerializePlan(Plan *plan, EState *estate);
static bool ExecParallelEstimate(PlanState *node,
					 ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
						  ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
							 bool reinitialize);
static bool ExecParallelReInitializeDSM(PlanState *planstate,
							ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
									SharedExecutorInstrumentation *instrumentation);

/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);

/*
 * Create a serialized representation of the plan to be sent to each worker.
 */
static char *
ExecSerializePlan(Plan *plan, EState *estate)
{
	PlannedStmt *pstmt;
	ListCell   *lc;

	/* We can't scribble on the original plan, so make a copy. */
	plan = copyObject(plan);

	/*
	 * The worker will start its own copy of the executor, and that copy will
	 * insert a junk filter if the toplevel node has any resjunk entries. We
	 * don't want that to happen, because while resjunk columns shouldn't be
	 * sent back to the user, here the tuples are coming back to another
	 * backend which may very well need them.  So mutate the target list
	 * accordingly.  This is sort of a hack; there might be better ways to do
	 * this...
	 */
	foreach(lc, plan->targetlist)
	{
		TargetEntry *tle = lfirst_node(TargetEntry, lc);

		tle->resjunk = false;
	}

	/*
	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
	 * for our purposes, but the worker will need at least a minimal
	 * PlannedStmt to start the executor.
	 */
	pstmt = makeNode(PlannedStmt);
	pstmt->commandType = CMD_SELECT;
	pstmt->queryId = 0;
	pstmt->hasReturning = false;
	pstmt->hasModifyingCTE = false;
	pstmt->canSetTag = true;
	pstmt->transientPlan = false;
	pstmt->dependsOnRole = false;
	pstmt->parallelModeNeeded = false;
	pstmt->planTree = plan;
	pstmt->rtable = estate->es_range_table;
	pstmt->resultRelations = NIL;
	pstmt->nonleafResultRelations = NIL;

	/*
	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
	 * for unsafe ones (so that the list indexes of the safe ones are
	 * preserved).  This positively ensures that the worker won't try to run,
	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
	 */
	pstmt->subplans = NIL;
	foreach(lc, estate->es_plannedstmt->subplans)
	{
		Plan	   *subplan = (Plan *) lfirst(lc);

		if (subplan && !subplan->parallel_safe)
			subplan = NULL;
		pstmt->subplans = lappend(pstmt->subplans, subplan);
	}

	pstmt->rewindPlanIDs = NULL;
	pstmt->rowMarks = NIL;
	pstmt->relationOids = NIL;
	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
	pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
	pstmt->utilityStmt = NULL;
	pstmt->stmt_location = -1;
	pstmt->stmt_len = -1;

	/* Return serialized copy of our dummy PlannedStmt. */
	return nodeToString(pstmt);
}

/*
 * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
 * may need some state which is shared across all parallel workers.  Before
 * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
 * shm_toc_estimate_keys on &pcxt->estimator.
 *
 * While we're at it, count the number of PlanState nodes in the tree, so
 * we know how many Instrumentation structures we need.
 */
static bool
ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
{
	if (planstate == NULL)
		return false;

	/* Count this node. */
	e->nnodes++;

	/* Call estimators for parallel-aware nodes. */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanEstimate((SeqScanState *) planstate,
									e->pcxt);
				break;
			case T_IndexScanState:
				ExecIndexScanEstimate((IndexScanState *) planstate,
									  e->pcxt);
				break;
			case T_IndexOnlyScanState:
				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
										  e->pcxt);
				break;
			case T_ForeignScanState:
				ExecForeignScanEstimate((ForeignScanState *) planstate,
										e->pcxt);
				break;
			case T_CustomScanState:
				ExecCustomScanEstimate((CustomScanState *) planstate,
									   e->pcxt);
				break;
			case T_BitmapHeapScanState:
				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
									   e->pcxt);
				break;
			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}
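
/*
 * For reference, a node-level estimator invoked from the switch above
 * usually just reserves space and one key for its shared state, roughly
 * like this (a sketch using a hypothetical FooScan node; the real functions
 * live in the per-node source files):
 *
 *		void
 *		ExecFooScanEstimate(FooScanState *node, ParallelContext *pcxt)
 *		{
 *			shm_toc_estimate_chunk(&pcxt->estimator, size_of_foo_shared_state);
 *			shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		}
 */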

/*
 * Initialize the dynamic shared memory segment that will be used to control
 * parallel execution.
 */
static bool
ExecParallelInitializeDSM(PlanState *planstate,
						  ExecParallelInitializeDSMContext *d)
{
	if (planstate == NULL)
		return false;

	/* If instrumentation is enabled, initialize slot for this node. */
	if (d->instrumentation != NULL)
		d->instrumentation->plan_node_id[d->nnodes] =
			planstate->plan->plan_node_id;

	/* Count this node. */
	d->nnodes++;

	/*
	 * Call initializers for parallel-aware plan nodes.
	 *
	 * Ordinary plan nodes won't do anything here, but parallel-aware plan
	 * nodes may need to initialize shared state in the DSM before parallel
	 * workers are available.  They can allocate the space they previously
	 * estimated using shm_toc_allocate, and add the keys they previously
	 * estimated using shm_toc_insert, in each case targeting pcxt->toc.
	 */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanInitializeDSM((SeqScanState *) planstate,
										 d->pcxt);
				break;
			case T_IndexScanState:
				ExecIndexScanInitializeDSM((IndexScanState *) planstate,
										   d->pcxt);
				break;
			case T_IndexOnlyScanState:
				ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
											   d->pcxt);
				break;
			case T_ForeignScanState:
				ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
											 d->pcxt);
				break;
			case T_CustomScanState:
				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
											d->pcxt);
				break;
			case T_BitmapHeapScanState:
				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
											d->pcxt);
				break;

			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}
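
/*
 * For reference, a node-level DSM initializer invoked from the switch above
 * usually allocates the space it estimated, fills it in, and publishes it
 * under the node's plan_node_id, roughly like this (again a sketch using a
 * hypothetical FooScan node):
 *
 *		void
 *		ExecFooScanInitializeDSM(FooScanState *node, ParallelContext *pcxt)
 *		{
 *			FooScanShared *shared;
 *
 *			shared = shm_toc_allocate(pcxt->toc, size_of_foo_shared_state);
 *			... initialize *shared ...
 *			shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, shared);
 *		}
 */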

/*
 * Set up the response queues that parallel workers will use to return
 * tuples to the leader backend.
 */
static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
	shm_mq_handle **responseq;
	char	   *tqueuespace;
	int			i;

	/* Skip this if no workers. */
	if (pcxt->nworkers == 0)
		return NULL;

	/* Allocate memory for shared memory queue handles. */
	responseq = (shm_mq_handle **)
		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

	/*
	 * If not reinitializing, allocate space from the DSM for the queues;
	 * otherwise, find the already allocated space.
	 */
	if (!reinitialize)
		tqueuespace =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
									  pcxt->nworkers));
	else
		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);

	/* Create the queues, and become the receiver for each. */
	for (i = 0; i < pcxt->nworkers; ++i)
	{
		shm_mq	   *mq;

		mq = shm_mq_create(tqueuespace +
						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);

		shm_mq_set_receiver(mq, MyProc);
		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
	}

	/* Add array of queues to shm_toc, so others can find it. */
	if (!reinitialize)
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

	/* Return array of handles. */
	return responseq;
}
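
/*
 * Note that the tuple queue space is one contiguous chunk holding
 * pcxt->nworkers fixed-size queues, so worker N's queue lives at offset
 * N * PARALLEL_TUPLE_QUEUE_SIZE.  That is how ExecParallelGetReceiver,
 * below, locates the right queue from inside a worker:
 *
 *		mq = (shm_mq *) (tqueuespace +
 *						 ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE);
 */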

/*
 * Sets up the required infrastructure for backend workers to perform
 * execution and return results to the main backend.
 */
ParallelExecutorInfo *
ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
{
	ParallelExecutorInfo *pei;
	ParallelContext *pcxt;
	ExecParallelEstimateContext e;
	ExecParallelInitializeDSMContext d;
	char	   *pstmt_data;
	char	   *pstmt_space;
	char	   *param_space;
	BufferUsage *bufusage_space;
	SharedExecutorInstrumentation *instrumentation = NULL;
	int			pstmt_len;
	int			param_len;
	int			instrumentation_len = 0;
	int			instrument_offset = 0;
	Size		dsa_minsize = dsa_minimum_size();
	char	   *query_string;
	int			query_len;

	/* Allocate object for return value. */
	pei = palloc0(sizeof(ParallelExecutorInfo));
	pei->finished = false;
	pei->planstate = planstate;

	/* Fix up and serialize plan to be sent to workers. */
	pstmt_data = ExecSerializePlan(planstate->plan, estate);

	/* Create a parallel context. */
	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
	pei->pcxt = pcxt;

	/*
	 * Before telling the parallel context to create a dynamic shared memory
	 * segment, we need to figure out how big it should be.  Estimate space
	 * for the various things we need to store.
	 */

	/* Estimate space for query text. */
	query_len = strlen(estate->es_sourceText);
	shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate space for serialized PlannedStmt. */
	pstmt_len = strlen(pstmt_data) + 1;
	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate space for serialized ParamListInfo. */
	param_len = EstimateParamListSpace(estate->es_param_list_info);
	shm_toc_estimate_chunk(&pcxt->estimator, param_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Estimate space for BufferUsage.
	 *
	 * If EXPLAIN is not in use and there are no extensions loaded that care,
	 * we could skip this.  But we have no way of knowing whether anyone's
	 * looking at pgBufferUsage, so do it unconditionally.
	 */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Estimate space for tuple queues. */
	shm_toc_estimate_chunk(&pcxt->estimator,
						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Give parallel-aware nodes a chance to add to the estimates, and get a
	 * count of how many PlanState nodes there are.
	 */
	e.pcxt = pcxt;
	e.nnodes = 0;
	ExecParallelEstimate(planstate, &e);

	/* Estimate space for instrumentation, if required. */
	if (estate->es_instrument)
	{
		instrumentation_len =
			offsetof(SharedExecutorInstrumentation, plan_node_id) +
			sizeof(int) * e.nnodes;
		instrumentation_len = MAXALIGN(instrumentation_len);
		instrument_offset = instrumentation_len;
		instrumentation_len +=
			mul_size(sizeof(Instrumentation),
					 mul_size(e.nnodes, nworkers));
		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/* Estimate space for DSA area. */
	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/* Everyone's had a chance to ask for space, so now create the DSM. */
	InitializeParallelDSM(pcxt);

	/*
	 * OK, now we have a dynamic shared memory segment, and it should be big
	 * enough to store all of the data we estimated we would want to put into
	 * it, plus whatever general stuff (not specifically executor-related) the
	 * ParallelContext itself needs to store there.  None of the space we
	 * asked for has been allocated or initialized yet, though, so do that.
	 */

	/* Store query string */
	query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
	memcpy(query_string, estate->es_sourceText, query_len + 1);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);

	/* Store serialized PlannedStmt. */
	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
	memcpy(pstmt_space, pstmt_data, pstmt_len);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);

	/* Store serialized ParamListInfo. */
	param_space = shm_toc_allocate(pcxt->toc, param_len);
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
	SerializeParamList(estate->es_param_list_info, &param_space);

	/* Allocate space for each worker's BufferUsage; no need to initialize. */
	bufusage_space = shm_toc_allocate(pcxt->toc,
									  mul_size(sizeof(BufferUsage), pcxt->nworkers));
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
	pei->buffer_usage = bufusage_space;

	/* Set up the tuple queues that the workers will write into. */
	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

	/* We don't need the TupleQueueReaders yet, though. */
	pei->reader = NULL;

	/*
	 * If instrumentation options were supplied, allocate space for the data.
	 * It only gets partially initialized here; the rest happens during
	 * ExecParallelInitializeDSM.
	 */
	if (estate->es_instrument)
	{
		Instrumentation *instrument;
		int			i;

		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
		instrumentation->instrument_options = estate->es_instrument;
		instrumentation->instrument_offset = instrument_offset;
		instrumentation->num_workers = nworkers;
		instrumentation->num_plan_nodes = e.nnodes;
		instrument = GetInstrumentationArray(instrumentation);
		for (i = 0; i < nworkers * e.nnodes; ++i)
			InstrInit(&instrument[i], estate->es_instrument);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
					   instrumentation);
		pei->instrumentation = instrumentation;
	}

	/*
	 * Create a DSA area that can be used by the leader and all workers.
	 * (However, if we failed to create a DSM and are using private memory
	 * instead, then skip this.)
	 */
	if (pcxt->seg != NULL)
	{
		char	   *area_space;

		area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
		pei->area = dsa_create_in_place(area_space, dsa_minsize,
										LWTRANCHE_PARALLEL_QUERY_DSA,
										pcxt->seg);
	}
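
	/*
	 * Parallel-aware nodes can carve per-query shared storage out of this
	 * area (reached via estate->es_query_dsa) with dsa_allocate(), convert
	 * the resulting dsa_pointer back to a local address with
	 * dsa_get_address(), and hand the dsa_pointer to workers through their
	 * shm_toc entries.  A sketch, not something this function does itself:
	 *
	 *		dsa_pointer p = dsa_allocate(estate->es_query_dsa, size);
	 *		void	   *addr = dsa_get_address(estate->es_query_dsa, p);
	 */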

	/*
	 * Give parallel-aware nodes a chance to initialize their shared data.
	 * This also fills in the plan_node_id array in *instrumentation, if we
	 * are collecting instrumentation at all.
	 */
	d.pcxt = pcxt;
	d.instrumentation = instrumentation;
	d.nnodes = 0;

	/* Install our DSA area while initializing the plan. */
	estate->es_query_dsa = pei->area;
	ExecParallelInitializeDSM(planstate, &d);
	estate->es_query_dsa = NULL;

	/*
	 * Make sure that the world hasn't shifted under our feet.  This could
	 * probably just be an Assert(), but let's be conservative for now.
	 */
	if (e.nnodes != d.nnodes)
		elog(ERROR, "inconsistent count of PlanState nodes");

	/* OK, we're ready to rock and roll. */
	return pei;
}
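
/*
 * For reference, a caller such as the Gather node drives this module
 * roughly as follows (a sketch that omits error handling and the leader's
 * own participation in the plan):
 *
 *		pei = ExecInitParallelPlan(outerPlanState(node), estate, nworkers);
 *		LaunchParallelWorkers(pei->pcxt);
 *		ExecParallelCreateReaders(pei, tupDesc);
 *		... pull tuples through pei->reader[] ...
 *		ExecParallelFinish(pei);
 *		ExecParallelCleanup(pei);
 */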

/*
 * Set up tuple queue readers to read the results of a parallel subplan.
 * All the workers are expected to return tuples matching tupDesc.
 *
 * This is separate from ExecInitParallelPlan() because we can launch the
 * worker processes and let them start doing something before we do this.
 */
void
ExecParallelCreateReaders(ParallelExecutorInfo *pei,
						  TupleDesc tupDesc)
{
	int			nworkers = pei->pcxt->nworkers_launched;
	int			i;

	Assert(pei->reader == NULL);

	if (nworkers > 0)
	{
		pei->reader = (TupleQueueReader **)
			palloc(nworkers * sizeof(TupleQueueReader *));

		for (i = 0; i < nworkers; i++)
		{
			shm_mq_set_handle(pei->tqueue[i],
							  pei->pcxt->worker[i].bgwhandle);
			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
													tupDesc);
		}
	}
}
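
/*
 * The readers created above are what the leader subsequently polls for
 * tuples.  Roughly, on the caller's side (TupleQueueReaderNext returns NULL
 * and sets *done once a worker's queue has been detached):
 *
 *		HeapTuple	tup;
 *		bool		done;
 *
 *		tup = TupleQueueReaderNext(pei->reader[i], true, &done);
 */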

/*
 * Re-initialize the parallel executor shared memory state before launching
 * a fresh batch of workers.
 */
void
ExecParallelReinitialize(PlanState *planstate,
						 ParallelExecutorInfo *pei)
{
	EState	   *estate = planstate->state;

	/* Old workers must already be shut down */
	Assert(pei->finished);

	ReinitializeParallelDSM(pei->pcxt);
	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
	pei->reader = NULL;
	pei->finished = false;

	/* Traverse plan tree and let each child node reset associated state. */
	estate->es_query_dsa = pei->area;
	ExecParallelReInitializeDSM(planstate, pei->pcxt);
	estate->es_query_dsa = NULL;
}

/*
 * Traverse plan tree to reinitialize per-node dynamic shared memory state
 */
static bool
ExecParallelReInitializeDSM(PlanState *planstate,
							ParallelContext *pcxt)
{
	if (planstate == NULL)
		return false;

	/*
	 * Call reinitializers for DSM-using plan nodes.
	 */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
										   pcxt);
				break;
			case T_IndexScanState:
				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
											 pcxt);
				break;
			case T_IndexOnlyScanState:
				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
												 pcxt);
				break;
			case T_ForeignScanState:
				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
											   pcxt);
				break;
			case T_CustomScanState:
				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
											  pcxt);
				break;
			case T_BitmapHeapScanState:
				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
											  pcxt);
				break;

			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
}
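
/*
 * For reference, a node-level reinitializer invoked from the switch above
 * normally just resets the shared state it already published (for example,
 * a scan's shared position) so the next batch of workers starts over; it
 * does not allocate or insert anything new.  A sketch, again with a
 * hypothetical FooScan node:
 *
 *		void
 *		ExecFooScanReInitializeDSM(FooScanState *node, ParallelContext *pcxt)
 *		{
 *			foo_shared_state_reset(node->shared_state);
 *		}
 */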

/*
 * Copy instrumentation information about this node and its descendants from
 * dynamic shared memory.
 */
static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
									SharedExecutorInstrumentation *instrumentation)
{
	Instrumentation *instrument;
	int			i;
	int			n;
	int			ibytes;
	int			plan_node_id = planstate->plan->plan_node_id;
	MemoryContext oldcontext;

	/* Find the instrumentation for this node. */
	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
		if (instrumentation->plan_node_id[i] == plan_node_id)
			break;
	if (i >= instrumentation->num_plan_nodes)
		elog(ERROR, "plan node %d not found", plan_node_id);

	/* Accumulate the statistics from all workers. */
	instrument = GetInstrumentationArray(instrumentation);
	instrument += i * instrumentation->num_workers;
	for (n = 0; n < instrumentation->num_workers; ++n)
		InstrAggNode(planstate->instrument, &instrument[n]);

	/*
	 * Also store the per-worker detail.
	 *
	 * Worker instrumentation should be allocated in the same context as the
	 * regular instrumentation information, which is the per-query context.
	 * Switch into per-query memory context.
	 */
	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
	planstate->worker_instrument =
		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
	MemoryContextSwitchTo(oldcontext);

	planstate->worker_instrument->num_workers = instrumentation->num_workers;
	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);

	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
								 instrumentation);
}

/*
 * Finish parallel execution.  We wait for parallel workers to finish, and
 * accumulate their buffer usage.
 */
void
ExecParallelFinish(ParallelExecutorInfo *pei)
{
	int			nworkers = pei->pcxt->nworkers_launched;
	int			i;

	/* Make this be a no-op if called twice in a row. */
	if (pei->finished)
		return;

	/*
	 * Detach from tuple queues ASAP, so that any still-active workers will
	 * notice that no further results are wanted.
	 */
	if (pei->tqueue != NULL)
	{
		for (i = 0; i < nworkers; i++)
			shm_mq_detach(pei->tqueue[i]);
		pfree(pei->tqueue);
		pei->tqueue = NULL;
	}

	/*
	 * While we're waiting for the workers to finish, let's get rid of the
	 * tuple queue readers.  (Any other local cleanup could be done here too.)
	 */
	if (pei->reader != NULL)
	{
		for (i = 0; i < nworkers; i++)
			DestroyTupleQueueReader(pei->reader[i]);
		pfree(pei->reader);
		pei->reader = NULL;
	}

	/* Now wait for the workers to finish. */
	WaitForParallelWorkersToFinish(pei->pcxt);

	/*
	 * Next, accumulate buffer usage.  (This must wait for the workers to
	 * finish, or we might get incomplete data.)
	 */
	for (i = 0; i < nworkers; i++)
		InstrAccumParallelQuery(&pei->buffer_usage[i]);

	pei->finished = true;
}

/*
 * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
 * resources still exist after ExecParallelFinish.  We separate these
 * routines because someone might want to examine the contents of the DSM
 * after ExecParallelFinish and before calling this routine.
 */
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
	/* Accumulate instrumentation, if any. */
	if (pei->instrumentation)
		ExecParallelRetrieveInstrumentation(pei->planstate,
											pei->instrumentation);

	if (pei->area != NULL)
	{
		dsa_detach(pei->area);
		pei->area = NULL;
	}
	if (pei->pcxt != NULL)
	{
		DestroyParallelContext(pei->pcxt);
		pei->pcxt = NULL;
	}
	pfree(pei);
}

/*
 * Create a DestReceiver to write tuples we produce to the shm_mq designated
 * for that purpose.
 */
static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
	char	   *mqspace;
	shm_mq	   *mq;

	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
	mq = (shm_mq *) mqspace;
	shm_mq_set_sender(mq, MyProc);
	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}

/*
 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
 */
static QueryDesc *
ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
						 int instrument_options)
{
	char	   *pstmtspace;
	char	   *paramspace;
	PlannedStmt *pstmt;
	ParamListInfo paramLI;
	char	   *queryString;

	/* Get the query string from shared memory */
	queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);

	/* Reconstruct leader-supplied PlannedStmt. */
	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
	pstmt = (PlannedStmt *) stringToNode(pstmtspace);

	/* Reconstruct ParamListInfo. */
	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
	paramLI = RestoreParamList(&paramspace);

	/* Create a QueryDesc for the query. */
	return CreateQueryDesc(pstmt,
						   queryString,
						   GetActiveSnapshot(), InvalidSnapshot,
						   receiver, paramLI, NULL, instrument_options);
}

/*
 * Copy instrumentation information from this node and its descendants into
 * dynamic shared memory, so that the parallel leader can retrieve it.
 */
static bool
ExecParallelReportInstrumentation(PlanState *planstate,
								  SharedExecutorInstrumentation *instrumentation)
{
	int			i;
	int			plan_node_id = planstate->plan->plan_node_id;
	Instrumentation *instrument;

	InstrEndLoop(planstate->instrument);

	/*
	 * If we kept the plan_node_id array in sorted order, we could use binary
	 * search here.  This might matter someday if we're pushing down
	 * sufficiently large plan trees.  For now, do it the slow, dumb way.
	 */
	for (i = 0; i < instrumentation->num_plan_nodes; ++i)
		if (instrumentation->plan_node_id[i] == plan_node_id)
			break;
	if (i >= instrumentation->num_plan_nodes)
		elog(ERROR, "plan node %d not found", plan_node_id);

	/*
	 * Add our statistics to the per-node, per-worker totals.  It's possible
	 * that this could happen more than once if we relaunched workers.
	 */
	instrument = GetInstrumentationArray(instrumentation);
	instrument += i * instrumentation->num_workers;
	Assert(IsParallelWorker());
	Assert(ParallelWorkerNumber < instrumentation->num_workers);
	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);

	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
								 instrumentation);
}

/*
 * Initialize the PlanState and its descendants with the information
 * retrieved from shared memory.  This has to be done after the PlanState
 * has been allocated and initialized by the executor, that is, after
 * ExecutorStart().
 */
static bool
ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
{
	if (planstate == NULL)
		return false;

	/* Call initializers for parallel-aware plan nodes. */
	if (planstate->plan->parallel_aware)
	{
		switch (nodeTag(planstate))
		{
			case T_SeqScanState:
				ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
				break;
			case T_IndexScanState:
				ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
				break;
			case T_IndexOnlyScanState:
				ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc);
				break;
			case T_ForeignScanState:
				ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
												toc);
				break;
			case T_CustomScanState:
				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
											   toc);
				break;
			case T_BitmapHeapScanState:
				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
											   toc);
				break;
			default:
				break;
		}
	}

	return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
}
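
/*
 * For reference, a node-level worker initializer invoked from the switch
 * above typically looks up the state its leader published under the node's
 * plan_node_id and attaches to it, roughly (once more a sketch using a
 * hypothetical FooScan node):
 *
 *		void
 *		ExecFooScanInitializeWorker(FooScanState *node, shm_toc *toc)
 *		{
 *			FooScanShared *shared;
 *
 *			shared = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false);
 *			... point the node's scan at *shared ...
 *		}
 */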

/*
 * Main entrypoint for parallel query worker processes.
 *
 * We reach this function from ParallelWorkerMain, so the setup necessary to
 * create a sensible parallel environment has already been done;
 * ParallelWorkerMain worries about stuff like the transaction state, combo
 * CID mappings, and GUC values, so we don't need to deal with any of that
 * here.
 *
 * Our job is to deal with concerns specific to the executor.  The parallel
 * group leader will have stored a serialized PlannedStmt, and it's our job
 * to execute that plan and write the resulting tuples to the appropriate
 * tuple queue.  Various bits of supporting information that we need in order
 * to do this are also stored in the dsm_segment and can be accessed through
 * the shm_toc.
 */
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
	BufferUsage *buffer_usage;
	DestReceiver *receiver;
	QueryDesc  *queryDesc;
	SharedExecutorInstrumentation *instrumentation;
	int			instrument_options = 0;
	void	   *area_space;
	dsa_area   *area;

	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
	receiver = ExecParallelGetReceiver(seg, toc);
	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
	if (instrumentation != NULL)
		instrument_options = instrumentation->instrument_options;
	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

	/* Set debug_query_string for this worker. */
	debug_query_string = queryDesc->sourceText;

	/* Report this worker's query, for monitoring purposes. */
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Attach to the dynamic shared memory area. */
	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
	area = dsa_attach_in_place(area_space, seg);

	/* Start up the executor */
	ExecutorStart(queryDesc, 0);

	/* Special executor initialization steps for parallel workers */
	queryDesc->planstate->state->es_query_dsa = area;
	ExecParallelInitializeWorker(queryDesc->planstate, toc);

	/*
	 * Prepare to track buffer usage during query execution.
	 *
	 * We do this after starting up the executor to match what happens in the
	 * leader, which also doesn't count buffer accesses that occur during
	 * executor startup.
	 */
	InstrStartParallelQuery();

	/* Run the plan */
	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);

	/* Shut down the executor */
	ExecutorFinish(queryDesc);

	/* Report buffer usage during parallel execution. */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);

	/* Report instrumentation data if any instrumentation options are set. */
	if (instrumentation != NULL)
		ExecParallelReportInstrumentation(queryDesc->planstate,
										  instrumentation);

	/* Must do this after capturing instrumentation. */
	ExecutorEnd(queryDesc);

	/* Cleanup. */
	dsa_detach(area);
	FreeQueryDesc(queryDesc);
	(*receiver->rDestroy) (receiver);
}