1 /*-------------------------------------------------------------------------
2 *
3 * rumsort.c
4 * Generalized tuple sorting routines.
5 *
6 * This module handles sorting of RumSortItem or RumScanItem structures.
 * It contains copies of static functions from
8 * src/backend/utils/sort/tuplesort.c.
9 *
10 *
11 * Portions Copyright (c) 2015-2019, Postgres Professional
12 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
13 * Portions Copyright (c) 1994, Regents of the University of California
14 *
15 *-------------------------------------------------------------------------
16 */
17
18 #include "postgres.h"
19 #include "miscadmin.h"
20 #include "rumsort.h"
21
22 #include "commands/tablespace.h"
23 #include "executor/executor.h"
24 #include "utils/logtape.h"
25 #include "utils/pg_rusage.h"
26
27 #include "rum.h" /* RumItem */
28
/*
 * Sort-type codes for sort__start probes.
 *
 * NOTE(review): none of these codes is referenced in the part of the file
 * reviewed here; confirm they are still needed.
 */
#define HEAP_SORT 0
#define INDEX_SORT 1
#define DATUM_SORT 2
#define CLUSTER_SORT 3
34
#if PG_VERSION_NUM < 100000
/*
 * Fallback for the old tape interface (9.6): both rewind flavours map onto
 * LogicalTapeRewind(), whose last argument selects write (true) or read
 * (false) mode.  The third argument of LogicalTapeRewindForRead is dropped.
 */
#define LogicalTapeRewindForRead(x, y, z) LogicalTapeRewind((x), (y), false)
#define LogicalTapeRewindForWrite(x, y) LogicalTapeRewind((x), (y), true)
#endif

/*
 * From version 11 on, LogicalTapeSetCreate() and LogicalTapeFreeze() take
 * extra arguments (and LogicalTapeSetCreate() one more from version 13 on).
 * These wrappers let the rest of this file keep the short argument lists
 * and pass fixed values for the additional parameters.
 */
#if PG_VERSION_NUM >= 110000
#if PG_VERSION_NUM >= 130000
#define LogicalTapeSetCreate(X) LogicalTapeSetCreate(X, false, NULL, NULL, 1)
#else
#define LogicalTapeSetCreate(X) LogicalTapeSetCreate(X, NULL, NULL, 1)
#endif
#define LogicalTapeFreeze(X, Y) LogicalTapeFreeze(X, Y, NULL)
#endif
49
50 /*
51 * Below are copied definitions from src/backend/utils/sort/tuplesort.c.
52 */
53
/*
 * Availability of trace_sort.
 *
 * For PGPRO since v.13, trace_sort is imported from the backend through its
 * declaration in guc.h (guc.h there carries the Windows export/import magic
 * that is applied while postgres.exe is compiled).
 *
 * For older or non-PGPRO versions on the Windows platform the backend does
 * not export trace_sort, so a local variable is defined here for that case.
 * On every non-MSVC build the guc.h declaration is used.
 */
#ifdef TRACE_SORT
#if ( !defined (_MSC_VER) || (PG_VERSION_NUM >= 130000 && defined (PGPRO_VERSION)) )
#include "utils/guc.h"
#else
bool trace_sort = false;
#endif
#endif
67
/*
 * One entry of the memtuples[] array: a pointer to the tuple being sorted
 * plus a cached copy of its first sort key.
 */
typedef struct
{
    void       *tuple;          /* the tuple proper */
    Datum       datum1;         /* value of first key column */
    bool        isnull1;        /* is first key column NULL? */
    int         tupindex;       /* during merges, links pre-read tuples of a
                                 * tape and recycled slots (see the merge
                                 * variables in RumTuplesortstate) */
} SortTuple;
75
/*
 * Possible states of a sort operation; kept in RumTuplesortstate.status.
 * A new sort starts out in TSS_INITIAL.
 */
typedef enum
{
    TSS_INITIAL,                /* Loading tuples; still within memory limit */
    TSS_BOUNDED,                /* Loading tuples into bounded-size heap */
    TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
    TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
    TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
    TSS_FINALMERGE              /* Performing final merge on-the-fly */
} TupSortStatus;
85
/*
 * Tuning constants for the tape merge.  The two buffer sizes are expressed
 * in disk blocks (BLCKSZ).
 *
 * NOTE(review): their users are outside the reviewed part of the file;
 * presumably per-tape buffer overhead and per-tape merge buffer size --
 * confirm against inittapes()/beginmerge().
 */
#define MINORDER 6              /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
89
/*
 * Signature of a SortTuple comparison routine.  The result follows the
 * qsort() convention: <0, 0, >0 according as a<b, a=b, a>b.  The sort state
 * is passed through as the third argument.
 */
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
                                    RumTuplesortstate *state);
92
/*
 * Renamed copy of Tuplesortstate: the private state of one sort operation.
 *
 * The struct is allocated zero-filled inside its own sortcontext by
 * rum_tuplesort_begin_common(), so every field not explicitly set there or
 * in a rum_tuplesort_begin_xxx routine starts out as 0/NULL/false.
 */
struct RumTuplesortstate
{
    TupSortStatus status;       /* enumerated value as shown above */
    int         nKeys;          /* number of columns in sort key */
    bool        randomAccess;   /* did caller request random access? */
    bool        bounded;        /* did caller specify a maximum number of
                                 * tuples to return? */
    bool        boundUsed;      /* true if we made use of a bounded heap */
    int         bound;          /* if bounded, the maximum number of tuples */
    long        availMem;       /* remaining memory available, in bytes */
    long        allowedMem;     /* total memory allowed, in bytes */
    int         maxTapes;       /* number of tapes (Knuth's T) */
    int         tapeRange;      /* maxTapes-1 (Knuth's P) */
    MemoryContext sortcontext;  /* memory context holding all sort data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */

    /*
     * These function pointers decouple the routines that must know what kind
     * of tuple we are sorting from the routines that don't need to know it.
     * They are set up by the rum_tuplesort_begin_xxx routines.
     *
     * Function to compare two tuples; result is per qsort() convention, ie:
     * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
     * qsort_arg_comparator.
     */
    SortTupleComparator comparetup;

    /*
     * Function to copy a supplied input tuple into palloc'd space and set up
     * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
     * state->availMem must be decreased by the amount of space used for the
     * tuple copy (note the SortTuple struct itself is not counted).
     */
    void        (*copytup) (RumTuplesortstate *state, SortTuple *stup, void *tup);

    /*
     * Function to write a stored tuple onto tape.  The representation of the
     * tuple on tape need not be the same as it is in memory; requirements on
     * the tape representation are given below.  After writing the tuple,
     * pfree() the out-of-line data (not the SortTuple struct!), and increase
     * state->availMem by the amount of memory space thereby released.
     */
    void        (*writetup) (RumTuplesortstate *state, int tapenum,
                             SortTuple *stup);

    /*
     * Function to read a stored tuple from tape back into memory.  'len' is
     * the already-read length of the stored tuple.  Create a palloc'd copy,
     * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
     * decrease state->availMem by the amount of memory space consumed.
     */
    void        (*readtup) (RumTuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);

    /*
     * Function to reverse the sort direction from its current state.  (We
     * could dispense with this if we wanted to enforce that all variants
     * represent the sort key information alike.)
     */
    void        (*reversedirection) (RumTuplesortstate *state);

    /*
     * This array holds the tuples now in sort memory.  If we are in state
     * INITIAL, the tuples are in no particular order; if we are in state
     * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     * H.  (Note that memtupcount only counts the tuples that are part of the
     * heap --- during merge passes, memtuples[] entries beyond tapeRange are
     * never in the heap and are used to hold pre-read tuples.)  In state
     * SORTEDONTAPE, the array is not used.
     */
    SortTuple  *memtuples;      /* array of SortTuple structs */
    int         memtupcount;    /* number of tuples currently present */
    int         memtupsize;     /* allocated length of memtuples array */
    bool        growmemtuples;  /* memtuples' growth still underway? */

    /* Buffer size to use for reading input tapes, during merge. */
    size_t      read_buffer_size;

    /*
     * While building initial runs, this is the current output run number
     * (starting at 0).  Afterwards, it is the number of initial runs we made.
     */
    int         currentRun;

    /*
     * Unless otherwise noted, all pointer variables below are pointers to
     * arrays of length maxTapes, holding per-tape data.
     */

    /*
     * These variables are only used during merge passes.  mergeactive[i] is
     * true if we are reading an input run from (actual) tape number i and
     * have not yet exhausted that run.  mergenext[i] is the memtuples index
     * of the next pre-read tuple (next to be loaded into the heap) for tape
     * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
     * points to the last pre-read tuple from each tape.  mergeavailslots[i]
     * is the number of unused memtuples[] slots reserved for tape i, and
     * mergeavailmem[i] is the amount of unused space allocated for tape i.
     * mergefreelist and mergefirstfree keep track of unused locations in the
     * memtuples[] array.  The memtuples[].tupindex fields link together
     * pre-read tuples for each tape as well as recycled locations in
     * mergefreelist.  It is OK to use 0 as a null link in these lists, because
     * memtuples[0] is part of the merge heap and is never a pre-read tuple.
     */
    bool       *mergeactive;    /* active input run source? */
    int        *mergenext;      /* first preread tuple for each source */
    int        *mergelast;      /* last preread tuple for each source */
    int        *mergeavailslots;    /* slots left for prereading each tape */
    long       *mergeavailmem;  /* availMem for prereading each tape */
    int         mergefreelist;  /* head of freelist of recycled slots */
    int         mergefirstfree; /* first slot never used in this merge */

    /*
     * Variables for Algorithm D.  Note that destTape is a "logical" tape
     * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
     * "logical" and "actual" tape numbers straight!
     */
    int         Level;          /* Knuth's l */
    int         destTape;       /* current output tape (Knuth's j, less 1) */
    int        *tp_fib;         /* Target Fibonacci run counts (A[]) */
    int        *tp_runs;        /* # of real runs on each tape */
    int        *tp_dummy;       /* # of dummy runs for each tape (D[]) */
    int        *tp_tapenum;     /* Actual tape numbers (TAPE[]) */
    int         activeTapes;    /* # of active input tapes in merge pass */

    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
    int         result_tape;    /* actual tape number of finished output;
                                 * -1 until a result tape has been formed */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */

    /* markpos_xxx holds marked position for mark and restore */
    long        markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
    int         markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */

    /*
     * These variables are specific to the MinimalTuple case; they are set by
     * rum_tuplesort_begin_heap and used only by the MinimalTuple routines.
     *
     * NOTE(review): no rum_tuplesort_begin_heap, _begin_cluster,
     * _begin_index_xxx or _begin_datum routine appears in the reviewed part
     * of this file; the tuple-kind-specific fields from here down to
     * datumTypeByVal look like leftovers of the tuplesort.c copy -- confirm.
     */
    TupleDesc   tupDesc;
    SortSupport sortKeys;       /* array of length nKeys */

    /*
     * This variable is shared by the single-key MinimalTuple case and the
     * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
     */
    SortSupport onlyKey;

    /*
     * These variables are specific to the CLUSTER case; they are set by
     * rum_tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
     * indexScanKey.  rum_tuplesort_end() releases estate when it is not
     * NULL.
     */
    IndexInfo  *indexInfo;      /* info about index being used for reference */
    EState     *estate;         /* for evaluating index expressions */

    /*
     * These variables are specific to the IndexTuple case; they are set by
     * rum_tuplesort_begin_index_xxx and used only by the IndexTuple routines.
     */
    Relation    heapRel;        /* table the index is being built on */
    Relation    indexRel;       /* index being built */

    /* These are specific to the index_btree subcase: */
    ScanKey     indexScanKey;
    bool        enforceUnique;  /* complain if we find duplicate tuples */

    /* These are specific to the index_hash subcase: */
    uint32      hash_mask;      /* mask for sortable part of hash code */

