1 /*-------------------------------------------------------------------------
2 *
3 * logtape.c
4 * Management of "logical tapes" within temporary files.
5 *
6 * This module exists to support sorting via multiple merge passes (see
7 * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 * we implement it on disk by creating a separate file for each "tape",
9 * there is an annoying problem: the peak space usage is at least twice
10 * the volume of actual data to be sorted. (This must be so because each
11 * datum will appear in both the input and output tapes of the final
12 * merge pass. For seven-tape polyphase merge, which is otherwise a
13 * pretty good algorithm, peak usage is more like 4x actual data volume.)
14 *
15 * We can work around this problem by recognizing that any one tape
16 * dataset (with the possible exception of the final output) is written
17 * and read exactly once in a perfectly sequential manner. Therefore,
18 * a datum once read will not be required again, and we can recycle its
19 * space for use by the new tape dataset(s) being generated. In this way,
20 * the total space usage is essentially just the actual data volume, plus
21 * insignificant bookkeeping and start/stop overhead.
22 *
23 * Few OSes allow arbitrary parts of a file to be released back to the OS,
24 * so we have to implement this space-recycling ourselves within a single
25 * logical file. logtape.c exists to perform this bookkeeping and provide
26 * the illusion of N independent tape devices to tuplesort.c. Note that
27 * logtape.c itself depends on buffile.c to provide a "logical file" of
28 * larger size than the underlying OS may support.
29 *
30 * For simplicity, we allocate and release space in the underlying file
31 * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32 * of which blocks in the underlying file belong to which logical tape,
33 * plus any blocks that are free (recycled and not yet reused).
34 * The blocks in each logical tape form a chain, with a prev- and next-
35 * pointer in each block.
36 *
37 * The initial write pass is guaranteed to fill the underlying file
38 * perfectly sequentially, no matter how data is divided into logical tapes.
39 * Once we begin merge passes, the access pattern becomes considerably
40 * less predictable --- but the seeking involved should be comparable to
41 * what would happen if we kept each logical tape in a separate file,
42 * so there's no serious performance penalty paid to obtain the space
43 * savings of recycling. We try to localize the write accesses by always
44 * writing to the lowest-numbered free block when we have a choice; it's
45 * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46 * policy for free blocks would be better?)
47 *
48 * To further make the I/Os more sequential, we can use a larger buffer
49 * when reading, and read multiple blocks from the same tape in one go,
50 * whenever the buffer becomes empty.
51 *
52 * To support the above policy of writing to the lowest free block,
53 * ltsGetFreeBlock sorts the list of free block numbers into decreasing
54 * order each time it is asked for a block and the list isn't currently
55 * sorted. This is an efficient way to handle it because we expect cycles
56 * of releasing many blocks followed by re-using many blocks, due to
57 * the larger read buffer.
58 *
59 * Since all the bookkeeping and buffer memory is allocated with palloc(),
60 * and the underlying file(s) are made with OpenTemporaryFile, all resources
61 * for a logical tape set are certain to be cleaned up even if processing
62 * is aborted by ereport(ERROR). To avoid confusion, the caller should take
63 * care that all calls for a single LogicalTapeSet are made in the same
64 * palloc context.
65 *
66 * To support parallel sort operations involving coordinated callers to
67 * tuplesort.c routines across multiple workers, it is necessary to
68 * concatenate each worker BufFile/tapeset into one single logical tapeset
69 * managed by the leader. Workers should have produced one final
70 * materialized tape (their entire output) when this happens in leader.
71 * There will always be the same number of runs as input tapes, and the same
72 * number of input tapes as participants (worker Tuplesortstates).
73 *
74 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
75 * Portions Copyright (c) 1994, Regents of the University of California
76 *
77 * IDENTIFICATION
78 * src/backend/utils/sort/logtape.c
79 *
80 *-------------------------------------------------------------------------
81 */
82
83 #include "postgres.h"
84
85 #include "storage/buffile.h"
86 #include "utils/builtins.h"
87 #include "utils/logtape.h"
88 #include "utils/memdebug.h"
89 #include "utils/memutils.h"
90
/*
 * Trailer kept in the final sizeof(TapeBlockTrailer) bytes of every BLCKSZ
 * block that belongs to a tape.
 *
 * On the first block of a tape 'prev' is -1.  On the last block of a tape
 * 'next' is not a block number: it holds the negated count of valid payload
 * bytes on that block.  A negative 'next' is thus how the last block is
 * recognized.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* preceding block of this tape; -1 when this
								 * is the tape's first block */
	long		next;			/* following block of this tape; when < 0,
								 * the negated number of valid bytes on the
								 * tape's last block */
} TapeBlockTrailer;
105
/*
 * Accessors for the trailer and payload of a BLCKSZ-sized tape block.
 *
 * TapeBlockPayloadSize		number of data bytes that fit before the trailer
 * TapeBlockGetTrailer(buf)	pointer to the trailer at the end of block 'buf'
 * TapeBlockIsLast(buf)		true if 'buf' is the last block of its tape
 *							(trailer's 'next' is negative)
 * TapeBlockGetNBytes(buf)	valid payload bytes in 'buf': the negated 'next'
 *							on the last block, a full payload otherwise
 * TapeBlockSetNBytes(buf, nbytes)
 *							mark 'buf' as the last block holding 'nbytes'
 *							valid bytes by storing -nbytes in 'next'
 *
 * The 'buf' argument is parenthesized before being cast to char *, so that
 * an argument such as "base + n" is offset in units of its own pointer type
 * first, and only then advanced by the payload size in bytes.
 */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
