1 /*-------------------------------------------------------------------------
2 *
3 * logtape.c
4 * Management of "logical tapes" within temporary files.
5 *
6 * This module exists to support sorting via multiple merge passes (see
7 * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 * we implement it on disk by creating a separate file for each "tape",
9 * there is an annoying problem: the peak space usage is at least twice
10 * the volume of actual data to be sorted. (This must be so because each
11 * datum will appear in both the input and output tapes of the final
12 * merge pass. For seven-tape polyphase merge, which is otherwise a
13 * pretty good algorithm, peak usage is more like 4x actual data volume.)
14 *
15 * We can work around this problem by recognizing that any one tape
16 * dataset (with the possible exception of the final output) is written
17 * and read exactly once in a perfectly sequential manner. Therefore,
18 * a datum once read will not be required again, and we can recycle its
19 * space for use by the new tape dataset(s) being generated. In this way,
20 * the total space usage is essentially just the actual data volume, plus
21 * insignificant bookkeeping and start/stop overhead.
22 *
23 * Few OSes allow arbitrary parts of a file to be released back to the OS,
24 * so we have to implement this space-recycling ourselves within a single
25 * logical file. logtape.c exists to perform this bookkeeping and provide
26 * the illusion of N independent tape devices to tuplesort.c. Note that
27 * logtape.c itself depends on buffile.c to provide a "logical file" of
28 * larger size than the underlying OS may support.
29 *
30 * For simplicity, we allocate and release space in the underlying file
31 * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32 * of which blocks in the underlying file belong to which logical tape,
33 * plus any blocks that are free (recycled and not yet reused).
34 * The blocks in each logical tape form a chain, with a prev- and next-
35 * pointer in each block.
36 *
37 * The initial write pass is guaranteed to fill the underlying file
38 * perfectly sequentially, no matter how data is divided into logical tapes.
39 * Once we begin merge passes, the access pattern becomes considerably
40 * less predictable --- but the seeking involved should be comparable to
41 * what would happen if we kept each logical tape in a separate file,
42 * so there's no serious performance penalty paid to obtain the space
43 * savings of recycling. We try to localize the write accesses by always
44 * writing to the lowest-numbered free block when we have a choice; it's
45 * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46 * policy for free blocks would be better?)
47 *
48 * To further make the I/Os more sequential, we can use a larger buffer
49 * when reading, and read multiple blocks from the same tape in one go,
50 * whenever the buffer becomes empty.
51 *
52 * To support the above policy of writing to the lowest free block, the
53 * freelist is a min heap.
54 *
55 * Since all the bookkeeping and buffer memory is allocated with palloc(),
56 * and the underlying file(s) are made with OpenTemporaryFile, all resources
57 * for a logical tape set are certain to be cleaned up even if processing
58 * is aborted by ereport(ERROR). To avoid confusion, the caller should take
59 * care that all calls for a single LogicalTapeSet are made in the same
60 * palloc context.
61 *
62 * To support parallel sort operations involving coordinated callers to
63 * tuplesort.c routines across multiple workers, it is necessary to
64 * concatenate each worker BufFile/tapeset into one single logical tapeset
65 * managed by the leader. Workers should have produced one final
66 * materialized tape (their entire output) when this happens in leader.
67 * There will always be the same number of runs as input tapes, and the same
68 * number of input tapes as participants (worker Tuplesortstates).
69 *
70 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
71 * Portions Copyright (c) 1994, Regents of the University of California
72 *
73 * IDENTIFICATION
74 * src/backend/utils/sort/logtape.c
75 *
76 *-------------------------------------------------------------------------
77 */
78
79 #include "postgres.h"
80
81 #include "storage/buffile.h"
82 #include "utils/builtins.h"
83 #include "utils/logtape.h"
84 #include "utils/memdebug.h"
85 #include "utils/memutils.h"
86
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, inverted, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Usable data bytes per block: everything before the trailer. */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/*
 * Locate the trailer of the block starting at 'buf'.  The argument is
 * parenthesized so that pointer-arithmetic arguments such as (p + 1) are
 * evaluated before the cast to char *, not after.
 */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* True if the block is the last one of its tape (next holds -nbytes). */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/* Valid data bytes in the block: full payload unless it is the last block. */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark the block as last, recording 'nbytes' valid bytes (stored negated). */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
112
/*
 * When multiple tapes are being written to concurrently (as in HashAgg),
 * avoid excessive fragmentation by preallocating block numbers to individual
 * tapes.  Each preallocation doubles in size starting at
 * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
 *
 * No filesystem operations are performed for preallocation; only the block
 * numbers are reserved.  This may lead to sparse writes, which will cause
 * ltsWriteBlock() to fill in holes with zeros.
 *
 * Preallocation is only used when the tape set was created with
 * preallocate = true; ltsGetBlock() checks lts->enable_prealloc.  Unused
 * preallocated block numbers are handed back to the free list by
 * LogicalTapeRewindForRead().
 */
