1 /*-------------------------------------------------------------------------
2 *
3 * nodeAppend.c
4 * routines to handle append nodes.
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 /* INTERFACE ROUTINES
16 * ExecInitAppend - initialize the append node
17 * ExecAppend - retrieve the next tuple from the node
18 * ExecEndAppend - shut down the append node
19 * ExecReScanAppend - rescan the append node
20 *
21 * NOTES
22 * Each append node contains a list of one or more subplans which
23 * must be iteratively processed (forwards or backwards).
24 * Tuples are retrieved by executing the 'whichplan'th subplan
25 * until the subplan stops returning tuples, at which point that
26 * plan is shut down and the next started up.
27 *
28 * Append nodes don't make use of their left and right
29 * subtrees, rather they maintain a list of subplans so
30 * a typical append node looks like this in the plan tree:
31 *
32 * ...
33 * /
34 * Append -------+------+------+--- nil
35 * / \ | | |
36 * nil nil ... ... ...
37 * subplans
38 *
39 * Append nodes are currently used for unions, and to support
40 * inheritance queries, where several relations need to be scanned.
41 * For example, in our standard person/student/employee/student-emp
42 * example, where student and employee inherit from person
43 * and student-emp inherits from student and employee, the
44 * query:
45 *
46 * select name from person
47 *
48 * generates the plan:
49 *
50 * |
51 * Append -------+-------+--------+--------+
52 * / \ | | | |
53 * nil nil Scan Scan Scan Scan
54 * | | | |
55 * person employee student student-emp
56 */
57
58 #include "postgres.h"
59
60 #include "executor/execAsync.h"
61 #include "executor/execdebug.h"
62 #include "executor/execPartition.h"
63 #include "executor/nodeAppend.h"
64 #include "miscadmin.h"
65 #include "pgstat.h"
66 #include "storage/latch.h"
67
/*
 * Shared state for parallel-aware Append.
 *
 * Lives in dynamic shared memory; sized and allocated by
 * ExecAppendEstimate/ExecAppendInitializeDSM with one pa_finished entry per
 * subplan, so the struct ends in a flexible array member.
 */
struct ParallelAppendState
{
	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
	int			pa_next_plan;	/* next plan to choose by any worker */

