1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *	  Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  Historically, we divided the input into sorted runs
15  * using replacement selection, in the form of a priority queue implemented
16  * as a heap (essentially his Algorithm 5.2.3H), but now we always use
17  * quicksort for run generation.  We merge the runs using polyphase merge,
18  * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
19  * implemented by logtape.c, which avoids space wastage by recycling disk
20  * space as soon as each block is read from its "tape".
21  *
22  * The approximate amount of memory allowed for any one sort operation
23  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
24  * we absorb tuples and simply store them in an unsorted array as long as
25  * we haven't exceeded workMem.  If we reach the end of the input without
26  * exceeding workMem, we sort the array using qsort() and subsequently return
27  * tuples just by scanning the tuple array sequentially.  If we do exceed
28  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
29  * When tuples are dumped in batch after quicksorting, we begin a new run
30  * with a new output tape (selected per Algorithm D).  After the end of the
31  * input is reached, we dump out remaining tuples in memory into a final run,
32  * then merge the runs using Algorithm D.
33  *
34  * When merging runs, we use a heap containing just the frontmost tuple from
35  * each source run; we repeatedly output the smallest tuple and replace it
36  * with the next tuple from its source tape (if any).  When the heap empties,
37  * the merge is complete.  The basic merge algorithm thus needs very little
38  * memory --- only M tuples for an M-way merge, and M is constrained to a
39  * small number.  However, we can still make good use of our full workMem
40  * allocation by pre-reading additional blocks from each source tape.  Without
41  * prereading, our access pattern to the temporary file would be very erratic;
42  * on average we'd read one block from each of M source tapes during the same
43  * time that we're writing M blocks to the output tape, so there is no
44  * sequentiality of access at all, defeating the read-ahead methods used by
45  * most Unix kernels.  Worse, the output tape gets written into a very random
46  * sequence of blocks of the temp file, ensuring that things will be even
47  * worse when it comes time to read that tape.  A straightforward merge pass
48  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
49  * by prereading from each source tape sequentially, loading about workMem/M
50  * bytes from each tape in turn, and making the sequential blocks immediately
51  * available for reuse.  This approach helps to localize both read and write
52  * accesses.  The pre-reading is handled by logtape.c; we just tell it how
53  * much memory to use for the buffers.
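 *
 * For illustration only (a sketch, not the exact code in this file): given
 * the binary min-heap of SortTuples kept in memtuples[] during a merge, with
 * each element's srctape recording which run it came from, the inner loop of
 * merging into the current output tape (destTape) looks roughly like
 *
 *		while (state->memtupcount > 0)
 *		{
 *			SortTuple	stup;
 *			int			srcTape = state->memtuples[0].srctape;
 *
 *			WRITETUP(state, destTape, &state->memtuples[0]);
 *			if (mergereadnext(state, srcTape, &stup))
 *				tuplesort_heap_replace_top(state, &stup);
 *			else
 *				tuplesort_heap_delete_top(state);
 *		}
 *
 * where mergereadnext() pulls the next preread tuple from the given source
 * tape and the heap helpers restore the heap invariant.  (Slot recycling and
 * srctape bookkeeping are omitted from this sketch.)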
54  *
55  * When the caller requests random access to the sort result, we form
56  * the final sorted run on a logical tape which is then "frozen", so
57  * that we can access it randomly.  When the caller does not need random
58  * access, we return from tuplesort_performsort() as soon as we are down
59  * to one run per logical tape.  The final merge is then performed
60  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
61  * saves one cycle of writing all the data out to disk and reading it in.
62  *
63  * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
64  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
65  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
66  * tape drives are expensive beasts, and in particular that there will always
67  * be many more runs than tape drives.  In our implementation a "tape drive"
68  * doesn't cost much more than a few Kb of memory buffers, so we can afford
69  * to have lots of them.  In particular, if we can have as many tape drives
70  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
71  * code we determine the number of tapes M on the basis of workMem: we want
72  * workMem/M to be large enough that we read a fair amount of data each time
73  * we preread from a tape, so as to maintain the locality of access described
74  * above.  Nonetheless, with large workMem we can have many tapes (but not
75  * too many -- see the comments in tuplesort_merge_order).
76  *
77  * This module supports parallel sorting.  Parallel sorts involve coordination
78  * among one or more worker processes, and a leader process, each with its own
79  * tuplesort state.  The leader process (or, more accurately, the
80  * Tuplesortstate associated with a leader process) creates a full tapeset
81  * consisting of worker tapes, each holding the single run produced by one
82  * worker process.  The leader then merges these runs.  Workers are guaranteed to
83  * produce exactly one output run from their partial input.
84  *
85  *
86  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
87  * Portions Copyright (c) 1994, Regents of the University of California
88  *
89  * IDENTIFICATION
90  *	  src/backend/utils/sort/tuplesort.c
91  *
92  *-------------------------------------------------------------------------
93  */
94 
95 #include "postgres.h"
96 
97 #include <limits.h>
98 
99 #include "access/hash.h"
100 #include "access/htup_details.h"
101 #include "access/nbtree.h"
102 #include "catalog/index.h"
103 #include "catalog/pg_am.h"
104 #include "commands/tablespace.h"
105 #include "executor/executor.h"
106 #include "miscadmin.h"
107 #include "pg_trace.h"
108 #include "utils/datum.h"
109 #include "utils/logtape.h"
110 #include "utils/lsyscache.h"
111 #include "utils/memutils.h"
112 #include "utils/pg_rusage.h"
113 #include "utils/rel.h"
114 #include "utils/sortsupport.h"
115 #include "utils/tuplesort.h"
116 
117 
118 /* sort-type codes for sort__start probes */
119 #define HEAP_SORT		0
120 #define INDEX_SORT		1
121 #define DATUM_SORT		2
122 #define CLUSTER_SORT	3
123 
124 /* Sort parallel code from state for sort__start probes */
125 #define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
126 								 (state)->worker >= 0 ? 1 : 2)
127 
128 /*
129  * Initial size of memtuples array.  We're trying to select this size so that
130  * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
131  * overhead of allocation might possibly be lowered.  However, we don't
132  * consider array sizes less than 1024.
133  *
134  */
135 #define INITIAL_MEMTUPSIZE Max(1024, \
136 	ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
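
/*
 * For example, on a typical 64-bit build ALLOCSET_SEPARATE_THRESHOLD is 8192
 * bytes and sizeof(SortTuple) is 24 bytes, so the second Max() argument works
 * out to 8192 / 24 + 1 = 342 and the 1024-tuple floor wins.  (Illustrative
 * figures only; they depend on platform and build options.)
 */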
137 
138 /* GUC variables */
139 #ifdef TRACE_SORT
140 bool		trace_sort = false;
141 #endif
142 
143 #ifdef DEBUG_BOUNDED_SORT
144 bool		optimize_bounded_sort = true;
145 #endif
146 
147 
148 /*
149  * The objects we actually sort are SortTuple structs.  These contain
150  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
151  * which is a separate palloc chunk --- we assume it is just one chunk and
152  * can be freed by a simple pfree() (except during merge, when we use a
153  * simple slab allocator).  SortTuples also contain the tuple's first key
154  * column in Datum/nullflag format, and a source/input tape number that
155  * tracks which tape each heap element/slot belongs to during merging.
156  *
157  * Storing the first key column lets us save heap_getattr or index_getattr
158  * calls during tuple comparisons.  We could extract and save all the key
159  * columns not just the first, but this would increase code complexity and
160  * overhead, and wouldn't actually save any comparison cycles in the common
161  * case where the first key determines the comparison result.  Note that
162  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
163  *
164  * There is one special case: when the sort support infrastructure provides an
165  * "abbreviated key" representation, where the key is (typically) a pass by
166  * value proxy for a pass by reference type.  In this case, the abbreviated key
167  * is stored in datum1 in place of the actual first key column.
168  *
169  * When sorting single Datums, the data value is represented directly by
170  * datum1/isnull1 for pass by value types (or null values).  If the datatype is
171  * pass-by-reference and isnull1 is false, then "tuple" points to a separately
172  * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
173  * either the same pointer as "tuple", or is an abbreviated key value as
174  * described above.  Accordingly, "tuple" is always used in preference to
175  * datum1 as the authoritative value for pass-by-reference cases.
176  */
177 typedef struct
178 {
179 	void	   *tuple;			/* the tuple itself */
180 	Datum		datum1;			/* value of first key column */
181 	bool		isnull1;		/* is first key column NULL? */
182 	int			srctape;		/* source tape number */
183 } SortTuple;
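
/*
 * To make the datum1 optimization concrete, here is a simplified sketch of a
 * first-key comparison (illustrative only; the real comparetup_* routines
 * below also handle abbreviated-key tie-breaks and the remaining sort
 * columns):
 */
#if 0
static int
comparetup_first_key_sketch(const SortTuple *a, const SortTuple *b,
							SortSupport sortKey)
{
	int			compare;

	/* Fast path: compare the pre-extracted first key columns */
	compare = ApplySortComparator(a->datum1, a->isnull1,
								  b->datum1, b->isnull1,
								  sortKey);
	if (compare != 0)
		return compare;

	/*
	 * First keys tie: a real routine would now look into the "tuple"
	 * pointers to fetch and compare any remaining sort columns.
	 */
	return 0;
}
#endif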
184 
185 /*
186  * During merge, we use a pre-allocated set of fixed-size slots to hold
187  * tuples, to avoid palloc/pfree overhead.
188  *
189  * Merge doesn't require a lot of memory, so we can afford to waste some,
190  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
191  * palloc() overhead is not significant anymore.
192  *
193  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
194  * slot holds a tuple.
195  */
196 #define SLAB_SLOT_SIZE 1024
197 
198 typedef union SlabSlot
199 {
200 	union SlabSlot *nextfree;
201 	char		buffer[SLAB_SLOT_SIZE];
202 } SlabSlot;
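
/*
 * A loose sketch of how such a slot might be handed out.  The real logic
 * lives in readtup_alloc() and RELEASE_SLAB_SLOT() below and differs in
 * details; this version takes the free-list head as an explicit argument
 * purely for illustration.
 */
#if 0
static void *
slab_slot_alloc_sketch(SlabSlot **freeHead, Size tuplen)
{
	if (tuplen <= SLAB_SLOT_SIZE && *freeHead != NULL)
	{
		SlabSlot   *slot = *freeHead;

		/* pop one slot off the free list */
		*freeHead = slot->nextfree;
		return slot->buffer;
	}

	/* Tuple too large for a slot (or slab exhausted): fall back to palloc */
	return palloc(tuplen);
}
#endif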
203 
204 /*
205  * Possible states of a Tuplesort object.  These denote the states that
206  * persist between calls of Tuplesort routines.
207  */
208 typedef enum
209 {
210 	TSS_INITIAL,				/* Loading tuples; still within memory limit */
211 	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
212 	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
213 	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
214 	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
215 	TSS_FINALMERGE				/* Performing final merge on-the-fly */
216 } TupSortStatus;
217 
218 /*
219  * Parameters for calculation of number of tapes to use --- see inittapes()
220  * and tuplesort_merge_order().
221  *
222  * In this calculation we assume that each tape will cost us about one block's
223  * worth of buffer space.  This ignores the overhead of all the other data
224  * structures needed for each tape, but it's probably close enough.
225  *
226  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
227  * tape during a preread cycle (see discussion at top of file).
228  */
229 #define MINORDER		6		/* minimum merge order */
230 #define MAXORDER		500		/* maximum merge order */
231 #define TAPE_BUFFER_OVERHEAD		BLCKSZ
232 #define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
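
/*
 * For a rough sense of scale: with the default 8 kB BLCKSZ, each input tape
 * costs about MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD = 256 kB + 8 kB of
 * workMem, so a hypothetical 64 MB workMem supports a merge order on the
 * order of 64 MB / 264 kB, i.e. roughly 250 input tapes, subject to the
 * MINORDER/MAXORDER clamps.  See tuplesort_merge_order() for the
 * authoritative calculation.
 */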
233 
234 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
235 									Tuplesortstate *state);
236 
237 /*
238  * Private state of a Tuplesort operation.
239  */
240 struct Tuplesortstate
241 {
242 	TupSortStatus status;		/* enumerated value as shown above */
243 	int			nKeys;			/* number of columns in sort key */
244 	bool		randomAccess;	/* did caller request random access? */
245 	bool		bounded;		/* did caller specify a maximum number of
246 								 * tuples to return? */
247 	bool		boundUsed;		/* true if we made use of a bounded heap */
248 	int			bound;			/* if bounded, the maximum number of tuples */
249 	bool		tuples;			/* Can SortTuple.tuple ever be set? */
250 	int64		availMem;		/* remaining memory available, in bytes */
251 	int64		allowedMem;		/* total memory allowed, in bytes */
252 	int			maxTapes;		/* number of tapes (Knuth's T) */
253 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
254 	int64		maxSpace;		/* maximum amount of space occupied among sort
255 								 * groups, either in-memory or on-disk */
256 	bool		isMaxSpaceDisk; /* true when maxSpace is value for on-disk
257 								 * space, false when it's value for in-memory
258 								 * space */
259 	TupSortStatus maxSpaceStatus;	/* sort status when maxSpace was reached */
260 	MemoryContext maincontext;	/* memory context for tuple sort metadata that
261 								 * persists across multiple batches */
262 	MemoryContext sortcontext;	/* memory context holding most sort data */
263 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
264 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
265 
266 	/*
267 	 * These function pointers decouple the routines that must know what kind
268 	 * of tuple we are sorting from the routines that don't need to know it.
269 	 * They are set up by the tuplesort_begin_xxx routines.
270 	 *
271 	 * Function to compare two tuples; result is per qsort() convention, ie:
272 	 * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
273 	 * qsort_arg_comparator.
274 	 */
275 	SortTupleComparator comparetup;
276 
277 	/*
278 	 * Function to copy a supplied input tuple into palloc'd space and set up
279 	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
280 	 * state->availMem must be decreased by the amount of space used for the
281 	 * tuple copy (note the SortTuple struct itself is not counted).
282 	 */
283 	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
284 
285 	/*
286 	 * Function to write a stored tuple onto tape.  The representation of the
287 	 * tuple on tape need not be the same as it is in memory; requirements on
288 	 * the tape representation are given below.  Unless the slab allocator is
289 	 * used, after writing the tuple, pfree() the out-of-line data (not the
290 	 * SortTuple struct!), and increase state->availMem by the amount of
291 	 * memory space thereby released.
292 	 */
293 	void		(*writetup) (Tuplesortstate *state, int tapenum,
294 							 SortTuple *stup);
295 
296 	/*
297 	 * Function to read a stored tuple from tape back into memory. 'len' is
298 	 * the already-read length of the stored tuple.  The tuple is allocated
299 	 * from the slab memory arena, or is palloc'd, see readtup_alloc().
300 	 */
301 	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
302 							int tapenum, unsigned int len);
303 
304 	/*
305 	 * This array holds the tuples now in sort memory.  If we are in state
306 	 * INITIAL, the tuples are in no particular order; if we are in state
307 	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
308 	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
309 	 * H.  In state SORTEDONTAPE, the array is not used.
310 	 */
311 	SortTuple  *memtuples;		/* array of SortTuple structs */
312 	int			memtupcount;	/* number of tuples currently present */
313 	int			memtupsize;		/* allocated length of memtuples array */
314 	bool		growmemtuples;	/* memtuples' growth still underway? */
315 
316 	/*
317 	 * Memory for tuples is sometimes allocated using a simple slab allocator,
318 	 * rather than with palloc().  Currently, we switch to slab allocation
319 	 * when we start merging.  Merging only needs to keep a small, fixed
320 	 * number of tuples in memory at any time, so we can avoid the
321 	 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
322 	 * to hold the tuples.
323 	 *
324 	 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
325 	 * slots.  The allocation is sized to have one slot per tape, plus one
326 	 * additional slot.  We need that many slots to hold all the tuples kept
327 	 * in the heap during merge, plus the one we have last returned from the
328 	 * in the heap during merge, plus the one most recently returned from the
329 	 * sort by tuplesort_gettuple.
330 	 * Initially, all the slots are kept in a linked list of free slots.  When
331 	 * a tuple is read from a tape, it is put into the next available slot, if
332 	 * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
333 	 * instead.
334 	 *
335 	 * When we're done processing a tuple, we return the slot back to the free
336 	 * list, or pfree() if it was palloc'd.  We know that a tuple was
337 	 * allocated from the slab if its pointer value is between
338 	 * slabMemoryBegin and -End.
339 	 *
340 	 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
341 	 * tracking memory usage is not used.
342 	 */
343 	bool		slabAllocatorUsed;
344 
345 	char	   *slabMemoryBegin;	/* beginning of slab memory arena */
346 	char	   *slabMemoryEnd;	/* end of slab memory arena */
347 	SlabSlot   *slabFreeHead;	/* head of free list */
348 
349 	/* Buffer size to use for reading input tapes, during merge. */
350 	size_t		read_buffer_size;
351 
352 	/*
353 	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
354 	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
355 	 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
356 	 * recycle the memory on next gettuple call.
357 	 */
358 	void	   *lastReturnedTuple;
359 
360 	/*
361 	 * While building initial runs, this is the current output run number.
362 	 * Afterwards, it is the number of initial runs we made.
363 	 */
364 	int			currentRun;
365 
366 	/*
367 	 * Unless otherwise noted, all pointer variables below are pointers to
368 	 * arrays of length maxTapes, holding per-tape data.
369 	 */
370 
371 	/*
372 	 * This variable is only used during merge passes.  mergeactive[i] is true
373 	 * if we are reading an input run from (actual) tape number i and have not
374 	 * yet exhausted that run.
375 	 */
376 	bool	   *mergeactive;	/* active input run source? */
377 
378 	/*
379 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
380 	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
381 	 * "logical" and "actual" tape numbers straight!
382 	 */
383 	int			Level;			/* Knuth's l */
384 	int			destTape;		/* current output tape (Knuth's j, less 1) */
385 	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
386 	int		   *tp_runs;		/* # of real runs on each tape */
387 	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
388 	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
389 	int			activeTapes;	/* # of active input tapes in merge pass */
390 
391 	/*
392 	 * These variables are used after completion of sorting to keep track of
393 	 * the next tuple to return.  (In the tape case, the tape's current read
394 	 * position is also critical state.)
395 	 */
396 	int			result_tape;	/* actual tape number of finished output */
397 	int			current;		/* array index (only used if SORTEDINMEM) */
398 	bool		eof_reached;	/* reached EOF (needed for cursors) */
399 
400 	/* markpos_xxx holds marked position for mark and restore */
401 	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
402 	int			markpos_offset; /* saved "current", or offset in tape block */
403 	bool		markpos_eof;	/* saved "eof_reached" */
404 
405 	/*
406 	 * These variables are used during parallel sorting.
407 	 *
408 	 * worker is our worker identifier.  Follows the general convention that
409 	 * a value of -1 denotes a leader tuplesort, and values >= 0 denote
410 	 * worker tuplesorts.  (-1 can also be a serial tuplesort.)
411 	 *
412 	 * shared is mutable shared memory state, which is used to coordinate
413 	 * parallel sorts.
414 	 *
415 	 * nParticipants is the number of worker Tuplesortstates known by the
416 	 * leader to have actually been launched, which implies that they must
417 	 * finish a run the leader can merge.  Typically includes a worker state held
418 	 * by the leader process itself.  Set in the leader Tuplesortstate only.
419 	 */
420 	int			worker;
421 	Sharedsort *shared;
422 	int			nParticipants;
423 
424 	/*
425 	 * The sortKeys variable is used by every case other than the hash index
426 	 * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
427 	 * MinimalTuple and CLUSTER routines, though.
428 	 */
429 	TupleDesc	tupDesc;
430 	SortSupport sortKeys;		/* array of length nKeys */
431 
432 	/*
433 	 * This variable is shared by the single-key MinimalTuple case and the
434 	 * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
435 	 */
436 	SortSupport onlyKey;
437 
438 	/*
439 	 * Additional state for managing "abbreviated key" sortsupport routines
440 	 * (which currently may be used by all cases except the hash index case).
441 	 * Tracks the intervals at which the optimization's effectiveness is
442 	 * tested.
443 	 */
444 	int64		abbrevNext;		/* Tuple # at which to next check
445 								 * applicability */
446 
447 	/*
448 	 * These variables are specific to the CLUSTER case; they are set by
449 	 * tuplesort_begin_cluster.
450 	 */
451 	IndexInfo  *indexInfo;		/* info about index being used for reference */
452 	EState	   *estate;			/* for evaluating index expressions */
453 
454 	/*
455 	 * These variables are specific to the IndexTuple case; they are set by
456 	 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
457 	 */
458 	Relation	heapRel;		/* table the index is being built on */
459 	Relation	indexRel;		/* index being built */
460 
461 	/* These are specific to the index_btree subcase: */
462 	bool		enforceUnique;	/* complain if we find duplicate tuples */
463 
464 	/* These are specific to the index_hash subcase: */
465 	uint32		high_mask;		/* masks for sortable part of hash code */
466 	uint32		low_mask;
467 	uint32		max_buckets;
468 
469 	/*
470 	 * These variables are specific to the Datum case; they are set by
471 	 * tuplesort_begin_datum and used only by the DatumTuple routines.
472 	 */
473 	Oid			datumType;
474 	/* we need typelen in order to know how to copy the Datums. */
475 	int			datumTypeLen;
476 
477 	/*
478 	 * Resource snapshot for time of sort start.
479 	 */
480 #ifdef TRACE_SORT
481 	PGRUsage	ru_start;
482 #endif
483 };
484 
485 /*
486  * Private mutable state of a parallel tuplesort operation.  This is allocated
487  * in shared memory.
488  */
489 struct Sharedsort
490 {
491 	/* mutex protects all fields prior to tapes */
492 	slock_t		mutex;
493 
494 	/*
495 	 * currentWorker generates ordinal identifier numbers for parallel sort
496 	 * workers.  These start from 0, and are always gapless.
497 	 *
498 	 * Workers increment workersFinished to indicate having finished.  If this
499 	 * is equal to state.nParticipants within the leader, the leader is ready to
500 	 * merge worker runs.
501 	 */
502 	int			currentWorker;
503 	int			workersFinished;
504 
505 	/* Temporary file space */
506 	SharedFileSet fileset;
507 
508 	/* Size of tapes flexible array */
509 	int			nTapes;
510 
511 	/*
512 	 * Tapes array used by workers to report back information needed by the
513 	 * leader to concatenate all worker tapes into one for merging
514 	 */
515 	TapeShare	tapes[FLEXIBLE_ARRAY_MEMBER];
516 };
517 
518 /*
519  * Is the given tuple allocated from the slab memory arena?
520  */
521 #define IS_SLAB_SLOT(state, tuple) \
522 	((char *) (tuple) >= (state)->slabMemoryBegin && \
523 	 (char *) (tuple) < (state)->slabMemoryEnd)
524 
525 /*
526  * Return the given tuple to the slab memory free list, or free it
527  * if it was palloc'd.
528  */
529 #define RELEASE_SLAB_SLOT(state, tuple) \
530 	do { \
531 		SlabSlot *buf = (SlabSlot *) tuple; \
532 		\
533 		if (IS_SLAB_SLOT((state), buf)) \
534 		{ \
535 			buf->nextfree = (state)->slabFreeHead; \
536 			(state)->slabFreeHead = buf; \
537 		} else \
538 			pfree(buf); \
539 	} while(0)
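
/*
 * Typical usage, sketched: when the caller asks for the next merged tuple,
 * first recycle whatever tuple we handed out last time, e.g.
 *
 *		if (state->lastReturnedTuple)
 *		{
 *			RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
 *			state->lastReturnedTuple = NULL;
 *		}
 *
 * (see the lastReturnedTuple field above; the gettuple code follows
 * essentially this pattern).
 */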
540 
541 #define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
542 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
543 #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
544 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
545 #define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
546 #define USEMEM(state,amt)	((state)->availMem -= (amt))
547 #define FREEMEM(state,amt)	((state)->availMem += (amt))
548 #define SERIAL(state)		((state)->shared == NULL)
549 #define WORKER(state)		((state)->shared && (state)->worker != -1)
550 #define LEADER(state)		((state)->shared && (state)->worker == -1)
551 
552 /*
553  * NOTES about on-tape representation of tuples:
554  *
555  * We require the first "unsigned int" of a stored tuple to be the total size
556  * on-tape of the tuple, including itself (so it is never zero; an all-zero
557  * unsigned int is used to delimit runs).  The remainder of the stored tuple
558  * may or may not match the in-memory representation of the tuple ---
559  * any conversion needed is the job of the writetup and readtup routines.
560  *
561  * If state->randomAccess is true, then the stored representation of the
562  * tuple must be followed by another "unsigned int" that is a copy of the
563  * length --- so the total tape space used is actually sizeof(unsigned int)
564  * more than the stored length value.  This allows read-backwards.  When
565  * randomAccess is not true, the write/read routines may omit the extra
566  * length word.
567  *
568  * writetup is expected to write both length words as well as the tuple
569  * data.  When readtup is called, the tape is positioned just after the
570  * front length word; readtup must read the tuple data and advance past
571  * the back length word (if present).
572  *
573  * The write/read routines can make use of the tuple description data
574  * stored in the Tuplesortstate record, if needed.  They are also expected
575  * to adjust state->availMem by the amount of memory space (not tape space!)
576  * released or consumed.  There is no error return from either writetup
577  * or readtup; they should ereport() on failure.
578  *
579  *
580  * NOTES about memory consumption calculations:
581  *
582  * We count space allocated for tuples against the workMem limit, plus
583  * the space used by the variable-size memtuples array.  Fixed-size space
584  * is not counted; it's small enough to not be interesting.
585  *
586  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
587  * rather than the originally-requested size.  This is important since
588  * palloc can add substantial overhead.  It's not a complete answer since
589  * we won't count any wasted space in palloc allocation blocks, but it's
590  * a lot better than what we were doing before 7.3.  As of 9.6, a
591  * separate memory context is used for caller passed tuples.  Resetting
592  * it at certain key increments significantly ameliorates fragmentation.
593  * Note that this places a responsibility on copytup routines to use the
594  * correct memory context for these tuples (and to not use the reset
595  * context for anything whose lifetime needs to span multiple external
596  * sort runs).  readtup routines use the slab allocator (they cannot use
597  * the reset context because it gets deleted at the point that merging
598  * begins).
599  */
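
/*
 * As a concrete (simplified) illustration of the on-tape layout described
 * above, a writetup routine for an opaque payload could look like the sketch
 * below.  This is illustrative only; the real writetup_* routines later in
 * this file also handle memory accounting and tuple-specific headers.
 */
#if 0
static void
writetup_sketch(Tuplesortstate *state, int tapenum,
				void *body, unsigned int bodylen)
{
	/* stored length word counts itself, but not the optional trailing copy */
	unsigned int tuplen = bodylen + sizeof(unsigned int);

	LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(state->tapeset, tapenum, body, bodylen);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
}
#endif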
600 
601 /* When using this macro, beware of double evaluation of len */
602 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
603 	do { \
604 		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
605 			elog(ERROR, "unexpected end of data"); \
606 	} while(0)
607 
608 
609 static Tuplesortstate *tuplesort_begin_common(int workMem,
610 											  SortCoordinate coordinate,
611 											  bool randomAccess);
612 static void tuplesort_begin_batch(Tuplesortstate *state);
613 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
614 static bool consider_abort_common(Tuplesortstate *state);
615 static void inittapes(Tuplesortstate *state, bool mergeruns);
616 static void inittapestate(Tuplesortstate *state, int maxTapes);
617 static void selectnewtape(Tuplesortstate *state);
618 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
619 static void mergeruns(Tuplesortstate *state);
620 static void mergeonerun(Tuplesortstate *state);
621 static void beginmerge(Tuplesortstate *state);
622 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
623 static void dumptuples(Tuplesortstate *state, bool alltuples);
624 static void make_bounded_heap(Tuplesortstate *state);
625 static void sort_bounded_heap(Tuplesortstate *state);
626 static void tuplesort_sort_memtuples(Tuplesortstate *state);
627 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
628 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
629 static void tuplesort_heap_delete_top(Tuplesortstate *state);
630 static void reversedirection(Tuplesortstate *state);
631 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
632 static void markrunend(Tuplesortstate *state, int tapenum);
633 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
634 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
635 							Tuplesortstate *state);
636 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
637 static void writetup_heap(Tuplesortstate *state, int tapenum,
638 						  SortTuple *stup);
639 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
640 						 int tapenum, unsigned int len);
641 static int	comparetup_cluster(const SortTuple *a, const SortTuple *b,
642 							   Tuplesortstate *state);
643 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
644 static void writetup_cluster(Tuplesortstate *state, int tapenum,
645 							 SortTuple *stup);
646 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
647 							int tapenum, unsigned int len);
648 static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
649 								   Tuplesortstate *state);
650 static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
651 								  Tuplesortstate *state);
652 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
653 static void writetup_index(Tuplesortstate *state, int tapenum,
654 						   SortTuple *stup);
655 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
656 						  int tapenum, unsigned int len);
657 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
658 							 Tuplesortstate *state);
659 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
660 static void writetup_datum(Tuplesortstate *state, int tapenum,
661 						   SortTuple *stup);
662 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
663 						  int tapenum, unsigned int len);
664 static int	worker_get_identifier(Tuplesortstate *state);
665 static void worker_freeze_result_tape(Tuplesortstate *state);
666 static void worker_nomergeruns(Tuplesortstate *state);
667 static void leader_takeover_tapes(Tuplesortstate *state);
668 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
669 static void tuplesort_free(Tuplesortstate *state);
670 static void tuplesort_updatemax(Tuplesortstate *state);
671 
672 /*
673  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
674  * any variant of SortTuples, using the appropriate comparetup function.
675  * qsort_ssup() is specialized for the case where the comparetup function
676  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
677  * and Datum sorts.
678  */
679 
680 #define ST_SORT qsort_tuple
681 #define ST_ELEMENT_TYPE SortTuple
682 #define ST_COMPARE_RUNTIME_POINTER
683 #define ST_COMPARE_ARG_TYPE Tuplesortstate
684 #define ST_CHECK_FOR_INTERRUPTS
685 #define ST_SCOPE static
686 #define ST_DECLARE
687 #define ST_DEFINE
688 #include "lib/sort_template.h"
689 
690 #define ST_SORT qsort_ssup
691 #define ST_ELEMENT_TYPE SortTuple
692 #define ST_COMPARE(a, b, ssup) \
693 	ApplySortComparator((a)->datum1, (a)->isnull1, \
694 						(b)->datum1, (b)->isnull1, (ssup))
695 #define ST_COMPARE_ARG_TYPE SortSupportData
696 #define ST_CHECK_FOR_INTERRUPTS
697 #define ST_SCOPE static
698 #define ST_DEFINE
699 #include "lib/sort_template.h"
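
/*
 * The generated routines are invoked on the memtuples array, e.g. (a sketch
 * of what tuplesort_sort_memtuples() does):
 *
 *		if (state->onlyKey != NULL)
 *			qsort_ssup(state->memtuples, state->memtupcount, state->onlyKey);
 *		else
 *			qsort_tuple(state->memtuples, state->memtupcount,
 *						state->comparetup, state);
 */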
700 
701 /*
702  *		tuplesort_begin_xxx
703  *
704  * Initialize for a tuple sort operation.
705  *
706  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
707  * zero or more times, then call tuplesort_performsort when all the tuples
708  * have been supplied.  After performsort, retrieve the tuples in sorted
709  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
710  * access was requested, rescan, markpos, and restorepos can also be called.)
711  * Call tuplesort_end to terminate the operation and release memory/disk space.
712  *
713  * Each variant of tuplesort_begin has a workMem parameter specifying the
714  * maximum number of kilobytes of RAM to use before spilling data to disk.
715  * (The normal value of this parameter is work_mem, but some callers use
716  * other values.)  Each variant also has a randomAccess parameter specifying
717  * whether the caller needs non-sequential access to the sort result.
718  */
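
/*
 * For example, a caller sorting heap tuples on a single int4 column might
 * drive the API as sketched below.  This is an illustration only: tupDesc,
 * outslot, get_next_input_slot(), and process_output_slot() are placeholder
 * names, and the one-key Int4LessOperator setup is hypothetical.
 */
#if 0
static void
heap_sort_usage_example(TupleDesc tupDesc, TupleTableSlot *outslot)
{
	Tuplesortstate *sortstate;
	AttrNumber	attNums[1] = {1};
	Oid			sortOperators[1] = {Int4LessOperator};
	Oid			sortCollations[1] = {InvalidOid};
	bool		nullsFirst[1] = {false};
	TupleTableSlot *slot;

	sortstate = tuplesort_begin_heap(tupDesc, 1, attNums,
									 sortOperators, sortCollations, nullsFirst,
									 work_mem, NULL, false);

	/* feed all input tuples */
	while ((slot = get_next_input_slot()) != NULL)	/* placeholder source */
		tuplesort_puttupleslot(sortstate, slot);

	tuplesort_performsort(sortstate);

	/* fetch results in sorted order */
	while (tuplesort_gettupleslot(sortstate, true, false, outslot, NULL))
		process_output_slot(outslot);	/* placeholder sink */

	tuplesort_end(sortstate);
}
#endif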
719 
720 static Tuplesortstate *
721 tuplesort_begin_common(int workMem, SortCoordinate coordinate,
722 					   bool randomAccess)
723 {
724 	Tuplesortstate *state;
725 	MemoryContext maincontext;
726 	MemoryContext sortcontext;
727 	MemoryContext oldcontext;
728 
729 	/* See leader_takeover_tapes() remarks on randomAccess support */
730 	if (coordinate && randomAccess)
731 		elog(ERROR, "random access disallowed under parallel sort");
732 
733 	/*
734 	 * Memory context surviving tuplesort_reset.  This memory context holds
735 	 * data which is useful to keep while sorting multiple similar batches.
736 	 */
737 	maincontext = AllocSetContextCreate(CurrentMemoryContext,
738 										"TupleSort main",
739 										ALLOCSET_DEFAULT_SIZES);
740 
741 	/*
742 	 * Create a working memory context for one sort operation.  The content of
743 	 * this context is deleted by tuplesort_reset.
744 	 */
745 	sortcontext = AllocSetContextCreate(maincontext,
746 										"TupleSort sort",
747 										ALLOCSET_DEFAULT_SIZES);
748 
749 	/*
750 	 * Additionally, a working memory context for tuples is set up in
751 	 * tuplesort_begin_batch.
752 	 */
753 
754 	/*
755 	 * Make the Tuplesortstate within the per-sortstate context.  This way, we
756 	 * don't need a separate pfree() operation for it at shutdown.
757 	 */
758 	oldcontext = MemoryContextSwitchTo(maincontext);
759 
760 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
761 
762 #ifdef TRACE_SORT
763 	if (trace_sort)
764 		pg_rusage_init(&state->ru_start);
765 #endif
766 
767 	state->randomAccess = randomAccess;
768 	state->tuples = true;
769 
770 	/*
771 	 * workMem is forced to be at least 64KB, the current minimum valid value
772 	 * for the work_mem GUC.  This is a defense against parallel sort callers
773 	 * that divide out memory among many workers in a way that leaves each
774 	 * with very little memory.
775 	 */
776 	state->allowedMem = Max(workMem, 64) * (int64) 1024;
777 	state->sortcontext = sortcontext;
778 	state->maincontext = maincontext;
779 
780 	/*
781 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
782 	 * see comments in grow_memtuples().
783 	 */
784 	state->memtupsize = INITIAL_MEMTUPSIZE;
785 	state->memtuples = NULL;
786 
787 	/*
788 	 * After all of the other non-parallel-related state, we set up all of the
789 	 * state needed for each batch.
790 	 */
791 	tuplesort_begin_batch(state);
792 
793 	/*
794 	 * Initialize parallel-related state based on coordination information
795 	 * from caller
796 	 */
797 	if (!coordinate)
798 	{
799 		/* Serial sort */
800 		state->shared = NULL;
801 		state->worker = -1;
802 		state->nParticipants = -1;
803 	}
804 	else if (coordinate->isWorker)
805 	{
806 		/* Parallel worker produces exactly one final run from all input */
807 		state->shared = coordinate->sharedsort;
808 		state->worker = worker_get_identifier(state);
809 		state->nParticipants = -1;
810 	}
811 	else
812 	{
813 		/* Parallel leader state only used for final merge */
814 		state->shared = coordinate->sharedsort;
815 		state->worker = -1;
816 		state->nParticipants = coordinate->nParticipants;
817 		Assert(state->nParticipants >= 1);
818 	}
819 
820 	MemoryContextSwitchTo(oldcontext);
821 
822 	return state;
823 }
824 
825 /*
826  *		tuplesort_begin_batch
827  *
828  * Set up, or reset, all state needed for processing a new set of tuples with this
829  * sort state. Called both from tuplesort_begin_common (the first time sorting
830  * with this sort state) and tuplesort_reset (for subsequent usages).
831  */
832 static void
833 tuplesort_begin_batch(Tuplesortstate *state)
834 {
835 	MemoryContext oldcontext;
836 
837 	oldcontext = MemoryContextSwitchTo(state->maincontext);
838 
839 	/*
840 	 * Caller tuple (e.g. IndexTuple) memory context.
841 	 *
842 	 * A dedicated child context used exclusively for caller passed tuples
843 	 * eases memory management.  Resetting at key points reduces
844 	 * fragmentation. Note that the memtuples array of SortTuples is allocated
845 	 * in the parent context, not this context, because there is no need to
846 	 * free memtuples early.
847 	 */
848 	state->tuplecontext = AllocSetContextCreate(state->sortcontext,
849 												"Caller tuples",
850 												ALLOCSET_DEFAULT_SIZES);
851 
852 	state->status = TSS_INITIAL;
853 	state->bounded = false;
854 	state->boundUsed = false;
855 
856 	state->availMem = state->allowedMem;
857 
858 	state->tapeset = NULL;
859 
860 	state->memtupcount = 0;
861 
862 	/*
863 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
864 	 * see comments in grow_memtuples().
865 	 */
866 	state->growmemtuples = true;
867 	state->slabAllocatorUsed = false;
868 	if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
869 	{
870 		pfree(state->memtuples);
871 		state->memtuples = NULL;
872 		state->memtupsize = INITIAL_MEMTUPSIZE;
873 	}
874 	if (state->memtuples == NULL)
875 	{
876 		state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
877 		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
878 	}
879 
880 	/* workMem must be large enough for the minimal memtuples array */
881 	if (LACKMEM(state))
882 		elog(ERROR, "insufficient memory allowed for sort");
883 
884 	state->currentRun = 0;
885 
886 	/*
887 	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
888 	 * inittapes(), if needed
889 	 */
890 
891 	state->result_tape = -1;	/* flag that result tape has not been formed */
892 
893 	MemoryContextSwitchTo(oldcontext);
894 }
895 
896 Tuplesortstate *
897 tuplesort_begin_heap(TupleDesc tupDesc,
898 					 int nkeys, AttrNumber *attNums,
899 					 Oid *sortOperators, Oid *sortCollations,
900 					 bool *nullsFirstFlags,
901 					 int workMem, SortCoordinate coordinate, bool randomAccess)
902 {
903 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
904 												   randomAccess);
905 	MemoryContext oldcontext;
906 	int			i;
907 
908 	oldcontext = MemoryContextSwitchTo(state->maincontext);
909 
910 	AssertArg(nkeys > 0);
911 
912 #ifdef TRACE_SORT
913 	if (trace_sort)
914 		elog(LOG,
915 			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
916 			 nkeys, workMem, randomAccess ? 't' : 'f');
917 #endif
918 
919 	state->nKeys = nkeys;
920 
921 	TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
922 								false,	/* no unique check */
923 								nkeys,
924 								workMem,
925 								randomAccess,
926 								PARALLEL_SORT(state));
927 
928 	state->comparetup = comparetup_heap;
929 	state->copytup = copytup_heap;
930 	state->writetup = writetup_heap;
931 	state->readtup = readtup_heap;
932 
933 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
934 	state->abbrevNext = 10;
935 
936 	/* Prepare SortSupport data for each column */
937 	state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
938 
939 	for (i = 0; i < nkeys; i++)
940 	{
941 		SortSupport sortKey = state->sortKeys + i;
942 
943 		AssertArg(attNums[i] != 0);
944 		AssertArg(sortOperators[i] != 0);
945 
946 		sortKey->ssup_cxt = CurrentMemoryContext;
947 		sortKey->ssup_collation = sortCollations[i];
948 		sortKey->ssup_nulls_first = nullsFirstFlags[i];
949 		sortKey->ssup_attno = attNums[i];
950 		/* Convey if abbreviation optimization is applicable in principle */
951 		sortKey->abbreviate = (i == 0);
952 
953 		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
954 	}
955 
956 	/*
957 	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
958 	 * tie-breaker comparisons may be required.  Typically, the optimization
959 	 * is only of value to pass-by-value types anyway, whereas abbreviated
960 	 * keys are typically only of value to pass-by-reference types.
961 	 */
962 	if (nkeys == 1 && !state->sortKeys->abbrev_converter)
963 		state->onlyKey = state->sortKeys;
964 
965 	MemoryContextSwitchTo(oldcontext);
966 
967 	return state;
968 }
969 
970 Tuplesortstate *
971 tuplesort_begin_cluster(TupleDesc tupDesc,
972 						Relation indexRel,
973 						int workMem,
974 						SortCoordinate coordinate, bool randomAccess)
975 {
976 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
977 												   randomAccess);
978 	BTScanInsert indexScanKey;
979 	MemoryContext oldcontext;
980 	int			i;
981 
982 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
983 
984 	oldcontext = MemoryContextSwitchTo(state->maincontext);
985 
986 #ifdef TRACE_SORT
987 	if (trace_sort)
988 		elog(LOG,
989 			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
990 			 RelationGetNumberOfAttributes(indexRel),
991 			 workMem, randomAccess ? 't' : 'f');
992 #endif
993 
994 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
995 
996 	TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
997 								false,	/* no unique check */
998 								state->nKeys,
999 								workMem,
1000 								randomAccess,
1001 								PARALLEL_SORT(state));
1002 
1003 	state->comparetup = comparetup_cluster;
1004 	state->copytup = copytup_cluster;
1005 	state->writetup = writetup_cluster;
1006 	state->readtup = readtup_cluster;
1007 	state->abbrevNext = 10;
1008 
1009 	state->indexInfo = BuildIndexInfo(indexRel);
1010 
1011 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
1012 
1013 	indexScanKey = _bt_mkscankey(indexRel, NULL);
1014 
1015 	if (state->indexInfo->ii_Expressions != NULL)
1016 	{
1017 		TupleTableSlot *slot;
1018 		ExprContext *econtext;
1019 
1020 		/*
1021 		 * We will need to use FormIndexDatum to evaluate the index
1022 		 * expressions.  To do that, we need an EState, as well as a
1023 		 * TupleTableSlot to put the table tuples into.  The econtext's
1024 		 * scantuple has to point to that slot, too.
1025 		 */
1026 		state->estate = CreateExecutorState();
1027 		slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
1028 		econtext = GetPerTupleExprContext(state->estate);
1029 		econtext->ecxt_scantuple = slot;
1030 	}
1031 
1032 	/* Prepare SortSupport data for each column */
1033 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
1034 											sizeof(SortSupportData));
1035 
1036 	for (i = 0; i < state->nKeys; i++)
1037 	{
1038 		SortSupport sortKey = state->sortKeys + i;
1039 		ScanKey		scanKey = indexScanKey->scankeys + i;
1040 		int16		strategy;
1041 
1042 		sortKey->ssup_cxt = CurrentMemoryContext;
1043 		sortKey->ssup_collation = scanKey->sk_collation;
1044 		sortKey->ssup_nulls_first =
1045 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1046 		sortKey->ssup_attno = scanKey->sk_attno;
1047 		/* Convey if abbreviation optimization is applicable in principle */
1048 		sortKey->abbreviate = (i == 0);
1049 
1050 		AssertState(sortKey->ssup_attno != 0);
1051 
1052 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1053 			BTGreaterStrategyNumber : BTLessStrategyNumber;
1054 
1055 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1056 	}
1057 
1058 	pfree(indexScanKey);
1059 
1060 	MemoryContextSwitchTo(oldcontext);
1061 
1062 	return state;
1063 }
1064 
1065 Tuplesortstate *
1066 tuplesort_begin_index_btree(Relation heapRel,
1067 							Relation indexRel,
1068 							bool enforceUnique,
1069 							int workMem,
1070 							SortCoordinate coordinate,
1071 							bool randomAccess)
1072 {
1073 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1074 												   randomAccess);
1075 	BTScanInsert indexScanKey;
1076 	MemoryContext oldcontext;
1077 	int			i;
1078 
1079 	oldcontext = MemoryContextSwitchTo(state->maincontext);
1080 
1081 #ifdef TRACE_SORT
1082 	if (trace_sort)
1083 		elog(LOG,
1084 			 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
1085 			 enforceUnique ? 't' : 'f',
1086 			 workMem, randomAccess ? 't' : 'f');
1087 #endif
1088 
1089 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
1090 
1091 	TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
1092 								enforceUnique,
1093 								state->nKeys,
1094 								workMem,
1095 								randomAccess,
1096 								PARALLEL_SORT(state));
1097 
1098 	state->comparetup = comparetup_index_btree;
1099 	state->copytup = copytup_index;
1100 	state->writetup = writetup_index;
1101 	state->readtup = readtup_index;
1102 	state->abbrevNext = 10;
1103 
1104 	state->heapRel = heapRel;
1105 	state->indexRel = indexRel;
1106 	state->enforceUnique = enforceUnique;
1107 
1108 	indexScanKey = _bt_mkscankey(indexRel, NULL);
1109 
1110 	/* Prepare SortSupport data for each column */
1111 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
1112 											sizeof(SortSupportData));
1113 
1114 	for (i = 0; i < state->nKeys; i++)
1115 	{
1116 		SortSupport sortKey = state->sortKeys + i;
1117 		ScanKey		scanKey = indexScanKey->scankeys + i;
1118 		int16		strategy;
1119 
1120 		sortKey->ssup_cxt = CurrentMemoryContext;
1121 		sortKey->ssup_collation = scanKey->sk_collation;
1122 		sortKey->ssup_nulls_first =
1123 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1124 		sortKey->ssup_attno = scanKey->sk_attno;
1125 		/* Convey if abbreviation optimization is applicable in principle */
1126 		sortKey->abbreviate = (i == 0);
1127 
1128 		AssertState(sortKey->ssup_attno != 0);
1129 
1130 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1131 			BTGreaterStrategyNumber : BTLessStrategyNumber;
1132 
1133 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1134 	}
1135 
1136 	pfree(indexScanKey);
1137 
1138 	MemoryContextSwitchTo(oldcontext);
1139 
1140 	return state;
1141 }
1142 
1143 Tuplesortstate *
1144 tuplesort_begin_index_hash(Relation heapRel,
1145 						   Relation indexRel,
1146 						   uint32 high_mask,
1147 						   uint32 low_mask,
1148 						   uint32 max_buckets,
1149 						   int workMem,
1150 						   SortCoordinate coordinate,
1151 						   bool randomAccess)
1152 {
1153 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1154 												   randomAccess);
1155 	MemoryContext oldcontext;
1156 
1157 	oldcontext = MemoryContextSwitchTo(state->maincontext);
1158 
1159 #ifdef TRACE_SORT
1160 	if (trace_sort)
1161 		elog(LOG,
1162 			 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1163 			 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1164 			 high_mask,
1165 			 low_mask,
1166 			 max_buckets,
1167 			 workMem, randomAccess ? 't' : 'f');
1168 #endif
1169 
1170 	state->nKeys = 1;			/* Only one sort column, the hash code */
1171 
1172 	state->comparetup = comparetup_index_hash;
1173 	state->copytup = copytup_index;
1174 	state->writetup = writetup_index;
1175 	state->readtup = readtup_index;
1176 
1177 	state->heapRel = heapRel;
1178 	state->indexRel = indexRel;
1179 
1180 	state->high_mask = high_mask;
1181 	state->low_mask = low_mask;
1182 	state->max_buckets = max_buckets;
1183 
1184 	MemoryContextSwitchTo(oldcontext);
1185 
1186 	return state;
1187 }
1188 
1189 Tuplesortstate *
1190 tuplesort_begin_index_gist(Relation heapRel,
1191 						   Relation indexRel,
1192 						   int workMem,
1193 						   SortCoordinate coordinate,
1194 						   bool randomAccess)
1195 {
1196 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1197 												   randomAccess);
1198 	MemoryContext oldcontext;
1199 	int			i;
1200 
1201 	oldcontext = MemoryContextSwitchTo(state->sortcontext);
1202 
1203 #ifdef TRACE_SORT
1204 	if (trace_sort)
1205 		elog(LOG,
1206 			 "begin index sort: workMem = %d, randomAccess = %c",
1207 			 workMem, randomAccess ? 't' : 'f');
1208 #endif
1209 
1210 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
1211 
1212 	state->comparetup = comparetup_index_btree;
1213 	state->copytup = copytup_index;
1214 	state->writetup = writetup_index;
1215 	state->readtup = readtup_index;
1216 
1217 	state->heapRel = heapRel;
1218 	state->indexRel = indexRel;
1219 
1220 	/* Prepare SortSupport data for each column */
1221 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
1222 											sizeof(SortSupportData));
1223 
1224 	for (i = 0; i < state->nKeys; i++)
1225 	{
1226 		SortSupport sortKey = state->sortKeys + i;
1227 
1228 		sortKey->ssup_cxt = CurrentMemoryContext;
1229 		sortKey->ssup_collation = indexRel->rd_indcollation[i];
1230 		sortKey->ssup_nulls_first = false;
1231 		sortKey->ssup_attno = i + 1;
1232 		/* Convey if abbreviation optimization is applicable in principle */
1233 		sortKey->abbreviate = (i == 0);
1234 
1235 		AssertState(sortKey->ssup_attno != 0);
1236 
1237 		/* Look for a sort support function */
1238 		PrepareSortSupportFromGistIndexRel(indexRel, sortKey);
1239 	}
1240 
1241 	MemoryContextSwitchTo(oldcontext);
1242 
1243 	return state;
1244 }
1245 
1246 Tuplesortstate *
1247 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
1248 					  bool nullsFirstFlag, int workMem,
1249 					  SortCoordinate coordinate, bool randomAccess)
1250 {
1251 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1252 												   randomAccess);
1253 	MemoryContext oldcontext;
1254 	int16		typlen;
1255 	bool		typbyval;
1256 
1257 	oldcontext = MemoryContextSwitchTo(state->maincontext);
1258 
1259 #ifdef TRACE_SORT
1260 	if (trace_sort)
1261 		elog(LOG,
1262 			 "begin datum sort: workMem = %d, randomAccess = %c",
1263 			 workMem, randomAccess ? 't' : 'f');
1264 #endif
1265 
1266 	state->nKeys = 1;			/* always a one-column sort */
1267 
1268 	TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
1269 								false,	/* no unique check */
1270 								1,
1271 								workMem,
1272 								randomAccess,
1273 								PARALLEL_SORT(state));
1274 
1275 	state->comparetup = comparetup_datum;
1276 	state->copytup = copytup_datum;
1277 	state->writetup = writetup_datum;
1278 	state->readtup = readtup_datum;
1279 	state->abbrevNext = 10;
1280 
1281 	state->datumType = datumType;
1282 
1283 	/* lookup necessary attributes of the datum type */
1284 	get_typlenbyval(datumType, &typlen, &typbyval);
1285 	state->datumTypeLen = typlen;
1286 	state->tuples = !typbyval;
1287 
1288 	/* Prepare SortSupport data */
1289 	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
1290 
1291 	state->sortKeys->ssup_cxt = CurrentMemoryContext;
1292 	state->sortKeys->ssup_collation = sortCollation;
1293 	state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1294 
1295 	/*
1296 	 * Abbreviation is possible here only for by-reference types.  In theory,
1297 	 * a pass-by-value datatype could have an abbreviated form that is cheaper
1298 	 * to compare.  In a tuple sort, we could support that, because we can
1299 	 * always extract the original datum from the tuple as needed.  Here, we
1300 	 * can't, because a datum sort only stores a single copy of the datum; the
1301 	 * "tuple" field of each SortTuple is NULL.
1302 	 */
1303 	state->sortKeys->abbreviate = !typbyval;
1304 
1305 	PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
1306 
1307 	/*
1308 	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
1309 	 * tie-breaker comparisons may be required.  Typically, the optimization
1310 	 * is only of value to pass-by-value types anyway, whereas abbreviated
1311 	 * keys are typically only of value to pass-by-reference types.
1312 	 */
1313 	if (!state->sortKeys->abbrev_converter)
1314 		state->onlyKey = state->sortKeys;
1315 
1316 	MemoryContextSwitchTo(oldcontext);
1317 
1318 	return state;
1319 }
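
/*
 * A sketch of how a datum sort is typically driven, assuming the
 * tuplesort_putdatum()/tuplesort_getdatum() entry points declared in
 * tuplesort.h.  The values and the int4 operator used here are illustrative
 * only.
 */
#if 0
static void
datum_sort_usage_example(void)
{
	Tuplesortstate *sortstate;
	Datum		val;
	bool		isnull;
	Datum		abbrev;

	sortstate = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
									  false, work_mem, NULL, false);

	tuplesort_putdatum(sortstate, Int32GetDatum(42), false);
	tuplesort_putdatum(sortstate, Int32GetDatum(7), false);

	tuplesort_performsort(sortstate);

	while (tuplesort_getdatum(sortstate, true, &val, &isnull, &abbrev))
	{
		/* consume val/isnull here, in ascending order */
	}

	tuplesort_end(sortstate);
}
#endif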
1320 
1321 /*
1322  * tuplesort_set_bound
1323  *
1324  *	Advise tuplesort that at most the first N result tuples are required.
1325  *
1326  * Must be called before inserting any tuples.  (Actually, we could allow it
1327  * as long as the sort hasn't spilled to disk, but there seems no need for
1328  * delayed calls at the moment.)
1329  *
1330  * This is a hint only. The tuplesort may still return more tuples than
1331  * requested.  Parallel leader tuplesorts will always ignore the hint.
1332  */
1333 void
1334 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
1335 {
1336 	/* Assert we're called before loading any tuples */
1337 	Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
1338 	/* Can't set the bound twice, either */
1339 	Assert(!state->bounded);
1340 	/* Also, this shouldn't be called in a parallel worker */
1341 	Assert(!WORKER(state));
1342 
1343 	/* Parallel leader allows but ignores hint */
1344 	if (LEADER(state))
1345 		return;
1346 
1347 #ifdef DEBUG_BOUNDED_SORT
1348 	/* Honor GUC setting that disables the feature (for easy testing) */
1349 	if (!optimize_bounded_sort)
1350 		return;
1351 #endif
1352 
1353 	/* We want to be able to compute bound * 2, so limit the setting */
1354 	if (bound > (int64) (INT_MAX / 2))
1355 		return;
1356 
1357 	state->bounded = true;
1358 	state->bound = (int) bound;
1359 
1360 	/*
1361 	 * Bounded sorts are not an effective target for abbreviated key
1362 	 * optimization.  Disable by setting state to be consistent with no
1363 	 * abbreviation support.
1364 	 */
1365 	state->sortKeys->abbrev_converter = NULL;
1366 	if (state->sortKeys->abbrev_full_comparator)
1367 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1368 
1369 	/* Not strictly necessary, but be tidy */
1370 	state->sortKeys->abbrev_abort = NULL;
1371 	state->sortKeys->abbrev_full_comparator = NULL;
1372 }
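
/*
 * For example, a caller implementing ORDER BY ... LIMIT 100 would call,
 * right after the appropriate tuplesort_begin_xxx():
 *
 *		tuplesort_set_bound(sortstate, 100);
 *
 * (Variable name is illustrative; see nodeSort.c for a real caller.)
 */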
1373 
1374 /*
1375  * tuplesort_used_bound
1376  *
1377  * Allow callers to find out if the sort state was able to use a bound.
1378  */
1379 bool
1380 tuplesort_used_bound(Tuplesortstate *state)
1381 {
1382 	return state->boundUsed;
1383 }
1384 
1385 /*
1386  * tuplesort_free
1387  *
1388  *	Internal routine for freeing resources of tuplesort.
1389  */
1390 static void
1391 tuplesort_free(Tuplesortstate *state)
1392 {
1393 	/* context swap probably not needed, but let's be safe */
1394 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1395 
1396 #ifdef TRACE_SORT
1397 	long		spaceUsed;
1398 
1399 	if (state->tapeset)
1400 		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
1401 	else
1402 		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1403 #endif
1404 
1405 	/*
1406 	 * Delete temporary "tape" files, if any.
1407 	 *
1408 	 * Note: want to include this in reported total cost of sort, hence need
1409 	 * for two #ifdef TRACE_SORT sections.
1410 	 */
1411 	if (state->tapeset)
1412 		LogicalTapeSetClose(state->tapeset);
1413 
1414 #ifdef TRACE_SORT
1415 	if (trace_sort)
1416 	{
1417 		if (state->tapeset)
1418 			elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1419 				 SERIAL(state) ? "external sort" : "parallel external sort",
1420 				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1421 		else
1422 			elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1423 				 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1424 				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1425 	}
1426 
1427 	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1428 #else
1429 
1430 	/*
1431 	 * If you disabled TRACE_SORT, you can still probe sort__done, but you
1432 	 * ain't getting space-used stats.
1433 	 */
1434 	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1435 #endif
1436 
1437 	/* Free any execution state created for CLUSTER case */
1438 	if (state->estate != NULL)
1439 	{
1440 		ExprContext *econtext = GetPerTupleExprContext(state->estate);
1441 
1442 		ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
1443 		FreeExecutorState(state->estate);
1444 	}
1445 
1446 	MemoryContextSwitchTo(oldcontext);
1447 
1448 	/*
1449 	 * Free the per-sort memory context, thereby releasing all working memory.
1450 	 */
1451 	MemoryContextReset(state->sortcontext);
1452 }
1453 
1454 /*
1455  * tuplesort_end
1456  *
1457  *	Release resources and clean up.
1458  *
1459  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1460  * pointing to garbage.  Be careful not to attempt to use or free such
1461  * pointers afterwards!
1462  */
1463 void
1464 tuplesort_end(Tuplesortstate *state)
1465 {
1466 	tuplesort_free(state);
1467 
1468 	/*
1469 	 * Free the main memory context, including the Tuplesortstate struct
1470 	 * itself.
1471 	 */
1472 	MemoryContextDelete(state->maincontext);
1473 }
1474 
1475 /*
1476  * tuplesort_updatemax
1477  *
1478  *	Update maximum resource usage statistics.
1479  */
1480 static void
1481 tuplesort_updatemax(Tuplesortstate *state)
1482 {
1483 	int64		spaceUsed;
1484 	bool		isSpaceDisk;
1485 
1486 	/*
1487 	 * Note: it might seem we should provide both memory and disk usage for a
1488 	 * disk-based sort.  However, the current code doesn't track memory space
1489 	 * accurately once we have begun to return tuples to the caller (since we
1490 	 * don't account for pfree's the caller is expected to do), so we cannot
1491 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
1492 	 * to fix.  Is it worth creating an API for the memory context code to
1493 	 * tell us how much is actually used in sortcontext?
1494 	 */
1495 	if (state->tapeset)
1496 	{
1497 		isSpaceDisk = true;
1498 		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1499 	}
1500 	else
1501 	{
1502 		isSpaceDisk = false;
1503 		spaceUsed = state->allowedMem - state->availMem;
1504 	}
1505 
1506 	/*
1507 	 * A sort spills data to disk only when it cannot fit that data into
1508 	 * main memory.  That is why we treat space used on disk as more
1509 	 * important for tracking resource usage than space used in memory.  Note
1510 	 * that the amount of space occupied by a set of tuples on disk might be
1511 	 * less than the amount of space occupied by the same tuples in memory,
1512 	 * due to their more compact on-disk representation.
1513 	 */
1514 	if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1515 		(isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1516 	{
1517 		state->maxSpace = spaceUsed;
1518 		state->isMaxSpaceDisk = isSpaceDisk;
1519 		state->maxSpaceStatus = state->status;
1520 	}
1521 }
1522 
1523 /*
1524  * tuplesort_reset
1525  *
1526  *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
1527  *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
1528  *	a new sort.  This avoids recreating tuplesort states (and so saves
1529  *	resources) when sorting multiple small batches.
1530  */
1531 void
1532 tuplesort_reset(Tuplesortstate *state)
1533 {
1534 	tuplesort_updatemax(state);
1535 	tuplesort_free(state);
1536 
1537 	/*
1538 	 * After we've freed up per-batch memory, re-setup all of the state common
1539 	 * to both the first batch and any subsequent batch.
1540 	 */
1541 	tuplesort_begin_batch(state);
1542 
1543 	state->lastReturnedTuple = NULL;
1544 	state->slabMemoryBegin = NULL;
1545 	state->slabMemoryEnd = NULL;
1546 	state->slabFreeHead = NULL;
1547 }
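
/*
 * Illustrative usage sketch (hypothetical caller): when sorting many small
 * batches, one tuplesort state can be reused instead of being rebuilt for
 * every batch:
 *
 *		for each batch of input rows:
 *			... tuplesort_puttupleslot() for each row in the batch ...
 *			tuplesort_performsort(state);
 *			... drain results with tuplesort_gettupleslot() ...
 *			tuplesort_reset(state);		(ready for the next batch)
 *		tuplesort_end(state);			(finally release everything)
 */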
1548 
1549 /*
1550  * Grow the memtuples[] array, if possible within our memory constraint.  We
1551  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1552  * limit.  Return true if we were able to enlarge the array, false if not.
1553  *
1554  * Normally, at each increment we double the size of the array.  When doing
1555  * that would exceed a limit, we attempt one last, smaller increase (and then
1556  * clear the growmemtuples flag so we don't try any more).  That allows us to
1557  * use memory as fully as permitted; sticking to the pure doubling rule could
1558  * result in almost half going unused.  Because availMem moves around with
1559  * tuple addition/removal, we need some rule to prevent making repeated small
1560  * increases in memtupsize, which would just be useless thrashing.  The
1561  * growmemtuples flag accomplishes that and also prevents useless
1562  * recalculations in this function.
1563  */
1564 static bool
1565 grow_memtuples(Tuplesortstate *state)
1566 {
1567 	int			newmemtupsize;
1568 	int			memtupsize = state->memtupsize;
1569 	int64		memNowUsed = state->allowedMem - state->availMem;
1570 
1571 	/* Forget it if we've already maxed out memtuples, per comment above */
1572 	if (!state->growmemtuples)
1573 		return false;
1574 
1575 	/* Select new value of memtupsize */
1576 	if (memNowUsed <= state->availMem)
1577 	{
1578 		/*
1579 		 * We've used no more than half of allowedMem; double our usage,
1580 		 * clamping at INT_MAX tuples.
1581 		 */
1582 		if (memtupsize < INT_MAX / 2)
1583 			newmemtupsize = memtupsize * 2;
1584 		else
1585 		{
1586 			newmemtupsize = INT_MAX;
1587 			state->growmemtuples = false;
1588 		}
1589 	}
1590 	else
1591 	{
1592 		/*
1593 		 * This will be the last increment of memtupsize.  Abandon doubling
1594 		 * strategy and instead increase as much as we safely can.
1595 		 *
1596 		 * To stay within allowedMem, we can't increase memtupsize by more
1597 		 * than availMem / sizeof(SortTuple) elements.  In practice, we want
1598 		 * to increase it by considerably less, because we need to leave some
1599 		 * space for the tuples to which the new array slots will refer.  We
1600 		 * assume the new tuples will be about the same size as the tuples
1601 		 * we've already seen, and thus we can extrapolate from the space
1602 		 * consumption so far to estimate an appropriate new size for the
1603 		 * memtuples array.  The optimal value might be higher or lower than
1604 		 * this estimate, but it's hard to know that in advance.  We again
1605 		 * clamp at INT_MAX tuples.
1606 		 *
1607 		 * This calculation is safe against enlarging the array so much that
1608 		 * LACKMEM becomes true, because the memory currently used includes
1609 		 * the present array; thus, there would be enough allowedMem for the
1610 		 * new array elements even if no other memory were currently used.
1611 		 *
1612 		 * We do the arithmetic in float8, because otherwise the product of
1613 		 * memtupsize and allowedMem could overflow.  Any inaccuracy in the
1614 		 * result should be insignificant; but even if we computed a
1615 		 * completely insane result, the checks below will prevent anything
1616 		 * really bad from happening.
1617 		 */
1618 		double		grow_ratio;
1619 
1620 		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1621 		if (memtupsize * grow_ratio < INT_MAX)
1622 			newmemtupsize = (int) (memtupsize * grow_ratio);
1623 		else
1624 			newmemtupsize = INT_MAX;
1625 
1626 		/* We won't make any further enlargement attempts */
1627 		state->growmemtuples = false;
1628 	}
1629 
1630 	/* Must enlarge array by at least one element, else report failure */
1631 	if (newmemtupsize <= memtupsize)
1632 		goto noalloc;
1633 
1634 	/*
1635 	 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
1636 	 * to ensure our request won't be rejected.  Note that we can easily
1637 	 * exhaust address space before facing this outcome.  (This is presently
1638 	 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1639 	 * don't rely on that at this distance.)
1640 	 */
1641 	if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1642 	{
1643 		newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1644 		state->growmemtuples = false;	/* can't grow any more */
1645 	}
1646 
1647 	/*
1648 	 * We need to be sure that we do not cause LACKMEM to become true, else
1649 	 * the space management algorithm will go nuts.  The code above should
1650 	 * never generate a dangerous request, but to be safe, check explicitly
1651 	 * that the array growth fits within availMem.  (We could still cause
1652 	 * LACKMEM if the memory chunk overhead associated with the memtuples
1653 	 * array were to increase.  That shouldn't happen because we chose the
1654 	 * initial array size large enough to ensure that palloc will be treating
1655 	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
1656 	 * explicitly below just in case.)
1657 	 */
1658 	if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1659 		goto noalloc;
1660 
1661 	/* OK, do it */
1662 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1663 	state->memtupsize = newmemtupsize;
1664 	state->memtuples = (SortTuple *)
1665 		repalloc_huge(state->memtuples,
1666 					  state->memtupsize * sizeof(SortTuple));
1667 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1668 	if (LACKMEM(state))
1669 		elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1670 	return true;
1671 
1672 noalloc:
1673 	/* If for any reason we didn't realloc, shut off future attempts */
1674 	state->growmemtuples = false;
1675 	return false;
1676 }
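
/*
 * Worked example of the "last increment" branch above (figures are purely
 * illustrative): with allowedMem = 64MB and memNowUsed = 48MB, more than
 * half of the budget is already in use, so instead of doubling we compute
 * grow_ratio = 64/48, roughly 1.33, and a 100,000-slot memtuples array is
 * enlarged to about 133,000 slots; growmemtuples is then cleared so that no
 * further enlargement attempts are made.
 */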
1677 
1678 /*
1679  * Accept one tuple while collecting input data for sort.
1680  *
1681  * Note that the input data is always copied; the caller need not save it.
1682  */
1683 void
1684 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1685 {
1686 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1687 	SortTuple	stup;
1688 
1689 	/*
1690 	 * Copy the given tuple into memory we control, and decrease availMem.
1691 	 * Then call the common code.
1692 	 */
1693 	COPYTUP(state, &stup, (void *) slot);
1694 
1695 	puttuple_common(state, &stup);
1696 
1697 	MemoryContextSwitchTo(oldcontext);
1698 }
1699 
1700 /*
1701  * Accept one tuple while collecting input data for sort.
1702  *
1703  * Note that the input data is always copied; the caller need not save it.
1704  */
1705 void
1706 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1707 {
1708 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1709 	SortTuple	stup;
1710 
1711 	/*
1712 	 * Copy the given tuple into memory we control, and decrease availMem.
1713 	 * Then call the common code.
1714 	 */
1715 	COPYTUP(state, &stup, (void *) tup);
1716 
1717 	puttuple_common(state, &stup);
1718 
1719 	MemoryContextSwitchTo(oldcontext);
1720 }
1721 
1722 /*
1723  * Collect one index tuple while collecting input data for sort, building
1724  * it from caller-supplied values.
1725  */
1726 void
1727 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1728 							  ItemPointer self, Datum *values,
1729 							  bool *isnull)
1730 {
1731 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1732 	SortTuple	stup;
1733 	Datum		original;
1734 	IndexTuple	tuple;
1735 
1736 	stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1737 	tuple = ((IndexTuple) stup.tuple);
1738 	tuple->t_tid = *self;
1739 	USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1740 	/* set up first-column key value */
1741 	original = index_getattr(tuple,
1742 							 1,
1743 							 RelationGetDescr(state->indexRel),
1744 							 &stup.isnull1);
1745 
1746 	MemoryContextSwitchTo(state->sortcontext);
1747 
1748 	if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1749 	{
1750 		/*
1751 		 * Store the ordinary Datum representation, or the NULL value.  If
1752 		 * there is a converter it won't expect NULL values, and the cost
1753 		 * model is not required to account for NULL, so in that case we
1754 		 * avoid calling the converter and just set datum1 to a zeroed
1755 		 * representation (to be consistent, and to support cheap inequality
1756 		 * tests for NULL abbreviated keys).
1757 		 */
1758 		stup.datum1 = original;
1759 	}
1760 	else if (!consider_abort_common(state))
1761 	{
1762 		/* Store abbreviated key representation */
1763 		stup.datum1 = state->sortKeys->abbrev_converter(original,
1764 														state->sortKeys);
1765 	}
1766 	else
1767 	{
1768 		/* Abort abbreviation */
1769 		int			i;
1770 
1771 		stup.datum1 = original;
1772 
1773 		/*
1774 		 * Set state to be consistent with never trying abbreviation.
1775 		 *
1776 		 * Alter datum1 representation in already-copied tuples, so as to
1777 		 * ensure a consistent representation (current tuple was just
1778 		 * handled).  It does not matter if some dumped tuples are already
1779 		 * sorted on tape, since serialized tuples lack abbreviated keys
1780 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1781 		 */
1782 		for (i = 0; i < state->memtupcount; i++)
1783 		{
1784 			SortTuple  *mtup = &state->memtuples[i];
1785 
1786 			tuple = mtup->tuple;
1787 			mtup->datum1 = index_getattr(tuple,
1788 										 1,
1789 										 RelationGetDescr(state->indexRel),
1790 										 &mtup->isnull1);
1791 		}
1792 	}
1793 
1794 	puttuple_common(state, &stup);
1795 
1796 	MemoryContextSwitchTo(oldcontext);
1797 }
1798 
1799 /*
1800  * Accept one Datum while collecting input data for sort.
1801  *
1802  * If the Datum is pass-by-ref type, the value will be copied.
1803  */
1804 void
1805 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1806 {
1807 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1808 	SortTuple	stup;
1809 
1810 	/*
1811 	 * Pass-by-value types or null values are just stored directly in
1812 	 * stup.datum1 (and stup.tuple is not used and set to NULL).
1813 	 *
1814 	 * Non-null pass-by-reference values need to be copied into memory we
1815 	 * control, and possibly abbreviated. The copied value is pointed to by
1816 	 * stup.tuple and is treated as the canonical copy (e.g. to return via
1817 	 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1818 	 * abbreviated value if abbreviation is happening, otherwise it's
1819 	 * identical to stup.tuple.
1820 	 */
1821 
1822 	if (isNull || !state->tuples)
1823 	{
1824 		/*
1825 		 * Set datum1 to zeroed representation for NULLs (to be consistent,
1826 		 * and to support cheap inequality tests for NULL abbreviated keys).
1827 		 */
1828 		stup.datum1 = !isNull ? val : (Datum) 0;
1829 		stup.isnull1 = isNull;
1830 		stup.tuple = NULL;		/* no separate storage */
1831 		MemoryContextSwitchTo(state->sortcontext);
1832 	}
1833 	else
1834 	{
1835 		Datum		original = datumCopy(val, false, state->datumTypeLen);
1836 
1837 		stup.isnull1 = false;
1838 		stup.tuple = DatumGetPointer(original);
1839 		USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1840 		MemoryContextSwitchTo(state->sortcontext);
1841 
1842 		if (!state->sortKeys->abbrev_converter)
1843 		{
1844 			stup.datum1 = original;
1845 		}
1846 		else if (!consider_abort_common(state))
1847 		{
1848 			/* Store abbreviated key representation */
1849 			stup.datum1 = state->sortKeys->abbrev_converter(original,
1850 															state->sortKeys);
1851 		}
1852 		else
1853 		{
1854 			/* Abort abbreviation */
1855 			int			i;
1856 
1857 			stup.datum1 = original;
1858 
1859 			/*
1860 			 * Set state to be consistent with never trying abbreviation.
1861 			 *
1862 			 * Alter datum1 representation in already-copied tuples, so as to
1863 			 * ensure a consistent representation (current tuple was just
1864 			 * handled).  It does not matter if some dumped tuples are already
1865 			 * sorted on tape, since serialized tuples lack abbreviated keys
1866 			 * (TSS_BUILDRUNS state prevents control reaching here in any
1867 			 * case).
1868 			 */
1869 			for (i = 0; i < state->memtupcount; i++)
1870 			{
1871 				SortTuple  *mtup = &state->memtuples[i];
1872 
1873 				mtup->datum1 = PointerGetDatum(mtup->tuple);
1874 			}
1875 		}
1876 	}
1877 
1878 	puttuple_common(state, &stup);
1879 
1880 	MemoryContextSwitchTo(oldcontext);
1881 }
1882 
1883 /*
1884  * Shared code for tuple and datum cases.
1885  */
1886 static void
1887 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1888 {
1889 	Assert(!LEADER(state));
1890 
1891 	switch (state->status)
1892 	{
1893 		case TSS_INITIAL:
1894 
1895 			/*
1896 			 * Save the tuple into the unsorted array.  First, grow the array
1897 			 * as needed.  Note that we try to grow the array when there is
1898 			 * still one free slot remaining --- if we fail, there'll still be
1899 			 * room to store the incoming tuple, and then we'll switch to
1900 			 * tape-based operation.
1901 			 */
1902 			if (state->memtupcount >= state->memtupsize - 1)
1903 			{
1904 				(void) grow_memtuples(state);
1905 				Assert(state->memtupcount < state->memtupsize);
1906 			}
1907 			state->memtuples[state->memtupcount++] = *tuple;
1908 
1909 			/*
1910 			 * Check if it's time to switch over to a bounded heapsort. We do
1911 			 * so if the input tuple count exceeds twice the desired tuple
1912 			 * count (this is a heuristic for where heapsort becomes cheaper
1913 			 * than a quicksort), or if we've just filled workMem and have
1914 			 * enough tuples to meet the bound.
1915 			 *
1916 			 * Note that once we enter TSS_BOUNDED state we will always try to
1917 			 * complete the sort that way.  In the worst case, if later input
1918 			 * tuples are larger than earlier ones, this might cause us to
1919 			 * exceed workMem significantly.
1920 			 */
1921 			if (state->bounded &&
1922 				(state->memtupcount > state->bound * 2 ||
1923 				 (state->memtupcount > state->bound && LACKMEM(state))))
1924 			{
1925 #ifdef TRACE_SORT
1926 				if (trace_sort)
1927 					elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1928 						 state->memtupcount,
1929 						 pg_rusage_show(&state->ru_start));
1930 #endif
1931 				make_bounded_heap(state);
1932 				return;
1933 			}
1934 
1935 			/*
1936 			 * Done if we still fit in available memory and have array slots.
1937 			 */
1938 			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1939 				return;
1940 
1941 			/*
1942 			 * Nope; time to switch to tape-based operation.
1943 			 */
1944 			inittapes(state, true);
1945 
1946 			/*
1947 			 * Dump all tuples.
1948 			 */
1949 			dumptuples(state, false);
1950 			break;
1951 
1952 		case TSS_BOUNDED:
1953 
1954 			/*
1955 			 * We don't want to grow the array here, so check whether the new
1956 			 * tuple can be discarded before putting it in.  This should be a
1957 			 * good speed optimization, too, since when there are many more
1958 			 * input tuples than the bound, most input tuples can be discarded
1959 			 * with just this one comparison.  Note that because we currently
1960 			 * have the sort direction reversed, we must check for <= not >=.
1961 			 */
1962 			if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1963 			{
1964 				/* new tuple <= top of the heap, so we can discard it */
1965 				free_sort_tuple(state, tuple);
1966 				CHECK_FOR_INTERRUPTS();
1967 			}
1968 			else
1969 			{
1970 				/* discard top of heap, replacing it with the new tuple */
1971 				free_sort_tuple(state, &state->memtuples[0]);
1972 				tuplesort_heap_replace_top(state, tuple);
1973 			}
1974 			break;
1975 
1976 		case TSS_BUILDRUNS:
1977 
1978 			/*
1979 			 * Save the tuple into the unsorted array (there must be space)
1980 			 */
1981 			state->memtuples[state->memtupcount++] = *tuple;
1982 
1983 			/*
1984 			 * If we are over the memory limit, dump all tuples.
1985 			 */
1986 			dumptuples(state, false);
1987 			break;
1988 
1989 		default:
1990 			elog(ERROR, "invalid tuplesort state");
1991 			break;
1992 	}
1993 }
1994 
1995 static bool
1996 consider_abort_common(Tuplesortstate *state)
1997 {
1998 	Assert(state->sortKeys[0].abbrev_converter != NULL);
1999 	Assert(state->sortKeys[0].abbrev_abort != NULL);
2000 	Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
2001 
2002 	/*
2003 	 * Check effectiveness of abbreviation optimization.  Consider aborting
2004 	 * when still within memory limit.
2005 	 */
2006 	if (state->status == TSS_INITIAL &&
2007 		state->memtupcount >= state->abbrevNext)
2008 	{
2009 		state->abbrevNext *= 2;
2010 
2011 		/*
2012 		 * Check opclass-supplied abbreviation abort routine.  It may indicate
2013 		 * that abbreviation should not proceed.
2014 		 */
2015 		if (!state->sortKeys->abbrev_abort(state->memtupcount,
2016 										   state->sortKeys))
2017 			return false;
2018 
2019 		/*
2020 		 * Finally, restore authoritative comparator, and indicate that
2021 		 * abbreviation is not in play by setting abbrev_converter to NULL
2022 		 */
2023 		state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
2024 		state->sortKeys[0].abbrev_converter = NULL;
2025 		/* Not strictly necessary, but be tidy */
2026 		state->sortKeys[0].abbrev_abort = NULL;
2027 		state->sortKeys[0].abbrev_full_comparator = NULL;
2028 
2029 		/* Give up - expect original pass-by-value representation */
2030 		return true;
2031 	}
2032 
2033 	return false;
2034 }
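
/*
 * Note on the doubling above (illustrative): whatever initial value
 * abbrevNext was given when the sort began, the abort check fires only at
 * geometrically spaced points; e.g. an initial value of 10 would yield
 * checks at 10, 20, 40, 80, ... tuples, so the per-tuple overhead of
 * consulting the opclass's abbrev_abort routine stays negligible.
 */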
2035 
2036 /*
2037  * All tuples have been provided; finish the sort.
2038  */
2039 void
2040 tuplesort_performsort(Tuplesortstate *state)
2041 {
2042 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2043 
2044 #ifdef TRACE_SORT
2045 	if (trace_sort)
2046 		elog(LOG, "performsort of worker %d starting: %s",
2047 			 state->worker, pg_rusage_show(&state->ru_start));
2048 #endif
2049 
2050 	switch (state->status)
2051 	{
2052 		case TSS_INITIAL:
2053 
2054 			/*
2055 			 * We were able to accumulate all the tuples within the allowed
2056 			 * amount of memory, or we are the leader taking over worker tapes.
2057 			 */
2058 			if (SERIAL(state))
2059 			{
2060 				/* Just qsort 'em and we're done */
2061 				tuplesort_sort_memtuples(state);
2062 				state->status = TSS_SORTEDINMEM;
2063 			}
2064 			else if (WORKER(state))
2065 			{
2066 				/*
2067 				 * Parallel workers must still dump out tuples to tape.  No
2068 				 * merge is required to produce a single output run, though.
2069 				 */
2070 				inittapes(state, false);
2071 				dumptuples(state, true);
2072 				worker_nomergeruns(state);
2073 				state->status = TSS_SORTEDONTAPE;
2074 			}
2075 			else
2076 			{
2077 				/*
2078 				 * Leader will take over worker tapes and merge worker runs.
2079 				 * Note that mergeruns sets the correct state->status.
2080 				 */
2081 				leader_takeover_tapes(state);
2082 				mergeruns(state);
2083 			}
2084 			state->current = 0;
2085 			state->eof_reached = false;
2086 			state->markpos_block = 0L;
2087 			state->markpos_offset = 0;
2088 			state->markpos_eof = false;
2089 			break;
2090 
2091 		case TSS_BOUNDED:
2092 
2093 			/*
2094 			 * We were able to accumulate all the tuples required for output
2095 			 * in memory, using a heap to eliminate excess tuples.  Now we
2096 			 * have to transform the heap to a properly-sorted array.
2097 			 */
2098 			sort_bounded_heap(state);
2099 			state->current = 0;
2100 			state->eof_reached = false;
2101 			state->markpos_offset = 0;
2102 			state->markpos_eof = false;
2103 			state->status = TSS_SORTEDINMEM;
2104 			break;
2105 
2106 		case TSS_BUILDRUNS:
2107 
2108 			/*
2109 			 * Finish tape-based sort.  First, flush all tuples remaining in
2110 			 * memory out to tape; then merge until we have a single remaining
2111 			 * run (or, if !randomAccess and !WORKER(), one run per tape).
2112 			 * Note that mergeruns sets the correct state->status.
2113 			 */
2114 			dumptuples(state, true);
2115 			mergeruns(state);
2116 			state->eof_reached = false;
2117 			state->markpos_block = 0L;
2118 			state->markpos_offset = 0;
2119 			state->markpos_eof = false;
2120 			break;
2121 
2122 		default:
2123 			elog(ERROR, "invalid tuplesort state");
2124 			break;
2125 	}
2126 
2127 #ifdef TRACE_SORT
2128 	if (trace_sort)
2129 	{
2130 		if (state->status == TSS_FINALMERGE)
2131 			elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
2132 				 state->worker, state->activeTapes,
2133 				 pg_rusage_show(&state->ru_start));
2134 		else
2135 			elog(LOG, "performsort of worker %d done: %s",
2136 				 state->worker, pg_rusage_show(&state->ru_start));
2137 	}
2138 #endif
2139 
2140 	MemoryContextSwitchTo(oldcontext);
2141 }
2142 
2143 /*
2144  * Internal routine to fetch the next tuple in either forward or back
2145  * direction into *stup.  Returns false if no more tuples.
2146  * Returned tuple belongs to tuplesort memory context, and must not be freed
2147  * by caller.  Note that fetched tuple is stored in memory that may be
2148  * recycled by any future fetch.
2149  */
2150 static bool
2151 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
2152 						  SortTuple *stup)
2153 {
2154 	unsigned int tuplen;
2155 	size_t		nmoved;
2156 
2157 	Assert(!WORKER(state));
2158 
2159 	switch (state->status)
2160 	{
2161 		case TSS_SORTEDINMEM:
2162 			Assert(forward || state->randomAccess);
2163 			Assert(!state->slabAllocatorUsed);
2164 			if (forward)
2165 			{
2166 				if (state->current < state->memtupcount)
2167 				{
2168 					*stup = state->memtuples[state->current++];
2169 					return true;
2170 				}
2171 				state->eof_reached = true;
2172 
2173 				/*
2174 				 * Complain if caller tries to retrieve more tuples than
2175 				 * originally asked for in a bounded sort.  This is because
2176 				 * returning EOF here might be the wrong thing.
2177 				 */
2178 				if (state->bounded && state->current >= state->bound)
2179 					elog(ERROR, "retrieved too many tuples in a bounded sort");
2180 
2181 				return false;
2182 			}
2183 			else
2184 			{
2185 				if (state->current <= 0)
2186 					return false;
2187 
2188 				/*
2189 				 * If all tuples have been fetched already, return the last
2190 				 * tuple; else, the tuple before the last one returned.
2191 				 */
2192 				if (state->eof_reached)
2193 					state->eof_reached = false;
2194 				else
2195 				{
2196 					state->current--;	/* last returned tuple */
2197 					if (state->current <= 0)
2198 						return false;
2199 				}
2200 				*stup = state->memtuples[state->current - 1];
2201 				return true;
2202 			}
2203 			break;
2204 
2205 		case TSS_SORTEDONTAPE:
2206 			Assert(forward || state->randomAccess);
2207 			Assert(state->slabAllocatorUsed);
2208 
2209 			/*
2210 			 * The slot that held the tuple that we returned in previous
2211 			 * gettuple call can now be reused.
2212 			 */
2213 			if (state->lastReturnedTuple)
2214 			{
2215 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2216 				state->lastReturnedTuple = NULL;
2217 			}
2218 
2219 			if (forward)
2220 			{
2221 				if (state->eof_reached)
2222 					return false;
2223 
2224 				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
2225 				{
2226 					READTUP(state, stup, state->result_tape, tuplen);
2227 
2228 					/*
2229 					 * Remember the tuple we return, so that we can recycle
2230 					 * its memory on next call.  (This can be NULL, in the
2231 					 * !state->tuples case).
2232 					 */
2233 					state->lastReturnedTuple = stup->tuple;
2234 
2235 					return true;
2236 				}
2237 				else
2238 				{
2239 					state->eof_reached = true;
2240 					return false;
2241 				}
2242 			}
2243 
2244 			/*
2245 			 * Backward.
2246 			 *
2247 			 * If all tuples have been fetched already, return the last tuple;
2248 			 * else, return the tuple before the last one returned.
2249 			 */
2250 			if (state->eof_reached)
2251 			{
2252 				/*
2253 				 * Seek position is pointing just past the zero tuplen at the
2254 				 * end of file; back up to fetch last tuple's ending length
2255 				 * word.  If seek fails we must have a completely empty file.
2256 				 */
2257 				nmoved = LogicalTapeBackspace(state->tapeset,
2258 											  state->result_tape,
2259 											  2 * sizeof(unsigned int));
2260 				if (nmoved == 0)
2261 					return false;
2262 				else if (nmoved != 2 * sizeof(unsigned int))
2263 					elog(ERROR, "unexpected tape position");
2264 				state->eof_reached = false;
2265 			}
2266 			else
2267 			{
2268 				/*
2269 				 * Back up and fetch previously-returned tuple's ending length
2270 				 * word.  If seek fails, assume we are at start of file.
2271 				 */
2272 				nmoved = LogicalTapeBackspace(state->tapeset,
2273 											  state->result_tape,
2274 											  sizeof(unsigned int));
2275 				if (nmoved == 0)
2276 					return false;
2277 				else if (nmoved != sizeof(unsigned int))
2278 					elog(ERROR, "unexpected tape position");
2279 				tuplen = getlen(state, state->result_tape, false);
2280 
2281 				/*
2282 				 * Back up to get ending length word of tuple before it.
2283 				 */
2284 				nmoved = LogicalTapeBackspace(state->tapeset,
2285 											  state->result_tape,
2286 											  tuplen + 2 * sizeof(unsigned int));
2287 				if (nmoved == tuplen + sizeof(unsigned int))
2288 				{
2289 					/*
2290 					 * We backed up over the previous tuple, but there was no
2291 					 * ending length word before it.  That means that the prev
2292 					 * tuple is the first tuple in the file.  It is now the
2293 					 * next one to read in forward direction (not obviously
2294 					 * right, but that is what the in-memory case does).
2295 					 */
2296 					return false;
2297 				}
2298 				else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2299 					elog(ERROR, "bogus tuple length in backward scan");
2300 			}
2301 
2302 			tuplen = getlen(state, state->result_tape, false);
2303 
2304 			/*
2305 			 * Now we have the length of the prior tuple, back up and read it.
2306 			 * Note: READTUP expects we are positioned after the initial
2307 			 * length word of the tuple, so back up to that point.
2308 			 */
2309 			nmoved = LogicalTapeBackspace(state->tapeset,
2310 										  state->result_tape,
2311 										  tuplen);
2312 			if (nmoved != tuplen)
2313 				elog(ERROR, "bogus tuple length in backward scan");
2314 			READTUP(state, stup, state->result_tape, tuplen);
2315 
2316 			/*
2317 			 * Remember the tuple we return, so that we can recycle its memory
2318 			 * on next call. (This can be NULL, in the Datum case).
2319 			 */
2320 			state->lastReturnedTuple = stup->tuple;
2321 
2322 			return true;
2323 
2324 		case TSS_FINALMERGE:
2325 			Assert(forward);
2326 			/* We are managing memory ourselves, with the slab allocator. */
2327 			Assert(state->slabAllocatorUsed);
2328 
2329 			/*
2330 			 * The slab slot holding the tuple that we returned in previous
2331 			 * gettuple call can now be reused.
2332 			 */
2333 			if (state->lastReturnedTuple)
2334 			{
2335 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2336 				state->lastReturnedTuple = NULL;
2337 			}
2338 
2339 			/*
2340 			 * This code should match the inner loop of mergeonerun().
2341 			 */
2342 			if (state->memtupcount > 0)
2343 			{
2344 				int			srcTape = state->memtuples[0].srctape;
2345 				SortTuple	newtup;
2346 
2347 				*stup = state->memtuples[0];
2348 
2349 				/*
2350 				 * Remember the tuple we return, so that we can recycle its
2351 				 * memory on next call. (This can be NULL, in the Datum case).
2352 				 */
2353 				state->lastReturnedTuple = stup->tuple;
2354 
2355 				/*
2356 				 * Pull next tuple from tape, and replace the returned tuple
2357 				 * at top of the heap with it.
2358 				 */
2359 				if (!mergereadnext(state, srcTape, &newtup))
2360 				{
2361 					/*
2362 					 * If no more data, we've reached end of run on this tape.
2363 					 * Remove the top node from the heap.
2364 					 */
2365 					tuplesort_heap_delete_top(state);
2366 
2367 					/*
2368 					 * Rewind to free the read buffer.  It'd go away at the
2369 					 * end of the sort anyway, but better to release the
2370 					 * memory early.
2371 					 */
2372 					LogicalTapeRewindForWrite(state->tapeset, srcTape);
2373 					return true;
2374 				}
2375 				newtup.srctape = srcTape;
2376 				tuplesort_heap_replace_top(state, &newtup);
2377 				return true;
2378 			}
2379 			return false;
2380 
2381 		default:
2382 			elog(ERROR, "invalid tuplesort state");
2383 			return false;		/* keep compiler quiet */
2384 	}
2385 }
2386 
2387 /*
2388  * Fetch the next tuple in either forward or back direction.
2389  * If successful, put tuple in slot and return true; else, clear the slot
2390  * and return false.
2391  *
2392  * Caller may optionally be passed back abbreviated value (on true return
2393  * value) when abbreviation was used, which can be used to cheaply avoid
2394  * equality checks that might otherwise be required.  Caller can safely make a
2395  * determination of "non-equal tuple" based on simple binary inequality.  A
2396  * NULL value in leading attribute will set abbreviated value to zeroed
2397  * representation, which caller may rely on in abbreviated inequality check.
2398  *
2399  * If copy is true, the slot receives a tuple that's been copied into the
2400  * caller's memory context, so that it will stay valid regardless of future
2401  * manipulations of the tuplesort's state (up to and including deleting the
2402  * tuplesort).  If copy is false, the slot will just receive a pointer to a
2403  * tuple held within the tuplesort, which is more efficient, but only safe for
2404  * callers that are prepared to have any subsequent manipulation of the
2405  * tuplesort's state invalidate slot contents.
2406  */
2407 bool
2408 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2409 					   TupleTableSlot *slot, Datum *abbrev)
2410 {
2411 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2412 	SortTuple	stup;
2413 
2414 	if (!tuplesort_gettuple_common(state, forward, &stup))
2415 		stup.tuple = NULL;
2416 
2417 	MemoryContextSwitchTo(oldcontext);
2418 
2419 	if (stup.tuple)
2420 	{
2421 		/* Record abbreviated key for caller */
2422 		if (state->sortKeys->abbrev_converter && abbrev)
2423 			*abbrev = stup.datum1;
2424 
2425 		if (copy)
2426 			stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2427 
2428 		ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2429 		return true;
2430 	}
2431 	else
2432 	{
2433 		ExecClearTuple(slot);
2434 		return false;
2435 	}
2436 }
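
/*
 * Illustrative usage sketch (hypothetical caller): with copy = false the
 * slot contents remain valid only until the tuplesort is manipulated again,
 * which is fine for a caller that fully processes each row before fetching
 * the next one:
 *
 *		while (tuplesort_gettupleslot(state, true, false, slot, NULL))
 *			... process slot before touching the tuplesort again ...
 *
 * A caller that must keep the tuple alive across later tuplesort calls (or
 * past tuplesort_end) should pass copy = true instead.
 */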
2437 
2438 /*
2439  * Fetch the next tuple in either forward or back direction.
2440  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2441  * context, and must not be freed by caller.  Caller may not rely on tuple
2442  * remaining valid after any further manipulation of tuplesort.
2443  */
2444 HeapTuple
2445 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2446 {
2447 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2448 	SortTuple	stup;
2449 
2450 	if (!tuplesort_gettuple_common(state, forward, &stup))
2451 		stup.tuple = NULL;
2452 
2453 	MemoryContextSwitchTo(oldcontext);
2454 
2455 	return stup.tuple;
2456 }
2457 
2458 /*
2459  * Fetch the next index tuple in either forward or back direction.
2460  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2461  * context, and must not be freed by caller.  Caller may not rely on tuple
2462  * remaining valid after any further manipulation of tuplesort.
2463  */
2464 IndexTuple
2465 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2466 {
2467 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2468 	SortTuple	stup;
2469 
2470 	if (!tuplesort_gettuple_common(state, forward, &stup))
2471 		stup.tuple = NULL;
2472 
2473 	MemoryContextSwitchTo(oldcontext);
2474 
2475 	return (IndexTuple) stup.tuple;
2476 }
2477 
2478 /*
2479  * Fetch the next Datum in either forward or back direction.
2480  * Returns false if no more datums.
2481  *
2482  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2483  * in caller's context, and is now owned by the caller (this differs from
2484  * similar routines for other types of tuplesorts).
2485  *
2486  * Caller may optionally be passed back abbreviated value (on true return
2487  * value) when abbreviation was used, which can be used to cheaply avoid
2488  * equality checks that might otherwise be required.  Caller can safely make a
2489  * determination of "non-equal tuple" based on simple binary inequality.  A
2490  * NULL value will have a zeroed abbreviated value representation, which caller
2491  * may rely on in abbreviated inequality check.
2492  */
2493 bool
2494 tuplesort_getdatum(Tuplesortstate *state, bool forward,
2495 				   Datum *val, bool *isNull, Datum *abbrev)
2496 {
2497 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2498 	SortTuple	stup;
2499 
2500 	if (!tuplesort_gettuple_common(state, forward, &stup))
2501 	{
2502 		MemoryContextSwitchTo(oldcontext);
2503 		return false;
2504 	}
2505 
2506 	/* Ensure we copy into caller's memory context */
2507 	MemoryContextSwitchTo(oldcontext);
2508 
2509 	/* Record abbreviated key for caller */
2510 	if (state->sortKeys->abbrev_converter && abbrev)
2511 		*abbrev = stup.datum1;
2512 
2513 	if (stup.isnull1 || !state->tuples)
2514 	{
2515 		*val = stup.datum1;
2516 		*isNull = stup.isnull1;
2517 	}
2518 	else
2519 	{
2520 		/* use stup.tuple because stup.datum1 may be an abbreviation */
2521 		*val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2522 		*isNull = false;
2523 	}
2524 
2525 	return true;
2526 }
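
/*
 * Illustrative usage sketch (hypothetical caller) for the single-Datum case;
 * per the comment above, pass-by-ref results are palloc'd in the caller's
 * context and become the caller's responsibility to free:
 *
 *		... tuplesort_putdatum(state, value, isnull) for each input ...
 *		tuplesort_performsort(state);
 *		while (tuplesort_getdatum(state, true, &value, &isnull, NULL))
 *			... consume value ...
 */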
2527 
2528 /*
2529  * Advance over N tuples in either forward or back direction,
2530  * without returning any data.  N==0 is a no-op.
2531  * Returns true if successful, false if ran out of tuples.
2532  */
2533 bool
2534 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2535 {
2536 	MemoryContext oldcontext;
2537 
2538 	/*
2539 	 * We don't actually support backwards skip yet, because no callers need
2540 	 * it.  The API is designed to allow for that later, though.
2541 	 */
2542 	Assert(forward);
2543 	Assert(ntuples >= 0);
2544 	Assert(!WORKER(state));
2545 
2546 	switch (state->status)
2547 	{
2548 		case TSS_SORTEDINMEM:
2549 			if (state->memtupcount - state->current >= ntuples)
2550 			{
2551 				state->current += ntuples;
2552 				return true;
2553 			}
2554 			state->current = state->memtupcount;
2555 			state->eof_reached = true;
2556 
2557 			/*
2558 			 * Complain if caller tries to retrieve more tuples than
2559 			 * originally asked for in a bounded sort.  This is because
2560 			 * returning EOF here might be the wrong thing.
2561 			 */
2562 			if (state->bounded && state->current >= state->bound)
2563 				elog(ERROR, "retrieved too many tuples in a bounded sort");
2564 
2565 			return false;
2566 
2567 		case TSS_SORTEDONTAPE:
2568 		case TSS_FINALMERGE:
2569 
2570 			/*
2571 			 * We could probably optimize these cases better, but for now it's
2572 			 * not worth the trouble.
2573 			 */
2574 			oldcontext = MemoryContextSwitchTo(state->sortcontext);
2575 			while (ntuples-- > 0)
2576 			{
2577 				SortTuple	stup;
2578 
2579 				if (!tuplesort_gettuple_common(state, forward, &stup))
2580 				{
2581 					MemoryContextSwitchTo(oldcontext);
2582 					return false;
2583 				}
2584 				CHECK_FOR_INTERRUPTS();
2585 			}
2586 			MemoryContextSwitchTo(oldcontext);
2587 			return true;
2588 
2589 		default:
2590 			elog(ERROR, "invalid tuplesort state");
2591 			return false;		/* keep compiler quiet */
2592 	}
2593 }
2594 
2595 /*
2596  * tuplesort_merge_order - report merge order we'll use for given memory
2597  * (note: "merge order" just means the number of input tapes in the merge).
2598  *
2599  * This is exported for use by the planner.  allowedMem is in bytes.
2600  */
2601 int
2602 tuplesort_merge_order(int64 allowedMem)
2603 {
2604 	int			mOrder;
2605 
2606 	/*
2607 	 * We need one tape for each merge input, plus another one for the output,
2608 	 * and each of these tapes needs buffer space.  In addition we want
2609 	 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2610 	 * count).
2611 	 *
2612 	 * Note: you might be thinking we need to account for the memtuples[]
2613 	 * array in this calculation, but we effectively treat that as part of the
2614 	 * MERGE_BUFFER_SIZE workspace.
2615 	 */
2616 	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2617 		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
2618 
2619 	/*
2620 	 * Even in minimum memory, use at least a MINORDER merge.  On the other
2621 	 * hand, even when we have lots of memory, do not use more than a MAXORDER
2622 	 * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
2623 	 * additional tape reduces the amount of memory available to build runs,
2624 	 * which in turn can cause the same sort to need more runs, which makes
2625 	 * merging slower even if it can still be done in a single pass.  Also,
2626 	 * high order merges are quite slow due to CPU cache effects; it can be
2627 	 * faster to pay the I/O cost of a polyphase merge than to perform a
2628 	 * single merge pass across many hundreds of tapes.
2629 	 */
2630 	mOrder = Max(mOrder, MINORDER);
2631 	mOrder = Min(mOrder, MAXORDER);
2632 
2633 	return mOrder;
2634 }
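
/*
 * Worked example of the computation above, using illustrative figures only
 * (the real constants are the TAPE_BUFFER_OVERHEAD, MERGE_BUFFER_SIZE,
 * MINORDER and MAXORDER #defines near the top of this file): if the
 * per-tape buffer overhead were 8kB and the per-input-tape workspace 256kB,
 * then allowedMem = 4MB would give
 *
 *		mOrder = (4096kB - 8kB) / (256kB + 8kB) = 15
 *
 * input tapes, before the MINORDER/MAXORDER clamping is applied.
 */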
2635 
2636 /*
2637  * inittapes - initialize for tape sorting.
2638  *
2639  * This is called only if we have found we won't sort in memory.
2640  */
2641 static void
2642 inittapes(Tuplesortstate *state, bool mergeruns)
2643 {
2644 	int			maxTapes,
2645 				j;
2646 
2647 	Assert(!LEADER(state));
2648 
2649 	if (mergeruns)
2650 	{
2651 		/* Compute number of tapes to use: merge order plus 1 */
2652 		maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2653 	}
2654 	else
2655 	{
2656 		/* Workers can sometimes produce a single run, output without merge */
2657 		Assert(WORKER(state));
2658 		maxTapes = MINORDER + 1;
2659 	}
2660 
2661 #ifdef TRACE_SORT
2662 	if (trace_sort)
2663 		elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2664 			 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2665 #endif
2666 
2667 	/* Create the tape set and allocate the per-tape data arrays */
2668 	inittapestate(state, maxTapes);
2669 	state->tapeset =
2670 		LogicalTapeSetCreate(maxTapes, false, NULL,
2671 							 state->shared ? &state->shared->fileset : NULL,
2672 							 state->worker);
2673 
2674 	state->currentRun = 0;
2675 
2676 	/*
2677 	 * Initialize variables of Algorithm D (step D1).
2678 	 */
2679 	for (j = 0; j < maxTapes; j++)
2680 	{
2681 		state->tp_fib[j] = 1;
2682 		state->tp_runs[j] = 0;
2683 		state->tp_dummy[j] = 1;
2684 		state->tp_tapenum[j] = j;
2685 	}
2686 	state->tp_fib[state->tapeRange] = 0;
2687 	state->tp_dummy[state->tapeRange] = 0;
2688 
2689 	state->Level = 1;
2690 	state->destTape = 0;
2691 
2692 	state->status = TSS_BUILDRUNS;
2693 }
2694 
2695 /*
2696  * inittapestate - initialize generic tape management state
2697  */
2698 static void
2699 inittapestate(Tuplesortstate *state, int maxTapes)
2700 {
2701 	int64		tapeSpace;
2702 
2703 	/*
2704 	 * Decrease availMem to reflect the space needed for tape buffers; but
2705 	 * don't decrease it to the point that we have no room for tuples. (That
2706 	 * case is only likely to occur if sorting pass-by-value Datums; in all
2707 	 * other scenarios the memtuples[] array is unlikely to occupy more than
2708 	 * half of allowedMem.  In the pass-by-value case it's not important to
2709 	 * account for tuple space, so we don't care if LACKMEM becomes
2710 	 * inaccurate.)
2711 	 */
2712 	tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2713 
2714 	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2715 		USEMEM(state, tapeSpace);
2716 
2717 	/*
2718 	 * Make sure that the temp file(s) underlying the tape set are created in
2719 	 * suitable temp tablespaces.  For parallel sorts, this should have been
2720 	 * called already, but it doesn't matter if it is called a second time.
2721 	 */
2722 	PrepareTempTablespaces();
2723 
2724 	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2725 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2726 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2727 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2728 	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2729 
2730 	/* Record # of tapes allocated (for duration of sort) */
2731 	state->maxTapes = maxTapes;
2732 	/* Record maximum # of tapes usable as inputs when merging */
2733 	state->tapeRange = maxTapes - 1;
2734 }
2735 
2736 /*
2737  * selectnewtape -- select new tape for new initial run.
2738  *
2739  * This is called after finishing a run when we know another run
2740  * must be started.  This implements steps D3, D4 of Algorithm D.
2741  */
2742 static void
2743 selectnewtape(Tuplesortstate *state)
2744 {
2745 	int			j;
2746 	int			a;
2747 
2748 	/* Step D3: advance j (destTape) */
2749 	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2750 	{
2751 		state->destTape++;
2752 		return;
2753 	}
2754 	if (state->tp_dummy[state->destTape] != 0)
2755 	{
2756 		state->destTape = 0;
2757 		return;
2758 	}
2759 
2760 	/* Step D4: increase level */
2761 	state->Level++;
2762 	a = state->tp_fib[0];
2763 	for (j = 0; j < state->tapeRange; j++)
2764 	{
2765 		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2766 		state->tp_fib[j] = a + state->tp_fib[j + 1];
2767 	}
2768 	state->destTape = 0;
2769 }
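
/*
 * Worked example of steps D3/D4 (illustrative, small tape count): with
 * tapeRange = 3 input tapes, the target run distribution tp_fib[] advances
 * level by level as
 *
 *		level 1:  1 1 1		(3 runs)
 *		level 2:  2 2 1		(5 runs)
 *		level 3:  4 3 2		(9 runs)
 *		level 4:  7 6 4		(17 runs)
 *
 * i.e. generalized Fibonacci numbers, while tp_dummy[] records how many
 * runs each tape still lacks to reach the current target distribution.
 */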
2770 
2771 /*
2772  * Initialize the slab allocation arena, for the given number of slots.
2773  */
2774 static void
2775 init_slab_allocator(Tuplesortstate *state, int numSlots)
2776 {
2777 	if (numSlots > 0)
2778 	{
2779 		char	   *p;
2780 		int			i;
2781 
2782 		state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2783 		state->slabMemoryEnd = state->slabMemoryBegin +
2784 			numSlots * SLAB_SLOT_SIZE;
2785 		state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2786 		USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2787 
2788 		p = state->slabMemoryBegin;
2789 		for (i = 0; i < numSlots - 1; i++)
2790 		{
2791 			((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2792 			p += SLAB_SLOT_SIZE;
2793 		}
2794 		((SlabSlot *) p)->nextfree = NULL;
2795 	}
2796 	else
2797 	{
2798 		state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2799 		state->slabFreeHead = NULL;
2800 	}
2801 	state->slabAllocatorUsed = true;
2802 }
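
/*
 * Illustrative layout (assuming numSlots = 3): the arena is one contiguous
 * allocation carved into fixed-size slots, chained into a free list through
 * the nextfree pointer stored in each free slot:
 *
 *		slabFreeHead -> [slot 0] -> [slot 1] -> [slot 2] -> NULL
 *
 * Consumers pop slots from the head of this list, and RELEASE_SLAB_SLOT
 * (defined earlier in this file) is expected to push freed slots back onto
 * it, or pfree() chunks that did not come from the arena.
 */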
2803 
2804 /*
2805  * mergeruns -- merge all the completed initial runs.
2806  *
2807  * This implements steps D5, D6 of Algorithm D.  All input data has
2808  * already been written to initial runs on tape (see dumptuples).
2809  */
2810 static void
2811 mergeruns(Tuplesortstate *state)
2812 {
2813 	int			tapenum,
2814 				svTape,
2815 				svRuns,
2816 				svDummy;
2817 	int			numTapes;
2818 	int			numInputTapes;
2819 
2820 	Assert(state->status == TSS_BUILDRUNS);
2821 	Assert(state->memtupcount == 0);
2822 
2823 	if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2824 	{
2825 		/*
2826 		 * If there are multiple runs to be merged, when we go to read back
2827 		 * tuples from disk, abbreviated keys will not have been stored, and
2828 		 * we don't care to regenerate them.  Disable abbreviation from this
2829 		 * point on.
2830 		 */
2831 		state->sortKeys->abbrev_converter = NULL;
2832 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2833 
2834 		/* Not strictly necessary, but be tidy */
2835 		state->sortKeys->abbrev_abort = NULL;
2836 		state->sortKeys->abbrev_full_comparator = NULL;
2837 	}
2838 
2839 	/*
2840 	 * Reset tuple memory.  We've freed all the tuples that we previously
2841 	 * allocated.  We will use the slab allocator from now on.
2842 	 */
2843 	MemoryContextResetOnly(state->tuplecontext);
2844 
2845 	/*
2846 	 * We no longer need a large memtuples array.  (We will allocate a smaller
2847 	 * one for the heap later.)
2848 	 */
2849 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2850 	pfree(state->memtuples);
2851 	state->memtuples = NULL;
2852 
2853 	/*
2854 	 * If we had fewer runs than tapes, refund the memory that we imagined we
2855 	 * would need for the tape buffers of the unused tapes.
2856 	 *
2857 	 * numTapes and numInputTapes reflect the actual number of tapes we will
2858 	 * use.  Note that the output tape's tape number is maxTapes - 1, so the
2859 	 * tape numbers of the used tapes are not consecutive, and you cannot just
2860 	 * loop from 0 to numTapes to visit all used tapes!
2861 	 */
2862 	if (state->Level == 1)
2863 	{
2864 		numInputTapes = state->currentRun;
2865 		numTapes = numInputTapes + 1;
2866 		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2867 	}
2868 	else
2869 	{
2870 		numInputTapes = state->tapeRange;
2871 		numTapes = state->maxTapes;
2872 	}
2873 
2874 	/*
2875 	 * Initialize the slab allocator.  We need one slab slot per input tape,
2876 	 * for the tuples in the heap, plus one to hold the tuple last returned
2877 	 * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
2878 	 * however, we don't need to allocate anything.)
2879 	 *
2880 	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2881 	 * to track memory usage of individual tuples.
2882 	 */
2883 	if (state->tuples)
2884 		init_slab_allocator(state, numInputTapes + 1);
2885 	else
2886 		init_slab_allocator(state, 0);
2887 
2888 	/*
2889 	 * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
2890 	 * from each input tape.
2891 	 */
2892 	state->memtupsize = numInputTapes;
2893 	state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
2894 														numInputTapes * sizeof(SortTuple));
2895 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2896 
2897 	/*
2898 	 * Use all the remaining memory we have available for read buffers among
2899 	 * the input tapes.
2900 	 *
2901 	 * We don't try to "rebalance" the memory among tapes, when we start a new
2902 	 * merge phase, even if some tapes are inactive in the new phase.  That
2903 	 * would be hard, because logtape.c doesn't know where one run ends and
2904 	 * another begins.  When a new merge phase begins, and a tape doesn't
2905 	 * participate in it, its buffer nevertheless already contains tuples from
2906 	 * the next run on the same tape, so we cannot release the buffer.  That's
2907 	 * OK in practice; merge performance isn't very sensitive to the amount of
2908 	 * buffer space used, and most merge phases use all or almost all tapes,
2909 	 * anyway.
2910 	 */
2911 #ifdef TRACE_SORT
2912 	if (trace_sort)
2913 		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2914 			 state->worker, state->availMem / 1024, numInputTapes);
2915 #endif
2916 
2917 	state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2918 	USEMEM(state, state->read_buffer_size * numInputTapes);
2919 
2920 	/* End of step D2: rewind all output tapes to prepare for merging */
2921 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2922 		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2923 
2924 	for (;;)
2925 	{
2926 		/*
2927 		 * At this point we know that tape[T] is empty.  If there's just one
2928 		 * (real or dummy) run left on each input tape, then only one merge
2929 		 * pass remains.  If we don't have to produce a materialized sorted
2930 		 * tape, we can stop at this point and do the final merge on-the-fly.
2931 		 */
2932 		if (!state->randomAccess && !WORKER(state))
2933 		{
2934 			bool		allOneRun = true;
2935 
2936 			Assert(state->tp_runs[state->tapeRange] == 0);
2937 			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2938 			{
2939 				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2940 				{
2941 					allOneRun = false;
2942 					break;
2943 				}
2944 			}
2945 			if (allOneRun)
2946 			{
2947 				/* Tell logtape.c we won't be writing anymore */
2948 				LogicalTapeSetForgetFreeSpace(state->tapeset);
2949 				/* Initialize for the final merge pass */
2950 				beginmerge(state);
2951 				state->status = TSS_FINALMERGE;
2952 				return;
2953 			}
2954 		}
2955 
2956 		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
2957 		while (state->tp_runs[state->tapeRange - 1] ||
2958 			   state->tp_dummy[state->tapeRange - 1])
2959 		{
2960 			bool		allDummy = true;
2961 
2962 			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2963 			{
2964 				if (state->tp_dummy[tapenum] == 0)
2965 				{
2966 					allDummy = false;
2967 					break;
2968 				}
2969 			}
2970 
2971 			if (allDummy)
2972 			{
2973 				state->tp_dummy[state->tapeRange]++;
2974 				for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2975 					state->tp_dummy[tapenum]--;
2976 			}
2977 			else
2978 				mergeonerun(state);
2979 		}
2980 
2981 		/* Step D6: decrease level */
2982 		if (--state->Level == 0)
2983 			break;
2984 		/* rewind output tape T to use as new input */
2985 		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2986 								 state->read_buffer_size);
2987 		/* rewind used-up input tape P, and prepare it for write pass */
2988 		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2989 		state->tp_runs[state->tapeRange - 1] = 0;
2990 
2991 		/*
2992 		 * reassign tape units per step D6; note we no longer care about A[]
2993 		 */
2994 		svTape = state->tp_tapenum[state->tapeRange];
2995 		svDummy = state->tp_dummy[state->tapeRange];
2996 		svRuns = state->tp_runs[state->tapeRange];
2997 		for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2998 		{
2999 			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
3000 			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
3001 			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
3002 		}
3003 		state->tp_tapenum[0] = svTape;
3004 		state->tp_dummy[0] = svDummy;
3005 		state->tp_runs[0] = svRuns;
3006 	}
3007 
3008 	/*
3009 	 * Done.  Knuth says that the result is on TAPE[1], but since we exited
3010 	 * the loop without performing the last iteration of step D6, we have not
3011 	 * rearranged the tape unit assignment, and therefore the result is on
3012 	 * TAPE[T].  We need to do it this way so that we can freeze the final
3013 	 * output tape while rewinding it.  The last iteration of step D6 would be
3014 	 * a waste of cycles anyway...
3015 	 */
3016 	state->result_tape = state->tp_tapenum[state->tapeRange];
3017 	if (!WORKER(state))
3018 		LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
3019 	else
3020 		worker_freeze_result_tape(state);
3021 	state->status = TSS_SORTEDONTAPE;
3022 
3023 	/* Release the read buffers of all the other tapes, by rewinding them. */
3024 	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
3025 	{
3026 		if (tapenum != state->result_tape)
3027 			LogicalTapeRewindForWrite(state->tapeset, tapenum);
3028 	}
3029 }
3030 
3031 /*
3032  * Merge one run from each input tape, except ones with dummy runs.
3033  *
3034  * This is the inner loop of Algorithm D step D5.  We know that the
3035  * output tape is TAPE[T].
3036  */
3037 static void
3038 mergeonerun(Tuplesortstate *state)
3039 {
3040 	int			destTape = state->tp_tapenum[state->tapeRange];
3041 	int			srcTape;
3042 
3043 	/*
3044 	 * Start the merge by loading one tuple from each active source tape into
3045 	 * the heap.  We can also decrease the input run/dummy run counts.
3046 	 */
3047 	beginmerge(state);
3048 
3049 	/*
3050 	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
3051 	 * out, and replacing it with next tuple from same tape (if there is
3052 	 * another one).
3053 	 */
3054 	while (state->memtupcount > 0)
3055 	{
3056 		SortTuple	stup;
3057 
3058 		/* write the tuple to destTape */
3059 		srcTape = state->memtuples[0].srctape;
3060 		WRITETUP(state, destTape, &state->memtuples[0]);
3061 
3062 		/* recycle the slot of the tuple we just wrote out, for the next read */
3063 		if (state->memtuples[0].tuple)
3064 			RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
3065 
3066 		/*
3067 		 * pull next tuple from the tape, and replace the written-out tuple in
3068 		 * the heap with it.
3069 		 */
3070 		if (mergereadnext(state, srcTape, &stup))
3071 		{
3072 			stup.srctape = srcTape;
3073 			tuplesort_heap_replace_top(state, &stup);
3074 		}
3075 		else
3076 			tuplesort_heap_delete_top(state);
3077 	}
3078 
3079 	/*
3080 	 * When the heap empties, we're done.  Write an end-of-run marker on the
3081 	 * output tape, and increment its count of real runs.
3082 	 */
3083 	markrunend(state, destTape);
3084 	state->tp_runs[state->tapeRange]++;
3085 
3086 #ifdef TRACE_SORT
3087 	if (trace_sort)
3088 		elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
3089 			 state->activeTapes, pg_rusage_show(&state->ru_start));
3090 #endif
3091 }
3092 
3093 /*
3094  * beginmerge - initialize for a merge pass
3095  *
3096  * We decrease the counts of real and dummy runs for each tape, and mark
3097  * which tapes contain active input runs in mergeactive[].  Then, fill the
3098  * merge heap with the first tuple from each active tape.
3099  */
3100 static void
3101 beginmerge(Tuplesortstate *state)
3102 {
3103 	int			activeTapes;
3104 	int			tapenum;
3105 	int			srcTape;
3106 
3107 	/* Heap should be empty here */
3108 	Assert(state->memtupcount == 0);
3109 
3110 	/* Adjust run counts and mark the active tapes */
3111 	memset(state->mergeactive, 0,
3112 		   state->maxTapes * sizeof(*state->mergeactive));
3113 	activeTapes = 0;
3114 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
3115 	{
3116 		if (state->tp_dummy[tapenum] > 0)
3117 			state->tp_dummy[tapenum]--;
3118 		else
3119 		{
3120 			Assert(state->tp_runs[tapenum] > 0);
3121 			state->tp_runs[tapenum]--;
3122 			srcTape = state->tp_tapenum[tapenum];
3123 			state->mergeactive[srcTape] = true;
3124 			activeTapes++;
3125 		}
3126 	}
3127 	Assert(activeTapes > 0);
3128 	state->activeTapes = activeTapes;
3129 
3130 	/* Load the merge heap with the first tuple from each input tape */
3131 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
3132 	{
3133 		SortTuple	tup;
3134 
3135 		if (mergereadnext(state, srcTape, &tup))
3136 		{
3137 			tup.srctape = srcTape;
3138 			tuplesort_heap_insert(state, &tup);
3139 		}
3140 	}
3141 }
3142 
3143 /*
3144  * mergereadnext - read next tuple from one merge input tape
3145  *
3146  * Returns false on EOF.
3147  */
3148 static bool
3149 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
3150 {
3151 	unsigned int tuplen;
3152 
3153 	if (!state->mergeactive[srcTape])
3154 		return false;			/* tape's run is already exhausted */
3155 
3156 	/* read next tuple, if any */
3157 	if ((tuplen = getlen(state, srcTape, true)) == 0)
3158 	{
3159 		state->mergeactive[srcTape] = false;
3160 		return false;
3161 	}
3162 	READTUP(state, stup, srcTape, tuplen);
3163 
3164 	return true;
3165 }
3166 
3167 /*
3168  * dumptuples - remove tuples from memtuples and write initial run to tape
3169  *
3170  * When alltuples = true, dump everything currently in memory.  (This case is
3171  * only used at end of input data.)
3172  */
3173 static void
3174 dumptuples(Tuplesortstate *state, bool alltuples)
3175 {
3176 	int			memtupwrite;
3177 	int			i;
3178 
3179 	/*
3180 	 * Nothing to do if we still fit in available memory and have array slots,
3181 	 * unless this is the final call during initial run generation.
3182 	 */
3183 	if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
3184 		!alltuples)
3185 		return;
3186 
3187 	/*
3188 	 * Final call might require no sorting, in rare cases where we just so
3189 	 * happen to have previously LACKMEM()'d at the point where exactly all
3190 	 * remaining tuples are loaded into memory, just before input was
3191 	 * exhausted.
3192 	 *
3193 	 * In general, short final runs are quite possible.  Rather than allowing
3194 	 * a special case where there was a superfluous selectnewtape() call (i.e.
3195 	 * a call with no subsequent run actually written to destTape), we prefer
3196 	 * to write out a 0 tuple run.
3197 	 *
3198 	 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
3199 	 * the tape inactive for the merge when called from beginmerge().  This
3200 	 * case is therefore similar to the case where mergeonerun() finds a dummy
3201 	 * run for the tape, and so doesn't need to merge a run from the tape (or
3202 	 * conceptually "merges" the dummy run, if you prefer).  According to
3203 	 * Knuth, Algorithm D "isn't strictly optimal" in its method of
3204 	 * distribution and dummy run assignment; this edge case seems very
3205 	 * unlikely to make that appreciably worse.
3206 	 */
3207 	Assert(state->status == TSS_BUILDRUNS);
3208 
3209 	/*
3210 	 * It seems unlikely that this limit will ever be exceeded, but take no
3211 	 * chances
3212 	 */
3213 	if (state->currentRun == INT_MAX)
3214 		ereport(ERROR,
3215 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
3216 				 errmsg("cannot have more than %d runs for an external sort",
3217 						INT_MAX)));
3218 
3219 	state->currentRun++;
3220 
3221 #ifdef TRACE_SORT
3222 	if (trace_sort)
3223 		elog(LOG, "worker %d starting quicksort of run %d: %s",
3224 			 state->worker, state->currentRun,
3225 			 pg_rusage_show(&state->ru_start));
3226 #endif
3227 
3228 	/*
3229 	 * Sort all tuples accumulated within the allowed amount of memory for
3230 	 * this run using quicksort
3231 	 */
3232 	tuplesort_sort_memtuples(state);
3233 
3234 #ifdef TRACE_SORT
3235 	if (trace_sort)
3236 		elog(LOG, "worker %d finished quicksort of run %d: %s",
3237 			 state->worker, state->currentRun,
3238 			 pg_rusage_show(&state->ru_start));
3239 #endif
3240 
3241 	memtupwrite = state->memtupcount;
3242 	for (i = 0; i < memtupwrite; i++)
3243 	{
3244 		WRITETUP(state, state->tp_tapenum[state->destTape],
3245 				 &state->memtuples[i]);
3246 		state->memtupcount--;
3247 	}
3248 
3249 	/*
3250 	 * Reset tuple memory.  We've freed all of the tuples that we previously
3251 	 * allocated.  It's important to avoid fragmentation when there is a stark
3252 	 * change in the sizes of incoming tuples.  Fragmentation due to
3253 	 * AllocSetFree's bucketing by size class might be particularly bad if
3254 	 * this step wasn't taken.
3255 	 */
3256 	MemoryContextReset(state->tuplecontext);
3257 
3258 	markrunend(state, state->tp_tapenum[state->destTape]);
3259 	state->tp_runs[state->destTape]++;
3260 	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3261 
3262 #ifdef TRACE_SORT
3263 	if (trace_sort)
3264 		elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3265 			 state->worker, state->currentRun, state->destTape,
3266 			 pg_rusage_show(&state->ru_start));
3267 #endif
3268 
3269 	if (!alltuples)
3270 		selectnewtape(state);
3271 }
3272 
3273 /*
3274  * tuplesort_rescan		- rewind and replay the scan
3275  */
3276 void
3277 tuplesort_rescan(Tuplesortstate *state)
3278 {
3279 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3280 
3281 	Assert(state->randomAccess);
3282 
3283 	switch (state->status)
3284 	{
3285 		case TSS_SORTEDINMEM:
3286 			state->current = 0;
3287 			state->eof_reached = false;
3288 			state->markpos_offset = 0;
3289 			state->markpos_eof = false;
3290 			break;
3291 		case TSS_SORTEDONTAPE:
3292 			LogicalTapeRewindForRead(state->tapeset,
3293 									 state->result_tape,
3294 									 0);
3295 			state->eof_reached = false;
3296 			state->markpos_block = 0L;
3297 			state->markpos_offset = 0;
3298 			state->markpos_eof = false;
3299 			break;
3300 		default:
3301 			elog(ERROR, "invalid tuplesort state");
3302 			break;
3303 	}
3304 
3305 	MemoryContextSwitchTo(oldcontext);
3306 }
3307 
3308 /*
3309  * tuplesort_markpos	- saves current position in the merged sort file
3310  */
3311 void
3312 tuplesort_markpos(Tuplesortstate *state)
3313 {
3314 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3315 
3316 	Assert(state->randomAccess);
3317 
3318 	switch (state->status)
3319 	{
3320 		case TSS_SORTEDINMEM:
3321 			state->markpos_offset = state->current;
3322 			state->markpos_eof = state->eof_reached;
3323 			break;
3324 		case TSS_SORTEDONTAPE:
3325 			LogicalTapeTell(state->tapeset,
3326 							state->result_tape,
3327 							&state->markpos_block,
3328 							&state->markpos_offset);
3329 			state->markpos_eof = state->eof_reached;
3330 			break;
3331 		default:
3332 			elog(ERROR, "invalid tuplesort state");
3333 			break;
3334 	}
3335 
3336 	MemoryContextSwitchTo(oldcontext);
3337 }
3338 
3339 /*
3340  * tuplesort_restorepos - restores current position in merged sort file to
3341  *						  last saved position
3342  */
3343 void
3344 tuplesort_restorepos(Tuplesortstate *state)
3345 {
3346 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3347 
3348 	Assert(state->randomAccess);
3349 
3350 	switch (state->status)
3351 	{
3352 		case TSS_SORTEDINMEM:
3353 			state->current = state->markpos_offset;
3354 			state->eof_reached = state->markpos_eof;
3355 			break;
3356 		case TSS_SORTEDONTAPE:
3357 			LogicalTapeSeek(state->tapeset,
3358 							state->result_tape,
3359 							state->markpos_block,
3360 							state->markpos_offset);
3361 			state->eof_reached = state->markpos_eof;
3362 			break;
3363 		default:
3364 			elog(ERROR, "invalid tuplesort state");
3365 			break;
3366 	}
3367 
3368 	MemoryContextSwitchTo(oldcontext);
3369 }
3370 
3371 /*
3372  * tuplesort_get_stats - extract summary statistics
3373  *
3374  * This can be called after tuplesort_performsort() finishes to obtain
3375  * printable summary information about how the sort was performed.
3376  */
3377 void
3378 tuplesort_get_stats(Tuplesortstate *state,
3379 					TuplesortInstrumentation *stats)
3380 {
3381 	/*
3382 	 * Note: it might seem we should provide both memory and disk usage for a
3383 	 * disk-based sort.  However, the current code doesn't track memory space
3384 	 * accurately once we have begun to return tuples to the caller (since we
3385 	 * don't account for pfree's the caller is expected to do), so we cannot
3386 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
3387 	 * to fix.  Is it worth creating an API for the memory context code to
3388 	 * tell us how much is actually used in sortcontext?
3389 	 */
3390 	tuplesort_updatemax(state);
3391 
3392 	if (state->isMaxSpaceDisk)
3393 		stats->spaceType = SORT_SPACE_TYPE_DISK;
3394 	else
3395 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
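	/* report peak space used in kilobytes, rounding up */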
3396 	stats->spaceUsed = (state->maxSpace + 1023) / 1024;
3397 
3398 	switch (state->maxSpaceStatus)
3399 	{
3400 		case TSS_SORTEDINMEM:
3401 			if (state->boundUsed)
3402 				stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3403 			else
3404 				stats->sortMethod = SORT_TYPE_QUICKSORT;
3405 			break;
3406 		case TSS_SORTEDONTAPE:
3407 			stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3408 			break;
3409 		case TSS_FINALMERGE:
3410 			stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3411 			break;
3412 		default:
3413 			stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3414 			break;
3415 	}
3416 }
3417 
3418 /*
3419  * Convert TuplesortMethod to a string.
3420  */
3421 const char *
3422 tuplesort_method_name(TuplesortMethod m)
3423 {
3424 	switch (m)
3425 	{
3426 		case SORT_TYPE_STILL_IN_PROGRESS:
3427 			return "still in progress";
3428 		case SORT_TYPE_TOP_N_HEAPSORT:
3429 			return "top-N heapsort";
3430 		case SORT_TYPE_QUICKSORT:
3431 			return "quicksort";
3432 		case SORT_TYPE_EXTERNAL_SORT:
3433 			return "external sort";
3434 		case SORT_TYPE_EXTERNAL_MERGE:
3435 			return "external merge";
3436 	}
3437 
3438 	return "unknown";
3439 }
3440 
3441 /*
3442  * Convert TuplesortSpaceType to a string.
3443  */
3444 const char *
3445 tuplesort_space_type_name(TuplesortSpaceType t)
3446 {
3447 	Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3448 	return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3449 }
3450 
3451 
3452 /*
3453  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3454  */
3455 
3456 /*
3457  * Convert the existing unordered array of SortTuples to a bounded heap,
3458  * discarding all but the smallest "state->bound" tuples.
3459  *
3460  * When working with a bounded heap, we want to keep the largest entry
3461  * at the root (array entry zero), instead of the smallest as in the normal
3462  * sort case.  This allows us to discard the largest entry cheaply.
3463  * Therefore, we temporarily reverse the sort direction.
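 *
 * For example, with bound = 3 and inputs 5, 1, 4, 2, 3 (ascending sort), the
 * bounded heap ends up holding {1, 2, 3}, with 3 at the root.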
3464  */
3465 static void
3466 make_bounded_heap(Tuplesortstate *state)
3467 {
3468 	int			tupcount = state->memtupcount;
3469 	int			i;
3470 
3471 	Assert(state->status == TSS_INITIAL);
3472 	Assert(state->bounded);
3473 	Assert(tupcount >= state->bound);
3474 	Assert(SERIAL(state));
3475 
3476 	/* Reverse sort direction so largest entry will be at root */
3477 	reversedirection(state);
3478 
3479 	state->memtupcount = 0;		/* make the heap empty */
3480 	for (i = 0; i < tupcount; i++)
3481 	{
3482 		if (state->memtupcount < state->bound)
3483 		{
3484 			/* Insert next tuple into heap */
3485 			/* Must copy source tuple to avoid possible overwrite */
3486 			SortTuple	stup = state->memtuples[i];
3487 
3488 			tuplesort_heap_insert(state, &stup);
3489 		}
3490 		else
3491 		{
3492 			/*
3493 			 * The heap is full.  Replace the largest entry with the new
3494 			 * tuple, or just discard it, if it's larger than anything already
3495 			 * in the heap.
3496 			 */
3497 			if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3498 			{
3499 				free_sort_tuple(state, &state->memtuples[i]);
3500 				CHECK_FOR_INTERRUPTS();
3501 			}
3502 			else
3503 				tuplesort_heap_replace_top(state, &state->memtuples[i]);
3504 		}
3505 	}
3506 
3507 	Assert(state->memtupcount == state->bound);
3508 	state->status = TSS_BOUNDED;
3509 }
3510 
3511 /*
3512  * Convert the bounded heap to a properly-sorted array
3513  */
3514 static void
3515 sort_bounded_heap(Tuplesortstate *state)
3516 {
3517 	int			tupcount = state->memtupcount;
3518 
3519 	Assert(state->status == TSS_BOUNDED);
3520 	Assert(state->bounded);
3521 	Assert(tupcount == state->bound);
3522 	Assert(SERIAL(state));
3523 
3524 	/*
3525 	 * We can unheapify in place because each delete-top call will remove the
3526 	 * largest entry, which we can promptly store in the newly freed slot at
3527 	 * the end.  Once we're down to a single-entry heap, we're done.
3528 	 */
3529 	while (state->memtupcount > 1)
3530 	{
3531 		SortTuple	stup = state->memtuples[0];
3532 
3533 		/* this sifts-up the next-largest entry and decreases memtupcount */
3534 		tuplesort_heap_delete_top(state);
3535 		state->memtuples[state->memtupcount] = stup;
3536 	}
3537 	state->memtupcount = tupcount;
3538 
3539 	/*
3540 	 * Reverse sort direction back to the original state.  This is not
3541 	 * actually necessary but seems like a good idea for tidiness.
3542 	 */
3543 	reversedirection(state);
3544 
3545 	state->status = TSS_SORTEDINMEM;
3546 	state->boundUsed = true;
3547 }
3548 
3549 /*
3550  * Sort all memtuples using specialized qsort() routines.
3551  *
3552  * Quicksort is used for small in-memory sorts, and external sort runs.
3553  */
3554 static void
3555 tuplesort_sort_memtuples(Tuplesortstate *state)
3556 {
3557 	Assert(!LEADER(state));
3558 
3559 	if (state->memtupcount > 1)
3560 	{
3561 		/* Can we use the single-key sort function? */
3562 		if (state->onlyKey != NULL)
3563 			qsort_ssup(state->memtuples, state->memtupcount,
3564 					   state->onlyKey);
3565 		else
3566 			qsort_tuple(state->memtuples,
3567 						state->memtupcount,
3568 						state->comparetup,
3569 						state);
3570 	}
3571 }
3572 
3573 /*
3574  * Insert a new tuple into an empty or existing heap, maintaining the
3575  * heap invariant.  Caller is responsible for ensuring there's room.
3576  *
3577  * Note: For some callers, tuple points to a memtuples[] entry above the
3578  * end of the heap.  This is safe as long as it's not immediately adjacent
3579  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3580  * is, it might get overwritten before being moved into the heap!
3581  */
3582 static void
3583 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3584 {
3585 	SortTuple  *memtuples;
3586 	int			j;
3587 
3588 	memtuples = state->memtuples;
3589 	Assert(state->memtupcount < state->memtupsize);
3590 
3591 	CHECK_FOR_INTERRUPTS();
3592 
3593 	/*
3594 	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3595 	 * using 1-based array indexes, not 0-based.
3596 	 */
3597 	j = state->memtupcount++;
3598 	while (j > 0)
3599 	{
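		/* parent of slot j in the 0-based heap */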
3600 		int			i = (j - 1) >> 1;
3601 
3602 		if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3603 			break;
3604 		memtuples[j] = memtuples[i];
3605 		j = i;
3606 	}
3607 	memtuples[j] = *tuple;
3608 }
3609 
3610 /*
3611  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
3612  * memtupcount, and sift up to maintain the heap invariant.
3613  *
3614  * The caller has already free'd the tuple the top node points to,
3615  * if necessary.
3616  */
3617 static void
3618 tuplesort_heap_delete_top(Tuplesortstate *state)
3619 {
3620 	SortTuple  *memtuples = state->memtuples;
3621 	SortTuple  *tuple;
3622 
3623 	if (--state->memtupcount <= 0)
3624 		return;
3625 
3626 	/*
3627 	 * Remove the last tuple in the heap, and re-insert it, by replacing the
3628 	 * current top node with it.
3629 	 */
3630 	tuple = &memtuples[state->memtupcount];
3631 	tuplesort_heap_replace_top(state, tuple);
3632 }
3633 
3634 /*
3635  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
3636  * maintain the heap invariant.
3637  *
3638  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3639  * Heapsort, steps H3-H8).
3640  */
3641 static void
3642 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3643 {
3644 	SortTuple  *memtuples = state->memtuples;
3645 	unsigned int i,
3646 				n;
3647 
3648 	Assert(state->memtupcount >= 1);
3649 
3650 	CHECK_FOR_INTERRUPTS();
3651 
3652 	/*
3653 	 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3654 	 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3655 	 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3656 	 */
3657 	n = state->memtupcount;
3658 	i = 0;						/* i is where the "hole" is */
3659 	for (;;)
3660 	{
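		/* left child of the hole; we switch to the right child below if it sorts earlier */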
3661 		unsigned int j = 2 * i + 1;
3662 
3663 		if (j >= n)
3664 			break;
3665 		if (j + 1 < n &&
3666 			COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3667 			j++;
3668 		if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3669 			break;
3670 		memtuples[i] = memtuples[j];
3671 		i = j;
3672 	}
3673 	memtuples[i] = *tuple;
3674 }
3675 
3676 /*
3677  * Function to reverse the sort direction from its current state
3678  *
3679  * It is not safe to call this when performing hash tuplesorts
3680  */
3681 static void
3682 reversedirection(Tuplesortstate *state)
3683 {
3684 	SortSupport sortKey = state->sortKeys;
3685 	int			nkey;
3686 
3687 	for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3688 	{
3689 		sortKey->ssup_reverse = !sortKey->ssup_reverse;
3690 		sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3691 	}
3692 }
3693 
3694 
3695 /*
3696  * Tape interface routines
3697  */
3698 
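/*
 * Each tuple written to tape is preceded by an unsigned length word whose
 * value includes the word itself; a zero length word marks the end of a run
 * (see markrunend).
 */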
3699 static unsigned int
3700 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3701 {
3702 	unsigned int len;
3703 
3704 	if (LogicalTapeRead(state->tapeset, tapenum,
3705 						&len, sizeof(len)) != sizeof(len))
3706 		elog(ERROR, "unexpected end of tape");
3707 	if (len == 0 && !eofOK)
3708 		elog(ERROR, "unexpected end of data");
3709 	return len;
3710 }
3711 
3712 static void
3713 markrunend(Tuplesortstate *state, int tapenum)
3714 {
3715 	unsigned int len = 0;
3716 
3717 	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3718 }
3719 
3720 /*
3721  * Get memory for tuple from within READTUP() routine.
3722  *
3723  * We use next free slot from the slab allocator, or palloc() if the tuple
3724  * is too large for that.
3725  */
3726 static void *
3727 readtup_alloc(Tuplesortstate *state, Size tuplen)
3728 {
3729 	SlabSlot   *buf;
3730 
3731 	/*
3732 	 * We pre-allocate enough slots in the slab arena that we should never run
3733 	 * out.
3734 	 */
3735 	Assert(state->slabFreeHead);
3736 
3737 	if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3738 		return MemoryContextAlloc(state->sortcontext, tuplen);
3739 	else
3740 	{
3741 		buf = state->slabFreeHead;
3742 		/* Reuse this slot */
3743 		state->slabFreeHead = buf->nextfree;
3744 
3745 		return buf;
3746 	}
3747 }
3748 
3749 
3750 /*
3751  * Routines specialized for HeapTuple (actually MinimalTuple) case
3752  */
3753 
3754 static int
3755 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3756 {
3757 	SortSupport sortKey = state->sortKeys;
3758 	HeapTupleData ltup;
3759 	HeapTupleData rtup;
3760 	TupleDesc	tupDesc;
3761 	int			nkey;
3762 	int32		compare;
3763 	AttrNumber	attno;
3764 	Datum		datum1,
3765 				datum2;
3766 	bool		isnull1,
3767 				isnull2;
3768 
3769 
3770 	/* Compare the leading sort key */
3771 	compare = ApplySortComparator(a->datum1, a->isnull1,
3772 								  b->datum1, b->isnull1,
3773 								  sortKey);
3774 	if (compare != 0)
3775 		return compare;
3776 
3777 	/* Compare additional sort keys */
3778 	ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3779 	ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3780 	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3781 	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3782 	tupDesc = state->tupDesc;
3783 
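	/*
	 * If the leading comparison above used abbreviated keys, an apparent tie
	 * must be resolved with the authoritative comparator on the full leading
	 * attribute before moving on to the remaining sort keys.
	 */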
3784 	if (sortKey->abbrev_converter)
3785 	{
3786 		attno = sortKey->ssup_attno;
3787 
3788 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3789 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3790 
3791 		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3792 												datum2, isnull2,
3793 												sortKey);
3794 		if (compare != 0)
3795 			return compare;
3796 	}
3797 
3798 	sortKey++;
3799 	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3800 	{
3801 		attno = sortKey->ssup_attno;
3802 
3803 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3804 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3805 
3806 		compare = ApplySortComparator(datum1, isnull1,
3807 									  datum2, isnull2,
3808 									  sortKey);
3809 		if (compare != 0)
3810 			return compare;
3811 	}
3812 
3813 	return 0;
3814 }
3815 
3816 static void
3817 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3818 {
3819 	/*
3820 	 * We expect the passed "tup" to be a TupleTableSlot, and form a
3821 	 * MinimalTuple using the exported interface for that.
3822 	 */
3823 	TupleTableSlot *slot = (TupleTableSlot *) tup;
3824 	Datum		original;
3825 	MinimalTuple tuple;
3826 	HeapTupleData htup;
3827 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3828 
3829 	/* copy the tuple into sort storage */
3830 	tuple = ExecCopySlotMinimalTuple(slot);
3831 	stup->tuple = (void *) tuple;
3832 	USEMEM(state, GetMemoryChunkSpace(tuple));
3833 	/* set up first-column key value */
3834 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3835 	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3836 	original = heap_getattr(&htup,
3837 							state->sortKeys[0].ssup_attno,
3838 							state->tupDesc,
3839 							&stup->isnull1);
3840 
3841 	MemoryContextSwitchTo(oldcontext);
3842 
3843 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
3844 	{
3845 		/*
3846 		 * Store ordinary Datum representation, or NULL value.  If there is a
3847 		 * converter it won't expect NULL values, and cost model is not
3848 		 * required to account for NULL, so in that case we avoid calling
3849 		 * converter and just set datum1 to zeroed representation (to be
3850 		 * consistent, and to support cheap inequality tests for NULL
3851 		 * abbreviated keys).
3852 		 */
3853 		stup->datum1 = original;
3854 	}
3855 	else if (!consider_abort_common(state))
3856 	{
3857 		/* Store abbreviated key representation */
3858 		stup->datum1 = state->sortKeys->abbrev_converter(original,
3859 														 state->sortKeys);
3860 	}
3861 	else
3862 	{
3863 		/* Abort abbreviation */
3864 		int			i;
3865 
3866 		stup->datum1 = original;
3867 
3868 		/*
3869 		 * Set state to be consistent with never trying abbreviation.
3870 		 *
3871 		 * Alter datum1 representation in already-copied tuples, so as to
3872 		 * ensure a consistent representation (current tuple was just
3873 		 * handled).  It does not matter if some dumped tuples are already
3874 		 * sorted on tape, since serialized tuples lack abbreviated keys
3875 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3876 		 */
3877 		for (i = 0; i < state->memtupcount; i++)
3878 		{
3879 			SortTuple  *mtup = &state->memtuples[i];
3880 
3881 			htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3882 				MINIMAL_TUPLE_OFFSET;
3883 			htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3884 											 MINIMAL_TUPLE_OFFSET);
3885 
3886 			mtup->datum1 = heap_getattr(&htup,
3887 										state->sortKeys[0].ssup_attno,
3888 										state->tupDesc,
3889 										&mtup->isnull1);
3890 		}
3891 	}
3892 }
3893 
3894 static void
3895 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3896 {
3897 	MinimalTuple tuple = (MinimalTuple) stup->tuple;
3898 
3899 	/* the part of the MinimalTuple we'll write: */
3900 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3901 	unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3902 
3903 	/* total on-disk footprint: */
3904 	unsigned int tuplen = tupbodylen + sizeof(int);
3905 
3906 	LogicalTapeWrite(state->tapeset, tapenum,
3907 					 (void *) &tuplen, sizeof(tuplen));
3908 	LogicalTapeWrite(state->tapeset, tapenum,
3909 					 (void *) tupbody, tupbodylen);
3910 	if (state->randomAccess)	/* need trailing length word? */
3911 		LogicalTapeWrite(state->tapeset, tapenum,
3912 						 (void *) &tuplen, sizeof(tuplen));
3913 
3914 	if (!state->slabAllocatorUsed)
3915 	{
3916 		FREEMEM(state, GetMemoryChunkSpace(tuple));
3917 		heap_free_minimal_tuple(tuple);
3918 	}
3919 }
3920 
3921 static void
3922 readtup_heap(Tuplesortstate *state, SortTuple *stup,
3923 			 int tapenum, unsigned int len)
3924 {
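	/* "len" counts the leading length word itself, so subtract it */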
3925 	unsigned int tupbodylen = len - sizeof(int);
3926 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3927 	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3928 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3929 	HeapTupleData htup;
3930 
3931 	/* read in the tuple proper */
3932 	tuple->t_len = tuplen;
3933 	LogicalTapeReadExact(state->tapeset, tapenum,
3934 						 tupbody, tupbodylen);
3935 	if (state->randomAccess)	/* need trailing length word? */
3936 		LogicalTapeReadExact(state->tapeset, tapenum,
3937 							 &tuplen, sizeof(tuplen));
3938 	stup->tuple = (void *) tuple;
3939 	/* set up first-column key value */
3940 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3941 	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3942 	stup->datum1 = heap_getattr(&htup,
3943 								state->sortKeys[0].ssup_attno,
3944 								state->tupDesc,
3945 								&stup->isnull1);
3946 }
3947 
3948 /*
3949  * Routines specialized for the CLUSTER case (HeapTuple data, with
3950  * comparisons per a btree index definition)
3951  */
3952 
3953 static int
3954 comparetup_cluster(const SortTuple *a, const SortTuple *b,
3955 				   Tuplesortstate *state)
3956 {
3957 	SortSupport sortKey = state->sortKeys;
3958 	HeapTuple	ltup;
3959 	HeapTuple	rtup;
3960 	TupleDesc	tupDesc;
3961 	int			nkey;
3962 	int32		compare;
3963 	Datum		datum1,
3964 				datum2;
3965 	bool		isnull1,
3966 				isnull2;
3967 	AttrNumber	leading = state->indexInfo->ii_IndexAttrNumbers[0];
3968 
3969 	/* Be prepared to compare additional sort keys */
3970 	ltup = (HeapTuple) a->tuple;
3971 	rtup = (HeapTuple) b->tuple;
3972 	tupDesc = state->tupDesc;
3973 
3974 	/* Compare the leading sort key, if it's simple */
3975 	if (leading != 0)
3976 	{
3977 		compare = ApplySortComparator(a->datum1, a->isnull1,
3978 									  b->datum1, b->isnull1,
3979 									  sortKey);
3980 		if (compare != 0)
3981 			return compare;
3982 
3983 		if (sortKey->abbrev_converter)
3984 		{
3985 			datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3986 			datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3987 
3988 			compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3989 													datum2, isnull2,
3990 													sortKey);
3991 		}
3992 		if (compare != 0 || state->nKeys == 1)
3993 			return compare;
3994 		/* Compare additional columns the hard way */
3995 		sortKey++;
3996 		nkey = 1;
3997 	}
3998 	else
3999 	{
4000 		/* Must compare all keys the hard way */
4001 		nkey = 0;
4002 	}
4003 
4004 	if (state->indexInfo->ii_Expressions == NULL)
4005 	{
4006 		/* If not expression index, just compare the proper heap attrs */
4007 
4008 		for (; nkey < state->nKeys; nkey++, sortKey++)
4009 		{
4010 			AttrNumber	attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
4011 
4012 			datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
4013 			datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
4014 
4015 			compare = ApplySortComparator(datum1, isnull1,
4016 										  datum2, isnull2,
4017 										  sortKey);
4018 			if (compare != 0)
4019 				return compare;
4020 		}
4021 	}
4022 	else
4023 	{
4024 		/*
4025 		 * In the expression index case, compute the whole index tuple and
4026 		 * then compare values.  It would perhaps be faster to compute only as
4027 		 * many columns as we need to compare, but that would require
4028 		 * duplicating all the logic in FormIndexDatum.
4029 		 */
4030 		Datum		l_index_values[INDEX_MAX_KEYS];
4031 		bool		l_index_isnull[INDEX_MAX_KEYS];
4032 		Datum		r_index_values[INDEX_MAX_KEYS];
4033 		bool		r_index_isnull[INDEX_MAX_KEYS];
4034 		TupleTableSlot *ecxt_scantuple;
4035 
4036 		/* Reset context each time to prevent memory leakage */
4037 		ResetPerTupleExprContext(state->estate);
4038 
4039 		ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
4040 
4041 		ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
4042 		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
4043 					   l_index_values, l_index_isnull);
4044 
4045 		ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
4046 		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
4047 					   r_index_values, r_index_isnull);
4048 
4049 		for (; nkey < state->nKeys; nkey++, sortKey++)
4050 		{
4051 			compare = ApplySortComparator(l_index_values[nkey],
4052 										  l_index_isnull[nkey],
4053 										  r_index_values[nkey],
4054 										  r_index_isnull[nkey],
4055 										  sortKey);
4056 			if (compare != 0)
4057 				return compare;
4058 		}
4059 	}
4060 
4061 	return 0;
4062 }
4063 
4064 static void
4065 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
4066 {
4067 	HeapTuple	tuple = (HeapTuple) tup;
4068 	Datum		original;
4069 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
4070 
4071 	/* copy the tuple into sort storage */
4072 	tuple = heap_copytuple(tuple);
4073 	stup->tuple = (void *) tuple;
4074 	USEMEM(state, GetMemoryChunkSpace(tuple));
4075 
4076 	MemoryContextSwitchTo(oldcontext);
4077 
4078 	/*
4079 	 * set up first-column key value, and potentially abbreviate, if it's a
4080 	 * simple column
4081 	 */
4082 	if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
4083 		return;
4084 
4085 	original = heap_getattr(tuple,
4086 							state->indexInfo->ii_IndexAttrNumbers[0],
4087 							state->tupDesc,
4088 							&stup->isnull1);
4089 
4090 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
4091 	{
4092 		/*
4093 		 * Store ordinary Datum representation, or NULL value.  If there is a
4094 		 * converter it won't expect NULL values, and cost model is not
4095 		 * required to account for NULL, so in that case we avoid calling
4096 		 * converter and just set datum1 to zeroed representation (to be
4097 		 * consistent, and to support cheap inequality tests for NULL
4098 		 * abbreviated keys).
4099 		 */
4100 		stup->datum1 = original;
4101 	}
4102 	else if (!consider_abort_common(state))
4103 	{
4104 		/* Store abbreviated key representation */
4105 		stup->datum1 = state->sortKeys->abbrev_converter(original,
4106 														 state->sortKeys);
4107 	}
4108 	else
4109 	{
4110 		/* Abort abbreviation */
4111 		int			i;
4112 
4113 		stup->datum1 = original;
4114 
4115 		/*
4116 		 * Set state to be consistent with never trying abbreviation.
4117 		 *
4118 		 * Alter datum1 representation in already-copied tuples, so as to
4119 		 * ensure a consistent representation (current tuple was just
4120 		 * handled).  It does not matter if some dumped tuples are already
4121 		 * sorted on tape, since serialized tuples lack abbreviated keys
4122 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
4123 		 */
4124 		for (i = 0; i < state->memtupcount; i++)
4125 		{
4126 			SortTuple  *mtup = &state->memtuples[i];
4127 
4128 			tuple = (HeapTuple) mtup->tuple;
4129 			mtup->datum1 = heap_getattr(tuple,
4130 										state->indexInfo->ii_IndexAttrNumbers[0],
4131 										state->tupDesc,
4132 										&mtup->isnull1);
4133 		}
4134 	}
4135 }
4136 
4137 static void
4138 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
4139 {
4140 	HeapTuple	tuple = (HeapTuple) stup->tuple;
4141 	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
4142 
4143 	/* We need to store t_self, but not other fields of HeapTupleData */
4144 	LogicalTapeWrite(state->tapeset, tapenum,
4145 					 &tuplen, sizeof(tuplen));
4146 	LogicalTapeWrite(state->tapeset, tapenum,
4147 					 &tuple->t_self, sizeof(ItemPointerData));
4148 	LogicalTapeWrite(state->tapeset, tapenum,
4149 					 tuple->t_data, tuple->t_len);
4150 	if (state->randomAccess)	/* need trailing length word? */
4151 		LogicalTapeWrite(state->tapeset, tapenum,
4152 						 &tuplen, sizeof(tuplen));
4153 
4154 	if (!state->slabAllocatorUsed)
4155 	{
4156 		FREEMEM(state, GetMemoryChunkSpace(tuple));
4157 		heap_freetuple(tuple);
4158 	}
4159 }
4160 
4161 static void
4162 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
4163 				int tapenum, unsigned int tuplen)
4164 {
4165 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
4166 	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
4167 												  t_len + HEAPTUPLESIZE);
4168 
4169 	/* Reconstruct the HeapTupleData header */
4170 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
4171 	tuple->t_len = t_len;
4172 	LogicalTapeReadExact(state->tapeset, tapenum,
4173 						 &tuple->t_self, sizeof(ItemPointerData));
4174 	/* We don't currently bother to reconstruct t_tableOid */
4175 	tuple->t_tableOid = InvalidOid;
4176 	/* Read in the tuple body */
4177 	LogicalTapeReadExact(state->tapeset, tapenum,
4178 						 tuple->t_data, tuple->t_len);
4179 	if (state->randomAccess)	/* need trailing length word? */
4180 		LogicalTapeReadExact(state->tapeset, tapenum,
4181 							 &tuplen, sizeof(tuplen));
4182 	stup->tuple = (void *) tuple;
4183 	/* set up first-column key value, if it's a simple column */
4184 	if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
4185 		stup->datum1 = heap_getattr(tuple,
4186 									state->indexInfo->ii_IndexAttrNumbers[0],
4187 									state->tupDesc,
4188 									&stup->isnull1);
4189 }
4190 
4191 /*
4192  * Routines specialized for IndexTuple case
4193  *
4194  * The btree and hash cases require separate comparison functions, but the
4195  * IndexTuple representation is the same so the copy/write/read support
4196  * functions can be shared.
4197  */
4198 
4199 static int
4200 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
4201 					   Tuplesortstate *state)
4202 {
4203 	/*
4204 	 * This is similar to comparetup_heap(), but expects index tuples.  There
4205 	 * is also special handling for enforcing uniqueness, and special
4206 	 * treatment for equal keys at the end.
4207 	 */
4208 	SortSupport sortKey = state->sortKeys;
4209 	IndexTuple	tuple1;
4210 	IndexTuple	tuple2;
4211 	int			keysz;
4212 	TupleDesc	tupDes;
4213 	bool		equal_hasnull = false;
4214 	int			nkey;
4215 	int32		compare;
4216 	Datum		datum1,
4217 				datum2;
4218 	bool		isnull1,
4219 				isnull2;
4220 
4221 
4222 	/* Compare the leading sort key */
4223 	compare = ApplySortComparator(a->datum1, a->isnull1,
4224 								  b->datum1, b->isnull1,
4225 								  sortKey);
4226 	if (compare != 0)
4227 		return compare;
4228 
4229 	/* Compare additional sort keys */
4230 	tuple1 = (IndexTuple) a->tuple;
4231 	tuple2 = (IndexTuple) b->tuple;
4232 	keysz = state->nKeys;
4233 	tupDes = RelationGetDescr(state->indexRel);
4234 
4235 	if (sortKey->abbrev_converter)
4236 	{
4237 		datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
4238 		datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
4239 
4240 		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
4241 												datum2, isnull2,
4242 												sortKey);
4243 		if (compare != 0)
4244 			return compare;
4245 	}
4246 
4247 	/* they are equal, so we only need to examine one null flag */
4248 	if (a->isnull1)
4249 		equal_hasnull = true;
4250 
4251 	sortKey++;
4252 	for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4253 	{
4254 		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4255 		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4256 
4257 		compare = ApplySortComparator(datum1, isnull1,
4258 									  datum2, isnull2,
4259 									  sortKey);
4260 		if (compare != 0)
4261 			return compare;		/* done when we find unequal attributes */
4262 
4263 		/* they are equal, so we only need to examine one null flag */
4264 		if (isnull1)
4265 			equal_hasnull = true;
4266 	}
4267 
4268 	/*
4269 	 * If btree has asked us to enforce uniqueness, complain if two equal
4270 	 * tuples are detected (unless there was at least one NULL field).
4271 	 *
4272 	 * It is sufficient to make the test here, because if two tuples are equal
4273 	 * they *must* get compared at some stage of the sort --- otherwise the
4274 	 * sort algorithm wouldn't have checked whether one must appear before the
4275 	 * other.
4276 	 */
4277 	if (state->enforceUnique && !equal_hasnull)
4278 	{
4279 		Datum		values[INDEX_MAX_KEYS];
4280 		bool		isnull[INDEX_MAX_KEYS];
4281 		char	   *key_desc;
4282 
4283 		/*
4284 		 * Some rather brain-dead implementations of qsort (such as the one in
4285 		 * QNX 4) will sometimes call the comparison routine to compare a
4286 		 * value to itself, but we always use our own implementation, which
4287 		 * does not.
4288 		 */
4289 		Assert(tuple1 != tuple2);
4290 
4291 		index_deform_tuple(tuple1, tupDes, values, isnull);
4292 
4293 		key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4294 
4295 		ereport(ERROR,
4296 				(errcode(ERRCODE_UNIQUE_VIOLATION),
4297 				 errmsg("could not create unique index \"%s\"",
4298 						RelationGetRelationName(state->indexRel)),
4299 				 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4300 				 errdetail("Duplicate keys exist."),
4301 				 errtableconstraint(state->heapRel,
4302 									RelationGetRelationName(state->indexRel))));
4303 	}
4304 
4305 	/*
4306 	 * If key values are equal, we sort on ItemPointer.  This is required for
4307 	 * btree indexes, since heap TID is treated as an implicit last key
4308 	 * attribute in order to ensure that all keys in the index are physically
4309 	 * unique.
4310 	 */
4311 	{
4312 		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4313 		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4314 
4315 		if (blk1 != blk2)
4316 			return (blk1 < blk2) ? -1 : 1;
4317 	}
4318 	{
4319 		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4320 		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4321 
4322 		if (pos1 != pos2)
4323 			return (pos1 < pos2) ? -1 : 1;
4324 	}
4325 
4326 	/* ItemPointer values should never be equal */
4327 	Assert(false);
4328 
4329 	return 0;
4330 }
4331 
4332 static int
4333 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4334 					  Tuplesortstate *state)
4335 {
4336 	Bucket		bucket1;
4337 	Bucket		bucket2;
4338 	IndexTuple	tuple1;
4339 	IndexTuple	tuple2;
4340 
4341 	/*
4342 	 * Fetch hash keys and mask off bits we don't want to sort by. We know
4343 	 * that the first column of the index tuple is the hash key.
4344 	 */
4345 	Assert(!a->isnull1);
4346 	bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4347 								   state->max_buckets, state->high_mask,
4348 								   state->low_mask);
4349 	Assert(!b->isnull1);
4350 	bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4351 								   state->max_buckets, state->high_mask,
4352 								   state->low_mask);
4353 	if (bucket1 > bucket2)
4354 		return 1;
4355 	else if (bucket1 < bucket2)
4356 		return -1;
4357 
4358 	/*
4359 	 * If hash values are equal, we sort on ItemPointer.  This does not affect
4360 	 * validity of the finished index, but it may be useful to have index
4361 	 * scans in physical order.
4362 	 */
4363 	tuple1 = (IndexTuple) a->tuple;
4364 	tuple2 = (IndexTuple) b->tuple;
4365 
4366 	{
4367 		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4368 		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4369 
4370 		if (blk1 != blk2)
4371 			return (blk1 < blk2) ? -1 : 1;
4372 	}
4373 	{
4374 		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4375 		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4376 
4377 		if (pos1 != pos2)
4378 			return (pos1 < pos2) ? -1 : 1;
4379 	}
4380 
4381 	/* ItemPointer values should never be equal */
4382 	Assert(false);
4383 
4384 	return 0;
4385 }
4386 
4387 static void
4388 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4389 {
4390 	/* Not currently needed */
4391 	elog(ERROR, "copytup_index() should not be called");
4392 }
4393 
4394 static void
4395 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4396 {
4397 	IndexTuple	tuple = (IndexTuple) stup->tuple;
4398 	unsigned int tuplen;
4399 
4400 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4401 	LogicalTapeWrite(state->tapeset, tapenum,
4402 					 (void *) &tuplen, sizeof(tuplen));
4403 	LogicalTapeWrite(state->tapeset, tapenum,
4404 					 (void *) tuple, IndexTupleSize(tuple));
4405 	if (state->randomAccess)	/* need trailing length word? */
4406 		LogicalTapeWrite(state->tapeset, tapenum,
4407 						 (void *) &tuplen, sizeof(tuplen));
4408 
4409 	if (!state->slabAllocatorUsed)
4410 	{
4411 		FREEMEM(state, GetMemoryChunkSpace(tuple));
4412 		pfree(tuple);
4413 	}
4414 }
4415 
4416 static void
4417 readtup_index(Tuplesortstate *state, SortTuple *stup,
4418 			  int tapenum, unsigned int len)
4419 {
4420 	unsigned int tuplen = len - sizeof(unsigned int);
4421 	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);
4422 
4423 	LogicalTapeReadExact(state->tapeset, tapenum,
4424 						 tuple, tuplen);
4425 	if (state->randomAccess)	/* need trailing length word? */
4426 		LogicalTapeReadExact(state->tapeset, tapenum,
4427 							 &tuplen, sizeof(tuplen));
4428 	stup->tuple = (void *) tuple;
4429 	/* set up first-column key value */
4430 	stup->datum1 = index_getattr(tuple,
4431 								 1,
4432 								 RelationGetDescr(state->indexRel),
4433 								 &stup->isnull1);
4434 }
4435 
4436 /*
4437  * Routines specialized for DatumTuple case
4438  */
4439 
4440 static int
4441 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4442 {
4443 	int			compare;
4444 
4445 	compare = ApplySortComparator(a->datum1, a->isnull1,
4446 								  b->datum1, b->isnull1,
4447 								  state->sortKeys);
4448 	if (compare != 0)
4449 		return compare;
4450 
4451 	/* if we have abbreviations, then "tuple" has the original value */
4452 
4453 	if (state->sortKeys->abbrev_converter)
4454 		compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4455 												PointerGetDatum(b->tuple), b->isnull1,
4456 												state->sortKeys);
4457 
4458 	return compare;
4459 }
4460 
4461 static void
4462 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4463 {
4464 	/* Not currently needed */
4465 	elog(ERROR, "copytup_datum() should not be called");
4466 }
4467 
4468 static void
4469 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4470 {
4471 	void	   *waddr;
4472 	unsigned int tuplen;
4473 	unsigned int writtenlen;
4474 
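	/* A NULL datum is written as a length word with an empty body */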
4475 	if (stup->isnull1)
4476 	{
4477 		waddr = NULL;
4478 		tuplen = 0;
4479 	}
4480 	else if (!state->tuples)
4481 	{
4482 		waddr = &stup->datum1;
4483 		tuplen = sizeof(Datum);
4484 	}
4485 	else
4486 	{
4487 		waddr = stup->tuple;
4488 		tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4489 		Assert(tuplen != 0);
4490 	}
4491 
4492 	writtenlen = tuplen + sizeof(unsigned int);
4493 
4494 	LogicalTapeWrite(state->tapeset, tapenum,
4495 					 (void *) &writtenlen, sizeof(writtenlen));
4496 	LogicalTapeWrite(state->tapeset, tapenum,
4497 					 waddr, tuplen);
4498 	if (state->randomAccess)	/* need trailing length word? */
4499 		LogicalTapeWrite(state->tapeset, tapenum,
4500 						 (void *) &writtenlen, sizeof(writtenlen));
4501 
4502 	if (!state->slabAllocatorUsed && stup->tuple)
4503 	{
4504 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4505 		pfree(stup->tuple);
4506 	}
4507 }
4508 
4509 static void
4510 readtup_datum(Tuplesortstate *state, SortTuple *stup,
4511 			  int tapenum, unsigned int len)
4512 {
4513 	unsigned int tuplen = len - sizeof(unsigned int);
4514 
4515 	if (tuplen == 0)
4516 	{
4517 		/* it's NULL */
4518 		stup->datum1 = (Datum) 0;
4519 		stup->isnull1 = true;
4520 		stup->tuple = NULL;
4521 	}
4522 	else if (!state->tuples)
4523 	{
4524 		Assert(tuplen == sizeof(Datum));
4525 		LogicalTapeReadExact(state->tapeset, tapenum,
4526 							 &stup->datum1, tuplen);
4527 		stup->isnull1 = false;
4528 		stup->tuple = NULL;
4529 	}
4530 	else
4531 	{
4532 		void	   *raddr = readtup_alloc(state, tuplen);
4533 
4534 		LogicalTapeReadExact(state->tapeset, tapenum,
4535 							 raddr, tuplen);
4536 		stup->datum1 = PointerGetDatum(raddr);
4537 		stup->isnull1 = false;
4538 		stup->tuple = raddr;
4539 	}
4540 
4541 	if (state->randomAccess)	/* need trailing length word? */
4542 		LogicalTapeReadExact(state->tapeset, tapenum,
4543 							 &tuplen, sizeof(tuplen));
4544 }
4545 
4546 /*
4547  * Parallel sort routines
4548  */
4549 
4550 /*
4551  * tuplesort_estimate_shared - estimate required shared memory allocation
4552  *
4553  * nWorkers is an estimate of the number of workers (it's the number that
4554  * will be requested).
4555  */
4556 Size
4557 tuplesort_estimate_shared(int nWorkers)
4558 {
4559 	Size		tapesSize;
4560 
4561 	Assert(nWorkers > 0);
4562 
4563 	/* Make sure that BufFile shared state is MAXALIGN'd */
4564 	tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4565 	tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
4566 
4567 	return tapesSize;
4568 }
4569 
4570 /*
4571  * tuplesort_initialize_shared - initialize shared tuplesort state
4572  *
4573  * Must be called from leader process before workers are launched, to
4574  * establish state needed up-front for worker tuplesortstates.  nWorkers
4575  * should match the argument passed to tuplesort_estimate_shared().
4576  */
4577 void
4578 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4579 {
4580 	int			i;
4581 
4582 	Assert(nWorkers > 0);
4583 
4584 	SpinLockInit(&shared->mutex);
4585 	shared->currentWorker = 0;
4586 	shared->workersFinished = 0;
4587 	SharedFileSetInit(&shared->fileset, seg);
4588 	shared->nTapes = nWorkers;
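	/* each worker overwrites its slot in worker_freeze_result_tape() */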
4589 	for (i = 0; i < nWorkers; i++)
4590 	{
4591 		shared->tapes[i].firstblocknumber = 0L;
4592 	}
4593 }
4594 
4595 /*
4596  * tuplesort_attach_shared - attach to shared tuplesort state
4597  *
4598  * Must be called by all worker processes.
4599  */
4600 void
4601 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4602 {
4603 	/* Attach to SharedFileSet */
4604 	SharedFileSetAttach(&shared->fileset, seg);
4605 }
4606 
4607 /*
4608  * worker_get_identifier - Assign and return ordinal identifier for worker
4609  *
4610  * The order in which these are assigned is not well defined, and should not
4611  * matter; worker numbers across parallel sort participants need only be
4612  * distinct and gapless.  logtape.c requires this.
4613  *
4614  * Note that the identifiers assigned from here have no relation to
4615  * ParallelWorkerNumber number, to avoid making any assumption about
4616  * caller's requirements.  However, we do follow the ParallelWorkerNumber
4617  * convention of representing a non-worker with worker number -1.  This
4618  * includes the leader, as well as serial Tuplesort processes.
4619  */
4620 static int
4621 worker_get_identifier(Tuplesortstate *state)
4622 {
4623 	Sharedsort *shared = state->shared;
4624 	int			worker;
4625 
4626 	Assert(WORKER(state));
4627 
4628 	SpinLockAcquire(&shared->mutex);
4629 	worker = shared->currentWorker++;
4630 	SpinLockRelease(&shared->mutex);
4631 
4632 	return worker;
4633 }
4634 
4635 /*
4636  * worker_freeze_result_tape - freeze worker's result tape for leader
4637  *
4638  * This is called by workers just after the result tape has been determined,
4639  * instead of calling LogicalTapeFreeze() directly.  They do so because
4640  * workers require a few additional steps over similar serial
4641  * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
4642  * steps are around freeing now unneeded resources, and representing to
4643  * leader that worker's input run is available for its merge.
4644  *
4645  * There should only be one final output run for each worker, which consists
4646  * of all tuples that were originally input into worker.
4647  */
4648 static void
4649 worker_freeze_result_tape(Tuplesortstate *state)
4650 {
4651 	Sharedsort *shared = state->shared;
4652 	TapeShare	output;
4653 
4654 	Assert(WORKER(state));
4655 	Assert(state->result_tape != -1);
4656 	Assert(state->memtupcount == 0);
4657 
4658 	/*
4659 	 * Free most remaining memory, in case caller is sensitive to our holding
4660 	 * on to it.  memtuples may not be a tiny merge heap at this point.
4661 	 */
4662 	pfree(state->memtuples);
4663 	/* Be tidy */
4664 	state->memtuples = NULL;
4665 	state->memtupsize = 0;
4666 
4667 	/*
4668 	 * Parallel worker requires result tape metadata, which is to be stored in
4669 	 * shared memory for leader
4670 	 */
4671 	LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4672 
4673 	/* Store properties of output tape, and update finished worker count */
4674 	SpinLockAcquire(&shared->mutex);
4675 	shared->tapes[state->worker] = output;
4676 	shared->workersFinished++;
4677 	SpinLockRelease(&shared->mutex);
4678 }
4679 
4680 /*
4681  * worker_nomergeruns - dump memtuples in worker, without merging
4682  *
4683  * This is called as an alternative to mergeruns() with a worker when no
4684  * merging is required.
4685  */
4686 static void
4687 worker_nomergeruns(Tuplesortstate *state)
4688 {
4689 	Assert(WORKER(state));
4690 	Assert(state->result_tape == -1);
4691 
4692 	state->result_tape = state->tp_tapenum[state->destTape];
4693 	worker_freeze_result_tape(state);
4694 }
4695 
4696 /*
4697  * leader_takeover_tapes - create tapeset for leader from worker tapes
4698  *
4699  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
4700  * sorting has occurred in workers, all of which must have already returned
4701  * from tuplesort_performsort().
4702  *
4703  * When this returns, leader process is left in a state that is virtually
4704  * indistinguishable from it having generated runs as a serial external sort
4705  * might have.
4706  */
4707 static void
4708 leader_takeover_tapes(Tuplesortstate *state)
4709 {
4710 	Sharedsort *shared = state->shared;
4711 	int			nParticipants = state->nParticipants;
4712 	int			workersFinished;
4713 	int			j;
4714 
4715 	Assert(LEADER(state));
4716 	Assert(nParticipants >= 1);
4717 
4718 	SpinLockAcquire(&shared->mutex);
4719 	workersFinished = shared->workersFinished;
4720 	SpinLockRelease(&shared->mutex);
4721 
4722 	if (nParticipants != workersFinished)
4723 		elog(ERROR, "cannot take over tapes before all workers finish");
4724 
4725 	/*
4726 	 * Create the tapeset from worker tapes, including a leader-owned tape at
4727 	 * the end.  Parallel workers are far more expensive than logical tapes,
4728 	 * so the number of tapes allocated here should never be excessive.
4729 	 *
4730 	 * We still have a leader tape, though it's not possible to write to it
4731 	 * due to restrictions in the shared fileset infrastructure used by
4732 	 * logtape.c.  It will never be written to in practice because
4733 	 * randomAccess is disallowed for parallel sorts.
4734 	 */
4735 	inittapestate(state, nParticipants + 1);
4736 	state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
4737 										  shared->tapes, &shared->fileset,
4738 										  state->worker);
4739 
4740 	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4741 	state->currentRun = nParticipants;
4742 
4743 	/*
4744 	 * Initialize variables of Algorithm D to be consistent with runs from
4745 	 * workers having been generated in the leader.
4746 	 *
4747 	 * There will always be exactly 1 run per worker, and exactly one input
4748 	 * tape per run, because workers always output exactly 1 run, even when
4749 	 * there were no input tuples for workers to sort.
4750 	 */
4751 	for (j = 0; j < state->maxTapes; j++)
4752 	{
4753 		/* One real run; no dummy runs for worker tapes */
4754 		state->tp_fib[j] = 1;
4755 		state->tp_runs[j] = 1;
4756 		state->tp_dummy[j] = 0;
4757 		state->tp_tapenum[j] = j;
4758 	}
4759 	/* Leader tape gets one dummy run, and no real runs */
4760 	state->tp_fib[state->tapeRange] = 0;
4761 	state->tp_runs[state->tapeRange] = 0;
4762 	state->tp_dummy[state->tapeRange] = 1;
4763 
4764 	state->Level = 1;
4765 	state->destTape = 0;
4766 
4767 	state->status = TSS_BUILDRUNS;
4768 }
4769 
4770 /*
4771  * Convenience routine to free a tuple previously loaded into sort memory
4772  */
4773 static void
4774 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4775 {
4776 	if (stup->tuple)
4777 	{
4778 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4779 		pfree(stup->tuple);
4780 		stup->tuple = NULL;
4781 	}
4782 }
4783