/*-------------------------------------------------------------------------
 *
 * nodeIncrementalSort.c
 *    Routines to handle incremental sorting of relations.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeIncrementalSort.c
 *
 * DESCRIPTION
 *
 *    Incremental sort is an optimized variant of multikey sort for cases
 *    when the input is already sorted by a prefix of the sort keys.  For
 *    example, when a sort by (key1, key2 ... keyN) is requested, and the
 *    input is already sorted by (key1, key2 ... keyM), M < N, we can
 *    divide the input into groups where keys (key1, ... keyM) are equal,
 *    and only sort on the remaining columns.
 *
 *    Consider the following example.  We have input tuples consisting of
 *    two integers (X, Y), already presorted by X, and we need to sort them
 *    by both X and Y.  Let the input tuples be as follows:
 *
 *    (1, 5)
 *    (1, 2)
 *    (2, 9)
 *    (2, 1)
 *    (2, 5)
 *    (3, 3)
 *    (3, 7)
 *
 *    An incremental sort algorithm would split the input into the
 *    following groups, which have equal X, and then sort each of them by Y
 *    individually:
 *
 *        (1, 5) (1, 2)
 *        (2, 9) (2, 1) (2, 5)
 *        (3, 3) (3, 7)
 *
 *    After sorting these groups and concatenating them, we get the
 *    following result, which is sorted by X and Y, as requested:
 *
 *    (1, 2)
 *    (1, 5)
 *    (2, 1)
 *    (2, 5)
 *    (2, 9)
 *    (3, 3)
 *    (3, 7)
 *
 *    Incremental sort can be more efficient than plain sort, particularly
 *    on large datasets, as it reduces the amount of data to sort at once,
 *    making it more likely to fit into work_mem (eliminating the need to
 *    spill to disk).  But the main advantage of incremental sort is that
 *    it can start producing rows early, before sorting the whole dataset,
 *    which is a significant benefit especially for queries with LIMIT.
 *
 *    The algorithm we've implemented here is modified from the theoretical
 *    base described above by operating in two different modes:
 *      - Fetching a minimum number of tuples without checking prefix key
 *        group membership and sorting on all columns when safe.
 *      - Fetching all tuples for a single prefix key group and sorting on
 *        solely the as-yet-unsorted columns.
 *    We always begin in the first mode, and employ a heuristic to switch
 *    into the second mode if we believe it's beneficial.
 *
 *    Sorting incrementally can potentially use less memory, avoid fetching
 *    and sorting all tuples in the dataset, and begin returning tuples
 *    before the entire result set is available.
 *
 *    The hybrid mode approach allows us to optimize for both very small
 *    groups (where the overhead of a new tuplesort is high) and very large
 *    groups (where we can lower cost by not having to sort on already
 *    sorted columns), albeit at some extra cost while switching between
 *    modes.
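 *
 *    As an illustration only (plan shape sketched by the editors; exact
 *    EXPLAIN output varies by version, statistics, and costing), a query
 *    such as
 *
 *        SELECT * FROM tbl ORDER BY a, b LIMIT 10;
 *
 *    with a btree index on (a) could produce a plan along the lines of
 *
 *        Limit
 *          ->  Incremental Sort
 *                Sort Key: a, b
 *                Presorted Key: a
 *                ->  Index Scan using tbl_a_idx on tbl
 *
 *    where this node sorts each group of equal "a" values by "b" and can
 *    stop after emitting 10 tuples.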
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/execdebug.h"
#include "executor/nodeIncrementalSort.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/tuplesort.h"

/*
 * We need to store the instrumentation information in either the local
 * node's sort info or, for a parallel worker process, in the shared info
 * (this avoids having to additionally memcpy the info from local memory to
 * shared memory at each instrumentation call).  This macro expands to
 * choose the proper sort state and group info.
 *
 * Arguments:
 * - node: type IncrementalSortState *
 * - groupName: the token fullsort or prefixsort
 */
#define INSTRUMENT_SORT_GROUP(node, groupName) \
    do { \
        if ((node)->ss.ps.instrument != NULL) \
        { \
            if ((node)->shared_info && (node)->am_worker) \
            { \
                Assert(IsParallelWorker()); \
                Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
                instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
                                      (node)->groupName##_state); \
            } \
            else \
            { \
                instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
                                      (node)->groupName##_state); \
            } \
        } \
    } while (0)


/* ----------------------------------------------------------------
 * instrumentSortedGroup
 *
 * Because incremental sort processes (potentially many) sort batches, we
 * need to capture tuplesort stats each time we finalize a sort state.  This
 * summary data is later used for EXPLAIN ANALYZE output.
 * ----------------------------------------------------------------
 */
static void
instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo,
                      Tuplesortstate *sortState)
{
    TuplesortInstrumentation sort_instr;

    groupInfo->groupCount++;

    tuplesort_get_stats(sortState, &sort_instr);

    /* Calculate total and maximum memory and disk space used. */
    switch (sort_instr.spaceType)
    {
        case SORT_SPACE_TYPE_DISK:
            groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
            if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
                groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
            break;
        case SORT_SPACE_TYPE_MEMORY:
            groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
            if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
                groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
            break;
    }

    /* Track each sort method we've used. */
    groupInfo->sortMethods |= sort_instr.sortMethod;
}

/* ----------------------------------------------------------------
 * preparePresortedCols
 *
 * Prepare information for presorted_keys comparisons.
 * ----------------------------------------------------------------
 */
static void
preparePresortedCols(IncrementalSortState *node)
{
    IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);

    node->presorted_keys =
        (PresortedKeyData *) palloc(plannode->nPresortedCols *
                                    sizeof(PresortedKeyData));

    /*
     * Pre-cache comparison functions for each pre-sorted key.
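     *
     * For example (editor's illustration, not from the original sources):
     * for a default btree ordering operator such as "<" on int4,
     * get_equality_op_for_ordering_op() should resolve to the btree
     * opfamily's equality member ("=" on int4), whose implementing
     * function is then looked up with get_opcode() and cached via
     * fmgr_info_cxt() below.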
     */
    for (int i = 0; i < plannode->nPresortedCols; i++)
    {
        Oid         equalityOp,
                    equalityFunc;
        PresortedKeyData *key;

        key = &node->presorted_keys[i];
        key->attno = plannode->sort.sortColIdx[i];

        equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
                                                     NULL);
        if (!OidIsValid(equalityOp))
            elog(ERROR, "missing equality operator for ordering operator %u",
                 plannode->sort.sortOperators[i]);

        equalityFunc = get_opcode(equalityOp);
        if (!OidIsValid(equalityFunc))
            elog(ERROR, "missing function for operator %u", equalityOp);

        /* Lookup the comparison function */
        fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);

        /* We can initialize the callinfo just once and re-use it */
        key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
        InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
                                 plannode->sort.collations[i], NULL, NULL);
        key->fcinfo->args[0].isnull = false;
        key->fcinfo->args[1].isnull = false;
    }
}

/* ----------------------------------------------------------------
 * isCurrentGroup
 *
 * Check whether a given tuple belongs to the current sort group by
 * comparing the presorted column values to the pivot tuple of the current
 * group.
 * ----------------------------------------------------------------
 */
static bool
isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
{
    int         nPresortedCols;

    nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;

    /*
     * That the input is sorted by keys (0, ... n) implies that the tail
     * keys are more likely to change.  Therefore we do our comparison
     * starting from the last pre-sorted column to optimize for early
     * detection of inequality and to minimize the number of function calls.
     */
    for (int i = nPresortedCols - 1; i >= 0; i--)
    {
        Datum       datumA,
                    datumB,
                    result;
        bool        isnullA,
                    isnullB;
        AttrNumber  attno = node->presorted_keys[i].attno;
        PresortedKeyData *key;

        datumA = slot_getattr(pivot, attno, &isnullA);
        datumB = slot_getattr(tuple, attno, &isnullB);

        /* Special case for NULL-vs-NULL, else use standard comparison */
        if (isnullA || isnullB)
        {
            if (isnullA == isnullB)
                continue;
            else
                return false;
        }

        key = &node->presorted_keys[i];

        key->fcinfo->args[0].value = datumA;
        key->fcinfo->args[1].value = datumB;

        /* Just for paranoia's sake, we reset isnull each time */
        key->fcinfo->isnull = false;

        result = FunctionCallInvoke(key->fcinfo);

        /* Check for null result, since caller is clearly not expecting one */
        if (key->fcinfo->isnull)
            elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);

        if (!DatumGetBool(result))
            return false;
    }
    return true;
}

/* ----------------------------------------------------------------
 * switchToPresortedPrefixMode
 *
 * When we determine that we've likely encountered a large batch of tuples
 * all having the same presorted prefix values, we want to optimize
 * tuplesort by only sorting on unsorted suffix keys.
 *
 * The problem is that we've already accumulated several tuples in another
 * tuplesort configured to sort by all columns (assuming that there may be
 * more than one prefix key group).
 * So to switch to presorted prefix mode we have to go back and look at all
 * the tuples we've already accumulated to verify they're all part of the
 * same prefix key group before sorting them solely by the unsorted suffix
 * keys.
 *
 * While it's likely that the tuples already fetched are all part of a
 * single prefix key group, we also have to handle the possibility that
 * there is at least one different prefix key group before the large prefix
 * key group.
 * ----------------------------------------------------------------
 */
static void
switchToPresortedPrefixMode(PlanState *pstate)
{
    IncrementalSortState *node = castNode(IncrementalSortState, pstate);
    ScanDirection dir;
    int64       nTuples;
    TupleDesc   tupDesc;
    PlanState  *outerNode;
    IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);

    dir = node->ss.ps.state->es_direction;
    outerNode = outerPlanState(node);
    tupDesc = ExecGetResultType(outerNode);

    /* Configure the prefix sort state the first time around. */
    if (node->prefixsort_state == NULL)
    {
        Tuplesortstate *prefixsort_state;
        int         nPresortedCols = plannode->nPresortedCols;

        /*
         * Optimize the sort by assuming the prefix columns are all equal
         * and thus we only need to sort by any remaining columns.
         */
        prefixsort_state = tuplesort_begin_heap(tupDesc,
                                                plannode->sort.numCols - nPresortedCols,
                                                &(plannode->sort.sortColIdx[nPresortedCols]),
                                                &(plannode->sort.sortOperators[nPresortedCols]),
                                                &(plannode->sort.collations[nPresortedCols]),
                                                &(plannode->sort.nullsFirst[nPresortedCols]),
                                                work_mem,
                                                NULL,
                                                false);
        node->prefixsort_state = prefixsort_state;
    }
    else
    {
        /* Next group of presorted data */
        tuplesort_reset(node->prefixsort_state);
    }

    /*
     * If the current node has a bound, then it's reasonably likely that a
     * large prefix key group will benefit from bounded sort, so configure
     * the tuplesort to allow for that optimization.
     */
    if (node->bounded)
    {
        SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
                   node->bound - node->bound_Done);
        tuplesort_set_bound(node->prefixsort_state,
                            node->bound - node->bound_Done);
    }

    /*
     * Copy as many tuples as we can (i.e., in the same prefix key group)
     * from the full sort state to the prefix sort state.
     */
    for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
    {
        /*
         * When we encounter multiple prefix key groups inside the full sort
         * tuplesort we have to carry over the last read tuple into the next
         * batch.
         */
        if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
        {
            tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
            /* The carried over tuple is our new group pivot tuple. */
            ExecCopySlot(node->group_pivot, node->transfer_tuple);
        }
        else
        {
            tuplesort_gettupleslot(node->fullsort_state,
                                   ScanDirectionIsForward(dir),
                                   false, node->transfer_tuple, NULL);

            /*
             * If this is our first time through the loop, then we need to
             * save the first tuple we get as our new group pivot.
             */
            if (TupIsNull(node->group_pivot))
                ExecCopySlot(node->group_pivot, node->transfer_tuple);

            if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
            {
                tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
            }
            else
            {
                /*
                 * The tuple isn't part of the current batch so we need to
                 * carry it over into the next batch of tuples we transfer
                 * out of the full sort tuplesort into the presorted prefix
                 * tuplesort.  We don't actually have to do anything special
                 * to save the tuple since we've already loaded it into the
                 * node->transfer_tuple slot, and, even though that slot
                 * points to memory inside the full sort tuplesort, we can't
                 * reset that tuplesort anyway until we've fully transferred
                 * out its tuples, so this reference is safe.  We do need to
                 * reset the group pivot tuple though since we've finished
                 * the current prefix key group.
                 */
                ExecClearTuple(node->group_pivot);

                /* Break out of the for-loop early */
                break;
            }
        }
    }

    /*
     * Track how many tuples remain in the full sort batch so that we know
     * if we need to sort multiple prefix key groups before processing
     * tuples remaining in the large single prefix key group we think we've
     * encountered.
     */
    SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
    node->n_fullsort_remaining -= nTuples;
    SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);

    if (node->n_fullsort_remaining == 0)
    {
        /*
         * We've found that all tuples remaining in the full sort batch are
         * in the same prefix key group and moved all of those tuples into
         * the presorted prefix tuplesort.  We don't know that we've yet
         * found the last tuple in the current prefix key group, so save our
         * pivot comparison tuple and continue fetching tuples from the
         * outer execution node to load into the presorted prefix
         * tuplesort.
         */
        ExecCopySlot(node->group_pivot, node->transfer_tuple);
        SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
        node->execution_status = INCSORT_LOADPREFIXSORT;

        /*
         * Make sure we clear the transfer tuple slot so that next time we
         * encounter a large prefix key group we don't incorrectly assume we
         * have a tuple carried over from the previous group.
         */
        ExecClearTuple(node->transfer_tuple);
    }
    else
    {
        /*
         * We finished a group but didn't consume all of the tuples from the
         * full sort state, so we'll sort this batch, let the parent node
         * read out all of those tuples, and then come back around to find
         * another batch.
         */
        SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
        tuplesort_performsort(node->prefixsort_state);

        INSTRUMENT_SORT_GROUP(node, prefixsort);

        if (node->bounded)
        {
            /*
             * If the current node has a bound and we've already sorted n
             * tuples, then the functional bound remaining is (original
             * bound - n), so store the current number of processed tuples
             * for use in configuring the sorting bound.
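             *
             * As a purely illustrative example (numbers invented by the
             * editors): with an original bound of 100 and 60 tuples sorted
             * and returned so far, bound_Done becomes 60 and the next
             * tuplesort is bounded to 100 - 60 = 40 tuples.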
             */
            SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                       node->bound_Done,
                       Min(node->bound, node->bound_Done + nTuples));
            node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
        }

        SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
        node->execution_status = INCSORT_READPREFIXSORT;
    }
}

/*
 * Sorting many small groups with tuplesort is inefficient.  In order to
 * cope with this problem we don't start a new group until the current one
 * contains at least DEFAULT_MIN_GROUP_SIZE tuples (unfortunately this also
 * means we can't assume small groups of tuples all have the same prefix
 * keys).  When we have a bound that's less than DEFAULT_MIN_GROUP_SIZE we
 * start looking for the new group as soon as we've met our bound to avoid
 * fetching more tuples than we absolutely have to fetch.
 */
#define DEFAULT_MIN_GROUP_SIZE 32

/*
 * While we've optimized for small prefix key groups by not starting our
 * prefix key comparisons until we've reached a minimum number of tuples, we
 * don't want that optimization to cause us to lose out on the benefits of
 * being able to assume a large group of tuples is fully presorted by its
 * prefix keys.  Therefore we use DEFAULT_MAX_FULL_SORT_GROUP_SIZE as a
 * heuristic cutoff for determining when we believe we've encountered a
 * large group, and, if we get to that point without finding a new prefix
 * key group, we transition into presorted prefix key mode.
 */
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)

/* ----------------------------------------------------------------
 * ExecIncrementalSort
 *
 * Assuming that the outer subtree returns tuples presorted by some prefix
 * of the target sort columns, perform an incremental sort.
 *
 * Conditions:
 *   -- none.
 *
 * Initial States:
 *   -- the outer child is prepared to return the first tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecIncrementalSort(PlanState *pstate)
{
    IncrementalSortState *node = castNode(IncrementalSortState, pstate);
    EState     *estate;
    ScanDirection dir;
    Tuplesortstate *read_sortstate;
    Tuplesortstate *fullsort_state;
    TupleTableSlot *slot;
    IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
    PlanState  *outerNode;
    TupleDesc   tupDesc;
    int64       nTuples = 0;
    int64       minGroupSize;

    CHECK_FOR_INTERRUPTS();

    estate = node->ss.ps.state;
    dir = estate->es_direction;
    fullsort_state = node->fullsort_state;

    /*
     * If a previous iteration has sorted a batch, then we need to check to
     * see if there are any remaining tuples in that batch that we can
     * return before moving on to other execution states.
     */
    if (node->execution_status == INCSORT_READFULLSORT
        || node->execution_status == INCSORT_READPREFIXSORT)
    {
        /*
         * Return the next tuple from the current sorted group set if
         * available.
         */
        read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
            fullsort_state : node->prefixsort_state;
        slot = node->ss.ps.ps_ResultTupleSlot;

        /*
         * We have to populate the slot from the tuplesort before checking
         * outerNodeDone because it will set the slot to NULL if no more
         * tuples remain.
         * If the tuplesort is empty, but we don't have any more tuples
         * available for sort from the outer node, then outerNodeDone will
         * have been set so we'll return that now-empty slot to the caller.
         */
        if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
                                   false, slot, NULL) || node->outerNodeDone)

            /*
             * Note: there isn't a good test case for the
             * node->outerNodeDone check directly, but we need it for any
             * plan where the outer node will fail when trying to fetch too
             * many tuples.
             */
            return slot;
        else if (node->n_fullsort_remaining > 0)
        {
            /*
             * When we transition to presorted prefix mode, we might have
             * accumulated at least one additional prefix key group in the
             * full sort tuplesort.  The first call to
             * switchToPresortedPrefixMode() will have pulled the first one
             * of those groups out, and we've returned those tuples to the
             * parent node, but if at this point we still have tuples
             * remaining in the full sort state (i.e., n_fullsort_remaining
             * > 0), then we need to re-execute the prefix mode transition
             * function to pull out the next prefix key group.
             */
            SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
                       node->n_fullsort_remaining);
            switchToPresortedPrefixMode(pstate);
        }
        else
        {
            /*
             * If we don't have any sorted tuples to read and we're not
             * currently transitioning into presorted prefix sort mode, then
             * it's time to start the process all over again by building a
             * new group in the full sort state.
             */
            SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining is 0)\n");
            node->execution_status = INCSORT_LOADFULLSORT;
        }
    }

    /*
     * Scan the subplan in the forward direction while creating the sorted
     * data.
     */
    estate->es_direction = ForwardScanDirection;

    outerNode = outerPlanState(node);
    tupDesc = ExecGetResultType(outerNode);

    /* Load tuples into the full sort state. */
    if (node->execution_status == INCSORT_LOADFULLSORT)
    {
        /*
         * Initialize sorting structures.
         */
        if (fullsort_state == NULL)
        {
            /*
             * Initialize presorted column support structures for
             * isCurrentGroup().  It's correct to do this along with the
             * initial initialization for the full sort state (and not for
             * the prefix sort state) since we always load the full sort
             * state first.
             */
            preparePresortedCols(node);

            /*
             * Since we optimize small prefix key groups by accumulating a
             * minimum number of tuples before sorting, we can't assume that
             * a group of tuples all have the same prefix key values.  Hence
             * we set up the full sort tuplesort to sort by all requested
             * sort keys.
             */
            fullsort_state = tuplesort_begin_heap(tupDesc,
                                                  plannode->sort.numCols,
                                                  plannode->sort.sortColIdx,
                                                  plannode->sort.sortOperators,
                                                  plannode->sort.collations,
                                                  plannode->sort.nullsFirst,
                                                  work_mem,
                                                  NULL,
                                                  false);
            node->fullsort_state = fullsort_state;
        }
        else
        {
            /* Reset sort for the next batch. */
            tuplesort_reset(fullsort_state);
        }

        /*
         * Calculate the remaining tuples left if bounded and configure both
         * bounded sort and the minimum group size accordingly.
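         *
         * For illustration only (editor's hypothetical numbers): with
         * LIMIT 5 and no tuples returned yet, currentBound is 5, which is
         * below DEFAULT_MIN_GROUP_SIZE (32), so the full sort is bounded
         * to 5 tuples and minGroupSize becomes 5 as well.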
         */
        if (node->bounded)
        {
            int64       currentBound = node->bound - node->bound_Done;

            /*
             * Bounded sort isn't likely to be a useful optimization for
             * full sort mode since we limit full sort mode to a relatively
             * small number of tuples and tuplesort doesn't switch over to
             * top-n heap sort anyway unless it hits (2 * bound) tuples.
             */
            if (currentBound < DEFAULT_MIN_GROUP_SIZE)
                tuplesort_set_bound(fullsort_state, currentBound);

            minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
        }
        else
            minGroupSize = DEFAULT_MIN_GROUP_SIZE;

        /*
         * Because we have to read the next tuple to find out that we've
         * encountered a new prefix key group, on subsequent groups we have
         * to carry over that extra tuple and add it to the new group's sort
         * here before we read any new tuples from the outer node.
         */
        if (!TupIsNull(node->group_pivot))
        {
            tuplesort_puttupleslot(fullsort_state, node->group_pivot);
            nTuples++;

            /*
             * We're in full sort mode accumulating a minimum number of
             * tuples and not checking for prefix key equality yet, so we
             * can't assume the group pivot tuple will remain the same --
             * unless we're using a minimum group size of 1, in which case
             * the pivot is obviously still the pivot.
             */
            if (nTuples != minGroupSize)
                ExecClearTuple(node->group_pivot);
        }

        /*
         * Pull as many tuples from the outer node as possible given our
         * current operating mode.
         */
        for (;;)
        {
            slot = ExecProcNode(outerNode);

            /*
             * If the outer node can't provide us any more tuples, then we
             * can sort the current group and return those tuples.
             */
            if (TupIsNull(slot))
            {
                /*
                 * We need to know later if the outer node has completed to
                 * be able to distinguish between being done with a batch
                 * and being done with the whole node.
                 */
                node->outerNodeDone = true;

                SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
                tuplesort_performsort(fullsort_state);

                INSTRUMENT_SORT_GROUP(node, fullsort);

                SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
                node->execution_status = INCSORT_READFULLSORT;
                break;
            }

            /* Accumulate the next group of presorted tuples. */
            if (nTuples < minGroupSize)
            {
                /*
                 * If we haven't yet hit our target minimum group size, then
                 * we don't need to bother checking for inclusion in the
                 * current prefix group since at this point we'll assume
                 * that we'll full sort this batch to avoid a large number
                 * of very tiny (and thus inefficient) sorts.
                 */
                tuplesort_puttupleslot(fullsort_state, slot);
                nTuples++;

                /*
                 * If we've reached our minimum group size, then we need to
                 * store the most recent tuple as a pivot.
                 */
                if (nTuples == minGroupSize)
                    ExecCopySlot(node->group_pivot, slot);
            }
            else
            {
                /*
                 * If we've already accumulated enough tuples to reach our
                 * minimum group size, then we need to compare any
                 * additional tuples to our pivot tuple to see if we reach
                 * the end of that prefix key group.  Only after we find
                 * changed prefix keys can we guarantee sort stability of
                 * the tuples we've already accumulated.
                 */
                if (isCurrentGroup(node, node->group_pivot, slot))
                {
                    /*
                     * As long as the prefix keys match the pivot tuple,
                     * load the tuple into the tuplesort.
                     */
                    tuplesort_puttupleslot(fullsort_state, slot);
                    nTuples++;
                }
                else
                {
                    /*
                     * Since the tuple we fetched isn't part of the current
                     * prefix key group we don't want to sort it as part of
                     * the current batch.  Instead we use the group_pivot
                     * slot to carry it over to the next batch (even though
                     * we won't actually treat it as a group pivot).
                     */
                    ExecCopySlot(node->group_pivot, slot);

                    if (node->bounded)
                    {
                        /*
                         * If the current node has a bound, and we've
                         * already sorted n tuples, then the functional
                         * bound remaining is (original bound - n), so store
                         * the current number of processed tuples for later
                         * use in configuring the sort state's bound.
                         */
                        SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                                   node->bound_Done,
                                   Min(node->bound, node->bound_Done + nTuples));
                        node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
                    }

                    /*
                     * Once we find changed prefix keys we can complete the
                     * sort and transition modes to reading out the sorted
                     * tuples.
                     */
                    SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
                               nTuples);
                    tuplesort_performsort(fullsort_state);

                    INSTRUMENT_SORT_GROUP(node, fullsort);

                    SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
                    node->execution_status = INCSORT_READFULLSORT;
                    break;
                }
            }

            /*
             * If we haven't already transitioned modes to reading from the
             * full sort state, then we assume that having read at least
             * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely
             * we're processing a large group of tuples all having equal
             * prefix keys (but haven't yet found the final tuple in that
             * prefix key group), so we need to transition into presorted
             * prefix mode.
             */
            if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
                node->execution_status != INCSORT_READFULLSORT)
            {
                /*
                 * The group pivot we have stored has already been put into
                 * the tuplesort; we don't want to carry it over.  Since we
                 * haven't yet found the end of the prefix key group, it
                 * might seem like we should keep this, but we don't
                 * actually know how many prefix key groups might be
                 * represented in the full sort state, so we'll let the mode
                 * transition function manage this state for us.
                 */
                ExecClearTuple(node->group_pivot);

                /*
                 * Unfortunately the tuplesort API doesn't include a way to
                 * retrieve tuples unless a sort has been performed, so we
                 * perform the sort even though we could just as easily rely
                 * on FIFO retrieval semantics when transferring them to the
                 * presorted prefix tuplesort.
                 */
                SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
                tuplesort_performsort(fullsort_state);

                INSTRUMENT_SORT_GROUP(node, fullsort);

                /*
                 * If the full sort tuplesort happened to switch into top-n
                 * heapsort mode then we will only be able to retrieve
                 * currentBound tuples (since the tuplesort will have only
                 * retained the top-n tuples).  This is safe even though we
                 * haven't yet completed fetching the current prefix key
                 * group because the tuples we've "lost" already sorted
                 * "below" the retained ones, and we're already
                 * contractually guaranteed to not need any more than the
                 * currentBound tuples.
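                 *
                 * Concretely (editor's illustration with invented numbers):
                 * with a bound of 10, a top-n heapsort retains only the 10
                 * tuples that sort lowest; any discarded tuple sorts after
                 * all 10 of them, so it could never appear in the final 10
                 * rows we are obligated to produce.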
                 */
                if (tuplesort_used_bound(node->fullsort_state))
                {
                    int64       currentBound = node->bound - node->bound_Done;

                    SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
                               nTuples, Min(currentBound, nTuples));
                    nTuples = Min(currentBound, nTuples);
                }

                SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
                           nTuples);

                /*
                 * We might have multiple prefix key groups in the full sort
                 * state, so the mode transition function needs to know that
                 * it needs to move from the fullsort to presorted prefix
                 * sort.
                 */
                node->n_fullsort_remaining = nTuples;

                /* Transition the tuples to the presorted prefix tuplesort. */
                switchToPresortedPrefixMode(pstate);

                /*
                 * Since we know we had tuples to move to the presorted
                 * prefix tuplesort, we know that unless that transition has
                 * verified that all tuples belonged to the same prefix key
                 * group (in which case we can go straight to continuing to
                 * load tuples into that tuplesort), we should have a tuple
                 * to return here.
                 *
                 * Either way, the appropriate execution status should have
                 * been set by switchToPresortedPrefixMode(), so we can drop
                 * out of the loop here and let the appropriate path kick
                 * in.
                 */
                break;
            }
        }
    }

    if (node->execution_status == INCSORT_LOADPREFIXSORT)
    {
        /*
         * We only enter this state after the mode transition function has
         * confirmed all remaining tuples from the full sort state have the
         * same prefix and moved those tuples to the prefix sort state.
         * That function has also set a group pivot tuple (which doesn't
         * need to be carried over; it's already been put into the prefix
         * sort state).
         */
        Assert(!TupIsNull(node->group_pivot));

        /*
         * Read tuples from the outer node and load them into the prefix
         * sort state until we encounter a tuple whose prefix keys don't
         * match the current group_pivot tuple, since we can't guarantee
         * sort stability until we have all tuples matching those prefix
         * keys.
         */
        for (;;)
        {
            slot = ExecProcNode(outerNode);

            /*
             * If we've exhausted tuples from the outer node we're done
             * loading the prefix sort state.
             */
            if (TupIsNull(slot))
            {
                /*
                 * We need to know later if the outer node has completed to
                 * be able to distinguish between being done with a batch
                 * and being done with the whole node.
                 */
                node->outerNodeDone = true;
                break;
            }

            /*
             * If the tuple's prefix keys match our pivot tuple, we're not
             * done yet and can load it into the prefix sort state.  If not,
             * we don't want to sort it as part of the current batch.
             * Instead we use the group_pivot slot to carry it over to the
             * next batch (even though we won't actually treat it as a group
             * pivot).
             */
            if (isCurrentGroup(node, node->group_pivot, slot))
            {
                tuplesort_puttupleslot(node->prefixsort_state, slot);
                nTuples++;
            }
            else
            {
                ExecCopySlot(node->group_pivot, slot);
                break;
            }
        }

        /*
         * Perform the sort and begin returning the tuples to the parent
         * plan node.
         */
        SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
        tuplesort_performsort(node->prefixsort_state);

        INSTRUMENT_SORT_GROUP(node, prefixsort);

        SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
        node->execution_status = INCSORT_READPREFIXSORT;

        if (node->bounded)
        {
            /*
             * If the current node has a bound, and we've already sorted n
             * tuples, then the functional bound remaining is (original
             * bound - n), so store the current number of processed tuples
             * for use in configuring the sorting bound.
             */
            SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                       node->bound_Done,
                       Min(node->bound, node->bound_Done + nTuples));
            node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
        }
    }

    /* Restore to user specified direction. */
    estate->es_direction = dir;

    /*
     * Get the first or next tuple from tuplesort.  Returns NULL if no more
     * tuples.
     */
    read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
        fullsort_state : node->prefixsort_state;
    slot = node->ss.ps.ps_ResultTupleSlot;
    (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
                                  false, slot, NULL);
    return slot;
}

/* ----------------------------------------------------------------
 * ExecInitIncrementalSort
 *
 * Creates the run-time state information for the sort node produced by the
 * planner and initializes its outer subtree.
 * ----------------------------------------------------------------
 */
IncrementalSortState *
ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
{
    IncrementalSortState *incrsortstate;

    SO_printf("ExecInitIncrementalSort: initializing sort node\n");

    /*
     * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
     * EXEC_FLAG_MARK, because the current sort state contains only one sort
     * batch rather than the full result set.
     */
    Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);

    /*
     * Initialize state structure.
     */
    incrsortstate = makeNode(IncrementalSortState);
    incrsortstate->ss.ps.plan = (Plan *) node;
    incrsortstate->ss.ps.state = estate;
    incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;

    incrsortstate->execution_status = INCSORT_LOADFULLSORT;
    incrsortstate->bounded = false;
    incrsortstate->outerNodeDone = false;
    incrsortstate->bound_Done = 0;
    incrsortstate->fullsort_state = NULL;
    incrsortstate->prefixsort_state = NULL;
    incrsortstate->group_pivot = NULL;
    incrsortstate->transfer_tuple = NULL;
    incrsortstate->n_fullsort_remaining = 0;
    incrsortstate->presorted_keys = NULL;

    if (incrsortstate->ss.ps.instrument != NULL)
    {
        IncrementalSortGroupInfo *fullsortGroupInfo =
            &incrsortstate->incsort_info.fullsortGroupInfo;
        IncrementalSortGroupInfo *prefixsortGroupInfo =
            &incrsortstate->incsort_info.prefixsortGroupInfo;

        fullsortGroupInfo->groupCount = 0;
        fullsortGroupInfo->maxDiskSpaceUsed = 0;
        fullsortGroupInfo->totalDiskSpaceUsed = 0;
        fullsortGroupInfo->maxMemorySpaceUsed = 0;
        fullsortGroupInfo->totalMemorySpaceUsed = 0;
        fullsortGroupInfo->sortMethods = 0;
        prefixsortGroupInfo->groupCount = 0;
        prefixsortGroupInfo->maxDiskSpaceUsed = 0;
        prefixsortGroupInfo->totalDiskSpaceUsed = 0;
        prefixsortGroupInfo->maxMemorySpaceUsed = 0;
        prefixsortGroupInfo->totalMemorySpaceUsed = 0;
        prefixsortGroupInfo->sortMethods = 0;
    }

    /*
     * Miscellaneous initialization
     *
     * Sort nodes don't initialize their ExprContexts because they never
     * call ExecQual or ExecProject.
     */

    /*
     * Initialize child nodes.
     *
     * Incremental sort does not support backwards scans and mark/restore,
     * so we don't bother removing the flags from eflags here.  We allow
     * passing a REWIND flag, because although incremental sort can't use
     * it, the child nodes may be able to do something more useful.
     */
    outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);

    /*
     * Initialize scan slot and type.
     */
    ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);

    /*
     * Initialize return slot and type.  No need to initialize projection
     * info because we don't do any projections.
     */
    ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
    incrsortstate->ss.ps.ps_ProjInfo = NULL;

    /*
     * Initialize standalone slots to store a tuple for pivot prefix keys
     * and for carrying over a tuple from one batch to the next.
     */
    incrsortstate->group_pivot =
        MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
                                 &TTSOpsMinimalTuple);
    incrsortstate->transfer_tuple =
        MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
                                 &TTSOpsMinimalTuple);

    SO_printf("ExecInitIncrementalSort: sort node initialized\n");

    return incrsortstate;
}

/* ----------------------------------------------------------------
 * ExecEndIncrementalSort(node)
 * ----------------------------------------------------------------
 */
void
ExecEndIncrementalSort(IncrementalSortState *node)
{
    SO_printf("ExecEndIncrementalSort: shutting down sort node\n");

    /* clean out the scan tuple */
    ExecClearTuple(node->ss.ss_ScanTupleSlot);
    /* must drop pointer to sort result tuple */
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
    /* must drop standalone tuple slots from outer node */
    ExecDropSingleTupleTableSlot(node->group_pivot);
    ExecDropSingleTupleTableSlot(node->transfer_tuple);

    /*
     * Release tuplesort resources.
     */
    if (node->fullsort_state != NULL)
    {
        tuplesort_end(node->fullsort_state);
        node->fullsort_state = NULL;
    }
    if (node->prefixsort_state != NULL)
    {
        tuplesort_end(node->prefixsort_state);
        node->prefixsort_state = NULL;
    }

    /*
     * Shut down the subplan.
     */
    ExecEndNode(outerPlanState(node));

    SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
}

void
ExecReScanIncrementalSort(IncrementalSortState *node)
{
    PlanState  *outerPlan = outerPlanState(node);

    /*
     * Incremental sort doesn't support efficient rescan even when
     * parameters haven't changed (e.g., rewind) because unlike regular sort
     * we don't store all tuples at once for the full sort.
     *
     * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
     * re-execute the sort along with the child node.  Incremental sort
     * itself can't do anything smarter, but maybe the child nodes can.
     *
     * In theory if we've only filled the full sort with one batch (and
     * haven't reset it for a new batch yet) then we could efficiently
     * rewind, but that seems a narrow enough case that it's not worth
     * handling specially at this time.
     */

    /* must drop pointer to sort result tuple */
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);

    if (node->group_pivot != NULL)
        ExecClearTuple(node->group_pivot);
    if (node->transfer_tuple != NULL)
        ExecClearTuple(node->transfer_tuple);

    node->outerNodeDone = false;
    node->n_fullsort_remaining = 0;
    node->bound_Done = 0;

    node->execution_status = INCSORT_LOADFULLSORT;

    /*
     * If we've already set up either of the sort states, we need to reset
     * them.  We could end them and null out the pointers, but there's no
     * reason to repay the setup cost, and because ExecIncrementalSort
     * guards presorted column functions by checking to see if the full sort
     * state has been initialized yet, setting the sort states to null here
     * might actually cause a leak.
     */
    if (node->fullsort_state != NULL)
        tuplesort_reset(node->fullsort_state);
    if (node->prefixsort_state != NULL)
        tuplesort_reset(node->prefixsort_state);

    /*
     * If chgParam of subnode is not null, then the plan will be re-scanned
     * by the first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

/* ----------------------------------------------------------------
 *                      Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 * ExecIncrementalSortEstimate
 *
 * Estimate space required to propagate sort statistics.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
{
    Size        size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
        return;

    size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
    size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 * ExecIncrementalSortInitializeDSM
 *
 * Initialize DSM space for sort statistics.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
{
    Size        size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
        return;

    size = offsetof(SharedIncrementalSortInfo, sinfo)
        + pcxt->nworkers * sizeof(IncrementalSortInfo);
    node->shared_info = shm_toc_allocate(pcxt->toc, size);
    /* ensure any unfilled slots will contain zeroes */
    memset(node->shared_info, 0, size);
    node->shared_info->num_workers = pcxt->nworkers;
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
                   node->shared_info);
}

/* ----------------------------------------------------------------
 * ExecIncrementalSortInitializeWorker
 *
 * Attach worker to DSM space for sort statistics.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
{
    node->shared_info =
        shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
    node->am_worker = true;
}

/* ----------------------------------------------------------------
 * ExecIncrementalSortRetrieveInstrumentation
 *
 * Transfer sort statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
{
    Size        size;
    SharedIncrementalSortInfo *si;

    if (node->shared_info == NULL)
        return;

    size = offsetof(SharedIncrementalSortInfo, sinfo)
        + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
    si = palloc(size);
    memcpy(si, node->shared_info, size);
    node->shared_info = si;
}