#define TAPE_WRITE_PREALLOC_MIN 8
#define TAPE_WRITE_PREALLOC_MAX 128
125
/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
typedef struct LogicalTape
{
	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 * That offset is offsetBlockNumber.  It is 0 for tapes not adopted from a
	 * worker, and LogicalTapeWrite() asserts that it is 0.
	 */
	long		firstBlockNumber;
	long		curBlockNumber;
	long		nextBlockNumber;
	long		offsetBlockNumber;

	/*
	 * Buffer for current data block(s).  While writing it is exactly BLCKSZ
	 * bytes.  For reading it is allocated lazily, at the size recorded by
	 * LogicalTapeRewindForRead().
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */

	/*
	 * Preallocated block numbers are held in an array sorted in descending
	 * order; blocks are consumed from the end of the array (lowest block
	 * numbers first).  The array is only used when the tape set has
	 * preallocation enabled, and is NULL otherwise.
	 */
	long	   *prealloc;
	int			nprealloc;		/* number of elements in list */
	int			prealloc_size;	/* number of elements list can hold */
} LogicalTape;
176
/*
 * This data structure represents a set of related "logical tapes" sharing
 * space in a single underlying file.  (But that "file" may be multiple files
 * if needed to escape OS limits on file size; buffile.c handles that for us.)
 * The number of tapes is fixed at creation.
 */
struct LogicalTapeSet
{
	BufFile    *pfile;			/* underlying file for whole tape set */

	/*
	 * File size tracking.  nBlocksWritten is the size of the underlying file,
	 * in BLCKSZ blocks.  nBlocksAllocated is the number of blocks handed out
	 * by ltsGetFreeBlock() when the free list is empty, and it is always
	 * greater than or equal to nBlocksWritten.  Blocks between nBlocksWritten
	 * and nBlocksAllocated are blocks that have been allocated for a tape,
	 * but have not been written to the underlying file yet.  nHoleBlocks
	 * tracks the total number of blocks that are in unused holes between
	 * worker spaces following BufFile concatenation.
	 */
	long		nBlocksAllocated;	/* # of blocks allocated */
	long		nBlocksWritten; /* # of blocks used in underlying file */
	long		nHoleBlocks;	/* # of "hole" blocks left */

	/*
	 * We store the numbers of recycled-and-available blocks in freeBlocks[].
	 * When there are no such blocks, we extend the underlying file.
	 * freeBlocks[] is kept as a min-heap so ltsGetFreeBlock() can always
	 * return the lowest free block number.
	 *
	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
	 * rather than being remembered in freeBlocks[].  See notes for
	 * LogicalTapeSetForgetFreeSpace().
	 */
	bool		forgetFreeSpace;	/* are we remembering free blocks? */
	long	   *freeBlocks;		/* resizable array holding minheap */
	long		nFreeBlocks;	/* # of currently free blocks */
	Size		freeBlocksLen;	/* current allocated length of freeBlocks[] */
	bool		enable_prealloc;	/* preallocate write blocks? (consulted
									 * by ltsGetBlock()) */

	/* The array of logical tapes. */
	int			nTapes;			/* # of logical tapes in set */
	LogicalTape *tapes;			/* has nTapes entries */
};
219
/* Forward declarations of file-local helper routines. */
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
								 SharedFileSet *fileset);
static void ltsInitTape(LogicalTape *lt);
static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
230
231
232 /*
233 * Write a block-sized buffer to the specified block of the underlying file.
234 *
235 * No need for an error return convention; we ereport() on any error.
236 */
237 static void
ltsWriteBlock(LogicalTapeSet * lts,long blocknum,void * buffer)238 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
239 {
240 /*
241 * BufFile does not support "holes", so if we're about to write a block
242 * that's past the current end of file, fill the space between the current
243 * end of file and the target block with zeros.
244 *
245 * This can happen either when tapes preallocate blocks; or for the last
246 * block of a tape which might not have been flushed.
247 *
248 * Note that BufFile concatenation can leave "holes" in BufFile between
249 * worker-owned block ranges. These are tracked for reporting purposes
250 * only. We never read from nor write to these hole blocks, and so they
251 * are not considered here.
252 */
253 while (blocknum > lts->nBlocksWritten)
254 {
255 PGAlignedBlock zerobuf;
256
257 MemSet(zerobuf.data, 0, sizeof(zerobuf));
258
259 ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
260 }
261
262 /* Write the requested block */
263 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
264 ereport(ERROR,
265 (errcode_for_file_access(),
266 errmsg("could not seek to block %ld of temporary file",
267 blocknum)));
268 BufFileWrite(lts->pfile, buffer, BLCKSZ);
269
270 /* Update nBlocksWritten, if we extended the file */
271 if (blocknum == lts->nBlocksWritten)
272 lts->nBlocksWritten++;
273 }
274
275 /*
276 * Read a block-sized buffer from the specified block of the underlying file.
277 *
278 * No need for an error return convention; we ereport() on any error. This
279 * module should never attempt to read a block it doesn't know is there.
280 */
281 static void
ltsReadBlock(LogicalTapeSet * lts,long blocknum,void * buffer)282 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
283 {
284 size_t nread;
285
286 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
287 ereport(ERROR,
288 (errcode_for_file_access(),
289 errmsg("could not seek to block %ld of temporary file",
290 blocknum)));
291 nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
292 if (nread != BLCKSZ)
293 ereport(ERROR,
294 (errcode_for_file_access(),
295 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
296 blocknum, nread, (size_t) BLCKSZ)));
297 }
298
299 /*
300 * Read as many blocks as we can into the per-tape buffer.
301 *
302 * Returns true if anything was read, 'false' on EOF.
303 */
304 static bool
ltsReadFillBuffer(LogicalTapeSet * lts,LogicalTape * lt)305 ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
306 {
307 lt->pos = 0;
308 lt->nbytes = 0;
309
310 do
311 {
312 char *thisbuf = lt->buffer + lt->nbytes;
313 long datablocknum = lt->nextBlockNumber;
314
315 /* Fetch next block number */
316 if (datablocknum == -1L)
317 break; /* EOF */
318 /* Apply worker offset, needed for leader tapesets */
319 datablocknum += lt->offsetBlockNumber;
320
321 /* Read the block */
322 ltsReadBlock(lts, datablocknum, (void *) thisbuf);
323 if (!lt->frozen)
324 ltsReleaseBlock(lts, datablocknum);
325 lt->curBlockNumber = lt->nextBlockNumber;
326
327 lt->nbytes += TapeBlockGetNBytes(thisbuf);
328 if (TapeBlockIsLast(thisbuf))
329 {
330 lt->nextBlockNumber = -1L;
331 /* EOF */
332 break;
333 }
334 else
335 lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
336
337 /* Advance to next block, if we have buffer space left */
338 } while (lt->buffer_size - lt->nbytes > BLCKSZ);
339
340 return (lt->nbytes > 0);
341 }
342
/*
 * Exchange heap[a] and heap[b] in the free-block min-heap.
 *
 * The temporary has the same type as the heap elements (long).  The
 * previous unsigned long temporary pushed each value through a
 * signed->unsigned->signed conversion, which is implementation-defined for
 * negative values.
 */
