1 /*-------------------------------------------------------------------------
2 *
3 * logtape.c
4 * Management of "logical tapes" within temporary files.
5 *
6 * This module exists to support sorting via multiple merge passes (see
7 * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 * we implement it on disk by creating a separate file for each "tape",
9 * there is an annoying problem: the peak space usage is at least twice
10 * the volume of actual data to be sorted. (This must be so because each
11 * datum will appear in both the input and output tapes of the final
12 * merge pass. For seven-tape polyphase merge, which is otherwise a
13 * pretty good algorithm, peak usage is more like 4x actual data volume.)
14 *
15 * We can work around this problem by recognizing that any one tape
16 * dataset (with the possible exception of the final output) is written
17 * and read exactly once in a perfectly sequential manner. Therefore,
18 * a datum once read will not be required again, and we can recycle its
19 * space for use by the new tape dataset(s) being generated. In this way,
20 * the total space usage is essentially just the actual data volume, plus
21 * insignificant bookkeeping and start/stop overhead.
22 *
23 * Few OSes allow arbitrary parts of a file to be released back to the OS,
24 * so we have to implement this space-recycling ourselves within a single
25 * logical file. logtape.c exists to perform this bookkeeping and provide
26 * the illusion of N independent tape devices to tuplesort.c. Note that
27 * logtape.c itself depends on buffile.c to provide a "logical file" of
28 * larger size than the underlying OS may support.
29 *
30 * For simplicity, we allocate and release space in the underlying file
31 * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32 * of which blocks in the underlying file belong to which logical tape,
33 * plus any blocks that are free (recycled and not yet reused).
34 * The blocks in each logical tape are remembered using a method borrowed
35 * from the Unix HFS filesystem: we store data block numbers in an
36 * "indirect block". If an indirect block fills up, we write it out to
37 * the underlying file and remember its location in a second-level indirect
38 * block. In the same way second-level blocks are remembered in third-
39 * level blocks, and so on if necessary (of course we're talking huge
40 * amounts of data here). The topmost indirect block of a given logical
41 * tape is never actually written out to the physical file, but all lower-
42 * level indirect blocks will be.
43 *
44 * The initial write pass is guaranteed to fill the underlying file
45 * perfectly sequentially, no matter how data is divided into logical tapes.
46 * Once we begin merge passes, the access pattern becomes considerably
47 * less predictable --- but the seeking involved should be comparable to
48 * what would happen if we kept each logical tape in a separate file,
49 * so there's no serious performance penalty paid to obtain the space
50 * savings of recycling. We try to localize the write accesses by always
51 * writing to the lowest-numbered free block when we have a choice; it's
52 * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
53 * policy for free blocks would be better?)
54 *
55 * To support the above policy of writing to the lowest free block,
56 * ltsGetFreeBlock sorts the list of free block numbers into decreasing
57 * order each time it is asked for a block and the list isn't currently
58 * sorted. This is an efficient way to handle it because we expect cycles
59 * of releasing many blocks followed by re-using many blocks, due to
60 * tuplesort.c's "preread" behavior.
61 *
62 * Since all the bookkeeping and buffer memory is allocated with palloc(),
63 * and the underlying file(s) are made with OpenTemporaryFile, all resources
64 * for a logical tape set are certain to be cleaned up even if processing
65 * is aborted by ereport(ERROR). To avoid confusion, the caller should take
66 * care that all calls for a single LogicalTapeSet are made in the same
67 * palloc context.
68 *
69 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
70 * Portions Copyright (c) 1994, Regents of the University of California
71 *
72 * IDENTIFICATION
73 * src/backend/utils/sort/logtape.c
74 *
75 *-------------------------------------------------------------------------
76 */
77
78 #include "postgres.h"
79
80 #include "storage/buffile.h"
81 #include "utils/logtape.h"
82
/*
 * Block indexes are "long"s, so we can fit this many per indirect block.
 * NB: we assume this is an exact fit, i.e. that BLCKSZ is a multiple of
 * sizeof(long)!
 */
