1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  *	  Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted.  (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass.  For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner.  Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated.  In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file.  logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c.  Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape form a chain, with a prev- and next-
35  * pointer in each block.
36  *
37  * The initial write pass is guaranteed to fill the underlying file
38  * perfectly sequentially, no matter how data is divided into logical tapes.
39  * Once we begin merge passes, the access pattern becomes considerably
40  * less predictable --- but the seeking involved should be comparable to
41  * what would happen if we kept each logical tape in a separate file,
42  * so there's no serious performance penalty paid to obtain the space
43  * savings of recycling.  We try to localize the write accesses by always
44  * writing to the lowest-numbered free block when we have a choice; it's
45  * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
46  * policy for free blocks would be better?)
47  *
48  * To further make the I/Os more sequential, we can use a larger buffer
49  * when reading, and read multiple blocks from the same tape in one go,
50  * whenever the buffer becomes empty.
51  *
52  * To support the above policy of writing to the lowest free block,
53  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
54  * order each time it is asked for a block and the list isn't currently
55  * sorted.  This is an efficient way to handle it because we expect cycles
56  * of releasing many blocks followed by re-using many blocks, due to
57  * the larger read buffer.
58  *
59  * Since all the bookkeeping and buffer memory is allocated with palloc(),
60  * and the underlying file(s) are made with OpenTemporaryFile, all resources
61  * for a logical tape set are certain to be cleaned up even if processing
62  * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
63  * care that all calls for a single LogicalTapeSet are made in the same
64  * palloc context.
65  *
66  * To support parallel sort operations involving coordinated callers to
67  * tuplesort.c routines across multiple workers, it is necessary to
68  * concatenate each worker BufFile/tapeset into one single logical tapeset
 * managed by the leader.  Each worker must already have produced one final
 * materialized tape (its entire output) by the time the leader does this.
71  * There will always be the same number of runs as input tapes, and the same
72  * number of input tapes as participants (worker Tuplesortstates).
73  *
74  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
75  * Portions Copyright (c) 1994, Regents of the University of California
76  *
77  * IDENTIFICATION
78  *	  src/backend/utils/sort/logtape.c
79  *
80  *-------------------------------------------------------------------------
81  */
82 
83 #include "postgres.h"
84 
85 #include "storage/buffile.h"
86 #include "utils/builtins.h"
87 #include "utils/logtape.h"
88 #include "utils/memdebug.h"
89 #include "utils/memutils.h"
90 
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Number of data bytes in a block, i.e. everything before the trailer */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/*
 * Locate the trailer of the block-sized buffer 'buf'.
 *
 * The argument is parenthesized so that an argument such as "p + 1" is
 * evaluated in its own pointer type before being cast to char *.  Without
 * the parentheses the cast binds first and the offset is silently taken in
 * bytes.
 */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* True if 'buf' is the last block of its tape (negative 'next') */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/* Valid data bytes in 'buf': the full payload unless it is the last block */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark 'buf' as the last block, holding 'nbytes' valid bytes */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
116 
117 
/*
 * One "logical tape" within a set of logical tapes that share a file.
 *
 * In write mode, the buffer holds the single block currently being filled.
 * In read mode, it may hold several consecutive blocks of the tape.  Block
 * trailers are not retained as blocks are read in, so the valid part of the
 * buffer is one contiguous run of tape data.
 */