static inline void
swap_nodes(long *heap, unsigned long a, unsigned long b)
{
	long		tmp = heap[a];

	heap[a] = heap[b];
	heap[b] = tmp;
}
352
/*
 * Array index of the left child of node 'i' in a 0-based binary heap.
 */
static inline unsigned long
left_offset(unsigned long i)
{
	return (i << 1) + 1;
}
358
/*
 * Array index of the right child of node 'i' in a 0-based binary heap.
 *
 * The parameter is unsigned long, matching left_offset() and
 * parent_offset().  A plain 'unsigned' parameter would silently truncate
 * positions on platforms where unsigned long is wider than unsigned.
 */
static inline unsigned long
right_offset(unsigned long i)
{
	return 2 * i + 2;
}
364
/*
 * Array index of the parent of node 'i' (i > 0) in a 0-based binary heap.
 */
static inline unsigned long
parent_offset(unsigned long i)
{
	return (i - 1) >> 1;
}
370
371 /*
372 * Get the next block for writing.
373 */
374 static long
ltsGetBlock(LogicalTapeSet * lts,LogicalTape * lt)375 ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
376 {
377 if (lts->enable_prealloc)
378 return ltsGetPreallocBlock(lts, lt);
379 else
380 return ltsGetFreeBlock(lts);
381 }
382
383 /*
384 * Select the lowest currently unused block from the tape set's global free
385 * list min heap.
386 */
387 static long
ltsGetFreeBlock(LogicalTapeSet * lts)388 ltsGetFreeBlock(LogicalTapeSet *lts)
389 {
390 long *heap = lts->freeBlocks;
391 long blocknum;
392 int heapsize;
393 unsigned long pos;
394
395 /* freelist empty; allocate a new block */
396 if (lts->nFreeBlocks == 0)
397 return lts->nBlocksAllocated++;
398
399 if (lts->nFreeBlocks == 1)
400 {
401 lts->nFreeBlocks--;
402 return lts->freeBlocks[0];
403 }
404
405 /* take top of minheap */
406 blocknum = heap[0];
407
408 /* replace with end of minheap array */
409 heap[0] = heap[--lts->nFreeBlocks];
410
411 /* sift down */
412 pos = 0;
413 heapsize = lts->nFreeBlocks;
414 while (true)
415 {
416 unsigned long left = left_offset(pos);
417 unsigned long right = right_offset(pos);
418 unsigned long min_child;
419
420 if (left < heapsize && right < heapsize)
421 min_child = (heap[left] < heap[right]) ? left : right;
422 else if (left < heapsize)
423 min_child = left;
424 else if (right < heapsize)
425 min_child = right;
426 else
427 break;
428
429 if (heap[min_child] >= heap[pos])
430 break;
431
432 swap_nodes(heap, min_child, pos);
433 pos = min_child;
434 }
435
436 return blocknum;
437 }
438
439 /*
440 * Return the lowest free block number from the tape's preallocation list.
441 * Refill the preallocation list with blocks from the tape set's free list if
442 * necessary.
443 */
444 static long
ltsGetPreallocBlock(LogicalTapeSet * lts,LogicalTape * lt)445 ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
446 {
447 /* sorted in descending order, so return the last element */
448 if (lt->nprealloc > 0)
449 return lt->prealloc[--lt->nprealloc];
450
451 if (lt->prealloc == NULL)
452 {
453 lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
454 lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
455 }
456 else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
457 {
458 /* when the preallocation list runs out, double the size */
459 lt->prealloc_size *= 2;
460 if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
461 lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
462 lt->prealloc = (long *) repalloc(lt->prealloc,
463 sizeof(long) * lt->prealloc_size);
464 }
465
466 /* refill preallocation list */
467 lt->nprealloc = lt->prealloc_size;
468 for (int i = lt->nprealloc; i > 0; i--)
469 {
470 lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
471
472 /* verify descending order */
473 Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
474 }
475
476 return lt->prealloc[--lt->nprealloc];
477 }
478
479 /*
480 * Return a block# to the freelist.
481 */
482 static void
ltsReleaseBlock(LogicalTapeSet * lts,long blocknum)483 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
484 {
485 long *heap;
486 unsigned long pos;
487
488 /*
489 * Do nothing if we're no longer interested in remembering free space.
490 */
491 if (lts->forgetFreeSpace)
492 return;
493
494 /*
495 * Enlarge freeBlocks array if full.
496 */
497 if (lts->nFreeBlocks >= lts->freeBlocksLen)
498 {
499 /*
500 * If the freelist becomes very large, just return and leak this free
501 * block.
502 */
503 if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
504 return;
505
506 lts->freeBlocksLen *= 2;
507 lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
508 lts->freeBlocksLen * sizeof(long));
509 }
510
511 heap = lts->freeBlocks;
512 pos = lts->nFreeBlocks;
513
514 /* place entry at end of minheap array */
515 heap[pos] = blocknum;
516 lts->nFreeBlocks++;
517
518 /* sift up */
519 while (pos != 0)
520 {
521 unsigned long parent = parent_offset(pos);
522
523 if (heap[parent] < heap[pos])
524 break;
525
526 swap_nodes(heap, parent, pos);
527 pos = parent;
528 }
529 }
530
/*
 * Claim ownership of a set of logical tapes from existing shared BufFiles.
 *
 * Caller should be leader process.  Though tapes are marked as frozen in
 * workers, they are not frozen when opened within leader, since unfrozen tapes
 * use a larger read buffer.  (Frozen tapes have smaller read buffer, optimized
 * for random access.)
 *
 * 'shared' holds one TapeShare per worker tape (lts->nTapes - 1 entries).
 * The BufFile for worker tape i is opened from 'fileset' under the name
 * produced by pg_itoa(i).  The last tape in the set is the leader's own tape
 * and is not modified here.  On return, lts->pfile is the concatenated
 * BufFile, and each worker tape's offsetBlockNumber locates its blocks within
 * it.
 */