#define BLOCKS_PER_INDIR_BLOCK	((int) (BLCKSZ / sizeof(long)))
88
89 /*
90 * We use a struct like this for each active indirection level of each
91 * logical tape. If the indirect block is not the highest level of its
92 * tape, the "nextup" link points to the next higher level. Only the
93 * "ptrs" array is written out if we have to dump the indirect block to
94 * disk. If "ptrs" is not completely full, we store -1L in the first
95 * unused slot at completion of the write phase for the logical tape.
96 */
97 typedef struct IndirectBlock
98 {
99 int nextSlot; /* next pointer slot to write or read */
100 struct IndirectBlock *nextup; /* parent indirect level, or NULL if
101 * top */
102 long ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */
103 } IndirectBlock;
104
105 /*
106 * This data structure represents a single "logical tape" within the set
107 * of logical tapes stored in the same file. We must keep track of the
108 * current partially-read-or-written data block as well as the active
109 * indirect block level(s).
110 */
111 typedef struct LogicalTape
112 {
113 IndirectBlock *indirect; /* bottom of my indirect-block hierarchy */
114 bool writing; /* T while in write phase */
115 bool frozen; /* T if blocks should not be freed when read */
116 bool dirty; /* does buffer need to be written? */
117
118 /*
119 * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
120 * lastBlockBytes. BUT: we do not update lastBlockBytes during writing,
121 * only at completion of a write phase.
122 */
123 long numFullBlocks; /* number of complete blocks in log tape */
124 int lastBlockBytes; /* valid bytes in last (incomplete) block */
125
126 /*
127 * Buffer for current data block. Note we don't bother to store the
128 * actual file block number of the data block (during the write phase it
129 * hasn't been assigned yet, and during read we don't care anymore). But
130 * we do need the relative block number so we can detect end-of-tape while
131 * reading.
132 */
133 char *buffer; /* physical buffer (separately palloc'd) */
134 long curBlockNumber; /* this block's logical blk# within tape */
135 int pos; /* next read/write position in buffer */
136 int nbytes; /* total # of valid bytes in buffer */
137 } LogicalTape;
138
139 /*
140 * This data structure represents a set of related "logical tapes" sharing
141 * space in a single underlying file. (But that "file" may be multiple files
142 * if needed to escape OS limits on file size; buffile.c handles that for us.)
143 * The number of tapes is fixed at creation.
144 */
145 struct LogicalTapeSet
146 {
147 BufFile *pfile; /* underlying file for whole tape set */
148 long nFileBlocks; /* # of blocks used in underlying file */
149
150 /*
151 * We store the numbers of recycled-and-available blocks in freeBlocks[].
152 * When there are no such blocks, we extend the underlying file.
153 *
154 * If forgetFreeSpace is true then any freed blocks are simply forgotten
155 * rather than being remembered in freeBlocks[]. See notes for
156 * LogicalTapeSetForgetFreeSpace().
157 *
158 * If blocksSorted is true then the block numbers in freeBlocks are in
159 * *decreasing* order, so that removing the last entry gives us the lowest
160 * free block. We re-sort the blocks whenever a block is demanded; this
161 * should be reasonably efficient given the expected usage pattern.
162 */
163 bool forgetFreeSpace; /* are we remembering free blocks? */
164 bool blocksSorted; /* is freeBlocks[] currently in order? */
165 long *freeBlocks; /* resizable array */
166 int nFreeBlocks; /* # of currently free blocks */
167 int freeBlocksLen; /* current allocated length of freeBlocks[] */
168
169 /* The array of logical tapes. */
170 int nTapes; /* # of logical tapes in set */
171 LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
172 };
173
174 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
175 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
176 static long ltsGetFreeBlock(LogicalTapeSet *lts);
177 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
178 static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
179 long blocknum);
180 static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
181 IndirectBlock *indirect,
182 bool freezing);
183 static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
184 IndirectBlock *indirect);
185 static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
186 IndirectBlock *indirect,
187 bool frozen);
188 static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
189 IndirectBlock *indirect);
190 static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
191
192
193 /*
194 * Write a block-sized buffer to the specified block of the underlying file.
195 *
196 * NB: should not attempt to write beyond current end of file (ie, create
197 * "holes" in file), since BufFile doesn't allow that. The first write pass
198 * must write blocks sequentially.
199 *
200 * No need for an error return convention; we ereport() on any error.
201 */
202 static void
ltsWriteBlock(LogicalTapeSet * lts,long blocknum,void * buffer)203 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
204 {
205 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
206 ereport(ERROR,
207 (errcode_for_file_access(),
208 errmsg("could not seek to block %ld of temporary file",
209 blocknum)));
210 BufFileWrite(lts->pfile, buffer, BLCKSZ);
211 }
212
213 /*
214 * Read a block-sized buffer from the specified block of the underlying file.
215 *
216 * No need for an error return convention; we ereport() on any error. This
217 * module should never attempt to read a block it doesn't know is there.
218 */
219 static void
ltsReadBlock(LogicalTapeSet * lts,long blocknum,void * buffer)220 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
221 {
222 size_t nread;
223
224 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
225 ereport(ERROR,
226 (errcode_for_file_access(),
227 errmsg("could not seek to block %ld of temporary file",
228 blocknum)));
229 nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
230 if (nread != BLCKSZ)
231 ereport(ERROR,
232 (errcode_for_file_access(),
233 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
234 blocknum, nread, (size_t) BLCKSZ)));
235 }
236
/*
 * qsort comparator for sorting freeBlocks[] into decreasing order.
 *
 * Both arguments point to "long" block numbers.  Returns 1 when *a is
 * smaller than *b, -1 when it is larger, and 0 when they are equal, so that
 * larger block numbers sort first.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	long		ablk = *((const long *) a);
	long		bblk = *((const long *) b);

	/* can't just subtract because long might be wider than int */
	if (ablk < bblk)
		return 1;
	if (ablk > bblk)
		return -1;
	return 0;
}
253
254 /*
255 * Select a currently unused block for writing to.
256 *
257 * NB: should only be called when writer is ready to write immediately,
258 * to ensure that first write pass is sequential.
259 */
260 static long
ltsGetFreeBlock(LogicalTapeSet * lts)261 ltsGetFreeBlock(LogicalTapeSet *lts)
262 {
263 /*
264 * If there are multiple free blocks, we select the one appearing last in
265 * freeBlocks[] (after sorting the array if needed). If there are none,
266 * assign the next block at the end of the file.
267 */
268 if (lts->nFreeBlocks > 0)
269 {
270 if (!lts->blocksSorted)
271 {
272 qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
273 sizeof(long), freeBlocks_cmp);
274 lts->blocksSorted = true;
275 }
276 return lts->freeBlocks[--lts->nFreeBlocks];
277 }
278 else
279 return lts->nFileBlocks++;
280 }
281
282 /*
283 * Return a block# to the freelist.
284 */
285 static void
ltsReleaseBlock(LogicalTapeSet * lts,long blocknum)286 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
287 {
288 int ndx;
289
290 /*
291 * Do nothing if we're no longer interested in remembering free space.
292 */
293 if (lts->forgetFreeSpace)
294 return;
295
296 /*
297 * Enlarge freeBlocks array if full.
298 */
299 if (lts->nFreeBlocks >= lts->freeBlocksLen)
300 {
301 lts->freeBlocksLen *= 2;
302 lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
303 lts->freeBlocksLen * sizeof(long));
304 }
305
306 /*
307 * Add blocknum to array, and mark the array unsorted if it's no longer in
308 * decreasing order.
309 */
310 ndx = lts->nFreeBlocks++;
311 lts->freeBlocks[ndx] = blocknum;
312 if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
313 lts->blocksSorted = false;
314 }
315
/*
 * These routines manipulate indirect-block hierarchies.  All are recursive
 * so that they don't have any specific limit on the depth of hierarchy.
 */
