/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *	  Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  Historically, we divided the input into sorted runs
 * using replacement selection, in the form of a priority tree implemented
 * as a heap (essentially his Algorithm 5.2.3H), but now we always use
 * quicksort for run generation.  We merge the runs using polyphase merge,
 * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
 * implemented by logtape.c, which avoids space wastage by recycling disk
 * space as soon as each block is read from its "tape".
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
 * When tuples are dumped in batch after quicksorting, we begin a new run
 * with a new output tape (selected per Algorithm D).  After the end of the
 * input is reached, we dump out remaining tuples in memory into a final run,
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and replace it
 * with the next tuple from its source tape (if any).  When the heap empties,
 * the merge is complete.  The basic merge algorithm thus needs very little
 * memory --- only M tuples for an M-way merge, and M is constrained to a
 * small number.  However, we can still make good use of our full workMem
 * allocation by pre-reading additional blocks from each source tape.  Without
 * prereading, our access pattern to the temporary file would be very erratic;
 * on average we'd read one block from each of M source tapes during the same
 * time that we're writing M blocks to the output tape, so there is no
 * sequentiality of access at all, defeating the read-ahead methods used by
 * most Unix kernels.  Worse, the output tape gets written into a very random
 * sequence of blocks of the temp file, ensuring that things will be even
 * worse when it comes time to read that tape.  A straightforward merge pass
 * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
 * by prereading from each source tape sequentially, loading about workMem/M
 * bytes from each tape in turn, and making the sequential blocks immediately
 * available for reuse.  This approach helps to localize both read and write
 * accesses.  The pre-reading is handled by logtape.c; we just tell it how
 * much memory to use for the buffers.
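 *
 * For example (illustrative numbers only): with workMem of 4MB and a 6-way
 * merge, each input tape gets roughly 4MB/6, or about 700kB, of sequential
 * preread per cycle, instead of single-block reads interleaved across all
 * six tapes.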
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives.  In our implementation a "tape drive"
 * doesn't cost much more than a few Kb of memory buffers, so we can afford
 * to have lots of them.  In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate any repeated I/O at all.  In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above.  Nonetheless, with large workMem we can have many tapes (but not
 * too many -- see the comments in tuplesort_merge_order).
 *
 * This module supports parallel sorting.  Parallel sorts involve coordination
 * among one or more worker processes, and a leader process, each with its own
 * tuplesort state.  The leader process (or, more accurately, the
 * Tuplesortstate associated with a leader process) creates a full tapeset
 * consisting of worker tapes with one run to merge; a run for every
 * worker process.  This is then merged.  Worker processes are guaranteed to
 * produce exactly one output run from their partial input.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/* sort-type codes for sort__start probes */
#define HEAP_SORT		0
#define INDEX_SORT		1
#define DATUM_SORT		2
#define CLUSTER_SORT	3

/* Sort parallel code from state for sort__start probes */
#define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
								 (state)->worker >= 0 ? 1 : 2)

/* GUC variables */
#ifdef TRACE_SORT
bool		trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
bool		optimize_bounded_sort = true;
#endif


/*
 * The objects we actually sort are SortTuple structs.
 * These contain a pointer to the tuple proper (might be a MinimalTuple or
 * IndexTuple), which is a separate palloc chunk --- we assume it is just
 * one chunk and can be freed by a simple pfree() (except during merge,
 * when we use a simple slab allocator).  SortTuples also contain the
 * tuple's first key column in Datum/nullflag format, and an index integer.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.  We could extract and save all the key
 * columns not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result.  Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * There is one special case: when the sort support infrastructure provides an
 * "abbreviated key" representation, where the key is (typically) a pass by
 * value proxy for a pass by reference type.  In this case, the abbreviated key
 * is stored in datum1 in place of the actual first key column.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1 for pass by value types (or null values).  If the datatype is
 * pass-by-reference and isnull1 is false, then "tuple" points to a separately
 * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
 * either the same pointer as "tuple", or is an abbreviated key value as
 * described above.  Accordingly, "tuple" is always used in preference to
 * datum1 as the authoritative value for pass-by-reference cases.
 *
 * tupindex holds the input tape number that each tuple in the heap was read
 * from during merge passes.
 */
typedef struct
{
	void	   *tuple;			/* the tuple itself */
	Datum		datum1;			/* value of first key column */
	bool		isnull1;		/* is first key column NULL? */
	int			tupindex;		/* see notes above */
} SortTuple;

/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use, the
 * slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
	union SlabSlot *nextfree;
	char		buffer[SLAB_SLOT_SIZE];
} SlabSlot;

/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE				/* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one
 * block's worth of buffer space.
 * This ignores the overhead of all the other data structures needed for
 * each tape, but it's probably close enough.
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER		6		/* minimum merge order */
#define MAXORDER		500		/* maximum merge order */
#define TAPE_BUFFER_OVERHEAD		BLCKSZ
#define MERGE_BUFFER_SIZE			(BLCKSZ * 32)

typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
									Tuplesortstate *state);

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	int			nKeys;			/* number of columns in sort key */
	bool		randomAccess;	/* did caller request random access? */
	bool		bounded;		/* did caller specify a maximum number of
								 * tuples to return? */
	bool		boundUsed;		/* true if we made use of a bounded heap */
	int			bound;			/* if bounded, the maximum number of tuples */
	bool		tuples;			/* Can SortTuple.tuple ever be set? */
	int64		availMem;		/* remaining memory available, in bytes */
	int64		allowedMem;		/* total memory allowed, in bytes */
	int			maxTapes;		/* number of tapes (Knuth's T) */
	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
	MemoryContext sortcontext;	/* memory context holding most sort data */
	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are sorting from the routines that don't need to know it.
	 * They are set up by the tuplesort_begin_xxx routines.
	 *
	 * Function to compare two tuples; result is per qsort() convention, ie:
	 * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
	 * qsort_arg_comparator.
	 */
	SortTupleComparator comparetup;

	/*
	 * Function to copy a supplied input tuple into palloc'd space and set up
	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
	 * state->availMem must be decreased by the amount of space used for the
	 * tuple copy (note the SortTuple struct itself is not counted).
	 */
	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  Unless the slab allocator is
	 * used, after writing the tuple, pfree() the out-of-line data (not the
	 * SortTuple struct!), and increase state->availMem by the amount of
	 * memory space thereby released.
	 */
	void		(*writetup) (Tuplesortstate *state, int tapenum,
							 SortTuple *stup);

	/*
	 * Function to read a stored tuple from tape back into memory.  'len' is
	 * the already-read length of the stored tuple.  The tuple is allocated
	 * from the slab memory arena, or is palloc'd, see readtup_alloc().
	 */
	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
							int tapenum, unsigned int len);

	/*
	 * This array holds the tuples now in sort memory.
	 * If we are in state INITIAL, the tuples are in no particular order; if
	 * we are in state SORTEDINMEM, the tuples are in final sorted order; in
	 * states BUILDRUNS and FINALMERGE, the tuples are organized in "heap"
	 * order per Algorithm H.  In state SORTEDONTAPE, the array is not used.
	 */
	SortTuple  *memtuples;		/* array of SortTuple structs */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */
	bool		growmemtuples;	/* memtuples' growth still underway? */

	/*
	 * Memory for tuples is sometimes allocated using a simple slab allocator,
	 * rather than with palloc().  Currently, we switch to slab allocation
	 * when we start merging.  Merging only needs to keep a small, fixed
	 * number of tuples in memory at any time, so we can avoid the
	 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
	 * to hold the tuples.
	 *
	 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
	 * slots.  The allocation is sized to have one slot per tape, plus one
	 * additional slot.  We need that many slots to hold all the tuples kept
	 * in the heap during merge, plus the one we have last returned from the
	 * sort, with tuplesort_gettuple.
	 *
	 * Initially, all the slots are kept in a linked list of free slots.  When
	 * a tuple is read from a tape, it is placed in the next available slot,
	 * if it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is
	 * palloc'd instead.
	 *
	 * When we're done processing a tuple, we return the slot back to the free
	 * list, or pfree() it if it was palloc'd.  We know that a tuple was
	 * allocated from the slab if its pointer value is between slabMemoryBegin
	 * and slabMemoryEnd.
	 *
	 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
	 * tracking memory usage is not used.
	 */
	bool		slabAllocatorUsed;

	char	   *slabMemoryBegin;	/* beginning of slab memory arena */
	char	   *slabMemoryEnd;	/* end of slab memory arena */
	SlabSlot   *slabFreeHead;	/* head of free list */

	/* Buffer size to use for reading input tapes, during merge. */
	size_t		read_buffer_size;

	/*
	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
	 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
	 * recycle the memory on next gettuple call.
	 */
	void	   *lastReturnedTuple;

	/*
	 * While building initial runs, this is the current output run number.
	 * Afterwards, it is the number of initial runs we made.
	 */
	int			currentRun;

	/*
	 * Unless otherwise noted, all pointer variables below are pointers to
	 * arrays of length maxTapes, holding per-tape data.
	 */

	/*
	 * This variable is only used during merge passes.  mergeactive[i] is true
	 * if we are reading an input run from (actual) tape number i and have not
	 * yet exhausted that run.
	 */
	bool	   *mergeactive;	/* active input run source? */

	/*
	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
	 * "logical" and "actual" tape numbers straight!
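	 *
	 * For example, the actual tape number of the current output tape is
	 * tp_tapenum[destTape]; the other tp_xxx[] arrays are likewise indexed
	 * by logical tape number.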
	 */
	int			Level;			/* Knuth's l */
	int			destTape;		/* current output tape (Knuth's j, less 1) */
	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
	int		   *tp_runs;		/* # of real runs on each tape */
	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
	int			activeTapes;	/* # of active input tapes in merge pass */

	/*
	 * These variables are used after completion of sorting to keep track of
	 * the next tuple to return.  (In the tape case, the tape's current read
	 * position is also critical state.)
	 */
	int			result_tape;	/* actual tape number of finished output */
	int			current;		/* array index (only used if SORTEDINMEM) */
	bool		eof_reached;	/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
	int			markpos_offset; /* saved "current", or offset in tape block */
	bool		markpos_eof;	/* saved "eof_reached" */

	/*
	 * These variables are used during parallel sorting.
	 *
	 * worker is our worker identifier.  Follows the general convention that
	 * a value of -1 denotes a leader tuplesort, and values >= 0 denote
	 * worker tuplesorts.  (-1 can also be a serial tuplesort.)
	 *
	 * shared is mutable shared memory state, which is used to coordinate
	 * parallel sorts.
	 *
	 * nParticipants is the number of worker Tuplesortstates known by the
	 * leader to have actually been launched, which implies that they must
	 * finish a run that the leader can merge.  Typically includes a worker
	 * state held by the leader process itself.  Set in the leader
	 * Tuplesortstate only.
	 */
	int			worker;
	Sharedsort *shared;
	int			nParticipants;

	/*
	 * The sortKeys variable is used by every case other than the hash index
	 * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
	 * MinimalTuple and CLUSTER routines, though.
	 */
	TupleDesc	tupDesc;
	SortSupport sortKeys;		/* array of length nKeys */

	/*
	 * This variable is shared by the single-key MinimalTuple case and the
	 * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
	 */
	SortSupport onlyKey;

	/*
	 * Additional state for managing "abbreviated key" sortsupport routines
	 * (which currently may be used by all cases except the hash index case).
	 * Tracks the intervals at which the optimization's effectiveness is
	 * tested.
	 */
	int64		abbrevNext;		/* Tuple # at which to next check
								 * applicability */

	/*
	 * These variables are specific to the CLUSTER case; they are set by
	 * tuplesort_begin_cluster.
	 */
	IndexInfo  *indexInfo;		/* info about index being used for reference */
	EState	   *estate;			/* for evaluating index expressions */

	/*
	 * These variables are specific to the IndexTuple case; they are set by
	 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
	 */
	Relation	heapRel;		/* table the index is being built on */
	Relation	indexRel;		/* index being built */

	/* These are specific to the index_btree subcase: */
	bool		enforceUnique;	/* complain if we find duplicate tuples */

	/* These are specific to the index_hash subcase: */
	uint32		high_mask;		/* masks for sortable part of hash code */
	uint32		low_mask;
	uint32		max_buckets;

	/*
	 * These variables are specific to the Datum case; they are set by
	 * tuplesort_begin_datum and used only by the DatumTuple routines.
	 */
	Oid			datumType;
	/* we need typelen in order to know how to copy the Datums. */
	int			datumTypeLen;

	/*
	 * Resource snapshot for time of sort start.
	 */
#ifdef TRACE_SORT
	PGRUsage	ru_start;
#endif
};

/*
 * Private mutable state of tuplesort-parallel-operation.  This is allocated
 * in shared memory.
 */
struct Sharedsort
{
	/* mutex protects all fields prior to tapes */
	slock_t		mutex;

	/*
	 * currentWorker generates ordinal identifier numbers for parallel sort
	 * workers.  These start from 0, and are always gapless.
	 *
	 * Workers increment workersFinished to indicate having finished.  If this
	 * is equal to state.nParticipants within the leader, leader is ready to
	 * merge worker runs.
	 */
	int			currentWorker;
	int			workersFinished;

	/* Temporary file space */
	SharedFileSet fileset;

	/* Size of tapes flexible array */
	int			nTapes;

	/*
	 * Tapes array used by workers to report back information needed by the
	 * leader to concatenate all worker tapes into one for merging
	 */
	TapeShare	tapes[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Is the given tuple allocated from the slab memory arena?
 */
#define IS_SLAB_SLOT(state, tuple) \
	((char *) (tuple) >= (state)->slabMemoryBegin && \
	 (char *) (tuple) < (state)->slabMemoryEnd)

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
	do { \
		SlabSlot *buf = (SlabSlot *) tuple; \
		\
		if (IS_SLAB_SLOT((state), buf)) \
		{ \
			buf->nextfree = (state)->slabFreeHead; \
			(state)->slabFreeHead = buf; \
		} else \
			pfree(buf); \
	} while(0)

#define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
#define SERIAL(state)		((state)->shared == NULL)
#define WORKER(state)		((state)->shared && (state)->worker != -1)
#define LEADER(state)		((state)->shared && (state)->worker == -1)

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
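 *
 * For example (assuming the usual 4-byte unsigned int): a tuple whose
 * length word reads 64 occupies 64 bytes on tape in total --- the 4-byte
 * length word followed by 60 bytes of tuple data.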
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows reading the tape
 * backwards.  When randomAccess is not true, the write/read routines may
 * omit the extra length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.  As of 9.6, a
 * separate memory context is used for caller-passed tuples.  Resetting
 * it at certain key increments significantly ameliorates fragmentation.
 * Note that this places a responsibility on readtup and copytup routines
 * to use the right memory context for these tuples (and to not use the
 * reset context for anything whose lifetime needs to span multiple
 * external sort runs).
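 *
 * A typical copytup routine therefore follows this pattern (a sketch of
 * copytup_heap(), not a complete function):
 *
 *		oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 *		tuple = ExecCopySlotMinimalTuple(slot);
 *		stup->tuple = (void *) tuple;
 *		USEMEM(state, GetMemoryChunkSpace(tuple));
 *		MemoryContextSwitchTo(oldcontext);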
581 */ 582 583 /* When using this macro, beware of double evaluation of len */ 584 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \ 585 do { \ 586 if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \ 587 elog(ERROR, "unexpected end of data"); \ 588 } while(0) 589 590 591 static Tuplesortstate *tuplesort_begin_common(int workMem, 592 SortCoordinate coordinate, 593 bool randomAccess); 594 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); 595 static bool consider_abort_common(Tuplesortstate *state); 596 static void inittapes(Tuplesortstate *state, bool mergeruns); 597 static void inittapestate(Tuplesortstate *state, int maxTapes); 598 static void selectnewtape(Tuplesortstate *state); 599 static void init_slab_allocator(Tuplesortstate *state, int numSlots); 600 static void mergeruns(Tuplesortstate *state); 601 static void mergeonerun(Tuplesortstate *state); 602 static void beginmerge(Tuplesortstate *state); 603 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); 604 static void dumptuples(Tuplesortstate *state, bool alltuples); 605 static void make_bounded_heap(Tuplesortstate *state); 606 static void sort_bounded_heap(Tuplesortstate *state); 607 static void tuplesort_sort_memtuples(Tuplesortstate *state); 608 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); 609 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); 610 static void tuplesort_heap_delete_top(Tuplesortstate *state); 611 static void reversedirection(Tuplesortstate *state); 612 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); 613 static void markrunend(Tuplesortstate *state, int tapenum); 614 static void *readtup_alloc(Tuplesortstate *state, Size tuplen); 615 static int comparetup_heap(const SortTuple *a, const SortTuple *b, 616 Tuplesortstate *state); 617 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); 618 static void writetup_heap(Tuplesortstate *state, int tapenum, 619 SortTuple *stup); 620 static void readtup_heap(Tuplesortstate *state, SortTuple *stup, 621 int tapenum, unsigned int len); 622 static int comparetup_cluster(const SortTuple *a, const SortTuple *b, 623 Tuplesortstate *state); 624 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); 625 static void writetup_cluster(Tuplesortstate *state, int tapenum, 626 SortTuple *stup); 627 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, 628 int tapenum, unsigned int len); 629 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, 630 Tuplesortstate *state); 631 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, 632 Tuplesortstate *state); 633 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup); 634 static void writetup_index(Tuplesortstate *state, int tapenum, 635 SortTuple *stup); 636 static void readtup_index(Tuplesortstate *state, SortTuple *stup, 637 int tapenum, unsigned int len); 638 static int comparetup_datum(const SortTuple *a, const SortTuple *b, 639 Tuplesortstate *state); 640 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); 641 static void writetup_datum(Tuplesortstate *state, int tapenum, 642 SortTuple *stup); 643 static void readtup_datum(Tuplesortstate *state, SortTuple *stup, 644 int tapenum, unsigned int len); 645 static int worker_get_identifier(Tuplesortstate *state); 646 static void worker_freeze_result_tape(Tuplesortstate *state); 647 
static void worker_nomergeruns(Tuplesortstate *state); 648 static void leader_takeover_tapes(Tuplesortstate *state); 649 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); 650 651 /* 652 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts 653 * any variant of SortTuples, using the appropriate comparetup function. 654 * qsort_ssup() is specialized for the case where the comparetup function 655 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts 656 * and Datum sorts. 657 */ 658 #include "qsort_tuple.c" 659 660 661 /* 662 * tuplesort_begin_xxx 663 * 664 * Initialize for a tuple sort operation. 665 * 666 * After calling tuplesort_begin, the caller should call tuplesort_putXXX 667 * zero or more times, then call tuplesort_performsort when all the tuples 668 * have been supplied. After performsort, retrieve the tuples in sorted 669 * order by calling tuplesort_getXXX until it returns false/NULL. (If random 670 * access was requested, rescan, markpos, and restorepos can also be called.) 671 * Call tuplesort_end to terminate the operation and release memory/disk space. 672 * 673 * Each variant of tuplesort_begin has a workMem parameter specifying the 674 * maximum number of kilobytes of RAM to use before spilling data to disk. 675 * (The normal value of this parameter is work_mem, but some callers use 676 * other values.) Each variant also has a randomAccess parameter specifying 677 * whether the caller needs non-sequential access to the sort result. 678 */ 679 680 static Tuplesortstate * 681 tuplesort_begin_common(int workMem, SortCoordinate coordinate, 682 bool randomAccess) 683 { 684 Tuplesortstate *state; 685 MemoryContext sortcontext; 686 MemoryContext tuplecontext; 687 MemoryContext oldcontext; 688 689 /* See leader_takeover_tapes() remarks on randomAccess support */ 690 if (coordinate && randomAccess) 691 elog(ERROR, "random access disallowed under parallel sort"); 692 693 /* 694 * Create a working memory context for this sort operation. All data 695 * needed by the sort will live inside this context. 696 */ 697 sortcontext = AllocSetContextCreate(CurrentMemoryContext, 698 "TupleSort main", 699 ALLOCSET_DEFAULT_SIZES); 700 701 /* 702 * Caller tuple (e.g. IndexTuple) memory context. 703 * 704 * A dedicated child context used exclusively for caller passed tuples 705 * eases memory management. Resetting at key points reduces 706 * fragmentation. Note that the memtuples array of SortTuples is allocated 707 * in the parent context, not this context, because there is no need to 708 * free memtuples early. 709 */ 710 tuplecontext = AllocSetContextCreate(sortcontext, 711 "Caller tuples", 712 ALLOCSET_DEFAULT_SIZES); 713 714 /* 715 * Make the Tuplesortstate within the per-sort context. This way, we 716 * don't need a separate pfree() operation for it at shutdown. 717 */ 718 oldcontext = MemoryContextSwitchTo(sortcontext); 719 720 state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate)); 721 722 #ifdef TRACE_SORT 723 if (trace_sort) 724 pg_rusage_init(&state->ru_start); 725 #endif 726 727 state->status = TSS_INITIAL; 728 state->randomAccess = randomAccess; 729 state->bounded = false; 730 state->tuples = true; 731 state->boundUsed = false; 732 733 /* 734 * workMem is forced to be at least 64KB, the current minimum valid value 735 * for the work_mem GUC. This is a defense against parallel sort callers 736 * that divide out memory among many workers in a way that leaves each 737 * with very little memory. 
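	 *
	 * For example, if a caller split workMem = 256kB evenly among eight
	 * workers, each would otherwise get only 32kB; the clamp below raises
	 * that to 64kB.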
738 */ 739 state->allowedMem = Max(workMem, 64) * (int64) 1024; 740 state->availMem = state->allowedMem; 741 state->sortcontext = sortcontext; 742 state->tuplecontext = tuplecontext; 743 state->tapeset = NULL; 744 745 state->memtupcount = 0; 746 747 /* 748 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; 749 * see comments in grow_memtuples(). 750 */ 751 state->memtupsize = Max(1024, 752 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1); 753 754 state->growmemtuples = true; 755 state->slabAllocatorUsed = false; 756 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple)); 757 758 USEMEM(state, GetMemoryChunkSpace(state->memtuples)); 759 760 /* workMem must be large enough for the minimal memtuples array */ 761 if (LACKMEM(state)) 762 elog(ERROR, "insufficient memory allowed for sort"); 763 764 state->currentRun = 0; 765 766 /* 767 * maxTapes, tapeRange, and Algorithm D variables will be initialized by 768 * inittapes(), if needed 769 */ 770 771 state->result_tape = -1; /* flag that result tape has not been formed */ 772 773 /* 774 * Initialize parallel-related state based on coordination information 775 * from caller 776 */ 777 if (!coordinate) 778 { 779 /* Serial sort */ 780 state->shared = NULL; 781 state->worker = -1; 782 state->nParticipants = -1; 783 } 784 else if (coordinate->isWorker) 785 { 786 /* Parallel worker produces exactly one final run from all input */ 787 state->shared = coordinate->sharedsort; 788 state->worker = worker_get_identifier(state); 789 state->nParticipants = -1; 790 } 791 else 792 { 793 /* Parallel leader state only used for final merge */ 794 state->shared = coordinate->sharedsort; 795 state->worker = -1; 796 state->nParticipants = coordinate->nParticipants; 797 Assert(state->nParticipants >= 1); 798 } 799 800 MemoryContextSwitchTo(oldcontext); 801 802 return state; 803 } 804 805 Tuplesortstate * 806 tuplesort_begin_heap(TupleDesc tupDesc, 807 int nkeys, AttrNumber *attNums, 808 Oid *sortOperators, Oid *sortCollations, 809 bool *nullsFirstFlags, 810 int workMem, SortCoordinate coordinate, bool randomAccess) 811 { 812 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, 813 randomAccess); 814 MemoryContext oldcontext; 815 int i; 816 817 oldcontext = MemoryContextSwitchTo(state->sortcontext); 818 819 AssertArg(nkeys > 0); 820 821 #ifdef TRACE_SORT 822 if (trace_sort) 823 elog(LOG, 824 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", 825 nkeys, workMem, randomAccess ? 
't' : 'f'); 826 #endif 827 828 state->nKeys = nkeys; 829 830 TRACE_POSTGRESQL_SORT_START(HEAP_SORT, 831 false, /* no unique check */ 832 nkeys, 833 workMem, 834 randomAccess, 835 PARALLEL_SORT(state)); 836 837 state->comparetup = comparetup_heap; 838 state->copytup = copytup_heap; 839 state->writetup = writetup_heap; 840 state->readtup = readtup_heap; 841 842 state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ 843 state->abbrevNext = 10; 844 845 /* Prepare SortSupport data for each column */ 846 state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData)); 847 848 for (i = 0; i < nkeys; i++) 849 { 850 SortSupport sortKey = state->sortKeys + i; 851 852 AssertArg(attNums[i] != 0); 853 AssertArg(sortOperators[i] != 0); 854 855 sortKey->ssup_cxt = CurrentMemoryContext; 856 sortKey->ssup_collation = sortCollations[i]; 857 sortKey->ssup_nulls_first = nullsFirstFlags[i]; 858 sortKey->ssup_attno = attNums[i]; 859 /* Convey if abbreviation optimization is applicable in principle */ 860 sortKey->abbreviate = (i == 0); 861 862 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey); 863 } 864 865 /* 866 * The "onlyKey" optimization cannot be used with abbreviated keys, since 867 * tie-breaker comparisons may be required. Typically, the optimization 868 * is only of value to pass-by-value types anyway, whereas abbreviated 869 * keys are typically only of value to pass-by-reference types. 870 */ 871 if (nkeys == 1 && !state->sortKeys->abbrev_converter) 872 state->onlyKey = state->sortKeys; 873 874 MemoryContextSwitchTo(oldcontext); 875 876 return state; 877 } 878 879 Tuplesortstate * 880 tuplesort_begin_cluster(TupleDesc tupDesc, 881 Relation indexRel, 882 int workMem, 883 SortCoordinate coordinate, bool randomAccess) 884 { 885 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, 886 randomAccess); 887 BTScanInsert indexScanKey; 888 MemoryContext oldcontext; 889 int i; 890 891 Assert(indexRel->rd_rel->relam == BTREE_AM_OID); 892 893 oldcontext = MemoryContextSwitchTo(state->sortcontext); 894 895 #ifdef TRACE_SORT 896 if (trace_sort) 897 elog(LOG, 898 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", 899 RelationGetNumberOfAttributes(indexRel), 900 workMem, randomAccess ? 't' : 'f'); 901 #endif 902 903 state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); 904 905 TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT, 906 false, /* no unique check */ 907 state->nKeys, 908 workMem, 909 randomAccess, 910 PARALLEL_SORT(state)); 911 912 state->comparetup = comparetup_cluster; 913 state->copytup = copytup_cluster; 914 state->writetup = writetup_cluster; 915 state->readtup = readtup_cluster; 916 state->abbrevNext = 10; 917 918 state->indexInfo = BuildIndexInfo(indexRel); 919 920 state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ 921 922 indexScanKey = _bt_mkscankey(indexRel, NULL); 923 924 if (state->indexInfo->ii_Expressions != NULL) 925 { 926 TupleTableSlot *slot; 927 ExprContext *econtext; 928 929 /* 930 * We will need to use FormIndexDatum to evaluate the index 931 * expressions. To do that, we need an EState, as well as a 932 * TupleTableSlot to put the table tuples into. The econtext's 933 * scantuple has to point to that slot, too. 
934 */ 935 state->estate = CreateExecutorState(); 936 slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple); 937 econtext = GetPerTupleExprContext(state->estate); 938 econtext->ecxt_scantuple = slot; 939 } 940 941 /* Prepare SortSupport data for each column */ 942 state->sortKeys = (SortSupport) palloc0(state->nKeys * 943 sizeof(SortSupportData)); 944 945 for (i = 0; i < state->nKeys; i++) 946 { 947 SortSupport sortKey = state->sortKeys + i; 948 ScanKey scanKey = indexScanKey->scankeys + i; 949 int16 strategy; 950 951 sortKey->ssup_cxt = CurrentMemoryContext; 952 sortKey->ssup_collation = scanKey->sk_collation; 953 sortKey->ssup_nulls_first = 954 (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; 955 sortKey->ssup_attno = scanKey->sk_attno; 956 /* Convey if abbreviation optimization is applicable in principle */ 957 sortKey->abbreviate = (i == 0); 958 959 AssertState(sortKey->ssup_attno != 0); 960 961 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? 962 BTGreaterStrategyNumber : BTLessStrategyNumber; 963 964 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); 965 } 966 967 pfree(indexScanKey); 968 969 MemoryContextSwitchTo(oldcontext); 970 971 return state; 972 } 973 974 Tuplesortstate * 975 tuplesort_begin_index_btree(Relation heapRel, 976 Relation indexRel, 977 bool enforceUnique, 978 int workMem, 979 SortCoordinate coordinate, 980 bool randomAccess) 981 { 982 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, 983 randomAccess); 984 BTScanInsert indexScanKey; 985 MemoryContext oldcontext; 986 int i; 987 988 oldcontext = MemoryContextSwitchTo(state->sortcontext); 989 990 #ifdef TRACE_SORT 991 if (trace_sort) 992 elog(LOG, 993 "begin index sort: unique = %c, workMem = %d, randomAccess = %c", 994 enforceUnique ? 't' : 'f', 995 workMem, randomAccess ? 't' : 'f'); 996 #endif 997 998 state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); 999 1000 TRACE_POSTGRESQL_SORT_START(INDEX_SORT, 1001 enforceUnique, 1002 state->nKeys, 1003 workMem, 1004 randomAccess, 1005 PARALLEL_SORT(state)); 1006 1007 state->comparetup = comparetup_index_btree; 1008 state->copytup = copytup_index; 1009 state->writetup = writetup_index; 1010 state->readtup = readtup_index; 1011 state->abbrevNext = 10; 1012 1013 state->heapRel = heapRel; 1014 state->indexRel = indexRel; 1015 state->enforceUnique = enforceUnique; 1016 1017 indexScanKey = _bt_mkscankey(indexRel, NULL); 1018 1019 /* Prepare SortSupport data for each column */ 1020 state->sortKeys = (SortSupport) palloc0(state->nKeys * 1021 sizeof(SortSupportData)); 1022 1023 for (i = 0; i < state->nKeys; i++) 1024 { 1025 SortSupport sortKey = state->sortKeys + i; 1026 ScanKey scanKey = indexScanKey->scankeys + i; 1027 int16 strategy; 1028 1029 sortKey->ssup_cxt = CurrentMemoryContext; 1030 sortKey->ssup_collation = scanKey->sk_collation; 1031 sortKey->ssup_nulls_first = 1032 (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; 1033 sortKey->ssup_attno = scanKey->sk_attno; 1034 /* Convey if abbreviation optimization is applicable in principle */ 1035 sortKey->abbreviate = (i == 0); 1036 1037 AssertState(sortKey->ssup_attno != 0); 1038 1039 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? 
BTGreaterStrategyNumber : BTLessStrategyNumber;

		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
	}

	pfree(indexScanKey);

	MemoryContextSwitchTo(oldcontext);

	return state;
}

Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,
						   Relation indexRel,
						   uint32 high_mask,
						   uint32 low_mask,
						   uint32 max_buckets,
						   int workMem,
						   SortCoordinate coordinate,
						   bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	MemoryContext oldcontext;

	oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
			 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
			 high_mask,
			 low_mask,
			 max_buckets,
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = 1;			/* Only one sort column, the hash code */

	state->comparetup = comparetup_index_hash;
	state->copytup = copytup_index;
	state->writetup = writetup_index;
	state->readtup = readtup_index;

	state->heapRel = heapRel;
	state->indexRel = indexRel;

	state->high_mask = high_mask;
	state->low_mask = low_mask;
	state->max_buckets = max_buckets;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
					  bool nullsFirstFlag, int workMem,
					  SortCoordinate coordinate, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	MemoryContext oldcontext;
	int16		typlen;
	bool		typbyval;

	oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin datum sort: workMem = %d, randomAccess = %c",
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = 1;			/* always a one-column sort */

	TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
								false,	/* no unique check */
								1,
								workMem,
								randomAccess,
								PARALLEL_SORT(state));

	state->comparetup = comparetup_datum;
	state->copytup = copytup_datum;
	state->writetup = writetup_datum;
	state->readtup = readtup_datum;
	state->abbrevNext = 10;

	state->datumType = datumType;

	/* lookup necessary attributes of the datum type */
	get_typlenbyval(datumType, &typlen, &typbyval);
	state->datumTypeLen = typlen;
	state->tuples = !typbyval;

	/* Prepare SortSupport data */
	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));

	state->sortKeys->ssup_cxt = CurrentMemoryContext;
	state->sortKeys->ssup_collation = sortCollation;
	state->sortKeys->ssup_nulls_first = nullsFirstFlag;

	/*
	 * Abbreviation is possible here only for by-reference types.  In theory,
	 * a pass-by-value datatype could have an abbreviated form that is cheaper
	 * to compare.  In a tuple sort, we could support that, because we can
	 * always extract the original datum from the tuple as needed.  Here, we
	 * can't, because a datum sort only stores a single copy of the datum; the
	 * "tuple" field of each sortTuple is NULL.
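	 *
	 * (For example, the abbreviated format for a pass-by-reference type such
	 * as text typically packs the leading bytes of the value into a single
	 * Datum, so that most comparisons become cheap integer comparisons.)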
1154 */ 1155 state->sortKeys->abbreviate = !typbyval; 1156 1157 PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys); 1158 1159 /* 1160 * The "onlyKey" optimization cannot be used with abbreviated keys, since 1161 * tie-breaker comparisons may be required. Typically, the optimization 1162 * is only of value to pass-by-value types anyway, whereas abbreviated 1163 * keys are typically only of value to pass-by-reference types. 1164 */ 1165 if (!state->sortKeys->abbrev_converter) 1166 state->onlyKey = state->sortKeys; 1167 1168 MemoryContextSwitchTo(oldcontext); 1169 1170 return state; 1171 } 1172 1173 /* 1174 * tuplesort_set_bound 1175 * 1176 * Advise tuplesort that at most the first N result tuples are required. 1177 * 1178 * Must be called before inserting any tuples. (Actually, we could allow it 1179 * as long as the sort hasn't spilled to disk, but there seems no need for 1180 * delayed calls at the moment.) 1181 * 1182 * This is a hint only. The tuplesort may still return more tuples than 1183 * requested. Parallel leader tuplesorts will always ignore the hint. 1184 */ 1185 void 1186 tuplesort_set_bound(Tuplesortstate *state, int64 bound) 1187 { 1188 /* Assert we're called before loading any tuples */ 1189 Assert(state->status == TSS_INITIAL); 1190 Assert(state->memtupcount == 0); 1191 Assert(!state->bounded); 1192 Assert(!WORKER(state)); 1193 1194 #ifdef DEBUG_BOUNDED_SORT 1195 /* Honor GUC setting that disables the feature (for easy testing) */ 1196 if (!optimize_bounded_sort) 1197 return; 1198 #endif 1199 1200 /* Parallel leader ignores hint */ 1201 if (LEADER(state)) 1202 return; 1203 1204 /* We want to be able to compute bound * 2, so limit the setting */ 1205 if (bound > (int64) (INT_MAX / 2)) 1206 return; 1207 1208 state->bounded = true; 1209 state->bound = (int) bound; 1210 1211 /* 1212 * Bounded sorts are not an effective target for abbreviated key 1213 * optimization. Disable by setting state to be consistent with no 1214 * abbreviation support. 1215 */ 1216 state->sortKeys->abbrev_converter = NULL; 1217 if (state->sortKeys->abbrev_full_comparator) 1218 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; 1219 1220 /* Not strictly necessary, but be tidy */ 1221 state->sortKeys->abbrev_abort = NULL; 1222 state->sortKeys->abbrev_full_comparator = NULL; 1223 } 1224 1225 /* 1226 * tuplesort_end 1227 * 1228 * Release resources and clean up. 1229 * 1230 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are 1231 * pointing to garbage. Be careful not to attempt to use or free such 1232 * pointers afterwards! 1233 */ 1234 void 1235 tuplesort_end(Tuplesortstate *state) 1236 { 1237 /* context swap probably not needed, but let's be safe */ 1238 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 1239 1240 #ifdef TRACE_SORT 1241 long spaceUsed; 1242 1243 if (state->tapeset) 1244 spaceUsed = LogicalTapeSetBlocks(state->tapeset); 1245 else 1246 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; 1247 #endif 1248 1249 /* 1250 * Delete temporary "tape" files, if any. 1251 * 1252 * Note: want to include this in reported total cost of sort, hence need 1253 * for two #ifdef TRACE_SORT sections. 1254 */ 1255 if (state->tapeset) 1256 LogicalTapeSetClose(state->tapeset); 1257 1258 #ifdef TRACE_SORT 1259 if (trace_sort) 1260 { 1261 if (state->tapeset) 1262 elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s", 1263 SERIAL(state) ? 
"external sort" : "parallel external sort", 1264 state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); 1265 else 1266 elog(LOG, "%s of worker %d ended, %ld KB used: %s", 1267 SERIAL(state) ? "internal sort" : "unperformed parallel sort", 1268 state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); 1269 } 1270 1271 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed); 1272 #else 1273 1274 /* 1275 * If you disabled TRACE_SORT, you can still probe sort__done, but you 1276 * ain't getting space-used stats. 1277 */ 1278 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L); 1279 #endif 1280 1281 /* Free any execution state created for CLUSTER case */ 1282 if (state->estate != NULL) 1283 { 1284 ExprContext *econtext = GetPerTupleExprContext(state->estate); 1285 1286 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple); 1287 FreeExecutorState(state->estate); 1288 } 1289 1290 MemoryContextSwitchTo(oldcontext); 1291 1292 /* 1293 * Free the per-sort memory context, thereby releasing all working memory, 1294 * including the Tuplesortstate struct itself. 1295 */ 1296 MemoryContextDelete(state->sortcontext); 1297 } 1298 1299 /* 1300 * Grow the memtuples[] array, if possible within our memory constraint. We 1301 * must not exceed INT_MAX tuples in memory or the caller-provided memory 1302 * limit. Return true if we were able to enlarge the array, false if not. 1303 * 1304 * Normally, at each increment we double the size of the array. When doing 1305 * that would exceed a limit, we attempt one last, smaller increase (and then 1306 * clear the growmemtuples flag so we don't try any more). That allows us to 1307 * use memory as fully as permitted; sticking to the pure doubling rule could 1308 * result in almost half going unused. Because availMem moves around with 1309 * tuple addition/removal, we need some rule to prevent making repeated small 1310 * increases in memtupsize, which would just be useless thrashing. The 1311 * growmemtuples flag accomplishes that and also prevents useless 1312 * recalculations in this function. 1313 */ 1314 static bool 1315 grow_memtuples(Tuplesortstate *state) 1316 { 1317 int newmemtupsize; 1318 int memtupsize = state->memtupsize; 1319 int64 memNowUsed = state->allowedMem - state->availMem; 1320 1321 /* Forget it if we've already maxed out memtuples, per comment above */ 1322 if (!state->growmemtuples) 1323 return false; 1324 1325 /* Select new value of memtupsize */ 1326 if (memNowUsed <= state->availMem) 1327 { 1328 /* 1329 * We've used no more than half of allowedMem; double our usage, 1330 * clamping at INT_MAX tuples. 1331 */ 1332 if (memtupsize < INT_MAX / 2) 1333 newmemtupsize = memtupsize * 2; 1334 else 1335 { 1336 newmemtupsize = INT_MAX; 1337 state->growmemtuples = false; 1338 } 1339 } 1340 else 1341 { 1342 /* 1343 * This will be the last increment of memtupsize. Abandon doubling 1344 * strategy and instead increase as much as we safely can. 1345 * 1346 * To stay within allowedMem, we can't increase memtupsize by more 1347 * than availMem / sizeof(SortTuple) elements. In practice, we want 1348 * to increase it by considerably less, because we need to leave some 1349 * space for the tuples to which the new array slots will refer. We 1350 * assume the new tuples will be about the same size as the tuples 1351 * we've already seen, and thus we can extrapolate from the space 1352 * consumption so far to estimate an appropriate new size for the 1353 * memtuples array. 
The optimal value might be higher or lower than 1354 * this estimate, but it's hard to know that in advance. We again 1355 * clamp at INT_MAX tuples. 1356 * 1357 * This calculation is safe against enlarging the array so much that 1358 * LACKMEM becomes true, because the memory currently used includes 1359 * the present array; thus, there would be enough allowedMem for the 1360 * new array elements even if no other memory were currently used. 1361 * 1362 * We do the arithmetic in float8, because otherwise the product of 1363 * memtupsize and allowedMem could overflow. Any inaccuracy in the 1364 * result should be insignificant; but even if we computed a 1365 * completely insane result, the checks below will prevent anything 1366 * really bad from happening. 1367 */ 1368 double grow_ratio; 1369 1370 grow_ratio = (double) state->allowedMem / (double) memNowUsed; 1371 if (memtupsize * grow_ratio < INT_MAX) 1372 newmemtupsize = (int) (memtupsize * grow_ratio); 1373 else 1374 newmemtupsize = INT_MAX; 1375 1376 /* We won't make any further enlargement attempts */ 1377 state->growmemtuples = false; 1378 } 1379 1380 /* Must enlarge array by at least one element, else report failure */ 1381 if (newmemtupsize <= memtupsize) 1382 goto noalloc; 1383 1384 /* 1385 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp 1386 * to ensure our request won't be rejected. Note that we can easily 1387 * exhaust address space before facing this outcome. (This is presently 1388 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but 1389 * don't rely on that at this distance.) 1390 */ 1391 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple)) 1392 { 1393 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple)); 1394 state->growmemtuples = false; /* can't grow any more */ 1395 } 1396 1397 /* 1398 * We need to be sure that we do not cause LACKMEM to become true, else 1399 * the space management algorithm will go nuts. The code above should 1400 * never generate a dangerous request, but to be safe, check explicitly 1401 * that the array growth fits within availMem. (We could still cause 1402 * LACKMEM if the memory chunk overhead associated with the memtuples 1403 * array were to increase. That shouldn't happen because we chose the 1404 * initial array size large enough to ensure that palloc will be treating 1405 * both old and new arrays as separate chunks. But we'll check LACKMEM 1406 * explicitly below just in case.) 1407 */ 1408 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple))) 1409 goto noalloc; 1410 1411 /* OK, do it */ 1412 FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); 1413 state->memtupsize = newmemtupsize; 1414 state->memtuples = (SortTuple *) 1415 repalloc_huge(state->memtuples, 1416 state->memtupsize * sizeof(SortTuple)); 1417 USEMEM(state, GetMemoryChunkSpace(state->memtuples)); 1418 if (LACKMEM(state)) 1419 elog(ERROR, "unexpected out-of-memory situation in tuplesort"); 1420 return true; 1421 1422 noalloc: 1423 /* If for any reason we didn't realloc, shut off future attempts */ 1424 state->growmemtuples = false; 1425 return false; 1426 } 1427 1428 /* 1429 * Accept one tuple while collecting input data for sort. 1430 * 1431 * Note that the input data is always copied; the caller need not save it. 
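 *
 * A caller-side sketch of the overall flow (fetch_next_slot() is a
 * hypothetical input routine, not part of this module):
 *
 *		while (fetch_next_slot(slot))
 *			tuplesort_puttupleslot(state, slot);
 *		tuplesort_performsort(state);
 *		while (tuplesort_gettupleslot(state, true, false, slot, NULL))
 *			... process the sorted tuple ...
 *		tuplesort_end(state);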
1432 */ 1433 void 1434 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot) 1435 { 1436 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 1437 SortTuple stup; 1438 1439 /* 1440 * Copy the given tuple into memory we control, and decrease availMem. 1441 * Then call the common code. 1442 */ 1443 COPYTUP(state, &stup, (void *) slot); 1444 1445 puttuple_common(state, &stup); 1446 1447 MemoryContextSwitchTo(oldcontext); 1448 } 1449 1450 /* 1451 * Accept one tuple while collecting input data for sort. 1452 * 1453 * Note that the input data is always copied; the caller need not save it. 1454 */ 1455 void 1456 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup) 1457 { 1458 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 1459 SortTuple stup; 1460 1461 /* 1462 * Copy the given tuple into memory we control, and decrease availMem. 1463 * Then call the common code. 1464 */ 1465 COPYTUP(state, &stup, (void *) tup); 1466 1467 puttuple_common(state, &stup); 1468 1469 MemoryContextSwitchTo(oldcontext); 1470 } 1471 1472 /* 1473 * Collect one index tuple while collecting input data for sort, building 1474 * it from caller-supplied values. 1475 */ 1476 void 1477 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, 1478 ItemPointer self, Datum *values, 1479 bool *isnull) 1480 { 1481 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); 1482 SortTuple stup; 1483 Datum original; 1484 IndexTuple tuple; 1485 1486 stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull); 1487 tuple = ((IndexTuple) stup.tuple); 1488 tuple->t_tid = *self; 1489 USEMEM(state, GetMemoryChunkSpace(stup.tuple)); 1490 /* set up first-column key value */ 1491 original = index_getattr(tuple, 1492 1, 1493 RelationGetDescr(state->indexRel), 1494 &stup.isnull1); 1495 1496 MemoryContextSwitchTo(state->sortcontext); 1497 1498 if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1) 1499 { 1500 /* 1501 * Store ordinary Datum representation, or NULL value. If there is a 1502 * converter it won't expect NULL values, and cost model is not 1503 * required to account for NULL, so in that case we avoid calling 1504 * converter and just set datum1 to zeroed representation (to be 1505 * consistent, and to support cheap inequality tests for NULL 1506 * abbreviated keys). 1507 */ 1508 stup.datum1 = original; 1509 } 1510 else if (!consider_abort_common(state)) 1511 { 1512 /* Store abbreviated key representation */ 1513 stup.datum1 = state->sortKeys->abbrev_converter(original, 1514 state->sortKeys); 1515 } 1516 else 1517 { 1518 /* Abort abbreviation */ 1519 int i; 1520 1521 stup.datum1 = original; 1522 1523 /* 1524 * Set state to be consistent with never trying abbreviation. 1525 * 1526 * Alter datum1 representation in already-copied tuples, so as to 1527 * ensure a consistent representation (current tuple was just 1528 * handled). It does not matter if some dumped tuples are already 1529 * sorted on tape, since serialized tuples lack abbreviated keys 1530 * (TSS_BUILDRUNS state prevents control reaching here in any case). 
1531 */ 1532 for (i = 0; i < state->memtupcount; i++) 1533 { 1534 SortTuple *mtup = &state->memtuples[i]; 1535 1536 tuple = mtup->tuple; 1537 mtup->datum1 = index_getattr(tuple, 1538 1, 1539 RelationGetDescr(state->indexRel), 1540 &mtup->isnull1); 1541 } 1542 } 1543 1544 puttuple_common(state, &stup); 1545 1546 MemoryContextSwitchTo(oldcontext); 1547 } 1548 1549 /* 1550 * Accept one Datum while collecting input data for sort. 1551 * 1552 * If the Datum is pass-by-ref type, the value will be copied. 1553 */ 1554 void 1555 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull) 1556 { 1557 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); 1558 SortTuple stup; 1559 1560 /* 1561 * Pass-by-value types or null values are just stored directly in 1562 * stup.datum1 (and stup.tuple is not used and set to NULL). 1563 * 1564 * Non-null pass-by-reference values need to be copied into memory we 1565 * control, and possibly abbreviated. The copied value is pointed to by 1566 * stup.tuple and is treated as the canonical copy (e.g. to return via 1567 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the 1568 * abbreviated value if abbreviation is happening, otherwise it's 1569 * identical to stup.tuple. 1570 */ 1571 1572 if (isNull || !state->tuples) 1573 { 1574 /* 1575 * Set datum1 to zeroed representation for NULLs (to be consistent, 1576 * and to support cheap inequality tests for NULL abbreviated keys). 1577 */ 1578 stup.datum1 = !isNull ? val : (Datum) 0; 1579 stup.isnull1 = isNull; 1580 stup.tuple = NULL; /* no separate storage */ 1581 MemoryContextSwitchTo(state->sortcontext); 1582 } 1583 else 1584 { 1585 Datum original = datumCopy(val, false, state->datumTypeLen); 1586 1587 stup.isnull1 = false; 1588 stup.tuple = DatumGetPointer(original); 1589 USEMEM(state, GetMemoryChunkSpace(stup.tuple)); 1590 MemoryContextSwitchTo(state->sortcontext); 1591 1592 if (!state->sortKeys->abbrev_converter) 1593 { 1594 stup.datum1 = original; 1595 } 1596 else if (!consider_abort_common(state)) 1597 { 1598 /* Store abbreviated key representation */ 1599 stup.datum1 = state->sortKeys->abbrev_converter(original, 1600 state->sortKeys); 1601 } 1602 else 1603 { 1604 /* Abort abbreviation */ 1605 int i; 1606 1607 stup.datum1 = original; 1608 1609 /* 1610 * Set state to be consistent with never trying abbreviation. 1611 * 1612 * Alter datum1 representation in already-copied tuples, so as to 1613 * ensure a consistent representation (current tuple was just 1614 * handled). It does not matter if some dumped tuples are already 1615 * sorted on tape, since serialized tuples lack abbreviated keys 1616 * (TSS_BUILDRUNS state prevents control reaching here in any 1617 * case). 1618 */ 1619 for (i = 0; i < state->memtupcount; i++) 1620 { 1621 SortTuple *mtup = &state->memtuples[i]; 1622 1623 mtup->datum1 = PointerGetDatum(mtup->tuple); 1624 } 1625 } 1626 } 1627 1628 puttuple_common(state, &stup); 1629 1630 MemoryContextSwitchTo(oldcontext); 1631 } 1632 1633 /* 1634 * Shared code for tuple and datum cases. 1635 */ 1636 static void 1637 puttuple_common(Tuplesortstate *state, SortTuple *tuple) 1638 { 1639 Assert(!LEADER(state)); 1640 1641 switch (state->status) 1642 { 1643 case TSS_INITIAL: 1644 1645 /* 1646 * Save the tuple into the unsorted array. First, grow the array 1647 * as needed. 
Note that we try to grow the array when there is 1648 * still one free slot remaining --- if we fail, there'll still be 1649 * room to store the incoming tuple, and then we'll switch to 1650 * tape-based operation. 1651 */ 1652 if (state->memtupcount >= state->memtupsize - 1) 1653 { 1654 (void) grow_memtuples(state); 1655 Assert(state->memtupcount < state->memtupsize); 1656 } 1657 state->memtuples[state->memtupcount++] = *tuple; 1658 1659 /* 1660 * Check if it's time to switch over to a bounded heapsort. We do 1661 * so if the input tuple count exceeds twice the desired tuple 1662 * count (this is a heuristic for where heapsort becomes cheaper 1663 * than a quicksort), or if we've just filled workMem and have 1664 * enough tuples to meet the bound. 1665 * 1666 * Note that once we enter TSS_BOUNDED state we will always try to 1667 * complete the sort that way. In the worst case, if later input 1668 * tuples are larger than earlier ones, this might cause us to 1669 * exceed workMem significantly. 1670 */ 1671 if (state->bounded && 1672 (state->memtupcount > state->bound * 2 || 1673 (state->memtupcount > state->bound && LACKMEM(state)))) 1674 { 1675 #ifdef TRACE_SORT 1676 if (trace_sort) 1677 elog(LOG, "switching to bounded heapsort at %d tuples: %s", 1678 state->memtupcount, 1679 pg_rusage_show(&state->ru_start)); 1680 #endif 1681 make_bounded_heap(state); 1682 return; 1683 } 1684 1685 /* 1686 * Done if we still fit in available memory and have array slots. 1687 */ 1688 if (state->memtupcount < state->memtupsize && !LACKMEM(state)) 1689 return; 1690 1691 /* 1692 * Nope; time to switch to tape-based operation. 1693 */ 1694 inittapes(state, true); 1695 1696 /* 1697 * Dump all tuples. 1698 */ 1699 dumptuples(state, false); 1700 break; 1701 1702 case TSS_BOUNDED: 1703 1704 /* 1705 * We don't want to grow the array here, so check whether the new 1706 * tuple can be discarded before putting it in. This should be a 1707 * good speed optimization, too, since when there are many more 1708 * input tuples than the bound, most input tuples can be discarded 1709 * with just this one comparison. Note that because we currently 1710 * have the sort direction reversed, we must check for <= not >=. 1711 */ 1712 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0) 1713 { 1714 /* new tuple <= top of the heap, so we can discard it */ 1715 free_sort_tuple(state, tuple); 1716 CHECK_FOR_INTERRUPTS(); 1717 } 1718 else 1719 { 1720 /* discard top of heap, replacing it with the new tuple */ 1721 free_sort_tuple(state, &state->memtuples[0]); 1722 tuplesort_heap_replace_top(state, tuple); 1723 } 1724 break; 1725 1726 case TSS_BUILDRUNS: 1727 1728 /* 1729 * Save the tuple into the unsorted array (there must be space) 1730 */ 1731 state->memtuples[state->memtupcount++] = *tuple; 1732 1733 /* 1734 * If we are over the memory limit, dump all tuples. 1735 */ 1736 dumptuples(state, false); 1737 break; 1738 1739 default: 1740 elog(ERROR, "invalid tuplesort state"); 1741 break; 1742 } 1743 } 1744 1745 static bool 1746 consider_abort_common(Tuplesortstate *state) 1747 { 1748 Assert(state->sortKeys[0].abbrev_converter != NULL); 1749 Assert(state->sortKeys[0].abbrev_abort != NULL); 1750 Assert(state->sortKeys[0].abbrev_full_comparator != NULL); 1751 1752 /* 1753 * Check effectiveness of abbreviation optimization. Consider aborting 1754 * when still within memory limit. 
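 *
 * To illustrate the cadence (a sketch; the initial value of abbrevNext is
 * assigned at sort setup, elsewhere in this file): because abbrevNext
 * doubles after every check below, the abort decision is revisited at
 * geometrically spaced points -- e.g. around 10, 20, 40, 80, ... tuples --
 * so the bookkeeping cost stays negligible no matter how many tuples
 * arrive.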
1755  */
1756 	if (state->status == TSS_INITIAL &&
1757 		state->memtupcount >= state->abbrevNext)
1758 	{
1759 		state->abbrevNext *= 2;
1760 
1761 		/*
1762 		 * Check opclass-supplied abbreviation abort routine. It may indicate
1763 		 * that abbreviation should not proceed.
1764 		 */
1765 		if (!state->sortKeys->abbrev_abort(state->memtupcount,
1766 										   state->sortKeys))
1767 			return false;
1768 
1769 		/*
1770 		 * Finally, restore the authoritative comparator, and indicate that
1771 		 * abbreviation is not in play by setting abbrev_converter to NULL
1772 		 */
1773 		state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
1774 		state->sortKeys[0].abbrev_converter = NULL;
1775 		/* Not strictly necessary, but be tidy */
1776 		state->sortKeys[0].abbrev_abort = NULL;
1777 		state->sortKeys[0].abbrev_full_comparator = NULL;
1778 
1779 		/* Give up - expect original pass-by-value representation */
1780 		return true;
1781 	}
1782 
1783 	return false;
1784 }
1785 
1786 /*
1787  * All tuples have been provided; finish the sort.
1788  */
1789 void
1790 tuplesort_performsort(Tuplesortstate *state)
1791 {
1792 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1793 
1794 #ifdef TRACE_SORT
1795 	if (trace_sort)
1796 		elog(LOG, "performsort of worker %d starting: %s",
1797 			 state->worker, pg_rusage_show(&state->ru_start));
1798 #endif
1799 
1800 	switch (state->status)
1801 	{
1802 		case TSS_INITIAL:
1803 
1804 			/*
1805 			 * We were able to accumulate all the tuples within the allowed
1806 			 * amount of memory, or we are the leader, taking over worker tapes.
1807 			 */
1808 			if (SERIAL(state))
1809 			{
1810 				/* Just qsort 'em and we're done */
1811 				tuplesort_sort_memtuples(state);
1812 				state->status = TSS_SORTEDINMEM;
1813 			}
1814 			else if (WORKER(state))
1815 			{
1816 				/*
1817 				 * Parallel workers must still dump out tuples to tape. No
1818 				 * merge is required to produce a single output run, though.
1819 				 */
1820 				inittapes(state, false);
1821 				dumptuples(state, true);
1822 				worker_nomergeruns(state);
1823 				state->status = TSS_SORTEDONTAPE;
1824 			}
1825 			else
1826 			{
1827 				/*
1828 				 * Leader will take over worker tapes and merge worker runs.
1829 				 * Note that mergeruns sets the correct state->status.
1830 				 */
1831 				leader_takeover_tapes(state);
1832 				mergeruns(state);
1833 			}
1834 			state->current = 0;
1835 			state->eof_reached = false;
1836 			state->markpos_block = 0L;
1837 			state->markpos_offset = 0;
1838 			state->markpos_eof = false;
1839 			break;
1840 
1841 		case TSS_BOUNDED:
1842 
1843 			/*
1844 			 * We were able to accumulate all the tuples required for output
1845 			 * in memory, using a heap to eliminate excess tuples. Now we
1846 			 * have to transform the heap to a properly-sorted array.
1847 			 */
1848 			sort_bounded_heap(state);
1849 			state->current = 0;
1850 			state->eof_reached = false;
1851 			state->markpos_offset = 0;
1852 			state->markpos_eof = false;
1853 			state->status = TSS_SORTEDINMEM;
1854 			break;
1855 
1856 		case TSS_BUILDRUNS:
1857 
1858 			/*
1859 			 * Finish tape-based sort. First, flush all tuples remaining in
1860 			 * memory out to tape; then merge until we have a single remaining
1861 			 * run (or, if !randomAccess and !WORKER(), one run per tape).
1862 			 * Note that mergeruns sets the correct state->status.
1863 			 */
1864 			dumptuples(state, true);
1865 			mergeruns(state);
1866 			state->eof_reached = false;
1867 			state->markpos_block = 0L;
1868 			state->markpos_offset = 0;
1869 			state->markpos_eof = false;
1870 			break;
1871 
1872 		default:
1873 			elog(ERROR, "invalid tuplesort state");
1874 			break;
1875 	}
1876 
1877 #ifdef TRACE_SORT
1878 	if (trace_sort)
1879 	{
1880 		if (state->status == TSS_FINALMERGE)
1881 			elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1882 				 state->worker, state->activeTapes,
1883 				 pg_rusage_show(&state->ru_start));
1884 		else
1885 			elog(LOG, "performsort of worker %d done: %s",
1886 				 state->worker, pg_rusage_show(&state->ru_start));
1887 	}
1888 #endif
1889 
1890 	MemoryContextSwitchTo(oldcontext);
1891 }
1892 
1893 /*
1894  * Internal routine to fetch the next tuple in either forward or back
1895  * direction into *stup. Returns false if no more tuples.
1896  * Returned tuple belongs to tuplesort memory context, and must not be freed
1897  * by caller. Note that fetched tuple is stored in memory that may be
1898  * recycled by any future fetch.
1899  */
1900 static bool
1901 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1902 						  SortTuple *stup)
1903 {
1904 	unsigned int tuplen;
1905 	size_t		nmoved;
1906 
1907 	Assert(!WORKER(state));
1908 
1909 	switch (state->status)
1910 	{
1911 		case TSS_SORTEDINMEM:
1912 			Assert(forward || state->randomAccess);
1913 			Assert(!state->slabAllocatorUsed);
1914 			if (forward)
1915 			{
1916 				if (state->current < state->memtupcount)
1917 				{
1918 					*stup = state->memtuples[state->current++];
1919 					return true;
1920 				}
1921 				state->eof_reached = true;
1922 
1923 				/*
1924 				 * Complain if caller tries to retrieve more tuples than
1925 				 * originally asked for in a bounded sort. This is because
1926 				 * returning EOF here might be the wrong thing.
1927 				 */
1928 				if (state->bounded && state->current >= state->bound)
1929 					elog(ERROR, "retrieved too many tuples in a bounded sort");
1930 
1931 				return false;
1932 			}
1933 			else
1934 			{
1935 				if (state->current <= 0)
1936 					return false;
1937 
1938 				/*
1939 				 * If all tuples have been fetched already, return the last
1940 				 * tuple; otherwise, return the tuple before the last one returned.
1941 				 */
1942 				if (state->eof_reached)
1943 					state->eof_reached = false;
1944 				else
1945 				{
1946 					state->current--;	/* last returned tuple */
1947 					if (state->current <= 0)
1948 						return false;
1949 				}
1950 				*stup = state->memtuples[state->current - 1];
1951 				return true;
1952 			}
1953 			break;
1954 
1955 		case TSS_SORTEDONTAPE:
1956 			Assert(forward || state->randomAccess);
1957 			Assert(state->slabAllocatorUsed);
1958 
1959 			/*
1960 			 * The slot that held the tuple that we returned in previous
1961 			 * gettuple call can now be reused.
1962 			 */
1963 			if (state->lastReturnedTuple)
1964 			{
1965 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1966 				state->lastReturnedTuple = NULL;
1967 			}
1968 
1969 			if (forward)
1970 			{
1971 				if (state->eof_reached)
1972 					return false;
1973 
1974 				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1975 				{
1976 					READTUP(state, stup, state->result_tape, tuplen);
1977 
1978 					/*
1979 					 * Remember the tuple we return, so that we can recycle
1980 					 * its memory on next call. (This can be NULL, in the
1981 					 * !state->tuples case).
1982 					 */
1983 					state->lastReturnedTuple = stup->tuple;
1984 
1985 					return true;
1986 				}
1987 				else
1988 				{
1989 					state->eof_reached = true;
1990 					return false;
1991 				}
1992 			}
1993 
1994 			/*
1995 			 * Backward.
1996 			 *
1997 			 * If all tuples have been fetched already, return the last tuple;
1998 			 * otherwise, return the tuple before the last one returned.
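			 *
			 * To make the backspacing arithmetic below concrete, here is a
			 * sketch of the on-tape layout of a frozen randomAccess result
			 * tape, where each "len" is one unsigned int and every tuple
			 * carries both a leading and a trailing length word:
			 *
			 *		| len | tuple | len | len | tuple | len | 0 |
			 *
			 * With the read position just past the terminating zero length
			 * word, backing up 2 * sizeof(unsigned int) lands on the last
			 * tuple's trailing length word; from the middle of the tape,
			 * backing up sizeof(unsigned int) reaches the previously-returned
			 * tuple's trailing length word.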
1999 */ 2000 if (state->eof_reached) 2001 { 2002 /* 2003 * Seek position is pointing just past the zero tuplen at the 2004 * end of file; back up to fetch last tuple's ending length 2005 * word. If seek fails we must have a completely empty file. 2006 */ 2007 nmoved = LogicalTapeBackspace(state->tapeset, 2008 state->result_tape, 2009 2 * sizeof(unsigned int)); 2010 if (nmoved == 0) 2011 return false; 2012 else if (nmoved != 2 * sizeof(unsigned int)) 2013 elog(ERROR, "unexpected tape position"); 2014 state->eof_reached = false; 2015 } 2016 else 2017 { 2018 /* 2019 * Back up and fetch previously-returned tuple's ending length 2020 * word. If seek fails, assume we are at start of file. 2021 */ 2022 nmoved = LogicalTapeBackspace(state->tapeset, 2023 state->result_tape, 2024 sizeof(unsigned int)); 2025 if (nmoved == 0) 2026 return false; 2027 else if (nmoved != sizeof(unsigned int)) 2028 elog(ERROR, "unexpected tape position"); 2029 tuplen = getlen(state, state->result_tape, false); 2030 2031 /* 2032 * Back up to get ending length word of tuple before it. 2033 */ 2034 nmoved = LogicalTapeBackspace(state->tapeset, 2035 state->result_tape, 2036 tuplen + 2 * sizeof(unsigned int)); 2037 if (nmoved == tuplen + sizeof(unsigned int)) 2038 { 2039 /* 2040 * We backed up over the previous tuple, but there was no 2041 * ending length word before it. That means that the prev 2042 * tuple is the first tuple in the file. It is now the 2043 * next to read in forward direction (not obviously right, 2044 * but that is what in-memory case does). 2045 */ 2046 return false; 2047 } 2048 else if (nmoved != tuplen + 2 * sizeof(unsigned int)) 2049 elog(ERROR, "bogus tuple length in backward scan"); 2050 } 2051 2052 tuplen = getlen(state, state->result_tape, false); 2053 2054 /* 2055 * Now we have the length of the prior tuple, back up and read it. 2056 * Note: READTUP expects we are positioned after the initial 2057 * length word of the tuple, so back up to that point. 2058 */ 2059 nmoved = LogicalTapeBackspace(state->tapeset, 2060 state->result_tape, 2061 tuplen); 2062 if (nmoved != tuplen) 2063 elog(ERROR, "bogus tuple length in backward scan"); 2064 READTUP(state, stup, state->result_tape, tuplen); 2065 2066 /* 2067 * Remember the tuple we return, so that we can recycle its memory 2068 * on next call. (This can be NULL, in the Datum case). 2069 */ 2070 state->lastReturnedTuple = stup->tuple; 2071 2072 return true; 2073 2074 case TSS_FINALMERGE: 2075 Assert(forward); 2076 /* We are managing memory ourselves, with the slab allocator. */ 2077 Assert(state->slabAllocatorUsed); 2078 2079 /* 2080 * The slab slot holding the tuple that we returned in previous 2081 * gettuple call can now be reused. 2082 */ 2083 if (state->lastReturnedTuple) 2084 { 2085 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); 2086 state->lastReturnedTuple = NULL; 2087 } 2088 2089 /* 2090 * This code should match the inner loop of mergeonerun(). 2091 */ 2092 if (state->memtupcount > 0) 2093 { 2094 int srcTape = state->memtuples[0].tupindex; 2095 SortTuple newtup; 2096 2097 *stup = state->memtuples[0]; 2098 2099 /* 2100 * Remember the tuple we return, so that we can recycle its 2101 * memory on next call. (This can be NULL, in the Datum case). 2102 */ 2103 state->lastReturnedTuple = stup->tuple; 2104 2105 /* 2106 * Pull next tuple from tape, and replace the returned tuple 2107 * at top of the heap with it. 2108 */ 2109 if (!mergereadnext(state, srcTape, &newtup)) 2110 { 2111 /* 2112 * If no more data, we've reached end of run on this tape. 
2113 * Remove the top node from the heap. 2114 */ 2115 tuplesort_heap_delete_top(state); 2116 2117 /* 2118 * Rewind to free the read buffer. It'd go away at the 2119 * end of the sort anyway, but better to release the 2120 * memory early. 2121 */ 2122 LogicalTapeRewindForWrite(state->tapeset, srcTape); 2123 return true; 2124 } 2125 newtup.tupindex = srcTape; 2126 tuplesort_heap_replace_top(state, &newtup); 2127 return true; 2128 } 2129 return false; 2130 2131 default: 2132 elog(ERROR, "invalid tuplesort state"); 2133 return false; /* keep compiler quiet */ 2134 } 2135 } 2136 2137 /* 2138 * Fetch the next tuple in either forward or back direction. 2139 * If successful, put tuple in slot and return true; else, clear the slot 2140 * and return false. 2141 * 2142 * Caller may optionally be passed back abbreviated value (on true return 2143 * value) when abbreviation was used, which can be used to cheaply avoid 2144 * equality checks that might otherwise be required. Caller can safely make a 2145 * determination of "non-equal tuple" based on simple binary inequality. A 2146 * NULL value in leading attribute will set abbreviated value to zeroed 2147 * representation, which caller may rely on in abbreviated inequality check. 2148 * 2149 * If copy is true, the slot receives a tuple that's been copied into the 2150 * caller's memory context, so that it will stay valid regardless of future 2151 * manipulations of the tuplesort's state (up to and including deleting the 2152 * tuplesort). If copy is false, the slot will just receive a pointer to a 2153 * tuple held within the tuplesort, which is more efficient, but only safe for 2154 * callers that are prepared to have any subsequent manipulation of the 2155 * tuplesort's state invalidate slot contents. 2156 */ 2157 bool 2158 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, 2159 TupleTableSlot *slot, Datum *abbrev) 2160 { 2161 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2162 SortTuple stup; 2163 2164 if (!tuplesort_gettuple_common(state, forward, &stup)) 2165 stup.tuple = NULL; 2166 2167 MemoryContextSwitchTo(oldcontext); 2168 2169 if (stup.tuple) 2170 { 2171 /* Record abbreviated key for caller */ 2172 if (state->sortKeys->abbrev_converter && abbrev) 2173 *abbrev = stup.datum1; 2174 2175 if (copy) 2176 stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple); 2177 2178 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy); 2179 return true; 2180 } 2181 else 2182 { 2183 ExecClearTuple(slot); 2184 return false; 2185 } 2186 } 2187 2188 /* 2189 * Fetch the next tuple in either forward or back direction. 2190 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory 2191 * context, and must not be freed by caller. Caller may not rely on tuple 2192 * remaining valid after any further manipulation of tuplesort. 2193 */ 2194 HeapTuple 2195 tuplesort_getheaptuple(Tuplesortstate *state, bool forward) 2196 { 2197 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2198 SortTuple stup; 2199 2200 if (!tuplesort_gettuple_common(state, forward, &stup)) 2201 stup.tuple = NULL; 2202 2203 MemoryContextSwitchTo(oldcontext); 2204 2205 return stup.tuple; 2206 } 2207 2208 /* 2209 * Fetch the next index tuple in either forward or back direction. 2210 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory 2211 * context, and must not be freed by caller. Caller may not rely on tuple 2212 * remaining valid after any further manipulation of tuplesort. 
2213 */ 2214 IndexTuple 2215 tuplesort_getindextuple(Tuplesortstate *state, bool forward) 2216 { 2217 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2218 SortTuple stup; 2219 2220 if (!tuplesort_gettuple_common(state, forward, &stup)) 2221 stup.tuple = NULL; 2222 2223 MemoryContextSwitchTo(oldcontext); 2224 2225 return (IndexTuple) stup.tuple; 2226 } 2227 2228 /* 2229 * Fetch the next Datum in either forward or back direction. 2230 * Returns false if no more datums. 2231 * 2232 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd 2233 * in caller's context, and is now owned by the caller (this differs from 2234 * similar routines for other types of tuplesorts). 2235 * 2236 * Caller may optionally be passed back abbreviated value (on true return 2237 * value) when abbreviation was used, which can be used to cheaply avoid 2238 * equality checks that might otherwise be required. Caller can safely make a 2239 * determination of "non-equal tuple" based on simple binary inequality. A 2240 * NULL value will have a zeroed abbreviated value representation, which caller 2241 * may rely on in abbreviated inequality check. 2242 */ 2243 bool 2244 tuplesort_getdatum(Tuplesortstate *state, bool forward, 2245 Datum *val, bool *isNull, Datum *abbrev) 2246 { 2247 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2248 SortTuple stup; 2249 2250 if (!tuplesort_gettuple_common(state, forward, &stup)) 2251 { 2252 MemoryContextSwitchTo(oldcontext); 2253 return false; 2254 } 2255 2256 /* Ensure we copy into caller's memory context */ 2257 MemoryContextSwitchTo(oldcontext); 2258 2259 /* Record abbreviated key for caller */ 2260 if (state->sortKeys->abbrev_converter && abbrev) 2261 *abbrev = stup.datum1; 2262 2263 if (stup.isnull1 || !state->tuples) 2264 { 2265 *val = stup.datum1; 2266 *isNull = stup.isnull1; 2267 } 2268 else 2269 { 2270 /* use stup.tuple because stup.datum1 may be an abbreviation */ 2271 *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen); 2272 *isNull = false; 2273 } 2274 2275 return true; 2276 } 2277 2278 /* 2279 * Advance over N tuples in either forward or back direction, 2280 * without returning any data. N==0 is a no-op. 2281 * Returns true if successful, false if ran out of tuples. 2282 */ 2283 bool 2284 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward) 2285 { 2286 MemoryContext oldcontext; 2287 2288 /* 2289 * We don't actually support backwards skip yet, because no callers need 2290 * it. The API is designed to allow for that later, though. 2291 */ 2292 Assert(forward); 2293 Assert(ntuples >= 0); 2294 Assert(!WORKER(state)); 2295 2296 switch (state->status) 2297 { 2298 case TSS_SORTEDINMEM: 2299 if (state->memtupcount - state->current >= ntuples) 2300 { 2301 state->current += ntuples; 2302 return true; 2303 } 2304 state->current = state->memtupcount; 2305 state->eof_reached = true; 2306 2307 /* 2308 * Complain if caller tries to retrieve more tuples than 2309 * originally asked for in a bounded sort. This is because 2310 * returning EOF here might be the wrong thing. 2311 */ 2312 if (state->bounded && state->current >= state->bound) 2313 elog(ERROR, "retrieved too many tuples in a bounded sort"); 2314 2315 return false; 2316 2317 case TSS_SORTEDONTAPE: 2318 case TSS_FINALMERGE: 2319 2320 /* 2321 * We could probably optimize these cases better, but for now it's 2322 * not worth the trouble. 
2323 */ 2324 oldcontext = MemoryContextSwitchTo(state->sortcontext); 2325 while (ntuples-- > 0) 2326 { 2327 SortTuple stup; 2328 2329 if (!tuplesort_gettuple_common(state, forward, &stup)) 2330 { 2331 MemoryContextSwitchTo(oldcontext); 2332 return false; 2333 } 2334 CHECK_FOR_INTERRUPTS(); 2335 } 2336 MemoryContextSwitchTo(oldcontext); 2337 return true; 2338 2339 default: 2340 elog(ERROR, "invalid tuplesort state"); 2341 return false; /* keep compiler quiet */ 2342 } 2343 } 2344 2345 /* 2346 * tuplesort_merge_order - report merge order we'll use for given memory 2347 * (note: "merge order" just means the number of input tapes in the merge). 2348 * 2349 * This is exported for use by the planner. allowedMem is in bytes. 2350 */ 2351 int 2352 tuplesort_merge_order(int64 allowedMem) 2353 { 2354 int mOrder; 2355 2356 /* 2357 * We need one tape for each merge input, plus another one for the output, 2358 * and each of these tapes needs buffer space. In addition we want 2359 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't 2360 * count). 2361 * 2362 * Note: you might be thinking we need to account for the memtuples[] 2363 * array in this calculation, but we effectively treat that as part of the 2364 * MERGE_BUFFER_SIZE workspace. 2365 */ 2366 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / 2367 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); 2368 2369 /* 2370 * Even in minimum memory, use at least a MINORDER merge. On the other 2371 * hand, even when we have lots of memory, do not use more than a MAXORDER 2372 * merge. Tapes are pretty cheap, but they're not entirely free. Each 2373 * additional tape reduces the amount of memory available to build runs, 2374 * which in turn can cause the same sort to need more runs, which makes 2375 * merging slower even if it can still be done in a single pass. Also, 2376 * high order merges are quite slow due to CPU cache effects; it can be 2377 * faster to pay the I/O cost of a polyphase merge than to perform a 2378 * single merge pass across many hundreds of tapes. 2379 */ 2380 mOrder = Max(mOrder, MINORDER); 2381 mOrder = Min(mOrder, MAXORDER); 2382 2383 return mOrder; 2384 } 2385 2386 /* 2387 * inittapes - initialize for tape sorting. 2388 * 2389 * This is called only if we have found we won't sort in memory. 2390 */ 2391 static void 2392 inittapes(Tuplesortstate *state, bool mergeruns) 2393 { 2394 int maxTapes, 2395 j; 2396 2397 Assert(!LEADER(state)); 2398 2399 if (mergeruns) 2400 { 2401 /* Compute number of tapes to use: merge order plus 1 */ 2402 maxTapes = tuplesort_merge_order(state->allowedMem) + 1; 2403 } 2404 else 2405 { 2406 /* Workers can sometimes produce single run, output without merge */ 2407 Assert(WORKER(state)); 2408 maxTapes = MINORDER + 1; 2409 } 2410 2411 #ifdef TRACE_SORT 2412 if (trace_sort) 2413 elog(LOG, "worker %d switching to external sort with %d tapes: %s", 2414 state->worker, maxTapes, pg_rusage_show(&state->ru_start)); 2415 #endif 2416 2417 /* Create the tape set and allocate the per-tape data arrays */ 2418 inittapestate(state, maxTapes); 2419 state->tapeset = 2420 LogicalTapeSetCreate(maxTapes, NULL, 2421 state->shared ? &state->shared->fileset : NULL, 2422 state->worker); 2423 2424 state->currentRun = 0; 2425 2426 /* 2427 * Initialize variables of Algorithm D (step D1). 
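 *
 * As a worked example of the initialization below (a sketch): with
 * maxTapes = 7 and therefore tapeRange = 6, the loop plus the two
 * assignments that follow leave tp_fib[] = {1,1,1,1,1,1,0} and
 * tp_dummy[] = {1,1,1,1,1,1,0} -- each input tape starts credited with one
 * (dummy) run, while the output tape, tp_tapenum[tapeRange], starts with
 * none.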
2428 */ 2429 for (j = 0; j < maxTapes; j++) 2430 { 2431 state->tp_fib[j] = 1; 2432 state->tp_runs[j] = 0; 2433 state->tp_dummy[j] = 1; 2434 state->tp_tapenum[j] = j; 2435 } 2436 state->tp_fib[state->tapeRange] = 0; 2437 state->tp_dummy[state->tapeRange] = 0; 2438 2439 state->Level = 1; 2440 state->destTape = 0; 2441 2442 state->status = TSS_BUILDRUNS; 2443 } 2444 2445 /* 2446 * inittapestate - initialize generic tape management state 2447 */ 2448 static void 2449 inittapestate(Tuplesortstate *state, int maxTapes) 2450 { 2451 int64 tapeSpace; 2452 2453 /* 2454 * Decrease availMem to reflect the space needed for tape buffers; but 2455 * don't decrease it to the point that we have no room for tuples. (That 2456 * case is only likely to occur if sorting pass-by-value Datums; in all 2457 * other scenarios the memtuples[] array is unlikely to occupy more than 2458 * half of allowedMem. In the pass-by-value case it's not important to 2459 * account for tuple space, so we don't care if LACKMEM becomes 2460 * inaccurate.) 2461 */ 2462 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; 2463 2464 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) 2465 USEMEM(state, tapeSpace); 2466 2467 /* 2468 * Make sure that the temp file(s) underlying the tape set are created in 2469 * suitable temp tablespaces. For parallel sorts, this should have been 2470 * called already, but it doesn't matter if it is called a second time. 2471 */ 2472 PrepareTempTablespaces(); 2473 2474 state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); 2475 state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); 2476 state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); 2477 state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); 2478 state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); 2479 2480 /* Record # of tapes allocated (for duration of sort) */ 2481 state->maxTapes = maxTapes; 2482 /* Record maximum # of tapes usable as inputs when merging */ 2483 state->tapeRange = maxTapes - 1; 2484 } 2485 2486 /* 2487 * selectnewtape -- select new tape for new initial run. 2488 * 2489 * This is called after finishing a run when we know another run 2490 * must be started. This implements steps D3, D4 of Algorithm D. 2491 */ 2492 static void 2493 selectnewtape(Tuplesortstate *state) 2494 { 2495 int j; 2496 int a; 2497 2498 /* Step D3: advance j (destTape) */ 2499 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1]) 2500 { 2501 state->destTape++; 2502 return; 2503 } 2504 if (state->tp_dummy[state->destTape] != 0) 2505 { 2506 state->destTape = 0; 2507 return; 2508 } 2509 2510 /* Step D4: increase level */ 2511 state->Level++; 2512 a = state->tp_fib[0]; 2513 for (j = 0; j < state->tapeRange; j++) 2514 { 2515 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j]; 2516 state->tp_fib[j] = a + state->tp_fib[j + 1]; 2517 } 2518 state->destTape = 0; 2519 } 2520 2521 /* 2522 * Initialize the slab allocation arena, for the given number of slots. 
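 *
 * The arena built below is a single palloc'd block carved into
 * SLAB_SLOT_SIZE pieces that are chained through their nextfree fields,
 * e.g. for numSlots = 3 (a sketch):
 *
 *		slabFreeHead -> slot0 -> slot1 -> slot2 -> NULL
 *
 * so allocation and release are simple free-list pops and pushes.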
2523  */
2524 static void
2525 init_slab_allocator(Tuplesortstate *state, int numSlots)
2526 {
2527 	if (numSlots > 0)
2528 	{
2529 		char	   *p;
2530 		int			i;
2531 
2532 		state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2533 		state->slabMemoryEnd = state->slabMemoryBegin +
2534 			numSlots * SLAB_SLOT_SIZE;
2535 		state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2536 		USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2537 
2538 		p = state->slabMemoryBegin;
2539 		for (i = 0; i < numSlots - 1; i++)
2540 		{
2541 			((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2542 			p += SLAB_SLOT_SIZE;
2543 		}
2544 		((SlabSlot *) p)->nextfree = NULL;
2545 	}
2546 	else
2547 	{
2548 		state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2549 		state->slabFreeHead = NULL;
2550 	}
2551 	state->slabAllocatorUsed = true;
2552 }
2553 
2554 /*
2555  * mergeruns -- merge all the completed initial runs.
2556  *
2557  * This implements steps D5, D6 of Algorithm D. All input data has
2558  * already been written to initial runs on tape (see dumptuples).
2559  */
2560 static void
2561 mergeruns(Tuplesortstate *state)
2562 {
2563 	int			tapenum,
2564 				svTape,
2565 				svRuns,
2566 				svDummy;
2567 	int			numTapes;
2568 	int			numInputTapes;
2569 
2570 	Assert(state->status == TSS_BUILDRUNS);
2571 	Assert(state->memtupcount == 0);
2572 
2573 	if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2574 	{
2575 		/*
2576 		 * If there are multiple runs to be merged, when we go to read back
2577 		 * tuples from disk, abbreviated keys will not have been stored, and
2578 		 * we don't care to regenerate them. Disable abbreviation from this
2579 		 * point on.
2580 		 */
2581 		state->sortKeys->abbrev_converter = NULL;
2582 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2583 
2584 		/* Not strictly necessary, but be tidy */
2585 		state->sortKeys->abbrev_abort = NULL;
2586 		state->sortKeys->abbrev_full_comparator = NULL;
2587 	}
2588 
2589 	/*
2590 	 * Reset tuple memory. We've freed all the tuples that we previously
2591 	 * allocated. We will use the slab allocator from now on.
2592 	 */
2593 	MemoryContextDelete(state->tuplecontext);
2594 	state->tuplecontext = NULL;
2595 
2596 	/*
2597 	 * We no longer need a large memtuples array. (We will allocate a smaller
2598 	 * one for the heap later.)
2599 	 */
2600 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2601 	pfree(state->memtuples);
2602 	state->memtuples = NULL;
2603 
2604 	/*
2605 	 * If we had fewer runs than tapes, refund the memory that we imagined we
2606 	 * would need for the tape buffers of the unused tapes.
2607 	 *
2608 	 * numTapes and numInputTapes reflect the actual number of tapes we will
2609 	 * use. Note that the output tape's tape number is maxTapes - 1, so the
2610 	 * tape numbers of the used tapes are not consecutive, and you cannot just
2611 	 * loop from 0 to numTapes to visit all used tapes!
2612 	 */
2613 	if (state->Level == 1)
2614 	{
2615 		numInputTapes = state->currentRun;
2616 		numTapes = numInputTapes + 1;
2617 		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2618 	}
2619 	else
2620 	{
2621 		numInputTapes = state->tapeRange;
2622 		numTapes = state->maxTapes;
2623 	}
2624 
2625 	/*
2626 	 * Initialize the slab allocator. We need one slab slot per input tape,
2627 	 * for the tuples in the heap, plus one to hold the tuple last returned
2628 	 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2629 	 * however, we don't need to allocate anything.)
2630 	 *
2631 	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2632 	 * to track memory usage of individual tuples.
2633 	 */
2634 	if (state->tuples)
2635 		init_slab_allocator(state, numInputTapes + 1);
2636 	else
2637 		init_slab_allocator(state, 0);
2638 
2639 	/*
2640 	 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2641 	 * from each input tape.
2642 	 */
2643 	state->memtupsize = numInputTapes;
2644 	state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
2645 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2646 
2647 	/*
2648 	 * Use all the remaining memory we have available for read buffers among
2649 	 * the input tapes.
2650 	 *
2651 	 * We don't try to "rebalance" the memory among tapes when we start a new
2652 	 * merge phase, even if some tapes are inactive in the new phase. That
2653 	 * would be hard, because logtape.c doesn't know where one run ends and
2654 	 * another begins. When a new merge phase begins, and a tape doesn't
2655 	 * participate in it, its buffer nevertheless already contains tuples from
2656 	 * the next run on the same tape, so we cannot release the buffer. That's
2657 	 * OK in practice: merge performance isn't that sensitive to the number of
2658 	 * buffers used, and most merge phases use all or almost all tapes
2659 	 * anyway.
2660 	 */
2661 #ifdef TRACE_SORT
2662 	if (trace_sort)
2663 		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2664 			 state->worker, state->availMem / 1024, numInputTapes);
2665 #endif
2666 
2667 	state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2668 	USEMEM(state, state->read_buffer_size * numInputTapes);
2669 
2670 	/* End of step D2: rewind all output tapes to prepare for merging */
2671 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2672 		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2673 
2674 	for (;;)
2675 	{
2676 		/*
2677 		 * At this point we know that tape[T] is empty. If there's just one
2678 		 * (real or dummy) run left on each input tape, then only one merge
2679 		 * pass remains. If we don't have to produce a materialized sorted
2680 		 * tape, we can stop at this point and do the final merge on-the-fly.
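		 *
		 * A concrete instance of the test below (a sketch): with
		 * tapeRange = 6, tp_runs[] = {1,1,1,1,1,1} and tp_dummy[] all zero,
		 * every input tape holds exactly one real run, so allOneRun remains
		 * true and we flip to TSS_FINALMERGE instead of writing out a
		 * materialized result tape.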
2681 */ 2682 if (!state->randomAccess && !WORKER(state)) 2683 { 2684 bool allOneRun = true; 2685 2686 Assert(state->tp_runs[state->tapeRange] == 0); 2687 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2688 { 2689 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1) 2690 { 2691 allOneRun = false; 2692 break; 2693 } 2694 } 2695 if (allOneRun) 2696 { 2697 /* Tell logtape.c we won't be writing anymore */ 2698 LogicalTapeSetForgetFreeSpace(state->tapeset); 2699 /* Initialize for the final merge pass */ 2700 beginmerge(state); 2701 state->status = TSS_FINALMERGE; 2702 return; 2703 } 2704 } 2705 2706 /* Step D5: merge runs onto tape[T] until tape[P] is empty */ 2707 while (state->tp_runs[state->tapeRange - 1] || 2708 state->tp_dummy[state->tapeRange - 1]) 2709 { 2710 bool allDummy = true; 2711 2712 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2713 { 2714 if (state->tp_dummy[tapenum] == 0) 2715 { 2716 allDummy = false; 2717 break; 2718 } 2719 } 2720 2721 if (allDummy) 2722 { 2723 state->tp_dummy[state->tapeRange]++; 2724 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2725 state->tp_dummy[tapenum]--; 2726 } 2727 else 2728 mergeonerun(state); 2729 } 2730 2731 /* Step D6: decrease level */ 2732 if (--state->Level == 0) 2733 break; 2734 /* rewind output tape T to use as new input */ 2735 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], 2736 state->read_buffer_size); 2737 /* rewind used-up input tape P, and prepare it for write pass */ 2738 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); 2739 state->tp_runs[state->tapeRange - 1] = 0; 2740 2741 /* 2742 * reassign tape units per step D6; note we no longer care about A[] 2743 */ 2744 svTape = state->tp_tapenum[state->tapeRange]; 2745 svDummy = state->tp_dummy[state->tapeRange]; 2746 svRuns = state->tp_runs[state->tapeRange]; 2747 for (tapenum = state->tapeRange; tapenum > 0; tapenum--) 2748 { 2749 state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1]; 2750 state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1]; 2751 state->tp_runs[tapenum] = state->tp_runs[tapenum - 1]; 2752 } 2753 state->tp_tapenum[0] = svTape; 2754 state->tp_dummy[0] = svDummy; 2755 state->tp_runs[0] = svRuns; 2756 } 2757 2758 /* 2759 * Done. Knuth says that the result is on TAPE[1], but since we exited 2760 * the loop without performing the last iteration of step D6, we have not 2761 * rearranged the tape unit assignment, and therefore the result is on 2762 * TAPE[T]. We need to do it this way so that we can freeze the final 2763 * output tape while rewinding it. The last iteration of step D6 would be 2764 * a waste of cycles anyway... 2765 */ 2766 state->result_tape = state->tp_tapenum[state->tapeRange]; 2767 if (!WORKER(state)) 2768 LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); 2769 else 2770 worker_freeze_result_tape(state); 2771 state->status = TSS_SORTEDONTAPE; 2772 2773 /* Release the read buffers of all the other tapes, by rewinding them. */ 2774 for (tapenum = 0; tapenum < state->maxTapes; tapenum++) 2775 { 2776 if (tapenum != state->result_tape) 2777 LogicalTapeRewindForWrite(state->tapeset, tapenum); 2778 } 2779 } 2780 2781 /* 2782 * Merge one run from each input tape, except ones with dummy runs. 2783 * 2784 * This is the inner loop of Algorithm D step D5. We know that the 2785 * output tape is TAPE[T]. 
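 *
 * For example (a sketch): merging the runs (1,4,7), (2,5,8) and (3,6,9)
 * from three active tapes repeatedly writes the heap's minimum to TAPE[T]
 * and refills the heap from the tape that minimum came from, emitting the
 * single combined run 1,2,3,...,9.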
2786 */ 2787 static void 2788 mergeonerun(Tuplesortstate *state) 2789 { 2790 int destTape = state->tp_tapenum[state->tapeRange]; 2791 int srcTape; 2792 2793 /* 2794 * Start the merge by loading one tuple from each active source tape into 2795 * the heap. We can also decrease the input run/dummy run counts. 2796 */ 2797 beginmerge(state); 2798 2799 /* 2800 * Execute merge by repeatedly extracting lowest tuple in heap, writing it 2801 * out, and replacing it with next tuple from same tape (if there is 2802 * another one). 2803 */ 2804 while (state->memtupcount > 0) 2805 { 2806 SortTuple stup; 2807 2808 /* write the tuple to destTape */ 2809 srcTape = state->memtuples[0].tupindex; 2810 WRITETUP(state, destTape, &state->memtuples[0]); 2811 2812 /* recycle the slot of the tuple we just wrote out, for the next read */ 2813 if (state->memtuples[0].tuple) 2814 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); 2815 2816 /* 2817 * pull next tuple from the tape, and replace the written-out tuple in 2818 * the heap with it. 2819 */ 2820 if (mergereadnext(state, srcTape, &stup)) 2821 { 2822 stup.tupindex = srcTape; 2823 tuplesort_heap_replace_top(state, &stup); 2824 2825 } 2826 else 2827 tuplesort_heap_delete_top(state); 2828 } 2829 2830 /* 2831 * When the heap empties, we're done. Write an end-of-run marker on the 2832 * output tape, and increment its count of real runs. 2833 */ 2834 markrunend(state, destTape); 2835 state->tp_runs[state->tapeRange]++; 2836 2837 #ifdef TRACE_SORT 2838 if (trace_sort) 2839 elog(LOG, "worker %d finished %d-way merge step: %s", state->worker, 2840 state->activeTapes, pg_rusage_show(&state->ru_start)); 2841 #endif 2842 } 2843 2844 /* 2845 * beginmerge - initialize for a merge pass 2846 * 2847 * We decrease the counts of real and dummy runs for each tape, and mark 2848 * which tapes contain active input runs in mergeactive[]. Then, fill the 2849 * merge heap with the first tuple from each active tape. 2850 */ 2851 static void 2852 beginmerge(Tuplesortstate *state) 2853 { 2854 int activeTapes; 2855 int tapenum; 2856 int srcTape; 2857 2858 /* Heap should be empty here */ 2859 Assert(state->memtupcount == 0); 2860 2861 /* Adjust run counts and mark the active tapes */ 2862 memset(state->mergeactive, 0, 2863 state->maxTapes * sizeof(*state->mergeactive)); 2864 activeTapes = 0; 2865 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2866 { 2867 if (state->tp_dummy[tapenum] > 0) 2868 state->tp_dummy[tapenum]--; 2869 else 2870 { 2871 Assert(state->tp_runs[tapenum] > 0); 2872 state->tp_runs[tapenum]--; 2873 srcTape = state->tp_tapenum[tapenum]; 2874 state->mergeactive[srcTape] = true; 2875 activeTapes++; 2876 } 2877 } 2878 Assert(activeTapes > 0); 2879 state->activeTapes = activeTapes; 2880 2881 /* Load the merge heap with the first tuple from each input tape */ 2882 for (srcTape = 0; srcTape < state->maxTapes; srcTape++) 2883 { 2884 SortTuple tup; 2885 2886 if (mergereadnext(state, srcTape, &tup)) 2887 { 2888 tup.tupindex = srcTape; 2889 tuplesort_heap_insert(state, &tup); 2890 } 2891 } 2892 } 2893 2894 /* 2895 * mergereadnext - read next tuple from one merge input tape 2896 * 2897 * Returns false on EOF. 
2898 */ 2899 static bool 2900 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) 2901 { 2902 unsigned int tuplen; 2903 2904 if (!state->mergeactive[srcTape]) 2905 return false; /* tape's run is already exhausted */ 2906 2907 /* read next tuple, if any */ 2908 if ((tuplen = getlen(state, srcTape, true)) == 0) 2909 { 2910 state->mergeactive[srcTape] = false; 2911 return false; 2912 } 2913 READTUP(state, stup, srcTape, tuplen); 2914 2915 return true; 2916 } 2917 2918 /* 2919 * dumptuples - remove tuples from memtuples and write initial run to tape 2920 * 2921 * When alltuples = true, dump everything currently in memory. (This case is 2922 * only used at end of input data.) 2923 */ 2924 static void 2925 dumptuples(Tuplesortstate *state, bool alltuples) 2926 { 2927 int memtupwrite; 2928 int i; 2929 2930 /* 2931 * Nothing to do if we still fit in available memory and have array slots, 2932 * unless this is the final call during initial run generation. 2933 */ 2934 if (state->memtupcount < state->memtupsize && !LACKMEM(state) && 2935 !alltuples) 2936 return; 2937 2938 /* 2939 * Final call might require no sorting, in rare cases where we just so 2940 * happen to have previously LACKMEM()'d at the point where exactly all 2941 * remaining tuples are loaded into memory, just before input was 2942 * exhausted. 2943 * 2944 * In general, short final runs are quite possible. Rather than allowing 2945 * a special case where there was a superfluous selectnewtape() call (i.e. 2946 * a call with no subsequent run actually written to destTape), we prefer 2947 * to write out a 0 tuple run. 2948 * 2949 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark 2950 * the tape inactive for the merge when called from beginmerge(). This 2951 * case is therefore similar to the case where mergeonerun() finds a dummy 2952 * run for the tape, and so doesn't need to merge a run from the tape (or 2953 * conceptually "merges" the dummy run, if you prefer). According to 2954 * Knuth, Algorithm D "isn't strictly optimal" in its method of 2955 * distribution and dummy run assignment; this edge case seems very 2956 * unlikely to make that appreciably worse. 2957 */ 2958 Assert(state->status == TSS_BUILDRUNS); 2959 2960 /* 2961 * It seems unlikely that this limit will ever be exceeded, but take no 2962 * chances 2963 */ 2964 if (state->currentRun == INT_MAX) 2965 ereport(ERROR, 2966 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), 2967 errmsg("cannot have more than %d runs for an external sort", 2968 INT_MAX))); 2969 2970 state->currentRun++; 2971 2972 #ifdef TRACE_SORT 2973 if (trace_sort) 2974 elog(LOG, "worker %d starting quicksort of run %d: %s", 2975 state->worker, state->currentRun, 2976 pg_rusage_show(&state->ru_start)); 2977 #endif 2978 2979 /* 2980 * Sort all tuples accumulated within the allowed amount of memory for 2981 * this run using quicksort 2982 */ 2983 tuplesort_sort_memtuples(state); 2984 2985 #ifdef TRACE_SORT 2986 if (trace_sort) 2987 elog(LOG, "worker %d finished quicksort of run %d: %s", 2988 state->worker, state->currentRun, 2989 pg_rusage_show(&state->ru_start)); 2990 #endif 2991 2992 memtupwrite = state->memtupcount; 2993 for (i = 0; i < memtupwrite; i++) 2994 { 2995 WRITETUP(state, state->tp_tapenum[state->destTape], 2996 &state->memtuples[i]); 2997 state->memtupcount--; 2998 } 2999 3000 /* 3001 * Reset tuple memory. We've freed all of the tuples that we previously 3002 * allocated. 
It's important to avoid fragmentation when there is a stark 3003 * change in the sizes of incoming tuples. Fragmentation due to 3004 * AllocSetFree's bucketing by size class might be particularly bad if 3005 * this step wasn't taken. 3006 */ 3007 MemoryContextReset(state->tuplecontext); 3008 3009 markrunend(state, state->tp_tapenum[state->destTape]); 3010 state->tp_runs[state->destTape]++; 3011 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ 3012 3013 #ifdef TRACE_SORT 3014 if (trace_sort) 3015 elog(LOG, "worker %d finished writing run %d to tape %d: %s", 3016 state->worker, state->currentRun, state->destTape, 3017 pg_rusage_show(&state->ru_start)); 3018 #endif 3019 3020 if (!alltuples) 3021 selectnewtape(state); 3022 } 3023 3024 /* 3025 * tuplesort_rescan - rewind and replay the scan 3026 */ 3027 void 3028 tuplesort_rescan(Tuplesortstate *state) 3029 { 3030 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 3031 3032 Assert(state->randomAccess); 3033 3034 switch (state->status) 3035 { 3036 case TSS_SORTEDINMEM: 3037 state->current = 0; 3038 state->eof_reached = false; 3039 state->markpos_offset = 0; 3040 state->markpos_eof = false; 3041 break; 3042 case TSS_SORTEDONTAPE: 3043 LogicalTapeRewindForRead(state->tapeset, 3044 state->result_tape, 3045 0); 3046 state->eof_reached = false; 3047 state->markpos_block = 0L; 3048 state->markpos_offset = 0; 3049 state->markpos_eof = false; 3050 break; 3051 default: 3052 elog(ERROR, "invalid tuplesort state"); 3053 break; 3054 } 3055 3056 MemoryContextSwitchTo(oldcontext); 3057 } 3058 3059 /* 3060 * tuplesort_markpos - saves current position in the merged sort file 3061 */ 3062 void 3063 tuplesort_markpos(Tuplesortstate *state) 3064 { 3065 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 3066 3067 Assert(state->randomAccess); 3068 3069 switch (state->status) 3070 { 3071 case TSS_SORTEDINMEM: 3072 state->markpos_offset = state->current; 3073 state->markpos_eof = state->eof_reached; 3074 break; 3075 case TSS_SORTEDONTAPE: 3076 LogicalTapeTell(state->tapeset, 3077 state->result_tape, 3078 &state->markpos_block, 3079 &state->markpos_offset); 3080 state->markpos_eof = state->eof_reached; 3081 break; 3082 default: 3083 elog(ERROR, "invalid tuplesort state"); 3084 break; 3085 } 3086 3087 MemoryContextSwitchTo(oldcontext); 3088 } 3089 3090 /* 3091 * tuplesort_restorepos - restores current position in merged sort file to 3092 * last saved position 3093 */ 3094 void 3095 tuplesort_restorepos(Tuplesortstate *state) 3096 { 3097 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 3098 3099 Assert(state->randomAccess); 3100 3101 switch (state->status) 3102 { 3103 case TSS_SORTEDINMEM: 3104 state->current = state->markpos_offset; 3105 state->eof_reached = state->markpos_eof; 3106 break; 3107 case TSS_SORTEDONTAPE: 3108 LogicalTapeSeek(state->tapeset, 3109 state->result_tape, 3110 state->markpos_block, 3111 state->markpos_offset); 3112 state->eof_reached = state->markpos_eof; 3113 break; 3114 default: 3115 elog(ERROR, "invalid tuplesort state"); 3116 break; 3117 } 3118 3119 MemoryContextSwitchTo(oldcontext); 3120 } 3121 3122 /* 3123 * tuplesort_get_stats - extract summary statistics 3124 * 3125 * This can be called after tuplesort_performsort() finishes to obtain 3126 * printable summary information about how the sort was performed. 
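 *
 * For instance (a sketch of typical consumption; the exact formatting
 * belongs to the EXPLAIN code, not to this file): EXPLAIN ANALYZE combines
 * these fields with the name strings produced below into output such as
 *
 *		Sort Method: quicksort  Memory: 25kB
 *		Sort Method: external merge  Disk: 2048kB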
3127 */ 3128 void 3129 tuplesort_get_stats(Tuplesortstate *state, 3130 TuplesortInstrumentation *stats) 3131 { 3132 /* 3133 * Note: it might seem we should provide both memory and disk usage for a 3134 * disk-based sort. However, the current code doesn't track memory space 3135 * accurately once we have begun to return tuples to the caller (since we 3136 * don't account for pfree's the caller is expected to do), so we cannot 3137 * rely on availMem in a disk sort. This does not seem worth the overhead 3138 * to fix. Is it worth creating an API for the memory context code to 3139 * tell us how much is actually used in sortcontext? 3140 */ 3141 if (state->tapeset) 3142 { 3143 stats->spaceType = SORT_SPACE_TYPE_DISK; 3144 stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024); 3145 } 3146 else 3147 { 3148 stats->spaceType = SORT_SPACE_TYPE_MEMORY; 3149 stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; 3150 } 3151 3152 switch (state->status) 3153 { 3154 case TSS_SORTEDINMEM: 3155 if (state->boundUsed) 3156 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT; 3157 else 3158 stats->sortMethod = SORT_TYPE_QUICKSORT; 3159 break; 3160 case TSS_SORTEDONTAPE: 3161 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT; 3162 break; 3163 case TSS_FINALMERGE: 3164 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE; 3165 break; 3166 default: 3167 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS; 3168 break; 3169 } 3170 } 3171 3172 /* 3173 * Convert TuplesortMethod to a string. 3174 */ 3175 const char * 3176 tuplesort_method_name(TuplesortMethod m) 3177 { 3178 switch (m) 3179 { 3180 case SORT_TYPE_STILL_IN_PROGRESS: 3181 return "still in progress"; 3182 case SORT_TYPE_TOP_N_HEAPSORT: 3183 return "top-N heapsort"; 3184 case SORT_TYPE_QUICKSORT: 3185 return "quicksort"; 3186 case SORT_TYPE_EXTERNAL_SORT: 3187 return "external sort"; 3188 case SORT_TYPE_EXTERNAL_MERGE: 3189 return "external merge"; 3190 } 3191 3192 return "unknown"; 3193 } 3194 3195 /* 3196 * Convert TuplesortSpaceType to a string. 3197 */ 3198 const char * 3199 tuplesort_space_type_name(TuplesortSpaceType t) 3200 { 3201 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY); 3202 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory"; 3203 } 3204 3205 3206 /* 3207 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. 3208 */ 3209 3210 /* 3211 * Convert the existing unordered array of SortTuples to a bounded heap, 3212 * discarding all but the smallest "state->bound" tuples. 3213 * 3214 * When working with a bounded heap, we want to keep the largest entry 3215 * at the root (array entry zero), instead of the smallest as in the normal 3216 * sort case. This allows us to discard the largest entry cheaply. 3217 * Therefore, we temporarily reverse the sort direction. 
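 *
 * Worked example (a sketch): with bound = 3 and input keys 5, 9, 1, 7, 3,
 * the heap first fills with {5, 9, 1}, leaving 9 at the root under the
 * reversed comparator. Key 7 then replaces 9 (being smaller, it may still
 * belong in the final top-3), and key 3 replaces 7, leaving {1, 3, 5} --
 * precisely the three smallest inputs, with the largest survivor at the
 * root, ready to be evicted first by sort_bounded_heap().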
3218 */ 3219 static void 3220 make_bounded_heap(Tuplesortstate *state) 3221 { 3222 int tupcount = state->memtupcount; 3223 int i; 3224 3225 Assert(state->status == TSS_INITIAL); 3226 Assert(state->bounded); 3227 Assert(tupcount >= state->bound); 3228 Assert(SERIAL(state)); 3229 3230 /* Reverse sort direction so largest entry will be at root */ 3231 reversedirection(state); 3232 3233 state->memtupcount = 0; /* make the heap empty */ 3234 for (i = 0; i < tupcount; i++) 3235 { 3236 if (state->memtupcount < state->bound) 3237 { 3238 /* Insert next tuple into heap */ 3239 /* Must copy source tuple to avoid possible overwrite */ 3240 SortTuple stup = state->memtuples[i]; 3241 3242 tuplesort_heap_insert(state, &stup); 3243 } 3244 else 3245 { 3246 /* 3247 * The heap is full. Replace the largest entry with the new 3248 * tuple, or just discard it, if it's larger than anything already 3249 * in the heap. 3250 */ 3251 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0) 3252 { 3253 free_sort_tuple(state, &state->memtuples[i]); 3254 CHECK_FOR_INTERRUPTS(); 3255 } 3256 else 3257 tuplesort_heap_replace_top(state, &state->memtuples[i]); 3258 } 3259 } 3260 3261 Assert(state->memtupcount == state->bound); 3262 state->status = TSS_BOUNDED; 3263 } 3264 3265 /* 3266 * Convert the bounded heap to a properly-sorted array 3267 */ 3268 static void 3269 sort_bounded_heap(Tuplesortstate *state) 3270 { 3271 int tupcount = state->memtupcount; 3272 3273 Assert(state->status == TSS_BOUNDED); 3274 Assert(state->bounded); 3275 Assert(tupcount == state->bound); 3276 Assert(SERIAL(state)); 3277 3278 /* 3279 * We can unheapify in place because each delete-top call will remove the 3280 * largest entry, which we can promptly store in the newly freed slot at 3281 * the end. Once we're down to a single-entry heap, we're done. 3282 */ 3283 while (state->memtupcount > 1) 3284 { 3285 SortTuple stup = state->memtuples[0]; 3286 3287 /* this sifts-up the next-largest entry and decreases memtupcount */ 3288 tuplesort_heap_delete_top(state); 3289 state->memtuples[state->memtupcount] = stup; 3290 } 3291 state->memtupcount = tupcount; 3292 3293 /* 3294 * Reverse sort direction back to the original state. This is not 3295 * actually necessary but seems like a good idea for tidiness. 3296 */ 3297 reversedirection(state); 3298 3299 state->status = TSS_SORTEDINMEM; 3300 state->boundUsed = true; 3301 } 3302 3303 /* 3304 * Sort all memtuples using specialized qsort() routines. 3305 * 3306 * Quicksort is used for small in-memory sorts, and external sort runs. 3307 */ 3308 static void 3309 tuplesort_sort_memtuples(Tuplesortstate *state) 3310 { 3311 Assert(!LEADER(state)); 3312 3313 if (state->memtupcount > 1) 3314 { 3315 /* Can we use the single-key sort function? */ 3316 if (state->onlyKey != NULL) 3317 qsort_ssup(state->memtuples, state->memtupcount, 3318 state->onlyKey); 3319 else 3320 qsort_tuple(state->memtuples, 3321 state->memtupcount, 3322 state->comparetup, 3323 state); 3324 } 3325 } 3326 3327 /* 3328 * Insert a new tuple into an empty or existing heap, maintaining the 3329 * heap invariant. Caller is responsible for ensuring there's room. 3330 * 3331 * Note: For some callers, tuple points to a memtuples[] entry above the 3332 * end of the heap. This is safe as long as it's not immediately adjacent 3333 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it 3334 * is, it might get overwritten before being moved into the heap! 
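 *
 * Sift-up sketch: inserting key 2 into the heap {1, 5, 3} starts at j = 3,
 * whose parent is slot (3 - 1) >> 1 = 1, holding 5; since 2 < 5, the 5
 * moves down to slot 3 and j becomes 1. The next parent (slot 0, holding
 * 1) compares lower, so the loop stops and 2 settles in slot 1, yielding
 * {1, 2, 3, 5}.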
3335 */ 3336 static void 3337 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple) 3338 { 3339 SortTuple *memtuples; 3340 int j; 3341 3342 memtuples = state->memtuples; 3343 Assert(state->memtupcount < state->memtupsize); 3344 3345 CHECK_FOR_INTERRUPTS(); 3346 3347 /* 3348 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is 3349 * using 1-based array indexes, not 0-based. 3350 */ 3351 j = state->memtupcount++; 3352 while (j > 0) 3353 { 3354 int i = (j - 1) >> 1; 3355 3356 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0) 3357 break; 3358 memtuples[j] = memtuples[i]; 3359 j = i; 3360 } 3361 memtuples[j] = *tuple; 3362 } 3363 3364 /* 3365 * Remove the tuple at state->memtuples[0] from the heap. Decrement 3366 * memtupcount, and sift up to maintain the heap invariant. 3367 * 3368 * The caller has already free'd the tuple the top node points to, 3369 * if necessary. 3370 */ 3371 static void 3372 tuplesort_heap_delete_top(Tuplesortstate *state) 3373 { 3374 SortTuple *memtuples = state->memtuples; 3375 SortTuple *tuple; 3376 3377 if (--state->memtupcount <= 0) 3378 return; 3379 3380 /* 3381 * Remove the last tuple in the heap, and re-insert it, by replacing the 3382 * current top node with it. 3383 */ 3384 tuple = &memtuples[state->memtupcount]; 3385 tuplesort_heap_replace_top(state, tuple); 3386 } 3387 3388 /* 3389 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to 3390 * maintain the heap invariant. 3391 * 3392 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H, 3393 * Heapsort, steps H3-H8). 3394 */ 3395 static void 3396 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple) 3397 { 3398 SortTuple *memtuples = state->memtuples; 3399 unsigned int i, 3400 n; 3401 3402 Assert(state->memtupcount >= 1); 3403 3404 CHECK_FOR_INTERRUPTS(); 3405 3406 /* 3407 * state->memtupcount is "int", but we use "unsigned int" for i, j, n. 3408 * This prevents overflow in the "2 * i + 1" calculation, since at the top 3409 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2. 
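 *
 * (Spelling out the bound, as a sketch: since i < n <= INT_MAX, the
 * expression 2 * i + 1 is at most 2 * (INT_MAX - 1) + 1 < UINT_MAX, so it
 * cannot wrap in unsigned arithmetic, whereas the same computation on
 * signed ints could overflow once i exceeds INT_MAX / 2.)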
3410 */ 3411 n = state->memtupcount; 3412 i = 0; /* i is where the "hole" is */ 3413 for (;;) 3414 { 3415 unsigned int j = 2 * i + 1; 3416 3417 if (j >= n) 3418 break; 3419 if (j + 1 < n && 3420 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0) 3421 j++; 3422 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0) 3423 break; 3424 memtuples[i] = memtuples[j]; 3425 i = j; 3426 } 3427 memtuples[i] = *tuple; 3428 } 3429 3430 /* 3431 * Function to reverse the sort direction from its current state 3432 * 3433 * It is not safe to call this when performing hash tuplesorts 3434 */ 3435 static void 3436 reversedirection(Tuplesortstate *state) 3437 { 3438 SortSupport sortKey = state->sortKeys; 3439 int nkey; 3440 3441 for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) 3442 { 3443 sortKey->ssup_reverse = !sortKey->ssup_reverse; 3444 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; 3445 } 3446 } 3447 3448 3449 /* 3450 * Tape interface routines 3451 */ 3452 3453 static unsigned int 3454 getlen(Tuplesortstate *state, int tapenum, bool eofOK) 3455 { 3456 unsigned int len; 3457 3458 if (LogicalTapeRead(state->tapeset, tapenum, 3459 &len, sizeof(len)) != sizeof(len)) 3460 elog(ERROR, "unexpected end of tape"); 3461 if (len == 0 && !eofOK) 3462 elog(ERROR, "unexpected end of data"); 3463 return len; 3464 } 3465 3466 static void 3467 markrunend(Tuplesortstate *state, int tapenum) 3468 { 3469 unsigned int len = 0; 3470 3471 LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len)); 3472 } 3473 3474 /* 3475 * Get memory for tuple from within READTUP() routine. 3476 * 3477 * We use next free slot from the slab allocator, or palloc() if the tuple 3478 * is too large for that. 3479 */ 3480 static void * 3481 readtup_alloc(Tuplesortstate *state, Size tuplen) 3482 { 3483 SlabSlot *buf; 3484 3485 /* 3486 * We pre-allocate enough slots in the slab arena that we should never run 3487 * out. 
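*
* For illustration, the free list is threaded through the unused slots
* themselves, so the allocation below is a constant-time pop of the list
* head, and the matching release (see RELEASE_SLAB_SLOT() earlier in this
* file) is the corresponding push:
*
*     buf->nextfree = state->slabFreeHead;
*     state->slabFreeHead = buf;
*
* Tuples that were instead palloc'd here, because they exceed
* SLAB_SLOT_SIZE, are simply pfree'd on release.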
3488 */ 3489 Assert(state->slabFreeHead); 3490 3491 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead) 3492 return MemoryContextAlloc(state->sortcontext, tuplen); 3493 else 3494 { 3495 buf = state->slabFreeHead; 3496 /* Reuse this slot */ 3497 state->slabFreeHead = buf->nextfree; 3498 3499 return buf; 3500 } 3501 } 3502 3503 3504 /* 3505 * Routines specialized for HeapTuple (actually MinimalTuple) case 3506 */ 3507 3508 static int 3509 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) 3510 { 3511 SortSupport sortKey = state->sortKeys; 3512 HeapTupleData ltup; 3513 HeapTupleData rtup; 3514 TupleDesc tupDesc; 3515 int nkey; 3516 int32 compare; 3517 AttrNumber attno; 3518 Datum datum1, 3519 datum2; 3520 bool isnull1, 3521 isnull2; 3522 3523 3524 /* Compare the leading sort key */ 3525 compare = ApplySortComparator(a->datum1, a->isnull1, 3526 b->datum1, b->isnull1, 3527 sortKey); 3528 if (compare != 0) 3529 return compare; 3530 3531 /* Compare additional sort keys */ 3532 ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET; 3533 ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET); 3534 rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET; 3535 rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET); 3536 tupDesc = state->tupDesc; 3537 3538 if (sortKey->abbrev_converter) 3539 { 3540 attno = sortKey->ssup_attno; 3541 3542 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1); 3543 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); 3544 3545 compare = ApplySortAbbrevFullComparator(datum1, isnull1, 3546 datum2, isnull2, 3547 sortKey); 3548 if (compare != 0) 3549 return compare; 3550 } 3551 3552 sortKey++; 3553 for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++) 3554 { 3555 attno = sortKey->ssup_attno; 3556 3557 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1); 3558 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); 3559 3560 compare = ApplySortComparator(datum1, isnull1, 3561 datum2, isnull2, 3562 sortKey); 3563 if (compare != 0) 3564 return compare; 3565 } 3566 3567 return 0; 3568 } 3569 3570 static void 3571 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup) 3572 { 3573 /* 3574 * We expect the passed "tup" to be a TupleTableSlot, and form a 3575 * MinimalTuple using the exported interface for that. 3576 */ 3577 TupleTableSlot *slot = (TupleTableSlot *) tup; 3578 Datum original; 3579 MinimalTuple tuple; 3580 HeapTupleData htup; 3581 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); 3582 3583 /* copy the tuple into sort storage */ 3584 tuple = ExecCopySlotMinimalTuple(slot); 3585 stup->tuple = (void *) tuple; 3586 USEMEM(state, GetMemoryChunkSpace(tuple)); 3587 /* set up first-column key value */ 3588 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; 3589 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); 3590 original = heap_getattr(&htup, 3591 state->sortKeys[0].ssup_attno, 3592 state->tupDesc, 3593 &stup->isnull1); 3594 3595 MemoryContextSwitchTo(oldcontext); 3596 3597 if (!state->sortKeys->abbrev_converter || stup->isnull1) 3598 { 3599 /* 3600 * Store ordinary Datum representation, or NULL value. If there is a 3601 * converter it won't expect NULL values, and cost model is not 3602 * required to account for NULL, so in that case we avoid calling 3603 * converter and just set datum1 to zeroed representation (to be 3604 * consistent, and to support cheap inequality tests for NULL 3605 * abbreviated keys). 
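*
* For reference, this mirrors how comparetup_heap() above consumes
* datum1: the cheap (possibly abbreviated) comparison always runs first,
* and the full comparator is applied only to break abbreviated-key ties,
* roughly
*
*     compare = ApplySortComparator(a->datum1, a->isnull1,
*                                   b->datum1, b->isnull1, sortKey);
*     if (compare == 0 && sortKey->abbrev_converter)
*         compare = ApplySortAbbrevFullComparator(orig1, isnull1,
*                                                 orig2, isnull2, sortKey);
*
* where orig1/orig2 stand for the authoritative values re-fetched from
* the tuples themselves.  A zeroed datum1 for NULLs therefore stays both
* cheap and consistent under the abbreviated comparator.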
3606 */ 3607 stup->datum1 = original; 3608 } 3609 else if (!consider_abort_common(state)) 3610 { 3611 /* Store abbreviated key representation */ 3612 stup->datum1 = state->sortKeys->abbrev_converter(original, 3613 state->sortKeys); 3614 } 3615 else 3616 { 3617 /* Abort abbreviation */ 3618 int i; 3619 3620 stup->datum1 = original; 3621 3622 /* 3623 * Set state to be consistent with never trying abbreviation. 3624 * 3625 * Alter datum1 representation in already-copied tuples, so as to 3626 * ensure a consistent representation (current tuple was just 3627 * handled). It does not matter if some dumped tuples are already 3628 * sorted on tape, since serialized tuples lack abbreviated keys 3629 * (TSS_BUILDRUNS state prevents control reaching here in any case). 3630 */ 3631 for (i = 0; i < state->memtupcount; i++) 3632 { 3633 SortTuple *mtup = &state->memtuples[i]; 3634 3635 htup.t_len = ((MinimalTuple) mtup->tuple)->t_len + 3636 MINIMAL_TUPLE_OFFSET; 3637 htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple - 3638 MINIMAL_TUPLE_OFFSET); 3639 3640 mtup->datum1 = heap_getattr(&htup, 3641 state->sortKeys[0].ssup_attno, 3642 state->tupDesc, 3643 &mtup->isnull1); 3644 } 3645 } 3646 } 3647 3648 static void 3649 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) 3650 { 3651 MinimalTuple tuple = (MinimalTuple) stup->tuple; 3652 3653 /* the part of the MinimalTuple we'll write: */ 3654 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; 3655 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET; 3656 3657 /* total on-disk footprint: */ 3658 unsigned int tuplen = tupbodylen + sizeof(int); 3659 3660 LogicalTapeWrite(state->tapeset, tapenum, 3661 (void *) &tuplen, sizeof(tuplen)); 3662 LogicalTapeWrite(state->tapeset, tapenum, 3663 (void *) tupbody, tupbodylen); 3664 if (state->randomAccess) /* need trailing length word? */ 3665 LogicalTapeWrite(state->tapeset, tapenum, 3666 (void *) &tuplen, sizeof(tuplen)); 3667 3668 if (!state->slabAllocatorUsed) 3669 { 3670 FREEMEM(state, GetMemoryChunkSpace(tuple)); 3671 heap_free_minimal_tuple(tuple); 3672 } 3673 } 3674 3675 static void 3676 readtup_heap(Tuplesortstate *state, SortTuple *stup, 3677 int tapenum, unsigned int len) 3678 { 3679 unsigned int tupbodylen = len - sizeof(int); 3680 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; 3681 MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen); 3682 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; 3683 HeapTupleData htup; 3684 3685 /* read in the tuple proper */ 3686 tuple->t_len = tuplen; 3687 LogicalTapeReadExact(state->tapeset, tapenum, 3688 tupbody, tupbodylen); 3689 if (state->randomAccess) /* need trailing length word? 
*/ 3690 LogicalTapeReadExact(state->tapeset, tapenum, 3691 &tuplen, sizeof(tuplen)); 3692 stup->tuple = (void *) tuple; 3693 /* set up first-column key value */ 3694 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; 3695 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); 3696 stup->datum1 = heap_getattr(&htup, 3697 state->sortKeys[0].ssup_attno, 3698 state->tupDesc, 3699 &stup->isnull1); 3700 } 3701 3702 /* 3703 * Routines specialized for the CLUSTER case (HeapTuple data, with 3704 * comparisons per a btree index definition) 3705 */ 3706 3707 static int 3708 comparetup_cluster(const SortTuple *a, const SortTuple *b, 3709 Tuplesortstate *state) 3710 { 3711 SortSupport sortKey = state->sortKeys; 3712 HeapTuple ltup; 3713 HeapTuple rtup; 3714 TupleDesc tupDesc; 3715 int nkey; 3716 int32 compare; 3717 Datum datum1, 3718 datum2; 3719 bool isnull1, 3720 isnull2; 3721 AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0]; 3722 3723 /* Be prepared to compare additional sort keys */ 3724 ltup = (HeapTuple) a->tuple; 3725 rtup = (HeapTuple) b->tuple; 3726 tupDesc = state->tupDesc; 3727 3728 /* Compare the leading sort key, if it's simple */ 3729 if (leading != 0) 3730 { 3731 compare = ApplySortComparator(a->datum1, a->isnull1, 3732 b->datum1, b->isnull1, 3733 sortKey); 3734 if (compare != 0) 3735 return compare; 3736 3737 if (sortKey->abbrev_converter) 3738 { 3739 datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1); 3740 datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2); 3741 3742 compare = ApplySortAbbrevFullComparator(datum1, isnull1, 3743 datum2, isnull2, 3744 sortKey); 3745 } 3746 if (compare != 0 || state->nKeys == 1) 3747 return compare; 3748 /* Compare additional columns the hard way */ 3749 sortKey++; 3750 nkey = 1; 3751 } 3752 else 3753 { 3754 /* Must compare all keys the hard way */ 3755 nkey = 0; 3756 } 3757 3758 if (state->indexInfo->ii_Expressions == NULL) 3759 { 3760 /* If not expression index, just compare the proper heap attrs */ 3761 3762 for (; nkey < state->nKeys; nkey++, sortKey++) 3763 { 3764 AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey]; 3765 3766 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); 3767 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); 3768 3769 compare = ApplySortComparator(datum1, isnull1, 3770 datum2, isnull2, 3771 sortKey); 3772 if (compare != 0) 3773 return compare; 3774 } 3775 } 3776 else 3777 { 3778 /* 3779 * In the expression index case, compute the whole index tuple and 3780 * then compare values. It would perhaps be faster to compute only as 3781 * many columns as we need to compare, but that would require 3782 * duplicating all the logic in FormIndexDatum. 
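*
* For example (hypothetical schema): for CLUSTER using an index built on
* the expression (lower(name)), the IndexInfo looks like
*
*     ii_IndexAttrNumbers[0] == 0      (no simple heap column)
*     ii_Expressions != NULL           (holds the lower(name) node)
*
* so control arrives here with nkey == 0, and both tuples have
* lower(name) evaluated via FormIndexDatum() before any key is compared.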
3783 */ 3784 Datum l_index_values[INDEX_MAX_KEYS]; 3785 bool l_index_isnull[INDEX_MAX_KEYS]; 3786 Datum r_index_values[INDEX_MAX_KEYS]; 3787 bool r_index_isnull[INDEX_MAX_KEYS]; 3788 TupleTableSlot *ecxt_scantuple; 3789 3790 /* Reset context each time to prevent memory leakage */ 3791 ResetPerTupleExprContext(state->estate); 3792 3793 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple; 3794 3795 ExecStoreHeapTuple(ltup, ecxt_scantuple, false); 3796 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, 3797 l_index_values, l_index_isnull); 3798 3799 ExecStoreHeapTuple(rtup, ecxt_scantuple, false); 3800 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, 3801 r_index_values, r_index_isnull); 3802 3803 for (; nkey < state->nKeys; nkey++, sortKey++) 3804 { 3805 compare = ApplySortComparator(l_index_values[nkey], 3806 l_index_isnull[nkey], 3807 r_index_values[nkey], 3808 r_index_isnull[nkey], 3809 sortKey); 3810 if (compare != 0) 3811 return compare; 3812 } 3813 } 3814 3815 return 0; 3816 } 3817 3818 static void 3819 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup) 3820 { 3821 HeapTuple tuple = (HeapTuple) tup; 3822 Datum original; 3823 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); 3824 3825 /* copy the tuple into sort storage */ 3826 tuple = heap_copytuple(tuple); 3827 stup->tuple = (void *) tuple; 3828 USEMEM(state, GetMemoryChunkSpace(tuple)); 3829 3830 MemoryContextSwitchTo(oldcontext); 3831 3832 /* 3833 * set up first-column key value, and potentially abbreviate, if it's a 3834 * simple column 3835 */ 3836 if (state->indexInfo->ii_IndexAttrNumbers[0] == 0) 3837 return; 3838 3839 original = heap_getattr(tuple, 3840 state->indexInfo->ii_IndexAttrNumbers[0], 3841 state->tupDesc, 3842 &stup->isnull1); 3843 3844 if (!state->sortKeys->abbrev_converter || stup->isnull1) 3845 { 3846 /* 3847 * Store ordinary Datum representation, or NULL value. If there is a 3848 * converter it won't expect NULL values, and cost model is not 3849 * required to account for NULL, so in that case we avoid calling 3850 * converter and just set datum1 to zeroed representation (to be 3851 * consistent, and to support cheap inequality tests for NULL 3852 * abbreviated keys). 3853 */ 3854 stup->datum1 = original; 3855 } 3856 else if (!consider_abort_common(state)) 3857 { 3858 /* Store abbreviated key representation */ 3859 stup->datum1 = state->sortKeys->abbrev_converter(original, 3860 state->sortKeys); 3861 } 3862 else 3863 { 3864 /* Abort abbreviation */ 3865 int i; 3866 3867 stup->datum1 = original; 3868 3869 /* 3870 * Set state to be consistent with never trying abbreviation. 3871 * 3872 * Alter datum1 representation in already-copied tuples, so as to 3873 * ensure a consistent representation (current tuple was just 3874 * handled). It does not matter if some dumped tuples are already 3875 * sorted on tape, since serialized tuples lack abbreviated keys 3876 * (TSS_BUILDRUNS state prevents control reaching here in any case). 
3877 */ 3878 for (i = 0; i < state->memtupcount; i++) 3879 { 3880 SortTuple *mtup = &state->memtuples[i]; 3881 3882 tuple = (HeapTuple) mtup->tuple; 3883 mtup->datum1 = heap_getattr(tuple, 3884 state->indexInfo->ii_IndexAttrNumbers[0], 3885 state->tupDesc, 3886 &mtup->isnull1); 3887 } 3888 } 3889 } 3890 3891 static void 3892 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) 3893 { 3894 HeapTuple tuple = (HeapTuple) stup->tuple; 3895 unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int); 3896 3897 /* We need to store t_self, but not other fields of HeapTupleData */ 3898 LogicalTapeWrite(state->tapeset, tapenum, 3899 &tuplen, sizeof(tuplen)); 3900 LogicalTapeWrite(state->tapeset, tapenum, 3901 &tuple->t_self, sizeof(ItemPointerData)); 3902 LogicalTapeWrite(state->tapeset, tapenum, 3903 tuple->t_data, tuple->t_len); 3904 if (state->randomAccess) /* need trailing length word? */ 3905 LogicalTapeWrite(state->tapeset, tapenum, 3906 &tuplen, sizeof(tuplen)); 3907 3908 if (!state->slabAllocatorUsed) 3909 { 3910 FREEMEM(state, GetMemoryChunkSpace(tuple)); 3911 heap_freetuple(tuple); 3912 } 3913 } 3914 3915 static void 3916 readtup_cluster(Tuplesortstate *state, SortTuple *stup, 3917 int tapenum, unsigned int tuplen) 3918 { 3919 unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int); 3920 HeapTuple tuple = (HeapTuple) readtup_alloc(state, 3921 t_len + HEAPTUPLESIZE); 3922 3923 /* Reconstruct the HeapTupleData header */ 3924 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); 3925 tuple->t_len = t_len; 3926 LogicalTapeReadExact(state->tapeset, tapenum, 3927 &tuple->t_self, sizeof(ItemPointerData)); 3928 /* We don't currently bother to reconstruct t_tableOid */ 3929 tuple->t_tableOid = InvalidOid; 3930 /* Read in the tuple body */ 3931 LogicalTapeReadExact(state->tapeset, tapenum, 3932 tuple->t_data, tuple->t_len); 3933 if (state->randomAccess) /* need trailing length word? */ 3934 LogicalTapeReadExact(state->tapeset, tapenum, 3935 &tuplen, sizeof(tuplen)); 3936 stup->tuple = (void *) tuple; 3937 /* set up first-column key value, if it's a simple column */ 3938 if (state->indexInfo->ii_IndexAttrNumbers[0] != 0) 3939 stup->datum1 = heap_getattr(tuple, 3940 state->indexInfo->ii_IndexAttrNumbers[0], 3941 state->tupDesc, 3942 &stup->isnull1); 3943 } 3944 3945 /* 3946 * Routines specialized for IndexTuple case 3947 * 3948 * The btree and hash cases require separate comparison functions, but the 3949 * IndexTuple representation is the same so the copy/write/read support 3950 * functions can be shared. 3951 */ 3952 3953 static int 3954 comparetup_index_btree(const SortTuple *a, const SortTuple *b, 3955 Tuplesortstate *state) 3956 { 3957 /* 3958 * This is similar to comparetup_heap(), but expects index tuples. There 3959 * is also special handling for enforcing uniqueness, and special 3960 * treatment for equal keys at the end. 
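*
* Schematically (a summary of the code below, not additional logic):
*
*     compare leading key (with abbreviated-key tie-break);
*     compare any remaining key attributes;
*     if all key attributes compare equal:
*         if enforceUnique and no compared attribute was NULL:
*             ereport(ERROR, ...);        // unique violation
*         break the tie on heap TID (block number, then offset)
*
* The NULL escape matches SQL semantics: rows that are "equal" but
* contain a NULL key value are not duplicates for uniqueness purposes.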
3961 */ 3962 SortSupport sortKey = state->sortKeys; 3963 IndexTuple tuple1; 3964 IndexTuple tuple2; 3965 int keysz; 3966 TupleDesc tupDes; 3967 bool equal_hasnull = false; 3968 int nkey; 3969 int32 compare; 3970 Datum datum1, 3971 datum2; 3972 bool isnull1, 3973 isnull2; 3974 3975 3976 /* Compare the leading sort key */ 3977 compare = ApplySortComparator(a->datum1, a->isnull1, 3978 b->datum1, b->isnull1, 3979 sortKey); 3980 if (compare != 0) 3981 return compare; 3982 3983 /* Compare additional sort keys */ 3984 tuple1 = (IndexTuple) a->tuple; 3985 tuple2 = (IndexTuple) b->tuple; 3986 keysz = state->nKeys; 3987 tupDes = RelationGetDescr(state->indexRel); 3988 3989 if (sortKey->abbrev_converter) 3990 { 3991 datum1 = index_getattr(tuple1, 1, tupDes, &isnull1); 3992 datum2 = index_getattr(tuple2, 1, tupDes, &isnull2); 3993 3994 compare = ApplySortAbbrevFullComparator(datum1, isnull1, 3995 datum2, isnull2, 3996 sortKey); 3997 if (compare != 0) 3998 return compare; 3999 } 4000 4001 /* they are equal, so we only need to examine one null flag */ 4002 if (a->isnull1) 4003 equal_hasnull = true; 4004 4005 sortKey++; 4006 for (nkey = 2; nkey <= keysz; nkey++, sortKey++) 4007 { 4008 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); 4009 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); 4010 4011 compare = ApplySortComparator(datum1, isnull1, 4012 datum2, isnull2, 4013 sortKey); 4014 if (compare != 0) 4015 return compare; /* done when we find unequal attributes */ 4016 4017 /* they are equal, so we only need to examine one null flag */ 4018 if (isnull1) 4019 equal_hasnull = true; 4020 } 4021 4022 /* 4023 * If btree has asked us to enforce uniqueness, complain if two equal 4024 * tuples are detected (unless there was at least one NULL field). 4025 * 4026 * It is sufficient to make the test here, because if two tuples are equal 4027 * they *must* get compared at some stage of the sort --- otherwise the 4028 * sort algorithm wouldn't have checked whether one must appear before the 4029 * other. 4030 */ 4031 if (state->enforceUnique && !equal_hasnull) 4032 { 4033 Datum values[INDEX_MAX_KEYS]; 4034 bool isnull[INDEX_MAX_KEYS]; 4035 char *key_desc; 4036 4037 /* 4038 * Some rather brain-dead implementations of qsort (such as the one in 4039 * QNX 4) will sometimes call the comparison routine to compare a 4040 * value to itself, but we always use our own implementation, which 4041 * does not. 4042 */ 4043 Assert(tuple1 != tuple2); 4044 4045 index_deform_tuple(tuple1, tupDes, values, isnull); 4046 4047 key_desc = BuildIndexValueDescription(state->indexRel, values, isnull); 4048 4049 ereport(ERROR, 4050 (errcode(ERRCODE_UNIQUE_VIOLATION), 4051 errmsg("could not create unique index \"%s\"", 4052 RelationGetRelationName(state->indexRel)), 4053 key_desc ? errdetail("Key %s is duplicated.", key_desc) : 4054 errdetail("Duplicate keys exist."), 4055 errtableconstraint(state->heapRel, 4056 RelationGetRelationName(state->indexRel)))); 4057 } 4058 4059 /* 4060 * If key values are equal, we sort on ItemPointer. This is required for 4061 * btree indexes, since heap TID is treated as an implicit last key 4062 * attribute in order to ensure that all keys in the index are physically 4063 * unique. 4064 */ 4065 { 4066 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); 4067 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); 4068 4069 if (blk1 != blk2) 4070 return (blk1 < blk2) ? 
-1 : 1; 4071 } 4072 { 4073 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); 4074 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); 4075 4076 if (pos1 != pos2) 4077 return (pos1 < pos2) ? -1 : 1; 4078 } 4079 4080 /* ItemPointer values should never be equal */ 4081 Assert(false); 4082 4083 return 0; 4084 } 4085 4086 static int 4087 comparetup_index_hash(const SortTuple *a, const SortTuple *b, 4088 Tuplesortstate *state) 4089 { 4090 Bucket bucket1; 4091 Bucket bucket2; 4092 IndexTuple tuple1; 4093 IndexTuple tuple2; 4094 4095 /* 4096 * Fetch hash keys and mask off bits we don't want to sort by. We know 4097 * that the first column of the index tuple is the hash key. 4098 */ 4099 Assert(!a->isnull1); 4100 bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1), 4101 state->max_buckets, state->high_mask, 4102 state->low_mask); 4103 Assert(!b->isnull1); 4104 bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1), 4105 state->max_buckets, state->high_mask, 4106 state->low_mask); 4107 if (bucket1 > bucket2) 4108 return 1; 4109 else if (bucket1 < bucket2) 4110 return -1; 4111 4112 /* 4113 * If hash values are equal, we sort on ItemPointer. This does not affect 4114 * validity of the finished index, but it may be useful to have index 4115 * scans in physical order. 4116 */ 4117 tuple1 = (IndexTuple) a->tuple; 4118 tuple2 = (IndexTuple) b->tuple; 4119 4120 { 4121 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); 4122 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); 4123 4124 if (blk1 != blk2) 4125 return (blk1 < blk2) ? -1 : 1; 4126 } 4127 { 4128 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); 4129 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); 4130 4131 if (pos1 != pos2) 4132 return (pos1 < pos2) ? -1 : 1; 4133 } 4134 4135 /* ItemPointer values should never be equal */ 4136 Assert(false); 4137 4138 return 0; 4139 } 4140 4141 static void 4142 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup) 4143 { 4144 IndexTuple tuple = (IndexTuple) tup; 4145 unsigned int tuplen = IndexTupleSize(tuple); 4146 IndexTuple newtuple; 4147 Datum original; 4148 4149 /* copy the tuple into sort storage */ 4150 newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen); 4151 memcpy(newtuple, tuple, tuplen); 4152 USEMEM(state, GetMemoryChunkSpace(newtuple)); 4153 stup->tuple = (void *) newtuple; 4154 /* set up first-column key value */ 4155 original = index_getattr(newtuple, 4156 1, 4157 RelationGetDescr(state->indexRel), 4158 &stup->isnull1); 4159 4160 if (!state->sortKeys->abbrev_converter || stup->isnull1) 4161 { 4162 /* 4163 * Store ordinary Datum representation, or NULL value. If there is a 4164 * converter it won't expect NULL values, and cost model is not 4165 * required to account for NULL, so in that case we avoid calling 4166 * converter and just set datum1 to zeroed representation (to be 4167 * consistent, and to support cheap inequality tests for NULL 4168 * abbreviated keys). 4169 */ 4170 stup->datum1 = original; 4171 } 4172 else if (!consider_abort_common(state)) 4173 { 4174 /* Store abbreviated key representation */ 4175 stup->datum1 = state->sortKeys->abbrev_converter(original, 4176 state->sortKeys); 4177 } 4178 else 4179 { 4180 /* Abort abbreviation */ 4181 int i; 4182 4183 stup->datum1 = original; 4184 4185 /* 4186 * Set state to be consistent with never trying abbreviation. 
4187 * 4188 * Alter datum1 representation in already-copied tuples, so as to 4189 * ensure a consistent representation (current tuple was just 4190 * handled). It does not matter if some dumped tuples are already 4191 * sorted on tape, since serialized tuples lack abbreviated keys 4192 * (TSS_BUILDRUNS state prevents control reaching here in any case). 4193 */ 4194 for (i = 0; i < state->memtupcount; i++) 4195 { 4196 SortTuple *mtup = &state->memtuples[i]; 4197 4198 tuple = (IndexTuple) mtup->tuple; 4199 mtup->datum1 = index_getattr(tuple, 4200 1, 4201 RelationGetDescr(state->indexRel), 4202 &mtup->isnull1); 4203 } 4204 } 4205 } 4206 4207 static void 4208 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup) 4209 { 4210 IndexTuple tuple = (IndexTuple) stup->tuple; 4211 unsigned int tuplen; 4212 4213 tuplen = IndexTupleSize(tuple) + sizeof(tuplen); 4214 LogicalTapeWrite(state->tapeset, tapenum, 4215 (void *) &tuplen, sizeof(tuplen)); 4216 LogicalTapeWrite(state->tapeset, tapenum, 4217 (void *) tuple, IndexTupleSize(tuple)); 4218 if (state->randomAccess) /* need trailing length word? */ 4219 LogicalTapeWrite(state->tapeset, tapenum, 4220 (void *) &tuplen, sizeof(tuplen)); 4221 4222 if (!state->slabAllocatorUsed) 4223 { 4224 FREEMEM(state, GetMemoryChunkSpace(tuple)); 4225 pfree(tuple); 4226 } 4227 } 4228 4229 static void 4230 readtup_index(Tuplesortstate *state, SortTuple *stup, 4231 int tapenum, unsigned int len) 4232 { 4233 unsigned int tuplen = len - sizeof(unsigned int); 4234 IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen); 4235 4236 LogicalTapeReadExact(state->tapeset, tapenum, 4237 tuple, tuplen); 4238 if (state->randomAccess) /* need trailing length word? */ 4239 LogicalTapeReadExact(state->tapeset, tapenum, 4240 &tuplen, sizeof(tuplen)); 4241 stup->tuple = (void *) tuple; 4242 /* set up first-column key value */ 4243 stup->datum1 = index_getattr(tuple, 4244 1, 4245 RelationGetDescr(state->indexRel), 4246 &stup->isnull1); 4247 } 4248 4249 /* 4250 * Routines specialized for DatumTuple case 4251 */ 4252 4253 static int 4254 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) 4255 { 4256 int compare; 4257 4258 compare = ApplySortComparator(a->datum1, a->isnull1, 4259 b->datum1, b->isnull1, 4260 state->sortKeys); 4261 if (compare != 0) 4262 return compare; 4263 4264 /* if we have abbreviations, then "tuple" has the original value */ 4265 4266 if (state->sortKeys->abbrev_converter) 4267 compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1, 4268 PointerGetDatum(b->tuple), b->isnull1, 4269 state->sortKeys); 4270 4271 return compare; 4272 } 4273 4274 static void 4275 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup) 4276 { 4277 /* Not currently needed */ 4278 elog(ERROR, "copytup_datum() should not be called"); 4279 } 4280 4281 static void 4282 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) 4283 { 4284 void *waddr; 4285 unsigned int tuplen; 4286 unsigned int writtenlen; 4287 4288 if (stup->isnull1) 4289 { 4290 waddr = NULL; 4291 tuplen = 0; 4292 } 4293 else if (!state->tuples) 4294 { 4295 waddr = &stup->datum1; 4296 tuplen = sizeof(Datum); 4297 } 4298 else 4299 { 4300 waddr = stup->tuple; 4301 tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen); 4302 Assert(tuplen != 0); 4303 } 4304 4305 writtenlen = tuplen + sizeof(unsigned int); 4306 4307 LogicalTapeWrite(state->tapeset, tapenum, 4308 (void *) &writtenlen, sizeof(writtenlen)); 4309 
LogicalTapeWrite(state->tapeset, tapenum, 4310 waddr, tuplen); 4311 if (state->randomAccess) /* need trailing length word? */ 4312 LogicalTapeWrite(state->tapeset, tapenum, 4313 (void *) &writtenlen, sizeof(writtenlen)); 4314 4315 if (!state->slabAllocatorUsed && stup->tuple) 4316 { 4317 FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); 4318 pfree(stup->tuple); 4319 } 4320 } 4321 4322 static void 4323 readtup_datum(Tuplesortstate *state, SortTuple *stup, 4324 int tapenum, unsigned int len) 4325 { 4326 unsigned int tuplen = len - sizeof(unsigned int); 4327 4328 if (tuplen == 0) 4329 { 4330 /* it's NULL */ 4331 stup->datum1 = (Datum) 0; 4332 stup->isnull1 = true; 4333 stup->tuple = NULL; 4334 } 4335 else if (!state->tuples) 4336 { 4337 Assert(tuplen == sizeof(Datum)); 4338 LogicalTapeReadExact(state->tapeset, tapenum, 4339 &stup->datum1, tuplen); 4340 stup->isnull1 = false; 4341 stup->tuple = NULL; 4342 } 4343 else 4344 { 4345 void *raddr = readtup_alloc(state, tuplen); 4346 4347 LogicalTapeReadExact(state->tapeset, tapenum, 4348 raddr, tuplen); 4349 stup->datum1 = PointerGetDatum(raddr); 4350 stup->isnull1 = false; 4351 stup->tuple = raddr; 4352 } 4353 4354 if (state->randomAccess) /* need trailing length word? */ 4355 LogicalTapeReadExact(state->tapeset, tapenum, 4356 &tuplen, sizeof(tuplen)); 4357 } 4358 4359 /* 4360 * Parallel sort routines 4361 */ 4362 4363 /* 4364 * tuplesort_estimate_shared - estimate required shared memory allocation 4365 * 4366 * nWorkers is an estimate of the number of workers (it's the number that 4367 * will be requested). 4368 */ 4369 Size 4370 tuplesort_estimate_shared(int nWorkers) 4371 { 4372 Size tapesSize; 4373 4374 Assert(nWorkers > 0); 4375 4376 /* Make sure that BufFile shared state is MAXALIGN'd */ 4377 tapesSize = mul_size(sizeof(TapeShare), nWorkers); 4378 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes))); 4379 4380 return tapesSize; 4381 } 4382 4383 /* 4384 * tuplesort_initialize_shared - initialize shared tuplesort state 4385 * 4386 * Must be called from leader process before workers are launched, to 4387 * establish state needed up-front for worker tuplesortstates. nWorkers 4388 * should match the argument passed to tuplesort_estimate_shared(). 4389 */ 4390 void 4391 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg) 4392 { 4393 int i; 4394 4395 Assert(nWorkers > 0); 4396 4397 SpinLockInit(&shared->mutex); 4398 shared->currentWorker = 0; 4399 shared->workersFinished = 0; 4400 SharedFileSetInit(&shared->fileset, seg); 4401 shared->nTapes = nWorkers; 4402 for (i = 0; i < nWorkers; i++) 4403 { 4404 shared->tapes[i].firstblocknumber = 0L; 4405 } 4406 } 4407 4408 /* 4409 * tuplesort_attach_shared - attach to shared tuplesort state 4410 * 4411 * Must be called by all worker processes. 4412 */ 4413 void 4414 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg) 4415 { 4416 /* Attach to SharedFileSet */ 4417 SharedFileSetAttach(&shared->fileset, seg); 4418 } 4419 4420 /* 4421 * worker_get_identifier - Assign and return ordinal identifier for worker 4422 * 4423 * The order in which these are assigned is not well defined, and should not 4424 * matter; worker numbers across parallel sort participants need only be 4425 * distinct and gapless. logtape.c requires this. 4426 * 4427 * Note that the identifiers assigned from here have no relation to 4428 * ParallelWorkerNumber number, to avoid making any assumption about 4429 * caller's requirements. 
However, we do follow the ParallelWorkerNumber
4430 * convention of representing a non-worker with worker number -1. This
4431 * includes the leader, as well as serial Tuplesort processes.
4432 */
4433 static int
4434 worker_get_identifier(Tuplesortstate *state)
4435 {
4436 Sharedsort *shared = state->shared;
4437 int worker;
4438
4439 Assert(WORKER(state));
4440
4441 SpinLockAcquire(&shared->mutex);
4442 worker = shared->currentWorker++;
4443 SpinLockRelease(&shared->mutex);
4444
4445 return worker;
4446 }
4447
4448 /*
4449 * worker_freeze_result_tape - freeze worker's result tape for leader
4450 *
4451 * This is called by workers just after the result tape has been determined,
4452 * instead of calling LogicalTapeFreeze() directly. They do so because
4453 * workers require a few additional steps beyond the similar serial
4454 * TSS_SORTEDONTAPE external sort case, and those steps also happen here:
4455 * freeing now-unneeded resources, and representing to the leader that the
4456 * worker's run is available as input for its merge.
4457 *
4458 * There should only be one final output run for each worker, which consists
4459 * of all tuples that were originally input into the worker.
4460 */
4461 static void
4462 worker_freeze_result_tape(Tuplesortstate *state)
4463 {
4464 Sharedsort *shared = state->shared;
4465 TapeShare output;
4466
4467 Assert(WORKER(state));
4468 Assert(state->result_tape != -1);
4469 Assert(state->memtupcount == 0);
4470
4471 /*
4472 * Free most remaining memory, in case the caller is sensitive to our
4473 * holding on to it; memtuples may still be large, not a tiny merge heap.
4474 */
4475 pfree(state->memtuples);
4476 /* Be tidy */
4477 state->memtuples = NULL;
4478 state->memtupsize = 0;
4479
4480 /*
4481 * The parallel worker's result tape metadata must be stored in shared
4482 * memory, for the leader's benefit.
4483 */
4484 LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4485
4486 /* Store properties of output tape, and update finished worker count */
4487 SpinLockAcquire(&shared->mutex);
4488 shared->tapes[state->worker] = output;
4489 shared->workersFinished++;
4490 SpinLockRelease(&shared->mutex);
4491 }
4492
4493 /*
4494 * worker_nomergeruns - dump memtuples in worker, without merging
4495 *
4496 * This is called in a worker, as an alternative to mergeruns(), when no
4497 * merging is required.
4498 */
4499 static void
4500 worker_nomergeruns(Tuplesortstate *state)
4501 {
4502 Assert(WORKER(state));
4503 Assert(state->result_tape == -1);
4504
4505 state->result_tape = state->tp_tapenum[state->destTape];
4506 worker_freeze_result_tape(state);
4507 }
4508
4509 /*
4510 * leader_takeover_tapes - create tapeset for leader from worker tapes
4511 *
4512 * So far, the leader Tuplesortstate has performed no actual sorting. By now,
4513 * all sorting has occurred in workers, all of which must have already
4514 * returned from tuplesort_performsort().
4515 *
4516 * When this returns, the leader process is left in a state that is virtually
4517 * indistinguishable from one in which it had generated the runs itself, as a
4518 * serial external sort might have.
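*
* A worked example (hypothetical sort with nParticipants == 3): the code
* below creates a tapeset of 4 tapes, where tapes 0..2 each hold exactly
* one frozen worker run (tp_runs == 1, tp_dummy == 0) and tape 3 is the
* leader-owned output tape with a single dummy run (tp_runs == 0,
* tp_dummy == 1).  With currentRun == 3, a subsequent mergeruns() call
* performs a single 3-way merge, just as a serial one-pass sort would.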
4519 */ 4520 static void 4521 leader_takeover_tapes(Tuplesortstate *state) 4522 { 4523 Sharedsort *shared = state->shared; 4524 int nParticipants = state->nParticipants; 4525 int workersFinished; 4526 int j; 4527 4528 Assert(LEADER(state)); 4529 Assert(nParticipants >= 1); 4530 4531 SpinLockAcquire(&shared->mutex); 4532 workersFinished = shared->workersFinished; 4533 SpinLockRelease(&shared->mutex); 4534 4535 if (nParticipants != workersFinished) 4536 elog(ERROR, "cannot take over tapes before all workers finish"); 4537 4538 /* 4539 * Create the tapeset from worker tapes, including a leader-owned tape at 4540 * the end. Parallel workers are far more expensive than logical tapes, 4541 * so the number of tapes allocated here should never be excessive. 4542 * 4543 * We still have a leader tape, though it's not possible to write to it 4544 * due to restrictions in the shared fileset infrastructure used by 4545 * logtape.c. It will never be written to in practice because 4546 * randomAccess is disallowed for parallel sorts. 4547 */ 4548 inittapestate(state, nParticipants + 1); 4549 state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes, 4550 &shared->fileset, state->worker); 4551 4552 /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */ 4553 state->currentRun = nParticipants; 4554 4555 /* 4556 * Initialize variables of Algorithm D to be consistent with runs from 4557 * workers having been generated in the leader. 4558 * 4559 * There will always be exactly 1 run per worker, and exactly one input 4560 * tape per run, because workers always output exactly 1 run, even when 4561 * there were no input tuples for workers to sort. 4562 */ 4563 for (j = 0; j < state->maxTapes; j++) 4564 { 4565 /* One real run; no dummy runs for worker tapes */ 4566 state->tp_fib[j] = 1; 4567 state->tp_runs[j] = 1; 4568 state->tp_dummy[j] = 0; 4569 state->tp_tapenum[j] = j; 4570 } 4571 /* Leader tape gets one dummy run, and no real runs */ 4572 state->tp_fib[state->tapeRange] = 0; 4573 state->tp_runs[state->tapeRange] = 0; 4574 state->tp_dummy[state->tapeRange] = 1; 4575 4576 state->Level = 1; 4577 state->destTape = 0; 4578 4579 state->status = TSS_BUILDRUNS; 4580 } 4581 4582 /* 4583 * Convenience routine to free a tuple previously loaded into sort memory 4584 */ 4585 static void 4586 free_sort_tuple(Tuplesortstate *state, SortTuple *stup) 4587 { 4588 if (stup->tuple) 4589 { 4590 FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); 4591 pfree(stup->tuple); 4592 stup->tuple = NULL; 4593 } 4594 } 4595