1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *	  Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  Historically, we divided the input into sorted runs
15  * using replacement selection, in the form of a priority tree implemented
16  * as a heap (essentially his Algorithm 5.2.3H), but now we always use
17  * quicksort for run generation.  We merge the runs using polyphase merge,
18  * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
19  * implemented by logtape.c, which avoids space wastage by recycling disk
20  * space as soon as each block is read from its "tape".
21  *
22  * The approximate amount of memory allowed for any one sort operation
23  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
24  * we absorb tuples and simply store them in an unsorted array as long as
25  * we haven't exceeded workMem.  If we reach the end of the input without
26  * exceeding workMem, we sort the array using qsort() and subsequently return
27  * tuples just by scanning the tuple array sequentially.  If we do exceed
28  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
29  * When tuples are dumped in batch after quicksorting, we begin a new run
30  * with a new output tape (selected per Algorithm D).  After the end of the
31  * input is reached, we dump out remaining tuples in memory into a final run,
32  * then merge the runs using Algorithm D.
33  *
34  * When merging runs, we use a heap containing just the frontmost tuple from
35  * each source run; we repeatedly output the smallest tuple and replace it
36  * with the next tuple from its source tape (if any).  When the heap empties,
37  * the merge is complete.  The basic merge algorithm thus needs very little
38  * memory --- only M tuples for an M-way merge, and M is constrained to a
39  * small number.  However, we can still make good use of our full workMem
40  * allocation by pre-reading additional blocks from each source tape.  Without
41  * prereading, our access pattern to the temporary file would be very erratic;
42  * on average we'd read one block from each of M source tapes during the same
43  * time that we're writing M blocks to the output tape, so there is no
44  * sequentiality of access at all, defeating the read-ahead methods used by
45  * most Unix kernels.  Worse, the output tape gets written into a very random
46  * sequence of blocks of the temp file, ensuring that things will be even
47  * worse when it comes time to read that tape.  A straightforward merge pass
48  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
49  * by prereading from each source tape sequentially, loading about workMem/M
50  * bytes from each tape in turn, and making the sequential blocks immediately
51  * available for reuse.  This approach helps to localize both read and write
52  * accesses.  The pre-reading is handled by logtape.c, we just tell it how
53  * much memory to use for the buffers.
54  *
55  * When the caller requests random access to the sort result, we form
56  * the final sorted run on a logical tape which is then "frozen", so
57  * that we can access it randomly.  When the caller does not need random
58  * access, we return from tuplesort_performsort() as soon as we are down
59  * to one run per logical tape.  The final merge is then performed
60  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
61  * saves one cycle of writing all the data out to disk and reading it in.
62  *
63  * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
64  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
65  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
66  * tape drives are expensive beasts, and in particular that there will always
67  * be many more runs than tape drives.  In our implementation a "tape drive"
68  * doesn't cost much more than a few Kb of memory buffers, so we can afford
69  * to have lots of them.  In particular, if we can have as many tape drives
70  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
71  * code we determine the number of tapes M on the basis of workMem: we want
72  * workMem/M to be large enough that we read a fair amount of data each time
73  * we preread from a tape, so as to maintain the locality of access described
74  * above.  Nonetheless, with large workMem we can have many tapes (but not
75  * too many -- see the comments in tuplesort_merge_order).
76  *
77  * This module supports parallel sorting.  Parallel sorts involve coordination
78  * among one or more worker processes, and a leader process, each with its own
79  * tuplesort state.  The leader process (or, more accurately, the
80  * Tuplesortstate associated with a leader process) creates a full tapeset
81  * consisting of worker tapes with one run to merge; a run for every
82  * worker process.  This is then merged.  Worker processes are guaranteed to
83  * produce exactly one output run from their partial input.
84  *
85  *
86  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
87  * Portions Copyright (c) 1994, Regents of the University of California
88  *
89  * IDENTIFICATION
90  *	  src/backend/utils/sort/tuplesort.c
91  *
92  *-------------------------------------------------------------------------
93  */
94 
95 #include "postgres.h"
96 
97 #include <limits.h>
98 
99 #include "access/hash.h"
100 #include "access/htup_details.h"
101 #include "access/nbtree.h"
102 #include "catalog/index.h"
103 #include "catalog/pg_am.h"
104 #include "commands/tablespace.h"
105 #include "executor/executor.h"
106 #include "miscadmin.h"
107 #include "pg_trace.h"
108 #include "utils/datum.h"
109 #include "utils/logtape.h"
110 #include "utils/lsyscache.h"
111 #include "utils/memutils.h"
112 #include "utils/pg_rusage.h"
113 #include "utils/rel.h"
114 #include "utils/sortsupport.h"
115 #include "utils/tuplesort.h"
116 
117 
118 /* sort-type codes for sort__start probes */
119 #define HEAP_SORT		0
120 #define INDEX_SORT		1
121 #define DATUM_SORT		2
122 #define CLUSTER_SORT	3
123 
124 /* Sort parallel code from state for sort__start probes */
125 #define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
126 								 (state)->worker >= 0 ? 1 : 2)
127 
128 /*
129  * Initial size of memtuples array.  We're trying to select this size so that
130  * array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the overhead of
131  * allocation might possibly be lowered.  However, we don't consider array sizes
132  * less than 1024.
133  *
134  */
135 #define INITIAL_MEMTUPSIZE Max(1024, \
136 	ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
137 
138 /* GUC variables */
139 #ifdef TRACE_SORT
140 bool		trace_sort = false;
141 #endif
142 
143 #ifdef DEBUG_BOUNDED_SORT
144 bool		optimize_bounded_sort = true;
145 #endif
146 
147 
148 /*
149  * The objects we actually sort are SortTuple structs.  These contain
150  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
151  * which is a separate palloc chunk --- we assume it is just one chunk and
152  * can be freed by a simple pfree() (except during merge, when we use a
153  * simple slab allocator).  SortTuples also contain the tuple's first key
154  * column in Datum/nullflag format, and a source/input tape number that
155  * tracks which tape each heap element/slot belongs to during merging.
156  *
157  * Storing the first key column lets us save heap_getattr or index_getattr
158  * calls during tuple comparisons.  We could extract and save all the key
159  * columns not just the first, but this would increase code complexity and
160  * overhead, and wouldn't actually save any comparison cycles in the common
161  * case where the first key determines the comparison result.  Note that
162  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
163  *
164  * There is one special case: when the sort support infrastructure provides an
165  * "abbreviated key" representation, where the key is (typically) a pass by
166  * value proxy for a pass by reference type.  In this case, the abbreviated key
167  * is stored in datum1 in place of the actual first key column.
168  *
169  * When sorting single Datums, the data value is represented directly by
170  * datum1/isnull1 for pass by value types (or null values).  If the datatype is
171  * pass-by-reference and isnull1 is false, then "tuple" points to a separately
172  * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
173  * either the same pointer as "tuple", or is an abbreviated key value as
174  * described above.  Accordingly, "tuple" is always used in preference to
175  * datum1 as the authoritative value for pass-by-reference cases.
176  */
177 typedef struct
178 {
179 	void	   *tuple;			/* the tuple itself */
180 	Datum		datum1;			/* value of first key column */
181 	bool		isnull1;		/* is first key column NULL? */
182 	int			srctape;		/* source tape number */
183 } SortTuple;
184 
185 /*
186  * During merge, we use a pre-allocated set of fixed-size slots to hold
187  * tuples.  To avoid palloc/pfree overhead.
188  *
189  * Merge doesn't require a lot of memory, so we can afford to waste some,
190  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
191  * palloc() overhead is not significant anymore.
192  *
193  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
194  * slot holds a tuple.
195  */
196 #define SLAB_SLOT_SIZE 1024
197 
198 typedef union SlabSlot
199 {
200 	union SlabSlot *nextfree;
201 	char		buffer[SLAB_SLOT_SIZE];
202 } SlabSlot;
203 
204 /*
205  * Possible states of a Tuplesort object.  These denote the states that
206  * persist between calls of Tuplesort routines.
207  */
208 typedef enum
209 {
210 	TSS_INITIAL,				/* Loading tuples; still within memory limit */
211 	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
212 	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
213 	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
214 	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
215 	TSS_FINALMERGE				/* Performing final merge on-the-fly */
216 } TupSortStatus;
217 
218 /*
219  * Parameters for calculation of number of tapes to use --- see inittapes()
220  * and tuplesort_merge_order().
221  *
222  * In this calculation we assume that each tape will cost us about 1 blocks
223  * worth of buffer space.  This ignores the overhead of all the other data
224  * structures needed for each tape, but it's probably close enough.
225  *
226  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
227  * tape during a preread cycle (see discussion at top of file).
228  */
229 #define MINORDER		6		/* minimum merge order */
230 #define MAXORDER		500		/* maximum merge order */
231 #define TAPE_BUFFER_OVERHEAD		BLCKSZ
232 #define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
233 
234 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
235 									Tuplesortstate *state);
236 
237 /*
238  * Private state of a Tuplesort operation.
239  */
240 struct Tuplesortstate
241 {
242 	TupSortStatus status;		/* enumerated value as shown above */
243 	int			nKeys;			/* number of columns in sort key */
244 	bool		randomAccess;	/* did caller request random access? */
245 	bool		bounded;		/* did caller specify a maximum number of
246 								 * tuples to return? */
247 	bool		boundUsed;		/* true if we made use of a bounded heap */
248 	int			bound;			/* if bounded, the maximum number of tuples */
249 	bool		tuples;			/* Can SortTuple.tuple ever be set? */
250 	int64		availMem;		/* remaining memory available, in bytes */
251 	int64		allowedMem;		/* total memory allowed, in bytes */
252 	int			maxTapes;		/* number of tapes (Knuth's T) */
253 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
254 	int64		maxSpace;		/* maximum amount of space occupied among sort
255 								 * of groups, either in-memory or on-disk */
256 	bool		isMaxSpaceDisk; /* true when maxSpace is value for on-disk
257 								 * space, false when it's value for in-memory
258 								 * space */
259 	TupSortStatus maxSpaceStatus;	/* sort status when maxSpace was reached */
260 	MemoryContext maincontext;	/* memory context for tuple sort metadata that
261 								 * persists across multiple batches */
262 	MemoryContext sortcontext;	/* memory context holding most sort data */
263 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
264 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
265 
266 	/*
267 	 * These function pointers decouple the routines that must know what kind
268 	 * of tuple we are sorting from the routines that don't need to know it.
269 	 * They are set up by the tuplesort_begin_xxx routines.
270 	 *
271 	 * Function to compare two tuples; result is per qsort() convention, ie:
272 	 * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
273 	 * qsort_arg_comparator.
274 	 */
275 	SortTupleComparator comparetup;
276 
277 	/*
278 	 * Function to copy a supplied input tuple into palloc'd space and set up
279 	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
280 	 * state->availMem must be decreased by the amount of space used for the
281 	 * tuple copy (note the SortTuple struct itself is not counted).
282 	 */
283 	void		(*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
284 
285 	/*
286 	 * Function to write a stored tuple onto tape.  The representation of the
287 	 * tuple on tape need not be the same as it is in memory; requirements on
288 	 * the tape representation are given below.  Unless the slab allocator is
289 	 * used, after writing the tuple, pfree() the out-of-line data (not the
290 	 * SortTuple struct!), and increase state->availMem by the amount of
291 	 * memory space thereby released.
292 	 */
293 	void		(*writetup) (Tuplesortstate *state, int tapenum,
294 							 SortTuple *stup);
295 
296 	/*
297 	 * Function to read a stored tuple from tape back into memory. 'len' is
298 	 * the already-read length of the stored tuple.  The tuple is allocated
299 	 * from the slab memory arena, or is palloc'd, see readtup_alloc().
300 	 */
301 	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
302 							int tapenum, unsigned int len);
303 
304 	/*
305 	 * This array holds the tuples now in sort memory.  If we are in state
306 	 * INITIAL, the tuples are in no particular order; if we are in state
307 	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
308 	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
309 	 * H.  In state SORTEDONTAPE, the array is not used.
310 	 */
311 	SortTuple  *memtuples;		/* array of SortTuple structs */
312 	int			memtupcount;	/* number of tuples currently present */
313 	int			memtupsize;		/* allocated length of memtuples array */
314 	bool		growmemtuples;	/* memtuples' growth still underway? */
315 
316 	/*
317 	 * Memory for tuples is sometimes allocated using a simple slab allocator,
318 	 * rather than with palloc().  Currently, we switch to slab allocation
319 	 * when we start merging.  Merging only needs to keep a small, fixed
320 	 * number of tuples in memory at any time, so we can avoid the
321 	 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
322 	 * to hold the tuples.
323 	 *
324 	 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
325 	 * slots.  The allocation is sized to have one slot per tape, plus one
326 	 * additional slot.  We need that many slots to hold all the tuples kept
327 	 * in the heap during merge, plus the one we have last returned from the
328 	 * sort, with tuplesort_gettuple.
329 	 *
330 	 * Initially, all the slots are kept in a linked list of free slots.  When
331 	 * a tuple is read from a tape, it is put to the next available slot, if
332 	 * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
333 	 * instead.
334 	 *
335 	 * When we're done processing a tuple, we return the slot back to the free
336 	 * list, or pfree() if it was palloc'd.  We know that a tuple was
337 	 * allocated from the slab, if its pointer value is between
338 	 * slabMemoryBegin and -End.
339 	 *
340 	 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
341 	 * tracking memory usage is not used.
342 	 */
343 	bool		slabAllocatorUsed;
344 
345 	char	   *slabMemoryBegin;	/* beginning of slab memory arena */
346 	char	   *slabMemoryEnd;	/* end of slab memory arena */
347 	SlabSlot   *slabFreeHead;	/* head of free list */
348 
349 	/* Buffer size to use for reading input tapes, during merge. */
350 	size_t		read_buffer_size;
351 
352 	/*
353 	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
354 	 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
355 	 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
356 	 * recycle the memory on next gettuple call.
357 	 */
358 	void	   *lastReturnedTuple;
359 
360 	/*
361 	 * While building initial runs, this is the current output run number.
362 	 * Afterwards, it is the number of initial runs we made.
363 	 */
364 	int			currentRun;
365 
366 	/*
367 	 * Unless otherwise noted, all pointer variables below are pointers to
368 	 * arrays of length maxTapes, holding per-tape data.
369 	 */
370 
371 	/*
372 	 * This variable is only used during merge passes.  mergeactive[i] is true
373 	 * if we are reading an input run from (actual) tape number i and have not
374 	 * yet exhausted that run.
375 	 */
376 	bool	   *mergeactive;	/* active input run source? */
377 
378 	/*
379 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
380 	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
381 	 * "logical" and "actual" tape numbers straight!
382 	 */
383 	int			Level;			/* Knuth's l */
384 	int			destTape;		/* current output tape (Knuth's j, less 1) */
385 	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
386 	int		   *tp_runs;		/* # of real runs on each tape */
387 	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
388 	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
389 	int			activeTapes;	/* # of active input tapes in merge pass */
390 
391 	/*
392 	 * These variables are used after completion of sorting to keep track of
393 	 * the next tuple to return.  (In the tape case, the tape's current read
394 	 * position is also critical state.)
395 	 */
396 	int			result_tape;	/* actual tape number of finished output */
397 	int			current;		/* array index (only used if SORTEDINMEM) */
398 	bool		eof_reached;	/* reached EOF (needed for cursors) */
399 
400 	/* markpos_xxx holds marked position for mark and restore */
401 	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
402 	int			markpos_offset; /* saved "current", or offset in tape block */
403 	bool		markpos_eof;	/* saved "eof_reached" */
404 
405 	/*
406 	 * These variables are used during parallel sorting.
407 	 *
408 	 * worker is our worker identifier.  Follows the general convention that
409 	 * -1 value relates to a leader tuplesort, and values >= 0 worker
410 	 * tuplesorts. (-1 can also be a serial tuplesort.)
411 	 *
412 	 * shared is mutable shared memory state, which is used to coordinate
413 	 * parallel sorts.
414 	 *
415 	 * nParticipants is the number of worker Tuplesortstates known by the
416 	 * leader to have actually been launched, which implies that they must
417 	 * finish a run leader can merge.  Typically includes a worker state held
418 	 * by the leader process itself.  Set in the leader Tuplesortstate only.
419 	 */
420 	int			worker;
421 	Sharedsort *shared;
422 	int			nParticipants;
423 
424 	/*
425 	 * The sortKeys variable is used by every case other than the hash index
426 	 * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
427 	 * MinimalTuple and CLUSTER routines, though.
428 	 */
429 	TupleDesc	tupDesc;
430 	SortSupport sortKeys;		/* array of length nKeys */
431 
432 	/*
433 	 * This variable is shared by the single-key MinimalTuple case and the
434 	 * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
435 	 */
436 	SortSupport onlyKey;
437 
438 	/*
439 	 * Additional state for managing "abbreviated key" sortsupport routines
440 	 * (which currently may be used by all cases except the hash index case).
441 	 * Tracks the intervals at which the optimization's effectiveness is
442 	 * tested.
443 	 */
444 	int64		abbrevNext;		/* Tuple # at which to next check
445 								 * applicability */
446 
447 	/*
448 	 * These variables are specific to the CLUSTER case; they are set by
449 	 * tuplesort_begin_cluster.
450 	 */
451 	IndexInfo  *indexInfo;		/* info about index being used for reference */
452 	EState	   *estate;			/* for evaluating index expressions */
453 
454 	/*
455 	 * These variables are specific to the IndexTuple case; they are set by
456 	 * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
457 	 */
458 	Relation	heapRel;		/* table the index is being built on */
459 	Relation	indexRel;		/* index being built */
460 
461 	/* These are specific to the index_btree subcase: */
462 	bool		enforceUnique;	/* complain if we find duplicate tuples */
463 
464 	/* These are specific to the index_hash subcase: */
465 	uint32		high_mask;		/* masks for sortable part of hash code */
466 	uint32		low_mask;
467 	uint32		max_buckets;
468 
469 	/*
470 	 * These variables are specific to the Datum case; they are set by
471 	 * tuplesort_begin_datum and used only by the DatumTuple routines.
472 	 */
473 	Oid			datumType;
474 	/* we need typelen in order to know how to copy the Datums. */
475 	int			datumTypeLen;
476 
477 	/*
478 	 * Resource snapshot for time of sort start.
479 	 */
480 #ifdef TRACE_SORT
481 	PGRUsage	ru_start;
482 #endif
483 };
484 
485 /*
486  * Private mutable state of tuplesort-parallel-operation.  This is allocated
487  * in shared memory.
488  */
489 struct Sharedsort
490 {
491 	/* mutex protects all fields prior to tapes */
492 	slock_t		mutex;
493 
494 	/*
495 	 * currentWorker generates ordinal identifier numbers for parallel sort
496 	 * workers.  These start from 0, and are always gapless.
497 	 *
498 	 * Workers increment workersFinished to indicate having finished.  If this
499 	 * is equal to state.nParticipants within the leader, leader is ready to
500 	 * merge worker runs.
501 	 */
502 	int			currentWorker;
503 	int			workersFinished;
504 
505 	/* Temporary file space */
506 	SharedFileSet fileset;
507 
508 	/* Size of tapes flexible array */
509 	int			nTapes;
510 
511 	/*
512 	 * Tapes array used by workers to report back information needed by the
513 	 * leader to concatenate all worker tapes into one for merging
514 	 */
515 	TapeShare	tapes[FLEXIBLE_ARRAY_MEMBER];
516 };
517 
518 /*
519  * Is the given tuple allocated from the slab memory arena?
520  */
521 #define IS_SLAB_SLOT(state, tuple) \
522 	((char *) (tuple) >= (state)->slabMemoryBegin && \
523 	 (char *) (tuple) < (state)->slabMemoryEnd)
524 
525 /*
526  * Return the given tuple to the slab memory free list, or free it
527  * if it was palloc'd.
528  */
529 #define RELEASE_SLAB_SLOT(state, tuple) \
530 	do { \
531 		SlabSlot *buf = (SlabSlot *) tuple; \
532 		\
533 		if (IS_SLAB_SLOT((state), buf)) \
534 		{ \
535 			buf->nextfree = (state)->slabFreeHead; \
536 			(state)->slabFreeHead = buf; \
537 		} else \
538 			pfree(buf); \
539 	} while(0)
540 
541 #define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
542 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
543 #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
544 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
545 #define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
546 #define USEMEM(state,amt)	((state)->availMem -= (amt))
547 #define FREEMEM(state,amt)	((state)->availMem += (amt))
548 #define SERIAL(state)		((state)->shared == NULL)
549 #define WORKER(state)		((state)->shared && (state)->worker != -1)
550 #define LEADER(state)		((state)->shared && (state)->worker == -1)
551 
552 /*
553  * NOTES about on-tape representation of tuples:
554  *
555  * We require the first "unsigned int" of a stored tuple to be the total size
556  * on-tape of the tuple, including itself (so it is never zero; an all-zero
557  * unsigned int is used to delimit runs).  The remainder of the stored tuple
558  * may or may not match the in-memory representation of the tuple ---
559  * any conversion needed is the job of the writetup and readtup routines.
560  *
561  * If state->randomAccess is true, then the stored representation of the
562  * tuple must be followed by another "unsigned int" that is a copy of the
563  * length --- so the total tape space used is actually sizeof(unsigned int)
564  * more than the stored length value.  This allows read-backwards.  When
565  * randomAccess is not true, the write/read routines may omit the extra
566  * length word.
567  *
568  * writetup is expected to write both length words as well as the tuple
569  * data.  When readtup is called, the tape is positioned just after the
570  * front length word; readtup must read the tuple data and advance past
571  * the back length word (if present).
572  *
573  * The write/read routines can make use of the tuple description data
574  * stored in the Tuplesortstate record, if needed.  They are also expected
575  * to adjust state->availMem by the amount of memory space (not tape space!)
576  * released or consumed.  There is no error return from either writetup
577  * or readtup; they should ereport() on failure.
578  *
579  *
580  * NOTES about memory consumption calculations:
581  *
582  * We count space allocated for tuples against the workMem limit, plus
583  * the space used by the variable-size memtuples array.  Fixed-size space
584  * is not counted; it's small enough to not be interesting.
585  *
586  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
587  * rather than the originally-requested size.  This is important since
588  * palloc can add substantial overhead.  It's not a complete answer since
589  * we won't count any wasted space in palloc allocation blocks, but it's
590  * a lot better than what we were doing before 7.3.  As of 9.6, a
591  * separate memory context is used for caller passed tuples.  Resetting
592  * it at certain key increments significantly ameliorates fragmentation.
593  * Note that this places a responsibility on copytup routines to use the
594  * correct memory context for these tuples (and to not use the reset
595  * context for anything whose lifetime needs to span multiple external
596  * sort runs).  readtup routines use the slab allocator (they cannot use
597  * the reset context because it gets deleted at the point that merging
598  * begins).
599  */
600 
601 /* When using this macro, beware of double evaluation of len */
602 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
603 	do { \
604 		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
605 			elog(ERROR, "unexpected end of data"); \
606 	} while(0)
607 
608 
609 static Tuplesortstate *tuplesort_begin_common(int workMem,
610 											  SortCoordinate coordinate,
611 											  bool randomAccess);
612 static void tuplesort_begin_batch(Tuplesortstate *state);
613 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
614 static bool consider_abort_common(Tuplesortstate *state);
615 static void inittapes(Tuplesortstate *state, bool mergeruns);
616 static void inittapestate(Tuplesortstate *state, int maxTapes);
617 static void selectnewtape(Tuplesortstate *state);
618 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
619 static void mergeruns(Tuplesortstate *state);
620 static void mergeonerun(Tuplesortstate *state);
621 static void beginmerge(Tuplesortstate *state);
622 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
623 static void dumptuples(Tuplesortstate *state, bool alltuples);
624 static void make_bounded_heap(Tuplesortstate *state);
625 static void sort_bounded_heap(Tuplesortstate *state);
626 static void tuplesort_sort_memtuples(Tuplesortstate *state);
627 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
628 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
629 static void tuplesort_heap_delete_top(Tuplesortstate *state);
630 static void reversedirection(Tuplesortstate *state);
631 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
632 static void markrunend(Tuplesortstate *state, int tapenum);
633 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
634 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
635 							Tuplesortstate *state);
636 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
637 static void writetup_heap(Tuplesortstate *state, int tapenum,
638 						  SortTuple *stup);
639 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
640 						 int tapenum, unsigned int len);
641 static int	comparetup_cluster(const SortTuple *a, const SortTuple *b,
642 							   Tuplesortstate *state);
643 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
644 static void writetup_cluster(Tuplesortstate *state, int tapenum,
645 							 SortTuple *stup);
646 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
647 							int tapenum, unsigned int len);
648 static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
649 								   Tuplesortstate *state);
650 static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
651 								  Tuplesortstate *state);
652 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
653 static void writetup_index(Tuplesortstate *state, int tapenum,
654 						   SortTuple *stup);
655 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
656 						  int tapenum, unsigned int len);
657 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
658 							 Tuplesortstate *state);
659 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
660 static void writetup_datum(Tuplesortstate *state, int tapenum,
661 						   SortTuple *stup);
662 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
663 						  int tapenum, unsigned int len);
664 static int	worker_get_identifier(Tuplesortstate *state);
665 static void worker_freeze_result_tape(Tuplesortstate *state);
666 static void worker_nomergeruns(Tuplesortstate *state);
667 static void leader_takeover_tapes(Tuplesortstate *state);
668 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
669 static void tuplesort_free(Tuplesortstate *state);
670 static void tuplesort_updatemax(Tuplesortstate *state);
671 
672 /*
673  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
674  * any variant of SortTuples, using the appropriate comparetup function.
675  * qsort_ssup() is specialized for the case where the comparetup function
676  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
677  * and Datum sorts.
678  */
679 #include "qsort_tuple.c"
680 
681 
682 /*
683  *		tuplesort_begin_xxx
684  *
685  * Initialize for a tuple sort operation.
686  *
687  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
688  * zero or more times, then call tuplesort_performsort when all the tuples
689  * have been supplied.  After performsort, retrieve the tuples in sorted
690  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
691  * access was requested, rescan, markpos, and restorepos can also be called.)
692  * Call tuplesort_end to terminate the operation and release memory/disk space.
693  *
694  * Each variant of tuplesort_begin has a workMem parameter specifying the
695  * maximum number of kilobytes of RAM to use before spilling data to disk.
696  * (The normal value of this parameter is work_mem, but some callers use
697  * other values.)  Each variant also has a randomAccess parameter specifying
698  * whether the caller needs non-sequential access to the sort result.
699  */
700 
701 static Tuplesortstate *
tuplesort_begin_common(int workMem,SortCoordinate coordinate,bool randomAccess)702 tuplesort_begin_common(int workMem, SortCoordinate coordinate,
703 					   bool randomAccess)
704 {
705 	Tuplesortstate *state;
706 	MemoryContext maincontext;
707 	MemoryContext sortcontext;
708 	MemoryContext oldcontext;
709 
710 	/* See leader_takeover_tapes() remarks on randomAccess support */
711 	if (coordinate && randomAccess)
712 		elog(ERROR, "random access disallowed under parallel sort");
713 
714 	/*
715 	 * Memory context surviving tuplesort_reset.  This memory context holds
716 	 * data which is useful to keep while sorting multiple similar batches.
717 	 */
718 	maincontext = AllocSetContextCreate(CurrentMemoryContext,
719 										"TupleSort main",
720 										ALLOCSET_DEFAULT_SIZES);
721 
722 	/*
723 	 * Create a working memory context for one sort operation.  The content of
724 	 * this context is deleted by tuplesort_reset.
725 	 */
726 	sortcontext = AllocSetContextCreate(maincontext,
727 										"TupleSort sort",
728 										ALLOCSET_DEFAULT_SIZES);
729 
730 	/*
731 	 * Additionally a working memory context for tuples is setup in
732 	 * tuplesort_begin_batch.
733 	 */
734 
735 	/*
736 	 * Make the Tuplesortstate within the per-sortstate context.  This way, we
737 	 * don't need a separate pfree() operation for it at shutdown.
738 	 */
739 	oldcontext = MemoryContextSwitchTo(maincontext);
740 
741 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
742 
743 #ifdef TRACE_SORT
744 	if (trace_sort)
745 		pg_rusage_init(&state->ru_start);
746 #endif
747 
748 	state->randomAccess = randomAccess;
749 	state->tuples = true;
750 
751 	/*
752 	 * workMem is forced to be at least 64KB, the current minimum valid value
753 	 * for the work_mem GUC.  This is a defense against parallel sort callers
754 	 * that divide out memory among many workers in a way that leaves each
755 	 * with very little memory.
756 	 */
757 	state->allowedMem = Max(workMem, 64) * (int64) 1024;
758 	state->sortcontext = sortcontext;
759 	state->maincontext = maincontext;
760 
761 	/*
762 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
763 	 * see comments in grow_memtuples().
764 	 */
765 	state->memtupsize = INITIAL_MEMTUPSIZE;
766 	state->memtuples = NULL;
767 
768 	/*
769 	 * After all of the other non-parallel-related state, we setup all of the
770 	 * state needed for each batch.
771 	 */
772 	tuplesort_begin_batch(state);
773 
774 	/*
775 	 * Initialize parallel-related state based on coordination information
776 	 * from caller
777 	 */
778 	if (!coordinate)
779 	{
780 		/* Serial sort */
781 		state->shared = NULL;
782 		state->worker = -1;
783 		state->nParticipants = -1;
784 	}
785 	else if (coordinate->isWorker)
786 	{
787 		/* Parallel worker produces exactly one final run from all input */
788 		state->shared = coordinate->sharedsort;
789 		state->worker = worker_get_identifier(state);
790 		state->nParticipants = -1;
791 	}
792 	else
793 	{
794 		/* Parallel leader state only used for final merge */
795 		state->shared = coordinate->sharedsort;
796 		state->worker = -1;
797 		state->nParticipants = coordinate->nParticipants;
798 		Assert(state->nParticipants >= 1);
799 	}
800 
801 	MemoryContextSwitchTo(oldcontext);
802 
803 	return state;
804 }
805 
806 /*
807  *		tuplesort_begin_batch
808  *
809  * Setup, or reset, all state need for processing a new set of tuples with this
810  * sort state. Called both from tuplesort_begin_common (the first time sorting
811  * with this sort state) and tuplesort_reset (for subsequent usages).
812  */
813 static void
tuplesort_begin_batch(Tuplesortstate * state)814 tuplesort_begin_batch(Tuplesortstate *state)
815 {
816 	MemoryContext oldcontext;
817 
818 	oldcontext = MemoryContextSwitchTo(state->maincontext);
819 
820 	/*
821 	 * Caller tuple (e.g. IndexTuple) memory context.
822 	 *
823 	 * A dedicated child context used exclusively for caller passed tuples
824 	 * eases memory management.  Resetting at key points reduces
825 	 * fragmentation. Note that the memtuples array of SortTuples is allocated
826 	 * in the parent context, not this context, because there is no need to
827 	 * free memtuples early.
828 	 */
829 	state->tuplecontext = AllocSetContextCreate(state->sortcontext,
830 												"Caller tuples",
831 												ALLOCSET_DEFAULT_SIZES);
832 
833 	state->status = TSS_INITIAL;
834 	state->bounded = false;
835 	state->boundUsed = false;
836 
837 	state->availMem = state->allowedMem;
838 
839 	state->tapeset = NULL;
840 
841 	state->memtupcount = 0;
842 
843 	/*
844 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
845 	 * see comments in grow_memtuples().
846 	 */
847 	state->growmemtuples = true;
848 	state->slabAllocatorUsed = false;
849 	if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
850 	{
851 		pfree(state->memtuples);
852 		state->memtuples = NULL;
853 		state->memtupsize = INITIAL_MEMTUPSIZE;
854 	}
855 	if (state->memtuples == NULL)
856 	{
857 		state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
858 		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
859 	}
860 
861 	/* workMem must be large enough for the minimal memtuples array */
862 	if (LACKMEM(state))
863 		elog(ERROR, "insufficient memory allowed for sort");
864 
865 	state->currentRun = 0;
866 
867 	/*
868 	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
869 	 * inittapes(), if needed
870 	 */
871 
872 	state->result_tape = -1;	/* flag that result tape has not been formed */
873 
874 	MemoryContextSwitchTo(oldcontext);
875 }
876 
877 Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,int nkeys,AttrNumber * attNums,Oid * sortOperators,Oid * sortCollations,bool * nullsFirstFlags,int workMem,SortCoordinate coordinate,bool randomAccess)878 tuplesort_begin_heap(TupleDesc tupDesc,
879 					 int nkeys, AttrNumber *attNums,
880 					 Oid *sortOperators, Oid *sortCollations,
881 					 bool *nullsFirstFlags,
882 					 int workMem, SortCoordinate coordinate, bool randomAccess)
883 {
884 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
885 												   randomAccess);
886 	MemoryContext oldcontext;
887 	int			i;
888 
889 	oldcontext = MemoryContextSwitchTo(state->maincontext);
890 
891 	AssertArg(nkeys > 0);
892 
893 #ifdef TRACE_SORT
894 	if (trace_sort)
895 		elog(LOG,
896 			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
897 			 nkeys, workMem, randomAccess ? 't' : 'f');
898 #endif
899 
900 	state->nKeys = nkeys;
901 
902 	TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
903 								false,	/* no unique check */
904 								nkeys,
905 								workMem,
906 								randomAccess,
907 								PARALLEL_SORT(state));
908 
909 	state->comparetup = comparetup_heap;
910 	state->copytup = copytup_heap;
911 	state->writetup = writetup_heap;
912 	state->readtup = readtup_heap;
913 
914 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
915 	state->abbrevNext = 10;
916 
917 	/* Prepare SortSupport data for each column */
918 	state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
919 
920 	for (i = 0; i < nkeys; i++)
921 	{
922 		SortSupport sortKey = state->sortKeys + i;
923 
924 		AssertArg(attNums[i] != 0);
925 		AssertArg(sortOperators[i] != 0);
926 
927 		sortKey->ssup_cxt = CurrentMemoryContext;
928 		sortKey->ssup_collation = sortCollations[i];
929 		sortKey->ssup_nulls_first = nullsFirstFlags[i];
930 		sortKey->ssup_attno = attNums[i];
931 		/* Convey if abbreviation optimization is applicable in principle */
932 		sortKey->abbreviate = (i == 0);
933 
934 		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
935 	}
936 
937 	/*
938 	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
939 	 * tie-breaker comparisons may be required.  Typically, the optimization
940 	 * is only of value to pass-by-value types anyway, whereas abbreviated
941 	 * keys are typically only of value to pass-by-reference types.
942 	 */
943 	if (nkeys == 1 && !state->sortKeys->abbrev_converter)
944 		state->onlyKey = state->sortKeys;
945 
946 	MemoryContextSwitchTo(oldcontext);
947 
948 	return state;
949 }
950 
951 Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,Relation indexRel,int workMem,SortCoordinate coordinate,bool randomAccess)952 tuplesort_begin_cluster(TupleDesc tupDesc,
953 						Relation indexRel,
954 						int workMem,
955 						SortCoordinate coordinate, bool randomAccess)
956 {
957 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
958 												   randomAccess);
959 	BTScanInsert indexScanKey;
960 	MemoryContext oldcontext;
961 	int			i;
962 
963 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
964 
965 	oldcontext = MemoryContextSwitchTo(state->maincontext);
966 
967 #ifdef TRACE_SORT
968 	if (trace_sort)
969 		elog(LOG,
970 			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
971 			 RelationGetNumberOfAttributes(indexRel),
972 			 workMem, randomAccess ? 't' : 'f');
973 #endif
974 
975 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
976 
977 	TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
978 								false,	/* no unique check */
979 								state->nKeys,
980 								workMem,
981 								randomAccess,
982 								PARALLEL_SORT(state));
983 
984 	state->comparetup = comparetup_cluster;
985 	state->copytup = copytup_cluster;
986 	state->writetup = writetup_cluster;
987 	state->readtup = readtup_cluster;
988 	state->abbrevNext = 10;
989 
990 	state->indexInfo = BuildIndexInfo(indexRel);
991 
992 	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
993 
994 	indexScanKey = _bt_mkscankey(indexRel, NULL);
995 
996 	if (state->indexInfo->ii_Expressions != NULL)
997 	{
998 		TupleTableSlot *slot;
999 		ExprContext *econtext;
1000 
1001 		/*
1002 		 * We will need to use FormIndexDatum to evaluate the index
1003 		 * expressions.  To do that, we need an EState, as well as a
1004 		 * TupleTableSlot to put the table tuples into.  The econtext's
1005 		 * scantuple has to point to that slot, too.
1006 		 */
1007 		state->estate = CreateExecutorState();
1008 		slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
1009 		econtext = GetPerTupleExprContext(state->estate);
1010 		econtext->ecxt_scantuple = slot;
1011 	}
1012 
1013 	/* Prepare SortSupport data for each column */
1014 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
1015 											sizeof(SortSupportData));
1016 
1017 	for (i = 0; i < state->nKeys; i++)
1018 	{
1019 		SortSupport sortKey = state->sortKeys + i;
1020 		ScanKey		scanKey = indexScanKey->scankeys + i;
1021 		int16		strategy;
1022 
1023 		sortKey->ssup_cxt = CurrentMemoryContext;
1024 		sortKey->ssup_collation = scanKey->sk_collation;
1025 		sortKey->ssup_nulls_first =
1026 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1027 		sortKey->ssup_attno = scanKey->sk_attno;
1028 		/* Convey if abbreviation optimization is applicable in principle */
1029 		sortKey->abbreviate = (i == 0);
1030 
1031 		AssertState(sortKey->ssup_attno != 0);
1032 
1033 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1034 			BTGreaterStrategyNumber : BTLessStrategyNumber;
1035 
1036 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1037 	}
1038 
1039 	pfree(indexScanKey);
1040 
1041 	MemoryContextSwitchTo(oldcontext);
1042 
1043 	return state;
1044 }
1045 
1046 Tuplesortstate *
tuplesort_begin_index_btree(Relation heapRel,Relation indexRel,bool enforceUnique,int workMem,SortCoordinate coordinate,bool randomAccess)1047 tuplesort_begin_index_btree(Relation heapRel,
1048 							Relation indexRel,
1049 							bool enforceUnique,
1050 							int workMem,
1051 							SortCoordinate coordinate,
1052 							bool randomAccess)
1053 {
1054 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1055 												   randomAccess);
1056 	BTScanInsert indexScanKey;
1057 	MemoryContext oldcontext;
1058 	int			i;
1059 
1060 	oldcontext = MemoryContextSwitchTo(state->maincontext);
1061 
1062 #ifdef TRACE_SORT
1063 	if (trace_sort)
1064 		elog(LOG,
1065 			 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
1066 			 enforceUnique ? 't' : 'f',
1067 			 workMem, randomAccess ? 't' : 'f');
1068 #endif
1069 
1070 	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
1071 
1072 	TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
1073 								enforceUnique,
1074 								state->nKeys,
1075 								workMem,
1076 								randomAccess,
1077 								PARALLEL_SORT(state));
1078 
1079 	state->comparetup = comparetup_index_btree;
1080 	state->copytup = copytup_index;
1081 	state->writetup = writetup_index;
1082 	state->readtup = readtup_index;
1083 	state->abbrevNext = 10;
1084 
1085 	state->heapRel = heapRel;
1086 	state->indexRel = indexRel;
1087 	state->enforceUnique = enforceUnique;
1088 
1089 	indexScanKey = _bt_mkscankey(indexRel, NULL);
1090 
1091 	/* Prepare SortSupport data for each column */
1092 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
1093 											sizeof(SortSupportData));
1094 
1095 	for (i = 0; i < state->nKeys; i++)
1096 	{
1097 		SortSupport sortKey = state->sortKeys + i;
1098 		ScanKey		scanKey = indexScanKey->scankeys + i;
1099 		int16		strategy;
1100 
1101 		sortKey->ssup_cxt = CurrentMemoryContext;
1102 		sortKey->ssup_collation = scanKey->sk_collation;
1103 		sortKey->ssup_nulls_first =
1104 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1105 		sortKey->ssup_attno = scanKey->sk_attno;
1106 		/* Convey if abbreviation optimization is applicable in principle */
1107 		sortKey->abbreviate = (i == 0);
1108 
1109 		AssertState(sortKey->ssup_attno != 0);
1110 
1111 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1112 			BTGreaterStrategyNumber : BTLessStrategyNumber;
1113 
1114 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1115 	}
1116 
1117 	pfree(indexScanKey);
1118 
1119 	MemoryContextSwitchTo(oldcontext);
1120 
1121 	return state;
1122 }
1123 
1124 Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,Relation indexRel,uint32 high_mask,uint32 low_mask,uint32 max_buckets,int workMem,SortCoordinate coordinate,bool randomAccess)1125 tuplesort_begin_index_hash(Relation heapRel,
1126 						   Relation indexRel,
1127 						   uint32 high_mask,
1128 						   uint32 low_mask,
1129 						   uint32 max_buckets,
1130 						   int workMem,
1131 						   SortCoordinate coordinate,
1132 						   bool randomAccess)
1133 {
1134 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1135 												   randomAccess);
1136 	MemoryContext oldcontext;
1137 
1138 	oldcontext = MemoryContextSwitchTo(state->maincontext);
1139 
1140 #ifdef TRACE_SORT
1141 	if (trace_sort)
1142 		elog(LOG,
1143 			 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1144 			 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1145 			 high_mask,
1146 			 low_mask,
1147 			 max_buckets,
1148 			 workMem, randomAccess ? 't' : 'f');
1149 #endif
1150 
1151 	state->nKeys = 1;			/* Only one sort column, the hash code */
1152 
1153 	state->comparetup = comparetup_index_hash;
1154 	state->copytup = copytup_index;
1155 	state->writetup = writetup_index;
1156 	state->readtup = readtup_index;
1157 
1158 	state->heapRel = heapRel;
1159 	state->indexRel = indexRel;
1160 
1161 	state->high_mask = high_mask;
1162 	state->low_mask = low_mask;
1163 	state->max_buckets = max_buckets;
1164 
1165 	MemoryContextSwitchTo(oldcontext);
1166 
1167 	return state;
1168 }
1169 
1170 Tuplesortstate *
tuplesort_begin_datum(Oid datumType,Oid sortOperator,Oid sortCollation,bool nullsFirstFlag,int workMem,SortCoordinate coordinate,bool randomAccess)1171 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
1172 					  bool nullsFirstFlag, int workMem,
1173 					  SortCoordinate coordinate, bool randomAccess)
1174 {
1175 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1176 												   randomAccess);
1177 	MemoryContext oldcontext;
1178 	int16		typlen;
1179 	bool		typbyval;
1180 
1181 	oldcontext = MemoryContextSwitchTo(state->maincontext);
1182 
1183 #ifdef TRACE_SORT
1184 	if (trace_sort)
1185 		elog(LOG,
1186 			 "begin datum sort: workMem = %d, randomAccess = %c",
1187 			 workMem, randomAccess ? 't' : 'f');
1188 #endif
1189 
1190 	state->nKeys = 1;			/* always a one-column sort */
1191 
1192 	TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
1193 								false,	/* no unique check */
1194 								1,
1195 								workMem,
1196 								randomAccess,
1197 								PARALLEL_SORT(state));
1198 
1199 	state->comparetup = comparetup_datum;
1200 	state->copytup = copytup_datum;
1201 	state->writetup = writetup_datum;
1202 	state->readtup = readtup_datum;
1203 	state->abbrevNext = 10;
1204 
1205 	state->datumType = datumType;
1206 
1207 	/* lookup necessary attributes of the datum type */
1208 	get_typlenbyval(datumType, &typlen, &typbyval);
1209 	state->datumTypeLen = typlen;
1210 	state->tuples = !typbyval;
1211 
1212 	/* Prepare SortSupport data */
1213 	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
1214 
1215 	state->sortKeys->ssup_cxt = CurrentMemoryContext;
1216 	state->sortKeys->ssup_collation = sortCollation;
1217 	state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1218 
1219 	/*
1220 	 * Abbreviation is possible here only for by-reference types.  In theory,
1221 	 * a pass-by-value datatype could have an abbreviated form that is cheaper
1222 	 * to compare.  In a tuple sort, we could support that, because we can
1223 	 * always extract the original datum from the tuple as needed.  Here, we
1224 	 * can't, because a datum sort only stores a single copy of the datum; the
1225 	 * "tuple" field of each SortTuple is NULL.
1226 	 */
1227 	state->sortKeys->abbreviate = !typbyval;
1228 
1229 	PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
1230 
1231 	/*
1232 	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
1233 	 * tie-breaker comparisons may be required.  Typically, the optimization
1234 	 * is only of value to pass-by-value types anyway, whereas abbreviated
1235 	 * keys are typically only of value to pass-by-reference types.
1236 	 */
1237 	if (!state->sortKeys->abbrev_converter)
1238 		state->onlyKey = state->sortKeys;
1239 
1240 	MemoryContextSwitchTo(oldcontext);
1241 
1242 	return state;
1243 }
1244 
1245 /*
1246  * tuplesort_set_bound
1247  *
1248  *	Advise tuplesort that at most the first N result tuples are required.
1249  *
1250  * Must be called before inserting any tuples.  (Actually, we could allow it
1251  * as long as the sort hasn't spilled to disk, but there seems no need for
1252  * delayed calls at the moment.)
1253  *
1254  * This is a hint only. The tuplesort may still return more tuples than
1255  * requested.  Parallel leader tuplesorts will always ignore the hint.
1256  */
1257 void
tuplesort_set_bound(Tuplesortstate * state,int64 bound)1258 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
1259 {
1260 	/* Assert we're called before loading any tuples */
1261 	Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
1262 	/* Can't set the bound twice, either */
1263 	Assert(!state->bounded);
1264 	/* Also, this shouldn't be called in a parallel worker */
1265 	Assert(!WORKER(state));
1266 
1267 	/* Parallel leader allows but ignores hint */
1268 	if (LEADER(state))
1269 		return;
1270 
1271 #ifdef DEBUG_BOUNDED_SORT
1272 	/* Honor GUC setting that disables the feature (for easy testing) */
1273 	if (!optimize_bounded_sort)
1274 		return;
1275 #endif
1276 
1277 	/* We want to be able to compute bound * 2, so limit the setting */
1278 	if (bound > (int64) (INT_MAX / 2))
1279 		return;
1280 
1281 	state->bounded = true;
1282 	state->bound = (int) bound;
1283 
1284 	/*
1285 	 * Bounded sorts are not an effective target for abbreviated key
1286 	 * optimization.  Disable by setting state to be consistent with no
1287 	 * abbreviation support.
1288 	 */
1289 	state->sortKeys->abbrev_converter = NULL;
1290 	if (state->sortKeys->abbrev_full_comparator)
1291 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1292 
1293 	/* Not strictly necessary, but be tidy */
1294 	state->sortKeys->abbrev_abort = NULL;
1295 	state->sortKeys->abbrev_full_comparator = NULL;
1296 }
1297 
1298 /*
1299  * tuplesort_used_bound
1300  *
1301  * Allow callers to find out if the sort state was able to use a bound.
1302  */
1303 bool
tuplesort_used_bound(Tuplesortstate * state)1304 tuplesort_used_bound(Tuplesortstate *state)
1305 {
1306 	return state->boundUsed;
1307 }
1308 
1309 /*
1310  * tuplesort_free
1311  *
1312  *	Internal routine for freeing resources of tuplesort.
1313  */
1314 static void
tuplesort_free(Tuplesortstate * state)1315 tuplesort_free(Tuplesortstate *state)
1316 {
1317 	/* context swap probably not needed, but let's be safe */
1318 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1319 
1320 #ifdef TRACE_SORT
1321 	long		spaceUsed;
1322 
1323 	if (state->tapeset)
1324 		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
1325 	else
1326 		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1327 #endif
1328 
1329 	/*
1330 	 * Delete temporary "tape" files, if any.
1331 	 *
1332 	 * Note: want to include this in reported total cost of sort, hence need
1333 	 * for two #ifdef TRACE_SORT sections.
1334 	 */
1335 	if (state->tapeset)
1336 		LogicalTapeSetClose(state->tapeset);
1337 
1338 #ifdef TRACE_SORT
1339 	if (trace_sort)
1340 	{
1341 		if (state->tapeset)
1342 			elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1343 				 SERIAL(state) ? "external sort" : "parallel external sort",
1344 				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1345 		else
1346 			elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1347 				 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1348 				 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1349 	}
1350 
1351 	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1352 #else
1353 
1354 	/*
1355 	 * If you disabled TRACE_SORT, you can still probe sort__done, but you
1356 	 * ain't getting space-used stats.
1357 	 */
1358 	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1359 #endif
1360 
1361 	/* Free any execution state created for CLUSTER case */
1362 	if (state->estate != NULL)
1363 	{
1364 		ExprContext *econtext = GetPerTupleExprContext(state->estate);
1365 
1366 		ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
1367 		FreeExecutorState(state->estate);
1368 	}
1369 
1370 	MemoryContextSwitchTo(oldcontext);
1371 
1372 	/*
1373 	 * Free the per-sort memory context, thereby releasing all working memory.
1374 	 */
1375 	MemoryContextReset(state->sortcontext);
1376 }
1377 
1378 /*
1379  * tuplesort_end
1380  *
1381  *	Release resources and clean up.
1382  *
1383  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1384  * pointing to garbage.  Be careful not to attempt to use or free such
1385  * pointers afterwards!
1386  */
1387 void
tuplesort_end(Tuplesortstate * state)1388 tuplesort_end(Tuplesortstate *state)
1389 {
1390 	tuplesort_free(state);
1391 
1392 	/*
1393 	 * Free the main memory context, including the Tuplesortstate struct
1394 	 * itself.
1395 	 */
1396 	MemoryContextDelete(state->maincontext);
1397 }
1398 
1399 /*
1400  * tuplesort_updatemax
1401  *
1402  *	Update maximum resource usage statistics.
1403  */
1404 static void
tuplesort_updatemax(Tuplesortstate * state)1405 tuplesort_updatemax(Tuplesortstate *state)
1406 {
1407 	int64		spaceUsed;
1408 	bool		isSpaceDisk;
1409 
1410 	/*
1411 	 * Note: it might seem we should provide both memory and disk usage for a
1412 	 * disk-based sort.  However, the current code doesn't track memory space
1413 	 * accurately once we have begun to return tuples to the caller (since we
1414 	 * don't account for pfree's the caller is expected to do), so we cannot
1415 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
1416 	 * to fix.  Is it worth creating an API for the memory context code to
1417 	 * tell us how much is actually used in sortcontext?
1418 	 */
1419 	if (state->tapeset)
1420 	{
1421 		isSpaceDisk = true;
1422 		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1423 	}
1424 	else
1425 	{
1426 		isSpaceDisk = false;
1427 		spaceUsed = state->allowedMem - state->availMem;
1428 	}
1429 
1430 	/*
1431 	 * Sort evicts data to the disk when it wasn't able to fit that data into
1432 	 * main memory.  This is why we assume space used on the disk to be more
1433 	 * important for tracking resource usage than space used in memory. Note
1434 	 * that the amount of space occupied by some tupleset on the disk might be
1435 	 * less than amount of space occupied by the same tupleset in memory due
1436 	 * to more compact representation.
1437 	 */
1438 	if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1439 		(isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1440 	{
1441 		state->maxSpace = spaceUsed;
1442 		state->isMaxSpaceDisk = isSpaceDisk;
1443 		state->maxSpaceStatus = state->status;
1444 	}
1445 }
1446 
1447 /*
1448  * tuplesort_reset
1449  *
1450  *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
1451  *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
1452  *	a new sort.  This allows avoiding recreation of tuple sort states (and
1453  *	save resources) when sorting multiple small batches.
1454  */
1455 void
tuplesort_reset(Tuplesortstate * state)1456 tuplesort_reset(Tuplesortstate *state)
1457 {
1458 	tuplesort_updatemax(state);
1459 	tuplesort_free(state);
1460 
1461 	/*
1462 	 * After we've freed up per-batch memory, re-setup all of the state common
1463 	 * to both the first batch and any subsequent batch.
1464 	 */
1465 	tuplesort_begin_batch(state);
1466 
1467 	state->lastReturnedTuple = NULL;
1468 	state->slabMemoryBegin = NULL;
1469 	state->slabMemoryEnd = NULL;
1470 	state->slabFreeHead = NULL;
1471 }
1472 
1473 /*
1474  * Grow the memtuples[] array, if possible within our memory constraint.  We
1475  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1476  * limit.  Return true if we were able to enlarge the array, false if not.
1477  *
1478  * Normally, at each increment we double the size of the array.  When doing
1479  * that would exceed a limit, we attempt one last, smaller increase (and then
1480  * clear the growmemtuples flag so we don't try any more).  That allows us to
1481  * use memory as fully as permitted; sticking to the pure doubling rule could
1482  * result in almost half going unused.  Because availMem moves around with
1483  * tuple addition/removal, we need some rule to prevent making repeated small
1484  * increases in memtupsize, which would just be useless thrashing.  The
1485  * growmemtuples flag accomplishes that and also prevents useless
1486  * recalculations in this function.
1487  */
1488 static bool
grow_memtuples(Tuplesortstate * state)1489 grow_memtuples(Tuplesortstate *state)
1490 {
1491 	int			newmemtupsize;
1492 	int			memtupsize = state->memtupsize;
1493 	int64		memNowUsed = state->allowedMem - state->availMem;
1494 
1495 	/* Forget it if we've already maxed out memtuples, per comment above */
1496 	if (!state->growmemtuples)
1497 		return false;
1498 
1499 	/* Select new value of memtupsize */
1500 	if (memNowUsed <= state->availMem)
1501 	{
1502 		/*
1503 		 * We've used no more than half of allowedMem; double our usage,
1504 		 * clamping at INT_MAX tuples.
1505 		 */
1506 		if (memtupsize < INT_MAX / 2)
1507 			newmemtupsize = memtupsize * 2;
1508 		else
1509 		{
1510 			newmemtupsize = INT_MAX;
1511 			state->growmemtuples = false;
1512 		}
1513 	}
1514 	else
1515 	{
1516 		/*
1517 		 * This will be the last increment of memtupsize.  Abandon doubling
1518 		 * strategy and instead increase as much as we safely can.
1519 		 *
1520 		 * To stay within allowedMem, we can't increase memtupsize by more
1521 		 * than availMem / sizeof(SortTuple) elements.  In practice, we want
1522 		 * to increase it by considerably less, because we need to leave some
1523 		 * space for the tuples to which the new array slots will refer.  We
1524 		 * assume the new tuples will be about the same size as the tuples
1525 		 * we've already seen, and thus we can extrapolate from the space
1526 		 * consumption so far to estimate an appropriate new size for the
1527 		 * memtuples array.  The optimal value might be higher or lower than
1528 		 * this estimate, but it's hard to know that in advance.  We again
1529 		 * clamp at INT_MAX tuples.
1530 		 *
1531 		 * This calculation is safe against enlarging the array so much that
1532 		 * LACKMEM becomes true, because the memory currently used includes
1533 		 * the present array; thus, there would be enough allowedMem for the
1534 		 * new array elements even if no other memory were currently used.
1535 		 *
1536 		 * We do the arithmetic in float8, because otherwise the product of
1537 		 * memtupsize and allowedMem could overflow.  Any inaccuracy in the
1538 		 * result should be insignificant; but even if we computed a
1539 		 * completely insane result, the checks below will prevent anything
1540 		 * really bad from happening.
1541 		 */
1542 		double		grow_ratio;
1543 
1544 		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1545 		if (memtupsize * grow_ratio < INT_MAX)
1546 			newmemtupsize = (int) (memtupsize * grow_ratio);
1547 		else
1548 			newmemtupsize = INT_MAX;
1549 
1550 		/* We won't make any further enlargement attempts */
1551 		state->growmemtuples = false;
1552 	}
1553 
1554 	/* Must enlarge array by at least one element, else report failure */
1555 	if (newmemtupsize <= memtupsize)
1556 		goto noalloc;
1557 
1558 	/*
1559 	 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
1560 	 * to ensure our request won't be rejected.  Note that we can easily
1561 	 * exhaust address space before facing this outcome.  (This is presently
1562 	 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1563 	 * don't rely on that at this distance.)
1564 	 */
1565 	if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1566 	{
1567 		newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1568 		state->growmemtuples = false;	/* can't grow any more */
1569 	}
1570 
1571 	/*
1572 	 * We need to be sure that we do not cause LACKMEM to become true, else
1573 	 * the space management algorithm will go nuts.  The code above should
1574 	 * never generate a dangerous request, but to be safe, check explicitly
1575 	 * that the array growth fits within availMem.  (We could still cause
1576 	 * LACKMEM if the memory chunk overhead associated with the memtuples
1577 	 * array were to increase.  That shouldn't happen because we chose the
1578 	 * initial array size large enough to ensure that palloc will be treating
1579 	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
1580 	 * explicitly below just in case.)
1581 	 */
1582 	if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1583 		goto noalloc;
1584 
1585 	/* OK, do it */
1586 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1587 	state->memtupsize = newmemtupsize;
1588 	state->memtuples = (SortTuple *)
1589 		repalloc_huge(state->memtuples,
1590 					  state->memtupsize * sizeof(SortTuple));
1591 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1592 	if (LACKMEM(state))
1593 		elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1594 	return true;
1595 
1596 noalloc:
1597 	/* If for any reason we didn't realloc, shut off future attempts */
1598 	state->growmemtuples = false;
1599 	return false;
1600 }
1601 
1602 /*
1603  * Accept one tuple while collecting input data for sort.
1604  *
1605  * Note that the input data is always copied; the caller need not save it.
1606  */
1607 void
tuplesort_puttupleslot(Tuplesortstate * state,TupleTableSlot * slot)1608 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1609 {
1610 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1611 	SortTuple	stup;
1612 
1613 	/*
1614 	 * Copy the given tuple into memory we control, and decrease availMem.
1615 	 * Then call the common code.
1616 	 */
1617 	COPYTUP(state, &stup, (void *) slot);
1618 
1619 	puttuple_common(state, &stup);
1620 
1621 	MemoryContextSwitchTo(oldcontext);
1622 }
1623 
1624 /*
1625  * Accept one tuple while collecting input data for sort.
1626  *
1627  * Note that the input data is always copied; the caller need not save it.
1628  */
1629 void
tuplesort_putheaptuple(Tuplesortstate * state,HeapTuple tup)1630 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1631 {
1632 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1633 	SortTuple	stup;
1634 
1635 	/*
1636 	 * Copy the given tuple into memory we control, and decrease availMem.
1637 	 * Then call the common code.
1638 	 */
1639 	COPYTUP(state, &stup, (void *) tup);
1640 
1641 	puttuple_common(state, &stup);
1642 
1643 	MemoryContextSwitchTo(oldcontext);
1644 }
1645 
1646 /*
1647  * Collect one index tuple while collecting input data for sort, building
1648  * it from caller-supplied values.
1649  */
1650 void
tuplesort_putindextuplevalues(Tuplesortstate * state,Relation rel,ItemPointer self,Datum * values,bool * isnull)1651 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1652 							  ItemPointer self, Datum *values,
1653 							  bool *isnull)
1654 {
1655 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1656 	SortTuple	stup;
1657 	Datum		original;
1658 	IndexTuple	tuple;
1659 
1660 	stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1661 	tuple = ((IndexTuple) stup.tuple);
1662 	tuple->t_tid = *self;
1663 	USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1664 	/* set up first-column key value */
1665 	original = index_getattr(tuple,
1666 							 1,
1667 							 RelationGetDescr(state->indexRel),
1668 							 &stup.isnull1);
1669 
1670 	MemoryContextSwitchTo(state->sortcontext);
1671 
1672 	if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1673 	{
1674 		/*
1675 		 * Store ordinary Datum representation, or NULL value.  If there is a
1676 		 * converter it won't expect NULL values, and cost model is not
1677 		 * required to account for NULL, so in that case we avoid calling
1678 		 * converter and just set datum1 to zeroed representation (to be
1679 		 * consistent, and to support cheap inequality tests for NULL
1680 		 * abbreviated keys).
1681 		 */
1682 		stup.datum1 = original;
1683 	}
1684 	else if (!consider_abort_common(state))
1685 	{
1686 		/* Store abbreviated key representation */
1687 		stup.datum1 = state->sortKeys->abbrev_converter(original,
1688 														state->sortKeys);
1689 	}
1690 	else
1691 	{
1692 		/* Abort abbreviation */
1693 		int			i;
1694 
1695 		stup.datum1 = original;
1696 
1697 		/*
1698 		 * Set state to be consistent with never trying abbreviation.
1699 		 *
1700 		 * Alter datum1 representation in already-copied tuples, so as to
1701 		 * ensure a consistent representation (current tuple was just
1702 		 * handled).  It does not matter if some dumped tuples are already
1703 		 * sorted on tape, since serialized tuples lack abbreviated keys
1704 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1705 		 */
1706 		for (i = 0; i < state->memtupcount; i++)
1707 		{
1708 			SortTuple  *mtup = &state->memtuples[i];
1709 
1710 			tuple = mtup->tuple;
1711 			mtup->datum1 = index_getattr(tuple,
1712 										 1,
1713 										 RelationGetDescr(state->indexRel),
1714 										 &mtup->isnull1);
1715 		}
1716 	}
1717 
1718 	puttuple_common(state, &stup);
1719 
1720 	MemoryContextSwitchTo(oldcontext);
1721 }
1722 
1723 /*
1724  * Accept one Datum while collecting input data for sort.
1725  *
1726  * If the Datum is pass-by-ref type, the value will be copied.
1727  */
1728 void
tuplesort_putdatum(Tuplesortstate * state,Datum val,bool isNull)1729 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1730 {
1731 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1732 	SortTuple	stup;
1733 
1734 	/*
1735 	 * Pass-by-value types or null values are just stored directly in
1736 	 * stup.datum1 (and stup.tuple is not used and set to NULL).
1737 	 *
1738 	 * Non-null pass-by-reference values need to be copied into memory we
1739 	 * control, and possibly abbreviated. The copied value is pointed to by
1740 	 * stup.tuple and is treated as the canonical copy (e.g. to return via
1741 	 * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1742 	 * abbreviated value if abbreviation is happening, otherwise it's
1743 	 * identical to stup.tuple.
1744 	 */
1745 
1746 	if (isNull || !state->tuples)
1747 	{
1748 		/*
1749 		 * Set datum1 to zeroed representation for NULLs (to be consistent,
1750 		 * and to support cheap inequality tests for NULL abbreviated keys).
1751 		 */
1752 		stup.datum1 = !isNull ? val : (Datum) 0;
1753 		stup.isnull1 = isNull;
1754 		stup.tuple = NULL;		/* no separate storage */
1755 		MemoryContextSwitchTo(state->sortcontext);
1756 	}
1757 	else
1758 	{
1759 		Datum		original = datumCopy(val, false, state->datumTypeLen);
1760 
1761 		stup.isnull1 = false;
1762 		stup.tuple = DatumGetPointer(original);
1763 		USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1764 		MemoryContextSwitchTo(state->sortcontext);
1765 
1766 		if (!state->sortKeys->abbrev_converter)
1767 		{
1768 			stup.datum1 = original;
1769 		}
1770 		else if (!consider_abort_common(state))
1771 		{
1772 			/* Store abbreviated key representation */
1773 			stup.datum1 = state->sortKeys->abbrev_converter(original,
1774 															state->sortKeys);
1775 		}
1776 		else
1777 		{
1778 			/* Abort abbreviation */
1779 			int			i;
1780 
1781 			stup.datum1 = original;
1782 
1783 			/*
1784 			 * Set state to be consistent with never trying abbreviation.
1785 			 *
1786 			 * Alter datum1 representation in already-copied tuples, so as to
1787 			 * ensure a consistent representation (current tuple was just
1788 			 * handled).  It does not matter if some dumped tuples are already
1789 			 * sorted on tape, since serialized tuples lack abbreviated keys
1790 			 * (TSS_BUILDRUNS state prevents control reaching here in any
1791 			 * case).
1792 			 */
1793 			for (i = 0; i < state->memtupcount; i++)
1794 			{
1795 				SortTuple  *mtup = &state->memtuples[i];
1796 
1797 				mtup->datum1 = PointerGetDatum(mtup->tuple);
1798 			}
1799 		}
1800 	}
1801 
1802 	puttuple_common(state, &stup);
1803 
1804 	MemoryContextSwitchTo(oldcontext);
1805 }
1806 
1807 /*
1808  * Shared code for tuple and datum cases.
1809  */
1810 static void
puttuple_common(Tuplesortstate * state,SortTuple * tuple)1811 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1812 {
1813 	Assert(!LEADER(state));
1814 
1815 	switch (state->status)
1816 	{
1817 		case TSS_INITIAL:
1818 
1819 			/*
1820 			 * Save the tuple into the unsorted array.  First, grow the array
1821 			 * as needed.  Note that we try to grow the array when there is
1822 			 * still one free slot remaining --- if we fail, there'll still be
1823 			 * room to store the incoming tuple, and then we'll switch to
1824 			 * tape-based operation.
1825 			 */
1826 			if (state->memtupcount >= state->memtupsize - 1)
1827 			{
1828 				(void) grow_memtuples(state);
1829 				Assert(state->memtupcount < state->memtupsize);
1830 			}
1831 			state->memtuples[state->memtupcount++] = *tuple;
1832 
1833 			/*
1834 			 * Check if it's time to switch over to a bounded heapsort. We do
1835 			 * so if the input tuple count exceeds twice the desired tuple
1836 			 * count (this is a heuristic for where heapsort becomes cheaper
1837 			 * than a quicksort), or if we've just filled workMem and have
1838 			 * enough tuples to meet the bound.
1839 			 *
1840 			 * Note that once we enter TSS_BOUNDED state we will always try to
1841 			 * complete the sort that way.  In the worst case, if later input
1842 			 * tuples are larger than earlier ones, this might cause us to
1843 			 * exceed workMem significantly.
1844 			 */
1845 			if (state->bounded &&
1846 				(state->memtupcount > state->bound * 2 ||
1847 				 (state->memtupcount > state->bound && LACKMEM(state))))
1848 			{
1849 #ifdef TRACE_SORT
1850 				if (trace_sort)
1851 					elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1852 						 state->memtupcount,
1853 						 pg_rusage_show(&state->ru_start));
1854 #endif
1855 				make_bounded_heap(state);
1856 				return;
1857 			}
1858 
1859 			/*
1860 			 * Done if we still fit in available memory and have array slots.
1861 			 */
1862 			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1863 				return;
1864 
1865 			/*
1866 			 * Nope; time to switch to tape-based operation.
1867 			 */
1868 			inittapes(state, true);
1869 
1870 			/*
1871 			 * Dump all tuples.
1872 			 */
1873 			dumptuples(state, false);
1874 			break;
1875 
1876 		case TSS_BOUNDED:
1877 
1878 			/*
1879 			 * We don't want to grow the array here, so check whether the new
1880 			 * tuple can be discarded before putting it in.  This should be a
1881 			 * good speed optimization, too, since when there are many more
1882 			 * input tuples than the bound, most input tuples can be discarded
1883 			 * with just this one comparison.  Note that because we currently
1884 			 * have the sort direction reversed, we must check for <= not >=.
1885 			 */
1886 			if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1887 			{
1888 				/* new tuple <= top of the heap, so we can discard it */
1889 				free_sort_tuple(state, tuple);
1890 				CHECK_FOR_INTERRUPTS();
1891 			}
1892 			else
1893 			{
1894 				/* discard top of heap, replacing it with the new tuple */
1895 				free_sort_tuple(state, &state->memtuples[0]);
1896 				tuplesort_heap_replace_top(state, tuple);
1897 			}
1898 			break;
1899 
1900 		case TSS_BUILDRUNS:
1901 
1902 			/*
1903 			 * Save the tuple into the unsorted array (there must be space)
1904 			 */
1905 			state->memtuples[state->memtupcount++] = *tuple;
1906 
1907 			/*
1908 			 * If we are over the memory limit, dump all tuples.
1909 			 */
1910 			dumptuples(state, false);
1911 			break;
1912 
1913 		default:
1914 			elog(ERROR, "invalid tuplesort state");
1915 			break;
1916 	}
1917 }
1918 
1919 static bool
consider_abort_common(Tuplesortstate * state)1920 consider_abort_common(Tuplesortstate *state)
1921 {
1922 	Assert(state->sortKeys[0].abbrev_converter != NULL);
1923 	Assert(state->sortKeys[0].abbrev_abort != NULL);
1924 	Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
1925 
1926 	/*
1927 	 * Check effectiveness of abbreviation optimization.  Consider aborting
1928 	 * when still within memory limit.
1929 	 */
1930 	if (state->status == TSS_INITIAL &&
1931 		state->memtupcount >= state->abbrevNext)
1932 	{
1933 		state->abbrevNext *= 2;
1934 
1935 		/*
1936 		 * Check opclass-supplied abbreviation abort routine.  It may indicate
1937 		 * that abbreviation should not proceed.
1938 		 */
1939 		if (!state->sortKeys->abbrev_abort(state->memtupcount,
1940 										   state->sortKeys))
1941 			return false;
1942 
1943 		/*
1944 		 * Finally, restore authoritative comparator, and indicate that
1945 		 * abbreviation is not in play by setting abbrev_converter to NULL
1946 		 */
1947 		state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
1948 		state->sortKeys[0].abbrev_converter = NULL;
1949 		/* Not strictly necessary, but be tidy */
1950 		state->sortKeys[0].abbrev_abort = NULL;
1951 		state->sortKeys[0].abbrev_full_comparator = NULL;
1952 
1953 		/* Give up - expect original pass-by-value representation */
1954 		return true;
1955 	}
1956 
1957 	return false;
1958 }
1959 
1960 /*
1961  * All tuples have been provided; finish the sort.
1962  */
1963 void
tuplesort_performsort(Tuplesortstate * state)1964 tuplesort_performsort(Tuplesortstate *state)
1965 {
1966 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1967 
1968 #ifdef TRACE_SORT
1969 	if (trace_sort)
1970 		elog(LOG, "performsort of worker %d starting: %s",
1971 			 state->worker, pg_rusage_show(&state->ru_start));
1972 #endif
1973 
1974 	switch (state->status)
1975 	{
1976 		case TSS_INITIAL:
1977 
1978 			/*
1979 			 * We were able to accumulate all the tuples within the allowed
1980 			 * amount of memory, or leader to take over worker tapes
1981 			 */
1982 			if (SERIAL(state))
1983 			{
1984 				/* Just qsort 'em and we're done */
1985 				tuplesort_sort_memtuples(state);
1986 				state->status = TSS_SORTEDINMEM;
1987 			}
1988 			else if (WORKER(state))
1989 			{
1990 				/*
1991 				 * Parallel workers must still dump out tuples to tape.  No
1992 				 * merge is required to produce single output run, though.
1993 				 */
1994 				inittapes(state, false);
1995 				dumptuples(state, true);
1996 				worker_nomergeruns(state);
1997 				state->status = TSS_SORTEDONTAPE;
1998 			}
1999 			else
2000 			{
2001 				/*
2002 				 * Leader will take over worker tapes and merge worker runs.
2003 				 * Note that mergeruns sets the correct state->status.
2004 				 */
2005 				leader_takeover_tapes(state);
2006 				mergeruns(state);
2007 			}
2008 			state->current = 0;
2009 			state->eof_reached = false;
2010 			state->markpos_block = 0L;
2011 			state->markpos_offset = 0;
2012 			state->markpos_eof = false;
2013 			break;
2014 
2015 		case TSS_BOUNDED:
2016 
2017 			/*
2018 			 * We were able to accumulate all the tuples required for output
2019 			 * in memory, using a heap to eliminate excess tuples.  Now we
2020 			 * have to transform the heap to a properly-sorted array.
2021 			 */
2022 			sort_bounded_heap(state);
2023 			state->current = 0;
2024 			state->eof_reached = false;
2025 			state->markpos_offset = 0;
2026 			state->markpos_eof = false;
2027 			state->status = TSS_SORTEDINMEM;
2028 			break;
2029 
2030 		case TSS_BUILDRUNS:
2031 
2032 			/*
2033 			 * Finish tape-based sort.  First, flush all tuples remaining in
2034 			 * memory out to tape; then merge until we have a single remaining
2035 			 * run (or, if !randomAccess and !WORKER(), one run per tape).
2036 			 * Note that mergeruns sets the correct state->status.
2037 			 */
2038 			dumptuples(state, true);
2039 			mergeruns(state);
2040 			state->eof_reached = false;
2041 			state->markpos_block = 0L;
2042 			state->markpos_offset = 0;
2043 			state->markpos_eof = false;
2044 			break;
2045 
2046 		default:
2047 			elog(ERROR, "invalid tuplesort state");
2048 			break;
2049 	}
2050 
2051 #ifdef TRACE_SORT
2052 	if (trace_sort)
2053 	{
2054 		if (state->status == TSS_FINALMERGE)
2055 			elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
2056 				 state->worker, state->activeTapes,
2057 				 pg_rusage_show(&state->ru_start));
2058 		else
2059 			elog(LOG, "performsort of worker %d done: %s",
2060 				 state->worker, pg_rusage_show(&state->ru_start));
2061 	}
2062 #endif
2063 
2064 	MemoryContextSwitchTo(oldcontext);
2065 }
2066 
2067 /*
2068  * Internal routine to fetch the next tuple in either forward or back
2069  * direction into *stup.  Returns false if no more tuples.
2070  * Returned tuple belongs to tuplesort memory context, and must not be freed
2071  * by caller.  Note that fetched tuple is stored in memory that may be
2072  * recycled by any future fetch.
2073  */
2074 static bool
tuplesort_gettuple_common(Tuplesortstate * state,bool forward,SortTuple * stup)2075 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
2076 						  SortTuple *stup)
2077 {
2078 	unsigned int tuplen;
2079 	size_t		nmoved;
2080 
2081 	Assert(!WORKER(state));
2082 
2083 	switch (state->status)
2084 	{
2085 		case TSS_SORTEDINMEM:
2086 			Assert(forward || state->randomAccess);
2087 			Assert(!state->slabAllocatorUsed);
2088 			if (forward)
2089 			{
2090 				if (state->current < state->memtupcount)
2091 				{
2092 					*stup = state->memtuples[state->current++];
2093 					return true;
2094 				}
2095 				state->eof_reached = true;
2096 
2097 				/*
2098 				 * Complain if caller tries to retrieve more tuples than
2099 				 * originally asked for in a bounded sort.  This is because
2100 				 * returning EOF here might be the wrong thing.
2101 				 */
2102 				if (state->bounded && state->current >= state->bound)
2103 					elog(ERROR, "retrieved too many tuples in a bounded sort");
2104 
2105 				return false;
2106 			}
2107 			else
2108 			{
2109 				if (state->current <= 0)
2110 					return false;
2111 
2112 				/*
2113 				 * if all tuples are fetched already then we return last
2114 				 * tuple, else - tuple before last returned.
2115 				 */
2116 				if (state->eof_reached)
2117 					state->eof_reached = false;
2118 				else
2119 				{
2120 					state->current--;	/* last returned tuple */
2121 					if (state->current <= 0)
2122 						return false;
2123 				}
2124 				*stup = state->memtuples[state->current - 1];
2125 				return true;
2126 			}
2127 			break;
2128 
2129 		case TSS_SORTEDONTAPE:
2130 			Assert(forward || state->randomAccess);
2131 			Assert(state->slabAllocatorUsed);
2132 
2133 			/*
2134 			 * The slot that held the tuple that we returned in previous
2135 			 * gettuple call can now be reused.
2136 			 */
2137 			if (state->lastReturnedTuple)
2138 			{
2139 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2140 				state->lastReturnedTuple = NULL;
2141 			}
2142 
2143 			if (forward)
2144 			{
2145 				if (state->eof_reached)
2146 					return false;
2147 
2148 				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
2149 				{
2150 					READTUP(state, stup, state->result_tape, tuplen);
2151 
2152 					/*
2153 					 * Remember the tuple we return, so that we can recycle
2154 					 * its memory on next call.  (This can be NULL, in the
2155 					 * !state->tuples case).
2156 					 */
2157 					state->lastReturnedTuple = stup->tuple;
2158 
2159 					return true;
2160 				}
2161 				else
2162 				{
2163 					state->eof_reached = true;
2164 					return false;
2165 				}
2166 			}
2167 
2168 			/*
2169 			 * Backward.
2170 			 *
2171 			 * if all tuples are fetched already then we return last tuple,
2172 			 * else - tuple before last returned.
2173 			 */
2174 			if (state->eof_reached)
2175 			{
2176 				/*
2177 				 * Seek position is pointing just past the zero tuplen at the
2178 				 * end of file; back up to fetch last tuple's ending length
2179 				 * word.  If seek fails we must have a completely empty file.
2180 				 */
2181 				nmoved = LogicalTapeBackspace(state->tapeset,
2182 											  state->result_tape,
2183 											  2 * sizeof(unsigned int));
2184 				if (nmoved == 0)
2185 					return false;
2186 				else if (nmoved != 2 * sizeof(unsigned int))
2187 					elog(ERROR, "unexpected tape position");
2188 				state->eof_reached = false;
2189 			}
2190 			else
2191 			{
2192 				/*
2193 				 * Back up and fetch previously-returned tuple's ending length
2194 				 * word.  If seek fails, assume we are at start of file.
2195 				 */
2196 				nmoved = LogicalTapeBackspace(state->tapeset,
2197 											  state->result_tape,
2198 											  sizeof(unsigned int));
2199 				if (nmoved == 0)
2200 					return false;
2201 				else if (nmoved != sizeof(unsigned int))
2202 					elog(ERROR, "unexpected tape position");
2203 				tuplen = getlen(state, state->result_tape, false);
2204 
2205 				/*
2206 				 * Back up to get ending length word of tuple before it.
2207 				 */
2208 				nmoved = LogicalTapeBackspace(state->tapeset,
2209 											  state->result_tape,
2210 											  tuplen + 2 * sizeof(unsigned int));
2211 				if (nmoved == tuplen + sizeof(unsigned int))
2212 				{
2213 					/*
2214 					 * We backed up over the previous tuple, but there was no
2215 					 * ending length word before it.  That means that the prev
2216 					 * tuple is the first tuple in the file.  It is now the
2217 					 * next to read in forward direction (not obviously right,
2218 					 * but that is what in-memory case does).
2219 					 */
2220 					return false;
2221 				}
2222 				else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2223 					elog(ERROR, "bogus tuple length in backward scan");
2224 			}
2225 
2226 			tuplen = getlen(state, state->result_tape, false);
2227 
2228 			/*
2229 			 * Now we have the length of the prior tuple, back up and read it.
2230 			 * Note: READTUP expects we are positioned after the initial
2231 			 * length word of the tuple, so back up to that point.
2232 			 */
2233 			nmoved = LogicalTapeBackspace(state->tapeset,
2234 										  state->result_tape,
2235 										  tuplen);
2236 			if (nmoved != tuplen)
2237 				elog(ERROR, "bogus tuple length in backward scan");
2238 			READTUP(state, stup, state->result_tape, tuplen);
2239 
2240 			/*
2241 			 * Remember the tuple we return, so that we can recycle its memory
2242 			 * on next call. (This can be NULL, in the Datum case).
2243 			 */
2244 			state->lastReturnedTuple = stup->tuple;
2245 
2246 			return true;
2247 
2248 		case TSS_FINALMERGE:
2249 			Assert(forward);
2250 			/* We are managing memory ourselves, with the slab allocator. */
2251 			Assert(state->slabAllocatorUsed);
2252 
2253 			/*
2254 			 * The slab slot holding the tuple that we returned in previous
2255 			 * gettuple call can now be reused.
2256 			 */
2257 			if (state->lastReturnedTuple)
2258 			{
2259 				RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2260 				state->lastReturnedTuple = NULL;
2261 			}
2262 
2263 			/*
2264 			 * This code should match the inner loop of mergeonerun().
2265 			 */
2266 			if (state->memtupcount > 0)
2267 			{
2268 				int			srcTape = state->memtuples[0].srctape;
2269 				SortTuple	newtup;
2270 
2271 				*stup = state->memtuples[0];
2272 
2273 				/*
2274 				 * Remember the tuple we return, so that we can recycle its
2275 				 * memory on next call. (This can be NULL, in the Datum case).
2276 				 */
2277 				state->lastReturnedTuple = stup->tuple;
2278 
2279 				/*
2280 				 * Pull next tuple from tape, and replace the returned tuple
2281 				 * at top of the heap with it.
2282 				 */
2283 				if (!mergereadnext(state, srcTape, &newtup))
2284 				{
2285 					/*
2286 					 * If no more data, we've reached end of run on this tape.
2287 					 * Remove the top node from the heap.
2288 					 */
2289 					tuplesort_heap_delete_top(state);
2290 
2291 					/*
2292 					 * Rewind to free the read buffer.  It'd go away at the
2293 					 * end of the sort anyway, but better to release the
2294 					 * memory early.
2295 					 */
2296 					LogicalTapeRewindForWrite(state->tapeset, srcTape);
2297 					return true;
2298 				}
2299 				newtup.srctape = srcTape;
2300 				tuplesort_heap_replace_top(state, &newtup);
2301 				return true;
2302 			}
2303 			return false;
2304 
2305 		default:
2306 			elog(ERROR, "invalid tuplesort state");
2307 			return false;		/* keep compiler quiet */
2308 	}
2309 }
2310 
2311 /*
2312  * Fetch the next tuple in either forward or back direction.
2313  * If successful, put tuple in slot and return true; else, clear the slot
2314  * and return false.
2315  *
2316  * Caller may optionally be passed back abbreviated value (on true return
2317  * value) when abbreviation was used, which can be used to cheaply avoid
2318  * equality checks that might otherwise be required.  Caller can safely make a
2319  * determination of "non-equal tuple" based on simple binary inequality.  A
2320  * NULL value in leading attribute will set abbreviated value to zeroed
2321  * representation, which caller may rely on in abbreviated inequality check.
2322  *
2323  * If copy is true, the slot receives a tuple that's been copied into the
2324  * caller's memory context, so that it will stay valid regardless of future
2325  * manipulations of the tuplesort's state (up to and including deleting the
2326  * tuplesort).  If copy is false, the slot will just receive a pointer to a
2327  * tuple held within the tuplesort, which is more efficient, but only safe for
2328  * callers that are prepared to have any subsequent manipulation of the
2329  * tuplesort's state invalidate slot contents.
2330  */
2331 bool
tuplesort_gettupleslot(Tuplesortstate * state,bool forward,bool copy,TupleTableSlot * slot,Datum * abbrev)2332 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2333 					   TupleTableSlot *slot, Datum *abbrev)
2334 {
2335 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2336 	SortTuple	stup;
2337 
2338 	if (!tuplesort_gettuple_common(state, forward, &stup))
2339 		stup.tuple = NULL;
2340 
2341 	MemoryContextSwitchTo(oldcontext);
2342 
2343 	if (stup.tuple)
2344 	{
2345 		/* Record abbreviated key for caller */
2346 		if (state->sortKeys->abbrev_converter && abbrev)
2347 			*abbrev = stup.datum1;
2348 
2349 		if (copy)
2350 			stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2351 
2352 		ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2353 		return true;
2354 	}
2355 	else
2356 	{
2357 		ExecClearTuple(slot);
2358 		return false;
2359 	}
2360 }
2361 
2362 /*
2363  * Fetch the next tuple in either forward or back direction.
2364  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2365  * context, and must not be freed by caller.  Caller may not rely on tuple
2366  * remaining valid after any further manipulation of tuplesort.
2367  */
2368 HeapTuple
tuplesort_getheaptuple(Tuplesortstate * state,bool forward)2369 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2370 {
2371 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2372 	SortTuple	stup;
2373 
2374 	if (!tuplesort_gettuple_common(state, forward, &stup))
2375 		stup.tuple = NULL;
2376 
2377 	MemoryContextSwitchTo(oldcontext);
2378 
2379 	return stup.tuple;
2380 }
2381 
2382 /*
2383  * Fetch the next index tuple in either forward or back direction.
2384  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2385  * context, and must not be freed by caller.  Caller may not rely on tuple
2386  * remaining valid after any further manipulation of tuplesort.
2387  */
2388 IndexTuple
tuplesort_getindextuple(Tuplesortstate * state,bool forward)2389 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2390 {
2391 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2392 	SortTuple	stup;
2393 
2394 	if (!tuplesort_gettuple_common(state, forward, &stup))
2395 		stup.tuple = NULL;
2396 
2397 	MemoryContextSwitchTo(oldcontext);
2398 
2399 	return (IndexTuple) stup.tuple;
2400 }
2401 
2402 /*
2403  * Fetch the next Datum in either forward or back direction.
2404  * Returns false if no more datums.
2405  *
2406  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2407  * in caller's context, and is now owned by the caller (this differs from
2408  * similar routines for other types of tuplesorts).
2409  *
2410  * Caller may optionally be passed back abbreviated value (on true return
2411  * value) when abbreviation was used, which can be used to cheaply avoid
2412  * equality checks that might otherwise be required.  Caller can safely make a
2413  * determination of "non-equal tuple" based on simple binary inequality.  A
2414  * NULL value will have a zeroed abbreviated value representation, which caller
2415  * may rely on in abbreviated inequality check.
2416  */
2417 bool
tuplesort_getdatum(Tuplesortstate * state,bool forward,Datum * val,bool * isNull,Datum * abbrev)2418 tuplesort_getdatum(Tuplesortstate *state, bool forward,
2419 				   Datum *val, bool *isNull, Datum *abbrev)
2420 {
2421 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2422 	SortTuple	stup;
2423 
2424 	if (!tuplesort_gettuple_common(state, forward, &stup))
2425 	{
2426 		MemoryContextSwitchTo(oldcontext);
2427 		return false;
2428 	}
2429 
2430 	/* Ensure we copy into caller's memory context */
2431 	MemoryContextSwitchTo(oldcontext);
2432 
2433 	/* Record abbreviated key for caller */
2434 	if (state->sortKeys->abbrev_converter && abbrev)
2435 		*abbrev = stup.datum1;
2436 
2437 	if (stup.isnull1 || !state->tuples)
2438 	{
2439 		*val = stup.datum1;
2440 		*isNull = stup.isnull1;
2441 	}
2442 	else
2443 	{
2444 		/* use stup.tuple because stup.datum1 may be an abbreviation */
2445 		*val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2446 		*isNull = false;
2447 	}
2448 
2449 	return true;
2450 }
2451 
2452 /*
2453  * Advance over N tuples in either forward or back direction,
2454  * without returning any data.  N==0 is a no-op.
2455  * Returns true if successful, false if ran out of tuples.
2456  */
2457 bool
tuplesort_skiptuples(Tuplesortstate * state,int64 ntuples,bool forward)2458 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2459 {
2460 	MemoryContext oldcontext;
2461 
2462 	/*
2463 	 * We don't actually support backwards skip yet, because no callers need
2464 	 * it.  The API is designed to allow for that later, though.
2465 	 */
2466 	Assert(forward);
2467 	Assert(ntuples >= 0);
2468 	Assert(!WORKER(state));
2469 
2470 	switch (state->status)
2471 	{
2472 		case TSS_SORTEDINMEM:
2473 			if (state->memtupcount - state->current >= ntuples)
2474 			{
2475 				state->current += ntuples;
2476 				return true;
2477 			}
2478 			state->current = state->memtupcount;
2479 			state->eof_reached = true;
2480 
2481 			/*
2482 			 * Complain if caller tries to retrieve more tuples than
2483 			 * originally asked for in a bounded sort.  This is because
2484 			 * returning EOF here might be the wrong thing.
2485 			 */
2486 			if (state->bounded && state->current >= state->bound)
2487 				elog(ERROR, "retrieved too many tuples in a bounded sort");
2488 
2489 			return false;
2490 
2491 		case TSS_SORTEDONTAPE:
2492 		case TSS_FINALMERGE:
2493 
2494 			/*
2495 			 * We could probably optimize these cases better, but for now it's
2496 			 * not worth the trouble.
2497 			 */
2498 			oldcontext = MemoryContextSwitchTo(state->sortcontext);
2499 			while (ntuples-- > 0)
2500 			{
2501 				SortTuple	stup;
2502 
2503 				if (!tuplesort_gettuple_common(state, forward, &stup))
2504 				{
2505 					MemoryContextSwitchTo(oldcontext);
2506 					return false;
2507 				}
2508 				CHECK_FOR_INTERRUPTS();
2509 			}
2510 			MemoryContextSwitchTo(oldcontext);
2511 			return true;
2512 
2513 		default:
2514 			elog(ERROR, "invalid tuplesort state");
2515 			return false;		/* keep compiler quiet */
2516 	}
2517 }
2518 
2519 /*
2520  * tuplesort_merge_order - report merge order we'll use for given memory
2521  * (note: "merge order" just means the number of input tapes in the merge).
2522  *
2523  * This is exported for use by the planner.  allowedMem is in bytes.
2524  */
2525 int
tuplesort_merge_order(int64 allowedMem)2526 tuplesort_merge_order(int64 allowedMem)
2527 {
2528 	int			mOrder;
2529 
2530 	/*
2531 	 * We need one tape for each merge input, plus another one for the output,
2532 	 * and each of these tapes needs buffer space.  In addition we want
2533 	 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2534 	 * count).
2535 	 *
2536 	 * Note: you might be thinking we need to account for the memtuples[]
2537 	 * array in this calculation, but we effectively treat that as part of the
2538 	 * MERGE_BUFFER_SIZE workspace.
2539 	 */
2540 	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2541 		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
2542 
2543 	/*
2544 	 * Even in minimum memory, use at least a MINORDER merge.  On the other
2545 	 * hand, even when we have lots of memory, do not use more than a MAXORDER
2546 	 * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
2547 	 * additional tape reduces the amount of memory available to build runs,
2548 	 * which in turn can cause the same sort to need more runs, which makes
2549 	 * merging slower even if it can still be done in a single pass.  Also,
2550 	 * high order merges are quite slow due to CPU cache effects; it can be
2551 	 * faster to pay the I/O cost of a polyphase merge than to perform a
2552 	 * single merge pass across many hundreds of tapes.
2553 	 */
2554 	mOrder = Max(mOrder, MINORDER);
2555 	mOrder = Min(mOrder, MAXORDER);
2556 
2557 	return mOrder;
2558 }
2559 
2560 /*
2561  * inittapes - initialize for tape sorting.
2562  *
2563  * This is called only if we have found we won't sort in memory.
2564  */
2565 static void
inittapes(Tuplesortstate * state,bool mergeruns)2566 inittapes(Tuplesortstate *state, bool mergeruns)
2567 {
2568 	int			maxTapes,
2569 				j;
2570 
2571 	Assert(!LEADER(state));
2572 
2573 	if (mergeruns)
2574 	{
2575 		/* Compute number of tapes to use: merge order plus 1 */
2576 		maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2577 	}
2578 	else
2579 	{
2580 		/* Workers can sometimes produce single run, output without merge */
2581 		Assert(WORKER(state));
2582 		maxTapes = MINORDER + 1;
2583 	}
2584 
2585 #ifdef TRACE_SORT
2586 	if (trace_sort)
2587 		elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2588 			 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2589 #endif
2590 
2591 	/* Create the tape set and allocate the per-tape data arrays */
2592 	inittapestate(state, maxTapes);
2593 	state->tapeset =
2594 		LogicalTapeSetCreate(maxTapes, false, NULL,
2595 							 state->shared ? &state->shared->fileset : NULL,
2596 							 state->worker);
2597 
2598 	state->currentRun = 0;
2599 
2600 	/*
2601 	 * Initialize variables of Algorithm D (step D1).
2602 	 */
2603 	for (j = 0; j < maxTapes; j++)
2604 	{
2605 		state->tp_fib[j] = 1;
2606 		state->tp_runs[j] = 0;
2607 		state->tp_dummy[j] = 1;
2608 		state->tp_tapenum[j] = j;
2609 	}
2610 	state->tp_fib[state->tapeRange] = 0;
2611 	state->tp_dummy[state->tapeRange] = 0;
2612 
2613 	state->Level = 1;
2614 	state->destTape = 0;
2615 
2616 	state->status = TSS_BUILDRUNS;
2617 }
2618 
2619 /*
2620  * inittapestate - initialize generic tape management state
2621  */
2622 static void
inittapestate(Tuplesortstate * state,int maxTapes)2623 inittapestate(Tuplesortstate *state, int maxTapes)
2624 {
2625 	int64		tapeSpace;
2626 
2627 	/*
2628 	 * Decrease availMem to reflect the space needed for tape buffers; but
2629 	 * don't decrease it to the point that we have no room for tuples. (That
2630 	 * case is only likely to occur if sorting pass-by-value Datums; in all
2631 	 * other scenarios the memtuples[] array is unlikely to occupy more than
2632 	 * half of allowedMem.  In the pass-by-value case it's not important to
2633 	 * account for tuple space, so we don't care if LACKMEM becomes
2634 	 * inaccurate.)
2635 	 */
2636 	tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2637 
2638 	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2639 		USEMEM(state, tapeSpace);
2640 
2641 	/*
2642 	 * Make sure that the temp file(s) underlying the tape set are created in
2643 	 * suitable temp tablespaces.  For parallel sorts, this should have been
2644 	 * called already, but it doesn't matter if it is called a second time.
2645 	 */
2646 	PrepareTempTablespaces();
2647 
2648 	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2649 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2650 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2651 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2652 	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2653 
2654 	/* Record # of tapes allocated (for duration of sort) */
2655 	state->maxTapes = maxTapes;
2656 	/* Record maximum # of tapes usable as inputs when merging */
2657 	state->tapeRange = maxTapes - 1;
2658 }
2659 
2660 /*
2661  * selectnewtape -- select new tape for new initial run.
2662  *
2663  * This is called after finishing a run when we know another run
2664  * must be started.  This implements steps D3, D4 of Algorithm D.
2665  */
2666 static void
selectnewtape(Tuplesortstate * state)2667 selectnewtape(Tuplesortstate *state)
2668 {
2669 	int			j;
2670 	int			a;
2671 
2672 	/* Step D3: advance j (destTape) */
2673 	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2674 	{
2675 		state->destTape++;
2676 		return;
2677 	}
2678 	if (state->tp_dummy[state->destTape] != 0)
2679 	{
2680 		state->destTape = 0;
2681 		return;
2682 	}
2683 
2684 	/* Step D4: increase level */
2685 	state->Level++;
2686 	a = state->tp_fib[0];
2687 	for (j = 0; j < state->tapeRange; j++)
2688 	{
2689 		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2690 		state->tp_fib[j] = a + state->tp_fib[j + 1];
2691 	}
2692 	state->destTape = 0;
2693 }
2694 
2695 /*
2696  * Initialize the slab allocation arena, for the given number of slots.
2697  */
2698 static void
init_slab_allocator(Tuplesortstate * state,int numSlots)2699 init_slab_allocator(Tuplesortstate *state, int numSlots)
2700 {
2701 	if (numSlots > 0)
2702 	{
2703 		char	   *p;
2704 		int			i;
2705 
2706 		state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2707 		state->slabMemoryEnd = state->slabMemoryBegin +
2708 			numSlots * SLAB_SLOT_SIZE;
2709 		state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2710 		USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2711 
2712 		p = state->slabMemoryBegin;
2713 		for (i = 0; i < numSlots - 1; i++)
2714 		{
2715 			((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2716 			p += SLAB_SLOT_SIZE;
2717 		}
2718 		((SlabSlot *) p)->nextfree = NULL;
2719 	}
2720 	else
2721 	{
2722 		state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2723 		state->slabFreeHead = NULL;
2724 	}
2725 	state->slabAllocatorUsed = true;
2726 }
2727 
2728 /*
2729  * mergeruns -- merge all the completed initial runs.
2730  *
2731  * This implements steps D5, D6 of Algorithm D.  All input data has
2732  * already been written to initial runs on tape (see dumptuples).
2733  */
2734 static void
mergeruns(Tuplesortstate * state)2735 mergeruns(Tuplesortstate *state)
2736 {
2737 	int			tapenum,
2738 				svTape,
2739 				svRuns,
2740 				svDummy;
2741 	int			numTapes;
2742 	int			numInputTapes;
2743 
2744 	Assert(state->status == TSS_BUILDRUNS);
2745 	Assert(state->memtupcount == 0);
2746 
2747 	if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2748 	{
2749 		/*
2750 		 * If there are multiple runs to be merged, when we go to read back
2751 		 * tuples from disk, abbreviated keys will not have been stored, and
2752 		 * we don't care to regenerate them.  Disable abbreviation from this
2753 		 * point on.
2754 		 */
2755 		state->sortKeys->abbrev_converter = NULL;
2756 		state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2757 
2758 		/* Not strictly necessary, but be tidy */
2759 		state->sortKeys->abbrev_abort = NULL;
2760 		state->sortKeys->abbrev_full_comparator = NULL;
2761 	}
2762 
2763 	/*
2764 	 * Reset tuple memory.  We've freed all the tuples that we previously
2765 	 * allocated.  We will use the slab allocator from now on.
2766 	 */
2767 	MemoryContextResetOnly(state->tuplecontext);
2768 
2769 	/*
2770 	 * We no longer need a large memtuples array.  (We will allocate a smaller
2771 	 * one for the heap later.)
2772 	 */
2773 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2774 	pfree(state->memtuples);
2775 	state->memtuples = NULL;
2776 
2777 	/*
2778 	 * If we had fewer runs than tapes, refund the memory that we imagined we
2779 	 * would need for the tape buffers of the unused tapes.
2780 	 *
2781 	 * numTapes and numInputTapes reflect the actual number of tapes we will
2782 	 * use.  Note that the output tape's tape number is maxTapes - 1, so the
2783 	 * tape numbers of the used tapes are not consecutive, and you cannot just
2784 	 * loop from 0 to numTapes to visit all used tapes!
2785 	 */
2786 	if (state->Level == 1)
2787 	{
2788 		numInputTapes = state->currentRun;
2789 		numTapes = numInputTapes + 1;
2790 		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2791 	}
2792 	else
2793 	{
2794 		numInputTapes = state->tapeRange;
2795 		numTapes = state->maxTapes;
2796 	}
2797 
2798 	/*
2799 	 * Initialize the slab allocator.  We need one slab slot per input tape,
2800 	 * for the tuples in the heap, plus one to hold the tuple last returned
2801 	 * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
2802 	 * however, we don't need to do allocate anything.)
2803 	 *
2804 	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2805 	 * to track memory usage of individual tuples.
2806 	 */
2807 	if (state->tuples)
2808 		init_slab_allocator(state, numInputTapes + 1);
2809 	else
2810 		init_slab_allocator(state, 0);
2811 
2812 	/*
2813 	 * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
2814 	 * from each input tape.
2815 	 */
2816 	state->memtupsize = numInputTapes;
2817 	state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
2818 														numInputTapes * sizeof(SortTuple));
2819 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2820 
2821 	/*
2822 	 * Use all the remaining memory we have available for read buffers among
2823 	 * the input tapes.
2824 	 *
2825 	 * We don't try to "rebalance" the memory among tapes, when we start a new
2826 	 * merge phase, even if some tapes are inactive in the new phase.  That
2827 	 * would be hard, because logtape.c doesn't know where one run ends and
2828 	 * another begins.  When a new merge phase begins, and a tape doesn't
2829 	 * participate in it, its buffer nevertheless already contains tuples from
2830 	 * the next run on same tape, so we cannot release the buffer.  That's OK
2831 	 * in practice, merge performance isn't that sensitive to the amount of
2832 	 * buffers used, and most merge phases use all or almost all tapes,
2833 	 * anyway.
2834 	 */
2835 #ifdef TRACE_SORT
2836 	if (trace_sort)
2837 		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2838 			 state->worker, state->availMem / 1024, numInputTapes);
2839 #endif
2840 
2841 	state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2842 	USEMEM(state, state->read_buffer_size * numInputTapes);
2843 
2844 	/* End of step D2: rewind all output tapes to prepare for merging */
2845 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2846 		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2847 
2848 	for (;;)
2849 	{
2850 		/*
2851 		 * At this point we know that tape[T] is empty.  If there's just one
2852 		 * (real or dummy) run left on each input tape, then only one merge
2853 		 * pass remains.  If we don't have to produce a materialized sorted
2854 		 * tape, we can stop at this point and do the final merge on-the-fly.
2855 		 */
2856 		if (!state->randomAccess && !WORKER(state))
2857 		{
2858 			bool		allOneRun = true;
2859 
2860 			Assert(state->tp_runs[state->tapeRange] == 0);
2861 			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2862 			{
2863 				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2864 				{
2865 					allOneRun = false;
2866 					break;
2867 				}
2868 			}
2869 			if (allOneRun)
2870 			{
2871 				/* Tell logtape.c we won't be writing anymore */
2872 				LogicalTapeSetForgetFreeSpace(state->tapeset);
2873 				/* Initialize for the final merge pass */
2874 				beginmerge(state);
2875 				state->status = TSS_FINALMERGE;
2876 				return;
2877 			}
2878 		}
2879 
2880 		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
2881 		while (state->tp_runs[state->tapeRange - 1] ||
2882 			   state->tp_dummy[state->tapeRange - 1])
2883 		{
2884 			bool		allDummy = true;
2885 
2886 			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2887 			{
2888 				if (state->tp_dummy[tapenum] == 0)
2889 				{
2890 					allDummy = false;
2891 					break;
2892 				}
2893 			}
2894 
2895 			if (allDummy)
2896 			{
2897 				state->tp_dummy[state->tapeRange]++;
2898 				for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2899 					state->tp_dummy[tapenum]--;
2900 			}
2901 			else
2902 				mergeonerun(state);
2903 		}
2904 
2905 		/* Step D6: decrease level */
2906 		if (--state->Level == 0)
2907 			break;
2908 		/* rewind output tape T to use as new input */
2909 		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2910 								 state->read_buffer_size);
2911 		/* rewind used-up input tape P, and prepare it for write pass */
2912 		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2913 		state->tp_runs[state->tapeRange - 1] = 0;
2914 
2915 		/*
2916 		 * reassign tape units per step D6; note we no longer care about A[]
2917 		 */
2918 		svTape = state->tp_tapenum[state->tapeRange];
2919 		svDummy = state->tp_dummy[state->tapeRange];
2920 		svRuns = state->tp_runs[state->tapeRange];
2921 		for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2922 		{
2923 			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
2924 			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
2925 			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
2926 		}
2927 		state->tp_tapenum[0] = svTape;
2928 		state->tp_dummy[0] = svDummy;
2929 		state->tp_runs[0] = svRuns;
2930 	}
2931 
2932 	/*
2933 	 * Done.  Knuth says that the result is on TAPE[1], but since we exited
2934 	 * the loop without performing the last iteration of step D6, we have not
2935 	 * rearranged the tape unit assignment, and therefore the result is on
2936 	 * TAPE[T].  We need to do it this way so that we can freeze the final
2937 	 * output tape while rewinding it.  The last iteration of step D6 would be
2938 	 * a waste of cycles anyway...
2939 	 */
2940 	state->result_tape = state->tp_tapenum[state->tapeRange];
2941 	if (!WORKER(state))
2942 		LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
2943 	else
2944 		worker_freeze_result_tape(state);
2945 	state->status = TSS_SORTEDONTAPE;
2946 
2947 	/* Release the read buffers of all the other tapes, by rewinding them. */
2948 	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
2949 	{
2950 		if (tapenum != state->result_tape)
2951 			LogicalTapeRewindForWrite(state->tapeset, tapenum);
2952 	}
2953 }
2954 
2955 /*
2956  * Merge one run from each input tape, except ones with dummy runs.
2957  *
2958  * This is the inner loop of Algorithm D step D5.  We know that the
2959  * output tape is TAPE[T].
2960  */
2961 static void
mergeonerun(Tuplesortstate * state)2962 mergeonerun(Tuplesortstate *state)
2963 {
2964 	int			destTape = state->tp_tapenum[state->tapeRange];
2965 	int			srcTape;
2966 
2967 	/*
2968 	 * Start the merge by loading one tuple from each active source tape into
2969 	 * the heap.  We can also decrease the input run/dummy run counts.
2970 	 */
2971 	beginmerge(state);
2972 
2973 	/*
2974 	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2975 	 * out, and replacing it with next tuple from same tape (if there is
2976 	 * another one).
2977 	 */
2978 	while (state->memtupcount > 0)
2979 	{
2980 		SortTuple	stup;
2981 
2982 		/* write the tuple to destTape */
2983 		srcTape = state->memtuples[0].srctape;
2984 		WRITETUP(state, destTape, &state->memtuples[0]);
2985 
2986 		/* recycle the slot of the tuple we just wrote out, for the next read */
2987 		if (state->memtuples[0].tuple)
2988 			RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2989 
2990 		/*
2991 		 * pull next tuple from the tape, and replace the written-out tuple in
2992 		 * the heap with it.
2993 		 */
2994 		if (mergereadnext(state, srcTape, &stup))
2995 		{
2996 			stup.srctape = srcTape;
2997 			tuplesort_heap_replace_top(state, &stup);
2998 		}
2999 		else
3000 			tuplesort_heap_delete_top(state);
3001 	}
3002 
3003 	/*
3004 	 * When the heap empties, we're done.  Write an end-of-run marker on the
3005 	 * output tape, and increment its count of real runs.
3006 	 */
3007 	markrunend(state, destTape);
3008 	state->tp_runs[state->tapeRange]++;
3009 
3010 #ifdef TRACE_SORT
3011 	if (trace_sort)
3012 		elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
3013 			 state->activeTapes, pg_rusage_show(&state->ru_start));
3014 #endif
3015 }
3016 
3017 /*
3018  * beginmerge - initialize for a merge pass
3019  *
3020  * We decrease the counts of real and dummy runs for each tape, and mark
3021  * which tapes contain active input runs in mergeactive[].  Then, fill the
3022  * merge heap with the first tuple from each active tape.
3023  */
3024 static void
beginmerge(Tuplesortstate * state)3025 beginmerge(Tuplesortstate *state)
3026 {
3027 	int			activeTapes;
3028 	int			tapenum;
3029 	int			srcTape;
3030 
3031 	/* Heap should be empty here */
3032 	Assert(state->memtupcount == 0);
3033 
3034 	/* Adjust run counts and mark the active tapes */
3035 	memset(state->mergeactive, 0,
3036 		   state->maxTapes * sizeof(*state->mergeactive));
3037 	activeTapes = 0;
3038 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
3039 	{
3040 		if (state->tp_dummy[tapenum] > 0)
3041 			state->tp_dummy[tapenum]--;
3042 		else
3043 		{
3044 			Assert(state->tp_runs[tapenum] > 0);
3045 			state->tp_runs[tapenum]--;
3046 			srcTape = state->tp_tapenum[tapenum];
3047 			state->mergeactive[srcTape] = true;
3048 			activeTapes++;
3049 		}
3050 	}
3051 	Assert(activeTapes > 0);
3052 	state->activeTapes = activeTapes;
3053 
3054 	/* Load the merge heap with the first tuple from each input tape */
3055 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
3056 	{
3057 		SortTuple	tup;
3058 
3059 		if (mergereadnext(state, srcTape, &tup))
3060 		{
3061 			tup.srctape = srcTape;
3062 			tuplesort_heap_insert(state, &tup);
3063 		}
3064 	}
3065 }
3066 
3067 /*
3068  * mergereadnext - read next tuple from one merge input tape
3069  *
3070  * Returns false on EOF.
3071  */
3072 static bool
mergereadnext(Tuplesortstate * state,int srcTape,SortTuple * stup)3073 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
3074 {
3075 	unsigned int tuplen;
3076 
3077 	if (!state->mergeactive[srcTape])
3078 		return false;			/* tape's run is already exhausted */
3079 
3080 	/* read next tuple, if any */
3081 	if ((tuplen = getlen(state, srcTape, true)) == 0)
3082 	{
3083 		state->mergeactive[srcTape] = false;
3084 		return false;
3085 	}
3086 	READTUP(state, stup, srcTape, tuplen);
3087 
3088 	return true;
3089 }
3090 
3091 /*
3092  * dumptuples - remove tuples from memtuples and write initial run to tape
3093  *
3094  * When alltuples = true, dump everything currently in memory.  (This case is
3095  * only used at end of input data.)
3096  */
3097 static void
dumptuples(Tuplesortstate * state,bool alltuples)3098 dumptuples(Tuplesortstate *state, bool alltuples)
3099 {
3100 	int			memtupwrite;
3101 	int			i;
3102 
3103 	/*
3104 	 * Nothing to do if we still fit in available memory and have array slots,
3105 	 * unless this is the final call during initial run generation.
3106 	 */
3107 	if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
3108 		!alltuples)
3109 		return;
3110 
3111 	/*
3112 	 * Final call might require no sorting, in rare cases where we just so
3113 	 * happen to have previously LACKMEM()'d at the point where exactly all
3114 	 * remaining tuples are loaded into memory, just before input was
3115 	 * exhausted.
3116 	 *
3117 	 * In general, short final runs are quite possible.  Rather than allowing
3118 	 * a special case where there was a superfluous selectnewtape() call (i.e.
3119 	 * a call with no subsequent run actually written to destTape), we prefer
3120 	 * to write out a 0 tuple run.
3121 	 *
3122 	 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
3123 	 * the tape inactive for the merge when called from beginmerge().  This
3124 	 * case is therefore similar to the case where mergeonerun() finds a dummy
3125 	 * run for the tape, and so doesn't need to merge a run from the tape (or
3126 	 * conceptually "merges" the dummy run, if you prefer).  According to
3127 	 * Knuth, Algorithm D "isn't strictly optimal" in its method of
3128 	 * distribution and dummy run assignment; this edge case seems very
3129 	 * unlikely to make that appreciably worse.
3130 	 */
3131 	Assert(state->status == TSS_BUILDRUNS);
3132 
3133 	/*
3134 	 * It seems unlikely that this limit will ever be exceeded, but take no
3135 	 * chances
3136 	 */
3137 	if (state->currentRun == INT_MAX)
3138 		ereport(ERROR,
3139 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
3140 				 errmsg("cannot have more than %d runs for an external sort",
3141 						INT_MAX)));
3142 
3143 	state->currentRun++;
3144 
3145 #ifdef TRACE_SORT
3146 	if (trace_sort)
3147 		elog(LOG, "worker %d starting quicksort of run %d: %s",
3148 			 state->worker, state->currentRun,
3149 			 pg_rusage_show(&state->ru_start));
3150 #endif
3151 
3152 	/*
3153 	 * Sort all tuples accumulated within the allowed amount of memory for
3154 	 * this run using quicksort
3155 	 */
3156 	tuplesort_sort_memtuples(state);
3157 
3158 #ifdef TRACE_SORT
3159 	if (trace_sort)
3160 		elog(LOG, "worker %d finished quicksort of run %d: %s",
3161 			 state->worker, state->currentRun,
3162 			 pg_rusage_show(&state->ru_start));
3163 #endif
3164 
3165 	memtupwrite = state->memtupcount;
3166 	for (i = 0; i < memtupwrite; i++)
3167 	{
3168 		WRITETUP(state, state->tp_tapenum[state->destTape],
3169 				 &state->memtuples[i]);
3170 		state->memtupcount--;
3171 	}
3172 
3173 	/*
3174 	 * Reset tuple memory.  We've freed all of the tuples that we previously
3175 	 * allocated.  It's important to avoid fragmentation when there is a stark
3176 	 * change in the sizes of incoming tuples.  Fragmentation due to
3177 	 * AllocSetFree's bucketing by size class might be particularly bad if
3178 	 * this step wasn't taken.
3179 	 */
3180 	MemoryContextReset(state->tuplecontext);
3181 
3182 	markrunend(state, state->tp_tapenum[state->destTape]);
3183 	state->tp_runs[state->destTape]++;
3184 	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3185 
3186 #ifdef TRACE_SORT
3187 	if (trace_sort)
3188 		elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3189 			 state->worker, state->currentRun, state->destTape,
3190 			 pg_rusage_show(&state->ru_start));
3191 #endif
3192 
3193 	if (!alltuples)
3194 		selectnewtape(state);
3195 }
3196 
3197 /*
3198  * tuplesort_rescan		- rewind and replay the scan
3199  */
3200 void
tuplesort_rescan(Tuplesortstate * state)3201 tuplesort_rescan(Tuplesortstate *state)
3202 {
3203 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3204 
3205 	Assert(state->randomAccess);
3206 
3207 	switch (state->status)
3208 	{
3209 		case TSS_SORTEDINMEM:
3210 			state->current = 0;
3211 			state->eof_reached = false;
3212 			state->markpos_offset = 0;
3213 			state->markpos_eof = false;
3214 			break;
3215 		case TSS_SORTEDONTAPE:
3216 			LogicalTapeRewindForRead(state->tapeset,
3217 									 state->result_tape,
3218 									 0);
3219 			state->eof_reached = false;
3220 			state->markpos_block = 0L;
3221 			state->markpos_offset = 0;
3222 			state->markpos_eof = false;
3223 			break;
3224 		default:
3225 			elog(ERROR, "invalid tuplesort state");
3226 			break;
3227 	}
3228 
3229 	MemoryContextSwitchTo(oldcontext);
3230 }
3231 
3232 /*
3233  * tuplesort_markpos	- saves current position in the merged sort file
3234  */
3235 void
tuplesort_markpos(Tuplesortstate * state)3236 tuplesort_markpos(Tuplesortstate *state)
3237 {
3238 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3239 
3240 	Assert(state->randomAccess);
3241 
3242 	switch (state->status)
3243 	{
3244 		case TSS_SORTEDINMEM:
3245 			state->markpos_offset = state->current;
3246 			state->markpos_eof = state->eof_reached;
3247 			break;
3248 		case TSS_SORTEDONTAPE:
3249 			LogicalTapeTell(state->tapeset,
3250 							state->result_tape,
3251 							&state->markpos_block,
3252 							&state->markpos_offset);
3253 			state->markpos_eof = state->eof_reached;
3254 			break;
3255 		default:
3256 			elog(ERROR, "invalid tuplesort state");
3257 			break;
3258 	}
3259 
3260 	MemoryContextSwitchTo(oldcontext);
3261 }
3262 
3263 /*
3264  * tuplesort_restorepos - restores current position in merged sort file to
3265  *						  last saved position
3266  */
3267 void
tuplesort_restorepos(Tuplesortstate * state)3268 tuplesort_restorepos(Tuplesortstate *state)
3269 {
3270 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3271 
3272 	Assert(state->randomAccess);
3273 
3274 	switch (state->status)
3275 	{
3276 		case TSS_SORTEDINMEM:
3277 			state->current = state->markpos_offset;
3278 			state->eof_reached = state->markpos_eof;
3279 			break;
3280 		case TSS_SORTEDONTAPE:
3281 			LogicalTapeSeek(state->tapeset,
3282 							state->result_tape,
3283 							state->markpos_block,
3284 							state->markpos_offset);
3285 			state->eof_reached = state->markpos_eof;
3286 			break;
3287 		default:
3288 			elog(ERROR, "invalid tuplesort state");
3289 			break;
3290 	}
3291 
3292 	MemoryContextSwitchTo(oldcontext);
3293 }
3294 
3295 /*
3296  * tuplesort_get_stats - extract summary statistics
3297  *
3298  * This can be called after tuplesort_performsort() finishes to obtain
3299  * printable summary information about how the sort was performed.
3300  */
3301 void
tuplesort_get_stats(Tuplesortstate * state,TuplesortInstrumentation * stats)3302 tuplesort_get_stats(Tuplesortstate *state,
3303 					TuplesortInstrumentation *stats)
3304 {
3305 	/*
3306 	 * Note: it might seem we should provide both memory and disk usage for a
3307 	 * disk-based sort.  However, the current code doesn't track memory space
3308 	 * accurately once we have begun to return tuples to the caller (since we
3309 	 * don't account for pfree's the caller is expected to do), so we cannot
3310 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
3311 	 * to fix.  Is it worth creating an API for the memory context code to
3312 	 * tell us how much is actually used in sortcontext?
3313 	 */
3314 	tuplesort_updatemax(state);
3315 
3316 	if (state->isMaxSpaceDisk)
3317 		stats->spaceType = SORT_SPACE_TYPE_DISK;
3318 	else
3319 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
3320 	stats->spaceUsed = (state->maxSpace + 1023) / 1024;
3321 
3322 	switch (state->maxSpaceStatus)
3323 	{
3324 		case TSS_SORTEDINMEM:
3325 			if (state->boundUsed)
3326 				stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3327 			else
3328 				stats->sortMethod = SORT_TYPE_QUICKSORT;
3329 			break;
3330 		case TSS_SORTEDONTAPE:
3331 			stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3332 			break;
3333 		case TSS_FINALMERGE:
3334 			stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3335 			break;
3336 		default:
3337 			stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3338 			break;
3339 	}
3340 }
3341 
3342 /*
3343  * Convert TuplesortMethod to a string.
3344  */
3345 const char *
tuplesort_method_name(TuplesortMethod m)3346 tuplesort_method_name(TuplesortMethod m)
3347 {
3348 	switch (m)
3349 	{
3350 		case SORT_TYPE_STILL_IN_PROGRESS:
3351 			return "still in progress";
3352 		case SORT_TYPE_TOP_N_HEAPSORT:
3353 			return "top-N heapsort";
3354 		case SORT_TYPE_QUICKSORT:
3355 			return "quicksort";
3356 		case SORT_TYPE_EXTERNAL_SORT:
3357 			return "external sort";
3358 		case SORT_TYPE_EXTERNAL_MERGE:
3359 			return "external merge";
3360 	}
3361 
3362 	return "unknown";
3363 }
3364 
3365 /*
3366  * Convert TuplesortSpaceType to a string.
3367  */
3368 const char *
tuplesort_space_type_name(TuplesortSpaceType t)3369 tuplesort_space_type_name(TuplesortSpaceType t)
3370 {
3371 	Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3372 	return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3373 }
3374 
3375 
3376 /*
3377  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3378  */
3379 
3380 /*
3381  * Convert the existing unordered array of SortTuples to a bounded heap,
3382  * discarding all but the smallest "state->bound" tuples.
3383  *
3384  * When working with a bounded heap, we want to keep the largest entry
3385  * at the root (array entry zero), instead of the smallest as in the normal
3386  * sort case.  This allows us to discard the largest entry cheaply.
3387  * Therefore, we temporarily reverse the sort direction.
3388  */
3389 static void
make_bounded_heap(Tuplesortstate * state)3390 make_bounded_heap(Tuplesortstate *state)
3391 {
3392 	int			tupcount = state->memtupcount;
3393 	int			i;
3394 
3395 	Assert(state->status == TSS_INITIAL);
3396 	Assert(state->bounded);
3397 	Assert(tupcount >= state->bound);
3398 	Assert(SERIAL(state));
3399 
3400 	/* Reverse sort direction so largest entry will be at root */
3401 	reversedirection(state);
3402 
3403 	state->memtupcount = 0;		/* make the heap empty */
3404 	for (i = 0; i < tupcount; i++)
3405 	{
3406 		if (state->memtupcount < state->bound)
3407 		{
3408 			/* Insert next tuple into heap */
3409 			/* Must copy source tuple to avoid possible overwrite */
3410 			SortTuple	stup = state->memtuples[i];
3411 
3412 			tuplesort_heap_insert(state, &stup);
3413 		}
3414 		else
3415 		{
3416 			/*
3417 			 * The heap is full.  Replace the largest entry with the new
3418 			 * tuple, or just discard it, if it's larger than anything already
3419 			 * in the heap.
3420 			 */
3421 			if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3422 			{
3423 				free_sort_tuple(state, &state->memtuples[i]);
3424 				CHECK_FOR_INTERRUPTS();
3425 			}
3426 			else
3427 				tuplesort_heap_replace_top(state, &state->memtuples[i]);
3428 		}
3429 	}
3430 
3431 	Assert(state->memtupcount == state->bound);
3432 	state->status = TSS_BOUNDED;
3433 }
3434 
3435 /*
3436  * Convert the bounded heap to a properly-sorted array
3437  */
3438 static void
sort_bounded_heap(Tuplesortstate * state)3439 sort_bounded_heap(Tuplesortstate *state)
3440 {
3441 	int			tupcount = state->memtupcount;
3442 
3443 	Assert(state->status == TSS_BOUNDED);
3444 	Assert(state->bounded);
3445 	Assert(tupcount == state->bound);
3446 	Assert(SERIAL(state));
3447 
3448 	/*
3449 	 * We can unheapify in place because each delete-top call will remove the
3450 	 * largest entry, which we can promptly store in the newly freed slot at
3451 	 * the end.  Once we're down to a single-entry heap, we're done.
3452 	 */
3453 	while (state->memtupcount > 1)
3454 	{
3455 		SortTuple	stup = state->memtuples[0];
3456 
3457 		/* this sifts-up the next-largest entry and decreases memtupcount */
3458 		tuplesort_heap_delete_top(state);
3459 		state->memtuples[state->memtupcount] = stup;
3460 	}
3461 	state->memtupcount = tupcount;
3462 
3463 	/*
3464 	 * Reverse sort direction back to the original state.  This is not
3465 	 * actually necessary but seems like a good idea for tidiness.
3466 	 */
3467 	reversedirection(state);
3468 
3469 	state->status = TSS_SORTEDINMEM;
3470 	state->boundUsed = true;
3471 }
3472 
3473 /*
3474  * Sort all memtuples using specialized qsort() routines.
3475  *
3476  * Quicksort is used for small in-memory sorts, and external sort runs.
3477  */
3478 static void
tuplesort_sort_memtuples(Tuplesortstate * state)3479 tuplesort_sort_memtuples(Tuplesortstate *state)
3480 {
3481 	Assert(!LEADER(state));
3482 
3483 	if (state->memtupcount > 1)
3484 	{
3485 		/* Can we use the single-key sort function? */
3486 		if (state->onlyKey != NULL)
3487 			qsort_ssup(state->memtuples, state->memtupcount,
3488 					   state->onlyKey);
3489 		else
3490 			qsort_tuple(state->memtuples,
3491 						state->memtupcount,
3492 						state->comparetup,
3493 						state);
3494 	}
3495 }
3496 
3497 /*
3498  * Insert a new tuple into an empty or existing heap, maintaining the
3499  * heap invariant.  Caller is responsible for ensuring there's room.
3500  *
3501  * Note: For some callers, tuple points to a memtuples[] entry above the
3502  * end of the heap.  This is safe as long as it's not immediately adjacent
3503  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3504  * is, it might get overwritten before being moved into the heap!
3505  */
3506 static void
tuplesort_heap_insert(Tuplesortstate * state,SortTuple * tuple)3507 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3508 {
3509 	SortTuple  *memtuples;
3510 	int			j;
3511 
3512 	memtuples = state->memtuples;
3513 	Assert(state->memtupcount < state->memtupsize);
3514 
3515 	CHECK_FOR_INTERRUPTS();
3516 
3517 	/*
3518 	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3519 	 * using 1-based array indexes, not 0-based.
3520 	 */
3521 	j = state->memtupcount++;
3522 	while (j > 0)
3523 	{
3524 		int			i = (j - 1) >> 1;
3525 
3526 		if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3527 			break;
3528 		memtuples[j] = memtuples[i];
3529 		j = i;
3530 	}
3531 	memtuples[j] = *tuple;
3532 }
3533 
3534 /*
3535  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
3536  * memtupcount, and sift up to maintain the heap invariant.
3537  *
3538  * The caller has already free'd the tuple the top node points to,
3539  * if necessary.
3540  */
3541 static void
tuplesort_heap_delete_top(Tuplesortstate * state)3542 tuplesort_heap_delete_top(Tuplesortstate *state)
3543 {
3544 	SortTuple  *memtuples = state->memtuples;
3545 	SortTuple  *tuple;
3546 
3547 	if (--state->memtupcount <= 0)
3548 		return;
3549 
3550 	/*
3551 	 * Remove the last tuple in the heap, and re-insert it, by replacing the
3552 	 * current top node with it.
3553 	 */
3554 	tuple = &memtuples[state->memtupcount];
3555 	tuplesort_heap_replace_top(state, tuple);
3556 }
3557 
3558 /*
3559  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
3560  * maintain the heap invariant.
3561  *
3562  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3563  * Heapsort, steps H3-H8).
3564  */
3565 static void
tuplesort_heap_replace_top(Tuplesortstate * state,SortTuple * tuple)3566 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3567 {
3568 	SortTuple  *memtuples = state->memtuples;
3569 	unsigned int i,
3570 				n;
3571 
3572 	Assert(state->memtupcount >= 1);
3573 
3574 	CHECK_FOR_INTERRUPTS();
3575 
3576 	/*
3577 	 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3578 	 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3579 	 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3580 	 */
3581 	n = state->memtupcount;
3582 	i = 0;						/* i is where the "hole" is */
3583 	for (;;)
3584 	{
3585 		unsigned int j = 2 * i + 1;
3586 
3587 		if (j >= n)
3588 			break;
3589 		if (j + 1 < n &&
3590 			COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3591 			j++;
3592 		if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3593 			break;
3594 		memtuples[i] = memtuples[j];
3595 		i = j;
3596 	}
3597 	memtuples[i] = *tuple;
3598 }
3599 
3600 /*
3601  * Function to reverse the sort direction from its current state
3602  *
3603  * It is not safe to call this when performing hash tuplesorts
3604  */
3605 static void
reversedirection(Tuplesortstate * state)3606 reversedirection(Tuplesortstate *state)
3607 {
3608 	SortSupport sortKey = state->sortKeys;
3609 	int			nkey;
3610 
3611 	for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3612 	{
3613 		sortKey->ssup_reverse = !sortKey->ssup_reverse;
3614 		sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3615 	}
3616 }
3617 
3618 
3619 /*
3620  * Tape interface routines
3621  */
3622 
3623 static unsigned int
getlen(Tuplesortstate * state,int tapenum,bool eofOK)3624 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3625 {
3626 	unsigned int len;
3627 
3628 	if (LogicalTapeRead(state->tapeset, tapenum,
3629 						&len, sizeof(len)) != sizeof(len))
3630 		elog(ERROR, "unexpected end of tape");
3631 	if (len == 0 && !eofOK)
3632 		elog(ERROR, "unexpected end of data");
3633 	return len;
3634 }
3635 
3636 static void
markrunend(Tuplesortstate * state,int tapenum)3637 markrunend(Tuplesortstate *state, int tapenum)
3638 {
3639 	unsigned int len = 0;
3640 
3641 	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3642 }
3643 
3644 /*
3645  * Get memory for tuple from within READTUP() routine.
3646  *
3647  * We use next free slot from the slab allocator, or palloc() if the tuple
3648  * is too large for that.
3649  */
3650 static void *
readtup_alloc(Tuplesortstate * state,Size tuplen)3651 readtup_alloc(Tuplesortstate *state, Size tuplen)
3652 {
3653 	SlabSlot   *buf;
3654 
3655 	/*
3656 	 * We pre-allocate enough slots in the slab arena that we should never run
3657 	 * out.
3658 	 */
3659 	Assert(state->slabFreeHead);
3660 
3661 	if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3662 		return MemoryContextAlloc(state->sortcontext, tuplen);
3663 	else
3664 	{
3665 		buf = state->slabFreeHead;
3666 		/* Reuse this slot */
3667 		state->slabFreeHead = buf->nextfree;
3668 
3669 		return buf;
3670 	}
3671 }
3672 
3673 
3674 /*
3675  * Routines specialized for HeapTuple (actually MinimalTuple) case
3676  */
3677 
3678 static int
comparetup_heap(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)3679 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3680 {
3681 	SortSupport sortKey = state->sortKeys;
3682 	HeapTupleData ltup;
3683 	HeapTupleData rtup;
3684 	TupleDesc	tupDesc;
3685 	int			nkey;
3686 	int32		compare;
3687 	AttrNumber	attno;
3688 	Datum		datum1,
3689 				datum2;
3690 	bool		isnull1,
3691 				isnull2;
3692 
3693 
3694 	/* Compare the leading sort key */
3695 	compare = ApplySortComparator(a->datum1, a->isnull1,
3696 								  b->datum1, b->isnull1,
3697 								  sortKey);
3698 	if (compare != 0)
3699 		return compare;
3700 
3701 	/* Compare additional sort keys */
3702 	ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3703 	ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3704 	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3705 	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3706 	tupDesc = state->tupDesc;
3707 
3708 	if (sortKey->abbrev_converter)
3709 	{
3710 		attno = sortKey->ssup_attno;
3711 
3712 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3713 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3714 
3715 		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3716 												datum2, isnull2,
3717 												sortKey);
3718 		if (compare != 0)
3719 			return compare;
3720 	}
3721 
3722 	sortKey++;
3723 	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3724 	{
3725 		attno = sortKey->ssup_attno;
3726 
3727 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3728 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3729 
3730 		compare = ApplySortComparator(datum1, isnull1,
3731 									  datum2, isnull2,
3732 									  sortKey);
3733 		if (compare != 0)
3734 			return compare;
3735 	}
3736 
3737 	return 0;
3738 }
3739 
3740 static void
copytup_heap(Tuplesortstate * state,SortTuple * stup,void * tup)3741 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3742 {
3743 	/*
3744 	 * We expect the passed "tup" to be a TupleTableSlot, and form a
3745 	 * MinimalTuple using the exported interface for that.
3746 	 */
3747 	TupleTableSlot *slot = (TupleTableSlot *) tup;
3748 	Datum		original;
3749 	MinimalTuple tuple;
3750 	HeapTupleData htup;
3751 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3752 
3753 	/* copy the tuple into sort storage */
3754 	tuple = ExecCopySlotMinimalTuple(slot);
3755 	stup->tuple = (void *) tuple;
3756 	USEMEM(state, GetMemoryChunkSpace(tuple));
3757 	/* set up first-column key value */
3758 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3759 	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3760 	original = heap_getattr(&htup,
3761 							state->sortKeys[0].ssup_attno,
3762 							state->tupDesc,
3763 							&stup->isnull1);
3764 
3765 	MemoryContextSwitchTo(oldcontext);
3766 
3767 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
3768 	{
3769 		/*
3770 		 * Store ordinary Datum representation, or NULL value.  If there is a
3771 		 * converter it won't expect NULL values, and cost model is not
3772 		 * required to account for NULL, so in that case we avoid calling
3773 		 * converter and just set datum1 to zeroed representation (to be
3774 		 * consistent, and to support cheap inequality tests for NULL
3775 		 * abbreviated keys).
3776 		 */
3777 		stup->datum1 = original;
3778 	}
3779 	else if (!consider_abort_common(state))
3780 	{
3781 		/* Store abbreviated key representation */
3782 		stup->datum1 = state->sortKeys->abbrev_converter(original,
3783 														 state->sortKeys);
3784 	}
3785 	else
3786 	{
3787 		/* Abort abbreviation */
3788 		int			i;
3789 
3790 		stup->datum1 = original;
3791 
3792 		/*
3793 		 * Set state to be consistent with never trying abbreviation.
3794 		 *
3795 		 * Alter datum1 representation in already-copied tuples, so as to
3796 		 * ensure a consistent representation (current tuple was just
3797 		 * handled).  It does not matter if some dumped tuples are already
3798 		 * sorted on tape, since serialized tuples lack abbreviated keys
3799 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
3800 		 */
3801 		for (i = 0; i < state->memtupcount; i++)
3802 		{
3803 			SortTuple  *mtup = &state->memtuples[i];
3804 
3805 			htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3806 				MINIMAL_TUPLE_OFFSET;
3807 			htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3808 											 MINIMAL_TUPLE_OFFSET);
3809 
3810 			mtup->datum1 = heap_getattr(&htup,
3811 										state->sortKeys[0].ssup_attno,
3812 										state->tupDesc,
3813 										&mtup->isnull1);
3814 		}
3815 	}
3816 }
3817 
3818 static void
writetup_heap(Tuplesortstate * state,int tapenum,SortTuple * stup)3819 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3820 {
3821 	MinimalTuple tuple = (MinimalTuple) stup->tuple;
3822 
3823 	/* the part of the MinimalTuple we'll write: */
3824 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3825 	unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3826 
3827 	/* total on-disk footprint: */
3828 	unsigned int tuplen = tupbodylen + sizeof(int);
3829 
3830 	LogicalTapeWrite(state->tapeset, tapenum,
3831 					 (void *) &tuplen, sizeof(tuplen));
3832 	LogicalTapeWrite(state->tapeset, tapenum,
3833 					 (void *) tupbody, tupbodylen);
3834 	if (state->randomAccess)	/* need trailing length word? */
3835 		LogicalTapeWrite(state->tapeset, tapenum,
3836 						 (void *) &tuplen, sizeof(tuplen));
3837 
3838 	if (!state->slabAllocatorUsed)
3839 	{
3840 		FREEMEM(state, GetMemoryChunkSpace(tuple));
3841 		heap_free_minimal_tuple(tuple);
3842 	}
3843 }
3844 
3845 static void
readtup_heap(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)3846 readtup_heap(Tuplesortstate *state, SortTuple *stup,
3847 			 int tapenum, unsigned int len)
3848 {
3849 	unsigned int tupbodylen = len - sizeof(int);
3850 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3851 	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3852 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3853 	HeapTupleData htup;
3854 
3855 	/* read in the tuple proper */
3856 	tuple->t_len = tuplen;
3857 	LogicalTapeReadExact(state->tapeset, tapenum,
3858 						 tupbody, tupbodylen);
3859 	if (state->randomAccess)	/* need trailing length word? */
3860 		LogicalTapeReadExact(state->tapeset, tapenum,
3861 							 &tuplen, sizeof(tuplen));
3862 	stup->tuple = (void *) tuple;
3863 	/* set up first-column key value */
3864 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3865 	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3866 	stup->datum1 = heap_getattr(&htup,
3867 								state->sortKeys[0].ssup_attno,
3868 								state->tupDesc,
3869 								&stup->isnull1);
3870 }
3871 
3872 /*
3873  * Routines specialized for the CLUSTER case (HeapTuple data, with
3874  * comparisons per a btree index definition)
3875  */
3876 
3877 static int
comparetup_cluster(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)3878 comparetup_cluster(const SortTuple *a, const SortTuple *b,
3879 				   Tuplesortstate *state)
3880 {
3881 	SortSupport sortKey = state->sortKeys;
3882 	HeapTuple	ltup;
3883 	HeapTuple	rtup;
3884 	TupleDesc	tupDesc;
3885 	int			nkey;
3886 	int32		compare;
3887 	Datum		datum1,
3888 				datum2;
3889 	bool		isnull1,
3890 				isnull2;
3891 	AttrNumber	leading = state->indexInfo->ii_IndexAttrNumbers[0];
3892 
3893 	/* Be prepared to compare additional sort keys */
3894 	ltup = (HeapTuple) a->tuple;
3895 	rtup = (HeapTuple) b->tuple;
3896 	tupDesc = state->tupDesc;
3897 
3898 	/* Compare the leading sort key, if it's simple */
3899 	if (leading != 0)
3900 	{
3901 		compare = ApplySortComparator(a->datum1, a->isnull1,
3902 									  b->datum1, b->isnull1,
3903 									  sortKey);
3904 		if (compare != 0)
3905 			return compare;
3906 
3907 		if (sortKey->abbrev_converter)
3908 		{
3909 			datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3910 			datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3911 
3912 			compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3913 													datum2, isnull2,
3914 													sortKey);
3915 		}
3916 		if (compare != 0 || state->nKeys == 1)
3917 			return compare;
3918 		/* Compare additional columns the hard way */
3919 		sortKey++;
3920 		nkey = 1;
3921 	}
3922 	else
3923 	{
3924 		/* Must compare all keys the hard way */
3925 		nkey = 0;
3926 	}
3927 
3928 	if (state->indexInfo->ii_Expressions == NULL)
3929 	{
3930 		/* If not expression index, just compare the proper heap attrs */
3931 
3932 		for (; nkey < state->nKeys; nkey++, sortKey++)
3933 		{
3934 			AttrNumber	attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
3935 
3936 			datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
3937 			datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
3938 
3939 			compare = ApplySortComparator(datum1, isnull1,
3940 										  datum2, isnull2,
3941 										  sortKey);
3942 			if (compare != 0)
3943 				return compare;
3944 		}
3945 	}
3946 	else
3947 	{
3948 		/*
3949 		 * In the expression index case, compute the whole index tuple and
3950 		 * then compare values.  It would perhaps be faster to compute only as
3951 		 * many columns as we need to compare, but that would require
3952 		 * duplicating all the logic in FormIndexDatum.
3953 		 */
3954 		Datum		l_index_values[INDEX_MAX_KEYS];
3955 		bool		l_index_isnull[INDEX_MAX_KEYS];
3956 		Datum		r_index_values[INDEX_MAX_KEYS];
3957 		bool		r_index_isnull[INDEX_MAX_KEYS];
3958 		TupleTableSlot *ecxt_scantuple;
3959 
3960 		/* Reset context each time to prevent memory leakage */
3961 		ResetPerTupleExprContext(state->estate);
3962 
3963 		ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
3964 
3965 		ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
3966 		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3967 					   l_index_values, l_index_isnull);
3968 
3969 		ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
3970 		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3971 					   r_index_values, r_index_isnull);
3972 
3973 		for (; nkey < state->nKeys; nkey++, sortKey++)
3974 		{
3975 			compare = ApplySortComparator(l_index_values[nkey],
3976 										  l_index_isnull[nkey],
3977 										  r_index_values[nkey],
3978 										  r_index_isnull[nkey],
3979 										  sortKey);
3980 			if (compare != 0)
3981 				return compare;
3982 		}
3983 	}
3984 
3985 	return 0;
3986 }
3987 
3988 static void
copytup_cluster(Tuplesortstate * state,SortTuple * stup,void * tup)3989 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3990 {
3991 	HeapTuple	tuple = (HeapTuple) tup;
3992 	Datum		original;
3993 	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3994 
3995 	/* copy the tuple into sort storage */
3996 	tuple = heap_copytuple(tuple);
3997 	stup->tuple = (void *) tuple;
3998 	USEMEM(state, GetMemoryChunkSpace(tuple));
3999 
4000 	MemoryContextSwitchTo(oldcontext);
4001 
4002 	/*
4003 	 * set up first-column key value, and potentially abbreviate, if it's a
4004 	 * simple column
4005 	 */
4006 	if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
4007 		return;
4008 
4009 	original = heap_getattr(tuple,
4010 							state->indexInfo->ii_IndexAttrNumbers[0],
4011 							state->tupDesc,
4012 							&stup->isnull1);
4013 
4014 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
4015 	{
4016 		/*
4017 		 * Store ordinary Datum representation, or NULL value.  If there is a
4018 		 * converter it won't expect NULL values, and cost model is not
4019 		 * required to account for NULL, so in that case we avoid calling
4020 		 * converter and just set datum1 to zeroed representation (to be
4021 		 * consistent, and to support cheap inequality tests for NULL
4022 		 * abbreviated keys).
4023 		 */
4024 		stup->datum1 = original;
4025 	}
4026 	else if (!consider_abort_common(state))
4027 	{
4028 		/* Store abbreviated key representation */
4029 		stup->datum1 = state->sortKeys->abbrev_converter(original,
4030 														 state->sortKeys);
4031 	}
4032 	else
4033 	{
4034 		/* Abort abbreviation */
4035 		int			i;
4036 
4037 		stup->datum1 = original;
4038 
4039 		/*
4040 		 * Set state to be consistent with never trying abbreviation.
4041 		 *
4042 		 * Alter datum1 representation in already-copied tuples, so as to
4043 		 * ensure a consistent representation (current tuple was just
4044 		 * handled).  It does not matter if some dumped tuples are already
4045 		 * sorted on tape, since serialized tuples lack abbreviated keys
4046 		 * (TSS_BUILDRUNS state prevents control reaching here in any case).
4047 		 */
4048 		for (i = 0; i < state->memtupcount; i++)
4049 		{
4050 			SortTuple  *mtup = &state->memtuples[i];
4051 
4052 			tuple = (HeapTuple) mtup->tuple;
4053 			mtup->datum1 = heap_getattr(tuple,
4054 										state->indexInfo->ii_IndexAttrNumbers[0],
4055 										state->tupDesc,
4056 										&mtup->isnull1);
4057 		}
4058 	}
4059 }
4060 
4061 static void
writetup_cluster(Tuplesortstate * state,int tapenum,SortTuple * stup)4062 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
4063 {
4064 	HeapTuple	tuple = (HeapTuple) stup->tuple;
4065 	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
4066 
4067 	/* We need to store t_self, but not other fields of HeapTupleData */
4068 	LogicalTapeWrite(state->tapeset, tapenum,
4069 					 &tuplen, sizeof(tuplen));
4070 	LogicalTapeWrite(state->tapeset, tapenum,
4071 					 &tuple->t_self, sizeof(ItemPointerData));
4072 	LogicalTapeWrite(state->tapeset, tapenum,
4073 					 tuple->t_data, tuple->t_len);
4074 	if (state->randomAccess)	/* need trailing length word? */
4075 		LogicalTapeWrite(state->tapeset, tapenum,
4076 						 &tuplen, sizeof(tuplen));
4077 
4078 	if (!state->slabAllocatorUsed)
4079 	{
4080 		FREEMEM(state, GetMemoryChunkSpace(tuple));
4081 		heap_freetuple(tuple);
4082 	}
4083 }
4084 
4085 static void
readtup_cluster(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int tuplen)4086 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
4087 				int tapenum, unsigned int tuplen)
4088 {
4089 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
4090 	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
4091 												  t_len + HEAPTUPLESIZE);
4092 
4093 	/* Reconstruct the HeapTupleData header */
4094 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
4095 	tuple->t_len = t_len;
4096 	LogicalTapeReadExact(state->tapeset, tapenum,
4097 						 &tuple->t_self, sizeof(ItemPointerData));
4098 	/* We don't currently bother to reconstruct t_tableOid */
4099 	tuple->t_tableOid = InvalidOid;
4100 	/* Read in the tuple body */
4101 	LogicalTapeReadExact(state->tapeset, tapenum,
4102 						 tuple->t_data, tuple->t_len);
4103 	if (state->randomAccess)	/* need trailing length word? */
4104 		LogicalTapeReadExact(state->tapeset, tapenum,
4105 							 &tuplen, sizeof(tuplen));
4106 	stup->tuple = (void *) tuple;
4107 	/* set up first-column key value, if it's a simple column */
4108 	if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
4109 		stup->datum1 = heap_getattr(tuple,
4110 									state->indexInfo->ii_IndexAttrNumbers[0],
4111 									state->tupDesc,
4112 									&stup->isnull1);
4113 }
4114 
4115 /*
4116  * Routines specialized for IndexTuple case
4117  *
4118  * The btree and hash cases require separate comparison functions, but the
4119  * IndexTuple representation is the same so the copy/write/read support
4120  * functions can be shared.
4121  */
4122 
4123 static int
comparetup_index_btree(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)4124 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
4125 					   Tuplesortstate *state)
4126 {
4127 	/*
4128 	 * This is similar to comparetup_heap(), but expects index tuples.  There
4129 	 * is also special handling for enforcing uniqueness, and special
4130 	 * treatment for equal keys at the end.
4131 	 */
4132 	SortSupport sortKey = state->sortKeys;
4133 	IndexTuple	tuple1;
4134 	IndexTuple	tuple2;
4135 	int			keysz;
4136 	TupleDesc	tupDes;
4137 	bool		equal_hasnull = false;
4138 	int			nkey;
4139 	int32		compare;
4140 	Datum		datum1,
4141 				datum2;
4142 	bool		isnull1,
4143 				isnull2;
4144 
4145 
4146 	/* Compare the leading sort key */
4147 	compare = ApplySortComparator(a->datum1, a->isnull1,
4148 								  b->datum1, b->isnull1,
4149 								  sortKey);
4150 	if (compare != 0)
4151 		return compare;
4152 
4153 	/* Compare additional sort keys */
4154 	tuple1 = (IndexTuple) a->tuple;
4155 	tuple2 = (IndexTuple) b->tuple;
4156 	keysz = state->nKeys;
4157 	tupDes = RelationGetDescr(state->indexRel);
4158 
4159 	if (sortKey->abbrev_converter)
4160 	{
4161 		datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
4162 		datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
4163 
4164 		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
4165 												datum2, isnull2,
4166 												sortKey);
4167 		if (compare != 0)
4168 			return compare;
4169 	}
4170 
4171 	/* they are equal, so we only need to examine one null flag */
4172 	if (a->isnull1)
4173 		equal_hasnull = true;
4174 
4175 	sortKey++;
4176 	for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4177 	{
4178 		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4179 		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4180 
4181 		compare = ApplySortComparator(datum1, isnull1,
4182 									  datum2, isnull2,
4183 									  sortKey);
4184 		if (compare != 0)
4185 			return compare;		/* done when we find unequal attributes */
4186 
4187 		/* they are equal, so we only need to examine one null flag */
4188 		if (isnull1)
4189 			equal_hasnull = true;
4190 	}
4191 
4192 	/*
4193 	 * If btree has asked us to enforce uniqueness, complain if two equal
4194 	 * tuples are detected (unless there was at least one NULL field).
4195 	 *
4196 	 * It is sufficient to make the test here, because if two tuples are equal
4197 	 * they *must* get compared at some stage of the sort --- otherwise the
4198 	 * sort algorithm wouldn't have checked whether one must appear before the
4199 	 * other.
4200 	 */
4201 	if (state->enforceUnique && !equal_hasnull)
4202 	{
4203 		Datum		values[INDEX_MAX_KEYS];
4204 		bool		isnull[INDEX_MAX_KEYS];
4205 		char	   *key_desc;
4206 
4207 		/*
4208 		 * Some rather brain-dead implementations of qsort (such as the one in
4209 		 * QNX 4) will sometimes call the comparison routine to compare a
4210 		 * value to itself, but we always use our own implementation, which
4211 		 * does not.
4212 		 */
4213 		Assert(tuple1 != tuple2);
4214 
4215 		index_deform_tuple(tuple1, tupDes, values, isnull);
4216 
4217 		key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4218 
4219 		ereport(ERROR,
4220 				(errcode(ERRCODE_UNIQUE_VIOLATION),
4221 				 errmsg("could not create unique index \"%s\"",
4222 						RelationGetRelationName(state->indexRel)),
4223 				 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4224 				 errdetail("Duplicate keys exist."),
4225 				 errtableconstraint(state->heapRel,
4226 									RelationGetRelationName(state->indexRel))));
4227 	}
4228 
4229 	/*
4230 	 * If key values are equal, we sort on ItemPointer.  This is required for
4231 	 * btree indexes, since heap TID is treated as an implicit last key
4232 	 * attribute in order to ensure that all keys in the index are physically
4233 	 * unique.
4234 	 */
4235 	{
4236 		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4237 		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4238 
4239 		if (blk1 != blk2)
4240 			return (blk1 < blk2) ? -1 : 1;
4241 	}
4242 	{
4243 		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4244 		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4245 
4246 		if (pos1 != pos2)
4247 			return (pos1 < pos2) ? -1 : 1;
4248 	}
4249 
4250 	/* ItemPointer values should never be equal */
4251 	Assert(false);
4252 
4253 	return 0;
4254 }
4255 
4256 static int
comparetup_index_hash(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)4257 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4258 					  Tuplesortstate *state)
4259 {
4260 	Bucket		bucket1;
4261 	Bucket		bucket2;
4262 	IndexTuple	tuple1;
4263 	IndexTuple	tuple2;
4264 
4265 	/*
4266 	 * Fetch hash keys and mask off bits we don't want to sort by. We know
4267 	 * that the first column of the index tuple is the hash key.
4268 	 */
4269 	Assert(!a->isnull1);
4270 	bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4271 								   state->max_buckets, state->high_mask,
4272 								   state->low_mask);
4273 	Assert(!b->isnull1);
4274 	bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4275 								   state->max_buckets, state->high_mask,
4276 								   state->low_mask);
4277 	if (bucket1 > bucket2)
4278 		return 1;
4279 	else if (bucket1 < bucket2)
4280 		return -1;
4281 
4282 	/*
4283 	 * If hash values are equal, we sort on ItemPointer.  This does not affect
4284 	 * validity of the finished index, but it may be useful to have index
4285 	 * scans in physical order.
4286 	 */
4287 	tuple1 = (IndexTuple) a->tuple;
4288 	tuple2 = (IndexTuple) b->tuple;
4289 
4290 	{
4291 		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4292 		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4293 
4294 		if (blk1 != blk2)
4295 			return (blk1 < blk2) ? -1 : 1;
4296 	}
4297 	{
4298 		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4299 		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4300 
4301 		if (pos1 != pos2)
4302 			return (pos1 < pos2) ? -1 : 1;
4303 	}
4304 
4305 	/* ItemPointer values should never be equal */
4306 	Assert(false);
4307 
4308 	return 0;
4309 }
4310 
4311 static void
copytup_index(Tuplesortstate * state,SortTuple * stup,void * tup)4312 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4313 {
4314 	/* Not currently needed */
4315 	elog(ERROR, "copytup_index() should not be called");
4316 }
4317 
4318 static void
writetup_index(Tuplesortstate * state,int tapenum,SortTuple * stup)4319 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4320 {
4321 	IndexTuple	tuple = (IndexTuple) stup->tuple;
4322 	unsigned int tuplen;
4323 
4324 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4325 	LogicalTapeWrite(state->tapeset, tapenum,
4326 					 (void *) &tuplen, sizeof(tuplen));
4327 	LogicalTapeWrite(state->tapeset, tapenum,
4328 					 (void *) tuple, IndexTupleSize(tuple));
4329 	if (state->randomAccess)	/* need trailing length word? */
4330 		LogicalTapeWrite(state->tapeset, tapenum,
4331 						 (void *) &tuplen, sizeof(tuplen));
4332 
4333 	if (!state->slabAllocatorUsed)
4334 	{
4335 		FREEMEM(state, GetMemoryChunkSpace(tuple));
4336 		pfree(tuple);
4337 	}
4338 }
4339 
4340 static void
readtup_index(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)4341 readtup_index(Tuplesortstate *state, SortTuple *stup,
4342 			  int tapenum, unsigned int len)
4343 {
4344 	unsigned int tuplen = len - sizeof(unsigned int);
4345 	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);
4346 
4347 	LogicalTapeReadExact(state->tapeset, tapenum,
4348 						 tuple, tuplen);
4349 	if (state->randomAccess)	/* need trailing length word? */
4350 		LogicalTapeReadExact(state->tapeset, tapenum,
4351 							 &tuplen, sizeof(tuplen));
4352 	stup->tuple = (void *) tuple;
4353 	/* set up first-column key value */
4354 	stup->datum1 = index_getattr(tuple,
4355 								 1,
4356 								 RelationGetDescr(state->indexRel),
4357 								 &stup->isnull1);
4358 }
4359 
4360 /*
4361  * Routines specialized for DatumTuple case
4362  */
4363 
4364 static int
comparetup_datum(const SortTuple * a,const SortTuple * b,Tuplesortstate * state)4365 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4366 {
4367 	int			compare;
4368 
4369 	compare = ApplySortComparator(a->datum1, a->isnull1,
4370 								  b->datum1, b->isnull1,
4371 								  state->sortKeys);
4372 	if (compare != 0)
4373 		return compare;
4374 
4375 	/* if we have abbreviations, then "tuple" has the original value */
4376 
4377 	if (state->sortKeys->abbrev_converter)
4378 		compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4379 												PointerGetDatum(b->tuple), b->isnull1,
4380 												state->sortKeys);
4381 
4382 	return compare;
4383 }
4384 
4385 static void
copytup_datum(Tuplesortstate * state,SortTuple * stup,void * tup)4386 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4387 {
4388 	/* Not currently needed */
4389 	elog(ERROR, "copytup_datum() should not be called");
4390 }
4391 
4392 static void
writetup_datum(Tuplesortstate * state,int tapenum,SortTuple * stup)4393 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4394 {
4395 	void	   *waddr;
4396 	unsigned int tuplen;
4397 	unsigned int writtenlen;
4398 
4399 	if (stup->isnull1)
4400 	{
4401 		waddr = NULL;
4402 		tuplen = 0;
4403 	}
4404 	else if (!state->tuples)
4405 	{
4406 		waddr = &stup->datum1;
4407 		tuplen = sizeof(Datum);
4408 	}
4409 	else
4410 	{
4411 		waddr = stup->tuple;
4412 		tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4413 		Assert(tuplen != 0);
4414 	}
4415 
4416 	writtenlen = tuplen + sizeof(unsigned int);
4417 
4418 	LogicalTapeWrite(state->tapeset, tapenum,
4419 					 (void *) &writtenlen, sizeof(writtenlen));
4420 	LogicalTapeWrite(state->tapeset, tapenum,
4421 					 waddr, tuplen);
4422 	if (state->randomAccess)	/* need trailing length word? */
4423 		LogicalTapeWrite(state->tapeset, tapenum,
4424 						 (void *) &writtenlen, sizeof(writtenlen));
4425 
4426 	if (!state->slabAllocatorUsed && stup->tuple)
4427 	{
4428 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4429 		pfree(stup->tuple);
4430 	}
4431 }
4432 
4433 static void
readtup_datum(Tuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)4434 readtup_datum(Tuplesortstate *state, SortTuple *stup,
4435 			  int tapenum, unsigned int len)
4436 {
4437 	unsigned int tuplen = len - sizeof(unsigned int);
4438 
4439 	if (tuplen == 0)
4440 	{
4441 		/* it's NULL */
4442 		stup->datum1 = (Datum) 0;
4443 		stup->isnull1 = true;
4444 		stup->tuple = NULL;
4445 	}
4446 	else if (!state->tuples)
4447 	{
4448 		Assert(tuplen == sizeof(Datum));
4449 		LogicalTapeReadExact(state->tapeset, tapenum,
4450 							 &stup->datum1, tuplen);
4451 		stup->isnull1 = false;
4452 		stup->tuple = NULL;
4453 	}
4454 	else
4455 	{
4456 		void	   *raddr = readtup_alloc(state, tuplen);
4457 
4458 		LogicalTapeReadExact(state->tapeset, tapenum,
4459 							 raddr, tuplen);
4460 		stup->datum1 = PointerGetDatum(raddr);
4461 		stup->isnull1 = false;
4462 		stup->tuple = raddr;
4463 	}
4464 
4465 	if (state->randomAccess)	/* need trailing length word? */
4466 		LogicalTapeReadExact(state->tapeset, tapenum,
4467 							 &tuplen, sizeof(tuplen));
4468 }
4469 
4470 /*
4471  * Parallel sort routines
4472  */
4473 
4474 /*
4475  * tuplesort_estimate_shared - estimate required shared memory allocation
4476  *
4477  * nWorkers is an estimate of the number of workers (it's the number that
4478  * will be requested).
4479  */
4480 Size
tuplesort_estimate_shared(int nWorkers)4481 tuplesort_estimate_shared(int nWorkers)
4482 {
4483 	Size		tapesSize;
4484 
4485 	Assert(nWorkers > 0);
4486 
4487 	/* Make sure that BufFile shared state is MAXALIGN'd */
4488 	tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4489 	tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
4490 
4491 	return tapesSize;
4492 }
4493 
4494 /*
4495  * tuplesort_initialize_shared - initialize shared tuplesort state
4496  *
4497  * Must be called from leader process before workers are launched, to
4498  * establish state needed up-front for worker tuplesortstates.  nWorkers
4499  * should match the argument passed to tuplesort_estimate_shared().
4500  */
4501 void
tuplesort_initialize_shared(Sharedsort * shared,int nWorkers,dsm_segment * seg)4502 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4503 {
4504 	int			i;
4505 
4506 	Assert(nWorkers > 0);
4507 
4508 	SpinLockInit(&shared->mutex);
4509 	shared->currentWorker = 0;
4510 	shared->workersFinished = 0;
4511 	SharedFileSetInit(&shared->fileset, seg);
4512 	shared->nTapes = nWorkers;
4513 	for (i = 0; i < nWorkers; i++)
4514 	{
4515 		shared->tapes[i].firstblocknumber = 0L;
4516 	}
4517 }
4518 
4519 /*
4520  * tuplesort_attach_shared - attach to shared tuplesort state
4521  *
4522  * Must be called by all worker processes.
4523  */
4524 void
tuplesort_attach_shared(Sharedsort * shared,dsm_segment * seg)4525 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4526 {
4527 	/* Attach to SharedFileSet */
4528 	SharedFileSetAttach(&shared->fileset, seg);
4529 }
4530 
4531 /*
4532  * worker_get_identifier - Assign and return ordinal identifier for worker
4533  *
4534  * The order in which these are assigned is not well defined, and should not
4535  * matter; worker numbers across parallel sort participants need only be
4536  * distinct and gapless.  logtape.c requires this.
4537  *
4538  * Note that the identifiers assigned from here have no relation to
4539  * ParallelWorkerNumber number, to avoid making any assumption about
4540  * caller's requirements.  However, we do follow the ParallelWorkerNumber
4541  * convention of representing a non-worker with worker number -1.  This
4542  * includes the leader, as well as serial Tuplesort processes.
4543  */
4544 static int
worker_get_identifier(Tuplesortstate * state)4545 worker_get_identifier(Tuplesortstate *state)
4546 {
4547 	Sharedsort *shared = state->shared;
4548 	int			worker;
4549 
4550 	Assert(WORKER(state));
4551 
4552 	SpinLockAcquire(&shared->mutex);
4553 	worker = shared->currentWorker++;
4554 	SpinLockRelease(&shared->mutex);
4555 
4556 	return worker;
4557 }
4558 
4559 /*
4560  * worker_freeze_result_tape - freeze worker's result tape for leader
4561  *
4562  * This is called by workers just after the result tape has been determined,
4563  * instead of calling LogicalTapeFreeze() directly.  They do so because
4564  * workers require a few additional steps over similar serial
4565  * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
4566  * steps are around freeing now unneeded resources, and representing to
4567  * leader that worker's input run is available for its merge.
4568  *
4569  * There should only be one final output run for each worker, which consists
4570  * of all tuples that were originally input into worker.
4571  */
4572 static void
worker_freeze_result_tape(Tuplesortstate * state)4573 worker_freeze_result_tape(Tuplesortstate *state)
4574 {
4575 	Sharedsort *shared = state->shared;
4576 	TapeShare	output;
4577 
4578 	Assert(WORKER(state));
4579 	Assert(state->result_tape != -1);
4580 	Assert(state->memtupcount == 0);
4581 
4582 	/*
4583 	 * Free most remaining memory, in case caller is sensitive to our holding
4584 	 * on to it.  memtuples may not be a tiny merge heap at this point.
4585 	 */
4586 	pfree(state->memtuples);
4587 	/* Be tidy */
4588 	state->memtuples = NULL;
4589 	state->memtupsize = 0;
4590 
4591 	/*
4592 	 * Parallel worker requires result tape metadata, which is to be stored in
4593 	 * shared memory for leader
4594 	 */
4595 	LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4596 
4597 	/* Store properties of output tape, and update finished worker count */
4598 	SpinLockAcquire(&shared->mutex);
4599 	shared->tapes[state->worker] = output;
4600 	shared->workersFinished++;
4601 	SpinLockRelease(&shared->mutex);
4602 }
4603 
4604 /*
4605  * worker_nomergeruns - dump memtuples in worker, without merging
4606  *
4607  * This called as an alternative to mergeruns() with a worker when no
4608  * merging is required.
4609  */
4610 static void
worker_nomergeruns(Tuplesortstate * state)4611 worker_nomergeruns(Tuplesortstate *state)
4612 {
4613 	Assert(WORKER(state));
4614 	Assert(state->result_tape == -1);
4615 
4616 	state->result_tape = state->tp_tapenum[state->destTape];
4617 	worker_freeze_result_tape(state);
4618 }
4619 
4620 /*
4621  * leader_takeover_tapes - create tapeset for leader from worker tapes
4622  *
4623  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
4624  * sorting has occurred in workers, all of which must have already returned
4625  * from tuplesort_performsort().
4626  *
4627  * When this returns, leader process is left in a state that is virtually
4628  * indistinguishable from it having generated runs as a serial external sort
4629  * might have.
4630  */
4631 static void
leader_takeover_tapes(Tuplesortstate * state)4632 leader_takeover_tapes(Tuplesortstate *state)
4633 {
4634 	Sharedsort *shared = state->shared;
4635 	int			nParticipants = state->nParticipants;
4636 	int			workersFinished;
4637 	int			j;
4638 
4639 	Assert(LEADER(state));
4640 	Assert(nParticipants >= 1);
4641 
4642 	SpinLockAcquire(&shared->mutex);
4643 	workersFinished = shared->workersFinished;
4644 	SpinLockRelease(&shared->mutex);
4645 
4646 	if (nParticipants != workersFinished)
4647 		elog(ERROR, "cannot take over tapes before all workers finish");
4648 
4649 	/*
4650 	 * Create the tapeset from worker tapes, including a leader-owned tape at
4651 	 * the end.  Parallel workers are far more expensive than logical tapes,
4652 	 * so the number of tapes allocated here should never be excessive.
4653 	 *
4654 	 * We still have a leader tape, though it's not possible to write to it
4655 	 * due to restrictions in the shared fileset infrastructure used by
4656 	 * logtape.c.  It will never be written to in practice because
4657 	 * randomAccess is disallowed for parallel sorts.
4658 	 */
4659 	inittapestate(state, nParticipants + 1);
4660 	state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
4661 										  shared->tapes, &shared->fileset,
4662 										  state->worker);
4663 
4664 	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4665 	state->currentRun = nParticipants;
4666 
4667 	/*
4668 	 * Initialize variables of Algorithm D to be consistent with runs from
4669 	 * workers having been generated in the leader.
4670 	 *
4671 	 * There will always be exactly 1 run per worker, and exactly one input
4672 	 * tape per run, because workers always output exactly 1 run, even when
4673 	 * there were no input tuples for workers to sort.
4674 	 */
4675 	for (j = 0; j < state->maxTapes; j++)
4676 	{
4677 		/* One real run; no dummy runs for worker tapes */
4678 		state->tp_fib[j] = 1;
4679 		state->tp_runs[j] = 1;
4680 		state->tp_dummy[j] = 0;
4681 		state->tp_tapenum[j] = j;
4682 	}
4683 	/* Leader tape gets one dummy run, and no real runs */
4684 	state->tp_fib[state->tapeRange] = 0;
4685 	state->tp_runs[state->tapeRange] = 0;
4686 	state->tp_dummy[state->tapeRange] = 1;
4687 
4688 	state->Level = 1;
4689 	state->destTape = 0;
4690 
4691 	state->status = TSS_BUILDRUNS;
4692 }
4693 
4694 /*
4695  * Convenience routine to free a tuple previously loaded into sort memory
4696  */
4697 static void
free_sort_tuple(Tuplesortstate * state,SortTuple * stup)4698 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4699 {
4700 	if (stup->tuple)
4701 	{
4702 		FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4703 		pfree(stup->tuple);
4704 		stup->tuple = NULL;
4705 	}
4706 }
4707