	/*
	 * pa_finished[i] should be true if no more workers should select subplan
	 * i.  for a non-partial plan, this should be set to true as soon as a
	 * worker selects the plan; for a partial plan, it remains false until
	 * some worker executes the plan to completion.
	 */
	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
82
/* Sentinel for as_whichplan: no sync subplan is currently chosen */
#define INVALID_SUBPLAN_INDEX		-1
/* Max number of file-descriptor events handled per WaitEventSetWait call */
#define EVENT_BUFFER_SIZE			16

static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);
96
/* ----------------------------------------------------------------
 *		ExecInitAppend
 *
 *		Begin all of the subscans of the append node.
 *
 *	   (This is potentially wasteful, since the entire result of the
 *		append node may not be scanned, but this way all of the
 *		structures get allocated in the executor's top level memory
 *		block instead of that of the call to ExecAppend.)
 *
 *		Only subplans surviving initial partition pruning are given
 *		planstates; appendplanstates is therefore indexed 0..nplans-1
 *		by *valid* subplan, not by position in node->appendplans.
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
	AppendState *appendstate = makeNode(AppendState);
	PlanState **appendplanstates;
	Bitmapset  *validsubplans;
	Bitmapset  *asyncplans;
	int			nplans;
	int			nasyncplans;
	int			firstvalid;
	int			i,
				j;

	/* check for unsupported flags */
	Assert(!(eflags & EXEC_FLAG_MARK));

	/*
	 * create new AppendState for our append node
	 */
	appendstate->ps.plan = (Plan *) node;
	appendstate->ps.state = estate;
	appendstate->ps.ExecProcNode = ExecAppend;

	/* Let choose_next_subplan_* function handle setting the first subplan */
	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
	appendstate->as_syncdone = false;
	appendstate->as_begun = false;

	/* If run-time partition pruning is enabled, then set that up now */
	if (node->part_prune_info != NULL)
	{
		PartitionPruneState *prunestate;

		/* We may need an expression context to evaluate partition exprs */
		ExecAssignExprContext(estate, &appendstate->ps);

		/* Create the working data structure for pruning. */
		prunestate = ExecCreatePartitionPruneState(&appendstate->ps,
												   node->part_prune_info);
		appendstate->as_prune_state = prunestate;

		/* Perform an initial partition prune, if required. */
		if (prunestate->do_initial_prune)
		{
			/* Determine which subplans survive initial pruning */
			validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
															list_length(node->appendplans));

			nplans = bms_num_members(validsubplans);
		}
		else
		{
			/* We'll need to initialize all subplans */
			nplans = list_length(node->appendplans);
			Assert(nplans > 0);
			validsubplans = bms_add_range(NULL, 0, nplans - 1);
		}

		/*
		 * When no run-time pruning is required and there's at least one
		 * subplan, we can fill as_valid_subplans immediately, preventing
		 * later calls to ExecFindMatchingSubPlans.
		 */
		if (!prunestate->do_exec_prune && nplans > 0)
			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
	}
	else
	{
		nplans = list_length(node->appendplans);

		/*
		 * When run-time partition pruning is not enabled we can just mark all
		 * subplans as valid; they must also all be initialized.
		 */
		Assert(nplans > 0);
		appendstate->as_valid_subplans = validsubplans =
			bms_add_range(NULL, 0, nplans - 1);
		appendstate->as_prune_state = NULL;
	}

	/*
	 * Initialize result tuple type and slot.
	 */
	ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);

	/* node returns slots from each of its subnodes, therefore not fixed */
	appendstate->ps.resultopsset = true;
	appendstate->ps.resultopsfixed = false;

	appendplanstates = (PlanState **) palloc(nplans *
											 sizeof(PlanState *));

	/*
	 * call ExecInitNode on each of the valid plans to be executed and save
	 * the results into the appendplanstates array.
	 *
	 * While at it, find out the first valid partial plan.
	 *
	 * Note the two counters: i walks the original appendplans list, while j
	 * is the dense index into appendplanstates; all as_* bitmapsets built
	 * below use j-space indexes.
	 */
	j = 0;
	asyncplans = NULL;
	nasyncplans = 0;
	firstvalid = nplans;
	i = -1;
	while ((i = bms_next_member(validsubplans, i)) >= 0)
	{
		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);

		/*
		 * Record async subplans.  When executing EvalPlanQual, we treat them
		 * as sync ones; don't do this when initializing an EvalPlanQual plan
		 * tree.
		 */
		if (initNode->async_capable && estate->es_epq_active == NULL)
		{
			asyncplans = bms_add_member(asyncplans, j);
			nasyncplans++;
		}

		/*
		 * Record the lowest appendplans index which is a valid partial plan.
		 */
		if (i >= node->first_partial_plan && j < firstvalid)
			firstvalid = j;

		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
	}

	appendstate->as_first_partial_plan = firstvalid;
	appendstate->appendplans = appendplanstates;
	appendstate->as_nplans = nplans;

	/* Initialize async state */
	appendstate->as_asyncplans = asyncplans;
	appendstate->as_nasyncplans = nasyncplans;
	appendstate->as_asyncrequests = NULL;
	appendstate->as_asyncresults = NULL;
	appendstate->as_nasyncresults = 0;
	appendstate->as_nasyncremain = 0;
	appendstate->as_needrequest = NULL;
	appendstate->as_eventset = NULL;
	appendstate->as_valid_asyncplans = NULL;

	if (nasyncplans > 0)
	{
		/* One request slot per subplan; only async entries get filled. */
		appendstate->as_asyncrequests = (AsyncRequest **)
			palloc0(nplans * sizeof(AsyncRequest *));

		i = -1;
		while ((i = bms_next_member(asyncplans, i)) >= 0)
		{
			AsyncRequest *areq;

			areq = palloc(sizeof(AsyncRequest));
			areq->requestor = (PlanState *) appendstate;
			areq->requestee = appendplanstates[i];
			areq->request_index = i;
			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;

			appendstate->as_asyncrequests[i] = areq;
		}

		appendstate->as_asyncresults = (TupleTableSlot **)
			palloc0(nasyncplans * sizeof(TupleTableSlot *));

		/*
		 * If the valid subplans are already known (no exec-time pruning
		 * needed), split them into sync and async sets right away.
		 */
		if (appendstate->as_valid_subplans != NULL)
			classify_matching_subplans(appendstate);
	}

	/*
	 * Miscellaneous initialization
	 */

	appendstate->ps.ps_ProjInfo = NULL;

	/* For parallel query, this will be overridden later. */
	appendstate->choose_next_subplan = choose_next_subplan_locally;

	return appendstate;
}
289
/* ----------------------------------------------------------------
 *	   ExecAppend
 *
 *		Handles iteration over multiple subplans.
 *
 *		Returns the next tuple from whichever subplan (sync or async)
 *		produces one, or an empty slot when all subplans are exhausted.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
	AppendState *node = castNode(AppendState, pstate);
	TupleTableSlot *result;

	/*
	 * If this is the first call after Init or ReScan, we need to do the
	 * initialization work.
	 */
	if (!node->as_begun)
	{
		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
		Assert(!node->as_syncdone);

		/* Nothing to do if there are no subplans */
		if (node->as_nplans == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		/* If there are any async subplans, begin executing them. */
		if (node->as_nasyncplans > 0)
			ExecAppendAsyncBegin(node);

		/*
		 * If no sync subplan has been chosen, we must choose one before
		 * proceeding.
		 */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		Assert(node->as_syncdone ||
			   (node->as_whichplan >= 0 &&
				node->as_whichplan < node->as_nplans));

		/* And we're initialized. */
		node->as_begun = true;
	}

	for (;;)
	{
		PlanState  *subnode;

		CHECK_FOR_INTERRUPTS();

		/*
		 * try to get a tuple from an async subplan if any
		 */
		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
		{
			if (ExecAppendAsyncGetNext(node, &result))
				return result;
			/* Reaching here means we must fall through to a sync subplan. */
			Assert(!node->as_syncdone);
			Assert(bms_is_empty(node->as_needrequest));
		}

		/*
		 * figure out which sync subplan we are currently processing
		 */
		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
		subnode = node->appendplans[node->as_whichplan];

		/*
		 * get a tuple from the subplan
		 */
		result = ExecProcNode(subnode);

		if (!TupIsNull(result))
		{
			/*
			 * If the subplan gave us something then return it as-is. We do
			 * NOT make use of the result slot that was set up in
			 * ExecInitAppend; there's no need for it.
			 */
			return result;
		}

		/*
		 * wait or poll for async events if any. We do this before checking
		 * for the end of iteration, because it might drain the remaining
		 * async subplans.
		 */
		if (node->as_nasyncremain > 0)
			ExecAppendAsyncEventWait(node);

		/* choose new sync subplan; if no sync/async subplans, we're done */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
	}
}
385
386 /* ----------------------------------------------------------------
387 * ExecEndAppend
388 *
389 * Shuts down the subscans of the append node.
390 *
391 * Returns nothing of interest.
392 * ----------------------------------------------------------------
393 */
394 void
ExecEndAppend(AppendState * node)395 ExecEndAppend(AppendState *node)
396 {
397 PlanState **appendplans;
398 int nplans;
399 int i;
400
401 /*
402 * get information from the node
403 */
404 appendplans = node->appendplans;
405 nplans = node->as_nplans;
406
407 /*
408 * shut down each of the subscans
409 */
410 for (i = 0; i < nplans; i++)
411 ExecEndNode(appendplans[i]);
412 }
413
/*
 * ExecReScanAppend
 *
 *		Rescan the append node: propagate changed parameters to the
 *		subplans, invalidate run-time pruning results if the pruning
 *		parameters changed, and reset all sync/async bookkeeping so the
 *		next ExecAppend call starts fresh.
 */
