1 /*-------------------------------------------------------------------------
2 *
3 * tuplesort.c
4 * Generalized tuple sorting routines.
5 *
6 * This module handles sorting of heap tuples, index tuples, or single
7 * Datums (and could easily support other kinds of sortable objects,
8 * if necessary). It works efficiently for both small and large amounts
9 * of data. Small amounts are sorted in-memory using qsort(). Large
10 * amounts are sorted using temporary files and a standard external sort
11 * algorithm.
12 *
13 * See Knuth, volume 3, for more than you want to know about the external
14 * sorting algorithm. Historically, we divided the input into sorted runs
15 * using replacement selection, in the form of a priority tree implemented
16 * as a heap (essentially his Algorithm 5.2.3H), but now we always use
17 * quicksort for run generation. We merge the runs using polyphase merge,
18 * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
19 * implemented by logtape.c, which avoids space wastage by recycling disk
20 * space as soon as each block is read from its "tape".
21 *
22 * The approximate amount of memory allowed for any one sort operation
23 * is specified in kilobytes by the caller (most pass work_mem). Initially,
24 * we absorb tuples and simply store them in an unsorted array as long as
25 * we haven't exceeded workMem. If we reach the end of the input without
26 * exceeding workMem, we sort the array using qsort() and subsequently return
27 * tuples just by scanning the tuple array sequentially. If we do exceed
28 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
29 * When tuples are dumped in batch after quicksorting, we begin a new run
30 * with a new output tape (selected per Algorithm D). After the end of the
31 * input is reached, we dump out remaining tuples in memory into a final run,
32 * then merge the runs using Algorithm D.
33 *
34 * When merging runs, we use a heap containing just the frontmost tuple from
35 * each source run; we repeatedly output the smallest tuple and replace it
36 * with the next tuple from its source tape (if any). When the heap empties,
37 * the merge is complete. The basic merge algorithm thus needs very little
38 * memory --- only M tuples for an M-way merge, and M is constrained to a
39 * small number. However, we can still make good use of our full workMem
40 * allocation by pre-reading additional blocks from each source tape. Without
41 * prereading, our access pattern to the temporary file would be very erratic;
42 * on average we'd read one block from each of M source tapes during the same
43 * time that we're writing M blocks to the output tape, so there is no
44 * sequentiality of access at all, defeating the read-ahead methods used by
45 * most Unix kernels. Worse, the output tape gets written into a very random
46 * sequence of blocks of the temp file, ensuring that things will be even
47 * worse when it comes time to read that tape. A straightforward merge pass
48 * thus ends up doing a lot of waiting for disk seeks. We can improve matters
49 * by prereading from each source tape sequentially, loading about workMem/M
50 * bytes from each tape in turn, and making the sequential blocks immediately
51 * available for reuse. This approach helps to localize both read and write
52 * accesses. The pre-reading is handled by logtape.c, we just tell it how
53 * much memory to use for the buffers.
54 *
55 * When the caller requests random access to the sort result, we form
56 * the final sorted run on a logical tape which is then "frozen", so
57 * that we can access it randomly. When the caller does not need random
58 * access, we return from tuplesort_performsort() as soon as we are down
59 * to one run per logical tape. The final merge is then performed
60 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
61 * saves one cycle of writing all the data out to disk and reading it in.
62 *
63 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
64 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
65 * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
66 * tape drives are expensive beasts, and in particular that there will always
67 * be many more runs than tape drives. In our implementation a "tape drive"
68 * doesn't cost much more than a few Kb of memory buffers, so we can afford
69 * to have lots of them. In particular, if we can have as many tape drives
70 * as sorted runs, we can eliminate any repeated I/O at all. In the current
71 * code we determine the number of tapes M on the basis of workMem: we want
72 * workMem/M to be large enough that we read a fair amount of data each time
73 * we preread from a tape, so as to maintain the locality of access described
74 * above. Nonetheless, with large workMem we can have many tapes (but not
75 * too many -- see the comments in tuplesort_merge_order).
76 *
77 * This module supports parallel sorting. Parallel sorts involve coordination
78 * among one or more worker processes, and a leader process, each with its own
79 * tuplesort state. The leader process (or, more accurately, the
80 * Tuplesortstate associated with a leader process) creates a full tapeset
81 * consisting of worker tapes with one run to merge; a run for every
82 * worker process. This is then merged. Worker processes are guaranteed to
83 * produce exactly one output run from their partial input.
84 *
85 *
86 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
87 * Portions Copyright (c) 1994, Regents of the University of California
88 *
89 * IDENTIFICATION
90 * src/backend/utils/sort/tuplesort.c
91 *
92 *-------------------------------------------------------------------------
93 */
94
95 #include "postgres.h"
96
97 #include <limits.h>
98
99 #include "access/hash.h"
100 #include "access/htup_details.h"
101 #include "access/nbtree.h"
102 #include "catalog/index.h"
103 #include "catalog/pg_am.h"
104 #include "commands/tablespace.h"
105 #include "executor/executor.h"
106 #include "miscadmin.h"
107 #include "pg_trace.h"
108 #include "utils/datum.h"
109 #include "utils/logtape.h"
110 #include "utils/lsyscache.h"
111 #include "utils/memutils.h"
112 #include "utils/pg_rusage.h"
113 #include "utils/rel.h"
114 #include "utils/sortsupport.h"
115 #include "utils/tuplesort.h"
116
117
118 /* sort-type codes for sort__start probes */
119 #define HEAP_SORT 0
120 #define INDEX_SORT 1
121 #define DATUM_SORT 2
122 #define CLUSTER_SORT 3
123
124 /* Sort parallel code from state for sort__start probes */
125 #define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
126 (state)->worker >= 0 ? 1 : 2)
127
128 /*
129 * Initial size of memtuples array. We're trying to select this size so that
130 * array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the overhead of
131 * allocation might possibly be lowered. However, we don't consider array sizes
132 * less than 1024.
133 *
134 */
135 #define INITIAL_MEMTUPSIZE Max(1024, \
136 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
137
138 /* GUC variables */
139 #ifdef TRACE_SORT
140 bool trace_sort = false;
141 #endif
142
143 #ifdef DEBUG_BOUNDED_SORT
144 bool optimize_bounded_sort = true;
145 #endif
146
147
148 /*
149 * The objects we actually sort are SortTuple structs. These contain
150 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
151 * which is a separate palloc chunk --- we assume it is just one chunk and
152 * can be freed by a simple pfree() (except during merge, when we use a
153 * simple slab allocator). SortTuples also contain the tuple's first key
154 * column in Datum/nullflag format, and a source/input tape number that
155 * tracks which tape each heap element/slot belongs to during merging.
156 *
157 * Storing the first key column lets us save heap_getattr or index_getattr
158 * calls during tuple comparisons. We could extract and save all the key
159 * columns not just the first, but this would increase code complexity and
160 * overhead, and wouldn't actually save any comparison cycles in the common
161 * case where the first key determines the comparison result. Note that
162 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
163 *
164 * There is one special case: when the sort support infrastructure provides an
165 * "abbreviated key" representation, where the key is (typically) a pass by
166 * value proxy for a pass by reference type. In this case, the abbreviated key
167 * is stored in datum1 in place of the actual first key column.
168 *
169 * When sorting single Datums, the data value is represented directly by
170 * datum1/isnull1 for pass by value types (or null values). If the datatype is
171 * pass-by-reference and isnull1 is false, then "tuple" points to a separately
172 * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then
173 * either the same pointer as "tuple", or is an abbreviated key value as
174 * described above. Accordingly, "tuple" is always used in preference to
175 * datum1 as the authoritative value for pass-by-reference cases.
176 */
177 typedef struct
178 {
179 void *tuple; /* the tuple itself */
180 Datum datum1; /* value of first key column */
181 bool isnull1; /* is first key column NULL? */
182 int srctape; /* source tape number */
183 } SortTuple;
184
185 /*
186 * During merge, we use a pre-allocated set of fixed-size slots to hold
187 * tuples. To avoid palloc/pfree overhead.
188 *
189 * Merge doesn't require a lot of memory, so we can afford to waste some,
190 * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
191 * palloc() overhead is not significant anymore.
192 *
193 * 'nextfree' is valid when this chunk is in the free list. When in use, the
194 * slot holds a tuple.
195 */
196 #define SLAB_SLOT_SIZE 1024
197
198 typedef union SlabSlot
199 {
200 union SlabSlot *nextfree;
201 char buffer[SLAB_SLOT_SIZE];
202 } SlabSlot;
203
204 /*
205 * Possible states of a Tuplesort object. These denote the states that
206 * persist between calls of Tuplesort routines.
207 */
208 typedef enum
209 {
210 TSS_INITIAL, /* Loading tuples; still within memory limit */
211 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
212 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
213 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
214 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
215 TSS_FINALMERGE /* Performing final merge on-the-fly */
216 } TupSortStatus;
217
218 /*
219 * Parameters for calculation of number of tapes to use --- see inittapes()
220 * and tuplesort_merge_order().
221 *
222 * In this calculation we assume that each tape will cost us about 1 blocks
223 * worth of buffer space. This ignores the overhead of all the other data
224 * structures needed for each tape, but it's probably close enough.
225 *
226 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
227 * tape during a preread cycle (see discussion at top of file).
228 */
229 #define MINORDER 6 /* minimum merge order */
230 #define MAXORDER 500 /* maximum merge order */
231 #define TAPE_BUFFER_OVERHEAD BLCKSZ
232 #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
233
234 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
235 Tuplesortstate *state);
236
237 /*
238 * Private state of a Tuplesort operation.
239 */
240 struct Tuplesortstate
241 {
242 TupSortStatus status; /* enumerated value as shown above */
243 int nKeys; /* number of columns in sort key */
244 bool randomAccess; /* did caller request random access? */
245 bool bounded; /* did caller specify a maximum number of
246 * tuples to return? */
247 bool boundUsed; /* true if we made use of a bounded heap */
248 int bound; /* if bounded, the maximum number of tuples */
249 bool tuples; /* Can SortTuple.tuple ever be set? */
250 int64 availMem; /* remaining memory available, in bytes */
251 int64 allowedMem; /* total memory allowed, in bytes */
252 int maxTapes; /* number of tapes (Knuth's T) */
253 int tapeRange; /* maxTapes-1 (Knuth's P) */
254 int64 maxSpace; /* maximum amount of space occupied among sort
255 * of groups, either in-memory or on-disk */
256 bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
257 * space, false when it's value for in-memory
258 * space */
259 TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
260 MemoryContext maincontext; /* memory context for tuple sort metadata that
261 * persists across multiple batches */
262 MemoryContext sortcontext; /* memory context holding most sort data */
263 MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
264 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
265
266 /*
267 * These function pointers decouple the routines that must know what kind
268 * of tuple we are sorting from the routines that don't need to know it.
269 * They are set up by the tuplesort_begin_xxx routines.
270 *
271 * Function to compare two tuples; result is per qsort() convention, ie:
272 * <0, 0, >0 according as a<b, a=b, a>b. The API must match
273 * qsort_arg_comparator.
274 */
275 SortTupleComparator comparetup;
276
277 /*
278 * Function to copy a supplied input tuple into palloc'd space and set up
279 * its SortTuple representation (ie, set tuple/datum1/isnull1). Also,
280 * state->availMem must be decreased by the amount of space used for the
281 * tuple copy (note the SortTuple struct itself is not counted).
282 */
283 void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
284
285 /*
286 * Function to write a stored tuple onto tape. The representation of the
287 * tuple on tape need not be the same as it is in memory; requirements on
288 * the tape representation are given below. Unless the slab allocator is
289 * used, after writing the tuple, pfree() the out-of-line data (not the
290 * SortTuple struct!), and increase state->availMem by the amount of
291 * memory space thereby released.
292 */
293 void (*writetup) (Tuplesortstate *state, int tapenum,
294 SortTuple *stup);
295
296 /*
297 * Function to read a stored tuple from tape back into memory. 'len' is
298 * the already-read length of the stored tuple. The tuple is allocated
299 * from the slab memory arena, or is palloc'd, see readtup_alloc().
300 */
301 void (*readtup) (Tuplesortstate *state, SortTuple *stup,
302 int tapenum, unsigned int len);
303
304 /*
305 * This array holds the tuples now in sort memory. If we are in state
306 * INITIAL, the tuples are in no particular order; if we are in state
307 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
308 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
309 * H. In state SORTEDONTAPE, the array is not used.
310 */
311 SortTuple *memtuples; /* array of SortTuple structs */
312 int memtupcount; /* number of tuples currently present */
313 int memtupsize; /* allocated length of memtuples array */
314 bool growmemtuples; /* memtuples' growth still underway? */
315
316 /*
317 * Memory for tuples is sometimes allocated using a simple slab allocator,
318 * rather than with palloc(). Currently, we switch to slab allocation
319 * when we start merging. Merging only needs to keep a small, fixed
320 * number of tuples in memory at any time, so we can avoid the
321 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
322 * to hold the tuples.
323 *
324 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
325 * slots. The allocation is sized to have one slot per tape, plus one
326 * additional slot. We need that many slots to hold all the tuples kept
327 * in the heap during merge, plus the one we have last returned from the
328 * sort, with tuplesort_gettuple.
329 *
330 * Initially, all the slots are kept in a linked list of free slots. When
331 * a tuple is read from a tape, it is put to the next available slot, if
332 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
333 * instead.
334 *
335 * When we're done processing a tuple, we return the slot back to the free
336 * list, or pfree() if it was palloc'd. We know that a tuple was
337 * allocated from the slab, if its pointer value is between
338 * slabMemoryBegin and -End.
339 *
340 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
341 * tracking memory usage is not used.
342 */
343 bool slabAllocatorUsed;
344
345 char *slabMemoryBegin; /* beginning of slab memory arena */
346 char *slabMemoryEnd; /* end of slab memory arena */
347 SlabSlot *slabFreeHead; /* head of free list */
348
349 /* Buffer size to use for reading input tapes, during merge. */
350 size_t read_buffer_size;
351
352 /*
353 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
354 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
355 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
356 * recycle the memory on next gettuple call.
357 */
358 void *lastReturnedTuple;
359
360 /*
361 * While building initial runs, this is the current output run number.
362 * Afterwards, it is the number of initial runs we made.
363 */
364 int currentRun;
365
366 /*
367 * Unless otherwise noted, all pointer variables below are pointers to
368 * arrays of length maxTapes, holding per-tape data.
369 */
370
371 /*
372 * This variable is only used during merge passes. mergeactive[i] is true
373 * if we are reading an input run from (actual) tape number i and have not
374 * yet exhausted that run.
375 */
376 bool *mergeactive; /* active input run source? */
377
378 /*
379 * Variables for Algorithm D. Note that destTape is a "logical" tape
380 * number, ie, an index into the tp_xxx[] arrays. Be careful to keep
381 * "logical" and "actual" tape numbers straight!
382 */
383 int Level; /* Knuth's l */
384 int destTape; /* current output tape (Knuth's j, less 1) */
385 int *tp_fib; /* Target Fibonacci run counts (A[]) */
386 int *tp_runs; /* # of real runs on each tape */
387 int *tp_dummy; /* # of dummy runs for each tape (D[]) */
388 int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
389 int activeTapes; /* # of active input tapes in merge pass */
390
391 /*
392 * These variables are used after completion of sorting to keep track of
393 * the next tuple to return. (In the tape case, the tape's current read
394 * position is also critical state.)
395 */
396 int result_tape; /* actual tape number of finished output */
397 int current; /* array index (only used if SORTEDINMEM) */
398 bool eof_reached; /* reached EOF (needed for cursors) */
399
400 /* markpos_xxx holds marked position for mark and restore */
401 long markpos_block; /* tape block# (only used if SORTEDONTAPE) */
402 int markpos_offset; /* saved "current", or offset in tape block */
403 bool markpos_eof; /* saved "eof_reached" */
404
405 /*
406 * These variables are used during parallel sorting.
407 *
408 * worker is our worker identifier. Follows the general convention that
409 * -1 value relates to a leader tuplesort, and values >= 0 worker
410 * tuplesorts. (-1 can also be a serial tuplesort.)
411 *
412 * shared is mutable shared memory state, which is used to coordinate
413 * parallel sorts.
414 *
415 * nParticipants is the number of worker Tuplesortstates known by the
416 * leader to have actually been launched, which implies that they must
417 * finish a run leader can merge. Typically includes a worker state held
418 * by the leader process itself. Set in the leader Tuplesortstate only.
419 */
420 int worker;
421 Sharedsort *shared;
422 int nParticipants;
423
424 /*
425 * The sortKeys variable is used by every case other than the hash index
426 * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
427 * MinimalTuple and CLUSTER routines, though.
428 */
429 TupleDesc tupDesc;
430 SortSupport sortKeys; /* array of length nKeys */
431
432 /*
433 * This variable is shared by the single-key MinimalTuple case and the
434 * Datum case (which both use qsort_ssup()). Otherwise it's NULL.
435 */
436 SortSupport onlyKey;
437
438 /*
439 * Additional state for managing "abbreviated key" sortsupport routines
440 * (which currently may be used by all cases except the hash index case).
441 * Tracks the intervals at which the optimization's effectiveness is
442 * tested.
443 */
444 int64 abbrevNext; /* Tuple # at which to next check
445 * applicability */
446
447 /*
448 * These variables are specific to the CLUSTER case; they are set by
449 * tuplesort_begin_cluster.
450 */
451 IndexInfo *indexInfo; /* info about index being used for reference */
452 EState *estate; /* for evaluating index expressions */
453
454 /*
455 * These variables are specific to the IndexTuple case; they are set by
456 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
457 */
458 Relation heapRel; /* table the index is being built on */
459 Relation indexRel; /* index being built */
460
461 /* These are specific to the index_btree subcase: */
462 bool enforceUnique; /* complain if we find duplicate tuples */
463
464 /* These are specific to the index_hash subcase: */
465 uint32 high_mask; /* masks for sortable part of hash code */
466 uint32 low_mask;
467 uint32 max_buckets;
468
469 /*
470 * These variables are specific to the Datum case; they are set by
471 * tuplesort_begin_datum and used only by the DatumTuple routines.
472 */
473 Oid datumType;
474 /* we need typelen in order to know how to copy the Datums. */
475 int datumTypeLen;
476
477 /*
478 * Resource snapshot for time of sort start.
479 */
480 #ifdef TRACE_SORT
481 PGRUsage ru_start;
482 #endif
483 };
484
485 /*
486 * Private mutable state of tuplesort-parallel-operation. This is allocated
487 * in shared memory.
488 */
489 struct Sharedsort
490 {
491 /* mutex protects all fields prior to tapes */
492 slock_t mutex;
493
494 /*
495 * currentWorker generates ordinal identifier numbers for parallel sort
496 * workers. These start from 0, and are always gapless.
497 *
498 * Workers increment workersFinished to indicate having finished. If this
499 * is equal to state.nParticipants within the leader, leader is ready to
500 * merge worker runs.
501 */
502 int currentWorker;
503 int workersFinished;
504
505 /* Temporary file space */
506 SharedFileSet fileset;
507
508 /* Size of tapes flexible array */
509 int nTapes;
510
511 /*
512 * Tapes array used by workers to report back information needed by the
513 * leader to concatenate all worker tapes into one for merging
514 */
515 TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
516 };
517
518 /*
519 * Is the given tuple allocated from the slab memory arena?
520 */
521 #define IS_SLAB_SLOT(state, tuple) \
522 ((char *) (tuple) >= (state)->slabMemoryBegin && \
523 (char *) (tuple) < (state)->slabMemoryEnd)
524
525 /*
526 * Return the given tuple to the slab memory free list, or free it
527 * if it was palloc'd.
528 */
529 #define RELEASE_SLAB_SLOT(state, tuple) \
530 do { \
531 SlabSlot *buf = (SlabSlot *) tuple; \
532 \
533 if (IS_SLAB_SLOT((state), buf)) \
534 { \
535 buf->nextfree = (state)->slabFreeHead; \
536 (state)->slabFreeHead = buf; \
537 } else \
538 pfree(buf); \
539 } while(0)
540
541 #define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
542 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
543 #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
544 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
545 #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
546 #define USEMEM(state,amt) ((state)->availMem -= (amt))
547 #define FREEMEM(state,amt) ((state)->availMem += (amt))
548 #define SERIAL(state) ((state)->shared == NULL)
549 #define WORKER(state) ((state)->shared && (state)->worker != -1)
550 #define LEADER(state) ((state)->shared && (state)->worker == -1)
551
552 /*
553 * NOTES about on-tape representation of tuples:
554 *
555 * We require the first "unsigned int" of a stored tuple to be the total size
556 * on-tape of the tuple, including itself (so it is never zero; an all-zero
557 * unsigned int is used to delimit runs). The remainder of the stored tuple
558 * may or may not match the in-memory representation of the tuple ---
559 * any conversion needed is the job of the writetup and readtup routines.
560 *
561 * If state->randomAccess is true, then the stored representation of the
562 * tuple must be followed by another "unsigned int" that is a copy of the
563 * length --- so the total tape space used is actually sizeof(unsigned int)
564 * more than the stored length value. This allows read-backwards. When
565 * randomAccess is not true, the write/read routines may omit the extra
566 * length word.
567 *
568 * writetup is expected to write both length words as well as the tuple
569 * data. When readtup is called, the tape is positioned just after the
570 * front length word; readtup must read the tuple data and advance past
571 * the back length word (if present).
572 *
573 * The write/read routines can make use of the tuple description data
574 * stored in the Tuplesortstate record, if needed. They are also expected
575 * to adjust state->availMem by the amount of memory space (not tape space!)
576 * released or consumed. There is no error return from either writetup
577 * or readtup; they should ereport() on failure.
578 *
579 *
580 * NOTES about memory consumption calculations:
581 *
582 * We count space allocated for tuples against the workMem limit, plus
583 * the space used by the variable-size memtuples array. Fixed-size space
584 * is not counted; it's small enough to not be interesting.
585 *
586 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
587 * rather than the originally-requested size. This is important since
588 * palloc can add substantial overhead. It's not a complete answer since
589 * we won't count any wasted space in palloc allocation blocks, but it's
590 * a lot better than what we were doing before 7.3. As of 9.6, a
591 * separate memory context is used for caller passed tuples. Resetting
592 * it at certain key increments significantly ameliorates fragmentation.
593 * Note that this places a responsibility on copytup routines to use the
594 * correct memory context for these tuples (and to not use the reset
595 * context for anything whose lifetime needs to span multiple external
596 * sort runs). readtup routines use the slab allocator (they cannot use
597 * the reset context because it gets deleted at the point that merging
598 * begins).
599 */
600
601 /* When using this macro, beware of double evaluation of len */
602 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
603 do { \
604 if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
605 elog(ERROR, "unexpected end of data"); \
606 } while(0)
607
608
609 static Tuplesortstate *tuplesort_begin_common(int workMem,
610 SortCoordinate coordinate,
611 bool randomAccess);
612 static void tuplesort_begin_batch(Tuplesortstate *state);
613 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
614 static bool consider_abort_common(Tuplesortstate *state);
615 static void inittapes(Tuplesortstate *state, bool mergeruns);
616 static void inittapestate(Tuplesortstate *state, int maxTapes);
617 static void selectnewtape(Tuplesortstate *state);
618 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
619 static void mergeruns(Tuplesortstate *state);
620 static void mergeonerun(Tuplesortstate *state);
621 static void beginmerge(Tuplesortstate *state);
622 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
623 static void dumptuples(Tuplesortstate *state, bool alltuples);
624 static void make_bounded_heap(Tuplesortstate *state);
625 static void sort_bounded_heap(Tuplesortstate *state);
626 static void tuplesort_sort_memtuples(Tuplesortstate *state);
627 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
628 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
629 static void tuplesort_heap_delete_top(Tuplesortstate *state);
630 static void reversedirection(Tuplesortstate *state);
631 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
632 static void markrunend(Tuplesortstate *state, int tapenum);
633 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
634 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
635 Tuplesortstate *state);
636 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
637 static void writetup_heap(Tuplesortstate *state, int tapenum,
638 SortTuple *stup);
639 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
640 int tapenum, unsigned int len);
641 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
642 Tuplesortstate *state);
643 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
644 static void writetup_cluster(Tuplesortstate *state, int tapenum,
645 SortTuple *stup);
646 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
647 int tapenum, unsigned int len);
648 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
649 Tuplesortstate *state);
650 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
651 Tuplesortstate *state);
652 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
653 static void writetup_index(Tuplesortstate *state, int tapenum,
654 SortTuple *stup);
655 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
656 int tapenum, unsigned int len);
657 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
658 Tuplesortstate *state);
659 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
660 static void writetup_datum(Tuplesortstate *state, int tapenum,
661 SortTuple *stup);
662 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
663 int tapenum, unsigned int len);
664 static int worker_get_identifier(Tuplesortstate *state);
665 static void worker_freeze_result_tape(Tuplesortstate *state);
666 static void worker_nomergeruns(Tuplesortstate *state);
667 static void leader_takeover_tapes(Tuplesortstate *state);
668 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
669 static void tuplesort_free(Tuplesortstate *state);
670 static void tuplesort_updatemax(Tuplesortstate *state);
671
672 /*
673 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
674 * any variant of SortTuples, using the appropriate comparetup function.
675 * qsort_ssup() is specialized for the case where the comparetup function
676 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
677 * and Datum sorts.
678 */
679 #include "qsort_tuple.c"
680
681
682 /*
683 * tuplesort_begin_xxx
684 *
685 * Initialize for a tuple sort operation.
686 *
687 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
688 * zero or more times, then call tuplesort_performsort when all the tuples
689 * have been supplied. After performsort, retrieve the tuples in sorted
690 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
691 * access was requested, rescan, markpos, and restorepos can also be called.)
692 * Call tuplesort_end to terminate the operation and release memory/disk space.
693 *
694 * Each variant of tuplesort_begin has a workMem parameter specifying the
695 * maximum number of kilobytes of RAM to use before spilling data to disk.
696 * (The normal value of this parameter is work_mem, but some callers use
697 * other values.) Each variant also has a randomAccess parameter specifying
698 * whether the caller needs non-sequential access to the sort result.
699 */
700
701 static Tuplesortstate *
tuplesort_begin_common(int workMem,SortCoordinate coordinate,bool randomAccess)702 tuplesort_begin_common(int workMem, SortCoordinate coordinate,
703 bool randomAccess)
704 {
705 Tuplesortstate *state;
706 MemoryContext maincontext;
707 MemoryContext sortcontext;
708 MemoryContext oldcontext;
709
710 /* See leader_takeover_tapes() remarks on randomAccess support */
711 if (coordinate && randomAccess)
712 elog(ERROR, "random access disallowed under parallel sort");
713
714 /*
715 * Memory context surviving tuplesort_reset. This memory context holds
716 * data which is useful to keep while sorting multiple similar batches.
717 */
718 maincontext = AllocSetContextCreate(CurrentMemoryContext,
719 "TupleSort main",
720 ALLOCSET_DEFAULT_SIZES);
721
722 /*
723 * Create a working memory context for one sort operation. The content of
724 * this context is deleted by tuplesort_reset.
725 */
726 sortcontext = AllocSetContextCreate(maincontext,
727 "TupleSort sort",
728 ALLOCSET_DEFAULT_SIZES);
729
730 /*
731 * Additionally a working memory context for tuples is setup in
732 * tuplesort_begin_batch.
733 */
734
735 /*
736 * Make the Tuplesortstate within the per-sortstate context. This way, we
737 * don't need a separate pfree() operation for it at shutdown.
738 */
739 oldcontext = MemoryContextSwitchTo(maincontext);
740
741 state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
742
743 #ifdef TRACE_SORT
744 if (trace_sort)
745 pg_rusage_init(&state->ru_start);
746 #endif
747
748 state->randomAccess = randomAccess;
749 state->tuples = true;
750
751 /*
752 * workMem is forced to be at least 64KB, the current minimum valid value
753 * for the work_mem GUC. This is a defense against parallel sort callers
754 * that divide out memory among many workers in a way that leaves each
755 * with very little memory.
756 */
757 state->allowedMem = Max(workMem, 64) * (int64) 1024;
758 state->sortcontext = sortcontext;
759 state->maincontext = maincontext;
760
761 /*
762 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
763 * see comments in grow_memtuples().
764 */
765 state->memtupsize = INITIAL_MEMTUPSIZE;
766 state->memtuples = NULL;
767
768 /*
769 * After all of the other non-parallel-related state, we setup all of the
770 * state needed for each batch.
771 */
772 tuplesort_begin_batch(state);
773
774 /*
775 * Initialize parallel-related state based on coordination information
776 * from caller
777 */
778 if (!coordinate)
779 {
780 /* Serial sort */
781 state->shared = NULL;
782 state->worker = -1;
783 state->nParticipants = -1;
784 }
785 else if (coordinate->isWorker)
786 {
787 /* Parallel worker produces exactly one final run from all input */
788 state->shared = coordinate->sharedsort;
789 state->worker = worker_get_identifier(state);
790 state->nParticipants = -1;
791 }
792 else
793 {
794 /* Parallel leader state only used for final merge */
795 state->shared = coordinate->sharedsort;
796 state->worker = -1;
797 state->nParticipants = coordinate->nParticipants;
798 Assert(state->nParticipants >= 1);
799 }
800
801 MemoryContextSwitchTo(oldcontext);
802
803 return state;
804 }
805
806 /*
807 * tuplesort_begin_batch
808 *
809 * Setup, or reset, all state need for processing a new set of tuples with this
810 * sort state. Called both from tuplesort_begin_common (the first time sorting
811 * with this sort state) and tuplesort_reset (for subsequent usages).
812 */
813 static void
tuplesort_begin_batch(Tuplesortstate * state)814 tuplesort_begin_batch(Tuplesortstate *state)
815 {
816 MemoryContext oldcontext;
817
818 oldcontext = MemoryContextSwitchTo(state->maincontext);
819
820 /*
821 * Caller tuple (e.g. IndexTuple) memory context.
822 *
823 * A dedicated child context used exclusively for caller passed tuples
824 * eases memory management. Resetting at key points reduces
825 * fragmentation. Note that the memtuples array of SortTuples is allocated
826 * in the parent context, not this context, because there is no need to
827 * free memtuples early.
828 */
829 state->tuplecontext = AllocSetContextCreate(state->sortcontext,
830 "Caller tuples",
831 ALLOCSET_DEFAULT_SIZES);
832
833 state->status = TSS_INITIAL;
834 state->bounded = false;
835 state->boundUsed = false;
836
837 state->availMem = state->allowedMem;
838
839 state->tapeset = NULL;
840
841 state->memtupcount = 0;
842
843 /*
844 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
845 * see comments in grow_memtuples().
846 */
847 state->growmemtuples = true;
848 state->slabAllocatorUsed = false;
849 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
850 {
851 pfree(state->memtuples);
852 state->memtuples = NULL;
853 state->memtupsize = INITIAL_MEMTUPSIZE;
854 }
855 if (state->memtuples == NULL)
856 {
857 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
858 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
859 }
860
861 /* workMem must be large enough for the minimal memtuples array */
862 if (LACKMEM(state))
863 elog(ERROR, "insufficient memory allowed for sort");
864
865 state->currentRun = 0;
866
867 /*
868 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
869 * inittapes(), if needed
870 */
871
872 state->result_tape = -1; /* flag that result tape has not been formed */
873
874 MemoryContextSwitchTo(oldcontext);
875 }
876
877 Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,int nkeys,AttrNumber * attNums,Oid * sortOperators,Oid * sortCollations,bool * nullsFirstFlags,int workMem,SortCoordinate coordinate,bool randomAccess)878 tuplesort_begin_heap(TupleDesc tupDesc,
879 int nkeys, AttrNumber *attNums,
880 Oid *sortOperators, Oid *sortCollations,
881 bool *nullsFirstFlags,
882 int workMem, SortCoordinate coordinate, bool randomAccess)
883 {
884 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
885 randomAccess);
886 MemoryContext oldcontext;
887 int i;
888
889 oldcontext = MemoryContextSwitchTo(state->maincontext);
890
891 AssertArg(nkeys > 0);
892
893 #ifdef TRACE_SORT
894 if (trace_sort)
895 elog(LOG,
896 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
897 nkeys, workMem, randomAccess ? 't' : 'f');
898 #endif
899
900 state->nKeys = nkeys;
901
902 TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
903 false, /* no unique check */
904 nkeys,
905 workMem,
906 randomAccess,
907 PARALLEL_SORT(state));
908
909 state->comparetup = comparetup_heap;
910 state->copytup = copytup_heap;
911 state->writetup = writetup_heap;
912 state->readtup = readtup_heap;
913
914 state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
915 state->abbrevNext = 10;
916
917 /* Prepare SortSupport data for each column */
918 state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
919
920 for (i = 0; i < nkeys; i++)
921 {
922 SortSupport sortKey = state->sortKeys + i;
923
924 AssertArg(attNums[i] != 0);
925 AssertArg(sortOperators[i] != 0);
926
927 sortKey->ssup_cxt = CurrentMemoryContext;
928 sortKey->ssup_collation = sortCollations[i];
929 sortKey->ssup_nulls_first = nullsFirstFlags[i];
930 sortKey->ssup_attno = attNums[i];
931 /* Convey if abbreviation optimization is applicable in principle */
932 sortKey->abbreviate = (i == 0);
933
934 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
935 }
936
937 /*
938 * The "onlyKey" optimization cannot be used with abbreviated keys, since
939 * tie-breaker comparisons may be required. Typically, the optimization
940 * is only of value to pass-by-value types anyway, whereas abbreviated
941 * keys are typically only of value to pass-by-reference types.
942 */
943 if (nkeys == 1 && !state->sortKeys->abbrev_converter)
944 state->onlyKey = state->sortKeys;
945
946 MemoryContextSwitchTo(oldcontext);
947
948 return state;
949 }
950
951 Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,Relation indexRel,int workMem,SortCoordinate coordinate,bool randomAccess)952 tuplesort_begin_cluster(TupleDesc tupDesc,
953 Relation indexRel,
954 int workMem,
955 SortCoordinate coordinate, bool randomAccess)
956 {
957 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
958 randomAccess);
959 BTScanInsert indexScanKey;
960 MemoryContext oldcontext;
961 int i;
962
963 Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
964
965 oldcontext = MemoryContextSwitchTo(state->maincontext);
966
967 #ifdef TRACE_SORT
968 if (trace_sort)
969 elog(LOG,
970 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
971 RelationGetNumberOfAttributes(indexRel),
972 workMem, randomAccess ? 't' : 'f');
973 #endif
974
975 state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
976
977 TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
978 false, /* no unique check */
979 state->nKeys,
980 workMem,
981 randomAccess,
982 PARALLEL_SORT(state));
983
984 state->comparetup = comparetup_cluster;
985 state->copytup = copytup_cluster;
986 state->writetup = writetup_cluster;
987 state->readtup = readtup_cluster;
988 state->abbrevNext = 10;
989
990 state->indexInfo = BuildIndexInfo(indexRel);
991
992 state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
993
994 indexScanKey = _bt_mkscankey(indexRel, NULL);
995
996 if (state->indexInfo->ii_Expressions != NULL)
997 {
998 TupleTableSlot *slot;
999 ExprContext *econtext;
1000
1001 /*
1002 * We will need to use FormIndexDatum to evaluate the index
1003 * expressions. To do that, we need an EState, as well as a
1004 * TupleTableSlot to put the table tuples into. The econtext's
1005 * scantuple has to point to that slot, too.
1006 */
1007 state->estate = CreateExecutorState();
1008 slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
1009 econtext = GetPerTupleExprContext(state->estate);
1010 econtext->ecxt_scantuple = slot;
1011 }
1012
1013 /* Prepare SortSupport data for each column */
1014 state->sortKeys = (SortSupport) palloc0(state->nKeys *
1015 sizeof(SortSupportData));
1016
1017 for (i = 0; i < state->nKeys; i++)
1018 {
1019 SortSupport sortKey = state->sortKeys + i;
1020 ScanKey scanKey = indexScanKey->scankeys + i;
1021 int16 strategy;
1022
1023 sortKey->ssup_cxt = CurrentMemoryContext;
1024 sortKey->ssup_collation = scanKey->sk_collation;
1025 sortKey->ssup_nulls_first =
1026 (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1027 sortKey->ssup_attno = scanKey->sk_attno;
1028 /* Convey if abbreviation optimization is applicable in principle */
1029 sortKey->abbreviate = (i == 0);
1030
1031 AssertState(sortKey->ssup_attno != 0);
1032
1033 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1034 BTGreaterStrategyNumber : BTLessStrategyNumber;
1035
1036 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1037 }
1038
1039 pfree(indexScanKey);
1040
1041 MemoryContextSwitchTo(oldcontext);
1042
1043 return state;
1044 }
1045
1046 Tuplesortstate *
tuplesort_begin_index_btree(Relation heapRel,Relation indexRel,bool enforceUnique,int workMem,SortCoordinate coordinate,bool randomAccess)1047 tuplesort_begin_index_btree(Relation heapRel,
1048 Relation indexRel,
1049 bool enforceUnique,
1050 int workMem,
1051 SortCoordinate coordinate,
1052 bool randomAccess)
1053 {
1054 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1055 randomAccess);
1056 BTScanInsert indexScanKey;
1057 MemoryContext oldcontext;
1058 int i;
1059
1060 oldcontext = MemoryContextSwitchTo(state->maincontext);
1061
1062 #ifdef TRACE_SORT
1063 if (trace_sort)
1064 elog(LOG,
1065 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
1066 enforceUnique ? 't' : 'f',
1067 workMem, randomAccess ? 't' : 'f');
1068 #endif
1069
1070 state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
1071
1072 TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
1073 enforceUnique,
1074 state->nKeys,
1075 workMem,
1076 randomAccess,
1077 PARALLEL_SORT(state));
1078
1079 state->comparetup = comparetup_index_btree;
1080 state->copytup = copytup_index;
1081 state->writetup = writetup_index;
1082 state->readtup = readtup_index;
1083 state->abbrevNext = 10;
1084
1085 state->heapRel = heapRel;
1086 state->indexRel = indexRel;
1087 state->enforceUnique = enforceUnique;
1088
1089 indexScanKey = _bt_mkscankey(indexRel, NULL);
1090
1091 /* Prepare SortSupport data for each column */
1092 state->sortKeys = (SortSupport) palloc0(state->nKeys *
1093 sizeof(SortSupportData));
1094
1095 for (i = 0; i < state->nKeys; i++)
1096 {
1097 SortSupport sortKey = state->sortKeys + i;
1098 ScanKey scanKey = indexScanKey->scankeys + i;
1099 int16 strategy;
1100
1101 sortKey->ssup_cxt = CurrentMemoryContext;
1102 sortKey->ssup_collation = scanKey->sk_collation;
1103 sortKey->ssup_nulls_first =
1104 (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1105 sortKey->ssup_attno = scanKey->sk_attno;
1106 /* Convey if abbreviation optimization is applicable in principle */
1107 sortKey->abbreviate = (i == 0);
1108
1109 AssertState(sortKey->ssup_attno != 0);
1110
1111 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1112 BTGreaterStrategyNumber : BTLessStrategyNumber;
1113
1114 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1115 }
1116
1117 pfree(indexScanKey);
1118
1119 MemoryContextSwitchTo(oldcontext);
1120
1121 return state;
1122 }
1123
1124 Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,Relation indexRel,uint32 high_mask,uint32 low_mask,uint32 max_buckets,int workMem,SortCoordinate coordinate,bool randomAccess)1125 tuplesort_begin_index_hash(Relation heapRel,
1126 Relation indexRel,
1127 uint32 high_mask,
1128 uint32 low_mask,
1129 uint32 max_buckets,
1130 int workMem,
1131 SortCoordinate coordinate,
1132 bool randomAccess)
1133 {
1134 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1135 randomAccess);
1136 MemoryContext oldcontext;
1137
1138 oldcontext = MemoryContextSwitchTo(state->maincontext);
1139
1140 #ifdef TRACE_SORT
1141 if (trace_sort)
1142 elog(LOG,
1143 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1144 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1145 high_mask,
1146 low_mask,
1147 max_buckets,
1148 workMem, randomAccess ? 't' : 'f');
1149 #endif
1150
1151 state->nKeys = 1; /* Only one sort column, the hash code */
1152
1153 state->comparetup = comparetup_index_hash;
1154 state->copytup = copytup_index;
1155 state->writetup = writetup_index;
1156 state->readtup = readtup_index;
1157
1158 state->heapRel = heapRel;
1159 state->indexRel = indexRel;
1160
1161 state->high_mask = high_mask;
1162 state->low_mask = low_mask;
1163 state->max_buckets = max_buckets;
1164
1165 MemoryContextSwitchTo(oldcontext);
1166
1167 return state;
1168 }
1169
1170 Tuplesortstate *
tuplesort_begin_datum(Oid datumType,Oid sortOperator,Oid sortCollation,bool nullsFirstFlag,int workMem,SortCoordinate coordinate,bool randomAccess)1171 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
1172 bool nullsFirstFlag, int workMem,
1173 SortCoordinate coordinate, bool randomAccess)
1174 {
1175 Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1176 randomAccess);
1177 MemoryContext oldcontext;
1178 int16 typlen;
1179 bool typbyval;
1180
1181 oldcontext = MemoryContextSwitchTo(state->maincontext);
1182
1183 #ifdef TRACE_SORT
1184 if (trace_sort)
1185 elog(LOG,
1186 "begin datum sort: workMem = %d, randomAccess = %c",
1187 workMem, randomAccess ? 't' : 'f');
1188 #endif
1189
1190 state->nKeys = 1; /* always a one-column sort */
1191
1192 TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
1193 false, /* no unique check */
1194 1,
1195 workMem,
1196 randomAccess,
1197 PARALLEL_SORT(state));
1198
1199 state->comparetup = comparetup_datum;
1200 state->copytup = copytup_datum;
1201 state->writetup = writetup_datum;
1202 state->readtup = readtup_datum;
1203 state->abbrevNext = 10;
1204
1205 state->datumType = datumType;
1206
1207 /* lookup necessary attributes of the datum type */
1208 get_typlenbyval(datumType, &typlen, &typbyval);
1209 state->datumTypeLen = typlen;
1210 state->tuples = !typbyval;
1211
1212 /* Prepare SortSupport data */
1213 state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
1214
1215 state->sortKeys->ssup_cxt = CurrentMemoryContext;
1216 state->sortKeys->ssup_collation = sortCollation;
1217 state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1218
1219 /*
1220 * Abbreviation is possible here only for by-reference types. In theory,
1221 * a pass-by-value datatype could have an abbreviated form that is cheaper
1222 * to compare. In a tuple sort, we could support that, because we can
1223 * always extract the original datum from the tuple as needed. Here, we
1224 * can't, because a datum sort only stores a single copy of the datum; the
1225 * "tuple" field of each SortTuple is NULL.
1226 */
1227 state->sortKeys->abbreviate = !typbyval;
1228
1229 PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
1230
1231 /*
1232 * The "onlyKey" optimization cannot be used with abbreviated keys, since
1233 * tie-breaker comparisons may be required. Typically, the optimization
1234 * is only of value to pass-by-value types anyway, whereas abbreviated
1235 * keys are typically only of value to pass-by-reference types.
1236 */
1237 if (!state->sortKeys->abbrev_converter)
1238 state->onlyKey = state->sortKeys;
1239
1240 MemoryContextSwitchTo(oldcontext);
1241
1242 return state;
1243 }
1244
1245 /*
1246 * tuplesort_set_bound
1247 *
1248 * Advise tuplesort that at most the first N result tuples are required.
1249 *
1250 * Must be called before inserting any tuples. (Actually, we could allow it
1251 * as long as the sort hasn't spilled to disk, but there seems no need for
1252 * delayed calls at the moment.)
1253 *
1254 * This is a hint only. The tuplesort may still return more tuples than
1255 * requested. Parallel leader tuplesorts will always ignore the hint.
1256 */
1257 void
tuplesort_set_bound(Tuplesortstate * state,int64 bound)1258 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
1259 {
1260 /* Assert we're called before loading any tuples */
1261 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
1262 /* Can't set the bound twice, either */
1263 Assert(!state->bounded);
1264 /* Also, this shouldn't be called in a parallel worker */
1265 Assert(!WORKER(state));
1266
1267 /* Parallel leader allows but ignores hint */
1268 if (LEADER(state))
1269 return;
1270
1271 #ifdef DEBUG_BOUNDED_SORT
1272 /* Honor GUC setting that disables the feature (for easy testing) */
1273 if (!optimize_bounded_sort)
1274 return;
1275 #endif
1276
1277 /* We want to be able to compute bound * 2, so limit the setting */
1278 if (bound > (int64) (INT_MAX / 2))
1279 return;
1280
1281 state->bounded = true;
1282 state->bound = (int) bound;
1283
1284 /*
1285 * Bounded sorts are not an effective target for abbreviated key
1286 * optimization. Disable by setting state to be consistent with no
1287 * abbreviation support.
1288 */
1289 state->sortKeys->abbrev_converter = NULL;
1290 if (state->sortKeys->abbrev_full_comparator)
1291 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1292
1293 /* Not strictly necessary, but be tidy */
1294 state->sortKeys->abbrev_abort = NULL;
1295 state->sortKeys->abbrev_full_comparator = NULL;
1296 }
1297
1298 /*
1299 * tuplesort_used_bound
1300 *
1301 * Allow callers to find out if the sort state was able to use a bound.
1302 */
1303 bool
tuplesort_used_bound(Tuplesortstate * state)1304 tuplesort_used_bound(Tuplesortstate *state)
1305 {
1306 return state->boundUsed;
1307 }
1308
1309 /*
1310 * tuplesort_free
1311 *
1312 * Internal routine for freeing resources of tuplesort.
1313 */
1314 static void
tuplesort_free(Tuplesortstate * state)1315 tuplesort_free(Tuplesortstate *state)
1316 {
1317 /* context swap probably not needed, but let's be safe */
1318 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1319
1320 #ifdef TRACE_SORT
1321 long spaceUsed;
1322
1323 if (state->tapeset)
1324 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
1325 else
1326 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1327 #endif
1328
1329 /*
1330 * Delete temporary "tape" files, if any.
1331 *
1332 * Note: want to include this in reported total cost of sort, hence need
1333 * for two #ifdef TRACE_SORT sections.
1334 */
1335 if (state->tapeset)
1336 LogicalTapeSetClose(state->tapeset);
1337
1338 #ifdef TRACE_SORT
1339 if (trace_sort)
1340 {
1341 if (state->tapeset)
1342 elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1343 SERIAL(state) ? "external sort" : "parallel external sort",
1344 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1345 else
1346 elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1347 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1348 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1349 }
1350
1351 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1352 #else
1353
1354 /*
1355 * If you disabled TRACE_SORT, you can still probe sort__done, but you
1356 * ain't getting space-used stats.
1357 */
1358 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1359 #endif
1360
1361 /* Free any execution state created for CLUSTER case */
1362 if (state->estate != NULL)
1363 {
1364 ExprContext *econtext = GetPerTupleExprContext(state->estate);
1365
1366 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
1367 FreeExecutorState(state->estate);
1368 }
1369
1370 MemoryContextSwitchTo(oldcontext);
1371
1372 /*
1373 * Free the per-sort memory context, thereby releasing all working memory.
1374 */
1375 MemoryContextReset(state->sortcontext);
1376 }
1377
1378 /*
1379 * tuplesort_end
1380 *
1381 * Release resources and clean up.
1382 *
1383 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1384 * pointing to garbage. Be careful not to attempt to use or free such
1385 * pointers afterwards!
1386 */
1387 void
tuplesort_end(Tuplesortstate * state)1388 tuplesort_end(Tuplesortstate *state)
1389 {
1390 tuplesort_free(state);
1391
1392 /*
1393 * Free the main memory context, including the Tuplesortstate struct
1394 * itself.
1395 */
1396 MemoryContextDelete(state->maincontext);
1397 }
1398
1399 /*
1400 * tuplesort_updatemax
1401 *
1402 * Update maximum resource usage statistics.
1403 */
1404 static void
tuplesort_updatemax(Tuplesortstate * state)1405 tuplesort_updatemax(Tuplesortstate *state)
1406 {
1407 int64 spaceUsed;
1408 bool isSpaceDisk;
1409
1410 /*
1411 * Note: it might seem we should provide both memory and disk usage for a
1412 * disk-based sort. However, the current code doesn't track memory space
1413 * accurately once we have begun to return tuples to the caller (since we
1414 * don't account for pfree's the caller is expected to do), so we cannot
1415 * rely on availMem in a disk sort. This does not seem worth the overhead
1416 * to fix. Is it worth creating an API for the memory context code to
1417 * tell us how much is actually used in sortcontext?
1418 */
1419 if (state->tapeset)
1420 {
1421 isSpaceDisk = true;
1422 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1423 }
1424 else
1425 {
1426 isSpaceDisk = false;
1427 spaceUsed = state->allowedMem - state->availMem;
1428 }
1429
1430 /*
1431 * Sort evicts data to the disk when it wasn't able to fit that data into
1432 * main memory. This is why we assume space used on the disk to be more
1433 * important for tracking resource usage than space used in memory. Note
1434 * that the amount of space occupied by some tupleset on the disk might be
1435 * less than amount of space occupied by the same tupleset in memory due
1436 * to more compact representation.
1437 */
1438 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1439 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1440 {
1441 state->maxSpace = spaceUsed;
1442 state->isMaxSpaceDisk = isSpaceDisk;
1443 state->maxSpaceStatus = state->status;
1444 }
1445 }
1446
1447 /*
1448 * tuplesort_reset
1449 *
1450 * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1451 * meta-information in. After tuplesort_reset, tuplesort is ready to start
1452 * a new sort. This allows avoiding recreation of tuple sort states (and
1453 * save resources) when sorting multiple small batches.
1454 */
1455 void
tuplesort_reset(Tuplesortstate * state)1456 tuplesort_reset(Tuplesortstate *state)
1457 {
1458 tuplesort_updatemax(state);
1459 tuplesort_free(state);
1460
1461 /*
1462 * After we've freed up per-batch memory, re-setup all of the state common
1463 * to both the first batch and any subsequent batch.
1464 */
1465 tuplesort_begin_batch(state);
1466
1467 state->lastReturnedTuple = NULL;
1468 state->slabMemoryBegin = NULL;
1469 state->slabMemoryEnd = NULL;
1470 state->slabFreeHead = NULL;
1471 }
1472
1473 /*
1474 * Grow the memtuples[] array, if possible within our memory constraint. We
1475 * must not exceed INT_MAX tuples in memory or the caller-provided memory
1476 * limit. Return true if we were able to enlarge the array, false if not.
1477 *
1478 * Normally, at each increment we double the size of the array. When doing
1479 * that would exceed a limit, we attempt one last, smaller increase (and then
1480 * clear the growmemtuples flag so we don't try any more). That allows us to
1481 * use memory as fully as permitted; sticking to the pure doubling rule could
1482 * result in almost half going unused. Because availMem moves around with
1483 * tuple addition/removal, we need some rule to prevent making repeated small
1484 * increases in memtupsize, which would just be useless thrashing. The
1485 * growmemtuples flag accomplishes that and also prevents useless
1486 * recalculations in this function.
1487 */
1488 static bool
grow_memtuples(Tuplesortstate * state)1489 grow_memtuples(Tuplesortstate *state)
1490 {
1491 int newmemtupsize;
1492 int memtupsize = state->memtupsize;
1493 int64 memNowUsed = state->allowedMem - state->availMem;
1494
1495 /* Forget it if we've already maxed out memtuples, per comment above */
1496 if (!state->growmemtuples)
1497 return false;
1498
1499 /* Select new value of memtupsize */
1500 if (memNowUsed <= state->availMem)
1501 {
1502 /*
1503 * We've used no more than half of allowedMem; double our usage,
1504 * clamping at INT_MAX tuples.
1505 */
1506 if (memtupsize < INT_MAX / 2)
1507 newmemtupsize = memtupsize * 2;
1508 else
1509 {
1510 newmemtupsize = INT_MAX;
1511 state->growmemtuples = false;
1512 }
1513 }
1514 else
1515 {
1516 /*
1517 * This will be the last increment of memtupsize. Abandon doubling
1518 * strategy and instead increase as much as we safely can.
1519 *
1520 * To stay within allowedMem, we can't increase memtupsize by more
1521 * than availMem / sizeof(SortTuple) elements. In practice, we want
1522 * to increase it by considerably less, because we need to leave some
1523 * space for the tuples to which the new array slots will refer. We
1524 * assume the new tuples will be about the same size as the tuples
1525 * we've already seen, and thus we can extrapolate from the space
1526 * consumption so far to estimate an appropriate new size for the
1527 * memtuples array. The optimal value might be higher or lower than
1528 * this estimate, but it's hard to know that in advance. We again
1529 * clamp at INT_MAX tuples.
1530 *
1531 * This calculation is safe against enlarging the array so much that
1532 * LACKMEM becomes true, because the memory currently used includes
1533 * the present array; thus, there would be enough allowedMem for the
1534 * new array elements even if no other memory were currently used.
1535 *
1536 * We do the arithmetic in float8, because otherwise the product of
1537 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1538 * result should be insignificant; but even if we computed a
1539 * completely insane result, the checks below will prevent anything
1540 * really bad from happening.
1541 */
1542 double grow_ratio;
1543
1544 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1545 if (memtupsize * grow_ratio < INT_MAX)
1546 newmemtupsize = (int) (memtupsize * grow_ratio);
1547 else
1548 newmemtupsize = INT_MAX;
1549
1550 /* We won't make any further enlargement attempts */
1551 state->growmemtuples = false;
1552 }
1553
1554 /* Must enlarge array by at least one element, else report failure */
1555 if (newmemtupsize <= memtupsize)
1556 goto noalloc;
1557
1558 /*
1559 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1560 * to ensure our request won't be rejected. Note that we can easily
1561 * exhaust address space before facing this outcome. (This is presently
1562 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1563 * don't rely on that at this distance.)
1564 */
1565 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1566 {
1567 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1568 state->growmemtuples = false; /* can't grow any more */
1569 }
1570
1571 /*
1572 * We need to be sure that we do not cause LACKMEM to become true, else
1573 * the space management algorithm will go nuts. The code above should
1574 * never generate a dangerous request, but to be safe, check explicitly
1575 * that the array growth fits within availMem. (We could still cause
1576 * LACKMEM if the memory chunk overhead associated with the memtuples
1577 * array were to increase. That shouldn't happen because we chose the
1578 * initial array size large enough to ensure that palloc will be treating
1579 * both old and new arrays as separate chunks. But we'll check LACKMEM
1580 * explicitly below just in case.)
1581 */
1582 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1583 goto noalloc;
1584
1585 /* OK, do it */
1586 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1587 state->memtupsize = newmemtupsize;
1588 state->memtuples = (SortTuple *)
1589 repalloc_huge(state->memtuples,
1590 state->memtupsize * sizeof(SortTuple));
1591 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1592 if (LACKMEM(state))
1593 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1594 return true;
1595
1596 noalloc:
1597 /* If for any reason we didn't realloc, shut off future attempts */
1598 state->growmemtuples = false;
1599 return false;
1600 }
1601
1602 /*
1603 * Accept one tuple while collecting input data for sort.
1604 *
1605 * Note that the input data is always copied; the caller need not save it.
1606 */
1607 void
tuplesort_puttupleslot(Tuplesortstate * state,TupleTableSlot * slot)1608 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1609 {
1610 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1611 SortTuple stup;
1612
1613 /*
1614 * Copy the given tuple into memory we control, and decrease availMem.
1615 * Then call the common code.
1616 */
1617 COPYTUP(state, &stup, (void *) slot);
1618
1619 puttuple_common(state, &stup);
1620
1621 MemoryContextSwitchTo(oldcontext);
1622 }
1623
1624 /*
1625 * Accept one tuple while collecting input data for sort.
1626 *
1627 * Note that the input data is always copied; the caller need not save it.
1628 */
1629 void
tuplesort_putheaptuple(Tuplesortstate * state,HeapTuple tup)1630 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1631 {
1632 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1633 SortTuple stup;
1634
1635 /*
1636 * Copy the given tuple into memory we control, and decrease availMem.
1637 * Then call the common code.
1638 */
1639 COPYTUP(state, &stup, (void *) tup);
1640
1641 puttuple_common(state, &stup);
1642
1643 MemoryContextSwitchTo(oldcontext);
1644 }
1645
1646 /*
1647 * Collect one index tuple while collecting input data for sort, building
1648 * it from caller-supplied values.
1649 */
1650 void
tuplesort_putindextuplevalues(Tuplesortstate * state,Relation rel,ItemPointer self,Datum * values,bool * isnull)1651 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1652 ItemPointer self, Datum *values,
1653 bool *isnull)
1654 {
1655 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1656 SortTuple stup;
1657 Datum original;
1658 IndexTuple tuple;
1659
1660 stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1661 tuple = ((IndexTuple) stup.tuple);
1662 tuple->t_tid = *self;
1663 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1664 /* set up first-column key value */
1665 original = index_getattr(tuple,
1666 1,
1667 RelationGetDescr(state->indexRel),
1668 &stup.isnull1);
1669
1670 MemoryContextSwitchTo(state->sortcontext);
1671
1672 if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1673 {
1674 /*
1675 * Store ordinary Datum representation, or NULL value. If there is a
1676 * converter it won't expect NULL values, and cost model is not
1677 * required to account for NULL, so in that case we avoid calling
1678 * converter and just set datum1 to zeroed representation (to be
1679 * consistent, and to support cheap inequality tests for NULL
1680 * abbreviated keys).
1681 */
1682 stup.datum1 = original;
1683 }
1684 else if (!consider_abort_common(state))
1685 {
1686 /* Store abbreviated key representation */
1687 stup.datum1 = state->sortKeys->abbrev_converter(original,
1688 state->sortKeys);
1689 }
1690 else
1691 {
1692 /* Abort abbreviation */
1693 int i;
1694
1695 stup.datum1 = original;
1696
1697 /*
1698 * Set state to be consistent with never trying abbreviation.
1699 *
1700 * Alter datum1 representation in already-copied tuples, so as to
1701 * ensure a consistent representation (current tuple was just
1702 * handled). It does not matter if some dumped tuples are already
1703 * sorted on tape, since serialized tuples lack abbreviated keys
1704 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1705 */
1706 for (i = 0; i < state->memtupcount; i++)
1707 {
1708 SortTuple *mtup = &state->memtuples[i];
1709
1710 tuple = mtup->tuple;
1711 mtup->datum1 = index_getattr(tuple,
1712 1,
1713 RelationGetDescr(state->indexRel),
1714 &mtup->isnull1);
1715 }
1716 }
1717
1718 puttuple_common(state, &stup);
1719
1720 MemoryContextSwitchTo(oldcontext);
1721 }
1722
1723 /*
1724 * Accept one Datum while collecting input data for sort.
1725 *
1726 * If the Datum is pass-by-ref type, the value will be copied.
1727 */
1728 void
tuplesort_putdatum(Tuplesortstate * state,Datum val,bool isNull)1729 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1730 {
1731 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1732 SortTuple stup;
1733
1734 /*
1735 * Pass-by-value types or null values are just stored directly in
1736 * stup.datum1 (and stup.tuple is not used and set to NULL).
1737 *
1738 * Non-null pass-by-reference values need to be copied into memory we
1739 * control, and possibly abbreviated. The copied value is pointed to by
1740 * stup.tuple and is treated as the canonical copy (e.g. to return via
1741 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1742 * abbreviated value if abbreviation is happening, otherwise it's
1743 * identical to stup.tuple.
1744 */
1745
1746 if (isNull || !state->tuples)
1747 {
1748 /*
1749 * Set datum1 to zeroed representation for NULLs (to be consistent,
1750 * and to support cheap inequality tests for NULL abbreviated keys).
1751 */
1752 stup.datum1 = !isNull ? val : (Datum) 0;
1753 stup.isnull1 = isNull;
1754 stup.tuple = NULL; /* no separate storage */
1755 MemoryContextSwitchTo(state->sortcontext);
1756 }
1757 else
1758 {
1759 Datum original = datumCopy(val, false, state->datumTypeLen);
1760
1761 stup.isnull1 = false;
1762 stup.tuple = DatumGetPointer(original);
1763 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1764 MemoryContextSwitchTo(state->sortcontext);
1765
1766 if (!state->sortKeys->abbrev_converter)
1767 {
1768 stup.datum1 = original;
1769 }
1770 else if (!consider_abort_common(state))
1771 {
1772 /* Store abbreviated key representation */
1773 stup.datum1 = state->sortKeys->abbrev_converter(original,
1774 state->sortKeys);
1775 }
1776 else
1777 {
1778 /* Abort abbreviation */
1779 int i;
1780
1781 stup.datum1 = original;
1782
1783 /*
1784 * Set state to be consistent with never trying abbreviation.
1785 *
1786 * Alter datum1 representation in already-copied tuples, so as to
1787 * ensure a consistent representation (current tuple was just
1788 * handled). It does not matter if some dumped tuples are already
1789 * sorted on tape, since serialized tuples lack abbreviated keys
1790 * (TSS_BUILDRUNS state prevents control reaching here in any
1791 * case).
1792 */
1793 for (i = 0; i < state->memtupcount; i++)
1794 {
1795 SortTuple *mtup = &state->memtuples[i];
1796
1797 mtup->datum1 = PointerGetDatum(mtup->tuple);
1798 }
1799 }
1800 }
1801
1802 puttuple_common(state, &stup);
1803
1804 MemoryContextSwitchTo(oldcontext);
1805 }
1806
1807 /*
1808 * Shared code for tuple and datum cases.
1809 */
1810 static void
puttuple_common(Tuplesortstate * state,SortTuple * tuple)1811 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1812 {
1813 Assert(!LEADER(state));
1814
1815 switch (state->status)
1816 {
1817 case TSS_INITIAL:
1818
1819 /*
1820 * Save the tuple into the unsorted array. First, grow the array
1821 * as needed. Note that we try to grow the array when there is
1822 * still one free slot remaining --- if we fail, there'll still be
1823 * room to store the incoming tuple, and then we'll switch to
1824 * tape-based operation.
1825 */
1826 if (state->memtupcount >= state->memtupsize - 1)
1827 {
1828 (void) grow_memtuples(state);
1829 Assert(state->memtupcount < state->memtupsize);
1830 }
1831 state->memtuples[state->memtupcount++] = *tuple;
1832
1833 /*
1834 * Check if it's time to switch over to a bounded heapsort. We do
1835 * so if the input tuple count exceeds twice the desired tuple
1836 * count (this is a heuristic for where heapsort becomes cheaper
1837 * than a quicksort), or if we've just filled workMem and have
1838 * enough tuples to meet the bound.
1839 *
1840 * Note that once we enter TSS_BOUNDED state we will always try to
1841 * complete the sort that way. In the worst case, if later input
1842 * tuples are larger than earlier ones, this might cause us to
1843 * exceed workMem significantly.
1844 */
1845 if (state->bounded &&
1846 (state->memtupcount > state->bound * 2 ||
1847 (state->memtupcount > state->bound && LACKMEM(state))))
1848 {
1849 #ifdef TRACE_SORT
1850 if (trace_sort)
1851 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1852 state->memtupcount,
1853 pg_rusage_show(&state->ru_start));
1854 #endif
1855 make_bounded_heap(state);
1856 return;
1857 }
1858
1859 /*
1860 * Done if we still fit in available memory and have array slots.
1861 */
1862 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1863 return;
1864
1865 /*
1866 * Nope; time to switch to tape-based operation.
1867 */
1868 inittapes(state, true);
1869
1870 /*
1871 * Dump all tuples.
1872 */
1873 dumptuples(state, false);
1874 break;
1875
1876 case TSS_BOUNDED:
1877
1878 /*
1879 * We don't want to grow the array here, so check whether the new
1880 * tuple can be discarded before putting it in. This should be a
1881 * good speed optimization, too, since when there are many more
1882 * input tuples than the bound, most input tuples can be discarded
1883 * with just this one comparison. Note that because we currently
1884 * have the sort direction reversed, we must check for <= not >=.
1885 */
1886 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1887 {
1888 /* new tuple <= top of the heap, so we can discard it */
1889 free_sort_tuple(state, tuple);
1890 CHECK_FOR_INTERRUPTS();
1891 }
1892 else
1893 {
1894 /* discard top of heap, replacing it with the new tuple */
1895 free_sort_tuple(state, &state->memtuples[0]);
1896 tuplesort_heap_replace_top(state, tuple);
1897 }
1898 break;
1899
1900 case TSS_BUILDRUNS:
1901
1902 /*
1903 * Save the tuple into the unsorted array (there must be space)
1904 */
1905 state->memtuples[state->memtupcount++] = *tuple;
1906
1907 /*
1908 * If we are over the memory limit, dump all tuples.
1909 */
1910 dumptuples(state, false);
1911 break;
1912
1913 default:
1914 elog(ERROR, "invalid tuplesort state");
1915 break;
1916 }
1917 }
1918
1919 static bool
consider_abort_common(Tuplesortstate * state)1920 consider_abort_common(Tuplesortstate *state)
1921 {
1922 Assert(state->sortKeys[0].abbrev_converter != NULL);
1923 Assert(state->sortKeys[0].abbrev_abort != NULL);
1924 Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
1925
1926 /*
1927 * Check effectiveness of abbreviation optimization. Consider aborting
1928 * when still within memory limit.
1929 */
1930 if (state->status == TSS_INITIAL &&
1931 state->memtupcount >= state->abbrevNext)
1932 {
1933 state->abbrevNext *= 2;
1934
1935 /*
1936 * Check opclass-supplied abbreviation abort routine. It may indicate
1937 * that abbreviation should not proceed.
1938 */
1939 if (!state->sortKeys->abbrev_abort(state->memtupcount,
1940 state->sortKeys))
1941 return false;
1942
1943 /*
1944 * Finally, restore authoritative comparator, and indicate that
1945 * abbreviation is not in play by setting abbrev_converter to NULL
1946 */
1947 state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
1948 state->sortKeys[0].abbrev_converter = NULL;
1949 /* Not strictly necessary, but be tidy */
1950 state->sortKeys[0].abbrev_abort = NULL;
1951 state->sortKeys[0].abbrev_full_comparator = NULL;
1952
1953 /* Give up - expect original pass-by-value representation */
1954 return true;
1955 }
1956
1957 return false;
1958 }
1959
1960 /*
1961 * All tuples have been provided; finish the sort.
1962 */
1963 void
tuplesort_performsort(Tuplesortstate * state)1964 tuplesort_performsort(Tuplesortstate *state)
1965 {
1966 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1967
1968 #ifdef TRACE_SORT
1969 if (trace_sort)
1970 elog(LOG, "performsort of worker %d starting: %s",
1971 state->worker, pg_rusage_show(&state->ru_start));
1972 #endif
1973
1974 switch (state->status)
1975 {
1976 case TSS_INITIAL:
1977
1978 /*
1979 * We were able to accumulate all the tuples within the allowed
1980 * amount of memory, or leader to take over worker tapes
1981 */
1982 if (SERIAL(state))
1983 {
1984 /* Just qsort 'em and we're done */
1985 tuplesort_sort_memtuples(state);
1986 state->status = TSS_SORTEDINMEM;
1987 }
1988 else if (WORKER(state))
1989 {
1990 /*
1991 * Parallel workers must still dump out tuples to tape. No
1992 * merge is required to produce single output run, though.
1993 */
1994 inittapes(state, false);
1995 dumptuples(state, true);
1996 worker_nomergeruns(state);
1997 state->status = TSS_SORTEDONTAPE;
1998 }
1999 else
2000 {
2001 /*
2002 * Leader will take over worker tapes and merge worker runs.
2003 * Note that mergeruns sets the correct state->status.
2004 */
2005 leader_takeover_tapes(state);
2006 mergeruns(state);
2007 }
2008 state->current = 0;
2009 state->eof_reached = false;
2010 state->markpos_block = 0L;
2011 state->markpos_offset = 0;
2012 state->markpos_eof = false;
2013 break;
2014
2015 case TSS_BOUNDED:
2016
2017 /*
2018 * We were able to accumulate all the tuples required for output
2019 * in memory, using a heap to eliminate excess tuples. Now we
2020 * have to transform the heap to a properly-sorted array.
2021 */
2022 sort_bounded_heap(state);
2023 state->current = 0;
2024 state->eof_reached = false;
2025 state->markpos_offset = 0;
2026 state->markpos_eof = false;
2027 state->status = TSS_SORTEDINMEM;
2028 break;
2029
2030 case TSS_BUILDRUNS:
2031
2032 /*
2033 * Finish tape-based sort. First, flush all tuples remaining in
2034 * memory out to tape; then merge until we have a single remaining
2035 * run (or, if !randomAccess and !WORKER(), one run per tape).
2036 * Note that mergeruns sets the correct state->status.
2037 */
2038 dumptuples(state, true);
2039 mergeruns(state);
2040 state->eof_reached = false;
2041 state->markpos_block = 0L;
2042 state->markpos_offset = 0;
2043 state->markpos_eof = false;
2044 break;
2045
2046 default:
2047 elog(ERROR, "invalid tuplesort state");
2048 break;
2049 }
2050
2051 #ifdef TRACE_SORT
2052 if (trace_sort)
2053 {
2054 if (state->status == TSS_FINALMERGE)
2055 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
2056 state->worker, state->activeTapes,
2057 pg_rusage_show(&state->ru_start));
2058 else
2059 elog(LOG, "performsort of worker %d done: %s",
2060 state->worker, pg_rusage_show(&state->ru_start));
2061 }
2062 #endif
2063
2064 MemoryContextSwitchTo(oldcontext);
2065 }
2066
2067 /*
2068 * Internal routine to fetch the next tuple in either forward or back
2069 * direction into *stup. Returns false if no more tuples.
2070 * Returned tuple belongs to tuplesort memory context, and must not be freed
2071 * by caller. Note that fetched tuple is stored in memory that may be
2072 * recycled by any future fetch.
2073 */
2074 static bool
tuplesort_gettuple_common(Tuplesortstate * state,bool forward,SortTuple * stup)2075 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
2076 SortTuple *stup)
2077 {
2078 unsigned int tuplen;
2079 size_t nmoved;
2080
2081 Assert(!WORKER(state));
2082
2083 switch (state->status)
2084 {
2085 case TSS_SORTEDINMEM:
2086 Assert(forward || state->randomAccess);
2087 Assert(!state->slabAllocatorUsed);
2088 if (forward)
2089 {
2090 if (state->current < state->memtupcount)
2091 {
2092 *stup = state->memtuples[state->current++];
2093 return true;
2094 }
2095 state->eof_reached = true;
2096
2097 /*
2098 * Complain if caller tries to retrieve more tuples than
2099 * originally asked for in a bounded sort. This is because
2100 * returning EOF here might be the wrong thing.
2101 */
2102 if (state->bounded && state->current >= state->bound)
2103 elog(ERROR, "retrieved too many tuples in a bounded sort");
2104
2105 return false;
2106 }
2107 else
2108 {
2109 if (state->current <= 0)
2110 return false;
2111
2112 /*
2113 * if all tuples are fetched already then we return last
2114 * tuple, else - tuple before last returned.
2115 */
2116 if (state->eof_reached)
2117 state->eof_reached = false;
2118 else
2119 {
2120 state->current--; /* last returned tuple */
2121 if (state->current <= 0)
2122 return false;
2123 }
2124 *stup = state->memtuples[state->current - 1];
2125 return true;
2126 }
2127 break;
2128
2129 case TSS_SORTEDONTAPE:
2130 Assert(forward || state->randomAccess);
2131 Assert(state->slabAllocatorUsed);
2132
2133 /*
2134 * The slot that held the tuple that we returned in previous
2135 * gettuple call can now be reused.
2136 */
2137 if (state->lastReturnedTuple)
2138 {
2139 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2140 state->lastReturnedTuple = NULL;
2141 }
2142
2143 if (forward)
2144 {
2145 if (state->eof_reached)
2146 return false;
2147
2148 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
2149 {
2150 READTUP(state, stup, state->result_tape, tuplen);
2151
2152 /*
2153 * Remember the tuple we return, so that we can recycle
2154 * its memory on next call. (This can be NULL, in the
2155 * !state->tuples case).
2156 */
2157 state->lastReturnedTuple = stup->tuple;
2158
2159 return true;
2160 }
2161 else
2162 {
2163 state->eof_reached = true;
2164 return false;
2165 }
2166 }
2167
2168 /*
2169 * Backward.
2170 *
2171 * if all tuples are fetched already then we return last tuple,
2172 * else - tuple before last returned.
2173 */
2174 if (state->eof_reached)
2175 {
2176 /*
2177 * Seek position is pointing just past the zero tuplen at the
2178 * end of file; back up to fetch last tuple's ending length
2179 * word. If seek fails we must have a completely empty file.
2180 */
2181 nmoved = LogicalTapeBackspace(state->tapeset,
2182 state->result_tape,
2183 2 * sizeof(unsigned int));
2184 if (nmoved == 0)
2185 return false;
2186 else if (nmoved != 2 * sizeof(unsigned int))
2187 elog(ERROR, "unexpected tape position");
2188 state->eof_reached = false;
2189 }
2190 else
2191 {
2192 /*
2193 * Back up and fetch previously-returned tuple's ending length
2194 * word. If seek fails, assume we are at start of file.
2195 */
2196 nmoved = LogicalTapeBackspace(state->tapeset,
2197 state->result_tape,
2198 sizeof(unsigned int));
2199 if (nmoved == 0)
2200 return false;
2201 else if (nmoved != sizeof(unsigned int))
2202 elog(ERROR, "unexpected tape position");
2203 tuplen = getlen(state, state->result_tape, false);
2204
2205 /*
2206 * Back up to get ending length word of tuple before it.
2207 */
2208 nmoved = LogicalTapeBackspace(state->tapeset,
2209 state->result_tape,
2210 tuplen + 2 * sizeof(unsigned int));
2211 if (nmoved == tuplen + sizeof(unsigned int))
2212 {
2213 /*
2214 * We backed up over the previous tuple, but there was no
2215 * ending length word before it. That means that the prev
2216 * tuple is the first tuple in the file. It is now the
2217 * next to read in forward direction (not obviously right,
2218 * but that is what in-memory case does).
2219 */
2220 return false;
2221 }
2222 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2223 elog(ERROR, "bogus tuple length in backward scan");
2224 }
2225
2226 tuplen = getlen(state, state->result_tape, false);
2227
2228 /*
2229 * Now we have the length of the prior tuple, back up and read it.
2230 * Note: READTUP expects we are positioned after the initial
2231 * length word of the tuple, so back up to that point.
2232 */
2233 nmoved = LogicalTapeBackspace(state->tapeset,
2234 state->result_tape,
2235 tuplen);
2236 if (nmoved != tuplen)
2237 elog(ERROR, "bogus tuple length in backward scan");
2238 READTUP(state, stup, state->result_tape, tuplen);
2239
2240 /*
2241 * Remember the tuple we return, so that we can recycle its memory
2242 * on next call. (This can be NULL, in the Datum case).
2243 */
2244 state->lastReturnedTuple = stup->tuple;
2245
2246 return true;
2247
2248 case TSS_FINALMERGE:
2249 Assert(forward);
2250 /* We are managing memory ourselves, with the slab allocator. */
2251 Assert(state->slabAllocatorUsed);
2252
2253 /*
2254 * The slab slot holding the tuple that we returned in previous
2255 * gettuple call can now be reused.
2256 */
2257 if (state->lastReturnedTuple)
2258 {
2259 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2260 state->lastReturnedTuple = NULL;
2261 }
2262
2263 /*
2264 * This code should match the inner loop of mergeonerun().
2265 */
2266 if (state->memtupcount > 0)
2267 {
2268 int srcTape = state->memtuples[0].srctape;
2269 SortTuple newtup;
2270
2271 *stup = state->memtuples[0];
2272
2273 /*
2274 * Remember the tuple we return, so that we can recycle its
2275 * memory on next call. (This can be NULL, in the Datum case).
2276 */
2277 state->lastReturnedTuple = stup->tuple;
2278
2279 /*
2280 * Pull next tuple from tape, and replace the returned tuple
2281 * at top of the heap with it.
2282 */
2283 if (!mergereadnext(state, srcTape, &newtup))
2284 {
2285 /*
2286 * If no more data, we've reached end of run on this tape.
2287 * Remove the top node from the heap.
2288 */
2289 tuplesort_heap_delete_top(state);
2290
2291 /*
2292 * Rewind to free the read buffer. It'd go away at the
2293 * end of the sort anyway, but better to release the
2294 * memory early.
2295 */
2296 LogicalTapeRewindForWrite(state->tapeset, srcTape);
2297 return true;
2298 }
2299 newtup.srctape = srcTape;
2300 tuplesort_heap_replace_top(state, &newtup);
2301 return true;
2302 }
2303 return false;
2304
2305 default:
2306 elog(ERROR, "invalid tuplesort state");
2307 return false; /* keep compiler quiet */
2308 }
2309 }
2310
2311 /*
2312 * Fetch the next tuple in either forward or back direction.
2313 * If successful, put tuple in slot and return true; else, clear the slot
2314 * and return false.
2315 *
2316 * Caller may optionally be passed back abbreviated value (on true return
2317 * value) when abbreviation was used, which can be used to cheaply avoid
2318 * equality checks that might otherwise be required. Caller can safely make a
2319 * determination of "non-equal tuple" based on simple binary inequality. A
2320 * NULL value in leading attribute will set abbreviated value to zeroed
2321 * representation, which caller may rely on in abbreviated inequality check.
2322 *
2323 * If copy is true, the slot receives a tuple that's been copied into the
2324 * caller's memory context, so that it will stay valid regardless of future
2325 * manipulations of the tuplesort's state (up to and including deleting the
2326 * tuplesort). If copy is false, the slot will just receive a pointer to a
2327 * tuple held within the tuplesort, which is more efficient, but only safe for
2328 * callers that are prepared to have any subsequent manipulation of the
2329 * tuplesort's state invalidate slot contents.
2330 */
2331 bool
tuplesort_gettupleslot(Tuplesortstate * state,bool forward,bool copy,TupleTableSlot * slot,Datum * abbrev)2332 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2333 TupleTableSlot *slot, Datum *abbrev)
2334 {
2335 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2336 SortTuple stup;
2337
2338 if (!tuplesort_gettuple_common(state, forward, &stup))
2339 stup.tuple = NULL;
2340
2341 MemoryContextSwitchTo(oldcontext);
2342
2343 if (stup.tuple)
2344 {
2345 /* Record abbreviated key for caller */
2346 if (state->sortKeys->abbrev_converter && abbrev)
2347 *abbrev = stup.datum1;
2348
2349 if (copy)
2350 stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2351
2352 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2353 return true;
2354 }
2355 else
2356 {
2357 ExecClearTuple(slot);
2358 return false;
2359 }
2360 }
2361
2362 /*
2363 * Fetch the next tuple in either forward or back direction.
2364 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
2365 * context, and must not be freed by caller. Caller may not rely on tuple
2366 * remaining valid after any further manipulation of tuplesort.
2367 */
2368 HeapTuple
tuplesort_getheaptuple(Tuplesortstate * state,bool forward)2369 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2370 {
2371 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2372 SortTuple stup;
2373
2374 if (!tuplesort_gettuple_common(state, forward, &stup))
2375 stup.tuple = NULL;
2376
2377 MemoryContextSwitchTo(oldcontext);
2378
2379 return stup.tuple;
2380 }
2381
2382 /*
2383 * Fetch the next index tuple in either forward or back direction.
2384 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
2385 * context, and must not be freed by caller. Caller may not rely on tuple
2386 * remaining valid after any further manipulation of tuplesort.
2387 */
2388 IndexTuple
tuplesort_getindextuple(Tuplesortstate * state,bool forward)2389 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2390 {
2391 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2392 SortTuple stup;
2393
2394 if (!tuplesort_gettuple_common(state, forward, &stup))
2395 stup.tuple = NULL;
2396
2397 MemoryContextSwitchTo(oldcontext);
2398
2399 return (IndexTuple) stup.tuple;
2400 }
2401
2402 /*
2403 * Fetch the next Datum in either forward or back direction.
2404 * Returns false if no more datums.
2405 *
2406 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2407 * in caller's context, and is now owned by the caller (this differs from
2408 * similar routines for other types of tuplesorts).
2409 *
2410 * Caller may optionally be passed back abbreviated value (on true return
2411 * value) when abbreviation was used, which can be used to cheaply avoid
2412 * equality checks that might otherwise be required. Caller can safely make a
2413 * determination of "non-equal tuple" based on simple binary inequality. A
2414 * NULL value will have a zeroed abbreviated value representation, which caller
2415 * may rely on in abbreviated inequality check.
2416 */
2417 bool
tuplesort_getdatum(Tuplesortstate * state,bool forward,Datum * val,bool * isNull,Datum * abbrev)2418 tuplesort_getdatum(Tuplesortstate *state, bool forward,
2419 Datum *val, bool *isNull, Datum *abbrev)
2420 {
2421 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2422 SortTuple stup;
2423
2424 if (!tuplesort_gettuple_common(state, forward, &stup))
2425 {
2426 MemoryContextSwitchTo(oldcontext);
2427 return false;
2428 }
2429
2430 /* Ensure we copy into caller's memory context */
2431 MemoryContextSwitchTo(oldcontext);
2432
2433 /* Record abbreviated key for caller */
2434 if (state->sortKeys->abbrev_converter && abbrev)
2435 *abbrev = stup.datum1;
2436
2437 if (stup.isnull1 || !state->tuples)
2438 {
2439 *val = stup.datum1;
2440 *isNull = stup.isnull1;
2441 }
2442 else
2443 {
2444 /* use stup.tuple because stup.datum1 may be an abbreviation */
2445 *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2446 *isNull = false;
2447 }
2448
2449 return true;
2450 }
2451
2452 /*
2453 * Advance over N tuples in either forward or back direction,
2454 * without returning any data. N==0 is a no-op.
2455 * Returns true if successful, false if ran out of tuples.
2456 */
2457 bool
tuplesort_skiptuples(Tuplesortstate * state,int64 ntuples,bool forward)2458 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2459 {
2460 MemoryContext oldcontext;
2461
2462 /*
2463 * We don't actually support backwards skip yet, because no callers need
2464 * it. The API is designed to allow for that later, though.
2465 */
2466 Assert(forward);
2467 Assert(ntuples >= 0);
2468 Assert(!WORKER(state));
2469
2470 switch (state->status)
2471 {
2472 case TSS_SORTEDINMEM:
2473 if (state->memtupcount - state->current >= ntuples)
2474 {
2475 state->current += ntuples;
2476 return true;
2477 }
2478 state->current = state->memtupcount;
2479 state->eof_reached = true;
2480
2481 /*
2482 * Complain if caller tries to retrieve more tuples than
2483 * originally asked for in a bounded sort. This is because
2484 * returning EOF here might be the wrong thing.
2485 */
2486 if (state->bounded && state->current >= state->bound)
2487 elog(ERROR, "retrieved too many tuples in a bounded sort");
2488
2489 return false;
2490
2491 case TSS_SORTEDONTAPE:
2492 case TSS_FINALMERGE:
2493
2494 /*
2495 * We could probably optimize these cases better, but for now it's
2496 * not worth the trouble.
2497 */
2498 oldcontext = MemoryContextSwitchTo(state->sortcontext);
2499 while (ntuples-- > 0)
2500 {
2501 SortTuple stup;
2502
2503 if (!tuplesort_gettuple_common(state, forward, &stup))
2504 {
2505 MemoryContextSwitchTo(oldcontext);
2506 return false;
2507 }
2508 CHECK_FOR_INTERRUPTS();
2509 }
2510 MemoryContextSwitchTo(oldcontext);
2511 return true;
2512
2513 default:
2514 elog(ERROR, "invalid tuplesort state");
2515 return false; /* keep compiler quiet */
2516 }
2517 }
2518
2519 /*
2520 * tuplesort_merge_order - report merge order we'll use for given memory
2521 * (note: "merge order" just means the number of input tapes in the merge).
2522 *
2523 * This is exported for use by the planner. allowedMem is in bytes.
2524 */
2525 int
tuplesort_merge_order(int64 allowedMem)2526 tuplesort_merge_order(int64 allowedMem)
2527 {
2528 int mOrder;
2529
2530 /*
2531 * We need one tape for each merge input, plus another one for the output,
2532 * and each of these tapes needs buffer space. In addition we want
2533 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2534 * count).
2535 *
2536 * Note: you might be thinking we need to account for the memtuples[]
2537 * array in this calculation, but we effectively treat that as part of the
2538 * MERGE_BUFFER_SIZE workspace.
2539 */
2540 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2541 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
2542
2543 /*
2544 * Even in minimum memory, use at least a MINORDER merge. On the other
2545 * hand, even when we have lots of memory, do not use more than a MAXORDER
2546 * merge. Tapes are pretty cheap, but they're not entirely free. Each
2547 * additional tape reduces the amount of memory available to build runs,
2548 * which in turn can cause the same sort to need more runs, which makes
2549 * merging slower even if it can still be done in a single pass. Also,
2550 * high order merges are quite slow due to CPU cache effects; it can be
2551 * faster to pay the I/O cost of a polyphase merge than to perform a
2552 * single merge pass across many hundreds of tapes.
2553 */
2554 mOrder = Max(mOrder, MINORDER);
2555 mOrder = Min(mOrder, MAXORDER);
2556
2557 return mOrder;
2558 }
2559
2560 /*
2561 * inittapes - initialize for tape sorting.
2562 *
2563 * This is called only if we have found we won't sort in memory.
2564 */
2565 static void
inittapes(Tuplesortstate * state,bool mergeruns)2566 inittapes(Tuplesortstate *state, bool mergeruns)
2567 {
2568 int maxTapes,
2569 j;
2570
2571 Assert(!LEADER(state));
2572
2573 if (mergeruns)
2574 {
2575 /* Compute number of tapes to use: merge order plus 1 */
2576 maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2577 }
2578 else
2579 {
2580 /* Workers can sometimes produce single run, output without merge */
2581 Assert(WORKER(state));
2582 maxTapes = MINORDER + 1;
2583 }
2584
2585 #ifdef TRACE_SORT
2586 if (trace_sort)
2587 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2588 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2589 #endif
2590
2591 /* Create the tape set and allocate the per-tape data arrays */
2592 inittapestate(state, maxTapes);
2593 state->tapeset =
2594 LogicalTapeSetCreate(maxTapes, false, NULL,
2595 state->shared ? &state->shared->fileset : NULL,
2596 state->worker);
2597
2598 state->currentRun = 0;
2599
2600 /*
2601 * Initialize variables of Algorithm D (step D1).
2602 */
2603 for (j = 0; j < maxTapes; j++)
2604 {
2605 state->tp_fib[j] = 1;
2606 state->tp_runs[j] = 0;
2607 state->tp_dummy[j] = 1;
2608 state->tp_tapenum[j] = j;
2609 }
2610 state->tp_fib[state->tapeRange] = 0;
2611 state->tp_dummy[state->tapeRange] = 0;
2612
2613 state->Level = 1;
2614 state->destTape = 0;
2615
2616 state->status = TSS_BUILDRUNS;
2617 }
2618
2619 /*
2620 * inittapestate - initialize generic tape management state
2621 */
2622 static void
inittapestate(Tuplesortstate * state,int maxTapes)2623 inittapestate(Tuplesortstate *state, int maxTapes)
2624 {
2625 int64 tapeSpace;
2626
2627 /*
2628 * Decrease availMem to reflect the space needed for tape buffers; but
2629 * don't decrease it to the point that we have no room for tuples. (That
2630 * case is only likely to occur if sorting pass-by-value Datums; in all
2631 * other scenarios the memtuples[] array is unlikely to occupy more than
2632 * half of allowedMem. In the pass-by-value case it's not important to
2633 * account for tuple space, so we don't care if LACKMEM becomes
2634 * inaccurate.)
2635 */
2636 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2637
2638 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2639 USEMEM(state, tapeSpace);
2640
2641 /*
2642 * Make sure that the temp file(s) underlying the tape set are created in
2643 * suitable temp tablespaces. For parallel sorts, this should have been
2644 * called already, but it doesn't matter if it is called a second time.
2645 */
2646 PrepareTempTablespaces();
2647
2648 state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2649 state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2650 state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2651 state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2652 state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2653
2654 /* Record # of tapes allocated (for duration of sort) */
2655 state->maxTapes = maxTapes;
2656 /* Record maximum # of tapes usable as inputs when merging */
2657 state->tapeRange = maxTapes - 1;
2658 }
2659
2660 /*
2661 * selectnewtape -- select new tape for new initial run.
2662 *
2663 * This is called after finishing a run when we know another run
2664 * must be started. This implements steps D3, D4 of Algorithm D.
2665 */
2666 static void
selectnewtape(Tuplesortstate * state)2667 selectnewtape(Tuplesortstate *state)
2668 {
2669 int j;
2670 int a;
2671
2672 /* Step D3: advance j (destTape) */
2673 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2674 {
2675 state->destTape++;
2676 return;
2677 }
2678 if (state->tp_dummy[state->destTape] != 0)
2679 {
2680 state->destTape = 0;
2681 return;
2682 }
2683
2684 /* Step D4: increase level */
2685 state->Level++;
2686 a = state->tp_fib[0];
2687 for (j = 0; j < state->tapeRange; j++)
2688 {
2689 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2690 state->tp_fib[j] = a + state->tp_fib[j + 1];
2691 }
2692 state->destTape = 0;
2693 }
2694
2695 /*
2696 * Initialize the slab allocation arena, for the given number of slots.
2697 */
2698 static void
init_slab_allocator(Tuplesortstate * state,int numSlots)2699 init_slab_allocator(Tuplesortstate *state, int numSlots)
2700 {
2701 if (numSlots > 0)
2702 {
2703 char *p;
2704 int i;
2705
2706 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2707 state->slabMemoryEnd = state->slabMemoryBegin +
2708 numSlots * SLAB_SLOT_SIZE;
2709 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2710 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2711
2712 p = state->slabMemoryBegin;
2713 for (i = 0; i < numSlots - 1; i++)
2714 {
2715 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2716 p += SLAB_SLOT_SIZE;
2717 }
2718 ((SlabSlot *) p)->nextfree = NULL;
2719 }
2720 else
2721 {
2722 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2723 state->slabFreeHead = NULL;
2724 }
2725 state->slabAllocatorUsed = true;
2726 }
2727
2728 /*
2729 * mergeruns -- merge all the completed initial runs.
2730 *
2731 * This implements steps D5, D6 of Algorithm D. All input data has
2732 * already been written to initial runs on tape (see dumptuples).
2733 */
2734 static void
mergeruns(Tuplesortstate * state)2735 mergeruns(Tuplesortstate *state)
2736 {
2737 int tapenum,
2738 svTape,
2739 svRuns,
2740 svDummy;
2741 int numTapes;
2742 int numInputTapes;
2743
2744 Assert(state->status == TSS_BUILDRUNS);
2745 Assert(state->memtupcount == 0);
2746
2747 if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2748 {
2749 /*
2750 * If there are multiple runs to be merged, when we go to read back
2751 * tuples from disk, abbreviated keys will not have been stored, and
2752 * we don't care to regenerate them. Disable abbreviation from this
2753 * point on.
2754 */
2755 state->sortKeys->abbrev_converter = NULL;
2756 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2757
2758 /* Not strictly necessary, but be tidy */
2759 state->sortKeys->abbrev_abort = NULL;
2760 state->sortKeys->abbrev_full_comparator = NULL;
2761 }
2762
2763 /*
2764 * Reset tuple memory. We've freed all the tuples that we previously
2765 * allocated. We will use the slab allocator from now on.
2766 */
2767 MemoryContextResetOnly(state->tuplecontext);
2768
2769 /*
2770 * We no longer need a large memtuples array. (We will allocate a smaller
2771 * one for the heap later.)
2772 */
2773 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2774 pfree(state->memtuples);
2775 state->memtuples = NULL;
2776
2777 /*
2778 * If we had fewer runs than tapes, refund the memory that we imagined we
2779 * would need for the tape buffers of the unused tapes.
2780 *
2781 * numTapes and numInputTapes reflect the actual number of tapes we will
2782 * use. Note that the output tape's tape number is maxTapes - 1, so the
2783 * tape numbers of the used tapes are not consecutive, and you cannot just
2784 * loop from 0 to numTapes to visit all used tapes!
2785 */
2786 if (state->Level == 1)
2787 {
2788 numInputTapes = state->currentRun;
2789 numTapes = numInputTapes + 1;
2790 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2791 }
2792 else
2793 {
2794 numInputTapes = state->tapeRange;
2795 numTapes = state->maxTapes;
2796 }
2797
2798 /*
2799 * Initialize the slab allocator. We need one slab slot per input tape,
2800 * for the tuples in the heap, plus one to hold the tuple last returned
2801 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2802 * however, we don't need to do allocate anything.)
2803 *
2804 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2805 * to track memory usage of individual tuples.
2806 */
2807 if (state->tuples)
2808 init_slab_allocator(state, numInputTapes + 1);
2809 else
2810 init_slab_allocator(state, 0);
2811
2812 /*
2813 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2814 * from each input tape.
2815 */
2816 state->memtupsize = numInputTapes;
2817 state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
2818 numInputTapes * sizeof(SortTuple));
2819 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2820
2821 /*
2822 * Use all the remaining memory we have available for read buffers among
2823 * the input tapes.
2824 *
2825 * We don't try to "rebalance" the memory among tapes, when we start a new
2826 * merge phase, even if some tapes are inactive in the new phase. That
2827 * would be hard, because logtape.c doesn't know where one run ends and
2828 * another begins. When a new merge phase begins, and a tape doesn't
2829 * participate in it, its buffer nevertheless already contains tuples from
2830 * the next run on same tape, so we cannot release the buffer. That's OK
2831 * in practice, merge performance isn't that sensitive to the amount of
2832 * buffers used, and most merge phases use all or almost all tapes,
2833 * anyway.
2834 */
2835 #ifdef TRACE_SORT
2836 if (trace_sort)
2837 elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2838 state->worker, state->availMem / 1024, numInputTapes);
2839 #endif
2840
2841 state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2842 USEMEM(state, state->read_buffer_size * numInputTapes);
2843
2844 /* End of step D2: rewind all output tapes to prepare for merging */
2845 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2846 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2847
2848 for (;;)
2849 {
2850 /*
2851 * At this point we know that tape[T] is empty. If there's just one
2852 * (real or dummy) run left on each input tape, then only one merge
2853 * pass remains. If we don't have to produce a materialized sorted
2854 * tape, we can stop at this point and do the final merge on-the-fly.
2855 */
2856 if (!state->randomAccess && !WORKER(state))
2857 {
2858 bool allOneRun = true;
2859
2860 Assert(state->tp_runs[state->tapeRange] == 0);
2861 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2862 {
2863 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2864 {
2865 allOneRun = false;
2866 break;
2867 }
2868 }
2869 if (allOneRun)
2870 {
2871 /* Tell logtape.c we won't be writing anymore */
2872 LogicalTapeSetForgetFreeSpace(state->tapeset);
2873 /* Initialize for the final merge pass */
2874 beginmerge(state);
2875 state->status = TSS_FINALMERGE;
2876 return;
2877 }
2878 }
2879
2880 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
2881 while (state->tp_runs[state->tapeRange - 1] ||
2882 state->tp_dummy[state->tapeRange - 1])
2883 {
2884 bool allDummy = true;
2885
2886 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2887 {
2888 if (state->tp_dummy[tapenum] == 0)
2889 {
2890 allDummy = false;
2891 break;
2892 }
2893 }
2894
2895 if (allDummy)
2896 {
2897 state->tp_dummy[state->tapeRange]++;
2898 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2899 state->tp_dummy[tapenum]--;
2900 }
2901 else
2902 mergeonerun(state);
2903 }
2904
2905 /* Step D6: decrease level */
2906 if (--state->Level == 0)
2907 break;
2908 /* rewind output tape T to use as new input */
2909 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2910 state->read_buffer_size);
2911 /* rewind used-up input tape P, and prepare it for write pass */
2912 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2913 state->tp_runs[state->tapeRange - 1] = 0;
2914
2915 /*
2916 * reassign tape units per step D6; note we no longer care about A[]
2917 */
2918 svTape = state->tp_tapenum[state->tapeRange];
2919 svDummy = state->tp_dummy[state->tapeRange];
2920 svRuns = state->tp_runs[state->tapeRange];
2921 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2922 {
2923 state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
2924 state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
2925 state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
2926 }
2927 state->tp_tapenum[0] = svTape;
2928 state->tp_dummy[0] = svDummy;
2929 state->tp_runs[0] = svRuns;
2930 }
2931
2932 /*
2933 * Done. Knuth says that the result is on TAPE[1], but since we exited
2934 * the loop without performing the last iteration of step D6, we have not
2935 * rearranged the tape unit assignment, and therefore the result is on
2936 * TAPE[T]. We need to do it this way so that we can freeze the final
2937 * output tape while rewinding it. The last iteration of step D6 would be
2938 * a waste of cycles anyway...
2939 */
2940 state->result_tape = state->tp_tapenum[state->tapeRange];
2941 if (!WORKER(state))
2942 LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
2943 else
2944 worker_freeze_result_tape(state);
2945 state->status = TSS_SORTEDONTAPE;
2946
2947 /* Release the read buffers of all the other tapes, by rewinding them. */
2948 for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
2949 {
2950 if (tapenum != state->result_tape)
2951 LogicalTapeRewindForWrite(state->tapeset, tapenum);
2952 }
2953 }
2954
2955 /*
2956 * Merge one run from each input tape, except ones with dummy runs.
2957 *
2958 * This is the inner loop of Algorithm D step D5. We know that the
2959 * output tape is TAPE[T].
2960 */
2961 static void
mergeonerun(Tuplesortstate * state)2962 mergeonerun(Tuplesortstate *state)
2963 {
2964 int destTape = state->tp_tapenum[state->tapeRange];
2965 int srcTape;
2966
2967 /*
2968 * Start the merge by loading one tuple from each active source tape into
2969 * the heap. We can also decrease the input run/dummy run counts.
2970 */
2971 beginmerge(state);
2972
2973 /*
2974 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2975 * out, and replacing it with next tuple from same tape (if there is
2976 * another one).
2977 */
2978 while (state->memtupcount > 0)
2979 {
2980 SortTuple stup;
2981
2982 /* write the tuple to destTape */
2983 srcTape = state->memtuples[0].srctape;
2984 WRITETUP(state, destTape, &state->memtuples[0]);
2985
2986 /* recycle the slot of the tuple we just wrote out, for the next read */
2987 if (state->memtuples[0].tuple)
2988 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2989
2990 /*
2991 * pull next tuple from the tape, and replace the written-out tuple in
2992 * the heap with it.
2993 */
2994 if (mergereadnext(state, srcTape, &stup))
2995 {
2996 stup.srctape = srcTape;
2997 tuplesort_heap_replace_top(state, &stup);
2998 }
2999 else
3000 tuplesort_heap_delete_top(state);
3001 }
3002
3003 /*
3004 * When the heap empties, we're done. Write an end-of-run marker on the
3005 * output tape, and increment its count of real runs.
3006 */
3007 markrunend(state, destTape);
3008 state->tp_runs[state->tapeRange]++;
3009
3010 #ifdef TRACE_SORT
3011 if (trace_sort)
3012 elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
3013 state->activeTapes, pg_rusage_show(&state->ru_start));
3014 #endif
3015 }
3016
3017 /*
3018 * beginmerge - initialize for a merge pass
3019 *
3020 * We decrease the counts of real and dummy runs for each tape, and mark
3021 * which tapes contain active input runs in mergeactive[]. Then, fill the
3022 * merge heap with the first tuple from each active tape.
3023 */
3024 static void
beginmerge(Tuplesortstate * state)3025 beginmerge(Tuplesortstate *state)
3026 {
3027 int activeTapes;
3028 int tapenum;
3029 int srcTape;
3030
3031 /* Heap should be empty here */
3032 Assert(state->memtupcount == 0);
3033
3034 /* Adjust run counts and mark the active tapes */
3035 memset(state->mergeactive, 0,
3036 state->maxTapes * sizeof(*state->mergeactive));
3037 activeTapes = 0;
3038 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
3039 {
3040 if (state->tp_dummy[tapenum] > 0)
3041 state->tp_dummy[tapenum]--;
3042 else
3043 {
3044 Assert(state->tp_runs[tapenum] > 0);
3045 state->tp_runs[tapenum]--;
3046 srcTape = state->tp_tapenum[tapenum];
3047 state->mergeactive[srcTape] = true;
3048 activeTapes++;
3049 }
3050 }
3051 Assert(activeTapes > 0);
3052 state->activeTapes = activeTapes;
3053
3054 /* Load the merge heap with the first tuple from each input tape */
3055 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
3056 {
3057 SortTuple tup;
3058
3059 if (mergereadnext(state, srcTape, &tup))
3060 {
3061 tup.srctape = srcTape;
3062 tuplesort_heap_insert(state, &tup);
3063 }
3064 }
3065 }
3066
3067 /*
3068 * mergereadnext - read next tuple from one merge input tape
3069 *
3070 * Returns false on EOF.
3071 */
3072 static bool
mergereadnext(Tuplesortstate * state,int srcTape,SortTuple * stup)3073 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
3074 {
3075 unsigned int tuplen;
3076
3077 if (!state->mergeactive[srcTape])
3078 return false; /* tape's run is already exhausted */
3079
3080 /* read next tuple, if any */
3081 if ((tuplen = getlen(state, srcTape, true)) == 0)
3082 {
3083 state->mergeactive[srcTape] = false;
3084 return false;
3085 }
3086 READTUP(state, stup, srcTape, tuplen);
3087
3088 return true;
3089 }
3090
3091 /*
3092 * dumptuples - remove tuples from memtuples and write initial run to tape
3093 *
3094 * When alltuples = true, dump everything currently in memory. (This case is
3095 * only used at end of input data.)
3096 */
3097 static void
dumptuples(Tuplesortstate * state,bool alltuples)3098 dumptuples(Tuplesortstate *state, bool alltuples)
3099 {
3100 int memtupwrite;
3101 int i;
3102
3103 /*
3104 * Nothing to do if we still fit in available memory and have array slots,
3105 * unless this is the final call during initial run generation.
3106 */
3107 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
3108 !alltuples)
3109 return;
3110
3111 /*
3112 * Final call might require no sorting, in rare cases where we just so
3113 * happen to have previously LACKMEM()'d at the point where exactly all
3114 * remaining tuples are loaded into memory, just before input was
3115 * exhausted.
3116 *
3117 * In general, short final runs are quite possible. Rather than allowing
3118 * a special case where there was a superfluous selectnewtape() call (i.e.
3119 * a call with no subsequent run actually written to destTape), we prefer
3120 * to write out a 0 tuple run.
3121 *
3122 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
3123 * the tape inactive for the merge when called from beginmerge(). This
3124 * case is therefore similar to the case where mergeonerun() finds a dummy
3125 * run for the tape, and so doesn't need to merge a run from the tape (or
3126 * conceptually "merges" the dummy run, if you prefer). According to
3127 * Knuth, Algorithm D "isn't strictly optimal" in its method of
3128 * distribution and dummy run assignment; this edge case seems very
3129 * unlikely to make that appreciably worse.
3130 */
3131 Assert(state->status == TSS_BUILDRUNS);
3132
3133 /*
3134 * It seems unlikely that this limit will ever be exceeded, but take no
3135 * chances
3136 */
3137 if (state->currentRun == INT_MAX)
3138 ereport(ERROR,
3139 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
3140 errmsg("cannot have more than %d runs for an external sort",
3141 INT_MAX)));
3142
3143 state->currentRun++;
3144
3145 #ifdef TRACE_SORT
3146 if (trace_sort)
3147 elog(LOG, "worker %d starting quicksort of run %d: %s",
3148 state->worker, state->currentRun,
3149 pg_rusage_show(&state->ru_start));
3150 #endif
3151
3152 /*
3153 * Sort all tuples accumulated within the allowed amount of memory for
3154 * this run using quicksort
3155 */
3156 tuplesort_sort_memtuples(state);
3157
3158 #ifdef TRACE_SORT
3159 if (trace_sort)
3160 elog(LOG, "worker %d finished quicksort of run %d: %s",
3161 state->worker, state->currentRun,
3162 pg_rusage_show(&state->ru_start));
3163 #endif
3164
3165 memtupwrite = state->memtupcount;
3166 for (i = 0; i < memtupwrite; i++)
3167 {
3168 WRITETUP(state, state->tp_tapenum[state->destTape],
3169 &state->memtuples[i]);
3170 state->memtupcount--;
3171 }
3172
3173 /*
3174 * Reset tuple memory. We've freed all of the tuples that we previously
3175 * allocated. It's important to avoid fragmentation when there is a stark
3176 * change in the sizes of incoming tuples. Fragmentation due to
3177 * AllocSetFree's bucketing by size class might be particularly bad if
3178 * this step wasn't taken.
3179 */
3180 MemoryContextReset(state->tuplecontext);
3181
3182 markrunend(state, state->tp_tapenum[state->destTape]);
3183 state->tp_runs[state->destTape]++;
3184 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3185
3186 #ifdef TRACE_SORT
3187 if (trace_sort)
3188 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3189 state->worker, state->currentRun, state->destTape,
3190 pg_rusage_show(&state->ru_start));
3191 #endif
3192
3193 if (!alltuples)
3194 selectnewtape(state);
3195 }
3196
3197 /*
3198 * tuplesort_rescan - rewind and replay the scan
3199 */
3200 void
tuplesort_rescan(Tuplesortstate * state)3201 tuplesort_rescan(Tuplesortstate *state)
3202 {
3203 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3204
3205 Assert(state->randomAccess);
3206
3207 switch (state->status)
3208 {
3209 case TSS_SORTEDINMEM:
3210 state->current = 0;
3211 state->eof_reached = false;
3212 state->markpos_offset = 0;
3213 state->markpos_eof = false;
3214 break;
3215 case TSS_SORTEDONTAPE:
3216 LogicalTapeRewindForRead(state->tapeset,
3217 state->result_tape,
3218 0);
3219 state->eof_reached = false;
3220 state->markpos_block = 0L;
3221 state->markpos_offset = 0;
3222 state->markpos_eof = false;
3223 break;
3224 default:
3225 elog(ERROR, "invalid tuplesort state");
3226 break;
3227 }
3228
3229 MemoryContextSwitchTo(oldcontext);
3230 }
3231
3232 /*
3233 * tuplesort_markpos - saves current position in the merged sort file
3234 */
3235 void
tuplesort_markpos(Tuplesortstate * state)3236 tuplesort_markpos(Tuplesortstate *state)
3237 {
3238 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3239
3240 Assert(state->randomAccess);
3241
3242 switch (state->status)
3243 {
3244 case TSS_SORTEDINMEM:
3245 state->markpos_offset = state->current;
3246 state->markpos_eof = state->eof_reached;
3247 break;
3248 case TSS_SORTEDONTAPE:
3249 LogicalTapeTell(state->tapeset,
3250 state->result_tape,
3251 &state->markpos_block,
3252 &state->markpos_offset);
3253 state->markpos_eof = state->eof_reached;
3254 break;
3255 default:
3256 elog(ERROR, "invalid tuplesort state");
3257 break;
3258 }
3259
3260 MemoryContextSwitchTo(oldcontext);
3261 }
3262
3263 /*
3264 * tuplesort_restorepos - restores current position in merged sort file to
3265 * last saved position
3266 */
3267 void
tuplesort_restorepos(Tuplesortstate * state)3268 tuplesort_restorepos(Tuplesortstate *state)
3269 {
3270 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3271
3272 Assert(state->randomAccess);
3273
3274 switch (state->status)
3275 {
3276 case TSS_SORTEDINMEM:
3277 state->current = state->markpos_offset;
3278 state->eof_reached = state->markpos_eof;
3279 break;
3280 case TSS_SORTEDONTAPE:
3281 LogicalTapeSeek(state->tapeset,
3282 state->result_tape,
3283 state->markpos_block,
3284 state->markpos_offset);
3285 state->eof_reached = state->markpos_eof;
3286 break;
3287 default:
3288 elog(ERROR, "invalid tuplesort state");
3289 break;
3290 }
3291
3292 MemoryContextSwitchTo(oldcontext);
3293 }
3294
3295 /*
3296 * tuplesort_get_stats - extract summary statistics
3297 *
3298 * This can be called after tuplesort_performsort() finishes to obtain
3299 * printable summary information about how the sort was performed.
3300 */
3301 void
tuplesort_get_stats(Tuplesortstate * state,TuplesortInstrumentation * stats)3302 tuplesort_get_stats(Tuplesortstate *state,
3303 TuplesortInstrumentation *stats)
3304 {
3305 /*
3306 * Note: it might seem we should provide both memory and disk usage for a
3307 * disk-based sort. However, the current code doesn't track memory space
3308 * accurately once we have begun to return tuples to the caller (since we
3309 * don't account for pfree's the caller is expected to do), so we cannot
3310 * rely on availMem in a disk sort. This does not seem worth the overhead
3311 * to fix. Is it worth creating an API for the memory context code to
3312 * tell us how much is actually used in sortcontext?
3313 */
3314 tuplesort_updatemax(state);
3315
3316 if (state->isMaxSpaceDisk)
3317 stats->spaceType = SORT_SPACE_TYPE_DISK;
3318 else
3319 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
3320 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
3321
3322 switch (state->maxSpaceStatus)
3323 {
3324 case TSS_SORTEDINMEM:
3325 if (state->boundUsed)
3326 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3327 else
3328 stats->sortMethod = SORT_TYPE_QUICKSORT;
3329 break;
3330 case TSS_SORTEDONTAPE:
3331 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3332 break;
3333 case TSS_FINALMERGE:
3334 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3335 break;
3336 default:
3337 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3338 break;
3339 }
3340 }
3341
3342 /*
3343 * Convert TuplesortMethod to a string.
3344 */
3345 const char *
tuplesort_method_name(TuplesortMethod m)3346 tuplesort_method_name(TuplesortMethod m)
3347 {
3348 switch (m)
3349 {
3350 case SORT_TYPE_STILL_IN_PROGRESS:
3351 return "still in progress";
3352 case SORT_TYPE_TOP_N_HEAPSORT:
3353 return "top-N heapsort";
3354 case SORT_TYPE_QUICKSORT:
3355 return "quicksort";
3356 case SORT_TYPE_EXTERNAL_SORT:
3357 return "external sort";
3358 case SORT_TYPE_EXTERNAL_MERGE:
3359 return "external merge";
3360 }
3361
3362 return "unknown";
3363 }
3364
3365 /*
3366 * Convert TuplesortSpaceType to a string.
3367 */
3368 const char *
tuplesort_space_type_name(TuplesortSpaceType t)3369 tuplesort_space_type_name(TuplesortSpaceType t)
3370 {
3371 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3372 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3373 }
3374
3375
3376 /*
3377 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3378 */
3379
3380 /*
3381 * Convert the existing unordered array of SortTuples to a bounded heap,
3382 * discarding all but the smallest "state->bound" tuples.
3383 *
3384 * When working with a bounded heap, we want to keep the largest entry
3385 * at the root (array entry zero), instead of the smallest as in the normal
3386 * sort case. This allows us to discard the largest entry cheaply.
3387 * Therefore, we temporarily reverse the sort direction.
3388 */
3389 static void
make_bounded_heap(Tuplesortstate * state)3390 make_bounded_heap(Tuplesortstate *state)
3391 {
3392 int tupcount = state->memtupcount;
3393 int i;
3394
3395 Assert(state->status == TSS_INITIAL);
3396 Assert(state->bounded);
3397 Assert(tupcount >= state->bound);
3398 Assert(SERIAL(state));
3399
3400 /* Reverse sort direction so largest entry will be at root */
3401 reversedirection(state);
3402
3403 state->memtupcount = 0; /* make the heap empty */
3404 for (i = 0; i < tupcount; i++)
3405 {
3406 if (state->memtupcount < state->bound)
3407 {
3408 /* Insert next tuple into heap */
3409 /* Must copy source tuple to avoid possible overwrite */
3410 SortTuple stup = state->memtuples[i];
3411
3412 tuplesort_heap_insert(state, &stup);
3413 }
3414 else
3415 {
3416 /*
3417 * The heap is full. Replace the largest entry with the new
3418 * tuple, or just discard it, if it's larger than anything already
3419 * in the heap.
3420 */
3421 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3422 {
3423 free_sort_tuple(state, &state->memtuples[i]);
3424 CHECK_FOR_INTERRUPTS();
3425 }
3426 else
3427 tuplesort_heap_replace_top(state, &state->memtuples[i]);
3428 }
3429 }
3430
3431 Assert(state->memtupcount == state->bound);
3432 state->status = TSS_BOUNDED;
3433 }
3434
3435 /*
3436 * Convert the bounded heap to a properly-sorted array
3437 */
3438 static void
sort_bounded_heap(Tuplesortstate * state)3439 sort_bounded_heap(Tuplesortstate *state)
3440 {
3441 int tupcount = state->memtupcount;
3442
3443 Assert(state->status == TSS_BOUNDED);
3444 Assert(state->bounded);
3445 Assert(tupcount == state->bound);
3446 Assert(SERIAL(state));
3447
3448 /*
3449 * We can unheapify in place because each delete-top call will remove the
3450 * largest entry, which we can promptly store in the newly freed slot at
3451 * the end. Once we're down to a single-entry heap, we're done.
3452 */
3453 while (state->memtupcount > 1)
3454 {
3455 SortTuple stup = state->memtuples[0];
3456
3457 /* this sifts-up the next-largest entry and decreases memtupcount */
3458 tuplesort_heap_delete_top(state);
3459 state->memtuples[state->memtupcount] = stup;
3460 }
3461 state->memtupcount = tupcount;
3462
3463 /*
3464 * Reverse sort direction back to the original state. This is not
3465 * actually necessary but seems like a good idea for tidiness.
3466 */
3467 reversedirection(state);
3468
3469 state->status = TSS_SORTEDINMEM;
3470 state->boundUsed = true;
3471 }
3472
3473 /*
3474 * Sort all memtuples using specialized qsort() routines.
3475 *
3476 * Quicksort is used for small in-memory sorts, and external sort runs.
3477 */
3478 static void
tuplesort_sort_memtuples(Tuplesortstate * state)3479 tuplesort_sort_memtuples(Tuplesortstate *state)
3480 {
3481 Assert(!LEADER(state));
3482
3483 if (state->memtupcount > 1)
3484 {
3485 /* Can we use the single-key sort function? */
3486 if (state->onlyKey != NULL)
3487 qsort_ssup(state->memtuples, state->memtupcount,
3488 state->onlyKey);
3489 else
3490 qsort_tuple(state->memtuples,
3491 state->memtupcount,
3492 state->comparetup,
3493 state);
3494 }
3495 }
3496
3497 /*
3498 * Insert a new tuple into an empty or existing heap, maintaining the
3499 * heap invariant. Caller is responsible for ensuring there's room.
3500 *
3501 * Note: For some callers, tuple points to a memtuples[] entry above the
3502 * end of the heap. This is safe as long as it's not immediately adjacent
3503 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3504 * is, it might get overwritten before being moved into the heap!
3505 */
3506 static void
tuplesort_heap_insert(Tuplesortstate * state,SortTuple * tuple)3507 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3508 {
3509 SortTuple *memtuples;
3510 int j;
3511
3512 memtuples = state->memtuples;
3513 Assert(state->memtupcount < state->memtupsize);
3514
3515 CHECK_FOR_INTERRUPTS();
3516
3517 /*
3518 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3519 * using 1-based array indexes, not 0-based.
3520 */
3521 j = state->memtupcount++;
3522 while (j > 0)
3523 {
3524 int i = (j - 1) >> 1;
3525
3526 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3527 break;
3528 memtuples[j] = memtuples[i];
3529 j = i;
3530 }
3531 memtuples[j] = *tuple;
3532 }
3533
3534 /*
3535 * Remove the tuple at state->memtuples[0] from the heap. Decrement
3536 * memtupcount, and sift up to maintain the heap invariant.
3537 *
3538 * The caller has already free'd the tuple the top node points to,
3539 * if necessary.
3540 */
3541 static void
tuplesort_heap_delete_top(Tuplesortstate * state)3542 tuplesort_heap_delete_top(Tuplesortstate *state)
3543 {
3544 SortTuple *memtuples = state->memtuples;
3545 SortTuple *tuple;
3546
3547 if (--state->memtupcount <= 0)
3548 return;
3549
3550 /*
3551 * Remove the last tuple in the heap, and re-insert it, by replacing the
3552 * current top node with it.
3553 */
3554 tuple = &memtuples[state->memtupcount];
3555 tuplesort_heap_replace_top(state, tuple);
3556 }
3557
3558 /*
3559 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
3560 * maintain the heap invariant.
3561 *
3562 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3563 * Heapsort, steps H3-H8).
3564 */
3565 static void
tuplesort_heap_replace_top(Tuplesortstate * state,SortTuple * tuple)3566 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3567 {
3568 SortTuple *memtuples = state->memtuples;
3569 unsigned int i,
3570 n;
3571
3572 Assert(state->memtupcount >= 1);
3573
3574 CHECK_FOR_INTERRUPTS();
3575
3576 /*
3577 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3578 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3579 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3580 */
3581 n = state->memtupcount;
3582 i = 0; /* i is where the "hole" is */
3583 for (;;)
3584 {
3585 unsigned int j = 2 * i + 1;
3586
3587 if (j >= n)
3588 break;
3589 if (j + 1 < n &&
3590 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3591 j++;
3592 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3593 break;
3594 memtuples[i] = memtuples[j];
3595 i = j;
3596 }
3597 memtuples[i] = *tuple;
3598 }
3599
3600 /*
3601 * Function to reverse the sort direction from its current state
3602 *
3603 * It is not safe to call this when performing hash tuplesorts
3604 */
3605 static void
reversedirection(Tuplesortstate * state)3606 reversedirection(Tuplesortstate *state)
3607 {
3608 SortSupport sortKey = state->sortKeys;
3609 int nkey;
3610
3611 for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3612 {
3613 sortKey->ssup_reverse = !sortKey->ssup_reverse;
3614 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3615 }
3616 }
3617
3618
3619 /*
3620 * Tape interface routines
3621 */
3622
3623 static unsigned int
getlen(Tuplesortstate * state,int tapenum,bool eofOK)3624 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3625 {
3626 unsigned int len;
3627
3628 if (LogicalTapeRead(state->tapeset, tapenum,
3629 &len, sizeof(len)) != sizeof(len))
3630 elog(ERROR, "unexpected end of tape");
3631 if (len == 0 && !eofOK)
3632 elog(ERROR, "unexpected end of data");
3633 return len;
3634 }
3635
3636 static void
markrunend(Tuplesortstate * state,int tapenum)3637 markrunend(Tuplesortstate *state, int tapenum)
3638 {
3639 unsigned int len = 0;
3640
3641 LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3642 }
3643
3644 /*
3645 * Get memory for tuple from within READTUP() routine.
3646 *
3647 * We use next free slot from the slab allocator, or palloc() if the tuple
3648 * is too large for that.
3649 */
3650 static void *
readtup_alloc(Tuplesortstate * state,Size tuplen)3651 readtup_alloc(Tuplesortstate *state, Size tuplen)
3652 {
3653 SlabSlot *buf;
3654
3655 /*
3656 * We pre-allocate enough slots in the slab arena that we should never run
3657 * out.
3658 */
3659 Assert(state->slabFreeHead);
3660
3661 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3662 return MemoryContextAlloc(state->sortcontext, tuplen);
3663 else
3664 {
3665 buf = state->slabFreeHead;
3666 /* Reuse this slot */
3667 state->slabFreeHead = buf->nextfree;
3668
3669 return buf;
3670 }
3671 }
3672
3673
3674 /*
3675 * Routines specialized for HeapTuple (actually MinimalTuple) case
3676 */
3677
3678 static int
comparetup_heap(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)3679 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3680 {
3681 SortSupport sortKey = state->sortKeys;
3682 HeapTupleData ltup;
3683 HeapTupleData rtup;
3684 TupleDesc tupDesc;
3685 int nkey;
3686 int32 compare;
3687 AttrNumber attno;
3688 Datum datum1,
3689 datum2;
3690 bool isnull1,
3691 isnull2;
3692
3693
3694 /* Compare the leading sort key */
3695 compare = ApplySortComparator(a->datum1, a->isnull1,
3696 b->datum1, b->isnull1,
3697 sortKey);
3698 if (compare != 0)
3699 return compare;
3700
3701 /* Compare additional sort keys */
3702 ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3703 ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3704 rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3705 rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3706 tupDesc = state->tupDesc;
3707
3708 if (sortKey->abbrev_converter)
3709 {
3710 attno = sortKey->ssup_attno;
3711
3712 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1);
3713 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3714
3715 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3716 datum2, isnull2,
3717 sortKey);
3718 if (compare != 0)
3719 return compare;
3720 }
3721
3722 sortKey++;
3723 for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3724 {
3725 attno = sortKey->ssup_attno;
3726
3727 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1);
3728 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3729
3730 compare = ApplySortComparator(datum1, isnull1,
3731 datum2, isnull2,
3732 sortKey);
3733 if (compare != 0)
3734 return compare;
3735 }
3736
3737 return 0;
3738 }
3739
3740 static void
copytup_heap(Tuplesortstate * state,SortTuple * stup,void * tup)3741 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3742 {
3743 /*
3744 * We expect the passed "tup" to be a TupleTableSlot, and form a
3745 * MinimalTuple using the exported interface for that.
3746 */
3747 TupleTableSlot *slot = (TupleTableSlot *) tup;
3748 Datum original;
3749 MinimalTuple tuple;
3750 HeapTupleData htup;
3751 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3752
3753 /* copy the tuple into sort storage */
3754 tuple = ExecCopySlotMinimalTuple(slot);
3755 stup->tuple = (void *) tuple;
3756 USEMEM(state, GetMemoryChunkSpace(tuple));
3757 /* set up first-column key value */
3758 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3759 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3760 original = heap_getattr(&htup,
3761 state->sortKeys[0].ssup_attno,
3762 state->tupDesc,
3763 &stup->isnull1);
3764
3765 MemoryContextSwitchTo(oldcontext);
3766
3767 if (!state->sortKeys->abbrev_converter || stup->isnull1)
3768 {
3769 /*
3770 * Store ordinary Datum representation, or NULL value. If there is a
3771 * converter it won't expect NULL values, and cost model is not
3772 * required to account for NULL, so in that case we avoid calling
3773 * converter and just set datum1 to zeroed representation (to be
3774 * consistent, and to support cheap inequality tests for NULL
3775 * abbreviated keys).
3776 */
3777 stup->datum1 = original;
3778 }
3779 else if (!consider_abort_common(state))
3780 {
3781 /* Store abbreviated key representation */
3782 stup->datum1 = state->sortKeys->abbrev_converter(original,
3783 state->sortKeys);
3784 }
3785 else
3786 {
3787 /* Abort abbreviation */
3788 int i;
3789
3790 stup->datum1 = original;
3791
3792 /*
3793 * Set state to be consistent with never trying abbreviation.
3794 *
3795 * Alter datum1 representation in already-copied tuples, so as to
3796 * ensure a consistent representation (current tuple was just
3797 * handled). It does not matter if some dumped tuples are already
3798 * sorted on tape, since serialized tuples lack abbreviated keys
3799 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3800 */
3801 for (i = 0; i < state->memtupcount; i++)
3802 {
3803 SortTuple *mtup = &state->memtuples[i];
3804
3805 htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3806 MINIMAL_TUPLE_OFFSET;
3807 htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3808 MINIMAL_TUPLE_OFFSET);
3809
3810 mtup->datum1 = heap_getattr(&htup,
3811 state->sortKeys[0].ssup_attno,
3812 state->tupDesc,
3813 &mtup->isnull1);
3814 }
3815 }
3816 }
3817
3818 static void
writetup_heap(Tuplesortstate * state,int tapenum,SortTuple * stup)3819 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3820 {
3821 MinimalTuple tuple = (MinimalTuple) stup->tuple;
3822
3823 /* the part of the MinimalTuple we'll write: */
3824 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3825 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3826
3827 /* total on-disk footprint: */
3828 unsigned int tuplen = tupbodylen + sizeof(int);
3829
3830 LogicalTapeWrite(state->tapeset, tapenum,
3831 (void *) &tuplen, sizeof(tuplen));
3832 LogicalTapeWrite(state->tapeset, tapenum,
3833 (void *) tupbody, tupbodylen);
3834 if (state->randomAccess) /* need trailing length word? */
3835 LogicalTapeWrite(state->tapeset, tapenum,
3836 (void *) &tuplen, sizeof(tuplen));
3837
3838 if (!state->slabAllocatorUsed)
3839 {
3840 FREEMEM(state, GetMemoryChunkSpace(tuple));
3841 heap_free_minimal_tuple(tuple);
3842 }
3843 }
3844
3845 static void
readtup_heap(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)3846 readtup_heap(Tuplesortstate *state, SortTuple *stup,
3847 int tapenum, unsigned int len)
3848 {
3849 unsigned int tupbodylen = len - sizeof(int);
3850 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3851 MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3852 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3853 HeapTupleData htup;
3854
3855 /* read in the tuple proper */
3856 tuple->t_len = tuplen;
3857 LogicalTapeReadExact(state->tapeset, tapenum,
3858 tupbody, tupbodylen);
3859 if (state->randomAccess) /* need trailing length word? */
3860 LogicalTapeReadExact(state->tapeset, tapenum,
3861 &tuplen, sizeof(tuplen));
3862 stup->tuple = (void *) tuple;
3863 /* set up first-column key value */
3864 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3865 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3866 stup->datum1 = heap_getattr(&htup,
3867 state->sortKeys[0].ssup_attno,
3868 state->tupDesc,
3869 &stup->isnull1);
3870 }
3871
3872 /*
3873 * Routines specialized for the CLUSTER case (HeapTuple data, with
3874 * comparisons per a btree index definition)
3875 */
3876
3877 static int
comparetup_cluster(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)3878 comparetup_cluster(const SortTuple *a, const SortTuple *b,
3879 Tuplesortstate *state)
3880 {
3881 SortSupport sortKey = state->sortKeys;
3882 HeapTuple ltup;
3883 HeapTuple rtup;
3884 TupleDesc tupDesc;
3885 int nkey;
3886 int32 compare;
3887 Datum datum1,
3888 datum2;
3889 bool isnull1,
3890 isnull2;
3891 AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0];
3892
3893 /* Be prepared to compare additional sort keys */
3894 ltup = (HeapTuple) a->tuple;
3895 rtup = (HeapTuple) b->tuple;
3896 tupDesc = state->tupDesc;
3897
3898 /* Compare the leading sort key, if it's simple */
3899 if (leading != 0)
3900 {
3901 compare = ApplySortComparator(a->datum1, a->isnull1,
3902 b->datum1, b->isnull1,
3903 sortKey);
3904 if (compare != 0)
3905 return compare;
3906
3907 if (sortKey->abbrev_converter)
3908 {
3909 datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3910 datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3911
3912 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3913 datum2, isnull2,
3914 sortKey);
3915 }
3916 if (compare != 0 || state->nKeys == 1)
3917 return compare;
3918 /* Compare additional columns the hard way */
3919 sortKey++;
3920 nkey = 1;
3921 }
3922 else
3923 {
3924 /* Must compare all keys the hard way */
3925 nkey = 0;
3926 }
3927
3928 if (state->indexInfo->ii_Expressions == NULL)
3929 {
3930 /* If not expression index, just compare the proper heap attrs */
3931
3932 for (; nkey < state->nKeys; nkey++, sortKey++)
3933 {
3934 AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
3935
3936 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
3937 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
3938
3939 compare = ApplySortComparator(datum1, isnull1,
3940 datum2, isnull2,
3941 sortKey);
3942 if (compare != 0)
3943 return compare;
3944 }
3945 }
3946 else
3947 {
3948 /*
3949 * In the expression index case, compute the whole index tuple and
3950 * then compare values. It would perhaps be faster to compute only as
3951 * many columns as we need to compare, but that would require
3952 * duplicating all the logic in FormIndexDatum.
3953 */
3954 Datum l_index_values[INDEX_MAX_KEYS];
3955 bool l_index_isnull[INDEX_MAX_KEYS];
3956 Datum r_index_values[INDEX_MAX_KEYS];
3957 bool r_index_isnull[INDEX_MAX_KEYS];
3958 TupleTableSlot *ecxt_scantuple;
3959
3960 /* Reset context each time to prevent memory leakage */
3961 ResetPerTupleExprContext(state->estate);
3962
3963 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
3964
3965 ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
3966 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3967 l_index_values, l_index_isnull);
3968
3969 ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
3970 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3971 r_index_values, r_index_isnull);
3972
3973 for (; nkey < state->nKeys; nkey++, sortKey++)
3974 {
3975 compare = ApplySortComparator(l_index_values[nkey],
3976 l_index_isnull[nkey],
3977 r_index_values[nkey],
3978 r_index_isnull[nkey],
3979 sortKey);
3980 if (compare != 0)
3981 return compare;
3982 }
3983 }
3984
3985 return 0;
3986 }
3987
3988 static void
copytup_cluster(Tuplesortstate * state,SortTuple * stup,void * tup)3989 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3990 {
3991 HeapTuple tuple = (HeapTuple) tup;
3992 Datum original;
3993 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3994
3995 /* copy the tuple into sort storage */
3996 tuple = heap_copytuple(tuple);
3997 stup->tuple = (void *) tuple;
3998 USEMEM(state, GetMemoryChunkSpace(tuple));
3999
4000 MemoryContextSwitchTo(oldcontext);
4001
4002 /*
4003 * set up first-column key value, and potentially abbreviate, if it's a
4004 * simple column
4005 */
4006 if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
4007 return;
4008
4009 original = heap_getattr(tuple,
4010 state->indexInfo->ii_IndexAttrNumbers[0],
4011 state->tupDesc,
4012 &stup->isnull1);
4013
4014 if (!state->sortKeys->abbrev_converter || stup->isnull1)
4015 {
4016 /*
4017 * Store ordinary Datum representation, or NULL value. If there is a
4018 * converter it won't expect NULL values, and cost model is not
4019 * required to account for NULL, so in that case we avoid calling
4020 * converter and just set datum1 to zeroed representation (to be
4021 * consistent, and to support cheap inequality tests for NULL
4022 * abbreviated keys).
4023 */
4024 stup->datum1 = original;
4025 }
4026 else if (!consider_abort_common(state))
4027 {
4028 /* Store abbreviated key representation */
4029 stup->datum1 = state->sortKeys->abbrev_converter(original,
4030 state->sortKeys);
4031 }
4032 else
4033 {
4034 /* Abort abbreviation */
4035 int i;
4036
4037 stup->datum1 = original;
4038
4039 /*
4040 * Set state to be consistent with never trying abbreviation.
4041 *
4042 * Alter datum1 representation in already-copied tuples, so as to
4043 * ensure a consistent representation (current tuple was just
4044 * handled). It does not matter if some dumped tuples are already
4045 * sorted on tape, since serialized tuples lack abbreviated keys
4046 * (TSS_BUILDRUNS state prevents control reaching here in any case).
4047 */
4048 for (i = 0; i < state->memtupcount; i++)
4049 {
4050 SortTuple *mtup = &state->memtuples[i];
4051
4052 tuple = (HeapTuple) mtup->tuple;
4053 mtup->datum1 = heap_getattr(tuple,
4054 state->indexInfo->ii_IndexAttrNumbers[0],
4055 state->tupDesc,
4056 &mtup->isnull1);
4057 }
4058 }
4059 }
4060
4061 static void
writetup_cluster(Tuplesortstate * state,int tapenum,SortTuple * stup)4062 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
4063 {
4064 HeapTuple tuple = (HeapTuple) stup->tuple;
4065 unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
4066
4067 /* We need to store t_self, but not other fields of HeapTupleData */
4068 LogicalTapeWrite(state->tapeset, tapenum,
4069 &tuplen, sizeof(tuplen));
4070 LogicalTapeWrite(state->tapeset, tapenum,
4071 &tuple->t_self, sizeof(ItemPointerData));
4072 LogicalTapeWrite(state->tapeset, tapenum,
4073 tuple->t_data, tuple->t_len);
4074 if (state->randomAccess) /* need trailing length word? */
4075 LogicalTapeWrite(state->tapeset, tapenum,
4076 &tuplen, sizeof(tuplen));
4077
4078 if (!state->slabAllocatorUsed)
4079 {
4080 FREEMEM(state, GetMemoryChunkSpace(tuple));
4081 heap_freetuple(tuple);
4082 }
4083 }
4084
4085 static void
readtup_cluster(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int tuplen)4086 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
4087 int tapenum, unsigned int tuplen)
4088 {
4089 unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
4090 HeapTuple tuple = (HeapTuple) readtup_alloc(state,
4091 t_len + HEAPTUPLESIZE);
4092
4093 /* Reconstruct the HeapTupleData header */
4094 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
4095 tuple->t_len = t_len;
4096 LogicalTapeReadExact(state->tapeset, tapenum,
4097 &tuple->t_self, sizeof(ItemPointerData));
4098 /* We don't currently bother to reconstruct t_tableOid */
4099 tuple->t_tableOid = InvalidOid;
4100 /* Read in the tuple body */
4101 LogicalTapeReadExact(state->tapeset, tapenum,
4102 tuple->t_data, tuple->t_len);
4103 if (state->randomAccess) /* need trailing length word? */
4104 LogicalTapeReadExact(state->tapeset, tapenum,
4105 &tuplen, sizeof(tuplen));
4106 stup->tuple = (void *) tuple;
4107 /* set up first-column key value, if it's a simple column */
4108 if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
4109 stup->datum1 = heap_getattr(tuple,
4110 state->indexInfo->ii_IndexAttrNumbers[0],
4111 state->tupDesc,
4112 &stup->isnull1);
4113 }
4114
4115 /*
4116 * Routines specialized for IndexTuple case
4117 *
4118 * The btree and hash cases require separate comparison functions, but the
4119 * IndexTuple representation is the same so the copy/write/read support
4120 * functions can be shared.
4121 */
4122
4123 static int
comparetup_index_btree(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)4124 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
4125 Tuplesortstate *state)
4126 {
4127 /*
4128 * This is similar to comparetup_heap(), but expects index tuples. There
4129 * is also special handling for enforcing uniqueness, and special
4130 * treatment for equal keys at the end.
4131 */
4132 SortSupport sortKey = state->sortKeys;
4133 IndexTuple tuple1;
4134 IndexTuple tuple2;
4135 int keysz;
4136 TupleDesc tupDes;
4137 bool equal_hasnull = false;
4138 int nkey;
4139 int32 compare;
4140 Datum datum1,
4141 datum2;
4142 bool isnull1,
4143 isnull2;
4144
4145
4146 /* Compare the leading sort key */
4147 compare = ApplySortComparator(a->datum1, a->isnull1,
4148 b->datum1, b->isnull1,
4149 sortKey);
4150 if (compare != 0)
4151 return compare;
4152
4153 /* Compare additional sort keys */
4154 tuple1 = (IndexTuple) a->tuple;
4155 tuple2 = (IndexTuple) b->tuple;
4156 keysz = state->nKeys;
4157 tupDes = RelationGetDescr(state->indexRel);
4158
4159 if (sortKey->abbrev_converter)
4160 {
4161 datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
4162 datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
4163
4164 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
4165 datum2, isnull2,
4166 sortKey);
4167 if (compare != 0)
4168 return compare;
4169 }
4170
4171 /* they are equal, so we only need to examine one null flag */
4172 if (a->isnull1)
4173 equal_hasnull = true;
4174
4175 sortKey++;
4176 for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4177 {
4178 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4179 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4180
4181 compare = ApplySortComparator(datum1, isnull1,
4182 datum2, isnull2,
4183 sortKey);
4184 if (compare != 0)
4185 return compare; /* done when we find unequal attributes */
4186
4187 /* they are equal, so we only need to examine one null flag */
4188 if (isnull1)
4189 equal_hasnull = true;
4190 }
4191
4192 /*
4193 * If btree has asked us to enforce uniqueness, complain if two equal
4194 * tuples are detected (unless there was at least one NULL field).
4195 *
4196 * It is sufficient to make the test here, because if two tuples are equal
4197 * they *must* get compared at some stage of the sort --- otherwise the
4198 * sort algorithm wouldn't have checked whether one must appear before the
4199 * other.
4200 */
4201 if (state->enforceUnique && !equal_hasnull)
4202 {
4203 Datum values[INDEX_MAX_KEYS];
4204 bool isnull[INDEX_MAX_KEYS];
4205 char *key_desc;
4206
4207 /*
4208 * Some rather brain-dead implementations of qsort (such as the one in
4209 * QNX 4) will sometimes call the comparison routine to compare a
4210 * value to itself, but we always use our own implementation, which
4211 * does not.
4212 */
4213 Assert(tuple1 != tuple2);
4214
4215 index_deform_tuple(tuple1, tupDes, values, isnull);
4216
4217 key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4218
4219 ereport(ERROR,
4220 (errcode(ERRCODE_UNIQUE_VIOLATION),
4221 errmsg("could not create unique index \"%s\"",
4222 RelationGetRelationName(state->indexRel)),
4223 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4224 errdetail("Duplicate keys exist."),
4225 errtableconstraint(state->heapRel,
4226 RelationGetRelationName(state->indexRel))));
4227 }
4228
4229 /*
4230 * If key values are equal, we sort on ItemPointer. This is required for
4231 * btree indexes, since heap TID is treated as an implicit last key
4232 * attribute in order to ensure that all keys in the index are physically
4233 * unique.
4234 */
4235 {
4236 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4237 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4238
4239 if (blk1 != blk2)
4240 return (blk1 < blk2) ? -1 : 1;
4241 }
4242 {
4243 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4244 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4245
4246 if (pos1 != pos2)
4247 return (pos1 < pos2) ? -1 : 1;
4248 }
4249
4250 /* ItemPointer values should never be equal */
4251 Assert(false);
4252
4253 return 0;
4254 }
4255
4256 static int
comparetup_index_hash(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)4257 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4258 Tuplesortstate *state)
4259 {
4260 Bucket bucket1;
4261 Bucket bucket2;
4262 IndexTuple tuple1;
4263 IndexTuple tuple2;
4264
4265 /*
4266 * Fetch hash keys and mask off bits we don't want to sort by. We know
4267 * that the first column of the index tuple is the hash key.
4268 */
4269 Assert(!a->isnull1);
4270 bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4271 state->max_buckets, state->high_mask,
4272 state->low_mask);
4273 Assert(!b->isnull1);
4274 bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4275 state->max_buckets, state->high_mask,
4276 state->low_mask);
4277 if (bucket1 > bucket2)
4278 return 1;
4279 else if (bucket1 < bucket2)
4280 return -1;
4281
4282 /*
4283 * If hash values are equal, we sort on ItemPointer. This does not affect
4284 * validity of the finished index, but it may be useful to have index
4285 * scans in physical order.
4286 */
4287 tuple1 = (IndexTuple) a->tuple;
4288 tuple2 = (IndexTuple) b->tuple;
4289
4290 {
4291 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4292 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4293
4294 if (blk1 != blk2)
4295 return (blk1 < blk2) ? -1 : 1;
4296 }
4297 {
4298 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4299 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4300
4301 if (pos1 != pos2)
4302 return (pos1 < pos2) ? -1 : 1;
4303 }
4304
4305 /* ItemPointer values should never be equal */
4306 Assert(false);
4307
4308 return 0;
4309 }
4310
4311 static void
copytup_index(Tuplesortstate * state,SortTuple * stup,void * tup)4312 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4313 {
4314 /* Not currently needed */
4315 elog(ERROR, "copytup_index() should not be called");
4316 }
4317
4318 static void
writetup_index(Tuplesortstate * state,int tapenum,SortTuple * stup)4319 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4320 {
4321 IndexTuple tuple = (IndexTuple) stup->tuple;
4322 unsigned int tuplen;
4323
4324 tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4325 LogicalTapeWrite(state->tapeset, tapenum,
4326 (void *) &tuplen, sizeof(tuplen));
4327 LogicalTapeWrite(state->tapeset, tapenum,
4328 (void *) tuple, IndexTupleSize(tuple));
4329 if (state->randomAccess) /* need trailing length word? */
4330 LogicalTapeWrite(state->tapeset, tapenum,
4331 (void *) &tuplen, sizeof(tuplen));
4332
4333 if (!state->slabAllocatorUsed)
4334 {
4335 FREEMEM(state, GetMemoryChunkSpace(tuple));
4336 pfree(tuple);
4337 }
4338 }
4339
4340 static void
readtup_index(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)4341 readtup_index(Tuplesortstate *state, SortTuple *stup,
4342 int tapenum, unsigned int len)
4343 {
4344 unsigned int tuplen = len - sizeof(unsigned int);
4345 IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
4346
4347 LogicalTapeReadExact(state->tapeset, tapenum,
4348 tuple, tuplen);
4349 if (state->randomAccess) /* need trailing length word? */
4350 LogicalTapeReadExact(state->tapeset, tapenum,
4351 &tuplen, sizeof(tuplen));
4352 stup->tuple = (void *) tuple;
4353 /* set up first-column key value */
4354 stup->datum1 = index_getattr(tuple,
4355 1,
4356 RelationGetDescr(state->indexRel),
4357 &stup->isnull1);
4358 }
4359
4360 /*
4361 * Routines specialized for DatumTuple case
4362 */
4363
4364 static int
comparetup_datum(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)4365 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4366 {
4367 int compare;
4368
4369 compare = ApplySortComparator(a->datum1, a->isnull1,
4370 b->datum1, b->isnull1,
4371 state->sortKeys);
4372 if (compare != 0)
4373 return compare;
4374
4375 /* if we have abbreviations, then "tuple" has the original value */
4376
4377 if (state->sortKeys->abbrev_converter)
4378 compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4379 PointerGetDatum(b->tuple), b->isnull1,
4380 state->sortKeys);
4381
4382 return compare;
4383 }
4384
4385 static void
copytup_datum(Tuplesortstate * state,SortTuple * stup,void * tup)4386 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4387 {
4388 /* Not currently needed */
4389 elog(ERROR, "copytup_datum() should not be called");
4390 }
4391
4392 static void
writetup_datum(Tuplesortstate * state,int tapenum,SortTuple * stup)4393 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4394 {
4395 void *waddr;
4396 unsigned int tuplen;
4397 unsigned int writtenlen;
4398
4399 if (stup->isnull1)
4400 {
4401 waddr = NULL;
4402 tuplen = 0;
4403 }
4404 else if (!state->tuples)
4405 {
4406 waddr = &stup->datum1;
4407 tuplen = sizeof(Datum);
4408 }
4409 else
4410 {
4411 waddr = stup->tuple;
4412 tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4413 Assert(tuplen != 0);
4414 }
4415
4416 writtenlen = tuplen + sizeof(unsigned int);
4417
4418 LogicalTapeWrite(state->tapeset, tapenum,
4419 (void *) &writtenlen, sizeof(writtenlen));
4420 LogicalTapeWrite(state->tapeset, tapenum,
4421 waddr, tuplen);
4422 if (state->randomAccess) /* need trailing length word? */
4423 LogicalTapeWrite(state->tapeset, tapenum,
4424 (void *) &writtenlen, sizeof(writtenlen));
4425
4426 if (!state->slabAllocatorUsed && stup->tuple)
4427 {
4428 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4429 pfree(stup->tuple);
4430 }
4431 }
4432
4433 static void
readtup_datum(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)4434 readtup_datum(Tuplesortstate *state, SortTuple *stup,
4435 int tapenum, unsigned int len)
4436 {
4437 unsigned int tuplen = len - sizeof(unsigned int);
4438
4439 if (tuplen == 0)
4440 {
4441 /* it's NULL */
4442 stup->datum1 = (Datum) 0;
4443 stup->isnull1 = true;
4444 stup->tuple = NULL;
4445 }
4446 else if (!state->tuples)
4447 {
4448 Assert(tuplen == sizeof(Datum));
4449 LogicalTapeReadExact(state->tapeset, tapenum,
4450 &stup->datum1, tuplen);
4451 stup->isnull1 = false;
4452 stup->tuple = NULL;
4453 }
4454 else
4455 {
4456 void *raddr = readtup_alloc(state, tuplen);
4457
4458 LogicalTapeReadExact(state->tapeset, tapenum,
4459 raddr, tuplen);
4460 stup->datum1 = PointerGetDatum(raddr);
4461 stup->isnull1 = false;
4462 stup->tuple = raddr;
4463 }
4464
4465 if (state->randomAccess) /* need trailing length word? */
4466 LogicalTapeReadExact(state->tapeset, tapenum,
4467 &tuplen, sizeof(tuplen));
4468 }
4469
4470 /*
4471 * Parallel sort routines
4472 */
4473
4474 /*
4475 * tuplesort_estimate_shared - estimate required shared memory allocation
4476 *
4477 * nWorkers is an estimate of the number of workers (it's the number that
4478 * will be requested).
4479 */
4480 Size
tuplesort_estimate_shared(int nWorkers)4481 tuplesort_estimate_shared(int nWorkers)
4482 {
4483 Size tapesSize;
4484
4485 Assert(nWorkers > 0);
4486
4487 /* Make sure that BufFile shared state is MAXALIGN'd */
4488 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4489 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
4490
4491 return tapesSize;
4492 }
4493
4494 /*
4495 * tuplesort_initialize_shared - initialize shared tuplesort state
4496 *
4497 * Must be called from leader process before workers are launched, to
4498 * establish state needed up-front for worker tuplesortstates. nWorkers
4499 * should match the argument passed to tuplesort_estimate_shared().
4500 */
4501 void
tuplesort_initialize_shared(Sharedsort * shared,int nWorkers,dsm_segment * seg)4502 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4503 {
4504 int i;
4505
4506 Assert(nWorkers > 0);
4507
4508 SpinLockInit(&shared->mutex);
4509 shared->currentWorker = 0;
4510 shared->workersFinished = 0;
4511 SharedFileSetInit(&shared->fileset, seg);
4512 shared->nTapes = nWorkers;
4513 for (i = 0; i < nWorkers; i++)
4514 {
4515 shared->tapes[i].firstblocknumber = 0L;
4516 }
4517 }
4518
4519 /*
4520 * tuplesort_attach_shared - attach to shared tuplesort state
4521 *
4522 * Must be called by all worker processes.
4523 */
4524 void
tuplesort_attach_shared(Sharedsort * shared,dsm_segment * seg)4525 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4526 {
4527 /* Attach to SharedFileSet */
4528 SharedFileSetAttach(&shared->fileset, seg);
4529 }
4530
4531 /*
4532 * worker_get_identifier - Assign and return ordinal identifier for worker
4533 *
4534 * The order in which these are assigned is not well defined, and should not
4535 * matter; worker numbers across parallel sort participants need only be
4536 * distinct and gapless. logtape.c requires this.
4537 *
4538 * Note that the identifiers assigned from here have no relation to
4539 * ParallelWorkerNumber number, to avoid making any assumption about
4540 * caller's requirements. However, we do follow the ParallelWorkerNumber
4541 * convention of representing a non-worker with worker number -1. This
4542 * includes the leader, as well as serial Tuplesort processes.
4543 */
4544 static int
worker_get_identifier(Tuplesortstate * state)4545 worker_get_identifier(Tuplesortstate *state)
4546 {
4547 Sharedsort *shared = state->shared;
4548 int worker;
4549
4550 Assert(WORKER(state));
4551
4552 SpinLockAcquire(&shared->mutex);
4553 worker = shared->currentWorker++;
4554 SpinLockRelease(&shared->mutex);
4555
4556 return worker;
4557 }
4558
4559 /*
4560 * worker_freeze_result_tape - freeze worker's result tape for leader
4561 *
4562 * This is called by workers just after the result tape has been determined,
4563 * instead of calling LogicalTapeFreeze() directly. They do so because
4564 * workers require a few additional steps over similar serial
4565 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
4566 * steps are around freeing now unneeded resources, and representing to
4567 * leader that worker's input run is available for its merge.
4568 *
4569 * There should only be one final output run for each worker, which consists
4570 * of all tuples that were originally input into worker.
4571 */
4572 static void
worker_freeze_result_tape(Tuplesortstate * state)4573 worker_freeze_result_tape(Tuplesortstate *state)
4574 {
4575 Sharedsort *shared = state->shared;
4576 TapeShare output;
4577
4578 Assert(WORKER(state));
4579 Assert(state->result_tape != -1);
4580 Assert(state->memtupcount == 0);
4581
4582 /*
4583 * Free most remaining memory, in case caller is sensitive to our holding
4584 * on to it. memtuples may not be a tiny merge heap at this point.
4585 */
4586 pfree(state->memtuples);
4587 /* Be tidy */
4588 state->memtuples = NULL;
4589 state->memtupsize = 0;
4590
4591 /*
4592 * Parallel worker requires result tape metadata, which is to be stored in
4593 * shared memory for leader
4594 */
4595 LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4596
4597 /* Store properties of output tape, and update finished worker count */
4598 SpinLockAcquire(&shared->mutex);
4599 shared->tapes[state->worker] = output;
4600 shared->workersFinished++;
4601 SpinLockRelease(&shared->mutex);
4602 }
4603
4604 /*
4605 * worker_nomergeruns - dump memtuples in worker, without merging
4606 *
4607 * This called as an alternative to mergeruns() with a worker when no
4608 * merging is required.
4609 */
4610 static void
worker_nomergeruns(Tuplesortstate * state)4611 worker_nomergeruns(Tuplesortstate *state)
4612 {
4613 Assert(WORKER(state));
4614 Assert(state->result_tape == -1);
4615
4616 state->result_tape = state->tp_tapenum[state->destTape];
4617 worker_freeze_result_tape(state);
4618 }
4619
4620 /*
4621 * leader_takeover_tapes - create tapeset for leader from worker tapes
4622 *
4623 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
4624 * sorting has occurred in workers, all of which must have already returned
4625 * from tuplesort_performsort().
4626 *
4627 * When this returns, leader process is left in a state that is virtually
4628 * indistinguishable from it having generated runs as a serial external sort
4629 * might have.
4630 */
4631 static void
leader_takeover_tapes(Tuplesortstate * state)4632 leader_takeover_tapes(Tuplesortstate *state)
4633 {
4634 Sharedsort *shared = state->shared;
4635 int nParticipants = state->nParticipants;
4636 int workersFinished;
4637 int j;
4638
4639 Assert(LEADER(state));
4640 Assert(nParticipants >= 1);
4641
4642 SpinLockAcquire(&shared->mutex);
4643 workersFinished = shared->workersFinished;
4644 SpinLockRelease(&shared->mutex);
4645
4646 if (nParticipants != workersFinished)
4647 elog(ERROR, "cannot take over tapes before all workers finish");
4648
4649 /*
4650 * Create the tapeset from worker tapes, including a leader-owned tape at
4651 * the end. Parallel workers are far more expensive than logical tapes,
4652 * so the number of tapes allocated here should never be excessive.
4653 *
4654 * We still have a leader tape, though it's not possible to write to it
4655 * due to restrictions in the shared fileset infrastructure used by
4656 * logtape.c. It will never be written to in practice because
4657 * randomAccess is disallowed for parallel sorts.
4658 */
4659 inittapestate(state, nParticipants + 1);
4660 state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
4661 shared->tapes, &shared->fileset,
4662 state->worker);
4663
4664 /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4665 state->currentRun = nParticipants;
4666
4667 /*
4668 * Initialize variables of Algorithm D to be consistent with runs from
4669 * workers having been generated in the leader.
4670 *
4671 * There will always be exactly 1 run per worker, and exactly one input
4672 * tape per run, because workers always output exactly 1 run, even when
4673 * there were no input tuples for workers to sort.
4674 */
4675 for (j = 0; j < state->maxTapes; j++)
4676 {
4677 /* One real run; no dummy runs for worker tapes */
4678 state->tp_fib[j] = 1;
4679 state->tp_runs[j] = 1;
4680 state->tp_dummy[j] = 0;
4681 state->tp_tapenum[j] = j;
4682 }
4683 /* Leader tape gets one dummy run, and no real runs */
4684 state->tp_fib[state->tapeRange] = 0;
4685 state->tp_runs[state->tapeRange] = 0;
4686 state->tp_dummy[state->tapeRange] = 1;
4687
4688 state->Level = 1;
4689 state->destTape = 0;
4690
4691 state->status = TSS_BUILDRUNS;
4692 }
4693
4694 /*
4695 * Convenience routine to free a tuple previously loaded into sort memory
4696 */
4697 static void
free_sort_tuple(Tuplesortstate * state,SortTuple * stup)4698 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4699 {
4700 if (stup->tuple)
4701 {
4702 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4703 pfree(stup->tuple);
4704 stup->tuple = NULL;
4705 }
4706 }
4707