/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself.  In
 * this case, it simply returns whatever tuples were returned by the worker.
 * If a worker cannot be obtained, then it will run the plan itself and
 * return the results.  Therefore, a plan used with a single-copy Gather
 * node need not be parallel-aware.
 *
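 * For illustration (not taken from this file), a typical use of Gather is
 * a plan shaped like
 *
 *		Gather
 *		  ->  Parallel Seq Scan on tab
 *
 * where the leader and each worker run their own copy of the parallel-aware
 * scan, and Gather merges the resulting tuple streams.
 *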
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

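	/*
	 * Set up run-time state.  need_to_scan_locally is only a provisional
	 * value here; ExecGather recomputes it once we know how many workers
	 * actually launched.
	 */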
	gatherstate->initialized = false;
	gatherstate->need_to_scan_locally =
		!node->single_copy && parallel_leader_participation;
	gatherstate->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
	tupDesc = ExecGetResultType(outerPlanState(gatherstate));

	/*
	 * Initialize result slot, type and projection.
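	 *
	 * If the target list is just a pass-through of the outer plan's columns,
	 * no ProjectionInfo is built and ps_ProjInfo stays NULL; ExecGather then
	 * returns the outer tuple slot directly.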
	 */
	ExecInitResultTupleSlotTL(estate, &gatherstate->ps);
	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution. We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
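		 * (es_use_parallel_mode reflects the planner's decision to allow
		 * parallel execution of this query, and num_workers is the worker
		 * count the planner chose for this Gather.)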
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->initParam,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei,
										 gather->initParam);

			/*
			 * Register backend workers. We might not get as many as we
			 * requested, or indeed any at all.
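			 * (Launching can fall short if the system's background-worker
			 * slots, or the max_parallel_workers limit, are already in use.)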
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
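				/*
				 * (It's a working copy so that gather_readnext can compact
				 * it as individual readers finish, while pei->reader stays
				 * intact for shutdown.)
				 */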
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?	Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
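			/* Start with the first queue; gather_readnext advances round-robin. */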
			node->nextreader = 0;
		}

		/*
		 * Run the plan locally if we have no workers, or if leader
		 * participation is enabled and this isn't a single-copy Gather.
		 */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| (!gather->single_copy && parallel_leader_participation);
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	HeapTuple	tup;

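	/*
	 * Alternate between draining the workers' tuple queues and, when
	 * enabled, running the plan locally, until both sources are exhausted.
	 */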
	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			tup = gather_readnext(gatherstate);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreTuple(tup, /* tuple to store */
							   fslot,	/* slot in which to store the tuple */
							   InvalidBuffer,	/* buffer associated with this
												 * tuple */
							   true);	/* pfree tuple when done with it */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			EState	   *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

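	/*
	 * Poll the active readers round-robin without blocking; nvisited counts
	 * how many queues we have tried since last making progress, so we know
	 * when it's time to sleep on our latch.
	 */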
	for (;;)
	{
		TupleQueueReader *reader;
		HeapTuple	tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Attempt to read a tuple, but don't block if none is available.
		 *
		 * Note that TupleQueueReaderNext will just return NULL for a worker
		 * which fails to initialize.  We'll treat that worker as having
		 * produced no tuples; WaitForParallelWorkersToFinish will error out
		 * when we get there.
		 */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);

		/*
		 * If this reader is done, remove it from our working array of active
		 * readers.  If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
			{
				ExecShutdownGatherWorkers(gatherstate);
				return NULL;
			}
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion.  Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker.  We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/* Nothing to do except wait for developments. */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
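	/*
	 * Wait for any launched workers to finish and collect their
	 * instrumentation; the parallel context itself is destroyed later, in
	 * ExecShutdownGather.
	 */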
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *						Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}