/*-------------------------------------------------------------------------
 *
 * rumsort.c
 *	  Generalized tuple sorting routines.
 *
 * This module handles sorting of RumSortItem or RumScanItem structures.
 * It contains copies of static functions from
 * src/backend/utils/sort/tuplesort.c.
 *
 *
 * Portions Copyright (c) 2015-2019, Postgres Professional
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"
#include "rumsort.h"

#include "commands/tablespace.h"
#include "executor/executor.h"
#include "utils/logtape.h"
#include "utils/pg_rusage.h"

#include "rum.h" /* RumItem */

/* sort-type codes for sort__start probes */
#define HEAP_SORT		0
#define INDEX_SORT		1
#define DATUM_SORT		2
#define CLUSTER_SORT	3

#if PG_VERSION_NUM < 100000
/* Provide a fallback to the pre-10 tape interface for 9.6 */
#define LogicalTapeRewindForRead(x, y, z) LogicalTapeRewind((x), (y), false)
#define LogicalTapeRewindForWrite(x, y) LogicalTapeRewind((x), (y), true)
#endif

#if PG_VERSION_NUM >= 110000
#if PG_VERSION_NUM >= 130000
#define LogicalTapeSetCreate(X) LogicalTapeSetCreate(X, false, NULL, NULL, 1)
#else
#define LogicalTapeSetCreate(X) LogicalTapeSetCreate(X, NULL, NULL, 1)
#endif
#define LogicalTapeFreeze(X, Y) LogicalTapeFreeze(X, Y, NULL)
#endif
/*
 * Below are copied definitions from src/backend/utils/sort/tuplesort.c.
 */

/*
 * For PGPRO since v13, trace_sort is imported from the backend by including
 * its declaration in guc.h (guc.h carries the Windows export/import magic
 * applied when compiling postgres.exe).
 * For older or non-PGPRO versions on the Windows platform, trace_sort is not
 * exported by the backend, so in that case it is declared locally.
 */
#ifdef TRACE_SORT
#if ( !defined (_MSC_VER) || (PG_VERSION_NUM >= 130000 && defined (PGPRO_VERSION)) )
#include "utils/guc.h"
#else
bool	trace_sort = false;
#endif
#endif

typedef struct
{
	void	   *tuple;			/* the tuple proper */
	Datum		datum1;			/* value of first key column */
	bool		isnull1;		/* is first key column NULL? */
	int			tupindex;		/* run number or link; see merge notes below */
} SortTuple;

typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE				/* Performing final merge on-the-fly */
} TupSortStatus;

#define MINORDER		6		/* minimum merge order */
#define TAPE_BUFFER_OVERHEAD		(BLCKSZ * 3)
#define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
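
/*
 * With the default BLCKSZ of 8192 these work out to 24576 bytes of buffer
 * overhead per tape and 262144 bytes of merge workspace per input tape
 * (illustrative figures; both scale with the configured block size).
 */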

typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
												RumTuplesortstate *state);

/*
 * Renamed copy of Tuplesortstate.
 */
struct RumTuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	int			nKeys;			/* number of columns in sort key */
	bool		randomAccess;	/* did caller request random access? */
	bool		bounded;		/* did caller specify a maximum number of
								 * tuples to return? */
	bool		boundUsed;		/* true if we made use of a bounded heap */
	int			bound;			/* if bounded, the maximum number of tuples */
	long		availMem;		/* remaining memory available, in bytes */
	long		allowedMem;		/* total memory allowed, in bytes */
	int			maxTapes;		/* number of tapes (Knuth's T) */
	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
	MemoryContext sortcontext;	/* memory context holding all sort data */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are sorting from the routines that don't need to know it.
	 * They are set up by the rum_tuplesort_begin_xxx routines.
	 *
	 * Function to compare two tuples; result is per qsort() convention, ie:
	 * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
	 * qsort_arg_comparator.
	 */
	SortTupleComparator comparetup;

	/*
	 * Function to copy a supplied input tuple into palloc'd space and set up
	 * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
	 * state->availMem must be decreased by the amount of space used for the
	 * tuple copy (note the SortTuple struct itself is not counted).
	 */
	void		(*copytup) (RumTuplesortstate *state, SortTuple *stup, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() the out-of-line data (not the SortTuple struct!), and increase
	 * state->availMem by the amount of memory space thereby released.
	 */
	void		(*writetup) (RumTuplesortstate *state, int tapenum,
										 SortTuple *stup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create a palloc'd copy,
	 * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
	 * decrease state->availMem by the amount of memory space consumed.
	 */
	void		(*readtup) (RumTuplesortstate *state, SortTuple *stup,
										int tapenum, unsigned int len);

	/*
	 * Function to reverse the sort direction from its current state. (We
	 * could dispense with this if we wanted to enforce that all variants
	 * represent the sort key information alike.)
	 */
	void		(*reversedirection) (RumTuplesortstate *state);

	/*
	 * This array holds the tuples now in sort memory.  If we are in state
	 * INITIAL, the tuples are in no particular order; if we are in state
	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
	 * H.  (Note that memtupcount only counts the tuples that are part of the
	 * heap --- during merge passes, memtuples[] entries beyond tapeRange are
	 * never in the heap and are used to hold pre-read tuples.)  In state
	 * SORTEDONTAPE, the array is not used.
	 */
	SortTuple  *memtuples;		/* array of SortTuple structs */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */
	bool		growmemtuples;	/* memtuples' growth still underway? */

	/* Buffer size to use for reading input tapes, during merge. */
	size_t		read_buffer_size;

	/*
	 * While building initial runs, this is the current output run number
	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
	 */
	int			currentRun;

	/*
	 * Unless otherwise noted, all pointer variables below are pointers to
	 * arrays of length maxTapes, holding per-tape data.
	 */

	/*
	 * These variables are only used during merge passes.  mergeactive[i] is
	 * true if we are reading an input run from (actual) tape number i and
	 * have not yet exhausted that run.  mergenext[i] is the memtuples index
	 * of the next pre-read tuple (next to be loaded into the heap) for tape
	 * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
	 * points to the last pre-read tuple from each tape.  mergeavailslots[i]
	 * is the number of unused memtuples[] slots reserved for tape i, and
	 * mergeavailmem[i] is the amount of unused space allocated for tape i.
	 * mergefreelist and mergefirstfree keep track of unused locations in the
	 * memtuples[] array.  The memtuples[].tupindex fields link together
	 * pre-read tuples for each tape as well as recycled locations in
	 * mergefreelist. It is OK to use 0 as a null link in these lists, because
	 * memtuples[0] is part of the merge heap and is never a pre-read tuple.
	 */
	bool	   *mergeactive;	/* active input run source? */
	int		   *mergenext;		/* first preread tuple for each source */
	int		   *mergelast;		/* last preread tuple for each source */
	int		   *mergeavailslots;	/* slots left for prereading each tape */
	long	   *mergeavailmem;	/* availMem for prereading each tape */
	int			mergefreelist;	/* head of freelist of recycled slots */
	int			mergefirstfree; /* first slot never used in this merge */

	/*
	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
	 * "logical" and "actual" tape numbers straight!
	 */
	int			Level;			/* Knuth's l */
	int			destTape;		/* current output tape (Knuth's j, less 1) */
	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
	int		   *tp_runs;		/* # of real runs on each tape */
	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
	int			activeTapes;	/* # of active input tapes in merge pass */

	/*
	 * These variables are used after completion of sorting to keep track of
	 * the next tuple to return.  (In the tape case, the tape's current read
	 * position is also critical state.)
	 */
	int			result_tape;	/* actual tape number of finished output */
	int			current;		/* array index (only used if SORTEDINMEM) */
	bool		eof_reached;	/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
	int			markpos_offset; /* saved "current", or offset in tape block */
	bool		markpos_eof;	/* saved "eof_reached" */

	/*
	 * These variables are specific to the MinimalTuple case; they are set by
	 * rum_tuplesort_begin_heap and used only by the MinimalTuple routines.
	 */
	TupleDesc	tupDesc;
	SortSupport sortKeys;		/* array of length nKeys */

	/*
	 * This variable is shared by the single-key MinimalTuple case and the
	 * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
	 */
	SortSupport onlyKey;

	/*
	 * These variables are specific to the CLUSTER case; they are set by
	 * rum_tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
	 * indexScanKey.
	 */
	IndexInfo  *indexInfo;		/* info about index being used for reference */
	EState	   *estate;			/* for evaluating index expressions */

	/*
	 * These variables are specific to the IndexTuple case; they are set by
	 * rum_tuplesort_begin_index_xxx and used only by the IndexTuple routines.
	 */
	Relation	heapRel;		/* table the index is being built on */
	Relation	indexRel;		/* index being built */

	/* These are specific to the index_btree subcase: */
	ScanKey		indexScanKey;
	bool		enforceUnique;	/* complain if we find duplicate tuples */

	/* These are specific to the index_hash subcase: */
	uint32		hash_mask;		/* mask for sortable part of hash code */

	/*
	 * These variables are specific to the Datum case; they are set by
	 * rum_tuplesort_begin_datum and used only by the DatumTuple routines.
	 */
	Oid			datumType;
	/* we need typelen and byval in order to know how to copy the Datums. */
	int			datumTypeLen;
	bool		datumTypeByVal;

	bool		reverse;

	/* Do we need ItemPointer comparison in comparetup_rum()? */
	bool		compareItemPointer;

	/* compare_rumitem */
	FmgrInfo	*cmp;

	/*
	 * Resource snapshot for time of sort start.
	 */
#ifdef TRACE_SORT
	PGRUsage	ru_start;
#endif
};

#define COMPARETUP(state,a,b)	((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

/* When using this macro, beware of double evaluation of len */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
	do { \
		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
			elog(ERROR, "unexpected end of data"); \
	} while(0)
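
/*
 * Illustrative only: because "len" above is expanded twice, an argument with
 * side effects misbehaves.  A hypothetical call such as
 *
 *     LogicalTapeReadExact(state->tapeset, tapenum, ptr, len++);
 *
 * would advance "len" twice per read; pass a side-effect-free expression.
 */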


static RumTuplesortstate *rum_tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(RumTuplesortstate *state, SortTuple *tuple);
static void inittapes(RumTuplesortstate *state);
static void selectnewtape(RumTuplesortstate *state);
static void mergeruns(RumTuplesortstate *state);
static void mergeonerun(RumTuplesortstate *state);
static void beginmerge(RumTuplesortstate *state);
static void mergepreread(RumTuplesortstate *state);
static void mergeprereadone(RumTuplesortstate *state, int srcTape);
static void dumptuples(RumTuplesortstate *state, bool alltuples);
static void make_bounded_heap(RumTuplesortstate *state);
static void sort_bounded_heap(RumTuplesortstate *state);
static void rum_tuplesort_heap_insert(RumTuplesortstate *state, SortTuple *tuple,
						  int tupleindex, bool checkIndex);
static void rum_tuplesort_heap_siftup(RumTuplesortstate *state, bool checkIndex);
static unsigned int getlen(RumTuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(RumTuplesortstate *state, int tapenum);
static void free_sort_tuple(RumTuplesortstate *state, SortTuple *stup);
static int comparetup_rum(const SortTuple *a, const SortTuple *b,
			   RumTuplesortstate *state);
static void copytup_rum(RumTuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_rum(RumTuplesortstate *state, int tapenum,
			 SortTuple *stup);
static void readtup_rum(RumTuplesortstate *state, SortTuple *stup,
			int tapenum, unsigned int len);
static void reversedirection_rum(RumTuplesortstate *state);
static int comparetup_rumitem(const SortTuple *a, const SortTuple *b,
			   RumTuplesortstate *state);
static void copytup_rumitem(RumTuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_rumitem(RumTuplesortstate *state, int tapenum,
			 SortTuple *stup);
static void readtup_rumitem(RumTuplesortstate *state, SortTuple *stup,
			int tapenum, unsigned int len);

/*
 * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
 * any variant of SortTuples, using the appropriate comparetup function.
 * qsort_ssup() is specialized for the case where the comparetup function
 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
 * and Datum sorts.
 */
/* #include "qsort_tuple.c" */

static void
swapfunc(SortTuple *a, SortTuple *b, size_t n)
{
	do
	{
		SortTuple	t = *a;

		*a++ = *b;
		*b++ = t;
	} while (--n > 0);
}

#define cmp_ssup(a, b, ssup) \
	ApplySortComparator((a)->datum1, (a)->isnull1, \
						(b)->datum1, (b)->isnull1, ssup)

#define swap(a, b)						\
	do {								\
		SortTuple t = *(a);				\
		*(a) = *(b);					\
		*(b) = t;						\
	} while (0)

#define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n)

static SortTuple *
med3_tuple(SortTuple *a, SortTuple *b, SortTuple *c,
		   SortTupleComparator cmp_tuple, RumTuplesortstate *state)
{
	return cmp_tuple(a, b, state) < 0 ?
		(cmp_tuple(b, c, state) < 0 ? b :
		 (cmp_tuple(a, c, state) < 0 ? c : a))
		: (cmp_tuple(b, c, state) > 0 ? b :
		   (cmp_tuple(a, c, state) < 0 ? a : c));
}

static SortTuple *
med3_ssup(SortTuple *a, SortTuple *b, SortTuple *c, SortSupport ssup)
{
	return cmp_ssup(a, b, ssup) < 0 ?
		(cmp_ssup(b, c, ssup) < 0 ? b :
		 (cmp_ssup(a, c, ssup) < 0 ? c : a))
		: (cmp_ssup(b, c, ssup) > 0 ? b :
		   (cmp_ssup(a, c, ssup) < 0 ? a : c));
}

static void
qsort_ssup(SortTuple *a, size_t n, SortSupport ssup)
{
	SortTuple  *pa,
			   *pb,
			   *pc,
			   *pd,
			   *pl,
			   *pm,
			   *pn;
	size_t		d1,
				d2;
	int			r,
				presorted;

loop:
	CHECK_FOR_INTERRUPTS();
	if (n < 7)
	{
		for (pm = a + 1; pm < a + n; pm++)
			for (pl = pm; pl > a && cmp_ssup(pl - 1, pl, ssup) > 0; pl--)
				swap(pl, pl - 1);
		return;
	}
	presorted = 1;
	for (pm = a + 1; pm < a + n; pm++)
	{
		CHECK_FOR_INTERRUPTS();
		if (cmp_ssup(pm - 1, pm, ssup) > 0)
		{
			presorted = 0;
			break;
		}
	}
	if (presorted)
		return;
	pm = a + (n / 2);
	if (n > 7)
	{
		pl = a;
		pn = a + (n - 1);
		if (n > 40)
		{
			size_t		d = (n / 8);

			pl = med3_ssup(pl, pl + d, pl + 2 * d, ssup);
			pm = med3_ssup(pm - d, pm, pm + d, ssup);
			pn = med3_ssup(pn - 2 * d, pn - d, pn, ssup);
		}
		pm = med3_ssup(pl, pm, pn, ssup);
	}
	swap(a, pm);
	pa = pb = a + 1;
	pc = pd = a + (n - 1);
	for (;;)
	{
		while (pb <= pc && (r = cmp_ssup(pb, a, ssup)) <= 0)
		{
			if (r == 0)
			{
				swap(pa, pb);
				pa++;
			}
			pb++;
			CHECK_FOR_INTERRUPTS();
		}
		while (pb <= pc && (r = cmp_ssup(pc, a, ssup)) >= 0)
		{
			if (r == 0)
			{
				swap(pc, pd);
				pd--;
			}
			pc--;
			CHECK_FOR_INTERRUPTS();
		}
		if (pb > pc)
			break;
		swap(pb, pc);
		pb++;
		pc--;
	}
	pn = a + n;
	d1 = Min(pa - a, pb - pa);
	vecswap(a, pb - d1, d1);
	d1 = Min(pd - pc, pn - pd - 1);
	vecswap(pb, pn - d1, d1);
	d1 = pb - pa;
	d2 = pd - pc;
	if (d1 <= d2)
	{
		/* Recurse on left partition, then iterate on right partition */
		if (d1 > 1)
			qsort_ssup(a, d1, ssup);
		if (d2 > 1)
		{
			/* Iterate rather than recurse to save stack space */
			/* qsort_ssup(pn - d2, d2, ssup); */
			a = pn - d2;
			n = d2;
			goto loop;
		}
	}
	else
	{
		/* Recurse on right partition, then iterate on left partition */
		if (d2 > 1)
			qsort_ssup(pn - d2, d2, ssup);
		if (d1 > 1)
		{
			/* Iterate rather than recurse to save stack space */
			/* qsort_ssup(a, d1, ssup); */
			n = d1;
			goto loop;
		}
	}
}

static void
qsort_tuple(SortTuple *a, size_t n, SortTupleComparator cmp_tuple,
			RumTuplesortstate *state)
{
	SortTuple  *pa,
			   *pb,
			   *pc,
			   *pd,
			   *pl,
			   *pm,
			   *pn;
	size_t		d1,
				d2;
	int			r,
				presorted;

loop:
	CHECK_FOR_INTERRUPTS();
	if (n < 7)
	{
		for (pm = a + 1; pm < a + n; pm++)
			for (pl = pm; pl > a && cmp_tuple(pl - 1, pl, state) > 0; pl--)
				swap(pl, pl - 1);
		return;
	}
	presorted = 1;
	for (pm = a + 1; pm < a + n; pm++)
	{
		CHECK_FOR_INTERRUPTS();
		if (cmp_tuple(pm - 1, pm, state) > 0)
		{
			presorted = 0;
			break;
		}
	}
	if (presorted)
		return;
	pm = a + (n / 2);
	if (n > 7)
	{
		pl = a;
		pn = a + (n - 1);
		if (n > 40)
		{
			size_t		d = (n / 8);

			pl = med3_tuple(pl, pl + d, pl + 2 * d, cmp_tuple, state);
			pm = med3_tuple(pm - d, pm, pm + d, cmp_tuple, state);
			pn = med3_tuple(pn - 2 * d, pn - d, pn, cmp_tuple, state);
		}
		pm = med3_tuple(pl, pm, pn, cmp_tuple, state);
	}
	swap(a, pm);
	pa = pb = a + 1;
	pc = pd = a + (n - 1);
	for (;;)
	{
		while (pb <= pc && (r = cmp_tuple(pb, a, state)) <= 0)
		{
			if (r == 0)
			{
				swap(pa, pb);
				pa++;
			}
			pb++;
			CHECK_FOR_INTERRUPTS();
		}
		while (pb <= pc && (r = cmp_tuple(pc, a, state)) >= 0)
		{
			if (r == 0)
			{
				swap(pc, pd);
				pd--;
			}
			pc--;
			CHECK_FOR_INTERRUPTS();
		}
		if (pb > pc)
			break;
		swap(pb, pc);
		pb++;
		pc--;
	}
	pn = a + n;
	d1 = Min(pa - a, pb - pa);
	vecswap(a, pb - d1, d1);
	d1 = Min(pd - pc, pn - pd - 1);
	vecswap(pb, pn - d1, d1);
	d1 = pb - pa;
	d2 = pd - pc;
	if (d1 <= d2)
	{
		/* Recurse on left partition, then iterate on right partition */
		if (d1 > 1)
			qsort_tuple(a, d1, cmp_tuple, state);
		if (d2 > 1)
		{
			/* Iterate rather than recurse to save stack space */
			/* qsort_tuple(pn - d2, d2, cmp_tuple, state); */
			a = pn - d2;
			n = d2;
			goto loop;
		}
	}
	else
	{
		/* Recurse on right partition, then iterate on left partition */
		if (d2 > 1)
			qsort_tuple(pn - d2, d2, cmp_tuple, state);
		if (d1 > 1)
		{
			/* Iterate rather than recurse to save stack space */
			/* qsort_tuple(a, d1, cmp_tuple, state); */
			n = d1;
			goto loop;
		}
	}
}

/*
 *		rum_tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling rum_tuplesort_begin, the caller should call rum_tuplesort_putXXX
 * zero or more times, then call rum_tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling rum_tuplesort_getXXX until it returns false/NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call rum_tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of rum_tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */
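
/*
 * A minimal sketch of that life cycle (illustrative only, not lifted from
 * any particular caller):
 *
 *     bool should_free;
 *     RumSortItem *item;
 *     RumTuplesortstate *ts =
 *         rum_tuplesort_begin_rum(work_mem, nKeys, false, true);
 *
 *     rum_tuplesort_putrum(ts, item);        -- zero or more times
 *     rum_tuplesort_performsort(ts);
 *     while ((item = rum_tuplesort_getrum(ts, true, &should_free)) != NULL)
 *         ...                                -- consume in sorted order
 *     rum_tuplesort_end(ts);
 */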

static RumTuplesortstate *
rum_tuplesort_begin_common(int workMem, bool randomAccess)
{
	RumTuplesortstate *state;
	MemoryContext sortcontext;
	MemoryContext oldcontext;

	/*
	 * Create a working memory context for this sort operation. All data
	 * needed by the sort will live inside this context.
	 */
	sortcontext = RumContextCreate(CurrentMemoryContext, "TupleSort");

	/*
	 * Make the Tuplesortstate within the per-sort context.  This way, we
	 * don't need a separate pfree() operation for it at shutdown.
	 */
	oldcontext = MemoryContextSwitchTo(sortcontext);

	state = (RumTuplesortstate *) palloc0(sizeof(RumTuplesortstate));

#ifdef TRACE_SORT
	if (trace_sort)
		pg_rusage_init(&state->ru_start);
#endif

	state->status = TSS_INITIAL;
	state->randomAccess = randomAccess;
	state->bounded = false;
	state->boundUsed = false;
	state->allowedMem = workMem * 1024L;
	state->availMem = state->allowedMem;
	state->sortcontext = sortcontext;
	state->tapeset = NULL;

	state->memtupcount = 0;

	/*
	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
	 * see comments in grow_memtuples().
	 */
	state->memtupsize = Max(1024,
						ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);

	state->growmemtuples = true;
	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));

	USEMEM(state, GetMemoryChunkSpace(state->memtuples));

	/* workMem must be large enough for the minimal memtuples array */
	if (LACKMEM(state))
		elog(ERROR, "insufficient memory allowed for sort");

	state->currentRun = 0;

	/*
	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
	 * inittapes(), if needed
	 */

	state->result_tape = -1;	/* flag that result tape has not been formed */

	MemoryContextSwitchTo(oldcontext);

	return state;
}

/*
 * Get sort state memory context.  Currently it is used only to allocate
 * RumSortItem.
 */
MemoryContext
rum_tuplesort_get_memorycontext(RumTuplesortstate *state)
{
	return state->sortcontext;
}

RumTuplesortstate *
rum_tuplesort_begin_rum(int workMem, int nKeys, bool randomAccess,
						bool compareItemPointer)
{
	RumTuplesortstate *state = rum_tuplesort_begin_common(workMem, randomAccess);
	MemoryContext oldcontext;

	oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin rum sort: nKeys = %d, workMem = %d, randomAccess = %c",
			 nKeys, workMem, randomAccess ? 't' : 'f');
#endif

	state->nKeys = nKeys;

	state->comparetup = comparetup_rum;
	state->copytup = copytup_rum;
	state->writetup = writetup_rum;
	state->readtup = readtup_rum;
	state->reversedirection = reversedirection_rum;
	state->reverse = false;
	state->compareItemPointer = compareItemPointer;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

RumTuplesortstate *
rum_tuplesort_begin_rumitem(int workMem, FmgrInfo *cmp)
{
	RumTuplesortstate *state = rum_tuplesort_begin_common(workMem, false);
	MemoryContext oldcontext;

	oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG,
			 "begin rumitem sort: workMem = %d", workMem);
#endif

	state->cmp = cmp;
	state->comparetup = comparetup_rumitem;
	state->copytup = copytup_rumitem;
	state->writetup = writetup_rumitem;
	state->readtup = readtup_rumitem;
	state->reversedirection = reversedirection_rum;
	state->reverse = false;
	state->compareItemPointer = false;

	MemoryContextSwitchTo(oldcontext);

	return state;
}

/*
 * rum_tuplesort_end
 *
 *	Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by rum_tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
 */
void
rum_tuplesort_end(RumTuplesortstate *state)
{
	/* context swap probably not needed, but let's be safe */
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	long		spaceUsed;

	if (state->tapeset)
		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
	else
		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

	/*
	 * Delete temporary "tape" files, if any.
	 *
	 * Note: want to include this in reported total cost of sort, hence need
	 * for two #ifdef TRACE_SORT sections.
	 */
	if (state->tapeset)
		LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
	if (trace_sort)
	{
		if (state->tapeset)
			elog(LOG, "external sort ended, %ld disk blocks used: %s",
				 spaceUsed, pg_rusage_show(&state->ru_start));
		else
			elog(LOG, "internal sort ended, %ld KB used: %s",
				 spaceUsed, pg_rusage_show(&state->ru_start));
	}
#endif

	/* Free any execution state created for CLUSTER case */
	if (state->estate != NULL)
	{
		ExprContext *econtext = GetPerTupleExprContext(state->estate);

		ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
		FreeExecutorState(state->estate);
	}

	MemoryContextSwitchTo(oldcontext);

	/*
	 * Free the per-sort memory context, thereby releasing all working memory,
	 * including the Tuplesortstate struct itself.
	 */
	MemoryContextDelete(state->sortcontext);
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.
 * Return true if we were able to enlarge the array, false if not.
 *
 * Normally, at each increment we double the size of the array.  When we no
 * longer have enough memory to do that, we attempt one last, smaller increase
 * (and then clear the growmemtuples flag so we don't try any more).  That
 * allows us to use allowedMem as fully as possible; sticking to the pure
 * doubling rule could result in almost half of allowedMem going unused.
 * Because availMem moves around with tuple addition/removal, we need some
 * rule to prevent making repeated small increases in memtupsize, which would
 * just be useless thrashing.  The growmemtuples flag accomplishes that and
 * also prevents useless recalculations in this function.
 */
static bool
grow_memtuples(RumTuplesortstate *state)
{
	int			newmemtupsize;
	int			memtupsize = state->memtupsize;
	long		memNowUsed = state->allowedMem - state->availMem;

	/* Forget it if we've already maxed out memtuples, per comment above */
	if (!state->growmemtuples)
		return false;

	/* Select new value of memtupsize */
	if (memNowUsed <= state->availMem)
	{
		/*
		 * It is surely safe to double memtupsize if we've used no more than
		 * half of allowedMem.
		 *
		 * Note: it might seem that we need to worry about memtupsize * 2
		 * overflowing an int, but the MaxAllocSize clamp applied below
		 * ensures the existing memtupsize can't be large enough for that.
		 */
		newmemtupsize = memtupsize * 2;
	}
	else
	{
		/*
		 * This will be the last increment of memtupsize.  Abandon doubling
		 * strategy and instead increase as much as we safely can.
		 *
		 * To stay within allowedMem, we can't increase memtupsize by more
		 * than availMem / sizeof(SortTuple) elements.  In practice, we want
		 * to increase it by considerably less, because we need to leave some
		 * space for the tuples to which the new array slots will refer.  We
		 * assume the new tuples will be about the same size as the tuples
		 * we've already seen, and thus we can extrapolate from the space
		 * consumption so far to estimate an appropriate new size for the
		 * memtuples array.  The optimal value might be higher or lower than
		 * this estimate, but it's hard to know that in advance.
		 *
		 * This calculation is safe against enlarging the array so much that
		 * LACKMEM becomes true, because the memory currently used includes
		 * the present array; thus, there would be enough allowedMem for the
		 * new array elements even if no other memory were currently used.
		 *
		 * We do the arithmetic in float8, because otherwise the product of
		 * memtupsize and allowedMem could overflow.  (A little algebra shows
		 * that grow_ratio must be less than 2 here, so we are not risking
		 * integer overflow this way.)	Any inaccuracy in the result should be
		 * insignificant; but even if we computed a completely insane result,
		 * the checks below will prevent anything really bad from happening.
		 */
		double		grow_ratio;

		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
		newmemtupsize = (int) (memtupsize * grow_ratio);

		/* We won't make any further enlargement attempts */
		state->growmemtuples = false;
	}

	/* Must enlarge array by at least one element, else report failure */
	if (newmemtupsize <= memtupsize)
		goto noalloc;

	/*
	 * On a 64-bit machine, allowedMem could be more than MaxAllocSize.  Clamp
	 * to ensure our request won't be rejected by palloc.
	 */
	if ((Size) newmemtupsize >= MaxAllocSize / sizeof(SortTuple))
	{
		newmemtupsize = (int) (MaxAllocSize / sizeof(SortTuple));
		state->growmemtuples = false;	/* can't grow any more */
	}

	/*
	 * We need to be sure that we do not cause LACKMEM to become true, else
	 * the space management algorithm will go nuts.  The code above should
	 * never generate a dangerous request, but to be safe, check explicitly
	 * that the array growth fits within availMem.  (We could still cause
	 * LACKMEM if the memory chunk overhead associated with the memtuples
	 * array were to increase.  That shouldn't happen because we chose the
	 * initial array size large enough to ensure that palloc will be treating
	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
	 * explicitly below just in case.)
	 */
	if (state->availMem < (long) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
		goto noalloc;

	/* OK, do it */
	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
	state->memtupsize = newmemtupsize;
	state->memtuples = (SortTuple *)
		repalloc(state->memtuples,
				 state->memtupsize * sizeof(SortTuple));
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
	if (LACKMEM(state))
		elog(ERROR, "unexpected out-of-memory situation in tuplesort");
	return true;

noalloc:
	/* If for any reason we didn't realloc, shut off future attempts */
	state->growmemtuples = false;
	return false;
}
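
/*
 * Worked example of the final-increment rule above (figures illustrative):
 * with allowedMem = 64MB and memNowUsed = 48MB, doubling is no longer safe,
 * so grow_ratio = 64/48 = ~1.33; a 100000-slot array therefore grows once
 * more to ~133000 slots, and growmemtuples is cleared afterwards.
 */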

void
rum_tuplesort_putrum(RumTuplesortstate *state, RumSortItem * item)
{
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
	SortTuple	stup;

	/*
	 * Copy the given tuple into memory we control, and decrease availMem.
	 * Then call the common code.
	 */
	COPYTUP(state, &stup, (void *) item);

	puttuple_common(state, &stup);

	MemoryContextSwitchTo(oldcontext);
}

void
rum_tuplesort_putrumitem(RumTuplesortstate *state, RumScanItem * item)
{
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
	SortTuple	stup;

	/*
	 * Copy the given tuple into memory we control, and decrease availMem.
	 * Then call the common code.
	 */
	COPYTUP(state, &stup, (void *) item);

	puttuple_common(state, &stup);

	MemoryContextSwitchTo(oldcontext);
}

/*
 * Shared code for tuple and datum cases.
 */
static void
puttuple_common(RumTuplesortstate *state, SortTuple *tuple)
{
	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * Save the tuple into the unsorted array.  First, grow the array
			 * as needed.  Note that we try to grow the array when there is
			 * still one free slot remaining --- if we fail, there'll still be
			 * room to store the incoming tuple, and then we'll switch to
			 * tape-based operation.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);
				Assert(state->memtupcount < state->memtupsize);
			}
			state->memtuples[state->memtupcount++] = *tuple;

			/*
			 * Check if it's time to switch over to a bounded heapsort. We do
			 * so if the input tuple count exceeds twice the desired tuple
			 * count (this is a heuristic for where heapsort becomes cheaper
			 * than a quicksort), or if we've just filled workMem and have
			 * enough tuples to meet the bound.
			 *
			 * Note that once we enter TSS_BOUNDED state we will always try to
			 * complete the sort that way.  In the worst case, if later input
			 * tuples are larger than earlier ones, this might cause us to
			 * exceed workMem significantly.
			 */
			if (state->bounded &&
				(state->memtupcount > state->bound * 2 ||
				 (state->memtupcount > state->bound && LACKMEM(state))))
			{
#ifdef TRACE_SORT
				if (trace_sort)
					elog(LOG, "switching to bounded heapsort at %d tuples: %s",
						 state->memtupcount,
						 pg_rusage_show(&state->ru_start));
#endif
				make_bounded_heap(state);
				return;
			}

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.
			 */
			inittapes(state);

			/*
			 * Dump tuples until we are back under the limit.
			 */
			dumptuples(state, false);
			break;

		case TSS_BOUNDED:

			/*
			 * We don't want to grow the array here, so check whether the new
			 * tuple can be discarded before putting it in.  This should be a
			 * good speed optimization, too, since when there are many more
			 * input tuples than the bound, most input tuples can be discarded
			 * with just this one comparison.  Note that because we currently
			 * have the sort direction reversed, we must check for <= not >=.
			 */
			if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
			{
				/* new tuple <= top of the heap, so we can discard it */
				free_sort_tuple(state, tuple);
				CHECK_FOR_INTERRUPTS();
			}
			else
			{
				/* discard top of heap, sift up, insert new tuple */
				free_sort_tuple(state, &state->memtuples[0]);
				rum_tuplesort_heap_siftup(state, false);
				rum_tuplesort_heap_insert(state, tuple, 0, false);
			}
			break;

		case TSS_BUILDRUNS:

			/*
			 * Insert the tuple into the heap, with run number currentRun if
			 * it can go into the current run, else run number currentRun+1.
			 * The tuple can go into the current run if it is >= the first
			 * not-yet-output tuple.  (Actually, it could go into the current
			 * run if it is >= the most recently output tuple ... but that
			 * would require keeping around the tuple we last output, and it's
			 * simplest to let writetup free each tuple as soon as it's
			 * written.)
			 *
			 * Note there will always be at least one tuple in the heap at
			 * this point; see dumptuples.
			 */
			Assert(state->memtupcount > 0);
			if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
				rum_tuplesort_heap_insert(state, tuple, state->currentRun, true);
			else
				rum_tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);

			/*
			 * If we are over the memory limit, dump tuples till we're under.
			 */
			dumptuples(state, false);
			break;

		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}
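
/*
 * Summary of the transitions driven above (for orientation only):
 *
 *     TSS_INITIAL --(bound exceeded, or memory low with bound met)--> TSS_BOUNDED
 *     TSS_INITIAL --(workMem exhausted, no bound)-------------------> TSS_BUILDRUNS
 *
 * Once in TSS_BOUNDED or TSS_BUILDRUNS we stay there while loading; the
 * remaining states are reached via rum_tuplesort_performsort().
 */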

/*
 * All tuples have been provided; finish the sort.
 */
void
rum_tuplesort_performsort(RumTuplesortstate *state)
{
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG, "performsort starting: %s",
			 pg_rusage_show(&state->ru_start));
#endif

	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * We were able to accumulate all the tuples within the allowed
			 * amount of memory.  Just qsort 'em and we're done.
			 */
			if (state->memtupcount > 1)
			{
				/* Can we use the single-key sort function? */
				if (state->onlyKey != NULL)
					qsort_ssup(state->memtuples, state->memtupcount,
							   state->onlyKey);
				else
					qsort_tuple(state->memtuples,
								state->memtupcount,
								state->comparetup,
								state);
			}
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			state->status = TSS_SORTEDINMEM;
			break;

		case TSS_BOUNDED:

			/*
			 * We were able to accumulate all the tuples required for output
			 * in memory, using a heap to eliminate excess tuples.  Now we
			 * have to transform the heap to a properly-sorted array.
			 */
			sort_bounded_heap(state);
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			state->status = TSS_SORTEDINMEM;
			break;

		case TSS_BUILDRUNS:

			/*
			 * Finish tape-based sort.  First, flush all tuples remaining in
			 * memory out to tape; then merge until we have a single remaining
			 * run (or, if !randomAccess, one run per tape). Note that
			 * mergeruns sets the correct state->status.
			 */
			dumptuples(state, true);
			mergeruns(state);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;

		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}

#ifdef TRACE_SORT
	if (trace_sort)
	{
		if (state->status == TSS_FINALMERGE)
			elog(LOG, "performsort done (except %d-way final merge): %s",
				 state->activeTapes,
				 pg_rusage_show(&state->ru_start));
		else
			elog(LOG, "performsort done: %s",
				 pg_rusage_show(&state->ru_start));
	}
#endif

	MemoryContextSwitchTo(oldcontext);
}

/*
 * Internal routine to fetch the next tuple in either forward or back
 * direction into *stup.  Returns false if no more tuples.
 * If *should_free is set, the caller must pfree stup.tuple when done with it.
 */
static bool
rum_tuplesort_gettuple_common(RumTuplesortstate *state, bool forward,
							  SortTuple *stup, bool *should_free)
{
	unsigned int tuplen;

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			Assert(forward || state->randomAccess);
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
				{
					*stup = state->memtuples[state->current++];
					return true;
				}
				state->eof_reached = true;

				/*
				 * Complain if caller tries to retrieve more tuples than
				 * originally asked for in a bounded sort.  This is because
				 * returning EOF here might be the wrong thing.
				 */
				if (state->bounded && state->current >= state->bound)
					elog(ERROR, "retrieved too many tuples in a bounded sort");

				return false;
			}
			else
			{
				if (state->current <= 0)
					return false;

				/*
				 * If all tuples have been fetched already, we return the
				 * last tuple; otherwise, the tuple before the last one
				 * returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return false;
				}
				*stup = state->memtuples[state->current - 1];
				return true;
			}
			break;

		case TSS_SORTEDONTAPE:
			Assert(forward || state->randomAccess);
			*should_free = true;
			if (forward)
			{
				if (state->eof_reached)
					return false;
				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
				{
					READTUP(state, stup, state->result_tape, tuplen);
					return true;
				}
				else
				{
					state->eof_reached = true;
					return false;
				}
			}

			/*
			 * Backward.
			 *
			 * If all tuples have been fetched already, we return the last
			 * tuple; otherwise, the tuple before the last one returned.
			 */
			if (state->eof_reached)
			{
				/*
				 * Seek position is pointing just past the zero tuplen at the
				 * end of file; back up to fetch last tuple's ending length
				 * word.  If seek fails we must have a completely empty file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  2 * sizeof(unsigned int)))
					return false;
				state->eof_reached = false;
			}
			else
			{
				/*
				 * Back up and fetch previously-returned tuple's ending length
				 * word.  If seek fails, assume we are at start of file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  sizeof(unsigned int)))
					return false;
				tuplen = getlen(state, state->result_tape, false);

				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  tuplen + 2 * sizeof(unsigned int)))
				{
					/*
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
					 */
					if (!LogicalTapeBackspace(state->tapeset,
											  state->result_tape,
											  tuplen + sizeof(unsigned int)))
						elog(ERROR, "bogus tuple length in backward scan");
					return false;
				}
			}

			tuplen = getlen(state, state->result_tape, false);

			/*
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
			 */
			if (!LogicalTapeBackspace(state->tapeset,
									  state->result_tape,
									  tuplen))
				elog(ERROR, "bogus tuple length in backward scan");
			READTUP(state, stup, state->result_tape, tuplen);
			return true;

		case TSS_FINALMERGE:
			Assert(forward);
			*should_free = true;

			/*
			 * This code should match the inner loop of mergeonerun().
			 */
			if (state->memtupcount > 0)
			{
				int			srcTape = state->memtuples[0].tupindex;
				Size		tuplen;
				int			tupIndex;
				SortTuple  *newtup;

				*stup = state->memtuples[0];
				/* returned tuple is no longer counted in our memory space */
				if (stup->tuple)
				{
					tuplen = GetMemoryChunkSpace(stup->tuple);
					state->availMem += tuplen;
					state->mergeavailmem[srcTape] += tuplen;
				}
				rum_tuplesort_heap_siftup(state, false);
				if ((tupIndex = state->mergenext[srcTape]) == 0)
				{
					/*
					 * out of preloaded data on this tape, try to read more
					 *
					 * Unlike mergeonerun(), we only preload from the single
					 * tape that's run dry.  See mergepreread() comments.
					 */
					mergeprereadone(state, srcTape);

					/*
					 * if still no data, we've reached end of run on this tape
					 */
					if ((tupIndex = state->mergenext[srcTape]) == 0)
						return true;
				}
				/* pull next preread tuple from list, insert in heap */
				newtup = &state->memtuples[tupIndex];
				state->mergenext[srcTape] = newtup->tupindex;
				if (state->mergenext[srcTape] == 0)
					state->mergelast[srcTape] = 0;
				rum_tuplesort_heap_insert(state, newtup, srcTape, false);
				/* put the now-unused memtuples entry on the freelist */
				newtup->tupindex = state->mergefreelist;
				state->mergefreelist = tupIndex;
				state->mergeavailslots[srcTape]++;
				return true;
			}
			return false;

		default:
			elog(ERROR, "invalid tuplesort state");
			return false;		/* keep compiler quiet */
	}
}

RumSortItem *
rum_tuplesort_getrum(RumTuplesortstate *state, bool forward, bool *should_free)
{
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
	SortTuple	stup;

	if (!rum_tuplesort_gettuple_common(state, forward, &stup, should_free))
		stup.tuple = NULL;

	MemoryContextSwitchTo(oldcontext);

	return (RumSortItem *) stup.tuple;
}

RumScanItem *
rum_tuplesort_getrumitem(RumTuplesortstate *state, bool forward, bool *should_free)
{
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
	SortTuple	stup;

	if (!rum_tuplesort_gettuple_common(state, forward, &stup, should_free))
		stup.tuple = NULL;

	MemoryContextSwitchTo(oldcontext);

	return (RumScanItem *) stup.tuple;
}

/*
 * rum_tuplesort_merge_order - report merge order we'll use for given memory
 * (note: "merge order" just means the number of input tapes in the merge).
 *
 * This is exported for use by the planner.  allowedMem is in bytes.
 */
int
rum_tuplesort_merge_order(long allowedMem)
{
	int			mOrder;

	/*
	 * We need one tape for each merge input, plus another one for the output,
	 * and each of these tapes needs buffer space.  In addition we want
	 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
	 * count).
	 *
	 * Note: you might be thinking we need to account for the memtuples[]
	 * array in this calculation, but we effectively treat that as part of the
	 * MERGE_BUFFER_SIZE workspace.
	 */
	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

	/* Even in minimum memory, use at least a MINORDER merge */
	mOrder = Max(mOrder, MINORDER);

	return mOrder;
}
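
/*
 * Worked example (illustrative): with the default 8192-byte BLCKSZ,
 * TAPE_BUFFER_OVERHEAD is 24576 bytes and MERGE_BUFFER_SIZE is 262144 bytes,
 * so allowedMem = 4MB yields
 *
 *     (4194304 - 24576) / (262144 + 24576) = 14 input tapes,
 *
 * while anything below roughly 1.7MB is clamped up to MINORDER (6).
 */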
1485 
1486 /*
1487  * inittapes - initialize for tape sorting.
1488  *
1489  * This is called only if we have found we don't have room to sort in memory.
1490  */
1491 static void
inittapes(RumTuplesortstate * state)1492 inittapes(RumTuplesortstate *state)
1493 {
1494 	int			maxTapes,
1495 				ntuples,
1496 				j;
1497 	long		tapeSpace;
1498 
1499 	/* Compute number of tapes to use: merge order plus 1 */
1500 	maxTapes = rum_tuplesort_merge_order(state->allowedMem) + 1;
1501 
1502 	/*
1503 	 * We must have at least 2*maxTapes slots in the memtuples[] array, else
1504 	 * we'd not have room for merge heap plus preread.  It seems unlikely that
1505 	 * this case would ever occur, but be safe.
1506 	 */
1507 	maxTapes = Min(maxTapes, state->memtupsize / 2);
1508 
1509 	state->maxTapes = maxTapes;
1510 	state->tapeRange = maxTapes - 1;
1511 
1512 #ifdef TRACE_SORT
1513 	if (trace_sort)
1514 		elog(LOG, "switching to external sort with %d tapes: %s",
1515 			 maxTapes, pg_rusage_show(&state->ru_start));
1516 #endif
1517 
1518 	/*
1519 	 * Decrease availMem to reflect the space needed for tape buffers; but
1520 	 * don't decrease it to the point that we have no room for tuples. (That
1521 	 * case is only likely to occur if sorting pass-by-value Datums; in all
1522 	 * other scenarios the memtuples[] array is unlikely to occupy more than
1523 	 * half of allowedMem.  In the pass-by-value case it's not important to
1524 	 * account for tuple space, so we don't care if LACKMEM becomes
1525 	 * inaccurate.)
1526 	 */
1527 	tapeSpace = (long) maxTapes *TAPE_BUFFER_OVERHEAD;
1528 
1529 	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1530 		USEMEM(state, tapeSpace);
1531 
1532 	/*
1533 	 * Make sure that the temp file(s) underlying the tape set are created in
1534 	 * suitable temp tablespaces.
1535 	 */
1536 	PrepareTempTablespaces();
1537 
1538 	/*
1539 	 * Create the tape set and allocate the per-tape data arrays.
1540 	 */
1541 	state->tapeset = LogicalTapeSetCreate(maxTapes);
1542 
1543 	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
1544 	state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
1545 	state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1546 	state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
1547 	state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
1548 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
1549 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
1550 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
1551 	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
1552 
1553 	/*
1554 	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
1555 	 * marked as belonging to run number zero.
1556 	 *
1557 	 * NOTE: we pass false for checkIndex since there's no point in comparing
1558 	 * indexes in this step, even though we do intend the indexes to be part
1559 	 * of the sort key...
1560 	 */
1561 	ntuples = state->memtupcount;
1562 	state->memtupcount = 0;		/* make the heap empty */
1563 	for (j = 0; j < ntuples; j++)
1564 	{
1565 		/* Must copy source tuple to avoid possible overwrite */
1566 		SortTuple	stup = state->memtuples[j];
1567 
1568 		rum_tuplesort_heap_insert(state, &stup, 0, false);
1569 	}
1570 	Assert(state->memtupcount == ntuples);
1571 
1572 	state->currentRun = 0;
1573 
1574 	/*
1575 	 * Initialize variables of Algorithm D (step D1).
1576 	 */
1577 	for (j = 0; j < maxTapes; j++)
1578 	{
1579 		state->tp_fib[j] = 1;
1580 		state->tp_runs[j] = 0;
1581 		state->tp_dummy[j] = 1;
1582 		state->tp_tapenum[j] = j;
1583 	}
1584 	state->tp_fib[state->tapeRange] = 0;
1585 	state->tp_dummy[state->tapeRange] = 0;
1586 
1587 	state->Level = 1;
1588 	state->destTape = 0;
1589 
1590 	state->status = TSS_BUILDRUNS;
1591 }

/*
 * selectnewtape -- select new tape for new initial run.
 *
 * This is called after finishing a run when we know another run
 * must be started.  This implements steps D3, D4 of Algorithm D.
 */
static void
selectnewtape(RumTuplesortstate *state)
{
	int			j;
	int			a;

	/* Step D3: advance j (destTape) */
	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
	{
		state->destTape++;
		return;
	}
	if (state->tp_dummy[state->destTape] != 0)
	{
		state->destTape = 0;
		return;
	}

	/* Step D4: increase level */
	state->Level++;
	a = state->tp_fib[0];
	for (j = 0; j < state->tapeRange; j++)
	{
		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
		state->tp_fib[j] = a + state->tp_fib[j + 1];
	}
	state->destTape = 0;
}
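
/*
 * A worked example (illustrative only): with maxTapes = 4, runs are
 * assigned to input tapes in the order 0, 1, 2 while dummies remain,
 * which exhausts level 1 after three runs.  The fourth run triggers step
 * D4: Level becomes 2, tp_fib goes from {1,1,1} to {2,2,1} (five runs
 * total), tp_dummy is recomputed to {1,1,0}, and runs four and five land
 * on tapes 0 and 1 respectively before the level is raised again.
 */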

/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  All input data has
 * already been written to initial runs on tape (see dumptuples).
 */
static void
mergeruns(RumTuplesortstate *state)
{
	int			tapenum,
				svTape,
				svRuns,
				svDummy;
	int			numTapes;
	int			numInputTapes;

	Assert(state->status == TSS_BUILDRUNS);
	Assert(state->memtupcount == 0);

	/*
	 * If we produced only one initial run (quite likely if the total data
	 * volume is between 1X and 2X workMem), we can just use that tape as the
	 * finished output, rather than doing a useless merge.  (This obvious
	 * optimization is not in Knuth's algorithm.)
	 */
	if (state->currentRun == 1)
	{
		state->result_tape = state->tp_tapenum[state->destTape];
		/* must freeze and rewind the finished output tape */
		LogicalTapeFreeze(state->tapeset, state->result_tape);
		state->status = TSS_SORTEDONTAPE;
		return;
	}

	/*
	 * If we had fewer runs than tapes, refund the memory that we imagined we
	 * would need for the tape buffers of the unused tapes.
	 *
	 * numTapes and numInputTapes reflect the actual number of tapes we will
	 * use.  Note that the output tape's tape number is maxTapes - 1, so the
	 * tape numbers of the used tapes are not consecutive, and you cannot just
	 * loop from 0 to numTapes to visit all used tapes!
	 */
	if (state->Level == 1)
	{
		numInputTapes = state->currentRun;
		numTapes = numInputTapes + 1;
		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
	}
	else
	{
		numInputTapes = state->tapeRange;
	}

	state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
	USEMEM(state, state->read_buffer_size * numInputTapes);

	/* End of step D2: rewind all output tapes to prepare for merging */
	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);

	for (;;)
	{
		/*
		 * At this point we know that tape[T] is empty.  If there's just one
		 * (real or dummy) run left on each input tape, then only one merge
		 * pass remains.  If we don't have to produce a materialized sorted
		 * tape, we can stop at this point and do the final merge on-the-fly.
		 */
		if (!state->randomAccess)
		{
			bool		allOneRun = true;

			Assert(state->tp_runs[state->tapeRange] == 0);
			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
			{
				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
				{
					allOneRun = false;
					break;
				}
			}
			if (allOneRun)
			{
				/* Tell logtape.c we won't be writing anymore */
				LogicalTapeSetForgetFreeSpace(state->tapeset);
				/* Initialize for the final merge pass */
				beginmerge(state);
				state->status = TSS_FINALMERGE;
				return;
			}
		}

		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
		while (state->tp_runs[state->tapeRange - 1] ||
			   state->tp_dummy[state->tapeRange - 1])
		{
			bool		allDummy = true;

			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
			{
				if (state->tp_dummy[tapenum] == 0)
				{
					allDummy = false;
					break;
				}
			}

			if (allDummy)
			{
				state->tp_dummy[state->tapeRange]++;
				for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
					state->tp_dummy[tapenum]--;
			}
			else
				mergeonerun(state);
		}

		/* Step D6: decrease level */
		if (--state->Level == 0)
			break;
		/* rewind output tape T to use as new input */
		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
								 state->read_buffer_size);
		/* rewind used-up input tape P, and prepare it for write pass */
		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
		state->tp_runs[state->tapeRange - 1] = 0;

		/*
		 * reassign tape units per step D6; note we no longer care about A[]
		 */
		svTape = state->tp_tapenum[state->tapeRange];
		svDummy = state->tp_dummy[state->tapeRange];
		svRuns = state->tp_runs[state->tapeRange];
		for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
		{
			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
		}
		state->tp_tapenum[0] = svTape;
		state->tp_dummy[0] = svDummy;
		state->tp_runs[0] = svRuns;
	}

	/*
	 * Done.  Knuth says that the result is on TAPE[1], but since we exited
	 * the loop without performing the last iteration of step D6, we have not
	 * rearranged the tape unit assignment, and therefore the result is on
	 * TAPE[T].  We need to do it this way so that we can freeze the final
	 * output tape while rewinding it.  The last iteration of step D6 would be
	 * a waste of cycles anyway...
	 */
	state->result_tape = state->tp_tapenum[state->tapeRange];
	LogicalTapeFreeze(state->tapeset, state->result_tape);
	state->status = TSS_SORTEDONTAPE;
}
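
/*
 * Illustrative arithmetic for the buffer sizing above (hypothetical
 * numbers, not a measurement): if availMem is 8 MB when mergeruns()
 * starts and six input tapes are in play, read_buffer_size is about
 * 8 MB / 6 ~= 1.33 MB per tape, and the whole amount is then accounted
 * as used via USEMEM().  The Level == 1 branch exists so that, when
 * there were fewer runs than tapes, the buffers of the never-used tapes
 * are refunded first and the division covers only tapes that hold runs.
 */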

/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  We know that the
 * output tape is TAPE[T].
 */
static void
mergeonerun(RumTuplesortstate *state)
{
	int			destTape = state->tp_tapenum[state->tapeRange];
	int			srcTape;
	int			tupIndex;
	SortTuple  *tup;
	long		priorAvail,
				spaceFreed;

	/*
	 * Start the merge by loading one tuple from each active source tape into
	 * the heap.  We can also decrease the input run/dummy run counts.
	 */
	beginmerge(state);

	/*
	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
	 * out, and replacing it with next tuple from same tape (if there is
	 * another one).
	 */
	while (state->memtupcount > 0)
	{
		/* write the tuple to destTape */
		priorAvail = state->availMem;
		srcTape = state->memtuples[0].tupindex;
		WRITETUP(state, destTape, &state->memtuples[0]);
		/* writetup adjusted total free space, now fix per-tape space */
		spaceFreed = state->availMem - priorAvail;
		state->mergeavailmem[srcTape] += spaceFreed;
		/* compact the heap */
		rum_tuplesort_heap_siftup(state, false);
		if ((tupIndex = state->mergenext[srcTape]) == 0)
		{
			/* out of preloaded data on this tape, try to read more */
			mergepreread(state);
			/* if still no data, we've reached end of run on this tape */
			if ((tupIndex = state->mergenext[srcTape]) == 0)
				continue;
		}
		/* pull next preread tuple from list, insert in heap */
		tup = &state->memtuples[tupIndex];
		state->mergenext[srcTape] = tup->tupindex;
		if (state->mergenext[srcTape] == 0)
			state->mergelast[srcTape] = 0;
		rum_tuplesort_heap_insert(state, tup, srcTape, false);
		/* put the now-unused memtuples entry on the freelist */
		tup->tupindex = state->mergefreelist;
		state->mergefreelist = tupIndex;
		state->mergeavailslots[srcTape]++;
	}

	/*
	 * When the heap empties, we're done.  Write an end-of-run marker on the
	 * output tape, and increment its count of real runs.
	 */
	markrunend(state, destTape);
	state->tp_runs[state->tapeRange]++;

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
			 pg_rusage_show(&state->ru_start));
#endif
}
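
/*
 * The loop above is the classic heap-based k-way merge.  A minimal
 * free-standing sketch of the same pattern (the next_tuple(), emit(),
 * sift_down() and delete_top() helpers are hypothetical, not part of this
 * module):
 *
 *		while (heap_not_empty)
 *		{
 *			SortTuple  *top = &heap[0];		// smallest of the k tape heads
 *			int			src = top->tupindex;	// tape it came from
 *
 *			emit(top);						// append to the output run
 *			if (next_tuple(src, top))		// refill from the same tape...
 *				sift_down(heap);			// ...and restore the invariant
 *			else
 *				delete_top(heap);			// tape's run is exhausted
 *		}
 *
 * Each output tuple costs O(log k) comparisons, so merging N tuples from
 * k runs is O(N log k).
 */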

/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].  Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
 */
static void
beginmerge(RumTuplesortstate *state)
{
	int			activeTapes;
	int			tapenum;
	int			srcTape;
	int			slotsPerTape;
	long		spacePerTape;

	/* Heap should be empty here */
	Assert(state->memtupcount == 0);

	/* Adjust run counts and mark the active tapes */
	memset(state->mergeactive, 0,
		   state->maxTapes * sizeof(*state->mergeactive));
	activeTapes = 0;
	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
	{
		if (state->tp_dummy[tapenum] > 0)
			state->tp_dummy[tapenum]--;
		else
		{
			Assert(state->tp_runs[tapenum] > 0);
			state->tp_runs[tapenum]--;
			srcTape = state->tp_tapenum[tapenum];
			state->mergeactive[srcTape] = true;
			activeTapes++;
		}
	}
	state->activeTapes = activeTapes;

	/* Clear merge-pass state variables */
	memset(state->mergenext, 0,
		   state->maxTapes * sizeof(*state->mergenext));
	memset(state->mergelast, 0,
		   state->maxTapes * sizeof(*state->mergelast));
	state->mergefreelist = 0;	/* nothing in the freelist */
	state->mergefirstfree = activeTapes;	/* 1st slot avail for preread */

	/*
	 * Initialize space allocation to let each active input tape have an equal
	 * share of preread space.
	 */
	Assert(activeTapes > 0);
	slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
	Assert(slotsPerTape > 0);
	spacePerTape = state->availMem / activeTapes;
	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
	{
		if (state->mergeactive[srcTape])
		{
			state->mergeavailslots[srcTape] = slotsPerTape;
			state->mergeavailmem[srcTape] = spacePerTape;
		}
	}

	/*
	 * Preread as many tuples as possible (and at least one) from each active
	 * tape
	 */
	mergepreread(state);

	/* Load the merge heap with the first tuple from each input tape */
	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
	{
		int			tupIndex = state->mergenext[srcTape];
		SortTuple  *tup;

		if (tupIndex)
		{
			tup = &state->memtuples[tupIndex];
			state->mergenext[srcTape] = tup->tupindex;
			if (state->mergenext[srcTape] == 0)
				state->mergelast[srcTape] = 0;
			rum_tuplesort_heap_insert(state, tup, srcTape, false);
			/* put the now-unused memtuples entry on the freelist */
			tup->tupindex = state->mergefreelist;
			state->mergefreelist = tupIndex;
			state->mergeavailslots[srcTape]++;
		}
	}
}
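
/*
 * Example of the division above (the numbers are hypothetical): with
 * memtupsize = 1024, six active tapes, and hence mergefirstfree = 6, each
 * tape gets slotsPerTape = (1024 - 6) / 6 = 169 preread slots; with
 * availMem at 6 MB each tape likewise gets a 1 MB preread budget.  Slots
 * 0..5 stay reserved for the merge heap itself, which never holds more
 * than one tuple per active tape.
 */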

/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of tuplesort.c, from which this code
 * is copied.  Load tuples from each active source tape until the tape's run
 * is exhausted or it has used up its fair share of available memory.  In any
 * case, we guarantee that there is at least one preread tuple available from
 * each unexhausted input tape.
 *
 * We invoke this routine at the start of a merge pass for initial load,
 * and then whenever any tape's preread data runs out.  Note that we load
 * as much data as possible from all tapes, not just the one that ran out.
 * This is because logtape.c works best with a usage pattern that alternates
 * between reading a lot of data and writing a lot of data, so whenever we
 * are forced to read, we should fill working memory completely.
 *
 * In FINALMERGE state, we *don't* use this routine, but instead just preread
 * from the single tape that ran dry.  There's no read/write alternation in
 * that state and so no point in scanning through all the tapes to fix one.
 * (Moreover, there may be quite a lot of inactive tapes in that state, since
 * we might have had many fewer runs than tapes.  In a regular tape-to-tape
 * merge we can expect most of the tapes to be active.)
 */
static void
mergepreread(RumTuplesortstate *state)
{
	int			srcTape;

	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
		mergeprereadone(state, srcTape);
}

/*
 * mergeprereadone - load tuples from one merge input tape
 *
 * Read tuples from the specified tape until it has used up its free memory
 * or array slots; but ensure that we have at least one tuple, if any are
 * to be had.
 */
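/*
 * Note on the memory accounting below: availMem is temporarily swapped out
 * and replaced by this tape's private budget (mergeavailmem[srcTape]), so
 * that the USEMEM() calls inside READTUP charge this tape rather than the
 * global pool; the difference is folded back into both counters when the
 * loop exits.
 */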
static void
mergeprereadone(RumTuplesortstate *state, int srcTape)
{
	unsigned int tuplen;
	SortTuple	stup;
	int			tupIndex;
	long		priorAvail,
				spaceUsed;

	if (!state->mergeactive[srcTape])
		return;					/* tape's run is already exhausted */
	priorAvail = state->availMem;
	state->availMem = state->mergeavailmem[srcTape];
	while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
		   state->mergenext[srcTape] == 0)
	{
		/* read next tuple, if any */
		if ((tuplen = getlen(state, srcTape, true)) == 0)
		{
			state->mergeactive[srcTape] = false;
			break;
		}
		READTUP(state, &stup, srcTape, tuplen);
		/* find a free slot in memtuples[] for it */
		tupIndex = state->mergefreelist;
		if (tupIndex)
			state->mergefreelist = state->memtuples[tupIndex].tupindex;
		else
		{
			tupIndex = state->mergefirstfree++;
			Assert(tupIndex < state->memtupsize);
		}
		state->mergeavailslots[srcTape]--;
		/* store tuple, append to list for its tape */
		stup.tupindex = 0;
		state->memtuples[tupIndex] = stup;
		if (state->mergelast[srcTape])
			state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
		else
			state->mergenext[srcTape] = tupIndex;
		state->mergelast[srcTape] = tupIndex;
	}
	/* update per-tape and global availmem counts */
	spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
	state->mergeavailmem[srcTape] = state->availMem;
	state->availMem = priorAvail - spaceUsed;
}

/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to).  We also
 * insist there be at least one free slot in the memtuples[] array.
 *
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data).  If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 */
static void
dumptuples(RumTuplesortstate *state, bool alltuples)
{
	while (alltuples ||
		   (LACKMEM(state) && state->memtupcount > 1) ||
		   state->memtupcount >= state->memtupsize)
	{
		/*
		 * Dump the heap's frontmost entry, and sift up to remove it from the
		 * heap.
		 */
		Assert(state->memtupcount > 0);
		WRITETUP(state, state->tp_tapenum[state->destTape],
				 &state->memtuples[0]);
		rum_tuplesort_heap_siftup(state, true);

		/*
		 * If the heap is empty *or* top run number has changed, we've
		 * finished the current run.
		 */
		if (state->memtupcount == 0 ||
			state->currentRun != state->memtuples[0].tupindex)
		{
			markrunend(state, state->tp_tapenum[state->destTape]);
			state->currentRun++;
			state->tp_runs[state->destTape]++;
			state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

#ifdef TRACE_SORT
			if (trace_sort)
				elog(LOG, "finished writing%s run %d to tape %d: %s",
					 (state->memtupcount == 0) ? " final" : "",
					 state->currentRun, state->destTape,
					 pg_rusage_show(&state->ru_start));
#endif

			/*
			 * Done if heap is empty, else prepare for new run.
			 */
			if (state->memtupcount == 0)
				break;
			Assert(state->currentRun == state->memtuples[0].tupindex);
			selectnewtape(state);
		}
	}
}
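
/*
 * This is replacement selection: the run-building heap is ordered first by
 * run number (see HEAPCOMPARE below with checkIndex = true), so a tuple
 * tagged for the next run sinks below everything still belonging to the
 * current one.  For example, an incoming tuple that sorts before the tuple
 * most recently written to tape cannot extend the current run, is tagged
 * with currentRun + 1 on insertion, and surfaces at the heap's top only
 * once the current run's tuples are exhausted -- which is exactly the
 * run-change test above.
 */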


/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * Compare two SortTuples.  If checkIndex is true, use the tuple index
 * as the front of the sort key; otherwise, no.
 */

#define HEAPCOMPARE(tup1,tup2) \
	(checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
	 ((tup1)->tupindex) - ((tup2)->tupindex) : \
	 COMPARETUP(state, tup1, tup2))
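
/*
 * For example, with checkIndex = true a tuple tagged with run 1 compares
 * greater than any tuple tagged with run 0 regardless of its key, so the
 * run-building heap yields all of run 0 before the first tuple of run 1.
 * With checkIndex = false (merging, bounded heaps) only the data keys
 * matter.
 */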

/*
 * Convert the existing unordered array of SortTuples to a bounded heap,
 * discarding all but the smallest "state->bound" tuples.
 *
 * When working with a bounded heap, we want to keep the largest entry
 * at the root (array entry zero), instead of the smallest as in the normal
 * sort case.  This allows us to discard the largest entry cheaply.
 * Therefore, we temporarily reverse the sort direction.
 *
 * We assume that all entries in a bounded heap will always have tupindex
 * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
 * the direction of comparison for tupindexes.
 */
static void
make_bounded_heap(RumTuplesortstate *state)
{
	int			tupcount = state->memtupcount;
	int			i;

	Assert(state->status == TSS_INITIAL);
	Assert(state->bounded);
	Assert(tupcount >= state->bound);

	/* Reverse sort direction so largest entry will be at root */
	REVERSEDIRECTION(state);

	state->memtupcount = 0;		/* make the heap empty */
	for (i = 0; i < tupcount; i++)
	{
		if (state->memtupcount >= state->bound &&
			COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
		{
			/* New tuple would just get thrown out, so skip it */
			free_sort_tuple(state, &state->memtuples[i]);
			CHECK_FOR_INTERRUPTS();
		}
		else
		{
			/* Insert next tuple into heap */
			/* Must copy source tuple to avoid possible overwrite */
			SortTuple	stup = state->memtuples[i];

			rum_tuplesort_heap_insert(state, &stup, 0, false);

			/* If heap too full, discard largest entry */
			if (state->memtupcount > state->bound)
			{
				free_sort_tuple(state, &state->memtuples[0]);
				rum_tuplesort_heap_siftup(state, false);
			}
		}
	}

	Assert(state->memtupcount == state->bound);
	state->status = TSS_BOUNDED;
}
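
/*
 * Worked example (hypothetical input): with bound = 3 and keys
 * 5, 1, 4, 2, 8 arriving in that order, the reversed-direction heap first
 * takes {5, 1, 4}; then 2 is inserted and evicts 5 (the current root, i.e.
 * the largest kept key), and 8 is discarded on sight because it compares
 * "before" the root under the reversed direction.  The heap ends holding
 * {1, 2, 4}, the three smallest keys.
 */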

/*
 * Convert the bounded heap to a properly-sorted array
 */
static void
sort_bounded_heap(RumTuplesortstate *state)
{
	int			tupcount = state->memtupcount;

	Assert(state->status == TSS_BOUNDED);
	Assert(state->bounded);
	Assert(tupcount == state->bound);

	/*
	 * We can unheapify in place because each sift-up will remove the largest
	 * entry, which we can promptly store in the newly freed slot at the end.
	 * Once we're down to a single-entry heap, we're done.
	 */
	while (state->memtupcount > 1)
	{
		SortTuple	stup = state->memtuples[0];

		/* this sifts-up the next-largest entry and decreases memtupcount */
		rum_tuplesort_heap_siftup(state, false);
		state->memtuples[state->memtupcount] = stup;
	}
	state->memtupcount = tupcount;

	/*
	 * Reverse sort direction back to the original state.  This is not
	 * actually necessary but seems like a good idea for tidiness.
	 */
	REVERSEDIRECTION(state);

	state->status = TSS_SORTEDINMEM;
	state->boundUsed = true;
}

/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.  Caller is responsible for ensuring there's room.
 *
 * Note: we assume *tuple is a temporary variable that can be scribbled on.
 * For some callers, tuple actually points to a memtuples[] entry above the
 * end of the heap.  This is safe as long as it's not immediately adjacent
 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
 * is, it might get overwritten before being moved into the heap!
 */
static void
rum_tuplesort_heap_insert(RumTuplesortstate *state, SortTuple *tuple,
						  int tupleindex, bool checkIndex)
{
	SortTuple  *memtuples;
	int			j;

	/*
	 * Save the tupleindex --- see notes above about writing on *tuple. It's a
	 * historical artifact that tupleindex is passed as a separate argument
	 * and not in *tuple, but it's notationally convenient so let's leave it
	 * that way.
	 */
	tuple->tupindex = tupleindex;

	memtuples = state->memtuples;
	Assert(state->memtupcount < state->memtupsize);

	CHECK_FOR_INTERRUPTS();

	/*
	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
	 * using 1-based array indexes, not 0-based.
	 */
	j = state->memtupcount++;
	while (j > 0)
	{
		int			i = (j - 1) >> 1;

		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
			break;
		memtuples[j] = memtuples[i];
		j = i;
	}
	memtuples[j] = *tuple;
}

/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 */
static void
rum_tuplesort_heap_siftup(RumTuplesortstate *state, bool checkIndex)
{
	SortTuple  *memtuples = state->memtuples;
	SortTuple  *tuple;
	int			i,
				n;

	if (--state->memtupcount <= 0)
		return;

	CHECK_FOR_INTERRUPTS();

	n = state->memtupcount;
	tuple = &memtuples[n];		/* tuple that must be reinserted */
	i = 0;						/* i is where the "hole" is */
	for (;;)
	{
		int			j = 2 * i + 1;

		if (j >= n)
			break;
		if (j + 1 < n &&
			HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
			j++;
		if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
			break;
		memtuples[i] = memtuples[j];
		i = j;
	}
	memtuples[i] = *tuple;
}
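
/*
 * Terminology note: following Knuth, "siftup" here moves the hole left by
 * the removed root *down* the tree while the last heap entry is reinserted;
 * most modern texts call this operation sift-down.  For example, removing
 * the root of the min-heap {1, 3, 2, 7} reinserts 7 at the root's hole:
 * 7 swaps with the smaller child 2, giving {2, 3, 7}.
 */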


/*
 * Tape interface routines
 */

static unsigned int
getlen(RumTuplesortstate *state, int tapenum, bool eofOK)
{
	unsigned int len;

	if (LogicalTapeRead(state->tapeset, tapenum,
						&len, sizeof(len)) != sizeof(len))
		elog(ERROR, "unexpected end of tape");
	if (len == 0 && !eofOK)
		elog(ERROR, "unexpected end of data");
	return len;
}

static void
markrunend(RumTuplesortstate *state, int tapenum)
{
	unsigned int len = 0;

	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}
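
/*
 * Together with writetup_rum/writetup_rumitem below, these define the
 * on-tape run layout: each tuple is stored as a leading length word that
 * counts itself plus the payload, then the payload, then (only when
 * randomAccess was requested) a trailing copy of the length word so the
 * tape can also be read backward; a zero length word marks end of run:
 *
 *		[len | payload | (len)] [len | payload | (len)] ... [0]
 */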


/*
 * Convenience routine to free a tuple previously loaded into sort memory
 */
static void
free_sort_tuple(RumTuplesortstate *state, SortTuple *stup)
{
	FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
	pfree(stup->tuple);
}

static int
comparetup_rum(const SortTuple *a, const SortTuple *b, RumTuplesortstate *state)
{
	RumSortItem *i1,
			   *i2;
	float8		v1 = DatumGetFloat8(a->datum1);
	float8		v2 = DatumGetFloat8(b->datum1);
	int			i;

	if (v1 < v2)
		return -1;
	else if (v1 > v2)
		return 1;

	i1 = (RumSortItem *) a->tuple;
	i2 = (RumSortItem *) b->tuple;
	for (i = 1; i < state->nKeys; i++)
	{
		if (i1->data[i] < i2->data[i])
			return -1;
		else if (i1->data[i] > i2->data[i])
			return 1;
	}

	if (!state->compareItemPointer)
		return 0;

	/*
	 * If key values are equal, we sort on ItemPointer.
	 */
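	/*
	 * Comparing bi_hi and then bi_lo is equivalent to comparing the full
	 * block numbers, since BlockIdGetBlockNumber() reassembles them as
	 * (bi_hi << 16) | bi_lo; the field-by-field tests below therefore give
	 * the same ordering as ItemPointerCompare() without materializing the
	 * block numbers.
	 */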
	if (i1->iptr.ip_blkid.bi_hi < i2->iptr.ip_blkid.bi_hi)
		return -1;
	else if (i1->iptr.ip_blkid.bi_hi > i2->iptr.ip_blkid.bi_hi)
		return 1;

	if (i1->iptr.ip_blkid.bi_lo < i2->iptr.ip_blkid.bi_lo)
		return -1;
	else if (i1->iptr.ip_blkid.bi_lo > i2->iptr.ip_blkid.bi_lo)
		return 1;

	if (i1->iptr.ip_posid < i2->iptr.ip_posid)
		return -1;
	else if (i1->iptr.ip_posid > i2->iptr.ip_posid)
		return 1;

	return 0;
}

static void
copytup_rum(RumTuplesortstate *state, SortTuple *stup, void *tup)
{
	RumSortItem *item = (RumSortItem *) tup;

	stup->datum1 = Float8GetDatum(state->nKeys > 0 ? item->data[0] : 0);
	stup->isnull1 = false;
	stup->tuple = tup;
	USEMEM(state, GetMemoryChunkSpace(tup));
}

static void
writetup_rum(RumTuplesortstate *state, int tapenum, SortTuple *stup)
{
	RumSortItem *item = (RumSortItem *) stup->tuple;
	unsigned int writtenlen = RumSortItemSize(state->nKeys) + sizeof(unsigned int);

	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) &writtenlen, sizeof(writtenlen));
	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) item, RumSortItemSize(state->nKeys));
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 (void *) &writtenlen, sizeof(writtenlen));

	FREEMEM(state, GetMemoryChunkSpace(item));
	pfree(item);
}

static void
readtup_rum(RumTuplesortstate *state, SortTuple *stup,
			int tapenum, unsigned int len)
{
	unsigned int tuplen = len - sizeof(unsigned int);
	RumSortItem *item = (RumSortItem *) palloc(RumSortItemSize(state->nKeys));

	Assert(tuplen == RumSortItemSize(state->nKeys));

	USEMEM(state, GetMemoryChunkSpace(item));
	LogicalTapeReadExact(state->tapeset, tapenum,
						 (void *) item, RumSortItemSize(state->nKeys));
	stup->datum1 = Float8GetDatum(state->nKeys > 0 ? item->data[0] : 0);
	stup->isnull1 = false;
	stup->tuple = item;

	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &tuplen, sizeof(tuplen));
}

static void
reversedirection_rum(RumTuplesortstate *state)
{
	state->reverse = !state->reverse;
}

static int
comparetup_rumitem(const SortTuple *a, const SortTuple *b, RumTuplesortstate *state)
{
	RumItem	   *i1,
			   *i2;

	/* Extract RumItem from RumScanItem */
	i1 = (RumItem *) a->tuple;
	i2 = (RumItem *) b->tuple;

	if (state->cmp)
	{
		if (i1->addInfoIsNull || i2->addInfoIsNull)
		{
			if (!(i1->addInfoIsNull && i2->addInfoIsNull))
				return (i1->addInfoIsNull) ? 1 : -1;
			/* go to itempointer compare */
		}
		else
		{
			int			r;

			r = DatumGetInt32(FunctionCall2(state->cmp,
											i1->addInfo,
											i2->addInfo));

			if (r != 0)
				return r;
		}
	}

	/*
	 * If key values are equal, we sort on ItemPointer.
	 */
	if (i1->iptr.ip_blkid.bi_hi < i2->iptr.ip_blkid.bi_hi)
		return -1;
	else if (i1->iptr.ip_blkid.bi_hi > i2->iptr.ip_blkid.bi_hi)
		return 1;

	if (i1->iptr.ip_blkid.bi_lo < i2->iptr.ip_blkid.bi_lo)
		return -1;
	else if (i1->iptr.ip_blkid.bi_lo > i2->iptr.ip_blkid.bi_lo)
		return 1;

	if (i1->iptr.ip_posid < i2->iptr.ip_posid)
		return -1;
	else if (i1->iptr.ip_posid > i2->iptr.ip_posid)
		return 1;

	return 0;
}

static void
copytup_rumitem(RumTuplesortstate *state, SortTuple *stup, void *tup)
{
	stup->isnull1 = true;
	stup->tuple = palloc(sizeof(RumScanItem));
	memcpy(stup->tuple, tup, sizeof(RumScanItem));
	USEMEM(state, GetMemoryChunkSpace(stup->tuple));
}

static void
writetup_rumitem(RumTuplesortstate *state, int tapenum, SortTuple *stup)
{
	RumScanItem *item = (RumScanItem *) stup->tuple;
	unsigned int writtenlen = sizeof(*item) + sizeof(unsigned int);

	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) &writtenlen, sizeof(writtenlen));
	LogicalTapeWrite(state->tapeset, tapenum,
					 (void *) item, sizeof(*item));
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 (void *) &writtenlen, sizeof(writtenlen));

	FREEMEM(state, GetMemoryChunkSpace(item));
	pfree(item);
}

static void
readtup_rumitem(RumTuplesortstate *state, SortTuple *stup,
				int tapenum, unsigned int len)
{
	unsigned int tuplen = len - sizeof(unsigned int);
	RumScanItem *item = (RumScanItem *) palloc(sizeof(RumScanItem));

	Assert(tuplen == sizeof(RumScanItem));

	USEMEM(state, GetMemoryChunkSpace(item));
	LogicalTapeReadExact(state->tapeset, tapenum,
						 (void *) item, tuplen);
	stup->isnull1 = true;
	stup->tuple = item;

	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeReadExact(state->tapeset, tapenum,
							 &tuplen, sizeof(tuplen));
}