/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself; it
 * simply returns whatever tuples the worker returns.  If a worker cannot
 * be obtained, then it will run the plan itself and return the results.
 * Therefore, a plan used with a single-copy Gather node need not be
 * parallel-aware.
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */
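
/*
 * For illustration only (nothing in this file depends on it): a query such
 * as
 *		SELECT count(*) FROM foo
 * may produce a plan of the form
 *
 *		Finalize Aggregate
 *		  ->  Gather  (num_workers = 2)
 *				->  Partial Aggregate
 *					  ->  Parallel Seq Scan on foo
 *
 * Each launched worker, and possibly the leader itself, runs the subplan
 * below the Gather node; the code here merges those tuple streams into a
 * single stream for the node above.
 */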

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	bool		hasoid;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

	gatherstate->initialized = false;
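	/*
	 * Provisionally assume we will scan the plan locally unless this is a
	 * single-copy Gather; ExecGather recomputes need_to_scan_locally once it
	 * knows how many workers were actually launched.
	 */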
	gatherstate->need_to_scan_locally = !node->single_copy;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	/*
	 * tuple table initialization
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
	ExecInitResultTupleSlot(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Initialize result tuple type and projection info.
	 */
	ExecAssignResultTypeFromTL(&gatherstate->ps);
	ExecAssignProjectionInfo(&gatherstate->ps, NULL);
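
	/*
	 * The projection set up above is applied by ExecGather to whatever slot
	 * gather_getnext returns, whether the tuple came from a worker's tuple
	 * queue or from running the plan locally.
	 */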

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.
	 */
	if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
		hasoid = false;
	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *fslot = node->funnel_slot;
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution. We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->num_workers);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei);

			/*
			 * Register backend workers. We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei,
										  fslot->tts_tupleDescriptor);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
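
				/*
				 * This working copy is what gather_readnext compacts in
				 * place as individual readers reach end-of-stream;
				 * pei->reader itself must stay intact so that
				 * ExecParallelFinish can destroy every reader at shutdown.
				 */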
			}
			else
			{
				/* No workers?	Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/* Run plan locally if no workers or not single-copy. */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| !gather->single_copy;
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.  This will also clear
	 * any previous tuple returned by a TupleQueueReader; to make sure we
	 * don't leave a dangling pointer around, clear the working slot first.
	 */
	ExecClearTuple(fslot);
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
	HeapTuple	tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			MemoryContext oldContext;

			/* Run TupleQueueReaders in per-tuple context */
			oldContext = MemoryContextSwitchTo(tupleContext);
			tup = gather_readnext(gatherstate);
			MemoryContextSwitchTo(oldContext);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreTuple(tup, /* tuple to store */
							   fslot,	/* slot in which to store the tuple */
							   InvalidBuffer,	/* buffer associated with this
												 * tuple */
							   false);	/* slot should not pfree tuple */
				return fslot;
			}
		}

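		/*
		 * No tuple is available from any worker right now (or there are no
		 * workers at all); try to produce one from our own copy of the
		 * plan, unless this is a single-copy Gather that has a worker.
		 */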
		if (gatherstate->need_to_scan_locally)
		{
			EState *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		HeapTuple	tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/* Attempt to read a tuple, but don't block if none is available. */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);

		/*
		 * If this reader is done, remove it from our working array of active
		 * readers.  If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
			{
				ExecShutdownGatherWorkers(gatherstate);
				return NULL;
			}
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
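			/*
			 * If the reader we removed was the last element of the array,
			 * wrap back around to the first remaining reader.
			 */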
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion.  Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker.  We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/* Nothing to do except wait for developments. */
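			/*
			 * Our latch is set when a worker writes a tuple into, or
			 * detaches from, one of the queues, so once WaitLatch returns
			 * it is worth polling the surviving readers again.
			 */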
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *						Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}