1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *	  Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  Historically, we divided the input into sorted runs
15  * using replacement selection, in the form of a priority tree implemented
16  * as a heap (essentially his Algorithm 5.2.3H), but now we always use
17  * quicksort for run generation.  We merge the runs using polyphase merge,
18  * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
19  * implemented by logtape.c, which avoids space wastage by recycling disk
20  * space as soon as each block is read from its "tape".
21  *
22  * The approximate amount of memory allowed for any one sort operation
23  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
24  * we absorb tuples and simply store them in an unsorted array as long as
25  * we haven't exceeded workMem.  If we reach the end of the input without
26  * exceeding workMem, we sort the array using qsort() and subsequently return
27  * tuples just by scanning the tuple array sequentially.  If we do exceed
28  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
29  * When tuples are dumped in batch after quicksorting, we begin a new run
30  * with a new output tape (selected per Algorithm D).  After the end of the
31  * input is reached, we dump out remaining tuples in memory into a final run,
32  * then merge the runs using Algorithm D.
33  *
34  * When merging runs, we use a heap containing just the frontmost tuple from
35  * each source run; we repeatedly output the smallest tuple and replace it
36  * with the next tuple from its source tape (if any).  When the heap empties,
37  * the merge is complete.  The basic merge algorithm thus needs very little
38  * memory --- only M tuples for an M-way merge, and M is constrained to a
39  * small number.  However, we can still make good use of our full workMem
40  * allocation by pre-reading additional blocks from each source tape.  Without
41  * prereading, our access pattern to the temporary file would be very erratic;
42  * on average we'd read one block from each of M source tapes during the same
43  * time that we're writing M blocks to the output tape, so there is no
44  * sequentiality of access at all, defeating the read-ahead methods used by
45  * most Unix kernels.  Worse, the output tape gets written into a very random
46  * sequence of blocks of the temp file, ensuring that things will be even
47  * worse when it comes time to read that tape.  A straightforward merge pass
48  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
49  * by prereading from each source tape sequentially, loading about workMem/M
50  * bytes from each tape in turn, and making the sequential blocks immediately
51  * available for reuse.  This approach helps to localize both read and write
52  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
53  * much memory to use for the buffers.  (An illustrative sketch of the
 * basic heap-merge loop appears just after this header comment.)
54  *
55  * When the caller requests random access to the sort result, we form
56  * the final sorted run on a logical tape which is then "frozen", so
57  * that we can access it randomly.  When the caller does not need random
58  * access, we return from tuplesort_performsort() as soon as we are down
59  * to one run per logical tape.  The final merge is then performed
60  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
61  * saves one cycle of writing all the data out to disk and reading it in.
62  *
63  * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
64  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
65  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
66  * tape drives are expensive beasts, and in particular that there will always
67  * be many more runs than tape drives.  In our implementation a "tape drive"
68  * doesn't cost much more than a few Kb of memory buffers, so we can afford
69  * to have lots of them.  In particular, if we can have as many tape drives
70  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
71  * code we determine the number of tapes M on the basis of workMem: we want
72  * workMem/M to be large enough that we read a fair amount of data each time
73  * we preread from a tape, so as to maintain the locality of access described
74  * above.  Nonetheless, with large workMem we can have many tapes (but not
75  * too many -- see the comments in tuplesort_merge_order).
76  *
77  * This module supports parallel sorting.  Parallel sorts involve coordination
78  * among one or more worker processes, and a leader process, each with its own
79  * tuplesort state.  The leader process (or, more accurately, the
80  * Tuplesortstate associated with a leader process) creates a full tapeset
81  * consisting of worker tapes, each holding the single run produced by one
82  * worker process.  This tapeset is then merged.  Worker processes are
83  * guaranteed to produce exactly one output run from their partial input.
84  *
85  *
86  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
87  * Portions Copyright (c) 1994, Regents of the University of California
88  *
89  * IDENTIFICATION
90  *	  src/backend/utils/sort/tuplesort.c
91  *
92  *-------------------------------------------------------------------------
93  */
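
/*
 * To make the run-merging logic above concrete, here is a minimal,
 * self-contained sketch of an M-way merge driven by a binary min-heap that
 * holds only the frontmost element of each run.  It merges ascending int
 * arrays rather than tapes of tuples, and every name in it is hypothetical;
 * nothing else in this file refers to it.
 *
 *	#include <stdlib.h>
 *
 *	typedef struct
 *	{
 *		int			value;		// current front element of its run
 *		int			run;		// index of the source run
 *	} RunHead;
 *
 *	// Restore the min-heap property by sifting down from position i.
 *	static void
 *	sift_down(RunHead *heap, int n, int i)
 *	{
 *		for (;;)
 *		{
 *			int			smallest = i;
 *			int			l = 2 * i + 1;
 *			int			r = 2 * i + 2;
 *			RunHead		tmp;
 *
 *			if (l < n && heap[l].value < heap[smallest].value)
 *				smallest = l;
 *			if (r < n && heap[r].value < heap[smallest].value)
 *				smallest = r;
 *			if (smallest == i)
 *				break;
 *			tmp = heap[i];
 *			heap[i] = heap[smallest];
 *			heap[smallest] = tmp;
 *			i = smallest;
 *		}
 *	}
 *
 *	// Merge nruns ascending arrays into out[] (sized for all elements).
 *	static void
 *	merge_runs(int **runs, const int *runlens, int nruns, int *out)
 *	{
 *		RunHead    *heap = malloc(nruns * sizeof(RunHead));
 *		int		   *pos = calloc(nruns, sizeof(int));
 *		int			heapsize = 0;
 *		int			nout = 0;
 *		int			i;
 *
 *		for (i = 0; i < nruns; i++)		// one element per non-empty run
 *		{
 *			if (runlens[i] == 0)
 *				continue;
 *			heap[heapsize].value = runs[i][0];
 *			heap[heapsize].run = i;
 *			heapsize++;
 *			pos[i] = 1;
 *		}
 *		for (i = heapsize / 2 - 1; i >= 0; i--)		// heapify
 *			sift_down(heap, heapsize, i);
 *
 *		while (heapsize > 0)
 *		{
 *			int			r = heap[0].run;
 *
 *			out[nout++] = heap[0].value;	// emit smallest of all run heads
 *			if (pos[r] < runlens[r])
 *				heap[0].value = runs[r][pos[r]++];	// refill from same run
 *			else
 *				heap[0] = heap[--heapsize];		// run exhausted
 *			sift_down(heap, heapsize, 0);
 *		}
 *		free(heap);
 *		free(pos);
 *	}
 */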
94 
95 #include "postgres.h"
96 
97 #include <limits.h>
98 
99 #include "access/hash.h"
100 #include "access/htup_details.h"
101 #include "access/nbtree.h"
102 #include "catalog/index.h"
103 #include "catalog/pg_am.h"
104 #include "commands/tablespace.h"
105 #include "executor/executor.h"
106 #include "miscadmin.h"
107 #include "pg_trace.h"
108 #include "utils/datum.h"
109 #include "utils/logtape.h"
110 #include "utils/lsyscache.h"
111 #include "utils/memutils.h"
112 #include "utils/pg_rusage.h"
113 #include "utils/rel.h"
114 #include "utils/sortsupport.h"
115 #include "utils/tuplesort.h"
116 
117 
118 /* sort-type codes for sort__start probes */
119 #define HEAP_SORT		0
120 #define INDEX_SORT		1
121 #define DATUM_SORT		2
122 #define CLUSTER_SORT	3
123 
124 /* Sort parallel code from state for sort__start probes */
125 #define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
126 								 (state)->worker >= 0 ? 1 : 2)
127 
128 /* GUC variables */
129 #ifdef TRACE_SORT
130 bool		trace_sort = false;
131 #endif
132 
133 #ifdef DEBUG_BOUNDED_SORT
134 bool		optimize_bounded_sort = true;
135 #endif
136 
137 
138 /*
139  * The objects we actually sort are SortTuple structs.  These contain
140  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
141  * which is a separate palloc chunk --- we assume it is just one chunk and
142  * can be freed by a simple pfree() (except during merge, when we use a
143  * simple slab allocator).  SortTuples also contain the tuple's first key
144  * column in Datum/nullflag format, and an index integer.
145  *
146  * Storing the first key column lets us save heap_getattr or index_getattr
147  * calls during tuple comparisons.  We could extract and save all the key
148  * columns not just the first, but this would increase code complexity and
149  * overhead, and wouldn't actually save any comparison cycles in the common
150  * case where the first key determines the comparison result.  Note that
151  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
152  *
153  * There is one special case: when the sort support infrastructure provides an
154  * "abbreviated key" representation, where the key is (typically) a pass by
155  * value proxy for a pass by reference type.  In this case, the abbreviated key
156  * is stored in datum1 in place of the actual first key column.
157  *
158  * When sorting single Datums, the data value is represented directly by
159  * datum1/isnull1 for pass by value types (or null values).  If the datatype is
160  * pass-by-reference and isnull1 is false, then "tuple" points to a separately
161  * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
162  * either the same pointer as "tuple", or is an abbreviated key value as
163  * described above.  Accordingly, "tuple" is always used in preference to
164  * datum1 as the authoritative value for pass-by-reference cases.
165  *
166  * tupindex holds the input tape number that each tuple in the heap was read
167  * from during merge passes.
168  */
169 typedef struct
170 {
171 	void	   *tuple;			/* the tuple itself */
172 	Datum		datum1;			/* value of first key column */
173 	bool		isnull1;		/* is first key column NULL? */
174 	int			tupindex;		/* see notes above */
175 } SortTuple;
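
/*
 * For illustration, a hedged sketch of how a comparetup routine typically
 * exploits the cached (possibly abbreviated) first key in datum1: compare
 * datum1 values first with ApplySortComparator(), and only examine the full
 * tuples when that comparison ties.  The names comparetup_sketch and
 * compare_remaining_columns are hypothetical; the real per-case comparators
 * appear later in this file.
 *
 *	static int
 *	comparetup_sketch(const SortTuple *a, const SortTuple *b,
 *					  Tuplesortstate *state)
 *	{
 *		SortSupport sortKey = state->sortKeys;
 *		int			compare;
 *
 *		// Fast path: cached first key (or abbreviated key) comparison
 *		compare = ApplySortComparator(a->datum1, a->isnull1,
 *									  b->datum1, b->isnull1, sortKey);
 *		if (compare != 0)
 *			return compare;
 *
 *		// Tie: fall back to the authoritative tuple contents (hypothetical
 *		// helper standing in for heap_getattr/index_getattr based code)
 *		return compare_remaining_columns(a, b, state);
 *	}
 */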
176 
177 /*
178  * During merge, we use a pre-allocated set of fixed-size slots to hold
179  * tuples, to avoid palloc/pfree overhead.
180  *
181  * Merge doesn't require a lot of memory, so we can afford to waste some
182  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
183  * palloc() overhead is not significant anymore.
184  *
185  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
186  * slot holds a tuple.
187  */
188 #define SLAB_SLOT_SIZE 1024
189 
190 typedef union SlabSlot
191 {
192 	union SlabSlot *nextfree;
193 	char		buffer[SLAB_SLOT_SIZE];
194 } SlabSlot;
195 
196 /*
197  * Possible states of a Tuplesort object.  These denote the states that
198  * persist between calls of Tuplesort routines.
199  */
200 typedef enum
201 {
202 	TSS_INITIAL,				/* Loading tuples; still within memory limit */
203 	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
204 	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
205 	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
206 	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
207 	TSS_FINALMERGE				/* Performing final merge on-the-fly */
208 } TupSortStatus;
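
/*
 * For orientation, the usual transitions are: TSS_INITIAL -> TSS_SORTEDINMEM
 * when everything fits in workMem; TSS_INITIAL -> TSS_BUILDRUNS ->
 * TSS_SORTEDONTAPE or TSS_FINALMERGE when we spill to tape; and
 * TSS_INITIAL -> TSS_BOUNDED -> TSS_SORTEDINMEM when a bound has been set
 * and the bounded heap fits in memory.
 */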
209 
210 /*
211  * Parameters for calculation of number of tapes to use --- see inittapes()
212  * and tuplesort_merge_order().
213  *
214  * In this calculation we assume that each tape will cost us about one block's
215  * worth of buffer space.  This ignores the overhead of all the other data
216  * structures needed for each tape, but it's probably close enough.
217  *
218  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
219  * tape during a preread cycle (see discussion at top of file).
220  */
221 #define MINORDER		6		/* minimum merge order */
222 #define MAXORDER		500		/* maximum merge order */
223 #define TAPE_BUFFER_OVERHEAD		BLCKSZ
224 #define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
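
/*
 * As a rough worked example of the resulting merge order (assuming the
 * default BLCKSZ of 8192, so TAPE_BUFFER_OVERHEAD = 8 kB and
 * MERGE_BUFFER_SIZE = 256 kB): a 4 MB workMem budget yields on the order of
 * (4096 kB - 8 kB) / (256 kB + 8 kB), i.e. about 15 input tapes.  See
 * tuplesort_merge_order() for the exact computation, including the clamping
 * to MINORDER/MAXORDER.
 */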
225 
226 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
227 									Tuplesortstate *state);
228 
229 /*
230  * Private state of a Tuplesort operation.
231  */
232 struct Tuplesortstate
233 {
234 	TupSortStatus status;		/* enumerated value as shown above */
235 	int			nKeys;			/* number of columns in sort key */
236 	bool		randomAccess;	/* did caller request random access? */
237 	bool		bounded;		/* did caller specify a maximum number of
238 								 * tuples to return? */
239 	bool		boundUsed;		/* true if we made use of a bounded heap */
240 	int			bound;			/* if bounded, the maximum number of tuples */
241 	bool		tuples;			/* Can SortTuple.tuple ever be set? */
242 	int64		availMem;		/* remaining memory available, in bytes */
243 	int64		allowedMem;		/* total memory allowed, in bytes */
244 	int			maxTapes;		/* number of tapes (Knuth's T) */
245 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
246 	MemoryContext sortcontext;	/* memory context holding most sort data */
247 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
248 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
249 
250 	/*
251 	 * These function pointers decouple the routines that must know what kind
252 	 * of tuple we are sorting from the routines that don't need to know it.
253 	 * They are set up by the tuplesort_begin_xxx routines.
254 	 *
255 	 * Function to compare two tuples; result is per qsort() convention, ie:
256 	 * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
257 	 * qsort_arg_comparator.
258 	 */
259 	SortTupleComparator comparetup;
260 
261 	/*
262 	 * Function to copy a supplied input tuple into palloc'd space and set up
263 	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
264 	 * state->availMem must be decreased by the amount of space used for the
265 	 * tuple copy (note the SortTuple struct itself is not counted).
266 	 */
267 	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
268 
269 	/*
270 	 * Function to write a stored tuple onto tape.  The representation of the
271 	 * tuple on tape need not be the same as it is in memory; requirements on
272 	 * the tape representation are given below.  Unless the slab allocator is
273 	 * used, after writing the tuple, pfree() the out-of-line data (not the
274 	 * SortTuple struct!), and increase state->availMem by the amount of
275 	 * memory space thereby released.
276 	 */
277 	void		(*writetup) (Tuplesortstate *state, int tapenum,
278 							 SortTuple *stup);
279 
280 	/*
281 	 * Function to read a stored tuple from tape back into memory. 'len' is
282 	 * the already-read length of the stored tuple.  The tuple is allocated
283 	 * from the slab memory arena, or is palloc'd, see readtup_alloc().
284 	 */
285 	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
286 							int tapenum, unsigned int len);
287 
288 	/*
289 	 * This array holds the tuples now in sort memory.  If we are in state
290 	 * INITIAL, the tuples are in no particular order; if we are in state
291 	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
292 	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
293 	 * H.  In state SORTEDONTAPE, the array is not used.
294 	 */
295 	SortTuple  *memtuples;		/* array of SortTuple structs */
296 	int			memtupcount;	/* number of tuples currently present */
297 	int			memtupsize;		/* allocated length of memtuples array */
298 	bool		growmemtuples;	/* memtuples' growth still underway? */
299 
300 	/*
301 	 * Memory for tuples is sometimes allocated using a simple slab allocator,
302 	 * rather than with palloc().  Currently, we switch to slab allocation
303 	 * when we start merging.  Merging only needs to keep a small, fixed
304 	 * number of tuples in memory at any time, so we can avoid the
305 	 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
306 	 * to hold the tuples.
307 	 *
308 	 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
309 	 * slots.  The allocation is sized to have one slot per tape, plus one
310 	 * additional slot.  We need that many slots to hold all the tuples kept
311 	 * in the heap during merge, plus the one most recently returned from the
312 	 * sort by tuplesort_gettuple.
313 	 *
314 	 * Initially, all the slots are kept in a linked list of free slots.  When
315 	 * a tuple is read from a tape, it is put into the next available slot, if
316 	 * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
317 	 * instead.
318 	 *
319 	 * When we're done processing a tuple, we return the slot back to the free
320 	 * list, or pfree() it if it was palloc'd.  We know that a tuple was
321 	 * allocated from the slab if its pointer value is between
322 	 * slabMemoryBegin and slabMemoryEnd.
323 	 *
324 	 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
325 	 * tracking memory usage is not used.
326 	 */
327 	bool		slabAllocatorUsed;
328 
329 	char	   *slabMemoryBegin;	/* beginning of slab memory arena */
330 	char	   *slabMemoryEnd;	/* end of slab memory arena */
331 	SlabSlot   *slabFreeHead;	/* head of free list */
332 
333 	/* Buffer size to use for reading input tapes, during merge. */
334 	size_t		read_buffer_size;
335 
336 	/*
337 	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
338 	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
339 	 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
340 	 * recycle the memory on next gettuple call.
341 	 */
342 	void	   *lastReturnedTuple;
343 
344 	/*
345 	 * While building initial runs, this is the current output run number.
346 	 * Afterwards, it is the number of initial runs we made.
347 	 */
348 	int			currentRun;
349 
350 	/*
351 	 * Unless otherwise noted, all pointer variables below are pointers to
352 	 * arrays of length maxTapes, holding per-tape data.
353 	 */
354 
355 	/*
356 	 * This variable is only used during merge passes.  mergeactive[i] is true
357 	 * if we are reading an input run from (actual) tape number i and have not
358 	 * yet exhausted that run.
359 	 */
360 	bool	   *mergeactive;	/* active input run source? */
361 
362 	/*
363 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
364 	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
365 	 * "logical" and "actual" tape numbers straight!
366 	 */
367 	int			Level;			/* Knuth's l */
368 	int			destTape;		/* current output tape (Knuth's j, less 1) */
369 	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
370 	int		   *tp_runs;		/* # of real runs on each tape */
371 	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
372 	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
373 	int			activeTapes;	/* # of active input tapes in merge pass */
374 
375 	/*
376 	 * These variables are used after completion of sorting to keep track of
377 	 * the next tuple to return.  (In the tape case, the tape's current read
378 	 * position is also critical state.)
379 	 */
380 	int			result_tape;	/* actual tape number of finished output */
381 	int			current;		/* array index (only used if SORTEDINMEM) */
382 	bool		eof_reached;	/* reached EOF (needed for cursors) */
383 
384 	/* markpos_xxx holds marked position for mark and restore */
385 	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
386 	int			markpos_offset; /* saved "current", or offset in tape block */
387 	bool		markpos_eof;	/* saved "eof_reached" */
388 
389 	/*
390 	 * These variables are used during parallel sorting.
391 	 *
392 	 * worker is our worker identifier.  Follows the general convention that
393 	 * a value of -1 denotes a leader tuplesort, while values >= 0 denote
394 	 * worker tuplesorts.  (-1 can also be a serial tuplesort.)
395 	 *
396 	 * shared is mutable shared memory state, which is used to coordinate
397 	 * parallel sorts.
398 	 *
399 	 * nParticipants is the number of worker Tuplesortstates known by the
400 	 * leader to have actually been launched, which implies that they must
401 	 * finish a run that the leader can merge.  Typically includes a worker state held
402 	 * by the leader process itself.  Set in the leader Tuplesortstate only.
403 	 */
404 	int			worker;
405 	Sharedsort *shared;
406 	int			nParticipants;
407 
408 	/*
409 	 * The sortKeys variable is used by every case other than the hash index
410 	 * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
411 	 * MinimalTuple and CLUSTER routines, though.
412 	 */
413 	TupleDesc	tupDesc;
414 	SortSupport sortKeys;		/* array of length nKeys */
415 
416 	/*
417 	 * This variable is shared by the single-key MinimalTuple case and the
418 	 * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
419 	 */
420 	SortSupport onlyKey;
421 
422 	/*
423 	 * Additional state for managing "abbreviated key" sortsupport routines
424 	 * (which currently may be used by all cases except the hash index case).
425 	 * Tracks the intervals at which the optimization's effectiveness is
426 	 * tested.
427 	 */
428 	int64		abbrevNext;		/* Tuple # at which to next check
429 								 * applicability */
430 
431 	/*
432 	 * These variables are specific to the CLUSTER case; they are set by
433 	 * tuplesort_begin_cluster.
434 	 */
435 	IndexInfo  *indexInfo;		/* info about index being used for reference */
436 	EState	   *estate;			/* for evaluating index expressions */
437 
438 	/*
439 	 * These variables are specific to the IndexTuple case; they are set by
440 	 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
441 	 */
442 	Relation	heapRel;		/* table the index is being built on */
443 	Relation	indexRel;		/* index being built */
444 
445 	/* These are specific to the index_btree subcase: */
446 	bool		enforceUnique;	/* complain if we find duplicate tuples */
447 
448 	/* These are specific to the index_hash subcase: */
449 	uint32		high_mask;		/* masks for sortable part of hash code */
450 	uint32		low_mask;
451 	uint32		max_buckets;
452 
453 	/*
454 	 * These variables are specific to the Datum case; they are set by
455 	 * tuplesort_begin_datum and used only by the DatumTuple routines.
456 	 */
457 	Oid			datumType;
458 	/* we need typelen in order to know how to copy the Datums. */
459 	int			datumTypeLen;
460 
461 	/*
462 	 * Resource snapshot for time of sort start.
463 	 */
464 #ifdef TRACE_SORT
465 	PGRUsage	ru_start;
466 #endif
467 };
468 
469 /*
470  * Private mutable state for a parallel tuplesort operation.  This is allocated
471  * in shared memory.
472  */
473 struct Sharedsort
474 {
475 	/* mutex protects all fields prior to tapes */
476 	slock_t		mutex;
477 
478 	/*
479 	 * currentWorker generates ordinal identifier numbers for parallel sort
480 	 * workers.  These start from 0, and are always gapless.
481 	 *
482 	 * Workers increment workersFinished to indicate having finished.  If this
483 	 * is equal to state.nParticipants within the leader, the leader is ready to
484 	 * merge worker runs.
485 	 */
486 	int			currentWorker;
487 	int			workersFinished;
488 
489 	/* Temporary file space */
490 	SharedFileSet fileset;
491 
492 	/* Size of tapes flexible array */
493 	int			nTapes;
494 
495 	/*
496 	 * Tapes array used by workers to report back information needed by the
497 	 * leader to concatenate all worker tapes into one for merging
498 	 */
499 	TapeShare	tapes[FLEXIBLE_ARRAY_MEMBER];
500 };
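
/*
 * A hedged sketch of how a parallel sort caller wires this up (compare the
 * B-tree build code in nbtsort.c; variable names here are hypothetical and
 * error handling is elided).  The leader sizes and initializes the shared
 * state, each worker attaches to it, and both sides then pass a
 * SortCoordinate to the appropriate tuplesort_begin_xxx() routine:
 *
 *	// Leader, while setting up shared memory:
 *	shared = shm_toc_allocate(toc, tuplesort_estimate_shared(nworkers));
 *	tuplesort_initialize_shared(shared, nworkers, seg);
 *
 *	// Worker, after looking up "shared" in the toc:
 *	tuplesort_attach_shared(shared, seg);
 *	coordinate->isWorker = true;		// leader passes false
 *	coordinate->nParticipants = -1;		// leader passes the launched count
 *	coordinate->sharedsort = shared;
 *	state = tuplesort_begin_index_btree(heapRel, indexRel, enforceUnique,
 *										sortmem, coordinate, false);
 */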
501 
502 /*
503  * Is the given tuple allocated from the slab memory arena?
504  */
505 #define IS_SLAB_SLOT(state, tuple) \
506 	((char *) (tuple) >= (state)->slabMemoryBegin && \
507 	 (char *) (tuple) < (state)->slabMemoryEnd)
508 
509 /*
510  * Return the given tuple to the slab memory free list, or free it
511  * if it was palloc'd.
512  */
513 #define RELEASE_SLAB_SLOT(state, tuple) \
514 	do { \
515 		SlabSlot *buf = (SlabSlot *) tuple; \
516 		\
517 		if (IS_SLAB_SLOT((state), buf)) \
518 		{ \
519 			buf->nextfree = (state)->slabFreeHead; \
520 			(state)->slabFreeHead = buf; \
521 		} else \
522 			pfree(buf); \
523 	} while(0)
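
/*
 * A minimal sketch of the allocation side of this scheme (compare
 * readtup_alloc() later in this file); the function name is hypothetical.
 * A slot is popped from the free list when the tuple fits, otherwise we fall
 * back to an ordinary allocation in the sort context:
 *
 *	static void *
 *	slab_alloc_sketch(Tuplesortstate *state, Size tuplen)
 *	{
 *		SlabSlot   *buf;
 *
 *		if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
 *			return MemoryContextAlloc(state->sortcontext, tuplen);
 *		buf = state->slabFreeHead;
 *		state->slabFreeHead = buf->nextfree;	// pop from free list
 *		return buf;
 *	}
 *
 * Once the tuple is no longer needed, RELEASE_SLAB_SLOT() above pushes a
 * slab slot back onto the free list, or pfree()s a palloc'd tuple.
 */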
524 
525 #define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
526 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
527 #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
528 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
529 #define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
530 #define USEMEM(state,amt)	((state)->availMem -= (amt))
531 #define FREEMEM(state,amt)	((state)->availMem += (amt))
532 #define SERIAL(state)		((state)->shared == NULL)
533 #define WORKER(state)		((state)->shared && (state)->worker != -1)
534 #define LEADER(state)		((state)->shared && (state)->worker == -1)
535 
536 /*
537  * NOTES about on-tape representation of tuples:
538  *
539  * We require the first "unsigned int" of a stored tuple to be the total size
540  * on-tape of the tuple, including itself (so it is never zero; an all-zero
541  * unsigned int is used to delimit runs).  The remainder of the stored tuple
542  * may or may not match the in-memory representation of the tuple ---
543  * any conversion needed is the job of the writetup and readtup routines.
544  *
545  * If state->randomAccess is true, then the stored representation of the
546  * tuple must be followed by another "unsigned int" that is a copy of the
547  * length --- so the total tape space used is actually sizeof(unsigned int)
548  * more than the stored length value.  This allows read-backwards.  When
549  * randomAccess is not true, the write/read routines may omit the extra
550  * length word.
551  *
552  * writetup is expected to write both length words as well as the tuple
553  * data.  When readtup is called, the tape is positioned just after the
554  * front length word; readtup must read the tuple data and advance past
555  * the back length word (if present).
556  *
557  * The write/read routines can make use of the tuple description data
558  * stored in the Tuplesortstate record, if needed.  They are also expected
559  * to adjust state->availMem by the amount of memory space (not tape space!)
560  * released or consumed.  There is no error return from either writetup
561  * or readtup; they should ereport() on failure.
562  *
563  *
564  * NOTES about memory consumption calculations:
565  *
566  * We count space allocated for tuples against the workMem limit, plus
567  * the space used by the variable-size memtuples array.  Fixed-size space
568  * is not counted; it's small enough to not be interesting.
569  *
570  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
571  * rather than the originally-requested size.  This is important since
572  * palloc can add substantial overhead.  It's not a complete answer since
573  * we won't count any wasted space in palloc allocation blocks, but it's
574  * a lot better than what we were doing before 7.3.  As of 9.6, a
575  * separate memory context is used for caller passed tuples.  Resetting
576  * it at certain key increments significantly ameliorates fragmentation.
577  * Note that this places a responsibility on readtup and copytup routines
578  * to use the right memory context for these tuples (and to not use the
579  * reset context for anything whose lifetime needs to span multiple
580  * external sort runs).
581  */
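
/*
 * A hedged sketch of a writetup-style routine obeying the layout described
 * above (compare writetup_heap() and friends later in this file).  The
 * payload handling is simplified and the function name is hypothetical:
 *
 *	static void
 *	writetup_sketch(Tuplesortstate *state, int tapenum,
 *					void *body, unsigned int bodylen)
 *	{
 *		// total on-tape size, counting the leading length word itself
 *		unsigned int tuplen = bodylen + sizeof(unsigned int);
 *
 *		LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
 *		LogicalTapeWrite(state->tapeset, tapenum, body, bodylen);
 *		if (state->randomAccess)	// trailing copy allows reading backwards
 *			LogicalTapeWrite(state->tapeset, tapenum,
 *							 &tuplen, sizeof(tuplen));
 *	}
 *
 * The matching readtup routine is entered after the leading length word has
 * already been consumed (via getlen()), reads the stored length minus the
 * size of the length word, and, when randomAccess, consumes the trailing
 * length word with LogicalTapeReadExact().
 */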
582 
583 /* When using this macro, beware of double evaluation of len */
584 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
585 	do { \
586 		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
587 			elog(ERROR, "unexpected end of data"); \
588 	} while(0)
589 
590 
591 static Tuplesortstate *tuplesort_begin_common(int workMem,
592 											  SortCoordinate coordinate,
593 											  bool randomAccess);
594 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
595 static bool consider_abort_common(Tuplesortstate *state);
596 static void inittapes(Tuplesortstate *state, bool mergeruns);
597 static void inittapestate(Tuplesortstate *state, int maxTapes);
598 static void selectnewtape(Tuplesortstate *state);
599 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
600 static void mergeruns(Tuplesortstate *state);
601 static void mergeonerun(Tuplesortstate *state);
602 static void beginmerge(Tuplesortstate *state);
603 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
604 static void dumptuples(Tuplesortstate *state, bool alltuples);
605 static void make_bounded_heap(Tuplesortstate *state);
606 static void sort_bounded_heap(Tuplesortstate *state);
607 static void tuplesort_sort_memtuples(Tuplesortstate *state);
608 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
609 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
610 static void tuplesort_heap_delete_top(Tuplesortstate *state);
611 static void reversedirection(Tuplesortstate *state);
612 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
613 static void markrunend(Tuplesortstate *state, int tapenum);
614 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
615 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
616 							Tuplesortstate *state);
617 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
618 static void writetup_heap(Tuplesortstate *state, int tapenum,
619 						  SortTuple *stup);
620 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
621 						 int tapenum, unsigned int len);
622 static int	comparetup_cluster(const SortTuple *a, const SortTuple *b,
623 							   Tuplesortstate *state);
624 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
625 static void writetup_cluster(Tuplesortstate *state, int tapenum,
626 							 SortTuple *stup);
627 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
628 							int tapenum, unsigned int len);
629 static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
630 								   Tuplesortstate *state);
631 static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
632 								  Tuplesortstate *state);
633 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
634 static void writetup_index(Tuplesortstate *state, int tapenum,
635 						   SortTuple *stup);
636 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
637 						  int tapenum, unsigned int len);
638 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
639 							 Tuplesortstate *state);
640 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
641 static void writetup_datum(Tuplesortstate *state, int tapenum,
642 						   SortTuple *stup);
643 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
644 						  int tapenum, unsigned int len);
645 static int	worker_get_identifier(Tuplesortstate *state);
646 static void worker_freeze_result_tape(Tuplesortstate *state);
647 static void worker_nomergeruns(Tuplesortstate *state);
648 static void leader_takeover_tapes(Tuplesortstate *state);
649 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
650 
651 /*
652  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
653  * any variant of SortTuples, using the appropriate comparetup function.
654  * qsort_ssup() is specialized for the case where the comparetup function
655  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
656  * and Datum sorts.
657  */
658 #include "qsort_tuple.c"
659 
660 
661 /*
662  *		tuplesort_begin_xxx
663  *
664  * Initialize for a tuple sort operation.
665  *
666  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
667  * zero or more times, then call tuplesort_performsort when all the tuples
668  * have been supplied.  After performsort, retrieve the tuples in sorted
669  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
670  * access was requested, rescan, markpos, and restorepos can also be called.)
671  * Call tuplesort_end to terminate the operation and release memory/disk space.
672  *
673  * Each variant of tuplesort_begin has a workMem parameter specifying the
674  * maximum number of kilobytes of RAM to use before spilling data to disk.
675  * (The normal value of this parameter is work_mem, but some callers use
676  * other values.)  Each variant also has a randomAccess parameter specifying
677  * whether the caller needs non-sequential access to the sort result.
678  */
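
/*
 * A hedged sketch of the caller pattern described above, roughly as the
 * executor's Sort node uses it (compare nodeSort.c); slot setup, abbreviated
 * key handling, and error handling are elided, and the local variable names
 * are hypothetical:
 *
 *	Tuplesortstate *sortstate;
 *
 *	sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
 *									 sortOperators, collations, nullsFirst,
 *									 work_mem, NULL, false);
 *	for (;;)
 *	{
 *		slot = ExecProcNode(outerNode);		// fetch next input tuple
 *		if (TupIsNull(slot))
 *			break;
 *		tuplesort_puttupleslot(sortstate, slot);
 *	}
 *	tuplesort_performsort(sortstate);
 *	while (tuplesort_gettupleslot(sortstate, true, false, outslot, NULL))
 *	{
 *		// process tuples in sorted order
 *	}
 *	tuplesort_end(sortstate);
 */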
679 
680 static Tuplesortstate *
681 tuplesort_begin_common(int workMem, SortCoordinate coordinate,
682 					   bool randomAccess)
683 {
684 	Tuplesortstate *state;
685 	MemoryContext sortcontext;
686 	MemoryContext tuplecontext;
687 	MemoryContext oldcontext;
688 
689 	/* See leader_takeover_tapes() remarks on randomAccess support */
690 	if (coordinate && randomAccess)
691 		elog(ERROR, "random access disallowed under parallel sort");
692 
693 	/*
694 	 * Create a working memory context for this sort operation. All data
695 	 * needed by the sort will live inside this context.
696 	 */
697 	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
698 										"TupleSort main",
699 										ALLOCSET_DEFAULT_SIZES);
700 
701 	/*
702 	 * Caller tuple (e.g. IndexTuple) memory context.
703 	 *
704 	 * A dedicated child context used exclusively for caller passed tuples
705 	 * eases memory management.  Resetting at key points reduces
706 	 * fragmentation. Note that the memtuples array of SortTuples is allocated
707 	 * in the parent context, not this context, because there is no need to
708 	 * free memtuples early.
709 	 */
710 	tuplecontext = AllocSetContextCreate(sortcontext,
711 										 "Caller tuples",
712 										 ALLOCSET_DEFAULT_SIZES);
713 
714 	/*
715 	 * Make the Tuplesortstate within the per-sort context.  This way, we
716 	 * don't need a separate pfree() operation for it at shutdown.
717 	 */
718 	oldcontext = MemoryContextSwitchTo(sortcontext);
719 
720 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
721 
722 #ifdef TRACE_SORT
723 	if (trace_sort)
724 		pg_rusage_init(&state->ru_start);
725 #endif
726 
727 	state->status = TSS_INITIAL;
728 	state->randomAccess = randomAccess;
729 	state->bounded = false;
730 	state->tuples = true;
731 	state->boundUsed = false;
732 
733 	/*
734 	 * workMem is forced to be at least 64KB, the current minimum valid value
735 	 * for the work_mem GUC.  This is a defense against parallel sort callers
736 	 * that divide out memory among many workers in a way that leaves each
737 	 * with very little memory.
738 	 */
739 	state->allowedMem = Max(workMem, 64) * (int64) 1024;
740 	state->availMem = state->allowedMem;
741 	state->sortcontext = sortcontext;
742 	state->tuplecontext = tuplecontext;
743 	state->tapeset = NULL;
744 
745 	state->memtupcount = 0;
746 
747 	/*
748 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
749 	 * see comments in grow_memtuples().
750 	 */
751 	state->memtupsize = Max(1024,
752 							ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
753 
754 	state->growmemtuples = true;
755 	state->slabAllocatorUsed = false;
756 	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
757 
758 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
759 
760 	/* workMem must be large enough for the minimal memtuples array */
761 	if (LACKMEM(state))
762 		elog(ERROR, "insufficient memory allowed for sort");
763 
764 	state->currentRun = 0;
765 
766 	/*
767 	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
768 	 * inittapes(), if needed
769 	 */
770 
771 	state->result_tape = -1;	/* flag that result tape has not been formed */
772 
773 	/*
774 	 * Initialize parallel-related state based on coordination information
775 	 * from caller
776 	 */
777 	if (!coordinate)
778 	{
779 		/* Serial sort */
780 		state->shared = NULL;
781 		state->worker = -1;
782 		state->nParticipants = -1;
783 	}
784 	else if (coordinate->isWorker)
785 	{
786 		/* Parallel worker produces exactly one final run from all input */
787 		state->shared = coordinate->sharedsort;
788 		state->worker = worker_get_identifier(state);
789 		state->nParticipants = -1;
790 	}
791 	else
792 	{
793 		/* Parallel leader state only used for final merge */
794 		state->shared = coordinate->sharedsort;
795 		state->worker = -1;
796 		state->nParticipants = coordinate->nParticipants;
797 		Assert(state->nParticipants >= 1);
798 	}
799 
800 	MemoryContextSwitchTo(oldcontext);
801 
802 	return state;
803 }
804 
805 Tuplesortstate *
806 tuplesort_begin_heap(TupleDesc tupDesc,
807 					 int nkeys, AttrNumber *attNums,
808 					 Oid *sortOperators, Oid *sortCollations,
809 					 bool *nullsFirstFlags,
810 					 int workMem, SortCoordinate coordinate, bool randomAccess)
811 {
812 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
813 												   randomAccess);
814 	MemoryContext oldcontext;
815 	int			i;
816 
817 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
818 
819 	AssertArg(nkeys > 0);
820 
821 #ifdef TRACE_SORT
822 	if (trace_sort)
823 		elog(LOG,
824 			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
825 			 nkeys, workMem, randomAccess ? 't' : 'f');
826 #endif
827 
828 	state->nKeys = nkeys;
829 
830 	TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
831 								false,	/* no unique check */
832 								nkeys,
833 								workMem,
834 								randomAccess,
835 								PARALLEL_SORT(state));
836 
837 	state->comparetup = comparetup_heap;
838 	state->copytup = copytup_heap;
839 	state->writetup = writetup_heap;
840 	state->readtup = readtup_heap;
841 
842 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
843 	state->abbrevNext = 10;
844 
845 	/* Prepare SortSupport data for each column */
846 	state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
847 
848 	for (i = 0; i < nkeys; i++)
849 	{
850 		SortSupport sortKey = state->sortKeys + i;
851 
852 		AssertArg(attNums[i] != 0);
853 		AssertArg(sortOperators[i] != 0);
854 
855 		sortKey->ssup_cxt = CurrentMemoryContext;
856 		sortKey->ssup_collation = sortCollations[i];
857 		sortKey->ssup_nulls_first = nullsFirstFlags[i];
858 		sortKey->ssup_attno = attNums[i];
859 		/* Convey if abbreviation optimization is applicable in principle */
860 		sortKey->abbreviate = (i == 0);
861 
862 		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
863 	}
864 
865 	/*
866 	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
867 	 * tie-breaker comparisons may be required.  Typically, the optimization
868 	 * is only of value to pass-by-value types anyway, whereas abbreviated
869 	 * keys are typically only of value to pass-by-reference types.
870 	 */
871 	if (nkeys == 1 && !state->sortKeys->abbrev_converter)
872 		state->onlyKey = state->sortKeys;
873 
874 	MemoryContextSwitchTo(oldcontext);
875 
876 	return state;
877 }
878 
879 Tuplesortstate *
880 tuplesort_begin_cluster(TupleDesc tupDesc,
881 						Relation indexRel,
882 						int workMem,
883 						SortCoordinate coordinate, bool randomAccess)
884 {
885 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
886 												   randomAccess);
887 	BTScanInsert indexScanKey;
888 	MemoryContext oldcontext;
889 	int			i;
890 
891 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
892 
893 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
894 
895 #ifdef TRACE_SORT
896 	if (trace_sort)
897 		elog(LOG,
898 			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
899 			 RelationGetNumberOfAttributes(indexRel),
900 			 workMem, randomAccess ? 't' : 'f');
901 #endif
902 
903 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
904 
905 	TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
906 								false,	/* no unique check */
907 								state->nKeys,
908 								workMem,
909 								randomAccess,
910 								PARALLEL_SORT(state));
911 
912 	state->comparetup = comparetup_cluster;
913 	state->copytup = copytup_cluster;
914 	state->writetup = writetup_cluster;
915 	state->readtup = readtup_cluster;
916 	state->abbrevNext = 10;
917 
918 	state->indexInfo = BuildIndexInfo(indexRel);
919 
920 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
921 
922 	indexScanKey = _bt_mkscankey(indexRel, NULL);
923 
924 	if (state->indexInfo->ii_Expressions != NULL)
925 	{
926 		TupleTableSlot *slot;
927 		ExprContext *econtext;
928 
929 		/*
930 		 * We will need to use FormIndexDatum to evaluate the index
931 		 * expressions.  To do that, we need an EState, as well as a
932 		 * TupleTableSlot to put the table tuples into.  The econtext's
933 		 * scantuple has to point to that slot, too.
934 		 */
935 		state->estate = CreateExecutorState();
936 		slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
937 		econtext = GetPerTupleExprContext(state->estate);
938 		econtext->ecxt_scantuple = slot;
939 	}
940 
941 	/* Prepare SortSupport data for each column */
942 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
943 											sizeof(SortSupportData));
944 
945 	for (i = 0; i < state->nKeys; i++)
946 	{
947 		SortSupport sortKey = state->sortKeys + i;
948 		ScanKey		scanKey = indexScanKey->scankeys + i;
949 		int16		strategy;
950 
951 		sortKey->ssup_cxt = CurrentMemoryContext;
952 		sortKey->ssup_collation = scanKey->sk_collation;
953 		sortKey->ssup_nulls_first =
954 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
955 		sortKey->ssup_attno = scanKey->sk_attno;
956 		/* Convey if abbreviation optimization is applicable in principle */
957 		sortKey->abbreviate = (i == 0);
958 
959 		AssertState(sortKey->ssup_attno != 0);
960 
961 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
962 			BTGreaterStrategyNumber : BTLessStrategyNumber;
963 
964 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
965 	}
966 
967 	pfree(indexScanKey);
968 
969 	MemoryContextSwitchTo(oldcontext);
970 
971 	return state;
972 }
973 
974 Tuplesortstate *
975 tuplesort_begin_index_btree(Relation heapRel,
976 							Relation indexRel,
977 							bool enforceUnique,
978 							int workMem,
979 							SortCoordinate coordinate,
980 							bool randomAccess)
981 {
982 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
983 												   randomAccess);
984 	BTScanInsert indexScanKey;
985 	MemoryContext oldcontext;
986 	int			i;
987 
988 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
989 
990 #ifdef TRACE_SORT
991 	if (trace_sort)
992 		elog(LOG,
993 			 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
994 			 enforceUnique ? 't' : 'f',
995 			 workMem, randomAccess ? 't' : 'f');
996 #endif
997 
998 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
999 
1000 	TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
1001 								enforceUnique,
1002 								state->nKeys,
1003 								workMem,
1004 								randomAccess,
1005 								PARALLEL_SORT(state));
1006 
1007 	state->comparetup = comparetup_index_btree;
1008 	state->copytup = copytup_index;
1009 	state->writetup = writetup_index;
1010 	state->readtup = readtup_index;
1011 	state->abbrevNext = 10;
1012 
1013 	state->heapRel = heapRel;
1014 	state->indexRel = indexRel;
1015 	state->enforceUnique = enforceUnique;
1016 
1017 	indexScanKey = _bt_mkscankey(indexRel, NULL);
1018 
1019 	/* Prepare SortSupport data for each column */
1020 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
1021 											sizeof(SortSupportData));
1022 
1023 	for (i = 0; i < state->nKeys; i++)
1024 	{
1025 		SortSupport sortKey = state->sortKeys + i;
1026 		ScanKey		scanKey = indexScanKey->scankeys + i;
1027 		int16		strategy;
1028 
1029 		sortKey->ssup_cxt = CurrentMemoryContext;
1030 		sortKey->ssup_collation = scanKey->sk_collation;
1031 		sortKey->ssup_nulls_first =
1032 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1033 		sortKey->ssup_attno = scanKey->sk_attno;
1034 		/* Convey if abbreviation optimization is applicable in principle */
1035 		sortKey->abbreviate = (i == 0);
1036 
1037 		AssertState(sortKey->ssup_attno != 0);
1038 
1039 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1040 			BTGreaterStrategyNumber : BTLessStrategyNumber;
1041 
1042 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1043 	}
1044 
1045 	pfree(indexScanKey);
1046 
1047 	MemoryContextSwitchTo(oldcontext);
1048 
1049 	return state;
1050 }
1051 
1052 Tuplesortstate *
1053 tuplesort_begin_index_hash(Relation heapRel,
1054 						   Relation indexRel,
1055 						   uint32 high_mask,
1056 						   uint32 low_mask,
1057 						   uint32 max_buckets,
1058 						   int workMem,
1059 						   SortCoordinate coordinate,
1060 						   bool randomAccess)
1061 {
1062 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1063 												   randomAccess);
1064 	MemoryContext oldcontext;
1065 
1066 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
1067 
1068 #ifdef TRACE_SORT
1069 	if (trace_sort)
1070 		elog(LOG,
1071 			 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1072 			 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1073 			 high_mask,
1074 			 low_mask,
1075 			 max_buckets,
1076 			 workMem, randomAccess ? 't' : 'f');
1077 #endif
1078 
1079 	state->nKeys = 1;			/* Only one sort column, the hash code */
1080 
1081 	state->comparetup = comparetup_index_hash;
1082 	state->copytup = copytup_index;
1083 	state->writetup = writetup_index;
1084 	state->readtup = readtup_index;
1085 
1086 	state->heapRel = heapRel;
1087 	state->indexRel = indexRel;
1088 
1089 	state->high_mask = high_mask;
1090 	state->low_mask = low_mask;
1091 	state->max_buckets = max_buckets;
1092 
1093 	MemoryContextSwitchTo(oldcontext);
1094 
1095 	return state;
1096 }
1097 
1098 Tuplesortstate *
1099 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
1100 					  bool nullsFirstFlag, int workMem,
1101 					  SortCoordinate coordinate, bool randomAccess)
1102 {
1103 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1104 												   randomAccess);
1105 	MemoryContext oldcontext;
1106 	int16		typlen;
1107 	bool		typbyval;
1108 
1109 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
1110 
1111 #ifdef TRACE_SORT
1112 	if (trace_sort)
1113 		elog(LOG,
1114 			 "begin datum sort: workMem = %d, randomAccess = %c",
1115 			 workMem, randomAccess ? 't' : 'f');
1116 #endif
1117 
1118 	state->nKeys = 1;			/* always a one-column sort */
1119 
1120 	TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
1121 								false,	/* no unique check */
1122 								1,
1123 								workMem,
1124 								randomAccess,
1125 								PARALLEL_SORT(state));
1126 
1127 	state->comparetup = comparetup_datum;
1128 	state->copytup = copytup_datum;
1129 	state->writetup = writetup_datum;
1130 	state->readtup = readtup_datum;
1131 	state->abbrevNext = 10;
1132 
1133 	state->datumType = datumType;
1134 
1135 	/* lookup necessary attributes of the datum type */
1136 	get_typlenbyval(datumType, &typlen, &typbyval);
1137 	state->datumTypeLen = typlen;
1138 	state->tuples = !typbyval;
1139 
1140 	/* Prepare SortSupport data */
1141 	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
1142 
1143 	state->sortKeys->ssup_cxt = CurrentMemoryContext;
1144 	state->sortKeys->ssup_collation = sortCollation;
1145 	state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1146 
1147 	/*
1148 	 * Abbreviation is possible here only for by-reference types.  In theory,
1149 	 * a pass-by-value datatype could have an abbreviated form that is cheaper
1150 	 * to compare.  In a tuple sort, we could support that, because we can
1151 	 * always extract the original datum from the tuple if needed.  Here, we
1152 	 * can't, because a datum sort only stores a single copy of the datum; the
1153 	 * "tuple" field of each sortTuple is NULL.
1154 	 */
1155 	state->sortKeys->abbreviate = !typbyval;
1156 
1157 	PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
1158 
1159 	/*
1160 	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
1161 	 * tie-breaker comparisons may be required.  Typically, the optimization
1162 	 * is only of value to pass-by-value types anyway, whereas abbreviated
1163 	 * keys are typically only of value to pass-by-reference types.
1164 	 */
1165 	if (!state->sortKeys->abbrev_converter)
1166 		state->onlyKey = state->sortKeys;
1167 
1168 	MemoryContextSwitchTo(oldcontext);
1169 
1170 	return state;
1171 }
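
/*
 * Hedged usage example for the Datum case (variable names are hypothetical,
 * error handling is elided):
 *
 *	sortstate = tuplesort_begin_datum(INT8OID, Int8LessOperator, InvalidOid,
 *									  false, work_mem, NULL, false);
 *	tuplesort_putdatum(sortstate, Int64GetDatum(42), false);
 *	tuplesort_performsort(sortstate);
 *	while (tuplesort_getdatum(sortstate, true, &val, &isnull, NULL))
 *	{
 *		// consume values in ascending order
 *	}
 *	tuplesort_end(sortstate);
 */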
1172 
1173 /*
1174  * tuplesort_set_bound
1175  *
1176  *	Advise tuplesort that at most the first N result tuples are required.
1177  *
1178  * Must be called before inserting any tuples.  (Actually, we could allow it
1179  * as long as the sort hasn't spilled to disk, but there seems no need for
1180  * delayed calls at the moment.)
1181  *
1182  * This is a hint only. The tuplesort may still return more tuples than
1183  * requested.  Parallel leader tuplesorts will always ignore the hint.
1184  */
1185 void
1186 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
1187 {
1188 	/* Assert we're called before loading any tuples */
1189 	Assert(state->status == TSS_INITIAL);
1190 	Assert(state->memtupcount == 0);
1191 	Assert(!state->bounded);
1192 	Assert(!WORKER(state));
1193 
1194 #ifdef DEBUG_BOUNDED_SORT
1195 	/* Honor GUC setting that disables the feature (for easy testing) */
1196 	if (!optimize_bounded_sort)
1197 		return;
1198 #endif
1199 
1200 	/* Parallel leader ignores hint */
1201 	if (LEADER(state))
1202 		return;
1203 
1204 	/* We want to be able to compute bound * 2, so limit the setting */
1205 	if (bound > (int64) (INT_MAX / 2))
1206 		return;
1207 
1208 	state->bounded = true;
1209 	state->bound = (int) bound;
1210 
1211 	/*
1212 	 * Bounded sorts are not an effective target for abbreviated key
1213 	 * optimization.  Disable by setting state to be consistent with no
1214 	 * abbreviation support.
1215 	 */
1216 	state->sortKeys->abbrev_converter = NULL;
1217 	if (state->sortKeys->abbrev_full_comparator)
1218 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1219 
1220 	/* Not strictly necessary, but be tidy */
1221 	state->sortKeys->abbrev_abort = NULL;
1222 	state->sortKeys->abbrev_full_comparator = NULL;
1223 }
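
/*
 * Hedged usage example: a caller that only needs the first 100 tuples in
 * sorted order (say, for ORDER BY ... LIMIT 100) can hint that right after
 * tuplesort_begin_xxx():
 *
 *		tuplesort_set_bound(sortstate, 100);
 *
 * letting the sort switch to a small bounded heap instead of retaining every
 * input tuple.
 */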
1224 
1225 /*
1226  * tuplesort_end
1227  *
1228  *	Release resources and clean up.
1229  *
1230  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1231  * pointing to garbage.  Be careful not to attempt to use or free such
1232  * pointers afterwards!
1233  */
1234 void
1235 tuplesort_end(Tuplesortstate *state)
1236 {
1237 	/* context swap probably not needed, but let's be safe */
1238 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1239 
1240 #ifdef TRACE_SORT
1241 	long		spaceUsed;
1242 
1243 	if (state->tapeset)
1244 		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
1245 	else
1246 		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1247 #endif
1248 
1249 	/*
1250 	 * Delete temporary "tape" files, if any.
1251 	 *
1252 	 * Note: want to include this in reported total cost of sort, hence need
1253 	 * for two #ifdef TRACE_SORT sections.
1254 	 */
1255 	if (state->tapeset)
1256 		LogicalTapeSetClose(state->tapeset);
1257 
1258 #ifdef TRACE_SORT
1259 	if (trace_sort)
1260 	{
1261 		if (state->tapeset)
1262 			elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1263 				 SERIAL(state) ? "external sort" : "parallel external sort",
1264 				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1265 		else
1266 			elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1267 				 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1268 				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1269 	}
1270 
1271 	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1272 #else
1273 
1274 	/*
1275 	 * If you disabled TRACE_SORT, you can still probe sort__done, but you
1276 	 * ain't getting space-used stats.
1277 	 */
1278 	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1279 #endif
1280 
1281 	/* Free any execution state created for CLUSTER case */
1282 	if (state->estate != NULL)
1283 	{
1284 		ExprContext *econtext = GetPerTupleExprContext(state->estate);
1285 
1286 		ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
1287 		FreeExecutorState(state->estate);
1288 	}
1289 
1290 	MemoryContextSwitchTo(oldcontext);
1291 
1292 	/*
1293 	 * Free the per-sort memory context, thereby releasing all working memory,
1294 	 * including the Tuplesortstate struct itself.
1295 	 */
1296 	MemoryContextDelete(state->sortcontext);
1297 }
1298 
1299 /*
1300  * Grow the memtuples[] array, if possible within our memory constraint.  We
1301  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1302  * limit.  Return true if we were able to enlarge the array, false if not.
1303  *
1304  * Normally, at each increment we double the size of the array.  When doing
1305  * that would exceed a limit, we attempt one last, smaller increase (and then
1306  * clear the growmemtuples flag so we don't try any more).  That allows us to
1307  * use memory as fully as permitted; sticking to the pure doubling rule could
1308  * result in almost half going unused.  Because availMem moves around with
1309  * tuple addition/removal, we need some rule to prevent making repeated small
1310  * increases in memtupsize, which would just be useless thrashing.  The
1311  * growmemtuples flag accomplishes that and also prevents useless
1312  * recalculations in this function.
1313  */
1314 static bool
1315 grow_memtuples(Tuplesortstate *state)
1316 {
1317 	int			newmemtupsize;
1318 	int			memtupsize = state->memtupsize;
1319 	int64		memNowUsed = state->allowedMem - state->availMem;
1320 
1321 	/* Forget it if we've already maxed out memtuples, per comment above */
1322 	if (!state->growmemtuples)
1323 		return false;
1324 
1325 	/* Select new value of memtupsize */
1326 	if (memNowUsed <= state->availMem)
1327 	{
1328 		/*
1329 		 * We've used no more than half of allowedMem; double our usage,
1330 		 * clamping at INT_MAX tuples.
1331 		 */
1332 		if (memtupsize < INT_MAX / 2)
1333 			newmemtupsize = memtupsize * 2;
1334 		else
1335 		{
1336 			newmemtupsize = INT_MAX;
1337 			state->growmemtuples = false;
1338 		}
1339 	}
1340 	else
1341 	{
1342 		/*
1343 		 * This will be the last increment of memtupsize.  Abandon doubling
1344 		 * strategy and instead increase as much as we safely can.
1345 		 *
1346 		 * To stay within allowedMem, we can't increase memtupsize by more
1347 		 * than availMem / sizeof(SortTuple) elements.  In practice, we want
1348 		 * to increase it by considerably less, because we need to leave some
1349 		 * space for the tuples to which the new array slots will refer.  We
1350 		 * assume the new tuples will be about the same size as the tuples
1351 		 * we've already seen, and thus we can extrapolate from the space
1352 		 * consumption so far to estimate an appropriate new size for the
1353 		 * memtuples array.  The optimal value might be higher or lower than
1354 		 * this estimate, but it's hard to know that in advance.  We again
1355 		 * clamp at INT_MAX tuples.
1356 		 *
1357 		 * This calculation is safe against enlarging the array so much that
1358 		 * LACKMEM becomes true, because the memory currently used includes
1359 		 * the present array; thus, there would be enough allowedMem for the
1360 		 * new array elements even if no other memory were currently used.
1361 		 *
1362 		 * We do the arithmetic in float8, because otherwise the product of
1363 		 * memtupsize and allowedMem could overflow.  Any inaccuracy in the
1364 		 * result should be insignificant; but even if we computed a
1365 		 * completely insane result, the checks below will prevent anything
1366 		 * really bad from happening.
1367 		 */
1368 		double		grow_ratio;
1369 
1370 		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1371 		if (memtupsize * grow_ratio < INT_MAX)
1372 			newmemtupsize = (int) (memtupsize * grow_ratio);
1373 		else
1374 			newmemtupsize = INT_MAX;
1375 
1376 		/* We won't make any further enlargement attempts */
1377 		state->growmemtuples = false;
1378 	}
1379 
1380 	/* Must enlarge array by at least one element, else report failure */
1381 	if (newmemtupsize <= memtupsize)
1382 		goto noalloc;
1383 
1384 	/*
1385 	 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
1386 	 * to ensure our request won't be rejected.  Note that we can easily
1387 	 * exhaust address space before facing this outcome.  (This is presently
1388 	 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1389 	 * don't rely on that at this distance.)
1390 	 */
1391 	if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1392 	{
1393 		newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1394 		state->growmemtuples = false;	/* can't grow any more */
1395 	}
1396 
1397 	/*
1398 	 * We need to be sure that we do not cause LACKMEM to become true, else
1399 	 * the space management algorithm will go nuts.  The code above should
1400 	 * never generate a dangerous request, but to be safe, check explicitly
1401 	 * that the array growth fits within availMem.  (We could still cause
1402 	 * LACKMEM if the memory chunk overhead associated with the memtuples
1403 	 * array were to increase.  That shouldn't happen because we chose the
1404 	 * initial array size large enough to ensure that palloc will be treating
1405 	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
1406 	 * explicitly below just in case.)
1407 	 */
1408 	if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1409 		goto noalloc;
1410 
1411 	/* OK, do it */
1412 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1413 	state->memtupsize = newmemtupsize;
1414 	state->memtuples = (SortTuple *)
1415 		repalloc_huge(state->memtuples,
1416 					  state->memtupsize * sizeof(SortTuple));
1417 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1418 	if (LACKMEM(state))
1419 		elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1420 	return true;
1421 
1422 noalloc:
1423 	/* If for any reason we didn't realloc, shut off future attempts */
1424 	state->growmemtuples = false;
1425 	return false;
1426 }
1427 
1428 /*
1429  * Accept one tuple while collecting input data for sort.
1430  *
1431  * Note that the input data is always copied; the caller need not save it.
1432  */
1433 void
1434 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1435 {
1436 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1437 	SortTuple	stup;
1438 
1439 	/*
1440 	 * Copy the given tuple into memory we control, and decrease availMem.
1441 	 * Then call the common code.
1442 	 */
1443 	COPYTUP(state, &stup, (void *) slot);
1444 
1445 	puttuple_common(state, &stup);
1446 
1447 	MemoryContextSwitchTo(oldcontext);
1448 }
1449 
1450 /*
1451  * Accept one tuple while collecting input data for sort.
1452  *
1453  * Note that the input data is always copied; the caller need not save it.
1454  */
1455 void
1456 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1457 {
1458 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1459 	SortTuple	stup;
1460 
1461 	/*
1462 	 * Copy the given tuple into memory we control, and decrease availMem.
1463 	 * Then call the common code.
1464 	 */
1465 	COPYTUP(state, &stup, (void *) tup);
1466 
1467 	puttuple_common(state, &stup);
1468 
1469 	MemoryContextSwitchTo(oldcontext);
1470 }
1471 
1472 /*
1473  * Collect one index tuple while collecting input data for sort, building
1474  * it from caller-supplied values.
1475  */
1476 void
1477 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1478 							  ItemPointer self, Datum *values,
1479 							  bool *isnull)
1480 {
1481 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1482 	SortTuple	stup;
1483 	Datum		original;
1484 	IndexTuple	tuple;
1485 
1486 	stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1487 	tuple = ((IndexTuple) stup.tuple);
1488 	tuple->t_tid = *self;
1489 	USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1490 	/* set up first-column key value */
1491 	original = index_getattr(tuple,
1492 							 1,
1493 							 RelationGetDescr(state->indexRel),
1494 							 &stup.isnull1);
1495 
1496 	MemoryContextSwitchTo(state->sortcontext);
1497 
1498 	if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1499 	{
1500 		/*
1501 		 * Store the ordinary Datum representation, or the NULL value.  If
1502 		 * there is a converter it won't expect NULL values, and the cost
1503 		 * model is not required to account for NULLs, so in that case we
1504 		 * avoid calling the converter and just set datum1 to a zeroed
1505 		 * representation (to be consistent, and to support cheap inequality
1506 		 * tests for NULL abbreviated keys).
1507 		 */
1508 		stup.datum1 = original;
1509 	}
1510 	else if (!consider_abort_common(state))
1511 	{
1512 		/* Store abbreviated key representation */
1513 		stup.datum1 = state->sortKeys->abbrev_converter(original,
1514 														state->sortKeys);
1515 	}
1516 	else
1517 	{
1518 		/* Abort abbreviation */
1519 		int			i;
1520 
1521 		stup.datum1 = original;
1522 
1523 		/*
1524 		 * Set state to be consistent with never trying abbreviation.
1525 		 *
1526 		 * Alter datum1 representation in already-copied tuples, so as to
1527 		 * ensure a consistent representation (current tuple was just
1528 		 * handled).  It does not matter if some dumped tuples are already
1529 		 * sorted on tape, since serialized tuples lack abbreviated keys
1530 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1531 		 */
1532 		for (i = 0; i < state->memtupcount; i++)
1533 		{
1534 			SortTuple  *mtup = &state->memtuples[i];
1535 
1536 			tuple = mtup->tuple;
1537 			mtup->datum1 = index_getattr(tuple,
1538 										 1,
1539 										 RelationGetDescr(state->indexRel),
1540 										 &mtup->isnull1);
1541 		}
1542 	}
1543 
1544 	puttuple_common(state, &stup);
1545 
1546 	MemoryContextSwitchTo(oldcontext);
1547 }
1548 
1549 /*
1550  * Accept one Datum while collecting input data for sort.
1551  *
1552  * If the Datum is pass-by-ref type, the value will be copied.
1553  */
1554 void
1555 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1556 {
1557 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1558 	SortTuple	stup;
1559 
1560 	/*
1561 	 * Pass-by-value types or null values are just stored directly in
1562 	 * stup.datum1 (and stup.tuple is not used and set to NULL).
1563 	 *
1564 	 * Non-null pass-by-reference values need to be copied into memory we
1565 	 * control, and possibly abbreviated. The copied value is pointed to by
1566 	 * stup.tuple and is treated as the canonical copy (e.g. to return via
1567 	 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1568 	 * abbreviated value if abbreviation is happening, otherwise it's
1569 	 * identical to stup.tuple.
1570 	 */
1571 
1572 	if (isNull || !state->tuples)
1573 	{
1574 		/*
1575 		 * Set datum1 to zeroed representation for NULLs (to be consistent,
1576 		 * and to support cheap inequality tests for NULL abbreviated keys).
1577 		 */
1578 		stup.datum1 = !isNull ? val : (Datum) 0;
1579 		stup.isnull1 = isNull;
1580 		stup.tuple = NULL;		/* no separate storage */
1581 		MemoryContextSwitchTo(state->sortcontext);
1582 	}
1583 	else
1584 	{
1585 		Datum		original = datumCopy(val, false, state->datumTypeLen);
1586 
1587 		stup.isnull1 = false;
1588 		stup.tuple = DatumGetPointer(original);
1589 		USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1590 		MemoryContextSwitchTo(state->sortcontext);
1591 
1592 		if (!state->sortKeys->abbrev_converter)
1593 		{
1594 			stup.datum1 = original;
1595 		}
1596 		else if (!consider_abort_common(state))
1597 		{
1598 			/* Store abbreviated key representation */
1599 			stup.datum1 = state->sortKeys->abbrev_converter(original,
1600 															state->sortKeys);
1601 		}
1602 		else
1603 		{
1604 			/* Abort abbreviation */
1605 			int			i;
1606 
1607 			stup.datum1 = original;
1608 
1609 			/*
1610 			 * Set state to be consistent with never trying abbreviation.
1611 			 *
1612 			 * Alter datum1 representation in already-copied tuples, so as to
1613 			 * ensure a consistent representation (current tuple was just
1614 			 * handled).  It does not matter if some dumped tuples are already
1615 			 * sorted on tape, since serialized tuples lack abbreviated keys
1616 			 * (TSS_BUILDRUNS state prevents control reaching here in any
1617 			 * case).
1618 			 */
1619 			for (i = 0; i < state->memtupcount; i++)
1620 			{
1621 				SortTuple  *mtup = &state->memtuples[i];
1622 
1623 				mtup->datum1 = PointerGetDatum(mtup->tuple);
1624 			}
1625 		}
1626 	}
1627 
1628 	puttuple_common(state, &stup);
1629 
1630 	MemoryContextSwitchTo(oldcontext);
1631 }
1632 
1633 /*
1634  * Shared code for tuple and datum cases.
1635  */
1636 static void
1637 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1638 {
1639 	Assert(!LEADER(state));
1640 
1641 	switch (state->status)
1642 	{
1643 		case TSS_INITIAL:
1644 
1645 			/*
1646 			 * Save the tuple into the unsorted array.  First, grow the array
1647 			 * as needed.  Note that we try to grow the array when there is
1648 			 * still one free slot remaining --- if we fail, there'll still be
1649 			 * room to store the incoming tuple, and then we'll switch to
1650 			 * tape-based operation.
1651 			 */
1652 			if (state->memtupcount >= state->memtupsize - 1)
1653 			{
1654 				(void) grow_memtuples(state);
1655 				Assert(state->memtupcount < state->memtupsize);
1656 			}
1657 			state->memtuples[state->memtupcount++] = *tuple;
1658 
1659 			/*
1660 			 * Check if it's time to switch over to a bounded heapsort. We do
1661 			 * so if the input tuple count exceeds twice the desired tuple
1662 			 * count (this is a heuristic for where heapsort becomes cheaper
1663 			 * than a quicksort), or if we've just filled workMem and have
1664 			 * enough tuples to meet the bound.
1665 			 *
1666 			 * Note that once we enter TSS_BOUNDED state we will always try to
1667 			 * complete the sort that way.  In the worst case, if later input
1668 			 * tuples are larger than earlier ones, this might cause us to
1669 			 * exceed workMem significantly.
1670 			 */
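			/*
			 * Hypothetical example: with a bound of 100 (e.g. a LIMIT 100
			 * query), the test below fires as soon as the 201st tuple has
			 * been stored, or earlier if we already hold more than 100
			 * tuples and have run out of workMem.
			 */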
1671 			if (state->bounded &&
1672 				(state->memtupcount > state->bound * 2 ||
1673 				 (state->memtupcount > state->bound && LACKMEM(state))))
1674 			{
1675 #ifdef TRACE_SORT
1676 				if (trace_sort)
1677 					elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1678 						 state->memtupcount,
1679 						 pg_rusage_show(&state->ru_start));
1680 #endif
1681 				make_bounded_heap(state);
1682 				return;
1683 			}
1684 
1685 			/*
1686 			 * Done if we still fit in available memory and have array slots.
1687 			 */
1688 			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1689 				return;
1690 
1691 			/*
1692 			 * Nope; time to switch to tape-based operation.
1693 			 */
1694 			inittapes(state, true);
1695 
1696 			/*
1697 			 * Dump all tuples.
1698 			 */
1699 			dumptuples(state, false);
1700 			break;
1701 
1702 		case TSS_BOUNDED:
1703 
1704 			/*
1705 			 * We don't want to grow the array here, so check whether the new
1706 			 * tuple can be discarded before putting it in.  This should be a
1707 			 * good speed optimization, too, since when there are many more
1708 			 * input tuples than the bound, most input tuples can be discarded
1709 			 * with just this one comparison.  Note that because we currently
1710 			 * have the sort direction reversed, we must check for <= not >=.
1711 			 */
1712 			if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1713 			{
1714 				/* new tuple <= top of the heap, so we can discard it */
1715 				free_sort_tuple(state, tuple);
1716 				CHECK_FOR_INTERRUPTS();
1717 			}
1718 			else
1719 			{
1720 				/* discard top of heap, replacing it with the new tuple */
1721 				free_sort_tuple(state, &state->memtuples[0]);
1722 				tuplesort_heap_replace_top(state, tuple);
1723 			}
1724 			break;
1725 
1726 		case TSS_BUILDRUNS:
1727 
1728 			/*
1729 			 * Save the tuple into the unsorted array (there must be space)
1730 			 */
1731 			state->memtuples[state->memtupcount++] = *tuple;
1732 
1733 			/*
1734 			 * If we are over the memory limit, dump all tuples.
1735 			 */
1736 			dumptuples(state, false);
1737 			break;
1738 
1739 		default:
1740 			elog(ERROR, "invalid tuplesort state");
1741 			break;
1742 	}
1743 }
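/*
 * To summarize the transitions driven above (a recap of existing behavior,
 * not anything new): every sort starts in TSS_INITIAL, accumulating tuples
 * in memory.  A bounded sort that exceeds its bound switches to TSS_BOUNDED
 * and from then on keeps only the best "bound" tuples in a heap.  An
 * unbounded sort that exceeds workMem switches to TSS_BUILDRUNS and dumps
 * quicksorted runs to tape until the input is exhausted.
 */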
1744 
1745 static bool
1746 consider_abort_common(Tuplesortstate *state)
1747 {
1748 	Assert(state->sortKeys[0].abbrev_converter != NULL);
1749 	Assert(state->sortKeys[0].abbrev_abort != NULL);
1750 	Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
1751 
1752 	/*
1753 	 * Check effectiveness of abbreviation optimization.  Consider aborting
1754 	 * when still within memory limit.
1755 	 */
1756 	if (state->status == TSS_INITIAL &&
1757 		state->memtupcount >= state->abbrevNext)
1758 	{
1759 		state->abbrevNext *= 2;
1760 
1761 		/*
1762 		 * Check opclass-supplied abbreviation abort routine.  It may indicate
1763 		 * that abbreviation should not proceed.
1764 		 */
1765 		if (!state->sortKeys->abbrev_abort(state->memtupcount,
1766 										   state->sortKeys))
1767 			return false;
1768 
1769 		/*
1770 		 * Finally, restore authoritative comparator, and indicate that
1771 		 * abbreviation is not in play by setting abbrev_converter to NULL
1772 		 */
1773 		state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
1774 		state->sortKeys[0].abbrev_converter = NULL;
1775 		/* Not strictly necessary, but be tidy */
1776 		state->sortKeys[0].abbrev_abort = NULL;
1777 		state->sortKeys[0].abbrev_full_comparator = NULL;
1778 
1779 		/* Give up - expect original pass-by-value representation */
1780 		return true;
1781 	}
1782 
1783 	return false;
1784 }
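/*
 * Note that abbrevNext doubles after each check above, so the abort
 * decision is revisited only at geometrically spaced tuple counts (for
 * example, if it starts at 10: after 10, 20, 40, 80, ... tuples).  This
 * keeps the cost of calling abbrev_abort negligible even for very large
 * inputs; the exact starting value is set by the tuplesort_begin_* routines.
 */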
1785 
1786 /*
1787  * All tuples have been provided; finish the sort.
1788  */
1789 void
1790 tuplesort_performsort(Tuplesortstate *state)
1791 {
1792 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1793 
1794 #ifdef TRACE_SORT
1795 	if (trace_sort)
1796 		elog(LOG, "performsort of worker %d starting: %s",
1797 			 state->worker, pg_rusage_show(&state->ru_start));
1798 #endif
1799 
1800 	switch (state->status)
1801 	{
1802 		case TSS_INITIAL:
1803 
1804 			/*
1805 			 * We were able to accumulate all the tuples within the allowed
1806 			 * amount of memory, or we are the leader taking over worker tapes.
1807 			 */
1808 			if (SERIAL(state))
1809 			{
1810 				/* Just qsort 'em and we're done */
1811 				tuplesort_sort_memtuples(state);
1812 				state->status = TSS_SORTEDINMEM;
1813 			}
1814 			else if (WORKER(state))
1815 			{
1816 				/*
1817 				 * Parallel workers must still dump out tuples to tape.  No
1818 				 * merge is required to produce a single output run, though.
1819 				 */
1820 				inittapes(state, false);
1821 				dumptuples(state, true);
1822 				worker_nomergeruns(state);
1823 				state->status = TSS_SORTEDONTAPE;
1824 			}
1825 			else
1826 			{
1827 				/*
1828 				 * Leader will take over worker tapes and merge worker runs.
1829 				 * Note that mergeruns sets the correct state->status.
1830 				 */
1831 				leader_takeover_tapes(state);
1832 				mergeruns(state);
1833 			}
1834 			state->current = 0;
1835 			state->eof_reached = false;
1836 			state->markpos_block = 0L;
1837 			state->markpos_offset = 0;
1838 			state->markpos_eof = false;
1839 			break;
1840 
1841 		case TSS_BOUNDED:
1842 
1843 			/*
1844 			 * We were able to accumulate all the tuples required for output
1845 			 * in memory, using a heap to eliminate excess tuples.  Now we
1846 			 * have to transform the heap to a properly-sorted array.
1847 			 */
1848 			sort_bounded_heap(state);
1849 			state->current = 0;
1850 			state->eof_reached = false;
1851 			state->markpos_offset = 0;
1852 			state->markpos_eof = false;
1853 			state->status = TSS_SORTEDINMEM;
1854 			break;
1855 
1856 		case TSS_BUILDRUNS:
1857 
1858 			/*
1859 			 * Finish tape-based sort.  First, flush all tuples remaining in
1860 			 * memory out to tape; then merge until we have a single remaining
1861 			 * run (or, if !randomAccess and !WORKER(), one run per tape).
1862 			 * Note that mergeruns sets the correct state->status.
1863 			 */
1864 			dumptuples(state, true);
1865 			mergeruns(state);
1866 			state->eof_reached = false;
1867 			state->markpos_block = 0L;
1868 			state->markpos_offset = 0;
1869 			state->markpos_eof = false;
1870 			break;
1871 
1872 		default:
1873 			elog(ERROR, "invalid tuplesort state");
1874 			break;
1875 	}
1876 
1877 #ifdef TRACE_SORT
1878 	if (trace_sort)
1879 	{
1880 		if (state->status == TSS_FINALMERGE)
1881 			elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1882 				 state->worker, state->activeTapes,
1883 				 pg_rusage_show(&state->ru_start));
1884 		else
1885 			elog(LOG, "performsort of worker %d done: %s",
1886 				 state->worker, pg_rusage_show(&state->ru_start));
1887 	}
1888 #endif
1889 
1890 	MemoryContextSwitchTo(oldcontext);
1891 }
1892 
1893 /*
1894  * Internal routine to fetch the next tuple in either forward or back
1895  * direction into *stup.  Returns false if no more tuples.
1896  * Returned tuple belongs to tuplesort memory context, and must not be freed
1897  * by caller.  Note that fetched tuple is stored in memory that may be
1898  * recycled by any future fetch.
1899  */
1900 static bool
1901 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1902 						  SortTuple *stup)
1903 {
1904 	unsigned int tuplen;
1905 	size_t		nmoved;
1906 
1907 	Assert(!WORKER(state));
1908 
1909 	switch (state->status)
1910 	{
1911 		case TSS_SORTEDINMEM:
1912 			Assert(forward || state->randomAccess);
1913 			Assert(!state->slabAllocatorUsed);
1914 			if (forward)
1915 			{
1916 				if (state->current < state->memtupcount)
1917 				{
1918 					*stup = state->memtuples[state->current++];
1919 					return true;
1920 				}
1921 				state->eof_reached = true;
1922 
1923 				/*
1924 				 * Complain if caller tries to retrieve more tuples than
1925 				 * originally asked for in a bounded sort.  This is because
1926 				 * returning EOF here might be the wrong thing.
1927 				 */
1928 				if (state->bounded && state->current >= state->bound)
1929 					elog(ERROR, "retrieved too many tuples in a bounded sort");
1930 
1931 				return false;
1932 			}
1933 			else
1934 			{
1935 				if (state->current <= 0)
1936 					return false;
1937 
1938 				/*
1939 				 * If all tuples have already been fetched, return the last
1940 				 * tuple; otherwise, return the tuple before the last one returned.
1941 				 */
1942 				if (state->eof_reached)
1943 					state->eof_reached = false;
1944 				else
1945 				{
1946 					state->current--;	/* last returned tuple */
1947 					if (state->current <= 0)
1948 						return false;
1949 				}
1950 				*stup = state->memtuples[state->current - 1];
1951 				return true;
1952 			}
1953 			break;
1954 
1955 		case TSS_SORTEDONTAPE:
1956 			Assert(forward || state->randomAccess);
1957 			Assert(state->slabAllocatorUsed);
1958 
1959 			/*
1960 			 * The slot that held the tuple that we returned in previous
1961 			 * gettuple call can now be reused.
1962 			 */
1963 			if (state->lastReturnedTuple)
1964 			{
1965 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1966 				state->lastReturnedTuple = NULL;
1967 			}
1968 
1969 			if (forward)
1970 			{
1971 				if (state->eof_reached)
1972 					return false;
1973 
1974 				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1975 				{
1976 					READTUP(state, stup, state->result_tape, tuplen);
1977 
1978 					/*
1979 					 * Remember the tuple we return, so that we can recycle
1980 					 * its memory on next call.  (This can be NULL, in the
1981 					 * !state->tuples case).
1982 					 */
1983 					state->lastReturnedTuple = stup->tuple;
1984 
1985 					return true;
1986 				}
1987 				else
1988 				{
1989 					state->eof_reached = true;
1990 					return false;
1991 				}
1992 			}
1993 
1994 			/*
1995 			 * Backward.
1996 			 *
1997 			 * If all tuples have already been fetched, return the last tuple;
1998 			 * otherwise, return the tuple before the last one returned.
1999 			 */
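			/*
			 * Layout reminder (a sketch of what WRITETUP produces for a
			 * random-access sort): each tuple is stored on tape as a leading
			 * length word, the tuple body, and a trailing copy of the length
			 * word, with a zero length word marking the end of the run.  The
			 * trailing copies exist only for random-access sorts and are
			 * what make the backspacing below possible.
			 */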
2000 			if (state->eof_reached)
2001 			{
2002 				/*
2003 				 * Seek position is pointing just past the zero tuplen at the
2004 				 * end of file; back up to fetch last tuple's ending length
2005 				 * word.  If seek fails we must have a completely empty file.
2006 				 */
2007 				nmoved = LogicalTapeBackspace(state->tapeset,
2008 											  state->result_tape,
2009 											  2 * sizeof(unsigned int));
2010 				if (nmoved == 0)
2011 					return false;
2012 				else if (nmoved != 2 * sizeof(unsigned int))
2013 					elog(ERROR, "unexpected tape position");
2014 				state->eof_reached = false;
2015 			}
2016 			else
2017 			{
2018 				/*
2019 				 * Back up and fetch previously-returned tuple's ending length
2020 				 * word.  If seek fails, assume we are at start of file.
2021 				 */
2022 				nmoved = LogicalTapeBackspace(state->tapeset,
2023 											  state->result_tape,
2024 											  sizeof(unsigned int));
2025 				if (nmoved == 0)
2026 					return false;
2027 				else if (nmoved != sizeof(unsigned int))
2028 					elog(ERROR, "unexpected tape position");
2029 				tuplen = getlen(state, state->result_tape, false);
2030 
2031 				/*
2032 				 * Back up to get ending length word of tuple before it.
2033 				 */
2034 				nmoved = LogicalTapeBackspace(state->tapeset,
2035 											  state->result_tape,
2036 											  tuplen + 2 * sizeof(unsigned int));
2037 				if (nmoved == tuplen + sizeof(unsigned int))
2038 				{
2039 					/*
2040 					 * We backed up over the previous tuple, but there was no
2041 					 * ending length word before it.  That means that the prev
2042 					 * tuple is the first tuple in the file.  It is now the
2043 					 * next one to read in the forward direction (not obviously
2044 					 * right, but that is what the in-memory case does).
2045 					 */
2046 					return false;
2047 				}
2048 				else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2049 					elog(ERROR, "bogus tuple length in backward scan");
2050 			}
2051 
2052 			tuplen = getlen(state, state->result_tape, false);
2053 
2054 			/*
2055 			 * Now we have the length of the prior tuple, back up and read it.
2056 			 * Note: READTUP expects we are positioned after the initial
2057 			 * length word of the tuple, so back up to that point.
2058 			 */
2059 			nmoved = LogicalTapeBackspace(state->tapeset,
2060 										  state->result_tape,
2061 										  tuplen);
2062 			if (nmoved != tuplen)
2063 				elog(ERROR, "bogus tuple length in backward scan");
2064 			READTUP(state, stup, state->result_tape, tuplen);
2065 
2066 			/*
2067 			 * Remember the tuple we return, so that we can recycle its memory
2068 			 * on next call. (This can be NULL, in the Datum case).
2069 			 */
2070 			state->lastReturnedTuple = stup->tuple;
2071 
2072 			return true;
2073 
2074 		case TSS_FINALMERGE:
2075 			Assert(forward);
2076 			/* We are managing memory ourselves, with the slab allocator. */
2077 			Assert(state->slabAllocatorUsed);
2078 
2079 			/*
2080 			 * The slab slot holding the tuple that we returned in previous
2081 			 * gettuple call can now be reused.
2082 			 */
2083 			if (state->lastReturnedTuple)
2084 			{
2085 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2086 				state->lastReturnedTuple = NULL;
2087 			}
2088 
2089 			/*
2090 			 * This code should match the inner loop of mergeonerun().
2091 			 */
2092 			if (state->memtupcount > 0)
2093 			{
2094 				int			srcTape = state->memtuples[0].tupindex;
2095 				SortTuple	newtup;
2096 
2097 				*stup = state->memtuples[0];
2098 
2099 				/*
2100 				 * Remember the tuple we return, so that we can recycle its
2101 				 * memory on next call. (This can be NULL, in the Datum case).
2102 				 */
2103 				state->lastReturnedTuple = stup->tuple;
2104 
2105 				/*
2106 				 * Pull next tuple from tape, and replace the returned tuple
2107 				 * at top of the heap with it.
2108 				 */
2109 				if (!mergereadnext(state, srcTape, &newtup))
2110 				{
2111 					/*
2112 					 * If no more data, we've reached end of run on this tape.
2113 					 * Remove the top node from the heap.
2114 					 */
2115 					tuplesort_heap_delete_top(state);
2116 
2117 					/*
2118 					 * Rewind to free the read buffer.  It'd go away at the
2119 					 * end of the sort anyway, but better to release the
2120 					 * memory early.
2121 					 */
2122 					LogicalTapeRewindForWrite(state->tapeset, srcTape);
2123 					return true;
2124 				}
2125 				newtup.tupindex = srcTape;
2126 				tuplesort_heap_replace_top(state, &newtup);
2127 				return true;
2128 			}
2129 			return false;
2130 
2131 		default:
2132 			elog(ERROR, "invalid tuplesort state");
2133 			return false;		/* keep compiler quiet */
2134 	}
2135 }
2136 
2137 /*
2138  * Fetch the next tuple in either forward or back direction.
2139  * If successful, put tuple in slot and return true; else, clear the slot
2140  * and return false.
2141  *
2142  * Caller may optionally be passed back abbreviated value (on true return
2143  * value) when abbreviation was used, which can be used to cheaply avoid
2144  * equality checks that might otherwise be required.  Caller can safely make a
2145  * determination of "non-equal tuple" based on simple binary inequality.  A
2146  * NULL value in leading attribute will set abbreviated value to zeroed
2147  * representation, which caller may rely on in abbreviated inequality check.
2148  *
2149  * If copy is true, the slot receives a tuple that's been copied into the
2150  * caller's memory context, so that it will stay valid regardless of future
2151  * manipulations of the tuplesort's state (up to and including deleting the
2152  * tuplesort).  If copy is false, the slot will just receive a pointer to a
2153  * tuple held within the tuplesort, which is more efficient, but only safe for
2154  * callers that are prepared to have any subsequent manipulation of the
2155  * tuplesort's state invalidate slot contents.
2156  */
2157 bool
2158 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2159 					   TupleTableSlot *slot, Datum *abbrev)
2160 {
2161 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2162 	SortTuple	stup;
2163 
2164 	if (!tuplesort_gettuple_common(state, forward, &stup))
2165 		stup.tuple = NULL;
2166 
2167 	MemoryContextSwitchTo(oldcontext);
2168 
2169 	if (stup.tuple)
2170 	{
2171 		/* Record abbreviated key for caller */
2172 		if (state->sortKeys->abbrev_converter && abbrev)
2173 			*abbrev = stup.datum1;
2174 
2175 		if (copy)
2176 			stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2177 
2178 		ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2179 		return true;
2180 	}
2181 	else
2182 	{
2183 		ExecClearTuple(slot);
2184 		return false;
2185 	}
2186 }
2187 
2188 /*
2189  * Fetch the next tuple in either forward or back direction.
2190  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2191  * context, and must not be freed by caller.  Caller may not rely on tuple
2192  * remaining valid after any further manipulation of tuplesort.
2193  */
2194 HeapTuple
2195 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2196 {
2197 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2198 	SortTuple	stup;
2199 
2200 	if (!tuplesort_gettuple_common(state, forward, &stup))
2201 		stup.tuple = NULL;
2202 
2203 	MemoryContextSwitchTo(oldcontext);
2204 
2205 	return stup.tuple;
2206 }
2207 
2208 /*
2209  * Fetch the next index tuple in either forward or back direction.
2210  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2211  * context, and must not be freed by caller.  Caller may not rely on tuple
2212  * remaining valid after any further manipulation of tuplesort.
2213  */
2214 IndexTuple
2215 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2216 {
2217 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2218 	SortTuple	stup;
2219 
2220 	if (!tuplesort_gettuple_common(state, forward, &stup))
2221 		stup.tuple = NULL;
2222 
2223 	MemoryContextSwitchTo(oldcontext);
2224 
2225 	return (IndexTuple) stup.tuple;
2226 }
2227 
2228 /*
2229  * Fetch the next Datum in either forward or back direction.
2230  * Returns false if no more datums.
2231  *
2232  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2233  * in caller's context, and is now owned by the caller (this differs from
2234  * similar routines for other types of tuplesorts).
2235  *
2236  * Caller may optionally be passed back abbreviated value (on true return
2237  * value) when abbreviation was used, which can be used to cheaply avoid
2238  * equality checks that might otherwise be required.  Caller can safely make a
2239  * determination of "non-equal tuple" based on simple binary inequality.  A
2240  * NULL value will have a zeroed abbreviated value representation, which caller
2241  * may rely on in abbreviated inequality check.
2242  */
2243 bool
2244 tuplesort_getdatum(Tuplesortstate *state, bool forward,
2245 				   Datum *val, bool *isNull, Datum *abbrev)
2246 {
2247 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2248 	SortTuple	stup;
2249 
2250 	if (!tuplesort_gettuple_common(state, forward, &stup))
2251 	{
2252 		MemoryContextSwitchTo(oldcontext);
2253 		return false;
2254 	}
2255 
2256 	/* Ensure we copy into caller's memory context */
2257 	MemoryContextSwitchTo(oldcontext);
2258 
2259 	/* Record abbreviated key for caller */
2260 	if (state->sortKeys->abbrev_converter && abbrev)
2261 		*abbrev = stup.datum1;
2262 
2263 	if (stup.isnull1 || !state->tuples)
2264 	{
2265 		*val = stup.datum1;
2266 		*isNull = stup.isnull1;
2267 	}
2268 	else
2269 	{
2270 		/* use stup.tuple because stup.datum1 may be an abbreviation */
2271 		*val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2272 		*isNull = false;
2273 	}
2274 
2275 	return true;
2276 }
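/*
 * A minimal usage sketch for the Datum case (argument lists elided and
 * error handling omitted; see tuplesort.h for the begin/end entry points):
 *
 *		Tuplesortstate *ts = tuplesort_begin_datum(...sort key info...);
 *		for each input value:
 *			tuplesort_putdatum(ts, val, isnull);
 *		tuplesort_performsort(ts);
 *		while (tuplesort_getdatum(ts, true, &val, &isnull, NULL))
 *			...consume val...;
 *		tuplesort_end(ts);
 */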
2277 
2278 /*
2279  * Advance over N tuples in either forward or back direction,
2280  * without returning any data.  N==0 is a no-op.
2281  * Returns true if successful, false if ran out of tuples.
2282  */
2283 bool
2284 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2285 {
2286 	MemoryContext oldcontext;
2287 
2288 	/*
2289 	 * We don't actually support backwards skip yet, because no callers need
2290 	 * it.  The API is designed to allow for that later, though.
2291 	 */
2292 	Assert(forward);
2293 	Assert(ntuples >= 0);
2294 	Assert(!WORKER(state));
2295 
2296 	switch (state->status)
2297 	{
2298 		case TSS_SORTEDINMEM:
2299 			if (state->memtupcount - state->current >= ntuples)
2300 			{
2301 				state->current += ntuples;
2302 				return true;
2303 			}
2304 			state->current = state->memtupcount;
2305 			state->eof_reached = true;
2306 
2307 			/*
2308 			 * Complain if caller tries to retrieve more tuples than
2309 			 * originally asked for in a bounded sort.  This is because
2310 			 * returning EOF here might be the wrong thing.
2311 			 */
2312 			if (state->bounded && state->current >= state->bound)
2313 				elog(ERROR, "retrieved too many tuples in a bounded sort");
2314 
2315 			return false;
2316 
2317 		case TSS_SORTEDONTAPE:
2318 		case TSS_FINALMERGE:
2319 
2320 			/*
2321 			 * We could probably optimize these cases better, but for now it's
2322 			 * not worth the trouble.
2323 			 */
2324 			oldcontext = MemoryContextSwitchTo(state->sortcontext);
2325 			while (ntuples-- > 0)
2326 			{
2327 				SortTuple	stup;
2328 
2329 				if (!tuplesort_gettuple_common(state, forward, &stup))
2330 				{
2331 					MemoryContextSwitchTo(oldcontext);
2332 					return false;
2333 				}
2334 				CHECK_FOR_INTERRUPTS();
2335 			}
2336 			MemoryContextSwitchTo(oldcontext);
2337 			return true;
2338 
2339 		default:
2340 			elog(ERROR, "invalid tuplesort state");
2341 			return false;		/* keep compiler quiet */
2342 	}
2343 }
2344 
2345 /*
2346  * tuplesort_merge_order - report merge order we'll use for given memory
2347  * (note: "merge order" just means the number of input tapes in the merge).
2348  *
2349  * This is exported for use by the planner.  allowedMem is in bytes.
2350  */
2351 int
2352 tuplesort_merge_order(int64 allowedMem)
2353 {
2354 	int			mOrder;
2355 
2356 	/*
2357 	 * We need one tape for each merge input, plus another one for the output,
2358 	 * and each of these tapes needs buffer space.  In addition we want
2359 	 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2360 	 * count).
2361 	 *
2362 	 * Note: you might be thinking we need to account for the memtuples[]
2363 	 * array in this calculation, but we effectively treat that as part of the
2364 	 * MERGE_BUFFER_SIZE workspace.
2365 	 */
2366 	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2367 		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
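	/*
	 * Worked example with hypothetical sizes: if allowedMem were 4 MB and
	 * each input tape needed roughly 280 kB (MERGE_BUFFER_SIZE plus
	 * TAPE_BUFFER_OVERHEAD), the division above would give a merge order of
	 * about 14, which the clamps below would leave alone since it falls
	 * between MINORDER and MAXORDER.
	 */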
2368 
2369 	/*
2370 	 * Even in minimum memory, use at least a MINORDER merge.  On the other
2371 	 * hand, even when we have lots of memory, do not use more than a MAXORDER
2372 	 * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
2373 	 * additional tape reduces the amount of memory available to build runs,
2374 	 * which in turn can cause the same sort to need more runs, which makes
2375 	 * merging slower even if it can still be done in a single pass.  Also,
2376 	 * high order merges are quite slow due to CPU cache effects; it can be
2377 	 * faster to pay the I/O cost of a polyphase merge than to perform a
2378 	 * single merge pass across many hundreds of tapes.
2379 	 */
2380 	mOrder = Max(mOrder, MINORDER);
2381 	mOrder = Min(mOrder, MAXORDER);
2382 
2383 	return mOrder;
2384 }
2385 
2386 /*
2387  * inittapes - initialize for tape sorting.
2388  *
2389  * This is called only if we have found we won't sort in memory.
2390  */
2391 static void
2392 inittapes(Tuplesortstate *state, bool mergeruns)
2393 {
2394 	int			maxTapes,
2395 				j;
2396 
2397 	Assert(!LEADER(state));
2398 
2399 	if (mergeruns)
2400 	{
2401 		/* Compute number of tapes to use: merge order plus 1 */
2402 		maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2403 	}
2404 	else
2405 	{
2406 		/* Workers can sometimes produce a single run, output without merge */
2407 		Assert(WORKER(state));
2408 		maxTapes = MINORDER + 1;
2409 	}
2410 
2411 #ifdef TRACE_SORT
2412 	if (trace_sort)
2413 		elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2414 			 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2415 #endif
2416 
2417 	/* Create the tape set and allocate the per-tape data arrays */
2418 	inittapestate(state, maxTapes);
2419 	state->tapeset =
2420 		LogicalTapeSetCreate(maxTapes, NULL,
2421 							 state->shared ? &state->shared->fileset : NULL,
2422 							 state->worker);
2423 
2424 	state->currentRun = 0;
2425 
2426 	/*
2427 	 * Initialize variables of Algorithm D (step D1).
2428 	 */
2429 	for (j = 0; j < maxTapes; j++)
2430 	{
2431 		state->tp_fib[j] = 1;
2432 		state->tp_runs[j] = 0;
2433 		state->tp_dummy[j] = 1;
2434 		state->tp_tapenum[j] = j;
2435 	}
2436 	state->tp_fib[state->tapeRange] = 0;
2437 	state->tp_dummy[state->tapeRange] = 0;
2438 
2439 	state->Level = 1;
2440 	state->destTape = 0;
2441 
2442 	state->status = TSS_BUILDRUNS;
2443 }
2444 
2445 /*
2446  * inittapestate - initialize generic tape management state
2447  */
2448 static void
2449 inittapestate(Tuplesortstate *state, int maxTapes)
2450 {
2451 	int64		tapeSpace;
2452 
2453 	/*
2454 	 * Decrease availMem to reflect the space needed for tape buffers; but
2455 	 * don't decrease it to the point that we have no room for tuples. (That
2456 	 * case is only likely to occur if sorting pass-by-value Datums; in all
2457 	 * other scenarios the memtuples[] array is unlikely to occupy more than
2458 	 * half of allowedMem.  In the pass-by-value case it's not important to
2459 	 * account for tuple space, so we don't care if LACKMEM becomes
2460 	 * inaccurate.)
2461 	 */
2462 	tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2463 
2464 	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2465 		USEMEM(state, tapeSpace);
2466 
2467 	/*
2468 	 * Make sure that the temp file(s) underlying the tape set are created in
2469 	 * suitable temp tablespaces.  For parallel sorts, this should have been
2470 	 * called already, but it doesn't matter if it is called a second time.
2471 	 */
2472 	PrepareTempTablespaces();
2473 
2474 	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2475 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2476 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2477 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2478 	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2479 
2480 	/* Record # of tapes allocated (for duration of sort) */
2481 	state->maxTapes = maxTapes;
2482 	/* Record maximum # of tapes usable as inputs when merging */
2483 	state->tapeRange = maxTapes - 1;
2484 }
2485 
2486 /*
2487  * selectnewtape -- select new tape for new initial run.
2488  *
2489  * This is called after finishing a run when we know another run
2490  * must be started.  This implements steps D3, D4 of Algorithm D.
2491  */
2492 static void
2493 selectnewtape(Tuplesortstate *state)
2494 {
2495 	int			j;
2496 	int			a;
2497 
2498 	/* Step D3: advance j (destTape) */
2499 	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2500 	{
2501 		state->destTape++;
2502 		return;
2503 	}
2504 	if (state->tp_dummy[state->destTape] != 0)
2505 	{
2506 		state->destTape = 0;
2507 		return;
2508 	}
2509 
2510 	/* Step D4: increase level */
2511 	state->Level++;
2512 	a = state->tp_fib[0];
2513 	for (j = 0; j < state->tapeRange; j++)
2514 	{
2515 		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2516 		state->tp_fib[j] = a + state->tp_fib[j + 1];
2517 	}
2518 	state->destTape = 0;
2519 }
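/*
 * Worked example of the run distribution: with three input tapes, inittapes
 * leaves tp_fib = {1,1,1,0} and tp_dummy = {1,1,1,0}.  Step D3 sends the
 * first three runs to tapes 0, 1 and 2; the selectnewtape call after the
 * third run finds no dummy runs left on any input tape and performs step D4,
 * raising Level to 2 and recomputing the targets to tp_fib = {2,2,1,0} and
 * tp_dummy = {1,1,0,0}, i.e. a Fibonacci-style distribution of five runs
 * (2+2+1) across the three input tapes.
 */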
2520 
2521 /*
2522  * Initialize the slab allocation arena, for the given number of slots.
2523  */
2524 static void
2525 init_slab_allocator(Tuplesortstate *state, int numSlots)
2526 {
2527 	if (numSlots > 0)
2528 	{
2529 		char	   *p;
2530 		int			i;
2531 
2532 		state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2533 		state->slabMemoryEnd = state->slabMemoryBegin +
2534 			numSlots * SLAB_SLOT_SIZE;
2535 		state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2536 		USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2537 
2538 		p = state->slabMemoryBegin;
2539 		for (i = 0; i < numSlots - 1; i++)
2540 		{
2541 			((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2542 			p += SLAB_SLOT_SIZE;
2543 		}
2544 		((SlabSlot *) p)->nextfree = NULL;
2545 	}
2546 	else
2547 	{
2548 		state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2549 		state->slabFreeHead = NULL;
2550 	}
2551 	state->slabAllocatorUsed = true;
2552 }
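/*
 * The arena built above is just one palloc'd block carved into fixed-size
 * SLAB_SLOT_SIZE slots chained through a singly linked free list rooted at
 * slabFreeHead.  Releasing a slot (see the RELEASE_SLAB_SLOT macro) simply
 * pushes it back onto that list, so allocating and recycling tuples during
 * the merge is O(1) and generates no palloc/pfree traffic for tuples that
 * fit in a slot.
 */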
2553 
2554 /*
2555  * mergeruns -- merge all the completed initial runs.
2556  *
2557  * This implements steps D5, D6 of Algorithm D.  All input data has
2558  * already been written to initial runs on tape (see dumptuples).
2559  */
2560 static void
2561 mergeruns(Tuplesortstate *state)
2562 {
2563 	int			tapenum,
2564 				svTape,
2565 				svRuns,
2566 				svDummy;
2567 	int			numTapes;
2568 	int			numInputTapes;
2569 
2570 	Assert(state->status == TSS_BUILDRUNS);
2571 	Assert(state->memtupcount == 0);
2572 
2573 	if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2574 	{
2575 		/*
2576 		 * If there are multiple runs to be merged, when we go to read back
2577 		 * tuples from disk, abbreviated keys will not have been stored, and
2578 		 * we don't care to regenerate them.  Disable abbreviation from this
2579 		 * point on.
2580 		 */
2581 		state->sortKeys->abbrev_converter = NULL;
2582 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2583 
2584 		/* Not strictly necessary, but be tidy */
2585 		state->sortKeys->abbrev_abort = NULL;
2586 		state->sortKeys->abbrev_full_comparator = NULL;
2587 	}
2588 
2589 	/*
2590 	 * Reset tuple memory.  We've freed all the tuples that we previously
2591 	 * allocated.  We will use the slab allocator from now on.
2592 	 */
2593 	MemoryContextDelete(state->tuplecontext);
2594 	state->tuplecontext = NULL;
2595 
2596 	/*
2597 	 * We no longer need a large memtuples array.  (We will allocate a smaller
2598 	 * one for the heap later.)
2599 	 */
2600 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2601 	pfree(state->memtuples);
2602 	state->memtuples = NULL;
2603 
2604 	/*
2605 	 * If we had fewer runs than tapes, refund the memory that we imagined we
2606 	 * would need for the tape buffers of the unused tapes.
2607 	 *
2608 	 * numTapes and numInputTapes reflect the actual number of tapes we will
2609 	 * use.  Note that the output tape's tape number is maxTapes - 1, so the
2610 	 * tape numbers of the used tapes are not consecutive, and you cannot just
2611 	 * loop from 0 to numTapes to visit all used tapes!
2612 	 */
2613 	if (state->Level == 1)
2614 	{
2615 		numInputTapes = state->currentRun;
2616 		numTapes = numInputTapes + 1;
2617 		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2618 	}
2619 	else
2620 	{
2621 		numInputTapes = state->tapeRange;
2622 		numTapes = state->maxTapes;
2623 	}
2624 
2625 	/*
2626 	 * Initialize the slab allocator.  We need one slab slot per input tape,
2627 	 * for the tuples in the heap, plus one to hold the tuple last returned
2628 	 * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
2629 	 * however, we don't need to allocate anything.)
2630 	 *
2631 	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2632 	 * to track memory usage of individual tuples.
2633 	 */
2634 	if (state->tuples)
2635 		init_slab_allocator(state, numInputTapes + 1);
2636 	else
2637 		init_slab_allocator(state, 0);
2638 
2639 	/*
2640 	 * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
2641 	 * from each input tape.
2642 	 */
2643 	state->memtupsize = numInputTapes;
2644 	state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
2645 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2646 
2647 	/*
2648 	 * Use all the remaining memory we have available for read buffers among
2649 	 * the input tapes.
2650 	 *
2651 	 * We don't try to "rebalance" the memory among tapes, when we start a new
2652 	 * merge phase, even if some tapes are inactive in the new phase.  That
2653 	 * would be hard, because logtape.c doesn't know where one run ends and
2654 	 * another begins.  When a new merge phase begins, and a tape doesn't
2655 	 * participate in it, its buffer nevertheless already contains tuples from
2656 	 * the next run on the same tape, so we cannot release the buffer.  That's
2657 	 * OK in practice; merge performance isn't that sensitive to the amount of
2658 	 * buffer space used, and most merge phases use all or almost all tapes
2659 	 * anyway.
2660 	 */
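	/*
	 * For example (hypothetical numbers): with roughly 6 MB of availMem
	 * remaining at this point and 8 input tapes, each tape would get about
	 * a 768 kB read buffer, letting logtape.c read each run in large
	 * sequential chunks.
	 */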
2661 #ifdef TRACE_SORT
2662 	if (trace_sort)
2663 		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2664 			 state->worker, state->availMem / 1024, numInputTapes);
2665 #endif
2666 
2667 	state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2668 	USEMEM(state, state->read_buffer_size * numInputTapes);
2669 
2670 	/* End of step D2: rewind all output tapes to prepare for merging */
2671 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2672 		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2673 
2674 	for (;;)
2675 	{
2676 		/*
2677 		 * At this point we know that tape[T] is empty.  If there's just one
2678 		 * (real or dummy) run left on each input tape, then only one merge
2679 		 * pass remains.  If we don't have to produce a materialized sorted
2680 		 * tape, we can stop at this point and do the final merge on-the-fly.
2681 		 */
2682 		if (!state->randomAccess && !WORKER(state))
2683 		{
2684 			bool		allOneRun = true;
2685 
2686 			Assert(state->tp_runs[state->tapeRange] == 0);
2687 			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2688 			{
2689 				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2690 				{
2691 					allOneRun = false;
2692 					break;
2693 				}
2694 			}
2695 			if (allOneRun)
2696 			{
2697 				/* Tell logtape.c we won't be writing anymore */
2698 				LogicalTapeSetForgetFreeSpace(state->tapeset);
2699 				/* Initialize for the final merge pass */
2700 				beginmerge(state);
2701 				state->status = TSS_FINALMERGE;
2702 				return;
2703 			}
2704 		}
2705 
2706 		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
2707 		while (state->tp_runs[state->tapeRange - 1] ||
2708 			   state->tp_dummy[state->tapeRange - 1])
2709 		{
2710 			bool		allDummy = true;
2711 
2712 			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2713 			{
2714 				if (state->tp_dummy[tapenum] == 0)
2715 				{
2716 					allDummy = false;
2717 					break;
2718 				}
2719 			}
2720 
2721 			if (allDummy)
2722 			{
2723 				state->tp_dummy[state->tapeRange]++;
2724 				for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2725 					state->tp_dummy[tapenum]--;
2726 			}
2727 			else
2728 				mergeonerun(state);
2729 		}
2730 
2731 		/* Step D6: decrease level */
2732 		if (--state->Level == 0)
2733 			break;
2734 		/* rewind output tape T to use as new input */
2735 		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2736 								 state->read_buffer_size);
2737 		/* rewind used-up input tape P, and prepare it for write pass */
2738 		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2739 		state->tp_runs[state->tapeRange - 1] = 0;
2740 
2741 		/*
2742 		 * reassign tape units per step D6; note we no longer care about A[]
2743 		 */
2744 		svTape = state->tp_tapenum[state->tapeRange];
2745 		svDummy = state->tp_dummy[state->tapeRange];
2746 		svRuns = state->tp_runs[state->tapeRange];
2747 		for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2748 		{
2749 			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
2750 			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
2751 			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
2752 		}
2753 		state->tp_tapenum[0] = svTape;
2754 		state->tp_dummy[0] = svDummy;
2755 		state->tp_runs[0] = svRuns;
2756 	}
2757 
2758 	/*
2759 	 * Done.  Knuth says that the result is on TAPE[1], but since we exited
2760 	 * the loop without performing the last iteration of step D6, we have not
2761 	 * rearranged the tape unit assignment, and therefore the result is on
2762 	 * TAPE[T].  We need to do it this way so that we can freeze the final
2763 	 * output tape while rewinding it.  The last iteration of step D6 would be
2764 	 * a waste of cycles anyway...
2765 	 */
2766 	state->result_tape = state->tp_tapenum[state->tapeRange];
2767 	if (!WORKER(state))
2768 		LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
2769 	else
2770 		worker_freeze_result_tape(state);
2771 	state->status = TSS_SORTEDONTAPE;
2772 
2773 	/* Release the read buffers of all the other tapes, by rewinding them. */
2774 	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
2775 	{
2776 		if (tapenum != state->result_tape)
2777 			LogicalTapeRewindForWrite(state->tapeset, tapenum);
2778 	}
2779 }
2780 
2781 /*
2782  * Merge one run from each input tape, except ones with dummy runs.
2783  *
2784  * This is the inner loop of Algorithm D step D5.  We know that the
2785  * output tape is TAPE[T].
2786  */
2787 static void
2788 mergeonerun(Tuplesortstate *state)
2789 {
2790 	int			destTape = state->tp_tapenum[state->tapeRange];
2791 	int			srcTape;
2792 
2793 	/*
2794 	 * Start the merge by loading one tuple from each active source tape into
2795 	 * the heap.  We can also decrease the input run/dummy run counts.
2796 	 */
2797 	beginmerge(state);
2798 
2799 	/*
2800 	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2801 	 * out, and replacing it with next tuple from same tape (if there is
2802 	 * another one).
2803 	 */
2804 	while (state->memtupcount > 0)
2805 	{
2806 		SortTuple	stup;
2807 
2808 		/* write the tuple to destTape */
2809 		srcTape = state->memtuples[0].tupindex;
2810 		WRITETUP(state, destTape, &state->memtuples[0]);
2811 
2812 		/* recycle the slot of the tuple we just wrote out, for the next read */
2813 		if (state->memtuples[0].tuple)
2814 			RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2815 
2816 		/*
2817 		 * pull next tuple from the tape, and replace the written-out tuple in
2818 		 * the heap with it.
2819 		 */
2820 		if (mergereadnext(state, srcTape, &stup))
2821 		{
2822 			stup.tupindex = srcTape;
2823 			tuplesort_heap_replace_top(state, &stup);
2824 
2825 		}
2826 		else
2827 			tuplesort_heap_delete_top(state);
2828 	}
2829 
2830 	/*
2831 	 * When the heap empties, we're done.  Write an end-of-run marker on the
2832 	 * output tape, and increment its count of real runs.
2833 	 */
2834 	markrunend(state, destTape);
2835 	state->tp_runs[state->tapeRange]++;
2836 
2837 #ifdef TRACE_SORT
2838 	if (trace_sort)
2839 		elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
2840 			 state->activeTapes, pg_rusage_show(&state->ru_start));
2841 #endif
2842 }
2843 
2844 /*
2845  * beginmerge - initialize for a merge pass
2846  *
2847  * We decrease the counts of real and dummy runs for each tape, and mark
2848  * which tapes contain active input runs in mergeactive[].  Then, fill the
2849  * merge heap with the first tuple from each active tape.
2850  */
2851 static void
2852 beginmerge(Tuplesortstate *state)
2853 {
2854 	int			activeTapes;
2855 	int			tapenum;
2856 	int			srcTape;
2857 
2858 	/* Heap should be empty here */
2859 	Assert(state->memtupcount == 0);
2860 
2861 	/* Adjust run counts and mark the active tapes */
2862 	memset(state->mergeactive, 0,
2863 		   state->maxTapes * sizeof(*state->mergeactive));
2864 	activeTapes = 0;
2865 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2866 	{
2867 		if (state->tp_dummy[tapenum] > 0)
2868 			state->tp_dummy[tapenum]--;
2869 		else
2870 		{
2871 			Assert(state->tp_runs[tapenum] > 0);
2872 			state->tp_runs[tapenum]--;
2873 			srcTape = state->tp_tapenum[tapenum];
2874 			state->mergeactive[srcTape] = true;
2875 			activeTapes++;
2876 		}
2877 	}
2878 	Assert(activeTapes > 0);
2879 	state->activeTapes = activeTapes;
2880 
2881 	/* Load the merge heap with the first tuple from each input tape */
2882 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2883 	{
2884 		SortTuple	tup;
2885 
2886 		if (mergereadnext(state, srcTape, &tup))
2887 		{
2888 			tup.tupindex = srcTape;
2889 			tuplesort_heap_insert(state, &tup);
2890 		}
2891 	}
2892 }
2893 
2894 /*
2895  * mergereadnext - read next tuple from one merge input tape
2896  *
2897  * Returns false on EOF.
2898  */
2899 static bool
2900 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
2901 {
2902 	unsigned int tuplen;
2903 
2904 	if (!state->mergeactive[srcTape])
2905 		return false;			/* tape's run is already exhausted */
2906 
2907 	/* read next tuple, if any */
2908 	if ((tuplen = getlen(state, srcTape, true)) == 0)
2909 	{
2910 		state->mergeactive[srcTape] = false;
2911 		return false;
2912 	}
2913 	READTUP(state, stup, srcTape, tuplen);
2914 
2915 	return true;
2916 }
2917 
2918 /*
2919  * dumptuples - remove tuples from memtuples and write initial run to tape
2920  *
2921  * When alltuples = true, dump everything currently in memory.  (This case is
2922  * only used at end of input data.)
2923  */
2924 static void
2925 dumptuples(Tuplesortstate *state, bool alltuples)
2926 {
2927 	int			memtupwrite;
2928 	int			i;
2929 
2930 	/*
2931 	 * Nothing to do if we still fit in available memory and have array slots,
2932 	 * unless this is the final call during initial run generation.
2933 	 */
2934 	if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2935 		!alltuples)
2936 		return;
2937 
2938 	/*
2939 	 * Final call might require no sorting, in rare cases where we just so
2940 	 * happen to have previously LACKMEM()'d at the point where exactly all
2941 	 * remaining tuples are loaded into memory, just before input was
2942 	 * exhausted.
2943 	 *
2944 	 * In general, short final runs are quite possible.  Rather than allowing
2945 	 * a special case where there was a superfluous selectnewtape() call (i.e.
2946 	 * a call with no subsequent run actually written to destTape), we prefer
2947 	 * to write out a 0 tuple run.
2948 	 *
2949 	 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
2950 	 * the tape inactive for the merge when called from beginmerge().  This
2951 	 * case is therefore similar to the case where mergeonerun() finds a dummy
2952 	 * run for the tape, and so doesn't need to merge a run from the tape (or
2953 	 * conceptually "merges" the dummy run, if you prefer).  According to
2954 	 * Knuth, Algorithm D "isn't strictly optimal" in its method of
2955 	 * distribution and dummy run assignment; this edge case seems very
2956 	 * unlikely to make that appreciably worse.
2957 	 */
2958 	Assert(state->status == TSS_BUILDRUNS);
2959 
2960 	/*
2961 	 * It seems unlikely that this limit will ever be exceeded, but take no
2962 	 * chances
2963 	 */
2964 	if (state->currentRun == INT_MAX)
2965 		ereport(ERROR,
2966 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2967 				 errmsg("cannot have more than %d runs for an external sort",
2968 						INT_MAX)));
2969 
2970 	state->currentRun++;
2971 
2972 #ifdef TRACE_SORT
2973 	if (trace_sort)
2974 		elog(LOG, "worker %d starting quicksort of run %d: %s",
2975 			 state->worker, state->currentRun,
2976 			 pg_rusage_show(&state->ru_start));
2977 #endif
2978 
2979 	/*
2980 	 * Sort all tuples accumulated within the allowed amount of memory for
2981 	 * this run using quicksort
2982 	 */
2983 	tuplesort_sort_memtuples(state);
2984 
2985 #ifdef TRACE_SORT
2986 	if (trace_sort)
2987 		elog(LOG, "worker %d finished quicksort of run %d: %s",
2988 			 state->worker, state->currentRun,
2989 			 pg_rusage_show(&state->ru_start));
2990 #endif
2991 
2992 	memtupwrite = state->memtupcount;
2993 	for (i = 0; i < memtupwrite; i++)
2994 	{
2995 		WRITETUP(state, state->tp_tapenum[state->destTape],
2996 				 &state->memtuples[i]);
2997 		state->memtupcount--;
2998 	}
2999 
3000 	/*
3001 	 * Reset tuple memory.  We've freed all of the tuples that we previously
3002 	 * allocated.  It's important to avoid fragmentation when there is a stark
3003 	 * change in the sizes of incoming tuples.  Fragmentation due to
3004 	 * AllocSetFree's bucketing by size class might be particularly bad if
3005 	 * this step wasn't taken.
3006 	 */
3007 	MemoryContextReset(state->tuplecontext);
3008 
3009 	markrunend(state, state->tp_tapenum[state->destTape]);
3010 	state->tp_runs[state->destTape]++;
3011 	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3012 
3013 #ifdef TRACE_SORT
3014 	if (trace_sort)
3015 		elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3016 			 state->worker, state->currentRun, state->destTape,
3017 			 pg_rusage_show(&state->ru_start));
3018 #endif
3019 
3020 	if (!alltuples)
3021 		selectnewtape(state);
3022 }
3023 
3024 /*
3025  * tuplesort_rescan		- rewind and replay the scan
3026  */
3027 void
3028 tuplesort_rescan(Tuplesortstate *state)
3029 {
3030 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3031 
3032 	Assert(state->randomAccess);
3033 
3034 	switch (state->status)
3035 	{
3036 		case TSS_SORTEDINMEM:
3037 			state->current = 0;
3038 			state->eof_reached = false;
3039 			state->markpos_offset = 0;
3040 			state->markpos_eof = false;
3041 			break;
3042 		case TSS_SORTEDONTAPE:
3043 			LogicalTapeRewindForRead(state->tapeset,
3044 									 state->result_tape,
3045 									 0);
3046 			state->eof_reached = false;
3047 			state->markpos_block = 0L;
3048 			state->markpos_offset = 0;
3049 			state->markpos_eof = false;
3050 			break;
3051 		default:
3052 			elog(ERROR, "invalid tuplesort state");
3053 			break;
3054 	}
3055 
3056 	MemoryContextSwitchTo(oldcontext);
3057 }
3058 
3059 /*
3060  * tuplesort_markpos	- saves current position in the merged sort file
3061  */
3062 void
3063 tuplesort_markpos(Tuplesortstate *state)
3064 {
3065 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3066 
3067 	Assert(state->randomAccess);
3068 
3069 	switch (state->status)
3070 	{
3071 		case TSS_SORTEDINMEM:
3072 			state->markpos_offset = state->current;
3073 			state->markpos_eof = state->eof_reached;
3074 			break;
3075 		case TSS_SORTEDONTAPE:
3076 			LogicalTapeTell(state->tapeset,
3077 							state->result_tape,
3078 							&state->markpos_block,
3079 							&state->markpos_offset);
3080 			state->markpos_eof = state->eof_reached;
3081 			break;
3082 		default:
3083 			elog(ERROR, "invalid tuplesort state");
3084 			break;
3085 	}
3086 
3087 	MemoryContextSwitchTo(oldcontext);
3088 }
3089 
3090 /*
3091  * tuplesort_restorepos - restores current position in merged sort file to
3092  *						  last saved position
3093  */
3094 void
3095 tuplesort_restorepos(Tuplesortstate *state)
3096 {
3097 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3098 
3099 	Assert(state->randomAccess);
3100 
3101 	switch (state->status)
3102 	{
3103 		case TSS_SORTEDINMEM:
3104 			state->current = state->markpos_offset;
3105 			state->eof_reached = state->markpos_eof;
3106 			break;
3107 		case TSS_SORTEDONTAPE:
3108 			LogicalTapeSeek(state->tapeset,
3109 							state->result_tape,
3110 							state->markpos_block,
3111 							state->markpos_offset);
3112 			state->eof_reached = state->markpos_eof;
3113 			break;
3114 		default:
3115 			elog(ERROR, "invalid tuplesort state");
3116 			break;
3117 	}
3118 
3119 	MemoryContextSwitchTo(oldcontext);
3120 }
3121 
3122 /*
3123  * tuplesort_get_stats - extract summary statistics
3124  *
3125  * This can be called after tuplesort_performsort() finishes to obtain
3126  * printable summary information about how the sort was performed.
3127  */
3128 void
3129 tuplesort_get_stats(Tuplesortstate *state,
3130 					TuplesortInstrumentation *stats)
3131 {
3132 	/*
3133 	 * Note: it might seem we should provide both memory and disk usage for a
3134 	 * disk-based sort.  However, the current code doesn't track memory space
3135 	 * accurately once we have begun to return tuples to the caller (since we
3136 	 * don't account for pfree's the caller is expected to do), so we cannot
3137 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
3138 	 * to fix.  Is it worth creating an API for the memory context code to
3139 	 * tell us how much is actually used in sortcontext?
3140 	 */
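	/*
	 * Both branches below report kilobytes: disk usage as BLCKSZ-byte tape
	 * blocks scaled to kB, and memory usage as the bytes consumed, rounded
	 * up to the next whole kB.
	 */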
3141 	if (state->tapeset)
3142 	{
3143 		stats->spaceType = SORT_SPACE_TYPE_DISK;
3144 		stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
3145 	}
3146 	else
3147 	{
3148 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
3149 		stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
3150 	}
3151 
3152 	switch (state->status)
3153 	{
3154 		case TSS_SORTEDINMEM:
3155 			if (state->boundUsed)
3156 				stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3157 			else
3158 				stats->sortMethod = SORT_TYPE_QUICKSORT;
3159 			break;
3160 		case TSS_SORTEDONTAPE:
3161 			stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3162 			break;
3163 		case TSS_FINALMERGE:
3164 			stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3165 			break;
3166 		default:
3167 			stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3168 			break;
3169 	}
3170 }
3171 
3172 /*
3173  * Convert TuplesortMethod to a string.
3174  */
3175 const char *
3176 tuplesort_method_name(TuplesortMethod m)
3177 {
3178 	switch (m)
3179 	{
3180 		case SORT_TYPE_STILL_IN_PROGRESS:
3181 			return "still in progress";
3182 		case SORT_TYPE_TOP_N_HEAPSORT:
3183 			return "top-N heapsort";
3184 		case SORT_TYPE_QUICKSORT:
3185 			return "quicksort";
3186 		case SORT_TYPE_EXTERNAL_SORT:
3187 			return "external sort";
3188 		case SORT_TYPE_EXTERNAL_MERGE:
3189 			return "external merge";
3190 	}
3191 
3192 	return "unknown";
3193 }
3194 
3195 /*
3196  * Convert TuplesortSpaceType to a string.
3197  */
3198 const char *
3199 tuplesort_space_type_name(TuplesortSpaceType t)
3200 {
3201 	Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3202 	return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3203 }
3204 
3205 
3206 /*
3207  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3208  */
3209 
3210 /*
3211  * Convert the existing unordered array of SortTuples to a bounded heap,
3212  * discarding all but the smallest "state->bound" tuples.
3213  *
3214  * When working with a bounded heap, we want to keep the largest entry
3215  * at the root (array entry zero), instead of the smallest as in the normal
3216  * sort case.  This allows us to discard the largest entry cheaply.
3217  * Therefore, we temporarily reverse the sort direction.
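 *
 * For example, with bound = 3 and input 5, 1, 4, 2, 3, the reversed heap
 * first absorbs 5, 1 and 4 with 5 at the root; 2 then replaces 5, and 3
 * replaces 4, leaving exactly {1, 2, 3} with 3 at the root.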
3218  */
3219 static void
3220 make_bounded_heap(Tuplesortstate *state)
3221 {
3222 	int			tupcount = state->memtupcount;
3223 	int			i;
3224 
3225 	Assert(state->status == TSS_INITIAL);
3226 	Assert(state->bounded);
3227 	Assert(tupcount >= state->bound);
3228 	Assert(SERIAL(state));
3229 
3230 	/* Reverse sort direction so largest entry will be at root */
3231 	reversedirection(state);
3232 
3233 	state->memtupcount = 0;		/* make the heap empty */
3234 	for (i = 0; i < tupcount; i++)
3235 	{
3236 		if (state->memtupcount < state->bound)
3237 		{
3238 			/* Insert next tuple into heap */
3239 			/* Must copy source tuple to avoid possible overwrite */
3240 			SortTuple	stup = state->memtuples[i];
3241 
3242 			tuplesort_heap_insert(state, &stup);
3243 		}
3244 		else
3245 		{
3246 			/*
3247 			 * The heap is full.  Replace the largest entry with the new
3248 			 * tuple, or just discard it, if it's larger than anything already
3249 			 * in the heap.
3250 			 */
3251 			if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3252 			{
3253 				free_sort_tuple(state, &state->memtuples[i]);
3254 				CHECK_FOR_INTERRUPTS();
3255 			}
3256 			else
3257 				tuplesort_heap_replace_top(state, &state->memtuples[i]);
3258 		}
3259 	}
3260 
3261 	Assert(state->memtupcount == state->bound);
3262 	state->status = TSS_BOUNDED;
3263 }
3264 
3265 /*
3266  * Convert the bounded heap to a properly-sorted array
3267  */
3268 static void
3269 sort_bounded_heap(Tuplesortstate *state)
3270 {
3271 	int			tupcount = state->memtupcount;
3272 
3273 	Assert(state->status == TSS_BOUNDED);
3274 	Assert(state->bounded);
3275 	Assert(tupcount == state->bound);
3276 	Assert(SERIAL(state));
3277 
3278 	/*
3279 	 * We can unheapify in place because each delete-top call will remove the
3280 	 * largest entry, which we can promptly store in the newly freed slot at
3281 	 * the end.  Once we're down to a single-entry heap, we're done.
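	 * For example, with four entries the first iteration moves the largest
	 * value into slot 3, the next largest into slot 2, and so on, leaving
	 * the array in ascending order by the time the loop ends.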
3282 	 */
3283 	while (state->memtupcount > 1)
3284 	{
3285 		SortTuple	stup = state->memtuples[0];
3286 
3287 		/* this sifts-up the next-largest entry and decreases memtupcount */
3288 		tuplesort_heap_delete_top(state);
3289 		state->memtuples[state->memtupcount] = stup;
3290 	}
3291 	state->memtupcount = tupcount;
3292 
3293 	/*
3294 	 * Reverse sort direction back to the original state.  This is not
3295 	 * actually necessary but seems like a good idea for tidiness.
3296 	 */
3297 	reversedirection(state);
3298 
3299 	state->status = TSS_SORTEDINMEM;
3300 	state->boundUsed = true;
3301 }
3302 
3303 /*
3304  * Sort all memtuples using specialized qsort() routines.
3305  *
3306  * Quicksort is used for small in-memory sorts, and external sort runs.
3307  */
3308 static void
3309 tuplesort_sort_memtuples(Tuplesortstate *state)
3310 {
3311 	Assert(!LEADER(state));
3312 
3313 	if (state->memtupcount > 1)
3314 	{
3315 		/* Can we use the single-key sort function? */
3316 		if (state->onlyKey != NULL)
3317 			qsort_ssup(state->memtuples, state->memtupcount,
3318 					   state->onlyKey);
3319 		else
3320 			qsort_tuple(state->memtuples,
3321 						state->memtupcount,
3322 						state->comparetup,
3323 						state);
3324 	}
3325 }
3326 
3327 /*
3328  * Insert a new tuple into an empty or existing heap, maintaining the
3329  * heap invariant.  Caller is responsible for ensuring there's room.
3330  *
3331  * Note: For some callers, tuple points to a memtuples[] entry above the
3332  * end of the heap.  This is safe as long as it's not immediately adjacent
3333  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3334  * is, it might get overwritten before being moved into the heap!
3335  */
3336 static void
3337 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3338 {
3339 	SortTuple  *memtuples;
3340 	int			j;
3341 
3342 	memtuples = state->memtuples;
3343 	Assert(state->memtupcount < state->memtupsize);
3344 
3345 	CHECK_FOR_INTERRUPTS();
3346 
3347 	/*
3348 	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3349 	 * using 1-based array indexes, not 0-based.
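	 * With 0-based indexes, the parent of slot j is (j - 1) / 2, which is
	 * the "i" computed in the loop below.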
3350 	 */
3351 	j = state->memtupcount++;
3352 	while (j > 0)
3353 	{
3354 		int			i = (j - 1) >> 1;
3355 
3356 		if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3357 			break;
3358 		memtuples[j] = memtuples[i];
3359 		j = i;
3360 	}
3361 	memtuples[j] = *tuple;
3362 }
3363 
3364 /*
3365  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
3366  * memtupcount, and sift up to maintain the heap invariant.
3367  *
3368  * The caller has already free'd the tuple the top node points to,
3369  * if necessary.
3370  */
3371 static void
3372 tuplesort_heap_delete_top(Tuplesortstate *state)
3373 {
3374 	SortTuple  *memtuples = state->memtuples;
3375 	SortTuple  *tuple;
3376 
3377 	if (--state->memtupcount <= 0)
3378 		return;
3379 
3380 	/*
3381 	 * Remove the last tuple in the heap, and re-insert it, by replacing the
3382 	 * current top node with it.
3383 	 */
3384 	tuple = &memtuples[state->memtupcount];
3385 	tuplesort_heap_replace_top(state, tuple);
3386 }
3387 
3388 /*
3389  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
3390  * maintain the heap invariant.
3391  *
3392  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3393  * Heapsort, steps H3-H8).
3394  */
3395 static void
3396 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3397 {
3398 	SortTuple  *memtuples = state->memtuples;
3399 	unsigned int i,
3400 				n;
3401 
3402 	Assert(state->memtupcount >= 1);
3403 
3404 	CHECK_FOR_INTERRUPTS();
3405 
3406 	/*
3407 	 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3408 	 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3409 	 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3410 	 */
3411 	n = state->memtupcount;
3412 	i = 0;						/* i is where the "hole" is */
3413 	for (;;)
3414 	{
3415 		unsigned int j = 2 * i + 1;
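		/* j is i's left child; j + 1, when it exists, is the right child */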
3416 
3417 		if (j >= n)
3418 			break;
3419 		if (j + 1 < n &&
3420 			COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3421 			j++;
3422 		if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3423 			break;
3424 		memtuples[i] = memtuples[j];
3425 		i = j;
3426 	}
3427 	memtuples[i] = *tuple;
3428 }
3429 
3430 /*
3431  * Function to reverse the sort direction from its current state
3432  *
3433  * It is not safe to call this when performing hash tuplesorts
3434  */
3435 static void
3436 reversedirection(Tuplesortstate *state)
3437 {
3438 	SortSupport sortKey = state->sortKeys;
3439 	int			nkey;
3440 
3441 	for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3442 	{
3443 		sortKey->ssup_reverse = !sortKey->ssup_reverse;
3444 		sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3445 	}
3446 }
3447 
3448 
3449 /*
3450  * Tape interface routines
3451  */
3452 
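/*
 * Read the next length word from a tape.  A stored length of zero is the
 * run terminator written by markrunend(); eofOK tells us whether it is
 * acceptable to see one here.
 */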
3453 static unsigned int
3454 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3455 {
3456 	unsigned int len;
3457 
3458 	if (LogicalTapeRead(state->tapeset, tapenum,
3459 						&len, sizeof(len)) != sizeof(len))
3460 		elog(ERROR, "unexpected end of tape");
3461 	if (len == 0 && !eofOK)
3462 		elog(ERROR, "unexpected end of data");
3463 	return len;
3464 }
3465 
3466 static void
3467 markrunend(Tuplesortstate *state, int tapenum)
3468 {
3469 	unsigned int len = 0;
3470 
3471 	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3472 }
3473 
3474 /*
3475  * Get memory for tuple from within READTUP() routine.
3476  *
3477  * We use next free slot from the slab allocator, or palloc() if the tuple
3478  * is too large for that.
3479  */
3480 static void *
3481 readtup_alloc(Tuplesortstate *state, Size tuplen)
3482 {
3483 	SlabSlot   *buf;
3484 
3485 	/*
3486 	 * We pre-allocate enough slots in the slab arena that we should never run
3487 	 * out.
3488 	 */
3489 	Assert(state->slabFreeHead);
3490 
3491 	if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3492 		return MemoryContextAlloc(state->sortcontext, tuplen);
3493 	else
3494 	{
3495 		buf = state->slabFreeHead;
3496 		/* Reuse this slot */
3497 		state->slabFreeHead = buf->nextfree;
3498 
3499 		return buf;
3500 	}
3501 }
3502 
3503 
3504 /*
3505  * Routines specialized for HeapTuple (actually MinimalTuple) case
3506  */
3507 
3508 static int
3509 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3510 {
3511 	SortSupport sortKey = state->sortKeys;
3512 	HeapTupleData ltup;
3513 	HeapTupleData rtup;
3514 	TupleDesc	tupDesc;
3515 	int			nkey;
3516 	int32		compare;
3517 	AttrNumber	attno;
3518 	Datum		datum1,
3519 				datum2;
3520 	bool		isnull1,
3521 				isnull2;
3522 
3523 
3524 	/* Compare the leading sort key */
3525 	compare = ApplySortComparator(a->datum1, a->isnull1,
3526 								  b->datum1, b->isnull1,
3527 								  sortKey);
3528 	if (compare != 0)
3529 		return compare;
3530 
3531 	/* Compare additional sort keys */
3532 	ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3533 	ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3534 	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3535 	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3536 	tupDesc = state->tupDesc;
3537 
3538 	if (sortKey->abbrev_converter)
3539 	{
3540 		attno = sortKey->ssup_attno;
3541 
3542 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3543 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3544 
3545 		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3546 												datum2, isnull2,
3547 												sortKey);
3548 		if (compare != 0)
3549 			return compare;
3550 	}
3551 
3552 	sortKey++;
3553 	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3554 	{
3555 		attno = sortKey->ssup_attno;
3556 
3557 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3558 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3559 
3560 		compare = ApplySortComparator(datum1, isnull1,
3561 									  datum2, isnull2,
3562 									  sortKey);
3563 		if (compare != 0)
3564 			return compare;
3565 	}
3566 
3567 	return 0;
3568 }
3569 
3570 static void
3571 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3572 {
3573 	/*
3574 	 * We expect the passed "tup" to be a TupleTableSlot, and form a
3575 	 * MinimalTuple using the exported interface for that.
3576 	 */
3577 	TupleTableSlot *slot = (TupleTableSlot *) tup;
3578 	Datum		original;
3579 	MinimalTuple tuple;
3580 	HeapTupleData htup;
3581 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3582 
3583 	/* copy the tuple into sort storage */
3584 	tuple = ExecCopySlotMinimalTuple(slot);
3585 	stup->tuple = (void *) tuple;
3586 	USEMEM(state, GetMemoryChunkSpace(tuple));
3587 	/* set up first-column key value */
3588 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3589 	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3590 	original = heap_getattr(&htup,
3591 							state->sortKeys[0].ssup_attno,
3592 							state->tupDesc,
3593 							&stup->isnull1);
3594 
3595 	MemoryContextSwitchTo(oldcontext);
3596 
3597 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
3598 	{
3599 		/*
3600 		 * Store ordinary Datum representation, or NULL value.  If there is a
3601 		 * converter it won't expect NULL values, and cost model is not
3602 		 * required to account for NULL, so in that case we avoid calling
3603 		 * converter and just set datum1 to zeroed representation (to be
3604 		 * consistent, and to support cheap inequality tests for NULL
3605 		 * abbreviated keys).
3606 		 */
3607 		stup->datum1 = original;
3608 	}
3609 	else if (!consider_abort_common(state))
3610 	{
3611 		/* Store abbreviated key representation */
3612 		stup->datum1 = state->sortKeys->abbrev_converter(original,
3613 														 state->sortKeys);
3614 	}
3615 	else
3616 	{
3617 		/* Abort abbreviation */
3618 		int			i;
3619 
3620 		stup->datum1 = original;
3621 
3622 		/*
3623 		 * Set state to be consistent with never trying abbreviation.
3624 		 *
3625 		 * Alter datum1 representation in already-copied tuples, so as to
3626 		 * ensure a consistent representation (current tuple was just
3627 		 * handled).  It does not matter if some dumped tuples are already
3628 		 * sorted on tape, since serialized tuples lack abbreviated keys
3629 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3630 		 */
3631 		for (i = 0; i < state->memtupcount; i++)
3632 		{
3633 			SortTuple  *mtup = &state->memtuples[i];
3634 
3635 			htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3636 				MINIMAL_TUPLE_OFFSET;
3637 			htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3638 											 MINIMAL_TUPLE_OFFSET);
3639 
3640 			mtup->datum1 = heap_getattr(&htup,
3641 										state->sortKeys[0].ssup_attno,
3642 										state->tupDesc,
3643 										&mtup->isnull1);
3644 		}
3645 	}
3646 }
3647 
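/*
 * Write a MinimalTuple to tape as a length word, the tuple body (omitting
 * the unused MINIMAL_TUPLE_DATA_OFFSET header bytes), and, for randomAccess
 * callers, a trailing copy of the length word so the tuple can also be
 * re-read when scanning backwards.
 */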
3648 static void
3649 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3650 {
3651 	MinimalTuple tuple = (MinimalTuple) stup->tuple;
3652 
3653 	/* the part of the MinimalTuple we'll write: */
3654 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3655 	unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3656 
3657 	/* total on-disk footprint: */
3658 	unsigned int tuplen = tupbodylen + sizeof(int);
3659 
3660 	LogicalTapeWrite(state->tapeset, tapenum,
3661 					 (void *) &tuplen, sizeof(tuplen));
3662 	LogicalTapeWrite(state->tapeset, tapenum,
3663 					 (void *) tupbody, tupbodylen);
3664 	if (state->randomAccess)	/* need trailing length word? */
3665 		LogicalTapeWrite(state->tapeset, tapenum,
3666 						 (void *) &tuplen, sizeof(tuplen));
3667 
3668 	if (!state->slabAllocatorUsed)
3669 	{
3670 		FREEMEM(state, GetMemoryChunkSpace(tuple));
3671 		heap_free_minimal_tuple(tuple);
3672 	}
3673 }
3674 
3675 static void
3676 readtup_heap(Tuplesortstate *state, SortTuple *stup,
3677 			 int tapenum, unsigned int len)
3678 {
3679 	unsigned int tupbodylen = len - sizeof(int);
3680 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3681 	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3682 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3683 	HeapTupleData htup;
3684 
3685 	/* read in the tuple proper */
3686 	tuple->t_len = tuplen;
3687 	LogicalTapeReadExact(state->tapeset, tapenum,
3688 						 tupbody, tupbodylen);
3689 	if (state->randomAccess)	/* need trailing length word? */
3690 		LogicalTapeReadExact(state->tapeset, tapenum,
3691 							 &tuplen, sizeof(tuplen));
3692 	stup->tuple = (void *) tuple;
3693 	/* set up first-column key value */
3694 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3695 	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3696 	stup->datum1 = heap_getattr(&htup,
3697 								state->sortKeys[0].ssup_attno,
3698 								state->tupDesc,
3699 								&stup->isnull1);
3700 }
3701 
3702 /*
3703  * Routines specialized for the CLUSTER case (HeapTuple data, with
3704  * comparisons per a btree index definition)
3705  */
3706 
3707 static int
3708 comparetup_cluster(const SortTuple *a, const SortTuple *b,
3709 				   Tuplesortstate *state)
3710 {
3711 	SortSupport sortKey = state->sortKeys;
3712 	HeapTuple	ltup;
3713 	HeapTuple	rtup;
3714 	TupleDesc	tupDesc;
3715 	int			nkey;
3716 	int32		compare;
3717 	Datum		datum1,
3718 				datum2;
3719 	bool		isnull1,
3720 				isnull2;
3721 	AttrNumber	leading = state->indexInfo->ii_IndexAttrNumbers[0];
3722 
3723 	/* Be prepared to compare additional sort keys */
3724 	ltup = (HeapTuple) a->tuple;
3725 	rtup = (HeapTuple) b->tuple;
3726 	tupDesc = state->tupDesc;
3727 
3728 	/* Compare the leading sort key, if it's simple */
3729 	if (leading != 0)
3730 	{
3731 		compare = ApplySortComparator(a->datum1, a->isnull1,
3732 									  b->datum1, b->isnull1,
3733 									  sortKey);
3734 		if (compare != 0)
3735 			return compare;
3736 
3737 		if (sortKey->abbrev_converter)
3738 		{
3739 			datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3740 			datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3741 
3742 			compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3743 													datum2, isnull2,
3744 													sortKey);
3745 		}
3746 		if (compare != 0 || state->nKeys == 1)
3747 			return compare;
3748 		/* Compare additional columns the hard way */
3749 		sortKey++;
3750 		nkey = 1;
3751 	}
3752 	else
3753 	{
3754 		/* Must compare all keys the hard way */
3755 		nkey = 0;
3756 	}
3757 
3758 	if (state->indexInfo->ii_Expressions == NULL)
3759 	{
3760 		/* If not expression index, just compare the proper heap attrs */
3761 
3762 		for (; nkey < state->nKeys; nkey++, sortKey++)
3763 		{
3764 			AttrNumber	attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
3765 
3766 			datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
3767 			datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
3768 
3769 			compare = ApplySortComparator(datum1, isnull1,
3770 										  datum2, isnull2,
3771 										  sortKey);
3772 			if (compare != 0)
3773 				return compare;
3774 		}
3775 	}
3776 	else
3777 	{
3778 		/*
3779 		 * In the expression index case, compute the whole index tuple and
3780 		 * then compare values.  It would perhaps be faster to compute only as
3781 		 * many columns as we need to compare, but that would require
3782 		 * duplicating all the logic in FormIndexDatum.
3783 		 */
3784 		Datum		l_index_values[INDEX_MAX_KEYS];
3785 		bool		l_index_isnull[INDEX_MAX_KEYS];
3786 		Datum		r_index_values[INDEX_MAX_KEYS];
3787 		bool		r_index_isnull[INDEX_MAX_KEYS];
3788 		TupleTableSlot *ecxt_scantuple;
3789 
3790 		/* Reset context each time to prevent memory leakage */
3791 		ResetPerTupleExprContext(state->estate);
3792 
3793 		ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
3794 
3795 		ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
3796 		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3797 					   l_index_values, l_index_isnull);
3798 
3799 		ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
3800 		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3801 					   r_index_values, r_index_isnull);
3802 
3803 		for (; nkey < state->nKeys; nkey++, sortKey++)
3804 		{
3805 			compare = ApplySortComparator(l_index_values[nkey],
3806 										  l_index_isnull[nkey],
3807 										  r_index_values[nkey],
3808 										  r_index_isnull[nkey],
3809 										  sortKey);
3810 			if (compare != 0)
3811 				return compare;
3812 		}
3813 	}
3814 
3815 	return 0;
3816 }
3817 
3818 static void
3819 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3820 {
3821 	HeapTuple	tuple = (HeapTuple) tup;
3822 	Datum		original;
3823 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3824 
3825 	/* copy the tuple into sort storage */
3826 	tuple = heap_copytuple(tuple);
3827 	stup->tuple = (void *) tuple;
3828 	USEMEM(state, GetMemoryChunkSpace(tuple));
3829 
3830 	MemoryContextSwitchTo(oldcontext);
3831 
3832 	/*
3833 	 * set up first-column key value, and potentially abbreviate, if it's a
3834 	 * simple column
3835 	 */
3836 	if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
3837 		return;
3838 
3839 	original = heap_getattr(tuple,
3840 							state->indexInfo->ii_IndexAttrNumbers[0],
3841 							state->tupDesc,
3842 							&stup->isnull1);
3843 
3844 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
3845 	{
3846 		/*
3847 		 * Store ordinary Datum representation, or NULL value.  If there is a
3848 		 * converter it won't expect NULL values, and cost model is not
3849 		 * required to account for NULL, so in that case we avoid calling
3850 		 * converter and just set datum1 to zeroed representation (to be
3851 		 * consistent, and to support cheap inequality tests for NULL
3852 		 * abbreviated keys).
3853 		 */
3854 		stup->datum1 = original;
3855 	}
3856 	else if (!consider_abort_common(state))
3857 	{
3858 		/* Store abbreviated key representation */
3859 		stup->datum1 = state->sortKeys->abbrev_converter(original,
3860 														 state->sortKeys);
3861 	}
3862 	else
3863 	{
3864 		/* Abort abbreviation */
3865 		int			i;
3866 
3867 		stup->datum1 = original;
3868 
3869 		/*
3870 		 * Set state to be consistent with never trying abbreviation.
3871 		 *
3872 		 * Alter datum1 representation in already-copied tuples, so as to
3873 		 * ensure a consistent representation (current tuple was just
3874 		 * handled).  It does not matter if some dumped tuples are already
3875 		 * sorted on tape, since serialized tuples lack abbreviated keys
3876 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3877 		 */
3878 		for (i = 0; i < state->memtupcount; i++)
3879 		{
3880 			SortTuple  *mtup = &state->memtuples[i];
3881 
3882 			tuple = (HeapTuple) mtup->tuple;
3883 			mtup->datum1 = heap_getattr(tuple,
3884 										state->indexInfo->ii_IndexAttrNumbers[0],
3885 										state->tupDesc,
3886 										&mtup->isnull1);
3887 		}
3888 	}
3889 }
3890 
3891 static void
3892 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
3893 {
3894 	HeapTuple	tuple = (HeapTuple) stup->tuple;
3895 	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
3896 
3897 	/* We need to store t_self, but not other fields of HeapTupleData */
3898 	LogicalTapeWrite(state->tapeset, tapenum,
3899 					 &tuplen, sizeof(tuplen));
3900 	LogicalTapeWrite(state->tapeset, tapenum,
3901 					 &tuple->t_self, sizeof(ItemPointerData));
3902 	LogicalTapeWrite(state->tapeset, tapenum,
3903 					 tuple->t_data, tuple->t_len);
3904 	if (state->randomAccess)	/* need trailing length word? */
3905 		LogicalTapeWrite(state->tapeset, tapenum,
3906 						 &tuplen, sizeof(tuplen));
3907 
3908 	if (!state->slabAllocatorUsed)
3909 	{
3910 		FREEMEM(state, GetMemoryChunkSpace(tuple));
3911 		heap_freetuple(tuple);
3912 	}
3913 }
3914 
3915 static void
3916 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
3917 				int tapenum, unsigned int tuplen)
3918 {
3919 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
3920 	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
3921 												  t_len + HEAPTUPLESIZE);
3922 
3923 	/* Reconstruct the HeapTupleData header */
3924 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
3925 	tuple->t_len = t_len;
3926 	LogicalTapeReadExact(state->tapeset, tapenum,
3927 						 &tuple->t_self, sizeof(ItemPointerData));
3928 	/* We don't currently bother to reconstruct t_tableOid */
3929 	tuple->t_tableOid = InvalidOid;
3930 	/* Read in the tuple body */
3931 	LogicalTapeReadExact(state->tapeset, tapenum,
3932 						 tuple->t_data, tuple->t_len);
3933 	if (state->randomAccess)	/* need trailing length word? */
3934 		LogicalTapeReadExact(state->tapeset, tapenum,
3935 							 &tuplen, sizeof(tuplen));
3936 	stup->tuple = (void *) tuple;
3937 	/* set up first-column key value, if it's a simple column */
3938 	if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
3939 		stup->datum1 = heap_getattr(tuple,
3940 									state->indexInfo->ii_IndexAttrNumbers[0],
3941 									state->tupDesc,
3942 									&stup->isnull1);
3943 }
3944 
3945 /*
3946  * Routines specialized for IndexTuple case
3947  *
3948  * The btree and hash cases require separate comparison functions, but the
3949  * IndexTuple representation is the same so the copy/write/read support
3950  * functions can be shared.
3951  */
3952 
3953 static int
3954 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
3955 					   Tuplesortstate *state)
3956 {
3957 	/*
3958 	 * This is similar to comparetup_heap(), but expects index tuples.  There
3959 	 * is also special handling for enforcing uniqueness, and special
3960 	 * treatment for equal keys at the end.
3961 	 */
3962 	SortSupport sortKey = state->sortKeys;
3963 	IndexTuple	tuple1;
3964 	IndexTuple	tuple2;
3965 	int			keysz;
3966 	TupleDesc	tupDes;
3967 	bool		equal_hasnull = false;
3968 	int			nkey;
3969 	int32		compare;
3970 	Datum		datum1,
3971 				datum2;
3972 	bool		isnull1,
3973 				isnull2;
3974 
3975 
3976 	/* Compare the leading sort key */
3977 	compare = ApplySortComparator(a->datum1, a->isnull1,
3978 								  b->datum1, b->isnull1,
3979 								  sortKey);
3980 	if (compare != 0)
3981 		return compare;
3982 
3983 	/* Compare additional sort keys */
3984 	tuple1 = (IndexTuple) a->tuple;
3985 	tuple2 = (IndexTuple) b->tuple;
3986 	keysz = state->nKeys;
3987 	tupDes = RelationGetDescr(state->indexRel);
3988 
3989 	if (sortKey->abbrev_converter)
3990 	{
3991 		datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
3992 		datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
3993 
3994 		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3995 												datum2, isnull2,
3996 												sortKey);
3997 		if (compare != 0)
3998 			return compare;
3999 	}
4000 
4001 	/* they are equal, so we only need to examine one null flag */
4002 	if (a->isnull1)
4003 		equal_hasnull = true;
4004 
4005 	sortKey++;
4006 	for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4007 	{
4008 		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4009 		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4010 
4011 		compare = ApplySortComparator(datum1, isnull1,
4012 									  datum2, isnull2,
4013 									  sortKey);
4014 		if (compare != 0)
4015 			return compare;		/* done when we find unequal attributes */
4016 
4017 		/* they are equal, so we only need to examine one null flag */
4018 		if (isnull1)
4019 			equal_hasnull = true;
4020 	}
4021 
4022 	/*
4023 	 * If btree has asked us to enforce uniqueness, complain if two equal
4024 	 * tuples are detected (unless there was at least one NULL field).
4025 	 *
4026 	 * It is sufficient to make the test here, because if two tuples are equal
4027 	 * they *must* get compared at some stage of the sort --- otherwise the
4028 	 * sort algorithm wouldn't have checked whether one must appear before the
4029 	 * other.
4030 	 */
4031 	if (state->enforceUnique && !equal_hasnull)
4032 	{
4033 		Datum		values[INDEX_MAX_KEYS];
4034 		bool		isnull[INDEX_MAX_KEYS];
4035 		char	   *key_desc;
4036 
4037 		/*
4038 		 * Some rather brain-dead implementations of qsort (such as the one in
4039 		 * QNX 4) will sometimes call the comparison routine to compare a
4040 		 * value to itself, but we always use our own implementation, which
4041 		 * does not.
4042 		 */
4043 		Assert(tuple1 != tuple2);
4044 
4045 		index_deform_tuple(tuple1, tupDes, values, isnull);
4046 
4047 		key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4048 
4049 		ereport(ERROR,
4050 				(errcode(ERRCODE_UNIQUE_VIOLATION),
4051 				 errmsg("could not create unique index \"%s\"",
4052 						RelationGetRelationName(state->indexRel)),
4053 				 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4054 				 errdetail("Duplicate keys exist."),
4055 				 errtableconstraint(state->heapRel,
4056 									RelationGetRelationName(state->indexRel))));
4057 	}
4058 
4059 	/*
4060 	 * If key values are equal, we sort on ItemPointer.  This is required for
4061 	 * btree indexes, since heap TID is treated as an implicit last key
4062 	 * attribute in order to ensure that all keys in the index are physically
4063 	 * unique.
4064 	 */
4065 	{
4066 		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4067 		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4068 
4069 		if (blk1 != blk2)
4070 			return (blk1 < blk2) ? -1 : 1;
4071 	}
4072 	{
4073 		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4074 		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4075 
4076 		if (pos1 != pos2)
4077 			return (pos1 < pos2) ? -1 : 1;
4078 	}
4079 
4080 	/* ItemPointer values should never be equal */
4081 	Assert(false);
4082 
4083 	return 0;
4084 }
4085 
4086 static int
4087 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4088 					  Tuplesortstate *state)
4089 {
4090 	Bucket		bucket1;
4091 	Bucket		bucket2;
4092 	IndexTuple	tuple1;
4093 	IndexTuple	tuple2;
4094 
4095 	/*
4096 	 * Fetch hash keys and mask off bits we don't want to sort by. We know
4097 	 * that the first column of the index tuple is the hash key.
4098 	 */
4099 	Assert(!a->isnull1);
4100 	bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4101 								   state->max_buckets, state->high_mask,
4102 								   state->low_mask);
4103 	Assert(!b->isnull1);
4104 	bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4105 								   state->max_buckets, state->high_mask,
4106 								   state->low_mask);
4107 	if (bucket1 > bucket2)
4108 		return 1;
4109 	else if (bucket1 < bucket2)
4110 		return -1;
4111 
4112 	/*
4113 	 * If hash values are equal, we sort on ItemPointer.  This does not affect
4114 	 * validity of the finished index, but it may be useful to have index
4115 	 * scans in physical order.
4116 	 */
4117 	tuple1 = (IndexTuple) a->tuple;
4118 	tuple2 = (IndexTuple) b->tuple;
4119 
4120 	{
4121 		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4122 		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4123 
4124 		if (blk1 != blk2)
4125 			return (blk1 < blk2) ? -1 : 1;
4126 	}
4127 	{
4128 		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4129 		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4130 
4131 		if (pos1 != pos2)
4132 			return (pos1 < pos2) ? -1 : 1;
4133 	}
4134 
4135 	/* ItemPointer values should never be equal */
4136 	Assert(false);
4137 
4138 	return 0;
4139 }
4140 
4141 static void
4142 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4143 {
4144 	IndexTuple	tuple = (IndexTuple) tup;
4145 	unsigned int tuplen = IndexTupleSize(tuple);
4146 	IndexTuple	newtuple;
4147 	Datum		original;
4148 
4149 	/* copy the tuple into sort storage */
4150 	newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
4151 	memcpy(newtuple, tuple, tuplen);
4152 	USEMEM(state, GetMemoryChunkSpace(newtuple));
4153 	stup->tuple = (void *) newtuple;
4154 	/* set up first-column key value */
4155 	original = index_getattr(newtuple,
4156 							 1,
4157 							 RelationGetDescr(state->indexRel),
4158 							 &stup->isnull1);
4159 
4160 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
4161 	{
4162 		/*
4163 		 * Store ordinary Datum representation, or NULL value.  If there is a
4164 		 * converter it won't expect NULL values, and cost model is not
4165 		 * required to account for NULL, so in that case we avoid calling
4166 		 * converter and just set datum1 to zeroed representation (to be
4167 		 * consistent, and to support cheap inequality tests for NULL
4168 		 * abbreviated keys).
4169 		 */
4170 		stup->datum1 = original;
4171 	}
4172 	else if (!consider_abort_common(state))
4173 	{
4174 		/* Store abbreviated key representation */
4175 		stup->datum1 = state->sortKeys->abbrev_converter(original,
4176 														 state->sortKeys);
4177 	}
4178 	else
4179 	{
4180 		/* Abort abbreviation */
4181 		int			i;
4182 
4183 		stup->datum1 = original;
4184 
4185 		/*
4186 		 * Set state to be consistent with never trying abbreviation.
4187 		 *
4188 		 * Alter datum1 representation in already-copied tuples, so as to
4189 		 * ensure a consistent representation (current tuple was just
4190 		 * handled).  It does not matter if some dumped tuples are already
4191 		 * sorted on tape, since serialized tuples lack abbreviated keys
4192 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
4193 		 */
4194 		for (i = 0; i < state->memtupcount; i++)
4195 		{
4196 			SortTuple  *mtup = &state->memtuples[i];
4197 
4198 			tuple = (IndexTuple) mtup->tuple;
4199 			mtup->datum1 = index_getattr(tuple,
4200 										 1,
4201 										 RelationGetDescr(state->indexRel),
4202 										 &mtup->isnull1);
4203 		}
4204 	}
4205 }
4206 
4207 static void
4208 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4209 {
4210 	IndexTuple	tuple = (IndexTuple) stup->tuple;
4211 	unsigned int tuplen;
4212 
4213 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4214 	LogicalTapeWrite(state->tapeset, tapenum,
4215 					 (void *) &tuplen, sizeof(tuplen));
4216 	LogicalTapeWrite(state->tapeset, tapenum,
4217 					 (void *) tuple, IndexTupleSize(tuple));
4218 	if (state->randomAccess)	/* need trailing length word? */
4219 		LogicalTapeWrite(state->tapeset, tapenum,
4220 						 (void *) &tuplen, sizeof(tuplen));
4221 
4222 	if (!state->slabAllocatorUsed)
4223 	{
4224 		FREEMEM(state, GetMemoryChunkSpace(tuple));
4225 		pfree(tuple);
4226 	}
4227 }
4228 
4229 static void
4230 readtup_index(Tuplesortstate *state, SortTuple *stup,
4231 			  int tapenum, unsigned int len)
4232 {
4233 	unsigned int tuplen = len - sizeof(unsigned int);
4234 	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);
4235 
4236 	LogicalTapeReadExact(state->tapeset, tapenum,
4237 						 tuple, tuplen);
4238 	if (state->randomAccess)	/* need trailing length word? */
4239 		LogicalTapeReadExact(state->tapeset, tapenum,
4240 							 &tuplen, sizeof(tuplen));
4241 	stup->tuple = (void *) tuple;
4242 	/* set up first-column key value */
4243 	stup->datum1 = index_getattr(tuple,
4244 								 1,
4245 								 RelationGetDescr(state->indexRel),
4246 								 &stup->isnull1);
4247 }
4248 
4249 /*
4250  * Routines specialized for DatumTuple case
4251  */
4252 
4253 static int
4254 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4255 {
4256 	int			compare;
4257 
4258 	compare = ApplySortComparator(a->datum1, a->isnull1,
4259 								  b->datum1, b->isnull1,
4260 								  state->sortKeys);
4261 	if (compare != 0)
4262 		return compare;
4263 
4264 	/* if we have abbreviations, then "tuple" has the original value */
4265 
4266 	if (state->sortKeys->abbrev_converter)
4267 		compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4268 												PointerGetDatum(b->tuple), b->isnull1,
4269 												state->sortKeys);
4270 
4271 	return compare;
4272 }
4273 
4274 static void
4275 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4276 {
4277 	/* Not currently needed */
4278 	elog(ERROR, "copytup_datum() should not be called");
4279 }
4280 
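/*
 * Write a Datum to tape: NULLs become a zero-length payload, pass-by-value
 * datums are written as the Datum itself, and pass-by-reference datums are
 * written as their full datumGetSize() payload.
 */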
4281 static void
4282 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4283 {
4284 	void	   *waddr;
4285 	unsigned int tuplen;
4286 	unsigned int writtenlen;
4287 
4288 	if (stup->isnull1)
4289 	{
4290 		waddr = NULL;
4291 		tuplen = 0;
4292 	}
4293 	else if (!state->tuples)
4294 	{
4295 		waddr = &stup->datum1;
4296 		tuplen = sizeof(Datum);
4297 	}
4298 	else
4299 	{
4300 		waddr = stup->tuple;
4301 		tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4302 		Assert(tuplen != 0);
4303 	}
4304 
4305 	writtenlen = tuplen + sizeof(unsigned int);
4306 
4307 	LogicalTapeWrite(state->tapeset, tapenum,
4308 					 (void *) &writtenlen, sizeof(writtenlen));
4309 	LogicalTapeWrite(state->tapeset, tapenum,
4310 					 waddr, tuplen);
4311 	if (state->randomAccess)	/* need trailing length word? */
4312 		LogicalTapeWrite(state->tapeset, tapenum,
4313 						 (void *) &writtenlen, sizeof(writtenlen));
4314 
4315 	if (!state->slabAllocatorUsed && stup->tuple)
4316 	{
4317 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4318 		pfree(stup->tuple);
4319 	}
4320 }
4321 
4322 static void
4323 readtup_datum(Tuplesortstate *state, SortTuple *stup,
4324 			  int tapenum, unsigned int len)
4325 {
4326 	unsigned int tuplen = len - sizeof(unsigned int);
4327 
4328 	if (tuplen == 0)
4329 	{
4330 		/* it's NULL */
4331 		stup->datum1 = (Datum) 0;
4332 		stup->isnull1 = true;
4333 		stup->tuple = NULL;
4334 	}
4335 	else if (!state->tuples)
4336 	{
4337 		Assert(tuplen == sizeof(Datum));
4338 		LogicalTapeReadExact(state->tapeset, tapenum,
4339 							 &stup->datum1, tuplen);
4340 		stup->isnull1 = false;
4341 		stup->tuple = NULL;
4342 	}
4343 	else
4344 	{
4345 		void	   *raddr = readtup_alloc(state, tuplen);
4346 
4347 		LogicalTapeReadExact(state->tapeset, tapenum,
4348 							 raddr, tuplen);
4349 		stup->datum1 = PointerGetDatum(raddr);
4350 		stup->isnull1 = false;
4351 		stup->tuple = raddr;
4352 	}
4353 
4354 	if (state->randomAccess)	/* need trailing length word? */
4355 		LogicalTapeReadExact(state->tapeset, tapenum,
4356 							 &tuplen, sizeof(tuplen));
4357 }
4358 
4359 /*
4360  * Parallel sort routines
4361  */
4362 
4363 /*
4364  * tuplesort_estimate_shared - estimate required shared memory allocation
4365  *
4366  * nWorkers is an estimate of the number of workers (it's the number that
4367  * will be requested).
4368  */
4369 Size
4370 tuplesort_estimate_shared(int nWorkers)
4371 {
4372 	Size		tapesSize;
4373 
4374 	Assert(nWorkers > 0);
4375 
4376 	/* Make sure that BufFile shared state is MAXALIGN'd */
4377 	tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4378 	tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
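	/*
	 * Equivalently: MAXALIGN(offsetof(Sharedsort, tapes) +
	 * nWorkers * sizeof(TapeShare)), using overflow-checked arithmetic.
	 */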
4379 
4380 	return tapesSize;
4381 }
4382 
4383 /*
4384  * tuplesort_initialize_shared - initialize shared tuplesort state
4385  *
4386  * Must be called from leader process before workers are launched, to
4387  * establish state needed up-front for worker tuplesortstates.  nWorkers
4388  * should match the argument passed to tuplesort_estimate_shared().
4389  */
4390 void
4391 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4392 {
4393 	int			i;
4394 
4395 	Assert(nWorkers > 0);
4396 
4397 	SpinLockInit(&shared->mutex);
4398 	shared->currentWorker = 0;
4399 	shared->workersFinished = 0;
4400 	SharedFileSetInit(&shared->fileset, seg);
4401 	shared->nTapes = nWorkers;
4402 	for (i = 0; i < nWorkers; i++)
4403 	{
4404 		shared->tapes[i].firstblocknumber = 0L;
4405 	}
4406 }
4407 
4408 /*
4409  * tuplesort_attach_shared - attach to shared tuplesort state
4410  *
4411  * Must be called by all worker processes.
4412  */
4413 void
4414 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4415 {
4416 	/* Attach to SharedFileSet */
4417 	SharedFileSetAttach(&shared->fileset, seg);
4418 }
4419 
4420 /*
4421  * worker_get_identifier - Assign and return ordinal identifier for worker
4422  *
4423  * The order in which these are assigned is not well defined, and should not
4424  * matter; worker numbers across parallel sort participants need only be
4425  * distinct and gapless.  logtape.c requires this.
4426  *
4427  * Note that the identifiers assigned from here have no relation to
4428  * ParallelWorkerNumber, to avoid making any assumption about
4429  * caller's requirements.  However, we do follow the ParallelWorkerNumber
4430  * convention of representing a non-worker with worker number -1.  This
4431  * includes the leader, as well as serial Tuplesort processes.
4432  */
4433 static int
4434 worker_get_identifier(Tuplesortstate *state)
4435 {
4436 	Sharedsort *shared = state->shared;
4437 	int			worker;
4438 
4439 	Assert(WORKER(state));
4440 
4441 	SpinLockAcquire(&shared->mutex);
4442 	worker = shared->currentWorker++;
4443 	SpinLockRelease(&shared->mutex);
4444 
4445 	return worker;
4446 }
4447 
4448 /*
4449  * worker_freeze_result_tape - freeze worker's result tape for leader
4450  *
4451  * This is called by workers just after the result tape has been determined,
4452  * instead of calling LogicalTapeFreeze() directly.  They do so because
4453  * workers require a few additional steps over similar serial
4454  * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
4455  * steps are around freeing now unneeded resources, and representing to
4456  * leader that worker's input run is available for its merge.
4457  *
4458  * There should only be one final output run for each worker, which consists
4459  * of all tuples that were originally input into worker.
4460  */
4461 static void
4462 worker_freeze_result_tape(Tuplesortstate *state)
4463 {
4464 	Sharedsort *shared = state->shared;
4465 	TapeShare	output;
4466 
4467 	Assert(WORKER(state));
4468 	Assert(state->result_tape != -1);
4469 	Assert(state->memtupcount == 0);
4470 
4471 	/*
4472 	 * Free most remaining memory, in case caller is sensitive to our holding
4473 	 * on to it.  memtuples may not be a tiny merge heap at this point.
4474 	 */
4475 	pfree(state->memtuples);
4476 	/* Be tidy */
4477 	state->memtuples = NULL;
4478 	state->memtupsize = 0;
4479 
4480 	/*
4481 	 * Parallel worker requires result tape metadata, which is to be stored in
4482 	 * shared memory for leader
4483 	 */
4484 	LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4485 
4486 	/* Store properties of output tape, and update finished worker count */
4487 	SpinLockAcquire(&shared->mutex);
4488 	shared->tapes[state->worker] = output;
4489 	shared->workersFinished++;
4490 	SpinLockRelease(&shared->mutex);
4491 }
4492 
4493 /*
4494  * worker_nomergeruns - dump memtuples in worker, without merging
4495  *
4496  * This is called as an alternative to mergeruns() with a worker when no
4497  * merging is required.
4498  */
4499 static void
4500 worker_nomergeruns(Tuplesortstate *state)
4501 {
4502 	Assert(WORKER(state));
4503 	Assert(state->result_tape == -1);
4504 
4505 	state->result_tape = state->tp_tapenum[state->destTape];
4506 	worker_freeze_result_tape(state);
4507 }
4508 
4509 /*
4510  * leader_takeover_tapes - create tapeset for leader from worker tapes
4511  *
4512  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
4513  * sorting has occurred in workers, all of which must have already returned
4514  * from tuplesort_performsort().
4515  *
4516  * When this returns, leader process is left in a state that is virtually
4517  * indistinguishable from it having generated runs as a serial external sort
4518  * might have.
4519  */
4520 static void
4521 leader_takeover_tapes(Tuplesortstate *state)
4522 {
4523 	Sharedsort *shared = state->shared;
4524 	int			nParticipants = state->nParticipants;
4525 	int			workersFinished;
4526 	int			j;
4527 
4528 	Assert(LEADER(state));
4529 	Assert(nParticipants >= 1);
4530 
4531 	SpinLockAcquire(&shared->mutex);
4532 	workersFinished = shared->workersFinished;
4533 	SpinLockRelease(&shared->mutex);
4534 
4535 	if (nParticipants != workersFinished)
4536 		elog(ERROR, "cannot take over tapes before all workers finish");
4537 
4538 	/*
4539 	 * Create the tapeset from worker tapes, including a leader-owned tape at
4540 	 * the end.  Parallel workers are far more expensive than logical tapes,
4541 	 * so the number of tapes allocated here should never be excessive.
4542 	 *
4543 	 * We still have a leader tape, though it's not possible to write to it
4544 	 * due to restrictions in the shared fileset infrastructure used by
4545 	 * logtape.c.  It will never be written to in practice because
4546 	 * randomAccess is disallowed for parallel sorts.
4547 	 */
4548 	inittapestate(state, nParticipants + 1);
4549 	state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
4550 										  &shared->fileset, state->worker);
4551 
4552 	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4553 	state->currentRun = nParticipants;
4554 
4555 	/*
4556 	 * Initialize variables of Algorithm D to be consistent with runs from
4557 	 * workers having been generated in the leader.
4558 	 *
4559 	 * There will always be exactly 1 run per worker, and exactly one input
4560 	 * tape per run, because workers always output exactly 1 run, even when
4561 	 * there were no input tuples for workers to sort.
4562 	 */
4563 	for (j = 0; j < state->maxTapes; j++)
4564 	{
4565 		/* One real run; no dummy runs for worker tapes */
4566 		state->tp_fib[j] = 1;
4567 		state->tp_runs[j] = 1;
4568 		state->tp_dummy[j] = 0;
4569 		state->tp_tapenum[j] = j;
4570 	}
4571 	/* Leader tape gets one dummy run, and no real runs */
4572 	state->tp_fib[state->tapeRange] = 0;
4573 	state->tp_runs[state->tapeRange] = 0;
4574 	state->tp_dummy[state->tapeRange] = 1;
4575 
4576 	state->Level = 1;
4577 	state->destTape = 0;
4578 
4579 	state->status = TSS_BUILDRUNS;
4580 }
4581 
4582 /*
4583  * Convenience routine to free a tuple previously loaded into sort memory
4584  */
4585 static void
4586 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4587 {
4588 	if (stup->tuple)
4589 	{
4590 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4591 		pfree(stup->tuple);
4592 		stup->tuple = NULL;
4593 	}
4594 }
4595