320
321 /*
322 * Record a data block number in a logical tape's lowest indirect block,
323 * or record an indirect block's number in the next higher indirect level.
324 */
325 static void
ltsRecordBlockNum(LogicalTapeSet * lts,IndirectBlock * indirect,long blocknum)326 ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
327 long blocknum)
328 {
329 if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
330 {
331 /*
332 * This indirect block is full, so dump it out and recursively save
333 * its address in the next indirection level. Create a new
334 * indirection level if there wasn't one before.
335 */
336 long indirblock = ltsGetFreeBlock(lts);
337
338 ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
339 if (indirect->nextup == NULL)
340 {
341 indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
342 indirect->nextup->nextSlot = 0;
343 indirect->nextup->nextup = NULL;
344 }
345 ltsRecordBlockNum(lts, indirect->nextup, indirblock);
346
347 /*
348 * Reset to fill another indirect block at this level.
349 */
350 indirect->nextSlot = 0;
351 }
352 indirect->ptrs[indirect->nextSlot++] = blocknum;
353 }
354
355 /*
356 * Reset a logical tape's indirect-block hierarchy after a write pass
357 * to prepare for reading. We dump out partly-filled blocks except
358 * at the top of the hierarchy, and we rewind each level to the start.
359 * This call returns the first data block number, or -1L if the tape
360 * is empty.
361 *
362 * Unless 'freezing' is true, release indirect blocks to the free pool after
363 * reading them.
364 */
365 static long
ltsRewindIndirectBlock(LogicalTapeSet * lts,IndirectBlock * indirect,bool freezing)366 ltsRewindIndirectBlock(LogicalTapeSet *lts,
367 IndirectBlock *indirect,
368 bool freezing)
369 {
370 /* Handle case of never-written-to tape */
371 if (indirect == NULL)
372 return -1L;
373
374 /* Insert sentinel if block is not full */
375 if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
376 indirect->ptrs[indirect->nextSlot] = -1L;
377
378 /*
379 * If block is not topmost, write it out, and recurse to obtain address of
380 * first block in this hierarchy level. Read that one in.
381 */
382 if (indirect->nextup != NULL)
383 {
384 long indirblock = ltsGetFreeBlock(lts);
385
386 ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
387 ltsRecordBlockNum(lts, indirect->nextup, indirblock);
388 indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
389 Assert(indirblock != -1L);
390 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
391 if (!freezing)
392 ltsReleaseBlock(lts, indirblock);
393 }
394
395 /*
396 * Reset my next-block pointer, and then fetch a block number if any.
397 */
398 indirect->nextSlot = 0;
399 if (indirect->ptrs[0] == -1L)
400 return -1L;
401 return indirect->ptrs[indirect->nextSlot++];
402 }
403
404 /*
405 * Rewind a previously-frozen indirect-block hierarchy for another read pass.
406 * This call returns the first data block number, or -1L if the tape
407 * is empty.
408 */
409 static long
ltsRewindFrozenIndirectBlock(LogicalTapeSet * lts,IndirectBlock * indirect)410 ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
411 IndirectBlock *indirect)
412 {
413 /* Handle case of never-written-to tape */
414 if (indirect == NULL)
415 return -1L;
416
417 /*
418 * If block is not topmost, recurse to obtain address of first block in
419 * this hierarchy level. Read that one in.
420 */
421 if (indirect->nextup != NULL)
422 {
423 long indirblock;
424
425 indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
426 Assert(indirblock != -1L);
427 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
428 }
429
430 /*
431 * Reset my next-block pointer, and then fetch a block number if any.
432 */
433 indirect->nextSlot = 0;
434 if (indirect->ptrs[0] == -1L)
435 return -1L;
436 return indirect->ptrs[indirect->nextSlot++];
437 }
438
439 /*
440 * Obtain next data block number in the forward direction, or -1L if no more.
441 *
442 * Unless 'frozen' is true, release indirect blocks to the free pool after
443 * reading them.
444 */
445 static long
ltsRecallNextBlockNum(LogicalTapeSet * lts,IndirectBlock * indirect,bool frozen)446 ltsRecallNextBlockNum(LogicalTapeSet *lts,
447 IndirectBlock *indirect,
448 bool frozen)
449 {
450 /* Handle case of never-written-to tape */
451 if (indirect == NULL)
452 return -1L;
453
454 if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
455 indirect->ptrs[indirect->nextSlot] == -1L)
456 {
457 long indirblock;
458
459 if (indirect->nextup == NULL)
460 return -1L; /* nothing left at this level */
461 indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
462 if (indirblock == -1L)
463 return -1L; /* nothing left at this level */
464 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
465 if (!frozen)
466 ltsReleaseBlock(lts, indirblock);
467 indirect->nextSlot = 0;
468 }
469 if (indirect->ptrs[indirect->nextSlot] == -1L)
470 return -1L;
471 return indirect->ptrs[indirect->nextSlot++];
472 }
473
474 /*
475 * Obtain next data block number in the reverse direction, or -1L if no more.
476 *
477 * Note this fetches the block# before the one last returned, no matter which
478 * direction of call returned that one. If we fail, no change in state.
479 *
480 * This routine can only be used in 'frozen' state, so there's no need to
481 * pass a parameter telling whether to release blocks ... we never do.
482 */
483 static long
ltsRecallPrevBlockNum(LogicalTapeSet * lts,IndirectBlock * indirect)484 ltsRecallPrevBlockNum(LogicalTapeSet *lts,
485 IndirectBlock *indirect)
486 {
487 /* Handle case of never-written-to tape */
488 if (indirect == NULL)
489 return -1L;
490
491 if (indirect->nextSlot <= 1)
492 {
493 long indirblock;
494
495 if (indirect->nextup == NULL)
496 return -1L; /* nothing left at this level */
497 indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
498 if (indirblock == -1L)
499 return -1L; /* nothing left at this level */
500 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
501
502 /*
503 * The previous block would only have been written out if full, so we
504 * need not search it for a -1 sentinel.
505 */
506 indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
507 }
508 indirect->nextSlot--;
509 return indirect->ptrs[indirect->nextSlot - 1];
510 }
511
512
513 /*
514 * Create a set of logical tapes in a temporary underlying file.
515 *
516 * Each tape is initialized in write state.
517 */
518 LogicalTapeSet *
LogicalTapeSetCreate(int ntapes)519 LogicalTapeSetCreate(int ntapes)
520 {
521 LogicalTapeSet *lts;
522 LogicalTape *lt;
523 int i;
524
525 /*
526 * Create top-level struct including per-tape LogicalTape structs.
527 */
528 Assert(ntapes > 0);
529 lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
530 ntapes * sizeof(LogicalTape));
531 lts->pfile = BufFileCreateTemp(false);
532 lts->nFileBlocks = 0L;
533 lts->forgetFreeSpace = false;
534 lts->blocksSorted = true; /* a zero-length array is sorted ... */
535 lts->freeBlocksLen = 32; /* reasonable initial guess */
536 lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
537 lts->nFreeBlocks = 0;
538 lts->nTapes = ntapes;
539
540 /*
541 * Initialize per-tape structs. Note we allocate the I/O buffer and
542 * first-level indirect block for a tape only when it is first actually
543 * written to. This avoids wasting memory space when tuplesort.c
544 * overestimates the number of tapes needed.
545 */
546 for (i = 0; i < ntapes; i++)
547 {
548 lt = <s->tapes[i];
549 lt->indirect = NULL;
550 lt->writing = true;
551 lt->frozen = false;
552 lt->dirty = false;
553 lt->numFullBlocks = 0L;
554 lt->lastBlockBytes = 0;
555 lt->buffer = NULL;
556 lt->curBlockNumber = 0L;
557 lt->pos = 0;
558 lt->nbytes = 0;
559 }
560 return lts;
561 }
562
563 /*
564 * Close a logical tape set and release all resources.
565 */
566 void
LogicalTapeSetClose(LogicalTapeSet * lts)567 LogicalTapeSetClose(LogicalTapeSet *lts)
568 {
569 LogicalTape *lt;
570 IndirectBlock *ib,
571 *nextib;
572 int i;
573
574 BufFileClose(lts->pfile);
575 for (i = 0; i < lts->nTapes; i++)
576 {
577 lt = <s->tapes[i];
578 for (ib = lt->indirect; ib != NULL; ib = nextib)
579 {
580 nextib = ib->nextup;
581 pfree(ib);
582 }
583 if (lt->buffer)
584 pfree(lt->buffer);
585 }
586 pfree(lts->freeBlocks);
587 pfree(lts);
588 }
589
590 /*
591 * Mark a logical tape set as not needing management of free space anymore.
592 *
593 * This should be called if the caller does not intend to write any more data
594 * into the tape set, but is reading from un-frozen tapes. Since no more
595 * writes are planned, remembering free blocks is no longer useful. Setting
596 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
597 * is not designed to handle large numbers of free blocks.
598 */
599 void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet * lts)600 LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
601 {
602 lts->forgetFreeSpace = true;
603 }
604
605 /*
606 * Dump the dirty buffer of a logical tape.
607 */
608 static void
ltsDumpBuffer(LogicalTapeSet * lts,LogicalTape * lt)609 ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
610 {
611 long datablock = ltsGetFreeBlock(lts);
612
613 Assert(lt->dirty);
614 ltsWriteBlock(lts, datablock, (void *) lt->buffer);
615 ltsRecordBlockNum(lts, lt->indirect, datablock);
616 lt->dirty = false;
617 /* Caller must do other state update as needed */
618 }
619
620 /*
621 * Write to a logical tape.
622 *
623 * There are no error returns; we ereport() on failure.
624 */
625 void
LogicalTapeWrite(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)626 LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
627 void *ptr, size_t size)
628 {
629 LogicalTape *lt;
630 size_t nthistime;
631
632 Assert(tapenum >= 0 && tapenum < lts->nTapes);
633 lt = <s->tapes[tapenum];
634 Assert(lt->writing);
635
636 /* Allocate data buffer and first indirect block on first write */
637 if (lt->buffer == NULL)
638 lt->buffer = (char *) palloc(BLCKSZ);
639 if (lt->indirect == NULL)
640 {
641 lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
642 lt->indirect->nextSlot = 0;
643 lt->indirect->nextup = NULL;
644 }
645
646 while (size > 0)
647 {
648 if (lt->pos >= BLCKSZ)
649 {
650 /* Buffer full, dump it out */
651 if (lt->dirty)
652 ltsDumpBuffer(lts, lt);
653 else
654 {
655 /* Hmm, went directly from reading to writing? */
656 elog(ERROR, "invalid logtape state: should be dirty");
657 }
658 lt->numFullBlocks++;
659 lt->curBlockNumber++;
660 lt->pos = 0;
661 lt->nbytes = 0;
662 }
663
664 nthistime = BLCKSZ - lt->pos;
665 if (nthistime > size)
666 nthistime = size;
667 Assert(nthistime > 0);
668
669 memcpy(lt->buffer + lt->pos, ptr, nthistime);
670
671 lt->dirty = true;
672 lt->pos += nthistime;
673 if (lt->nbytes < lt->pos)
674 lt->nbytes = lt->pos;
675 ptr = (void *) ((char *) ptr + nthistime);
676 size -= nthistime;
677 }
678 }
679
680 /*
681 * Rewind logical tape and switch from writing to reading or vice versa.
682 *
683 * Unless the tape has been "frozen" in read state, forWrite must be the
684 * opposite of the previous tape state.
685 */
686 void
LogicalTapeRewind(LogicalTapeSet * lts,int tapenum,bool forWrite)687 LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
688 {
689 LogicalTape *lt;
690 long datablocknum;
691
692 Assert(tapenum >= 0 && tapenum < lts->nTapes);
693 lt = <s->tapes[tapenum];
694
695 if (!forWrite)
696 {
697 if (lt->writing)
698 {
699 /*
700 * Completion of a write phase. Flush last partial data block,
701 * flush any partial indirect blocks, rewind for normal
702 * (destructive) read.
703 */
704 if (lt->dirty)
705 ltsDumpBuffer(lts, lt);
706 lt->lastBlockBytes = lt->nbytes;
707 lt->writing = false;
708 datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
709 }
710 else
711 {
712 /*
713 * This is only OK if tape is frozen; we rewind for (another) read
714 * pass.
715 */
716 Assert(lt->frozen);
717 datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
718 }
719 /* Read the first block, or reset if tape is empty */
720 lt->curBlockNumber = 0L;
721 lt->pos = 0;
722 lt->nbytes = 0;
723 if (datablocknum != -1L)
724 {
725 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
726 if (!lt->frozen)
727 ltsReleaseBlock(lts, datablocknum);
728 lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
729 BLCKSZ : lt->lastBlockBytes;
730 }
731 }
732 else
733 {
734 /*
735 * Completion of a read phase. Rewind and prepare for write.
736 *
737 * NOTE: we assume the caller has read the tape to the end; otherwise
738 * untouched data and indirect blocks will not have been freed. We
739 * could add more code to free any unread blocks, but in current usage
740 * of this module it'd be useless code.
741 */
742 IndirectBlock *ib,
743 *nextib;
744
745 Assert(!lt->writing && !lt->frozen);
746 /* Must truncate the indirect-block hierarchy down to one level. */
747 if (lt->indirect)
748 {
749 for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
750 {
751 nextib = ib->nextup;
752 pfree(ib);
753 }
754 lt->indirect->nextSlot = 0;
755 lt->indirect->nextup = NULL;
756 }
757 lt->writing = true;
758 lt->dirty = false;
759 lt->numFullBlocks = 0L;
760 lt->lastBlockBytes = 0;
761 lt->curBlockNumber = 0L;
762 lt->pos = 0;
763 lt->nbytes = 0;
764 }
765 }
766
767 /*
768 * Read from a logical tape.
769 *
770 * Early EOF is indicated by return value less than #bytes requested.
771 */
772 size_t
LogicalTapeRead(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)773 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
774 void *ptr, size_t size)
775 {
776 LogicalTape *lt;
777 size_t nread = 0;
778 size_t nthistime;
779
780 Assert(tapenum >= 0 && tapenum < lts->nTapes);
781 lt = <s->tapes[tapenum];
782 Assert(!lt->writing);
783
784 while (size > 0)
785 {
786 if (lt->pos >= lt->nbytes)
787 {
788 /* Try to load more data into buffer. */
789 long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
790 lt->frozen);
791
792 if (datablocknum == -1L)
793 break; /* EOF */
794 lt->curBlockNumber++;
795 lt->pos = 0;
796 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
797 if (!lt->frozen)
798 ltsReleaseBlock(lts, datablocknum);
799 lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
800 BLCKSZ : lt->lastBlockBytes;
801 if (lt->nbytes <= 0)
802 break; /* EOF (possible here?) */
803 }
804
805 nthistime = lt->nbytes - lt->pos;
806 if (nthistime > size)
807 nthistime = size;
808 Assert(nthistime > 0);
809
810 memcpy(ptr, lt->buffer + lt->pos, nthistime);
811
812 lt->pos += nthistime;
813 ptr = (void *) ((char *) ptr + nthistime);
814 size -= nthistime;
815 nread += nthistime;
816 }
817
818 return nread;
819 }
820
821 /*
822 * "Freeze" the contents of a tape so that it can be read multiple times
823 * and/or read backwards. Once a tape is frozen, its contents will not
824 * be released until the LogicalTapeSet is destroyed. This is expected
825 * to be used only for the final output pass of a merge.
826 *
827 * This *must* be called just at the end of a write pass, before the
828 * tape is rewound (after rewind is too late!). It performs a rewind
829 * and switch to read mode "for free". An immediately following rewind-
830 * for-read call is OK but not necessary.
831 */
832 void
LogicalTapeFreeze(LogicalTapeSet * lts,int tapenum)833 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
834 {
835 LogicalTape *lt;
836 long datablocknum;
837
838 Assert(tapenum >= 0 && tapenum < lts->nTapes);
839 lt = <s->tapes[tapenum];
840 Assert(lt->writing);
841
842 /*
843 * Completion of a write phase. Flush last partial data block, flush any
844 * partial indirect blocks, rewind for nondestructive read.
845 */
846 if (lt->dirty)
847 ltsDumpBuffer(lts, lt);
848 lt->lastBlockBytes = lt->nbytes;
849 lt->writing = false;
850 lt->frozen = true;
851 datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
852 /* Read the first block, or reset if tape is empty */
853 lt->curBlockNumber = 0L;
854 lt->pos = 0;
855 lt->nbytes = 0;
856 if (datablocknum != -1L)
857 {
858 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
859 lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
860 BLCKSZ : lt->lastBlockBytes;
861 }
862 }
863
864 /*
865 * Backspace the tape a given number of bytes. (We also support a more
866 * general seek interface, see below.)
867 *
868 * *Only* a frozen-for-read tape can be backed up; we don't support
869 * random access during write, and an unfrozen read tape may have
870 * already discarded the desired data!
871 *
872 * Return value is TRUE if seek successful, FALSE if there isn't that much
873 * data before the current point (in which case there's no state change).
874 */
875 bool
LogicalTapeBackspace(LogicalTapeSet * lts,int tapenum,size_t size)876 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
877 {
878 LogicalTape *lt;
879 long nblocks;
880 int newpos;
881
882 Assert(tapenum >= 0 && tapenum < lts->nTapes);
883 lt = <s->tapes[tapenum];
884 Assert(lt->frozen);
885
886 /*
887 * Easy case for seek within current block.
888 */
889 if (size <= (size_t) lt->pos)
890 {
891 lt->pos -= (int) size;
892 return true;
893 }
894
895 /*
896 * Not-so-easy case. Figure out whether it's possible at all.
897 */
898 size -= (size_t) lt->pos; /* part within this block */
899 nblocks = size / BLCKSZ;
900 size = size % BLCKSZ;
901 if (size)
902 {
903 nblocks++;
904 newpos = (int) (BLCKSZ - size);
905 }
906 else
907 newpos = 0;
908 if (nblocks > lt->curBlockNumber)
909 return false; /* a seek too far... */
910
911 /*
912 * OK, we need to back up nblocks blocks. This implementation would be
913 * pretty inefficient for long seeks, but we really aren't expecting that
914 * (a seek over one tuple is typical).
915 */
916 while (nblocks-- > 0)
917 {
918 long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
919
920 if (datablocknum == -1L)
921 elog(ERROR, "unexpected end of tape");
922 lt->curBlockNumber--;
923 if (nblocks == 0)
924 {
925 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
926 lt->nbytes = BLCKSZ;
927 }
928 }
929 lt->pos = newpos;
930 return true;
931 }
932
933 /*
934 * Seek to an arbitrary position in a logical tape.
935 *
936 * *Only* a frozen-for-read tape can be seeked.
937 *
938 * Return value is TRUE if seek successful, FALSE if there isn't that much
939 * data in the tape (in which case there's no state change).
940 */
941 bool
LogicalTapeSeek(LogicalTapeSet * lts,int tapenum,long blocknum,int offset)942 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
943 long blocknum, int offset)
944 {
945 LogicalTape *lt;
946
947 Assert(tapenum >= 0 && tapenum < lts->nTapes);
948 lt = <s->tapes[tapenum];
949 Assert(lt->frozen);
950 Assert(offset >= 0 && offset <= BLCKSZ);
951
952 /*
953 * Easy case for seek within current block.
954 */
955 if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
956 {
957 lt->pos = offset;
958 return true;
959 }
960
961 /*
962 * Not-so-easy case. Figure out whether it's possible at all.
963 */
964 if (blocknum < 0 || blocknum > lt->numFullBlocks ||
965 (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
966 return false;
967
968 /*
969 * OK, advance or back up to the target block. This implementation would
970 * be pretty inefficient for long seeks, but we really aren't expecting
971 * that (a seek over one tuple is typical).
972 */
973 while (lt->curBlockNumber > blocknum)
974 {
975 long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
976
977 if (datablocknum == -1L)
978 elog(ERROR, "unexpected end of tape");
979 if (--lt->curBlockNumber == blocknum)
980 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
981 }
982 while (lt->curBlockNumber < blocknum)
983 {
984 long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
985 lt->frozen);
986
987 if (datablocknum == -1L)
988 elog(ERROR, "unexpected end of tape");
989 if (++lt->curBlockNumber == blocknum)
990 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
991 }
992 lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
993 BLCKSZ : lt->lastBlockBytes;
994 lt->pos = offset;
995 return true;
996 }
997
998 /*
999 * Obtain current position in a form suitable for a later LogicalTapeSeek.
1000 *
1001 * NOTE: it'd be OK to do this during write phase with intention of using
1002 * the position for a seek after freezing. Not clear if anyone needs that.
1003 */
1004 void
LogicalTapeTell(LogicalTapeSet * lts,int tapenum,long * blocknum,int * offset)1005 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
1006 long *blocknum, int *offset)
1007 {
1008 LogicalTape *lt;
1009
1010 Assert(tapenum >= 0 && tapenum < lts->nTapes);
1011 lt = <s->tapes[tapenum];
1012 *blocknum = lt->curBlockNumber;
1013 *offset = lt->pos;
1014 }
1015
1016 /*
1017 * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1018 */
1019 long
LogicalTapeSetBlocks(LogicalTapeSet * lts)1020 LogicalTapeSetBlocks(LogicalTapeSet *lts)
1021 {
1022 return lts->nFileBlocks;
1023 }
1024