typedef struct LogicalTape
{
	/* Mode flags */
	bool		writing;		/* in write phase? */
	bool		frozen;			/* keep blocks (don't free them) after reading? */
	bool		dirty;			/* does the buffer hold unwritten data? */

	/*
	 * Block numbers of the tape's first, current, and next block.
	 *
	 * curBlockNumber is meaningful only while writing, or while reading a
	 * frozen tape.  An unfrozen tape is read through a multi-block buffer,
	 * so there is no single "current" block.
	 *
	 * offsetBlockNumber is added to block numbers during reads.  It locates
	 * a worker's tape inside the concatenated BufFile of a leader tapeset.
	 */
	long		firstBlockNumber;
	long		curBlockNumber;
	long		nextBlockNumber;
	long		offsetBlockNumber;

	/* I/O buffer for the current data block(s) */
	char	   *buffer;			/* separately palloc'd; NULL if none */
	int			buffer_size;	/* bytes currently allocated for buffer */
	int			max_size;		/* largest buffer_size that is useful and safe */
	int			pos;			/* next read/write offset within buffer */
	int			nbytes;			/* number of valid bytes in buffer */
} LogicalTape;
159 
160 /*
161  * This data structure represents a set of related "logical tapes" sharing
162  * space in a single underlying file.  (But that "file" may be multiple files
163  * if needed to escape OS limits on file size; buffile.c handles that for us.)
164  * The number of tapes is fixed at creation.
165  */
166 struct LogicalTapeSet
167 {
168 	BufFile    *pfile;			/* underlying file for whole tape set */
169 
170 	/*
171 	 * File size tracking.  nBlocksWritten is the size of the underlying file,
172 	 * in BLCKSZ blocks.  nBlocksAllocated is the number of blocks allocated
173 	 * by ltsGetFreeBlock(), and it is always greater than or equal to
174 	 * nBlocksWritten.  Blocks between nBlocksAllocated and nBlocksWritten are
175 	 * blocks that have been allocated for a tape, but have not been written
176 	 * to the underlying file yet.  nHoleBlocks tracks the total number of
177 	 * blocks that are in unused holes between worker spaces following BufFile
178 	 * concatenation.
179 	 */
180 	long		nBlocksAllocated;	/* # of blocks allocated */
181 	long		nBlocksWritten; /* # of blocks used in underlying file */
182 	long		nHoleBlocks;	/* # of "hole" blocks left */
183 
184 	/*
185 	 * We store the numbers of recycled-and-available blocks in freeBlocks[].
186 	 * When there are no such blocks, we extend the underlying file.
187 	 *
188 	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
189 	 * rather than being remembered in freeBlocks[].  See notes for
190 	 * LogicalTapeSetForgetFreeSpace().
191 	 *
192 	 * If blocksSorted is true then the block numbers in freeBlocks are in
193 	 * *decreasing* order, so that removing the last entry gives us the lowest
194 	 * free block.  We re-sort the blocks whenever a block is demanded; this
195 	 * should be reasonably efficient given the expected usage pattern.
196 	 */
197 	bool		forgetFreeSpace;	/* are we remembering free blocks? */
198 	bool		blocksSorted;	/* is freeBlocks[] currently in order? */
199 	long	   *freeBlocks;		/* resizable array */
200 	int			nFreeBlocks;	/* # of currently free blocks */
201 	int			freeBlocksLen;	/* current allocated length of freeBlocks[] */
202 
203 	/* The array of logical tapes. */
204 	int			nTapes;			/* # of logical tapes in set */
205 	LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER];	/* has nTapes nentries */
206 };
207 
208 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
209 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
210 static long ltsGetFreeBlock(LogicalTapeSet *lts);
211 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
212 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
213 					 SharedFileSet *fileset);
214 
215 
216 /*
217  * Write a block-sized buffer to the specified block of the underlying file.
218  *
219  * No need for an error return convention; we ereport() on any error.
220  */
221 static void
ltsWriteBlock(LogicalTapeSet * lts,long blocknum,void * buffer)222 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
223 {
224 	/*
225 	 * BufFile does not support "holes", so if we're about to write a block
226 	 * that's past the current end of file, fill the space between the current
227 	 * end of file and the target block with zeros.
228 	 *
229 	 * This should happen rarely, otherwise you are not writing very
230 	 * sequentially.  In current use, this only happens when the sort ends
231 	 * writing a run, and switches to another tape.  The last block of the
232 	 * previous tape isn't flushed to disk until the end of the sort, so you
233 	 * get one-block hole, where the last block of the previous tape will
234 	 * later go.
235 	 *
236 	 * Note that BufFile concatenation can leave "holes" in BufFile between
237 	 * worker-owned block ranges.  These are tracked for reporting purposes
238 	 * only.  We never read from nor write to these hole blocks, and so they
239 	 * are not considered here.
240 	 */
241 	while (blocknum > lts->nBlocksWritten)
242 	{
243 		PGAlignedBlock zerobuf;
244 
245 		MemSet(zerobuf.data, 0, sizeof(zerobuf));
246 
247 		ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
248 	}
249 
250 	/* Write the requested block */
251 	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
252 		ereport(ERROR,
253 				(errcode_for_file_access(),
254 				 errmsg("could not seek to block %ld of temporary file",
255 						blocknum)));
256 	BufFileWrite(lts->pfile, buffer, BLCKSZ);
257 
258 	/* Update nBlocksWritten, if we extended the file */
259 	if (blocknum == lts->nBlocksWritten)
260 		lts->nBlocksWritten++;
261 }
262 
263 /*
264  * Read a block-sized buffer from the specified block of the underlying file.
265  *
266  * No need for an error return convention; we ereport() on any error.   This
267  * module should never attempt to read a block it doesn't know is there.
268  */
269 static void
ltsReadBlock(LogicalTapeSet * lts,long blocknum,void * buffer)270 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
271 {
272 	size_t		nread;
273 
274 	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
275 		ereport(ERROR,
276 				(errcode_for_file_access(),
277 				 errmsg("could not seek to block %ld of temporary file",
278 						blocknum)));
279 	nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
280 	if (nread != BLCKSZ)
281 		ereport(ERROR,
282 				(errcode_for_file_access(),
283 				 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
284 						blocknum, nread, (size_t) BLCKSZ)));
285 }
286 
287 /*
288  * Read as many blocks as we can into the per-tape buffer.
289  *
290  * Returns true if anything was read, 'false' on EOF.
291  */
292 static bool
ltsReadFillBuffer(LogicalTapeSet * lts,LogicalTape * lt)293 ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
294 {
295 	lt->pos = 0;
296 	lt->nbytes = 0;
297 
298 	do
299 	{
300 		char	   *thisbuf = lt->buffer + lt->nbytes;
301 		long		datablocknum = lt->nextBlockNumber;
302 
303 		/* Fetch next block number */
304 		if (datablocknum == -1L)
305 			break;				/* EOF */
306 		/* Apply worker offset, needed for leader tapesets */
307 		datablocknum += lt->offsetBlockNumber;
308 
309 		/* Read the block */
310 		ltsReadBlock(lts, datablocknum, (void *) thisbuf);
311 		if (!lt->frozen)
312 			ltsReleaseBlock(lts, datablocknum);
313 		lt->curBlockNumber = lt->nextBlockNumber;
314 
315 		lt->nbytes += TapeBlockGetNBytes(thisbuf);
316 		if (TapeBlockIsLast(thisbuf))
317 		{
318 			lt->nextBlockNumber = -1L;
319 			/* EOF */
320 			break;
321 		}
322 		else
323 			lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
324 
325 		/* Advance to next block, if we have buffer space left */
326 	} while (lt->buffer_size - lt->nbytes > BLCKSZ);
327 
328 	return (lt->nbytes > 0);
329 }
330 
/*
 * qsort() comparator that orders freeBlocks[] from highest to lowest block
 * number.
 *
 * The result comes from two comparisons rather than a subtraction, because
 * the difference of two longs need not fit in an int.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	const long	lhs = *(const long *) a;
	const long	rhs = *(const long *) b;

	return (lhs < rhs) - (lhs > rhs);
}
347 
348 /*
349  * Select a currently unused block for writing to.
350  */
351 static long
ltsGetFreeBlock(LogicalTapeSet * lts)352 ltsGetFreeBlock(LogicalTapeSet *lts)
353 {
354 	/*
355 	 * If there are multiple free blocks, we select the one appearing last in
356 	 * freeBlocks[] (after sorting the array if needed).  If there are none,
357 	 * assign the next block at the end of the file.
358 	 */
359 	if (lts->nFreeBlocks > 0)
360 	{
361 		if (!lts->blocksSorted)
362 		{
363 			qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
364 				  sizeof(long), freeBlocks_cmp);
365 			lts->blocksSorted = true;
366 		}
367 		return lts->freeBlocks[--lts->nFreeBlocks];
368 	}
369 	else
370 		return lts->nBlocksAllocated++;
371 }
372 
373 /*
374  * Return a block# to the freelist.
375  */
376 static void
ltsReleaseBlock(LogicalTapeSet * lts,long blocknum)377 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
378 {
379 	int			ndx;
380 
381 	/*
382 	 * Do nothing if we're no longer interested in remembering free space.
383 	 */
384 	if (lts->forgetFreeSpace)
385 		return;
386 
387 	/*
388 	 * Enlarge freeBlocks array if full.
389 	 */
390 	if (lts->nFreeBlocks >= lts->freeBlocksLen)
391 	{
392 		lts->freeBlocksLen *= 2;
393 		lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
394 											lts->freeBlocksLen * sizeof(long));
395 	}
396 
397 	/*
398 	 * Add blocknum to array, and mark the array unsorted if it's no longer in
399 	 * decreasing order.
400 	 */
401 	ndx = lts->nFreeBlocks++;
402 	lts->freeBlocks[ndx] = blocknum;
403 	if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
404 		lts->blocksSorted = false;
405 }
406 
407 /*
408  * Claim ownership of a set of logical tapes from existing shared BufFiles.
409  *
410  * Caller should be leader process.  Though tapes are marked as frozen in
411  * workers, they are not frozen when opened within leader, since unfrozen tapes
412  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
413  * for random access.)
414  */
415 static void
ltsConcatWorkerTapes(LogicalTapeSet * lts,TapeShare * shared,SharedFileSet * fileset)416 ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
417 					 SharedFileSet *fileset)
418 {
419 	LogicalTape *lt = NULL;
420 	long		tapeblocks = 0L;
421 	long		nphysicalblocks = 0L;
422 	int			i;
423 
424 	/* Should have at least one worker tape, plus leader's tape */
425 	Assert(lts->nTapes >= 2);
426 
427 	/*
428 	 * Build concatenated view of all BufFiles, remembering the block number
429 	 * where each source file begins.  No changes are needed for leader/last
430 	 * tape.
431 	 */
432 	for (i = 0; i < lts->nTapes - 1; i++)
433 	{
434 		char		filename[MAXPGPATH];
435 		BufFile    *file;
436 		int64		filesize;
437 
438 		lt = &lts->tapes[i];
439 
440 		pg_itoa(i, filename);
441 		file = BufFileOpenShared(fileset, filename);
442 		filesize = BufFileSize(file);
443 
444 		/*
445 		 * Stash first BufFile, and concatenate subsequent BufFiles to that.
446 		 * Store block offset into each tape as we go.
447 		 */
448 		lt->firstBlockNumber = shared[i].firstblocknumber;
449 		if (i == 0)
450 		{
451 			lts->pfile = file;
452 			lt->offsetBlockNumber = 0L;
453 		}
454 		else
455 		{
456 			lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
457 		}
458 		/* Don't allocate more for read buffer than could possibly help */
459 		lt->max_size = Min(MaxAllocSize, filesize);
460 		tapeblocks = filesize / BLCKSZ;
461 		nphysicalblocks += tapeblocks;
462 	}
463 
464 	/*
465 	 * Set # of allocated blocks, as well as # blocks written.  Use extent of
466 	 * new BufFile space (from 0 to end of last worker's tape space) for this.
467 	 * Allocated/written blocks should include space used by holes left
468 	 * between concatenated BufFiles.
469 	 */
470 	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
471 	lts->nBlocksWritten = lts->nBlocksAllocated;
472 
473 	/*
474 	 * Compute number of hole blocks so that we can later work backwards, and
475 	 * instrument number of physical blocks.  We don't simply use physical
476 	 * blocks directly for instrumentation because this would break if we ever
477 	 * subsequently wrote to the leader tape.
478 	 *
479 	 * Working backwards like this keeps our options open.  If shared BufFiles
480 	 * ever support being written to post-export, logtape.c can automatically
481 	 * take advantage of that.  We'd then support writing to the leader tape
482 	 * while recycling space from worker tapes, because the leader tape has a
483 	 * zero offset (write routines won't need to have extra logic to apply an
484 	 * offset).
485 	 *
486 	 * The only thing that currently prevents writing to the leader tape from
487 	 * working is the fact that BufFiles opened using BufFileOpenShared() are
488 	 * read-only by definition, but that could be changed if it seemed
489 	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
490 	 * descriptor" error, so tuplesort must avoid writing to the leader tape
491 	 * altogether.
492 	 */
493 	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
494 }
495 
496 /*
497  * Create a set of logical tapes in a temporary underlying file.
498  *
499  * Each tape is initialized in write state.  Serial callers pass ntapes,
500  * NULL argument for shared, and -1 for worker.  Parallel worker callers
501  * pass ntapes, a shared file handle, NULL shared argument,  and their own
502  * worker number.  Leader callers, which claim shared worker tapes here,
503  * must supply non-sentinel values for all arguments except worker number,
504  * which should be -1.
505  *
506  * Leader caller is passing back an array of metadata each worker captured
507  * when LogicalTapeFreeze() was called for their final result tapes.  Passed
508  * tapes array is actually sized ntapes - 1, because it includes only
509  * worker tapes, whereas leader requires its own leader tape.  Note that we
510  * rely on the assumption that reclaimed worker tapes will only be read
511  * from once by leader, and never written to again (tapes are initialized
512  * for writing, but that's only to be consistent).  Leader may not write to
513  * its own tape purely due to a restriction in the shared buffile
514  * infrastructure that may be lifted in the future.
515  */
516 LogicalTapeSet *
LogicalTapeSetCreate(int ntapes,TapeShare * shared,SharedFileSet * fileset,int worker)517 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
518 					 int worker)
519 {
520 	LogicalTapeSet *lts;
521 	LogicalTape *lt;
522 	int			i;
523 
524 	/*
525 	 * Create top-level struct including per-tape LogicalTape structs.
526 	 */
527 	Assert(ntapes > 0);
528 	lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
529 									ntapes * sizeof(LogicalTape));
530 	lts->nBlocksAllocated = 0L;
531 	lts->nBlocksWritten = 0L;
532 	lts->nHoleBlocks = 0L;
533 	lts->forgetFreeSpace = false;
534 	lts->blocksSorted = true;	/* a zero-length array is sorted ... */
535 	lts->freeBlocksLen = 32;	/* reasonable initial guess */
536 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
537 	lts->nFreeBlocks = 0;
538 	lts->nTapes = ntapes;
539 
540 	/*
541 	 * Initialize per-tape structs.  Note we allocate the I/O buffer and the
542 	 * first block for a tape only when it is first actually written to.  This
543 	 * avoids wasting memory space when tuplesort.c overestimates the number
544 	 * of tapes needed.
545 	 */
546 	for (i = 0; i < ntapes; i++)
547 	{
548 		lt = &lts->tapes[i];
549 		lt->writing = true;
550 		lt->frozen = false;
551 		lt->dirty = false;
552 		lt->firstBlockNumber = -1L;
553 		lt->curBlockNumber = -1L;
554 		lt->nextBlockNumber = -1L;
555 		lt->offsetBlockNumber = 0L;
556 		lt->buffer = NULL;
557 		lt->buffer_size = 0;
558 		/* palloc() larger than MaxAllocSize would fail */
559 		lt->max_size = MaxAllocSize;
560 		lt->pos = 0;
561 		lt->nbytes = 0;
562 	}
563 
564 	/*
565 	 * Create temp BufFile storage as required.
566 	 *
567 	 * Leader concatenates worker tapes, which requires special adjustment to
568 	 * final tapeset data.  Things are simpler for the worker case and the
569 	 * serial case, though.  They are generally very similar -- workers use a
570 	 * shared fileset, whereas serial sorts use a conventional serial BufFile.
571 	 */
572 	if (shared)
573 		ltsConcatWorkerTapes(lts, shared, fileset);
574 	else if (fileset)
575 	{
576 		char		filename[MAXPGPATH];
577 
578 		pg_itoa(worker, filename);
579 		lts->pfile = BufFileCreateShared(fileset, filename);
580 	}
581 	else
582 		lts->pfile = BufFileCreateTemp(false);
583 
584 	return lts;
585 }
586 
587 /*
588  * Close a logical tape set and release all resources.
589  */
590 void
LogicalTapeSetClose(LogicalTapeSet * lts)591 LogicalTapeSetClose(LogicalTapeSet *lts)
592 {
593 	LogicalTape *lt;
594 	int			i;
595 
596 	BufFileClose(lts->pfile);
597 	for (i = 0; i < lts->nTapes; i++)
598 	{
599 		lt = &lts->tapes[i];
600 		if (lt->buffer)
601 			pfree(lt->buffer);
602 	}
603 	pfree(lts->freeBlocks);
604 	pfree(lts);
605 }
606 
607 /*
608  * Mark a logical tape set as not needing management of free space anymore.
609  *
610  * This should be called if the caller does not intend to write any more data
611  * into the tape set, but is reading from un-frozen tapes.  Since no more
612  * writes are planned, remembering free blocks is no longer useful.  Setting
613  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
614  * is not designed to handle large numbers of free blocks.
615  */
616 void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet * lts)617 LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
618 {
619 	lts->forgetFreeSpace = true;
620 }
621 
622 /*
623  * Write to a logical tape.
624  *
625  * There are no error returns; we ereport() on failure.
626  */
627 void
LogicalTapeWrite(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)628 LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
629 				 void *ptr, size_t size)
630 {
631 	LogicalTape *lt;
632 	size_t		nthistime;
633 
634 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
635 	lt = &lts->tapes[tapenum];
636 	Assert(lt->writing);
637 	Assert(lt->offsetBlockNumber == 0L);
638 
639 	/* Allocate data buffer and first block on first write */
640 	if (lt->buffer == NULL)
641 	{
642 		lt->buffer = (char *) palloc(BLCKSZ);
643 		lt->buffer_size = BLCKSZ;
644 	}
645 	if (lt->curBlockNumber == -1)
646 	{
647 		Assert(lt->firstBlockNumber == -1);
648 		Assert(lt->pos == 0);
649 
650 		lt->curBlockNumber = ltsGetFreeBlock(lts);
651 		lt->firstBlockNumber = lt->curBlockNumber;
652 
653 		TapeBlockGetTrailer(lt->buffer)->prev = -1L;
654 	}
655 
656 	Assert(lt->buffer_size == BLCKSZ);
657 	while (size > 0)
658 	{
659 		if (lt->pos >= TapeBlockPayloadSize)
660 		{
661 			/* Buffer full, dump it out */
662 			long		nextBlockNumber;
663 
664 			if (!lt->dirty)
665 			{
666 				/* Hmm, went directly from reading to writing? */
667 				elog(ERROR, "invalid logtape state: should be dirty");
668 			}
669 
670 			/*
671 			 * First allocate the next block, so that we can store it in the
672 			 * 'next' pointer of this block.
673 			 */
674 			nextBlockNumber = ltsGetFreeBlock(lts);
675 
676 			/* set the next-pointer and dump the current block. */
677 			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
678 			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
679 
680 			/* initialize the prev-pointer of the next block */
681 			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
682 			lt->curBlockNumber = nextBlockNumber;
683 			lt->pos = 0;
684 			lt->nbytes = 0;
685 		}
686 
687 		nthistime = TapeBlockPayloadSize - lt->pos;
688 		if (nthistime > size)
689 			nthistime = size;
690 		Assert(nthistime > 0);
691 
692 		memcpy(lt->buffer + lt->pos, ptr, nthistime);
693 
694 		lt->dirty = true;
695 		lt->pos += nthistime;
696 		if (lt->nbytes < lt->pos)
697 			lt->nbytes = lt->pos;
698 		ptr = (void *) ((char *) ptr + nthistime);
699 		size -= nthistime;
700 	}
701 }
702 
703 /*
704  * Rewind logical tape and switch from writing to reading.
705  *
706  * The tape must currently be in writing state, or "frozen" in read state.
707  *
708  * 'buffer_size' specifies how much memory to use for the read buffer.
709  * Regardless of the argument, the actual amount of memory used is between
710  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ.  The given value is
711  * rounded down and truncated to fit those constraints, if necessary.  If the
712  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
713  * byte buffer is used.
714  */
715 void
LogicalTapeRewindForRead(LogicalTapeSet * lts,int tapenum,size_t buffer_size)716 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
717 {
718 	LogicalTape *lt;
719 
720 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
721 	lt = &lts->tapes[tapenum];
722 
723 	/*
724 	 * Round and cap buffer_size if needed.
725 	 */
726 	if (lt->frozen)
727 		buffer_size = BLCKSZ;
728 	else
729 	{
730 		/* need at least one block */
731 		if (buffer_size < BLCKSZ)
732 			buffer_size = BLCKSZ;
733 
734 		/* palloc() larger than max_size is unlikely to be helpful */
735 		if (buffer_size > lt->max_size)
736 			buffer_size = lt->max_size;
737 
738 		/* round down to BLCKSZ boundary */
739 		buffer_size -= buffer_size % BLCKSZ;
740 	}
741 
742 	if (lt->writing)
743 	{
744 		/*
745 		 * Completion of a write phase.  Flush last partial data block, and
746 		 * rewind for normal (destructive) read.
747 		 */
748 		if (lt->dirty)
749 		{
750 			/*
751 			 * As long as we've filled the buffer at least once, its contents
752 			 * are entirely defined from valgrind's point of view, even though
753 			 * contents beyond the current end point may be stale.  But it's
754 			 * possible - at least in the case of a parallel sort - to sort
755 			 * such small amount of data that we do not fill the buffer even
756 			 * once.  Tell valgrind that its contents are defined, so it
757 			 * doesn't bleat.
758 			 */
759 			VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
760 									  lt->buffer_size - lt->nbytes);
761 
762 			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
763 			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
764 		}
765 		lt->writing = false;
766 	}
767 	else
768 	{
769 		/*
770 		 * This is only OK if tape is frozen; we rewind for (another) read
771 		 * pass.
772 		 */
773 		Assert(lt->frozen);
774 	}
775 
776 	/* Allocate a read buffer (unless the tape is empty) */
777 	if (lt->buffer)
778 		pfree(lt->buffer);
779 	lt->buffer = NULL;
780 	lt->buffer_size = 0;
781 	if (lt->firstBlockNumber != -1L)
782 	{
783 		lt->buffer = palloc(buffer_size);
784 		lt->buffer_size = buffer_size;
785 	}
786 
787 	/* Read the first block, or reset if tape is empty */
788 	lt->nextBlockNumber = lt->firstBlockNumber;
789 	lt->pos = 0;
790 	lt->nbytes = 0;
791 	ltsReadFillBuffer(lts, lt);
792 }
793 
794 /*
795  * Rewind logical tape and switch from reading to writing.
796  *
797  * NOTE: we assume the caller has read the tape to the end; otherwise
798  * untouched data will not have been freed. We could add more code to free
799  * any unread blocks, but in current usage of this module it'd be useless
800  * code.
801  */
802 void
LogicalTapeRewindForWrite(LogicalTapeSet * lts,int tapenum)803 LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
804 {
805 	LogicalTape *lt;
806 
807 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
808 	lt = &lts->tapes[tapenum];
809 
810 	Assert(!lt->writing && !lt->frozen);
811 	lt->writing = true;
812 	lt->dirty = false;
813 	lt->firstBlockNumber = -1L;
814 	lt->curBlockNumber = -1L;
815 	lt->pos = 0;
816 	lt->nbytes = 0;
817 	if (lt->buffer)
818 		pfree(lt->buffer);
819 	lt->buffer = NULL;
820 	lt->buffer_size = 0;
821 }
822 
823 /*
824  * Read from a logical tape.
825  *
826  * Early EOF is indicated by return value less than #bytes requested.
827  */
828 size_t
LogicalTapeRead(LogicalTapeSet * lts,int tapenum,void * ptr,size_t size)829 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
830 				void *ptr, size_t size)
831 {
832 	LogicalTape *lt;
833 	size_t		nread = 0;
834 	size_t		nthistime;
835 
836 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
837 	lt = &lts->tapes[tapenum];
838 	Assert(!lt->writing);
839 
840 	while (size > 0)
841 	{
842 		if (lt->pos >= lt->nbytes)
843 		{
844 			/* Try to load more data into buffer. */
845 			if (!ltsReadFillBuffer(lts, lt))
846 				break;			/* EOF */
847 		}
848 
849 		nthistime = lt->nbytes - lt->pos;
850 		if (nthistime > size)
851 			nthistime = size;
852 		Assert(nthistime > 0);
853 
854 		memcpy(ptr, lt->buffer + lt->pos, nthistime);
855 
856 		lt->pos += nthistime;
857 		ptr = (void *) ((char *) ptr + nthistime);
858 		size -= nthistime;
859 		nread += nthistime;
860 	}
861 
862 	return nread;
863 }
864 
865 /*
866  * "Freeze" the contents of a tape so that it can be read multiple times
867  * and/or read backwards.  Once a tape is frozen, its contents will not
868  * be released until the LogicalTapeSet is destroyed.  This is expected
869  * to be used only for the final output pass of a merge.
870  *
871  * This *must* be called just at the end of a write pass, before the
872  * tape is rewound (after rewind is too late!).  It performs a rewind
873  * and switch to read mode "for free".  An immediately following rewind-
874  * for-read call is OK but not necessary.
875  *
876  * share output argument is set with details of storage used for tape after
877  * freezing, which may be passed to LogicalTapeSetCreate within leader
878  * process later.  This metadata is only of interest to worker callers
879  * freezing their final output for leader (single materialized tape).
880  * Serial sorts should set share to NULL.
881  */
882 void
LogicalTapeFreeze(LogicalTapeSet * lts,int tapenum,TapeShare * share)883 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
884 {
885 	LogicalTape *lt;
886 
887 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
888 	lt = &lts->tapes[tapenum];
889 	Assert(lt->writing);
890 	Assert(lt->offsetBlockNumber == 0L);
891 
892 	/*
893 	 * Completion of a write phase.  Flush last partial data block, and rewind
894 	 * for nondestructive read.
895 	 */
896 	if (lt->dirty)
897 	{
898 		/*
899 		 * As long as we've filled the buffer at least once, its contents are
900 		 * entirely defined from valgrind's point of view, even though
901 		 * contents beyond the current end point may be stale.  But it's
902 		 * possible - at least in the case of a parallel sort - to sort such
903 		 * small amount of data that we do not fill the buffer even once. Tell
904 		 * valgrind that its contents are defined, so it doesn't bleat.
905 		 */
906 		VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
907 								  lt->buffer_size - lt->nbytes);
908 
909 		TapeBlockSetNBytes(lt->buffer, lt->nbytes);
910 		ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
911 		lt->writing = false;
912 	}
913 	lt->writing = false;
914 	lt->frozen = true;
915 
916 	/*
917 	 * The seek and backspace functions assume a single block read buffer.
918 	 * That's OK with current usage.  A larger buffer is helpful to make the
919 	 * read pattern of the backing file look more sequential to the OS, when
920 	 * we're reading from multiple tapes.  But at the end of a sort, when a
921 	 * tape is frozen, we only read from a single tape anyway.
922 	 */
923 	if (!lt->buffer || lt->buffer_size != BLCKSZ)
924 	{
925 		if (lt->buffer)
926 			pfree(lt->buffer);
927 		lt->buffer = palloc(BLCKSZ);
928 		lt->buffer_size = BLCKSZ;
929 	}
930 
931 	/* Read the first block, or reset if tape is empty */
932 	lt->curBlockNumber = lt->firstBlockNumber;
933 	lt->pos = 0;
934 	lt->nbytes = 0;
935 
936 	if (lt->firstBlockNumber == -1L)
937 		lt->nextBlockNumber = -1L;
938 	ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
939 	if (TapeBlockIsLast(lt->buffer))
940 		lt->nextBlockNumber = -1L;
941 	else
942 		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
943 	lt->nbytes = TapeBlockGetNBytes(lt->buffer);
944 
945 	/* Handle extra steps when caller is to share its tapeset */
946 	if (share)
947 	{
948 		BufFileExportShared(lts->pfile);
949 		share->firstblocknumber = lt->firstBlockNumber;
950 	}
951 }
952 
/*
 * Backspace the tape a given number of bytes.  (We also support a more
 * general seek interface, see below.)
 *
 * *Only* a frozen-for-read tape can be backed up; we don't support
 * random access during write, and an unfrozen read tape may have
 * already discarded the desired data!
 *
 * Returns the number of bytes backed up.  It can be less than the
 * requested amount, if there isn't that much data before the current
 * position.  The tape is positioned to the beginning of the tape in
 * that case.
 *
 * Raises ERROR (via elog) if the block chain is inconsistent.  That is the
 * case when a block without a predecessor is not the tape's first block,
 * or when a predecessor's "next" link does not point back at the block we
 * came from.
 */
