/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *    Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary). It works efficiently for both small and large amounts
 * of data. Small amounts are sorted in-memory using qsort(). Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm. Historically, we divided the input into sorted runs
 * using replacement selection, in the form of a priority tree implemented
 * as a heap (essentially his Algorithm 5.2.3H), but now we always use
 * quicksort for run generation. We merge the runs using polyphase merge,
 * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
 * implemented by logtape.c, which avoids space wastage by recycling disk
 * space as soon as each block is read from its "tape".
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem). Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem. If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially. If we do exceed
 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
 * When tuples are dumped in batch after quicksorting, we begin a new run
 * with a new output tape (selected per Algorithm D). After the end of the
 * input is reached, we dump out remaining tuples in memory into a final run,
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and replace it
 * with the next tuple from its source tape (if any). When the heap empties,
 * the merge is complete. The basic merge algorithm thus needs very little
 * memory --- only M tuples for an M-way merge, and M is constrained to a
 * small number. However, we can still make good use of our full workMem
 * allocation by pre-reading additional blocks from each source tape. Without
 * prereading, our access pattern to the temporary file would be very erratic;
 * on average we'd read one block from each of M source tapes during the same
 * time that we're writing M blocks to the output tape, so there is no
 * sequentiality of access at all, defeating the read-ahead methods used by
 * most Unix kernels. Worse, the output tape gets written into a very random
 * sequence of blocks of the temp file, ensuring that things will be even
 * worse when it comes time to read that tape. A straightforward merge pass
 * thus ends up doing a lot of waiting for disk seeks. We can improve matters
 * by prereading from each source tape sequentially, loading about workMem/M
 * bytes from each tape in turn, and making the sequential blocks immediately
 * available for reuse. This approach helps to localize both read and write
 * accesses. The pre-reading is handled by logtape.c; we just tell it how
 * much memory to use for the buffers.
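 *
 * For example (a rough illustration, assuming the default 8 kB BLCKSZ):
 * with workMem = 64 MB and a 64-way merge, each preread cycle pulls in
 * about workMem/M = 1 MB, i.e. roughly 128 consecutive blocks from a
 * single tape, rather than scattering single-block reads across all 64
 * source tapes.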
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly. When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape. The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives. In our implementation a "tape drive"
 * doesn't cost much more than a few Kb of memory buffers, so we can afford
 * to have lots of them. In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate any repeated I/O at all. In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above. Nonetheless, with large workMem we can have many tapes (but not
 * too many -- see the comments in tuplesort_merge_order).
 *
 * This module supports parallel sorting. Parallel sorts involve coordination
 * among one or more worker processes, and a leader process, each with its own
 * tuplesort state. The leader process (or, more accurately, the
 * Tuplesortstate associated with a leader process) creates a full tapeset
 * consisting of worker tapes, each holding one run to merge: one run for
 * every worker process. This tapeset is then merged. Worker processes are
 * guaranteed to produce exactly one output run from their partial input.
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/* sort-type codes for sort__start probes */
#define HEAP_SORT 0
#define INDEX_SORT 1
#define DATUM_SORT 2
#define CLUSTER_SORT 3

/* Sort parallel code from state for sort__start probes */
#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
                              (state)->worker >= 0 ? 1 : 2)

/*
 * Initial size of memtuples array. We're trying to select this size so that
 * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
 * overhead of allocation might possibly be lowered. However, we don't
 * consider array sizes less than 1024.
 */
#define INITIAL_MEMTUPSIZE Max(1024, \
                               ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
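
/*
 * A worked instance (illustrative only; sizes vary by platform): with 8-byte
 * pointers and Datums, sizeof(SortTuple) is typically 24 bytes, and
 * ALLOCSET_SEPARATE_THRESHOLD is 8192, giving 8192 / 24 + 1 = 342; the
 * Max() therefore yields the 1024 floor.
 */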

/* GUC variables */
#ifdef TRACE_SORT
bool        trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
bool        optimize_bounded_sort = true;
#endif


/*
 * The objects we actually sort are SortTuple structs. These contain
 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree() (except during merge, when we use a
 * simple slab allocator). SortTuples also contain the tuple's first key
 * column in Datum/nullflag format, and a source/input tape number that
 * tracks which tape each heap element/slot belongs to during merging.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons. We could extract and save all the key
 * columns, not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result. Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * There is one special case: when the sort support infrastructure provides an
 * "abbreviated key" representation, where the key is (typically) a pass by
 * value proxy for a pass by reference type. In this case, the abbreviated key
 * is stored in datum1 in place of the actual first key column.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1 for pass by value types (or null values). If the datatype is
 * pass-by-reference and isnull1 is false, then "tuple" points to a separately
 * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then
 * either the same pointer as "tuple", or is an abbreviated key value as
 * described above. Accordingly, "tuple" is always used in preference to
 * datum1 as the authoritative value for pass-by-reference cases.
 */
typedef struct
{
    void       *tuple;          /* the tuple itself */
    Datum       datum1;         /* value of first key column */
    bool        isnull1;        /* is first key column NULL? */
    int         srctape;        /* source tape number */
} SortTuple;
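
/*
 * Illustrative sketch (not part of the sort machinery): a single-Datum sort
 * of the int4 value 42 would fill in a SortTuple roughly like this:
 *
 *     SortTuple   stup;
 *
 *     stup.tuple = NULL;               /* pass-by-value: no out-of-line data */
 *     stup.datum1 = Int32GetDatum(42);
 *     stup.isnull1 = false;
 *     stup.srctape = 0;                /* only meaningful while merging */
 *
 * A pass-by-reference type would instead carry the palloc'd value in "tuple"
 * and (absent abbreviation) the same pointer in datum1.
 */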

/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list. When in use, the
 * slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
    union SlabSlot *nextfree;
    char        buffer[SLAB_SLOT_SIZE];
} SlabSlot;

/*
 * Possible states of a Tuplesort object. These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
    TSS_INITIAL,                /* Loading tuples; still within memory limit */
    TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
    TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
    TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
    TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
    TSS_FINALMERGE              /* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about one
 * block's worth of buffer space. This ignores the overhead of all the
 * other data structures needed for each tape, but it's probably close
 * enough.
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER 6              /* minimum merge order */
#define MAXORDER 500            /* maximum merge order */
#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
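
/*
 * For illustration (a sketch of the arithmetic, assuming the default 8 kB
 * BLCKSZ; see tuplesort_merge_order() for the authoritative computation):
 * each merge input costs roughly MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD
 * = 256 kB + 8 kB, so a 4 MB workMem admits a merge order of about
 * (4 MB - 8 kB) / 264 kB = 15 input tapes, clamped to [MINORDER, MAXORDER].
 */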

typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
                                    Tuplesortstate *state);

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
    TupSortStatus status;       /* enumerated value as shown above */
    int         nKeys;          /* number of columns in sort key */
    bool        randomAccess;   /* did caller request random access? */
    bool        bounded;        /* did caller specify a maximum number of
                                 * tuples to return? */
    bool        boundUsed;      /* true if we made use of a bounded heap */
    int         bound;          /* if bounded, the maximum number of tuples */
    bool        tuples;         /* Can SortTuple.tuple ever be set? */
    int64       availMem;       /* remaining memory available, in bytes */
    int64       allowedMem;     /* total memory allowed, in bytes */
    int         maxTapes;       /* number of tapes (Knuth's T) */
    int         tapeRange;      /* maxTapes-1 (Knuth's P) */
    int64       maxSpace;       /* maximum amount of space occupied among sort
                                 * groups, either in-memory or on-disk */
    bool        isMaxSpaceDisk; /* true when maxSpace is value for on-disk
                                 * space, false when it's value for in-memory
                                 * space */
    TupSortStatus maxSpaceStatus;   /* sort status when maxSpace was reached */
    MemoryContext maincontext;  /* memory context for tuple sort metadata that
                                 * persists across multiple batches */
    MemoryContext sortcontext;  /* memory context holding most sort data */
    MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */

    /*
     * These function pointers decouple the routines that must know what kind
     * of tuple we are sorting from the routines that don't need to know it.
     * They are set up by the tuplesort_begin_xxx routines.
     *
     * Function to compare two tuples; result is per qsort() convention, ie:
     * <0, 0, >0 according as a<b, a=b, a>b. The API must match
     * qsort_arg_comparator.
     */
    SortTupleComparator comparetup;

    /*
     * Function to copy a supplied input tuple into palloc'd space and set up
     * its SortTuple representation (ie, set tuple/datum1/isnull1). Also,
     * state->availMem must be decreased by the amount of space used for the
     * tuple copy (note the SortTuple struct itself is not counted).
     */
    void        (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

    /*
     * Function to write a stored tuple onto tape. The representation of the
     * tuple on tape need not be the same as it is in memory; requirements on
     * the tape representation are given below. Unless the slab allocator is
     * used, after writing the tuple, pfree() the out-of-line data (not the
     * SortTuple struct!), and increase state->availMem by the amount of
     * memory space thereby released.
     */
    void        (*writetup) (Tuplesortstate *state, int tapenum,
                             SortTuple *stup);

    /*
     * Function to read a stored tuple from tape back into memory. 'len' is
     * the already-read length of the stored tuple. The tuple is allocated
     * from the slab memory arena, or is palloc'd, see readtup_alloc().
     */
    void        (*readtup) (Tuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);

    /*
     * This array holds the tuples now in sort memory. If we are in state
     * INITIAL, the tuples are in no particular order; if we are in state
     * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     * H. In state SORTEDONTAPE, the array is not used.
     */
    SortTuple  *memtuples;      /* array of SortTuple structs */
    int         memtupcount;    /* number of tuples currently present */
    int         memtupsize;     /* allocated length of memtuples array */
    bool        growmemtuples;  /* memtuples' growth still underway? */

    /*
     * Memory for tuples is sometimes allocated using a simple slab allocator,
     * rather than with palloc(). Currently, we switch to slab allocation
     * when we start merging. Merging only needs to keep a small, fixed
     * number of tuples in memory at any time, so we can avoid the
     * palloc/pfree overhead by recycling a fixed number of fixed-size slots
     * to hold the tuples.
     *
     * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
     * slots. The allocation is sized to have one slot per tape, plus one
     * additional slot. We need that many slots to hold all the tuples kept
     * in the heap during merge, plus the one we have last returned from the
     * sort with tuplesort_gettuple.
     *
     * Initially, all the slots are kept in a linked list of free slots. When
     * a tuple is read from a tape, it is put into the next available slot, if
     * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
     * instead.
     *
     * When we're done processing a tuple, we return the slot back to the free
     * list, or pfree() it if it was palloc'd. We know that a tuple was
     * allocated from the slab if its pointer value is between slabMemoryBegin
     * and slabMemoryEnd.
     *
     * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
     * tracking memory usage is not used.
     */
    bool        slabAllocatorUsed;

    char       *slabMemoryBegin;    /* beginning of slab memory arena */
    char       *slabMemoryEnd;  /* end of slab memory arena */
    SlabSlot   *slabFreeHead;   /* head of free list */

    /* Buffer size to use for reading input tapes, during merge. */
    size_t      read_buffer_size;

    /*
     * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
     * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
     * modes), we remember the tuple in 'lastReturnedTuple', so that we can
     * recycle the memory on next gettuple call.
     */
    void       *lastReturnedTuple;

    /*
     * While building initial runs, this is the current output run number.
     * Afterwards, it is the number of initial runs we made.
     */
    int         currentRun;

    /*
     * Unless otherwise noted, all pointer variables below are pointers to
     * arrays of length maxTapes, holding per-tape data.
     */

    /*
     * This variable is only used during merge passes. mergeactive[i] is true
     * if we are reading an input run from (actual) tape number i and have not
     * yet exhausted that run.
     */
    bool       *mergeactive;    /* active input run source? */

    /*
     * Variables for Algorithm D. Note that destTape is a "logical" tape
     * number, ie, an index into the tp_xxx[] arrays. Be careful to keep
     * "logical" and "actual" tape numbers straight!
     */
    int         Level;          /* Knuth's l */
    int         destTape;       /* current output tape (Knuth's j, less 1) */
    int        *tp_fib;         /* Target Fibonacci run counts (A[]) */
    int        *tp_runs;        /* # of real runs on each tape */
    int        *tp_dummy;       /* # of dummy runs for each tape (D[]) */
    int        *tp_tapenum;     /* Actual tape numbers (TAPE[]) */
    int         activeTapes;    /* # of active input tapes in merge pass */

    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return. (In the tape case, the tape's current read
     * position is also critical state.)
     */
    int         result_tape;    /* actual tape number of finished output */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */

    /* markpos_xxx holds marked position for mark and restore */
    long        markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
    int         markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */

    /*
     * These variables are used during parallel sorting.
     *
     * worker is our worker identifier. Follows the general convention that
     * -1 denotes a leader tuplesort, and values >= 0 denote worker
     * tuplesorts. (-1 can also be a serial tuplesort.)
     *
     * shared is mutable shared memory state, which is used to coordinate
     * parallel sorts.
     *
     * nParticipants is the number of worker Tuplesortstates known by the
     * leader to have actually been launched, which implies that they must
     * finish a run that the leader can merge. Typically includes a worker
     * state held by the leader process itself. Set in the leader
     * Tuplesortstate only.
     */
    int         worker;
    Sharedsort *shared;
    int         nParticipants;

    /*
     * The sortKeys variable is used by every case other than the hash index
     * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
     * MinimalTuple and CLUSTER routines, though.
     */
    TupleDesc   tupDesc;
    SortSupport sortKeys;       /* array of length nKeys */

    /*
     * This variable is shared by the single-key MinimalTuple case and the
     * Datum case (which both use qsort_ssup()). Otherwise it's NULL.
     */
    SortSupport onlyKey;

    /*
     * Additional state for managing "abbreviated key" sortsupport routines
     * (which currently may be used by all cases except the hash index case).
     * Tracks the intervals at which the optimization's effectiveness is
     * tested.
     */
    int64       abbrevNext;     /* Tuple # at which to next check
                                 * applicability */

    /*
     * These variables are specific to the CLUSTER case; they are set by
     * tuplesort_begin_cluster.
     */
    IndexInfo  *indexInfo;      /* info about index being used for reference */
    EState     *estate;         /* for evaluating index expressions */

    /*
     * These variables are specific to the IndexTuple case; they are set by
     * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
     */
    Relation    heapRel;        /* table the index is being built on */
    Relation    indexRel;       /* index being built */

    /* These are specific to the index_btree subcase: */
    bool        enforceUnique;  /* complain if we find duplicate tuples */

    /* These are specific to the index_hash subcase: */
    uint32      high_mask;      /* masks for sortable part of hash code */
    uint32      low_mask;
    uint32      max_buckets;

    /*
     * These variables are specific to the Datum case; they are set by
     * tuplesort_begin_datum and used only by the DatumTuple routines.
     */
    Oid         datumType;
    /* we need typelen in order to know how to copy the Datums. */
    int         datumTypeLen;

    /*
     * Resource snapshot for time of sort start.
     */
#ifdef TRACE_SORT
    PGRUsage    ru_start;
#endif
};

/*
 * Private mutable state of a parallel tuplesort operation. This is
 * allocated in shared memory.
 */
struct Sharedsort
{
    /* mutex protects all fields prior to tapes */
    slock_t     mutex;

    /*
     * currentWorker generates ordinal identifier numbers for parallel sort
     * workers. These start from 0, and are always gapless.
     *
     * Workers increment workersFinished to indicate having finished. If this
     * is equal to state.nParticipants within the leader, the leader is ready
     * to merge worker runs.
     */
    int         currentWorker;
    int         workersFinished;

    /* Temporary file space */
    SharedFileSet fileset;

    /* Size of tapes flexible array */
    int         nTapes;

    /*
     * Tapes array used by workers to report back information needed by the
     * leader to concatenate all worker tapes into one for merging
     */
    TapeShare   tapes[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Is the given tuple allocated from the slab memory arena?
 */
#define IS_SLAB_SLOT(state, tuple) \
    ((char *) (tuple) >= (state)->slabMemoryBegin && \
     (char *) (tuple) < (state)->slabMemoryEnd)

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
    do { \
        SlabSlot *buf = (SlabSlot *) tuple; \
        \
        if (IS_SLAB_SLOT((state), buf)) \
        { \
            buf->nextfree = (state)->slabFreeHead; \
            (state)->slabFreeHead = buf; \
        } else \
            pfree(buf); \
    } while(0)
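
/*
 * Typical recycling pattern (a sketch; compare the tuple-return paths in
 * tuplesort_gettuple_common): when handing back the next tuple, first
 * recycle the one returned last time.
 *
 *     if (state->lastReturnedTuple)
 *     {
 *         RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
 *         state->lastReturnedTuple = NULL;
 *     }
 */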

#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
#define SERIAL(state) ((state)->shared == NULL)
#define WORKER(state) ((state)->shared && (state)->worker != -1)
#define LEADER(state) ((state)->shared && (state)->worker == -1)

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs). The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value. This allows read-backwards. When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data. When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed. They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed. There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array. Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size. This is important since
 * palloc can add substantial overhead. It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3. As of 9.6, a
 * separate memory context is used for caller passed tuples. Resetting
 * it at certain key increments significantly ameliorates fragmentation.
 * Note that this places a responsibility on copytup routines to use the
 * correct memory context for these tuples (and to not use the reset
 * context for anything whose lifetime needs to span multiple external
 * sort runs). readtup routines use the slab allocator (they cannot use
 * the reset context because it gets deleted at the point that merging
 * begins).
 */
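
/*
 * Schematically, one stored tuple therefore looks like this (a sketch;
 * writetup_heap et al. are the authoritative implementations):
 *
 *     unsigned int len;     -- total size including this word, but not
 *                              the trailing copy
 *     ... len - sizeof(unsigned int) bytes of tuple data, in whatever
 *         format writetup chose ...
 *     unsigned int len;     -- trailing copy, present only when
 *                              state->randomAccess
 */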

/* When using this macro, beware of double evaluation of len */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
    do { \
        if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
            elog(ERROR, "unexpected end of data"); \
    } while(0)
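
/*
 * For instance, LogicalTapeReadExact(tapeset, tape, ptr, len++) would
 * increment "len" twice, once per expansion of the macro argument; pass
 * only side-effect-free expressions for len.
 */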

static Tuplesortstate *tuplesort_begin_common(int workMem,
                                              SortCoordinate coordinate,
                                              bool randomAccess);
static void tuplesort_begin_batch(Tuplesortstate *state);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static bool consider_abort_common(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state, bool mergeruns);
static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int  comparetup_heap(const SortTuple *a, const SortTuple *b,
                            Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum,
                          SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
                         int tapenum, unsigned int len);
static int  comparetup_cluster(const SortTuple *a, const SortTuple *b,
                               Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_cluster(Tuplesortstate *state, int tapenum,
                             SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);
static int  comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state);
static int  comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                                  Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
static int  comparetup_datum(const SortTuple *a, const SortTuple *b,
                             Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
static int  worker_get_identifier(Tuplesortstate *state);
static void worker_freeze_result_tape(Tuplesortstate *state);
static void worker_nomergeruns(Tuplesortstate *state);
static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
static void tuplesort_free(Tuplesortstate *state);
static void tuplesort_updatemax(Tuplesortstate *state);

/*
 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
 * any variant of SortTuples, using the appropriate comparetup function.
 * qsort_ssup() is specialized for the case where the comparetup function
 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
 * and Datum sorts.
 */

#define ST_SORT qsort_tuple
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE_RUNTIME_POINTER
#define ST_COMPARE_ARG_TYPE Tuplesortstate
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DECLARE
#define ST_DEFINE
#include "lib/sort_template.h"

#define ST_SORT qsort_ssup
#define ST_ELEMENT_TYPE SortTuple
#define ST_COMPARE(a, b, ssup) \
    ApplySortComparator((a)->datum1, (a)->isnull1, \
                        (b)->datum1, (b)->isnull1, (ssup))
#define ST_COMPARE_ARG_TYPE SortSupportData
#define ST_CHECK_FOR_INTERRUPTS
#define ST_SCOPE static
#define ST_DEFINE
#include "lib/sort_template.h"

/*
 * tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied. After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.) Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */
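
/*
 * A minimal caller sketch of that protocol (hypothetical; compare the
 * executor's use in nodeSort.c --- next_input_slot() and process() stand in
 * for the caller's own logic):
 *
 *     Tuplesortstate *ts = tuplesort_begin_heap(tupDesc, nkeys, attNums,
 *                                               sortOperators, sortCollations,
 *                                               nullsFirstFlags, work_mem,
 *                                               NULL, false);
 *     TupleTableSlot *slot;
 *     TupleTableSlot *outslot = MakeSingleTupleTableSlot(tupDesc,
 *                                                        &TTSOpsMinimalTuple);
 *
 *     while ((slot = next_input_slot()) != NULL)
 *         tuplesort_puttupleslot(ts, slot);
 *     tuplesort_performsort(ts);
 *     while (tuplesort_gettupleslot(ts, true, false, outslot, NULL))
 *         process(outslot);
 *     tuplesort_end(ts);
 */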

static Tuplesortstate *
tuplesort_begin_common(int workMem, SortCoordinate coordinate,
                       bool randomAccess)
{
    Tuplesortstate *state;
    MemoryContext maincontext;
    MemoryContext sortcontext;
    MemoryContext oldcontext;

    /* See leader_takeover_tapes() remarks on randomAccess support */
    if (coordinate && randomAccess)
        elog(ERROR, "random access disallowed under parallel sort");

    /*
     * Memory context surviving tuplesort_reset. This memory context holds
     * data which is useful to keep while sorting multiple similar batches.
     */
    maincontext = AllocSetContextCreate(CurrentMemoryContext,
                                        "TupleSort main",
                                        ALLOCSET_DEFAULT_SIZES);

    /*
     * Create a working memory context for one sort operation. The content of
     * this context is deleted by tuplesort_reset.
     */
    sortcontext = AllocSetContextCreate(maincontext,
                                        "TupleSort sort",
                                        ALLOCSET_DEFAULT_SIZES);

    /*
     * Additionally, a working memory context for tuples is set up in
     * tuplesort_begin_batch.
     */

    /*
     * Make the Tuplesortstate within the per-sortstate context. This way, we
     * don't need a separate pfree() operation for it at shutdown.
     */
    oldcontext = MemoryContextSwitchTo(maincontext);

    state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
    if (trace_sort)
        pg_rusage_init(&state->ru_start);
#endif

    state->randomAccess = randomAccess;
    state->tuples = true;

    /*
     * workMem is forced to be at least 64KB, the current minimum valid value
     * for the work_mem GUC. This is a defense against parallel sort callers
     * that divide out memory among many workers in a way that leaves each
     * with very little memory.
     */
    state->allowedMem = Max(workMem, 64) * (int64) 1024;
    state->sortcontext = sortcontext;
    state->maincontext = maincontext;

    /*
     * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     * see comments in grow_memtuples().
     */
    state->memtupsize = INITIAL_MEMTUPSIZE;
    state->memtuples = NULL;

    /*
     * After all of the other non-parallel-related state, we set up all of
     * the state needed for each batch.
     */
    tuplesort_begin_batch(state);

    /*
     * Initialize parallel-related state based on coordination information
     * from caller.
     */
    if (!coordinate)
    {
        /* Serial sort */
        state->shared = NULL;
        state->worker = -1;
        state->nParticipants = -1;
    }
    else if (coordinate->isWorker)
    {
        /* Parallel worker produces exactly one final run from all input */
        state->shared = coordinate->sharedsort;
        state->worker = worker_get_identifier(state);
        state->nParticipants = -1;
    }
    else
    {
        /* Parallel leader state only used for final merge */
        state->shared = coordinate->sharedsort;
        state->worker = -1;
        state->nParticipants = coordinate->nParticipants;
        Assert(state->nParticipants >= 1);
    }

    MemoryContextSwitchTo(oldcontext);

    return state;
}

/*
 * tuplesort_begin_batch
 *
 * Set up, or reset, all state needed for processing a new set of tuples with
 * this sort state. Called both from tuplesort_begin_common (the first time
 * sorting with this sort state) and tuplesort_reset (for subsequent usages).
 */
static void
tuplesort_begin_batch(Tuplesortstate *state)
{
    MemoryContext oldcontext;

    oldcontext = MemoryContextSwitchTo(state->maincontext);

    /*
     * Caller tuple (e.g. IndexTuple) memory context.
     *
     * A dedicated child context used exclusively for caller passed tuples
     * eases memory management. Resetting at key points reduces
     * fragmentation. Note that the memtuples array of SortTuples is allocated
     * in the parent context, not this context, because there is no need to
     * free memtuples early.
     */
    state->tuplecontext = AllocSetContextCreate(state->sortcontext,
                                                "Caller tuples",
                                                ALLOCSET_DEFAULT_SIZES);

    state->status = TSS_INITIAL;
    state->bounded = false;
    state->boundUsed = false;

    state->availMem = state->allowedMem;

    state->tapeset = NULL;

    state->memtupcount = 0;

    /*
     * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
     * see comments in grow_memtuples().
     */
    state->growmemtuples = true;
    state->slabAllocatorUsed = false;
    if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
    {
        pfree(state->memtuples);
        state->memtuples = NULL;
        state->memtupsize = INITIAL_MEMTUPSIZE;
    }
    if (state->memtuples == NULL)
    {
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    }

    /* workMem must be large enough for the minimal memtuples array */
    if (LACKMEM(state))
        elog(ERROR, "insufficient memory allowed for sort");

    state->currentRun = 0;

    /*
     * maxTapes, tapeRange, and Algorithm D variables will be initialized by
     * inittapes(), if needed.
     */

    state->result_tape = -1;    /* flag that result tape has not been formed */

    MemoryContextSwitchTo(oldcontext);
}

Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
                     int nkeys, AttrNumber *attNums,
                     Oid *sortOperators, Oid *sortCollations,
                     bool *nullsFirstFlags,
                     int workMem, SortCoordinate coordinate, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
    MemoryContext oldcontext;
    int         i;

    oldcontext = MemoryContextSwitchTo(state->maincontext);

    AssertArg(nkeys > 0);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
             nkeys, workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = nkeys;

    TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
                                false,  /* no unique check */
                                nkeys,
                                workMem,
                                randomAccess,
                                PARALLEL_SORT(state));

    state->comparetup = comparetup_heap;
    state->copytup = copytup_heap;
    state->writetup = writetup_heap;
    state->readtup = readtup_heap;

    state->tupDesc = tupDesc;   /* assume we need not copy tupDesc */
    state->abbrevNext = 10;

    /* Prepare SortSupport data for each column */
    state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));

    for (i = 0; i < nkeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;

        AssertArg(attNums[i] != 0);
        AssertArg(sortOperators[i] != 0);

        sortKey->ssup_cxt = CurrentMemoryContext;
        sortKey->ssup_collation = sortCollations[i];
        sortKey->ssup_nulls_first = nullsFirstFlags[i];
        sortKey->ssup_attno = attNums[i];
        /* Convey if abbreviation optimization is applicable in principle */
        sortKey->abbreviate = (i == 0);

        PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
    }

    /*
     * The "onlyKey" optimization cannot be used with abbreviated keys, since
     * tie-breaker comparisons may be required. Typically, the optimization
     * is only of value to pass-by-value types anyway, whereas abbreviated
     * keys are typically only of value to pass-by-reference types.
     */
    if (nkeys == 1 && !state->sortKeys->abbrev_converter)
        state->onlyKey = state->sortKeys;

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
                        Relation indexRel,
                        int workMem,
                        SortCoordinate coordinate, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
    BTScanInsert indexScanKey;
    MemoryContext oldcontext;
    int         i;

    Assert(indexRel->rd_rel->relam == BTREE_AM_OID);

    oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
             RelationGetNumberOfAttributes(indexRel),
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);

    TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
                                false,  /* no unique check */
                                state->nKeys,
                                workMem,
                                randomAccess,
                                PARALLEL_SORT(state));

    state->comparetup = comparetup_cluster;
    state->copytup = copytup_cluster;
    state->writetup = writetup_cluster;
    state->readtup = readtup_cluster;
    state->abbrevNext = 10;

    state->indexInfo = BuildIndexInfo(indexRel);

    state->tupDesc = tupDesc;   /* assume we need not copy tupDesc */

    indexScanKey = _bt_mkscankey(indexRel, NULL);

    if (state->indexInfo->ii_Expressions != NULL)
    {
        TupleTableSlot *slot;
        ExprContext *econtext;

        /*
         * We will need to use FormIndexDatum to evaluate the index
         * expressions. To do that, we need an EState, as well as a
         * TupleTableSlot to put the table tuples into. The econtext's
         * scantuple has to point to that slot, too.
         */
        state->estate = CreateExecutorState();
        slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
        econtext = GetPerTupleExprContext(state->estate);
        econtext->ecxt_scantuple = slot;
    }

    /* Prepare SortSupport data for each column */
    state->sortKeys = (SortSupport) palloc0(state->nKeys *
                                            sizeof(SortSupportData));

    for (i = 0; i < state->nKeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;
        ScanKey     scanKey = indexScanKey->scankeys + i;
        int16       strategy;

        sortKey->ssup_cxt = CurrentMemoryContext;
        sortKey->ssup_collation = scanKey->sk_collation;
        sortKey->ssup_nulls_first =
            (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
        sortKey->ssup_attno = scanKey->sk_attno;
        /* Convey if abbreviation optimization is applicable in principle */
        sortKey->abbreviate = (i == 0);

        AssertState(sortKey->ssup_attno != 0);

        strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
            BTGreaterStrategyNumber : BTLessStrategyNumber;

        PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
    }

    pfree(indexScanKey);

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_index_btree(Relation heapRel,
                            Relation indexRel,
                            bool enforceUnique,
                            int workMem,
                            SortCoordinate coordinate,
                            bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
    BTScanInsert indexScanKey;
    MemoryContext oldcontext;
    int         i;

    oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
             enforceUnique ? 't' : 'f',
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);

    TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
                                enforceUnique,
                                state->nKeys,
                                workMem,
                                randomAccess,
                                PARALLEL_SORT(state));

    state->comparetup = comparetup_index_btree;
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;
    state->abbrevNext = 10;

    state->heapRel = heapRel;
    state->indexRel = indexRel;
    state->enforceUnique = enforceUnique;

    indexScanKey = _bt_mkscankey(indexRel, NULL);

    /* Prepare SortSupport data for each column */
    state->sortKeys = (SortSupport) palloc0(state->nKeys *
                                            sizeof(SortSupportData));

    for (i = 0; i < state->nKeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;
        ScanKey     scanKey = indexScanKey->scankeys + i;
        int16       strategy;

        sortKey->ssup_cxt = CurrentMemoryContext;
        sortKey->ssup_collation = scanKey->sk_collation;
        sortKey->ssup_nulls_first =
            (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
        sortKey->ssup_attno = scanKey->sk_attno;
        /* Convey if abbreviation optimization is applicable in principle */
        sortKey->abbreviate = (i == 0);

        AssertState(sortKey->ssup_attno != 0);

        strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
            BTGreaterStrategyNumber : BTLessStrategyNumber;

        PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
    }

    pfree(indexScanKey);

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,
                           Relation indexRel,
                           uint32 high_mask,
                           uint32 low_mask,
                           uint32 max_buckets,
                           int workMem,
                           SortCoordinate coordinate,
                           bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
    MemoryContext oldcontext;

    oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
             "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
             high_mask,
             low_mask,
             max_buckets,
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = 1;           /* Only one sort column, the hash code */

    state->comparetup = comparetup_index_hash;
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;

    state->heapRel = heapRel;
    state->indexRel = indexRel;

    state->high_mask = high_mask;
    state->low_mask = low_mask;
    state->max_buckets = max_buckets;

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_index_gist(Relation heapRel,
                           Relation indexRel,
                           int workMem,
                           SortCoordinate coordinate,
                           bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
    MemoryContext oldcontext;
    int         i;

    oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin index sort: workMem = %d, randomAccess = %c",
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);

    state->comparetup = comparetup_index_btree;
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;

    state->heapRel = heapRel;
    state->indexRel = indexRel;

    /* Prepare SortSupport data for each column */
    state->sortKeys = (SortSupport) palloc0(state->nKeys *
                                            sizeof(SortSupportData));

    for (i = 0; i < state->nKeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;

        sortKey->ssup_cxt = CurrentMemoryContext;
        sortKey->ssup_collation = indexRel->rd_indcollation[i];
        sortKey->ssup_nulls_first = false;
        sortKey->ssup_attno = i + 1;
        /* Convey if abbreviation optimization is applicable in principle */
        sortKey->abbreviate = (i == 0);

        AssertState(sortKey->ssup_attno != 0);

        /* Look for a sort support function */
        PrepareSortSupportFromGistIndexRel(indexRel, sortKey);
    }

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
                      bool nullsFirstFlag, int workMem,
                      SortCoordinate coordinate, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
    MemoryContext oldcontext;
    int16       typlen;
    bool        typbyval;

    oldcontext = MemoryContextSwitchTo(state->maincontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin datum sort: workMem = %d, randomAccess = %c",
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = 1;           /* always a one-column sort */

    TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
                                false,  /* no unique check */
                                1,
                                workMem,
                                randomAccess,
                                PARALLEL_SORT(state));

    state->comparetup = comparetup_datum;
    state->copytup = copytup_datum;
    state->writetup = writetup_datum;
    state->readtup = readtup_datum;
    state->abbrevNext = 10;

    state->datumType = datumType;

    /* lookup necessary attributes of the datum type */
    get_typlenbyval(datumType, &typlen, &typbyval);
    state->datumTypeLen = typlen;
    state->tuples = !typbyval;

    /* Prepare SortSupport data */
    state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));

    state->sortKeys->ssup_cxt = CurrentMemoryContext;
    state->sortKeys->ssup_collation = sortCollation;
    state->sortKeys->ssup_nulls_first = nullsFirstFlag;

    /*
     * Abbreviation is possible here only for by-reference types. In theory,
     * a pass-by-value datatype could have an abbreviated form that is cheaper
     * to compare. In a tuple sort, we could support that, because we can
     * always extract the original datum from the tuple as needed. Here, we
     * can't, because a datum sort only stores a single copy of the datum; the
     * "tuple" field of each SortTuple is NULL.
     */
    state->sortKeys->abbreviate = !typbyval;

    PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);

    /*
     * The "onlyKey" optimization cannot be used with abbreviated keys, since
     * tie-breaker comparisons may be required. Typically, the optimization
     * is only of value to pass-by-value types anyway, whereas abbreviated
     * keys are typically only of value to pass-by-reference types.
     */
    if (!state->sortKeys->abbrev_converter)
        state->onlyKey = state->sortKeys;

    MemoryContextSwitchTo(oldcontext);

    return state;
}
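
/*
 * A minimal datum-sort sketch (hypothetical caller; Int4LessOperator is the
 * OID macro for the int4 "<" operator):
 *
 *     Tuplesortstate *ts = tuplesort_begin_datum(INT4OID, Int4LessOperator,
 *                                                InvalidOid, false,
 *                                                work_mem, NULL, false);
 *     Datum       val;
 *     bool        isnull;
 *
 *     tuplesort_putdatum(ts, Int32GetDatum(42), false);
 *     tuplesort_performsort(ts);
 *     while (tuplesort_getdatum(ts, true, &val, &isnull, NULL))
 *         ... consume val/isnull ...
 *     tuplesort_end(ts);
 */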

/*
 * tuplesort_set_bound
 *
 * Advise tuplesort that at most the first N result tuples are required.
 *
 * Must be called before inserting any tuples. (Actually, we could allow it
 * as long as the sort hasn't spilled to disk, but there seems no need for
 * delayed calls at the moment.)
 *
 * This is a hint only. The tuplesort may still return more tuples than
 * requested. Parallel leader tuplesorts will always ignore the hint.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
    /* Assert we're called before loading any tuples */
    Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
    /* Can't set the bound twice, either */
    Assert(!state->bounded);
    /* Also, this shouldn't be called in a parallel worker */
    Assert(!WORKER(state));

    /* Parallel leader allows but ignores hint */
    if (LEADER(state))
        return;

#ifdef DEBUG_BOUNDED_SORT
    /* Honor GUC setting that disables the feature (for easy testing) */
    if (!optimize_bounded_sort)
        return;
#endif

    /* We want to be able to compute bound * 2, so limit the setting */
    if (bound > (int64) (INT_MAX / 2))
        return;

    state->bounded = true;
    state->bound = (int) bound;

    /*
     * Bounded sorts are not an effective target for abbreviated key
     * optimization. Disable by setting state to be consistent with no
     * abbreviation support.
     */
    state->sortKeys->abbrev_converter = NULL;
    if (state->sortKeys->abbrev_full_comparator)
        state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;

    /* Not strictly necessary, but be tidy */
    state->sortKeys->abbrev_abort = NULL;
    state->sortKeys->abbrev_full_comparator = NULL;
}
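
/*
 * A bounded-sort sketch (hypothetical caller, e.g. for ORDER BY ... LIMIT
 * 100): setting the bound before loading lets the sort discard tuples that
 * cannot be among the first 100, keeping memory use proportional to the
 * bound rather than to the whole input.
 *
 *     Tuplesortstate *ts = tuplesort_begin_heap(... work_mem, NULL, false);
 *     tuplesort_set_bound(ts, 100);
 *     ... put tuples, performsort, then fetch at most 100 results ...
 */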

/*
 * tuplesort_used_bound
 *
 * Allow callers to find out if the sort state was able to use a bound.
 */
bool
tuplesort_used_bound(Tuplesortstate *state)
{
    return state->boundUsed;
}

/*
 * tuplesort_free
 *
 * Internal routine for freeing resources of tuplesort.
 */
static void
tuplesort_free(Tuplesortstate *state)
{
    /* context swap probably not needed, but let's be safe */
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    long        spaceUsed;

    if (state->tapeset)
        spaceUsed = LogicalTapeSetBlocks(state->tapeset);
    else
        spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

    /*
     * Delete temporary "tape" files, if any.
     *
     * Note: want to include this in reported total cost of sort, hence need
     * for two #ifdef TRACE_SORT sections.
     */
    if (state->tapeset)
        LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
    if (trace_sort)
    {
        if (state->tapeset)
            elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
                 SERIAL(state) ? "external sort" : "parallel external sort",
                 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "%s of worker %d ended, %ld KB used: %s",
                 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
                 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
    }

    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
#else

    /*
     * If you disabled TRACE_SORT, you can still probe sort__done, but you
     * ain't getting space-used stats.
     */
    TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
#endif

    /* Free any execution state created for CLUSTER case */
    if (state->estate != NULL)
    {
        ExprContext *econtext = GetPerTupleExprContext(state->estate);

        ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
        FreeExecutorState(state->estate);
    }

    MemoryContextSwitchTo(oldcontext);

    /*
     * Free the per-sort memory context, thereby releasing all working memory.
     */
    MemoryContextReset(state->sortcontext);
}

1454 /*
1455 * tuplesort_end
1456 *
1457 * Release resources and clean up.
1458 *
1459 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1460 * pointing to garbage. Be careful not to attempt to use or free such
1461 * pointers afterwards!
1462 */
1463 void
tuplesort_end(Tuplesortstate * state)1464 tuplesort_end(Tuplesortstate *state)
1465 {
1466 tuplesort_free(state);
1467
1468 /*
1469 * Free the main memory context, including the Tuplesortstate struct
1470 * itself.
1471 */
1472 MemoryContextDelete(state->maincontext);
1473 }
1474
1475 /*
1476 * tuplesort_updatemax
1477 *
1478 * Update maximum resource usage statistics.
1479 */
1480 static void
tuplesort_updatemax(Tuplesortstate * state)1481 tuplesort_updatemax(Tuplesortstate *state)
1482 {
1483 int64 spaceUsed;
1484 bool isSpaceDisk;
1485
1486 /*
1487 * Note: it might seem we should provide both memory and disk usage for a
1488 * disk-based sort. However, the current code doesn't track memory space
1489 * accurately once we have begun to return tuples to the caller (since we
1490 * don't account for pfree's the caller is expected to do), so we cannot
1491 * rely on availMem in a disk sort. This does not seem worth the overhead
1492 * to fix. Is it worth creating an API for the memory context code to
1493 * tell us how much is actually used in sortcontext?
1494 */
1495 if (state->tapeset)
1496 {
1497 isSpaceDisk = true;
1498 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1499 }
1500 else
1501 {
1502 isSpaceDisk = false;
1503 spaceUsed = state->allowedMem - state->availMem;
1504 }
1505
1506 /*
1507 * A sort evicts data to disk when it cannot fit that data in main
1508 * memory. That is why we treat space used on disk as more important
1509 * for tracking resource usage than space used in memory. Note that
1510 * the amount of space occupied by a set of tuples on disk might be
1511 * less than the amount occupied by the same tuples in memory, due to
1512 * their more compact representation.
1513 */
1514 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1515 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1516 {
1517 state->maxSpace = spaceUsed;
1518 state->isMaxSpaceDisk = isSpaceDisk;
1519 state->maxSpaceStatus = state->status;
1520 }
1521 }
1522
1523 /*
1524 * tuplesort_reset
1525 *
1526 * Reset the tuplesort. Release all the data in the tuplesort, but keep
1527 * the meta-information. After tuplesort_reset, the tuplesort is ready to
1528 * start a new sort. This avoids recreating tuplesort states (and saves
1529 * resources) when sorting multiple small batches.
1530 */
1531 void
1532 tuplesort_reset(Tuplesortstate *state)
1533 {
1534 tuplesort_updatemax(state);
1535 tuplesort_free(state);
1536
1537 /*
1538 * After we've freed up per-batch memory, re-setup all of the state common
1539 * to both the first batch and any subsequent batch.
1540 */
1541 tuplesort_begin_batch(state);
1542
1543 state->lastReturnedTuple = NULL;
1544 state->slabMemoryBegin = NULL;
1545 state->slabMemoryEnd = NULL;
1546 state->slabFreeHead = NULL;
1547 }
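/*
 * Example (illustrative sketch; begin_heap arguments elided): the reset
 * pattern for sorting many small batches with one state looks roughly
 * like
 *
 *     Tuplesortstate *state = tuplesort_begin_heap(...);
 *     for each batch:
 *         for each input tuple: tuplesort_puttupleslot(state, slot);
 *         tuplesort_performsort(state);
 *         while (tuplesort_gettupleslot(state, true, false, slot, NULL))
 *             ...consume slot...
 *         tuplesort_reset(state);    // ready for the next batch
 *     tuplesort_end(state);
 */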
1548
1549 /*
1550 * Grow the memtuples[] array, if possible within our memory constraint. We
1551 * must not exceed INT_MAX tuples in memory or the caller-provided memory
1552 * limit. Return true if we were able to enlarge the array, false if not.
1553 *
1554 * Normally, at each increment we double the size of the array. When doing
1555 * that would exceed a limit, we attempt one last, smaller increase (and then
1556 * clear the growmemtuples flag so we don't try any more). That allows us to
1557 * use memory as fully as permitted; sticking to the pure doubling rule could
1558 * result in almost half going unused. Because availMem moves around with
1559 * tuple addition/removal, we need some rule to prevent making repeated small
1560 * increases in memtupsize, which would just be useless thrashing. The
1561 * growmemtuples flag accomplishes that and also prevents useless
1562 * recalculations in this function.
1563 */
1564 static bool
1565 grow_memtuples(Tuplesortstate *state)
1566 {
1567 int newmemtupsize;
1568 int memtupsize = state->memtupsize;
1569 int64 memNowUsed = state->allowedMem - state->availMem;
1570
1571 /* Forget it if we've already maxed out memtuples, per comment above */
1572 if (!state->growmemtuples)
1573 return false;
1574
1575 /* Select new value of memtupsize */
1576 if (memNowUsed <= state->availMem)
1577 {
1578 /*
1579 * We've used no more than half of allowedMem; double our usage,
1580 * clamping at INT_MAX tuples.
1581 */
1582 if (memtupsize < INT_MAX / 2)
1583 newmemtupsize = memtupsize * 2;
1584 else
1585 {
1586 newmemtupsize = INT_MAX;
1587 state->growmemtuples = false;
1588 }
1589 }
1590 else
1591 {
1592 /*
1593 * This will be the last increment of memtupsize. Abandon doubling
1594 * strategy and instead increase as much as we safely can.
1595 *
1596 * To stay within allowedMem, we can't increase memtupsize by more
1597 * than availMem / sizeof(SortTuple) elements. In practice, we want
1598 * to increase it by considerably less, because we need to leave some
1599 * space for the tuples to which the new array slots will refer. We
1600 * assume the new tuples will be about the same size as the tuples
1601 * we've already seen, and thus we can extrapolate from the space
1602 * consumption so far to estimate an appropriate new size for the
1603 * memtuples array. The optimal value might be higher or lower than
1604 * this estimate, but it's hard to know that in advance. We again
1605 * clamp at INT_MAX tuples.
1606 *
1607 * This calculation is safe against enlarging the array so much that
1608 * LACKMEM becomes true, because the memory currently used includes
1609 * the present array; thus, there would be enough allowedMem for the
1610 * new array elements even if no other memory were currently used.
1611 *
1612 * We do the arithmetic in float8, because otherwise the product of
1613 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1614 * result should be insignificant; but even if we computed a
1615 * completely insane result, the checks below will prevent anything
1616 * really bad from happening.
1617 */
1618 double grow_ratio;
1619
1620 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1621 if (memtupsize * grow_ratio < INT_MAX)
1622 newmemtupsize = (int) (memtupsize * grow_ratio);
1623 else
1624 newmemtupsize = INT_MAX;
1625
1626 /* We won't make any further enlargement attempts */
1627 state->growmemtuples = false;
1628 }
1629
1630 /* Must enlarge array by at least one element, else report failure */
1631 if (newmemtupsize <= memtupsize)
1632 goto noalloc;
1633
1634 /*
1635 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1636 * to ensure our request won't be rejected. Note that we can easily
1637 * exhaust address space before facing this outcome. (This is presently
1638 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1639 * don't rely on that at this distance.)
1640 */
1641 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1642 {
1643 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1644 state->growmemtuples = false; /* can't grow any more */
1645 }
1646
1647 /*
1648 * We need to be sure that we do not cause LACKMEM to become true, else
1649 * the space management algorithm will go nuts. The code above should
1650 * never generate a dangerous request, but to be safe, check explicitly
1651 * that the array growth fits within availMem. (We could still cause
1652 * LACKMEM if the memory chunk overhead associated with the memtuples
1653 * array were to increase. That shouldn't happen because we chose the
1654 * initial array size large enough to ensure that palloc will be treating
1655 * both old and new arrays as separate chunks. But we'll check LACKMEM
1656 * explicitly below just in case.)
1657 */
1658 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1659 goto noalloc;
1660
1661 /* OK, do it */
1662 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1663 state->memtupsize = newmemtupsize;
1664 state->memtuples = (SortTuple *)
1665 repalloc_huge(state->memtuples,
1666 state->memtupsize * sizeof(SortTuple));
1667 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1668 if (LACKMEM(state))
1669 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1670 return true;
1671
1672 noalloc:
1673 /* If for any reason we didn't realloc, shut off future attempts */
1674 state->growmemtuples = false;
1675 return false;
1676 }
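/*
 * Worked example (hypothetical numbers): suppose allowedMem is 64MB and
 * memNowUsed is 48MB when the array fills.  Since memNowUsed > availMem
 * (16MB), we take the one-shot path: grow_ratio = 64/48 ~= 1.33, so a
 * memtupsize of 300000 becomes ~400000 slots, and growmemtuples is then
 * cleared so we never retry.  Had memNowUsed been <= availMem (i.e. no
 * more than half of allowedMem in use), we would simply have doubled.
 */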
1677
1678 /*
1679 * Accept one tuple while collecting input data for sort.
1680 *
1681 * Note that the input data is always copied; the caller need not save it.
1682 */
1683 void
1684 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1685 {
1686 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1687 SortTuple stup;
1688
1689 /*
1690 * Copy the given tuple into memory we control, and decrease availMem.
1691 * Then call the common code.
1692 */
1693 COPYTUP(state, &stup, (void *) slot);
1694
1695 puttuple_common(state, &stup);
1696
1697 MemoryContextSwitchTo(oldcontext);
1698 }
1699
1700 /*
1701 * Accept one tuple while collecting input data for sort.
1702 *
1703 * Note that the input data is always copied; the caller need not save it.
1704 */
1705 void
1706 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1707 {
1708 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1709 SortTuple stup;
1710
1711 /*
1712 * Copy the given tuple into memory we control, and decrease availMem.
1713 * Then call the common code.
1714 */
1715 COPYTUP(state, &stup, (void *) tup);
1716
1717 puttuple_common(state, &stup);
1718
1719 MemoryContextSwitchTo(oldcontext);
1720 }
1721
1722 /*
1723 * Collect one index tuple while collecting input data for sort, building
1724 * it from caller-supplied values.
1725 */
1726 void
1727 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1728 ItemPointer self, Datum *values,
1729 bool *isnull)
1730 {
1731 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1732 SortTuple stup;
1733 Datum original;
1734 IndexTuple tuple;
1735
1736 stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1737 tuple = ((IndexTuple) stup.tuple);
1738 tuple->t_tid = *self;
1739 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1740 /* set up first-column key value */
1741 original = index_getattr(tuple,
1742 1,
1743 RelationGetDescr(state->indexRel),
1744 &stup.isnull1);
1745
1746 MemoryContextSwitchTo(state->sortcontext);
1747
1748 if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1749 {
1750 /*
1751 * Store ordinary Datum representation, or NULL value. If there is a
1752 * converter it won't expect NULL values, and cost model is not
1753 * required to account for NULL, so in that case we avoid calling
1754 * converter and just set datum1 to zeroed representation (to be
1755 * consistent, and to support cheap inequality tests for NULL
1756 * abbreviated keys).
1757 */
1758 stup.datum1 = original;
1759 }
1760 else if (!consider_abort_common(state))
1761 {
1762 /* Store abbreviated key representation */
1763 stup.datum1 = state->sortKeys->abbrev_converter(original,
1764 state->sortKeys);
1765 }
1766 else
1767 {
1768 /* Abort abbreviation */
1769 int i;
1770
1771 stup.datum1 = original;
1772
1773 /*
1774 * Set state to be consistent with never trying abbreviation.
1775 *
1776 * Alter datum1 representation in already-copied tuples, so as to
1777 * ensure a consistent representation (current tuple was just
1778 * handled). It does not matter if some dumped tuples are already
1779 * sorted on tape, since serialized tuples lack abbreviated keys
1780 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1781 */
1782 for (i = 0; i < state->memtupcount; i++)
1783 {
1784 SortTuple *mtup = &state->memtuples[i];
1785
1786 tuple = mtup->tuple;
1787 mtup->datum1 = index_getattr(tuple,
1788 1,
1789 RelationGetDescr(state->indexRel),
1790 &mtup->isnull1);
1791 }
1792 }
1793
1794 puttuple_common(state, &stup);
1795
1796 MemoryContextSwitchTo(oldcontext);
1797 }
1798
1799 /*
1800 * Accept one Datum while collecting input data for sort.
1801 *
1802 * If the Datum is pass-by-ref type, the value will be copied.
1803 */
1804 void
1805 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1806 {
1807 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1808 SortTuple stup;
1809
1810 /*
1811 * Pass-by-value types or null values are just stored directly in
1812 * stup.datum1 (and stup.tuple is not used and set to NULL).
1813 *
1814 * Non-null pass-by-reference values need to be copied into memory we
1815 * control, and possibly abbreviated. The copied value is pointed to by
1816 * stup.tuple and is treated as the canonical copy (e.g. to return via
1817 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1818 * abbreviated value if abbreviation is happening, otherwise it's
1819 * identical to stup.tuple.
1820 */
1821
1822 if (isNull || !state->tuples)
1823 {
1824 /*
1825 * Set datum1 to zeroed representation for NULLs (to be consistent,
1826 * and to support cheap inequality tests for NULL abbreviated keys).
1827 */
1828 stup.datum1 = !isNull ? val : (Datum) 0;
1829 stup.isnull1 = isNull;
1830 stup.tuple = NULL; /* no separate storage */
1831 MemoryContextSwitchTo(state->sortcontext);
1832 }
1833 else
1834 {
1835 Datum original = datumCopy(val, false, state->datumTypeLen);
1836
1837 stup.isnull1 = false;
1838 stup.tuple = DatumGetPointer(original);
1839 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1840 MemoryContextSwitchTo(state->sortcontext);
1841
1842 if (!state->sortKeys->abbrev_converter)
1843 {
1844 stup.datum1 = original;
1845 }
1846 else if (!consider_abort_common(state))
1847 {
1848 /* Store abbreviated key representation */
1849 stup.datum1 = state->sortKeys->abbrev_converter(original,
1850 state->sortKeys);
1851 }
1852 else
1853 {
1854 /* Abort abbreviation */
1855 int i;
1856
1857 stup.datum1 = original;
1858
1859 /*
1860 * Set state to be consistent with never trying abbreviation.
1861 *
1862 * Alter datum1 representation in already-copied tuples, so as to
1863 * ensure a consistent representation (current tuple was just
1864 * handled). It does not matter if some dumped tuples are already
1865 * sorted on tape, since serialized tuples lack abbreviated keys
1866 * (TSS_BUILDRUNS state prevents control reaching here in any
1867 * case).
1868 */
1869 for (i = 0; i < state->memtupcount; i++)
1870 {
1871 SortTuple *mtup = &state->memtuples[i];
1872
1873 mtup->datum1 = PointerGetDatum(mtup->tuple);
1874 }
1875 }
1876 }
1877
1878 puttuple_common(state, &stup);
1879
1880 MemoryContextSwitchTo(oldcontext);
1881 }
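/*
 * Example (illustrative sketch; begin_datum arguments elided): a simple
 * single-Datum sort feeds values through this function:
 *
 *     Tuplesortstate *state = tuplesort_begin_datum(...);
 *     for each value: tuplesort_putdatum(state, val, isnull);
 *     tuplesort_performsort(state);
 *     while (tuplesort_getdatum(state, true, &val, &isnull, NULL))
 *         ...val is caller-owned if the type is pass-by-reference...
 *     tuplesort_end(state);
 */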
1882
1883 /*
1884 * Shared code for tuple and datum cases.
1885 */
1886 static void
1887 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1888 {
1889 Assert(!LEADER(state));
1890
1891 switch (state->status)
1892 {
1893 case TSS_INITIAL:
1894
1895 /*
1896 * Save the tuple into the unsorted array. First, grow the array
1897 * as needed. Note that we try to grow the array when there is
1898 * still one free slot remaining --- if we fail, there'll still be
1899 * room to store the incoming tuple, and then we'll switch to
1900 * tape-based operation.
1901 */
1902 if (state->memtupcount >= state->memtupsize - 1)
1903 {
1904 (void) grow_memtuples(state);
1905 Assert(state->memtupcount < state->memtupsize);
1906 }
1907 state->memtuples[state->memtupcount++] = *tuple;
1908
1909 /*
1910 * Check if it's time to switch over to a bounded heapsort. We do
1911 * so if the input tuple count exceeds twice the desired tuple
1912 * count (this is a heuristic for where heapsort becomes cheaper
1913 * than a quicksort), or if we've just filled workMem and have
1914 * enough tuples to meet the bound.
1915 *
1916 * Note that once we enter TSS_BOUNDED state we will always try to
1917 * complete the sort that way. In the worst case, if later input
1918 * tuples are larger than earlier ones, this might cause us to
1919 * exceed workMem significantly.
1920 */
1921 if (state->bounded &&
1922 (state->memtupcount > state->bound * 2 ||
1923 (state->memtupcount > state->bound && LACKMEM(state))))
1924 {
1925 #ifdef TRACE_SORT
1926 if (trace_sort)
1927 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1928 state->memtupcount,
1929 pg_rusage_show(&state->ru_start));
1930 #endif
1931 make_bounded_heap(state);
1932 return;
1933 }
1934
1935 /*
1936 * Done if we still fit in available memory and have array slots.
1937 */
1938 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1939 return;
1940
1941 /*
1942 * Nope; time to switch to tape-based operation.
1943 */
1944 inittapes(state, true);
1945
1946 /*
1947 * Dump all tuples.
1948 */
1949 dumptuples(state, false);
1950 break;
1951
1952 case TSS_BOUNDED:
1953
1954 /*
1955 * We don't want to grow the array here, so check whether the new
1956 * tuple can be discarded before putting it in. This should be a
1957 * good speed optimization, too, since when there are many more
1958 * input tuples than the bound, most input tuples can be discarded
1959 * with just this one comparison. Note that because we currently
1960 * have the sort direction reversed, we must check for <= not >=.
1961 */
1962 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1963 {
1964 /* new tuple <= top of the heap, so we can discard it */
1965 free_sort_tuple(state, tuple);
1966 CHECK_FOR_INTERRUPTS();
1967 }
1968 else
1969 {
1970 /* discard top of heap, replacing it with the new tuple */
1971 free_sort_tuple(state, &state->memtuples[0]);
1972 tuplesort_heap_replace_top(state, tuple);
1973 }
1974 break;
1975
1976 case TSS_BUILDRUNS:
1977
1978 /*
1979 * Save the tuple into the unsorted array (there must be space)
1980 */
1981 state->memtuples[state->memtupcount++] = *tuple;
1982
1983 /*
1984 * If we are over the memory limit, dump all tuples.
1985 */
1986 dumptuples(state, false);
1987 break;
1988
1989 default:
1990 elog(ERROR, "invalid tuplesort state");
1991 break;
1992 }
1993 }
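/*
 * Worked example for the TSS_INITIAL bounded check above (hypothetical
 * numbers): with a bound of 100 (say, ORDER BY ... LIMIT 100), we switch
 * to the bounded heap either once the 201st tuple arrives
 * (memtupcount > bound * 2), or as soon as we both hold more than 100
 * tuples and LACKMEM() reports the memory budget exceeded.
 */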
1994
1995 static bool
1996 consider_abort_common(Tuplesortstate *state)
1997 {
1998 Assert(state->sortKeys[0].abbrev_converter != NULL);
1999 Assert(state->sortKeys[0].abbrev_abort != NULL);
2000 Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
2001
2002 /*
2003 * Check effectiveness of abbreviation optimization. Consider aborting
2004 * when still within memory limit.
2005 */
2006 if (state->status == TSS_INITIAL &&
2007 state->memtupcount >= state->abbrevNext)
2008 {
2009 state->abbrevNext *= 2;
2010
2011 /*
2012 * Check opclass-supplied abbreviation abort routine. It may indicate
2013 * that abbreviation should not proceed.
2014 */
2015 if (!state->sortKeys->abbrev_abort(state->memtupcount,
2016 state->sortKeys))
2017 return false;
2018
2019 /*
2020 * Finally, restore authoritative comparator, and indicate that
2021 * abbreviation is not in play by setting abbrev_converter to NULL
2022 */
2023 state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
2024 state->sortKeys[0].abbrev_converter = NULL;
2025 /* Not strictly necessary, but be tidy */
2026 state->sortKeys[0].abbrev_abort = NULL;
2027 state->sortKeys[0].abbrev_full_comparator = NULL;
2028
2029 /* Give up - expect original pass-by-value representation */
2030 return true;
2031 }
2032
2033 return false;
2034 }
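/*
 * Example of the cadence above: because abbrevNext doubles on every
 * check, the opclass abort hook runs at geometrically spaced tuple
 * counts (e.g. 10, 20, 40, 80, ... assuming abbrevNext starts at 10;
 * the initial value is set where abbreviation is first established), so
 * the check's cost stays negligible however many tuples arrive.
 */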
2035
2036 /*
2037 * All tuples have been provided; finish the sort.
2038 */
2039 void
2040 tuplesort_performsort(Tuplesortstate *state)
2041 {
2042 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2043
2044 #ifdef TRACE_SORT
2045 if (trace_sort)
2046 elog(LOG, "performsort of worker %d starting: %s",
2047 state->worker, pg_rusage_show(&state->ru_start));
2048 #endif
2049
2050 switch (state->status)
2051 {
2052 case TSS_INITIAL:
2053
2054 /*
2055 * We were able to accumulate all the tuples within the allowed
2056 * amount of memory, or the leader is taking over the worker tapes.
2057 */
2058 if (SERIAL(state))
2059 {
2060 /* Just qsort 'em and we're done */
2061 tuplesort_sort_memtuples(state);
2062 state->status = TSS_SORTEDINMEM;
2063 }
2064 else if (WORKER(state))
2065 {
2066 /*
2067 * Parallel workers must still dump out tuples to tape. No
2068 * merge is required to produce single output run, though.
2069 */
2070 inittapes(state, false);
2071 dumptuples(state, true);
2072 worker_nomergeruns(state);
2073 state->status = TSS_SORTEDONTAPE;
2074 }
2075 else
2076 {
2077 /*
2078 * Leader will take over worker tapes and merge worker runs.
2079 * Note that mergeruns sets the correct state->status.
2080 */
2081 leader_takeover_tapes(state);
2082 mergeruns(state);
2083 }
2084 state->current = 0;
2085 state->eof_reached = false;
2086 state->markpos_block = 0L;
2087 state->markpos_offset = 0;
2088 state->markpos_eof = false;
2089 break;
2090
2091 case TSS_BOUNDED:
2092
2093 /*
2094 * We were able to accumulate all the tuples required for output
2095 * in memory, using a heap to eliminate excess tuples. Now we
2096 * have to transform the heap to a properly-sorted array.
2097 */
2098 sort_bounded_heap(state);
2099 state->current = 0;
2100 state->eof_reached = false;
2101 state->markpos_offset = 0;
2102 state->markpos_eof = false;
2103 state->status = TSS_SORTEDINMEM;
2104 break;
2105
2106 case TSS_BUILDRUNS:
2107
2108 /*
2109 * Finish tape-based sort. First, flush all tuples remaining in
2110 * memory out to tape; then merge until we have a single remaining
2111 * run (or, if !randomAccess and !WORKER(), one run per tape).
2112 * Note that mergeruns sets the correct state->status.
2113 */
2114 dumptuples(state, true);
2115 mergeruns(state);
2116 state->eof_reached = false;
2117 state->markpos_block = 0L;
2118 state->markpos_offset = 0;
2119 state->markpos_eof = false;
2120 break;
2121
2122 default:
2123 elog(ERROR, "invalid tuplesort state");
2124 break;
2125 }
2126
2127 #ifdef TRACE_SORT
2128 if (trace_sort)
2129 {
2130 if (state->status == TSS_FINALMERGE)
2131 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
2132 state->worker, state->activeTapes,
2133 pg_rusage_show(&state->ru_start));
2134 else
2135 elog(LOG, "performsort of worker %d done: %s",
2136 state->worker, pg_rusage_show(&state->ru_start));
2137 }
2138 #endif
2139
2140 MemoryContextSwitchTo(oldcontext);
2141 }
2142
2143 /*
2144 * Internal routine to fetch the next tuple in either forward or back
2145 * direction into *stup. Returns false if no more tuples.
2146 * Returned tuple belongs to tuplesort memory context, and must not be freed
2147 * by caller. Note that fetched tuple is stored in memory that may be
2148 * recycled by any future fetch.
2149 */
2150 static bool
2151 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
2152 SortTuple *stup)
2153 {
2154 unsigned int tuplen;
2155 size_t nmoved;
2156
2157 Assert(!WORKER(state));
2158
2159 switch (state->status)
2160 {
2161 case TSS_SORTEDINMEM:
2162 Assert(forward || state->randomAccess);
2163 Assert(!state->slabAllocatorUsed);
2164 if (forward)
2165 {
2166 if (state->current < state->memtupcount)
2167 {
2168 *stup = state->memtuples[state->current++];
2169 return true;
2170 }
2171 state->eof_reached = true;
2172
2173 /*
2174 * Complain if caller tries to retrieve more tuples than
2175 * originally asked for in a bounded sort. This is because
2176 * returning EOF here might be the wrong thing.
2177 */
2178 if (state->bounded && state->current >= state->bound)
2179 elog(ERROR, "retrieved too many tuples in a bounded sort");
2180
2181 return false;
2182 }
2183 else
2184 {
2185 if (state->current <= 0)
2186 return false;
2187
2188 /*
2189 * If all tuples were already fetched, return the last tuple;
2190 * otherwise, return the tuple before the last one returned.
2191 */
2192 if (state->eof_reached)
2193 state->eof_reached = false;
2194 else
2195 {
2196 state->current--; /* last returned tuple */
2197 if (state->current <= 0)
2198 return false;
2199 }
2200 *stup = state->memtuples[state->current - 1];
2201 return true;
2202 }
2203 break;
2204
2205 case TSS_SORTEDONTAPE:
2206 Assert(forward || state->randomAccess);
2207 Assert(state->slabAllocatorUsed);
2208
2209 /*
2210 * The slot that held the tuple that we returned in previous
2211 * gettuple call can now be reused.
2212 */
2213 if (state->lastReturnedTuple)
2214 {
2215 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2216 state->lastReturnedTuple = NULL;
2217 }
2218
2219 if (forward)
2220 {
2221 if (state->eof_reached)
2222 return false;
2223
2224 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
2225 {
2226 READTUP(state, stup, state->result_tape, tuplen);
2227
2228 /*
2229 * Remember the tuple we return, so that we can recycle
2230 * its memory on next call. (This can be NULL, in the
2231 * !state->tuples case).
2232 */
2233 state->lastReturnedTuple = stup->tuple;
2234
2235 return true;
2236 }
2237 else
2238 {
2239 state->eof_reached = true;
2240 return false;
2241 }
2242 }
2243
2244 /*
2245 * Backward.
2246 *
2247 * If all tuples were already fetched, return the last tuple;
2248 * otherwise, return the tuple before the last one returned.
2249 */
2250 if (state->eof_reached)
2251 {
2252 /*
2253 * Seek position is pointing just past the zero tuplen at the
2254 * end of file; back up to fetch last tuple's ending length
2255 * word. If seek fails we must have a completely empty file.
2256 */
2257 nmoved = LogicalTapeBackspace(state->tapeset,
2258 state->result_tape,
2259 2 * sizeof(unsigned int));
2260 if (nmoved == 0)
2261 return false;
2262 else if (nmoved != 2 * sizeof(unsigned int))
2263 elog(ERROR, "unexpected tape position");
2264 state->eof_reached = false;
2265 }
2266 else
2267 {
2268 /*
2269 * Back up and fetch previously-returned tuple's ending length
2270 * word. If seek fails, assume we are at start of file.
2271 */
2272 nmoved = LogicalTapeBackspace(state->tapeset,
2273 state->result_tape,
2274 sizeof(unsigned int));
2275 if (nmoved == 0)
2276 return false;
2277 else if (nmoved != sizeof(unsigned int))
2278 elog(ERROR, "unexpected tape position");
2279 tuplen = getlen(state, state->result_tape, false);
2280
2281 /*
2282 * Back up to get ending length word of tuple before it.
2283 */
2284 nmoved = LogicalTapeBackspace(state->tapeset,
2285 state->result_tape,
2286 tuplen + 2 * sizeof(unsigned int));
2287 if (nmoved == tuplen + sizeof(unsigned int))
2288 {
2289 /*
2290 * We backed up over the previous tuple, but there was no
2291 * ending length word before it. That means that the prev
2292 * tuple is the first tuple in the file. It is now the
2293 * next to read in forward direction (not obviously right,
2294 * but that is what in-memory case does).
2295 */
2296 return false;
2297 }
2298 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2299 elog(ERROR, "bogus tuple length in backward scan");
2300 }
2301
2302 tuplen = getlen(state, state->result_tape, false);
2303
2304 /*
2305 * Now we have the length of the prior tuple, back up and read it.
2306 * Note: READTUP expects we are positioned after the initial
2307 * length word of the tuple, so back up to that point.
2308 */
2309 nmoved = LogicalTapeBackspace(state->tapeset,
2310 state->result_tape,
2311 tuplen);
2312 if (nmoved != tuplen)
2313 elog(ERROR, "bogus tuple length in backward scan");
2314 READTUP(state, stup, state->result_tape, tuplen);
2315
2316 /*
2317 * Remember the tuple we return, so that we can recycle its memory
2318 * on next call. (This can be NULL, in the Datum case).
2319 */
2320 state->lastReturnedTuple = stup->tuple;
2321
2322 return true;
2323
2324 case TSS_FINALMERGE:
2325 Assert(forward);
2326 /* We are managing memory ourselves, with the slab allocator. */
2327 Assert(state->slabAllocatorUsed);
2328
2329 /*
2330 * The slab slot holding the tuple that we returned in previous
2331 * gettuple call can now be reused.
2332 */
2333 if (state->lastReturnedTuple)
2334 {
2335 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2336 state->lastReturnedTuple = NULL;
2337 }
2338
2339 /*
2340 * This code should match the inner loop of mergeonerun().
2341 */
2342 if (state->memtupcount > 0)
2343 {
2344 int srcTape = state->memtuples[0].srctape;
2345 SortTuple newtup;
2346
2347 *stup = state->memtuples[0];
2348
2349 /*
2350 * Remember the tuple we return, so that we can recycle its
2351 * memory on next call. (This can be NULL, in the Datum case).
2352 */
2353 state->lastReturnedTuple = stup->tuple;
2354
2355 /*
2356 * Pull next tuple from tape, and replace the returned tuple
2357 * at top of the heap with it.
2358 */
2359 if (!mergereadnext(state, srcTape, &newtup))
2360 {
2361 /*
2362 * If no more data, we've reached end of run on this tape.
2363 * Remove the top node from the heap.
2364 */
2365 tuplesort_heap_delete_top(state);
2366
2367 /*
2368 * Rewind to free the read buffer. It'd go away at the
2369 * end of the sort anyway, but better to release the
2370 * memory early.
2371 */
2372 LogicalTapeRewindForWrite(state->tapeset, srcTape);
2373 return true;
2374 }
2375 newtup.srctape = srcTape;
2376 tuplesort_heap_replace_top(state, &newtup);
2377 return true;
2378 }
2379 return false;
2380
2381 default:
2382 elog(ERROR, "invalid tuplesort state");
2383 return false; /* keep compiler quiet */
2384 }
2385 }
2386
2387 /*
2388 * Fetch the next tuple in either forward or back direction.
2389 * If successful, put tuple in slot and return true; else, clear the slot
2390 * and return false.
2391 *
2392 * Caller may optionally be passed back abbreviated value (on true return
2393 * value) when abbreviation was used, which can be used to cheaply avoid
2394 * equality checks that might otherwise be required. Caller can safely make a
2395 * determination of "non-equal tuple" based on simple binary inequality. A
2396 * NULL value in leading attribute will set abbreviated value to zeroed
2397 * representation, which caller may rely on in abbreviated inequality check.
2398 *
2399 * If copy is true, the slot receives a tuple that's been copied into the
2400 * caller's memory context, so that it will stay valid regardless of future
2401 * manipulations of the tuplesort's state (up to and including deleting the
2402 * tuplesort). If copy is false, the slot will just receive a pointer to a
2403 * tuple held within the tuplesort, which is more efficient, but only safe for
2404 * callers that are prepared to have any subsequent manipulation of the
2405 * tuplesort's state invalidate slot contents.
2406 */
2407 bool
2408 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2409 TupleTableSlot *slot, Datum *abbrev)
2410 {
2411 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2412 SortTuple stup;
2413
2414 if (!tuplesort_gettuple_common(state, forward, &stup))
2415 stup.tuple = NULL;
2416
2417 MemoryContextSwitchTo(oldcontext);
2418
2419 if (stup.tuple)
2420 {
2421 /* Record abbreviated key for caller */
2422 if (state->sortKeys->abbrev_converter && abbrev)
2423 *abbrev = stup.datum1;
2424
2425 if (copy)
2426 stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2427
2428 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2429 return true;
2430 }
2431 else
2432 {
2433 ExecClearTuple(slot);
2434 return false;
2435 }
2436 }
2437
2438 /*
2439 * Fetch the next tuple in either forward or back direction.
2440 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
2441 * context, and must not be freed by caller. Caller may not rely on tuple
2442 * remaining valid after any further manipulation of tuplesort.
2443 */
2444 HeapTuple
2445 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2446 {
2447 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2448 SortTuple stup;
2449
2450 if (!tuplesort_gettuple_common(state, forward, &stup))
2451 stup.tuple = NULL;
2452
2453 MemoryContextSwitchTo(oldcontext);
2454
2455 return stup.tuple;
2456 }
2457
2458 /*
2459 * Fetch the next index tuple in either forward or back direction.
2460 * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
2461 * context, and must not be freed by caller. Caller may not rely on tuple
2462 * remaining valid after any further manipulation of tuplesort.
2463 */
2464 IndexTuple
2465 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2466 {
2467 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2468 SortTuple stup;
2469
2470 if (!tuplesort_gettuple_common(state, forward, &stup))
2471 stup.tuple = NULL;
2472
2473 MemoryContextSwitchTo(oldcontext);
2474
2475 return (IndexTuple) stup.tuple;
2476 }
2477
2478 /*
2479 * Fetch the next Datum in either forward or back direction.
2480 * Returns false if no more datums.
2481 *
2482 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2483 * in caller's context, and is now owned by the caller (this differs from
2484 * similar routines for other types of tuplesorts).
2485 *
2486 * Caller may optionally be passed back abbreviated value (on true return
2487 * value) when abbreviation was used, which can be used to cheaply avoid
2488 * equality checks that might otherwise be required. Caller can safely make a
2489 * determination of "non-equal tuple" based on simple binary inequality. A
2490 * NULL value will have a zeroed abbreviated value representation, which caller
2491 * may rely on in abbreviated inequality check.
2492 */
2493 bool
2494 tuplesort_getdatum(Tuplesortstate *state, bool forward,
2495 Datum *val, bool *isNull, Datum *abbrev)
2496 {
2497 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2498 SortTuple stup;
2499
2500 if (!tuplesort_gettuple_common(state, forward, &stup))
2501 {
2502 MemoryContextSwitchTo(oldcontext);
2503 return false;
2504 }
2505
2506 /* Ensure we copy into caller's memory context */
2507 MemoryContextSwitchTo(oldcontext);
2508
2509 /* Record abbreviated key for caller */
2510 if (state->sortKeys->abbrev_converter && abbrev)
2511 *abbrev = stup.datum1;
2512
2513 if (stup.isnull1 || !state->tuples)
2514 {
2515 *val = stup.datum1;
2516 *isNull = stup.isnull1;
2517 }
2518 else
2519 {
2520 /* use stup.tuple because stup.datum1 may be an abbreviation */
2521 *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2522 *isNull = false;
2523 }
2524
2525 return true;
2526 }
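/*
 * Example (sketch): since pass-by-reference results are palloc'd in the
 * caller's context and owned by the caller, a caller that doesn't keep
 * them should free each one ('typbyval' here stands for knowledge the
 * caller already has about the sorted type):
 *
 *     Datum val;
 *     bool isnull;
 *
 *     while (tuplesort_getdatum(state, true, &val, &isnull, NULL))
 *     {
 *         ...use val...
 *         if (!isnull && !typbyval)
 *             pfree(DatumGetPointer(val));
 *     }
 */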
2527
2528 /*
2529 * Advance over N tuples in either forward or back direction,
2530 * without returning any data. N==0 is a no-op.
2531 * Returns true if successful, false if ran out of tuples.
2532 */
2533 bool
2534 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2535 {
2536 MemoryContext oldcontext;
2537
2538 /*
2539 * We don't actually support backwards skip yet, because no callers need
2540 * it. The API is designed to allow for that later, though.
2541 */
2542 Assert(forward);
2543 Assert(ntuples >= 0);
2544 Assert(!WORKER(state));
2545
2546 switch (state->status)
2547 {
2548 case TSS_SORTEDINMEM:
2549 if (state->memtupcount - state->current >= ntuples)
2550 {
2551 state->current += ntuples;
2552 return true;
2553 }
2554 state->current = state->memtupcount;
2555 state->eof_reached = true;
2556
2557 /*
2558 * Complain if caller tries to retrieve more tuples than
2559 * originally asked for in a bounded sort. This is because
2560 * returning EOF here might be the wrong thing.
2561 */
2562 if (state->bounded && state->current >= state->bound)
2563 elog(ERROR, "retrieved too many tuples in a bounded sort");
2564
2565 return false;
2566
2567 case TSS_SORTEDONTAPE:
2568 case TSS_FINALMERGE:
2569
2570 /*
2571 * We could probably optimize these cases better, but for now it's
2572 * not worth the trouble.
2573 */
2574 oldcontext = MemoryContextSwitchTo(state->sortcontext);
2575 while (ntuples-- > 0)
2576 {
2577 SortTuple stup;
2578
2579 if (!tuplesort_gettuple_common(state, forward, &stup))
2580 {
2581 MemoryContextSwitchTo(oldcontext);
2582 return false;
2583 }
2584 CHECK_FOR_INTERRUPTS();
2585 }
2586 MemoryContextSwitchTo(oldcontext);
2587 return true;
2588
2589 default:
2590 elog(ERROR, "invalid tuplesort state");
2591 return false; /* keep compiler quiet */
2592 }
2593 }
2594
2595 /*
2596 * tuplesort_merge_order - report merge order we'll use for given memory
2597 * (note: "merge order" just means the number of input tapes in the merge).
2598 *
2599 * This is exported for use by the planner. allowedMem is in bytes.
2600 */
2601 int
2602 tuplesort_merge_order(int64 allowedMem)
2603 {
2604 int mOrder;
2605
2606 /*
2607 * We need one tape for each merge input, plus another one for the output,
2608 * and each of these tapes needs buffer space. In addition we want
2609 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2610 * count).
2611 *
2612 * Note: you might be thinking we need to account for the memtuples[]
2613 * array in this calculation, but we effectively treat that as part of the
2614 * MERGE_BUFFER_SIZE workspace.
2615 */
2616 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2617 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
2618
2619 /*
2620 * Even in minimum memory, use at least a MINORDER merge. On the other
2621 * hand, even when we have lots of memory, do not use more than a MAXORDER
2622 * merge. Tapes are pretty cheap, but they're not entirely free. Each
2623 * additional tape reduces the amount of memory available to build runs,
2624 * which in turn can cause the same sort to need more runs, which makes
2625 * merging slower even if it can still be done in a single pass. Also,
2626 * high order merges are quite slow due to CPU cache effects; it can be
2627 * faster to pay the I/O cost of a polyphase merge than to perform a
2628 * single merge pass across many hundreds of tapes.
2629 */
2630 mOrder = Max(mOrder, MINORDER);
2631 mOrder = Min(mOrder, MAXORDER);
2632
2633 return mOrder;
2634 }
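/*
 * Worked example (buffer-size constants vary; numbers hypothetical): if
 * each tape needs 256kB of TAPE_BUFFER_OVERHEAD and each input tape a
 * further 256kB of MERGE_BUFFER_SIZE workspace, then allowedMem of 16MB
 * gives mOrder = (16384kB - 256kB) / 512kB = 31 input tapes, before
 * clamping to the [MINORDER, MAXORDER] range.
 */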
2635
2636 /*
2637 * inittapes - initialize for tape sorting.
2638 *
2639 * This is called only if we have found we won't sort in memory.
2640 */
2641 static void
2642 inittapes(Tuplesortstate *state, bool mergeruns)
2643 {
2644 int maxTapes,
2645 j;
2646
2647 Assert(!LEADER(state));
2648
2649 if (mergeruns)
2650 {
2651 /* Compute number of tapes to use: merge order plus 1 */
2652 maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2653 }
2654 else
2655 {
2656 /* Workers can sometimes produce a single run, output without merge */
2657 Assert(WORKER(state));
2658 maxTapes = MINORDER + 1;
2659 }
2660
2661 #ifdef TRACE_SORT
2662 if (trace_sort)
2663 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2664 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2665 #endif
2666
2667 /* Create the tape set and allocate the per-tape data arrays */
2668 inittapestate(state, maxTapes);
2669 state->tapeset =
2670 LogicalTapeSetCreate(maxTapes, false, NULL,
2671 state->shared ? &state->shared->fileset : NULL,
2672 state->worker);
2673
2674 state->currentRun = 0;
2675
2676 /*
2677 * Initialize variables of Algorithm D (step D1).
2678 */
2679 for (j = 0; j < maxTapes; j++)
2680 {
2681 state->tp_fib[j] = 1;
2682 state->tp_runs[j] = 0;
2683 state->tp_dummy[j] = 1;
2684 state->tp_tapenum[j] = j;
2685 }
2686 state->tp_fib[state->tapeRange] = 0;
2687 state->tp_dummy[state->tapeRange] = 0;
2688
2689 state->Level = 1;
2690 state->destTape = 0;
2691
2692 state->status = TSS_BUILDRUNS;
2693 }
2694
2695 /*
2696 * inittapestate - initialize generic tape management state
2697 */
2698 static void
2699 inittapestate(Tuplesortstate *state, int maxTapes)
2700 {
2701 int64 tapeSpace;
2702
2703 /*
2704 * Decrease availMem to reflect the space needed for tape buffers; but
2705 * don't decrease it to the point that we have no room for tuples. (That
2706 * case is only likely to occur if sorting pass-by-value Datums; in all
2707 * other scenarios the memtuples[] array is unlikely to occupy more than
2708 * half of allowedMem. In the pass-by-value case it's not important to
2709 * account for tuple space, so we don't care if LACKMEM becomes
2710 * inaccurate.)
2711 */
2712 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2713
2714 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2715 USEMEM(state, tapeSpace);
2716
2717 /*
2718 * Make sure that the temp file(s) underlying the tape set are created in
2719 * suitable temp tablespaces. For parallel sorts, this should have been
2720 * called already, but it doesn't matter if it is called a second time.
2721 */
2722 PrepareTempTablespaces();
2723
2724 state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2725 state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2726 state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2727 state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2728 state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2729
2730 /* Record # of tapes allocated (for duration of sort) */
2731 state->maxTapes = maxTapes;
2732 /* Record maximum # of tapes usable as inputs when merging */
2733 state->tapeRange = maxTapes - 1;
2734 }
2735
2736 /*
2737 * selectnewtape -- select new tape for new initial run.
2738 *
2739 * This is called after finishing a run when we know another run
2740 * must be started. This implements steps D3, D4 of Algorithm D.
2741 */
2742 static void
2743 selectnewtape(Tuplesortstate *state)
2744 {
2745 int j;
2746 int a;
2747
2748 /* Step D3: advance j (destTape) */
2749 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2750 {
2751 state->destTape++;
2752 return;
2753 }
2754 if (state->tp_dummy[state->destTape] != 0)
2755 {
2756 state->destTape = 0;
2757 return;
2758 }
2759
2760 /* Step D4: increase level */
2761 state->Level++;
2762 a = state->tp_fib[0];
2763 for (j = 0; j < state->tapeRange; j++)
2764 {
2765 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2766 state->tp_fib[j] = a + state->tp_fib[j + 1];
2767 }
2768 state->destTape = 0;
2769 }
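/*
 * Example of the level arithmetic above: with two input tapes the target
 * run counts in tp_fib follow the classic Fibonacci sequence -- levels
 * 1, 2, 3, 4 aim for distributions (1,1), (2,1), (3,2), (5,3) -- and
 * tp_dummy tracks how many of those target runs are still imaginary.
 * With more input tapes the same recurrence generalizes (Knuth's
 * "generalized Fibonacci" distribution).
 */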
2770
2771 /*
2772 * Initialize the slab allocation arena, for the given number of slots.
2773 */
2774 static void
2775 init_slab_allocator(Tuplesortstate *state, int numSlots)
2776 {
2777 if (numSlots > 0)
2778 {
2779 char *p;
2780 int i;
2781
2782 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2783 state->slabMemoryEnd = state->slabMemoryBegin +
2784 numSlots * SLAB_SLOT_SIZE;
2785 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2786 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2787
2788 p = state->slabMemoryBegin;
2789 for (i = 0; i < numSlots - 1; i++)
2790 {
2791 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2792 p += SLAB_SLOT_SIZE;
2793 }
2794 ((SlabSlot *) p)->nextfree = NULL;
2795 }
2796 else
2797 {
2798 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2799 state->slabFreeHead = NULL;
2800 }
2801 state->slabAllocatorUsed = true;
2802 }
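/*
 * Sketch of the resulting arena layout (numSlots = 3):
 *
 *     slabMemoryBegin                              slabMemoryEnd
 *     |   slot 0    |    slot 1    |    slot 2    |
 *      nextfree -----^  nextfree ----^  nextfree = NULL
 *
 * slabFreeHead starts at slot 0; RELEASE_SLAB_SLOT pushes a slot back
 * onto the head of this singly linked freelist (allocations that don't
 * belong to the arena are simply pfree'd).
 */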
2803
2804 /*
2805 * mergeruns -- merge all the completed initial runs.
2806 *
2807 * This implements steps D5, D6 of Algorithm D. All input data has
2808 * already been written to initial runs on tape (see dumptuples).
2809 */
2810 static void
2811 mergeruns(Tuplesortstate *state)
2812 {
2813 int tapenum,
2814 svTape,
2815 svRuns,
2816 svDummy;
2817 int numTapes;
2818 int numInputTapes;
2819
2820 Assert(state->status == TSS_BUILDRUNS);
2821 Assert(state->memtupcount == 0);
2822
2823 if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2824 {
2825 /*
2826 * If there are multiple runs to be merged, when we go to read back
2827 * tuples from disk, abbreviated keys will not have been stored, and
2828 * we don't care to regenerate them. Disable abbreviation from this
2829 * point on.
2830 */
2831 state->sortKeys->abbrev_converter = NULL;
2832 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2833
2834 /* Not strictly necessary, but be tidy */
2835 state->sortKeys->abbrev_abort = NULL;
2836 state->sortKeys->abbrev_full_comparator = NULL;
2837 }
2838
2839 /*
2840 * Reset tuple memory. We've freed all the tuples that we previously
2841 * allocated. We will use the slab allocator from now on.
2842 */
2843 MemoryContextResetOnly(state->tuplecontext);
2844
2845 /*
2846 * We no longer need a large memtuples array. (We will allocate a smaller
2847 * one for the heap later.)
2848 */
2849 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2850 pfree(state->memtuples);
2851 state->memtuples = NULL;
2852
2853 /*
2854 * If we had fewer runs than tapes, refund the memory that we imagined we
2855 * would need for the tape buffers of the unused tapes.
2856 *
2857 * numTapes and numInputTapes reflect the actual number of tapes we will
2858 * use. Note that the output tape's tape number is maxTapes - 1, so the
2859 * tape numbers of the used tapes are not consecutive, and you cannot just
2860 * loop from 0 to numTapes to visit all used tapes!
2861 */
2862 if (state->Level == 1)
2863 {
2864 numInputTapes = state->currentRun;
2865 numTapes = numInputTapes + 1;
2866 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2867 }
2868 else
2869 {
2870 numInputTapes = state->tapeRange;
2871 numTapes = state->maxTapes;
2872 }
2873
2874 /*
2875 * Initialize the slab allocator. We need one slab slot per input tape,
2876 * for the tuples in the heap, plus one to hold the tuple last returned
2877 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2878 * however, we don't need to allocate anything.)
2879 *
2880 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2881 * to track memory usage of individual tuples.
2882 */
2883 if (state->tuples)
2884 init_slab_allocator(state, numInputTapes + 1);
2885 else
2886 init_slab_allocator(state, 0);
2887
2888 /*
2889 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2890 * from each input tape.
2891 */
2892 state->memtupsize = numInputTapes;
2893 state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
2894 numInputTapes * sizeof(SortTuple));
2895 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2896
2897 /*
2898 * Use all the remaining memory we have available for read buffers among
2899 * the input tapes.
2900 *
2901 * We don't try to "rebalance" the memory among tapes, when we start a new
2902 * merge phase, even if some tapes are inactive in the new phase. That
2903 * would be hard, because logtape.c doesn't know where one run ends and
2904 * another begins. When a new merge phase begins, and a tape doesn't
2905 * participate in it, its buffer nevertheless already contains tuples from
2906 * the next run on same tape, so we cannot release the buffer. That's OK
2907 * in practice, merge performance isn't that sensitive to the amount of
2908 * buffers used, and most merge phases use all or almost all tapes,
2909 * anyway.
2910 */
2911 #ifdef TRACE_SORT
2912 if (trace_sort)
2913 elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2914 state->worker, state->availMem / 1024, numInputTapes);
2915 #endif
2916
2917 state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2918 USEMEM(state, state->read_buffer_size * numInputTapes);
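/*
 * Worked example (hypothetical numbers): with 6MB of availMem remaining
 * and 6 input tapes, read_buffer_size comes out at 1MB per tape, so each
 * rewind-for-read below primes about 1MB of sequential prefetch per tape
 * instead of reading one block at a time.
 */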
2919
2920 /* End of step D2: rewind all output tapes to prepare for merging */
2921 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2922 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2923
2924 for (;;)
2925 {
2926 /*
2927 * At this point we know that tape[T] is empty. If there's just one
2928 * (real or dummy) run left on each input tape, then only one merge
2929 * pass remains. If we don't have to produce a materialized sorted
2930 * tape, we can stop at this point and do the final merge on-the-fly.
2931 */
2932 if (!state->randomAccess && !WORKER(state))
2933 {
2934 bool allOneRun = true;
2935
2936 Assert(state->tp_runs[state->tapeRange] == 0);
2937 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2938 {
2939 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2940 {
2941 allOneRun = false;
2942 break;
2943 }
2944 }
2945 if (allOneRun)
2946 {
2947 /* Tell logtape.c we won't be writing anymore */
2948 LogicalTapeSetForgetFreeSpace(state->tapeset);
2949 /* Initialize for the final merge pass */
2950 beginmerge(state);
2951 state->status = TSS_FINALMERGE;
2952 return;
2953 }
2954 }
2955
2956 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
2957 while (state->tp_runs[state->tapeRange - 1] ||
2958 state->tp_dummy[state->tapeRange - 1])
2959 {
2960 bool allDummy = true;
2961
2962 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2963 {
2964 if (state->tp_dummy[tapenum] == 0)
2965 {
2966 allDummy = false;
2967 break;
2968 }
2969 }
2970
2971 if (allDummy)
2972 {
2973 state->tp_dummy[state->tapeRange]++;
2974 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2975 state->tp_dummy[tapenum]--;
2976 }
2977 else
2978 mergeonerun(state);
2979 }
2980
2981 /* Step D6: decrease level */
2982 if (--state->Level == 0)
2983 break;
2984 /* rewind output tape T to use as new input */
2985 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2986 state->read_buffer_size);
2987 /* rewind used-up input tape P, and prepare it for write pass */
2988 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2989 state->tp_runs[state->tapeRange - 1] = 0;
2990
2991 /*
2992 * reassign tape units per step D6; note we no longer care about A[]
2993 */
2994 svTape = state->tp_tapenum[state->tapeRange];
2995 svDummy = state->tp_dummy[state->tapeRange];
2996 svRuns = state->tp_runs[state->tapeRange];
2997 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2998 {
2999 state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
3000 state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
3001 state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
3002 }
3003 state->tp_tapenum[0] = svTape;
3004 state->tp_dummy[0] = svDummy;
3005 state->tp_runs[0] = svRuns;
3006 }
3007
3008 /*
3009 * Done. Knuth says that the result is on TAPE[1], but since we exited
3010 * the loop without performing the last iteration of step D6, we have not
3011 * rearranged the tape unit assignment, and therefore the result is on
3012 * TAPE[T]. We need to do it this way so that we can freeze the final
3013 * output tape while rewinding it. The last iteration of step D6 would be
3014 * a waste of cycles anyway...
3015 */
3016 state->result_tape = state->tp_tapenum[state->tapeRange];
3017 if (!WORKER(state))
3018 LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
3019 else
3020 worker_freeze_result_tape(state);
3021 state->status = TSS_SORTEDONTAPE;
3022
3023 /* Release the read buffers of all the other tapes, by rewinding them. */
3024 for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
3025 {
3026 if (tapenum != state->result_tape)
3027 LogicalTapeRewindForWrite(state->tapeset, tapenum);
3028 }
3029 }
3030
3031 /*
3032 * Merge one run from each input tape, except ones with dummy runs.
3033 *
3034 * This is the inner loop of Algorithm D step D5. We know that the
3035 * output tape is TAPE[T].
3036 */
3037 static void
3038 mergeonerun(Tuplesortstate *state)
3039 {
3040 int destTape = state->tp_tapenum[state->tapeRange];
3041 int srcTape;
3042
3043 /*
3044 * Start the merge by loading one tuple from each active source tape into
3045 * the heap. We can also decrease the input run/dummy run counts.
3046 */
3047 beginmerge(state);
3048
3049 /*
3050 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
3051 * out, and replacing it with next tuple from same tape (if there is
3052 * another one).
3053 */
3054 while (state->memtupcount > 0)
3055 {
3056 SortTuple stup;
3057
3058 /* write the tuple to destTape */
3059 srcTape = state->memtuples[0].srctape;
3060 WRITETUP(state, destTape, &state->memtuples[0]);
3061
3062 /* recycle the slot of the tuple we just wrote out, for the next read */
3063 if (state->memtuples[0].tuple)
3064 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
3065
3066 /*
3067 * pull next tuple from the tape, and replace the written-out tuple in
3068 * the heap with it.
3069 */
3070 if (mergereadnext(state, srcTape, &stup))
3071 {
3072 stup.srctape = srcTape;
3073 tuplesort_heap_replace_top(state, &stup);
3074 }
3075 else
3076 tuplesort_heap_delete_top(state);
3077 }
3078
3079 /*
3080 * When the heap empties, we're done. Write an end-of-run marker on the
3081 * output tape, and increment its count of real runs.
3082 */
3083 markrunend(state, destTape);
3084 state->tp_runs[state->tapeRange]++;
3085
3086 #ifdef TRACE_SORT
3087 if (trace_sort)
3088 elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
3089 state->activeTapes, pg_rusage_show(&state->ru_start));
3090 #endif
3091 }
3092
3093 /*
3094 * beginmerge - initialize for a merge pass
3095 *
3096 * We decrease the counts of real and dummy runs for each tape, and mark
3097 * which tapes contain active input runs in mergeactive[]. Then, fill the
3098 * merge heap with the first tuple from each active tape.
3099 */
3100 static void
3101 beginmerge(Tuplesortstate *state)
3102 {
3103 int activeTapes;
3104 int tapenum;
3105 int srcTape;
3106
3107 /* Heap should be empty here */
3108 Assert(state->memtupcount == 0);
3109
3110 /* Adjust run counts and mark the active tapes */
3111 memset(state->mergeactive, 0,
3112 state->maxTapes * sizeof(*state->mergeactive));
3113 activeTapes = 0;
3114 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
3115 {
3116 if (state->tp_dummy[tapenum] > 0)
3117 state->tp_dummy[tapenum]--;
3118 else
3119 {
3120 Assert(state->tp_runs[tapenum] > 0);
3121 state->tp_runs[tapenum]--;
3122 srcTape = state->tp_tapenum[tapenum];
3123 state->mergeactive[srcTape] = true;
3124 activeTapes++;
3125 }
3126 }
3127 Assert(activeTapes > 0);
3128 state->activeTapes = activeTapes;
3129
3130 /* Load the merge heap with the first tuple from each input tape */
3131 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
3132 {
3133 SortTuple tup;
3134
3135 if (mergereadnext(state, srcTape, &tup))
3136 {
3137 tup.srctape = srcTape;
3138 tuplesort_heap_insert(state, &tup);
3139 }
3140 }
3141 }
3142
3143 /*
3144 * mergereadnext - read next tuple from one merge input tape
3145 *
3146 * Returns false on EOF.
3147 */
3148 static bool
3149 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
3150 {
3151 unsigned int tuplen;
3152
3153 if (!state->mergeactive[srcTape])
3154 return false; /* tape's run is already exhausted */
3155
3156 /* read next tuple, if any */
3157 if ((tuplen = getlen(state, srcTape, true)) == 0)
3158 {
3159 state->mergeactive[srcTape] = false;
3160 return false;
3161 }
3162 READTUP(state, stup, srcTape, tuplen);
3163
3164 return true;
3165 }
3166
3167 /*
3168 * dumptuples - remove tuples from memtuples and write initial run to tape
3169 *
3170 * When alltuples = true, dump everything currently in memory. (This case is
3171 * only used at end of input data.)
3172 */
3173 static void
3174 dumptuples(Tuplesortstate *state, bool alltuples)
3175 {
3176 int memtupwrite;
3177 int i;
3178
3179 /*
3180 * Nothing to do if we still fit in available memory and have array slots,
3181 * unless this is the final call during initial run generation.
3182 */
3183 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
3184 !alltuples)
3185 return;
3186
3187 /*
3188 * Final call might require no sorting, in rare cases where we just so
3189 * happen to have previously LACKMEM()'d at the point where exactly all
3190 * remaining tuples are loaded into memory, just before input was
3191 * exhausted.
3192 *
3193 * In general, short final runs are quite possible. Rather than allowing
3194 * a special case where there was a superfluous selectnewtape() call (i.e.
3195 * a call with no subsequent run actually written to destTape), we prefer
3196 * to write out a 0 tuple run.
3197 *
3198 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
3199 * the tape inactive for the merge when called from beginmerge(). This
3200 * case is therefore similar to the case where mergeonerun() finds a dummy
3201 * run for the tape, and so doesn't need to merge a run from the tape (or
3202 * conceptually "merges" the dummy run, if you prefer). According to
3203 * Knuth, Algorithm D "isn't strictly optimal" in its method of
3204 * distribution and dummy run assignment; this edge case seems very
3205 * unlikely to make that appreciably worse.
3206 */
3207 Assert(state->status == TSS_BUILDRUNS);
3208
3209 /*
3210 * It seems unlikely that this limit will ever be exceeded, but take no
3211 * chances
3212 */
3213 if (state->currentRun == INT_MAX)
3214 ereport(ERROR,
3215 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
3216 errmsg("cannot have more than %d runs for an external sort",
3217 INT_MAX)));
3218
3219 state->currentRun++;
3220
3221 #ifdef TRACE_SORT
3222 if (trace_sort)
3223 elog(LOG, "worker %d starting quicksort of run %d: %s",
3224 state->worker, state->currentRun,
3225 pg_rusage_show(&state->ru_start));
3226 #endif
3227
3228 /*
3229 * Sort all tuples accumulated within the allowed amount of memory for
3230 * this run using quicksort
3231 */
3232 tuplesort_sort_memtuples(state);
3233
3234 #ifdef TRACE_SORT
3235 if (trace_sort)
3236 elog(LOG, "worker %d finished quicksort of run %d: %s",
3237 state->worker, state->currentRun,
3238 pg_rusage_show(&state->ru_start));
3239 #endif
3240
3241 memtupwrite = state->memtupcount;
3242 for (i = 0; i < memtupwrite; i++)
3243 {
3244 WRITETUP(state, state->tp_tapenum[state->destTape],
3245 &state->memtuples[i]);
3246 state->memtupcount--;
3247 }
3248
3249 /*
3250 * Reset tuple memory. We've freed all of the tuples that we previously
3251 * allocated. It's important to avoid fragmentation when there is a stark
3252 * change in the sizes of incoming tuples. Fragmentation due to
3253 * AllocSetFree's bucketing by size class might be particularly bad if
3254 * this step wasn't taken.
3255 */
3256 MemoryContextReset(state->tuplecontext);
3257
3258 markrunend(state, state->tp_tapenum[state->destTape]);
3259 state->tp_runs[state->destTape]++;
3260 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3261
3262 #ifdef TRACE_SORT
3263 if (trace_sort)
3264 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3265 state->worker, state->currentRun, state->destTape,
3266 pg_rusage_show(&state->ru_start));
3267 #endif
3268
3269 if (!alltuples)
3270 selectnewtape(state);
3271 }
3272
3273 /*
3274 * tuplesort_rescan - rewind and replay the scan
3275 */
3276 void
3277 tuplesort_rescan(Tuplesortstate *state)
3278 {
3279 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3280
3281 Assert(state->randomAccess);
3282
3283 switch (state->status)
3284 {
3285 case TSS_SORTEDINMEM:
3286 state->current = 0;
3287 state->eof_reached = false;
3288 state->markpos_offset = 0;
3289 state->markpos_eof = false;
3290 break;
3291 case TSS_SORTEDONTAPE:
3292 LogicalTapeRewindForRead(state->tapeset,
3293 state->result_tape,
3294 0);
3295 state->eof_reached = false;
3296 state->markpos_block = 0L;
3297 state->markpos_offset = 0;
3298 state->markpos_eof = false;
3299 break;
3300 default:
3301 elog(ERROR, "invalid tuplesort state");
3302 break;
3303 }
3304
3305 MemoryContextSwitchTo(oldcontext);
3306 }
3307
3308 /*
3309 * tuplesort_markpos - saves current position in the merged sort file
3310 */
3311 void
3312 tuplesort_markpos(Tuplesortstate *state)
3313 {
3314 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3315
3316 Assert(state->randomAccess);
3317
3318 switch (state->status)
3319 {
3320 case TSS_SORTEDINMEM:
3321 state->markpos_offset = state->current;
3322 state->markpos_eof = state->eof_reached;
3323 break;
3324 case TSS_SORTEDONTAPE:
3325 LogicalTapeTell(state->tapeset,
3326 state->result_tape,
3327 &state->markpos_block,
3328 &state->markpos_offset);
3329 state->markpos_eof = state->eof_reached;
3330 break;
3331 default:
3332 elog(ERROR, "invalid tuplesort state");
3333 break;
3334 }
3335
3336 MemoryContextSwitchTo(oldcontext);
3337 }
3338
3339 /*
3340 * tuplesort_restorepos - restores current position in merged sort file to
3341 * last saved position
3342 */
3343 void
3344 tuplesort_restorepos(Tuplesortstate *state)
3345 {
3346 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3347
3348 Assert(state->randomAccess);
3349
3350 switch (state->status)
3351 {
3352 case TSS_SORTEDINMEM:
3353 state->current = state->markpos_offset;
3354 state->eof_reached = state->markpos_eof;
3355 break;
3356 case TSS_SORTEDONTAPE:
3357 LogicalTapeSeek(state->tapeset,
3358 state->result_tape,
3359 state->markpos_block,
3360 state->markpos_offset);
3361 state->eof_reached = state->markpos_eof;
3362 break;
3363 default:
3364 elog(ERROR, "invalid tuplesort state");
3365 break;
3366 }
3367
3368 MemoryContextSwitchTo(oldcontext);
3369 }
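/*
 * Illustrative usage sketch, not part of tuplesort.c proper: the
 * mark/restore protocol as a caller such as merge join would use it.
 * This requires a sort created with randomAccess = true; "slot" is the
 * caller's TupleTableSlot, and tuplesort_gettupleslot() is the normal
 * fetch routine for heap-tuple sorts.
 */
#ifdef TUPLESORT_MARKPOS_SKETCH
static void
mark_restore_sketch(Tuplesortstate *sortstate, TupleTableSlot *slot)
{
	/* remember the current read position */
	tuplesort_markpos(sortstate);

	/* scan forward from the mark */
	while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
	{
		/* ... examine the tuple; stop whenever appropriate ... */
		break;
	}

	/* rewind to the mark; the same tuples can now be re-read */
	tuplesort_restorepos(sortstate);
}
#endif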
3370
3371 /*
3372 * tuplesort_get_stats - extract summary statistics
3373 *
3374 * This can be called after tuplesort_performsort() finishes to obtain
3375 * printable summary information about how the sort was performed.
3376 */
3377 void
3378 tuplesort_get_stats(Tuplesortstate *state,
3379 TuplesortInstrumentation *stats)
3380 {
3381 /*
3382 * Note: it might seem we should provide both memory and disk usage for a
3383 * disk-based sort. However, the current code doesn't track memory space
3384 * accurately once we have begun to return tuples to the caller (since we
3385 * don't account for pfree's the caller is expected to do), so we cannot
3386 * rely on availMem in a disk sort. This does not seem worth the overhead
3387 * to fix. Is it worth creating an API for the memory context code to
3388 * tell us how much is actually used in sortcontext?
3389 */
3390 tuplesort_updatemax(state);
3391
3392 if (state->isMaxSpaceDisk)
3393 stats->spaceType = SORT_SPACE_TYPE_DISK;
3394 else
3395 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
3396 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
3397
3398 switch (state->maxSpaceStatus)
3399 {
3400 case TSS_SORTEDINMEM:
3401 if (state->boundUsed)
3402 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3403 else
3404 stats->sortMethod = SORT_TYPE_QUICKSORT;
3405 break;
3406 case TSS_SORTEDONTAPE:
3407 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3408 break;
3409 case TSS_FINALMERGE:
3410 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3411 break;
3412 default:
3413 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3414 break;
3415 }
3416 }
3417
3418 /*
3419 * Convert TuplesortMethod to a string.
3420 */
3421 const char *
3422 tuplesort_method_name(TuplesortMethod m)
3423 {
3424 switch (m)
3425 {
3426 case SORT_TYPE_STILL_IN_PROGRESS:
3427 return "still in progress";
3428 case SORT_TYPE_TOP_N_HEAPSORT:
3429 return "top-N heapsort";
3430 case SORT_TYPE_QUICKSORT:
3431 return "quicksort";
3432 case SORT_TYPE_EXTERNAL_SORT:
3433 return "external sort";
3434 case SORT_TYPE_EXTERNAL_MERGE:
3435 return "external merge";
3436 }
3437
3438 return "unknown";
3439 }
3440
3441 /*
3442 * Convert TuplesortSpaceType to a string.
3443 */
3444 const char *
3445 tuplesort_space_type_name(TuplesortSpaceType t)
3446 {
3447 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3448 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3449 }
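/*
 * Illustrative usage sketch, not part of tuplesort.c proper: how a caller
 * such as EXPLAIN ANALYZE might report a finished sort using the routines
 * above.  The elog() destination is arbitrary here; real callers emit
 * this through their own instrumentation output, and the exact integer
 * type of spaceUsed may vary across versions (hence the cast).
 */
#ifdef TUPLESORT_STATS_SKETCH
static void
report_sort_stats_sketch(Tuplesortstate *sortstate)
{
	TuplesortInstrumentation stats;

	/* only meaningful after tuplesort_performsort() has returned */
	tuplesort_get_stats(sortstate, &stats);

	elog(LOG, "Sort Method: %s  %s: %ldkB",
		 tuplesort_method_name(stats.sortMethod),
		 tuplesort_space_type_name(stats.spaceType),
		 (long) stats.spaceUsed);
}
#endif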
3450
3451
3452 /*
3453 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3454 */
3455
3456 /*
3457 * Convert the existing unordered array of SortTuples to a bounded heap,
3458 * discarding all but the smallest "state->bound" tuples.
3459 *
3460 * When working with a bounded heap, we want to keep the largest entry
3461 * at the root (array entry zero), instead of the smallest as in the normal
3462 * sort case. This allows us to discard the largest entry cheaply.
3463 * Therefore, we temporarily reverse the sort direction.
3464 */
3465 static void
3466 make_bounded_heap(Tuplesortstate *state)
3467 {
3468 int tupcount = state->memtupcount;
3469 int i;
3470
3471 Assert(state->status == TSS_INITIAL);
3472 Assert(state->bounded);
3473 Assert(tupcount >= state->bound);
3474 Assert(SERIAL(state));
3475
3476 /* Reverse sort direction so largest entry will be at root */
3477 reversedirection(state);
3478
3479 state->memtupcount = 0; /* make the heap empty */
3480 for (i = 0; i < tupcount; i++)
3481 {
3482 if (state->memtupcount < state->bound)
3483 {
3484 /* Insert next tuple into heap */
3485 /* Must copy source tuple to avoid possible overwrite */
3486 SortTuple stup = state->memtuples[i];
3487
3488 tuplesort_heap_insert(state, &stup);
3489 }
3490 else
3491 {
3492 /*
3493 * The heap is full. Replace the largest entry with the new
3494 * tuple, or just discard it, if it's larger than anything already
3495 * in the heap.
3496 */
3497 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3498 {
3499 free_sort_tuple(state, &state->memtuples[i]);
3500 CHECK_FOR_INTERRUPTS();
3501 }
3502 else
3503 tuplesort_heap_replace_top(state, &state->memtuples[i]);
3504 }
3505 }
3506
3507 Assert(state->memtupcount == state->bound);
3508 state->status = TSS_BOUNDED;
3509 }
3510
3511 /*
3512 * Convert the bounded heap to a properly-sorted array
3513 */
3514 static void
3515 sort_bounded_heap(Tuplesortstate *state)
3516 {
3517 int tupcount = state->memtupcount;
3518
3519 Assert(state->status == TSS_BOUNDED);
3520 Assert(state->bounded);
3521 Assert(tupcount == state->bound);
3522 Assert(SERIAL(state));
3523
3524 /*
3525 * We can unheapify in place because each delete-top call will remove the
3526 * largest entry, which we can promptly store in the newly freed slot at
3527 * the end. Once we're down to a single-entry heap, we're done.
3528 */
3529 while (state->memtupcount > 1)
3530 {
3531 SortTuple stup = state->memtuples[0];
3532
3533 /* this sifts-up the next-largest entry and decreases memtupcount */
3534 tuplesort_heap_delete_top(state);
3535 state->memtuples[state->memtupcount] = stup;
3536 }
3537 state->memtupcount = tupcount;
3538
3539 /*
3540 * Reverse sort direction back to the original state. This is not
3541 * actually necessary but seems like a good idea for tidiness.
3542 */
3543 reversedirection(state);
3544
3545 state->status = TSS_SORTEDINMEM;
3546 state->boundUsed = true;
3547 }
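/*
 * Illustrative sketch, not part of tuplesort.c proper: the bounded-heap
 * top-N idea above, reduced to plain ints.  Keeping the *largest* of the
 * N smallest values seen so far at the root is what makes the "replace or
 * discard" decision in make_bounded_heap() a single comparison against
 * the root.
 */
#ifdef TUPLESORT_BOUND_SKETCH
static void
topn_smallest_sketch(const int *input, int ninput, int *heap, int bound)
{
	int			heapcount = 0;
	int			i;

	for (i = 0; i < ninput; i++)
	{
		int			val = input[i];

		if (heapcount < bound)
		{
			/* heap not yet full: sift-up insert into the max-heap */
			int			j = heapcount++;

			while (j > 0 && heap[(j - 1) / 2] < val)
			{
				heap[j] = heap[(j - 1) / 2];
				j = (j - 1) / 2;
			}
			heap[j] = val;
		}
		else if (val < heap[0])
		{
			/* beats the worst survivor: replace the root and sift down */
			int			hole = 0;

			for (;;)
			{
				int			j = 2 * hole + 1;

				if (j >= heapcount)
					break;
				if (j + 1 < heapcount && heap[j + 1] > heap[j])
					j++;		/* pick the larger child */
				if (val >= heap[j])
					break;
				heap[hole] = heap[j];
				hole = j;
			}
			heap[hole] = val;
		}
		/* else: not among the N smallest; discard, as the code above does */
	}
}
#endif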
3548
3549 /*
3550 * Sort all memtuples using specialized qsort() routines.
3551 *
3552 * Quicksort is used for small in-memory sorts and for external sort runs.
3553 */
3554 static void
3555 tuplesort_sort_memtuples(Tuplesortstate *state)
3556 {
3557 Assert(!LEADER(state));
3558
3559 if (state->memtupcount > 1)
3560 {
3561 /* Can we use the single-key sort function? */
3562 if (state->onlyKey != NULL)
3563 qsort_ssup(state->memtuples, state->memtupcount,
3564 state->onlyKey);
3565 else
3566 qsort_tuple(state->memtuples,
3567 state->memtupcount,
3568 state->comparetup,
3569 state);
3570 }
3571 }
3572
3573 /*
3574 * Insert a new tuple into an empty or existing heap, maintaining the
3575 * heap invariant. Caller is responsible for ensuring there's room.
3576 *
3577 * Note: For some callers, tuple points to a memtuples[] entry above the
3578 * end of the heap. This is safe as long as it's not immediately adjacent
3579 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3580 * is, it might get overwritten before being moved into the heap!
3581 */
3582 static void
3583 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3584 {
3585 SortTuple *memtuples;
3586 int j;
3587
3588 memtuples = state->memtuples;
3589 Assert(state->memtupcount < state->memtupsize);
3590
3591 CHECK_FOR_INTERRUPTS();
3592
3593 /*
3594 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3595 * using 1-based array indexes, not 0-based.
3596 */
3597 j = state->memtupcount++;
3598 while (j > 0)
3599 {
3600 int i = (j - 1) >> 1;
3601
3602 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3603 break;
3604 memtuples[j] = memtuples[i];
3605 j = i;
3606 }
3607 memtuples[j] = *tuple;
3608 }
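/*
 * Illustrative note, not part of tuplesort.c proper: the 0-based index
 * arithmetic the heap routines use, versus Knuth's 1-based presentation.
 * For a node at index j, the parent is (j - 1) / 2 and the children are
 * 2*j + 1 and 2*j + 2; Knuth's 1-based equivalents are j/2, 2j, and
 * 2j + 1.
 */
#ifdef TUPLESORT_HEAP_SKETCH
static inline int
heap_parent_sketch(int j)
{
	return (j - 1) >> 1;		/* same as (j - 1) / 2 for j > 0 */
}

static inline int
heap_left_child_sketch(int j)
{
	return 2 * j + 1;			/* the right child is 2 * j + 2 */
}
#endif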
3609
3610 /*
3611 * Remove the tuple at state->memtuples[0] from the heap. Decrement
3612 * memtupcount, and sift up to maintain the heap invariant.
3613 *
3614 * The caller has already free'd the tuple the top node points to,
3615 * if necessary.
3616 */
3617 static void
3618 tuplesort_heap_delete_top(Tuplesortstate *state)
3619 {
3620 SortTuple *memtuples = state->memtuples;
3621 SortTuple *tuple;
3622
3623 if (--state->memtupcount <= 0)
3624 return;
3625
3626 /*
3627 * Remove the last tuple in the heap, and re-insert it, by replacing the
3628 * current top node with it.
3629 */
3630 tuple = &memtuples[state->memtupcount];
3631 tuplesort_heap_replace_top(state, tuple);
3632 }
3633
3634 /*
3635 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
3636 * maintain the heap invariant.
3637 *
3638 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3639 * Heapsort, steps H3-H8).
3640 */
3641 static void
3642 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3643 {
3644 SortTuple *memtuples = state->memtuples;
3645 unsigned int i,
3646 n;
3647
3648 Assert(state->memtupcount >= 1);
3649
3650 CHECK_FOR_INTERRUPTS();
3651
3652 /*
3653 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3654 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3655 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3656 */
3657 n = state->memtupcount;
3658 i = 0; /* i is where the "hole" is */
3659 for (;;)
3660 {
3661 unsigned int j = 2 * i + 1;
3662
3663 if (j >= n)
3664 break;
3665 if (j + 1 < n &&
3666 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3667 j++;
3668 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3669 break;
3670 memtuples[i] = memtuples[j];
3671 i = j;
3672 }
3673 memtuples[i] = *tuple;
3674 }
3675
3676 /*
3677 * Function to reverse the sort direction from its current state
3678 *
3679 * It is not safe to call this when performing hash tuplesorts
3680 */
3681 static void
3682 reversedirection(Tuplesortstate *state)
3683 {
3684 SortSupport sortKey = state->sortKeys;
3685 int nkey;
3686
3687 for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3688 {
3689 sortKey->ssup_reverse = !sortKey->ssup_reverse;
3690 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3691 }
3692 }
3693
3694
3695 /*
3696 * Tape interface routines
3697 */
3698
3699 static unsigned int
3700 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3701 {
3702 unsigned int len;
3703
3704 if (LogicalTapeRead(state->tapeset, tapenum,
3705 &len, sizeof(len)) != sizeof(len))
3706 elog(ERROR, "unexpected end of tape");
3707 if (len == 0 && !eofOK)
3708 elog(ERROR, "unexpected end of data");
3709 return len;
3710 }
3711
3712 static void
3713 markrunend(Tuplesortstate *state, int tapenum)
3714 {
3715 unsigned int len = 0;
3716
3717 LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3718 }
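/*
 * Illustrative sketch, not part of tuplesort.c proper: the framing that
 * getlen()/markrunend() and the type-specific WRITETUP/READTUP routines
 * agree on.  Each tuple on tape is laid out as
 *
 *     [unsigned int length][tuple body][length again, randomAccess only]
 *
 * where "length" counts the body plus one length word, and a length word
 * of zero (written by markrunend) terminates the run.  A run reader thus
 * looks like this; the "consume_body" callback stands in for READTUP().
 */
#ifdef TUPLESORT_TAPE_SKETCH
static void
read_run_sketch(Tuplesortstate *state, int tapenum,
				void (*consume_body) (Tuplesortstate *, int, unsigned int))
{
	unsigned int len;

	while ((len = getlen(state, tapenum, true)) != 0)
		consume_body(state, tapenum, len);
	/* len == 0 is markrunend()'s sentinel: the run is exhausted */
}
#endif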
3719
3720 /*
3721 * Get memory for tuple from within READTUP() routine.
3722 *
3723 * We use next free slot from the slab allocator, or palloc() if the tuple
3724 * is too large for that.
3725 */
3726 static void *
3727 readtup_alloc(Tuplesortstate *state, Size tuplen)
3728 {
3729 SlabSlot *buf;
3730
3731 /*
3732 * We pre-allocate enough slots in the slab arena that we should never run
3733 * out.
3734 */
3735 Assert(state->slabFreeHead);
3736
3737 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3738 return MemoryContextAlloc(state->sortcontext, tuplen);
3739 else
3740 {
3741 buf = state->slabFreeHead;
3742 /* Reuse this slot */
3743 state->slabFreeHead = buf->nextfree;
3744
3745 return buf;
3746 }
3747 }
3748
3749
3750 /*
3751 * Routines specialized for HeapTuple (actually MinimalTuple) case
3752 */
3753
3754 static int
3755 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3756 {
3757 SortSupport sortKey = state->sortKeys;
3758 HeapTupleData ltup;
3759 HeapTupleData rtup;
3760 TupleDesc tupDesc;
3761 int nkey;
3762 int32 compare;
3763 AttrNumber attno;
3764 Datum datum1,
3765 datum2;
3766 bool isnull1,
3767 isnull2;
3768
3769
3770 /* Compare the leading sort key */
3771 compare = ApplySortComparator(a->datum1, a->isnull1,
3772 b->datum1, b->isnull1,
3773 sortKey);
3774 if (compare != 0)
3775 return compare;
3776
3777 /* Compare additional sort keys */
3778 ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3779 ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3780 rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3781 rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3782 tupDesc = state->tupDesc;
3783
3784 if (sortKey->abbrev_converter)
3785 {
3786 attno = sortKey->ssup_attno;
3787
3788 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1);
3789 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3790
3791 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3792 datum2, isnull2,
3793 sortKey);
3794 if (compare != 0)
3795 return compare;
3796 }
3797
3798 sortKey++;
3799 for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3800 {
3801 attno = sortKey->ssup_attno;
3802
3803 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1);
3804 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3805
3806 compare = ApplySortComparator(datum1, isnull1,
3807 datum2, isnull2,
3808 sortKey);
3809 if (compare != 0)
3810 return compare;
3811 }
3812
3813 return 0;
3814 }
3815
3816 static void
3817 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3818 {
3819 /*
3820 * We expect the passed "tup" to be a TupleTableSlot, and form a
3821 * MinimalTuple using the exported interface for that.
3822 */
3823 TupleTableSlot *slot = (TupleTableSlot *) tup;
3824 Datum original;
3825 MinimalTuple tuple;
3826 HeapTupleData htup;
3827 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3828
3829 /* copy the tuple into sort storage */
3830 tuple = ExecCopySlotMinimalTuple(slot);
3831 stup->tuple = (void *) tuple;
3832 USEMEM(state, GetMemoryChunkSpace(tuple));
3833 /* set up first-column key value */
3834 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3835 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3836 original = heap_getattr(&htup,
3837 state->sortKeys[0].ssup_attno,
3838 state->tupDesc,
3839 &stup->isnull1);
3840
3841 MemoryContextSwitchTo(oldcontext);
3842
3843 if (!state->sortKeys->abbrev_converter || stup->isnull1)
3844 {
3845 /*
3846 * Store ordinary Datum representation, or NULL value. If there is a
3847 * converter it won't expect NULL values, and cost model is not
3848 * required to account for NULL, so in that case we avoid calling
3849 * converter and just set datum1 to zeroed representation (to be
3850 * consistent, and to support cheap inequality tests for NULL
3851 * abbreviated keys).
3852 */
3853 stup->datum1 = original;
3854 }
3855 else if (!consider_abort_common(state))
3856 {
3857 /* Store abbreviated key representation */
3858 stup->datum1 = state->sortKeys->abbrev_converter(original,
3859 state->sortKeys);
3860 }
3861 else
3862 {
3863 /* Abort abbreviation */
3864 int i;
3865
3866 stup->datum1 = original;
3867
3868 /*
3869 * Set state to be consistent with never trying abbreviation.
3870 *
3871 * Alter datum1 representation in already-copied tuples, so as to
3872 * ensure a consistent representation (current tuple was just
3873 * handled). It does not matter if some dumped tuples are already
3874 * sorted on tape, since serialized tuples lack abbreviated keys
3875 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3876 */
3877 for (i = 0; i < state->memtupcount; i++)
3878 {
3879 SortTuple *mtup = &state->memtuples[i];
3880
3881 htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3882 MINIMAL_TUPLE_OFFSET;
3883 htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3884 MINIMAL_TUPLE_OFFSET);
3885
3886 mtup->datum1 = heap_getattr(&htup,
3887 state->sortKeys[0].ssup_attno,
3888 state->tupDesc,
3889 &mtup->isnull1);
3890 }
3891 }
3892 }
3893
3894 static void
3895 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3896 {
3897 MinimalTuple tuple = (MinimalTuple) stup->tuple;
3898
3899 /* the part of the MinimalTuple we'll write: */
3900 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3901 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3902
3903 /* total on-disk footprint: */
3904 unsigned int tuplen = tupbodylen + sizeof(int);
3905
3906 LogicalTapeWrite(state->tapeset, tapenum,
3907 (void *) &tuplen, sizeof(tuplen));
3908 LogicalTapeWrite(state->tapeset, tapenum,
3909 (void *) tupbody, tupbodylen);
3910 if (state->randomAccess) /* need trailing length word? */
3911 LogicalTapeWrite(state->tapeset, tapenum,
3912 (void *) &tuplen, sizeof(tuplen));
3913
3914 if (!state->slabAllocatorUsed)
3915 {
3916 FREEMEM(state, GetMemoryChunkSpace(tuple));
3917 heap_free_minimal_tuple(tuple);
3918 }
3919 }
3920
3921 static void
3922 readtup_heap(Tuplesortstate *state, SortTuple *stup,
3923 int tapenum, unsigned int len)
3924 {
3925 unsigned int tupbodylen = len - sizeof(int);
3926 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3927 MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3928 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3929 HeapTupleData htup;
3930
3931 /* read in the tuple proper */
3932 tuple->t_len = tuplen;
3933 LogicalTapeReadExact(state->tapeset, tapenum,
3934 tupbody, tupbodylen);
3935 if (state->randomAccess) /* need trailing length word? */
3936 LogicalTapeReadExact(state->tapeset, tapenum,
3937 &tuplen, sizeof(tuplen));
3938 stup->tuple = (void *) tuple;
3939 /* set up first-column key value */
3940 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3941 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3942 stup->datum1 = heap_getattr(&htup,
3943 state->sortKeys[0].ssup_attno,
3944 state->tupDesc,
3945 &stup->isnull1);
3946 }
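/*
 * Illustrative helper sketch, not part of tuplesort.c proper: the
 * MinimalTuple-to-HeapTupleData shim that comparetup_heap(),
 * copytup_heap() and readtup_heap() each repeat inline above.  Only
 * t_len and t_data are filled in, which is all heap_getattr() needs
 * here.
 */
#ifdef TUPLESORT_MINTUP_SKETCH
static inline void
minimal_tuple_to_htup_sketch(HeapTupleData *htup, MinimalTuple mtup)
{
	htup->t_len = mtup->t_len + MINIMAL_TUPLE_OFFSET;
	htup->t_data = (HeapTupleHeader) ((char *) mtup - MINIMAL_TUPLE_OFFSET);
}
#endif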
3947
3948 /*
3949 * Routines specialized for the CLUSTER case (HeapTuple data, with
3950 * comparisons per a btree index definition)
3951 */
3952
3953 static int
3954 comparetup_cluster(const SortTuple *a, const SortTuple *b,
3955 Tuplesortstate *state)
3956 {
3957 SortSupport sortKey = state->sortKeys;
3958 HeapTuple ltup;
3959 HeapTuple rtup;
3960 TupleDesc tupDesc;
3961 int nkey;
3962 int32 compare;
3963 Datum datum1,
3964 datum2;
3965 bool isnull1,
3966 isnull2;
3967 AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0];
3968
3969 /* Be prepared to compare additional sort keys */
3970 ltup = (HeapTuple) a->tuple;
3971 rtup = (HeapTuple) b->tuple;
3972 tupDesc = state->tupDesc;
3973
3974 /* Compare the leading sort key, if it's simple */
3975 if (leading != 0)
3976 {
3977 compare = ApplySortComparator(a->datum1, a->isnull1,
3978 b->datum1, b->isnull1,
3979 sortKey);
3980 if (compare != 0)
3981 return compare;
3982
3983 if (sortKey->abbrev_converter)
3984 {
3985 datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3986 datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3987
3988 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3989 datum2, isnull2,
3990 sortKey);
3991 }
3992 if (compare != 0 || state->nKeys == 1)
3993 return compare;
3994 /* Compare additional columns the hard way */
3995 sortKey++;
3996 nkey = 1;
3997 }
3998 else
3999 {
4000 /* Must compare all keys the hard way */
4001 nkey = 0;
4002 }
4003
4004 if (state->indexInfo->ii_Expressions == NULL)
4005 {
4006 /* If not expression index, just compare the proper heap attrs */
4007
4008 for (; nkey < state->nKeys; nkey++, sortKey++)
4009 {
4010 AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
4011
4012 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
4013 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
4014
4015 compare = ApplySortComparator(datum1, isnull1,
4016 datum2, isnull2,
4017 sortKey);
4018 if (compare != 0)
4019 return compare;
4020 }
4021 }
4022 else
4023 {
4024 /*
4025 * In the expression index case, compute the whole index tuple and
4026 * then compare values. It would perhaps be faster to compute only as
4027 * many columns as we need to compare, but that would require
4028 * duplicating all the logic in FormIndexDatum.
4029 */
4030 Datum l_index_values[INDEX_MAX_KEYS];
4031 bool l_index_isnull[INDEX_MAX_KEYS];
4032 Datum r_index_values[INDEX_MAX_KEYS];
4033 bool r_index_isnull[INDEX_MAX_KEYS];
4034 TupleTableSlot *ecxt_scantuple;
4035
4036 /* Reset context each time to prevent memory leakage */
4037 ResetPerTupleExprContext(state->estate);
4038
4039 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
4040
4041 ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
4042 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
4043 l_index_values, l_index_isnull);
4044
4045 ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
4046 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
4047 r_index_values, r_index_isnull);
4048
4049 for (; nkey < state->nKeys; nkey++, sortKey++)
4050 {
4051 compare = ApplySortComparator(l_index_values[nkey],
4052 l_index_isnull[nkey],
4053 r_index_values[nkey],
4054 r_index_isnull[nkey],
4055 sortKey);
4056 if (compare != 0)
4057 return compare;
4058 }
4059 }
4060
4061 return 0;
4062 }
4063
4064 static void
4065 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
4066 {
4067 HeapTuple tuple = (HeapTuple) tup;
4068 Datum original;
4069 MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
4070
4071 /* copy the tuple into sort storage */
4072 tuple = heap_copytuple(tuple);
4073 stup->tuple = (void *) tuple;
4074 USEMEM(state, GetMemoryChunkSpace(tuple));
4075
4076 MemoryContextSwitchTo(oldcontext);
4077
4078 /*
4079 * set up first-column key value, and potentially abbreviate, if it's a
4080 * simple column
4081 */
4082 if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
4083 return;
4084
4085 original = heap_getattr(tuple,
4086 state->indexInfo->ii_IndexAttrNumbers[0],
4087 state->tupDesc,
4088 &stup->isnull1);
4089
4090 if (!state->sortKeys->abbrev_converter || stup->isnull1)
4091 {
4092 /*
4093 * Store ordinary Datum representation, or NULL value. If there is a
4094 * converter it won't expect NULL values, and cost model is not
4095 * required to account for NULL, so in that case we avoid calling
4096 * converter and just set datum1 to zeroed representation (to be
4097 * consistent, and to support cheap inequality tests for NULL
4098 * abbreviated keys).
4099 */
4100 stup->datum1 = original;
4101 }
4102 else if (!consider_abort_common(state))
4103 {
4104 /* Store abbreviated key representation */
4105 stup->datum1 = state->sortKeys->abbrev_converter(original,
4106 state->sortKeys);
4107 }
4108 else
4109 {
4110 /* Abort abbreviation */
4111 int i;
4112
4113 stup->datum1 = original;
4114
4115 /*
4116 * Set state to be consistent with never trying abbreviation.
4117 *
4118 * Alter datum1 representation in already-copied tuples, so as to
4119 * ensure a consistent representation (current tuple was just
4120 * handled). It does not matter if some dumped tuples are already
4121 * sorted on tape, since serialized tuples lack abbreviated keys
4122 * (TSS_BUILDRUNS state prevents control reaching here in any case).
4123 */
4124 for (i = 0; i < state->memtupcount; i++)
4125 {
4126 SortTuple *mtup = &state->memtuples[i];
4127
4128 tuple = (HeapTuple) mtup->tuple;
4129 mtup->datum1 = heap_getattr(tuple,
4130 state->indexInfo->ii_IndexAttrNumbers[0],
4131 state->tupDesc,
4132 &mtup->isnull1);
4133 }
4134 }
4135 }
4136
4137 static void
4138 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
4139 {
4140 HeapTuple tuple = (HeapTuple) stup->tuple;
4141 unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
4142
4143 /* We need to store t_self, but not other fields of HeapTupleData */
4144 LogicalTapeWrite(state->tapeset, tapenum,
4145 &tuplen, sizeof(tuplen));
4146 LogicalTapeWrite(state->tapeset, tapenum,
4147 &tuple->t_self, sizeof(ItemPointerData));
4148 LogicalTapeWrite(state->tapeset, tapenum,
4149 tuple->t_data, tuple->t_len);
4150 if (state->randomAccess) /* need trailing length word? */
4151 LogicalTapeWrite(state->tapeset, tapenum,
4152 &tuplen, sizeof(tuplen));
4153
4154 if (!state->slabAllocatorUsed)
4155 {
4156 FREEMEM(state, GetMemoryChunkSpace(tuple));
4157 heap_freetuple(tuple);
4158 }
4159 }
4160
4161 static void
4162 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
4163 int tapenum, unsigned int tuplen)
4164 {
4165 unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
4166 HeapTuple tuple = (HeapTuple) readtup_alloc(state,
4167 t_len + HEAPTUPLESIZE);
4168
4169 /* Reconstruct the HeapTupleData header */
4170 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
4171 tuple->t_len = t_len;
4172 LogicalTapeReadExact(state->tapeset, tapenum,
4173 &tuple->t_self, sizeof(ItemPointerData));
4174 /* We don't currently bother to reconstruct t_tableOid */
4175 tuple->t_tableOid = InvalidOid;
4176 /* Read in the tuple body */
4177 LogicalTapeReadExact(state->tapeset, tapenum,
4178 tuple->t_data, tuple->t_len);
4179 if (state->randomAccess) /* need trailing length word? */
4180 LogicalTapeReadExact(state->tapeset, tapenum,
4181 &tuplen, sizeof(tuplen));
4182 stup->tuple = (void *) tuple;
4183 /* set up first-column key value, if it's a simple column */
4184 if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
4185 stup->datum1 = heap_getattr(tuple,
4186 state->indexInfo->ii_IndexAttrNumbers[0],
4187 state->tupDesc,
4188 &stup->isnull1);
4189 }
4190
4191 /*
4192 * Routines specialized for IndexTuple case
4193 *
4194 * The btree and hash cases require separate comparison functions, but the
4195 * IndexTuple representation is the same so the copy/write/read support
4196 * functions can be shared.
4197 */
4198
4199 static int
4200 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
4201 Tuplesortstate *state)
4202 {
4203 /*
4204 * This is similar to comparetup_heap(), but expects index tuples. There
4205 * is also special handling for enforcing uniqueness, and special
4206 * treatment for equal keys at the end.
4207 */
4208 SortSupport sortKey = state->sortKeys;
4209 IndexTuple tuple1;
4210 IndexTuple tuple2;
4211 int keysz;
4212 TupleDesc tupDes;
4213 bool equal_hasnull = false;
4214 int nkey;
4215 int32 compare;
4216 Datum datum1,
4217 datum2;
4218 bool isnull1,
4219 isnull2;
4220
4221
4222 /* Compare the leading sort key */
4223 compare = ApplySortComparator(a->datum1, a->isnull1,
4224 b->datum1, b->isnull1,
4225 sortKey);
4226 if (compare != 0)
4227 return compare;
4228
4229 /* Compare additional sort keys */
4230 tuple1 = (IndexTuple) a->tuple;
4231 tuple2 = (IndexTuple) b->tuple;
4232 keysz = state->nKeys;
4233 tupDes = RelationGetDescr(state->indexRel);
4234
4235 if (sortKey->abbrev_converter)
4236 {
4237 datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
4238 datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
4239
4240 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
4241 datum2, isnull2,
4242 sortKey);
4243 if (compare != 0)
4244 return compare;
4245 }
4246
4247 /* they are equal, so we only need to examine one null flag */
4248 if (a->isnull1)
4249 equal_hasnull = true;
4250
4251 sortKey++;
4252 for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4253 {
4254 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4255 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4256
4257 compare = ApplySortComparator(datum1, isnull1,
4258 datum2, isnull2,
4259 sortKey);
4260 if (compare != 0)
4261 return compare; /* done when we find unequal attributes */
4262
4263 /* they are equal, so we only need to examine one null flag */
4264 if (isnull1)
4265 equal_hasnull = true;
4266 }
4267
4268 /*
4269 * If btree has asked us to enforce uniqueness, complain if two equal
4270 * tuples are detected (unless there was at least one NULL field).
4271 *
4272 * It is sufficient to make the test here, because if two tuples are equal
4273 * they *must* get compared at some stage of the sort --- otherwise the
4274 * sort algorithm wouldn't have checked whether one must appear before the
4275 * other.
4276 */
4277 if (state->enforceUnique && !equal_hasnull)
4278 {
4279 Datum values[INDEX_MAX_KEYS];
4280 bool isnull[INDEX_MAX_KEYS];
4281 char *key_desc;
4282
4283 /*
4284 * Some rather brain-dead implementations of qsort (such as the one in
4285 * QNX 4) will sometimes call the comparison routine to compare a
4286 * value to itself, but we always use our own implementation, which
4287 * does not.
4288 */
4289 Assert(tuple1 != tuple2);
4290
4291 index_deform_tuple(tuple1, tupDes, values, isnull);
4292
4293 key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4294
4295 ereport(ERROR,
4296 (errcode(ERRCODE_UNIQUE_VIOLATION),
4297 errmsg("could not create unique index \"%s\"",
4298 RelationGetRelationName(state->indexRel)),
4299 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4300 errdetail("Duplicate keys exist."),
4301 errtableconstraint(state->heapRel,
4302 RelationGetRelationName(state->indexRel))));
4303 }
4304
4305 /*
4306 * If key values are equal, we sort on ItemPointer. This is required for
4307 * btree indexes, since heap TID is treated as an implicit last key
4308 * attribute in order to ensure that all keys in the index are physically
4309 * unique.
4310 */
4311 {
4312 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4313 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4314
4315 if (blk1 != blk2)
4316 return (blk1 < blk2) ? -1 : 1;
4317 }
4318 {
4319 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4320 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4321
4322 if (pos1 != pos2)
4323 return (pos1 < pos2) ? -1 : 1;
4324 }
4325
4326 /* ItemPointer values should never be equal */
4327 Assert(false);
4328
4329 return 0;
4330 }
4331
4332 static int
4333 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4334 Tuplesortstate *state)
4335 {
4336 Bucket bucket1;
4337 Bucket bucket2;
4338 IndexTuple tuple1;
4339 IndexTuple tuple2;
4340
4341 /*
4342 * Fetch hash keys and mask off bits we don't want to sort by. We know
4343 * that the first column of the index tuple is the hash key.
4344 */
4345 Assert(!a->isnull1);
4346 bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4347 state->max_buckets, state->high_mask,
4348 state->low_mask);
4349 Assert(!b->isnull1);
4350 bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4351 state->max_buckets, state->high_mask,
4352 state->low_mask);
4353 if (bucket1 > bucket2)
4354 return 1;
4355 else if (bucket1 < bucket2)
4356 return -1;
4357
4358 /*
4359 * If hash values are equal, we sort on ItemPointer. This does not affect
4360 * validity of the finished index, but it may be useful to have index
4361 * scans in physical order.
4362 */
4363 tuple1 = (IndexTuple) a->tuple;
4364 tuple2 = (IndexTuple) b->tuple;
4365
4366 {
4367 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4368 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4369
4370 if (blk1 != blk2)
4371 return (blk1 < blk2) ? -1 : 1;
4372 }
4373 {
4374 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4375 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4376
4377 if (pos1 != pos2)
4378 return (pos1 < pos2) ? -1 : 1;
4379 }
4380
4381 /* ItemPointer values should never be equal */
4382 Assert(false);
4383
4384 return 0;
4385 }
4386
4387 static void
4388 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4389 {
4390 /* Not currently needed */
4391 elog(ERROR, "copytup_index() should not be called");
4392 }
4393
4394 static void
4395 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4396 {
4397 IndexTuple tuple = (IndexTuple) stup->tuple;
4398 unsigned int tuplen;
4399
4400 tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4401 LogicalTapeWrite(state->tapeset, tapenum,
4402 (void *) &tuplen, sizeof(tuplen));
4403 LogicalTapeWrite(state->tapeset, tapenum,
4404 (void *) tuple, IndexTupleSize(tuple));
4405 if (state->randomAccess) /* need trailing length word? */
4406 LogicalTapeWrite(state->tapeset, tapenum,
4407 (void *) &tuplen, sizeof(tuplen));
4408
4409 if (!state->slabAllocatorUsed)
4410 {
4411 FREEMEM(state, GetMemoryChunkSpace(tuple));
4412 pfree(tuple);
4413 }
4414 }
4415
4416 static void
4417 readtup_index(Tuplesortstate *state, SortTuple *stup,
4418 int tapenum, unsigned int len)
4419 {
4420 unsigned int tuplen = len - sizeof(unsigned int);
4421 IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
4422
4423 LogicalTapeReadExact(state->tapeset, tapenum,
4424 tuple, tuplen);
4425 if (state->randomAccess) /* need trailing length word? */
4426 LogicalTapeReadExact(state->tapeset, tapenum,
4427 &tuplen, sizeof(tuplen));
4428 stup->tuple = (void *) tuple;
4429 /* set up first-column key value */
4430 stup->datum1 = index_getattr(tuple,
4431 1,
4432 RelationGetDescr(state->indexRel),
4433 &stup->isnull1);
4434 }
4435
4436 /*
4437 * Routines specialized for DatumTuple case
4438 */
4439
4440 static int
4441 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4442 {
4443 int compare;
4444
4445 compare = ApplySortComparator(a->datum1, a->isnull1,
4446 b->datum1, b->isnull1,
4447 state->sortKeys);
4448 if (compare != 0)
4449 return compare;
4450
4451 /* if we have abbreviations, then "tuple" has the original value */
4452
4453 if (state->sortKeys->abbrev_converter)
4454 compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4455 PointerGetDatum(b->tuple), b->isnull1,
4456 state->sortKeys);
4457
4458 return compare;
4459 }
4460
4461 static void
4462 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4463 {
4464 /* Not currently needed */
4465 elog(ERROR, "copytup_datum() should not be called");
4466 }
4467
4468 static void
4469 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4470 {
4471 void *waddr;
4472 unsigned int tuplen;
4473 unsigned int writtenlen;
4474
4475 if (stup->isnull1)
4476 {
4477 waddr = NULL;
4478 tuplen = 0;
4479 }
4480 else if (!state->tuples)
4481 {
4482 waddr = &stup->datum1;
4483 tuplen = sizeof(Datum);
4484 }
4485 else
4486 {
4487 waddr = stup->tuple;
4488 tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4489 Assert(tuplen != 0);
4490 }
4491
4492 writtenlen = tuplen + sizeof(unsigned int);
4493
4494 LogicalTapeWrite(state->tapeset, tapenum,
4495 (void *) &writtenlen, sizeof(writtenlen));
4496 LogicalTapeWrite(state->tapeset, tapenum,
4497 waddr, tuplen);
4498 if (state->randomAccess) /* need trailing length word? */
4499 LogicalTapeWrite(state->tapeset, tapenum,
4500 (void *) &writtenlen, sizeof(writtenlen));
4501
4502 if (!state->slabAllocatorUsed && stup->tuple)
4503 {
4504 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4505 pfree(stup->tuple);
4506 }
4507 }
4508
4509 static void
4510 readtup_datum(Tuplesortstate *state, SortTuple *stup,
4511 int tapenum, unsigned int len)
4512 {
4513 unsigned int tuplen = len - sizeof(unsigned int);
4514
4515 if (tuplen == 0)
4516 {
4517 /* it's NULL */
4518 stup->datum1 = (Datum) 0;
4519 stup->isnull1 = true;
4520 stup->tuple = NULL;
4521 }
4522 else if (!state->tuples)
4523 {
4524 Assert(tuplen == sizeof(Datum));
4525 LogicalTapeReadExact(state->tapeset, tapenum,
4526 &stup->datum1, tuplen);
4527 stup->isnull1 = false;
4528 stup->tuple = NULL;
4529 }
4530 else
4531 {
4532 void *raddr = readtup_alloc(state, tuplen);
4533
4534 LogicalTapeReadExact(state->tapeset, tapenum,
4535 raddr, tuplen);
4536 stup->datum1 = PointerGetDatum(raddr);
4537 stup->isnull1 = false;
4538 stup->tuple = raddr;
4539 }
4540
4541 if (state->randomAccess) /* need trailing length word? */
4542 LogicalTapeReadExact(state->tapeset, tapenum,
4543 &tuplen, sizeof(tuplen));
4544 }
4545
4546 /*
4547 * Parallel sort routines
4548 */
4549
4550 /*
4551 * tuplesort_estimate_shared - estimate required shared memory allocation
4552 *
4553 * nWorkers is an estimate of the number of workers (it's the number that
4554 * will be requested).
4555 */
4556 Size
4557 tuplesort_estimate_shared(int nWorkers)
4558 {
4559 Size tapesSize;
4560
4561 Assert(nWorkers > 0);
4562
4563 /* Make sure that BufFile shared state is MAXALIGN'd */
4564 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4565 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
4566
4567 return tapesSize;
4568 }
4569
4570 /*
4571 * tuplesort_initialize_shared - initialize shared tuplesort state
4572 *
4573 * Must be called from leader process before workers are launched, to
4574 * establish state needed up-front for worker tuplesortstates. nWorkers
4575 * should match the argument passed to tuplesort_estimate_shared().
4576 */
4577 void
4578 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4579 {
4580 int i;
4581
4582 Assert(nWorkers > 0);
4583
4584 SpinLockInit(&shared->mutex);
4585 shared->currentWorker = 0;
4586 shared->workersFinished = 0;
4587 SharedFileSetInit(&shared->fileset, seg);
4588 shared->nTapes = nWorkers;
4589 for (i = 0; i < nWorkers; i++)
4590 {
4591 shared->tapes[i].firstblocknumber = 0L;
4592 }
4593 }
4594
4595 /*
4596 * tuplesort_attach_shared - attach to shared tuplesort state
4597 *
4598 * Must be called by all worker processes.
4599 */
4600 void
4601 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4602 {
4603 /* Attach to SharedFileSet */
4604 SharedFileSetAttach(&shared->fileset, seg);
4605 }
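/*
 * Illustrative usage sketch, not part of tuplesort.c proper: how a leader
 * and its workers could wire up the shared state using the three routines
 * above, in the style of the parallel CREATE INDEX code.  "pcxt" is the
 * caller's ParallelContext, and SKETCH_KEY_TUPLESORT_SHARED is a made-up
 * shm_toc key for the example.
 */
#ifdef TUPLESORT_SHARED_SKETCH
#define SKETCH_KEY_TUPLESORT_SHARED		UINT64CONST(0xF00DF00D)

static Sharedsort *
leader_setup_shared_sort(ParallelContext *pcxt, int nWorkers)
{
	Size		size = tuplesort_estimate_shared(nWorkers);
	Sharedsort *shared;

	/* leader: allocate and initialize before any worker is launched */
	shared = (Sharedsort *) shm_toc_allocate(pcxt->toc, size);
	tuplesort_initialize_shared(shared, nWorkers, pcxt->seg);
	shm_toc_insert(pcxt->toc, SKETCH_KEY_TUPLESORT_SHARED, shared);

	return shared;
}

static Sharedsort *
worker_attach_shared_sort(shm_toc *toc, dsm_segment *seg)
{
	Sharedsort *shared;

	/* worker: look the state up in the toc, then attach */
	shared = (Sharedsort *) shm_toc_lookup(toc, SKETCH_KEY_TUPLESORT_SHARED,
										   false);
	tuplesort_attach_shared(shared, seg);

	return shared;
}
#endif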
4606
4607 /*
4608 * worker_get_identifier - Assign and return ordinal identifier for worker
4609 *
4610 * The order in which these are assigned is not well defined, and should not
4611 * matter; worker numbers across parallel sort participants need only be
4612 * distinct and gapless. logtape.c requires this.
4613 *
4614 * Note that the identifiers assigned from here have no relation to
4615 * the ParallelWorkerNumber value, to avoid making any assumption about
4616 * the caller's requirements. However, we do follow the ParallelWorkerNumber
4617 * convention of representing a non-worker with worker number -1. This
4618 * includes the leader, as well as serial Tuplesort processes.
4619 */
4620 static int
4621 worker_get_identifier(Tuplesortstate *state)
4622 {
4623 Sharedsort *shared = state->shared;
4624 int worker;
4625
4626 Assert(WORKER(state));
4627
4628 SpinLockAcquire(&shared->mutex);
4629 worker = shared->currentWorker++;
4630 SpinLockRelease(&shared->mutex);
4631
4632 return worker;
4633 }
4634
4635 /*
4636 * worker_freeze_result_tape - freeze worker's result tape for leader
4637 *
4638 * This is called by workers just after the result tape has been determined,
4639 * instead of calling LogicalTapeFreeze() directly. They do so because
4640 * workers require a few additional steps beyond the similar serial
4641 * TSS_SORTEDONTAPE external sort case; those steps also happen here. The
4642 * extra steps involve freeing now-unneeded resources, and representing to
4643 * the leader that the worker's run is available for its merge.
4644 *
4645 * There should only be one final output run for each worker, which consists
4646 * of all tuples that were originally input into worker.
4647 */
4648 static void
4649 worker_freeze_result_tape(Tuplesortstate *state)
4650 {
4651 Sharedsort *shared = state->shared;
4652 TapeShare output;
4653
4654 Assert(WORKER(state));
4655 Assert(state->result_tape != -1);
4656 Assert(state->memtupcount == 0);
4657
4658 /*
4659 * Free most remaining memory, in case caller is sensitive to our holding
4660 * on to it. memtuples may not be a tiny merge heap at this point.
4661 */
4662 pfree(state->memtuples);
4663 /* Be tidy */
4664 state->memtuples = NULL;
4665 state->memtupsize = 0;
4666
4667 /*
4668 * The leader needs the worker's result tape metadata, so store it in
4669 * shared memory here
4670 */
4671 LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4672
4673 /* Store properties of output tape, and update finished worker count */
4674 SpinLockAcquire(&shared->mutex);
4675 shared->tapes[state->worker] = output;
4676 shared->workersFinished++;
4677 SpinLockRelease(&shared->mutex);
4678 }
4679
4680 /*
4681 * worker_nomergeruns - dump memtuples in worker, without merging
4682 *
4683 * This is called as an alternative to mergeruns() with a worker when no
4684 * merging is required.
4685 */
4686 static void
4687 worker_nomergeruns(Tuplesortstate *state)
4688 {
4689 Assert(WORKER(state));
4690 Assert(state->result_tape == -1);
4691
4692 state->result_tape = state->tp_tapenum[state->destTape];
4693 worker_freeze_result_tape(state);
4694 }
4695
4696 /*
4697 * leader_takeover_tapes - create tapeset for leader from worker tapes
4698 *
4699 * So far, the leader Tuplesortstate has performed no actual sorting. By
4700 * now, all sorting has occurred in the workers, all of which must have
4701 * already returned from tuplesort_performsort().
4702 *
4703 * When this returns, the leader process is left in a state that is
4704 * virtually indistinguishable from having generated the runs itself, as a
4705 * serial external sort would have.
4706 */
4707 static void
4708 leader_takeover_tapes(Tuplesortstate *state)
4709 {
4710 Sharedsort *shared = state->shared;
4711 int nParticipants = state->nParticipants;
4712 int workersFinished;
4713 int j;
4714
4715 Assert(LEADER(state));
4716 Assert(nParticipants >= 1);
4717
4718 SpinLockAcquire(&shared->mutex);
4719 workersFinished = shared->workersFinished;
4720 SpinLockRelease(&shared->mutex);
4721
4722 if (nParticipants != workersFinished)
4723 elog(ERROR, "cannot take over tapes before all workers finish");
4724
4725 /*
4726 * Create the tapeset from worker tapes, including a leader-owned tape at
4727 * the end. Parallel workers are far more expensive than logical tapes,
4728 * so the number of tapes allocated here should never be excessive.
4729 *
4730 * We still have a leader tape, though it's not possible to write to it
4731 * due to restrictions in the shared fileset infrastructure used by
4732 * logtape.c. It will never be written to in practice because
4733 * randomAccess is disallowed for parallel sorts.
4734 */
4735 inittapestate(state, nParticipants + 1);
4736 state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
4737 shared->tapes, &shared->fileset,
4738 state->worker);
4739
4740 /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4741 state->currentRun = nParticipants;
4742
4743 /*
4744 * Initialize variables of Algorithm D to be consistent with runs from
4745 * workers having been generated in the leader.
4746 *
4747 * There will always be exactly 1 run per worker, and exactly one input
4748 * tape per run, because workers always output exactly 1 run, even when
4749 * there were no input tuples for workers to sort.
4750 */
4751 for (j = 0; j < state->maxTapes; j++)
4752 {
4753 /* One real run; no dummy runs for worker tapes */
4754 state->tp_fib[j] = 1;
4755 state->tp_runs[j] = 1;
4756 state->tp_dummy[j] = 0;
4757 state->tp_tapenum[j] = j;
4758 }
4759 /* Leader tape gets one dummy run, and no real runs */
4760 state->tp_fib[state->tapeRange] = 0;
4761 state->tp_runs[state->tapeRange] = 0;
4762 state->tp_dummy[state->tapeRange] = 1;
4763
4764 state->Level = 1;
4765 state->destTape = 0;
4766
4767 state->status = TSS_BUILDRUNS;
4768 }
4769
4770 /*
4771 * Convenience routine to free a tuple previously loaded into sort memory
4772 */
4773 static void
4774 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4775 {
4776 if (stup->tuple)
4777 {
4778 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4779 pfree(stup->tuple);
4780 stup->tuple = NULL;
4781 }
4782 }
4783