    /*
     * These variables are specific to the Datum case; they are set by
     * rum_tuplesort_begin_datum and used only by the DatumTuple routines.
     */
    Oid         datumType;
    /* we need typelen and byval in order to know how to copy the Datums. */
    int         datumTypeLen;
    bool        datumTypeByVal;

    /*
     * Sort direction flag; both rum_tuplesort_begin_rum() and
     * rum_tuplesort_begin_rumitem() initialize it to false.
     * NOTE(review): presumably toggled by reversedirection_rum() -- confirm.
     */
    bool        reverse;

    /* Do we need ItemPointer comparison in comparetup_rum()? */
    bool        compareItemPointer;

    /*
     * compare_rumitem: comparison function handed to
     * rum_tuplesort_begin_rumitem() by the caller.
     */
    FmgrInfo   *cmp;

    /*
     * Resource snapshot for time of sort start.
     */
#ifdef TRACE_SORT
    PGRUsage    ru_start;
#endif
};
295
/*
 * Shorthands for calling through the per-sort function pointers, and for
 * the memory accounting kept in state->availMem.
 *
 * LACKMEM is true once availMem has gone negative; USEMEM charges 'amt'
 * bytes against availMem and FREEMEM credits them back.
 */
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
304
/*
 * Read exactly 'len' bytes from the given tape into 'ptr'; raise an ERROR
 * ("unexpected end of data") if LogicalTapeRead() returns any other count.
 *
 * When using this macro, beware of double evaluation of len.
 */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
    do { \
        if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
            elog(ERROR, "unexpected end of data"); \
    } while(0)
311
312
/*
 * Forward declarations of the file-local routines.
 *
 * The first group is the tuple-kind-independent sort machinery; the
 * comparetup/copytup/writetup/readtup/reversedirection routines at the end
 * are the callbacks installed by rum_tuplesort_begin_rum() ("_rum") and
 * rum_tuplesort_begin_rumitem() ("_rumitem").
 */
static RumTuplesortstate *rum_tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(RumTuplesortstate *state, SortTuple *tuple);
static void inittapes(RumTuplesortstate *state);
static void selectnewtape(RumTuplesortstate *state);
static void mergeruns(RumTuplesortstate *state);
static void mergeonerun(RumTuplesortstate *state);
static void beginmerge(RumTuplesortstate *state);
static void mergepreread(RumTuplesortstate *state);
static void mergeprereadone(RumTuplesortstate *state, int srcTape);
static void dumptuples(RumTuplesortstate *state, bool alltuples);
static void make_bounded_heap(RumTuplesortstate *state);
static void sort_bounded_heap(RumTuplesortstate *state);
static void rum_tuplesort_heap_insert(RumTuplesortstate *state, SortTuple *tuple,
                                      int tupleindex, bool checkIndex);
