/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *	  Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  Historically, we divided the input into sorted runs
 * using replacement selection, in the form of a priority tree implemented
 * as a heap (essentially his Algorithm 5.2.3H), but now we always use
 * quicksort for run generation.  We merge the runs using polyphase merge,
 * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
 * implemented by logtape.c, which avoids space wastage by recycling disk
 * space as soon as each block is read from its "tape".
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
 * When tuples are dumped in batch after quicksorting, we begin a new run
 * with a new output tape (selected per Algorithm D).  After the end of the
 * input is reached, we dump out remaining tuples in memory into a final run,
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and replace it
 * with the next tuple from its source tape (if any).  When the heap empties,
 * the merge is complete.  The basic merge algorithm thus needs very little
 * memory --- only M tuples for an M-way merge, and M is constrained to a
 * small number.  However, we can still make good use of our full workMem
 * allocation by pre-reading additional blocks from each source tape.  Without
 * prereading, our access pattern to the temporary file would be very erratic;
 * on average we'd read one block from each of M source tapes during the same
 * time that we're writing M blocks to the output tape, so there is no
 * sequentiality of access at all, defeating the read-ahead methods used by
 * most Unix kernels.  Worse, the output tape gets written into a very random
 * sequence of blocks of the temp file, ensuring that things will be even
 * worse when it comes time to read that tape.  A straightforward merge pass
 * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
 * by prereading from each source tape sequentially, loading about workMem/M
 * bytes from each tape in turn, and making the sequential blocks immediately
 * available for reuse.  This approach helps to localize both read and write
 * accesses.  The pre-reading is handled by logtape.c; we just tell it how
 * much memory to use for the buffers.
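 *
 * As an illustration (hypothetical numbers, not measurements): with
 * workMem = 4MB and a 16-way merge, each preread pulls roughly 256kB
 * (workMem/M) from one tape in a single sequential burst, rather than
 * interleaving lone 8kB block reads across all sixteen input tapes and
 * the output tape.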
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives.  In our implementation a "tape drive"
 * doesn't cost much more than a few KB of memory buffers, so we can afford
 * to have lots of them.  In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate any repeated I/O at all.  In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above.  Nonetheless, with large workMem we can have many tapes (but not
 * too many -- see the comments in tuplesort_merge_order).
 *
 * This module supports parallel sorting.  Parallel sorts involve coordination
 * among one or more worker processes, and a leader process, each with its own
 * tuplesort state.  The leader process (or, more accurately, the
 * Tuplesortstate associated with a leader process) creates a full tapeset
 * consisting of worker tapes with one run to merge; a run for every
 * worker process.  This is then merged.  Worker processes are guaranteed to
 * produce exactly one output run from their partial input.
 *
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/* sort-type codes for sort__start probes */
#define HEAP_SORT		0
#define INDEX_SORT		1
#define DATUM_SORT		2
#define CLUSTER_SORT	3

/* Sort parallel code from state for sort__start probes */
#define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
								 (state)->worker >= 0 ? 1 : 2)

/*
 * Initial size of memtuples array.  We're trying to select this size so that
 * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
 * overhead of allocation might possibly be lowered.  However, we don't
 * consider array sizes less than 1024.
 */
#define INITIAL_MEMTUPSIZE Max(1024, \
							   ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
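/*
 * Illustration (assuming a typical 64-bit build, where
 * ALLOCSET_SEPARATE_THRESHOLD is 8192 bytes and sizeof(SortTuple) is 24):
 * 8192 / 24 + 1 = 342, so the Max() above lands on the 1024 floor.
 */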
/* GUC variables */
#ifdef TRACE_SORT
bool		trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
bool		optimize_bounded_sort = true;
#endif


/*
 * The objects we actually sort are SortTuple structs.  These contain
 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree() (except during merge, when we use a
 * simple slab allocator).  SortTuples also contain the tuple's first key
 * column in Datum/nullflag format, and a source/input tape number that
 * tracks which tape each heap element/slot belongs to during merging.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.  We could extract and save all the key
 * columns, not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result.  Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * There is one special case: when the sort support infrastructure provides an
 * "abbreviated key" representation, where the key is (typically) a pass by
 * value proxy for a pass by reference type.  In this case, the abbreviated key
 * is stored in datum1 in place of the actual first key column.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1 for pass by value types (or null values).  If the datatype is
 * pass-by-reference and isnull1 is false, then "tuple" points to a separately
 * palloc'd data value; otherwise "tuple" is NULL.  The value of datum1 is then
 * either the same pointer as "tuple", or is an abbreviated key value as
 * described above.  Accordingly, "tuple" is always used in preference to
 * datum1 as the authoritative value for pass-by-reference cases.
 */
typedef struct
{
	void	   *tuple;			/* the tuple itself */
	Datum		datum1;			/* value of first key column */
	bool		isnull1;		/* is first key column NULL? */
	int			srctape;		/* source tape number */
} SortTuple;

/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use, the
 * slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
	union SlabSlot *nextfree;
	char		buffer[SLAB_SLOT_SIZE];
} SlabSlot;
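/*
 * A minimal sketch of handing out a slot during merge (popping it off the
 * free list), along the lines of readtup_alloc() below; the field names are
 * as declared in Tuplesortstate:
 *
 *		if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
 *			return MemoryContextAlloc(state->sortcontext, tuplen);
 *		buf = state->slabFreeHead;
 *		state->slabFreeHead = buf->nextfree;
 *		return buf;
 */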
/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE				/* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one block's
 * worth of buffer space.  This ignores the overhead of all the other data
 * structures needed for each tape, but it's probably close enough.
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER		6		/* minimum merge order */
#define MAXORDER		500		/* maximum merge order */
#define TAPE_BUFFER_OVERHEAD		BLCKSZ
#define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
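/*
 * Rough illustration (hypothetical numbers): with workMem = 64MB and the
 * default 8kB BLCKSZ, tuplesort_merge_order() works out to roughly
 * 64MB / (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD) = 65536kB / 264kB,
 * or about 248 tapes, comfortably inside the MINORDER..MAXORDER clamp.
 */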
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
									Tuplesortstate *state);

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	int			nKeys;			/* number of columns in sort key */
	bool		randomAccess;	/* did caller request random access? */
	bool		bounded;		/* did caller specify a maximum number of
								 * tuples to return? */
	bool		boundUsed;		/* true if we made use of a bounded heap */
	int			bound;			/* if bounded, the maximum number of tuples */
	bool		tuples;			/* Can SortTuple.tuple ever be set? */
	int64		availMem;		/* remaining memory available, in bytes */
	int64		allowedMem;		/* total memory allowed, in bytes */
	int			maxTapes;		/* number of tapes (Knuth's T) */
	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
	int64		maxSpace;		/* maximum amount of space occupied among sort
								 * of groups, either in-memory or on-disk */
	bool		isMaxSpaceDisk; /* true when maxSpace is value for on-disk
								 * space, false when it's value for in-memory
								 * space */
	TupSortStatus maxSpaceStatus;	/* sort status when maxSpace was reached */
	MemoryContext maincontext;	/* memory context for tuple sort metadata that
								 * persists across multiple batches */
	MemoryContext sortcontext;	/* memory context holding most sort data */
	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are sorting from the routines that don't need to know it.
	 * They are set up by the tuplesort_begin_xxx routines.
	 *
	 * Function to compare two tuples; result is per qsort() convention, ie:
	 * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
	 * qsort_arg_comparator.
	 */
	SortTupleComparator comparetup;

	/*
	 * Function to copy a supplied input tuple into palloc'd space and set up
	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
	 * state->availMem must be decreased by the amount of space used for the
	 * tuple copy (note the SortTuple struct itself is not counted).
	 */
	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  Unless the slab allocator is
	 * used, after writing the tuple, pfree() the out-of-line data (not the
	 * SortTuple struct!), and increase state->availMem by the amount of
	 * memory space thereby released.
	 */
	void		(*writetup) (Tuplesortstate *state, int tapenum,
							 SortTuple *stup);

	/*
	 * Function to read a stored tuple from tape back into memory.  'len' is
	 * the already-read length of the stored tuple.  The tuple is allocated
	 * from the slab memory arena, or is palloc'd; see readtup_alloc().
	 */
	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
							int tapenum, unsigned int len);

	/*
	 * This array holds the tuples now in sort memory.  If we are in state
	 * INITIAL, the tuples are in no particular order; if we are in state
	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
	 * H.  In state SORTEDONTAPE, the array is not used.
	 */
	SortTuple  *memtuples;		/* array of SortTuple structs */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */
	bool		growmemtuples;	/* memtuples' growth still underway? */

	/*
	 * Memory for tuples is sometimes allocated using a simple slab allocator,
	 * rather than with palloc().  Currently, we switch to slab allocation
	 * when we start merging.  Merging only needs to keep a small, fixed
	 * number of tuples in memory at any time, so we can avoid the
	 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
	 * to hold the tuples.
	 *
	 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
	 * slots.  The allocation is sized to have one slot per tape, plus one
	 * additional slot.  We need that many slots to hold all the tuples kept
	 * in the heap during merge, plus the one we have last returned from the
	 * sort with tuplesort_gettuple.
	 *
	 * Initially, all the slots are kept in a linked list of free slots.  When
	 * a tuple is read from a tape, it is put into the next available slot, if
	 * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
	 * instead.
	 *
	 * When we're done processing a tuple, we return the slot back to the free
	 * list, or pfree() it if it was palloc'd.  We know that a tuple was
	 * allocated from the slab if its pointer value is between
	 * slabMemoryBegin and -End.
	 *
	 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
	 * tracking memory usage is not used.
	 */
	bool		slabAllocatorUsed;

	char	   *slabMemoryBegin;	/* beginning of slab memory arena */
	char	   *slabMemoryEnd;	/* end of slab memory arena */
	SlabSlot   *slabFreeHead;	/* head of free list */

	/* Buffer size to use for reading input tapes, during merge. */
	size_t		read_buffer_size;

	/*
	 * When we return a tuple to the caller in tuplesort_gettuple_XXX that
	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
	 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
	 * recycle the memory on the next gettuple call.
	 */
	void	   *lastReturnedTuple;

	/*
	 * While building initial runs, this is the current output run number.
	 * Afterwards, it is the number of initial runs we made.
	 */
	int			currentRun;

	/*
	 * Unless otherwise noted, all pointer variables below are pointers to
	 * arrays of length maxTapes, holding per-tape data.
	 */

	/*
	 * This variable is only used during merge passes.  mergeactive[i] is true
	 * if we are reading an input run from (actual) tape number i and have not
	 * yet exhausted that run.
	 */
	bool	   *mergeactive;	/* active input run source? */

	/*
	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
	 * "logical" and "actual" tape numbers straight!
	 */
	int			Level;			/* Knuth's l */
	int			destTape;		/* current output tape (Knuth's j, less 1) */
	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
	int		   *tp_runs;		/* # of real runs on each tape */
	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
	int			activeTapes;	/* # of active input tapes in merge pass */

	/*
	 * These variables are used after completion of sorting to keep track of
	 * the next tuple to return.  (In the tape case, the tape's current read
	 * position is also critical state.)
	 */
	int			result_tape;	/* actual tape number of finished output */
	int			current;		/* array index (only used if SORTEDINMEM) */
	bool		eof_reached;	/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
	int			markpos_offset; /* saved "current", or offset in tape block */
	bool		markpos_eof;	/* saved "eof_reached" */

	/*
	 * These variables are used during parallel sorting.
	 *
	 * worker is our worker identifier.  Follows the general convention that
	 * -1 denotes a leader tuplesort, and values >= 0 denote worker
	 * tuplesorts.  (-1 can also be a serial tuplesort.)
	 *
	 * shared is mutable shared memory state, which is used to coordinate
	 * parallel sorts.
	 *
	 * nParticipants is the number of worker Tuplesortstates known by the
	 * leader to have actually been launched, which implies that they must
	 * finish a run that the leader can merge.  Typically includes a worker
	 * state held by the leader process itself.  Set in the leader
	 * Tuplesortstate only.
	 */
	int			worker;
	Sharedsort *shared;
	int			nParticipants;

	/*
	 * The sortKeys variable is used by every case other than the hash index
	 * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
	 * MinimalTuple and CLUSTER routines, though.
	 */
	TupleDesc	tupDesc;
	SortSupport sortKeys;		/* array of length nKeys */

	/*
	 * This variable is shared by the single-key MinimalTuple case and the
	 * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
	 */
	SortSupport onlyKey;

	/*
	 * Additional state for managing "abbreviated key" sortsupport routines
	 * (which currently may be used by all cases except the hash index case).
	 * Tracks the intervals at which the optimization's effectiveness is
	 * tested.
	 */
	int64		abbrevNext;		/* Tuple # at which to next check
								 * applicability */

	/*
	 * These variables are specific to the CLUSTER case; they are set by
	 * tuplesort_begin_cluster.
	 */
	IndexInfo  *indexInfo;		/* info about index being used for reference */
	EState	   *estate;			/* for evaluating index expressions */

	/*
	 * These variables are specific to the IndexTuple case; they are set by
	 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
	 */
	Relation	heapRel;		/* table the index is being built on */
	Relation	indexRel;		/* index being built */

	/* These are specific to the index_btree subcase: */
	bool		enforceUnique;	/* complain if we find duplicate tuples */

	/* These are specific to the index_hash subcase: */
	uint32		high_mask;		/* masks for sortable part of hash code */
	uint32		low_mask;
	uint32		max_buckets;

	/*
	 * These variables are specific to the Datum case; they are set by
	 * tuplesort_begin_datum and used only by the DatumTuple routines.
	 */
	Oid			datumType;
	/* we need typelen in order to know how to copy the Datums. */
	int			datumTypeLen;

	/*
	 * Resource snapshot for time of sort start.
	 */
#ifdef TRACE_SORT
	PGRUsage	ru_start;
#endif
};

/*
 * Private mutable state of a tuplesort parallel operation.  This is
 * allocated in shared memory.
 */
struct Sharedsort
{
	/* mutex protects all fields prior to tapes */
	slock_t		mutex;

	/*
	 * currentWorker generates ordinal identifier numbers for parallel sort
	 * workers.  These start from 0, and are always gapless.
	 *
	 * Workers increment workersFinished to indicate having finished.  If this
	 * is equal to state.nParticipants within the leader, the leader is ready
	 * to merge worker runs.
	 */
	int			currentWorker;
	int			workersFinished;

	/* Temporary file space */
	SharedFileSet fileset;

	/* Size of tapes flexible array */
	int			nTapes;

	/*
	 * Tapes array used by workers to report back information needed by the
	 * leader to concatenate all worker tapes into one for merging
	 */
	TapeShare	tapes[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Is the given tuple allocated from the slab memory arena?
 */
#define IS_SLAB_SLOT(state, tuple) \
	((char *) (tuple) >= (state)->slabMemoryBegin && \
	 (char *) (tuple) < (state)->slabMemoryEnd)

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
	do { \
		SlabSlot *buf = (SlabSlot *) tuple; \
		\
		if (IS_SLAB_SLOT((state), buf)) \
		{ \
			buf->nextfree = (state)->slabFreeHead; \
			(state)->slabFreeHead = buf; \
		} else \
			pfree(buf); \
	} while(0)

#define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
#define SERIAL(state)		((state)->shared == NULL)
#define WORKER(state)		((state)->shared && (state)->worker != -1)
#define LEADER(state)		((state)->shared && (state)->worker == -1)

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
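 *
 * As a sketch (not code from this file; "data" and "datalen" are
 * illustrative), a writetup routine's framing looks roughly like:
 *
 *		unsigned int tuplen = datalen + sizeof(tuplen);
 *
 *		LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
 *		LogicalTapeWrite(state->tapeset, tapenum, data, datalen);
 *		if (state->randomAccess)
 *			LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
 *
 * where the last write adds the trailing length word that makes
 * read-backwards possible.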
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.  As of 9.6, a
 * separate memory context is used for caller passed tuples.  Resetting
 * it at certain key increments significantly ameliorates fragmentation.
 * Note that this places a responsibility on copytup routines to use the
 * correct memory context for these tuples (and to not use the reset
 * context for anything whose lifetime needs to span multiple external
 * sort runs).  readtup routines use the slab allocator (they cannot use
 * the reset context because it gets deleted at the point that merging
 * begins).
 */

/* When using this macro, beware of double evaluation of len */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
	do { \
		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
			elog(ERROR, "unexpected end of data"); \
	} while(0)


static Tuplesortstate *tuplesort_begin_common(int workMem,
											  SortCoordinate coordinate,
											  bool randomAccess);
static void tuplesort_begin_batch(Tuplesortstate *state);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static bool consider_abort_common(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state, bool mergeruns);
static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
							Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum,
						  SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
						 int tapenum, unsigned int len);
static int	comparetup_cluster(const SortTuple *a, const SortTuple *b,
							   Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_cluster(Tuplesortstate *state, int tapenum,
							 SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
							int tapenum, unsigned int len);
static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
								   Tuplesortstate *state);
static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
								  Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum,
						   SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
						  int tapenum, unsigned int len);
static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
							 Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum,
						   SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
						  int tapenum, unsigned int len);
static int	worker_get_identifier(Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
static void tuplesort_free(Tuplesortstate *state);
static void tuplesort_updatemax(Tuplesortstate *state);

/*
 * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
 * any variant of SortTuples, using the appropriate comparetup function.
 * qsort_ssup() is specialized for the case where the comparetup function
 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
 * and Datum sorts.
 */
#include "qsort_tuple.c"


/*
 * tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */
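/*
 * A sketch of the typical serial heap-sort cycle (caller-supplied variables
 * such as tupDesc, slot, and the per-key arrays are assumed):
 *
 *		Tuplesortstate *sortstate =
 *			tuplesort_begin_heap(tupDesc, nkeys, attNums,
 *								 sortOperators, sortCollations,
 *								 nullsFirstFlags,
 *								 work_mem, NULL, false);
 *
 *		while (... more input ...)
 *			tuplesort_puttupleslot(sortstate, slot);
 *		tuplesort_performsort(sortstate);
 *		while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
 *			... consume slot ...;
 *		tuplesort_end(sortstate);
 */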
static Tuplesortstate *
tuplesort_begin_common(int workMem, SortCoordinate coordinate,
					   bool randomAccess)
{
	Tuplesortstate *state;
	MemoryContext maincontext;
	MemoryContext sortcontext;
	MemoryContext oldcontext;

	/* See leader_takeover_tapes() remarks on randomAccess support */
	if (coordinate && randomAccess)
		elog(ERROR, "random access disallowed under parallel sort");

	/*
	 * Memory context surviving tuplesort_reset.  This memory context holds
	 * data which is useful to keep while sorting multiple similar batches.
	 */
	maincontext = AllocSetContextCreate(CurrentMemoryContext,
										"TupleSort main",
										ALLOCSET_DEFAULT_SIZES);

	/*
	 * Create a working memory context for one sort operation.  The content of
	 * this context is deleted by tuplesort_reset.
	 */
	sortcontext = AllocSetContextCreate(maincontext,
										"TupleSort sort",
										ALLOCSET_DEFAULT_SIZES);

	/*
	 * Additionally, a working memory context for tuples is set up in
	 * tuplesort_begin_batch.
	 */

	/*
	 * Make the Tuplesortstate within the per-sortstate context.  This way, we
	 * don't need a separate pfree() operation for it at shutdown.
	 */
	oldcontext = MemoryContextSwitchTo(maincontext);

	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
	if (trace_sort)
		pg_rusage_init(&state->ru_start);
#endif

	state->randomAccess = randomAccess;
	state->tuples = true;

	/*
	 * workMem is forced to be at least 64KB, the current minimum valid value
	 * for the work_mem GUC.  This is a defense against parallel sort callers
	 * that divide out memory among many workers in a way that leaves each
	 * with very little memory.
	 */
	state->allowedMem = Max(workMem, 64) * (int64) 1024;
	state->sortcontext = sortcontext;
	state->maincontext = maincontext;

	/*
	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
	 * see comments in grow_memtuples().
	 */
	state->memtupsize = INITIAL_MEMTUPSIZE;
	state->memtuples = NULL;

	/*
	 * After all of the other non-parallel-related state, we set up all of the
	 * state needed for each batch.
	 */
	tuplesort_begin_batch(state);

	/*
	 * Initialize parallel-related state based on coordination information
	 * from caller
	 */
	if (!coordinate)
	{
		/* Serial sort */
		state->shared = NULL;
		state->worker = -1;
		state->nParticipants = -1;
	}
	else if (coordinate->isWorker)
	{
		/* Parallel worker produces exactly one final run from all input */
		state->shared = coordinate->sharedsort;
		state->worker = worker_get_identifier(state);
		state->nParticipants = -1;
	}
	else
	{
		/* Parallel leader state only used for final merge */
		state->shared = coordinate->sharedsort;
		state->worker = -1;
		state->nParticipants = coordinate->nParticipants;
		Assert(state->nParticipants >= 1);
	}

	MemoryContextSwitchTo(oldcontext);

	return state;
}

/*
 * tuplesort_begin_batch
 *
 * Set up, or reset, all state needed for processing a new set of tuples with
 * this sort state.  Called both from tuplesort_begin_common (the first time
 * sorting with this sort state) and tuplesort_reset (for subsequent usages).
 */
static void
tuplesort_begin_batch(Tuplesortstate *state)
{
	MemoryContext oldcontext;

	oldcontext = MemoryContextSwitchTo(state->maincontext);

	/*
	 * Caller tuple (e.g. IndexTuple) memory context.
	 *
	 * A dedicated child context used exclusively for caller passed tuples
	 * eases memory management.  Resetting at key points reduces
	 * fragmentation.  Note that the memtuples array of SortTuples is
	 * allocated in the parent context, not this context, because there is no
	 * need to free memtuples early.
	 */
	state->tuplecontext = AllocSetContextCreate(state->sortcontext,
												"Caller tuples",
												ALLOCSET_DEFAULT_SIZES);

	state->status = TSS_INITIAL;
	state->bounded = false;
	state->boundUsed = false;

	state->availMem = state->allowedMem;

	state->tapeset = NULL;

	state->memtupcount = 0;

	/*
	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
	 * see comments in grow_memtuples().
	 */
	state->growmemtuples = true;
	state->slabAllocatorUsed = false;
	if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
	{
		pfree(state->memtuples);
		state->memtuples = NULL;
		state->memtupsize = INITIAL_MEMTUPSIZE;
	}
	if (state->memtuples == NULL)
	{
		state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
	}

	/* workMem must be large enough for the minimal memtuples array */
	if (LACKMEM(state))
		elog(ERROR, "insufficient memory allowed for sort");

	state->currentRun = 0;

	/*
	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
	 * inittapes(), if needed
	 */

	state->result_tape = -1;	/* flag that result tape has not been formed */

	MemoryContextSwitchTo(oldcontext);
}

Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
					 int nkeys, AttrNumber *attNums,
					 Oid *sortOperators, Oid *sortCollations,
					 bool *nullsFirstFlags,
					 int workMem, SortCoordinate coordinate, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	MemoryContext oldcontext;
	int			i;

	oldcontext = MemoryContextSwitchTo(state->maincontext);

	AssertArg(nkeys > 0);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
			 nkeys, workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = nkeys;

	TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
								false,	/* no unique check */
								nkeys,
								workMem,
								randomAccess,
								PARALLEL_SORT(state));

	state->comparetup = comparetup_heap;
	state->copytup = copytup_heap;
	state->writetup = writetup_heap;
	state->readtup = readtup_heap;

	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
	state->abbrevNext = 10;

	/* Prepare SortSupport data for each column */
	state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));

	for (i = 0; i < nkeys; i++)
	{
		SortSupport sortKey = state->sortKeys + i;

		AssertArg(attNums[i] != 0);
		AssertArg(sortOperators[i] != 0);

		sortKey->ssup_cxt = CurrentMemoryContext;
		sortKey->ssup_collation = sortCollations[i];
		sortKey->ssup_nulls_first = nullsFirstFlags[i];
		sortKey->ssup_attno = attNums[i];
		/* Convey if abbreviation optimization is applicable in principle */
		sortKey->abbreviate = (i == 0);

		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
	}

	/*
	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
	 * tie-breaker comparisons may be required.  Typically, the optimization
	 * is only of value to pass-by-value types anyway, whereas abbreviated
	 * keys are typically only of value to pass-by-reference types.
	 */
	if (nkeys == 1 && !state->sortKeys->abbrev_converter)
		state->onlyKey = state->sortKeys;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
						Relation indexRel,
						int workMem,
						SortCoordinate coordinate, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	BTScanInsert indexScanKey;
	MemoryContext oldcontext;
	int			i;

	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);

	oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
			 RelationGetNumberOfAttributes(indexRel),
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);

	TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
								false,	/* no unique check */
								state->nKeys,
								workMem,
								randomAccess,
								PARALLEL_SORT(state));

	state->comparetup = comparetup_cluster;
	state->copytup = copytup_cluster;
	state->writetup = writetup_cluster;
	state->readtup = readtup_cluster;
	state->abbrevNext = 10;

	state->indexInfo = BuildIndexInfo(indexRel);

	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */

	indexScanKey = _bt_mkscankey(indexRel, NULL);

	if (state->indexInfo->ii_Expressions != NULL)
	{
		TupleTableSlot *slot;
		ExprContext *econtext;

		/*
		 * We will need to use FormIndexDatum to evaluate the index
		 * expressions.  To do that, we need an EState, as well as a
		 * TupleTableSlot to put the table tuples into.  The econtext's
		 * scantuple has to point to that slot, too.
		 */
		state->estate = CreateExecutorState();
		slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
		econtext = GetPerTupleExprContext(state->estate);
		econtext->ecxt_scantuple = slot;
	}

	/* Prepare SortSupport data for each column */
	state->sortKeys = (SortSupport) palloc0(state->nKeys *
											sizeof(SortSupportData));

	for (i = 0; i < state->nKeys; i++)
	{
		SortSupport sortKey = state->sortKeys + i;
		ScanKey		scanKey = indexScanKey->scankeys + i;
		int16		strategy;

		sortKey->ssup_cxt = CurrentMemoryContext;
		sortKey->ssup_collation = scanKey->sk_collation;
		sortKey->ssup_nulls_first =
			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
		sortKey->ssup_attno = scanKey->sk_attno;
		/* Convey if abbreviation optimization is applicable in principle */
		sortKey->abbreviate = (i == 0);

		AssertState(sortKey->ssup_attno != 0);

		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
			BTGreaterStrategyNumber : BTLessStrategyNumber;

		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
	}

	pfree(indexScanKey);

	MemoryContextSwitchTo(oldcontext);

	return state;
}

Tuplesortstate *
tuplesort_begin_index_btree(Relation heapRel,
							Relation indexRel,
							bool enforceUnique,
							int workMem,
							SortCoordinate coordinate,
							bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	BTScanInsert indexScanKey;
	MemoryContext oldcontext;
	int			i;

	oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
			 enforceUnique ? 't' : 'f',
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);

	TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
								enforceUnique,
								state->nKeys,
								workMem,
								randomAccess,
								PARALLEL_SORT(state));

	state->comparetup = comparetup_index_btree;
	state->copytup = copytup_index;
	state->writetup = writetup_index;
	state->readtup = readtup_index;
	state->abbrevNext = 10;

	state->heapRel = heapRel;
	state->indexRel = indexRel;
	state->enforceUnique = enforceUnique;

	indexScanKey = _bt_mkscankey(indexRel, NULL);

	/* Prepare SortSupport data for each column */
	state->sortKeys = (SortSupport) palloc0(state->nKeys *
											sizeof(SortSupportData));

	for (i = 0; i < state->nKeys; i++)
	{
		SortSupport sortKey = state->sortKeys + i;
		ScanKey		scanKey = indexScanKey->scankeys + i;
		int16		strategy;

		sortKey->ssup_cxt = CurrentMemoryContext;
		sortKey->ssup_collation = scanKey->sk_collation;
		sortKey->ssup_nulls_first =
			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
		sortKey->ssup_attno = scanKey->sk_attno;
		/* Convey if abbreviation optimization is applicable in principle */
		sortKey->abbreviate = (i == 0);

		AssertState(sortKey->ssup_attno != 0);

		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
			BTGreaterStrategyNumber : BTLessStrategyNumber;

		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
	}

	pfree(indexScanKey);

	MemoryContextSwitchTo(oldcontext);

	return state;
}

Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,
						   Relation indexRel,
						   uint32 high_mask,
						   uint32 low_mask,
						   uint32 max_buckets,
						   int workMem,
						   SortCoordinate coordinate,
						   bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	MemoryContext oldcontext;

	oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
			 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
			 high_mask,
			 low_mask,
			 max_buckets,
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = 1;			/* Only one sort column, the hash code */

	state->comparetup = comparetup_index_hash;
	state->copytup = copytup_index;
	state->writetup = writetup_index;
	state->readtup = readtup_index;

	state->heapRel = heapRel;
	state->indexRel = indexRel;

	state->high_mask = high_mask;
	state->low_mask = low_mask;
	state->max_buckets = max_buckets;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
					  bool nullsFirstFlag, int workMem,
					  SortCoordinate coordinate, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
												   randomAccess);
	MemoryContext oldcontext;
	int16		typlen;
	bool		typbyval;

	oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin datum sort: workMem = %d, randomAccess = %c",
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = 1;			/* always a one-column sort */

	TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
								false,	/* no unique check */
								1,
								workMem,
								randomAccess,
								PARALLEL_SORT(state));

	state->comparetup = comparetup_datum;
	state->copytup = copytup_datum;
	state->writetup = writetup_datum;
	state->readtup = readtup_datum;
	state->abbrevNext = 10;

	state->datumType = datumType;

	/* lookup necessary attributes of the datum type */
	get_typlenbyval(datumType, &typlen, &typbyval);
	state->datumTypeLen = typlen;
	state->tuples = !typbyval;

	/* Prepare SortSupport data */
	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));

	state->sortKeys->ssup_cxt = CurrentMemoryContext;
	state->sortKeys->ssup_collation = sortCollation;
	state->sortKeys->ssup_nulls_first = nullsFirstFlag;

	/*
	 * Abbreviation is possible here only for by-reference types.  In theory,
	 * a pass-by-value datatype could have an abbreviated form that is cheaper
	 * to compare.  In a tuple sort, we could support that, because we can
	 * always extract the original datum from the tuple as needed.  Here, we
	 * can't, because a datum sort only stores a single copy of the datum; the
	 * "tuple" field of each SortTuple is NULL.
	 */
	state->sortKeys->abbreviate = !typbyval;

	PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);

	/*
	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
	 * tie-breaker comparisons may be required.  Typically, the optimization
	 * is only of value to pass-by-value types anyway, whereas abbreviated
	 * keys are typically only of value to pass-by-reference types.
	 */
	if (!state->sortKeys->abbrev_converter)
		state->onlyKey = state->sortKeys;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

/*
 * tuplesort_set_bound
 *
 * Advise tuplesort that at most the first N result tuples are required.
 *
 * Must be called before inserting any tuples.  (Actually, we could allow it
 * as long as the sort hasn't spilled to disk, but there seems no need for
 * delayed calls at the moment.)
 *
 * This is a hint only.  The tuplesort may still return more tuples than
 * requested.  Parallel leader tuplesorts will always ignore the hint.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
	/* Assert we're called before loading any tuples */
	Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
	/* Can't set the bound twice, either */
	Assert(!state->bounded);
	/* Also, this shouldn't be called in a parallel worker */
	Assert(!WORKER(state));

	/* Parallel leader allows but ignores hint */
	if (LEADER(state))
		return;

#ifdef DEBUG_BOUNDED_SORT
	/* Honor GUC setting that disables the feature (for easy testing) */
	if (!optimize_bounded_sort)
		return;
#endif

	/* We want to be able to compute bound * 2, so limit the setting */
	if (bound > (int64) (INT_MAX / 2))
		return;

	state->bounded = true;
	state->bound = (int) bound;

	/*
	 * Bounded sorts are not an effective target for abbreviated key
	 * optimization.  Disable by setting state to be consistent with no
	 * abbreviation support.
	 */
	state->sortKeys->abbrev_converter = NULL;
	if (state->sortKeys->abbrev_full_comparator)
		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;

	/* Not strictly necessary, but be tidy */
	state->sortKeys->abbrev_abort = NULL;
	state->sortKeys->abbrev_full_comparator = NULL;
}

/*
 * tuplesort_used_bound
 *
 * Allow callers to find out if the sort state was able to use a bound.
 */
bool
tuplesort_used_bound(Tuplesortstate *state)
{
	return state->boundUsed;
}
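/*
 * Sketch of a top-N use, e.g. on behalf of ORDER BY ... LIMIT 100 (the
 * begin call's arguments are elided):
 *
 *		Tuplesortstate *ts = tuplesort_begin_heap(...);
 *
 *		tuplesort_set_bound(ts, 100);
 *		... put all input tuples, perform the sort, fetch at most 100 ...
 *		if (tuplesort_used_bound(ts))
 *			... the bounded-heap strategy actually kicked in ...
 */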

/*
 * tuplesort_free
 *
 * Internal routine for freeing resources of tuplesort.
 */
static void
tuplesort_free(Tuplesortstate *state)
{
	/* context swap probably not needed, but let's be safe */
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	long		spaceUsed;

	if (state->tapeset)
		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
	else
		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

	/*
	 * Delete temporary "tape" files, if any.
	 *
	 * Note: want to include this in reported total cost of sort, hence need
	 * for two #ifdef TRACE_SORT sections.
	 */
	if (state->tapeset)
		LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
	if (trace_sort)
	{
		if (state->tapeset)
			elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
				 SERIAL(state) ? "external sort" : "parallel external sort",
				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
		else
			elog(LOG, "%s of worker %d ended, %ld KB used: %s",
				 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
	}

	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
#else

	/*
	 * If you disabled TRACE_SORT, you can still probe sort__done, but you
	 * ain't getting space-used stats.
	 */
	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
#endif

	/* Free any execution state created for CLUSTER case */
	if (state->estate != NULL)
	{
		ExprContext *econtext = GetPerTupleExprContext(state->estate);

		ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
		FreeExecutorState(state->estate);
	}

	MemoryContextSwitchTo(oldcontext);

	/*
	 * Free the per-sort memory context, thereby releasing all working memory.
	 */
	MemoryContextReset(state->sortcontext);
}

/*
 * tuplesort_end
 *
 * Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
 */
void
tuplesort_end(Tuplesortstate *state)
{
	tuplesort_free(state);

	/*
	 * Free the main memory context, including the Tuplesortstate struct
	 * itself.
	 */
	MemoryContextDelete(state->maincontext);
}

/*
 * tuplesort_updatemax
 *
 * Update maximum resource usage statistics.
 */
static void
tuplesort_updatemax(Tuplesortstate *state)
{
	int64		spaceUsed;
	bool		isSpaceDisk;

	/*
	 * Note: it might seem we should provide both memory and disk usage for a
	 * disk-based sort.  However, the current code doesn't track memory space
	 * accurately once we have begun to return tuples to the caller (since we
	 * don't account for pfree's the caller is expected to do), so we cannot
	 * rely on availMem in a disk sort.  This does not seem worth the overhead
	 * to fix.  Is it worth creating an API for the memory context code to
	 * tell us how much is actually used in sortcontext?
	 */
	if (state->tapeset)
	{
		isSpaceDisk = true;
		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
	}
	else
	{
		isSpaceDisk = false;
		spaceUsed = state->allowedMem - state->availMem;
	}

	/*
	 * Sort evicts data to the disk when it wasn't able to fit that data into
	 * main memory.  This is why we assume space used on the disk to be more
	 * important for tracking resource usage than space used in memory.  Note
	 * that the amount of space occupied by some tupleset on the disk might be
	 * less than the amount of space occupied by the same tupleset in memory,
	 * due to a more compact representation.
	 */
	if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
		(isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
	{
		state->maxSpace = spaceUsed;
		state->isMaxSpaceDisk = isSpaceDisk;
		state->maxSpaceStatus = state->status;
	}
}

/*
 * tuplesort_reset
 *
 * Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
 * meta-information in.  After tuplesort_reset, tuplesort is ready to start
 * a new sort.  This avoids recreating tuple sort states (and saves
 * resources) when sorting multiple small batches.
 */
void
tuplesort_reset(Tuplesortstate *state)
{
	tuplesort_updatemax(state);
	tuplesort_free(state);

	/*
	 * After we've freed up per-batch memory, re-set up all of the state
	 * common to both the first batch and any subsequent batch.
	 */
	tuplesort_begin_batch(state);

	state->lastReturnedTuple = NULL;
	state->slabMemoryBegin = NULL;
	state->slabMemoryEnd = NULL;
	state->slabFreeHead = NULL;
}
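/*
 * Sketch of reusing one state across several small batches (the begin
 * call's arguments are elided; "fill and drain" stands for the usual
 * put/performsort/get cycle):
 *
 *		Tuplesortstate *ts = tuplesort_begin_heap(...);
 *
 *		for each batch:
 *			... fill and drain ts ...
 *			tuplesort_reset(ts);
 *		tuplesort_end(ts);
 */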
1541 */ 1542 double grow_ratio; 1543 1544 grow_ratio = (double) state->allowedMem / (double) memNowUsed; 1545 if (memtupsize * grow_ratio < INT_MAX) 1546 newmemtupsize = (int) (memtupsize * grow_ratio); 1547 else 1548 newmemtupsize = INT_MAX; 1549 1550 /* We won't make any further enlargement attempts */ 1551 state->growmemtuples = false; 1552 } 1553 1554 /* Must enlarge array by at least one element, else report failure */ 1555 if (newmemtupsize <= memtupsize) 1556 goto noalloc; 1557 1558 /* 1559 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp 1560 * to ensure our request won't be rejected. Note that we can easily 1561 * exhaust address space before facing this outcome. (This is presently 1562 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but 1563 * don't rely on that at this distance.) 1564 */ 1565 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple)) 1566 { 1567 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple)); 1568 state->growmemtuples = false; /* can't grow any more */ 1569 } 1570 1571 /* 1572 * We need to be sure that we do not cause LACKMEM to become true, else 1573 * the space management algorithm will go nuts. The code above should 1574 * never generate a dangerous request, but to be safe, check explicitly 1575 * that the array growth fits within availMem. (We could still cause 1576 * LACKMEM if the memory chunk overhead associated with the memtuples 1577 * array were to increase. That shouldn't happen because we chose the 1578 * initial array size large enough to ensure that palloc will be treating 1579 * both old and new arrays as separate chunks. But we'll check LACKMEM 1580 * explicitly below just in case.) 1581 */ 1582 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple))) 1583 goto noalloc; 1584 1585 /* OK, do it */ 1586 FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); 1587 state->memtupsize = newmemtupsize; 1588 state->memtuples = (SortTuple *) 1589 repalloc_huge(state->memtuples, 1590 state->memtupsize * sizeof(SortTuple)); 1591 USEMEM(state, GetMemoryChunkSpace(state->memtuples)); 1592 if (LACKMEM(state)) 1593 elog(ERROR, "unexpected out-of-memory situation in tuplesort"); 1594 return true; 1595 1596 noalloc: 1597 /* If for any reason we didn't realloc, shut off future attempts */ 1598 state->growmemtuples = false; 1599 return false; 1600 } 1601 1602 /* 1603 * Accept one tuple while collecting input data for sort. 1604 * 1605 * Note that the input data is always copied; the caller need not save it. 1606 */ 1607 void 1608 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot) 1609 { 1610 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 1611 SortTuple stup; 1612 1613 /* 1614 * Copy the given tuple into memory we control, and decrease availMem. 1615 * Then call the common code. 1616 */ 1617 COPYTUP(state, &stup, (void *) slot); 1618 1619 puttuple_common(state, &stup); 1620 1621 MemoryContextSwitchTo(oldcontext); 1622 } 1623 1624 /* 1625 * Accept one tuple while collecting input data for sort. 1626 * 1627 * Note that the input data is always copied; the caller need not save it. 1628 */ 1629 void 1630 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup) 1631 { 1632 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 1633 SortTuple stup; 1634 1635 /* 1636 * Copy the given tuple into memory we control, and decrease availMem. 1637 * Then call the common code. 
1638 */ 1639 COPYTUP(state, &stup, (void *) tup); 1640 1641 puttuple_common(state, &stup); 1642 1643 MemoryContextSwitchTo(oldcontext); 1644 } 1645 1646 /* 1647 * Collect one index tuple while collecting input data for sort, building 1648 * it from caller-supplied values. 1649 */ 1650 void 1651 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, 1652 ItemPointer self, Datum *values, 1653 bool *isnull) 1654 { 1655 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); 1656 SortTuple stup; 1657 Datum original; 1658 IndexTuple tuple; 1659 1660 stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull); 1661 tuple = ((IndexTuple) stup.tuple); 1662 tuple->t_tid = *self; 1663 USEMEM(state, GetMemoryChunkSpace(stup.tuple)); 1664 /* set up first-column key value */ 1665 original = index_getattr(tuple, 1666 1, 1667 RelationGetDescr(state->indexRel), 1668 &stup.isnull1); 1669 1670 MemoryContextSwitchTo(state->sortcontext); 1671 1672 if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1) 1673 { 1674 /* 1675 * Store ordinary Datum representation, or NULL value. If there is a 1676 * converter it won't expect NULL values, and cost model is not 1677 * required to account for NULL, so in that case we avoid calling 1678 * converter and just set datum1 to zeroed representation (to be 1679 * consistent, and to support cheap inequality tests for NULL 1680 * abbreviated keys). 1681 */ 1682 stup.datum1 = original; 1683 } 1684 else if (!consider_abort_common(state)) 1685 { 1686 /* Store abbreviated key representation */ 1687 stup.datum1 = state->sortKeys->abbrev_converter(original, 1688 state->sortKeys); 1689 } 1690 else 1691 { 1692 /* Abort abbreviation */ 1693 int i; 1694 1695 stup.datum1 = original; 1696 1697 /* 1698 * Set state to be consistent with never trying abbreviation. 1699 * 1700 * Alter datum1 representation in already-copied tuples, so as to 1701 * ensure a consistent representation (current tuple was just 1702 * handled). It does not matter if some dumped tuples are already 1703 * sorted on tape, since serialized tuples lack abbreviated keys 1704 * (TSS_BUILDRUNS state prevents control reaching here in any case). 1705 */ 1706 for (i = 0; i < state->memtupcount; i++) 1707 { 1708 SortTuple *mtup = &state->memtuples[i]; 1709 1710 tuple = mtup->tuple; 1711 mtup->datum1 = index_getattr(tuple, 1712 1, 1713 RelationGetDescr(state->indexRel), 1714 &mtup->isnull1); 1715 } 1716 } 1717 1718 puttuple_common(state, &stup); 1719 1720 MemoryContextSwitchTo(oldcontext); 1721 } 1722 1723 /* 1724 * Accept one Datum while collecting input data for sort. 1725 * 1726 * If the Datum is pass-by-ref type, the value will be copied. 1727 */ 1728 void 1729 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull) 1730 { 1731 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); 1732 SortTuple stup; 1733 1734 /* 1735 * Pass-by-value types or null values are just stored directly in 1736 * stup.datum1 (and stup.tuple is not used and set to NULL). 1737 * 1738 * Non-null pass-by-reference values need to be copied into memory we 1739 * control, and possibly abbreviated. The copied value is pointed to by 1740 * stup.tuple and is treated as the canonical copy (e.g. to return via 1741 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the 1742 * abbreviated value if abbreviation is happening, otherwise it's 1743 * identical to stup.tuple. 
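 *
 * (For example, when sorting pass-by-reference text datums with
 * abbreviation in use, stup.tuple points to the palloc'd copy of the
 * value while stup.datum1 carries its abbreviated key.)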
1744 */ 1745 1746 if (isNull || !state->tuples) 1747 { 1748 /* 1749 * Set datum1 to zeroed representation for NULLs (to be consistent, 1750 * and to support cheap inequality tests for NULL abbreviated keys). 1751 */ 1752 stup.datum1 = !isNull ? val : (Datum) 0; 1753 stup.isnull1 = isNull; 1754 stup.tuple = NULL; /* no separate storage */ 1755 MemoryContextSwitchTo(state->sortcontext); 1756 } 1757 else 1758 { 1759 Datum original = datumCopy(val, false, state->datumTypeLen); 1760 1761 stup.isnull1 = false; 1762 stup.tuple = DatumGetPointer(original); 1763 USEMEM(state, GetMemoryChunkSpace(stup.tuple)); 1764 MemoryContextSwitchTo(state->sortcontext); 1765 1766 if (!state->sortKeys->abbrev_converter) 1767 { 1768 stup.datum1 = original; 1769 } 1770 else if (!consider_abort_common(state)) 1771 { 1772 /* Store abbreviated key representation */ 1773 stup.datum1 = state->sortKeys->abbrev_converter(original, 1774 state->sortKeys); 1775 } 1776 else 1777 { 1778 /* Abort abbreviation */ 1779 int i; 1780 1781 stup.datum1 = original; 1782 1783 /* 1784 * Set state to be consistent with never trying abbreviation. 1785 * 1786 * Alter datum1 representation in already-copied tuples, so as to 1787 * ensure a consistent representation (current tuple was just 1788 * handled). It does not matter if some dumped tuples are already 1789 * sorted on tape, since serialized tuples lack abbreviated keys 1790 * (TSS_BUILDRUNS state prevents control reaching here in any 1791 * case). 1792 */ 1793 for (i = 0; i < state->memtupcount; i++) 1794 { 1795 SortTuple *mtup = &state->memtuples[i]; 1796 1797 mtup->datum1 = PointerGetDatum(mtup->tuple); 1798 } 1799 } 1800 } 1801 1802 puttuple_common(state, &stup); 1803 1804 MemoryContextSwitchTo(oldcontext); 1805 } 1806 1807 /* 1808 * Shared code for tuple and datum cases. 1809 */ 1810 static void 1811 puttuple_common(Tuplesortstate *state, SortTuple *tuple) 1812 { 1813 Assert(!LEADER(state)); 1814 1815 switch (state->status) 1816 { 1817 case TSS_INITIAL: 1818 1819 /* 1820 * Save the tuple into the unsorted array. First, grow the array 1821 * as needed. Note that we try to grow the array when there is 1822 * still one free slot remaining --- if we fail, there'll still be 1823 * room to store the incoming tuple, and then we'll switch to 1824 * tape-based operation. 1825 */ 1826 if (state->memtupcount >= state->memtupsize - 1) 1827 { 1828 (void) grow_memtuples(state); 1829 Assert(state->memtupcount < state->memtupsize); 1830 } 1831 state->memtuples[state->memtupcount++] = *tuple; 1832 1833 /* 1834 * Check if it's time to switch over to a bounded heapsort. We do 1835 * so if the input tuple count exceeds twice the desired tuple 1836 * count (this is a heuristic for where heapsort becomes cheaper 1837 * than a quicksort), or if we've just filled workMem and have 1838 * enough tuples to meet the bound. 1839 * 1840 * Note that once we enter TSS_BOUNDED state we will always try to 1841 * complete the sort that way. In the worst case, if later input 1842 * tuples are larger than earlier ones, this might cause us to 1843 * exceed workMem significantly. 
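 *
 * As a concrete illustration (hypothetical bound): with state->bound =
 * 100, we switch when the 201st input tuple is stored, or as soon as
 * more than 100 tuples are held in memory at the moment workMem fills
 * up, whichever happens first.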
1844 */ 1845 if (state->bounded && 1846 (state->memtupcount > state->bound * 2 || 1847 (state->memtupcount > state->bound && LACKMEM(state)))) 1848 { 1849 #ifdef TRACE_SORT 1850 if (trace_sort) 1851 elog(LOG, "switching to bounded heapsort at %d tuples: %s", 1852 state->memtupcount, 1853 pg_rusage_show(&state->ru_start)); 1854 #endif 1855 make_bounded_heap(state); 1856 return; 1857 } 1858 1859 /* 1860 * Done if we still fit in available memory and have array slots. 1861 */ 1862 if (state->memtupcount < state->memtupsize && !LACKMEM(state)) 1863 return; 1864 1865 /* 1866 * Nope; time to switch to tape-based operation. 1867 */ 1868 inittapes(state, true); 1869 1870 /* 1871 * Dump all tuples. 1872 */ 1873 dumptuples(state, false); 1874 break; 1875 1876 case TSS_BOUNDED: 1877 1878 /* 1879 * We don't want to grow the array here, so check whether the new 1880 * tuple can be discarded before putting it in. This should be a 1881 * good speed optimization, too, since when there are many more 1882 * input tuples than the bound, most input tuples can be discarded 1883 * with just this one comparison. Note that because we currently 1884 * have the sort direction reversed, we must check for <= not >=. 1885 */ 1886 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0) 1887 { 1888 /* new tuple <= top of the heap, so we can discard it */ 1889 free_sort_tuple(state, tuple); 1890 CHECK_FOR_INTERRUPTS(); 1891 } 1892 else 1893 { 1894 /* discard top of heap, replacing it with the new tuple */ 1895 free_sort_tuple(state, &state->memtuples[0]); 1896 tuplesort_heap_replace_top(state, tuple); 1897 } 1898 break; 1899 1900 case TSS_BUILDRUNS: 1901 1902 /* 1903 * Save the tuple into the unsorted array (there must be space) 1904 */ 1905 state->memtuples[state->memtupcount++] = *tuple; 1906 1907 /* 1908 * If we are over the memory limit, dump all tuples. 1909 */ 1910 dumptuples(state, false); 1911 break; 1912 1913 default: 1914 elog(ERROR, "invalid tuplesort state"); 1915 break; 1916 } 1917 } 1918 1919 static bool 1920 consider_abort_common(Tuplesortstate *state) 1921 { 1922 Assert(state->sortKeys[0].abbrev_converter != NULL); 1923 Assert(state->sortKeys[0].abbrev_abort != NULL); 1924 Assert(state->sortKeys[0].abbrev_full_comparator != NULL); 1925 1926 /* 1927 * Check effectiveness of abbreviation optimization. Consider aborting 1928 * when still within memory limit. 1929 */ 1930 if (state->status == TSS_INITIAL && 1931 state->memtupcount >= state->abbrevNext) 1932 { 1933 state->abbrevNext *= 2; 1934 1935 /* 1936 * Check opclass-supplied abbreviation abort routine. It may indicate 1937 * that abbreviation should not proceed. 1938 */ 1939 if (!state->sortKeys->abbrev_abort(state->memtupcount, 1940 state->sortKeys)) 1941 return false; 1942 1943 /* 1944 * Finally, restore authoritative comparator, and indicate that 1945 * abbreviation is not in play by setting abbrev_converter to NULL 1946 */ 1947 state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator; 1948 state->sortKeys[0].abbrev_converter = NULL; 1949 /* Not strictly necessary, but be tidy */ 1950 state->sortKeys[0].abbrev_abort = NULL; 1951 state->sortKeys[0].abbrev_full_comparator = NULL; 1952 1953 /* Give up - expect original pass-by-value representation */ 1954 return true; 1955 } 1956 1957 return false; 1958 } 1959 1960 /* 1961 * All tuples have been provided; finish the sort. 
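 *
 * In outline, the status transitions performed here are: TSS_INITIAL ->
 * TSS_SORTEDINMEM for a serial in-memory sort (workers dump to tape and
 * end up TSS_SORTEDONTAPE; the leader merges instead); TSS_BOUNDED ->
 * TSS_SORTEDINMEM for a bounded heapsort; and TSS_BUILDRUNS ->
 * TSS_SORTEDONTAPE or TSS_FINALMERGE, as decided by mergeruns().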
1962 */ 1963 void 1964 tuplesort_performsort(Tuplesortstate *state) 1965 { 1966 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 1967 1968 #ifdef TRACE_SORT 1969 if (trace_sort) 1970 elog(LOG, "performsort of worker %d starting: %s", 1971 state->worker, pg_rusage_show(&state->ru_start)); 1972 #endif 1973 1974 switch (state->status) 1975 { 1976 case TSS_INITIAL: 1977 1978 /* 1979 * We were able to accumulate all the tuples within the allowed 1980 * amount of memory, or we are the leader, which takes over the worker tapes. 1981 */ 1982 if (SERIAL(state)) 1983 { 1984 /* Just qsort 'em and we're done */ 1985 tuplesort_sort_memtuples(state); 1986 state->status = TSS_SORTEDINMEM; 1987 } 1988 else if (WORKER(state)) 1989 { 1990 /* 1991 * Parallel workers must still dump out tuples to tape. No 1992 * merge is required to produce a single output run, though. 1993 */ 1994 inittapes(state, false); 1995 dumptuples(state, true); 1996 worker_nomergeruns(state); 1997 state->status = TSS_SORTEDONTAPE; 1998 } 1999 else 2000 { 2001 /* 2002 * Leader will take over worker tapes and merge worker runs. 2003 * Note that mergeruns sets the correct state->status. 2004 */ 2005 leader_takeover_tapes(state); 2006 mergeruns(state); 2007 } 2008 state->current = 0; 2009 state->eof_reached = false; 2010 state->markpos_block = 0L; 2011 state->markpos_offset = 0; 2012 state->markpos_eof = false; 2013 break; 2014 2015 case TSS_BOUNDED: 2016 2017 /* 2018 * We were able to accumulate all the tuples required for output 2019 * in memory, using a heap to eliminate excess tuples. Now we 2020 * have to transform the heap to a properly-sorted array. 2021 */ 2022 sort_bounded_heap(state); 2023 state->current = 0; 2024 state->eof_reached = false; 2025 state->markpos_offset = 0; 2026 state->markpos_eof = false; 2027 state->status = TSS_SORTEDINMEM; 2028 break; 2029 2030 case TSS_BUILDRUNS: 2031 2032 /* 2033 * Finish tape-based sort. First, flush all tuples remaining in 2034 * memory out to tape; then merge until we have a single remaining 2035 * run (or, if !randomAccess and !WORKER(), one run per tape). 2036 * Note that mergeruns sets the correct state->status. 2037 */ 2038 dumptuples(state, true); 2039 mergeruns(state); 2040 state->eof_reached = false; 2041 state->markpos_block = 0L; 2042 state->markpos_offset = 0; 2043 state->markpos_eof = false; 2044 break; 2045 2046 default: 2047 elog(ERROR, "invalid tuplesort state"); 2048 break; 2049 } 2050 2051 #ifdef TRACE_SORT 2052 if (trace_sort) 2053 { 2054 if (state->status == TSS_FINALMERGE) 2055 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s", 2056 state->worker, state->activeTapes, 2057 pg_rusage_show(&state->ru_start)); 2058 else 2059 elog(LOG, "performsort of worker %d done: %s", 2060 state->worker, pg_rusage_show(&state->ru_start)); 2061 } 2062 #endif 2063 2064 MemoryContextSwitchTo(oldcontext); 2065 } 2066 2067 /* 2068 * Internal routine to fetch the next tuple in either forward or back 2069 * direction into *stup. Returns false if no more tuples. 2070 * Returned tuple belongs to tuplesort memory context, and must not be freed 2071 * by caller. Note that the fetched tuple is stored in memory that may be 2072 * recycled by any future fetch.
2073 */ 2074 static bool 2075 tuplesort_gettuple_common(Tuplesortstate *state, bool forward, 2076 SortTuple *stup) 2077 { 2078 unsigned int tuplen; 2079 size_t nmoved; 2080 2081 Assert(!WORKER(state)); 2082 2083 switch (state->status) 2084 { 2085 case TSS_SORTEDINMEM: 2086 Assert(forward || state->randomAccess); 2087 Assert(!state->slabAllocatorUsed); 2088 if (forward) 2089 { 2090 if (state->current < state->memtupcount) 2091 { 2092 *stup = state->memtuples[state->current++]; 2093 return true; 2094 } 2095 state->eof_reached = true; 2096 2097 /* 2098 * Complain if caller tries to retrieve more tuples than 2099 * originally asked for in a bounded sort. This is because 2100 * returning EOF here might be the wrong thing. 2101 */ 2102 if (state->bounded && state->current >= state->bound) 2103 elog(ERROR, "retrieved too many tuples in a bounded sort"); 2104 2105 return false; 2106 } 2107 else 2108 { 2109 if (state->current <= 0) 2110 return false; 2111 2112 /* 2113 * If all tuples were fetched already, then we return the last 2114 * tuple; otherwise, the tuple before the last one returned. 2115 */ 2116 if (state->eof_reached) 2117 state->eof_reached = false; 2118 else 2119 { 2120 state->current--; /* last returned tuple */ 2121 if (state->current <= 0) 2122 return false; 2123 } 2124 *stup = state->memtuples[state->current - 1]; 2125 return true; 2126 } 2127 break; 2128 2129 case TSS_SORTEDONTAPE: 2130 Assert(forward || state->randomAccess); 2131 Assert(state->slabAllocatorUsed); 2132 2133 /* 2134 * The slot that held the tuple that we returned in the previous 2135 * gettuple call can now be reused. 2136 */ 2137 if (state->lastReturnedTuple) 2138 { 2139 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); 2140 state->lastReturnedTuple = NULL; 2141 } 2142 2143 if (forward) 2144 { 2145 if (state->eof_reached) 2146 return false; 2147 2148 if ((tuplen = getlen(state, state->result_tape, true)) != 0) 2149 { 2150 READTUP(state, stup, state->result_tape, tuplen); 2151 2152 /* 2153 * Remember the tuple we return, so that we can recycle 2154 * its memory on next call. (This can be NULL, in the 2155 * !state->tuples case). 2156 */ 2157 state->lastReturnedTuple = stup->tuple; 2158 2159 return true; 2160 } 2161 else 2162 { 2163 state->eof_reached = true; 2164 return false; 2165 } 2166 } 2167 2168 /* 2169 * Backward. 2170 * 2171 * If all tuples were fetched already, then we return the last 2172 * tuple; otherwise, the tuple before the last one returned. 2173 */ 2174 if (state->eof_reached) 2175 { 2176 /* 2177 * Seek position is pointing just past the zero tuplen at the 2178 * end of file; back up to fetch last tuple's ending length 2179 * word. If seek fails we must have a completely empty file. 2180 */ 2181 nmoved = LogicalTapeBackspace(state->tapeset, 2182 state->result_tape, 2183 2 * sizeof(unsigned int)); 2184 if (nmoved == 0) 2185 return false; 2186 else if (nmoved != 2 * sizeof(unsigned int)) 2187 elog(ERROR, "unexpected tape position"); 2188 state->eof_reached = false; 2189 } 2190 else 2191 { 2192 /* 2193 * Back up and fetch previously-returned tuple's ending length 2194 * word. If seek fails, assume we are at start of file. 2195 */ 2196 nmoved = LogicalTapeBackspace(state->tapeset, 2197 state->result_tape, 2198 sizeof(unsigned int)); 2199 if (nmoved == 0) 2200 return false; 2201 else if (nmoved != sizeof(unsigned int)) 2202 elog(ERROR, "unexpected tape position"); 2203 tuplen = getlen(state, state->result_tape, false); 2204 2205 /* 2206 * Back up to get ending length word of tuple before it.
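 *
 * (Reminder of the on-tape layout this navigation relies on: each
 * tuple is written as a leading length word, the tuple body, and,
 * when random access is requested, a duplicate trailing length
 * word; the backward scan steps between tuples by reading those
 * trailing words.)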
2207 */ 2208 nmoved = LogicalTapeBackspace(state->tapeset, 2209 state->result_tape, 2210 tuplen + 2 * sizeof(unsigned int)); 2211 if (nmoved == tuplen + sizeof(unsigned int)) 2212 { 2213 /* 2214 * We backed up over the previous tuple, but there was no 2215 * ending length word before it. That means that the prev 2216 * tuple is the first tuple in the file. It is now the 2217 * next to read in forward direction (not obviously right, 2218 * but that is what in-memory case does). 2219 */ 2220 return false; 2221 } 2222 else if (nmoved != tuplen + 2 * sizeof(unsigned int)) 2223 elog(ERROR, "bogus tuple length in backward scan"); 2224 } 2225 2226 tuplen = getlen(state, state->result_tape, false); 2227 2228 /* 2229 * Now we have the length of the prior tuple, back up and read it. 2230 * Note: READTUP expects we are positioned after the initial 2231 * length word of the tuple, so back up to that point. 2232 */ 2233 nmoved = LogicalTapeBackspace(state->tapeset, 2234 state->result_tape, 2235 tuplen); 2236 if (nmoved != tuplen) 2237 elog(ERROR, "bogus tuple length in backward scan"); 2238 READTUP(state, stup, state->result_tape, tuplen); 2239 2240 /* 2241 * Remember the tuple we return, so that we can recycle its memory 2242 * on next call. (This can be NULL, in the Datum case). 2243 */ 2244 state->lastReturnedTuple = stup->tuple; 2245 2246 return true; 2247 2248 case TSS_FINALMERGE: 2249 Assert(forward); 2250 /* We are managing memory ourselves, with the slab allocator. */ 2251 Assert(state->slabAllocatorUsed); 2252 2253 /* 2254 * The slab slot holding the tuple that we returned in previous 2255 * gettuple call can now be reused. 2256 */ 2257 if (state->lastReturnedTuple) 2258 { 2259 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); 2260 state->lastReturnedTuple = NULL; 2261 } 2262 2263 /* 2264 * This code should match the inner loop of mergeonerun(). 2265 */ 2266 if (state->memtupcount > 0) 2267 { 2268 int srcTape = state->memtuples[0].srctape; 2269 SortTuple newtup; 2270 2271 *stup = state->memtuples[0]; 2272 2273 /* 2274 * Remember the tuple we return, so that we can recycle its 2275 * memory on next call. (This can be NULL, in the Datum case). 2276 */ 2277 state->lastReturnedTuple = stup->tuple; 2278 2279 /* 2280 * Pull next tuple from tape, and replace the returned tuple 2281 * at top of the heap with it. 2282 */ 2283 if (!mergereadnext(state, srcTape, &newtup)) 2284 { 2285 /* 2286 * If no more data, we've reached end of run on this tape. 2287 * Remove the top node from the heap. 2288 */ 2289 tuplesort_heap_delete_top(state); 2290 2291 /* 2292 * Rewind to free the read buffer. It'd go away at the 2293 * end of the sort anyway, but better to release the 2294 * memory early. 2295 */ 2296 LogicalTapeRewindForWrite(state->tapeset, srcTape); 2297 return true; 2298 } 2299 newtup.srctape = srcTape; 2300 tuplesort_heap_replace_top(state, &newtup); 2301 return true; 2302 } 2303 return false; 2304 2305 default: 2306 elog(ERROR, "invalid tuplesort state"); 2307 return false; /* keep compiler quiet */ 2308 } 2309 } 2310 2311 /* 2312 * Fetch the next tuple in either forward or back direction. 2313 * If successful, put tuple in slot and return true; else, clear the slot 2314 * and return false. 2315 * 2316 * Caller may optionally be passed back abbreviated value (on true return 2317 * value) when abbreviation was used, which can be used to cheaply avoid 2318 * equality checks that might otherwise be required. 
Caller can safely make a 2319 * determination of "non-equal tuple" based on simple binary inequality. A 2320 * NULL value in leading attribute will set abbreviated value to zeroed 2321 * representation, which caller may rely on in abbreviated inequality check. 2322 * 2323 * If copy is true, the slot receives a tuple that's been copied into the 2324 * caller's memory context, so that it will stay valid regardless of future 2325 * manipulations of the tuplesort's state (up to and including deleting the 2326 * tuplesort). If copy is false, the slot will just receive a pointer to a 2327 * tuple held within the tuplesort, which is more efficient, but only safe for 2328 * callers that are prepared to have any subsequent manipulation of the 2329 * tuplesort's state invalidate slot contents. 2330 */ 2331 bool 2332 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, 2333 TupleTableSlot *slot, Datum *abbrev) 2334 { 2335 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2336 SortTuple stup; 2337 2338 if (!tuplesort_gettuple_common(state, forward, &stup)) 2339 stup.tuple = NULL; 2340 2341 MemoryContextSwitchTo(oldcontext); 2342 2343 if (stup.tuple) 2344 { 2345 /* Record abbreviated key for caller */ 2346 if (state->sortKeys->abbrev_converter && abbrev) 2347 *abbrev = stup.datum1; 2348 2349 if (copy) 2350 stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple); 2351 2352 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy); 2353 return true; 2354 } 2355 else 2356 { 2357 ExecClearTuple(slot); 2358 return false; 2359 } 2360 } 2361 2362 /* 2363 * Fetch the next tuple in either forward or back direction. 2364 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory 2365 * context, and must not be freed by caller. Caller may not rely on tuple 2366 * remaining valid after any further manipulation of tuplesort. 2367 */ 2368 HeapTuple 2369 tuplesort_getheaptuple(Tuplesortstate *state, bool forward) 2370 { 2371 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2372 SortTuple stup; 2373 2374 if (!tuplesort_gettuple_common(state, forward, &stup)) 2375 stup.tuple = NULL; 2376 2377 MemoryContextSwitchTo(oldcontext); 2378 2379 return stup.tuple; 2380 } 2381 2382 /* 2383 * Fetch the next index tuple in either forward or back direction. 2384 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory 2385 * context, and must not be freed by caller. Caller may not rely on tuple 2386 * remaining valid after any further manipulation of tuplesort. 2387 */ 2388 IndexTuple 2389 tuplesort_getindextuple(Tuplesortstate *state, bool forward) 2390 { 2391 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2392 SortTuple stup; 2393 2394 if (!tuplesort_gettuple_common(state, forward, &stup)) 2395 stup.tuple = NULL; 2396 2397 MemoryContextSwitchTo(oldcontext); 2398 2399 return (IndexTuple) stup.tuple; 2400 } 2401 2402 /* 2403 * Fetch the next Datum in either forward or back direction. 2404 * Returns false if no more datums. 2405 * 2406 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd 2407 * in caller's context, and is now owned by the caller (this differs from 2408 * similar routines for other types of tuplesorts). 2409 * 2410 * Caller may optionally be passed back abbreviated value (on true return 2411 * value) when abbreviation was used, which can be used to cheaply avoid 2412 * equality checks that might otherwise be required. 
Caller can safely make a 2413 * determination of "non-equal tuple" based on simple binary inequality. A 2414 * NULL value will have a zeroed abbreviated value representation, which caller 2415 * may rely on in abbreviated inequality check. 2416 */ 2417 bool 2418 tuplesort_getdatum(Tuplesortstate *state, bool forward, 2419 Datum *val, bool *isNull, Datum *abbrev) 2420 { 2421 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 2422 SortTuple stup; 2423 2424 if (!tuplesort_gettuple_common(state, forward, &stup)) 2425 { 2426 MemoryContextSwitchTo(oldcontext); 2427 return false; 2428 } 2429 2430 /* Ensure we copy into caller's memory context */ 2431 MemoryContextSwitchTo(oldcontext); 2432 2433 /* Record abbreviated key for caller */ 2434 if (state->sortKeys->abbrev_converter && abbrev) 2435 *abbrev = stup.datum1; 2436 2437 if (stup.isnull1 || !state->tuples) 2438 { 2439 *val = stup.datum1; 2440 *isNull = stup.isnull1; 2441 } 2442 else 2443 { 2444 /* use stup.tuple because stup.datum1 may be an abbreviation */ 2445 *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen); 2446 *isNull = false; 2447 } 2448 2449 return true; 2450 } 2451 2452 /* 2453 * Advance over N tuples in either forward or back direction, 2454 * without returning any data. N==0 is a no-op. 2455 * Returns true if successful, false if ran out of tuples. 2456 */ 2457 bool 2458 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward) 2459 { 2460 MemoryContext oldcontext; 2461 2462 /* 2463 * We don't actually support backwards skip yet, because no callers need 2464 * it. The API is designed to allow for that later, though. 2465 */ 2466 Assert(forward); 2467 Assert(ntuples >= 0); 2468 Assert(!WORKER(state)); 2469 2470 switch (state->status) 2471 { 2472 case TSS_SORTEDINMEM: 2473 if (state->memtupcount - state->current >= ntuples) 2474 { 2475 state->current += ntuples; 2476 return true; 2477 } 2478 state->current = state->memtupcount; 2479 state->eof_reached = true; 2480 2481 /* 2482 * Complain if caller tries to retrieve more tuples than 2483 * originally asked for in a bounded sort. This is because 2484 * returning EOF here might be the wrong thing. 2485 */ 2486 if (state->bounded && state->current >= state->bound) 2487 elog(ERROR, "retrieved too many tuples in a bounded sort"); 2488 2489 return false; 2490 2491 case TSS_SORTEDONTAPE: 2492 case TSS_FINALMERGE: 2493 2494 /* 2495 * We could probably optimize these cases better, but for now it's 2496 * not worth the trouble. 2497 */ 2498 oldcontext = MemoryContextSwitchTo(state->sortcontext); 2499 while (ntuples-- > 0) 2500 { 2501 SortTuple stup; 2502 2503 if (!tuplesort_gettuple_common(state, forward, &stup)) 2504 { 2505 MemoryContextSwitchTo(oldcontext); 2506 return false; 2507 } 2508 CHECK_FOR_INTERRUPTS(); 2509 } 2510 MemoryContextSwitchTo(oldcontext); 2511 return true; 2512 2513 default: 2514 elog(ERROR, "invalid tuplesort state"); 2515 return false; /* keep compiler quiet */ 2516 } 2517 } 2518 2519 /* 2520 * tuplesort_merge_order - report merge order we'll use for given memory 2521 * (note: "merge order" just means the number of input tapes in the merge). 2522 * 2523 * This is exported for use by the planner. allowedMem is in bytes. 2524 */ 2525 int 2526 tuplesort_merge_order(int64 allowedMem) 2527 { 2528 int mOrder; 2529 2530 /* 2531 * We need one tape for each merge input, plus another one for the output, 2532 * and each of these tapes needs buffer space. 
In addition we want 2533 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't 2534 * count). 2535 * 2536 * Note: you might be thinking we need to account for the memtuples[] 2537 * array in this calculation, but we effectively treat that as part of the 2538 * MERGE_BUFFER_SIZE workspace. 2539 */ 2540 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / 2541 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); 2542 2543 /* 2544 * Even in minimum memory, use at least a MINORDER merge. On the other 2545 * hand, even when we have lots of memory, do not use more than a MAXORDER 2546 * merge. Tapes are pretty cheap, but they're not entirely free. Each 2547 * additional tape reduces the amount of memory available to build runs, 2548 * which in turn can cause the same sort to need more runs, which makes 2549 * merging slower even if it can still be done in a single pass. Also, 2550 * high order merges are quite slow due to CPU cache effects; it can be 2551 * faster to pay the I/O cost of a polyphase merge than to perform a 2552 * single merge pass across many hundreds of tapes. 2553 */ 2554 mOrder = Max(mOrder, MINORDER); 2555 mOrder = Min(mOrder, MAXORDER); 2556 2557 return mOrder; 2558 } 2559 2560 /* 2561 * inittapes - initialize for tape sorting. 2562 * 2563 * This is called only if we have found we won't sort in memory. 2564 */ 2565 static void 2566 inittapes(Tuplesortstate *state, bool mergeruns) 2567 { 2568 int maxTapes, 2569 j; 2570 2571 Assert(!LEADER(state)); 2572 2573 if (mergeruns) 2574 { 2575 /* Compute number of tapes to use: merge order plus 1 */ 2576 maxTapes = tuplesort_merge_order(state->allowedMem) + 1; 2577 } 2578 else 2579 { 2580 /* Workers can sometimes produce a single run, output without a merge */ 2581 Assert(WORKER(state)); 2582 maxTapes = MINORDER + 1; 2583 } 2584 2585 #ifdef TRACE_SORT 2586 if (trace_sort) 2587 elog(LOG, "worker %d switching to external sort with %d tapes: %s", 2588 state->worker, maxTapes, pg_rusage_show(&state->ru_start)); 2589 #endif 2590 2591 /* Create the tape set and allocate the per-tape data arrays */ 2592 inittapestate(state, maxTapes); 2593 state->tapeset = 2594 LogicalTapeSetCreate(maxTapes, false, NULL, 2595 state->shared ? &state->shared->fileset : NULL, 2596 state->worker); 2597 2598 state->currentRun = 0; 2599 2600 /* 2601 * Initialize variables of Algorithm D (step D1). 2602 */ 2603 for (j = 0; j < maxTapes; j++) 2604 { 2605 state->tp_fib[j] = 1; 2606 state->tp_runs[j] = 0; 2607 state->tp_dummy[j] = 1; 2608 state->tp_tapenum[j] = j; 2609 } 2610 state->tp_fib[state->tapeRange] = 0; 2611 state->tp_dummy[state->tapeRange] = 0; 2612 2613 state->Level = 1; 2614 state->destTape = 0; 2615 2616 state->status = TSS_BUILDRUNS; 2617 } 2618 2619 /* 2620 * inittapestate - initialize generic tape management state 2621 */ 2622 static void 2623 inittapestate(Tuplesortstate *state, int maxTapes) 2624 { 2625 int64 tapeSpace; 2626 2627 /* 2628 * Decrease availMem to reflect the space needed for tape buffers; but 2629 * don't decrease it to the point that we have no room for tuples. (That 2630 * case is only likely to occur if sorting pass-by-value Datums; in all 2631 * other scenarios the memtuples[] array is unlikely to occupy more than 2632 * half of allowedMem. In the pass-by-value case it's not important to 2633 * account for tuple space, so we don't care if LACKMEM becomes 2634 * inaccurate.)
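 *
 * For example (numbers illustrative only): with maxTapes = 8, we charge
 * 8 * TAPE_BUFFER_OVERHEAD bytes against availMem up front, but only if
 * doing so still leaves room for the existing memtuples[] array.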
2635 */ 2636 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; 2637 2638 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) 2639 USEMEM(state, tapeSpace); 2640 2641 /* 2642 * Make sure that the temp file(s) underlying the tape set are created in 2643 * suitable temp tablespaces. For parallel sorts, this should have been 2644 * called already, but it doesn't matter if it is called a second time. 2645 */ 2646 PrepareTempTablespaces(); 2647 2648 state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); 2649 state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); 2650 state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); 2651 state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); 2652 state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); 2653 2654 /* Record # of tapes allocated (for duration of sort) */ 2655 state->maxTapes = maxTapes; 2656 /* Record maximum # of tapes usable as inputs when merging */ 2657 state->tapeRange = maxTapes - 1; 2658 } 2659 2660 /* 2661 * selectnewtape -- select new tape for new initial run. 2662 * 2663 * This is called after finishing a run when we know another run 2664 * must be started. This implements steps D3, D4 of Algorithm D. 2665 */ 2666 static void 2667 selectnewtape(Tuplesortstate *state) 2668 { 2669 int j; 2670 int a; 2671 2672 /* Step D3: advance j (destTape) */ 2673 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1]) 2674 { 2675 state->destTape++; 2676 return; 2677 } 2678 if (state->tp_dummy[state->destTape] != 0) 2679 { 2680 state->destTape = 0; 2681 return; 2682 } 2683 2684 /* Step D4: increase level */ 2685 state->Level++; 2686 a = state->tp_fib[0]; 2687 for (j = 0; j < state->tapeRange; j++) 2688 { 2689 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j]; 2690 state->tp_fib[j] = a + state->tp_fib[j + 1]; 2691 } 2692 state->destTape = 0; 2693 } 2694 2695 /* 2696 * Initialize the slab allocation arena, for the given number of slots. 2697 */ 2698 static void 2699 init_slab_allocator(Tuplesortstate *state, int numSlots) 2700 { 2701 if (numSlots > 0) 2702 { 2703 char *p; 2704 int i; 2705 2706 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE); 2707 state->slabMemoryEnd = state->slabMemoryBegin + 2708 numSlots * SLAB_SLOT_SIZE; 2709 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin; 2710 USEMEM(state, numSlots * SLAB_SLOT_SIZE); 2711 2712 p = state->slabMemoryBegin; 2713 for (i = 0; i < numSlots - 1; i++) 2714 { 2715 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE); 2716 p += SLAB_SLOT_SIZE; 2717 } 2718 ((SlabSlot *) p)->nextfree = NULL; 2719 } 2720 else 2721 { 2722 state->slabMemoryBegin = state->slabMemoryEnd = NULL; 2723 state->slabFreeHead = NULL; 2724 } 2725 state->slabAllocatorUsed = true; 2726 } 2727 2728 /* 2729 * mergeruns -- merge all the completed initial runs. 2730 * 2731 * This implements steps D5, D6 of Algorithm D. All input data has 2732 * already been written to initial runs on tape (see dumptuples). 
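 *
 * (Bookkeeping recap: tp_fib[] holds Algorithm D's Fibonacci-like
 * target run counts for the current level, while tp_runs[] and
 * tp_dummy[] track the real and dummy runs on each tape; step D5 below
 * merges runs until tape P is empty, and step D6 then rotates the tape
 * assignments for the next level.)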
2733 */ 2734 static void 2735 mergeruns(Tuplesortstate *state) 2736 { 2737 int tapenum, 2738 svTape, 2739 svRuns, 2740 svDummy; 2741 int numTapes; 2742 int numInputTapes; 2743 2744 Assert(state->status == TSS_BUILDRUNS); 2745 Assert(state->memtupcount == 0); 2746 2747 if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL) 2748 { 2749 /* 2750 * If there are multiple runs to be merged, when we go to read back 2751 * tuples from disk, abbreviated keys will not have been stored, and 2752 * we don't care to regenerate them. Disable abbreviation from this 2753 * point on. 2754 */ 2755 state->sortKeys->abbrev_converter = NULL; 2756 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; 2757 2758 /* Not strictly necessary, but be tidy */ 2759 state->sortKeys->abbrev_abort = NULL; 2760 state->sortKeys->abbrev_full_comparator = NULL; 2761 } 2762 2763 /* 2764 * Reset tuple memory. We've freed all the tuples that we previously 2765 * allocated. We will use the slab allocator from now on. 2766 */ 2767 MemoryContextResetOnly(state->tuplecontext); 2768 2769 /* 2770 * We no longer need a large memtuples array. (We will allocate a smaller 2771 * one for the heap later.) 2772 */ 2773 FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); 2774 pfree(state->memtuples); 2775 state->memtuples = NULL; 2776 2777 /* 2778 * If we had fewer runs than tapes, refund the memory that we imagined we 2779 * would need for the tape buffers of the unused tapes. 2780 * 2781 * numTapes and numInputTapes reflect the actual number of tapes we will 2782 * use. Note that the output tape's tape number is maxTapes - 1, so the 2783 * tape numbers of the used tapes are not consecutive, and you cannot just 2784 * loop from 0 to numTapes to visit all used tapes! 2785 */ 2786 if (state->Level == 1) 2787 { 2788 numInputTapes = state->currentRun; 2789 numTapes = numInputTapes + 1; 2790 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD); 2791 } 2792 else 2793 { 2794 numInputTapes = state->tapeRange; 2795 numTapes = state->maxTapes; 2796 } 2797 2798 /* 2799 * Initialize the slab allocator. We need one slab slot per input tape, 2800 * for the tuples in the heap, plus one to hold the tuple last returned 2801 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, 2802 * however, we don't need to allocate anything.) 2803 * 2804 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism 2805 * to track memory usage of individual tuples. 2806 */ 2807 if (state->tuples) 2808 init_slab_allocator(state, numInputTapes + 1); 2809 else 2810 init_slab_allocator(state, 0); 2811 2812 /* 2813 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple 2814 * from each input tape. 2815 */ 2816 state->memtupsize = numInputTapes; 2817 state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext, 2818 numInputTapes * sizeof(SortTuple)); 2819 USEMEM(state, GetMemoryChunkSpace(state->memtuples)); 2820 2821 /* 2822 * Use all the remaining memory we have available for read buffers among 2823 * the input tapes. 2824 * 2825 * We don't try to "rebalance" the memory among tapes when we start a new 2826 * merge phase, even if some tapes are inactive in the new phase. That 2827 * would be hard, because logtape.c doesn't know where one run ends and 2828 * another begins. When a new merge phase begins, and a tape doesn't 2829 * participate in it, its buffer nevertheless already contains tuples from 2830 * the next run on the same tape, so we cannot release the buffer.
That's OK 2831 * in practice, merge performance isn't that sensitive to the amount of 2832 * buffers used, and most merge phases use all or almost all tapes, 2833 * anyway. 2834 */ 2835 #ifdef TRACE_SORT 2836 if (trace_sort) 2837 elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", 2838 state->worker, state->availMem / 1024, numInputTapes); 2839 #endif 2840 2841 state->read_buffer_size = Max(state->availMem / numInputTapes, 0); 2842 USEMEM(state, state->read_buffer_size * numInputTapes); 2843 2844 /* End of step D2: rewind all output tapes to prepare for merging */ 2845 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2846 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size); 2847 2848 for (;;) 2849 { 2850 /* 2851 * At this point we know that tape[T] is empty. If there's just one 2852 * (real or dummy) run left on each input tape, then only one merge 2853 * pass remains. If we don't have to produce a materialized sorted 2854 * tape, we can stop at this point and do the final merge on-the-fly. 2855 */ 2856 if (!state->randomAccess && !WORKER(state)) 2857 { 2858 bool allOneRun = true; 2859 2860 Assert(state->tp_runs[state->tapeRange] == 0); 2861 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2862 { 2863 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1) 2864 { 2865 allOneRun = false; 2866 break; 2867 } 2868 } 2869 if (allOneRun) 2870 { 2871 /* Tell logtape.c we won't be writing anymore */ 2872 LogicalTapeSetForgetFreeSpace(state->tapeset); 2873 /* Initialize for the final merge pass */ 2874 beginmerge(state); 2875 state->status = TSS_FINALMERGE; 2876 return; 2877 } 2878 } 2879 2880 /* Step D5: merge runs onto tape[T] until tape[P] is empty */ 2881 while (state->tp_runs[state->tapeRange - 1] || 2882 state->tp_dummy[state->tapeRange - 1]) 2883 { 2884 bool allDummy = true; 2885 2886 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2887 { 2888 if (state->tp_dummy[tapenum] == 0) 2889 { 2890 allDummy = false; 2891 break; 2892 } 2893 } 2894 2895 if (allDummy) 2896 { 2897 state->tp_dummy[state->tapeRange]++; 2898 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 2899 state->tp_dummy[tapenum]--; 2900 } 2901 else 2902 mergeonerun(state); 2903 } 2904 2905 /* Step D6: decrease level */ 2906 if (--state->Level == 0) 2907 break; 2908 /* rewind output tape T to use as new input */ 2909 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], 2910 state->read_buffer_size); 2911 /* rewind used-up input tape P, and prepare it for write pass */ 2912 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); 2913 state->tp_runs[state->tapeRange - 1] = 0; 2914 2915 /* 2916 * reassign tape units per step D6; note we no longer care about A[] 2917 */ 2918 svTape = state->tp_tapenum[state->tapeRange]; 2919 svDummy = state->tp_dummy[state->tapeRange]; 2920 svRuns = state->tp_runs[state->tapeRange]; 2921 for (tapenum = state->tapeRange; tapenum > 0; tapenum--) 2922 { 2923 state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1]; 2924 state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1]; 2925 state->tp_runs[tapenum] = state->tp_runs[tapenum - 1]; 2926 } 2927 state->tp_tapenum[0] = svTape; 2928 state->tp_dummy[0] = svDummy; 2929 state->tp_runs[0] = svRuns; 2930 } 2931 2932 /* 2933 * Done. 
Knuth says that the result is on TAPE[1], but since we exited 2934 * the loop without performing the last iteration of step D6, we have not 2935 * rearranged the tape unit assignment, and therefore the result is on 2936 * TAPE[T]. We need to do it this way so that we can freeze the final 2937 * output tape while rewinding it. The last iteration of step D6 would be 2938 * a waste of cycles anyway... 2939 */ 2940 state->result_tape = state->tp_tapenum[state->tapeRange]; 2941 if (!WORKER(state)) 2942 LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); 2943 else 2944 worker_freeze_result_tape(state); 2945 state->status = TSS_SORTEDONTAPE; 2946 2947 /* Release the read buffers of all the other tapes, by rewinding them. */ 2948 for (tapenum = 0; tapenum < state->maxTapes; tapenum++) 2949 { 2950 if (tapenum != state->result_tape) 2951 LogicalTapeRewindForWrite(state->tapeset, tapenum); 2952 } 2953 } 2954 2955 /* 2956 * Merge one run from each input tape, except ones with dummy runs. 2957 * 2958 * This is the inner loop of Algorithm D step D5. We know that the 2959 * output tape is TAPE[T]. 2960 */ 2961 static void 2962 mergeonerun(Tuplesortstate *state) 2963 { 2964 int destTape = state->tp_tapenum[state->tapeRange]; 2965 int srcTape; 2966 2967 /* 2968 * Start the merge by loading one tuple from each active source tape into 2969 * the heap. We can also decrease the input run/dummy run counts. 2970 */ 2971 beginmerge(state); 2972 2973 /* 2974 * Execute merge by repeatedly extracting lowest tuple in heap, writing it 2975 * out, and replacing it with next tuple from same tape (if there is 2976 * another one). 2977 */ 2978 while (state->memtupcount > 0) 2979 { 2980 SortTuple stup; 2981 2982 /* write the tuple to destTape */ 2983 srcTape = state->memtuples[0].srctape; 2984 WRITETUP(state, destTape, &state->memtuples[0]); 2985 2986 /* recycle the slot of the tuple we just wrote out, for the next read */ 2987 if (state->memtuples[0].tuple) 2988 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); 2989 2990 /* 2991 * pull next tuple from the tape, and replace the written-out tuple in 2992 * the heap with it. 2993 */ 2994 if (mergereadnext(state, srcTape, &stup)) 2995 { 2996 stup.srctape = srcTape; 2997 tuplesort_heap_replace_top(state, &stup); 2998 } 2999 else 3000 tuplesort_heap_delete_top(state); 3001 } 3002 3003 /* 3004 * When the heap empties, we're done. Write an end-of-run marker on the 3005 * output tape, and increment its count of real runs. 3006 */ 3007 markrunend(state, destTape); 3008 state->tp_runs[state->tapeRange]++; 3009 3010 #ifdef TRACE_SORT 3011 if (trace_sort) 3012 elog(LOG, "worker %d finished %d-way merge step: %s", state->worker, 3013 state->activeTapes, pg_rusage_show(&state->ru_start)); 3014 #endif 3015 } 3016 3017 /* 3018 * beginmerge - initialize for a merge pass 3019 * 3020 * We decrease the counts of real and dummy runs for each tape, and mark 3021 * which tapes contain active input runs in mergeactive[]. Then, fill the 3022 * merge heap with the first tuple from each active tape. 
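 *
 * For instance (hypothetical counts): with three input tapes holding
 * tp_dummy[] = {1, 0, 0} and tp_runs[] = {2, 3, 3}, tape 0 contributes
 * its dummy run and stays silent, while tapes 1 and 2 each contribute a
 * real run and are marked active, giving a 2-way merge.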
3023 */ 3024 static void 3025 beginmerge(Tuplesortstate *state) 3026 { 3027 int activeTapes; 3028 int tapenum; 3029 int srcTape; 3030 3031 /* Heap should be empty here */ 3032 Assert(state->memtupcount == 0); 3033 3034 /* Adjust run counts and mark the active tapes */ 3035 memset(state->mergeactive, 0, 3036 state->maxTapes * sizeof(*state->mergeactive)); 3037 activeTapes = 0; 3038 for (tapenum = 0; tapenum < state->tapeRange; tapenum++) 3039 { 3040 if (state->tp_dummy[tapenum] > 0) 3041 state->tp_dummy[tapenum]--; 3042 else 3043 { 3044 Assert(state->tp_runs[tapenum] > 0); 3045 state->tp_runs[tapenum]--; 3046 srcTape = state->tp_tapenum[tapenum]; 3047 state->mergeactive[srcTape] = true; 3048 activeTapes++; 3049 } 3050 } 3051 Assert(activeTapes > 0); 3052 state->activeTapes = activeTapes; 3053 3054 /* Load the merge heap with the first tuple from each input tape */ 3055 for (srcTape = 0; srcTape < state->maxTapes; srcTape++) 3056 { 3057 SortTuple tup; 3058 3059 if (mergereadnext(state, srcTape, &tup)) 3060 { 3061 tup.srctape = srcTape; 3062 tuplesort_heap_insert(state, &tup); 3063 } 3064 } 3065 } 3066 3067 /* 3068 * mergereadnext - read next tuple from one merge input tape 3069 * 3070 * Returns false on EOF. 3071 */ 3072 static bool 3073 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) 3074 { 3075 unsigned int tuplen; 3076 3077 if (!state->mergeactive[srcTape]) 3078 return false; /* tape's run is already exhausted */ 3079 3080 /* read next tuple, if any */ 3081 if ((tuplen = getlen(state, srcTape, true)) == 0) 3082 { 3083 state->mergeactive[srcTape] = false; 3084 return false; 3085 } 3086 READTUP(state, stup, srcTape, tuplen); 3087 3088 return true; 3089 } 3090 3091 /* 3092 * dumptuples - remove tuples from memtuples and write initial run to tape 3093 * 3094 * When alltuples = true, dump everything currently in memory. (This case is 3095 * only used at end of input data.) 3096 */ 3097 static void 3098 dumptuples(Tuplesortstate *state, bool alltuples) 3099 { 3100 int memtupwrite; 3101 int i; 3102 3103 /* 3104 * Nothing to do if we still fit in available memory and have array slots, 3105 * unless this is the final call during initial run generation. 3106 */ 3107 if (state->memtupcount < state->memtupsize && !LACKMEM(state) && 3108 !alltuples) 3109 return; 3110 3111 /* 3112 * Final call might require no sorting, in rare cases where we just so 3113 * happen to have previously LACKMEM()'d at the point where exactly all 3114 * remaining tuples are loaded into memory, just before input was 3115 * exhausted. 3116 * 3117 * In general, short final runs are quite possible. Rather than allowing 3118 * a special case where there was a superfluous selectnewtape() call (i.e. 3119 * a call with no subsequent run actually written to destTape), we prefer 3120 * to write out a 0 tuple run. 3121 * 3122 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark 3123 * the tape inactive for the merge when called from beginmerge(). This 3124 * case is therefore similar to the case where mergeonerun() finds a dummy 3125 * run for the tape, and so doesn't need to merge a run from the tape (or 3126 * conceptually "merges" the dummy run, if you prefer). According to 3127 * Knuth, Algorithm D "isn't strictly optimal" in its method of 3128 * distribution and dummy run assignment; this edge case seems very 3129 * unlikely to make that appreciably worse. 
3130 */ 3131 Assert(state->status == TSS_BUILDRUNS); 3132 3133 /* 3134 * It seems unlikely that this limit will ever be exceeded, but take no 3135 * chances 3136 */ 3137 if (state->currentRun == INT_MAX) 3138 ereport(ERROR, 3139 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), 3140 errmsg("cannot have more than %d runs for an external sort", 3141 INT_MAX))); 3142 3143 state->currentRun++; 3144 3145 #ifdef TRACE_SORT 3146 if (trace_sort) 3147 elog(LOG, "worker %d starting quicksort of run %d: %s", 3148 state->worker, state->currentRun, 3149 pg_rusage_show(&state->ru_start)); 3150 #endif 3151 3152 /* 3153 * Sort all tuples accumulated within the allowed amount of memory for 3154 * this run using quicksort 3155 */ 3156 tuplesort_sort_memtuples(state); 3157 3158 #ifdef TRACE_SORT 3159 if (trace_sort) 3160 elog(LOG, "worker %d finished quicksort of run %d: %s", 3161 state->worker, state->currentRun, 3162 pg_rusage_show(&state->ru_start)); 3163 #endif 3164 3165 memtupwrite = state->memtupcount; 3166 for (i = 0; i < memtupwrite; i++) 3167 { 3168 WRITETUP(state, state->tp_tapenum[state->destTape], 3169 &state->memtuples[i]); 3170 state->memtupcount--; 3171 } 3172 3173 /* 3174 * Reset tuple memory. We've freed all of the tuples that we previously 3175 * allocated. It's important to avoid fragmentation when there is a stark 3176 * change in the sizes of incoming tuples. Fragmentation due to 3177 * AllocSetFree's bucketing by size class might be particularly bad if 3178 * this step wasn't taken. 3179 */ 3180 MemoryContextReset(state->tuplecontext); 3181 3182 markrunend(state, state->tp_tapenum[state->destTape]); 3183 state->tp_runs[state->destTape]++; 3184 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ 3185 3186 #ifdef TRACE_SORT 3187 if (trace_sort) 3188 elog(LOG, "worker %d finished writing run %d to tape %d: %s", 3189 state->worker, state->currentRun, state->destTape, 3190 pg_rusage_show(&state->ru_start)); 3191 #endif 3192 3193 if (!alltuples) 3194 selectnewtape(state); 3195 } 3196 3197 /* 3198 * tuplesort_rescan - rewind and replay the scan 3199 */ 3200 void 3201 tuplesort_rescan(Tuplesortstate *state) 3202 { 3203 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 3204 3205 Assert(state->randomAccess); 3206 3207 switch (state->status) 3208 { 3209 case TSS_SORTEDINMEM: 3210 state->current = 0; 3211 state->eof_reached = false; 3212 state->markpos_offset = 0; 3213 state->markpos_eof = false; 3214 break; 3215 case TSS_SORTEDONTAPE: 3216 LogicalTapeRewindForRead(state->tapeset, 3217 state->result_tape, 3218 0); 3219 state->eof_reached = false; 3220 state->markpos_block = 0L; 3221 state->markpos_offset = 0; 3222 state->markpos_eof = false; 3223 break; 3224 default: 3225 elog(ERROR, "invalid tuplesort state"); 3226 break; 3227 } 3228 3229 MemoryContextSwitchTo(oldcontext); 3230 } 3231 3232 /* 3233 * tuplesort_markpos - saves current position in the merged sort file 3234 */ 3235 void 3236 tuplesort_markpos(Tuplesortstate *state) 3237 { 3238 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 3239 3240 Assert(state->randomAccess); 3241 3242 switch (state->status) 3243 { 3244 case TSS_SORTEDINMEM: 3245 state->markpos_offset = state->current; 3246 state->markpos_eof = state->eof_reached; 3247 break; 3248 case TSS_SORTEDONTAPE: 3249 LogicalTapeTell(state->tapeset, 3250 state->result_tape, 3251 &state->markpos_block, 3252 &state->markpos_offset); 3253 state->markpos_eof = state->eof_reached; 3254 break; 3255 default: 3256 elog(ERROR, "invalid tuplesort state"); 3257 
break; 3258 } 3259 3260 MemoryContextSwitchTo(oldcontext); 3261 } 3262 3263 /* 3264 * tuplesort_restorepos - restores current position in merged sort file to 3265 * last saved position 3266 */ 3267 void 3268 tuplesort_restorepos(Tuplesortstate *state) 3269 { 3270 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); 3271 3272 Assert(state->randomAccess); 3273 3274 switch (state->status) 3275 { 3276 case TSS_SORTEDINMEM: 3277 state->current = state->markpos_offset; 3278 state->eof_reached = state->markpos_eof; 3279 break; 3280 case TSS_SORTEDONTAPE: 3281 LogicalTapeSeek(state->tapeset, 3282 state->result_tape, 3283 state->markpos_block, 3284 state->markpos_offset); 3285 state->eof_reached = state->markpos_eof; 3286 break; 3287 default: 3288 elog(ERROR, "invalid tuplesort state"); 3289 break; 3290 } 3291 3292 MemoryContextSwitchTo(oldcontext); 3293 } 3294 3295 /* 3296 * tuplesort_get_stats - extract summary statistics 3297 * 3298 * This can be called after tuplesort_performsort() finishes to obtain 3299 * printable summary information about how the sort was performed. 3300 */ 3301 void 3302 tuplesort_get_stats(Tuplesortstate *state, 3303 TuplesortInstrumentation *stats) 3304 { 3305 /* 3306 * Note: it might seem we should provide both memory and disk usage for a 3307 * disk-based sort. However, the current code doesn't track memory space 3308 * accurately once we have begun to return tuples to the caller (since we 3309 * don't account for pfree's the caller is expected to do), so we cannot 3310 * rely on availMem in a disk sort. This does not seem worth the overhead 3311 * to fix. Is it worth creating an API for the memory context code to 3312 * tell us how much is actually used in sortcontext? 3313 */ 3314 tuplesort_updatemax(state); 3315 3316 if (state->isMaxSpaceDisk) 3317 stats->spaceType = SORT_SPACE_TYPE_DISK; 3318 else 3319 stats->spaceType = SORT_SPACE_TYPE_MEMORY; 3320 stats->spaceUsed = (state->maxSpace + 1023) / 1024; 3321 3322 switch (state->maxSpaceStatus) 3323 { 3324 case TSS_SORTEDINMEM: 3325 if (state->boundUsed) 3326 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT; 3327 else 3328 stats->sortMethod = SORT_TYPE_QUICKSORT; 3329 break; 3330 case TSS_SORTEDONTAPE: 3331 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT; 3332 break; 3333 case TSS_FINALMERGE: 3334 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE; 3335 break; 3336 default: 3337 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS; 3338 break; 3339 } 3340 } 3341 3342 /* 3343 * Convert TuplesortMethod to a string. 3344 */ 3345 const char * 3346 tuplesort_method_name(TuplesortMethod m) 3347 { 3348 switch (m) 3349 { 3350 case SORT_TYPE_STILL_IN_PROGRESS: 3351 return "still in progress"; 3352 case SORT_TYPE_TOP_N_HEAPSORT: 3353 return "top-N heapsort"; 3354 case SORT_TYPE_QUICKSORT: 3355 return "quicksort"; 3356 case SORT_TYPE_EXTERNAL_SORT: 3357 return "external sort"; 3358 case SORT_TYPE_EXTERNAL_MERGE: 3359 return "external merge"; 3360 } 3361 3362 return "unknown"; 3363 } 3364 3365 /* 3366 * Convert TuplesortSpaceType to a string. 3367 */ 3368 const char * 3369 tuplesort_space_type_name(TuplesortSpaceType t) 3370 { 3371 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY); 3372 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory"; 3373 } 3374 3375 3376 /* 3377 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. 3378 */ 3379 3380 /* 3381 * Convert the existing unordered array of SortTuples to a bounded heap, 3382 * discarding all but the smallest "state->bound" tuples. 
/*
 * tuplesort_get_stats - extract summary statistics
 *
 * This can be called after tuplesort_performsort() finishes to obtain
 * printable summary information about how the sort was performed.
 */
void
tuplesort_get_stats(Tuplesortstate *state,
					TuplesortInstrumentation *stats)
{
	/*
	 * Note: it might seem we should provide both memory and disk usage for a
	 * disk-based sort.  However, the current code doesn't track memory space
	 * accurately once we have begun to return tuples to the caller (since we
	 * don't account for pfree's the caller is expected to do), so we cannot
	 * rely on availMem in a disk sort.  This does not seem worth the
	 * overhead to fix.  Is it worth creating an API for the memory context
	 * code to tell us how much is actually used in sortcontext?
	 */
	tuplesort_updatemax(state);

	if (state->isMaxSpaceDisk)
		stats->spaceType = SORT_SPACE_TYPE_DISK;
	else
		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
	stats->spaceUsed = (state->maxSpace + 1023) / 1024;

	switch (state->maxSpaceStatus)
	{
		case TSS_SORTEDINMEM:
			if (state->boundUsed)
				stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
			else
				stats->sortMethod = SORT_TYPE_QUICKSORT;
			break;
		case TSS_SORTEDONTAPE:
			stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
			break;
		case TSS_FINALMERGE:
			stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
			break;
		default:
			stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
			break;
	}
}

/*
 * Convert TuplesortMethod to a string.
 */
const char *
tuplesort_method_name(TuplesortMethod m)
{
	switch (m)
	{
		case SORT_TYPE_STILL_IN_PROGRESS:
			return "still in progress";
		case SORT_TYPE_TOP_N_HEAPSORT:
			return "top-N heapsort";
		case SORT_TYPE_QUICKSORT:
			return "quicksort";
		case SORT_TYPE_EXTERNAL_SORT:
			return "external sort";
		case SORT_TYPE_EXTERNAL_MERGE:
			return "external merge";
	}

	return "unknown";
}

/*
 * Convert TuplesortSpaceType to a string.
 */
const char *
tuplesort_space_type_name(TuplesortSpaceType t)
{
	Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
	return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
}
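/*
 * Illustrative sketch only (hypothetical caller): an EXPLAIN-style consumer
 * could report the summary like this, once tuplesort_performsort() is done.
 * Note that spaceUsed is already rounded up to kilobytes above.
 *
 *		TuplesortInstrumentation stats;
 *
 *		tuplesort_get_stats(ts, &stats);
 *		elog(LOG, "Sort Method: %s  %s: " INT64_FORMAT "kB",
 *			 tuplesort_method_name(stats.sortMethod),
 *			 tuplesort_space_type_name(stats.spaceType),
 *			 stats.spaceUsed);
 */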
/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 */

/*
 * Convert the existing unordered array of SortTuples to a bounded heap,
 * discarding all but the smallest "state->bound" tuples.
 *
 * When working with a bounded heap, we want to keep the largest entry
 * at the root (array entry zero), instead of the smallest as in the normal
 * sort case.  This allows us to discard the largest entry cheaply.
 * Therefore, we temporarily reverse the sort direction.
 */
static void
make_bounded_heap(Tuplesortstate *state)
{
	int			tupcount = state->memtupcount;
	int			i;

	Assert(state->status == TSS_INITIAL);
	Assert(state->bounded);
	Assert(tupcount >= state->bound);
	Assert(SERIAL(state));

	/* Reverse sort direction so largest entry will be at root */
	reversedirection(state);

	state->memtupcount = 0;		/* make the heap empty */
	for (i = 0; i < tupcount; i++)
	{
		if (state->memtupcount < state->bound)
		{
			/* Insert next tuple into heap */
			/* Must copy source tuple to avoid possible overwrite */
			SortTuple	stup = state->memtuples[i];

			tuplesort_heap_insert(state, &stup);
		}
		else
		{
			/*
			 * The heap is full.  Replace the largest entry with the new
			 * tuple, or just discard it, if it's larger than anything
			 * already in the heap.
			 */
			if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
			{
				free_sort_tuple(state, &state->memtuples[i]);
				CHECK_FOR_INTERRUPTS();
			}
			else
				tuplesort_heap_replace_top(state, &state->memtuples[i]);
		}
	}

	Assert(state->memtupcount == state->bound);
	state->status = TSS_BOUNDED;
}

/*
 * Convert the bounded heap to a properly-sorted array
 */
static void
sort_bounded_heap(Tuplesortstate *state)
{
	int			tupcount = state->memtupcount;

	Assert(state->status == TSS_BOUNDED);
	Assert(state->bounded);
	Assert(tupcount == state->bound);
	Assert(SERIAL(state));

	/*
	 * We can unheapify in place because each delete-top call will remove the
	 * largest entry, which we can promptly store in the newly freed slot at
	 * the end.  Once we're down to a single-entry heap, we're done.
	 */
	while (state->memtupcount > 1)
	{
		SortTuple	stup = state->memtuples[0];

		/* this sifts-up the next-largest entry and decreases memtupcount */
		tuplesort_heap_delete_top(state);
		state->memtuples[state->memtupcount] = stup;
	}
	state->memtupcount = tupcount;

	/*
	 * Reverse sort direction back to the original state.  This is not
	 * actually necessary but seems like a good idea for tidiness.
	 */
	reversedirection(state);

	state->status = TSS_SORTEDINMEM;
	state->boundUsed = true;
}
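/*
 * Worked example (comment only): with bound = 2 and input 5, 9, 1, 7 under
 * an ascending sort, the reversed-direction heap keeps the 2 smallest values
 * with the larger of them at the root:
 *
 *		insert 5				-> heap {5}
 *		insert 9				-> heap {9, 5}	(9 at root)
 *		see 1: 1 < 9			-> replace top	-> heap {5, 1}
 *		see 7: 7 >= 5 (root)	-> discard 7	-> heap {5, 1}
 *
 * sort_bounded_heap() then unheapifies this in place to 1, 5 and restores
 * the original sort direction.
 */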
/*
 * Sort all memtuples using specialized qsort() routines.
 *
 * Quicksort is used for small in-memory sorts, and external sort runs.
 */
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
	Assert(!LEADER(state));

	if (state->memtupcount > 1)
	{
		/* Can we use the single-key sort function? */
		if (state->onlyKey != NULL)
			qsort_ssup(state->memtuples, state->memtupcount,
					   state->onlyKey);
		else
			qsort_tuple(state->memtuples,
						state->memtupcount,
						state->comparetup,
						state);
	}
}

/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.  Caller is responsible for ensuring there's room.
 *
 * Note: For some callers, tuple points to a memtuples[] entry above the
 * end of the heap.  This is safe as long as it's not immediately adjacent
 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
 * is, it might get overwritten before being moved into the heap!
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
{
	SortTuple  *memtuples;
	int			j;

	memtuples = state->memtuples;
	Assert(state->memtupcount < state->memtupsize);

	CHECK_FOR_INTERRUPTS();

	/*
	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16.  Note that Knuth
	 * is using 1-based array indexes, not 0-based.
	 */
	j = state->memtupcount++;
	while (j > 0)
	{
		int			i = (j - 1) >> 1;

		if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
			break;
		memtuples[j] = memtuples[i];
		j = i;
	}
	memtuples[j] = *tuple;
}
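/*
 * For illustration only: the sift-up above, restated on a plain int array.
 * This hypothetical helper is not used anywhere in this file; it just shows
 * the 0-based parent arithmetic ((j - 1) >> 1) that replaces Knuth's 1-based
 * indexes.
 *
 *	static void
 *	int_heap_insert(int *heap, int *count, int value)
 *	{
 *		int		j = (*count)++;
 *
 *		while (j > 0)
 *		{
 *			int		i = (j - 1) >> 1;	// parent of slot j
 *
 *			if (value >= heap[i])
 *				break;
 *			heap[j] = heap[i];			// pull parent down into the hole
 *			j = i;
 *		}
 *		heap[j] = value;				// hole is now the right spot
 *	}
 */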
/*
 * Remove the tuple at state->memtuples[0] from the heap.  Decrement
 * memtupcount, and sift up to maintain the heap invariant.
 *
 * The caller has already free'd the tuple the top node points to,
 * if necessary.
 */
static void
tuplesort_heap_delete_top(Tuplesortstate *state)
{
	SortTuple  *memtuples = state->memtuples;
	SortTuple  *tuple;

	if (--state->memtupcount <= 0)
		return;

	/*
	 * Remove the last tuple in the heap, and re-insert it, by replacing the
	 * current top node with it.
	 */
	tuple = &memtuples[state->memtupcount];
	tuplesort_heap_replace_top(state, tuple);
}

/*
 * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
 * maintain the heap invariant.
 *
 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
 * Heapsort, steps H3-H8).
 */
static void
tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
{
	SortTuple  *memtuples = state->memtuples;
	unsigned int i,
				n;

	Assert(state->memtupcount >= 1);

	CHECK_FOR_INTERRUPTS();

	/*
	 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
	 * This prevents overflow in the "2 * i + 1" calculation, since at the
	 * top of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
	 */
	n = state->memtupcount;
	i = 0;						/* i is where the "hole" is */
	for (;;)
	{
		unsigned int j = 2 * i + 1;

		if (j >= n)
			break;
		if (j + 1 < n &&
			COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
			j++;
		if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
			break;
		memtuples[i] = memtuples[j];
		i = j;
	}
	memtuples[i] = *tuple;
}
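/*
 * Worked note on the unsigned arithmetic above: if i were a signed int and
 * memtupcount were near INT_MAX, the child computation could overflow; e.g.
 * i = (INT_MAX - 1) / 2 + 1 gives 2 * i + 1 > INT_MAX, which is undefined
 * behavior for signed int.  As unsigned int, 2 * i + 1 <= 2 * (INT_MAX - 1)
 * + 1 < UINT_MAX, so the computation is always well defined, and an
 * out-of-range j simply fails the j >= n test and terminates the loop.
 */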
/*
 * Function to reverse the sort direction from its current state
 *
 * It is not safe to call this when performing hash tuplesorts
 */
static void
reversedirection(Tuplesortstate *state)
{
	SortSupport sortKey = state->sortKeys;
	int			nkey;

	for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
	{
		sortKey->ssup_reverse = !sortKey->ssup_reverse;
		sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
	}
}


/*
 * Tape interface routines
 */

static unsigned int
getlen(Tuplesortstate *state, int tapenum, bool eofOK)
{
	unsigned int len;

	if (LogicalTapeRead(state->tapeset, tapenum,
						&len, sizeof(len)) != sizeof(len))
		elog(ERROR, "unexpected end of tape");
	if (len == 0 && !eofOK)
		elog(ERROR, "unexpected end of data");
	return len;
}

static void
markrunend(Tuplesortstate *state, int tapenum)
{
	unsigned int len = 0;

	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}

/*
 * Get memory for tuple from within READTUP() routine.
 *
 * We use next free slot from the slab allocator, or palloc() if the tuple
 * is too large for that.
 */
static void *
readtup_alloc(Tuplesortstate *state, Size tuplen)
{
	SlabSlot   *buf;

	/*
	 * We pre-allocate enough slots in the slab arena that we should never
	 * run out.
	 */
	Assert(state->slabFreeHead);

	if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
		return MemoryContextAlloc(state->sortcontext, tuplen);
	else
	{
		buf = state->slabFreeHead;
		/* Reuse this slot */
		state->slabFreeHead = buf->nextfree;

		return buf;
	}
}
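/*
 * A hedged sketch of the matching release path (the real macro lives earlier
 * in this file): a slab slot is returned by pushing it back onto the free
 * list, roughly
 *
 *		buf->nextfree = state->slabFreeHead;
 *		state->slabFreeHead = buf;
 *
 * whereas oversized tuples that fell through to MemoryContextAlloc() above
 * are simply pfree'd.
 */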
/*
 * Routines specialized for HeapTuple (actually MinimalTuple) case
 */

static int
comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
	SortSupport sortKey = state->sortKeys;
	HeapTupleData ltup;
	HeapTupleData rtup;
	TupleDesc	tupDesc;
	int			nkey;
	int32		compare;
	AttrNumber	attno;
	Datum		datum1,
				datum2;
	bool		isnull1,
				isnull2;

	/* Compare the leading sort key */
	compare = ApplySortComparator(a->datum1, a->isnull1,
								  b->datum1, b->isnull1,
								  sortKey);
	if (compare != 0)
		return compare;

	/* Compare additional sort keys */
	ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
	ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
	tupDesc = state->tupDesc;

	if (sortKey->abbrev_converter)
	{
		attno = sortKey->ssup_attno;

		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);

		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
												datum2, isnull2,
												sortKey);
		if (compare != 0)
			return compare;
	}

	sortKey++;
	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
	{
		attno = sortKey->ssup_attno;

		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);

		compare = ApplySortComparator(datum1, isnull1,
									  datum2, isnull2,
									  sortKey);
		if (compare != 0)
			return compare;
	}

	return 0;
}

static void
copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
{
	/*
	 * We expect the passed "tup" to be a TupleTableSlot, and form a
	 * MinimalTuple using the exported interface for that.
	 */
	TupleTableSlot *slot = (TupleTableSlot *) tup;
	Datum		original;
	MinimalTuple tuple;
	HeapTupleData htup;
	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);

	/* copy the tuple into sort storage */
	tuple = ExecCopySlotMinimalTuple(slot);
	stup->tuple = (void *) tuple;
	USEMEM(state, GetMemoryChunkSpace(tuple));
	/* set up first-column key value */
	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
	original = heap_getattr(&htup,
							state->sortKeys[0].ssup_attno,
							state->tupDesc,
							&stup->isnull1);

	MemoryContextSwitchTo(oldcontext);

	if (!state->sortKeys->abbrev_converter || stup->isnull1)
	{
		/*
		 * Store ordinary Datum representation, or NULL value.  If there is a
		 * converter it won't expect NULL values, and cost model is not
		 * required to account for NULL, so in that case we avoid calling
		 * converter and just set datum1 to zeroed representation (to be
		 * consistent, and to support cheap inequality tests for NULL
		 * abbreviated keys).
		 */
		stup->datum1 = original;
	}
	else if (!consider_abort_common(state))
	{
		/* Store abbreviated key representation */
		stup->datum1 = state->sortKeys->abbrev_converter(original,
														 state->sortKeys);
	}
	else
	{
		/* Abort abbreviation */
		int			i;

		stup->datum1 = original;

		/*
		 * Set state to be consistent with never trying abbreviation.
		 *
		 * Alter datum1 representation in already-copied tuples, so as to
		 * ensure a consistent representation (current tuple was just
		 * handled).  It does not matter if some dumped tuples are already
		 * sorted on tape, since serialized tuples lack abbreviated keys
		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
		 */
		for (i = 0; i < state->memtupcount; i++)
		{
			SortTuple  *mtup = &state->memtuples[i];

			htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
				MINIMAL_TUPLE_OFFSET;
			htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
											 MINIMAL_TUPLE_OFFSET);

			mtup->datum1 = heap_getattr(&htup,
										state->sortKeys[0].ssup_attno,
										state->tupDesc,
										&mtup->isnull1);
		}
	}
}

static void
writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
	MinimalTuple tuple = (MinimalTuple) stup->tuple;

	/* the part of the MinimalTuple we'll write: */
	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
	unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;

	/* total on-disk footprint: */
	unsigned int tuplen = tupbodylen + sizeof(int);

	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) &tuplen, sizeof(tuplen));
	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) tupbody, tupbodylen);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 (void *) &tuplen, sizeof(tuplen));

	if (!state->slabAllocatorUsed)
	{
		FREEMEM(state, GetMemoryChunkSpace(tuple));
		heap_free_minimal_tuple(tuple);
	}
}

static void
readtup_heap(Tuplesortstate *state, SortTuple *stup,
			 int tapenum, unsigned int len)
{
	unsigned int tupbodylen = len - sizeof(int);
	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
	HeapTupleData htup;

	/* read in the tuple proper */
	tuple->t_len = tuplen;
	LogicalTapeReadExact(state->tapeset, tapenum,
						 tupbody, tupbodylen);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &tuplen, sizeof(tuplen));
	stup->tuple = (void *) tuple;
	/* set up first-column key value */
	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
	stup->datum1 = heap_getattr(&htup,
								state->sortKeys[0].ssup_attno,
								state->tupDesc,
								&stup->isnull1);
}
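/*
 * On-disk record layout written by writetup_heap() and read back by
 * readtup_heap(), shown here for orientation (comment only):
 *
 *		+---------------+--------------------+----------------------+
 *		| tuplen (uint) | tuple body         | tuplen (uint)        |
 *		| length word   | tupbodylen bytes   | only if randomAccess |
 *		+---------------+--------------------+----------------------+
 *
 * The trailing length word is what lets a frozen tape be scanned backwards
 * when the caller fetches tuples in reverse.
 */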
/*
 * Routines specialized for the CLUSTER case (HeapTuple data, with
 * comparisons per a btree index definition)
 */

static int
comparetup_cluster(const SortTuple *a, const SortTuple *b,
				   Tuplesortstate *state)
{
	SortSupport sortKey = state->sortKeys;
	HeapTuple	ltup;
	HeapTuple	rtup;
	TupleDesc	tupDesc;
	int			nkey;
	int32		compare;
	Datum		datum1,
				datum2;
	bool		isnull1,
				isnull2;
	AttrNumber	leading = state->indexInfo->ii_IndexAttrNumbers[0];

	/* Be prepared to compare additional sort keys */
	ltup = (HeapTuple) a->tuple;
	rtup = (HeapTuple) b->tuple;
	tupDesc = state->tupDesc;

	/* Compare the leading sort key, if it's simple */
	if (leading != 0)
	{
		compare = ApplySortComparator(a->datum1, a->isnull1,
									  b->datum1, b->isnull1,
									  sortKey);
		if (compare != 0)
			return compare;

		if (sortKey->abbrev_converter)
		{
			datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
			datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);

			compare = ApplySortAbbrevFullComparator(datum1, isnull1,
													datum2, isnull2,
													sortKey);
		}
		if (compare != 0 || state->nKeys == 1)
			return compare;
		/* Compare additional columns the hard way */
		sortKey++;
		nkey = 1;
	}
	else
	{
		/* Must compare all keys the hard way */
		nkey = 0;
	}

	if (state->indexInfo->ii_Expressions == NULL)
	{
		/* If not expression index, just compare the proper heap attrs */

		for (; nkey < state->nKeys; nkey++, sortKey++)
		{
			AttrNumber	attno = state->indexInfo->ii_IndexAttrNumbers[nkey];

			datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
			datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);

			compare = ApplySortComparator(datum1, isnull1,
										  datum2, isnull2,
										  sortKey);
			if (compare != 0)
				return compare;
		}
	}
	else
	{
		/*
		 * In the expression index case, compute the whole index tuple and
		 * then compare values.  It would perhaps be faster to compute only
		 * as many columns as we need to compare, but that would require
		 * duplicating all the logic in FormIndexDatum.
		 */
		Datum		l_index_values[INDEX_MAX_KEYS];
		bool		l_index_isnull[INDEX_MAX_KEYS];
		Datum		r_index_values[INDEX_MAX_KEYS];
		bool		r_index_isnull[INDEX_MAX_KEYS];
		TupleTableSlot *ecxt_scantuple;

		/* Reset context each time to prevent memory leakage */
		ResetPerTupleExprContext(state->estate);

		ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;

		ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
					   l_index_values, l_index_isnull);

		ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
					   r_index_values, r_index_isnull);

		for (; nkey < state->nKeys; nkey++, sortKey++)
		{
			compare = ApplySortComparator(l_index_values[nkey],
										  l_index_isnull[nkey],
										  r_index_values[nkey],
										  r_index_isnull[nkey],
										  sortKey);
			if (compare != 0)
				return compare;
		}
	}

	return 0;
}

static void
copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
{
	HeapTuple	tuple = (HeapTuple) tup;
	Datum		original;
	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);

	/* copy the tuple into sort storage */
	tuple = heap_copytuple(tuple);
	stup->tuple = (void *) tuple;
	USEMEM(state, GetMemoryChunkSpace(tuple));

	MemoryContextSwitchTo(oldcontext);

	/*
	 * set up first-column key value, and potentially abbreviate, if it's a
	 * simple column
	 */
	if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
		return;

	original = heap_getattr(tuple,
							state->indexInfo->ii_IndexAttrNumbers[0],
							state->tupDesc,
							&stup->isnull1);

	if (!state->sortKeys->abbrev_converter || stup->isnull1)
	{
		/*
		 * Store ordinary Datum representation, or NULL value.  If there is a
		 * converter it won't expect NULL values, and cost model is not
		 * required to account for NULL, so in that case we avoid calling
		 * converter and just set datum1 to zeroed representation (to be
		 * consistent, and to support cheap inequality tests for NULL
		 * abbreviated keys).
		 */
		stup->datum1 = original;
	}
	else if (!consider_abort_common(state))
	{
		/* Store abbreviated key representation */
		stup->datum1 = state->sortKeys->abbrev_converter(original,
														 state->sortKeys);
	}
	else
	{
		/* Abort abbreviation */
		int			i;

		stup->datum1 = original;

		/*
		 * Set state to be consistent with never trying abbreviation.
		 *
		 * Alter datum1 representation in already-copied tuples, so as to
		 * ensure a consistent representation (current tuple was just
		 * handled).  It does not matter if some dumped tuples are already
		 * sorted on tape, since serialized tuples lack abbreviated keys
		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
		 */
		for (i = 0; i < state->memtupcount; i++)
		{
			SortTuple  *mtup = &state->memtuples[i];

			tuple = (HeapTuple) mtup->tuple;
			mtup->datum1 = heap_getattr(tuple,
										state->indexInfo->ii_IndexAttrNumbers[0],
										state->tupDesc,
										&mtup->isnull1);
		}
	}
}
static void
writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
	HeapTuple	tuple = (HeapTuple) stup->tuple;
	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);

	/* We need to store t_self, but not other fields of HeapTupleData */
	LogicalTapeWrite(state->tapeset, tapenum,
					 &tuplen, sizeof(tuplen));
	LogicalTapeWrite(state->tapeset, tapenum,
					 &tuple->t_self, sizeof(ItemPointerData));
	LogicalTapeWrite(state->tapeset, tapenum,
					 tuple->t_data, tuple->t_len);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 &tuplen, sizeof(tuplen));

	if (!state->slabAllocatorUsed)
	{
		FREEMEM(state, GetMemoryChunkSpace(tuple));
		heap_freetuple(tuple);
	}
}

static void
readtup_cluster(Tuplesortstate *state, SortTuple *stup,
				int tapenum, unsigned int tuplen)
{
	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
												  t_len + HEAPTUPLESIZE);

	/* Reconstruct the HeapTupleData header */
	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
	tuple->t_len = t_len;
	LogicalTapeReadExact(state->tapeset, tapenum,
						 &tuple->t_self, sizeof(ItemPointerData));
	/* We don't currently bother to reconstruct t_tableOid */
	tuple->t_tableOid = InvalidOid;
	/* Read in the tuple body */
	LogicalTapeReadExact(state->tapeset, tapenum,
						 tuple->t_data, tuple->t_len);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &tuplen, sizeof(tuplen));
	stup->tuple = (void *) tuple;
	/* set up first-column key value, if it's a simple column */
	if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
		stup->datum1 = heap_getattr(tuple,
									state->indexInfo->ii_IndexAttrNumbers[0],
									state->tupDesc,
									&stup->isnull1);
}
/*
 * Routines specialized for IndexTuple case
 *
 * The btree and hash cases require separate comparison functions, but the
 * IndexTuple representation is the same so the copy/write/read support
 * functions can be shared.
 */

static int
comparetup_index_btree(const SortTuple *a, const SortTuple *b,
					   Tuplesortstate *state)
{
	/*
	 * This is similar to comparetup_heap(), but expects index tuples.  There
	 * is also special handling for enforcing uniqueness, and special
	 * treatment for equal keys at the end.
	 */
	SortSupport sortKey = state->sortKeys;
	IndexTuple	tuple1;
	IndexTuple	tuple2;
	int			keysz;
	TupleDesc	tupDes;
	bool		equal_hasnull = false;
	int			nkey;
	int32		compare;
	Datum		datum1,
				datum2;
	bool		isnull1,
				isnull2;

	/* Compare the leading sort key */
	compare = ApplySortComparator(a->datum1, a->isnull1,
								  b->datum1, b->isnull1,
								  sortKey);
	if (compare != 0)
		return compare;

	/* Compare additional sort keys */
	tuple1 = (IndexTuple) a->tuple;
	tuple2 = (IndexTuple) b->tuple;
	keysz = state->nKeys;
	tupDes = RelationGetDescr(state->indexRel);

	if (sortKey->abbrev_converter)
	{
		datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
		datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);

		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
												datum2, isnull2,
												sortKey);
		if (compare != 0)
			return compare;
	}

	/* they are equal, so we only need to examine one null flag */
	if (a->isnull1)
		equal_hasnull = true;

	sortKey++;
	for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
	{
		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);

		compare = ApplySortComparator(datum1, isnull1,
									  datum2, isnull2,
									  sortKey);
		if (compare != 0)
			return compare;		/* done when we find unequal attributes */

		/* they are equal, so we only need to examine one null flag */
		if (isnull1)
			equal_hasnull = true;
	}

	/*
	 * If btree has asked us to enforce uniqueness, complain if two equal
	 * tuples are detected (unless there was at least one NULL field).
	 *
	 * It is sufficient to make the test here, because if two tuples are
	 * equal they *must* get compared at some stage of the sort --- otherwise
	 * the sort algorithm wouldn't have checked whether one must appear
	 * before the other.
	 */
	if (state->enforceUnique && !equal_hasnull)
	{
		Datum		values[INDEX_MAX_KEYS];
		bool		isnull[INDEX_MAX_KEYS];
		char	   *key_desc;

		/*
		 * Some rather brain-dead implementations of qsort (such as the one
		 * in QNX 4) will sometimes call the comparison routine to compare a
		 * value to itself, but we always use our own implementation, which
		 * does not.
		 */
		Assert(tuple1 != tuple2);

		index_deform_tuple(tuple1, tupDes, values, isnull);

		key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);

		ereport(ERROR,
				(errcode(ERRCODE_UNIQUE_VIOLATION),
				 errmsg("could not create unique index \"%s\"",
						RelationGetRelationName(state->indexRel)),
				 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
				 errdetail("Duplicate keys exist."),
				 errtableconstraint(state->heapRel,
									RelationGetRelationName(state->indexRel))));
	}

	/*
	 * If key values are equal, we sort on ItemPointer.  This is required for
	 * btree indexes, since heap TID is treated as an implicit last key
	 * attribute in order to ensure that all keys in the index are physically
	 * unique.
	 */
	{
		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

		if (blk1 != blk2)
			return (blk1 < blk2) ? -1 : 1;
	}
	{
		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

		if (pos1 != pos2)
			return (pos1 < pos2) ? -1 : 1;
	}

	/* ItemPointer values should never be equal */
	Assert(false);

	return 0;
}

static int
comparetup_index_hash(const SortTuple *a, const SortTuple *b,
					  Tuplesortstate *state)
{
	Bucket		bucket1;
	Bucket		bucket2;
	IndexTuple	tuple1;
	IndexTuple	tuple2;

	/*
	 * Fetch hash keys and mask off bits we don't want to sort by.  We know
	 * that the first column of the index tuple is the hash key.
	 */
	Assert(!a->isnull1);
	bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
								   state->max_buckets, state->high_mask,
								   state->low_mask);
	Assert(!b->isnull1);
	bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
								   state->max_buckets, state->high_mask,
								   state->low_mask);
	if (bucket1 > bucket2)
		return 1;
	else if (bucket1 < bucket2)
		return -1;

	/*
	 * If hash values are equal, we sort on ItemPointer.  This does not
	 * affect validity of the finished index, but it may be useful to have
	 * index scans in physical order.
	 */
	tuple1 = (IndexTuple) a->tuple;
	tuple2 = (IndexTuple) b->tuple;

	{
		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

		if (blk1 != blk2)
			return (blk1 < blk2) ? -1 : 1;
	}
	{
		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

		if (pos1 != pos2)
			return (pos1 < pos2) ? -1 : 1;
	}

	/* ItemPointer values should never be equal */
	Assert(false);

	return 0;
}
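/*
 * A hedged sketch of the bucket mapping used above (the authoritative logic
 * is _hash_hashkey2bucket() in access/hash): the hash key is masked down to
 * a bucket number, falling back to the smaller mask when the result exceeds
 * the highest bucket allocated so far, roughly
 *
 *		bucket = hashkey & high_mask;
 *		if (bucket > max_buckets)
 *			bucket = bucket & low_mask;
 *
 * so tuples sort into the order in which hash index build will place them.
 */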
static void
copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
{
	/* Not currently needed */
	elog(ERROR, "copytup_index() should not be called");
}

static void
writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
	IndexTuple	tuple = (IndexTuple) stup->tuple;
	unsigned int tuplen;

	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) &tuplen, sizeof(tuplen));
	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) tuple, IndexTupleSize(tuple));
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 (void *) &tuplen, sizeof(tuplen));

	if (!state->slabAllocatorUsed)
	{
		FREEMEM(state, GetMemoryChunkSpace(tuple));
		pfree(tuple);
	}
}

static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
			  int tapenum, unsigned int len)
{
	unsigned int tuplen = len - sizeof(unsigned int);
	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);

	LogicalTapeReadExact(state->tapeset, tapenum,
						 tuple, tuplen);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &tuplen, sizeof(tuplen));
	stup->tuple = (void *) tuple;
	/* set up first-column key value */
	stup->datum1 = index_getattr(tuple,
								 1,
								 RelationGetDescr(state->indexRel),
								 &stup->isnull1);
}
/*
 * Routines specialized for DatumTuple case
 */

static int
comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
	int			compare;

	compare = ApplySortComparator(a->datum1, a->isnull1,
								  b->datum1, b->isnull1,
								  state->sortKeys);
	if (compare != 0)
		return compare;

	/* if we have abbreviations, then "tuple" has the original value */

	if (state->sortKeys->abbrev_converter)
		compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
												PointerGetDatum(b->tuple), b->isnull1,
												state->sortKeys);

	return compare;
}

static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
	/* Not currently needed */
	elog(ERROR, "copytup_datum() should not be called");
}

static void
writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
	void	   *waddr;
	unsigned int tuplen;
	unsigned int writtenlen;

	if (stup->isnull1)
	{
		waddr = NULL;
		tuplen = 0;
	}
	else if (!state->tuples)
	{
		waddr = &stup->datum1;
		tuplen = sizeof(Datum);
	}
	else
	{
		waddr = stup->tuple;
		tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
		Assert(tuplen != 0);
	}

	writtenlen = tuplen + sizeof(unsigned int);

	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) &writtenlen, sizeof(writtenlen));
	LogicalTapeWrite(state->tapeset, tapenum,
					 waddr, tuplen);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 (void *) &writtenlen, sizeof(writtenlen));

	if (!state->slabAllocatorUsed && stup->tuple)
	{
		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
		pfree(stup->tuple);
	}
}

static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
			  int tapenum, unsigned int len)
{
	unsigned int tuplen = len - sizeof(unsigned int);

	if (tuplen == 0)
	{
		/* it's NULL */
		stup->datum1 = (Datum) 0;
		stup->isnull1 = true;
		stup->tuple = NULL;
	}
	else if (!state->tuples)
	{
		Assert(tuplen == sizeof(Datum));
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &stup->datum1, tuplen);
		stup->isnull1 = false;
		stup->tuple = NULL;
	}
	else
	{
		void	   *raddr = readtup_alloc(state, tuplen);

		LogicalTapeReadExact(state->tapeset, tapenum,
							 raddr, tuplen);
		stup->datum1 = PointerGetDatum(raddr);
		stup->isnull1 = false;
		stup->tuple = raddr;
	}

	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &tuplen, sizeof(tuplen));
}
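/*
 * Comment-only summary of the datum record layouts above:
 *
 *		NULL datum:			length word only (0 body bytes)
 *		pass-by-value:		length word + sizeof(Datum) body
 *		pass-by-reference:	length word + datumGetSize() body bytes
 *
 * plus the usual trailing length word when randomAccess is true.
 * readtup_datum() distinguishes NULL from non-NULL by the length alone
 * (which is why a zero-length body unambiguously means NULL), and by-value
 * from by-reference via state->tuples.
 */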
/*
 * Parallel sort routines
 */

/*
 * tuplesort_estimate_shared - estimate required shared memory allocation
 *
 * nWorkers is an estimate of the number of workers (it's the number that
 * will be requested).
 */
Size
tuplesort_estimate_shared(int nWorkers)
{
	Size		tapesSize;

	Assert(nWorkers > 0);

	/* Make sure that BufFile shared state is MAXALIGN'd */
	tapesSize = mul_size(sizeof(TapeShare), nWorkers);
	tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));

	return tapesSize;
}

/*
 * tuplesort_initialize_shared - initialize shared tuplesort state
 *
 * Must be called from leader process before workers are launched, to
 * establish state needed up-front for worker tuplesortstates.  nWorkers
 * should match the argument passed to tuplesort_estimate_shared().
 */
void
tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
{
	int			i;

	Assert(nWorkers > 0);

	SpinLockInit(&shared->mutex);
	shared->currentWorker = 0;
	shared->workersFinished = 0;
	SharedFileSetInit(&shared->fileset, seg);
	shared->nTapes = nWorkers;
	for (i = 0; i < nWorkers; i++)
	{
		shared->tapes[i].firstblocknumber = 0L;
	}
}

/*
 * tuplesort_attach_shared - attach to shared tuplesort state
 *
 * Must be called by all worker processes.
 */
void
tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
{
	/* Attach to SharedFileSet */
	SharedFileSetAttach(&shared->fileset, seg);
}
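/*
 * Worked example (comment only): with nWorkers = 4 and, hypothetically,
 * sizeof(TapeShare) == 8 and offsetof(Sharedsort, tapes) == 48, the estimate
 * is MAXALIGN(48 + 4 * 8) = 80 bytes.  A leader typically feeds this into
 * its shared memory sizing and then initializes the allocated chunk, e.g.
 * (hypothetical caller):
 *
 *		Size		size = tuplesort_estimate_shared(nworkers);
 *		Sharedsort *shared = (Sharedsort *) shm_toc_allocate(toc, size);
 *
 *		tuplesort_initialize_shared(shared, nworkers, seg);
 */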
/*
 * worker_get_identifier - Assign and return ordinal identifier for worker
 *
 * The order in which these are assigned is not well defined, and should not
 * matter; worker numbers across parallel sort participants need only be
 * distinct and gapless.  logtape.c requires this.
 *
 * Note that the identifiers assigned from here have no relation to
 * ParallelWorkerNumber, to avoid making any assumption about caller's
 * requirements.  However, we do follow the ParallelWorkerNumber convention
 * of representing a non-worker with worker number -1.  This includes the
 * leader, as well as serial Tuplesort processes.
 */
static int
worker_get_identifier(Tuplesortstate *state)
{
	Sharedsort *shared = state->shared;
	int			worker;

	Assert(WORKER(state));

	SpinLockAcquire(&shared->mutex);
	worker = shared->currentWorker++;
	SpinLockRelease(&shared->mutex);

	return worker;
}

/*
 * worker_freeze_result_tape - freeze worker's result tape for leader
 *
 * This is called by workers just after the result tape has been determined,
 * instead of calling LogicalTapeFreeze() directly.  They do so because
 * workers require a few additional steps over similar serial
 * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
 * steps are around freeing now unneeded resources, and representing to
 * leader that worker's input run is available for its merge.
 *
 * There should only be one final output run for each worker, which consists
 * of all tuples that were originally input into worker.
 */
static void
worker_freeze_result_tape(Tuplesortstate *state)
{
	Sharedsort *shared = state->shared;
	TapeShare	output;

	Assert(WORKER(state));
	Assert(state->result_tape != -1);
	Assert(state->memtupcount == 0);

	/*
	 * Free most remaining memory, in case caller is sensitive to our holding
	 * on to it.  memtuples may not be a tiny merge heap at this point.
	 */
	pfree(state->memtuples);
	/* Be tidy */
	state->memtuples = NULL;
	state->memtupsize = 0;

	/*
	 * Parallel worker requires result tape metadata, which is to be stored
	 * in shared memory for leader
	 */
	LogicalTapeFreeze(state->tapeset, state->result_tape, &output);

	/* Store properties of output tape, and update finished worker count */
	SpinLockAcquire(&shared->mutex);
	shared->tapes[state->worker] = output;
	shared->workersFinished++;
	SpinLockRelease(&shared->mutex);
}

/*
 * worker_nomergeruns - dump memtuples in worker, without merging
 *
 * This is called as an alternative to mergeruns() with a worker when no
 * merging is required.
 */
static void
worker_nomergeruns(Tuplesortstate *state)
{
	Assert(WORKER(state));
	Assert(state->result_tape == -1);

	state->result_tape = state->tp_tapenum[state->destTape];
	worker_freeze_result_tape(state);
}

/*
 * leader_takeover_tapes - create tapeset for leader from worker tapes
 *
 * So far, leader Tuplesortstate has performed no actual sorting.  By now,
 * all sorting has occurred in workers, all of which must have already
 * returned from tuplesort_performsort().
 *
 * When this returns, leader process is left in a state that is virtually
 * indistinguishable from it having generated runs as a serial external sort
 * might have.
 */
static void
leader_takeover_tapes(Tuplesortstate *state)
{
	Sharedsort *shared = state->shared;
	int			nParticipants = state->nParticipants;
	int			workersFinished;
	int			j;

	Assert(LEADER(state));
	Assert(nParticipants >= 1);

	SpinLockAcquire(&shared->mutex);
	workersFinished = shared->workersFinished;
	SpinLockRelease(&shared->mutex);

	if (nParticipants != workersFinished)
		elog(ERROR, "cannot take over tapes before all workers finish");

	/*
	 * Create the tapeset from worker tapes, including a leader-owned tape at
	 * the end.  Parallel workers are far more expensive than logical tapes,
	 * so the number of tapes allocated here should never be excessive.
	 *
	 * We still have a leader tape, though it's not possible to write to it
	 * due to restrictions in the shared fileset infrastructure used by
	 * logtape.c.  It will never be written to in practice because
	 * randomAccess is disallowed for parallel sorts.
	 */
	inittapestate(state, nParticipants + 1);
	state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
										  shared->tapes, &shared->fileset,
										  state->worker);

	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
	state->currentRun = nParticipants;

	/*
	 * Initialize variables of Algorithm D to be consistent with runs from
	 * workers having been generated in the leader.
	 *
	 * There will always be exactly 1 run per worker, and exactly one input
	 * tape per run, because workers always output exactly 1 run, even when
	 * there were no input tuples for workers to sort.
	 */
	for (j = 0; j < state->maxTapes; j++)
	{
		/* One real run; no dummy runs for worker tapes */
		state->tp_fib[j] = 1;
		state->tp_runs[j] = 1;
		state->tp_dummy[j] = 0;
		state->tp_tapenum[j] = j;
	}
	/* Leader tape gets one dummy run, and no real runs */
	state->tp_fib[state->tapeRange] = 0;
	state->tp_runs[state->tapeRange] = 0;
	state->tp_dummy[state->tapeRange] = 1;

	state->Level = 1;
	state->destTape = 0;

	state->status = TSS_BUILDRUNS;
}
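/*
 * Worked example (comment only): with nParticipants = 3 workers, the leader
 * ends up with maxTapes = 4 (tapeRange = 3) and Algorithm D state equivalent
 * to having just generated 3 runs serially:
 *
 *		tp_runs   = {1, 1, 1, 0}	one real run per worker tape
 *		tp_dummy  = {0, 0, 0, 1}	leader tape holds one dummy run
 *		currentRun = 3, Level = 1, destTape = 0
 *
 * mergeruns() can then perform a single 3-way merge on the fly.
 */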
/*
 * Convenience routine to free a tuple previously loaded into sort memory
 */
static void
free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
{
	if (stup->tuple)
	{
		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
		pfree(stup->tuple);
		stup->tuple = NULL;
	}
}