/*-------------------------------------------------------------------------
 *
 * nodeIncrementalSort.c
 *	  Routines to handle incremental sorting of relations.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeIncrementalSort.c
 *
 * DESCRIPTION
 *
 * Incremental sort is an optimized variant of multikey sort for cases
 * when the input is already sorted by a prefix of the sort keys. For
 * example when a sort by (key1, key2 ... keyN) is requested, and the
 * input is already sorted by (key1, key2 ... keyM), M < N, we can
 * divide the input into groups where keys (key1, ... keyM) are equal,
 * and only sort on the remaining columns.
 *
 * Consider the following example. We have input tuples consisting of
 * two integers (X, Y) already presorted by X, while it's required to
 * sort them by both X and Y. Let the input tuples be the following:
 *
 *   (1, 5)
 *   (1, 2)
 *   (2, 9)
 *   (2, 1)
 *   (2, 5)
 *   (3, 3)
 *   (3, 7)
 *
 * An incremental sort algorithm would split the input into the following
 * groups, which have equal X, and then sort them by Y individually:
 *
 *   (1, 5) (1, 2)
 *   (2, 9) (2, 1) (2, 5)
 *   (3, 3) (3, 7)
 *
 * After sorting these groups and putting them together, we would get
 * the following result, sorted by X and Y as requested:
 *
 *   (1, 2)
 *   (1, 5)
 *   (2, 1)
 *   (2, 5)
 *   (2, 9)
 *   (3, 3)
 *   (3, 7)
 *
 * Incremental sort may be more efficient than plain sort, particularly
 * on large datasets, as it reduces the amount of data to sort at once,
 * making it more likely it fits into work_mem (eliminating the need to
 * spill to disk). But the main advantage of incremental sort is that
 * it can start producing rows early, before sorting the whole dataset,
 * which is a significant benefit especially for queries with LIMIT.
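 *
 * For instance, given an index on (x) alone and a query such as
 *
 *   SELECT * FROM tbl ORDER BY x, y LIMIT 10;
 *
 * (a hypothetical schema), an incremental sort above the index scan only
 * has to sort the groups of equal x values actually read before the limit
 * is satisfied, instead of sorting the entire table.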
 *
 * The algorithm we've implemented here is modified from the theoretical
 * base described above by operating in two different modes:
 *   - Fetching a minimum number of tuples without checking prefix key
 *     group membership and sorting on all columns when safe.
 *   - Fetching all tuples for a single prefix key group and sorting on
 *     solely the unsorted columns.
 * We always begin in the first mode, and employ a heuristic to switch
 * into the second mode if we believe it's beneficial.
 *
 * Sorting incrementally can potentially use less memory, avoid fetching
 * and sorting all tuples in the dataset, and begin returning tuples before
 * the entire result set is available.
 *
 * The hybrid mode approach allows us to optimize for both very small
 * groups (where the overhead of a new tuplesort is high) and very large
 * groups (where we can lower cost by not having to sort on already sorted
 * columns), albeit at some extra cost while switching between modes.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/execdebug.h"
#include "executor/nodeIncrementalSort.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/tuplesort.h"

/*
 * We need to store the instrumentation information either in the local
 * node's sort info or, for a parallel worker process, in the shared info
 * (this avoids having to additionally memcpy the info from local memory
 * to shared memory at each instrumentation call). This macro expands to
 * choose the proper sort state and group info.
 *
 * Arguments:
 * - node: type IncrementalSortState *
 * - groupName: the token fullsort or prefixsort
 */
#define INSTRUMENT_SORT_GROUP(node, groupName) \
    do { \
        if ((node)->ss.ps.instrument != NULL) \
        { \
            if ((node)->shared_info && (node)->am_worker) \
            { \
                Assert(IsParallelWorker()); \
                Assert(ParallelWorkerNumber < (node)->shared_info->num_workers); \
                instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
                                      (node)->groupName##_state); \
            } \
            else \
            { \
                instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
                                      (node)->groupName##_state); \
            } \
        } \
    } while (0)
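
/*
 * For example, INSTRUMENT_SORT_GROUP(node, fullsort) pastes the groupName
 * token to record stats from node->fullsort_state into the fullsortGroupInfo
 * member (locally, or in this worker's shared slot); passing prefixsort
 * selects the prefix-sort equivalents instead.
 */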

/* ----------------------------------------------------------------
 * instrumentSortedGroup
 *
 * Because incremental sort processes (potentially many) sort batches, we need
 * to capture tuplesort stats each time we finalize a sort state. This summary
 * data is later used for EXPLAIN ANALYZE output.
 * ----------------------------------------------------------------
 */
static void
instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo,
                      Tuplesortstate *sortState)
{
    TuplesortInstrumentation sort_instr;

    groupInfo->groupCount++;

    tuplesort_get_stats(sortState, &sort_instr);

    /*
     * Calculate total and maximum memory and disk space used. (tuplesort
     * reports spaceUsed in kilobytes.)
     */
    switch (sort_instr.spaceType)
    {
        case SORT_SPACE_TYPE_DISK:
            groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
            if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
                groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;

            break;
        case SORT_SPACE_TYPE_MEMORY:
            groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
            if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
                groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;

            break;
    }

    /* Track each sort method we've used. */
    groupInfo->sortMethods |= sort_instr.sortMethod;
}

/* ----------------------------------------------------------------
 * preparePresortedCols
 *
 * Prepare information for presorted_keys comparisons.
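 *
 * For each of the nPresortedCols leading sort keys we look up and cache the
 * equality function implied by the ordering operator, along with a reusable
 * two-argument FunctionCallInfo, so that isCurrentGroup() can compare tuples
 * cheaply.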
 * ----------------------------------------------------------------
 */
static void
preparePresortedCols(IncrementalSortState *node)
{
    IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);

    node->presorted_keys =
        (PresortedKeyData *) palloc(plannode->nPresortedCols *
                                    sizeof(PresortedKeyData));

    /* Pre-cache comparison functions for each pre-sorted key. */
    for (int i = 0; i < plannode->nPresortedCols; i++)
    {
        Oid equalityOp,
            equalityFunc;
        PresortedKeyData *key;

        key = &node->presorted_keys[i];
        key->attno = plannode->sort.sortColIdx[i];

        equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
                                                     NULL);
        if (!OidIsValid(equalityOp))
            elog(ERROR, "missing equality operator for ordering operator %u",
                 plannode->sort.sortOperators[i]);

        equalityFunc = get_opcode(equalityOp);
        if (!OidIsValid(equalityFunc))
            elog(ERROR, "missing function for operator %u", equalityOp);

        /* Lookup the comparison function */
        fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);

        /* We can initialize the callinfo just once and re-use it */
        key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
        InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
                                 plannode->sort.collations[i], NULL, NULL);
        key->fcinfo->args[0].isnull = false;
        key->fcinfo->args[1].isnull = false;
    }
}

/* ----------------------------------------------------------------
 * isCurrentGroup
 *
 * Check whether a given tuple belongs to the current sort group by comparing
 * the presorted column values to the pivot tuple of the current group.
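 *
 * For example, with a single presorted key X and pivot tuple (2, 9), the
 * tuples (2, 1) and (2, 5) belong to the same group, while (3, 3) starts a
 * new one.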
 * ----------------------------------------------------------------
 */
static bool
isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
{
    int nPresortedCols;

    nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;

    /*
     * The fact that the input is sorted by keys (0, ... n) implies that the
     * tail keys are more likely to change. Therefore we do our comparison
     * starting from the last presorted column, to optimize for early
     * detection of inequality and to minimize the number of function calls.
     */
    for (int i = nPresortedCols - 1; i >= 0; i--)
    {
        Datum datumA,
              datumB,
              result;
        bool isnullA,
             isnullB;
        AttrNumber attno = node->presorted_keys[i].attno;
        PresortedKeyData *key;

        datumA = slot_getattr(pivot, attno, &isnullA);
        datumB = slot_getattr(tuple, attno, &isnullB);

        /* Special case for NULL-vs-NULL, else use standard comparison */
        if (isnullA || isnullB)
        {
            if (isnullA == isnullB)
                continue;
            else
                return false;
        }

        key = &node->presorted_keys[i];

        key->fcinfo->args[0].value = datumA;
        key->fcinfo->args[1].value = datumB;

        /* just for paranoia's sake, we reset isnull each time */
        key->fcinfo->isnull = false;

        result = FunctionCallInvoke(key->fcinfo);

        /* Check for null result, since caller is clearly not expecting one */
        if (key->fcinfo->isnull)
            elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);

        if (!DatumGetBool(result))
            return false;
    }
    return true;
}

/* ----------------------------------------------------------------
 * switchToPresortedPrefixMode
 *
 * When we determine that we've likely encountered a large batch of tuples all
 * having the same presorted prefix values, we want to optimize tuplesort by
 * only sorting on unsorted suffix keys.
 *
 * The problem is that we've already accumulated several tuples in another
 * tuplesort configured to sort by all columns (assuming that there may be
 * more than one prefix key group). So to switch to presorted prefix mode we
 * have to go back and look at all the tuples we've already accumulated to
 * verify they're all part of the same prefix key group before sorting them
 * solely by unsorted suffix keys.
 *
 * While it's likely that all tuples already fetched are all part of a single
 * prefix group, we also have to handle the possibility that there is at least
 * one different prefix key group before the large prefix key group.
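 *
 * On return, execution_status is INCSORT_LOADPREFIXSORT if every transferred
 * tuple shared the same prefix (so we continue loading that group from the
 * outer node), or INCSORT_READPREFIXSORT if a complete earlier group is
 * ready to be read out first.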
 * ----------------------------------------------------------------
 */
static void
switchToPresortedPrefixMode(PlanState *pstate)
{
    IncrementalSortState *node = castNode(IncrementalSortState, pstate);
    ScanDirection dir;
    int64 nTuples;
    TupleDesc tupDesc;
    PlanState *outerNode;
    IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);

    dir = node->ss.ps.state->es_direction;
    outerNode = outerPlanState(node);
    tupDesc = ExecGetResultType(outerNode);

    /* Configure the prefix sort state the first time around. */
    if (node->prefixsort_state == NULL)
    {
        Tuplesortstate *prefixsort_state;
        int nPresortedCols = plannode->nPresortedCols;

        /*
         * Optimize the sort by assuming the prefix columns are all equal and
         * thus we only need to sort by any remaining columns.
         */
        prefixsort_state = tuplesort_begin_heap(tupDesc,
                                                plannode->sort.numCols - nPresortedCols,
                                                &(plannode->sort.sortColIdx[nPresortedCols]),
                                                &(plannode->sort.sortOperators[nPresortedCols]),
                                                &(plannode->sort.collations[nPresortedCols]),
                                                &(plannode->sort.nullsFirst[nPresortedCols]),
                                                work_mem,
                                                NULL,
                                                false);
        node->prefixsort_state = prefixsort_state;
    }
    else
    {
        /* Next group of presorted data */
        tuplesort_reset(node->prefixsort_state);
    }

    /*
     * If the current node has a bound, then it's reasonably likely that a
     * large prefix key group will benefit from bounded sort, so configure the
     * tuplesort to allow for that optimization.
     */
    if (node->bounded)
    {
        SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
                   node->bound - node->bound_Done);
        tuplesort_set_bound(node->prefixsort_state,
                            node->bound - node->bound_Done);
    }

    /*
     * Copy as many tuples as we can (i.e., in the same prefix key group) from
     * the full sort state to the prefix sort state.
     */
    for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
    {
        /*
         * When we encounter multiple prefix key groups inside the full sort
         * tuplesort we have to carry over the last read tuple into the next
         * batch.
         */
        if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
        {
            tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
            /* The carried over tuple is our new group pivot tuple. */
            ExecCopySlot(node->group_pivot, node->transfer_tuple);
        }
        else
        {
            tuplesort_gettupleslot(node->fullsort_state,
                                   ScanDirectionIsForward(dir),
                                   false, node->transfer_tuple, NULL);

            /*
             * If this is our first time through the loop, then we need to
             * save the first tuple we get as our new group pivot.
             */
            if (TupIsNull(node->group_pivot))
                ExecCopySlot(node->group_pivot, node->transfer_tuple);

            if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
            {
                tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
            }
            else
            {
                /*
                 * The tuple isn't part of the current batch so we need to
                 * carry it over into the next batch of tuples we transfer out
                 * of the full sort tuplesort into the presorted prefix
                 * tuplesort. We don't actually have to do anything special to
                 * save the tuple since we've already loaded it into the
                 * node->transfer_tuple slot, and, even though that slot
                 * points to memory inside the full sort tuplesort, we can't
                 * reset that tuplesort anyway until we've fully transferred
                 * out its tuples, so this reference is safe. We do need to
                 * reset the group pivot tuple though since we've finished the
                 * current prefix key group.
                 */
                ExecClearTuple(node->group_pivot);

                /* Break out of for-loop early */
                break;
            }
        }
    }

    /*
     * Track how many tuples remain in the full sort batch so that we know if
     * we need to sort multiple prefix key groups before processing tuples
     * remaining in the large single prefix key group we think we've
     * encountered.
     */
    SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
    node->n_fullsort_remaining -= nTuples;
    SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);

    if (node->n_fullsort_remaining == 0)
    {
        /*
         * We've found that all tuples remaining in the full sort batch are in
         * the same prefix key group and moved all of those tuples into the
         * presorted prefix tuplesort. We don't know that we've yet found the
         * last tuple in the current prefix key group, so save our pivot
         * comparison tuple and continue fetching tuples from the outer
         * execution node to load into the presorted prefix tuplesort.
         */
        ExecCopySlot(node->group_pivot, node->transfer_tuple);
        SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
        node->execution_status = INCSORT_LOADPREFIXSORT;

        /*
         * Make sure we clear the transfer tuple slot so that next time we
         * encounter a large prefix key group we don't incorrectly assume we
         * have a tuple carried over from the previous group.
         */
        ExecClearTuple(node->transfer_tuple);
    }
    else
    {
        /*
         * We finished a group but didn't consume all of the tuples from the
         * full sort state, so we'll sort this batch, let the outer node read
         * out all of those tuples, and then come back around to find another
         * batch.
         */
        SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
        tuplesort_performsort(node->prefixsort_state);

        INSTRUMENT_SORT_GROUP(node, prefixsort);

        if (node->bounded)
        {
            /*
             * If the current node has a bound and we've already sorted n
             * tuples, then the functional bound remaining is (original bound
             * - n), so store the current number of processed tuples for use
             * in configuring sorting bound.
             */
            SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                       node->bound_Done, Min(node->bound, node->bound_Done + nTuples));
            node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
        }

        SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
        node->execution_status = INCSORT_READPREFIXSORT;
    }
}

/*
 * Sorting many small groups with tuplesort is inefficient. To cope with this
 * problem we don't start a new group until the current one contains at least
 * DEFAULT_MIN_GROUP_SIZE tuples (unfortunately this also means we can't
 * assume small groups of tuples all have the same prefix keys). When we have
 * a bound that's less than DEFAULT_MIN_GROUP_SIZE we start looking for the
 * new group as soon as we've met our bound, to avoid fetching more tuples
 * than we absolutely have to.
 */
#define DEFAULT_MIN_GROUP_SIZE 32

/*
 * While we've optimized for small prefix key groups by not starting our prefix
 * key comparisons until we've reached a minimum number of tuples, we don't want
 * that optimization to cause us to lose out on the benefits of being able to
 * assume a large group of tuples is fully presorted by its prefix keys.
 * Therefore we use the DEFAULT_MAX_FULL_SORT_GROUP_SIZE cutoff as a heuristic
 * for determining when we believe we've encountered a large group, and, if we
 * get to that point without finding a new prefix key group we transition to
 * presorted prefix key mode.
 */
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)
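
/*
 * With the defaults above, full sort mode therefore accumulates batches of at
 * least 32 tuples (sorted on all keys), and a batch that grows past 64 tuples
 * without a prefix key change triggers the switch into presorted prefix mode.
 * When a bound smaller than DEFAULT_MIN_GROUP_SIZE applies, the minimum group
 * size shrinks to the remaining bound instead.
 */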

/* ----------------------------------------------------------------
 * ExecIncrementalSort
 *
 * Assuming the outer subtree returns tuples presorted by some prefix
 * of the target sort columns, performs an incremental sort.
 *
 * Conditions:
 *   -- none.
 *
 * Initial States:
 *   -- the outer child is prepared to return the first tuple.
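 *
 * Execution proceeds as a small state machine: INCSORT_LOADFULLSORT
 * accumulates a minimum-size batch sorted on all sort keys; from there we
 * either emit the batch (INCSORT_READFULLSORT) or, once a batch looks like
 * one large prefix key group, switch through switchToPresortedPrefixMode()
 * into INCSORT_LOADPREFIXSORT and INCSORT_READPREFIXSORT.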
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecIncrementalSort(PlanState *pstate)
{
    IncrementalSortState *node = castNode(IncrementalSortState, pstate);
    EState *estate;
    ScanDirection dir;
    Tuplesortstate *read_sortstate;
    Tuplesortstate *fullsort_state;
    TupleTableSlot *slot;
    IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
    PlanState *outerNode;
    TupleDesc tupDesc;
    int64 nTuples = 0;
    int64 minGroupSize;

    CHECK_FOR_INTERRUPTS();

    estate = node->ss.ps.state;
    dir = estate->es_direction;
    fullsort_state = node->fullsort_state;

    /*
     * If a previous iteration has sorted a batch, then we need to check to
     * see if there are any remaining tuples in that batch that we can return
     * before moving on to other execution states.
     */
    if (node->execution_status == INCSORT_READFULLSORT
        || node->execution_status == INCSORT_READPREFIXSORT)
    {
        /*
         * Return next tuple from the current sorted group set if available.
         */
        read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
            fullsort_state : node->prefixsort_state;
        slot = node->ss.ps.ps_ResultTupleSlot;

        /*
         * We have to populate the slot from the tuplesort before checking
         * outerNodeDone because it will set the slot to NULL if no more
         * tuples remain. If the tuplesort is empty, but we don't have any
         * more tuples available for sort from the outer node, then
         * outerNodeDone will have been set so we'll return that now-empty
         * slot to the caller.
         */
        if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
                                   false, slot, NULL) || node->outerNodeDone)

            /*
             * Note: there isn't a good test case for the node->outerNodeDone
             * check directly, but we need it for any plan where the outer
             * node will fail when trying to fetch too many tuples.
             */
            return slot;
        else if (node->n_fullsort_remaining > 0)
        {
            /*
             * When we transition to presorted prefix mode, we might have
             * accumulated at least one additional prefix key group in the
             * full sort tuplesort. The first call to
             * switchToPresortedPrefixMode() will have pulled the first one of
             * those groups out, and we've returned those tuples to the parent
             * node, but if at this point we still have tuples remaining in
             * the full sort state (i.e., n_fullsort_remaining > 0), then we
             * need to re-execute the prefix mode transition function to pull
             * out the next prefix key group.
             */
            SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
                       node->n_fullsort_remaining);
            switchToPresortedPrefixMode(pstate);
        }
        else
        {
            /*
             * If we don't have any sorted tuples to read and we're not
             * currently transitioning into presorted prefix sort mode, then
             * it's time to start the process all over again by building a new
             * group in the full sort state.
             */
            SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining = 0)\n");
            node->execution_status = INCSORT_LOADFULLSORT;
        }
    }

    /*
     * Scan the subplan in the forward direction while creating the sorted
     * data.
     */
    estate->es_direction = ForwardScanDirection;

    outerNode = outerPlanState(node);
    tupDesc = ExecGetResultType(outerNode);

    /* Load tuples into the full sort state. */
    if (node->execution_status == INCSORT_LOADFULLSORT)
    {
        /*
         * Initialize sorting structures.
         */
        if (fullsort_state == NULL)
        {
            /*
             * Initialize presorted column support structures for
             * isCurrentGroup(). It's correct to do this along with the
             * initial initialization for the full sort state (and not for the
             * prefix sort state) since we always load the full sort state
             * first.
             */
            preparePresortedCols(node);

            /*
             * Since we optimize small prefix key groups by accumulating a
             * minimum number of tuples before sorting, we can't assume that a
             * group of tuples all have the same prefix key values. Hence we
             * set up the full sort tuplesort to sort by all requested sort
             * keys.
             */
            fullsort_state = tuplesort_begin_heap(tupDesc,
                                                  plannode->sort.numCols,
                                                  plannode->sort.sortColIdx,
                                                  plannode->sort.sortOperators,
                                                  plannode->sort.collations,
                                                  plannode->sort.nullsFirst,
                                                  work_mem,
                                                  NULL,
                                                  false);
            node->fullsort_state = fullsort_state;
        }
        else
        {
            /* Reset sort for the next batch. */
            tuplesort_reset(fullsort_state);
        }

        /*
         * If bounded, calculate the remaining bound and configure both
         * bounded sort and the minimum group size accordingly.
         */
        if (node->bounded)
        {
            int64 currentBound = node->bound - node->bound_Done;

            /*
             * Bounded sort isn't likely to be a useful optimization for full
             * sort mode since we limit full sort mode to a relatively small
             * number of tuples and tuplesort doesn't switch over to top-n
             * heap sort anyway unless it hits (2 * bound) tuples.
             */
            if (currentBound < DEFAULT_MIN_GROUP_SIZE)
                tuplesort_set_bound(fullsort_state, currentBound);

            minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
        }
        else
            minGroupSize = DEFAULT_MIN_GROUP_SIZE;

        /*
         * Because we have to read the next tuple to find out that we've
         * encountered a new prefix key group, on subsequent groups we have to
         * carry over that extra tuple and add it to the new group's sort here
         * before we read any new tuples from the outer node.
         */
        if (!TupIsNull(node->group_pivot))
        {
            tuplesort_puttupleslot(fullsort_state, node->group_pivot);
            nTuples++;

            /*
             * We're in full sort mode accumulating a minimum number of tuples
             * and not checking for prefix key equality yet, so we can't
             * assume the group pivot tuple will remain the same -- unless
             * we're using a minimum group size of 1, in which case the pivot
             * is obviously still the pivot.
             */
            if (nTuples != minGroupSize)
                ExecClearTuple(node->group_pivot);
        }

        /*
         * Pull as many tuples from the outer node as possible given our
         * current operating mode.
         */
        for (;;)
        {
            slot = ExecProcNode(outerNode);

            /*
             * If the outer node can't provide us any more tuples, then we can
             * sort the current group and return those tuples.
             */
            if (TupIsNull(slot))
            {
                /*
                 * We need to know later if the outer node has completed to be
                 * able to distinguish between being done with a batch and
                 * being done with the whole node.
                 */
                node->outerNodeDone = true;

                SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
                tuplesort_performsort(fullsort_state);

                INSTRUMENT_SORT_GROUP(node, fullsort);

                SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
                node->execution_status = INCSORT_READFULLSORT;
                break;
            }

            /* Accumulate the next group of presorted tuples. */
            if (nTuples < minGroupSize)
            {
                /*
                 * If we haven't yet hit our target minimum group size, then
                 * we don't need to bother checking for inclusion in the
                 * current prefix group since at this point we'll assume that
                 * we'll full sort this batch to avoid a large number of very
                 * tiny (and thus inefficient) sorts.
                 */
                tuplesort_puttupleslot(fullsort_state, slot);
                nTuples++;

                /*
                 * If we've reached our minimum group size, then we need to
                 * store the most recent tuple as a pivot.
                 */
                if (nTuples == minGroupSize)
                    ExecCopySlot(node->group_pivot, slot);
            }
            else
            {
                /*
                 * If we've already accumulated enough tuples to reach our
                 * minimum group size, then we need to compare any additional
                 * tuples to our pivot tuple to see if we reach the end of
                 * that prefix key group. Only after we find changed prefix
                 * keys can we guarantee sort stability of the tuples we've
                 * already accumulated.
                 */
                if (isCurrentGroup(node, node->group_pivot, slot))
                {
                    /*
                     * As long as the prefix keys match the pivot tuple then
                     * load the tuple into the tuplesort.
                     */
                    tuplesort_puttupleslot(fullsort_state, slot);
                    nTuples++;
                }
                else
                {
                    /*
                     * Since the tuple we fetched isn't part of the current
                     * prefix key group we don't want to sort it as part of
                     * the current batch. Instead we use the group_pivot slot
                     * to carry it over to the next batch (even though we
                     * won't actually treat it as a group pivot).
                     */
                    ExecCopySlot(node->group_pivot, slot);

                    if (node->bounded)
                    {
                        /*
                         * If the current node has a bound, and we've already
                         * sorted n tuples, then the functional bound
                         * remaining is (original bound - n), so store the
                         * current number of processed tuples for later use in
                         * configuring the sort state's bound.
                         */
                        SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                                   node->bound_Done,
                                   Min(node->bound, node->bound_Done + nTuples));
                        node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
                    }

                    /*
                     * Once we find changed prefix keys we can complete the
                     * sort and transition modes to reading out the sorted
                     * tuples.
                     */
                    SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
                               nTuples);
                    tuplesort_performsort(fullsort_state);

                    INSTRUMENT_SORT_GROUP(node, fullsort);

                    SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
                    node->execution_status = INCSORT_READFULLSORT;
                    break;
                }
            }

            /*
             * If we haven't already transitioned modes to reading from the
             * full sort state, we assume that having read at least
             * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
             * processing a large group of tuples all having equal prefix keys
             * (but haven't yet found the final tuple in that prefix key
             * group), so we need to transition into presorted prefix mode.
             */
            if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
                node->execution_status != INCSORT_READFULLSORT)
            {
                /*
                 * The group pivot we have stored has already been put into
                 * the tuplesort; we don't want to carry it over. Since we
                 * haven't yet found the end of the prefix key group, it might
                 * seem like we should keep this, but we don't actually know
                 * how many prefix key groups might be represented in the full
                 * sort state, so we'll let the mode transition function
                 * manage this state for us.
                 */
                ExecClearTuple(node->group_pivot);

                /*
                 * Unfortunately the tuplesort API doesn't include a way to
                 * retrieve tuples unless a sort has been performed, so we
                 * perform the sort even though we could just as easily rely
                 * on FIFO retrieval semantics when transferring them to the
                 * presorted prefix tuplesort.
                 */
                SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
                tuplesort_performsort(fullsort_state);

                INSTRUMENT_SORT_GROUP(node, fullsort);

                /*
                 * If the full sort tuplesort happened to switch into top-n
                 * heapsort mode then we will only be able to retrieve
                 * currentBound tuples (since the tuplesort will have only
                 * retained the top-n tuples). This is safe even though we
                 * haven't yet completed fetching the current prefix key group
                 * because the tuples we've "lost" sort "below" the retained
                 * ones, and we're contractually guaranteed not to need any
                 * more than the currentBound tuples.
                 */
                if (tuplesort_used_bound(node->fullsort_state))
                {
                    int64 currentBound = node->bound - node->bound_Done;

                    SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
                               nTuples, Min(currentBound, nTuples));
                    nTuples = Min(currentBound, nTuples);
                }

                SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
                           nTuples);

                /*
                 * We might have multiple prefix key groups in the full sort
                 * state, so the mode transition function needs to know that
                 * it needs to move from the fullsort to presorted prefix
                 * sort.
                 */
                node->n_fullsort_remaining = nTuples;

                /* Transition the tuples to the presorted prefix tuplesort. */
                switchToPresortedPrefixMode(pstate);

                /*
                 * Since we know we had tuples to move to the presorted prefix
                 * tuplesort, we know that unless that transition has verified
                 * that all tuples belonged to the same prefix key group (in
                 * which case we can go straight to continuing to load tuples
                 * into that tuplesort), we should have a tuple to return
                 * here.
                 *
                 * Either way, the appropriate execution status should have
                 * been set by switchToPresortedPrefixMode(), so we can drop
                 * out of the loop here and let the appropriate path kick in.
                 */
                break;
            }
        }
    }

    if (node->execution_status == INCSORT_LOADPREFIXSORT)
    {
        /*
         * We only enter this state after the mode transition function has
         * confirmed all remaining tuples from the full sort state have the
         * same prefix and moved those tuples to the prefix sort state. That
         * function has also set a group pivot tuple (which doesn't need to be
         * carried over; it's already been put into the prefix sort state).
         */
        Assert(!TupIsNull(node->group_pivot));

        /*
         * Read tuples from the outer node and load them into the prefix sort
         * state until we encounter a tuple whose prefix keys don't match the
         * current group_pivot tuple, since we can't guarantee sort stability
         * until we have all tuples matching those prefix keys.
         */
        for (;;)
        {
            slot = ExecProcNode(outerNode);

            /*
             * If we've exhausted tuples from the outer node we're done
             * loading the prefix sort state.
             */
            if (TupIsNull(slot))
            {
                /*
                 * We need to know later if the outer node has completed to be
                 * able to distinguish between being done with a batch and
                 * being done with the whole node.
                 */
                node->outerNodeDone = true;
                break;
            }

            /*
             * If the tuple's prefix keys match our pivot tuple, we're not
             * done yet and can load it into the prefix sort state. If not, we
             * don't want to sort it as part of the current batch. Instead we
             * use the group_pivot slot to carry it over to the next batch
             * (even though we won't actually treat it as a group pivot).
             */
            if (isCurrentGroup(node, node->group_pivot, slot))
            {
                tuplesort_puttupleslot(node->prefixsort_state, slot);
                nTuples++;
            }
            else
            {
                ExecCopySlot(node->group_pivot, slot);
                break;
            }
        }

        /*
         * Perform the sort and begin returning the tuples to the parent plan
         * node.
         */
        SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
        tuplesort_performsort(node->prefixsort_state);

        INSTRUMENT_SORT_GROUP(node, prefixsort);

        SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
        node->execution_status = INCSORT_READPREFIXSORT;

        if (node->bounded)
        {
            /*
             * If the current node has a bound, and we've already sorted n
             * tuples, then the functional bound remaining is (original bound
             * - n), so store the current number of processed tuples for use
             * in configuring sorting bound.
             */
            SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                       node->bound_Done,
                       Min(node->bound, node->bound_Done + nTuples));
            node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
        }
    }

    /* Restore to user specified direction. */
    estate->es_direction = dir;

    /*
     * Get the first or next tuple from tuplesort. Returns NULL if no more
     * tuples.
     */
    read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
        fullsort_state : node->prefixsort_state;
    slot = node->ss.ps.ps_ResultTupleSlot;
    (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
                                  false, slot, NULL);
    return slot;
}

/* ----------------------------------------------------------------
 * ExecInitIncrementalSort
 *
 * Creates the run-time state information for the sort node
 * produced by the planner and initializes its outer subtree.
 * ----------------------------------------------------------------
 */
IncrementalSortState *
ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
{
    IncrementalSortState *incrsortstate;

    SO_printf("ExecInitIncrementalSort: initializing sort node\n");

    /*
     * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
     * EXEC_FLAG_MARK, because the current sort state contains only one sort
     * batch rather than the full result set.
     */
    Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);

    /* Initialize state structure. */
    incrsortstate = makeNode(IncrementalSortState);
    incrsortstate->ss.ps.plan = (Plan *) node;
    incrsortstate->ss.ps.state = estate;
    incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;

    incrsortstate->execution_status = INCSORT_LOADFULLSORT;
    incrsortstate->bounded = false;
    incrsortstate->outerNodeDone = false;
    incrsortstate->bound_Done = 0;
    incrsortstate->fullsort_state = NULL;
    incrsortstate->prefixsort_state = NULL;
    incrsortstate->group_pivot = NULL;
    incrsortstate->transfer_tuple = NULL;
    incrsortstate->n_fullsort_remaining = 0;
    incrsortstate->presorted_keys = NULL;

    if (incrsortstate->ss.ps.instrument != NULL)
    {
        IncrementalSortGroupInfo *fullsortGroupInfo =
        &incrsortstate->incsort_info.fullsortGroupInfo;
        IncrementalSortGroupInfo *prefixsortGroupInfo =
        &incrsortstate->incsort_info.prefixsortGroupInfo;

        fullsortGroupInfo->groupCount = 0;
        fullsortGroupInfo->maxDiskSpaceUsed = 0;
        fullsortGroupInfo->totalDiskSpaceUsed = 0;
        fullsortGroupInfo->maxMemorySpaceUsed = 0;
        fullsortGroupInfo->totalMemorySpaceUsed = 0;
        fullsortGroupInfo->sortMethods = 0;
        prefixsortGroupInfo->groupCount = 0;
        prefixsortGroupInfo->maxDiskSpaceUsed = 0;
        prefixsortGroupInfo->totalDiskSpaceUsed = 0;
        prefixsortGroupInfo->maxMemorySpaceUsed = 0;
        prefixsortGroupInfo->totalMemorySpaceUsed = 0;
        prefixsortGroupInfo->sortMethods = 0;
    }

    /*
     * Miscellaneous initialization
     *
     * Sort nodes don't initialize their ExprContexts because they never call
     * ExecQual or ExecProject.
     */

    /*
     * Initialize child nodes.
     *
     * Incremental sort does not support backwards scans and mark/restore, so
     * we don't bother removing the flags from eflags here. We allow passing a
     * REWIND flag, because although incremental sort can't use it, the child
     * nodes may be able to do something more useful.
     */
    outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);

    /*
     * Initialize scan slot and type.
     */
    ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);

    /*
     * Initialize return slot and type. No need to initialize projection info
     * because we don't do any projections.
     */
    ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
    incrsortstate->ss.ps.ps_ProjInfo = NULL;

    /*
     * Initialize standalone slots to store a tuple for pivot prefix keys and
     * for carrying over a tuple from one batch to the next.
     */
    incrsortstate->group_pivot =
        MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
                                 &TTSOpsMinimalTuple);
    incrsortstate->transfer_tuple =
        MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
                                 &TTSOpsMinimalTuple);

    SO_printf("ExecInitIncrementalSort: sort node initialized\n");

    return incrsortstate;
}

/* ----------------------------------------------------------------
 * ExecEndIncrementalSort(node)
 * ----------------------------------------------------------------
 */
void
ExecEndIncrementalSort(IncrementalSortState *node)
{
    SO_printf("ExecEndIncrementalSort: shutting down sort node\n");

    /* clean out the scan tuple */
    ExecClearTuple(node->ss.ss_ScanTupleSlot);
    /* must drop pointer to sort result tuple */
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
    /* must drop standalone tuple slots from outer node */
    ExecDropSingleTupleTableSlot(node->group_pivot);
    ExecDropSingleTupleTableSlot(node->transfer_tuple);

    /*
     * Release tuplesort resources.
     */
    if (node->fullsort_state != NULL)
    {
        tuplesort_end(node->fullsort_state);
        node->fullsort_state = NULL;
    }
    if (node->prefixsort_state != NULL)
    {
        tuplesort_end(node->prefixsort_state);
        node->prefixsort_state = NULL;
    }

    /*
     * Shut down the subplan.
     */
    ExecEndNode(outerPlanState(node));

    SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
}

void
ExecReScanIncrementalSort(IncrementalSortState *node)
{
    PlanState *outerPlan = outerPlanState(node);

    /*
     * Incremental sort doesn't support efficient rescan even when parameters
     * haven't changed (e.g., rewind) because unlike regular sort we don't
     * store all tuples at once for the full sort.
     *
     * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
     * re-execute the sort along with the child node. Incremental sort itself
     * can't do anything smarter, but maybe the child nodes can.
     *
     * In theory if we've only filled the full sort with one batch (and
     * haven't reset it for a new batch yet) then we could efficiently rewind,
     * but that seems a narrow enough case that it's not worth handling
     * specially at this time.
     */

    /* must drop pointer to sort result tuple */
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);

    if (node->group_pivot != NULL)
        ExecClearTuple(node->group_pivot);
    if (node->transfer_tuple != NULL)
        ExecClearTuple(node->transfer_tuple);

    node->outerNodeDone = false;
    node->n_fullsort_remaining = 0;
    node->bound_Done = 0;

    node->execution_status = INCSORT_LOADFULLSORT;

    /*
     * If we've already set up either of the sort states, we need to reset
     * them. We could end them and null out the pointers, but there's no
     * reason to repay the setup cost, and because ExecIncrementalSort guards
     * presorted column functions by checking to see if the full sort state
     * has been initialized yet, setting the sort states to null here might
     * actually cause a leak.
     */
    if (node->fullsort_state != NULL)
        tuplesort_reset(node->fullsort_state);
    if (node->prefixsort_state != NULL)
        tuplesort_reset(node->prefixsort_state);

    /*
     * If chgParam of subnode is not null then the plan will be re-scanned by
     * the first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 * ExecIncrementalSortEstimate
 *
 * Estimate space required to propagate sort statistics.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
{
    Size size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
        return;

    size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
    size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 * ExecIncrementalSortInitializeDSM
 *
 * Initialize DSM space for sort statistics.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
{
    Size size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
        return;

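    /*
     * SharedIncrementalSortInfo ends in a flexible array of per-worker
     * IncrementalSortInfo entries, so we allocate the struct header plus one
     * entry per planned worker.
     */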
    size = offsetof(SharedIncrementalSortInfo, sinfo)
        + pcxt->nworkers * sizeof(IncrementalSortInfo);
    node->shared_info = shm_toc_allocate(pcxt->toc, size);
    /* ensure any unfilled slots will contain zeroes */
    memset(node->shared_info, 0, size);
    node->shared_info->num_workers = pcxt->nworkers;
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
                   node->shared_info);
}

/* ----------------------------------------------------------------
 * ExecIncrementalSortInitializeWorker
 *
 * Attach worker to DSM space for sort statistics.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
{
    node->shared_info =
        shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
    node->am_worker = true;
}

/* ----------------------------------------------------------------
 * ExecIncrementalSortRetrieveInstrumentation
 *
 * Transfer sort statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
{
    Size size;
    SharedIncrementalSortInfo *si;

    if (node->shared_info == NULL)
        return;

    size = offsetof(SharedIncrementalSortInfo, sinfo)
        + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
    si = palloc(size);
    memcpy(si, node->shared_info, size);
    node->shared_info = si;
}
1258