static void rum_tuplesort_heap_siftup(RumTuplesortstate *state, bool checkIndex);
static unsigned int getlen(RumTuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(RumTuplesortstate *state, int tapenum);
static void free_sort_tuple(RumTuplesortstate *state, SortTuple *stup);
static int  comparetup_rum(const SortTuple *a, const SortTuple *b,
                           RumTuplesortstate *state);
static void copytup_rum(RumTuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_rum(RumTuplesortstate *state, int tapenum,
                         SortTuple *stup);
static void readtup_rum(RumTuplesortstate *state, SortTuple *stup,
                        int tapenum, unsigned int len);
static void reversedirection_rum(RumTuplesortstate *state);
static int  comparetup_rumitem(const SortTuple *a, const SortTuple *b,
                               RumTuplesortstate *state);
static void copytup_rumitem(RumTuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_rumitem(RumTuplesortstate *state, int tapenum,
                             SortTuple *stup);
static void readtup_rumitem(RumTuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);
346
347 /*
348 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
349 * any variant of SortTuples, using the appropriate comparetup function.
350 * qsort_ssup() is specialized for the case where the comparetup function
351 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
352 * and Datum sorts.
353 */
/* The routines of "qsort_tuple.c" are spelled out below instead of being #included. */
355
356 static void
swapfunc(SortTuple * a,SortTuple * b,size_t n)357 swapfunc(SortTuple *a, SortTuple *b, size_t n)
358 {
359 do
360 {
361 SortTuple t = *a;
362
363 *a++ = *b;
364 *b++ = t;
365 } while (--n > 0);
366 }
367
/*
 * Compare two SortTuples on their cached first key only (datum1/isnull1),
 * by handing both to ApplySortComparator() with the given SortSupport.
 */
#define cmp_ssup(a, b, ssup) \
    ApplySortComparator((a)->datum1, (a)->isnull1, \
                        (b)->datum1, (b)->isnull1, ssup)
371
/*
 * Exchange the two SortTuples that a and b point to.
 *
 * The body is wrapped in do { ... } while (0) WITHOUT a trailing semicolon,
 * so that "swap(x, y);" is exactly one statement and stays correct as the
 * unbraced body of an if/else.  Each argument is evaluated twice; do not
 * pass expressions with side effects.
 */
#define swap(a, b) \
    do { \
        SortTuple t = *(a); \
        *(a) = *(b); \
        *(b) = t; \
    } while (0)
378
/*
 * Exchange n SortTuples between a and b via swapfunc(); does nothing when
 * n is zero.  Wrapped in do { ... } while (0) so that the inner "if" cannot
 * capture an "else" that follows the macro invocation.  n is evaluated
 * twice.
 */
#define vecswap(a, b, n) \
    do { \
        if ((n) > 0) \
            swapfunc(a, b, n); \
    } while (0)
380
381 static SortTuple *
med3_tuple(SortTuple * a,SortTuple * b,SortTuple * c,SortTupleComparator cmp_tuple,RumTuplesortstate * state)382 med3_tuple(SortTuple *a, SortTuple *b, SortTuple *c, SortTupleComparator cmp_tuple, RumTuplesortstate *state)
383 {
384 return cmp_tuple(a, b, state) < 0 ?
385 (cmp_tuple(b, c, state) < 0 ? b :
386 (cmp_tuple(a, c, state) < 0 ? c : a))
387 : (cmp_tuple(b, c, state) > 0 ? b :
388 (cmp_tuple(a, c, state) < 0 ? a : c));
389 }
390
391 static SortTuple *
med3_ssup(SortTuple * a,SortTuple * b,SortTuple * c,SortSupport ssup)392 med3_ssup(SortTuple *a, SortTuple *b, SortTuple *c, SortSupport ssup)
393 {
394 return cmp_ssup(a, b, ssup) < 0 ?
395 (cmp_ssup(b, c, ssup) < 0 ? b :
396 (cmp_ssup(a, c, ssup) < 0 ? c : a))
397 : (cmp_ssup(b, c, ssup) > 0 ? b :
398 (cmp_ssup(a, c, ssup) < 0 ? a : c));
399 }
400
/*
 * Sort the n-element SortTuple array a in place, ordering the elements with
 * cmp_ssup() (that is, on datum1/isnull1 only).
 *
 * Fewer than 7 elements are insertion-sorted.  Otherwise the array is first
 * scanned and left untouched if it is already in order; failing that, a
 * pivot is picked (the middle element for n == 7, a median of three for
 * larger n, a median of three medians for n > 40) and the array is
 * partitioned around it.  Elements comparing equal to the pivot are parked
 * at both ends during partitioning and swapped into the middle afterwards.
 * The function then recurses on the smaller partition and loops on the
 * larger one.  CHECK_FOR_INTERRUPTS() is called on every pass and inside
 * the scanning loops.
 */
static void
qsort_ssup(SortTuple *a, size_t n, SortSupport ssup)
{
    SortTuple  *pa,
               *pb,
               *pc,
               *pd,
               *pl,
               *pm,
               *pn;
    size_t      d1,
                d2;
    int         r,
                presorted;

loop:
    CHECK_FOR_INTERRUPTS();
    /* Small input: plain insertion sort */
    if (n < 7)
    {
        for (pm = a + 1; pm < a + n; pm++)
            for (pl = pm; pl > a && cmp_ssup(pl - 1, pl, ssup) > 0; pl--)
                swap(pl, pl - 1);
        return;
    }
    /* Nothing to do if the input is already in order */
    presorted = 1;
    for (pm = a + 1; pm < a + n; pm++)
    {
        CHECK_FOR_INTERRUPTS();
        if (cmp_ssup(pm - 1, pm, ssup) > 0)
        {
            presorted = 0;
            break;
        }
    }
    if (presorted)
        return;
    /* Choose the pivot; pm ends up pointing at it */
    pm = a + (n / 2);
    if (n > 7)
    {
        pl = a;
        pn = a + (n - 1);
        if (n > 40)
        {
            size_t      d = (n / 8);

            pl = med3_ssup(pl, pl + d, pl + 2 * d, ssup);
            pm = med3_ssup(pm - d, pm, pm + d, ssup);
            pn = med3_ssup(pn - 2 * d, pn - d, pn, ssup);
        }
        pm = med3_ssup(pl, pm, pn, ssup);
    }
    /* Move the pivot to a[0] and partition the rest */
    swap(a, pm);
    pa = pb = a + 1;
    pc = pd = a + (n - 1);
    for (;;)
    {
        /* Advance pb over elements <= pivot; equal ones go to the left end */
        while (pb <= pc && (r = cmp_ssup(pb, a, ssup)) <= 0)
        {
            if (r == 0)
            {
                swap(pa, pb);
                pa++;
            }
            pb++;
            CHECK_FOR_INTERRUPTS();
        }
        /* Retreat pc over elements >= pivot; equal ones go to the right end */
        while (pb <= pc && (r = cmp_ssup(pc, a, ssup)) >= 0)
        {
            if (r == 0)
            {
                swap(pc, pd);
                pd--;
            }
            pc--;
            CHECK_FOR_INTERRUPTS();
        }
        if (pb > pc)
            break;
        /* pb is > pivot and pc is < pivot: exchange them */
        swap(pb, pc);
        pb++;
        pc--;
    }
    /* Bring the pivot-equal elements from both ends into the middle */
    pn = a + n;
    d1 = Min(pa - a, pb - pa);
    vecswap(a, pb - d1, d1);
    d1 = Min(pd - pc, pn - pd - 1);
    vecswap(pb, pn - d1, d1);
    /* d1 = number of elements below the pivot, d2 = number above it */
    d1 = pb - pa;
    d2 = pd - pc;
    if (d1 <= d2)
    {
        /* Recurse on left partition, then iterate on right partition */
        if (d1 > 1)
            qsort_ssup(a, d1, ssup);
        if (d2 > 1)
        {
            /* Iterate rather than recurse to save stack space */
            /* qsort_ssup(pn - d2, d2, ssup); */
            a = pn - d2;
            n = d2;
            goto loop;
        }
    }
    else
    {
        /* Recurse on right partition, then iterate on left partition */
        if (d2 > 1)
            qsort_ssup(pn - d2, d2, ssup);
        if (d1 > 1)
        {
            /* Iterate rather than recurse to save stack space */
            /* qsort_ssup(a, d1, ssup); */
            n = d1;
            goto loop;
        }
    }
}
518
/*
 * Sort the n-element SortTuple array a in place, ordering the elements with
 * the supplied comparator; 'state' is passed through to every cmp_tuple
 * call.
 *
 * Fewer than 7 elements are insertion-sorted.  Otherwise the array is first
 * scanned and left untouched if it is already in order; failing that, a
 * pivot is picked (the middle element for n == 7, a median of three for
 * larger n, a median of three medians for n > 40) and the array is
 * partitioned around it.  Elements comparing equal to the pivot are parked
 * at both ends during partitioning and swapped into the middle afterwards.
 * The function then recurses on the smaller partition and loops on the
 * larger one.  CHECK_FOR_INTERRUPTS() is called on every pass and inside
 * the scanning loops.
 */
static void
qsort_tuple(SortTuple *a, size_t n, SortTupleComparator cmp_tuple, RumTuplesortstate *state)
{
    SortTuple  *pa,
               *pb,
               *pc,
               *pd,
               *pl,
               *pm,
               *pn;
    size_t      d1,
                d2;
    int         r,
                presorted;

loop:
    CHECK_FOR_INTERRUPTS();
    /* Small input: plain insertion sort */
    if (n < 7)
    {
        for (pm = a + 1; pm < a + n; pm++)
            for (pl = pm; pl > a && cmp_tuple(pl - 1, pl, state) > 0; pl--)
                swap(pl, pl - 1);
        return;
    }
    /* Nothing to do if the input is already in order */
    presorted = 1;
    for (pm = a + 1; pm < a + n; pm++)
    {
        CHECK_FOR_INTERRUPTS();
        if (cmp_tuple(pm - 1, pm, state) > 0)
        {
            presorted = 0;
            break;
        }
    }
    if (presorted)
        return;
    /* Choose the pivot; pm ends up pointing at it */
    pm = a + (n / 2);
    if (n > 7)
    {
        pl = a;
        pn = a + (n - 1);
        if (n > 40)
        {
            size_t      d = (n / 8);

            pl = med3_tuple(pl, pl + d, pl + 2 * d, cmp_tuple, state);
            pm = med3_tuple(pm - d, pm, pm + d, cmp_tuple, state);
            pn = med3_tuple(pn - 2 * d, pn - d, pn, cmp_tuple, state);
        }
        pm = med3_tuple(pl, pm, pn, cmp_tuple, state);
    }
    /* Move the pivot to a[0] and partition the rest */
    swap(a, pm);
    pa = pb = a + 1;
    pc = pd = a + (n - 1);
    for (;;)
    {
        /* Advance pb over elements <= pivot; equal ones go to the left end */
        while (pb <= pc && (r = cmp_tuple(pb, a, state)) <= 0)
        {
            if (r == 0)
            {
                swap(pa, pb);
                pa++;
            }
            pb++;
            CHECK_FOR_INTERRUPTS();
        }
        /* Retreat pc over elements >= pivot; equal ones go to the right end */
        while (pb <= pc && (r = cmp_tuple(pc, a, state)) >= 0)
        {
            if (r == 0)
            {
                swap(pc, pd);
                pd--;
            }
            pc--;
            CHECK_FOR_INTERRUPTS();
        }
        if (pb > pc)
            break;
        /* pb is > pivot and pc is < pivot: exchange them */
        swap(pb, pc);
        pb++;
        pc--;
    }
    /* Bring the pivot-equal elements from both ends into the middle */
    pn = a + n;
    d1 = Min(pa - a, pb - pa);
    vecswap(a, pb - d1, d1);
    d1 = Min(pd - pc, pn - pd - 1);
    vecswap(pb, pn - d1, d1);
    /* d1 = number of elements below the pivot, d2 = number above it */
    d1 = pb - pa;
    d2 = pd - pc;
    if (d1 <= d2)
    {
        /* Recurse on left partition, then iterate on right partition */
        if (d1 > 1)
            qsort_tuple(a, d1, cmp_tuple, state);
        if (d2 > 1)
        {
            /* Iterate rather than recurse to save stack space */
            /* qsort_tuple(pn - d2, d2, cmp_tuple, state); */
            a = pn - d2;
            n = d2;
            goto loop;
        }
    }
    else
    {
        /* Recurse on right partition, then iterate on left partition */
        if (d2 > 1)
            qsort_tuple(pn - d2, d2, cmp_tuple, state);
        if (d1 > 1)
        {
            /* Iterate rather than recurse to save stack space */
            /* qsort_tuple(a, d1, cmp_tuple, state); */
            n = d1;
            goto loop;
        }
    }
}
636
637 /*
638 * rum_tuplesort_begin_xxx
639 *
640 * Initialize for a tuple sort operation.
641 *
642 * After calling rum_tuplesort_begin, the caller should call rum_tuplesort_putXXX
643 * zero or more times, then call rum_tuplesort_performsort when all the tuples
644 * have been supplied. After performsort, retrieve the tuples in sorted
645 * order by calling rum_tuplesort_getXXX until it returns false/NULL. (If random
646 * access was requested, rescan, markpos, and restorepos can also be called.)
647 * Call rum_tuplesort_end to terminate the operation and release memory/disk space.
648 *
649 * Each variant of rum_tuplesort_begin has a workMem parameter specifying the
650 * maximum number of kilobytes of RAM to use before spilling data to disk.
651 * (The normal value of this parameter is work_mem, but some callers use
652 * other values.) Each variant also has a randomAccess parameter specifying
653 * whether the caller needs non-sequential access to the sort result.
654 */
655
656 static RumTuplesortstate *
rum_tuplesort_begin_common(int workMem,bool randomAccess)657 rum_tuplesort_begin_common(int workMem, bool randomAccess)
658 {
659 RumTuplesortstate *state;
660 MemoryContext sortcontext;
661 MemoryContext oldcontext;
662
663 /*
664 * Create a working memory context for this sort operation. All data
665 * needed by the sort will live inside this context.
666 */
667 sortcontext = RumContextCreate(CurrentMemoryContext, "TupleSort");
668
669 /*
670 * Make the Tuplesortstate within the per-sort context. This way, we
671 * don't need a separate pfree() operation for it at shutdown.
672 */
673 oldcontext = MemoryContextSwitchTo(sortcontext);
674
675 state = (RumTuplesortstate *) palloc0(sizeof(RumTuplesortstate));
676
677 #ifdef TRACE_SORT
678 if (trace_sort)
679 pg_rusage_init(&state->ru_start);
680 #endif
681
682 state->status = TSS_INITIAL;
683 state->randomAccess = randomAccess;
684 state->bounded = false;
685 state->boundUsed = false;
686 state->allowedMem = workMem * 1024L;
687 state->availMem = state->allowedMem;
688 state->sortcontext = sortcontext;
689 state->tapeset = NULL;
690
691 state->memtupcount = 0;
692
693 /*
694 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
695 * see comments in grow_memtuples().
696 */
697 state->memtupsize = Max(1024,
698 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
699
700 state->growmemtuples = true;
701 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
702
703 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
704
705 /* workMem must be large enough for the minimal memtuples array */
706 if (LACKMEM(state))
707 elog(ERROR, "insufficient memory allowed for sort");
708
709 state->currentRun = 0;
710
711 /*
712 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
713 * inittapes(), if needed
714 */
715
716 state->result_tape = -1; /* flag that result tape has not been formed */
717
718 MemoryContextSwitchTo(oldcontext);
719
720 return state;
721 }
722
723 /*
724 * Get sort state memory context. Currently it is used only to allocate
725 * RumSortItem.
726 */
727 MemoryContext
rum_tuplesort_get_memorycontext(RumTuplesortstate * state)728 rum_tuplesort_get_memorycontext(RumTuplesortstate *state)
729 {
730 return state->sortcontext;
731 }
732
733 RumTuplesortstate *
rum_tuplesort_begin_rum(int workMem,int nKeys,bool randomAccess,bool compareItemPointer)734 rum_tuplesort_begin_rum(int workMem, int nKeys, bool randomAccess,
735 bool compareItemPointer)
736 {
737 RumTuplesortstate *state = rum_tuplesort_begin_common(workMem, randomAccess);
738 MemoryContext oldcontext;
739
740 oldcontext = MemoryContextSwitchTo(state->sortcontext);
741
742 #ifdef TRACE_SORT
743 if (trace_sort)
744 elog(LOG,
745 "begin rum sort: nKeys = %d, workMem = %d, randomAccess = %c",
746 nKeys, workMem, randomAccess ? 't' : 'f');
747 #endif
748
749 state->nKeys = nKeys;
750
751 state->comparetup = comparetup_rum;
752 state->copytup = copytup_rum;
753 state->writetup = writetup_rum;
754 state->readtup = readtup_rum;
755 state->reversedirection = reversedirection_rum;
756 state->reverse = false;
757 state->compareItemPointer = compareItemPointer;
758
759 MemoryContextSwitchTo(oldcontext);
760
761 return state;
762 }
763
764 RumTuplesortstate *
rum_tuplesort_begin_rumitem(int workMem,FmgrInfo * cmp)765 rum_tuplesort_begin_rumitem(int workMem, FmgrInfo *cmp)
766 {
767 RumTuplesortstate *state = rum_tuplesort_begin_common(workMem, false);
768 MemoryContext oldcontext;
769
770 oldcontext = MemoryContextSwitchTo(state->sortcontext);
771
772 #ifdef TRACE_SORT
773 if (trace_sort)
774 elog(LOG,
775 "begin rumitem sort: workMem = %d", workMem);
776 #endif
777
778 state->cmp = cmp;
779 state->comparetup = comparetup_rumitem;
780 state->copytup = copytup_rumitem;
781 state->writetup = writetup_rumitem;
782 state->readtup = readtup_rumitem;
783 state->reversedirection = reversedirection_rum;
784 state->reverse = false;
785 state->compareItemPointer = false;
786
787 MemoryContextSwitchTo(oldcontext);
788
789 return state;
790 }
791
792 /*
793 * rum_tuplesort_end
794 *
795 * Release resources and clean up.
796 *
797 * NOTE: after calling this, any pointers returned by rum_tuplesort_getXXX are
798 * pointing to garbage. Be careful not to attempt to use or free such
799 * pointers afterwards!
800 */
801 void
rum_tuplesort_end(RumTuplesortstate * state)802 rum_tuplesort_end(RumTuplesortstate *state)
803 {
804 /* context swap probably not needed, but let's be safe */
805 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
806
807 #ifdef TRACE_SORT
808 long spaceUsed;
809
810 if (state->tapeset)
811 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
812 else
813 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
814 #endif
815
816 /*
817 * Delete temporary "tape" files, if any.
818 *
819 * Note: want to include this in reported total cost of sort, hence need
820 * for two #ifdef TRACE_SORT sections.
821 */
822 if (state->tapeset)
823 LogicalTapeSetClose(state->tapeset);
824
825 #ifdef TRACE_SORT
826 if (trace_sort)
827 {
828 if (state->tapeset)
829 elog(LOG, "external sort ended, %ld disk blocks used: %s",
830 spaceUsed, pg_rusage_show(&state->ru_start));
831 else
832 elog(LOG, "internal sort ended, %ld KB used: %s",
833 spaceUsed, pg_rusage_show(&state->ru_start));
834 }
835 #endif
836
837 /* Free any execution state created for CLUSTER case */
838 if (state->estate != NULL)
839 {
840 ExprContext *econtext = GetPerTupleExprContext(state->estate);
841
842 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
843 FreeExecutorState(state->estate);
844 }
845
846 MemoryContextSwitchTo(oldcontext);
847
848 /*
849 * Free the per-sort memory context, thereby releasing all working memory,
850 * including the Tuplesortstate struct itself.
851 */
852 MemoryContextDelete(state->sortcontext);
853 }
854
855 /*
856 * Grow the memtuples[] array, if possible within our memory constraint.
857 * Return true if we were able to enlarge the array, false if not.
858 *
859 * Normally, at each increment we double the size of the array. When we no
860 * longer have enough memory to do that, we attempt one last, smaller increase
861 * (and then clear the growmemtuples flag so we don't try any more). That
862 * allows us to use allowedMem as fully as possible; sticking to the pure
863 * doubling rule could result in almost half of allowedMem going unused.
864 * Because availMem moves around with tuple addition/removal, we need some
865 * rule to prevent making repeated small increases in memtupsize, which would
866 * just be useless thrashing. The growmemtuples flag accomplishes that and
867 * also prevents useless recalculations in this function.
868 */
869 static bool
grow_memtuples(RumTuplesortstate * state)870 grow_memtuples(RumTuplesortstate *state)
871 {
872 int newmemtupsize;
873 int memtupsize = state->memtupsize;
874 long memNowUsed = state->allowedMem - state->availMem;
875
876 /* Forget it if we've already maxed out memtuples, per comment above */
877 if (!state->growmemtuples)
878 return false;
879
880 /* Select new value of memtupsize */
881 if (memNowUsed <= state->availMem)
882 {
883 /*
884 * It is surely safe to double memtupsize if we've used no more than
885 * half of allowedMem.
886 *
887 * Note: it might seem that we need to worry about memtupsize * 2
888 * overflowing an int, but the MaxAllocSize clamp applied below
889 * ensures the existing memtupsize can't be large enough for that.
890 */
891 newmemtupsize = memtupsize * 2;
892 }
893 else
894 {
895 /*
896 * This will be the last increment of memtupsize. Abandon doubling
897 * strategy and instead increase as much as we safely can.
898 *
899 * To stay within allowedMem, we can't increase memtupsize by more
900 * than availMem / sizeof(SortTuple) elements. In practice, we want
901 * to increase it by considerably less, because we need to leave some
902 * space for the tuples to which the new array slots will refer. We
903 * assume the new tuples will be about the same size as the tuples
904 * we've already seen, and thus we can extrapolate from the space
905 * consumption so far to estimate an appropriate new size for the
906 * memtuples array. The optimal value might be higher or lower than
907 * this estimate, but it's hard to know that in advance.
908 *
909 * This calculation is safe against enlarging the array so much that
910 * LACKMEM becomes true, because the memory currently used includes
911 * the present array; thus, there would be enough allowedMem for the
912 * new array elements even if no other memory were currently used.
913 *
914 * We do the arithmetic in float8, because otherwise the product of
915 * memtupsize and allowedMem could overflow. (A little algebra shows
916 * that grow_ratio must be less than 2 here, so we are not risking
917 * integer overflow this way.) Any inaccuracy in the result should be
918 * insignificant; but even if we computed a completely insane result,
919 * the checks below will prevent anything really bad from happening.
920 */
921 double grow_ratio;
922
923 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
924 newmemtupsize = (int) (memtupsize * grow_ratio);
925
926 /* We won't make any further enlargement attempts */
927 state->growmemtuples = false;
928 }
929
930 /* Must enlarge array by at least one element, else report failure */
931 if (newmemtupsize <= memtupsize)
932 goto noalloc;
933
934 /*
935 * On a 64-bit machine, allowedMem could be more than MaxAllocSize. Clamp
936 * to ensure our request won't be rejected by palloc.
937 */
938 if ((Size) newmemtupsize >= MaxAllocSize / sizeof(SortTuple))
939 {
940 newmemtupsize = (int) (MaxAllocSize / sizeof(SortTuple));
941 state->growmemtuples = false; /* can't grow any more */
942 }
943
944 /*
945 * We need to be sure that we do not cause LACKMEM to become true, else
946 * the space management algorithm will go nuts. The code above should
947 * never generate a dangerous request, but to be safe, check explicitly
948 * that the array growth fits within availMem. (We could still cause
949 * LACKMEM if the memory chunk overhead associated with the memtuples
950 * array were to increase. That shouldn't happen because we chose the
951 * initial array size large enough to ensure that palloc will be treating
952 * both old and new arrays as separate chunks. But we'll check LACKMEM
953 * explicitly below just in case.)
954 */
955 if (state->availMem < (long) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
956 goto noalloc;
957
958 /* OK, do it */
959 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
960 state->memtupsize = newmemtupsize;
961 state->memtuples = (SortTuple *)
962 repalloc(state->memtuples,
963 state->memtupsize * sizeof(SortTuple));
964 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
965 if (LACKMEM(state))
966 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
967 return true;
968
969 noalloc:
970 /* If for any reason we didn't realloc, shut off future attempts */
971 state->growmemtuples = false;
972 return false;
973 }
974
975 void
rum_tuplesort_putrum(RumTuplesortstate * state,RumSortItem * item)976 rum_tuplesort_putrum(RumTuplesortstate *state, RumSortItem * item)
977 {
978 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
979 SortTuple stup;
980
981 /*
982 * Copy the given tuple into memory we control, and decrease availMem.
983 * Then call the common code.
984 */
985 COPYTUP(state, &stup, (void *) item);
986
987 puttuple_common(state, &stup);
988
989 MemoryContextSwitchTo(oldcontext);
990 }
991
992 void
rum_tuplesort_putrumitem(RumTuplesortstate * state,RumScanItem * item)993 rum_tuplesort_putrumitem(RumTuplesortstate *state, RumScanItem * item)
994 {
995 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
996 SortTuple stup;
997
998 /*
999 * Copy the given tuple into memory we control, and decrease availMem.
1000 * Then call the common code.
1001 */
1002 COPYTUP(state, &stup, (void *) item);
1003
1004 puttuple_common(state, &stup);
1005
1006 MemoryContextSwitchTo(oldcontext);
1007 }
1008
1009 /*
1010 * Shared code for tuple and datum cases.
1011 */
1012 static void
puttuple_common(RumTuplesortstate * state,SortTuple * tuple)1013 puttuple_common(RumTuplesortstate *state, SortTuple *tuple)
1014 {
1015 switch (state->status)
1016 {
1017 case TSS_INITIAL:
1018
1019 /*
1020 * Save the tuple into the unsorted array. First, grow the array
1021 * as needed. Note that we try to grow the array when there is
1022 * still one free slot remaining --- if we fail, there'll still be
1023 * room to store the incoming tuple, and then we'll switch to
1024 * tape-based operation.
1025 */
1026 if (state->memtupcount >= state->memtupsize - 1)
1027 {
1028 (void) grow_memtuples(state);
1029 Assert(state->memtupcount < state->memtupsize);
1030 }
1031 state->memtuples[state->memtupcount++] = *tuple;
1032
1033 /*
1034 * Check if it's time to switch over to a bounded heapsort. We do
1035 * so if the input tuple count exceeds twice the desired tuple
1036 * count (this is a heuristic for where heapsort becomes cheaper
1037 * than a quicksort), or if we've just filled workMem and have
1038 * enough tuples to meet the bound.
1039 *
1040 * Note that once we enter TSS_BOUNDED state we will always try to
1041 * complete the sort that way. In the worst case, if later input
1042 * tuples are larger than earlier ones, this might cause us to
1043 * exceed workMem significantly.
1044 */
1045 if (state->bounded &&
1046 (state->memtupcount > state->bound * 2 ||
1047 (state->memtupcount > state->bound && LACKMEM(state))))
1048 {
1049 #ifdef TRACE_SORT
1050 if (trace_sort)
1051 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1052 state->memtupcount,
1053 pg_rusage_show(&state->ru_start));
1054 #endif
1055 make_bounded_heap(state);
1056 return;
1057 }
1058
1059 /*
1060 * Done if we still fit in available memory and have array slots.
1061 */
1062 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1063 return;
1064
1065 /*
1066 * Nope; time to switch to tape-based operation.
1067 */
1068 inittapes(state);
1069
1070 /*
1071 * Dump tuples until we are back under the limit.
1072 */
1073 dumptuples(state, false);
1074 break;
1075
1076 case TSS_BOUNDED:
1077
1078 /*
1079 * We don't want to grow the array here, so check whether the new
1080 * tuple can be discarded before putting it in. This should be a
1081 * good speed optimization, too, since when there are many more
1082 * input tuples than the bound, most input tuples can be discarded
1083 * with just this one comparison. Note that because we currently
1084 * have the sort direction reversed, we must check for <= not >=.
1085 */
1086 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1087 {
1088 /* new tuple <= top of the heap, so we can discard it */
1089 free_sort_tuple(state, tuple);
1090 CHECK_FOR_INTERRUPTS();
1091 }
1092 else
1093 {
1094 /* discard top of heap, sift up, insert new tuple */
1095 free_sort_tuple(state, &state->memtuples[0]);
1096 rum_tuplesort_heap_siftup(state, false);
1097 rum_tuplesort_heap_insert(state, tuple, 0, false);
1098 }
1099 break;
1100
1101 case TSS_BUILDRUNS:
1102
1103 /*
1104 * Insert the tuple into the heap, with run number currentRun if
1105 * it can go into the current run, else run number currentRun+1.
1106 * The tuple can go into the current run if it is >= the first
1107 * not-yet-output tuple. (Actually, it could go into the current
1108 * run if it is >= the most recently output tuple ... but that
1109 * would require keeping around the tuple we last output, and it's
1110 * simplest to let writetup free each tuple as soon as it's
1111 * written.)
1112 *
1113 * Note there will always be at least one tuple in the heap at
1114 * this point; see dumptuples.
1115 */
1116 Assert(state->memtupcount > 0);
1117 if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
1118 rum_tuplesort_heap_insert(state, tuple, state->currentRun, true);
1119 else
1120 rum_tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
1121
1122 /*
1123 * If we are over the memory limit, dump tuples till we're under.
1124 */
1125 dumptuples(state, false);
1126 break;
1127
1128 default:
1129 elog(ERROR, "invalid tuplesort state");
1130 break;
1131 }
1132 }
1133
1134 /*
1135 * All tuples have been provided; finish the sort.
1136 */
1137 void
rum_tuplesort_performsort(RumTuplesortstate * state)1138 rum_tuplesort_performsort(RumTuplesortstate *state)
1139 {
1140 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1141
1142 #ifdef TRACE_SORT
1143 if (trace_sort)
1144 elog(LOG, "performsort starting: %s",
1145 pg_rusage_show(&state->ru_start));
1146 #endif
1147
1148 switch (state->status)
1149 {
1150 case TSS_INITIAL:
1151
1152 /*
1153 * We were able to accumulate all the tuples within the allowed
1154 * amount of memory. Just qsort 'em and we're done.
1155 */
1156 if (state->memtupcount > 1)
1157 {
1158 /* Can we use the single-key sort function? */
1159 if (state->onlyKey != NULL)
1160 qsort_ssup(state->memtuples, state->memtupcount,
1161 state->onlyKey);
1162 else
1163 qsort_tuple(state->memtuples,
1164 state->memtupcount,
1165 state->comparetup,
1166 state);
1167 }
1168 state->current = 0;
1169 state->eof_reached = false;
1170 state->markpos_offset = 0;
1171 state->markpos_eof = false;
1172 state->status = TSS_SORTEDINMEM;
1173 break;
1174
1175 case TSS_BOUNDED:
1176
1177 /*
1178 * We were able to accumulate all the tuples required for output
1179 * in memory, using a heap to eliminate excess tuples. Now we
1180 * have to transform the heap to a properly-sorted array.
1181 */
1182 sort_bounded_heap(state);
1183 state->current = 0;
1184 state->eof_reached = false;
1185 state->markpos_offset = 0;
1186 state->markpos_eof = false;
1187 state->status = TSS_SORTEDINMEM;
1188 break;
1189
1190 case TSS_BUILDRUNS:
1191
1192 /*
1193 * Finish tape-based sort. First, flush all tuples remaining in
1194 * memory out to tape; then merge until we have a single remaining
1195 * run (or, if !randomAccess, one run per tape). Note that
1196 * mergeruns sets the correct state->status.
1197 */
1198 dumptuples(state, true);
1199 mergeruns(state);
1200 state->eof_reached = false;
1201 state->markpos_block = 0L;
1202 state->markpos_offset = 0;
1203 state->markpos_eof = false;
1204 break;
1205
1206 default:
1207 elog(ERROR, "invalid tuplesort state");
1208 break;
1209 }
1210
1211 #ifdef TRACE_SORT
1212 if (trace_sort)
1213 {
1214 if (state->status == TSS_FINALMERGE)
1215 elog(LOG, "performsort done (except %d-way final merge): %s",
1216 state->activeTapes,
1217 pg_rusage_show(&state->ru_start));
1218 else
1219 elog(LOG, "performsort done: %s",
1220 pg_rusage_show(&state->ru_start));
1221 }
1222 #endif
1223
1224 MemoryContextSwitchTo(oldcontext);
1225 }
1226
1227 /*
1228 * Internal routine to fetch the next tuple in either forward or back
1229 * direction into *stup. Returns false if no more tuples.
1230 * If *should_free is set, the caller must pfree stup.tuple when done with it.
1231 */
1232 static bool
rum_tuplesort_gettuple_common(RumTuplesortstate * state,bool forward,SortTuple * stup,bool * should_free)1233 rum_tuplesort_gettuple_common(RumTuplesortstate *state, bool forward,
1234 SortTuple *stup, bool *should_free)
1235 {
1236 unsigned int tuplen;
1237
1238 switch (state->status)
1239 {
1240 case TSS_SORTEDINMEM:
1241 Assert(forward || state->randomAccess);
1242 *should_free = false;
1243 if (forward)
1244 {
1245 if (state->current < state->memtupcount)
1246 {
1247 *stup = state->memtuples[state->current++];
1248 return true;
1249 }
1250 state->eof_reached = true;
1251
1252 /*
1253 * Complain if caller tries to retrieve more tuples than
1254 * originally asked for in a bounded sort. This is because
1255 * returning EOF here might be the wrong thing.
1256 */
1257 if (state->bounded && state->current >= state->bound)
1258 elog(ERROR, "retrieved too many tuples in a bounded sort");
1259
1260 return false;
1261 }
1262 else
1263 {
1264 if (state->current <= 0)
1265 return false;
1266
1267 /*
1268 * if all tuples are fetched already then we return last
1269 * tuple, else - tuple before last returned.
1270 */
1271 if (state->eof_reached)
1272 state->eof_reached = false;
1273 else
1274 {
1275 state->current--; /* last returned tuple */
1276 if (state->current <= 0)
1277 return false;
1278 }
1279 *stup = state->memtuples[state->current - 1];
1280 return true;
1281 }
1282 break;
1283
1284 case TSS_SORTEDONTAPE:
1285 Assert(forward || state->randomAccess);
1286 *should_free = true;
1287 if (forward)
1288 {
1289 if (state->eof_reached)
1290 return false;
1291 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1292 {
1293 READTUP(state, stup, state->result_tape, tuplen);
1294 return true;
1295 }
1296 else
1297 {
1298 state->eof_reached = true;
1299 return false;
1300 }
1301 }
1302
1303 /*
1304 * Backward.
1305 *
1306 * if all tuples are fetched already then we return last tuple,
1307 * else - tuple before last returned.
1308 */
1309 if (state->eof_reached)
1310 {
1311 /*
1312 * Seek position is pointing just past the zero tuplen at the
1313 * end of file; back up to fetch last tuple's ending length
1314 * word. If seek fails we must have a completely empty file.
1315 */
1316 if (!LogicalTapeBackspace(state->tapeset,
1317 state->result_tape,
1318 2 * sizeof(unsigned int)))
1319 return false;
1320 state->eof_reached = false;
1321 }
1322 else
1323 {
1324 /*
1325 * Back up and fetch previously-returned tuple's ending length
1326 * word. If seek fails, assume we are at start of file.
1327 */
1328 if (!LogicalTapeBackspace(state->tapeset,
1329 state->result_tape,
1330 sizeof(unsigned int)))
1331 return false;
1332 tuplen = getlen(state, state->result_tape, false);
1333
1334 /*
1335 * Back up to get ending length word of tuple before it.
1336 */
1337 if (!LogicalTapeBackspace(state->tapeset,
1338 state->result_tape,
1339 tuplen + 2 * sizeof(unsigned int)))
1340 {
1341 /*
1342 * If that fails, presumably the prev tuple is the first
1343 * in the file. Back up so that it becomes next to read
1344 * in forward direction (not obviously right, but that is
1345 * what in-memory case does).
1346 */
1347 if (!LogicalTapeBackspace(state->tapeset,
1348 state->result_tape,
1349 tuplen + sizeof(unsigned int)))
1350 elog(ERROR, "bogus tuple length in backward scan");
1351 return false;
1352 }
1353 }
1354
1355 tuplen = getlen(state, state->result_tape, false);
1356
1357 /*
1358 * Now we have the length of the prior tuple, back up and read it.
1359 * Note: READTUP expects we are positioned after the initial
1360 * length word of the tuple, so back up to that point.
1361 */
1362 if (!LogicalTapeBackspace(state->tapeset,
1363 state->result_tape,
1364 tuplen))
1365 elog(ERROR, "bogus tuple length in backward scan");
1366 READTUP(state, stup, state->result_tape, tuplen);
1367 return true;
1368
1369 case TSS_FINALMERGE:
1370 Assert(forward);
1371 *should_free = true;
1372
1373 /*
1374 * This code should match the inner loop of mergeonerun().
1375 */
1376 if (state->memtupcount > 0)
1377 {
1378 int srcTape = state->memtuples[0].tupindex;
1379 Size tuplen;
1380 int tupIndex;
1381 SortTuple *newtup;
1382
1383 *stup = state->memtuples[0];
1384 /* returned tuple is no longer counted in our memory space */
1385 if (stup->tuple)
1386 {
1387 tuplen = GetMemoryChunkSpace(stup->tuple);
1388 state->availMem += tuplen;
1389 state->mergeavailmem[srcTape] += tuplen;
1390 }
1391 rum_tuplesort_heap_siftup(state, false);
1392 if ((tupIndex = state->mergenext[srcTape]) == 0)
1393 {
1394 /*
1395 * out of preloaded data on this tape, try to read more
1396 *
1397 * Unlike mergeonerun(), we only preload from the single
1398 * tape that's run dry. See mergepreread() comments.
1399 */
1400 mergeprereadone(state, srcTape);
1401
1402 /*
1403 * if still no data, we've reached end of run on this tape
1404 */
1405 if ((tupIndex = state->mergenext[srcTape]) == 0)
1406 return true;
1407 }
1408 /* pull next preread tuple from list, insert in heap */
1409 newtup = &state->memtuples[tupIndex];
1410 state->mergenext[srcTape] = newtup->tupindex;
1411 if (state->mergenext[srcTape] == 0)
1412 state->mergelast[srcTape] = 0;
1413 rum_tuplesort_heap_insert(state, newtup, srcTape, false);
1414 /* put the now-unused memtuples entry on the freelist */
1415 newtup->tupindex = state->mergefreelist;
1416 state->mergefreelist = tupIndex;
1417 state->mergeavailslots[srcTape]++;
1418 return true;
1419 }
1420 return false;
1421
1422 default:
1423 elog(ERROR, "invalid tuplesort state");
1424 return false; /* keep compiler quiet */
1425 }
1426 }
1427
1428 RumSortItem *
rum_tuplesort_getrum(RumTuplesortstate * state,bool forward,bool * should_free)1429 rum_tuplesort_getrum(RumTuplesortstate *state, bool forward, bool *should_free)
1430 {
1431 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1432 SortTuple stup;
1433
1434 if (!rum_tuplesort_gettuple_common(state, forward, &stup, should_free))
1435 stup.tuple = NULL;
1436
1437 MemoryContextSwitchTo(oldcontext);
1438
1439 return (RumSortItem *) stup.tuple;
1440 }
1441
1442 RumScanItem *
rum_tuplesort_getrumitem(RumTuplesortstate * state,bool forward,bool * should_free)1443 rum_tuplesort_getrumitem(RumTuplesortstate *state, bool forward, bool *should_free)
1444 {
1445 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1446 SortTuple stup;
1447
1448 if (!rum_tuplesort_gettuple_common(state, forward, &stup, should_free))
1449 stup.tuple = NULL;
1450
1451 MemoryContextSwitchTo(oldcontext);
1452
1453 return (RumScanItem *) stup.tuple;
1454 }
1455
1456 /*
1457 * rum_tuplesort_merge_order - report merge order we'll use for given memory
1458 * (note: "merge order" just means the number of input tapes in the merge).
1459 *
1460 * This is exported for use by the planner. allowedMem is in bytes.
1461 */
1462 int
rum_tuplesort_merge_order(long allowedMem)1463 rum_tuplesort_merge_order(long allowedMem)
1464 {
1465 int mOrder;
1466
1467 /*
1468 * We need one tape for each merge input, plus another one for the output,
1469 * and each of these tapes needs buffer space. In addition we want
1470 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
1471 * count).
1472 *
1473 * Note: you might be thinking we need to account for the memtuples[]
1474 * array in this calculation, but we effectively treat that as part of the
1475 * MERGE_BUFFER_SIZE workspace.
1476 */
1477 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
1478 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
1479
1480 /* Even in minimum memory, use at least a MINORDER merge */
1481 mOrder = Max(mOrder, MINORDER);
1482
1483 return mOrder;
1484 }
1485
1486 /*
1487 * inittapes - initialize for tape sorting.
1488 *
1489 * This is called only if we have found we don't have room to sort in memory.
1490 */
1491 static void
inittapes(RumTuplesortstate * state)1492 inittapes(RumTuplesortstate *state)
1493 {
1494 int maxTapes,
1495 ntuples,
1496 j;
1497 long tapeSpace;
1498
1499 /* Compute number of tapes to use: merge order plus 1 */
1500 maxTapes = rum_tuplesort_merge_order(state->allowedMem) + 1;
1501
1502 /*
1503 * We must have at least 2*maxTapes slots in the memtuples[] array, else
1504 * we'd not have room for merge heap plus preread. It seems unlikely that
1505 * this case would ever occur, but be safe.
1506 */
1507 maxTapes = Min(maxTapes, state->memtupsize / 2);
1508
1509 state->maxTapes = maxTapes;
1510 state->tapeRange = maxTapes - 1;
1511
1512 #ifdef TRACE_SORT
1513 if (trace_sort)
1514 elog(LOG, "switching to external sort with %d tapes: %s",
1515 maxTapes, pg_rusage_show(&state->ru_start));
1516 #endif
1517
1518 /*
1519 * Decrease availMem to reflect the space needed for tape buffers; but
1520 * don't decrease it to the point that we have no room for tuples. (That
1521 * case is only likely to occur if sorting pass-by-value Datums; in all
1522 * other scenarios the memtuples[] array is unlikely to occupy more than
1523 * half of allowedMem. In the pass-by-value case it's not important to
1524 * account for tuple space, so we don't care if LACKMEM becomes
1525 * inaccurate.)
1526 */
1527 tapeSpace = (long) maxTapes *TAPE_BUFFER_OVERHEAD;
1528
1529 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1530 USEMEM(state, tapeSpace);
1531
1532 /*
1533 * Make sure that the temp file(s) underlying the tape set are created in
1534 * suitable temp tablespaces.
1535 */
1536 PrepareTempTablespaces();
1537
1538 /*
1539 * Create the tape set and allocate the per-tape data arrays.
1540 */
1541 state->tapeset = LogicalTapeSetCreate(maxTapes);
1542
1543 state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
1544 state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
1545 state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1546 state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
1547 state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
1548 state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
1549 state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
1550 state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
1551 state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
1552
1553 /*
1554 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
1555 * marked as belonging to run number zero.
1556 *
1557 * NOTE: we pass false for checkIndex since there's no point in comparing
1558 * indexes in this step, even though we do intend the indexes to be part
1559 * of the sort key...
1560 */
1561 ntuples = state->memtupcount;
1562 state->memtupcount = 0; /* make the heap empty */
1563 for (j = 0; j < ntuples; j++)
1564 {
1565 /* Must copy source tuple to avoid possible overwrite */
1566 SortTuple stup = state->memtuples[j];
1567
1568 rum_tuplesort_heap_insert(state, &stup, 0, false);
1569 }
1570 Assert(state->memtupcount == ntuples);
1571
1572 state->currentRun = 0;
1573
1574 /*
1575 * Initialize variables of Algorithm D (step D1).
1576 */
1577 for (j = 0; j < maxTapes; j++)
1578 {
1579 state->tp_fib[j] = 1;
1580 state->tp_runs[j] = 0;
1581 state->tp_dummy[j] = 1;
1582 state->tp_tapenum[j] = j;
1583 }
1584 state->tp_fib[state->tapeRange] = 0;
1585 state->tp_dummy[state->tapeRange] = 0;
1586
1587 state->Level = 1;
1588 state->destTape = 0;
1589
1590 state->status = TSS_BUILDRUNS;
1591 }
1592
1593 /*
1594 * selectnewtape -- select new tape for new initial run.
1595 *
1596 * This is called after finishing a run when we know another run
1597 * must be started. This implements steps D3, D4 of Algorithm D.
1598 */
1599 static void
selectnewtape(RumTuplesortstate * state)1600 selectnewtape(RumTuplesortstate *state)
1601 {
1602 int j;
1603 int a;
1604
1605 /* Step D3: advance j (destTape) */
1606 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1607 {
1608 state->destTape++;
1609 return;
1610 }
1611 if (state->tp_dummy[state->destTape] != 0)
1612 {
1613 state->destTape = 0;
1614 return;
1615 }
1616
1617 /* Step D4: increase level */
1618 state->Level++;
1619 a = state->tp_fib[0];
1620 for (j = 0; j < state->tapeRange; j++)
1621 {
1622 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1623 state->tp_fib[j] = a + state->tp_fib[j + 1];
1624 }
1625 state->destTape = 0;
1626 }
1627
1628 /*
1629 * mergeruns -- merge all the completed initial runs.
1630 *
1631 * This implements steps D5, D6 of Algorithm D. All input data has
1632 * already been written to initial runs on tape (see dumptuples).
1633 */
1634 static void
mergeruns(RumTuplesortstate * state)1635 mergeruns(RumTuplesortstate *state)
1636 {
1637 int tapenum,
1638 svTape,
1639 svRuns,
1640 svDummy;
1641 int numTapes;
1642 int numInputTapes;
1643
1644 Assert(state->status == TSS_BUILDRUNS);
1645 Assert(state->memtupcount == 0);
1646
1647 /*
1648 * If we produced only one initial run (quite likely if the total data
1649 * volume is between 1X and 2X workMem), we can just use that tape as the
1650 * finished output, rather than doing a useless merge. (This obvious
1651 * optimization is not in Knuth's algorithm.)
1652 */
1653 if (state->currentRun == 1)
1654 {
1655 state->result_tape = state->tp_tapenum[state->destTape];
1656 /* must freeze and rewind the finished output tape */
1657 LogicalTapeFreeze(state->tapeset, state->result_tape);
1658 state->status = TSS_SORTEDONTAPE;
1659 return;
1660 }
1661
1662 /*
1663 * If we had fewer runs than tapes, refund the memory that we imagined we
1664 * would need for the tape buffers of the unused tapes.
1665 *
1666 * numTapes and numInputTapes reflect the actual number of tapes we will
1667 * use. Note that the output tape's tape number is maxTapes - 1, so the
1668 * tape numbers of the used tapes are not consecutive, and you cannot just
1669 * loop from 0 to numTapes to visit all used tapes!
1670 */
1671 if (state->Level == 1)
1672 {
1673 numInputTapes = state->currentRun;
1674 numTapes = numInputTapes + 1;
1675 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
1676 }
1677 else
1678 {
1679 numInputTapes = state->tapeRange;
1680 }
1681
1682 state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
1683 USEMEM(state, state->read_buffer_size * numInputTapes);
1684
1685 /* End of step D2: rewind all output tapes to prepare for merging */
1686 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1687 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
1688
1689 for (;;)
1690 {
1691 /*
1692 * At this point we know that tape[T] is empty. If there's just one
1693 * (real or dummy) run left on each input tape, then only one merge
1694 * pass remains. If we don't have to produce a materialized sorted
1695 * tape, we can stop at this point and do the final merge on-the-fly.
1696 */
1697 if (!state->randomAccess)
1698 {
1699 bool allOneRun = true;
1700
1701 Assert(state->tp_runs[state->tapeRange] == 0);
1702 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1703 {
1704 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1705 {
1706 allOneRun = false;
1707 break;
1708 }
1709 }
1710 if (allOneRun)
1711 {
1712 /* Tell logtape.c we won't be writing anymore */
1713 LogicalTapeSetForgetFreeSpace(state->tapeset);
1714 /* Initialize for the final merge pass */
1715 beginmerge(state);
1716 state->status = TSS_FINALMERGE;
1717 return;
1718 }
1719 }
1720
1721 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1722 while (state->tp_runs[state->tapeRange - 1] ||
1723 state->tp_dummy[state->tapeRange - 1])
1724 {
1725 bool allDummy = true;
1726
1727 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1728 {
1729 if (state->tp_dummy[tapenum] == 0)
1730 {
1731 allDummy = false;
1732 break;
1733 }
1734 }
1735
1736 if (allDummy)
1737 {
1738 state->tp_dummy[state->tapeRange]++;
1739 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1740 state->tp_dummy[tapenum]--;
1741 }
1742 else
1743 mergeonerun(state);
1744 }
1745
1746 /* Step D6: decrease level */
1747 if (--state->Level == 0)
1748 break;
1749 /* rewind output tape T to use as new input */
1750 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
1751 state->read_buffer_size);
1752 /* rewind used-up input tape P, and prepare it for write pass */
1753 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
1754 state->tp_runs[state->tapeRange - 1] = 0;
1755
1756 /*
1757 * reassign tape units per step D6; note we no longer care about A[]
1758 */
1759 svTape = state->tp_tapenum[state->tapeRange];
1760 svDummy = state->tp_dummy[state->tapeRange];
1761 svRuns = state->tp_runs[state->tapeRange];
1762 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
1763 {
1764 state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1765 state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1766 state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1767 }
1768 state->tp_tapenum[0] = svTape;
1769 state->tp_dummy[0] = svDummy;
1770 state->tp_runs[0] = svRuns;
1771 }
1772
1773 /*
1774 * Done. Knuth says that the result is on TAPE[1], but since we exited
1775 * the loop without performing the last iteration of step D6, we have not
1776 * rearranged the tape unit assignment, and therefore the result is on
1777 * TAPE[T]. We need to do it this way so that we can freeze the final
1778 * output tape while rewinding it. The last iteration of step D6 would be
1779 * a waste of cycles anyway...
1780 */
1781 state->result_tape = state->tp_tapenum[state->tapeRange];
1782 LogicalTapeFreeze(state->tapeset, state->result_tape);
1783 state->status = TSS_SORTEDONTAPE;
1784 }
1785
1786 /*
1787 * Merge one run from each input tape, except ones with dummy runs.
1788 *
1789 * This is the inner loop of Algorithm D step D5. We know that the
1790 * output tape is TAPE[T].
1791 */
1792 static void
mergeonerun(RumTuplesortstate * state)1793 mergeonerun(RumTuplesortstate *state)
1794 {
1795 int destTape = state->tp_tapenum[state->tapeRange];
1796 int srcTape;
1797 int tupIndex;
1798 SortTuple *tup;
1799 long priorAvail,
1800 spaceFreed;
1801
1802 /*
1803 * Start the merge by loading one tuple from each active source tape into
1804 * the heap. We can also decrease the input run/dummy run counts.
1805 */
1806 beginmerge(state);
1807
1808 /*
1809 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
1810 * out, and replacing it with next tuple from same tape (if there is
1811 * another one).
1812 */
1813 while (state->memtupcount > 0)
1814 {
1815 /* write the tuple to destTape */
1816 priorAvail = state->availMem;
1817 srcTape = state->memtuples[0].tupindex;
1818 WRITETUP(state, destTape, &state->memtuples[0]);
1819 /* writetup adjusted total free space, now fix per-tape space */
1820 spaceFreed = state->availMem - priorAvail;
1821 state->mergeavailmem[srcTape] += spaceFreed;
1822 /* compact the heap */
1823 rum_tuplesort_heap_siftup(state, false);
1824 if ((tupIndex = state->mergenext[srcTape]) == 0)
1825 {
1826 /* out of preloaded data on this tape, try to read more */
1827 mergepreread(state);
1828 /* if still no data, we've reached end of run on this tape */
1829 if ((tupIndex = state->mergenext[srcTape]) == 0)
1830 continue;
1831 }
1832 /* pull next preread tuple from list, insert in heap */
1833 tup = &state->memtuples[tupIndex];
1834 state->mergenext[srcTape] = tup->tupindex;
1835 if (state->mergenext[srcTape] == 0)
1836 state->mergelast[srcTape] = 0;
1837 rum_tuplesort_heap_insert(state, tup, srcTape, false);
1838 /* put the now-unused memtuples entry on the freelist */
1839 tup->tupindex = state->mergefreelist;
1840 state->mergefreelist = tupIndex;
1841 state->mergeavailslots[srcTape]++;
1842 }
1843
1844 /*
1845 * When the heap empties, we're done. Write an end-of-run marker on the
1846 * output tape, and increment its count of real runs.
1847 */
1848 markrunend(state, destTape);
1849 state->tp_runs[state->tapeRange]++;
1850
1851 #ifdef TRACE_SORT
1852 if (trace_sort)
1853 elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
1854 pg_rusage_show(&state->ru_start));
1855 #endif
1856 }
1857
1858 /*
1859 * beginmerge - initialize for a merge pass
1860 *
1861 * We decrease the counts of real and dummy runs for each tape, and mark
1862 * which tapes contain active input runs in mergeactive[]. Then, load
1863 * as many tuples as we can from each active input tape, and finally
1864 * fill the merge heap with the first tuple from each active tape.
1865 */
1866 static void
beginmerge(RumTuplesortstate * state)1867 beginmerge(RumTuplesortstate *state)
1868 {
1869 int activeTapes;
1870 int tapenum;
1871 int srcTape;
1872 int slotsPerTape;
1873 long spacePerTape;
1874
1875 /* Heap should be empty here */
1876 Assert(state->memtupcount == 0);
1877
1878 /* Adjust run counts and mark the active tapes */
1879 memset(state->mergeactive, 0,
1880 state->maxTapes * sizeof(*state->mergeactive));
1881 activeTapes = 0;
1882 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1883 {
1884 if (state->tp_dummy[tapenum] > 0)
1885 state->tp_dummy[tapenum]--;
1886 else
1887 {
1888 Assert(state->tp_runs[tapenum] > 0);
1889 state->tp_runs[tapenum]--;
1890 srcTape = state->tp_tapenum[tapenum];
1891 state->mergeactive[srcTape] = true;
1892 activeTapes++;
1893 }
1894 }
1895 state->activeTapes = activeTapes;
1896
1897 /* Clear merge-pass state variables */
1898 memset(state->mergenext, 0,
1899 state->maxTapes * sizeof(*state->mergenext));
1900 memset(state->mergelast, 0,
1901 state->maxTapes * sizeof(*state->mergelast));
1902 state->mergefreelist = 0; /* nothing in the freelist */
1903 state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
1904
1905 /*
1906 * Initialize space allocation to let each active input tape have an equal
1907 * share of preread space.
1908 */
1909 Assert(activeTapes > 0);
1910 slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
1911 Assert(slotsPerTape > 0);
1912 spacePerTape = state->availMem / activeTapes;
1913 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1914 {
1915 if (state->mergeactive[srcTape])
1916 {
1917 state->mergeavailslots[srcTape] = slotsPerTape;
1918 state->mergeavailmem[srcTape] = spacePerTape;
1919 }
1920 }
1921
1922 /*
1923 * Preread as many tuples as possible (and at least one) from each active
1924 * tape
1925 */
1926 mergepreread(state);
1927
1928 /* Load the merge heap with the first tuple from each input tape */
1929 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1930 {
1931 int tupIndex = state->mergenext[srcTape];
1932 SortTuple *tup;
1933
1934 if (tupIndex)
1935 {
1936 tup = &state->memtuples[tupIndex];
1937 state->mergenext[srcTape] = tup->tupindex;
1938 if (state->mergenext[srcTape] == 0)
1939 state->mergelast[srcTape] = 0;
1940 rum_tuplesort_heap_insert(state, tup, srcTape, false);
1941 /* put the now-unused memtuples entry on the freelist */
1942 tup->tupindex = state->mergefreelist;
1943 state->mergefreelist = tupIndex;
1944 state->mergeavailslots[srcTape]++;
1945 }
1946 }
1947 }
1948
1949 /*
1950 * mergepreread - load tuples from merge input tapes
1951 *
1952 * This routine exists to improve sequentiality of reads during a merge pass,
1953 * as explained in the header comments of this file. Load tuples from each
1954 * active source tape until the tape's run is exhausted or it has used up
1955 * its fair share of available memory. In any case, we guarantee that there
1956 * is at least one preread tuple available from each unexhausted input tape.
1957 *
1958 * We invoke this routine at the start of a merge pass for initial load,
1959 * and then whenever any tape's preread data runs out. Note that we load
1960 * as much data as possible from all tapes, not just the one that ran out.
1961 * This is because logtape.c works best with a usage pattern that alternates
1962 * between reading a lot of data and writing a lot of data, so whenever we
1963 * are forced to read, we should fill working memory completely.
1964 *
1965 * In FINALMERGE state, we *don't* use this routine, but instead just preread
1966 * from the single tape that ran dry. There's no read/write alternation in
1967 * that state and so no point in scanning through all the tapes to fix one.
1968 * (Moreover, there may be quite a lot of inactive tapes in that state, since
1969 * we might have had many fewer runs than tapes. In a regular tape-to-tape
1970 * merge we can expect most of the tapes to be active.)
1971 */
1972 static void
mergepreread(RumTuplesortstate * state)1973 mergepreread(RumTuplesortstate *state)
1974 {
1975 int srcTape;
1976
1977 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1978 mergeprereadone(state, srcTape);
1979 }
1980
1981 /*
1982 * mergeprereadone - load tuples from one merge input tape
1983 *
1984 * Read tuples from the specified tape until it has used up its free memory
1985 * or array slots; but ensure that we have at least one tuple, if any are
1986 * to be had.
1987 */
1988 static void
mergeprereadone(RumTuplesortstate * state,int srcTape)1989 mergeprereadone(RumTuplesortstate *state, int srcTape)
1990 {
1991 unsigned int tuplen;
1992 SortTuple stup;
1993 int tupIndex;
1994 long priorAvail,
1995 spaceUsed;
1996
1997 if (!state->mergeactive[srcTape])
1998 return; /* tape's run is already exhausted */
1999 priorAvail = state->availMem;
2000 state->availMem = state->mergeavailmem[srcTape];
2001 while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
2002 state->mergenext[srcTape] == 0)
2003 {
2004 /* read next tuple, if any */
2005 if ((tuplen = getlen(state, srcTape, true)) == 0)
2006 {
2007 state->mergeactive[srcTape] = false;
2008 break;
2009 }
2010 READTUP(state, &stup, srcTape, tuplen);
2011 /* find a free slot in memtuples[] for it */
2012 tupIndex = state->mergefreelist;
2013 if (tupIndex)
2014 state->mergefreelist = state->memtuples[tupIndex].tupindex;
2015 else
2016 {
2017 tupIndex = state->mergefirstfree++;
2018 Assert(tupIndex < state->memtupsize);
2019 }
2020 state->mergeavailslots[srcTape]--;
2021 /* store tuple, append to list for its tape */
2022 stup.tupindex = 0;
2023 state->memtuples[tupIndex] = stup;
2024 if (state->mergelast[srcTape])
2025 state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
2026 else
2027 state->mergenext[srcTape] = tupIndex;
2028 state->mergelast[srcTape] = tupIndex;
2029 }
2030 /* update per-tape and global availmem counts */
2031 spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
2032 state->mergeavailmem[srcTape] = state->availMem;
2033 state->availMem = priorAvail - spaceUsed;
2034 }
2035
2036 /*
2037 * dumptuples - remove tuples from heap and write to tape
2038 *
2039 * This is used during initial-run building, but not during merging.
2040 *
2041 * When alltuples = false, dump only enough tuples to get under the
2042 * availMem limit (and leave at least one tuple in the heap in any case,
2043 * since puttuple assumes it always has a tuple to compare to). We also
2044 * insist there be at least one free slot in the memtuples[] array.
2045 *
2046 * When alltuples = true, dump everything currently in memory.
2047 * (This case is only used at end of input data.)
2048 *
2049 * If we empty the heap, close out the current run and return (this should
2050 * only happen at end of input data). If we see that the tuple run number
2051 * at the top of the heap has changed, start a new run.
2052 */
2053 static void
dumptuples(RumTuplesortstate * state,bool alltuples)2054 dumptuples(RumTuplesortstate *state, bool alltuples)
2055 {
2056 while (alltuples ||
2057 (LACKMEM(state) && state->memtupcount > 1) ||
2058 state->memtupcount >= state->memtupsize)
2059 {
2060 /*
2061 * Dump the heap's frontmost entry, and sift up to remove it from the
2062 * heap.
2063 */
2064 Assert(state->memtupcount > 0);
2065 WRITETUP(state, state->tp_tapenum[state->destTape],
2066 &state->memtuples[0]);
2067 rum_tuplesort_heap_siftup(state, true);
2068
2069 /*
2070 * If the heap is empty *or* top run number has changed, we've
2071 * finished the current run.
2072 */
2073 if (state->memtupcount == 0 ||
2074 state->currentRun != state->memtuples[0].tupindex)
2075 {
2076 markrunend(state, state->tp_tapenum[state->destTape]);
2077 state->currentRun++;
2078 state->tp_runs[state->destTape]++;
2079 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
2080
2081 #ifdef TRACE_SORT
2082 if (trace_sort)
2083 elog(LOG, "finished writing%s run %d to tape %d: %s",
2084 (state->memtupcount == 0) ? " final" : "",
2085 state->currentRun, state->destTape,
2086 pg_rusage_show(&state->ru_start));
2087 #endif
2088
2089 /*
2090 * Done if heap is empty, else prepare for new run.
2091 */
2092 if (state->memtupcount == 0)
2093 break;
2094 Assert(state->currentRun == state->memtuples[0].tupindex);
2095 selectnewtape(state);
2096 }
2097 }
2098 }
2099
2100
/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * HEAPCOMPARE orders two SortTuples.  When checkIndex is true the tuple
 * index acts as the leading sort key and the regular comparator is used
 * only to break ties; when it is false only the regular comparator is used.
 *
 * The macro is not self-contained: it expects variables named "state" and
 * "checkIndex" to be in scope wherever it is expanded.
 */

#define HEAPCOMPARE(tup1,tup2) \
	((checkIndex && ((tup1)->tupindex != (tup2)->tupindex)) ? \
	 (((tup1)->tupindex) - ((tup2)->tupindex)) : \
	 COMPARETUP(state, tup1, tup2))
2112
2113 /*
2114 * Convert the existing unordered array of SortTuples to a bounded heap,
2115 * discarding all but the smallest "state->bound" tuples.
2116 *
2117 * When working with a bounded heap, we want to keep the largest entry
2118 * at the root (array entry zero), instead of the smallest as in the normal
2119 * sort case. This allows us to discard the largest entry cheaply.
2120 * Therefore, we temporarily reverse the sort direction.
2121 *
2122 * We assume that all entries in a bounded heap will always have tupindex
2123 * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
2124 * the direction of comparison for tupindexes.
2125 */
2126 static void
make_bounded_heap(RumTuplesortstate * state)2127 make_bounded_heap(RumTuplesortstate *state)
2128 {
2129 int tupcount = state->memtupcount;
2130 int i;
2131
2132 Assert(state->status == TSS_INITIAL);
2133 Assert(state->bounded);
2134 Assert(tupcount >= state->bound);
2135
2136 /* Reverse sort direction so largest entry will be at root */
2137 REVERSEDIRECTION(state);
2138
2139 state->memtupcount = 0; /* make the heap empty */
2140 for (i = 0; i < tupcount; i++)
2141 {
2142 if (state->memtupcount >= state->bound &&
2143 COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2144 {
2145 /* New tuple would just get thrown out, so skip it */
2146 free_sort_tuple(state, &state->memtuples[i]);
2147 CHECK_FOR_INTERRUPTS();
2148 }
2149 else
2150 {
2151 /* Insert next tuple into heap */
2152 /* Must copy source tuple to avoid possible overwrite */
2153 SortTuple stup = state->memtuples[i];
2154
2155 rum_tuplesort_heap_insert(state, &stup, 0, false);
2156
2157 /* If heap too full, discard largest entry */
2158 if (state->memtupcount > state->bound)
2159 {
2160 free_sort_tuple(state, &state->memtuples[0]);
2161 rum_tuplesort_heap_siftup(state, false);
2162 }
2163 }
2164 }
2165
2166 Assert(state->memtupcount == state->bound);
2167 state->status = TSS_BOUNDED;
2168 }
2169
2170 /*
2171 * Convert the bounded heap to a properly-sorted array
2172 */
2173 static void
sort_bounded_heap(RumTuplesortstate * state)2174 sort_bounded_heap(RumTuplesortstate *state)
2175 {
2176 int tupcount = state->memtupcount;
2177
2178 Assert(state->status == TSS_BOUNDED);
2179 Assert(state->bounded);
2180 Assert(tupcount == state->bound);
2181
2182 /*
2183 * We can unheapify in place because each sift-up will remove the largest
2184 * entry, which we can promptly store in the newly freed slot at the end.
2185 * Once we're down to a single-entry heap, we're done.
2186 */
2187 while (state->memtupcount > 1)
2188 {
2189 SortTuple stup = state->memtuples[0];
2190
2191 /* this sifts-up the next-largest entry and decreases memtupcount */
2192 rum_tuplesort_heap_siftup(state, false);
2193 state->memtuples[state->memtupcount] = stup;
2194 }
2195 state->memtupcount = tupcount;
2196
2197 /*
2198 * Reverse sort direction back to the original state. This is not
2199 * actually necessary but seems like a good idea for tidiness.
2200 */
2201 REVERSEDIRECTION(state);
2202
2203 state->status = TSS_SORTEDINMEM;
2204 state->boundUsed = true;
2205 }
2206
2207 /*
2208 * Insert a new tuple into an empty or existing heap, maintaining the
2209 * heap invariant. Caller is responsible for ensuring there's room.
2210 *
2211 * Note: we assume *tuple is a temporary variable that can be scribbled on.
2212 * For some callers, tuple actually points to a memtuples[] entry above the
2213 * end of the heap. This is safe as long as it's not immediately adjacent
2214 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2215 * is, it might get overwritten before being moved into the heap!
2216 */
2217 static void
rum_tuplesort_heap_insert(RumTuplesortstate * state,SortTuple * tuple,int tupleindex,bool checkIndex)2218 rum_tuplesort_heap_insert(RumTuplesortstate *state, SortTuple *tuple,
2219 int tupleindex, bool checkIndex)
2220 {
2221 SortTuple *memtuples;
2222 int j;
2223
2224 /*
2225 * Save the tupleindex --- see notes above about writing on *tuple. It's a
2226 * historical artifact that tupleindex is passed as a separate argument
2227 * and not in *tuple, but it's notationally convenient so let's leave it
2228 * that way.
2229 */
2230 tuple->tupindex = tupleindex;
2231
2232 memtuples = state->memtuples;
2233 Assert(state->memtupcount < state->memtupsize);
2234
2235 CHECK_FOR_INTERRUPTS();
2236
2237 /*
2238 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2239 * using 1-based array indexes, not 0-based.
2240 */
2241 j = state->memtupcount++;
2242 while (j > 0)
2243 {
2244 int i = (j - 1) >> 1;
2245
2246 if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
2247 break;
2248 memtuples[j] = memtuples[i];
2249 j = i;
2250 }
2251 memtuples[j] = *tuple;
2252 }
2253
2254 /*
2255 * The tuple at state->memtuples[0] has been removed from the heap.
2256 * Decrement memtupcount, and sift up to maintain the heap invariant.
2257 */
2258 static void
rum_tuplesort_heap_siftup(RumTuplesortstate * state,bool checkIndex)2259 rum_tuplesort_heap_siftup(RumTuplesortstate *state, bool checkIndex)
2260 {
2261 SortTuple *memtuples = state->memtuples;
2262 SortTuple *tuple;
2263 int i,
2264 n;
2265
2266 if (--state->memtupcount <= 0)
2267 return;
2268
2269 CHECK_FOR_INTERRUPTS();
2270
2271 n = state->memtupcount;
2272 tuple = &memtuples[n]; /* tuple that must be reinserted */
2273 i = 0; /* i is where the "hole" is */
2274 for (;;)
2275 {
2276 int j = 2 * i + 1;
2277
2278 if (j >= n)
2279 break;
2280 if (j + 1 < n &&
2281 HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
2282 j++;
2283 if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
2284 break;
2285 memtuples[i] = memtuples[j];
2286 i = j;
2287 }
2288 memtuples[i] = *tuple;
2289 }
2290
2291
2292 /*
2293 * Tape interface routines
2294 */
2295
2296 static unsigned int
getlen(RumTuplesortstate * state,int tapenum,bool eofOK)2297 getlen(RumTuplesortstate *state, int tapenum, bool eofOK)
2298 {
2299 unsigned int len;
2300
2301 if (LogicalTapeRead(state->tapeset, tapenum,
2302 &len, sizeof(len)) != sizeof(len))
2303 elog(ERROR, "unexpected end of tape");
2304 if (len == 0 && !eofOK)
2305 elog(ERROR, "unexpected end of data");
2306 return len;
2307 }
2308
2309 static void
markrunend(RumTuplesortstate * state,int tapenum)2310 markrunend(RumTuplesortstate *state, int tapenum)
2311 {
2312 unsigned int len = 0;
2313
2314 LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
2315 }
2316
2317
2318 /*
2319 * Convenience routine to free a tuple previously loaded into sort memory
2320 */
2321 static void
free_sort_tuple(RumTuplesortstate * state,SortTuple * stup)2322 free_sort_tuple(RumTuplesortstate *state, SortTuple *stup)
2323 {
2324 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
2325 pfree(stup->tuple);
2326 }
2327
2328 static int
comparetup_rum(const SortTuple * a,const SortTuple * b,RumTuplesortstate * state)2329 comparetup_rum(const SortTuple *a, const SortTuple *b, RumTuplesortstate *state)
2330 {
2331 RumSortItem *i1,
2332 *i2;
2333 float8 v1 = DatumGetFloat8(a->datum1);
2334 float8 v2 = DatumGetFloat8(b->datum1);
2335 int i;
2336
2337 if (v1 < v2)
2338 return -1;
2339 else if (v1 > v2)
2340 return 1;
2341
2342 i1 = (RumSortItem *) a->tuple;
2343 i2 = (RumSortItem *) b->tuple;
2344 for (i = 1; i < state->nKeys; i++)
2345 {
2346 if (i1->data[i] < i2->data[i])
2347 return -1;
2348 else if (i1->data[i] > i2->data[i])
2349 return 1;
2350 }
2351
2352 if (!state->compareItemPointer)
2353 return 0;
2354
2355 /*
2356 * If key values are equal, we sort on ItemPointer.
2357 */
2358 if (i1->iptr.ip_blkid.bi_hi < i2->iptr.ip_blkid.bi_hi)
2359 return -1;
2360 else if (i1->iptr.ip_blkid.bi_hi > i2->iptr.ip_blkid.bi_hi)
2361 return 1;
2362
2363 if (i1->iptr.ip_blkid.bi_lo < i2->iptr.ip_blkid.bi_lo)
2364 return -1;
2365 else if (i1->iptr.ip_blkid.bi_lo > i2->iptr.ip_blkid.bi_lo)
2366 return 1;
2367
2368 if (i1->iptr.ip_posid < i2->iptr.ip_posid)
2369 return -1;
2370 else if (i1->iptr.ip_posid > i2->iptr.ip_posid)
2371 return 1;
2372
2373 return 0;
2374 }
2375
2376 static void
copytup_rum(RumTuplesortstate * state,SortTuple * stup,void * tup)2377 copytup_rum(RumTuplesortstate *state, SortTuple *stup, void *tup)
2378 {
2379 RumSortItem *item = (RumSortItem *) tup;
2380
2381 stup->datum1 = Float8GetDatum(state->nKeys > 0 ? item->data[0] : 0);
2382 stup->isnull1 = false;
2383 stup->tuple = tup;
2384 USEMEM(state, GetMemoryChunkSpace(tup));
2385 }
2386
2387 static void
writetup_rum(RumTuplesortstate * state,int tapenum,SortTuple * stup)2388 writetup_rum(RumTuplesortstate *state, int tapenum, SortTuple *stup)
2389 {
2390 RumSortItem *item = (RumSortItem *) stup->tuple;
2391 unsigned int writtenlen = RumSortItemSize(state->nKeys) + sizeof(unsigned int);
2392
2393
2394 LogicalTapeWrite(state->tapeset, tapenum,
2395 (void *) &writtenlen, sizeof(writtenlen));
2396 LogicalTapeWrite(state->tapeset, tapenum,
2397 (void *) item, RumSortItemSize(state->nKeys));
2398 if (state->randomAccess) /* need trailing length word? */
2399 LogicalTapeWrite(state->tapeset, tapenum,
2400 (void *) &writtenlen, sizeof(writtenlen));
2401
2402 FREEMEM(state, GetMemoryChunkSpace(item));
2403 pfree(item);
2404 }
2405
2406 static void
readtup_rum(RumTuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)2407 readtup_rum(RumTuplesortstate *state, SortTuple *stup,
2408 int tapenum, unsigned int len)
2409 {
2410 unsigned int tuplen = len - sizeof(unsigned int);
2411 RumSortItem *item = (RumSortItem *) palloc(RumSortItemSize(state->nKeys));
2412
2413 Assert(tuplen == RumSortItemSize(state->nKeys));
2414
2415 USEMEM(state, GetMemoryChunkSpace(item));
2416 LogicalTapeReadExact(state->tapeset, tapenum,
2417 (void *) item, RumSortItemSize(state->nKeys));
2418 stup->datum1 = Float8GetDatum(state->nKeys > 0 ? item->data[0] : 0);
2419 stup->isnull1 = false;
2420 stup->tuple = item;
2421
2422 if (state->randomAccess) /* need trailing length word? */
2423 LogicalTapeReadExact(state->tapeset, tapenum,
2424 &tuplen, sizeof(tuplen));
2425 }
2426
2427 static void
reversedirection_rum(RumTuplesortstate * state)2428 reversedirection_rum(RumTuplesortstate *state)
2429 {
2430 state->reverse = !state->reverse;
2431 }
2432
2433 static int
comparetup_rumitem(const SortTuple * a,const SortTuple * b,RumTuplesortstate * state)2434 comparetup_rumitem(const SortTuple *a, const SortTuple *b, RumTuplesortstate *state)
2435 {
2436 RumItem *i1, *i2;
2437
2438 /* Extract RumItem from RumScanItem */
2439 i1 = (RumItem *) a->tuple;
2440 i2 = (RumItem *) b->tuple;
2441
2442 if (state->cmp)
2443 {
2444 if (i1->addInfoIsNull || i2->addInfoIsNull)
2445 {
2446 if (!(i1->addInfoIsNull && i2->addInfoIsNull))
2447 return (i1->addInfoIsNull) ? 1 : -1;
2448 /* go to itempointer compare */
2449 }
2450 else
2451 {
2452 int r;
2453
2454 r = DatumGetInt32(FunctionCall2(state->cmp,
2455 i1->addInfo,
2456 i2->addInfo));
2457
2458 if (r != 0)
2459 return r;
2460 }
2461 }
2462
2463 /*
2464 * If key values are equal, we sort on ItemPointer.
2465 */
2466 if (i1->iptr.ip_blkid.bi_hi < i2->iptr.ip_blkid.bi_hi)
2467 return -1;
2468 else if (i1->iptr.ip_blkid.bi_hi > i2->iptr.ip_blkid.bi_hi)
2469 return 1;
2470
2471 if (i1->iptr.ip_blkid.bi_lo < i2->iptr.ip_blkid.bi_lo)
2472 return -1;
2473 else if (i1->iptr.ip_blkid.bi_lo > i2->iptr.ip_blkid.bi_lo)
2474 return 1;
2475
2476 if (i1->iptr.ip_posid < i2->iptr.ip_posid)
2477 return -1;
2478 else if (i1->iptr.ip_posid > i2->iptr.ip_posid)
2479 return 1;
2480
2481 return 0;
2482 }
2483
2484 static void
copytup_rumitem(RumTuplesortstate * state,SortTuple * stup,void * tup)2485 copytup_rumitem(RumTuplesortstate *state, SortTuple *stup, void *tup)
2486 {
2487 stup->isnull1 = true;
2488 stup->tuple = palloc(sizeof(RumScanItem));
2489 memcpy(stup->tuple, tup, sizeof(RumScanItem));
2490 USEMEM(state, GetMemoryChunkSpace(stup->tuple));
2491 }
2492
2493 static void
writetup_rumitem(RumTuplesortstate * state,int tapenum,SortTuple * stup)2494 writetup_rumitem(RumTuplesortstate *state, int tapenum, SortTuple *stup)
2495 {
2496 RumScanItem *item = (RumScanItem *) stup->tuple;
2497 unsigned int writtenlen = sizeof(*item) + sizeof(unsigned int);
2498
2499 LogicalTapeWrite(state->tapeset, tapenum,
2500 (void *) &writtenlen, sizeof(writtenlen));
2501 LogicalTapeWrite(state->tapeset, tapenum,
2502 (void *) item, sizeof(*item));
2503 if (state->randomAccess) /* need trailing length word? */
2504 LogicalTapeWrite(state->tapeset, tapenum,
2505 (void *) &writtenlen, sizeof(writtenlen));
2506
2507 FREEMEM(state, GetMemoryChunkSpace(item));
2508 pfree(item);
2509 }
2510
2511 static void
readtup_rumitem(RumTuplesortstate * state,SortTuple * stup,int tapenum,unsigned int len)2512 readtup_rumitem(RumTuplesortstate *state, SortTuple *stup,
2513 int tapenum, unsigned int len)
2514 {
2515 unsigned int tuplen = len - sizeof(unsigned int);
2516 RumScanItem *item = (RumScanItem *) palloc(sizeof(RumScanItem));
2517
2518 Assert(tuplen == sizeof(RumScanItem));
2519
2520 USEMEM(state, GetMemoryChunkSpace(item));
2521 LogicalTapeReadExact(state->tapeset, tapenum,
2522 (void *) item, tuplen);
2523 stup->isnull1 = true;
2524 stup->tuple = item;
2525
2526 if (state->randomAccess) /* need trailing length word? */
2527 LogicalTapeReadExact(state->tapeset, tapenum,
2528 &tuplen, sizeof(tuplen));
2529 }
2530
2531