void
ExecReScanAppend(AppendState *node)
{
	int			nasyncplans = node->as_nasyncplans;
	int			i;

	/*
	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
	 * we'd better unset the valid subplans so that they are reselected for
	 * the new parameter values.
	 */
	if (node->as_prune_state &&
		bms_overlap(node->ps.chgParam,
					node->as_prune_state->execparamids))
	{
		bms_free(node->as_valid_subplans);
		node->as_valid_subplans = NULL;
		if (nasyncplans > 0)
		{
			bms_free(node->as_valid_asyncplans);
			node->as_valid_asyncplans = NULL;
		}
	}

	for (i = 0; i < node->as_nplans; i++)
	{
		PlanState  *subnode = node->appendplans[i];

		/*
		 * ExecReScan doesn't know about my subplans, so I have to do
		 * changed-parameter signaling myself.
		 */
		if (node->ps.chgParam != NULL)
			UpdateChangedParamSet(subnode, node->ps.chgParam);

		/*
		 * If chgParam of subnode is not null then plan will be re-scanned by
		 * first ExecProcNode or by first ExecAsyncRequest.
		 */
		if (subnode->chgParam == NULL)
			ExecReScan(subnode);
	}

	/* Reset async state */
	if (nasyncplans > 0)
	{
		/* Clear per-request state for every async-capable subplan. */
		i = -1;
		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
		{
			AsyncRequest *areq = node->as_asyncrequests[i];

			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;
		}

		node->as_nasyncresults = 0;
		node->as_nasyncremain = 0;
		bms_free(node->as_needrequest);
		node->as_needrequest = NULL;
	}

	/* Let choose_next_subplan_* function handle setting the first subplan */
	node->as_whichplan = INVALID_SUBPLAN_INDEX;
	node->as_syncdone = false;
	node->as_begun = false;
}
481
482 /* ----------------------------------------------------------------
483 * Parallel Append Support
484 * ----------------------------------------------------------------
485 */
486
487 /* ----------------------------------------------------------------
488 * ExecAppendEstimate
489 *
490 * Compute the amount of space we'll need in the parallel
491 * query DSM, and inform pcxt->estimator about our needs.
492 * ----------------------------------------------------------------
493 */
494 void
ExecAppendEstimate(AppendState * node,ParallelContext * pcxt)495 ExecAppendEstimate(AppendState *node,
496 ParallelContext *pcxt)
497 {
498 node->pstate_len =
499 add_size(offsetof(ParallelAppendState, pa_finished),
500 sizeof(bool) * node->as_nplans);
501
502 shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
503 shm_toc_estimate_keys(&pcxt->estimator, 1);
504 }
505
506
507 /* ----------------------------------------------------------------
508 * ExecAppendInitializeDSM
509 *
510 * Set up shared state for Parallel Append.
511 * ----------------------------------------------------------------
512 */
513 void
ExecAppendInitializeDSM(AppendState * node,ParallelContext * pcxt)514 ExecAppendInitializeDSM(AppendState *node,
515 ParallelContext *pcxt)
516 {
517 ParallelAppendState *pstate;
518
519 pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
520 memset(pstate, 0, node->pstate_len);
521 LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
522 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
523
524 node->as_pstate = pstate;
525 node->choose_next_subplan = choose_next_subplan_for_leader;
526 }
527
528 /* ----------------------------------------------------------------
529 * ExecAppendReInitializeDSM
530 *
531 * Reset shared state before beginning a fresh scan.
532 * ----------------------------------------------------------------
533 */
534 void
ExecAppendReInitializeDSM(AppendState * node,ParallelContext * pcxt)535 ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
536 {
537 ParallelAppendState *pstate = node->as_pstate;
538
539 pstate->pa_next_plan = 0;
540 memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
541 }
542
543 /* ----------------------------------------------------------------
544 * ExecAppendInitializeWorker
545 *
546 * Copy relevant information from TOC into planstate, and initialize
547 * whatever is required to choose and execute the optimal subplan.
548 * ----------------------------------------------------------------
549 */
550 void
ExecAppendInitializeWorker(AppendState * node,ParallelWorkerContext * pwcxt)551 ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
552 {
553 node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
554 node->choose_next_subplan = choose_next_subplan_for_worker;
555 }
556
/* ----------------------------------------------------------------
 *		choose_next_subplan_locally
 *
 *		Choose next sync subplan for a non-parallel-aware Append,
 *		returning false if there are no more.
 *
 *		Honors the current scan direction: walks as_valid_subplans
 *		forward or backward from the current position.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
	int			whichplan = node->as_whichplan;
	int			nextplan;

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* Nothing to do if syncdone */
	if (node->as_syncdone)
		return false;

	/*
	 * If first call then have the bms member function choose the first valid
	 * sync subplan by initializing whichplan to -1. If there happen to be no
	 * valid sync subplans then the bms member function will handle that by
	 * returning a negative number which will allow us to exit returning a
	 * false value.
	 */
	if (whichplan == INVALID_SUBPLAN_INDEX)
	{
		if (node->as_nasyncplans > 0)
		{
			/* We'd have filled as_valid_subplans already */
			Assert(node->as_valid_subplans);
		}
		else if (node->as_valid_subplans == NULL)
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state);

		whichplan = -1;
	}

	/* Ensure whichplan is within the expected range */
	Assert(whichplan >= -1 && whichplan <= node->as_nplans);

	if (ScanDirectionIsForward(node->ps.state->es_direction))
		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
	else
		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

	if (nextplan < 0)
	{
		/* Set as_syncdone if in async mode */
		if (node->as_nasyncplans > 0)
			node->as_syncdone = true;
		return false;
	}

	node->as_whichplan = nextplan;

	return true;
}
618
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_leader
 *
 *      Try to pick a plan which doesn't commit us to doing much
 *      work locally, so that as much work as possible is done in
 *      the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
	{
		/* Mark just-completed subplan as finished. */
		node->as_pstate->pa_finished[node->as_whichplan] = true;
	}
	else
	{
		/* Start with last subplan. */
		node->as_whichplan = node->as_nplans - 1;

		/*
		 * If we've yet to determine the valid subplans then do so now.  If
		 * run-time pruning is disabled then the valid subplans will always be
		 * set to all subplans.
		 */
		if (node->as_valid_subplans == NULL)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state);

			/*
			 * Mark each invalid plan as finished to allow the loop below to
			 * select the first valid subplan.
			 */
			mark_invalid_subplans_as_finished(node);
		}
	}

	/* Loop until we find a subplan to execute. */
	while (pstate->pa_finished[node->as_whichplan])
	{
		if (node->as_whichplan == 0)
		{
			/* Scanned down past subplan 0: everything is finished. */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			node->as_whichplan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}

		/*
		 * We needn't pay attention to as_valid_subplans here as all invalid
		 * plans have been marked as finished.
		 */
		node->as_whichplan--;
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
694
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_worker
 *
 *		Choose next subplan for a parallel-aware Append, returning
 *		false if there are no more.
 *
 *		We start from the first plan and advance through the list;
 *		when we get back to the end, we loop back to the first
 *		partial plan.  This assigns the non-partial plans first in
 *		order of descending cost and then spreads out the workers
 *		as evenly as possible across the remaining partial plans.
 *
 *		All reads and writes of pa_next_plan/pa_finished happen
 *		under pa_lock.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	/* Mark just-completed subplan as finished. */
	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	/*
	 * If we've yet to determine the valid subplans then do so now.  If
	 * run-time pruning is disabled then the valid subplans will always be set
	 * to all subplans.
	 */
	else if (node->as_valid_subplans == NULL)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state);
		mark_invalid_subplans_as_finished(node);
	}

	/* If all the plans are already done, we have nothing to do */
	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
	{
		LWLockRelease(&pstate->pa_lock);
		return false;
	}

	/* Save the plan from which we are starting the search. */
	node->as_whichplan = pstate->pa_next_plan;

	/* Loop until we find a valid subplan to execute. */
	while (pstate->pa_finished[pstate->pa_next_plan])
	{
		int			nextplan;

		nextplan = bms_next_member(node->as_valid_subplans,
								   pstate->pa_next_plan);
		if (nextplan >= 0)
		{
			/* Advance to the next valid plan. */
			pstate->pa_next_plan = nextplan;
		}
		else if (node->as_whichplan > node->as_first_partial_plan)
		{
			/*
			 * Try looping back to the first valid partial plan, if there is
			 * one.  If there isn't, arrange to bail out below.
			 */
			nextplan = bms_next_member(node->as_valid_subplans,
									   node->as_first_partial_plan - 1);
			pstate->pa_next_plan =
				nextplan < 0 ? node->as_whichplan : nextplan;
		}
		else
		{
			/*
			 * At last plan, and either there are no partial plans or we've
			 * tried them all.  Arrange to bail out.
			 */
			pstate->pa_next_plan = node->as_whichplan;
		}

		if (pstate->pa_next_plan == node->as_whichplan)
		{
			/* We've tried everything! */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}
	}

	/* Pick the plan we found, and advance pa_next_plan one more time. */
	node->as_whichplan = pstate->pa_next_plan;
	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
										   pstate->pa_next_plan);

	/*
	 * If there are no more valid plans then try setting the next plan to the
	 * first valid partial plan.
	 */
	if (pstate->pa_next_plan < 0)
	{
		int			nextplan = bms_next_member(node->as_valid_subplans,
											   node->as_first_partial_plan - 1);

		if (nextplan >= 0)
			pstate->pa_next_plan = nextplan;
		else
		{
			/*
			 * There are no valid partial plans, and we already chose the last
			 * non-partial plan; so flag that there's nothing more for our
			 * fellow workers to do.
			 */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
		}
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
823
824 /*
825 * mark_invalid_subplans_as_finished
826 * Marks the ParallelAppendState's pa_finished as true for each invalid
827 * subplan.
828 *
829 * This function should only be called for parallel Append with run-time
830 * pruning enabled.
831 */
832 static void
mark_invalid_subplans_as_finished(AppendState * node)833 mark_invalid_subplans_as_finished(AppendState *node)
834 {
835 int i;
836
837 /* Only valid to call this while in parallel Append mode */
838 Assert(node->as_pstate);
839
840 /* Shouldn't have been called when run-time pruning is not enabled */
841 Assert(node->as_prune_state);
842
843 /* Nothing to do if all plans are valid */
844 if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
845 return;
846
847 /* Mark all non-valid plans as finished */
848 for (i = 0; i < node->as_nplans; i++)
849 {
850 if (!bms_is_member(i, node->as_valid_subplans))
851 node->as_pstate->pa_finished[i] = true;
852 }
853 }
854
/* ----------------------------------------------------------------
 *						Asynchronous Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecAppendAsyncBegin
 *
 *		Begin executing designed async-capable subplans.
 *
 *		Issues the initial ExecAsyncRequest for every valid async
 *		subplan and sets as_syncdone / as_nasyncremain accordingly.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
	int			i;

	/* Backward scan is not supported by async-aware Appends. */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* We should never be called when there are no async subplans. */
	Assert(node->as_nasyncplans > 0);

	/* If we've yet to determine the valid subplans then do so now. */
	if (node->as_valid_subplans == NULL)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state);

		/* Split the surviving subplans into sync and async sets. */
		classify_matching_subplans(node);
	}

	/* Initialize state variables. */
	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

	/* Nothing to do if there are no valid async subplans. */
	if (node->as_nasyncremain == 0)
		return;

	/* Make a request for each of the valid async subplans. */
	i = -1;
	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		Assert(areq->request_index == i);
		Assert(!areq->callback_pending);

		/* Do the actual work. */
		ExecAsyncRequest(areq);
	}
}
910
/* ----------------------------------------------------------------
 *		ExecAppendAsyncGetNext
 *
 *		Get the next tuple from any of the asynchronous subplans.
 *
 *		Returns true with *result set (possibly to an empty slot,
 *		meaning the whole Append is exhausted); returns false when the
 *		caller should continue with the sync subplans instead.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
	*result = NULL;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* Request a tuple asynchronously. */
	if (ExecAppendAsyncRequest(node, result))
		return true;

	while (node->as_nasyncremain > 0)
	{
		CHECK_FOR_INTERRUPTS();

		/* Wait or poll for async events. */
		ExecAppendAsyncEventWait(node);

		/* Request a tuple asynchronously. */
		if (ExecAppendAsyncRequest(node, result))
			return true;

		/* Break from loop if there's any sync subplan that isn't complete. */
		if (!node->as_syncdone)
			break;
	}

	/*
	 * If all sync subplans are complete, we're totally done scanning the
	 * given node.  Otherwise, we're done with the asynchronous stuff but must
	 * continue scanning the sync subplans.
	 */
	if (node->as_syncdone)
	{
		Assert(node->as_nasyncremain == 0);
		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
		return true;
	}

	return false;
}
959
/* ----------------------------------------------------------------
 *		ExecAppendAsyncRequest
 *
 *		Request a tuple asynchronously.
 *
 *		Returns true with *result set if a tuple is available, either
 *		one already queued in as_asyncresults or one produced by issuing
 *		new requests to subplans listed in as_needrequest.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
	Bitmapset  *needrequest;
	int			i;

	/* Nothing to do if there are no async subplans needing a new request. */
	if (bms_is_empty(node->as_needrequest))
	{
		Assert(node->as_nasyncresults == 0);
		return false;
	}

	/*
	 * If there are any asynchronously-generated results that have not yet
	 * been returned, we have nothing to do; just return one of them.
	 */
	if (node->as_nasyncresults > 0)
	{
		--node->as_nasyncresults;
		*result = node->as_asyncresults[node->as_nasyncresults];
		return true;
	}

	/*
	 * Make a new request for each of the async subplans that need it.
	 *
	 * Detach as_needrequest first: the ExecAsyncRequest callbacks may add
	 * members back into node->as_needrequest while we iterate.
	 */
	needrequest = node->as_needrequest;
	node->as_needrequest = NULL;
	i = -1;
	while ((i = bms_next_member(needrequest, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		/* Do the actual work. */
		ExecAsyncRequest(areq);
	}
	bms_free(needrequest);

	/* Return one of the asynchronously-generated results if any. */
	if (node->as_nasyncresults > 0)
	{
		--node->as_nasyncresults;
		*result = node->as_asyncresults[node->as_nasyncresults];
		return true;
	}

	return false;
}
1013
/* ----------------------------------------------------------------
 *		ExecAppendAsyncEventWait
 *
 *		Wait or poll for file descriptor events and fire callbacks.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
	/* Room for one event per async subplan plus the postmaster-death event. */
	int			nevents = node->as_nasyncplans + 1;
	/* Block indefinitely only once the sync subplans can supply no tuples. */
	long		timeout = node->as_syncdone ? -1 : 0;
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	int			noccurred;
	int			i;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/*
	 * Keep the event set in the node rather than only in a local, so it
	 * stays reachable if we error out before freeing it below.
	 * NOTE(review): inferred from as_eventset being a node field; confirm
	 * against the node's shutdown/cleanup path.
	 */
	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
					  NULL, NULL);

	/* Give each waiting subplan a chance to add an event. */
	i = -1;
	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		/* Only subplans with an outstanding request have anything to wait on. */
		if (areq->callback_pending)
			ExecAsyncConfigureWait(areq);
	}

	/*
	 * No need for further processing if there are no configured events other
	 * than the postmaster death event.
	 */
	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
	{
		FreeWaitEventSet(node->as_eventset);
		node->as_eventset = NULL;
		return;
	}

	/* We wait on at most EVENT_BUFFER_SIZE events. */
	if (nevents > EVENT_BUFFER_SIZE)
		nevents = EVENT_BUFFER_SIZE;

	/*
	 * If the timeout is -1, wait until at least one event occurs.  If the
	 * timeout is 0, poll for events, but do not wait at all.
	 */
	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
								 nevents, WAIT_EVENT_APPEND_READY);
	/* Done waiting; release the event set before dispatching callbacks. */
	FreeWaitEventSet(node->as_eventset);
	node->as_eventset = NULL;
	if (noccurred == 0)
		return;

	/* Deliver notifications. */
	for (i = 0; i < noccurred; i++)
	{
		WaitEvent  *w = &occurred_event[i];

		/*
		 * Each waiting subplan should have registered its wait event with
		 * user_data pointing back to its AsyncRequest.
		 */
		if ((w->events & WL_SOCKET_READABLE) != 0)
		{
			AsyncRequest *areq = (AsyncRequest *) w->user_data;

			if (areq->callback_pending)
			{
				/*
				 * Mark it as no longer needing a callback.  We must do this
				 * before dispatching the callback in case the callback resets
				 * the flag.
				 */
				areq->callback_pending = false;

				/* Do the actual work. */
				ExecAsyncNotify(areq);
			}
		}
	}
}
1100
1101 /* ----------------------------------------------------------------
1102 * ExecAsyncAppendResponse
1103 *
1104 * Receive a response from an asynchronous request we made.
1105 * ----------------------------------------------------------------
1106 */
1107 void
ExecAsyncAppendResponse(AsyncRequest * areq)1108 ExecAsyncAppendResponse(AsyncRequest *areq)
1109 {
1110 AppendState *node = (AppendState *) areq->requestor;
1111 TupleTableSlot *slot = areq->result;
1112
1113 /* The result should be a TupleTableSlot or NULL. */
1114 Assert(slot == NULL || IsA(slot, TupleTableSlot));
1115
1116 /* Nothing to do if the request is pending. */
1117 if (!areq->request_complete)
1118 {
1119 /* The request would have been pending for a callback. */
1120 Assert(areq->callback_pending);
1121 return;
1122 }
1123
1124 /* If the result is NULL or an empty slot, there's nothing more to do. */
1125 if (TupIsNull(slot))
1126 {
1127 /* The ending subplan wouldn't have been pending for a callback. */
1128 Assert(!areq->callback_pending);
1129 --node->as_nasyncremain;
1130 return;
1131 }
1132
1133 /* Save result so we can return it. */
1134 Assert(node->as_nasyncresults < node->as_nasyncplans);
1135 node->as_asyncresults[node->as_nasyncresults++] = slot;
1136
1137 /*
1138 * Mark the subplan that returned a result as ready for a new request. We
1139 * don't launch another one here immediately because it might complete.
1140 */
1141 node->as_needrequest = bms_add_member(node->as_needrequest,
1142 areq->request_index);
1143 }
1144
1145 /* ----------------------------------------------------------------
1146 * classify_matching_subplans
1147 *
1148 * Classify the node's as_valid_subplans into sync ones and
1149 * async ones, adjust it to contain sync ones only, and save
1150 * async ones in the node's as_valid_asyncplans.
1151 * ----------------------------------------------------------------
1152 */
1153 static void
classify_matching_subplans(AppendState * node)1154 classify_matching_subplans(AppendState *node)
1155 {
1156 Bitmapset *valid_asyncplans;
1157
1158 Assert(node->as_valid_asyncplans == NULL);
1159
1160 /* Nothing to do if there are no valid subplans. */
1161 if (bms_is_empty(node->as_valid_subplans))
1162 {
1163 node->as_syncdone = true;
1164 node->as_nasyncremain = 0;
1165 return;
1166 }
1167
1168 /* Nothing to do if there are no valid async subplans. */
1169 if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1170 {
1171 node->as_nasyncremain = 0;
1172 return;
1173 }
1174
1175 /* Get valid async subplans. */
1176 valid_asyncplans = bms_copy(node->as_asyncplans);
1177 valid_asyncplans = bms_int_members(valid_asyncplans,
1178 node->as_valid_subplans);
1179
1180 /* Adjust the valid subplans to contain sync subplans only. */
1181 node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1182 valid_asyncplans);
1183
1184 /* Save valid async subplans. */
1185 node->as_valid_asyncplans = valid_asyncplans;
1186 }
1187