1 /*-------------------------------------------------------------------------
2  *
3  * nodeAppend.c
4  *	  routines to handle append nodes.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *		ExecInitAppend	- initialize the append node
17  *		ExecAppend		- retrieve the next tuple from the node
18  *		ExecEndAppend	- shut down the append node
19  *		ExecReScanAppend - rescan the append node
20  *
21  *	 NOTES
22  *		Each append node contains a list of one or more subplans which
23  *		must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point the
 *		next subplan is started up.  (Subplans are not shut down until
 *		ExecEndAppend is called.)
27  *
28  *		Append nodes don't make use of their left and right
29  *		subtrees, rather they maintain a list of subplans so
30  *		a typical append node looks like this in the plan tree:
31  *
32  *				   ...
33  *				   /
34  *				Append -------+------+------+--- nil
35  *				/	\		  |		 |		|
36  *			  nil	nil		 ...    ...    ...
37  *								 subplans
38  *
39  *		Append nodes are currently used for unions, and to support
40  *		inheritance queries, where several relations need to be scanned.
41  *		For example, in our standard person/student/employee/student-emp
42  *		example, where student and employee inherit from person
43  *		and student-emp inherits from student and employee, the
44  *		query:
45  *
46  *				select name from person
47  *
48  *		generates the plan:
49  *
50  *				  |
51  *				Append -------+-------+--------+--------+
52  *				/	\		  |		  |		   |		|
53  *			  nil	nil		 Scan	 Scan	  Scan	   Scan
54  *							  |		  |		   |		|
55  *							person employee student student-emp
56  */
57 
58 #include "postgres.h"
59 
60 #include "executor/execAsync.h"
61 #include "executor/execdebug.h"
62 #include "executor/execPartition.h"
63 #include "executor/nodeAppend.h"
64 #include "miscadmin.h"
65 #include "pgstat.h"
66 #include "storage/latch.h"
67 
68 /* Shared state for parallel-aware Append. */
69 struct ParallelAppendState
70 {
71 	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
72 	int			pa_next_plan;	/* next plan to choose by any worker */
73 
74 	/*
75 	 * pa_finished[i] should be true if no more workers should select subplan
76 	 * i.  for a non-partial plan, this should be set to true as soon as a
77 	 * worker selects the plan; for a partial plan, it remains false until
78 	 * some worker executes the plan to completion.
79 	 */
80 	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 };
82 
83 #define INVALID_SUBPLAN_INDEX		-1
84 #define EVENT_BUFFER_SIZE			16
85 
86 static TupleTableSlot *ExecAppend(PlanState *pstate);
87 static bool choose_next_subplan_locally(AppendState *node);
88 static bool choose_next_subplan_for_leader(AppendState *node);
89 static bool choose_next_subplan_for_worker(AppendState *node);
90 static void mark_invalid_subplans_as_finished(AppendState *node);
91 static void ExecAppendAsyncBegin(AppendState *node);
92 static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 static void ExecAppendAsyncEventWait(AppendState *node);
95 static void classify_matching_subplans(AppendState *node);
96 
97 /* ----------------------------------------------------------------
98  *		ExecInitAppend
99  *
100  *		Begin all of the subscans of the append node.
101  *
102  *	   (This is potentially wasteful, since the entire result of the
103  *		append node may not be scanned, but this way all of the
104  *		structures get allocated in the executor's top level memory
105  *		block instead of that of the call to ExecAppend.)
106  * ----------------------------------------------------------------
107  */
108 AppendState *
ExecInitAppend(Append * node,EState * estate,int eflags)109 ExecInitAppend(Append *node, EState *estate, int eflags)
110 {
111 	AppendState *appendstate = makeNode(AppendState);
112 	PlanState **appendplanstates;
113 	Bitmapset  *validsubplans;
114 	Bitmapset  *asyncplans;
115 	int			nplans;
116 	int			nasyncplans;
117 	int			firstvalid;
118 	int			i,
119 				j;
120 
121 	/* check for unsupported flags */
122 	Assert(!(eflags & EXEC_FLAG_MARK));
123 
124 	/*
125 	 * create new AppendState for our append node
126 	 */
127 	appendstate->ps.plan = (Plan *) node;
128 	appendstate->ps.state = estate;
129 	appendstate->ps.ExecProcNode = ExecAppend;
130 
131 	/* Let choose_next_subplan_* function handle setting the first subplan */
132 	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
133 	appendstate->as_syncdone = false;
134 	appendstate->as_begun = false;
135 
136 	/* If run-time partition pruning is enabled, then set that up now */
137 	if (node->part_prune_info != NULL)
138 	{
139 		PartitionPruneState *prunestate;
140 
141 		/* We may need an expression context to evaluate partition exprs */
142 		ExecAssignExprContext(estate, &appendstate->ps);
143 
144 		/* Create the working data structure for pruning. */
145 		prunestate = ExecCreatePartitionPruneState(&appendstate->ps,
146 												   node->part_prune_info);
147 		appendstate->as_prune_state = prunestate;
148 
149 		/* Perform an initial partition prune, if required. */
150 		if (prunestate->do_initial_prune)
151 		{
152 			/* Determine which subplans survive initial pruning */
153 			validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
154 															list_length(node->appendplans));
155 
156 			nplans = bms_num_members(validsubplans);
157 		}
158 		else
159 		{
160 			/* We'll need to initialize all subplans */
161 			nplans = list_length(node->appendplans);
162 			Assert(nplans > 0);
163 			validsubplans = bms_add_range(NULL, 0, nplans - 1);
164 		}
165 
166 		/*
167 		 * When no run-time pruning is required and there's at least one
168 		 * subplan, we can fill as_valid_subplans immediately, preventing
169 		 * later calls to ExecFindMatchingSubPlans.
170 		 */
171 		if (!prunestate->do_exec_prune && nplans > 0)
172 			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
173 	}
174 	else
175 	{
176 		nplans = list_length(node->appendplans);
177 
178 		/*
179 		 * When run-time partition pruning is not enabled we can just mark all
180 		 * subplans as valid; they must also all be initialized.
181 		 */
182 		Assert(nplans > 0);
183 		appendstate->as_valid_subplans = validsubplans =
184 			bms_add_range(NULL, 0, nplans - 1);
185 		appendstate->as_prune_state = NULL;
186 	}
187 
188 	/*
189 	 * Initialize result tuple type and slot.
190 	 */
191 	ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
192 
193 	/* node returns slots from each of its subnodes, therefore not fixed */
194 	appendstate->ps.resultopsset = true;
195 	appendstate->ps.resultopsfixed = false;
196 
197 	appendplanstates = (PlanState **) palloc(nplans *
198 											 sizeof(PlanState *));
199 
200 	/*
201 	 * call ExecInitNode on each of the valid plans to be executed and save
202 	 * the results into the appendplanstates array.
203 	 *
204 	 * While at it, find out the first valid partial plan.
205 	 */
206 	j = 0;
207 	asyncplans = NULL;
208 	nasyncplans = 0;
209 	firstvalid = nplans;
210 	i = -1;
211 	while ((i = bms_next_member(validsubplans, i)) >= 0)
212 	{
213 		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);
214 
215 		/*
216 		 * Record async subplans.  When executing EvalPlanQual, we treat them
217 		 * as sync ones; don't do this when initializing an EvalPlanQual plan
218 		 * tree.
219 		 */
220 		if (initNode->async_capable && estate->es_epq_active == NULL)
221 		{
222 			asyncplans = bms_add_member(asyncplans, j);
223 			nasyncplans++;
224 		}
225 
226 		/*
227 		 * Record the lowest appendplans index which is a valid partial plan.
228 		 */
229 		if (i >= node->first_partial_plan && j < firstvalid)
230 			firstvalid = j;
231 
232 		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
233 	}
234 
235 	appendstate->as_first_partial_plan = firstvalid;
236 	appendstate->appendplans = appendplanstates;
237 	appendstate->as_nplans = nplans;
238 
239 	/* Initialize async state */
240 	appendstate->as_asyncplans = asyncplans;
241 	appendstate->as_nasyncplans = nasyncplans;
242 	appendstate->as_asyncrequests = NULL;
243 	appendstate->as_asyncresults = NULL;
244 	appendstate->as_nasyncresults = 0;
245 	appendstate->as_nasyncremain = 0;
246 	appendstate->as_needrequest = NULL;
247 	appendstate->as_eventset = NULL;
248 	appendstate->as_valid_asyncplans = NULL;
249 
250 	if (nasyncplans > 0)
251 	{
252 		appendstate->as_asyncrequests = (AsyncRequest **)
253 			palloc0(nplans * sizeof(AsyncRequest *));
254 
255 		i = -1;
256 		while ((i = bms_next_member(asyncplans, i)) >= 0)
257 		{
258 			AsyncRequest *areq;
259 
260 			areq = palloc(sizeof(AsyncRequest));
261 			areq->requestor = (PlanState *) appendstate;
262 			areq->requestee = appendplanstates[i];
263 			areq->request_index = i;
264 			areq->callback_pending = false;
265 			areq->request_complete = false;
266 			areq->result = NULL;
267 
268 			appendstate->as_asyncrequests[i] = areq;
269 		}
270 
271 		appendstate->as_asyncresults = (TupleTableSlot **)
272 			palloc0(nasyncplans * sizeof(TupleTableSlot *));
273 
274 		if (appendstate->as_valid_subplans != NULL)
275 			classify_matching_subplans(appendstate);
276 	}
277 
278 	/*
279 	 * Miscellaneous initialization
280 	 */
281 
282 	appendstate->ps.ps_ProjInfo = NULL;
283 
284 	/* For parallel query, this will be overridden later. */
285 	appendstate->choose_next_subplan = choose_next_subplan_locally;
286 
287 	return appendstate;
288 }
289 
290 /* ----------------------------------------------------------------
291  *	   ExecAppend
292  *
293  *		Handles iteration over multiple subplans.
294  * ----------------------------------------------------------------
295  */
296 static TupleTableSlot *
ExecAppend(PlanState * pstate)297 ExecAppend(PlanState *pstate)
298 {
299 	AppendState *node = castNode(AppendState, pstate);
300 	TupleTableSlot *result;
301 
302 	/*
303 	 * If this is the first call after Init or ReScan, we need to do the
304 	 * initialization work.
305 	 */
306 	if (!node->as_begun)
307 	{
308 		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
309 		Assert(!node->as_syncdone);
310 
311 		/* Nothing to do if there are no subplans */
312 		if (node->as_nplans == 0)
313 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
314 
315 		/* If there are any async subplans, begin executing them. */
316 		if (node->as_nasyncplans > 0)
317 			ExecAppendAsyncBegin(node);
318 
319 		/*
320 		 * If no sync subplan has been chosen, we must choose one before
321 		 * proceeding.
322 		 */
323 		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
324 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
325 
326 		Assert(node->as_syncdone ||
327 			   (node->as_whichplan >= 0 &&
328 				node->as_whichplan < node->as_nplans));
329 
330 		/* And we're initialized. */
331 		node->as_begun = true;
332 	}
333 
334 	for (;;)
335 	{
336 		PlanState  *subnode;
337 
338 		CHECK_FOR_INTERRUPTS();
339 
340 		/*
341 		 * try to get a tuple from an async subplan if any
342 		 */
343 		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
344 		{
345 			if (ExecAppendAsyncGetNext(node, &result))
346 				return result;
347 			Assert(!node->as_syncdone);
348 			Assert(bms_is_empty(node->as_needrequest));
349 		}
350 
351 		/*
352 		 * figure out which sync subplan we are currently processing
353 		 */
354 		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
355 		subnode = node->appendplans[node->as_whichplan];
356 
357 		/*
358 		 * get a tuple from the subplan
359 		 */
360 		result = ExecProcNode(subnode);
361 
362 		if (!TupIsNull(result))
363 		{
364 			/*
365 			 * If the subplan gave us something then return it as-is. We do
366 			 * NOT make use of the result slot that was set up in
367 			 * ExecInitAppend; there's no need for it.
368 			 */
369 			return result;
370 		}
371 
372 		/*
373 		 * wait or poll for async events if any. We do this before checking
374 		 * for the end of iteration, because it might drain the remaining
375 		 * async subplans.
376 		 */
377 		if (node->as_nasyncremain > 0)
378 			ExecAppendAsyncEventWait(node);
379 
380 		/* choose new sync subplan; if no sync/async subplans, we're done */
381 		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
382 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
383 	}
384 }
385 
386 /* ----------------------------------------------------------------
387  *		ExecEndAppend
388  *
389  *		Shuts down the subscans of the append node.
390  *
391  *		Returns nothing of interest.
392  * ----------------------------------------------------------------
393  */
394 void
ExecEndAppend(AppendState * node)395 ExecEndAppend(AppendState *node)
396 {
397 	PlanState **appendplans;
398 	int			nplans;
399 	int			i;
400 
401 	/*
402 	 * get information from the node
403 	 */
404 	appendplans = node->appendplans;
405 	nplans = node->as_nplans;
406 
407 	/*
408 	 * shut down each of the subscans
409 	 */
410 	for (i = 0; i < nplans; i++)
411 		ExecEndNode(appendplans[i]);
412 }
413 
414 void
ExecReScanAppend(AppendState * node)415 ExecReScanAppend(AppendState *node)
416 {
417 	int			nasyncplans = node->as_nasyncplans;
418 	int			i;
419 
420 	/*
421 	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
422 	 * we'd better unset the valid subplans so that they are reselected for
423 	 * the new parameter values.
424 	 */
425 	if (node->as_prune_state &&
426 		bms_overlap(node->ps.chgParam,
427 					node->as_prune_state->execparamids))
428 	{
429 		bms_free(node->as_valid_subplans);
430 		node->as_valid_subplans = NULL;
431 		if (nasyncplans > 0)
432 		{
433 			bms_free(node->as_valid_asyncplans);
434 			node->as_valid_asyncplans = NULL;
435 		}
436 	}
437 
438 	for (i = 0; i < node->as_nplans; i++)
439 	{
440 		PlanState  *subnode = node->appendplans[i];
441 
442 		/*
443 		 * ExecReScan doesn't know about my subplans, so I have to do
444 		 * changed-parameter signaling myself.
445 		 */
446 		if (node->ps.chgParam != NULL)
447 			UpdateChangedParamSet(subnode, node->ps.chgParam);
448 
449 		/*
450 		 * If chgParam of subnode is not null then plan will be re-scanned by
451 		 * first ExecProcNode or by first ExecAsyncRequest.
452 		 */
453 		if (subnode->chgParam == NULL)
454 			ExecReScan(subnode);
455 	}
456 
457 	/* Reset async state */
458 	if (nasyncplans > 0)
459 	{
460 		i = -1;
461 		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
462 		{
463 			AsyncRequest *areq = node->as_asyncrequests[i];
464 
465 			areq->callback_pending = false;
466 			areq->request_complete = false;
467 			areq->result = NULL;
468 		}
469 
470 		node->as_nasyncresults = 0;
471 		node->as_nasyncremain = 0;
472 		bms_free(node->as_needrequest);
473 		node->as_needrequest = NULL;
474 	}
475 
476 	/* Let choose_next_subplan_* function handle setting the first subplan */
477 	node->as_whichplan = INVALID_SUBPLAN_INDEX;
478 	node->as_syncdone = false;
479 	node->as_begun = false;
480 }
481 
482 /* ----------------------------------------------------------------
483  *						Parallel Append Support
484  * ----------------------------------------------------------------
485  */
486 
487 /* ----------------------------------------------------------------
488  *		ExecAppendEstimate
489  *
490  *		Compute the amount of space we'll need in the parallel
491  *		query DSM, and inform pcxt->estimator about our needs.
492  * ----------------------------------------------------------------
493  */
494 void
ExecAppendEstimate(AppendState * node,ParallelContext * pcxt)495 ExecAppendEstimate(AppendState *node,
496 				   ParallelContext *pcxt)
497 {
498 	node->pstate_len =
499 		add_size(offsetof(ParallelAppendState, pa_finished),
500 				 sizeof(bool) * node->as_nplans);
501 
502 	shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
503 	shm_toc_estimate_keys(&pcxt->estimator, 1);
504 }
505 
506 
507 /* ----------------------------------------------------------------
508  *		ExecAppendInitializeDSM
509  *
510  *		Set up shared state for Parallel Append.
511  * ----------------------------------------------------------------
512  */
513 void
ExecAppendInitializeDSM(AppendState * node,ParallelContext * pcxt)514 ExecAppendInitializeDSM(AppendState *node,
515 						ParallelContext *pcxt)
516 {
517 	ParallelAppendState *pstate;
518 
519 	pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
520 	memset(pstate, 0, node->pstate_len);
521 	LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
522 	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
523 
524 	node->as_pstate = pstate;
525 	node->choose_next_subplan = choose_next_subplan_for_leader;
526 }
527 
528 /* ----------------------------------------------------------------
529  *		ExecAppendReInitializeDSM
530  *
531  *		Reset shared state before beginning a fresh scan.
532  * ----------------------------------------------------------------
533  */
534 void
ExecAppendReInitializeDSM(AppendState * node,ParallelContext * pcxt)535 ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
536 {
537 	ParallelAppendState *pstate = node->as_pstate;
538 
539 	pstate->pa_next_plan = 0;
540 	memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
541 }
542 
543 /* ----------------------------------------------------------------
544  *		ExecAppendInitializeWorker
545  *
546  *		Copy relevant information from TOC into planstate, and initialize
547  *		whatever is required to choose and execute the optimal subplan.
548  * ----------------------------------------------------------------
549  */
550 void
ExecAppendInitializeWorker(AppendState * node,ParallelWorkerContext * pwcxt)551 ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
552 {
553 	node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
554 	node->choose_next_subplan = choose_next_subplan_for_worker;
555 }
556 
557 /* ----------------------------------------------------------------
558  *		choose_next_subplan_locally
559  *
560  *		Choose next sync subplan for a non-parallel-aware Append,
561  *		returning false if there are no more.
562  * ----------------------------------------------------------------
563  */
564 static bool
choose_next_subplan_locally(AppendState * node)565 choose_next_subplan_locally(AppendState *node)
566 {
567 	int			whichplan = node->as_whichplan;
568 	int			nextplan;
569 
570 	/* We should never be called when there are no subplans */
571 	Assert(node->as_nplans > 0);
572 
573 	/* Nothing to do if syncdone */
574 	if (node->as_syncdone)
575 		return false;
576 
577 	/*
578 	 * If first call then have the bms member function choose the first valid
579 	 * sync subplan by initializing whichplan to -1.  If there happen to be no
580 	 * valid sync subplans then the bms member function will handle that by
581 	 * returning a negative number which will allow us to exit returning a
582 	 * false value.
583 	 */
584 	if (whichplan == INVALID_SUBPLAN_INDEX)
585 	{
586 		if (node->as_nasyncplans > 0)
587 		{
588 			/* We'd have filled as_valid_subplans already */
589 			Assert(node->as_valid_subplans);
590 		}
591 		else if (node->as_valid_subplans == NULL)
592 			node->as_valid_subplans =
593 				ExecFindMatchingSubPlans(node->as_prune_state);
594 
595 		whichplan = -1;
596 	}
597 
598 	/* Ensure whichplan is within the expected range */
599 	Assert(whichplan >= -1 && whichplan <= node->as_nplans);
600 
601 	if (ScanDirectionIsForward(node->ps.state->es_direction))
602 		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
603 	else
604 		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
605 
606 	if (nextplan < 0)
607 	{
608 		/* Set as_syncdone if in async mode */
609 		if (node->as_nasyncplans > 0)
610 			node->as_syncdone = true;
611 		return false;
612 	}
613 
614 	node->as_whichplan = nextplan;
615 
616 	return true;
617 }
618 
619 /* ----------------------------------------------------------------
620  *		choose_next_subplan_for_leader
621  *
622  *      Try to pick a plan which doesn't commit us to doing much
623  *      work locally, so that as much work as possible is done in
624  *      the workers.  Cheapest subplans are at the end.
625  * ----------------------------------------------------------------
626  */
627 static bool
choose_next_subplan_for_leader(AppendState * node)628 choose_next_subplan_for_leader(AppendState *node)
629 {
630 	ParallelAppendState *pstate = node->as_pstate;
631 
632 	/* Backward scan is not supported by parallel-aware plans */
633 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
634 
635 	/* We should never be called when there are no subplans */
636 	Assert(node->as_nplans > 0);
637 
638 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
639 
640 	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
641 	{
642 		/* Mark just-completed subplan as finished. */
643 		node->as_pstate->pa_finished[node->as_whichplan] = true;
644 	}
645 	else
646 	{
647 		/* Start with last subplan. */
648 		node->as_whichplan = node->as_nplans - 1;
649 
650 		/*
651 		 * If we've yet to determine the valid subplans then do so now.  If
652 		 * run-time pruning is disabled then the valid subplans will always be
653 		 * set to all subplans.
654 		 */
655 		if (node->as_valid_subplans == NULL)
656 		{
657 			node->as_valid_subplans =
658 				ExecFindMatchingSubPlans(node->as_prune_state);
659 
660 			/*
661 			 * Mark each invalid plan as finished to allow the loop below to
662 			 * select the first valid subplan.
663 			 */
664 			mark_invalid_subplans_as_finished(node);
665 		}
666 	}
667 
668 	/* Loop until we find a subplan to execute. */
669 	while (pstate->pa_finished[node->as_whichplan])
670 	{
671 		if (node->as_whichplan == 0)
672 		{
673 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
674 			node->as_whichplan = INVALID_SUBPLAN_INDEX;
675 			LWLockRelease(&pstate->pa_lock);
676 			return false;
677 		}
678 
679 		/*
680 		 * We needn't pay attention to as_valid_subplans here as all invalid
681 		 * plans have been marked as finished.
682 		 */
683 		node->as_whichplan--;
684 	}
685 
686 	/* If non-partial, immediately mark as finished. */
687 	if (node->as_whichplan < node->as_first_partial_plan)
688 		node->as_pstate->pa_finished[node->as_whichplan] = true;
689 
690 	LWLockRelease(&pstate->pa_lock);
691 
692 	return true;
693 }
694 
695 /* ----------------------------------------------------------------
696  *		choose_next_subplan_for_worker
697  *
698  *		Choose next subplan for a parallel-aware Append, returning
699  *		false if there are no more.
700  *
701  *		We start from the first plan and advance through the list;
702  *		when we get back to the end, we loop back to the first
703  *		partial plan.  This assigns the non-partial plans first in
704  *		order of descending cost and then spreads out the workers
705  *		as evenly as possible across the remaining partial plans.
706  * ----------------------------------------------------------------
707  */
708 static bool
choose_next_subplan_for_worker(AppendState * node)709 choose_next_subplan_for_worker(AppendState *node)
710 {
711 	ParallelAppendState *pstate = node->as_pstate;
712 
713 	/* Backward scan is not supported by parallel-aware plans */
714 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
715 
716 	/* We should never be called when there are no subplans */
717 	Assert(node->as_nplans > 0);
718 
719 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
720 
721 	/* Mark just-completed subplan as finished. */
722 	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
723 		node->as_pstate->pa_finished[node->as_whichplan] = true;
724 
725 	/*
726 	 * If we've yet to determine the valid subplans then do so now.  If
727 	 * run-time pruning is disabled then the valid subplans will always be set
728 	 * to all subplans.
729 	 */
730 	else if (node->as_valid_subplans == NULL)
731 	{
732 		node->as_valid_subplans =
733 			ExecFindMatchingSubPlans(node->as_prune_state);
734 		mark_invalid_subplans_as_finished(node);
735 	}
736 
737 	/* If all the plans are already done, we have nothing to do */
738 	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
739 	{
740 		LWLockRelease(&pstate->pa_lock);
741 		return false;
742 	}
743 
744 	/* Save the plan from which we are starting the search. */
745 	node->as_whichplan = pstate->pa_next_plan;
746 
747 	/* Loop until we find a valid subplan to execute. */
748 	while (pstate->pa_finished[pstate->pa_next_plan])
749 	{
750 		int			nextplan;
751 
752 		nextplan = bms_next_member(node->as_valid_subplans,
753 								   pstate->pa_next_plan);
754 		if (nextplan >= 0)
755 		{
756 			/* Advance to the next valid plan. */
757 			pstate->pa_next_plan = nextplan;
758 		}
759 		else if (node->as_whichplan > node->as_first_partial_plan)
760 		{
761 			/*
762 			 * Try looping back to the first valid partial plan, if there is
763 			 * one.  If there isn't, arrange to bail out below.
764 			 */
765 			nextplan = bms_next_member(node->as_valid_subplans,
766 									   node->as_first_partial_plan - 1);
767 			pstate->pa_next_plan =
768 				nextplan < 0 ? node->as_whichplan : nextplan;
769 		}
770 		else
771 		{
772 			/*
773 			 * At last plan, and either there are no partial plans or we've
774 			 * tried them all.  Arrange to bail out.
775 			 */
776 			pstate->pa_next_plan = node->as_whichplan;
777 		}
778 
779 		if (pstate->pa_next_plan == node->as_whichplan)
780 		{
781 			/* We've tried everything! */
782 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
783 			LWLockRelease(&pstate->pa_lock);
784 			return false;
785 		}
786 	}
787 
788 	/* Pick the plan we found, and advance pa_next_plan one more time. */
789 	node->as_whichplan = pstate->pa_next_plan;
790 	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
791 										   pstate->pa_next_plan);
792 
793 	/*
794 	 * If there are no more valid plans then try setting the next plan to the
795 	 * first valid partial plan.
796 	 */
797 	if (pstate->pa_next_plan < 0)
798 	{
799 		int			nextplan = bms_next_member(node->as_valid_subplans,
800 											   node->as_first_partial_plan - 1);
801 
802 		if (nextplan >= 0)
803 			pstate->pa_next_plan = nextplan;
804 		else
805 		{
806 			/*
807 			 * There are no valid partial plans, and we already chose the last
808 			 * non-partial plan; so flag that there's nothing more for our
809 			 * fellow workers to do.
810 			 */
811 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
812 		}
813 	}
814 
815 	/* If non-partial, immediately mark as finished. */
816 	if (node->as_whichplan < node->as_first_partial_plan)
817 		node->as_pstate->pa_finished[node->as_whichplan] = true;
818 
819 	LWLockRelease(&pstate->pa_lock);
820 
821 	return true;
822 }
823 
824 /*
825  * mark_invalid_subplans_as_finished
826  *		Marks the ParallelAppendState's pa_finished as true for each invalid
827  *		subplan.
828  *
829  * This function should only be called for parallel Append with run-time
830  * pruning enabled.
831  */
832 static void
mark_invalid_subplans_as_finished(AppendState * node)833 mark_invalid_subplans_as_finished(AppendState *node)
834 {
835 	int			i;
836 
837 	/* Only valid to call this while in parallel Append mode */
838 	Assert(node->as_pstate);
839 
840 	/* Shouldn't have been called when run-time pruning is not enabled */
841 	Assert(node->as_prune_state);
842 
843 	/* Nothing to do if all plans are valid */
844 	if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
845 		return;
846 
847 	/* Mark all non-valid plans as finished */
848 	for (i = 0; i < node->as_nplans; i++)
849 	{
850 		if (!bms_is_member(i, node->as_valid_subplans))
851 			node->as_pstate->pa_finished[i] = true;
852 	}
853 }
854 
855 /* ----------------------------------------------------------------
856  *						Asynchronous Append Support
857  * ----------------------------------------------------------------
858  */
859 
860 /* ----------------------------------------------------------------
861  *		ExecAppendAsyncBegin
862  *
863  *		Begin executing designed async-capable subplans.
864  * ----------------------------------------------------------------
865  */
866 static void
ExecAppendAsyncBegin(AppendState * node)867 ExecAppendAsyncBegin(AppendState *node)
868 {
869 	int			i;
870 
871 	/* Backward scan is not supported by async-aware Appends. */
872 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
873 
874 	/* We should never be called when there are no subplans */
875 	Assert(node->as_nplans > 0);
876 
877 	/* We should never be called when there are no async subplans. */
878 	Assert(node->as_nasyncplans > 0);
879 
880 	/* If we've yet to determine the valid subplans then do so now. */
881 	if (node->as_valid_subplans == NULL)
882 	{
883 		node->as_valid_subplans =
884 			ExecFindMatchingSubPlans(node->as_prune_state);
885 
886 		classify_matching_subplans(node);
887 	}
888 
889 	/* Initialize state variables. */
890 	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
891 	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
892 
893 	/* Nothing to do if there are no valid async subplans. */
894 	if (node->as_nasyncremain == 0)
895 		return;
896 
897 	/* Make a request for each of the valid async subplans. */
898 	i = -1;
899 	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
900 	{
901 		AsyncRequest *areq = node->as_asyncrequests[i];
902 
903 		Assert(areq->request_index == i);
904 		Assert(!areq->callback_pending);
905 
906 		/* Do the actual work. */
907 		ExecAsyncRequest(areq);
908 	}
909 }
910 
911 /* ----------------------------------------------------------------
912  *		ExecAppendAsyncGetNext
913  *
914  *		Get the next tuple from any of the asynchronous subplans.
915  * ----------------------------------------------------------------
916  */
917 static bool
ExecAppendAsyncGetNext(AppendState * node,TupleTableSlot ** result)918 ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
919 {
920 	*result = NULL;
921 
922 	/* We should never be called when there are no valid async subplans. */
923 	Assert(node->as_nasyncremain > 0);
924 
925 	/* Request a tuple asynchronously. */
926 	if (ExecAppendAsyncRequest(node, result))
927 		return true;
928 
929 	while (node->as_nasyncremain > 0)
930 	{
931 		CHECK_FOR_INTERRUPTS();
932 
933 		/* Wait or poll for async events. */
934 		ExecAppendAsyncEventWait(node);
935 
936 		/* Request a tuple asynchronously. */
937 		if (ExecAppendAsyncRequest(node, result))
938 			return true;
939 
940 		/* Break from loop if there's any sync subplan that isn't complete. */
941 		if (!node->as_syncdone)
942 			break;
943 	}
944 
945 	/*
946 	 * If all sync subplans are complete, we're totally done scanning the
947 	 * given node.  Otherwise, we're done with the asynchronous stuff but must
948 	 * continue scanning the sync subplans.
949 	 */
950 	if (node->as_syncdone)
951 	{
952 		Assert(node->as_nasyncremain == 0);
953 		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
954 		return true;
955 	}
956 
957 	return false;
958 }
959 
960 /* ----------------------------------------------------------------
961  *		ExecAppendAsyncRequest
962  *
963  *		Request a tuple asynchronously.
964  * ----------------------------------------------------------------
965  */
966 static bool
ExecAppendAsyncRequest(AppendState * node,TupleTableSlot ** result)967 ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
968 {
969 	Bitmapset  *needrequest;
970 	int			i;
971 
972 	/* Nothing to do if there are no async subplans needing a new request. */
973 	if (bms_is_empty(node->as_needrequest))
974 	{
975 		Assert(node->as_nasyncresults == 0);
976 		return false;
977 	}
978 
979 	/*
980 	 * If there are any asynchronously-generated results that have not yet
981 	 * been returned, we have nothing to do; just return one of them.
982 	 */
983 	if (node->as_nasyncresults > 0)
984 	{
985 		--node->as_nasyncresults;
986 		*result = node->as_asyncresults[node->as_nasyncresults];
987 		return true;
988 	}
989 
990 	/* Make a new request for each of the async subplans that need it. */
991 	needrequest = node->as_needrequest;
992 	node->as_needrequest = NULL;
993 	i = -1;
994 	while ((i = bms_next_member(needrequest, i)) >= 0)
995 	{
996 		AsyncRequest *areq = node->as_asyncrequests[i];
997 
998 		/* Do the actual work. */
999 		ExecAsyncRequest(areq);
1000 	}
1001 	bms_free(needrequest);
1002 
1003 	/* Return one of the asynchronously-generated results if any. */
1004 	if (node->as_nasyncresults > 0)
1005 	{
1006 		--node->as_nasyncresults;
1007 		*result = node->as_asyncresults[node->as_nasyncresults];
1008 		return true;
1009 	}
1010 
1011 	return false;
1012 }
1013 
1014 /* ----------------------------------------------------------------
1015  *		ExecAppendAsyncEventWait
1016  *
1017  *		Wait or poll for file descriptor events and fire callbacks.
1018  * ----------------------------------------------------------------
1019  */
1020 static void
ExecAppendAsyncEventWait(AppendState * node)1021 ExecAppendAsyncEventWait(AppendState *node)
1022 {
1023 	int			nevents = node->as_nasyncplans + 1;
1024 	long		timeout = node->as_syncdone ? -1 : 0;
1025 	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
1026 	int			noccurred;
1027 	int			i;
1028 
1029 	/* We should never be called when there are no valid async subplans. */
1030 	Assert(node->as_nasyncremain > 0);
1031 
1032 	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
1033 	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1034 					  NULL, NULL);
1035 
1036 	/* Give each waiting subplan a chance to add an event. */
1037 	i = -1;
1038 	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1039 	{
1040 		AsyncRequest *areq = node->as_asyncrequests[i];
1041 
1042 		if (areq->callback_pending)
1043 			ExecAsyncConfigureWait(areq);
1044 	}
1045 
1046 	/*
1047 	 * No need for further processing if there are no configured events other
1048 	 * than the postmaster death event.
1049 	 */
1050 	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1051 	{
1052 		FreeWaitEventSet(node->as_eventset);
1053 		node->as_eventset = NULL;
1054 		return;
1055 	}
1056 
1057 	/* We wait on at most EVENT_BUFFER_SIZE events. */
1058 	if (nevents > EVENT_BUFFER_SIZE)
1059 		nevents = EVENT_BUFFER_SIZE;
1060 
1061 	/*
1062 	 * If the timeout is -1, wait until at least one event occurs.  If the
1063 	 * timeout is 0, poll for events, but do not wait at all.
1064 	 */
1065 	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1066 								 nevents, WAIT_EVENT_APPEND_READY);
1067 	FreeWaitEventSet(node->as_eventset);
1068 	node->as_eventset = NULL;
1069 	if (noccurred == 0)
1070 		return;
1071 
1072 	/* Deliver notifications. */
1073 	for (i = 0; i < noccurred; i++)
1074 	{
1075 		WaitEvent  *w = &occurred_event[i];
1076 
1077 		/*
1078 		 * Each waiting subplan should have registered its wait event with
1079 		 * user_data pointing back to its AsyncRequest.
1080 		 */
1081 		if ((w->events & WL_SOCKET_READABLE) != 0)
1082 		{
1083 			AsyncRequest *areq = (AsyncRequest *) w->user_data;
1084 
1085 			if (areq->callback_pending)
1086 			{
1087 				/*
1088 				 * Mark it as no longer needing a callback.  We must do this
1089 				 * before dispatching the callback in case the callback resets
1090 				 * the flag.
1091 				 */
1092 				areq->callback_pending = false;
1093 
1094 				/* Do the actual work. */
1095 				ExecAsyncNotify(areq);
1096 			}
1097 		}
1098 	}
1099 }
1100 
1101 /* ----------------------------------------------------------------
1102  *		ExecAsyncAppendResponse
1103  *
1104  *		Receive a response from an asynchronous request we made.
1105  * ----------------------------------------------------------------
1106  */
1107 void
ExecAsyncAppendResponse(AsyncRequest * areq)1108 ExecAsyncAppendResponse(AsyncRequest *areq)
1109 {
1110 	AppendState *node = (AppendState *) areq->requestor;
1111 	TupleTableSlot *slot = areq->result;
1112 
1113 	/* The result should be a TupleTableSlot or NULL. */
1114 	Assert(slot == NULL || IsA(slot, TupleTableSlot));
1115 
1116 	/* Nothing to do if the request is pending. */
1117 	if (!areq->request_complete)
1118 	{
1119 		/* The request would have been pending for a callback. */
1120 		Assert(areq->callback_pending);
1121 		return;
1122 	}
1123 
1124 	/* If the result is NULL or an empty slot, there's nothing more to do. */
1125 	if (TupIsNull(slot))
1126 	{
1127 		/* The ending subplan wouldn't have been pending for a callback. */
1128 		Assert(!areq->callback_pending);
1129 		--node->as_nasyncremain;
1130 		return;
1131 	}
1132 
1133 	/* Save result so we can return it. */
1134 	Assert(node->as_nasyncresults < node->as_nasyncplans);
1135 	node->as_asyncresults[node->as_nasyncresults++] = slot;
1136 
1137 	/*
1138 	 * Mark the subplan that returned a result as ready for a new request.  We
1139 	 * don't launch another one here immediately because it might complete.
1140 	 */
1141 	node->as_needrequest = bms_add_member(node->as_needrequest,
1142 										  areq->request_index);
1143 }
1144 
1145 /* ----------------------------------------------------------------
1146  *		classify_matching_subplans
1147  *
1148  *		Classify the node's as_valid_subplans into sync ones and
1149  *		async ones, adjust it to contain sync ones only, and save
1150  *		async ones in the node's as_valid_asyncplans.
1151  * ----------------------------------------------------------------
1152  */
1153 static void
classify_matching_subplans(AppendState * node)1154 classify_matching_subplans(AppendState *node)
1155 {
1156 	Bitmapset  *valid_asyncplans;
1157 
1158 	Assert(node->as_valid_asyncplans == NULL);
1159 
1160 	/* Nothing to do if there are no valid subplans. */
1161 	if (bms_is_empty(node->as_valid_subplans))
1162 	{
1163 		node->as_syncdone = true;
1164 		node->as_nasyncremain = 0;
1165 		return;
1166 	}
1167 
1168 	/* Nothing to do if there are no valid async subplans. */
1169 	if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1170 	{
1171 		node->as_nasyncremain = 0;
1172 		return;
1173 	}
1174 
1175 	/* Get valid async subplans. */
1176 	valid_asyncplans = bms_copy(node->as_asyncplans);
1177 	valid_asyncplans = bms_int_members(valid_asyncplans,
1178 									   node->as_valid_subplans);
1179 
1180 	/* Adjust the valid subplans to contain sync subplans only. */
1181 	node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1182 											  valid_asyncplans);
1183 
1184 	/* Save valid async subplans. */
1185 	node->as_valid_asyncplans = valid_asyncplans;
1186 }
1187