static void
ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
					 SharedFileSet *fileset)
{
	LogicalTape *lt = NULL;
	long		tapeblocks = 0L;
	long		nphysicalblocks = 0L;
	int			i;

	/* Should have at least one worker tape, plus leader's tape */
	Assert(lts->nTapes >= 2);

	/*
	 * Build concatenated view of all BufFiles, remembering the block number
	 * where each source file begins.  No changes are needed for leader/last
	 * tape.
	 */
	for (i = 0; i < lts->nTapes - 1; i++)
	{
		char		filename[MAXPGPATH];
		BufFile    *file;
		int64		filesize;

		lt = &lts->tapes[i];

		pg_itoa(i, filename);
		file = BufFileOpenShared(fileset, filename);
		filesize = BufFileSize(file);

		/*
		 * Stash first BufFile, and concatenate subsequent BufFiles to that.
		 * Store block offset into each tape as we go.
		 */
		lt->firstBlockNumber = shared[i].firstblocknumber;
		if (i == 0)
		{
			lts->pfile = file;
			lt->offsetBlockNumber = 0L;
		}
		else
		{
			lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
		}
		/* Don't allocate more for read buffer than could possibly help */
		lt->max_size = Min(MaxAllocSize, filesize);
		tapeblocks = filesize / BLCKSZ;
		nphysicalblocks += tapeblocks;
	}

	/*
	 * Set # of allocated blocks, as well as # blocks written.  Use extent of
	 * new BufFile space (from 0 to end of last worker's tape space) for this.
	 * Allocated/written blocks should include space used by holes left
	 * between concatenated BufFiles.
	 *
	 * After the loop, 'lt' and 'tapeblocks' still describe the last worker
	 * tape.  The Assert above guarantees the loop ran at least once, so 'lt'
	 * is not NULL here.
	 */
	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
	lts->nBlocksWritten = lts->nBlocksAllocated;

	/*
	 * Compute number of hole blocks so that we can later work backwards, and
	 * instrument number of physical blocks.  We don't simply use physical
	 * blocks directly for instrumentation because this would break if we ever
	 * subsequently wrote to the leader tape.
	 *
	 * Working backwards like this keeps our options open.  If shared BufFiles
	 * ever support being written to post-export, logtape.c can automatically
	 * take advantage of that.  We'd then support writing to the leader tape
	 * while recycling space from worker tapes, because the leader tape has a
	 * zero offset (write routines won't need to have extra logic to apply an
	 * offset).
	 *
	 * The only thing that currently prevents writing to the leader tape from
	 * working is the fact that BufFiles opened using BufFileOpenShared() are
	 * read-only by definition, but that could be changed if it seemed
	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
	 * descriptor" error, so tuplesort must avoid writing to the leader tape
	 * altogether.
	 */
	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
}
619
620 /*
621 * Initialize per-tape struct. Note we allocate the I/O buffer lazily.
622 */
623 static void
ltsInitTape(LogicalTape * lt)624 ltsInitTape(LogicalTape *lt)
625 {
626 lt->writing = true;
627 lt->frozen = false;
628 lt->dirty = false;
629 lt->firstBlockNumber = -1L;
630 lt->curBlockNumber = -1L;
631 lt->nextBlockNumber = -1L;
632 lt->offsetBlockNumber = 0L;
633 lt->buffer = NULL;
634 lt->buffer_size = 0;
635 /* palloc() larger than MaxAllocSize would fail */
636 lt->max_size = MaxAllocSize;
637 lt->pos = 0;
638 lt->nbytes = 0;
639 lt->prealloc = NULL;
640 lt->nprealloc = 0;
641 lt->prealloc_size = 0;
642 }
643
644 /*
645 * Lazily allocate and initialize the read buffer. This avoids waste when many
646 * tapes are open at once, but not all are active between rewinding and
647 * reading.
648 */
649 static void
ltsInitReadBuffer(LogicalTapeSet * lts,LogicalTape * lt)650 ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
651 {
652 Assert(lt->buffer_size > 0);
653 lt->buffer = palloc(lt->buffer_size);
654
655 /* Read the first block, or reset if tape is empty */
656 lt->nextBlockNumber = lt->firstBlockNumber;
657 lt->pos = 0;
658 lt->nbytes = 0;
659 ltsReadFillBuffer(lts, lt);
660 }
661
662 /*
663 * Create a set of logical tapes in a temporary underlying file.
664 *
665 * Each tape is initialized in write state. Serial callers pass ntapes,
666 * NULL argument for shared, and -1 for worker. Parallel worker callers
667 * pass ntapes, a shared file handle, NULL shared argument, and their own
668 * worker number. Leader callers, which claim shared worker tapes here,
669 * must supply non-sentinel values for all arguments except worker number,
670 * which should be -1.
671 *
672 * Leader caller is passing back an array of metadata each worker captured
673 * when LogicalTapeFreeze() was called for their final result tapes. Passed
674 * tapes array is actually sized ntapes - 1, because it includes only
675 * worker tapes, whereas leader requires its own leader tape. Note that we
676 * rely on the assumption that reclaimed worker tapes will only be read
677 * from once by leader, and never written to again (tapes are initialized
678 * for writing, but that's only to be consistent). Leader may not write to
679 * its own tape purely due to a restriction in the shared buffile
680 * infrastructure that may be lifted in the future.
681 */
682 LogicalTapeSet *
LogicalTapeSetCreate(int ntapes,bool preallocate,TapeShare * shared,SharedFileSet * fileset,int worker)683 LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
684 SharedFileSet *fileset, int worker)
685 {
686 LogicalTapeSet *lts;
687 int i;
688
689 /*
690 * Create top-level struct including per-tape LogicalTape structs.
691 */
692 Assert(ntapes > 0);
693 lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
694 lts->nBlocksAllocated = 0L;
695 lts->nBlocksWritten = 0L;
696 lts->nHoleBlocks = 0L;
697 lts->forgetFreeSpace = false;
698 lts->freeBlocksLen = 32; /* reasonable initial guess */
699 lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
700 lts->nFreeBlocks = 0;
701 lts->enable_prealloc = preallocate;
702 lts->nTapes = ntapes;
703 lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
704
705 for (i = 0; i < ntapes; i++)
706 ltsInitTape(<s->tapes[i]);
707
708 /*
709 * Create temp BufFile storage as required.
710 *
711 * Leader concatenates worker tapes, which requires special adjustment to
712 * final tapeset data. Things are simpler for the worker case and the
713 * serial case, though. They are generally very similar -- workers use a
714 * shared fileset, whereas serial sorts use a conventional serial BufFile.
715 */
716 if (shared)
717 ltsConcatWorkerTapes(lts, shared, fileset);
718 else if (fileset)
719 {
720 char filename[MAXPGPATH];
721
722 pg_itoa(worker, filename);
723 lts->pfile = BufFileCreateShared(fileset, filename);
724 }
725 else
726 lts->pfile = BufFileCreateTemp(false);
727
728 return lts;
729 }
730
731 /*
732 * Close a logical tape set and release all resources.
733 */
734 void
LogicalTapeSetClose(LogicalTapeSet * lts)735 LogicalTapeSetClose(LogicalTapeSet *lts)
736 {
737 LogicalTape *lt;
738 int i;
739
740 BufFileClose(lts->pfile);
741 for (i = 0; i < lts->nTapes; i++)
742 {
743 lt = <s->tapes[i];
744 if (lt->buffer)
745 pfree(lt->buffer);
746 }
747 pfree(lts->tapes);
748 pfree(lts->freeBlocks);
749 pfree(lts);
750 }
751
752 /*
753 * Mark a logical tape set as not needing management of free space anymore.
754 *
755 * This should be called if the caller does not intend to write any more data
756 * into the tape set, but is reading from un-frozen tapes. Since no more
757 * writes are planned, remembering free blocks is no longer useful. Setting
758 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
759 * is not designed to handle large numbers of free blocks.
760 */
761 void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet * lts)762 LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
763 {
764 lts->forgetFreeSpace = true;
765 }
766
/*
 * Write to a logical tape.
 *
 * Appends 'size' bytes starting at 'ptr' to tape 'tapenum', which must be in
 * write state.  Data is accumulated in a single BLCKSZ buffer.  A block is
 * written out only when it is full and more data follows.  The final partial
 * block therefore stays in memory until LogicalTapeRewindForRead() flushes
 * it.
 *
 * There are no error returns; we ereport() on failure.
 */
