/*------------------------------------------------------------------------- * * tuplesort.c * Generalized tuple sorting routines. * * This module handles sorting of heap tuples, index tuples, or single * Datums (and could easily support other kinds of sortable objects, * if necessary). It works efficiently for both small and large amounts * of data. Small amounts are sorted in-memory using qsort(). Large * amounts are sorted using temporary files and a standard external sort * algorithm. * * See Knuth, volume 3, for more than you want to know about the external * sorting algorithm. Historically, we divided the input into sorted runs * using replacement selection, in the form of a priority tree implemented * as a heap (essentially his Algorithm 5.2.3H), but now we always use * quicksort for run generation. We merge the runs using polyphase merge, * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are * implemented by logtape.c, which avoids space wastage by recycling disk * space as soon as each block is read from its "tape". * * The approximate amount of memory allowed for any one sort operation * is specified in kilobytes by the caller (most pass work_mem). Initially, * we absorb tuples and simply store them in an unsorted array as long as * we haven't exceeded workMem. If we reach the end of the input without * exceeding workMem, we sort the array using qsort() and subsequently return * tuples just by scanning the tuple array sequentially. If we do exceed * workMem, we begin to emit tuples into sorted runs in temporary tapes. * When tuples are dumped in batch after quicksorting, we begin a new run * with a new output tape (selected per Algorithm D). After the end of the * input is reached, we dump out remaining tuples in memory into a final run, * then merge the runs using Algorithm D. * * When merging runs, we use a heap containing just the frontmost tuple from * each source run; we repeatedly output the smallest tuple and replace it * with the next tuple from its source tape (if any). When the heap empties, * the merge is complete. The basic merge algorithm thus needs very little * memory --- only M tuples for an M-way merge, and M is constrained to a * small number. However, we can still make good use of our full workMem * allocation by pre-reading additional blocks from each source tape. Without * prereading, our access pattern to the temporary file would be very erratic; * on average we'd read one block from each of M source tapes during the same * time that we're writing M blocks to the output tape, so there is no * sequentiality of access at all, defeating the read-ahead methods used by * most Unix kernels. Worse, the output tape gets written into a very random * sequence of blocks of the temp file, ensuring that things will be even * worse when it comes time to read that tape. A straightforward merge pass * thus ends up doing a lot of waiting for disk seeks. We can improve matters * by prereading from each source tape sequentially, loading about workMem/M * bytes from each tape in turn, and making the sequential blocks immediately * available for reuse. This approach helps to localize both read and write * accesses. The pre-reading is handled by logtape.c, we just tell it how * much memory to use for the buffers. * * When the caller requests random access to the sort result, we form * the final sorted run on a logical tape which is then "frozen", so * that we can access it randomly. 
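 * (A frozen tape can be read backwards and can seek to a previously
 * saved position; tuplesort_markpos and tuplesort_restorepos rely on
 * this -- see logtape.c.)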
 * When the caller does not need random access, we return from
 * tuplesort_performsort() as soon as we are down to one run per logical
 * tape.  The final merge is then performed on-the-fly as the caller
 * repeatedly calls tuplesort_getXXX; this saves one cycle of writing all
 * the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will
 * always be many more runs than tape drives.  In our implementation a
 * "tape drive" doesn't cost much more than a few Kb of memory buffers, so
 * we can afford to have lots of them.  In particular, if we can have as
 * many tape drives as sorted runs, we can eliminate any repeated I/O at
 * all.  In the current code we determine the number of tapes M on the
 * basis of workMem: we want workMem/M to be large enough that we read a
 * fair amount of data each time we preread from a tape, so as to maintain
 * the locality of access described above.  Nonetheless, with large workMem
 * we can have many tapes (but not too many -- see the comments in
 * tuplesort_merge_order).
 *
 * This module supports parallel sorting.  Parallel sorts involve
 * coordination among one or more worker processes, and a leader process,
 * each with its own tuplesort state.  The leader process (or, more
 * accurately, the Tuplesortstate associated with a leader process) creates
 * a full tapeset consisting of worker tapes with one run to merge; a run
 * for every worker process.  This is then merged.  Worker processes are
 * guaranteed to produce exactly one output run from their partial input.
 *
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/hash.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/* sort-type codes for sort__start probes */
#define HEAP_SORT		0
#define INDEX_SORT		1
#define DATUM_SORT		2
#define CLUSTER_SORT	3

/* Sort parallel code from state for sort__start probes */
#define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
								 (state)->worker >= 0 ? 1 : 2)

/* GUC variables */
#ifdef TRACE_SORT
bool		trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
bool		optimize_bounded_sort = true;
#endif


/*
 * The objects we actually sort are SortTuple structs.  These contain
 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree() (except during merge, when we use a
 * simple slab allocator).  SortTuples also contain the tuple's first key
 * column in Datum/nullflag format, and an index integer.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.
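 *
 * To make that concrete, here is a sketch (illustrative only) of the fast
 * path this enables; the comparetup_* routines below follow this general
 * shape:
 *
 *		compare = ApplySortComparator(a->datum1, a->isnull1,
 *									  b->datum1, b->isnull1, sortKey);
 *		if (compare != 0)
 *			return compare;
 *		... fetch remaining key columns with heap_getattr or
 *		... index_getattr only when the first keys compare equal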
 * We could extract and save all the key columns not just the first, but
 * this would increase code complexity and overhead, and wouldn't actually
 * save any comparison cycles in the common case where the first key
 * determines the comparison result.  Note that for a pass-by-reference
 * datatype, datum1 points into the "tuple" storage.
 *
 * There is one special case: when the sort support infrastructure provides
 * an "abbreviated key" representation, where the key is (typically) a pass
 * by value proxy for a pass by reference type.  In this case, the
 * abbreviated key is stored in datum1 in place of the actual first key
 * column.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1 for pass by value types (or null values).  If the
 * datatype is pass-by-reference and isnull1 is false, then "tuple" points
 * to a separately palloc'd data value, otherwise "tuple" is NULL.  The
 * value of datum1 is then either the same pointer as "tuple", or is an
 * abbreviated key value as described above.  Accordingly, "tuple" is
 * always used in preference to datum1 as the authoritative value for
 * pass-by-reference cases.
 *
 * tupindex holds the input tape number that each tuple in the heap was
 * read from during merge passes.
 */
typedef struct
{
	void	   *tuple;			/* the tuple itself */
	Datum		datum1;			/* value of first key column */
	bool		isnull1;		/* is first key column NULL? */
	int			tupindex;		/* see notes above */
} SortTuple;

/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use,
 * the slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
	union SlabSlot *nextfree;
	char		buffer[SLAB_SLOT_SIZE];
} SlabSlot;

/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE				/* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one
 * block's worth of buffer space.  This ignores the overhead of all the
 * other data structures needed for each tape, but it's probably close
 * enough.
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER		6		/* minimum merge order */
#define MAXORDER		500		/* maximum merge order */
#define TAPE_BUFFER_OVERHEAD		BLCKSZ
#define MERGE_BUFFER_SIZE			(BLCKSZ * 32)

typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
									Tuplesortstate *state);

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	int			nKeys;			/* number of columns in sort key */
	bool		randomAccess;	/* did caller request random access? */
	bool		bounded;		/* did caller specify a maximum number of
								 * tuples to return?
								 */
	bool		boundUsed;		/* true if we made use of a bounded heap */
	int			bound;			/* if bounded, the maximum number of tuples */
	bool		tuples;			/* Can SortTuple.tuple ever be set? */
	int64		availMem;		/* remaining memory available, in bytes */
	int64		allowedMem;		/* total memory allowed, in bytes */
	int			maxTapes;		/* number of tapes (Knuth's T) */
	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
	MemoryContext sortcontext;	/* memory context holding most sort data */
	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */

	/*
	 * These function pointers decouple the routines that must know what
	 * kind of tuple we are sorting from the routines that don't need to
	 * know it.  They are set up by the tuplesort_begin_xxx routines.
	 *
	 * Function to compare two tuples; result is per qsort() convention,
	 * ie: <0, 0, >0 according as a<b, a=b, a>b.  The API must match
	 * qsort_arg_comparator.
	 */
	SortTupleComparator comparetup;

	/*
	 * Function to copy a supplied input tuple into palloc'd space and set
	 * up its SortTuple representation (ie, set tuple/datum1/isnull1).
	 * Also, state->availMem must be decreased by the amount of space used
	 * for the tuple copy (note the SortTuple struct itself is not counted).
	 */
	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of
	 * the tuple on tape need not be the same as it is in memory;
	 * requirements on the tape representation are given below.  Unless the
	 * slab allocator is used, after writing the tuple, pfree() the
	 * out-of-line data (not the SortTuple struct!), and increase
	 * state->availMem by the amount of memory space thereby released.
	 */
	void		(*writetup) (Tuplesortstate *state, int tapenum,
							 SortTuple *stup);

	/*
	 * Function to read a stored tuple from tape back into memory.  'len'
	 * is the already-read length of the stored tuple.  The tuple is
	 * allocated from the slab memory arena, or is palloc'd, see
	 * readtup_alloc().
	 */
	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
							int tapenum, unsigned int len);

	/*
	 * This array holds the tuples now in sort memory.  If we are in state
	 * INITIAL, the tuples are in no particular order; if we are in state
	 * SORTEDINMEM, the tuples are in final sorted order; in states
	 * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
	 * per Algorithm H.  In state SORTEDONTAPE, the array is not used.
	 */
	SortTuple  *memtuples;		/* array of SortTuple structs */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */
	bool		growmemtuples;	/* memtuples' growth still underway? */

	/*
	 * Memory for tuples is sometimes allocated using a simple slab
	 * allocator, rather than with palloc().  Currently, we switch to slab
	 * allocation when we start merging.  Merging only needs to keep a
	 * small, fixed number of tuples in memory at any time, so we can avoid
	 * the palloc/pfree overhead by recycling a fixed number of fixed-size
	 * slots to hold the tuples.
	 *
	 * For the slab, we use one large allocation, divided into
	 * SLAB_SLOT_SIZE slots.  The allocation is sized to have one slot per
	 * tape, plus one additional slot.  We need that many slots to hold all
	 * the tuples kept in the heap during merge, plus the one we have last
	 * returned from the sort, with tuplesort_gettuple.
	 *
	 * Initially, all the slots are kept in a linked list of free slots.
	 * When a tuple is read from a tape, it is put into the next available
	 * slot, if it fits.
If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd * instead. * * When we're done processing a tuple, we return the slot back to the free * list, or pfree() if it was palloc'd. We know that a tuple was * allocated from the slab, if its pointer value is between * slabMemoryBegin and -End. * * When the slab allocator is used, the USEMEM/LACKMEM mechanism of * tracking memory usage is not used. */ bool slabAllocatorUsed; char *slabMemoryBegin; /* beginning of slab memory arena */ char *slabMemoryEnd; /* end of slab memory arena */ SlabSlot *slabFreeHead; /* head of free list */ /* Buffer size to use for reading input tapes, during merge. */ size_t read_buffer_size; /* * When we return a tuple to the caller in tuplesort_gettuple_XXX, that * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE * modes), we remember the tuple in 'lastReturnedTuple', so that we can * recycle the memory on next gettuple call. */ void *lastReturnedTuple; /* * While building initial runs, this is the current output run number. * Afterwards, it is the number of initial runs we made. */ int currentRun; /* * Unless otherwise noted, all pointer variables below are pointers to * arrays of length maxTapes, holding per-tape data. */ /* * This variable is only used during merge passes. mergeactive[i] is true * if we are reading an input run from (actual) tape number i and have not * yet exhausted that run. */ bool *mergeactive; /* active input run source? */ /* * Variables for Algorithm D. Note that destTape is a "logical" tape * number, ie, an index into the tp_xxx[] arrays. Be careful to keep * "logical" and "actual" tape numbers straight! */ int Level; /* Knuth's l */ int destTape; /* current output tape (Knuth's j, less 1) */ int *tp_fib; /* Target Fibonacci run counts (A[]) */ int *tp_runs; /* # of real runs on each tape */ int *tp_dummy; /* # of dummy runs for each tape (D[]) */ int *tp_tapenum; /* Actual tape numbers (TAPE[]) */ int activeTapes; /* # of active input tapes in merge pass */ /* * These variables are used after completion of sorting to keep track of * the next tuple to return. (In the tape case, the tape's current read * position is also critical state.) */ int result_tape; /* actual tape number of finished output */ int current; /* array index (only used if SORTEDINMEM) */ bool eof_reached; /* reached EOF (needed for cursors) */ /* markpos_xxx holds marked position for mark and restore */ long markpos_block; /* tape block# (only used if SORTEDONTAPE) */ int markpos_offset; /* saved "current", or offset in tape block */ bool markpos_eof; /* saved "eof_reached" */ /* * These variables are used during parallel sorting. * * worker is our worker identifier. Follows the general convention that * -1 value relates to a leader tuplesort, and values >= 0 worker * tuplesorts. (-1 can also be a serial tuplesort.) * * shared is mutable shared memory state, which is used to coordinate * parallel sorts. * * nParticipants is the number of worker Tuplesortstates known by the * leader to have actually been launched, which implies that they must * finish a run leader can merge. Typically includes a worker state held * by the leader process itself. Set in the leader Tuplesortstate only. */ int worker; Sharedsort *shared; int nParticipants; /* * The sortKeys variable is used by every case other than the hash index * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the * MinimalTuple and CLUSTER routines, though. 
*/ TupleDesc tupDesc; SortSupport sortKeys; /* array of length nKeys */ /* * This variable is shared by the single-key MinimalTuple case and the * Datum case (which both use qsort_ssup()). Otherwise it's NULL. */ SortSupport onlyKey; /* * Additional state for managing "abbreviated key" sortsupport routines * (which currently may be used by all cases except the hash index case). * Tracks the intervals at which the optimization's effectiveness is * tested. */ int64 abbrevNext; /* Tuple # at which to next check * applicability */ /* * These variables are specific to the CLUSTER case; they are set by * tuplesort_begin_cluster. */ IndexInfo *indexInfo; /* info about index being used for reference */ EState *estate; /* for evaluating index expressions */ /* * These variables are specific to the IndexTuple case; they are set by * tuplesort_begin_index_xxx and used only by the IndexTuple routines. */ Relation heapRel; /* table the index is being built on */ Relation indexRel; /* index being built */ /* These are specific to the index_btree subcase: */ bool enforceUnique; /* complain if we find duplicate tuples */ /* These are specific to the index_hash subcase: */ uint32 high_mask; /* masks for sortable part of hash code */ uint32 low_mask; uint32 max_buckets; /* * These variables are specific to the Datum case; they are set by * tuplesort_begin_datum and used only by the DatumTuple routines. */ Oid datumType; /* we need typelen in order to know how to copy the Datums. */ int datumTypeLen; /* * Resource snapshot for time of sort start. */ #ifdef TRACE_SORT PGRUsage ru_start; #endif }; /* * Private mutable state of tuplesort-parallel-operation. This is allocated * in shared memory. */ struct Sharedsort { /* mutex protects all fields prior to tapes */ slock_t mutex; /* * currentWorker generates ordinal identifier numbers for parallel sort * workers. These start from 0, and are always gapless. * * Workers increment workersFinished to indicate having finished. If this * is equal to state.nParticipants within the leader, leader is ready to * merge worker runs. */ int currentWorker; int workersFinished; /* Temporary file space */ SharedFileSet fileset; /* Size of tapes flexible array */ int nTapes; /* * Tapes array used by workers to report back information needed by the * leader to concatenate all worker tapes into one for merging */ TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]; }; /* * Is the given tuple allocated from the slab memory arena? */ #define IS_SLAB_SLOT(state, tuple) \ ((char *) (tuple) >= (state)->slabMemoryBegin && \ (char *) (tuple) < (state)->slabMemoryEnd) /* * Return the given tuple to the slab memory free list, or free it * if it was palloc'd. 
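 *
 * For illustration, the typical call site pattern (used by the gettuple
 * code later in this file) pairs this with lastReturnedTuple:
 *
 *		if (state->lastReturnedTuple)
 *		{
 *			RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
 *			state->lastReturnedTuple = NULL;
 *		}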
*/ #define RELEASE_SLAB_SLOT(state, tuple) \ do { \ SlabSlot *buf = (SlabSlot *) tuple; \ \ if (IS_SLAB_SLOT((state), buf)) \ { \ buf->nextfree = (state)->slabFreeHead; \ (state)->slabFreeHead = buf; \ } else \ pfree(buf); \ } while(0) #define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state)) #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) #define USEMEM(state,amt) ((state)->availMem -= (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt)) #define SERIAL(state) ((state)->shared == NULL) #define WORKER(state) ((state)->shared && (state)->worker != -1) #define LEADER(state) ((state)->shared && (state)->worker == -1) /* * NOTES about on-tape representation of tuples: * * We require the first "unsigned int" of a stored tuple to be the total size * on-tape of the tuple, including itself (so it is never zero; an all-zero * unsigned int is used to delimit runs). The remainder of the stored tuple * may or may not match the in-memory representation of the tuple --- * any conversion needed is the job of the writetup and readtup routines. * * If state->randomAccess is true, then the stored representation of the * tuple must be followed by another "unsigned int" that is a copy of the * length --- so the total tape space used is actually sizeof(unsigned int) * more than the stored length value. This allows read-backwards. When * randomAccess is not true, the write/read routines may omit the extra * length word. * * writetup is expected to write both length words as well as the tuple * data. When readtup is called, the tape is positioned just after the * front length word; readtup must read the tuple data and advance past * the back length word (if present). * * The write/read routines can make use of the tuple description data * stored in the Tuplesortstate record, if needed. They are also expected * to adjust state->availMem by the amount of memory space (not tape space!) * released or consumed. There is no error return from either writetup * or readtup; they should ereport() on failure. * * * NOTES about memory consumption calculations: * * We count space allocated for tuples against the workMem limit, plus * the space used by the variable-size memtuples array. Fixed-size space * is not counted; it's small enough to not be interesting. * * Note that we count actual space used (as shown by GetMemoryChunkSpace) * rather than the originally-requested size. This is important since * palloc can add substantial overhead. It's not a complete answer since * we won't count any wasted space in palloc allocation blocks, but it's * a lot better than what we were doing before 7.3. As of 9.6, a * separate memory context is used for caller passed tuples. Resetting * it at certain key increments significantly ameliorates fragmentation. * Note that this places a responsibility on readtup and copytup routines * to use the right memory context for these tuples (and to not use the * reset context for anything whose lifetime needs to span multiple * external sort runs). 
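 *
 * To make the tape format concrete (illustrative figures, assuming a
 * 4-byte unsigned int): a tuple whose stored representation takes 20
 * bytes is laid out on tape as
 *
 *		[len = 24][20 bytes of data]				without randomAccess
 *		[len = 24][20 bytes of data][len = 24]		with randomAccess
 *
 * where the length word counts itself plus the data, but not the
 * trailing copy used for reading backwards.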
*/ /* When using this macro, beware of double evaluation of len */ #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \ do { \ if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \ elog(ERROR, "unexpected end of data"); \ } while(0) static Tuplesortstate *tuplesort_begin_common(int workMem, SortCoordinate coordinate, bool randomAccess); static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); static bool consider_abort_common(Tuplesortstate *state); static void inittapes(Tuplesortstate *state, bool mergeruns); static void inittapestate(Tuplesortstate *state, int maxTapes); static void selectnewtape(Tuplesortstate *state); static void init_slab_allocator(Tuplesortstate *state, int numSlots); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); static void dumptuples(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state); static void tuplesort_sort_memtuples(Tuplesortstate *state); static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_delete_top(Tuplesortstate *state); static void reversedirection(Tuplesortstate *state); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate *state, int tapenum); static void *readtup_alloc(Tuplesortstate *state, Size tuplen); static int comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); static void writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); static void writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup); static void writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); static void writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); static int worker_get_identifier(Tuplesortstate *state); static void worker_freeze_result_tape(Tuplesortstate *state); static void worker_nomergeruns(Tuplesortstate *state); static void leader_takeover_tapes(Tuplesortstate *state); static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); /* * Special versions of qsort just for SortTuple objects. 
qsort_tuple() sorts * any variant of SortTuples, using the appropriate comparetup function. * qsort_ssup() is specialized for the case where the comparetup function * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts * and Datum sorts. */ #include "qsort_tuple.c" /* * tuplesort_begin_xxx * * Initialize for a tuple sort operation. * * After calling tuplesort_begin, the caller should call tuplesort_putXXX * zero or more times, then call tuplesort_performsort when all the tuples * have been supplied. After performsort, retrieve the tuples in sorted * order by calling tuplesort_getXXX until it returns false/NULL. (If random * access was requested, rescan, markpos, and restorepos can also be called.) * Call tuplesort_end to terminate the operation and release memory/disk space. * * Each variant of tuplesort_begin has a workMem parameter specifying the * maximum number of kilobytes of RAM to use before spilling data to disk. * (The normal value of this parameter is work_mem, but some callers use * other values.) Each variant also has a randomAccess parameter specifying * whether the caller needs non-sequential access to the sort result. */ static Tuplesortstate * tuplesort_begin_common(int workMem, SortCoordinate coordinate, bool randomAccess) { Tuplesortstate *state; MemoryContext sortcontext; MemoryContext tuplecontext; MemoryContext oldcontext; /* See leader_takeover_tapes() remarks on randomAccess support */ if (coordinate && randomAccess) elog(ERROR, "random access disallowed under parallel sort"); /* * Create a working memory context for this sort operation. All data * needed by the sort will live inside this context. */ sortcontext = AllocSetContextCreate(CurrentMemoryContext, "TupleSort main", ALLOCSET_DEFAULT_SIZES); /* * Caller tuple (e.g. IndexTuple) memory context. * * A dedicated child context used exclusively for caller passed tuples * eases memory management. Resetting at key points reduces * fragmentation. Note that the memtuples array of SortTuples is allocated * in the parent context, not this context, because there is no need to * free memtuples early. */ tuplecontext = AllocSetContextCreate(sortcontext, "Caller tuples", ALLOCSET_DEFAULT_SIZES); /* * Make the Tuplesortstate within the per-sort context. This way, we * don't need a separate pfree() operation for it at shutdown. */ oldcontext = MemoryContextSwitchTo(sortcontext); state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate)); #ifdef TRACE_SORT if (trace_sort) pg_rusage_init(&state->ru_start); #endif state->status = TSS_INITIAL; state->randomAccess = randomAccess; state->bounded = false; state->tuples = true; state->boundUsed = false; /* * workMem is forced to be at least 64KB, the current minimum valid value * for the work_mem GUC. This is a defense against parallel sort callers * that divide out memory among many workers in a way that leaves each * with very little memory. */ state->allowedMem = Max(workMem, 64) * (int64) 1024; state->availMem = state->allowedMem; state->sortcontext = sortcontext; state->tuplecontext = tuplecontext; state->tapeset = NULL; state->memtupcount = 0; /* * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; * see comments in grow_memtuples(). 
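 * (For illustration: on a typical 64-bit build, sizeof(SortTuple) is 24
 * bytes and ALLOCSET_SEPARATE_THRESHOLD is 8192 bytes, so the threshold
 * term below works out to ~342 slots and the Max() effectively always
 * selects 1024.)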
*/ state->memtupsize = Max(1024, ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1); state->growmemtuples = true; state->slabAllocatorUsed = false; state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); /* workMem must be large enough for the minimal memtuples array */ if (LACKMEM(state)) elog(ERROR, "insufficient memory allowed for sort"); state->currentRun = 0; /* * maxTapes, tapeRange, and Algorithm D variables will be initialized by * inittapes(), if needed */ state->result_tape = -1; /* flag that result tape has not been formed */ /* * Initialize parallel-related state based on coordination information * from caller */ if (!coordinate) { /* Serial sort */ state->shared = NULL; state->worker = -1; state->nParticipants = -1; } else if (coordinate->isWorker) { /* Parallel worker produces exactly one final run from all input */ state->shared = coordinate->sharedsort; state->worker = worker_get_identifier(state); state->nParticipants = -1; } else { /* Parallel leader state only used for final merge */ state->shared = coordinate->sharedsort; state->worker = -1; state->nParticipants = coordinate->nParticipants; Assert(state->nParticipants >= 1); } MemoryContextSwitchTo(oldcontext); return state; } Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, randomAccess); MemoryContext oldcontext; int i; oldcontext = MemoryContextSwitchTo(state->sortcontext); AssertArg(nkeys > 0); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", nkeys, workMem, randomAccess ? 't' : 'f'); #endif state->nKeys = nkeys; TRACE_POSTGRESQL_SORT_START(HEAP_SORT, false, /* no unique check */ nkeys, workMem, randomAccess, PARALLEL_SORT(state)); state->comparetup = comparetup_heap; state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ state->abbrevNext = 10; /* Prepare SortSupport data for each column */ state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData)); for (i = 0; i < nkeys; i++) { SortSupport sortKey = state->sortKeys + i; AssertArg(attNums[i] != 0); AssertArg(sortOperators[i] != 0); sortKey->ssup_cxt = CurrentMemoryContext; sortKey->ssup_collation = sortCollations[i]; sortKey->ssup_nulls_first = nullsFirstFlags[i]; sortKey->ssup_attno = attNums[i]; /* Convey if abbreviation optimization is applicable in principle */ sortKey->abbreviate = (i == 0); PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey); } /* * The "onlyKey" optimization cannot be used with abbreviated keys, since * tie-breaker comparisons may be required. Typically, the optimization * is only of value to pass-by-value types anyway, whereas abbreviated * keys are typically only of value to pass-by-reference types. 
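 * (When onlyKey is set, tuplesort_sort_memtuples can call the
 * specialized qsort_ssup() instead of the generic qsort_tuple(),
 * saving an indirect function call per comparison.)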
*/ if (nkeys == 1 && !state->sortKeys->abbrev_converter) state->onlyKey = state->sortKeys; MemoryContextSwitchTo(oldcontext); return state; } Tuplesortstate * tuplesort_begin_cluster(TupleDesc tupDesc, Relation indexRel, int workMem, SortCoordinate coordinate, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, randomAccess); ScanKey indexScanKey; MemoryContext oldcontext; int i; Assert(indexRel->rd_rel->relam == BTREE_AM_OID); oldcontext = MemoryContextSwitchTo(state->sortcontext); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", RelationGetNumberOfAttributes(indexRel), workMem, randomAccess ? 't' : 'f'); #endif state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT, false, /* no unique check */ state->nKeys, workMem, randomAccess, PARALLEL_SORT(state)); state->comparetup = comparetup_cluster; state->copytup = copytup_cluster; state->writetup = writetup_cluster; state->readtup = readtup_cluster; state->abbrevNext = 10; state->indexInfo = BuildIndexInfo(indexRel); state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ indexScanKey = _bt_mkscankey_nodata(indexRel); if (state->indexInfo->ii_Expressions != NULL) { TupleTableSlot *slot; ExprContext *econtext; /* * We will need to use FormIndexDatum to evaluate the index * expressions. To do that, we need an EState, as well as a * TupleTableSlot to put the table tuples into. The econtext's * scantuple has to point to that slot, too. */ state->estate = CreateExecutorState(); slot = MakeSingleTupleTableSlot(tupDesc); econtext = GetPerTupleExprContext(state->estate); econtext->ecxt_scantuple = slot; } /* Prepare SortSupport data for each column */ state->sortKeys = (SortSupport) palloc0(state->nKeys * sizeof(SortSupportData)); for (i = 0; i < state->nKeys; i++) { SortSupport sortKey = state->sortKeys + i; ScanKey scanKey = indexScanKey + i; int16 strategy; sortKey->ssup_cxt = CurrentMemoryContext; sortKey->ssup_collation = scanKey->sk_collation; sortKey->ssup_nulls_first = (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; sortKey->ssup_attno = scanKey->sk_attno; /* Convey if abbreviation optimization is applicable in principle */ sortKey->abbreviate = (i == 0); AssertState(sortKey->ssup_attno != 0); strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? BTGreaterStrategyNumber : BTLessStrategyNumber; PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); } _bt_freeskey(indexScanKey); MemoryContextSwitchTo(oldcontext); return state; } Tuplesortstate * tuplesort_begin_index_btree(Relation heapRel, Relation indexRel, bool enforceUnique, int workMem, SortCoordinate coordinate, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, randomAccess); ScanKey indexScanKey; MemoryContext oldcontext; int i; oldcontext = MemoryContextSwitchTo(state->sortcontext); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin index sort: unique = %c, workMem = %d, randomAccess = %c", enforceUnique ? 't' : 'f', workMem, randomAccess ? 
't' : 'f'); #endif state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); TRACE_POSTGRESQL_SORT_START(INDEX_SORT, enforceUnique, state->nKeys, workMem, randomAccess, PARALLEL_SORT(state)); state->comparetup = comparetup_index_btree; state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; state->abbrevNext = 10; state->heapRel = heapRel; state->indexRel = indexRel; state->enforceUnique = enforceUnique; indexScanKey = _bt_mkscankey_nodata(indexRel); /* Prepare SortSupport data for each column */ state->sortKeys = (SortSupport) palloc0(state->nKeys * sizeof(SortSupportData)); for (i = 0; i < state->nKeys; i++) { SortSupport sortKey = state->sortKeys + i; ScanKey scanKey = indexScanKey + i; int16 strategy; sortKey->ssup_cxt = CurrentMemoryContext; sortKey->ssup_collation = scanKey->sk_collation; sortKey->ssup_nulls_first = (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; sortKey->ssup_attno = scanKey->sk_attno; /* Convey if abbreviation optimization is applicable in principle */ sortKey->abbreviate = (i == 0); AssertState(sortKey->ssup_attno != 0); strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? BTGreaterStrategyNumber : BTLessStrategyNumber; PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); } _bt_freeskey(indexScanKey); MemoryContextSwitchTo(oldcontext); return state; } Tuplesortstate * tuplesort_begin_index_hash(Relation heapRel, Relation indexRel, uint32 high_mask, uint32 low_mask, uint32 max_buckets, int workMem, SortCoordinate coordinate, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, randomAccess); MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(state->sortcontext); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin index sort: high_mask = 0x%x, low_mask = 0x%x, " "max_buckets = 0x%x, workMem = %d, randomAccess = %c", high_mask, low_mask, max_buckets, workMem, randomAccess ? 't' : 'f'); #endif state->nKeys = 1; /* Only one sort column, the hash code */ state->comparetup = comparetup_index_hash; state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; state->heapRel = heapRel; state->indexRel = indexRel; state->high_mask = high_mask; state->low_mask = low_mask; state->max_buckets = max_buckets; MemoryContextSwitchTo(oldcontext); return state; } Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, SortCoordinate coordinate, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, randomAccess); MemoryContext oldcontext; int16 typlen; bool typbyval; oldcontext = MemoryContextSwitchTo(state->sortcontext); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin datum sort: workMem = %d, randomAccess = %c", workMem, randomAccess ? 
			 't' : 'f');
#endif

	state->nKeys = 1;			/* always a one-column sort */

	TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
								false,	/* no unique check */
								1,
								workMem,
								randomAccess,
								PARALLEL_SORT(state));

	state->comparetup = comparetup_datum;
	state->copytup = copytup_datum;
	state->writetup = writetup_datum;
	state->readtup = readtup_datum;
	state->abbrevNext = 10;

	state->datumType = datumType;

	/* lookup necessary attributes of the datum type */
	get_typlenbyval(datumType, &typlen, &typbyval);
	state->datumTypeLen = typlen;
	state->tuples = !typbyval;

	/* Prepare SortSupport data */
	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));

	state->sortKeys->ssup_cxt = CurrentMemoryContext;
	state->sortKeys->ssup_collation = sortCollation;
	state->sortKeys->ssup_nulls_first = nullsFirstFlag;

	/*
	 * Abbreviation is possible here only for by-reference types.  In
	 * theory, a pass-by-value datatype could have an abbreviated form that
	 * is cheaper to compare.  In a tuple sort, we could support that,
	 * because we can always extract the original datum from the tuple as
	 * needed.  Here, we can't, because a datum sort only stores a single
	 * copy of the datum; the "tuple" field of each SortTuple is NULL.
	 */
	state->sortKeys->abbreviate = !typbyval;

	PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);

	/*
	 * The "onlyKey" optimization cannot be used with abbreviated keys,
	 * since tie-breaker comparisons may be required.  Typically, the
	 * optimization is only of value to pass-by-value types anyway, whereas
	 * abbreviated keys are typically only of value to pass-by-reference
	 * types.
	 */
	if (!state->sortKeys->abbrev_converter)
		state->onlyKey = state->sortKeys;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

/*
 * tuplesort_set_bound
 *
 *	Advise tuplesort that at most the first N result tuples are required.
 *
 * Must be called before inserting any tuples.  (Actually, we could allow
 * it as long as the sort hasn't spilled to disk, but there seems no need
 * for delayed calls at the moment.)
 *
 * This is a hint only.  The tuplesort may still return more tuples than
 * requested.  Parallel leader tuplesorts will always ignore the hint.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
	/* Assert we're called before loading any tuples */
	Assert(state->status == TSS_INITIAL);
	Assert(state->memtupcount == 0);
	Assert(!state->bounded);
	Assert(!WORKER(state));

#ifdef DEBUG_BOUNDED_SORT
	/* Honor GUC setting that disables the feature (for easy testing) */
	if (!optimize_bounded_sort)
		return;
#endif

	/* Parallel leader ignores hint */
	if (LEADER(state))
		return;

	/* We want to be able to compute bound * 2, so limit the setting */
	if (bound > (int64) (INT_MAX / 2))
		return;

	state->bounded = true;
	state->bound = (int) bound;

	/*
	 * Bounded sorts are not an effective target for abbreviated key
	 * optimization.  Disable by setting state to be consistent with no
	 * abbreviation support.
	 */
	state->sortKeys->abbrev_converter = NULL;
	if (state->sortKeys->abbrev_full_comparator)
		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;

	/* Not strictly necessary, but be tidy */
	state->sortKeys->abbrev_abort = NULL;
	state->sortKeys->abbrev_full_comparator = NULL;
}

/*
 * tuplesort_end
 *
 *	Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
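 *
 * For illustration, a complete serial sort from a caller's point of view
 * looks roughly like this (sketch only; argument lists abbreviated):
 *
 *		state = tuplesort_begin_heap(...);
 *		while (... more input ...)
 *			tuplesort_puttupleslot(state, slot);
 *		tuplesort_performsort(state);
 *		while (tuplesort_gettupleslot(state, true, false, slot, NULL))
 *			... process each sorted tuple ...
 *		tuplesort_end(state);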
*/ void tuplesort_end(Tuplesortstate *state) { /* context swap probably not needed, but let's be safe */ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); #ifdef TRACE_SORT long spaceUsed; if (state->tapeset) spaceUsed = LogicalTapeSetBlocks(state->tapeset); else spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; #endif /* * Delete temporary "tape" files, if any. * * Note: want to include this in reported total cost of sort, hence need * for two #ifdef TRACE_SORT sections. */ if (state->tapeset) LogicalTapeSetClose(state->tapeset); #ifdef TRACE_SORT if (trace_sort) { if (state->tapeset) elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s", SERIAL(state) ? "external sort" : "parallel external sort", state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); else elog(LOG, "%s of worker %d ended, %ld KB used: %s", SERIAL(state) ? "internal sort" : "unperformed parallel sort", state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); } TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed); #else /* * If you disabled TRACE_SORT, you can still probe sort__done, but you * ain't getting space-used stats. */ TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L); #endif /* Free any execution state created for CLUSTER case */ if (state->estate != NULL) { ExprContext *econtext = GetPerTupleExprContext(state->estate); ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple); FreeExecutorState(state->estate); } MemoryContextSwitchTo(oldcontext); /* * Free the per-sort memory context, thereby releasing all working memory, * including the Tuplesortstate struct itself. */ MemoryContextDelete(state->sortcontext); } /* * Grow the memtuples[] array, if possible within our memory constraint. We * must not exceed INT_MAX tuples in memory or the caller-provided memory * limit. Return true if we were able to enlarge the array, false if not. * * Normally, at each increment we double the size of the array. When doing * that would exceed a limit, we attempt one last, smaller increase (and then * clear the growmemtuples flag so we don't try any more). That allows us to * use memory as fully as permitted; sticking to the pure doubling rule could * result in almost half going unused. Because availMem moves around with * tuple addition/removal, we need some rule to prevent making repeated small * increases in memtupsize, which would just be useless thrashing. The * growmemtuples flag accomplishes that and also prevents useless * recalculations in this function. */ static bool grow_memtuples(Tuplesortstate *state) { int newmemtupsize; int memtupsize = state->memtupsize; int64 memNowUsed = state->allowedMem - state->availMem; /* Forget it if we've already maxed out memtuples, per comment above */ if (!state->growmemtuples) return false; /* Select new value of memtupsize */ if (memNowUsed <= state->availMem) { /* * We've used no more than half of allowedMem; double our usage, * clamping at INT_MAX tuples. */ if (memtupsize < INT_MAX / 2) newmemtupsize = memtupsize * 2; else { newmemtupsize = INT_MAX; state->growmemtuples = false; } } else { /* * This will be the last increment of memtupsize. Abandon doubling * strategy and instead increase as much as we safely can. * * To stay within allowedMem, we can't increase memtupsize by more * than availMem / sizeof(SortTuple) elements. In practice, we want * to increase it by considerably less, because we need to leave some * space for the tuples to which the new array slots will refer. 
We * assume the new tuples will be about the same size as the tuples * we've already seen, and thus we can extrapolate from the space * consumption so far to estimate an appropriate new size for the * memtuples array. The optimal value might be higher or lower than * this estimate, but it's hard to know that in advance. We again * clamp at INT_MAX tuples. * * This calculation is safe against enlarging the array so much that * LACKMEM becomes true, because the memory currently used includes * the present array; thus, there would be enough allowedMem for the * new array elements even if no other memory were currently used. * * We do the arithmetic in float8, because otherwise the product of * memtupsize and allowedMem could overflow. Any inaccuracy in the * result should be insignificant; but even if we computed a * completely insane result, the checks below will prevent anything * really bad from happening. */ double grow_ratio; grow_ratio = (double) state->allowedMem / (double) memNowUsed; if (memtupsize * grow_ratio < INT_MAX) newmemtupsize = (int) (memtupsize * grow_ratio); else newmemtupsize = INT_MAX; /* We won't make any further enlargement attempts */ state->growmemtuples = false; } /* Must enlarge array by at least one element, else report failure */ if (newmemtupsize <= memtupsize) goto noalloc; /* * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp * to ensure our request won't be rejected. Note that we can easily * exhaust address space before facing this outcome. (This is presently * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but * don't rely on that at this distance.) */ if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple)) { newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple)); state->growmemtuples = false; /* can't grow any more */ } /* * We need to be sure that we do not cause LACKMEM to become true, else * the space management algorithm will go nuts. The code above should * never generate a dangerous request, but to be safe, check explicitly * that the array growth fits within availMem. (We could still cause * LACKMEM if the memory chunk overhead associated with the memtuples * array were to increase. That shouldn't happen because we chose the * initial array size large enough to ensure that palloc will be treating * both old and new arrays as separate chunks. But we'll check LACKMEM * explicitly below just in case.) */ if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple))) goto noalloc; /* OK, do it */ FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); state->memtupsize = newmemtupsize; state->memtuples = (SortTuple *) repalloc_huge(state->memtuples, state->memtupsize * sizeof(SortTuple)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); if (LACKMEM(state)) elog(ERROR, "unexpected out-of-memory situation in tuplesort"); return true; noalloc: /* If for any reason we didn't realloc, shut off future attempts */ state->growmemtuples = false; return false; } /* * Accept one tuple while collecting input data for sort. * * Note that the input data is always copied; the caller need not save it. */ void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); SortTuple stup; /* * Copy the given tuple into memory we control, and decrease availMem. * Then call the common code. 
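 * (For the heap case, COPYTUP resolves to copytup_heap(), which forms a
 * MinimalTuple copy and charges its space against availMem via USEMEM.)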
*/ COPYTUP(state, &stup, (void *) slot); puttuple_common(state, &stup); MemoryContextSwitchTo(oldcontext); } /* * Accept one tuple while collecting input data for sort. * * Note that the input data is always copied; the caller need not save it. */ void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); SortTuple stup; /* * Copy the given tuple into memory we control, and decrease availMem. * Then call the common code. */ COPYTUP(state, &stup, (void *) tup); puttuple_common(state, &stup); MemoryContextSwitchTo(oldcontext); } /* * Collect one index tuple while collecting input data for sort, building * it from caller-supplied values. */ void tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, ItemPointer self, Datum *values, bool *isnull) { MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); SortTuple stup; Datum original; IndexTuple tuple; stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull); tuple = ((IndexTuple) stup.tuple); tuple->t_tid = *self; USEMEM(state, GetMemoryChunkSpace(stup.tuple)); /* set up first-column key value */ original = index_getattr(tuple, 1, RelationGetDescr(state->indexRel), &stup.isnull1); MemoryContextSwitchTo(state->sortcontext); if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1) { /* * Store ordinary Datum representation, or NULL value. If there is a * converter it won't expect NULL values, and cost model is not * required to account for NULL, so in that case we avoid calling * converter and just set datum1 to zeroed representation (to be * consistent, and to support cheap inequality tests for NULL * abbreviated keys). */ stup.datum1 = original; } else if (!consider_abort_common(state)) { /* Store abbreviated key representation */ stup.datum1 = state->sortKeys->abbrev_converter(original, state->sortKeys); } else { /* Abort abbreviation */ int i; stup.datum1 = original; /* * Set state to be consistent with never trying abbreviation. * * Alter datum1 representation in already-copied tuples, so as to * ensure a consistent representation (current tuple was just * handled). It does not matter if some dumped tuples are already * sorted on tape, since serialized tuples lack abbreviated keys * (TSS_BUILDRUNS state prevents control reaching here in any case). */ for (i = 0; i < state->memtupcount; i++) { SortTuple *mtup = &state->memtuples[i]; tuple = mtup->tuple; mtup->datum1 = index_getattr(tuple, 1, RelationGetDescr(state->indexRel), &mtup->isnull1); } } puttuple_common(state, &stup); MemoryContextSwitchTo(oldcontext); } /* * Accept one Datum while collecting input data for sort. * * If the Datum is pass-by-ref type, the value will be copied. */ void tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull) { MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); SortTuple stup; /* * Pass-by-value types or null values are just stored directly in * stup.datum1 (and stup.tuple is not used and set to NULL). * * Non-null pass-by-reference values need to be copied into memory we * control, and possibly abbreviated. The copied value is pointed to by * stup.tuple and is treated as the canonical copy (e.g. to return via * tuplesort_getdatum or when writing to tape); stup.datum1 gets the * abbreviated value if abbreviation is happening, otherwise it's * identical to stup.tuple. 
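 * (Example: when sorting text datums with abbreviation enabled,
 * stup.tuple points to the palloc'd copy of the full value, while
 * stup.datum1 holds the pass-by-value abbreviated key produced by the
 * abbrev_converter.)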
*/ if (isNull || !state->tuples) { /* * Set datum1 to zeroed representation for NULLs (to be consistent, * and to support cheap inequality tests for NULL abbreviated keys). */ stup.datum1 = !isNull ? val : (Datum) 0; stup.isnull1 = isNull; stup.tuple = NULL; /* no separate storage */ MemoryContextSwitchTo(state->sortcontext); } else { Datum original = datumCopy(val, false, state->datumTypeLen); stup.isnull1 = false; stup.tuple = DatumGetPointer(original); USEMEM(state, GetMemoryChunkSpace(stup.tuple)); MemoryContextSwitchTo(state->sortcontext); if (!state->sortKeys->abbrev_converter) { stup.datum1 = original; } else if (!consider_abort_common(state)) { /* Store abbreviated key representation */ stup.datum1 = state->sortKeys->abbrev_converter(original, state->sortKeys); } else { /* Abort abbreviation */ int i; stup.datum1 = original; /* * Set state to be consistent with never trying abbreviation. * * Alter datum1 representation in already-copied tuples, so as to * ensure a consistent representation (current tuple was just * handled). It does not matter if some dumped tuples are already * sorted on tape, since serialized tuples lack abbreviated keys * (TSS_BUILDRUNS state prevents control reaching here in any * case). */ for (i = 0; i < state->memtupcount; i++) { SortTuple *mtup = &state->memtuples[i]; mtup->datum1 = PointerGetDatum(mtup->tuple); } } } puttuple_common(state, &stup); MemoryContextSwitchTo(oldcontext); } /* * Shared code for tuple and datum cases. */ static void puttuple_common(Tuplesortstate *state, SortTuple *tuple) { Assert(!LEADER(state)); switch (state->status) { case TSS_INITIAL: /* * Save the tuple into the unsorted array. First, grow the array * as needed. Note that we try to grow the array when there is * still one free slot remaining --- if we fail, there'll still be * room to store the incoming tuple, and then we'll switch to * tape-based operation. */ if (state->memtupcount >= state->memtupsize - 1) { (void) grow_memtuples(state); Assert(state->memtupcount < state->memtupsize); } state->memtuples[state->memtupcount++] = *tuple; /* * Check if it's time to switch over to a bounded heapsort. We do * so if the input tuple count exceeds twice the desired tuple * count (this is a heuristic for where heapsort becomes cheaper * than a quicksort), or if we've just filled workMem and have * enough tuples to meet the bound. * * Note that once we enter TSS_BOUNDED state we will always try to * complete the sort that way. In the worst case, if later input * tuples are larger than earlier ones, this might cause us to * exceed workMem significantly. */ if (state->bounded && (state->memtupcount > state->bound * 2 || (state->memtupcount > state->bound && LACKMEM(state)))) { #ifdef TRACE_SORT if (trace_sort) elog(LOG, "switching to bounded heapsort at %d tuples: %s", state->memtupcount, pg_rusage_show(&state->ru_start)); #endif make_bounded_heap(state); return; } /* * Done if we still fit in available memory and have array slots. */ if (state->memtupcount < state->memtupsize && !LACKMEM(state)) return; /* * Nope; time to switch to tape-based operation. */ inittapes(state, true); /* * Dump all tuples. */ dumptuples(state, false); break; case TSS_BOUNDED: /* * We don't want to grow the array here, so check whether the new * tuple can be discarded before putting it in. This should be a * good speed optimization, too, since when there are many more * input tuples than the bound, most input tuples can be discarded * with just this one comparison. 
Note that because we currently * have the sort direction reversed, we must check for <= not >=. */ if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0) { /* new tuple <= top of the heap, so we can discard it */ free_sort_tuple(state, tuple); CHECK_FOR_INTERRUPTS(); } else { /* discard top of heap, replacing it with the new tuple */ free_sort_tuple(state, &state->memtuples[0]); tuplesort_heap_replace_top(state, tuple); } break; case TSS_BUILDRUNS: /* * Save the tuple into the unsorted array (there must be space) */ state->memtuples[state->memtupcount++] = *tuple; /* * If we are over the memory limit, dump all tuples. */ dumptuples(state, false); break; default: elog(ERROR, "invalid tuplesort state"); break; } } static bool consider_abort_common(Tuplesortstate *state) { Assert(state->sortKeys[0].abbrev_converter != NULL); Assert(state->sortKeys[0].abbrev_abort != NULL); Assert(state->sortKeys[0].abbrev_full_comparator != NULL); /* * Check effectiveness of abbreviation optimization. Consider aborting * when still within memory limit. */ if (state->status == TSS_INITIAL && state->memtupcount >= state->abbrevNext) { state->abbrevNext *= 2; /* * Check opclass-supplied abbreviation abort routine. It may indicate * that abbreviation should not proceed. */ if (!state->sortKeys->abbrev_abort(state->memtupcount, state->sortKeys)) return false; /* * Finally, restore authoritative comparator, and indicate that * abbreviation is not in play by setting abbrev_converter to NULL */ state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator; state->sortKeys[0].abbrev_converter = NULL; /* Not strictly necessary, but be tidy */ state->sortKeys[0].abbrev_abort = NULL; state->sortKeys[0].abbrev_full_comparator = NULL; /* Give up - expect original pass-by-value representation */ return true; } return false; } /* * All tuples have been provided; finish the sort. */ void tuplesort_performsort(Tuplesortstate *state) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "performsort of worker %d starting: %s", state->worker, pg_rusage_show(&state->ru_start)); #endif switch (state->status) { case TSS_INITIAL: /* * We were able to accumulate all the tuples within the allowed * amount of memory, or leader to take over worker tapes */ if (SERIAL(state)) { /* Just qsort 'em and we're done */ tuplesort_sort_memtuples(state); state->status = TSS_SORTEDINMEM; } else if (WORKER(state)) { /* * Parallel workers must still dump out tuples to tape. No * merge is required to produce single output run, though. */ inittapes(state, false); dumptuples(state, true); worker_nomergeruns(state); state->status = TSS_SORTEDONTAPE; } else { /* * Leader will take over worker tapes and merge worker runs. * Note that mergeruns sets the correct state->status. */ leader_takeover_tapes(state); mergeruns(state); } state->current = 0; state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; state->markpos_eof = false; break; case TSS_BOUNDED: /* * We were able to accumulate all the tuples required for output * in memory, using a heap to eliminate excess tuples. Now we * have to transform the heap to a properly-sorted array. */ sort_bounded_heap(state); state->current = 0; state->eof_reached = false; state->markpos_offset = 0; state->markpos_eof = false; state->status = TSS_SORTEDINMEM; break; case TSS_BUILDRUNS: /* * Finish tape-based sort. 
First, flush all tuples remaining in * memory out to tape; then merge until we have a single remaining * run (or, if !randomAccess and !WORKER(), one run per tape). * Note that mergeruns sets the correct state->status. */ dumptuples(state, true); mergeruns(state); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; state->markpos_eof = false; break; default: elog(ERROR, "invalid tuplesort state"); break; } #ifdef TRACE_SORT if (trace_sort) { if (state->status == TSS_FINALMERGE) elog(LOG, "performsort of worker %d done (except %d-way final merge): %s", state->worker, state->activeTapes, pg_rusage_show(&state->ru_start)); else elog(LOG, "performsort of worker %d done: %s", state->worker, pg_rusage_show(&state->ru_start)); } #endif MemoryContextSwitchTo(oldcontext); } /* * Internal routine to fetch the next tuple in either forward or back * direction into *stup. Returns false if no more tuples. * Returned tuple belongs to tuplesort memory context, and must not be freed * by caller. Note that fetched tuple is stored in memory that may be * recycled by any future fetch. */ static bool tuplesort_gettuple_common(Tuplesortstate *state, bool forward, SortTuple *stup) { unsigned int tuplen; size_t nmoved; Assert(!WORKER(state)); switch (state->status) { case TSS_SORTEDINMEM: Assert(forward || state->randomAccess); Assert(!state->slabAllocatorUsed); if (forward) { if (state->current < state->memtupcount) { *stup = state->memtuples[state->current++]; return true; } state->eof_reached = true; /* * Complain if caller tries to retrieve more tuples than * originally asked for in a bounded sort. This is because * returning EOF here might be the wrong thing. */ if (state->bounded && state->current >= state->bound) elog(ERROR, "retrieved too many tuples in a bounded sort"); return false; } else { if (state->current <= 0) return false; /* * if all tuples are fetched already then we return last * tuple, else - tuple before last returned. */ if (state->eof_reached) state->eof_reached = false; else { state->current--; /* last returned tuple */ if (state->current <= 0) return false; } *stup = state->memtuples[state->current - 1]; return true; } break; case TSS_SORTEDONTAPE: Assert(forward || state->randomAccess); Assert(state->slabAllocatorUsed); /* * The slot that held the tuple that we returned in previous * gettuple call can now be reused. */ if (state->lastReturnedTuple) { RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); state->lastReturnedTuple = NULL; } if (forward) { if (state->eof_reached) return false; if ((tuplen = getlen(state, state->result_tape, true)) != 0) { READTUP(state, stup, state->result_tape, tuplen); /* * Remember the tuple we return, so that we can recycle * its memory on next call. (This can be NULL, in the * !state->tuples case). */ state->lastReturnedTuple = stup->tuple; return true; } else { state->eof_reached = true; return false; } } /* * Backward. * * if all tuples are fetched already then we return last tuple, * else - tuple before last returned. */ if (state->eof_reached) { /* * Seek position is pointing just past the zero tuplen at the * end of file; back up to fetch last tuple's ending length * word. If seek fails we must have a completely empty file. 
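*
* (Illustrative sketch of the on-tape record layout this relies on, per
* the writetup_*() routines later in this file, randomAccess case:)
*
*     [len][tuple body][len] [len][tuple body][len] ... [0]
*
* Every record carries leading and trailing length words, and the run
* ends with a zero length word, so backing up 2 * sizeof(unsigned int)
* from just past that zero lands on the last tuple's trailing length
* word.
*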
*/ nmoved = LogicalTapeBackspace(state->tapeset, state->result_tape, 2 * sizeof(unsigned int)); if (nmoved == 0) return false; else if (nmoved != 2 * sizeof(unsigned int)) elog(ERROR, "unexpected tape position"); state->eof_reached = false; } else { /* * Back up and fetch previously-returned tuple's ending length * word. If seek fails, assume we are at start of file. */ nmoved = LogicalTapeBackspace(state->tapeset, state->result_tape, sizeof(unsigned int)); if (nmoved == 0) return false; else if (nmoved != sizeof(unsigned int)) elog(ERROR, "unexpected tape position"); tuplen = getlen(state, state->result_tape, false); /* * Back up to get ending length word of tuple before it. */ nmoved = LogicalTapeBackspace(state->tapeset, state->result_tape, tuplen + 2 * sizeof(unsigned int)); if (nmoved == tuplen + sizeof(unsigned int)) { /* * We backed up over the previous tuple, but there was no * ending length word before it. That means that the prev * tuple is the first tuple in the file. It is now the * next to read in forward direction (not obviously right, * but that is what in-memory case does). */ return false; } else if (nmoved != tuplen + 2 * sizeof(unsigned int)) elog(ERROR, "bogus tuple length in backward scan"); } tuplen = getlen(state, state->result_tape, false); /* * Now we have the length of the prior tuple, back up and read it. * Note: READTUP expects we are positioned after the initial * length word of the tuple, so back up to that point. */ nmoved = LogicalTapeBackspace(state->tapeset, state->result_tape, tuplen); if (nmoved != tuplen) elog(ERROR, "bogus tuple length in backward scan"); READTUP(state, stup, state->result_tape, tuplen); /* * Remember the tuple we return, so that we can recycle its memory * on next call. (This can be NULL, in the Datum case). */ state->lastReturnedTuple = stup->tuple; return true; case TSS_FINALMERGE: Assert(forward); /* We are managing memory ourselves, with the slab allocator. */ Assert(state->slabAllocatorUsed); /* * The slab slot holding the tuple that we returned in previous * gettuple call can now be reused. */ if (state->lastReturnedTuple) { RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); state->lastReturnedTuple = NULL; } /* * This code should match the inner loop of mergeonerun(). */ if (state->memtupcount > 0) { int srcTape = state->memtuples[0].tupindex; SortTuple newtup; *stup = state->memtuples[0]; /* * Remember the tuple we return, so that we can recycle its * memory on next call. (This can be NULL, in the Datum case). */ state->lastReturnedTuple = stup->tuple; /* * Pull next tuple from tape, and replace the returned tuple * at top of the heap with it. */ if (!mergereadnext(state, srcTape, &newtup)) { /* * If no more data, we've reached end of run on this tape. * Remove the top node from the heap. */ tuplesort_heap_delete_top(state); /* * Rewind to free the read buffer. It'd go away at the * end of the sort anyway, but better to release the * memory early. */ LogicalTapeRewindForWrite(state->tapeset, srcTape); return true; } newtup.tupindex = srcTape; tuplesort_heap_replace_top(state, &newtup); return true; } return false; default: elog(ERROR, "invalid tuplesort state"); return false; /* keep compiler quiet */ } } /* * Fetch the next tuple in either forward or back direction. * If successful, put tuple in slot and return true; else, clear the slot * and return false. 
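*
* Typical usage (an illustrative sketch, not a caller from this file):
* after tuplesort_performsort(), a caller drains the sort with
*
*     while (tuplesort_gettupleslot(state, true, false, slot, NULL))
*         ... process slot before the next fetch ...
*
* passing copy = false only when each tuple is fully consumed before
* the tuplesort is touched again.
*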
* * Caller may optionally be passed back abbreviated value (on true return * value) when abbreviation was used, which can be used to cheaply avoid * equality checks that might otherwise be required. Caller can safely make a * determination of "non-equal tuple" based on simple binary inequality. A * NULL value in leading attribute will set abbreviated value to zeroed * representation, which caller may rely on in abbreviated inequality check. * * If copy is true, the slot receives a tuple that's been copied into the * caller's memory context, so that it will stay valid regardless of future * manipulations of the tuplesort's state (up to and including deleting the * tuplesort). If copy is false, the slot will just receive a pointer to a * tuple held within the tuplesort, which is more efficient, but only safe for * callers that are prepared to have any subsequent manipulation of the * tuplesort's state invalidate slot contents. */ bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); SortTuple stup; if (!tuplesort_gettuple_common(state, forward, &stup)) stup.tuple = NULL; MemoryContextSwitchTo(oldcontext); if (stup.tuple) { /* Record abbreviated key for caller */ if (state->sortKeys->abbrev_converter && abbrev) *abbrev = stup.datum1; if (copy) stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple); ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy); return true; } else { ExecClearTuple(slot); return false; } } /* * Fetch the next tuple in either forward or back direction. * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory * context, and must not be freed by caller. Caller may not rely on tuple * remaining valid after any further manipulation of tuplesort. */ HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); SortTuple stup; if (!tuplesort_gettuple_common(state, forward, &stup)) stup.tuple = NULL; MemoryContextSwitchTo(oldcontext); return stup.tuple; } /* * Fetch the next index tuple in either forward or back direction. * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory * context, and must not be freed by caller. Caller may not rely on tuple * remaining valid after any further manipulation of tuplesort. */ IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); SortTuple stup; if (!tuplesort_gettuple_common(state, forward, &stup)) stup.tuple = NULL; MemoryContextSwitchTo(oldcontext); return (IndexTuple) stup.tuple; } /* * Fetch the next Datum in either forward or back direction. * Returns false if no more datums. * * If the Datum is pass-by-ref type, the returned value is freshly palloc'd * in caller's context, and is now owned by the caller (this differs from * similar routines for other types of tuplesorts). * * Caller may optionally be passed back abbreviated value (on true return * value) when abbreviation was used, which can be used to cheaply avoid * equality checks that might otherwise be required. Caller can safely make a * determination of "non-equal tuple" based on simple binary inequality. A * NULL value will have a zeroed abbreviated value representation, which caller * may rely on in abbreviated inequality check. 
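*
* Illustrative sketch of a caller (hypothetical; "typbyval" stands in
* for the caller's knowledge of the sorted type):
*
*     Datum val;
*     bool  isNull;
*
*     while (tuplesort_getdatum(state, true, &val, &isNull, NULL))
*     {
*         ... use val ...
*         if (!isNull && !typbyval)
*             pfree(DatumGetPointer(val));   -- caller owns the copy
*     }
*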
*/ bool tuplesort_getdatum(Tuplesortstate *state, bool forward, Datum *val, bool *isNull, Datum *abbrev) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); SortTuple stup; if (!tuplesort_gettuple_common(state, forward, &stup)) { MemoryContextSwitchTo(oldcontext); return false; } /* Ensure we copy into caller's memory context */ MemoryContextSwitchTo(oldcontext); /* Record abbreviated key for caller */ if (state->sortKeys->abbrev_converter && abbrev) *abbrev = stup.datum1; if (stup.isnull1 || !state->tuples) { *val = stup.datum1; *isNull = stup.isnull1; } else { /* use stup.tuple because stup.datum1 may be an abbreviation */ *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen); *isNull = false; } return true; } /* * Advance over N tuples in either forward or back direction, * without returning any data. N==0 is a no-op. * Returns true if successful, false if ran out of tuples. */ bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward) { MemoryContext oldcontext; /* * We don't actually support backwards skip yet, because no callers need * it. The API is designed to allow for that later, though. */ Assert(forward); Assert(ntuples >= 0); Assert(!WORKER(state)); switch (state->status) { case TSS_SORTEDINMEM: if (state->memtupcount - state->current >= ntuples) { state->current += ntuples; return true; } state->current = state->memtupcount; state->eof_reached = true; /* * Complain if caller tries to retrieve more tuples than * originally asked for in a bounded sort. This is because * returning EOF here might be the wrong thing. */ if (state->bounded && state->current >= state->bound) elog(ERROR, "retrieved too many tuples in a bounded sort"); return false; case TSS_SORTEDONTAPE: case TSS_FINALMERGE: /* * We could probably optimize these cases better, but for now it's * not worth the trouble. */ oldcontext = MemoryContextSwitchTo(state->sortcontext); while (ntuples-- > 0) { SortTuple stup; if (!tuplesort_gettuple_common(state, forward, &stup)) { MemoryContextSwitchTo(oldcontext); return false; } CHECK_FOR_INTERRUPTS(); } MemoryContextSwitchTo(oldcontext); return true; default: elog(ERROR, "invalid tuplesort state"); return false; /* keep compiler quiet */ } } /* * tuplesort_merge_order - report merge order we'll use for given memory * (note: "merge order" just means the number of input tapes in the merge). * * This is exported for use by the planner. allowedMem is in bytes. */ int tuplesort_merge_order(int64 allowedMem) { int mOrder; /* * We need one tape for each merge input, plus another one for the output, * and each of these tapes needs buffer space. In addition we want * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't * count). * * Note: you might be thinking we need to account for the memtuples[] * array in this calculation, but we effectively treat that as part of the * MERGE_BUFFER_SIZE workspace. */ mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); /* * Even in minimum memory, use at least a MINORDER merge. On the other * hand, even when we have lots of memory, do not use more than a MAXORDER * merge. Tapes are pretty cheap, but they're not entirely free. Each * additional tape reduces the amount of memory available to build runs, * which in turn can cause the same sort to need more runs, which makes * merging slower even if it can still be done in a single pass. 
Also, * high order merges are quite slow due to CPU cache effects; it can be * faster to pay the I/O cost of a polyphase merge than to perform a * single merge pass across many hundreds of tapes. */ mOrder = Max(mOrder, MINORDER); mOrder = Min(mOrder, MAXORDER); return mOrder; } /* * inittapes - initialize for tape sorting. * * This is called only if we have found we won't sort in memory. */ static void inittapes(Tuplesortstate *state, bool mergeruns) { int maxTapes, j; Assert(!LEADER(state)); if (mergeruns) { /* Compute number of tapes to use: merge order plus 1 */ maxTapes = tuplesort_merge_order(state->allowedMem) + 1; } else { /* Workers can sometimes produce single run, output without merge */ Assert(WORKER(state)); maxTapes = MINORDER + 1; } #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d switching to external sort with %d tapes: %s", state->worker, maxTapes, pg_rusage_show(&state->ru_start)); #endif /* Create the tape set and allocate the per-tape data arrays */ inittapestate(state, maxTapes); state->tapeset = LogicalTapeSetCreate(maxTapes, NULL, state->shared ? &state->shared->fileset : NULL, state->worker); state->currentRun = 0; /* * Initialize variables of Algorithm D (step D1). */ for (j = 0; j < maxTapes; j++) { state->tp_fib[j] = 1; state->tp_runs[j] = 0; state->tp_dummy[j] = 1; state->tp_tapenum[j] = j; } state->tp_fib[state->tapeRange] = 0; state->tp_dummy[state->tapeRange] = 0; state->Level = 1; state->destTape = 0; state->status = TSS_BUILDRUNS; } /* * inittapestate - initialize generic tape management state */ static void inittapestate(Tuplesortstate *state, int maxTapes) { int64 tapeSpace; /* * Decrease availMem to reflect the space needed for tape buffers; but * don't decrease it to the point that we have no room for tuples. (That * case is only likely to occur if sorting pass-by-value Datums; in all * other scenarios the memtuples[] array is unlikely to occupy more than * half of allowedMem. In the pass-by-value case it's not important to * account for tuple space, so we don't care if LACKMEM becomes * inaccurate.) */ tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) USEMEM(state, tapeSpace); /* * Make sure that the temp file(s) underlying the tape set are created in * suitable temp tablespaces. For parallel sorts, this should have been * called already, but it doesn't matter if it is called a second time. */ PrepareTempTablespaces(); state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); /* Record # of tapes allocated (for duration of sort) */ state->maxTapes = maxTapes; /* Record maximum # of tapes usable as inputs when merging */ state->tapeRange = maxTapes - 1; } /* * selectnewtape -- select new tape for new initial run. * * This is called after finishing a run when we know another run * must be started. This implements steps D3, D4 of Algorithm D. 
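*
* (Worked example, illustrative:) with tapeRange = 3 input tapes, step
* D4 advances the target distribution tp_fib[] as
*
*     Level 1:  1 1 1      Level 2:  2 2 1      Level 3:  4 3 2
*
* while tp_dummy[] records how many of those target runs do not exist
* yet on each tape.
*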
*/ static void selectnewtape(Tuplesortstate *state) { int j; int a; /* Step D3: advance j (destTape) */ if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1]) { state->destTape++; return; } if (state->tp_dummy[state->destTape] != 0) { state->destTape = 0; return; } /* Step D4: increase level */ state->Level++; a = state->tp_fib[0]; for (j = 0; j < state->tapeRange; j++) { state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j]; state->tp_fib[j] = a + state->tp_fib[j + 1]; } state->destTape = 0; } /* * Initialize the slab allocation arena, for the given number of slots. */ static void init_slab_allocator(Tuplesortstate *state, int numSlots) { if (numSlots > 0) { char *p; int i; state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE); state->slabMemoryEnd = state->slabMemoryBegin + numSlots * SLAB_SLOT_SIZE; state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin; USEMEM(state, numSlots * SLAB_SLOT_SIZE); p = state->slabMemoryBegin; for (i = 0; i < numSlots - 1; i++) { ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE); p += SLAB_SLOT_SIZE; } ((SlabSlot *) p)->nextfree = NULL; } else { state->slabMemoryBegin = state->slabMemoryEnd = NULL; state->slabFreeHead = NULL; } state->slabAllocatorUsed = true; } /* * mergeruns -- merge all the completed initial runs. * * This implements steps D5, D6 of Algorithm D. All input data has * already been written to initial runs on tape (see dumptuples). */ static void mergeruns(Tuplesortstate *state) { int tapenum, svTape, svRuns, svDummy; int numTapes; int numInputTapes; Assert(state->status == TSS_BUILDRUNS); Assert(state->memtupcount == 0); if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL) { /* * If there are multiple runs to be merged, when we go to read back * tuples from disk, abbreviated keys will not have been stored, and * we don't care to regenerate them. Disable abbreviation from this * point on. */ state->sortKeys->abbrev_converter = NULL; state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; /* Not strictly necessary, but be tidy */ state->sortKeys->abbrev_abort = NULL; state->sortKeys->abbrev_full_comparator = NULL; } /* * Reset tuple memory. We've freed all the tuples that we previously * allocated. We will use the slab allocator from now on. */ MemoryContextDelete(state->tuplecontext); state->tuplecontext = NULL; /* * We no longer need a large memtuples array. (We will allocate a smaller * one for the heap later.) */ FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); pfree(state->memtuples); state->memtuples = NULL; /* * If we had fewer runs than tapes, refund the memory that we imagined we * would need for the tape buffers of the unused tapes. * * numTapes and numInputTapes reflect the actual number of tapes we will * use. Note that the output tape's tape number is maxTapes - 1, so the * tape numbers of the used tapes are not consecutive, and you cannot just * loop from 0 to numTapes to visit all used tapes! */ if (state->Level == 1) { numInputTapes = state->currentRun; numTapes = numInputTapes + 1; FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD); } else { numInputTapes = state->tapeRange; numTapes = state->maxTapes; } /* * Initialize the slab allocator. We need one slab slot per input tape, * for the tuples in the heap, plus one to hold the tuple last returned * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, * however, we don't need to allocate anything.)
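*
* (Aside, illustrative only:) the slab built by init_slab_allocator()
* above is a single arena carved into SLAB_SLOT_SIZE slots chained
* through a free list, so getting and releasing a slot are both O(1):
*
*     buf = state->slabFreeHead;               -- pop, in readtup_alloc()
*     state->slabFreeHead = buf->nextfree;
*
*     slot->nextfree = state->slabFreeHead;    -- push, RELEASE_SLAB_SLOT()
*     state->slabFreeHead = slot;
*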
* * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism * to track memory usage of individual tuples. */ if (state->tuples) init_slab_allocator(state, numInputTapes + 1); else init_slab_allocator(state, 0); /* * Allocate a new 'memtuples' array, for the heap. It will hold one tuple * from each input tape. */ state->memtupsize = numInputTapes; state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); /* * Use all the remaining memory we have available for read buffers among * the input tapes. * * We don't try to "rebalance" the memory among tapes, when we start a new * merge phase, even if some tapes are inactive in the new phase. That * would be hard, because logtape.c doesn't know where one run ends and * another begins. When a new merge phase begins, and a tape doesn't * participate in it, its buffer nevertheless already contains tuples from * the next run on same tape, so we cannot release the buffer. That's OK * in practice, merge performance isn't that sensitive to the amount of * buffers used, and most merge phases use all or almost all tapes, * anyway. */ #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", state->worker, state->availMem / 1024, numInputTapes); #endif state->read_buffer_size = Max(state->availMem / numInputTapes, 0); USEMEM(state, state->read_buffer_size * numInputTapes); /* End of step D2: rewind all output tapes to prepare for merging */ for (tapenum = 0; tapenum < state->tapeRange; tapenum++) LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size); for (;;) { /* * At this point we know that tape[T] is empty. If there's just one * (real or dummy) run left on each input tape, then only one merge * pass remains. If we don't have to produce a materialized sorted * tape, we can stop at this point and do the final merge on-the-fly. 
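*
* (Aside, not in the original comment:) in that case TSS_FINALMERGE
* serves callers straight from the merge heap in
* tuplesort_gettuple_common(), and no materialized result tape is ever
* written.
*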
*/ if (!state->randomAccess && !WORKER(state)) { bool allOneRun = true; Assert(state->tp_runs[state->tapeRange] == 0); for (tapenum = 0; tapenum < state->tapeRange; tapenum++) { if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1) { allOneRun = false; break; } } if (allOneRun) { /* Tell logtape.c we won't be writing anymore */ LogicalTapeSetForgetFreeSpace(state->tapeset); /* Initialize for the final merge pass */ beginmerge(state); state->status = TSS_FINALMERGE; return; } } /* Step D5: merge runs onto tape[T] until tape[P] is empty */ while (state->tp_runs[state->tapeRange - 1] || state->tp_dummy[state->tapeRange - 1]) { bool allDummy = true; for (tapenum = 0; tapenum < state->tapeRange; tapenum++) { if (state->tp_dummy[tapenum] == 0) { allDummy = false; break; } } if (allDummy) { state->tp_dummy[state->tapeRange]++; for (tapenum = 0; tapenum < state->tapeRange; tapenum++) state->tp_dummy[tapenum]--; } else mergeonerun(state); } /* Step D6: decrease level */ if (--state->Level == 0) break; /* rewind output tape T to use as new input */ LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], state->read_buffer_size); /* rewind used-up input tape P, and prepare it for write pass */ LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); state->tp_runs[state->tapeRange - 1] = 0; /* * reassign tape units per step D6; note we no longer care about A[] */ svTape = state->tp_tapenum[state->tapeRange]; svDummy = state->tp_dummy[state->tapeRange]; svRuns = state->tp_runs[state->tapeRange]; for (tapenum = state->tapeRange; tapenum > 0; tapenum--) { state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1]; state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1]; state->tp_runs[tapenum] = state->tp_runs[tapenum - 1]; } state->tp_tapenum[0] = svTape; state->tp_dummy[0] = svDummy; state->tp_runs[0] = svRuns; } /* * Done. Knuth says that the result is on TAPE[1], but since we exited * the loop without performing the last iteration of step D6, we have not * rearranged the tape unit assignment, and therefore the result is on * TAPE[T]. We need to do it this way so that we can freeze the final * output tape while rewinding it. The last iteration of step D6 would be * a waste of cycles anyway... */ state->result_tape = state->tp_tapenum[state->tapeRange]; if (!WORKER(state)) LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); else worker_freeze_result_tape(state); state->status = TSS_SORTEDONTAPE; /* Release the read buffers of all the other tapes, by rewinding them. */ for (tapenum = 0; tapenum < state->maxTapes; tapenum++) { if (tapenum != state->result_tape) LogicalTapeRewindForWrite(state->tapeset, tapenum); } } /* * Merge one run from each input tape, except ones with dummy runs. * * This is the inner loop of Algorithm D step D5. We know that the * output tape is TAPE[T]. */ static void mergeonerun(Tuplesortstate *state) { int destTape = state->tp_tapenum[state->tapeRange]; int srcTape; /* * Start the merge by loading one tuple from each active source tape into * the heap. We can also decrease the input run/dummy run counts. */ beginmerge(state); /* * Execute merge by repeatedly extracting lowest tuple in heap, writing it * out, and replacing it with next tuple from same tape (if there is * another one). 
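*
* (Illustrative restatement of the loop invariant:) memtuples[0 ..
* memtupcount - 1] is a heap holding at most one tuple per active
* source tape, each tagged with its tape in tupindex, so memtuples[0]
* is always the next tuple to write out.
*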
*/ while (state->memtupcount > 0) { SortTuple stup; /* write the tuple to destTape */ srcTape = state->memtuples[0].tupindex; WRITETUP(state, destTape, &state->memtuples[0]); /* recycle the slot of the tuple we just wrote out, for the next read */ if (state->memtuples[0].tuple) RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); /* * pull next tuple from the tape, and replace the written-out tuple in * the heap with it. */ if (mergereadnext(state, srcTape, &stup)) { stup.tupindex = srcTape; tuplesort_heap_replace_top(state, &stup); } else tuplesort_heap_delete_top(state); } /* * When the heap empties, we're done. Write an end-of-run marker on the * output tape, and increment its count of real runs. */ markrunend(state, destTape); state->tp_runs[state->tapeRange]++; #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d finished %d-way merge step: %s", state->worker, state->activeTapes, pg_rusage_show(&state->ru_start)); #endif } /* * beginmerge - initialize for a merge pass * * We decrease the counts of real and dummy runs for each tape, and mark * which tapes contain active input runs in mergeactive[]. Then, fill the * merge heap with the first tuple from each active tape. */ static void beginmerge(Tuplesortstate *state) { int activeTapes; int tapenum; int srcTape; /* Heap should be empty here */ Assert(state->memtupcount == 0); /* Adjust run counts and mark the active tapes */ memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive)); activeTapes = 0; for (tapenum = 0; tapenum < state->tapeRange; tapenum++) { if (state->tp_dummy[tapenum] > 0) state->tp_dummy[tapenum]--; else { Assert(state->tp_runs[tapenum] > 0); state->tp_runs[tapenum]--; srcTape = state->tp_tapenum[tapenum]; state->mergeactive[srcTape] = true; activeTapes++; } } Assert(activeTapes > 0); state->activeTapes = activeTapes; /* Load the merge heap with the first tuple from each input tape */ for (srcTape = 0; srcTape < state->maxTapes; srcTape++) { SortTuple tup; if (mergereadnext(state, srcTape, &tup)) { tup.tupindex = srcTape; tuplesort_heap_insert(state, &tup); } } } /* * mergereadnext - read next tuple from one merge input tape * * Returns false on EOF. */ static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) { unsigned int tuplen; if (!state->mergeactive[srcTape]) return false; /* tape's run is already exhausted */ /* read next tuple, if any */ if ((tuplen = getlen(state, srcTape, true)) == 0) { state->mergeactive[srcTape] = false; return false; } READTUP(state, stup, srcTape, tuplen); return true; } /* * dumptuples - remove tuples from memtuples and write initial run to tape * * When alltuples = true, dump everything currently in memory. (This case is * only used at end of input data.) */ static void dumptuples(Tuplesortstate *state, bool alltuples) { int memtupwrite; int i; /* * Nothing to do if we still fit in available memory and have array slots, * unless this is the final call during initial run generation. */ if (state->memtupcount < state->memtupsize && !LACKMEM(state) && !alltuples) return; /* * Final call might require no sorting, in rare cases where we just so * happen to have previously LACKMEM()'d at the point where exactly all * remaining tuples are loaded into memory, just before input was * exhausted. * * In general, short final runs are quite possible. Rather than allowing * a special case where there was a superfluous selectnewtape() call (i.e. * a call with no subsequent run actually written to destTape), we prefer * to write out a 0 tuple run. 
* * mergereadnext() is prepared for 0 tuple runs, and will reliably mark * the tape inactive for the merge when called from beginmerge(). This * case is therefore similar to the case where mergeonerun() finds a dummy * run for the tape, and so doesn't need to merge a run from the tape (or * conceptually "merges" the dummy run, if you prefer). According to * Knuth, Algorithm D "isn't strictly optimal" in its method of * distribution and dummy run assignment; this edge case seems very * unlikely to make that appreciably worse. */ Assert(state->status == TSS_BUILDRUNS); /* * It seems unlikely that this limit will ever be exceeded, but take no * chances */ if (state->currentRun == INT_MAX) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("cannot have more than %d runs for an external sort", INT_MAX))); state->currentRun++; #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d starting quicksort of run %d: %s", state->worker, state->currentRun, pg_rusage_show(&state->ru_start)); #endif /* * Sort all tuples accumulated within the allowed amount of memory for * this run using quicksort */ tuplesort_sort_memtuples(state); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d finished quicksort of run %d: %s", state->worker, state->currentRun, pg_rusage_show(&state->ru_start)); #endif memtupwrite = state->memtupcount; for (i = 0; i < memtupwrite; i++) { WRITETUP(state, state->tp_tapenum[state->destTape], &state->memtuples[i]); state->memtupcount--; } /* * Reset tuple memory. We've freed all of the tuples that we previously * allocated. It's important to avoid fragmentation when there is a stark * change in the sizes of incoming tuples. Fragmentation due to * AllocSetFree's bucketing by size class might be particularly bad if * this step wasn't taken. 
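*
* (Illustrative scenario:) if one run consisted mostly of ~64-byte
* tuples while the next run's tuples are ~1kB, the free lists built
* while pfree'ing the first run's chunks would be useless for the
* second run; resetting the context hands that memory back in bulk
* instead.
*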
*/ MemoryContextReset(state->tuplecontext); markrunend(state, state->tp_tapenum[state->destTape]); state->tp_runs[state->destTape]++; state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d finished writing run %d to tape %d: %s", state->worker, state->currentRun, state->destTape, pg_rusage_show(&state->ru_start)); #endif if (!alltuples) selectnewtape(state); } /* * tuplesort_rescan - rewind and replay the scan */ void tuplesort_rescan(Tuplesortstate *state) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); Assert(state->randomAccess); switch (state->status) { case TSS_SORTEDINMEM: state->current = 0; state->eof_reached = false; state->markpos_offset = 0; state->markpos_eof = false; break; case TSS_SORTEDONTAPE: LogicalTapeRewindForRead(state->tapeset, state->result_tape, 0); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; state->markpos_eof = false; break; default: elog(ERROR, "invalid tuplesort state"); break; } MemoryContextSwitchTo(oldcontext); } /* * tuplesort_markpos - saves current position in the merged sort file */ void tuplesort_markpos(Tuplesortstate *state) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); Assert(state->randomAccess); switch (state->status) { case TSS_SORTEDINMEM: state->markpos_offset = state->current; state->markpos_eof = state->eof_reached; break; case TSS_SORTEDONTAPE: LogicalTapeTell(state->tapeset, state->result_tape, &state->markpos_block, &state->markpos_offset); state->markpos_eof = state->eof_reached; break; default: elog(ERROR, "invalid tuplesort state"); break; } MemoryContextSwitchTo(oldcontext); } /* * tuplesort_restorepos - restores current position in merged sort file to * last saved position */ void tuplesort_restorepos(Tuplesortstate *state) { MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); Assert(state->randomAccess); switch (state->status) { case TSS_SORTEDINMEM: state->current = state->markpos_offset; state->eof_reached = state->markpos_eof; break; case TSS_SORTEDONTAPE: LogicalTapeSeek(state->tapeset, state->result_tape, state->markpos_block, state->markpos_offset); state->eof_reached = state->markpos_eof; break; default: elog(ERROR, "invalid tuplesort state"); break; } MemoryContextSwitchTo(oldcontext); } /* * tuplesort_get_stats - extract summary statistics * * This can be called after tuplesort_performsort() finishes to obtain * printable summary information about how the sort was performed. */ void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats) { /* * Note: it might seem we should provide both memory and disk usage for a * disk-based sort. However, the current code doesn't track memory space * accurately once we have begun to return tuples to the caller (since we * don't account for pfree's the caller is expected to do), so we cannot * rely on availMem in a disk sort. This does not seem worth the overhead * to fix. Is it worth creating an API for the memory context code to * tell us how much is actually used in sortcontext? 
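*
* (Worked example, illustrative:) with the default 8kB BLCKSZ, a tape
* set spanning 1000 blocks is reported below as 1000 * (8192 / 1024) =
* 8000 kB of disk space, while the in-memory figure is
* allowedMem - availMem rounded up to whole kilobytes.
*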
*/ if (state->tapeset) { stats->spaceType = SORT_SPACE_TYPE_DISK; stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024); } else { stats->spaceType = SORT_SPACE_TYPE_MEMORY; stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; } switch (state->status) { case TSS_SORTEDINMEM: if (state->boundUsed) stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT; else stats->sortMethod = SORT_TYPE_QUICKSORT; break; case TSS_SORTEDONTAPE: stats->sortMethod = SORT_TYPE_EXTERNAL_SORT; break; case TSS_FINALMERGE: stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE; break; default: stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS; break; } } /* * Convert TuplesortMethod to a string. */ const char * tuplesort_method_name(TuplesortMethod m) { switch (m) { case SORT_TYPE_STILL_IN_PROGRESS: return "still in progress"; case SORT_TYPE_TOP_N_HEAPSORT: return "top-N heapsort"; case SORT_TYPE_QUICKSORT: return "quicksort"; case SORT_TYPE_EXTERNAL_SORT: return "external sort"; case SORT_TYPE_EXTERNAL_MERGE: return "external merge"; } return "unknown"; } /* * Convert TuplesortSpaceType to a string. */ const char * tuplesort_space_type_name(TuplesortSpaceType t) { Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY); return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory"; } /* * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. */ /* * Convert the existing unordered array of SortTuples to a bounded heap, * discarding all but the smallest "state->bound" tuples. * * When working with a bounded heap, we want to keep the largest entry * at the root (array entry zero), instead of the smallest as in the normal * sort case. This allows us to discard the largest entry cheaply. * Therefore, we temporarily reverse the sort direction. */ static void make_bounded_heap(Tuplesortstate *state) { int tupcount = state->memtupcount; int i; Assert(state->status == TSS_INITIAL); Assert(state->bounded); Assert(tupcount >= state->bound); Assert(SERIAL(state)); /* Reverse sort direction so largest entry will be at root */ reversedirection(state); state->memtupcount = 0; /* make the heap empty */ for (i = 0; i < tupcount; i++) { if (state->memtupcount < state->bound) { /* Insert next tuple into heap */ /* Must copy source tuple to avoid possible overwrite */ SortTuple stup = state->memtuples[i]; tuplesort_heap_insert(state, &stup); } else { /* * The heap is full. Replace the largest entry with the new * tuple, or just discard it, if it's larger than anything already * in the heap. */ if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0) { free_sort_tuple(state, &state->memtuples[i]); CHECK_FOR_INTERRUPTS(); } else tuplesort_heap_replace_top(state, &state->memtuples[i]); } } Assert(state->memtupcount == state->bound); state->status = TSS_BOUNDED; } /* * Convert the bounded heap to a properly-sorted array */ static void sort_bounded_heap(Tuplesortstate *state) { int tupcount = state->memtupcount; Assert(state->status == TSS_BOUNDED); Assert(state->bounded); Assert(tupcount == state->bound); Assert(SERIAL(state)); /* * We can unheapify in place because each delete-top call will remove the * largest entry, which we can promptly store in the newly freed slot at * the end. Once we're down to a single-entry heap, we're done. 
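*
* (Illustrative trace:) with bound = 5 and surviving values {5,3,4,1,2}
* in reversed-direction heap order, successive delete-top calls park 5,
* 4, 3, 2 in slots 4, 3, 2, 1, leaving 1 in slot 0; the array ends up
* sorted ascending, as required.
*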
*/ while (state->memtupcount > 1) { SortTuple stup = state->memtuples[0]; /* this sifts-up the next-largest entry and decreases memtupcount */ tuplesort_heap_delete_top(state); state->memtuples[state->memtupcount] = stup; } state->memtupcount = tupcount; /* * Reverse sort direction back to the original state. This is not * actually necessary but seems like a good idea for tidiness. */ reversedirection(state); state->status = TSS_SORTEDINMEM; state->boundUsed = true; } /* * Sort all memtuples using specialized qsort() routines. * * Quicksort is used for small in-memory sorts, and external sort runs. */ static void tuplesort_sort_memtuples(Tuplesortstate *state) { Assert(!LEADER(state)); if (state->memtupcount > 1) { /* Can we use the single-key sort function? */ if (state->onlyKey != NULL) qsort_ssup(state->memtuples, state->memtupcount, state->onlyKey); else qsort_tuple(state->memtuples, state->memtupcount, state->comparetup, state); } } /* * Insert a new tuple into an empty or existing heap, maintaining the * heap invariant. Caller is responsible for ensuring there's room. * * Note: For some callers, tuple points to a memtuples[] entry above the * end of the heap. This is safe as long as it's not immediately adjacent * to the end of the heap (ie, in the [memtupcount] array entry) --- if it * is, it might get overwritten before being moved into the heap! */ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple) { SortTuple *memtuples; int j; memtuples = state->memtuples; Assert(state->memtupcount < state->memtupsize); CHECK_FOR_INTERRUPTS(); /* * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is * using 1-based array indexes, not 0-based. */ j = state->memtupcount++; while (j > 0) { int i = (j - 1) >> 1; if (COMPARETUP(state, tuple, &memtuples[i]) >= 0) break; memtuples[j] = memtuples[i]; j = i; } memtuples[j] = *tuple; } /* * Remove the tuple at state->memtuples[0] from the heap. Decrement * memtupcount, and sift up to maintain the heap invariant. * * The caller has already free'd the tuple the top node points to, * if necessary. */ static void tuplesort_heap_delete_top(Tuplesortstate *state) { SortTuple *memtuples = state->memtuples; SortTuple *tuple; if (--state->memtupcount <= 0) return; /* * Remove the last tuple in the heap, and re-insert it, by replacing the * current top node with it. */ tuple = &memtuples[state->memtupcount]; tuplesort_heap_replace_top(state, tuple); } /* * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to * maintain the heap invariant. * * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H, * Heapsort, steps H3-H8). */ static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple) { SortTuple *memtuples = state->memtuples; unsigned int i, n; Assert(state->memtupcount >= 1); CHECK_FOR_INTERRUPTS(); /* * state->memtupcount is "int", but we use "unsigned int" for i, j, n. * This prevents overflow in the "2 * i + 1" calculation, since at the top * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2. 
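*
* (Illustrative check:) even at the extreme n = INT_MAX, the largest i
* examined is n - 1, so 2 * i + 1 <= 2 * INT_MAX - 1 = UINT_MAX - 2,
* which still fits; the same expression in signed int would overflow as
* soon as i exceeded (INT_MAX - 1) / 2.
*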
*/ n = state->memtupcount; i = 0; /* i is where the "hole" is */ for (;;) { unsigned int j = 2 * i + 1; if (j >= n) break; if (j + 1 < n && COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0) j++; if (COMPARETUP(state, tuple, &memtuples[j]) <= 0) break; memtuples[i] = memtuples[j]; i = j; } memtuples[i] = *tuple; } /* * Function to reverse the sort direction from its current state * * It is not safe to call this when performing hash tuplesorts */ static void reversedirection(Tuplesortstate *state) { SortSupport sortKey = state->sortKeys; int nkey; for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) { sortKey->ssup_reverse = !sortKey->ssup_reverse; sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; } } /* * Tape interface routines */ static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK) { unsigned int len; if (LogicalTapeRead(state->tapeset, tapenum, &len, sizeof(len)) != sizeof(len)) elog(ERROR, "unexpected end of tape"); if (len == 0 && !eofOK) elog(ERROR, "unexpected end of data"); return len; } static void markrunend(Tuplesortstate *state, int tapenum) { unsigned int len = 0; LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len)); } /* * Get memory for tuple from within READTUP() routine. * * We use next free slot from the slab allocator, or palloc() if the tuple * is too large for that. */ static void * readtup_alloc(Tuplesortstate *state, Size tuplen) { SlabSlot *buf; /* * We pre-allocate enough slots in the slab arena that we should never run * out. */ Assert(state->slabFreeHead); if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead) return MemoryContextAlloc(state->sortcontext, tuplen); else { buf = state->slabFreeHead; /* Reuse this slot */ state->slabFreeHead = buf->nextfree; return buf; } } /* * Routines specialized for HeapTuple (actually MinimalTuple) case */ static int comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { SortSupport sortKey = state->sortKeys; HeapTupleData ltup; HeapTupleData rtup; TupleDesc tupDesc; int nkey; int32 compare; AttrNumber attno; Datum datum1, datum2; bool isnull1, isnull2; /* Compare the leading sort key */ compare = ApplySortComparator(a->datum1, a->isnull1, b->datum1, b->isnull1, sortKey); if (compare != 0) return compare; /* Compare additional sort keys */ ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET; ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET); rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET; rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET); tupDesc = state->tupDesc; if (sortKey->abbrev_converter) { attno = sortKey->ssup_attno; datum1 = heap_getattr(<up, attno, tupDesc, &isnull1); datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); compare = ApplySortAbbrevFullComparator(datum1, isnull1, datum2, isnull2, sortKey); if (compare != 0) return compare; } sortKey++; for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++) { attno = sortKey->ssup_attno; datum1 = heap_getattr(<up, attno, tupDesc, &isnull1); datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); compare = ApplySortComparator(datum1, isnull1, datum2, isnull2, sortKey); if (compare != 0) return compare; } return 0; } static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup) { /* * We expect the passed "tup" to be a TupleTableSlot, and form a * MinimalTuple using the exported interface for that. 
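*
* (Aside, illustrative:) note the address arithmetic used below and in
* comparetup_heap()/readtup_heap(): the HeapTupleData's t_data is
* pointed MINIMAL_TUPLE_OFFSET bytes *before* the MinimalTuple, so that
* heap_getattr() finds attributes at the offsets it expects without
* materializing a full heap tuple.
*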
*/ TupleTableSlot *slot = (TupleTableSlot *) tup; Datum original; MinimalTuple tuple; HeapTupleData htup; MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); /* copy the tuple into sort storage */ tuple = ExecCopySlotMinimalTuple(slot); stup->tuple = (void *) tuple; USEMEM(state, GetMemoryChunkSpace(tuple)); /* set up first-column key value */ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); original = heap_getattr(&htup, state->sortKeys[0].ssup_attno, state->tupDesc, &stup->isnull1); MemoryContextSwitchTo(oldcontext); if (!state->sortKeys->abbrev_converter || stup->isnull1) { /* * Store ordinary Datum representation, or NULL value. If there is a * converter it won't expect NULL values, and cost model is not * required to account for NULL, so in that case we avoid calling * converter and just set datum1 to zeroed representation (to be * consistent, and to support cheap inequality tests for NULL * abbreviated keys). */ stup->datum1 = original; } else if (!consider_abort_common(state)) { /* Store abbreviated key representation */ stup->datum1 = state->sortKeys->abbrev_converter(original, state->sortKeys); } else { /* Abort abbreviation */ int i; stup->datum1 = original; /* * Set state to be consistent with never trying abbreviation. * * Alter datum1 representation in already-copied tuples, so as to * ensure a consistent representation (current tuple was just * handled). It does not matter if some dumped tuples are already * sorted on tape, since serialized tuples lack abbreviated keys * (TSS_BUILDRUNS state prevents control reaching here in any case). */ for (i = 0; i < state->memtupcount; i++) { SortTuple *mtup = &state->memtuples[i]; htup.t_len = ((MinimalTuple) mtup->tuple)->t_len + MINIMAL_TUPLE_OFFSET; htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple - MINIMAL_TUPLE_OFFSET); mtup->datum1 = heap_getattr(&htup, state->sortKeys[0].ssup_attno, state->tupDesc, &mtup->isnull1); } } } static void writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) { MinimalTuple tuple = (MinimalTuple) stup->tuple; /* the part of the MinimalTuple we'll write: */ char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET; /* total on-disk footprint: */ unsigned int tuplen = tupbodylen + sizeof(int); LogicalTapeWrite(state->tapeset, tapenum, (void *) &tuplen, sizeof(tuplen)); LogicalTapeWrite(state->tapeset, tapenum, (void *) tupbody, tupbodylen); if (state->randomAccess) /* need trailing length word? */ LogicalTapeWrite(state->tapeset, tapenum, (void *) &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { FREEMEM(state, GetMemoryChunkSpace(tuple)); heap_free_minimal_tuple(tuple); } } static void readtup_heap(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len) { unsigned int tupbodylen = len - sizeof(int); unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen); char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; HeapTupleData htup; /* read in the tuple proper */ tuple->t_len = tuplen; LogicalTapeReadExact(state->tapeset, tapenum, tupbody, tupbodylen); if (state->randomAccess) /* need trailing length word? 
*/ LogicalTapeReadExact(state->tapeset, tapenum, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value */ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); stup->datum1 = heap_getattr(&htup, state->sortKeys[0].ssup_attno, state->tupDesc, &stup->isnull1); } /* * Routines specialized for the CLUSTER case (HeapTuple data, with * comparisons per a btree index definition) */ static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { SortSupport sortKey = state->sortKeys; HeapTuple ltup; HeapTuple rtup; TupleDesc tupDesc; int nkey; int32 compare; Datum datum1, datum2; bool isnull1, isnull2; AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0]; /* Be prepared to compare additional sort keys */ ltup = (HeapTuple) a->tuple; rtup = (HeapTuple) b->tuple; tupDesc = state->tupDesc; /* Compare the leading sort key, if it's simple */ if (leading != 0) { compare = ApplySortComparator(a->datum1, a->isnull1, b->datum1, b->isnull1, sortKey); if (compare != 0) return compare; if (sortKey->abbrev_converter) { datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1); datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2); compare = ApplySortAbbrevFullComparator(datum1, isnull1, datum2, isnull2, sortKey); } if (compare != 0 || state->nKeys == 1) return compare; /* Compare additional columns the hard way */ sortKey++; nkey = 1; } else { /* Must compare all keys the hard way */ nkey = 0; } if (state->indexInfo->ii_Expressions == NULL) { /* If not expression index, just compare the proper heap attrs */ for (; nkey < state->nKeys; nkey++, sortKey++) { AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey]; datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); compare = ApplySortComparator(datum1, isnull1, datum2, isnull2, sortKey); if (compare != 0) return compare; } } else { /* * In the expression index case, compute the whole index tuple and * then compare values. It would perhaps be faster to compute only as * many columns as we need to compare, but that would require * duplicating all the logic in FormIndexDatum. 
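*
* (Illustrative cost note:) for an expression index on, say,
* (lower(name), length(name)), both expressions are evaluated for both
* tuples on every comparison that reaches this path, even when the
* first column alone would have settled the ordering.
*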
*/ Datum l_index_values[INDEX_MAX_KEYS]; bool l_index_isnull[INDEX_MAX_KEYS]; Datum r_index_values[INDEX_MAX_KEYS]; bool r_index_isnull[INDEX_MAX_KEYS]; TupleTableSlot *ecxt_scantuple; /* Reset context each time to prevent memory leakage */ ResetPerTupleExprContext(state->estate); ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple; ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false); FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, l_index_values, l_index_isnull); ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false); FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, r_index_values, r_index_isnull); for (; nkey < state->nKeys; nkey++, sortKey++) { compare = ApplySortComparator(l_index_values[nkey], l_index_isnull[nkey], r_index_values[nkey], r_index_isnull[nkey], sortKey); if (compare != 0) return compare; } } return 0; } static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup) { HeapTuple tuple = (HeapTuple) tup; Datum original; MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); /* copy the tuple into sort storage */ tuple = heap_copytuple(tuple); stup->tuple = (void *) tuple; USEMEM(state, GetMemoryChunkSpace(tuple)); MemoryContextSwitchTo(oldcontext); /* * set up first-column key value, and potentially abbreviate, if it's a * simple column */ if (state->indexInfo->ii_IndexAttrNumbers[0] == 0) return; original = heap_getattr(tuple, state->indexInfo->ii_IndexAttrNumbers[0], state->tupDesc, &stup->isnull1); if (!state->sortKeys->abbrev_converter || stup->isnull1) { /* * Store ordinary Datum representation, or NULL value. If there is a * converter it won't expect NULL values, and cost model is not * required to account for NULL, so in that case we avoid calling * converter and just set datum1 to zeroed representation (to be * consistent, and to support cheap inequality tests for NULL * abbreviated keys). */ stup->datum1 = original; } else if (!consider_abort_common(state)) { /* Store abbreviated key representation */ stup->datum1 = state->sortKeys->abbrev_converter(original, state->sortKeys); } else { /* Abort abbreviation */ int i; stup->datum1 = original; /* * Set state to be consistent with never trying abbreviation. * * Alter datum1 representation in already-copied tuples, so as to * ensure a consistent representation (current tuple was just * handled). It does not matter if some dumped tuples are already * sorted on tape, since serialized tuples lack abbreviated keys * (TSS_BUILDRUNS state prevents control reaching here in any case). */ for (i = 0; i < state->memtupcount; i++) { SortTuple *mtup = &state->memtuples[i]; tuple = (HeapTuple) mtup->tuple; mtup->datum1 = heap_getattr(tuple, state->indexInfo->ii_IndexAttrNumbers[0], state->tupDesc, &mtup->isnull1); } } } static void writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) { HeapTuple tuple = (HeapTuple) stup->tuple; unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int); /* We need to store t_self, but not other fields of HeapTupleData */ LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen)); LogicalTapeWrite(state->tapeset, tapenum, &tuple->t_self, sizeof(ItemPointerData)); LogicalTapeWrite(state->tapeset, tapenum, tuple->t_data, tuple->t_len); if (state->randomAccess) /* need trailing length word? 
*/ LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { FREEMEM(state, GetMemoryChunkSpace(tuple)); heap_freetuple(tuple); } } static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int tuplen) { unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int); HeapTuple tuple = (HeapTuple) readtup_alloc(state, t_len + HEAPTUPLESIZE); /* Reconstruct the HeapTupleData header */ tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); tuple->t_len = t_len; LogicalTapeReadExact(state->tapeset, tapenum, &tuple->t_self, sizeof(ItemPointerData)); /* We don't currently bother to reconstruct t_tableOid */ tuple->t_tableOid = InvalidOid; /* Read in the tuple body */ LogicalTapeReadExact(state->tapeset, tapenum, tuple->t_data, tuple->t_len); if (state->randomAccess) /* need trailing length word? */ LogicalTapeReadExact(state->tapeset, tapenum, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value, if it's a simple column */ if (state->indexInfo->ii_IndexAttrNumbers[0] != 0) stup->datum1 = heap_getattr(tuple, state->indexInfo->ii_IndexAttrNumbers[0], state->tupDesc, &stup->isnull1); } /* * Routines specialized for IndexTuple case * * The btree and hash cases require separate comparison functions, but the * IndexTuple representation is the same so the copy/write/read support * functions can be shared. */ static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { /* * This is similar to comparetup_heap(), but expects index tuples. There * is also special handling for enforcing uniqueness, and special * treatment for equal keys at the end. */ SortSupport sortKey = state->sortKeys; IndexTuple tuple1; IndexTuple tuple2; int keysz; TupleDesc tupDes; bool equal_hasnull = false; int nkey; int32 compare; Datum datum1, datum2; bool isnull1, isnull2; /* Compare the leading sort key */ compare = ApplySortComparator(a->datum1, a->isnull1, b->datum1, b->isnull1, sortKey); if (compare != 0) return compare; /* Compare additional sort keys */ tuple1 = (IndexTuple) a->tuple; tuple2 = (IndexTuple) b->tuple; keysz = state->nKeys; tupDes = RelationGetDescr(state->indexRel); if (sortKey->abbrev_converter) { datum1 = index_getattr(tuple1, 1, tupDes, &isnull1); datum2 = index_getattr(tuple2, 1, tupDes, &isnull2); compare = ApplySortAbbrevFullComparator(datum1, isnull1, datum2, isnull2, sortKey); if (compare != 0) return compare; } /* they are equal, so we only need to examine one null flag */ if (a->isnull1) equal_hasnull = true; sortKey++; for (nkey = 2; nkey <= keysz; nkey++, sortKey++) { datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); compare = ApplySortComparator(datum1, isnull1, datum2, isnull2, sortKey); if (compare != 0) return compare; /* done when we find unequal attributes */ /* they are equal, so we only need to examine one null flag */ if (isnull1) equal_hasnull = true; } /* * If btree has asked us to enforce uniqueness, complain if two equal * tuples are detected (unless there was at least one NULL field). * * It is sufficient to make the test here, because if two tuples are equal * they *must* get compared at some stage of the sort --- otherwise the * sort algorithm wouldn't have checked whether one must appear before the * other. 
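*
* (Aside, illustrative:) the equal_hasnull exemption reflects SQL
* semantics: NULL key values are treated as distinct for uniqueness
* purposes, so two index entries like (42, NULL) may coexist in a
* unique index and must not raise this error.
*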
*/ if (state->enforceUnique && !equal_hasnull) { Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; char *key_desc; /* * Some rather brain-dead implementations of qsort (such as the one in * QNX 4) will sometimes call the comparison routine to compare a * value to itself, but we always use our own implementation, which * does not. */ Assert(tuple1 != tuple2); index_deform_tuple(tuple1, tupDes, values, isnull); key_desc = BuildIndexValueDescription(state->indexRel, values, isnull); ereport(ERROR, (errcode(ERRCODE_UNIQUE_VIOLATION), errmsg("could not create unique index \"%s\"", RelationGetRelationName(state->indexRel)), key_desc ? errdetail("Key %s is duplicated.", key_desc) : errdetail("Duplicate keys exist."), errtableconstraint(state->heapRel, RelationGetRelationName(state->indexRel)))); } /* * If key values are equal, we sort on ItemPointer. This does not affect * validity of the finished index, but it may be useful to have index * scans in physical order. */ { BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); if (blk1 != blk2) return (blk1 < blk2) ? -1 : 1; } { OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); if (pos1 != pos2) return (pos1 < pos2) ? -1 : 1; } /* ItemPointer values should never be equal */ Assert(false); return 0; } static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { Bucket bucket1; Bucket bucket2; IndexTuple tuple1; IndexTuple tuple2; /* * Fetch hash keys and mask off bits we don't want to sort by. We know * that the first column of the index tuple is the hash key. */ Assert(!a->isnull1); bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1), state->max_buckets, state->high_mask, state->low_mask); Assert(!b->isnull1); bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1), state->max_buckets, state->high_mask, state->low_mask); if (bucket1 > bucket2) return 1; else if (bucket1 < bucket2) return -1; /* * If hash values are equal, we sort on ItemPointer. This does not affect * validity of the finished index, but it may be useful to have index * scans in physical order. */ tuple1 = (IndexTuple) a->tuple; tuple2 = (IndexTuple) b->tuple; { BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); if (blk1 != blk2) return (blk1 < blk2) ? -1 : 1; } { OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); if (pos1 != pos2) return (pos1 < pos2) ? -1 : 1; } /* ItemPointer values should never be equal */ Assert(false); return 0; } static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup) { IndexTuple tuple = (IndexTuple) tup; unsigned int tuplen = IndexTupleSize(tuple); IndexTuple newtuple; Datum original; /* copy the tuple into sort storage */ newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen); memcpy(newtuple, tuple, tuplen); USEMEM(state, GetMemoryChunkSpace(newtuple)); stup->tuple = (void *) newtuple; /* set up first-column key value */ original = index_getattr(newtuple, 1, RelationGetDescr(state->indexRel), &stup->isnull1); if (!state->sortKeys->abbrev_converter || stup->isnull1) { /* * Store ordinary Datum representation, or NULL value. 
static void
copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    IndexTuple  tuple = (IndexTuple) tup;
    unsigned int tuplen = IndexTupleSize(tuple);
    IndexTuple  newtuple;
    Datum       original;

    /* copy the tuple into sort storage */
    newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
    memcpy(newtuple, tuple, tuplen);
    USEMEM(state, GetMemoryChunkSpace(newtuple));
    stup->tuple = (void *) newtuple;
    /* set up first-column key value */
    original = index_getattr(newtuple,
                             1,
                             RelationGetDescr(state->indexRel),
                             &stup->isnull1);

    if (!state->sortKeys->abbrev_converter || stup->isnull1)
    {
        /*
         * Store ordinary Datum representation, or NULL value.  If there is a
         * converter it won't expect NULL values, and cost model is not
         * required to account for NULL, so in that case we avoid calling
         * converter and just set datum1 to zeroed representation (to be
         * consistent, and to support cheap inequality tests for NULL
         * abbreviated keys).
         */
        stup->datum1 = original;
    }
    else if (!consider_abort_common(state))
    {
        /* Store abbreviated key representation */
        stup->datum1 = state->sortKeys->abbrev_converter(original,
                                                         state->sortKeys);
    }
    else
    {
        /* Abort abbreviation */
        int         i;

        stup->datum1 = original;

        /*
         * Set state to be consistent with never trying abbreviation.
         *
         * Alter datum1 representation in already-copied tuples, so as to
         * ensure a consistent representation (current tuple was just
         * handled).  It does not matter if some dumped tuples are already
         * sorted on tape, since serialized tuples lack abbreviated keys
         * (TSS_BUILDRUNS state prevents control reaching here in any case).
         */
        for (i = 0; i < state->memtupcount; i++)
        {
            SortTuple  *mtup = &state->memtuples[i];

            tuple = (IndexTuple) mtup->tuple;
            mtup->datum1 = index_getattr(tuple,
                                         1,
                                         RelationGetDescr(state->indexRel),
                                         &mtup->isnull1);
        }
    }
}

static void
writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    IndexTuple  tuple = (IndexTuple) stup->tuple;
    unsigned int tuplen;

    tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &tuplen, sizeof(tuplen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) tuple, IndexTupleSize(tuple));
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &tuplen, sizeof(tuplen));

    if (!state->slabAllocatorUsed)
    {
        FREEMEM(state, GetMemoryChunkSpace(tuple));
        pfree(tuple);
    }
}
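/*
 * Note that writetup_index() serializes only the IndexTuple and its length
 * words; datum1 is never written out.  That is deliberate: when abbreviated
 * keys are in use, datum1 may hold an abbreviation rather than the true
 * leading attribute, so the read side (readtup_index, below) always
 * recomputes datum1 from the tuple itself with index_getattr().
 */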
static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
    unsigned int tuplen = len - sizeof(unsigned int);
    IndexTuple  tuple = (IndexTuple) readtup_alloc(state, tuplen);

    LogicalTapeReadExact(state->tapeset, tapenum,
                         tuple, tuplen);
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &tuplen, sizeof(tuplen));
    stup->tuple = (void *) tuple;
    /* set up first-column key value */
    stup->datum1 = index_getattr(tuple,
                                 1,
                                 RelationGetDescr(state->indexRel),
                                 &stup->isnull1);
}

/*
 * Routines specialized for DatumTuple case
 */

static int
comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
    int         compare;

    compare = ApplySortComparator(a->datum1, a->isnull1,
                                  b->datum1, b->isnull1,
                                  state->sortKeys);
    if (compare != 0)
        return compare;

    /* if we have abbreviations, then "tuple" has the original value */
    if (state->sortKeys->abbrev_converter)
        compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple),
                                                a->isnull1,
                                                PointerGetDatum(b->tuple),
                                                b->isnull1,
                                                state->sortKeys);

    return compare;
}

static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    /* Not currently needed */
    elog(ERROR, "copytup_datum() should not be called");
}

static void
writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    void       *waddr;
    unsigned int tuplen;
    unsigned int writtenlen;

    if (stup->isnull1)
    {
        waddr = NULL;
        tuplen = 0;
    }
    else if (!state->tuples)
    {
        waddr = &stup->datum1;
        tuplen = sizeof(Datum);
    }
    else
    {
        waddr = stup->tuple;
        tuplen = datumGetSize(PointerGetDatum(stup->tuple), false,
                              state->datumTypeLen);
        Assert(tuplen != 0);
    }

    writtenlen = tuplen + sizeof(unsigned int);

    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &writtenlen, sizeof(writtenlen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     waddr, tuplen);
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &writtenlen, sizeof(writtenlen));

    if (!state->slabAllocatorUsed && stup->tuple)
    {
        FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
        pfree(stup->tuple);
    }
}

static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
    unsigned int tuplen = len - sizeof(unsigned int);

    if (tuplen == 0)
    {
        /* it's NULL */
        stup->datum1 = (Datum) 0;
        stup->isnull1 = true;
        stup->tuple = NULL;
    }
    else if (!state->tuples)
    {
        Assert(tuplen == sizeof(Datum));
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &stup->datum1, tuplen);
        stup->isnull1 = false;
        stup->tuple = NULL;
    }
    else
    {
        void       *raddr = readtup_alloc(state, tuplen);

        LogicalTapeReadExact(state->tapeset, tapenum,
                             raddr, tuplen);
        stup->datum1 = PointerGetDatum(raddr);
        stup->isnull1 = false;
        stup->tuple = raddr;
    }

    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeReadExact(state->tapeset, tapenum,
                             &tuplen, sizeof(tuplen));
}

/*
 * Parallel sort routines
 */

/*
 * tuplesort_estimate_shared - estimate required shared memory allocation
 *
 * nWorkers is an estimate of the number of workers (it's the number that
 * will be requested).
 */
Size
tuplesort_estimate_shared(int nWorkers)
{
    Size        tapesSize;

    Assert(nWorkers > 0);

    /* Make sure that BufFile shared state is MAXALIGN'd */
    tapesSize = mul_size(sizeof(TapeShare), nWorkers);
    tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));

    return tapesSize;
}
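/*
 * A sketch of the expected leader-side call sequence around
 * tuplesort_estimate_shared() and tuplesort_initialize_shared() (below).
 * This is hypothetical caller code; the key constant and pcxt variable are
 * illustrative only --- see the in-tree parallel CREATE INDEX caller for
 * actual usage:
 *
 *     Size    size = tuplesort_estimate_shared(nworkers);
 *
 *     shm_toc_estimate_chunk(&pcxt->estimator, size);
 *     shm_toc_estimate_keys(&pcxt->estimator, 1);
 *     ... InitializeParallelDSM(pcxt) ...
 *     sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, size);
 *     tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
 *     shm_toc_insert(pcxt->toc, KEY_TUPLESORT_SHARED, sharedsort);
 */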
/*
 * tuplesort_initialize_shared - initialize shared tuplesort state
 *
 * Must be called from leader process before workers are launched, to
 * establish state needed up-front for worker tuplesortstates.  nWorkers
 * should match the argument passed to tuplesort_estimate_shared().
 */
void
tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
{
    int         i;

    Assert(nWorkers > 0);

    SpinLockInit(&shared->mutex);
    shared->currentWorker = 0;
    shared->workersFinished = 0;
    SharedFileSetInit(&shared->fileset, seg);
    shared->nTapes = nWorkers;
    for (i = 0; i < nWorkers; i++)
    {
        shared->tapes[i].firstblocknumber = 0L;
    }
}

/*
 * tuplesort_attach_shared - attach to shared tuplesort state
 *
 * Must be called by all worker processes.
 */
void
tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
{
    /* Attach to SharedFileSet */
    SharedFileSetAttach(&shared->fileset, seg);
}

/*
 * worker_get_identifier - Assign and return ordinal identifier for worker
 *
 * The order in which these are assigned is not well defined, and should not
 * matter; worker numbers across parallel sort participants need only be
 * distinct and gapless.  logtape.c requires this.
 *
 * Note that the identifiers assigned from here have no relation to
 * ParallelWorkerNumber, to avoid making any assumption about caller's
 * requirements.  However, we do follow the ParallelWorkerNumber convention
 * of representing a non-worker with worker number -1.  This includes the
 * leader, as well as serial Tuplesort processes.
 */
static int
worker_get_identifier(Tuplesortstate *state)
{
    Sharedsort *shared = state->shared;
    int         worker;

    Assert(WORKER(state));

    SpinLockAcquire(&shared->mutex);
    worker = shared->currentWorker++;
    SpinLockRelease(&shared->mutex);

    return worker;
}

/*
 * worker_freeze_result_tape - freeze worker's result tape for leader
 *
 * This is called by workers just after the result tape has been determined,
 * instead of calling LogicalTapeFreeze() directly.  They do so because
 * workers require a few additional steps over similar serial
 * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
 * steps are around freeing now unneeded resources, and representing to
 * leader that worker's input run is available for its merge.
 *
 * There should only be one final output run for each worker, which consists
 * of all tuples that were originally input into worker.
 */
static void
worker_freeze_result_tape(Tuplesortstate *state)
{
    Sharedsort *shared = state->shared;
    TapeShare   output;

    Assert(WORKER(state));
    Assert(state->result_tape != -1);
    Assert(state->memtupcount == 0);

    /*
     * Free most remaining memory, in case caller is sensitive to our holding
     * on to it.  memtuples may not be a tiny merge heap at this point.
     */
    pfree(state->memtuples);
    /* Be tidy */
    state->memtuples = NULL;
    state->memtupsize = 0;

    /*
     * Parallel worker requires result tape metadata, which is to be stored
     * in shared memory for leader
     */
    LogicalTapeFreeze(state->tapeset, state->result_tape, &output);

    /* Store properties of output tape, and update finished worker count */
    SpinLockAcquire(&shared->mutex);
    shared->tapes[state->worker] = output;
    shared->workersFinished++;
    SpinLockRelease(&shared->mutex);
}

/*
 * worker_nomergeruns - dump memtuples in worker, without merging
 *
 * This is called as an alternative to mergeruns() with a worker when no
 * merging is required.
 */
static void
worker_nomergeruns(Tuplesortstate *state)
{
    Assert(WORKER(state));
    Assert(state->result_tape == -1);

    state->result_tape = state->tp_tapenum[state->destTape];
    worker_freeze_result_tape(state);
}
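/*
 * Putting the worker-side pieces above together, the flow in each worker is
 * roughly as follows (a sketch; the actual driver lives in calling code):
 *
 *     tuplesort_attach_shared(sharedsort, seg);   -- during worker startup
 *     ... feed this worker's share of tuples to the Tuplesortstate ...
 *     tuplesort_performsort(state);               -- finishes the run via
 *                                                    mergeruns() or
 *                                                    worker_nomergeruns()
 *
 * Either path ends in worker_freeze_result_tape(), which publishes the
 * worker's single output run for the leader's merge.
 */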
/*
 * leader_takeover_tapes - create tapeset for leader from worker tapes
 *
 * So far, leader Tuplesortstate has performed no actual sorting.  By now,
 * all sorting has occurred in workers, all of which must have already
 * returned from tuplesort_performsort().
 *
 * When this returns, leader process is left in a state that is virtually
 * indistinguishable from it having generated runs as a serial external sort
 * might have.
 */
static void
leader_takeover_tapes(Tuplesortstate *state)
{
    Sharedsort *shared = state->shared;
    int         nParticipants = state->nParticipants;
    int         workersFinished;
    int         j;

    Assert(LEADER(state));
    Assert(nParticipants >= 1);

    SpinLockAcquire(&shared->mutex);
    workersFinished = shared->workersFinished;
    SpinLockRelease(&shared->mutex);

    if (nParticipants != workersFinished)
        elog(ERROR, "cannot take over tapes before all workers finish");

    /*
     * Create the tapeset from worker tapes, including a leader-owned tape at
     * the end.  Parallel workers are far more expensive than logical tapes,
     * so the number of tapes allocated here should never be excessive.
     *
     * We still have a leader tape, though it's not possible to write to it
     * due to restrictions in the shared fileset infrastructure used by
     * logtape.c.  It will never be written to in practice because
     * randomAccess is disallowed for parallel sorts.
     */
    inittapestate(state, nParticipants + 1);
    state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
                                          &shared->fileset, state->worker);

    /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
    state->currentRun = nParticipants;

    /*
     * Initialize variables of Algorithm D to be consistent with runs from
     * workers having been generated in the leader.
     *
     * There will always be exactly 1 run per worker, and exactly one input
     * tape per run, because workers always output exactly 1 run, even when
     * there were no input tuples for workers to sort.
     */
    for (j = 0; j < state->maxTapes; j++)
    {
        /* One real run; no dummy runs for worker tapes */
        state->tp_fib[j] = 1;
        state->tp_runs[j] = 1;
        state->tp_dummy[j] = 0;
        state->tp_tapenum[j] = j;
    }
    /* Leader tape gets one dummy run, and no real runs */
    state->tp_fib[state->tapeRange] = 0;
    state->tp_runs[state->tapeRange] = 0;
    state->tp_dummy[state->tapeRange] = 1;

    state->Level = 1;
    state->destTape = 0;

    state->status = TSS_BUILDRUNS;
}

/*
 * Convenience routine to free a tuple previously loaded into sort memory
 */
static void
free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
{
    if (stup->tuple)
    {
        FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
        pfree(stup->tuple);
        stup->tuple = NULL;
    }
}
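/*
 * To illustrate leader_takeover_tapes() above, this is the Algorithm D
 * bookkeeping it leaves behind for a hypothetical 3-worker sort
 * (maxTapes = 4, so tapeRange = 3 is the leader-owned tape):
 *
 *     tape:       0   1   2   3 (leader)
 *     tp_fib:     1   1   1   0
 *     tp_runs:    1   1   1   0
 *     tp_dummy:   0   0   0   1
 *
 * That is, one real run per worker tape and a single dummy run on the
 * unwritable leader tape --- the same picture a serial external sort would
 * present after generating nParticipants runs, before any merge pass.
 */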