116
117
/*
 * State of one "logical tape" living inside the file shared by a tape set.
 *
 * During the write phase the buffer holds the block that is currently being
 * filled.  During the read phase the buffer may hold several blocks at once;
 * block trailers are not kept when a block is read into the buffer, so the
 * buffer always holds one contiguous run of tape data.
 */
typedef struct LogicalTape
{
	bool		writing;		/* true during the write phase */
	bool		frozen;			/* true if reading must not free blocks */
	bool		dirty;			/* true if the buffer must still be written */

	/*
	 * First, current and next block numbers of this tape.
	 *
	 * curBlockNumber is meaningful only while writing, or while reading a
	 * frozen tape.  An unfrozen tape is read through a larger buffer that
	 * spans several blocks, which makes "the current block" ambiguous.
	 *
	 * offsetBlockNumber is the offset of this tape's first block within the
	 * unified BufFile space after worker BufFiles have been concatenated; it
	 * is applied when reading.
	 */
	long		firstBlockNumber;
	long		curBlockNumber;
	long		nextBlockNumber;
	long		offsetBlockNumber;

	/*
	 * Data buffer for the current block(s).
	 */
	char	   *buffer;			/* separately palloc'd physical buffer */
	int			buffer_size;	/* bytes allocated for buffer */
	int			max_size;		/* largest buffer_size that is both useful and
								 * safe */
	int			pos;			/* offset in buffer of next read/write */
	int			nbytes;			/* number of valid bytes in buffer */
} LogicalTape;
159
160 /*
161 * This data structure represents a set of related "logical tapes" sharing
162 * space in a single underlying file. (But that "file" may be multiple files
163 * if needed to escape OS limits on file size; buffile.c handles that for us.)
164 * The number of tapes is fixed at creation.
165 */
166 struct LogicalTapeSet
167 {
168 BufFile *pfile; /* underlying file for whole tape set */
169
170 /*
171 * File size tracking. nBlocksWritten is the size of the underlying file,
172 * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
173 * by ltsGetFreeBlock(), and it is always greater than or equal to
174 * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
175 * blocks that have been allocated for a tape, but have not been written
176 * to the underlying file yet. nHoleBlocks tracks the total number of
177 * blocks that are in unused holes between worker spaces following BufFile
178 * concatenation.
179 */
180 long nBlocksAllocated; /* # of blocks allocated */
181 long nBlocksWritten; /* # of blocks used in underlying file */
182 long nHoleBlocks; /* # of "hole" blocks left */
183
184 /*
185 * We store the numbers of recycled-and-available blocks in freeBlocks[].
186 * When there are no such blocks, we extend the underlying file.
187 *
188 * If forgetFreeSpace is true then any freed blocks are simply forgotten
189 * rather than being remembered in freeBlocks[]. See notes for
190 * LogicalTapeSetForgetFreeSpace().
191 *
192 * If blocksSorted is true then the block numbers in freeBlocks are in
193 * *decreasing* order, so that removing the last entry gives us the lowest
194 * free block. We re-sort the blocks whenever a block is demanded; this
195 * should be reasonably efficient given the expected usage pattern.
196 */
197 bool forgetFreeSpace; /* are we remembering free blocks? */
198 bool blocksSorted; /* is freeBlocks[] currently in order? */
199 long *freeBlocks; /* resizable array */
200 int nFreeBlocks; /* # of currently free blocks */
201 int freeBlocksLen; /* current allocated length of freeBlocks[] */
202
203 /* The array of logical tapes. */
204 int nTapes; /* # of logical tapes in set */
205 LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
206 };
207
208 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
209 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
210 static long ltsGetFreeBlock(LogicalTapeSet *lts);
211 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
212 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
213 SharedFileSet *fileset);
214
215
216 /*
217 * Write a block-sized buffer to the specified block of the underlying file.
218 *
219 * No need for an error return convention; we ereport() on any error.
220 */
221 static void
ltsWriteBlock(LogicalTapeSet * lts,long blocknum,void * buffer)222 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
223 {
224 /*
225 * BufFile does not support "holes", so if we're about to write a block
226 * that's past the current end of file, fill the space between the current
227 * end of file and the target block with zeros.
228 *
229 * This should happen rarely, otherwise you are not writing very
230 * sequentially. In current use, this only happens when the sort ends
231 * writing a run, and switches to another tape. The last block of the
232 * previous tape isn't flushed to disk until the end of the sort, so you
233 * get one-block hole, where the last block of the previous tape will
234 * later go.
235 *
236 * Note that BufFile concatenation can leave "holes" in BufFile between
237 * worker-owned block ranges. These are tracked for reporting purposes
238 * only. We never read from nor write to these hole blocks, and so they
239 * are not considered here.
240 */
241 while (blocknum > lts->nBlocksWritten)
242 {
243 PGAlignedBlock zerobuf;
244
245 MemSet(zerobuf.data, 0, sizeof(zerobuf));
246
247 ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
248 }
249
250 /* Write the requested block */
251 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
252 ereport(ERROR,
253 (errcode_for_file_access(),
254 errmsg("could not seek to block %ld of temporary file",
255 blocknum)));
256 BufFileWrite(lts->pfile, buffer, BLCKSZ);
257
258 /* Update nBlocksWritten, if we extended the file */
259 if (blocknum == lts->nBlocksWritten)
260 lts->nBlocksWritten++;
261 }
262
263 /*
264 * Read a block-sized buffer from the specified block of the underlying file.
265 *
266 * No need for an error return convention; we ereport() on any error. This
267 * module should never attempt to read a block it doesn't know is there.
268 */
269 static void
ltsReadBlock(LogicalTapeSet * lts,long blocknum,void * buffer)270 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
271 {
272 size_t nread;
273
274 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
275 ereport(ERROR,
276 (errcode_for_file_access(),
277 errmsg("could not seek to block %ld of temporary file",
278 blocknum)));
279 nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
280 if (nread != BLCKSZ)
281 ereport(ERROR,
282 (errcode_for_file_access(),
283 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
284 blocknum, nread, (size_t) BLCKSZ)));
285 }
286
287 /*
288 * Read as many blocks as we can into the per-tape buffer.
289 *
290 * Returns true if anything was read, 'false' on EOF.
291 */
292 static bool
ltsReadFillBuffer(LogicalTapeSet * lts,LogicalTape * lt)293 ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
294 {
295 lt->pos = 0;
296 lt->nbytes = 0;
297
298 do
299 {
300 char *thisbuf = lt->buffer + lt->nbytes;
301 long datablocknum = lt->nextBlockNumber;
302
303 /* Fetch next block number */
304 if (datablocknum == -1L)
305 break; /* EOF */
306 /* Apply worker offset, needed for leader tapesets */
307 datablocknum += lt->offsetBlockNumber;
308
309 /* Read the block */
310 ltsReadBlock(lts, datablocknum, (void *) thisbuf);
311 if (!lt->frozen)
312 ltsReleaseBlock(lts, datablocknum);
313 lt->curBlockNumber = lt->nextBlockNumber;
314
315 lt->nbytes += TapeBlockGetNBytes(thisbuf);
316 if (TapeBlockIsLast(thisbuf))
317 {
318 lt->nextBlockNumber = -1L;
319 /* EOF */
320 break;
321 }
322 else
323 lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
324
325 /* Advance to next block, if we have buffer space left */
326 } while (lt->buffer_size - lt->nbytes > BLCKSZ);
327
328 return (lt->nbytes > 0);
329 }
330
/*
 * qsort comparator that orders freeBlocks[] entries from largest to
 * smallest: returns 1 when *a < *b, -1 when *a > *b, and 0 when equal.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	const long	lhs = *(const long *) a;
	const long	rhs = *(const long *) b;

	/*
	 * Combine two 0/1 comparison results rather than subtracting the block
	 * numbers, because long might be wider than int.
	 */
	return (lhs < rhs) - (lhs > rhs);
}
347
348 /*
349 * Select a currently unused block for writing to.
350 */
351 static long
ltsGetFreeBlock(LogicalTapeSet * lts)352 ltsGetFreeBlock(LogicalTapeSet *lts)
353 {
354 /*
355 * If there are multiple free blocks, we select the one appearing last in
356 * freeBlocks[] (after sorting the array if needed). If there are none,
357 * assign the next block at the end of the file.
358 */
359 if (lts->nFreeBlocks > 0)
360 {
361 if (!lts->blocksSorted)
362 {
363 qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
364 sizeof(long), freeBlocks_cmp);
365 lts->blocksSorted = true;
366 }
367 return lts->freeBlocks[--lts->nFreeBlocks];
368 }
369 else
370 return lts->nBlocksAllocated++;
371 }
372
373 /*
374 * Return a block# to the freelist.
375 */
376 static void
ltsReleaseBlock(LogicalTapeSet * lts,long blocknum)377 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
378 {
379 int ndx;
380
381 /*
382 * Do nothing if we're no longer interested in remembering free space.
383 */
384 if (lts->forgetFreeSpace)
385 return;
386
387 /*
388 * Enlarge freeBlocks array if full.
389 */
390 if (lts->nFreeBlocks >= lts->freeBlocksLen)
391 {
392 lts->freeBlocksLen *= 2;
393 lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
394 lts->freeBlocksLen * sizeof(long));
395 }
396
397 /*
398 * Add blocknum to array, and mark the array unsorted if it's no longer in
399 * decreasing order.
400 */
401 ndx = lts->nFreeBlocks++;
402 lts->freeBlocks[ndx] = blocknum;
403 if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
404 lts->blocksSorted = false;
405 }
406
407 /*
408 * Claim ownership of a set of logical tapes from existing shared BufFiles.
409 *
410 * Caller should be leader process. Though tapes are marked as frozen in
411 * workers, they are not frozen when opened within leader, since unfrozen tapes
412 * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
413 * for random access.)
414 */
415 static void
ltsConcatWorkerTapes(LogicalTapeSet * lts,TapeShare * shared,SharedFileSet * fileset)416 ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
417 SharedFileSet *fileset)
418 {
419 LogicalTape *lt = NULL;
420 long tapeblocks = 0L;
421 long nphysicalblocks = 0L;
422 int i;
423
424 /* Should have at least one worker tape, plus leader's tape */
425 Assert(lts->nTapes >= 2);
426
427 /*
428 * Build concatenated view of all BufFiles, remembering the block number
429 * where each source file begins. No changes are needed for leader/last
430 * tape.
431 */
432 for (i = 0; i < lts->nTapes - 1; i++)
433 {
434 char filename[MAXPGPATH];
435 BufFile *file;
436 int64 filesize;
437
438 lt = <s->tapes[i];
439
440 pg_itoa(i, filename);
441 file = BufFileOpenShared(fileset, filename);
442 filesize = BufFileSize(file);
443
444 /*
445 * Stash first BufFile, and concatenate subsequent BufFiles to that.
446 * Store block offset into each tape as we go.
447 */
448 lt->firstBlockNumber = shared[i].firstblocknumber;
449 if (i == 0)
450 {
451 lts->pfile = file;
452 lt->offsetBlockNumber = 0L;
453 }
454 else
455 {
456 lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
457 }
458 /* Don't allocate more for read buffer than could possibly help */
459 lt->max_size = Min(MaxAllocSize, filesize);
460 tapeblocks = filesize / BLCKSZ;
461 nphysicalblocks += tapeblocks;
462 }
463
464 /*
465 * Set # of allocated blocks, as well as # blocks written. Use extent of
466 * new BufFile space (from 0 to end of last worker's tape space) for this.
467 * Allocated/written blocks should include space used by holes left
468 * between concatenated BufFiles.
469 */
470 lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
471 lts->nBlocksWritten = lts->nBlocksAllocated;
472
473 /*
474 * Compute number of hole blocks so that we can later work backwards, and
475 * instrument number of physical blocks. We don't simply use physical
476 * blocks directly for instrumentation because this would break if we ever
477 * subsequently wrote to the leader tape.
478 *
479 * Working backwards like this keeps our options open. If shared BufFiles
480 * ever support being written to post-export, logtape.c can automatically
481 * take advantage of that. We'd then support writing to the leader tape
482 * while recycling space from worker tapes, because the leader tape has a
483 * zero offset (write routines won't need to have extra logic to apply an
484 * offset).
485 *
486 * The only thing that currently prevents writing to the leader tape from
487 * working is the fact that BufFiles opened using BufFileOpenShared() are
488 * read-only by definition, but that could be changed if it seemed
489 * worthwhile. For now, writing to the leader tape will raise a "Bad file
490 * descriptor" error, so tuplesort must avoid writing to the leader tape
491 * altogether.
492 */
493 lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
494 }
495
496 /*
497 * Create a set of logical tapes in a temporary underlying file.
498 *
499 * Each tape is initialized in write state. Serial callers pass ntapes,
500 * NULL argument for shared, and -1 for worker. Parallel worker callers
501 * pass ntapes, a shared file handle, NULL shared argument, and their own
502 * worker number. Leader callers, which claim shared worker tapes here,
503 * must supply non-sentinel values for all arguments except worker number,
504 * which should be -1.
505 *
506 * Leader caller is passing back an array of metadata each worker captured
507 * when LogicalTapeFreeze() was called for their final result tapes. Passed
508 * tapes array is actually sized ntapes - 1, because it includes only
509 * worker tapes, whereas leader requires its own leader tape. Note that we
510 * rely on the assumption that reclaimed worker tapes will only be read
511 * from once by leader, and never written to again (tapes are initialized
512 * for writing, but that's only to be consistent). Leader may not write to
513 * its own tape purely due to a restriction in the shared buffile
514 * infrastructure that may be lifted in the future.
515 */
516 LogicalTapeSet *
LogicalTapeSetCreate(int ntapes,TapeShare * shared,SharedFileSet * fileset,int worker)517 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
518 int worker)
519 {
520 LogicalTapeSet *lts;
521 LogicalTape *lt;
522 int i;
523
524 /*
525 * Create top-level struct including per-tape LogicalTape structs.
526 */
527 Assert(ntapes > 0);
528 lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
529 ntapes * sizeof(LogicalTape));
530 lts->nBlocksAllocated = 0L;
531 lts->nBlocksWritten = 0L;
532 lts->nHoleBlocks = 0L;
533 lts->forgetFreeSpace = false;
534 lts->blocksSorted = true; /* a zero-length array is sorted ... */
535 lts->freeBlocksLen = 32; /* reasonable initial guess */
536 lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
537 lts->nFreeBlocks = 0;
538 lts->nTapes = ntapes;
539
540 /*
541 * Initialize per-tape structs. Note we allocate the I/O buffer and the
542 * first block for a tape only when it is first actually written to. This
543 * avoids wasting memory space when tuplesort.c overestimates the number
544 * of tapes needed.
545 */
546 for (i = 0; i < ntapes; i++)
547 {
548 lt = <s->tapes[i];
549 lt->writing = true;
550 lt->frozen = false;
551 lt->dirty = false;
552 lt->firstBlockNumber = -1L;
553 lt->curBlockNumber = -1L;
554 lt->nextBlockNumber = -1L;
555 lt->offsetBlockNumber = 0L;
556 lt->buffer = NULL;
557 lt->buffer_size = 0;
558 /* palloc() larger than MaxAllocSize would fail */
559 lt->max_size = MaxAllocSize;
560 lt->pos = 0;
561 lt->nbytes = 0;
562 }
563
564 /*
565 * Create temp BufFile storage as required.
566 *
567 * Leader concatenates worker tapes, which requires special adjustment to
568 * final tapeset data. Things are simpler for the worker case and the
569 * serial case, though. They are generally very similar -- workers use a
570 * shared fileset, whereas serial sorts use a conventional serial BufFile.
571 */
572 if (shared)
573 ltsConcatWorkerTapes(lts, shared, fileset);
574 else if (fileset)
575 {
576 char filename[MAXPGPATH];
577
578 pg_itoa(worker, filename);
579 lts->pfile = BufFileCreateShared(fileset, filename);
580 }
581 else
582 lts->pfile = BufFileCreateTemp(false);
583
584 return lts;
585 }
586
587 /*
588 * Close a logical tape set and release all resources.
589 */
590 void
LogicalTapeSetClose(LogicalTapeSet * lts)591 LogicalTapeSetClose(LogicalTapeSet *lts)
592 {
593 LogicalTape *lt;
594 int i;
595
596 BufFileClose(lts->pfile);
597 for (i = 0; i < lts->nTapes; i++)
598 {
599 lt = <s->tapes[i];
600 if (lt->buffer)
601 pfree(lt->buffer);
602 }
603 pfree(lts->freeBlocks);
604 pfree(lts);
605 }
606
607 /*
608 * Mark a logical tape set as not needing management of free space anymore.
609 *
610 * This should be called if the caller does not intend to write any more data
611 * into the tape set, but is reading from un-frozen tapes. Since no more
612 * writes are planned, remembering free blocks is no longer useful. Setting
613 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
614 * is not designed to handle large numbers of free blocks.
615 */
616 void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet * lts)617 LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
618 {
619 lts->forgetFreeSpace = true;
620 }
621
622 /*
623 * Write to a logical tape.
624 *
625 * There are no error returns; we ereport() on failure.
626 */
627 void
LogicalTapeWrite(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)628 LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
629 void *ptr, size_t size)
630 {
631 LogicalTape *lt;
632 size_t nthistime;
633
634 Assert(tapenum >= 0 && tapenum < lts->nTapes);
635 lt = <s->tapes[tapenum];
636 Assert(lt->writing);
637 Assert(lt->offsetBlockNumber == 0L);
638
639 /* Allocate data buffer and first block on first write */
640 if (lt->buffer == NULL)
641 {
642 lt->buffer = (char *) palloc(BLCKSZ);
643 lt->buffer_size = BLCKSZ;
644 }
645 if (lt->curBlockNumber == -1)
646 {
647 Assert(lt->firstBlockNumber == -1);
648 Assert(lt->pos == 0);
649
650 lt->curBlockNumber = ltsGetFreeBlock(lts);
651 lt->firstBlockNumber = lt->curBlockNumber;
652
653 TapeBlockGetTrailer(lt->buffer)->prev = -1L;
654 }
655
656 Assert(lt->buffer_size == BLCKSZ);
657 while (size > 0)
658 {
659 if (lt->pos >= TapeBlockPayloadSize)
660 {
661 /* Buffer full, dump it out */
662 long nextBlockNumber;
663
664 if (!lt->dirty)
665 {
666 /* Hmm, went directly from reading to writing? */
667 elog(ERROR, "invalid logtape state: should be dirty");
668 }
669
670 /*
671 * First allocate the next block, so that we can store it in the
672 * 'next' pointer of this block.
673 */
674 nextBlockNumber = ltsGetFreeBlock(lts);
675
676 /* set the next-pointer and dump the current block. */
677 TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
678 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
679
680 /* initialize the prev-pointer of the next block */
681 TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
682 lt->curBlockNumber = nextBlockNumber;
683 lt->pos = 0;
684 lt->nbytes = 0;
685 }
686
687 nthistime = TapeBlockPayloadSize - lt->pos;
688 if (nthistime > size)
689 nthistime = size;
690 Assert(nthistime > 0);
691
692 memcpy(lt->buffer + lt->pos, ptr, nthistime);
693
694 lt->dirty = true;
695 lt->pos += nthistime;
696 if (lt->nbytes < lt->pos)
697 lt->nbytes = lt->pos;
698 ptr = (void *) ((char *) ptr + nthistime);
699 size -= nthistime;
700 }
701 }
702
703 /*
704 * Rewind logical tape and switch from writing to reading.
705 *
706 * The tape must currently be in writing state, or "frozen" in read state.
707 *
708 * 'buffer_size' specifies how much memory to use for the read buffer.
709 * Regardless of the argument, the actual amount of memory used is between
710 * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
711 * rounded down and truncated to fit those constraints, if necessary. If the
712 * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
713 * byte buffer is used.
714 */
715 void
LogicalTapeRewindForRead(LogicalTapeSet * lts,int tapenum,size_t buffer_size)716 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
717 {
718 LogicalTape *lt;
719
720 Assert(tapenum >= 0 && tapenum < lts->nTapes);
721 lt = <s->tapes[tapenum];
722
723 /*
724 * Round and cap buffer_size if needed.
725 */
726 if (lt->frozen)
727 buffer_size = BLCKSZ;
728 else
729 {
730 /* need at least one block */
731 if (buffer_size < BLCKSZ)
732 buffer_size = BLCKSZ;
733
734 /* palloc() larger than max_size is unlikely to be helpful */
735 if (buffer_size > lt->max_size)
736 buffer_size = lt->max_size;
737
738 /* round down to BLCKSZ boundary */
739 buffer_size -= buffer_size % BLCKSZ;
740 }
741
742 if (lt->writing)
743 {
744 /*
745 * Completion of a write phase. Flush last partial data block, and
746 * rewind for normal (destructive) read.
747 */
748 if (lt->dirty)
749 {
750 /*
751 * As long as we've filled the buffer at least once, its contents
752 * are entirely defined from valgrind's point of view, even though
753 * contents beyond the current end point may be stale. But it's
754 * possible - at least in the case of a parallel sort - to sort
755 * such small amount of data that we do not fill the buffer even
756 * once. Tell valgrind that its contents are defined, so it
757 * doesn't bleat.
758 */
759 VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
760 lt->buffer_size - lt->nbytes);
761
762 TapeBlockSetNBytes(lt->buffer, lt->nbytes);
763 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
764 }
765 lt->writing = false;
766 }
767 else
768 {
769 /*
770 * This is only OK if tape is frozen; we rewind for (another) read
771 * pass.
772 */
773 Assert(lt->frozen);
774 }
775
776 /* Allocate a read buffer (unless the tape is empty) */
777 if (lt->buffer)
778 pfree(lt->buffer);
779 lt->buffer = NULL;
780 lt->buffer_size = 0;
781 if (lt->firstBlockNumber != -1L)
782 {
783 lt->buffer = palloc(buffer_size);
784 lt->buffer_size = buffer_size;
785 }
786
787 /* Read the first block, or reset if tape is empty */
788 lt->nextBlockNumber = lt->firstBlockNumber;
789 lt->pos = 0;
790 lt->nbytes = 0;
791 ltsReadFillBuffer(lts, lt);
792 }
793
794 /*
795 * Rewind logical tape and switch from reading to writing.
796 *
797 * NOTE: we assume the caller has read the tape to the end; otherwise
798 * untouched data will not have been freed. We could add more code to free
799 * any unread blocks, but in current usage of this module it'd be useless
800 * code.
801 */
802 void
LogicalTapeRewindForWrite(LogicalTapeSet * lts,int tapenum)803 LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
804 {
805 LogicalTape *lt;
806
807 Assert(tapenum >= 0 && tapenum < lts->nTapes);
808 lt = <s->tapes[tapenum];
809
810 Assert(!lt->writing && !lt->frozen);
811 lt->writing = true;
812 lt->dirty = false;
813 lt->firstBlockNumber = -1L;
814 lt->curBlockNumber = -1L;
815 lt->pos = 0;
816 lt->nbytes = 0;
817 if (lt->buffer)
818 pfree(lt->buffer);
819 lt->buffer = NULL;
820 lt->buffer_size = 0;
821 }
822
823 /*
824 * Read from a logical tape.
825 *
826 * Early EOF is indicated by return value less than #bytes requested.
827 */
828 size_t
LogicalTapeRead(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)829 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
830 void *ptr, size_t size)
831 {
832 LogicalTape *lt;
833 size_t nread = 0;
834 size_t nthistime;
835
836 Assert(tapenum >= 0 && tapenum < lts->nTapes);
837 lt = <s->tapes[tapenum];
838 Assert(!lt->writing);
839
840 while (size > 0)
841 {
842 if (lt->pos >= lt->nbytes)
843 {
844 /* Try to load more data into buffer. */
845 if (!ltsReadFillBuffer(lts, lt))
846 break; /* EOF */
847 }
848
849 nthistime = lt->nbytes - lt->pos;
850 if (nthistime > size)
851 nthistime = size;
852 Assert(nthistime > 0);
853
854 memcpy(ptr, lt->buffer + lt->pos, nthistime);
855
856 lt->pos += nthistime;
857 ptr = (void *) ((char *) ptr + nthistime);
858 size -= nthistime;
859 nread += nthistime;
860 }
861
862 return nread;
863 }
864
865 /*
866 * "Freeze" the contents of a tape so that it can be read multiple times
867 * and/or read backwards. Once a tape is frozen, its contents will not
868 * be released until the LogicalTapeSet is destroyed. This is expected
869 * to be used only for the final output pass of a merge.
870 *
871 * This *must* be called just at the end of a write pass, before the
872 * tape is rewound (after rewind is too late!). It performs a rewind
873 * and switch to read mode "for free". An immediately following rewind-
874 * for-read call is OK but not necessary.
875 *
876 * share output argument is set with details of storage used for tape after
877 * freezing, which may be passed to LogicalTapeSetCreate within leader
878 * process later. This metadata is only of interest to worker callers
879 * freezing their final output for leader (single materialized tape).
880 * Serial sorts should set share to NULL.
881 */
882 void
LogicalTapeFreeze(LogicalTapeSet * lts,int tapenum,TapeShare * share)883 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
884 {
885 LogicalTape *lt;
886
887 Assert(tapenum >= 0 && tapenum < lts->nTapes);
888 lt = <s->tapes[tapenum];
889 Assert(lt->writing);
890 Assert(lt->offsetBlockNumber == 0L);
891
892 /*
893 * Completion of a write phase. Flush last partial data block, and rewind
894 * for nondestructive read.
895 */
896 if (lt->dirty)
897 {
898 /*
899 * As long as we've filled the buffer at least once, its contents are
900 * entirely defined from valgrind's point of view, even though
901 * contents beyond the current end point may be stale. But it's
902 * possible - at least in the case of a parallel sort - to sort such
903 * small amount of data that we do not fill the buffer even once. Tell
904 * valgrind that its contents are defined, so it doesn't bleat.
905 */
906 VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
907 lt->buffer_size - lt->nbytes);
908
909 TapeBlockSetNBytes(lt->buffer, lt->nbytes);
910 ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
911 lt->writing = false;
912 }
913 lt->writing = false;
914 lt->frozen = true;
915
916 /*
917 * The seek and backspace functions assume a single block read buffer.
918 * That's OK with current usage. A larger buffer is helpful to make the
919 * read pattern of the backing file look more sequential to the OS, when
920 * we're reading from multiple tapes. But at the end of a sort, when a
921 * tape is frozen, we only read from a single tape anyway.
922 */
923 if (!lt->buffer || lt->buffer_size != BLCKSZ)
924 {
925 if (lt->buffer)
926 pfree(lt->buffer);
927 lt->buffer = palloc(BLCKSZ);
928 lt->buffer_size = BLCKSZ;
929 }
930
931 /* Read the first block, or reset if tape is empty */
932 lt->curBlockNumber = lt->firstBlockNumber;
933 lt->pos = 0;
934 lt->nbytes = 0;
935
936 if (lt->firstBlockNumber == -1L)
937 lt->nextBlockNumber = -1L;
938 ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
939 if (TapeBlockIsLast(lt->buffer))
940 lt->nextBlockNumber = -1L;
941 else
942 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
943 lt->nbytes = TapeBlockGetNBytes(lt->buffer);
944
945 /* Handle extra steps when caller is to share its tapeset */
946 if (share)
947 {
948 BufFileExportShared(lts->pfile);
949 share->firstblocknumber = lt->firstBlockNumber;
950 }
951 }
952
953 /*
954 * Backspace the tape a given number of bytes. (We also support a more
955 * general seek interface, see below.)
956 *
957 * *Only* a frozen-for-read tape can be backed up; we don't support
958 * random access during write, and an unfrozen read tape may have
959 * already discarded the desired data!
960 *
961 * Returns the number of bytes backed up. It can be less than the
962 * requested amount, if there isn't that much data before the current
963 * position. The tape is positioned to the beginning of the tape in
964 * that case.
965 */
966 size_t
LogicalTapeBackspace(LogicalTapeSet * lts,int tapenum,size_t size)967 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
968 {
969 LogicalTape *lt;
970 size_t seekpos = 0;
971
972 Assert(tapenum >= 0 && tapenum < lts->nTapes);
973 lt = <s->tapes[tapenum];
974 Assert(lt->frozen);
975 Assert(lt->buffer_size == BLCKSZ);
976
977 /*
978 * Easy case for seek within current block.
979 */
980 if (size <= (size_t) lt->pos)
981 {
982 lt->pos -= (int) size;
983 return size;
984 }
985
986 /*
987 * Not-so-easy case, have to walk back the chain of blocks. This
988 * implementation would be pretty inefficient for long seeks, but we
989 * really aren't doing that (a seek over one tuple is typical).
990 */
991 seekpos = (size_t) lt->pos; /* part within this block */
992 while (size > seekpos)
993 {
994 long prev = TapeBlockGetTrailer(lt->buffer)->prev;
995
996 if (prev == -1L)
997 {
998 /* Tried to back up beyond the beginning of tape. */
999 if (lt->curBlockNumber != lt->firstBlockNumber)
1000 elog(ERROR, "unexpected end of tape");
1001 lt->pos = 0;
1002 return seekpos;
1003 }
1004
1005 ltsReadBlock(lts, prev, (void *) lt->buffer);
1006
1007 if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1008 elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1009 prev,
1010 TapeBlockGetTrailer(lt->buffer)->next,
1011 lt->curBlockNumber);
1012
1013 lt->nbytes = TapeBlockPayloadSize;
1014 lt->curBlockNumber = prev;
1015 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1016
1017 seekpos += TapeBlockPayloadSize;
1018 }
1019
1020 /*
1021 * 'seekpos' can now be greater than 'size', because it points to the
1022 * beginning the target block. The difference is the position within the
1023 * page.
1024 */
1025 lt->pos = seekpos - size;
1026 return size;
1027 }
1028
1029 /*
1030 * Seek to an arbitrary position in a logical tape.
1031 *
1032 * *Only* a frozen-for-read tape can be seeked.
1033 *
1034 * Must be called with a block/offset previously returned by
1035 * LogicalTapeTell().
1036 */
1037 void
LogicalTapeSeek(LogicalTapeSet * lts,int tapenum,long blocknum,int offset)1038 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
1039 long blocknum, int offset)
1040 {
1041 LogicalTape *lt;
1042
1043 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1044 lt = <s->tapes[tapenum];
1045 Assert(lt->frozen);
1046 Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1047 Assert(lt->buffer_size == BLCKSZ);
1048
1049 if (blocknum != lt->curBlockNumber)
1050 {
1051 ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1052 lt->curBlockNumber = blocknum;
1053 lt->nbytes = TapeBlockPayloadSize;
1054 lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1055 }
1056
1057 if (offset > lt->nbytes)
1058 elog(ERROR, "invalid tape seek position");
1059 lt->pos = offset;
1060 }
1061
1062 /*
1063 * Obtain current position in a form suitable for a later LogicalTapeSeek.
1064 *
1065 * NOTE: it'd be OK to do this during write phase with intention of using
1066 * the position for a seek after freezing. Not clear if anyone needs that.
1067 */
1068 void
LogicalTapeTell(LogicalTapeSet * lts,int tapenum,long * blocknum,int * offset)1069 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
1070 long *blocknum, int *offset)
1071 {
1072 LogicalTape *lt;
1073
1074 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1075 lt = <s->tapes[tapenum];
1076 Assert(lt->offsetBlockNumber == 0L);
1077
1078 /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1079 Assert(lt->buffer_size == BLCKSZ);
1080
1081 *blocknum = lt->curBlockNumber;
1082 *offset = lt->pos;
1083 }
1084
1085 /*
1086 * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1087 */
1088 long
LogicalTapeSetBlocks(LogicalTapeSet * lts)1089 LogicalTapeSetBlocks(LogicalTapeSet *lts)
1090 {
1091 return lts->nBlocksAllocated - lts->nHoleBlocks;
1092 }
1093