void
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
				 void *ptr, size_t size)
{
	LogicalTape *lt;
	size_t		nthistime;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->writing);
	/* Tapes adopted from workers (nonzero offset) are read-only. */
	Assert(lt->offsetBlockNumber == 0L);

	/* Allocate data buffer and first block on first write */
	if (lt->buffer == NULL)
	{
		lt->buffer = (char *) palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}
	if (lt->curBlockNumber == -1)
	{
		Assert(lt->firstBlockNumber == -1);
		Assert(lt->pos == 0);

		lt->curBlockNumber = ltsGetBlock(lts, lt);
		lt->firstBlockNumber = lt->curBlockNumber;

		/* The first block of a tape has no predecessor. */
		TapeBlockGetTrailer(lt->buffer)->prev = -1L;
	}

	Assert(lt->buffer_size == BLCKSZ);
	while (size > 0)
	{
		if (lt->pos >= (int) TapeBlockPayloadSize)
		{
			/* Buffer full, dump it out */
			long		nextBlockNumber;

			if (!lt->dirty)
			{
				/* Hmm, went directly from reading to writing? */
				elog(ERROR, "invalid logtape state: should be dirty");
			}

			/*
			 * First allocate the next block, so that we can store it in the
			 * 'next' pointer of this block.
			 */
			nextBlockNumber = ltsGetBlock(lts, lt);

			/* set the next-pointer and dump the current block. */
			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);

			/*
			 * initialize the prev-pointer of the next block; the same
			 * buffer is reused for it
			 */
			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
			lt->curBlockNumber = nextBlockNumber;
			lt->pos = 0;
			lt->nbytes = 0;
		}

		/* Copy as much as fits in the remaining payload area. */
		nthistime = TapeBlockPayloadSize - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(lt->buffer + lt->pos, ptr, nthistime);

		lt->dirty = true;
		lt->pos += nthistime;
		if (lt->nbytes < lt->pos)
			lt->nbytes = lt->pos;
		ptr = (void *) ((char *) ptr + nthistime);
		size -= nthistime;
	}
}
847
/*
 * Rewind logical tape and switch from writing to reading.
 *
 * The tape must currently be in writing state, or "frozen" in read state.
 *
 * 'buffer_size' specifies how much memory to use for the read buffer.
 * Regardless of the argument, the actual amount of memory used is between
 * BLCKSZ and lt->max_size, and is a multiple of BLCKSZ.  lt->max_size is
 * MaxAllocSize, except for tapes adopted from a worker, where it is capped
 * at the worker file's size.  The given value is rounded down and truncated
 * to fit those constraints, if necessary.  If the tape is frozen, the
 * 'buffer_size' argument is ignored, and a small BLCKSZ byte buffer is used.
 *
 * The read buffer itself is not allocated here.  Any existing buffer is
 * freed and only the new size is recorded; the first LogicalTapeRead() call
 * allocates it.  Block numbers still held in the tape's preallocation list
 * are returned to the set's free list.
 */
