1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  *	  Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted.  (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass.  For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner.  Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated.  In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file.  logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c.  Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape are remembered using a method borrowed
35  * from the Unix HFS filesystem: we store data block numbers in an
36  * "indirect block".  If an indirect block fills up, we write it out to
37  * the underlying file and remember its location in a second-level indirect
38  * block.  In the same way second-level blocks are remembered in third-
39  * level blocks, and so on if necessary (of course we're talking huge
40  * amounts of data here).  The topmost indirect block of a given logical
41  * tape is never actually written out to the physical file, but all lower-
42  * level indirect blocks will be.
43  *
44  * The initial write pass is guaranteed to fill the underlying file
45  * perfectly sequentially, no matter how data is divided into logical tapes.
46  * Once we begin merge passes, the access pattern becomes considerably
47  * less predictable --- but the seeking involved should be comparable to
48  * what would happen if we kept each logical tape in a separate file,
49  * so there's no serious performance penalty paid to obtain the space
50  * savings of recycling.  We try to localize the write accesses by always
51  * writing to the lowest-numbered free block when we have a choice; it's
52  * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
53  * policy for free blocks would be better?)
54  *
55  * To support the above policy of writing to the lowest free block,
56  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
57  * order each time it is asked for a block and the list isn't currently
58  * sorted.  This is an efficient way to handle it because we expect cycles
59  * of releasing many blocks followed by re-using many blocks, due to
60  * tuplesort.c's "preread" behavior.
61  *
62  * Since all the bookkeeping and buffer memory is allocated with palloc(),
63  * and the underlying file(s) are made with OpenTemporaryFile, all resources
64  * for a logical tape set are certain to be cleaned up even if processing
65  * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
66  * care that all calls for a single LogicalTapeSet are made in the same
67  * palloc context.
68  *
69  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
70  * Portions Copyright (c) 1994, Regents of the University of California
71  *
72  * IDENTIFICATION
73  *	  src/backend/utils/sort/logtape.c
74  *
75  *-------------------------------------------------------------------------
76  */
77 
78 #include "postgres.h"
79 
80 #include "storage/buffile.h"
81 #include "utils/logtape.h"
82 
83 /*
84  * Block indexes are "long"s, so we can fit this many per indirect block.
85  * NB: we assume this is an exact fit!
86  */
87 #define BLOCKS_PER_INDIR_BLOCK	((int) (BLCKSZ / sizeof(long)))
88 
89 /*
90  * We use a struct like this for each active indirection level of each
91  * logical tape.  If the indirect block is not the highest level of its
92  * tape, the "nextup" link points to the next higher level.  Only the
93  * "ptrs" array is written out if we have to dump the indirect block to
94  * disk.  If "ptrs" is not completely full, we store -1L in the first
95  * unused slot at completion of the write phase for the logical tape.
96  */
97 typedef struct IndirectBlock
98 {
99 	int			nextSlot;		/* next pointer slot to write or read */
100 	struct IndirectBlock *nextup;		/* parent indirect level, or NULL if
101 										 * top */
102 	long		ptrs[BLOCKS_PER_INDIR_BLOCK];	/* indexes of contained blocks */
103 } IndirectBlock;
104 
105 /*
106  * This data structure represents a single "logical tape" within the set
107  * of logical tapes stored in the same file.  We must keep track of the
108  * current partially-read-or-written data block as well as the active
109  * indirect block level(s).
110  */
111 typedef struct LogicalTape
112 {
113 	IndirectBlock *indirect;	/* bottom of my indirect-block hierarchy */
114 	bool		writing;		/* T while in write phase */
115 	bool		frozen;			/* T if blocks should not be freed when read */
116 	bool		dirty;			/* does buffer need to be written? */
117 
118 	/*
119 	 * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
120 	 * lastBlockBytes.  BUT: we do not update lastBlockBytes during writing,
121 	 * only at completion of a write phase.
122 	 */
123 	long		numFullBlocks;	/* number of complete blocks in log tape */
124 	int			lastBlockBytes; /* valid bytes in last (incomplete) block */
125 
126 	/*
127 	 * Buffer for current data block.  Note we don't bother to store the
128 	 * actual file block number of the data block (during the write phase it
129 	 * hasn't been assigned yet, and during read we don't care anymore). But
130 	 * we do need the relative block number so we can detect end-of-tape while
131 	 * reading.
132 	 */
133 	char	   *buffer;			/* physical buffer (separately palloc'd) */
134 	long		curBlockNumber; /* this block's logical blk# within tape */
135 	int			pos;			/* next read/write position in buffer */
136 	int			nbytes;			/* total # of valid bytes in buffer */
137 } LogicalTape;
138 
139 /*
140  * This data structure represents a set of related "logical tapes" sharing
141  * space in a single underlying file.  (But that "file" may be multiple files
142  * if needed to escape OS limits on file size; buffile.c handles that for us.)
143  * The number of tapes is fixed at creation.
144  */
145 struct LogicalTapeSet
146 {
147 	BufFile    *pfile;			/* underlying file for whole tape set */
148 	long		nFileBlocks;	/* # of blocks used in underlying file */
149 
150 	/*
151 	 * We store the numbers of recycled-and-available blocks in freeBlocks[].
152 	 * When there are no such blocks, we extend the underlying file.
153 	 *
154 	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
155 	 * rather than being remembered in freeBlocks[].  See notes for
156 	 * LogicalTapeSetForgetFreeSpace().
157 	 *
158 	 * If blocksSorted is true then the block numbers in freeBlocks are in
159 	 * *decreasing* order, so that removing the last entry gives us the lowest
160 	 * free block.  We re-sort the blocks whenever a block is demanded; this
161 	 * should be reasonably efficient given the expected usage pattern.
162 	 */
163 	bool		forgetFreeSpace;	/* are we remembering free blocks? */
164 	bool		blocksSorted;	/* is freeBlocks[] currently in order? */
165 	long	   *freeBlocks;		/* resizable array */
166 	int			nFreeBlocks;	/* # of currently free blocks */
167 	int			freeBlocksLen;	/* current allocated length of freeBlocks[] */
168 
169 	/* The array of logical tapes. */
170 	int			nTapes;			/* # of logical tapes in set */
171 	LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER];	/* has nTapes nentries */
172 };
173 
174 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
175 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
176 static long ltsGetFreeBlock(LogicalTapeSet *lts);
177 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
178 static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
179 				  long blocknum);
180 static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
181 					   IndirectBlock *indirect,
182 					   bool freezing);
183 static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
184 							 IndirectBlock *indirect);
185 static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
186 					  IndirectBlock *indirect,
187 					  bool frozen);
188 static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
189 					  IndirectBlock *indirect);
190 static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
191 
192 
193 /*
194  * Write a block-sized buffer to the specified block of the underlying file.
195  *
196  * NB: should not attempt to write beyond current end of file (ie, create
197  * "holes" in file), since BufFile doesn't allow that.  The first write pass
198  * must write blocks sequentially.
199  *
200  * No need for an error return convention; we ereport() on any error.
201  */
202 static void
ltsWriteBlock(LogicalTapeSet * lts,long blocknum,void * buffer)203 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
204 {
205 	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
206 		ereport(ERROR,
207 				(errcode_for_file_access(),
208 				 errmsg("could not seek to block %ld of temporary file",
209 						blocknum)));
210 	BufFileWrite(lts->pfile, buffer, BLCKSZ);
211 }
212 
213 /*
214  * Read a block-sized buffer from the specified block of the underlying file.
215  *
216  * No need for an error return convention; we ereport() on any error.   This
217  * module should never attempt to read a block it doesn't know is there.
218  */
219 static void
ltsReadBlock(LogicalTapeSet * lts,long blocknum,void * buffer)220 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
221 {
222 	size_t		nread;
223 
224 	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
225 		ereport(ERROR,
226 				(errcode_for_file_access(),
227 				 errmsg("could not seek to block %ld of temporary file",
228 						blocknum)));
229 	nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
230 	if (nread != BLCKSZ)
231 		ereport(ERROR,
232 				(errcode_for_file_access(),
233 				 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
234 						blocknum, nread, (size_t) BLCKSZ)));
235 }
236 
/*
 * qsort comparator that orders freeBlocks[] entries from largest to
 * smallest: the result is positive when *a is less than *b, negative when
 * *a is greater than *b, and zero when they are equal.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	const long *lhs = (const long *) a;
	const long *rhs = (const long *) b;

	/*
	 * Combine two comparisons rather than subtracting the values: a long
	 * difference might not fit into the int result.
	 */
	return (*lhs < *rhs) - (*lhs > *rhs);
}
253 
254 /*
255  * Select a currently unused block for writing to.
256  *
257  * NB: should only be called when writer is ready to write immediately,
258  * to ensure that first write pass is sequential.
259  */
260 static long
ltsGetFreeBlock(LogicalTapeSet * lts)261 ltsGetFreeBlock(LogicalTapeSet *lts)
262 {
263 	/*
264 	 * If there are multiple free blocks, we select the one appearing last in
265 	 * freeBlocks[] (after sorting the array if needed).  If there are none,
266 	 * assign the next block at the end of the file.
267 	 */
268 	if (lts->nFreeBlocks > 0)
269 	{
270 		if (!lts->blocksSorted)
271 		{
272 			qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
273 				  sizeof(long), freeBlocks_cmp);
274 			lts->blocksSorted = true;
275 		}
276 		return lts->freeBlocks[--lts->nFreeBlocks];
277 	}
278 	else
279 		return lts->nFileBlocks++;
280 }
281 
282 /*
283  * Return a block# to the freelist.
284  */
285 static void
ltsReleaseBlock(LogicalTapeSet * lts,long blocknum)286 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
287 {
288 	int			ndx;
289 
290 	/*
291 	 * Do nothing if we're no longer interested in remembering free space.
292 	 */
293 	if (lts->forgetFreeSpace)
294 		return;
295 
296 	/*
297 	 * Enlarge freeBlocks array if full.
298 	 */
299 	if (lts->nFreeBlocks >= lts->freeBlocksLen)
300 	{
301 		lts->freeBlocksLen *= 2;
302 		lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
303 										  lts->freeBlocksLen * sizeof(long));
304 	}
305 
306 	/*
307 	 * Add blocknum to array, and mark the array unsorted if it's no longer in
308 	 * decreasing order.
309 	 */
310 	ndx = lts->nFreeBlocks++;
311 	lts->freeBlocks[ndx] = blocknum;
312 	if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
313 		lts->blocksSorted = false;
314 }
315 
316 /*
317  * These routines manipulate indirect-block hierarchies.  All are recursive
318  * so that they don't have any specific limit on the depth of hierarchy.
319  */
320 
321 /*
322  * Record a data block number in a logical tape's lowest indirect block,
323  * or record an indirect block's number in the next higher indirect level.
324  */
325 static void
ltsRecordBlockNum(LogicalTapeSet * lts,IndirectBlock * indirect,long blocknum)326 ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
327 				  long blocknum)
328 {
329 	if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
330 	{
331 		/*
332 		 * This indirect block is full, so dump it out and recursively save
333 		 * its address in the next indirection level.  Create a new
334 		 * indirection level if there wasn't one before.
335 		 */
336 		long		indirblock = ltsGetFreeBlock(lts);
337 
338 		ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
339 		if (indirect->nextup == NULL)
340 		{
341 			indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
342 			indirect->nextup->nextSlot = 0;
343 			indirect->nextup->nextup = NULL;
344 		}
345 		ltsRecordBlockNum(lts, indirect->nextup, indirblock);
346 
347 		/*
348 		 * Reset to fill another indirect block at this level.
349 		 */
350 		indirect->nextSlot = 0;
351 	}
352 	indirect->ptrs[indirect->nextSlot++] = blocknum;
353 }
354 
355 /*
356  * Reset a logical tape's indirect-block hierarchy after a write pass
357  * to prepare for reading.  We dump out partly-filled blocks except
358  * at the top of the hierarchy, and we rewind each level to the start.
359  * This call returns the first data block number, or -1L if the tape
360  * is empty.
361  *
362  * Unless 'freezing' is true, release indirect blocks to the free pool after
363  * reading them.
364  */
365 static long
ltsRewindIndirectBlock(LogicalTapeSet * lts,IndirectBlock * indirect,bool freezing)366 ltsRewindIndirectBlock(LogicalTapeSet *lts,
367 					   IndirectBlock *indirect,
368 					   bool freezing)
369 {
370 	/* Handle case of never-written-to tape */
371 	if (indirect == NULL)
372 		return -1L;
373 
374 	/* Insert sentinel if block is not full */
375 	if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
376 		indirect->ptrs[indirect->nextSlot] = -1L;
377 
378 	/*
379 	 * If block is not topmost, write it out, and recurse to obtain address of
380 	 * first block in this hierarchy level.  Read that one in.
381 	 */
382 	if (indirect->nextup != NULL)
383 	{
384 		long		indirblock = ltsGetFreeBlock(lts);
385 
386 		ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
387 		ltsRecordBlockNum(lts, indirect->nextup, indirblock);
388 		indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
389 		Assert(indirblock != -1L);
390 		ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
391 		if (!freezing)
392 			ltsReleaseBlock(lts, indirblock);
393 	}
394 
395 	/*
396 	 * Reset my next-block pointer, and then fetch a block number if any.
397 	 */
398 	indirect->nextSlot = 0;
399 	if (indirect->ptrs[0] == -1L)
400 		return -1L;
401 	return indirect->ptrs[indirect->nextSlot++];
402 }
403 
404 /*
405  * Rewind a previously-frozen indirect-block hierarchy for another read pass.
406  * This call returns the first data block number, or -1L if the tape
407  * is empty.
408  */
409 static long
ltsRewindFrozenIndirectBlock(LogicalTapeSet * lts,IndirectBlock * indirect)410 ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
411 							 IndirectBlock *indirect)
412 {
413 	/* Handle case of never-written-to tape */
414 	if (indirect == NULL)
415 		return -1L;
416 
417 	/*
418 	 * If block is not topmost, recurse to obtain address of first block in
419 	 * this hierarchy level.  Read that one in.
420 	 */
421 	if (indirect->nextup != NULL)
422 	{
423 		long		indirblock;
424 
425 		indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
426 		Assert(indirblock != -1L);
427 		ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
428 	}
429 
430 	/*
431 	 * Reset my next-block pointer, and then fetch a block number if any.
432 	 */
433 	indirect->nextSlot = 0;
434 	if (indirect->ptrs[0] == -1L)
435 		return -1L;
436 	return indirect->ptrs[indirect->nextSlot++];
437 }
438 
439 /*
440  * Obtain next data block number in the forward direction, or -1L if no more.
441  *
442  * Unless 'frozen' is true, release indirect blocks to the free pool after
443  * reading them.
444  */
445 static long
ltsRecallNextBlockNum(LogicalTapeSet * lts,IndirectBlock * indirect,bool frozen)446 ltsRecallNextBlockNum(LogicalTapeSet *lts,
447 					  IndirectBlock *indirect,
448 					  bool frozen)
449 {
450 	/* Handle case of never-written-to tape */
451 	if (indirect == NULL)
452 		return -1L;
453 
454 	if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
455 		indirect->ptrs[indirect->nextSlot] == -1L)
456 	{
457 		long		indirblock;
458 
459 		if (indirect->nextup == NULL)
460 			return -1L;			/* nothing left at this level */
461 		indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
462 		if (indirblock == -1L)
463 			return -1L;			/* nothing left at this level */
464 		ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
465 		if (!frozen)
466 			ltsReleaseBlock(lts, indirblock);
467 		indirect->nextSlot = 0;
468 	}
469 	if (indirect->ptrs[indirect->nextSlot] == -1L)
470 		return -1L;
471 	return indirect->ptrs[indirect->nextSlot++];
472 }
473 
474 /*
475  * Obtain next data block number in the reverse direction, or -1L if no more.
476  *
477  * Note this fetches the block# before the one last returned, no matter which
478  * direction of call returned that one.  If we fail, no change in state.
479  *
480  * This routine can only be used in 'frozen' state, so there's no need to
481  * pass a parameter telling whether to release blocks ... we never do.
482  */
483 static long
ltsRecallPrevBlockNum(LogicalTapeSet * lts,IndirectBlock * indirect)484 ltsRecallPrevBlockNum(LogicalTapeSet *lts,
485 					  IndirectBlock *indirect)
486 {
487 	/* Handle case of never-written-to tape */
488 	if (indirect == NULL)
489 		return -1L;
490 
491 	if (indirect->nextSlot <= 1)
492 	{
493 		long		indirblock;
494 
495 		if (indirect->nextup == NULL)
496 			return -1L;			/* nothing left at this level */
497 		indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
498 		if (indirblock == -1L)
499 			return -1L;			/* nothing left at this level */
500 		ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
501 
502 		/*
503 		 * The previous block would only have been written out if full, so we
504 		 * need not search it for a -1 sentinel.
505 		 */
506 		indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
507 	}
508 	indirect->nextSlot--;
509 	return indirect->ptrs[indirect->nextSlot - 1];
510 }
511 
512 
513 /*
514  * Create a set of logical tapes in a temporary underlying file.
515  *
516  * Each tape is initialized in write state.
517  */
518 LogicalTapeSet *
LogicalTapeSetCreate(int ntapes)519 LogicalTapeSetCreate(int ntapes)
520 {
521 	LogicalTapeSet *lts;
522 	LogicalTape *lt;
523 	int			i;
524 
525 	/*
526 	 * Create top-level struct including per-tape LogicalTape structs.
527 	 */
528 	Assert(ntapes > 0);
529 	lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
530 									ntapes * sizeof(LogicalTape));
531 	lts->pfile = BufFileCreateTemp(false);
532 	lts->nFileBlocks = 0L;
533 	lts->forgetFreeSpace = false;
534 	lts->blocksSorted = true;	/* a zero-length array is sorted ... */
535 	lts->freeBlocksLen = 32;	/* reasonable initial guess */
536 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
537 	lts->nFreeBlocks = 0;
538 	lts->nTapes = ntapes;
539 
540 	/*
541 	 * Initialize per-tape structs.  Note we allocate the I/O buffer and
542 	 * first-level indirect block for a tape only when it is first actually
543 	 * written to.  This avoids wasting memory space when tuplesort.c
544 	 * overestimates the number of tapes needed.
545 	 */
546 	for (i = 0; i < ntapes; i++)
547 	{
548 		lt = &lts->tapes[i];
549 		lt->indirect = NULL;
550 		lt->writing = true;
551 		lt->frozen = false;
552 		lt->dirty = false;
553 		lt->numFullBlocks = 0L;
554 		lt->lastBlockBytes = 0;
555 		lt->buffer = NULL;
556 		lt->curBlockNumber = 0L;
557 		lt->pos = 0;
558 		lt->nbytes = 0;
559 	}
560 	return lts;
561 }
562 
563 /*
564  * Close a logical tape set and release all resources.
565  */
566 void
LogicalTapeSetClose(LogicalTapeSet * lts)567 LogicalTapeSetClose(LogicalTapeSet *lts)
568 {
569 	LogicalTape *lt;
570 	IndirectBlock *ib,
571 			   *nextib;
572 	int			i;
573 
574 	BufFileClose(lts->pfile);
575 	for (i = 0; i < lts->nTapes; i++)
576 	{
577 		lt = &lts->tapes[i];
578 		for (ib = lt->indirect; ib != NULL; ib = nextib)
579 		{
580 			nextib = ib->nextup;
581 			pfree(ib);
582 		}
583 		if (lt->buffer)
584 			pfree(lt->buffer);
585 	}
586 	pfree(lts->freeBlocks);
587 	pfree(lts);
588 }
589 
590 /*
591  * Mark a logical tape set as not needing management of free space anymore.
592  *
593  * This should be called if the caller does not intend to write any more data
594  * into the tape set, but is reading from un-frozen tapes.  Since no more
595  * writes are planned, remembering free blocks is no longer useful.  Setting
596  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
597  * is not designed to handle large numbers of free blocks.
598  */
599 void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet * lts)600 LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
601 {
602 	lts->forgetFreeSpace = true;
603 }
604 
605 /*
606  * Dump the dirty buffer of a logical tape.
607  */
608 static void
ltsDumpBuffer(LogicalTapeSet * lts,LogicalTape * lt)609 ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
610 {
611 	long		datablock = ltsGetFreeBlock(lts);
612 
613 	Assert(lt->dirty);
614 	ltsWriteBlock(lts, datablock, (void *) lt->buffer);
615 	ltsRecordBlockNum(lts, lt->indirect, datablock);
616 	lt->dirty = false;
617 	/* Caller must do other state update as needed */
618 }
619 
620 /*
621  * Write to a logical tape.
622  *
623  * There are no error returns; we ereport() on failure.
624  */
625 void
LogicalTapeWrite(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)626 LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
627 				 void *ptr, size_t size)
628 {
629 	LogicalTape *lt;
630 	size_t		nthistime;
631 
632 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
633 	lt = &lts->tapes[tapenum];
634 	Assert(lt->writing);
635 
636 	/* Allocate data buffer and first indirect block on first write */
637 	if (lt->buffer == NULL)
638 		lt->buffer = (char *) palloc(BLCKSZ);
639 	if (lt->indirect == NULL)
640 	{
641 		lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
642 		lt->indirect->nextSlot = 0;
643 		lt->indirect->nextup = NULL;
644 	}
645 
646 	while (size > 0)
647 	{
648 		if (lt->pos >= BLCKSZ)
649 		{
650 			/* Buffer full, dump it out */
651 			if (lt->dirty)
652 				ltsDumpBuffer(lts, lt);
653 			else
654 			{
655 				/* Hmm, went directly from reading to writing? */
656 				elog(ERROR, "invalid logtape state: should be dirty");
657 			}
658 			lt->numFullBlocks++;
659 			lt->curBlockNumber++;
660 			lt->pos = 0;
661 			lt->nbytes = 0;
662 		}
663 
664 		nthistime = BLCKSZ - lt->pos;
665 		if (nthistime > size)
666 			nthistime = size;
667 		Assert(nthistime > 0);
668 
669 		memcpy(lt->buffer + lt->pos, ptr, nthistime);
670 
671 		lt->dirty = true;
672 		lt->pos += nthistime;
673 		if (lt->nbytes < lt->pos)
674 			lt->nbytes = lt->pos;
675 		ptr = (void *) ((char *) ptr + nthistime);
676 		size -= nthistime;
677 	}
678 }
679 
680 /*
681  * Rewind logical tape and switch from writing to reading or vice versa.
682  *
683  * Unless the tape has been "frozen" in read state, forWrite must be the
684  * opposite of the previous tape state.
685  */
686 void
LogicalTapeRewind(LogicalTapeSet * lts,int tapenum,bool forWrite)687 LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
688 {
689 	LogicalTape *lt;
690 	long		datablocknum;
691 
692 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
693 	lt = &lts->tapes[tapenum];
694 
695 	if (!forWrite)
696 	{
697 		if (lt->writing)
698 		{
699 			/*
700 			 * Completion of a write phase.  Flush last partial data block,
701 			 * flush any partial indirect blocks, rewind for normal
702 			 * (destructive) read.
703 			 */
704 			if (lt->dirty)
705 				ltsDumpBuffer(lts, lt);
706 			lt->lastBlockBytes = lt->nbytes;
707 			lt->writing = false;
708 			datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
709 		}
710 		else
711 		{
712 			/*
713 			 * This is only OK if tape is frozen; we rewind for (another) read
714 			 * pass.
715 			 */
716 			Assert(lt->frozen);
717 			datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
718 		}
719 		/* Read the first block, or reset if tape is empty */
720 		lt->curBlockNumber = 0L;
721 		lt->pos = 0;
722 		lt->nbytes = 0;
723 		if (datablocknum != -1L)
724 		{
725 			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
726 			if (!lt->frozen)
727 				ltsReleaseBlock(lts, datablocknum);
728 			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
729 				BLCKSZ : lt->lastBlockBytes;
730 		}
731 	}
732 	else
733 	{
734 		/*
735 		 * Completion of a read phase.  Rewind and prepare for write.
736 		 *
737 		 * NOTE: we assume the caller has read the tape to the end; otherwise
738 		 * untouched data and indirect blocks will not have been freed. We
739 		 * could add more code to free any unread blocks, but in current usage
740 		 * of this module it'd be useless code.
741 		 */
742 		IndirectBlock *ib,
743 				   *nextib;
744 
745 		Assert(!lt->writing && !lt->frozen);
746 		/* Must truncate the indirect-block hierarchy down to one level. */
747 		if (lt->indirect)
748 		{
749 			for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
750 			{
751 				nextib = ib->nextup;
752 				pfree(ib);
753 			}
754 			lt->indirect->nextSlot = 0;
755 			lt->indirect->nextup = NULL;
756 		}
757 		lt->writing = true;
758 		lt->dirty = false;
759 		lt->numFullBlocks = 0L;
760 		lt->lastBlockBytes = 0;
761 		lt->curBlockNumber = 0L;
762 		lt->pos = 0;
763 		lt->nbytes = 0;
764 	}
765 }
766 
767 /*
768  * Read from a logical tape.
769  *
770  * Early EOF is indicated by return value less than #bytes requested.
771  */
772 size_t
LogicalTapeRead(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)773 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
774 				void *ptr, size_t size)
775 {
776 	LogicalTape *lt;
777 	size_t		nread = 0;
778 	size_t		nthistime;
779 
780 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
781 	lt = &lts->tapes[tapenum];
782 	Assert(!lt->writing);
783 
784 	while (size > 0)
785 	{
786 		if (lt->pos >= lt->nbytes)
787 		{
788 			/* Try to load more data into buffer. */
789 			long		datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
790 															 lt->frozen);
791 
792 			if (datablocknum == -1L)
793 				break;			/* EOF */
794 			lt->curBlockNumber++;
795 			lt->pos = 0;
796 			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
797 			if (!lt->frozen)
798 				ltsReleaseBlock(lts, datablocknum);
799 			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
800 				BLCKSZ : lt->lastBlockBytes;
801 			if (lt->nbytes <= 0)
802 				break;			/* EOF (possible here?) */
803 		}
804 
805 		nthistime = lt->nbytes - lt->pos;
806 		if (nthistime > size)
807 			nthistime = size;
808 		Assert(nthistime > 0);
809 
810 		memcpy(ptr, lt->buffer + lt->pos, nthistime);
811 
812 		lt->pos += nthistime;
813 		ptr = (void *) ((char *) ptr + nthistime);
814 		size -= nthistime;
815 		nread += nthistime;
816 	}
817 
818 	return nread;
819 }
820 
821 /*
822  * "Freeze" the contents of a tape so that it can be read multiple times
823  * and/or read backwards.  Once a tape is frozen, its contents will not
824  * be released until the LogicalTapeSet is destroyed.  This is expected
825  * to be used only for the final output pass of a merge.
826  *
827  * This *must* be called just at the end of a write pass, before the
828  * tape is rewound (after rewind is too late!).  It performs a rewind
829  * and switch to read mode "for free".  An immediately following rewind-
830  * for-read call is OK but not necessary.
831  */
832 void
LogicalTapeFreeze(LogicalTapeSet * lts,int tapenum)833 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
834 {
835 	LogicalTape *lt;
836 	long		datablocknum;
837 
838 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
839 	lt = &lts->tapes[tapenum];
840 	Assert(lt->writing);
841 
842 	/*
843 	 * Completion of a write phase.  Flush last partial data block, flush any
844 	 * partial indirect blocks, rewind for nondestructive read.
845 	 */
846 	if (lt->dirty)
847 		ltsDumpBuffer(lts, lt);
848 	lt->lastBlockBytes = lt->nbytes;
849 	lt->writing = false;
850 	lt->frozen = true;
851 	datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
852 	/* Read the first block, or reset if tape is empty */
853 	lt->curBlockNumber = 0L;
854 	lt->pos = 0;
855 	lt->nbytes = 0;
856 	if (datablocknum != -1L)
857 	{
858 		ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
859 		lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
860 			BLCKSZ : lt->lastBlockBytes;
861 	}
862 }
863 
864 /*
865  * Backspace the tape a given number of bytes.  (We also support a more
866  * general seek interface, see below.)
867  *
868  * *Only* a frozen-for-read tape can be backed up; we don't support
869  * random access during write, and an unfrozen read tape may have
870  * already discarded the desired data!
871  *
872  * Return value is TRUE if seek successful, FALSE if there isn't that much
873  * data before the current point (in which case there's no state change).
874  */
875 bool
LogicalTapeBackspace(LogicalTapeSet * lts,int tapenum,size_t size)876 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
877 {
878 	LogicalTape *lt;
879 	long		nblocks;
880 	int			newpos;
881 
882 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
883 	lt = &lts->tapes[tapenum];
884 	Assert(lt->frozen);
885 
886 	/*
887 	 * Easy case for seek within current block.
888 	 */
889 	if (size <= (size_t) lt->pos)
890 	{
891 		lt->pos -= (int) size;
892 		return true;
893 	}
894 
895 	/*
896 	 * Not-so-easy case.  Figure out whether it's possible at all.
897 	 */
898 	size -= (size_t) lt->pos;	/* part within this block */
899 	nblocks = size / BLCKSZ;
900 	size = size % BLCKSZ;
901 	if (size)
902 	{
903 		nblocks++;
904 		newpos = (int) (BLCKSZ - size);
905 	}
906 	else
907 		newpos = 0;
908 	if (nblocks > lt->curBlockNumber)
909 		return false;			/* a seek too far... */
910 
911 	/*
912 	 * OK, we need to back up nblocks blocks.  This implementation would be
913 	 * pretty inefficient for long seeks, but we really aren't expecting that
914 	 * (a seek over one tuple is typical).
915 	 */
916 	while (nblocks-- > 0)
917 	{
918 		long		datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
919 
920 		if (datablocknum == -1L)
921 			elog(ERROR, "unexpected end of tape");
922 		lt->curBlockNumber--;
923 		if (nblocks == 0)
924 		{
925 			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
926 			lt->nbytes = BLCKSZ;
927 		}
928 	}
929 	lt->pos = newpos;
930 	return true;
931 }
932 
933 /*
934  * Seek to an arbitrary position in a logical tape.
935  *
936  * *Only* a frozen-for-read tape can be seeked.
937  *
938  * Return value is TRUE if seek successful, FALSE if there isn't that much
939  * data in the tape (in which case there's no state change).
940  */
941 bool
LogicalTapeSeek(LogicalTapeSet * lts,int tapenum,long blocknum,int offset)942 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
943 				long blocknum, int offset)
944 {
945 	LogicalTape *lt;
946 
947 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
948 	lt = &lts->tapes[tapenum];
949 	Assert(lt->frozen);
950 	Assert(offset >= 0 && offset <= BLCKSZ);
951 
952 	/*
953 	 * Easy case for seek within current block.
954 	 */
955 	if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
956 	{
957 		lt->pos = offset;
958 		return true;
959 	}
960 
961 	/*
962 	 * Not-so-easy case.  Figure out whether it's possible at all.
963 	 */
964 	if (blocknum < 0 || blocknum > lt->numFullBlocks ||
965 		(blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
966 		return false;
967 
968 	/*
969 	 * OK, advance or back up to the target block.  This implementation would
970 	 * be pretty inefficient for long seeks, but we really aren't expecting
971 	 * that (a seek over one tuple is typical).
972 	 */
973 	while (lt->curBlockNumber > blocknum)
974 	{
975 		long		datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
976 
977 		if (datablocknum == -1L)
978 			elog(ERROR, "unexpected end of tape");
979 		if (--lt->curBlockNumber == blocknum)
980 			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
981 	}
982 	while (lt->curBlockNumber < blocknum)
983 	{
984 		long		datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
985 														 lt->frozen);
986 
987 		if (datablocknum == -1L)
988 			elog(ERROR, "unexpected end of tape");
989 		if (++lt->curBlockNumber == blocknum)
990 			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
991 	}
992 	lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
993 		BLCKSZ : lt->lastBlockBytes;
994 	lt->pos = offset;
995 	return true;
996 }
997 
998 /*
999  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1000  *
1001  * NOTE: it'd be OK to do this during write phase with intention of using
1002  * the position for a seek after freezing.  Not clear if anyone needs that.
1003  */
1004 void
LogicalTapeTell(LogicalTapeSet * lts,int tapenum,long * blocknum,int * offset)1005 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
1006 				long *blocknum, int *offset)
1007 {
1008 	LogicalTape *lt;
1009 
1010 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
1011 	lt = &lts->tapes[tapenum];
1012 	*blocknum = lt->curBlockNumber;
1013 	*offset = lt->pos;
1014 }
1015 
1016 /*
1017  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1018  */
1019 long
LogicalTapeSetBlocks(LogicalTapeSet * lts)1020 LogicalTapeSetBlocks(LogicalTapeSet *lts)
1021 {
1022 	return lts->nFileBlocks;
1023 }
1024