size_t
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
{
	LogicalTape *lt;
	size_t		seekpos = 0;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->frozen);
	/* The arithmetic below treats 'pos' as an offset within a single block. */
	Assert(lt->buffer_size == BLCKSZ);

	/*
	 * Easy case for seek within current block.
	 */
	if (size <= (size_t) lt->pos)
	{
		lt->pos -= (int) size;
		return size;
	}

	/*
	 * Not-so-easy case, have to walk back the chain of blocks.  This
	 * implementation would be pretty inefficient for long seeks, but we
	 * really aren't doing that (a seek over one tuple is typical).
	 *
	 * Throughout the loop, 'seekpos' is the distance in bytes from the start
	 * of the block currently in the buffer to the original read position.
	 */
	seekpos = (size_t) lt->pos; /* part within this block */
	while (size > seekpos)
	{
		long		prev = TapeBlockGetTrailer(lt->buffer)->prev;

		if (prev == -1L)
		{
			/*
			 * Tried to back up beyond the beginning of tape.  Only the first
			 * block may legitimately lack a predecessor.
			 */
			if (lt->curBlockNumber != lt->firstBlockNumber)
				elog(ERROR, "unexpected end of tape");
			lt->pos = 0;
			/* Everything before the original position was skipped. */
			return seekpos;
		}

		ltsReadBlock(lts, prev, (void *) lt->buffer);

		/* The predecessor must link forward to the block we just left. */
		if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
			elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
				 prev,
				 TapeBlockGetTrailer(lt->buffer)->next,
				 lt->curBlockNumber);

		/* A block that has a successor is treated as completely full. */
		lt->nbytes = TapeBlockPayloadSize;
		lt->curBlockNumber = prev;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;

		seekpos += TapeBlockPayloadSize;
	}

	/*
	 * 'seekpos' can now be greater than 'size', because it points to the
	 * beginning of the target block.  The difference is the position within
	 * the block.
	 */
	lt->pos = seekpos - size;
	return size;
}
1028 
1029 /*
1030  * Seek to an arbitrary position in a logical tape.
1031  *
1032  * *Only* a frozen-for-read tape can be seeked.
1033  *
1034  * Must be called with a block/offset previously returned by
1035  * LogicalTapeTell().
1036  */
1037 void
LogicalTapeSeek(LogicalTapeSet * lts,int tapenum,long blocknum,int offset)1038 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
1039 				long blocknum, int offset)
1040 {
1041 	LogicalTape *lt;
1042 
1043 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
1044 	lt = &lts->tapes[tapenum];
1045 	Assert(lt->frozen);
1046 	Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1047 	Assert(lt->buffer_size == BLCKSZ);
1048 
1049 	if (blocknum != lt->curBlockNumber)
1050 	{
1051 		ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1052 		lt->curBlockNumber = blocknum;
1053 		lt->nbytes = TapeBlockPayloadSize;
1054 		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1055 	}
1056 
1057 	if (offset > lt->nbytes)
1058 		elog(ERROR, "invalid tape seek position");
1059 	lt->pos = offset;
1060 }
1061 
1062 /*
1063  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1064  *
1065  * NOTE: it'd be OK to do this during write phase with intention of using
1066  * the position for a seek after freezing.  Not clear if anyone needs that.
1067  */
1068 void
LogicalTapeTell(LogicalTapeSet * lts,int tapenum,long * blocknum,int * offset)1069 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
1070 				long *blocknum, int *offset)
1071 {
1072 	LogicalTape *lt;
1073 
1074 	Assert(tapenum >= 0 && tapenum < lts->nTapes);
1075 	lt = &lts->tapes[tapenum];
1076 	Assert(lt->offsetBlockNumber == 0L);
1077 
1078 	/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1079 	Assert(lt->buffer_size == BLCKSZ);
1080 
1081 	*blocknum = lt->curBlockNumber;
1082 	*offset = lt->pos;
1083 }
1084 
1085 /*
1086  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1087  */
1088 long
LogicalTapeSetBlocks(LogicalTapeSet * lts)1089 LogicalTapeSetBlocks(LogicalTapeSet *lts)
1090 {
1091 	return lts->nBlocksAllocated - lts->nHoleBlocks;
1092 }
1093