void
LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	/*
	 * Round and cap buffer_size if needed.
	 *
	 * NOTE(review): if lt->max_size were below BLCKSZ (a worker file smaller
	 * than one block), rounding down would yield 0 and trip the Assert in
	 * ltsInitReadBuffer().  Presumably workers always write at least one
	 * block -- confirm.
	 */
	if (lt->frozen)
		buffer_size = BLCKSZ;
	else
	{
		/* need at least one block */
		if (buffer_size < BLCKSZ)
			buffer_size = BLCKSZ;

		/* palloc() larger than max_size is unlikely to be helpful */
		if (buffer_size > lt->max_size)
			buffer_size = lt->max_size;

		/* round down to BLCKSZ boundary */
		buffer_size -= buffer_size % BLCKSZ;
	}

	if (lt->writing)
	{
		/*
		 * Completion of a write phase.  Flush last partial data block, and
		 * rewind for normal (destructive) read.
		 */
		if (lt->dirty)
		{
			/*
			 * As long as we've filled the buffer at least once, its contents
			 * are entirely defined from valgrind's point of view, even though
			 * contents beyond the current end point may be stale.  But it's
			 * possible - at least in the case of a parallel sort - to sort
			 * such small amount of data that we do not fill the buffer even
			 * once.  Tell valgrind that its contents are defined, so it
			 * doesn't bleat.
			 */
			VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
									  lt->buffer_size - lt->nbytes);

			/* Mark this as the tape's last block, recording its length. */
			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
		}
		lt->writing = false;
	}
	else
	{
		/*
		 * This is only OK if tape is frozen; we rewind for (another) read
		 * pass.
		 */
		Assert(lt->frozen);
	}

	if (lt->buffer)
		pfree(lt->buffer);

	/* the buffer is lazily allocated, but set the size here */
	lt->buffer = NULL;
	lt->buffer_size = buffer_size;

	/* free the preallocation list, and return unused block numbers */
	if (lt->prealloc != NULL)
	{
		for (int i = lt->nprealloc; i > 0; i--)
			ltsReleaseBlock(lts, lt->prealloc[i - 1]);
		pfree(lt->prealloc);
		lt->prealloc = NULL;
		lt->nprealloc = 0;
		lt->prealloc_size = 0;
	}
}
939
940 /*
941 * Rewind logical tape and switch from reading to writing.
942 *
943 * NOTE: we assume the caller has read the tape to the end; otherwise
944 * untouched data will not have been freed. We could add more code to free
945 * any unread blocks, but in current usage of this module it'd be useless
946 * code.
947 */
948 void
LogicalTapeRewindForWrite(LogicalTapeSet * lts,int tapenum)949 LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
950 {
951 LogicalTape *lt;
952
953 Assert(tapenum >= 0 && tapenum < lts->nTapes);
954 lt = <s->tapes[tapenum];
955
956 Assert(!lt->writing && !lt->frozen);
957 lt->writing = true;
958 lt->dirty = false;
959 lt->firstBlockNumber = -1L;
960 lt->curBlockNumber = -1L;
961 lt->pos = 0;
962 lt->nbytes = 0;
963 if (lt->buffer)
964 pfree(lt->buffer);
965 lt->buffer = NULL;
966 lt->buffer_size = 0;
967 }
968
969 /*
970 * Read from a logical tape.
971 *
972 * Early EOF is indicated by return value less than #bytes requested.
973 */
974 size_t
LogicalTapeRead(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)975 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
976 void *ptr, size_t size)
977 {
978 LogicalTape *lt;
979 size_t nread = 0;
980 size_t nthistime;
981
982 Assert(tapenum >= 0 && tapenum < lts->nTapes);
983 lt = <s->tapes[tapenum];
984 Assert(!lt->writing);
985
986 if (lt->buffer == NULL)
987 ltsInitReadBuffer(lts, lt);
988
989 while (size > 0)
990 {
991 if (lt->pos >= lt->nbytes)
992 {
993 /* Try to load more data into buffer. */
994 if (!ltsReadFillBuffer(lts, lt))
995 break; /* EOF */
996 }
997
998 nthistime = lt->nbytes - lt->pos;
999 if (nthistime > size)
1000 nthistime = size;
1001 Assert(nthistime > 0);
1002
1003 memcpy(ptr, lt->buffer + lt->pos, nthistime);
1004
1005 lt->pos += nthistime;
1006 ptr = (void *) ((char *) ptr + nthistime);
1007 size -= nthistime;
1008 nread += nthistime;
1009 }
1010
1011 return nread;
1012 }
1013
1014 /*
1015 * "Freeze" the contents of a tape so that it can be read multiple times
1016 * and/or read backwards. Once a tape is frozen, its contents will not
1017 * be released until the LogicalTapeSet is destroyed. This is expected
1018 * to be used only for the final output pass of a merge.
1019 *
1020 * This *must* be called just at the end of a write pass, before the
1021 * tape is rewound (after rewind is too late!). It performs a rewind
1022 * and switch to read mode "for free". An immediately following rewind-
1023 * for-read call is OK but not necessary.
1024 *
1025 * share output argument is set with details of storage used for tape after
1026 * freezing, which may be passed to LogicalTapeSetCreate within leader
1027 * process later. This metadata is only of interest to worker callers
1028 * freezing their final output for leader (single materialized tape).
1029 * Serial sorts should set share to NULL.
1030 */
1031 void
LogicalTapeFreeze(LogicalTapeSet * lts,int tapenum,TapeShare * share)1032 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
1033 {
1034 LogicalTape *lt;
1035
1036 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1037 lt = <s->tapes[tapenum];
1038 Assert(lt->writing);
1039 Assert(lt->offsetBlockNumber == 0L);
1040
1041 /*
1042 * Completion of a write phase. Flush last partial data block, and rewind
1043 * for nondestructive read.
1044 */
1045 if (lt->dirty)
1046 {
1047 /*
1048 * As long as we've filled the buffer at least once, its contents are
1049 * entirely defined from valgrind's point of view, even though
1050 * contents beyond the current end point may be stale. But it's
1051 * possible - at least in the case of a parallel sort - to sort such
1052 * small amount of data that we do not fill the buffer even once. Tell
1053 * valgrind that its contents are defined, so it doesn't bleat.
1054 */
1055 VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
1056 lt->buffer_size - lt->nbytes);
1057
1058 TapeBlockSetNBytes(lt->buffer, lt->nbytes);
1059 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
1060 lt->writing = false;
1061 }
1062 lt->writing = false;
1063 lt->frozen = true;
1064
1065 /*
1066 * The seek and backspace functions assume a single block read buffer.
1067 * That's OK with current usage. A larger buffer is helpful to make the
1068 * read pattern of the backing file look more sequential to the OS, when
1069 * we're reading from multiple tapes. But at the end of a sort, when a
1070 * tape is frozen, we only read from a single tape anyway.
1071 */
1072 if (!lt->buffer || lt->buffer_size != BLCKSZ)
1073 {
1074 if (lt->buffer)
1075 pfree(lt->buffer);
1076 lt->buffer = palloc(BLCKSZ);
1077 lt->buffer_size = BLCKSZ;
1078 }
1079
1080 /* Read the first block, or reset if tape is empty */
1081 lt->curBlockNumber = lt->firstBlockNumber;
1082 lt->pos = 0;
1083 lt->nbytes = 0;
1084
1085 if (lt->firstBlockNumber == -1L)
1086 lt->nextBlockNumber = -1L;
1087 ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
1088 if (TapeBlockIsLast(lt->buffer))
1089 lt->nextBlockNumber = -1L;
1090 else
1091 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1092 lt->nbytes = TapeBlockGetNBytes(lt->buffer);
1093
1094 /* Handle extra steps when caller is to share its tapeset */
1095 if (share)
1096 {
1097 BufFileExportShared(lts->pfile);
1098 share->firstblocknumber = lt->firstBlockNumber;
1099 }
1100 }
1101
1102 /*
1103 * Add additional tapes to this tape set. Not intended to be used when any
1104 * tapes are frozen.
1105 */
1106 void
LogicalTapeSetExtend(LogicalTapeSet * lts,int nAdditional)1107 LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
1108 {
1109 int i;
1110 int nTapesOrig = lts->nTapes;
1111
1112 lts->nTapes += nAdditional;
1113
1114 lts->tapes = (LogicalTape *) repalloc(lts->tapes,
1115 lts->nTapes * sizeof(LogicalTape));
1116
1117 for (i = nTapesOrig; i < lts->nTapes; i++)
1118 ltsInitTape(<s->tapes[i]);
1119 }
1120
1121 /*
1122 * Backspace the tape a given number of bytes. (We also support a more
1123 * general seek interface, see below.)
1124 *
1125 * *Only* a frozen-for-read tape can be backed up; we don't support
1126 * random access during write, and an unfrozen read tape may have
1127 * already discarded the desired data!
1128 *
1129 * Returns the number of bytes backed up. It can be less than the
1130 * requested amount, if there isn't that much data before the current
1131 * position. The tape is positioned to the beginning of the tape in
1132 * that case.
1133 */
1134 size_t
LogicalTapeBackspace(LogicalTapeSet * lts,int tapenum,size_t size)1135 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
1136 {
1137 LogicalTape *lt;
1138 size_t seekpos = 0;
1139
1140 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1141 lt = <s->tapes[tapenum];
1142 Assert(lt->frozen);
1143 Assert(lt->buffer_size == BLCKSZ);
1144
1145 if (lt->buffer == NULL)
1146 ltsInitReadBuffer(lts, lt);
1147
1148 /*
1149 * Easy case for seek within current block.
1150 */
1151 if (size <= (size_t) lt->pos)
1152 {
1153 lt->pos -= (int) size;
1154 return size;
1155 }
1156
1157 /*
1158 * Not-so-easy case, have to walk back the chain of blocks. This
1159 * implementation would be pretty inefficient for long seeks, but we
1160 * really aren't doing that (a seek over one tuple is typical).
1161 */
1162 seekpos = (size_t) lt->pos; /* part within this block */
1163 while (size > seekpos)
1164 {
1165 long prev = TapeBlockGetTrailer(lt->buffer)->prev;
1166
1167 if (prev == -1L)
1168 {
1169 /* Tried to back up beyond the beginning of tape. */
1170 if (lt->curBlockNumber != lt->firstBlockNumber)
1171 elog(ERROR, "unexpected end of tape");
1172 lt->pos = 0;
1173 return seekpos;
1174 }
1175
1176 ltsReadBlock(lts, prev, (void *) lt->buffer);
1177
1178 if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1179 elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1180 prev,
1181 TapeBlockGetTrailer(lt->buffer)->next,
1182 lt->curBlockNumber);
1183
1184 lt->nbytes = TapeBlockPayloadSize;
1185 lt->curBlockNumber = prev;
1186 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1187
1188 seekpos += TapeBlockPayloadSize;
1189 }
1190
1191 /*
1192 * 'seekpos' can now be greater than 'size', because it points to the
1193 * beginning the target block. The difference is the position within the
1194 * page.
1195 */
1196 lt->pos = seekpos - size;
1197 return size;
1198 }
1199
1200 /*
1201 * Seek to an arbitrary position in a logical tape.
1202 *
1203 * *Only* a frozen-for-read tape can be seeked.
1204 *
1205 * Must be called with a block/offset previously returned by
1206 * LogicalTapeTell().
1207 */
1208 void
LogicalTapeSeek(LogicalTapeSet * lts,int tapenum,long blocknum,int offset)1209 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
1210 long blocknum, int offset)
1211 {
1212 LogicalTape *lt;
1213
1214 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1215 lt = <s->tapes[tapenum];
1216 Assert(lt->frozen);
1217 Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1218 Assert(lt->buffer_size == BLCKSZ);
1219
1220 if (lt->buffer == NULL)
1221 ltsInitReadBuffer(lts, lt);
1222
1223 if (blocknum != lt->curBlockNumber)
1224 {
1225 ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1226 lt->curBlockNumber = blocknum;
1227 lt->nbytes = TapeBlockPayloadSize;
1228 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1229 }
1230
1231 if (offset > lt->nbytes)
1232 elog(ERROR, "invalid tape seek position");
1233 lt->pos = offset;
1234 }
1235
1236 /*
1237 * Obtain current position in a form suitable for a later LogicalTapeSeek.
1238 *
1239 * NOTE: it'd be OK to do this during write phase with intention of using
1240 * the position for a seek after freezing. Not clear if anyone needs that.
1241 */
1242 void
LogicalTapeTell(LogicalTapeSet * lts,int tapenum,long * blocknum,int * offset)1243 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
1244 long *blocknum, int *offset)
1245 {
1246 LogicalTape *lt;
1247
1248 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1249 lt = <s->tapes[tapenum];
1250
1251 if (lt->buffer == NULL)
1252 ltsInitReadBuffer(lts, lt);
1253
1254 Assert(lt->offsetBlockNumber == 0L);
1255
1256 /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1257 Assert(lt->buffer_size == BLCKSZ);
1258
1259 *blocknum = lt->curBlockNumber;
1260 *offset = lt->pos;
1261 }
1262
1263 /*
1264 * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1265 *
1266 * This should not be called while there are open write buffers; otherwise it
1267 * may not account for buffered data.
1268 */
1269 long
LogicalTapeSetBlocks(LogicalTapeSet * lts)1270 LogicalTapeSetBlocks(LogicalTapeSet *lts)
1271 {
1272 #ifdef USE_ASSERT_CHECKING
1273 for (int i = 0; i < lts->nTapes; i++)
1274 {
1275 LogicalTape *lt = <s->tapes[i];
1276 Assert(!lt->writing || lt->buffer == NULL);
1277 }
1278 #endif
1279 return lts->nBlocksWritten - lts->nHoleBlocks;
1280 }
1281