1 /*-------------------------------------------------------------------------
2  *
3  * tuplestore.c
4  *	  Generalized routines for temporary tuple storage.
5  *
6  * This module handles temporary storage of tuples for purposes such
7  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
8  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9  * but can only store and regurgitate a sequence of tuples.  However,
10  * because no sort is required, it is allowed to start reading the sequence
11  * before it has all been written.  This is particularly useful for cursors,
12  * because it allows random access within the already-scanned portion of
13  * a query without having to process the underlying scan to completion.
14  * Also, it is possible to support multiple independent read pointers.
15  *
16  * A temporary file is used to handle the data if it exceeds the
17  * space limit specified by the caller.
18  *
19  * The (approximate) amount of memory allowed to the tuplestore is specified
20  * in kilobytes by the caller.  We absorb tuples and simply store them in an
21  * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
22  * maxKBytes, we dump all the tuples into a temp file and then read from that
23  * when needed.
24  *
25  * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26  * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27  * Mark/restore behavior is supported by copying read pointers.
28  *
29  * When the caller requests backward-scan capability, we write the temp file
30  * in a format that allows either forward or backward scan.  Otherwise, only
31  * forward scan is allowed.  A request for backward scan must be made before
32  * putting any tuples into the tuplestore.  Rewind is normally allowed but
33  * can be turned off via tuplestore_set_eflags; turning off rewind for all
34  * read pointers enables truncation of the tuplestore at the oldest read point
35  * for minimal memory usage.  (The caller must explicitly call tuplestore_trim
36  * at appropriate times for truncation to actually happen.)
37  *
38  * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39  * current write position, and the write-position variables in the tuplestore
40  * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
41  * seek position is the active read pointer's position, and that read pointer
42  * isn't kept up to date.  We update the appropriate variables using ftell()
43  * before switching to the other state or activating a different read pointer.
44  *
45  *
46  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
47  * Portions Copyright (c) 1994, Regents of the University of California
48  *
49  * IDENTIFICATION
50  *	  src/backend/utils/sort/tuplestore.c
51  *
52  *-------------------------------------------------------------------------
53  */
54 
55 #include "postgres.h"
56 
57 #include <limits.h>
58 
59 #include "access/htup_details.h"
60 #include "commands/tablespace.h"
61 #include "executor/executor.h"
62 #include "miscadmin.h"
63 #include "storage/buffile.h"
64 #include "utils/memutils.h"
65 #include "utils/resowner.h"
66 
67 
68 /*
69  * Possible states of a Tuplestore object.  These denote the states that
70  * persist between calls of Tuplestore routines.
71  */
72 typedef enum
73 {
74 	TSS_INMEM,					/* Tuples still fit in memory */
75 	TSS_WRITEFILE,				/* Writing to temp file */
76 	TSS_READFILE				/* Reading from temp file */
77 } TupStoreStatus;
78 
79 /*
80  * State for a single read pointer.  If we are in state INMEM then all the
81  * read pointers' "current" fields denote the read positions.  In state
82  * WRITEFILE, the file/offset fields denote the read positions.  In state
83  * READFILE, inactive read pointers have valid file/offset, but the active
84  * read pointer implicitly has position equal to the temp file's seek position.
85  *
86  * Special case: if eof_reached is true, then the pointer's read position is
87  * implicitly equal to the write position, and current/file/offset aren't
88  * maintained.  This way we need not update all the read pointers each time
89  * we write.
90  */
91 typedef struct
92 {
93 	int			eflags;			/* capability flags */
94 	bool		eof_reached;	/* read has reached EOF */
95 	int			current;		/* next array index to read */
96 	int			file;			/* temp file# */
97 	off_t		offset;			/* byte offset in file */
98 } TSReadPointer;
99 
100 /*
101  * Private state of a Tuplestore operation.
102  */
103 struct Tuplestorestate
104 {
105 	TupStoreStatus status;		/* enumerated value as shown above */
106 	int			eflags;			/* capability flags (OR of pointers' flags) */
107 	bool		backward;		/* store extra length words in file? */
108 	bool		interXact;		/* keep open through transactions? */
109 	bool		truncated;		/* tuplestore_trim has removed tuples? */
110 	int64		availMem;		/* remaining memory available, in bytes */
111 	int64		allowedMem;		/* total memory allowed, in bytes */
112 	int64		tuples;			/* number of tuples added */
113 	BufFile    *myfile;			/* underlying file, or NULL if none */
114 	MemoryContext context;		/* memory context for holding tuples */
115 	ResourceOwner resowner;		/* resowner for holding temp files */
116 
117 	/*
118 	 * These function pointers decouple the routines that must know what kind
119 	 * of tuple we are handling from the routines that don't need to know it.
120 	 * They are set up by the tuplestore_begin_xxx routines.
121 	 *
122 	 * (Although tuplestore.c currently only supports heap tuples, I've copied
123 	 * this part of tuplesort.c so that extension to other kinds of objects
124 	 * will be easy if it's ever needed.)
125 	 *
126 	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
127 	 * assume that a single pfree() is enough to release the tuple later, so
128 	 * the representation must be "flat" in one palloc chunk.) state->availMem
129 	 * must be decreased by the amount of space used.
130 	 */
131 	void	   *(*copytup) (Tuplestorestate *state, void *tup);
132 
133 	/*
134 	 * Function to write a stored tuple onto tape.  The representation of the
135 	 * tuple on tape need not be the same as it is in memory; requirements on
136 	 * the tape representation are given below.  After writing the tuple,
137 	 * pfree() it, and increase state->availMem by the amount of memory space
138 	 * thereby released.
139 	 */
140 	void		(*writetup) (Tuplestorestate *state, void *tup);
141 
142 	/*
143 	 * Function to read a stored tuple from tape back into memory. 'len' is
144 	 * the already-read length of the stored tuple.  Create and return a
145 	 * palloc'd copy, and decrease state->availMem by the amount of memory
146 	 * space consumed.
147 	 */
148 	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);
149 
150 	/*
151 	 * This array holds pointers to tuples in memory if we are in state INMEM.
152 	 * In states WRITEFILE and READFILE it's not used.
153 	 *
154 	 * When memtupdeleted > 0, the first memtupdeleted pointers are already
155 	 * released due to a tuplestore_trim() operation, but we haven't expended
156 	 * the effort to slide the remaining pointers down.  These unused pointers
157 	 * are set to NULL to catch any invalid accesses.  Note that memtupcount
158 	 * includes the deleted pointers.
159 	 */
160 	void	  **memtuples;		/* array of pointers to palloc'd tuples */
161 	int			memtupdeleted;	/* the first N slots are currently unused */
162 	int			memtupcount;	/* number of tuples currently present */
163 	int			memtupsize;		/* allocated length of memtuples array */
164 	bool		growmemtuples;	/* memtuples' growth still underway? */
165 
166 	/*
167 	 * These variables are used to keep track of the current positions.
168 	 *
169 	 * In state WRITEFILE, the current file seek position is the write point;
170 	 * in state READFILE, the write position is remembered in writepos_xxx.
171 	 * (The write position is the same as EOF, but since BufFileSeek doesn't
172 	 * currently implement SEEK_END, we have to remember it explicitly.)
173 	 */
174 	TSReadPointer *readptrs;	/* array of read pointers */
175 	int			activeptr;		/* index of the active read pointer */
176 	int			readptrcount;	/* number of pointers currently valid */
177 	int			readptrsize;	/* allocated length of readptrs array */
178 
179 	int			writepos_file;	/* file# (valid if READFILE state) */
180 	off_t		writepos_offset;	/* offset (valid if READFILE state) */
181 };
182 
183 #define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
184 #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
185 #define READTUP(state,len)	((*(state)->readtup) (state, len))
186 #define LACKMEM(state)		((state)->availMem < 0)
187 #define USEMEM(state,amt)	((state)->availMem -= (amt))
188 #define FREEMEM(state,amt)	((state)->availMem += (amt))
189 
190 /*--------------------
191  *
192  * NOTES about on-tape representation of tuples:
193  *
194  * We require the first "unsigned int" of a stored tuple to be the total size
195  * on-tape of the tuple, including itself (so it is never zero).
196  * The remainder of the stored tuple
197  * may or may not match the in-memory representation of the tuple ---
198  * any conversion needed is the job of the writetup and readtup routines.
199  *
200  * If state->backward is true, then the stored representation of
201  * the tuple must be followed by another "unsigned int" that is a copy of the
202  * length --- so the total tape space used is actually sizeof(unsigned int)
203  * more than the stored length value.  This allows read-backwards.  When
204  * state->backward is not set, the write/read routines may omit the extra
205  * length word.
206  *
207  * writetup is expected to write both length words as well as the tuple
208  * data.  When readtup is called, the tape is positioned just after the
209  * front length word; readtup must read the tuple data and advance past
210  * the back length word (if present).
211  *
212  * The write/read routines can make use of the tuple description data
213  * stored in the Tuplestorestate record, if needed. They are also expected
214  * to adjust state->availMem by the amount of memory space (not tape space!)
215  * released or consumed.  There is no error return from either writetup
216  * or readtup; they should ereport() on failure.
217  *
218  *
219  * NOTES about memory consumption calculations:
220  *
221  * We count space allocated for tuples against the maxKBytes limit,
222  * plus the space used by the variable-size array memtuples.
223  * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
224  * We don't worry about the size of the read pointer array, either.
225  *
226  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
227  * rather than the originally-requested size.  This is important since
228  * palloc can add substantial overhead.  It's not a complete answer since
229  * we won't count any wasted space in palloc allocation blocks, but it's
230  * a lot better than what we were doing before 7.3.
231  *
232  *--------------------
233  */
234 
235 
236 static Tuplestorestate *tuplestore_begin_common(int eflags,
237 												bool interXact,
238 												int maxKBytes);
239 static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
240 static void dumptuples(Tuplestorestate *state);
241 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
242 static void *copytup_heap(Tuplestorestate *state, void *tup);
243 static void writetup_heap(Tuplestorestate *state, void *tup);
244 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
245 
246 
247 /*
248  *		tuplestore_begin_xxx
249  *
250  * Initialize for a tuple store operation.
251  */
252 static Tuplestorestate *
tuplestore_begin_common(int eflags,bool interXact,int maxKBytes)253 tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
254 {
255 	Tuplestorestate *state;
256 
257 	state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
258 
259 	state->status = TSS_INMEM;
260 	state->eflags = eflags;
261 	state->interXact = interXact;
262 	state->truncated = false;
263 	state->allowedMem = maxKBytes * 1024L;
264 	state->availMem = state->allowedMem;
265 	state->myfile = NULL;
266 	state->context = CurrentMemoryContext;
267 	state->resowner = CurrentResourceOwner;
268 
269 	state->memtupdeleted = 0;
270 	state->memtupcount = 0;
271 	state->tuples = 0;
272 
273 	/*
274 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
275 	 * see comments in grow_memtuples().
276 	 */
277 	state->memtupsize = Max(16384 / sizeof(void *),
278 							ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
279 
280 	state->growmemtuples = true;
281 	state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
282 
283 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
284 
285 	state->activeptr = 0;
286 	state->readptrcount = 1;
287 	state->readptrsize = 8;		/* arbitrary */
288 	state->readptrs = (TSReadPointer *)
289 		palloc(state->readptrsize * sizeof(TSReadPointer));
290 
291 	state->readptrs[0].eflags = eflags;
292 	state->readptrs[0].eof_reached = false;
293 	state->readptrs[0].current = 0;
294 
295 	return state;
296 }
297 
298 /*
299  * tuplestore_begin_heap
300  *
301  * Create a new tuplestore; other types of tuple stores (other than
302  * "heap" tuple stores, for heap tuples) are possible, but not presently
303  * implemented.
304  *
305  * randomAccess: if true, both forward and backward accesses to the
306  * tuple store are allowed.
307  *
308  * interXact: if true, the files used for on-disk storage persist beyond the
309  * end of the current transaction.  NOTE: It's the caller's responsibility to
310  * create such a tuplestore in a memory context and resource owner that will
311  * also survive transaction boundaries, and to ensure the tuplestore is closed
312  * when it's no longer wanted.
313  *
314  * maxKBytes: how much data to store in memory (any data beyond this
315  * amount is paged to disk).  When in doubt, use work_mem.
316  */
317 Tuplestorestate *
tuplestore_begin_heap(bool randomAccess,bool interXact,int maxKBytes)318 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
319 {
320 	Tuplestorestate *state;
321 	int			eflags;
322 
323 	/*
324 	 * This interpretation of the meaning of randomAccess is compatible with
325 	 * the pre-8.3 behavior of tuplestores.
326 	 */
327 	eflags = randomAccess ?
328 		(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
329 		(EXEC_FLAG_REWIND);
330 
331 	state = tuplestore_begin_common(eflags, interXact, maxKBytes);
332 
333 	state->copytup = copytup_heap;
334 	state->writetup = writetup_heap;
335 	state->readtup = readtup_heap;
336 
337 	return state;
338 }
339 
340 /*
341  * tuplestore_set_eflags
342  *
343  * Set the capability flags for read pointer 0 at a finer grain than is
344  * allowed by tuplestore_begin_xxx.  This must be called before inserting
345  * any data into the tuplestore.
346  *
347  * eflags is a bitmask following the meanings used for executor node
348  * startup flags (see executor.h).  tuplestore pays attention to these bits:
349  *		EXEC_FLAG_REWIND		need rewind to start
350  *		EXEC_FLAG_BACKWARD		need backward fetch
351  * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
352  * is set per "randomAccess" in the tuplestore_begin_xxx call.
353  *
354  * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
355  * but not further than the truncation point (the furthest-back read pointer
356  * position at the time of the last tuplestore_trim call).
357  */
358 void
tuplestore_set_eflags(Tuplestorestate * state,int eflags)359 tuplestore_set_eflags(Tuplestorestate *state, int eflags)
360 {
361 	int			i;
362 
363 	if (state->status != TSS_INMEM || state->memtupcount != 0)
364 		elog(ERROR, "too late to call tuplestore_set_eflags");
365 
366 	state->readptrs[0].eflags = eflags;
367 	for (i = 1; i < state->readptrcount; i++)
368 		eflags |= state->readptrs[i].eflags;
369 	state->eflags = eflags;
370 }
371 
372 /*
373  * tuplestore_alloc_read_pointer - allocate another read pointer.
374  *
375  * Returns the pointer's index.
376  *
377  * The new pointer initially copies the position of read pointer 0.
378  * It can have its own eflags, but if any data has been inserted into
379  * the tuplestore, these eflags must not represent an increase in
380  * requirements.
381  */
382 int
tuplestore_alloc_read_pointer(Tuplestorestate * state,int eflags)383 tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
384 {
385 	/* Check for possible increase of requirements */
386 	if (state->status != TSS_INMEM || state->memtupcount != 0)
387 	{
388 		if ((state->eflags | eflags) != state->eflags)
389 			elog(ERROR, "too late to require new tuplestore eflags");
390 	}
391 
392 	/* Make room for another read pointer if needed */
393 	if (state->readptrcount >= state->readptrsize)
394 	{
395 		int			newcnt = state->readptrsize * 2;
396 
397 		state->readptrs = (TSReadPointer *)
398 			repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
399 		state->readptrsize = newcnt;
400 	}
401 
402 	/* And set it up */
403 	state->readptrs[state->readptrcount] = state->readptrs[0];
404 	state->readptrs[state->readptrcount].eflags = eflags;
405 
406 	state->eflags |= eflags;
407 
408 	return state->readptrcount++;
409 }
410 
411 /*
412  * tuplestore_clear
413  *
414  *	Delete all the contents of a tuplestore, and reset its read pointers
415  *	to the start.
416  */
417 void
tuplestore_clear(Tuplestorestate * state)418 tuplestore_clear(Tuplestorestate *state)
419 {
420 	int			i;
421 	TSReadPointer *readptr;
422 
423 	if (state->myfile)
424 		BufFileClose(state->myfile);
425 	state->myfile = NULL;
426 	if (state->memtuples)
427 	{
428 		for (i = state->memtupdeleted; i < state->memtupcount; i++)
429 		{
430 			FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
431 			pfree(state->memtuples[i]);
432 		}
433 	}
434 	state->status = TSS_INMEM;
435 	state->truncated = false;
436 	state->memtupdeleted = 0;
437 	state->memtupcount = 0;
438 	state->tuples = 0;
439 	readptr = state->readptrs;
440 	for (i = 0; i < state->readptrcount; readptr++, i++)
441 	{
442 		readptr->eof_reached = false;
443 		readptr->current = 0;
444 	}
445 }
446 
447 /*
448  * tuplestore_end
449  *
450  *	Release resources and clean up.
451  */
452 void
tuplestore_end(Tuplestorestate * state)453 tuplestore_end(Tuplestorestate *state)
454 {
455 	int			i;
456 
457 	if (state->myfile)
458 		BufFileClose(state->myfile);
459 	if (state->memtuples)
460 	{
461 		for (i = state->memtupdeleted; i < state->memtupcount; i++)
462 			pfree(state->memtuples[i]);
463 		pfree(state->memtuples);
464 	}
465 	pfree(state->readptrs);
466 	pfree(state);
467 }
468 
469 /*
470  * tuplestore_select_read_pointer - make the specified read pointer active
471  */
472 void
tuplestore_select_read_pointer(Tuplestorestate * state,int ptr)473 tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
474 {
475 	TSReadPointer *readptr;
476 	TSReadPointer *oldptr;
477 
478 	Assert(ptr >= 0 && ptr < state->readptrcount);
479 
480 	/* No work if already active */
481 	if (ptr == state->activeptr)
482 		return;
483 
484 	readptr = &state->readptrs[ptr];
485 	oldptr = &state->readptrs[state->activeptr];
486 
487 	switch (state->status)
488 	{
489 		case TSS_INMEM:
490 		case TSS_WRITEFILE:
491 			/* no work */
492 			break;
493 		case TSS_READFILE:
494 
495 			/*
496 			 * First, save the current read position in the pointer about to
497 			 * become inactive.
498 			 */
499 			if (!oldptr->eof_reached)
500 				BufFileTell(state->myfile,
501 							&oldptr->file,
502 							&oldptr->offset);
503 
504 			/*
505 			 * We have to make the temp file's seek position equal to the
506 			 * logical position of the new read pointer.  In eof_reached
507 			 * state, that's the EOF, which we have available from the saved
508 			 * write position.
509 			 */
510 			if (readptr->eof_reached)
511 			{
512 				if (BufFileSeek(state->myfile,
513 								state->writepos_file,
514 								state->writepos_offset,
515 								SEEK_SET) != 0)
516 					ereport(ERROR,
517 							(errcode_for_file_access(),
518 							 errmsg("could not seek in tuplestore temporary file")));
519 			}
520 			else
521 			{
522 				if (BufFileSeek(state->myfile,
523 								readptr->file,
524 								readptr->offset,
525 								SEEK_SET) != 0)
526 					ereport(ERROR,
527 							(errcode_for_file_access(),
528 							 errmsg("could not seek in tuplestore temporary file")));
529 			}
530 			break;
531 		default:
532 			elog(ERROR, "invalid tuplestore state");
533 			break;
534 	}
535 
536 	state->activeptr = ptr;
537 }
538 
539 /*
540  * tuplestore_tuple_count
541  *
542  * Returns the number of tuples added since creation or the last
543  * tuplestore_clear().
544  */
545 int64
tuplestore_tuple_count(Tuplestorestate * state)546 tuplestore_tuple_count(Tuplestorestate *state)
547 {
548 	return state->tuples;
549 }
550 
551 /*
552  * tuplestore_ateof
553  *
554  * Returns the active read pointer's eof_reached state.
555  */
556 bool
tuplestore_ateof(Tuplestorestate * state)557 tuplestore_ateof(Tuplestorestate *state)
558 {
559 	return state->readptrs[state->activeptr].eof_reached;
560 }
561 
562 /*
563  * Grow the memtuples[] array, if possible within our memory constraint.  We
564  * must not exceed INT_MAX tuples in memory or the caller-provided memory
565  * limit.  Return true if we were able to enlarge the array, false if not.
566  *
567  * Normally, at each increment we double the size of the array.  When doing
568  * that would exceed a limit, we attempt one last, smaller increase (and then
569  * clear the growmemtuples flag so we don't try any more).  That allows us to
570  * use memory as fully as permitted; sticking to the pure doubling rule could
571  * result in almost half going unused.  Because availMem moves around with
572  * tuple addition/removal, we need some rule to prevent making repeated small
573  * increases in memtupsize, which would just be useless thrashing.  The
574  * growmemtuples flag accomplishes that and also prevents useless
575  * recalculations in this function.
576  */
577 static bool
grow_memtuples(Tuplestorestate * state)578 grow_memtuples(Tuplestorestate *state)
579 {
580 	int			newmemtupsize;
581 	int			memtupsize = state->memtupsize;
582 	int64		memNowUsed = state->allowedMem - state->availMem;
583 
584 	/* Forget it if we've already maxed out memtuples, per comment above */
585 	if (!state->growmemtuples)
586 		return false;
587 
588 	/* Select new value of memtupsize */
589 	if (memNowUsed <= state->availMem)
590 	{
591 		/*
592 		 * We've used no more than half of allowedMem; double our usage,
593 		 * clamping at INT_MAX tuples.
594 		 */
595 		if (memtupsize < INT_MAX / 2)
596 			newmemtupsize = memtupsize * 2;
597 		else
598 		{
599 			newmemtupsize = INT_MAX;
600 			state->growmemtuples = false;
601 		}
602 	}
603 	else
604 	{
605 		/*
606 		 * This will be the last increment of memtupsize.  Abandon doubling
607 		 * strategy and instead increase as much as we safely can.
608 		 *
609 		 * To stay within allowedMem, we can't increase memtupsize by more
610 		 * than availMem / sizeof(void *) elements. In practice, we want to
611 		 * increase it by considerably less, because we need to leave some
612 		 * space for the tuples to which the new array slots will refer.  We
613 		 * assume the new tuples will be about the same size as the tuples
614 		 * we've already seen, and thus we can extrapolate from the space
615 		 * consumption so far to estimate an appropriate new size for the
616 		 * memtuples array.  The optimal value might be higher or lower than
617 		 * this estimate, but it's hard to know that in advance.  We again
618 		 * clamp at INT_MAX tuples.
619 		 *
620 		 * This calculation is safe against enlarging the array so much that
621 		 * LACKMEM becomes true, because the memory currently used includes
622 		 * the present array; thus, there would be enough allowedMem for the
623 		 * new array elements even if no other memory were currently used.
624 		 *
625 		 * We do the arithmetic in float8, because otherwise the product of
626 		 * memtupsize and allowedMem could overflow.  Any inaccuracy in the
627 		 * result should be insignificant; but even if we computed a
628 		 * completely insane result, the checks below will prevent anything
629 		 * really bad from happening.
630 		 */
631 		double		grow_ratio;
632 
633 		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
634 		if (memtupsize * grow_ratio < INT_MAX)
635 			newmemtupsize = (int) (memtupsize * grow_ratio);
636 		else
637 			newmemtupsize = INT_MAX;
638 
639 		/* We won't make any further enlargement attempts */
640 		state->growmemtuples = false;
641 	}
642 
643 	/* Must enlarge array by at least one element, else report failure */
644 	if (newmemtupsize <= memtupsize)
645 		goto noalloc;
646 
647 	/*
648 	 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
649 	 * to ensure our request won't be rejected.  Note that we can easily
650 	 * exhaust address space before facing this outcome.  (This is presently
651 	 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
652 	 * don't rely on that at this distance.)
653 	 */
654 	if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
655 	{
656 		newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
657 		state->growmemtuples = false;	/* can't grow any more */
658 	}
659 
660 	/*
661 	 * We need to be sure that we do not cause LACKMEM to become true, else
662 	 * the space management algorithm will go nuts.  The code above should
663 	 * never generate a dangerous request, but to be safe, check explicitly
664 	 * that the array growth fits within availMem.  (We could still cause
665 	 * LACKMEM if the memory chunk overhead associated with the memtuples
666 	 * array were to increase.  That shouldn't happen because we chose the
667 	 * initial array size large enough to ensure that palloc will be treating
668 	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
669 	 * explicitly below just in case.)
670 	 */
671 	if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
672 		goto noalloc;
673 
674 	/* OK, do it */
675 	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
676 	state->memtupsize = newmemtupsize;
677 	state->memtuples = (void **)
678 		repalloc_huge(state->memtuples,
679 					  state->memtupsize * sizeof(void *));
680 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
681 	if (LACKMEM(state))
682 		elog(ERROR, "unexpected out-of-memory situation in tuplestore");
683 	return true;
684 
685 noalloc:
686 	/* If for any reason we didn't realloc, shut off future attempts */
687 	state->growmemtuples = false;
688 	return false;
689 }
690 
691 /*
692  * Accept one tuple and append it to the tuplestore.
693  *
694  * Note that the input tuple is always copied; the caller need not save it.
695  *
696  * If the active read pointer is currently "at EOF", it remains so (the read
697  * pointer implicitly advances along with the write pointer); otherwise the
698  * read pointer is unchanged.  Non-active read pointers do not move, which
699  * means they are certain to not be "at EOF" immediately after puttuple.
700  * This curious-seeming behavior is for the convenience of nodeMaterial.c and
701  * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
702  * steps.
703  *
704  * tuplestore_puttupleslot() is a convenience routine to collect data from
705  * a TupleTableSlot without an extra copy operation.
706  */
707 void
tuplestore_puttupleslot(Tuplestorestate * state,TupleTableSlot * slot)708 tuplestore_puttupleslot(Tuplestorestate *state,
709 						TupleTableSlot *slot)
710 {
711 	MinimalTuple tuple;
712 	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
713 
714 	/*
715 	 * Form a MinimalTuple in working memory
716 	 */
717 	tuple = ExecCopySlotMinimalTuple(slot);
718 	USEMEM(state, GetMemoryChunkSpace(tuple));
719 
720 	tuplestore_puttuple_common(state, (void *) tuple);
721 
722 	MemoryContextSwitchTo(oldcxt);
723 }
724 
725 /*
726  * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
727  * deprecated, but not worth getting rid of in view of the number of callers.
728  */
729 void
tuplestore_puttuple(Tuplestorestate * state,HeapTuple tuple)730 tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
731 {
732 	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
733 
734 	/*
735 	 * Copy the tuple.  (Must do this even in WRITEFILE case.  Note that
736 	 * COPYTUP includes USEMEM, so we needn't do that here.)
737 	 */
738 	tuple = COPYTUP(state, tuple);
739 
740 	tuplestore_puttuple_common(state, (void *) tuple);
741 
742 	MemoryContextSwitchTo(oldcxt);
743 }
744 
745 /*
746  * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
747  * This avoids an extra tuple-construction operation.
748  */
749 void
tuplestore_putvalues(Tuplestorestate * state,TupleDesc tdesc,Datum * values,bool * isnull)750 tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
751 					 Datum *values, bool *isnull)
752 {
753 	MinimalTuple tuple;
754 	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
755 
756 	tuple = heap_form_minimal_tuple(tdesc, values, isnull);
757 	USEMEM(state, GetMemoryChunkSpace(tuple));
758 
759 	tuplestore_puttuple_common(state, (void *) tuple);
760 
761 	MemoryContextSwitchTo(oldcxt);
762 }
763 
764 static void
tuplestore_puttuple_common(Tuplestorestate * state,void * tuple)765 tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
766 {
767 	TSReadPointer *readptr;
768 	int			i;
769 	ResourceOwner oldowner;
770 
771 	state->tuples++;
772 
773 	switch (state->status)
774 	{
775 		case TSS_INMEM:
776 
777 			/*
778 			 * Update read pointers as needed; see API spec above.
779 			 */
780 			readptr = state->readptrs;
781 			for (i = 0; i < state->readptrcount; readptr++, i++)
782 			{
783 				if (readptr->eof_reached && i != state->activeptr)
784 				{
785 					readptr->eof_reached = false;
786 					readptr->current = state->memtupcount;
787 				}
788 			}
789 
790 			/*
791 			 * Grow the array as needed.  Note that we try to grow the array
792 			 * when there is still one free slot remaining --- if we fail,
793 			 * there'll still be room to store the incoming tuple, and then
794 			 * we'll switch to tape-based operation.
795 			 */
796 			if (state->memtupcount >= state->memtupsize - 1)
797 			{
798 				(void) grow_memtuples(state);
799 				Assert(state->memtupcount < state->memtupsize);
800 			}
801 
802 			/* Stash the tuple in the in-memory array */
803 			state->memtuples[state->memtupcount++] = tuple;
804 
805 			/*
806 			 * Done if we still fit in available memory and have array slots.
807 			 */
808 			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
809 				return;
810 
811 			/*
812 			 * Nope; time to switch to tape-based operation.  Make sure that
813 			 * the temp file(s) are created in suitable temp tablespaces.
814 			 */
815 			PrepareTempTablespaces();
816 
817 			/* associate the file with the store's resource owner */
818 			oldowner = CurrentResourceOwner;
819 			CurrentResourceOwner = state->resowner;
820 
821 			state->myfile = BufFileCreateTemp(state->interXact);
822 
823 			CurrentResourceOwner = oldowner;
824 
825 			/*
826 			 * Freeze the decision about whether trailing length words will be
827 			 * used.  We can't change this choice once data is on tape, even
828 			 * though callers might drop the requirement.
829 			 */
830 			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
831 			state->status = TSS_WRITEFILE;
832 			dumptuples(state);
833 			break;
834 		case TSS_WRITEFILE:
835 
836 			/*
837 			 * Update read pointers as needed; see API spec above. Note:
838 			 * BufFileTell is quite cheap, so not worth trying to avoid
839 			 * multiple calls.
840 			 */
841 			readptr = state->readptrs;
842 			for (i = 0; i < state->readptrcount; readptr++, i++)
843 			{
844 				if (readptr->eof_reached && i != state->activeptr)
845 				{
846 					readptr->eof_reached = false;
847 					BufFileTell(state->myfile,
848 								&readptr->file,
849 								&readptr->offset);
850 				}
851 			}
852 
853 			WRITETUP(state, tuple);
854 			break;
855 		case TSS_READFILE:
856 
857 			/*
858 			 * Switch from reading to writing.
859 			 */
860 			if (!state->readptrs[state->activeptr].eof_reached)
861 				BufFileTell(state->myfile,
862 							&state->readptrs[state->activeptr].file,
863 							&state->readptrs[state->activeptr].offset);
864 			if (BufFileSeek(state->myfile,
865 							state->writepos_file, state->writepos_offset,
866 							SEEK_SET) != 0)
867 				ereport(ERROR,
868 						(errcode_for_file_access(),
869 						 errmsg("could not seek in tuplestore temporary file")));
870 			state->status = TSS_WRITEFILE;
871 
872 			/*
873 			 * Update read pointers as needed; see API spec above.
874 			 */
875 			readptr = state->readptrs;
876 			for (i = 0; i < state->readptrcount; readptr++, i++)
877 			{
878 				if (readptr->eof_reached && i != state->activeptr)
879 				{
880 					readptr->eof_reached = false;
881 					readptr->file = state->writepos_file;
882 					readptr->offset = state->writepos_offset;
883 				}
884 			}
885 
886 			WRITETUP(state, tuple);
887 			break;
888 		default:
889 			elog(ERROR, "invalid tuplestore state");
890 			break;
891 	}
892 }
893 
894 /*
895  * Fetch the next tuple in either forward or back direction.
896  * Returns NULL if no more tuples.  If should_free is set, the
897  * caller must pfree the returned tuple when done with it.
898  *
899  * Backward scan is only allowed if randomAccess was set true or
900  * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
901  */
902 static void *
tuplestore_gettuple(Tuplestorestate * state,bool forward,bool * should_free)903 tuplestore_gettuple(Tuplestorestate *state, bool forward,
904 					bool *should_free)
905 {
906 	TSReadPointer *readptr = &state->readptrs[state->activeptr];
907 	unsigned int tuplen;
908 	void	   *tup;
909 
910 	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
911 
912 	switch (state->status)
913 	{
914 		case TSS_INMEM:
915 			*should_free = false;
916 			if (forward)
917 			{
918 				if (readptr->eof_reached)
919 					return NULL;
920 				if (readptr->current < state->memtupcount)
921 				{
922 					/* We have another tuple, so return it */
923 					return state->memtuples[readptr->current++];
924 				}
925 				readptr->eof_reached = true;
926 				return NULL;
927 			}
928 			else
929 			{
930 				/*
931 				 * if all tuples are fetched already then we return last
932 				 * tuple, else tuple before last returned.
933 				 */
934 				if (readptr->eof_reached)
935 				{
936 					readptr->current = state->memtupcount;
937 					readptr->eof_reached = false;
938 				}
939 				else
940 				{
941 					if (readptr->current <= state->memtupdeleted)
942 					{
943 						Assert(!state->truncated);
944 						return NULL;
945 					}
946 					readptr->current--; /* last returned tuple */
947 				}
948 				if (readptr->current <= state->memtupdeleted)
949 				{
950 					Assert(!state->truncated);
951 					return NULL;
952 				}
953 				return state->memtuples[readptr->current - 1];
954 			}
955 			break;
956 
957 		case TSS_WRITEFILE:
958 			/* Skip state change if we'll just return NULL */
959 			if (readptr->eof_reached && forward)
960 				return NULL;
961 
962 			/*
963 			 * Switch from writing to reading.
964 			 */
965 			BufFileTell(state->myfile,
966 						&state->writepos_file, &state->writepos_offset);
967 			if (!readptr->eof_reached)
968 				if (BufFileSeek(state->myfile,
969 								readptr->file, readptr->offset,
970 								SEEK_SET) != 0)
971 					ereport(ERROR,
972 							(errcode_for_file_access(),
973 							 errmsg("could not seek in tuplestore temporary file")));
974 			state->status = TSS_READFILE;
975 			/* FALLTHROUGH */
976 
977 		case TSS_READFILE:
978 			*should_free = true;
979 			if (forward)
980 			{
981 				if ((tuplen = getlen(state, true)) != 0)
982 				{
983 					tup = READTUP(state, tuplen);
984 					return tup;
985 				}
986 				else
987 				{
988 					readptr->eof_reached = true;
989 					return NULL;
990 				}
991 			}
992 
993 			/*
994 			 * Backward.
995 			 *
996 			 * if all tuples are fetched already then we return last tuple,
997 			 * else tuple before last returned.
998 			 *
999 			 * Back up to fetch previously-returned tuple's ending length
1000 			 * word. If seek fails, assume we are at start of file.
1001 			 */
1002 			if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
1003 							SEEK_CUR) != 0)
1004 			{
1005 				/* even a failed backwards fetch gets you out of eof state */
1006 				readptr->eof_reached = false;
1007 				Assert(!state->truncated);
1008 				return NULL;
1009 			}
1010 			tuplen = getlen(state, false);
1011 
1012 			if (readptr->eof_reached)
1013 			{
1014 				readptr->eof_reached = false;
1015 				/* We will return the tuple returned before returning NULL */
1016 			}
1017 			else
1018 			{
1019 				/*
1020 				 * Back up to get ending length word of tuple before it.
1021 				 */
1022 				if (BufFileSeek(state->myfile, 0,
1023 								-(long) (tuplen + 2 * sizeof(unsigned int)),
1024 								SEEK_CUR) != 0)
1025 				{
1026 					/*
1027 					 * If that fails, presumably the prev tuple is the first
1028 					 * in the file.  Back up so that it becomes next to read
1029 					 * in forward direction (not obviously right, but that is
1030 					 * what in-memory case does).
1031 					 */
1032 					if (BufFileSeek(state->myfile, 0,
1033 									-(long) (tuplen + sizeof(unsigned int)),
1034 									SEEK_CUR) != 0)
1035 						ereport(ERROR,
1036 								(errcode_for_file_access(),
1037 								 errmsg("could not seek in tuplestore temporary file")));
1038 					Assert(!state->truncated);
1039 					return NULL;
1040 				}
1041 				tuplen = getlen(state, false);
1042 			}
1043 
1044 			/*
1045 			 * Now we have the length of the prior tuple, back up and read it.
1046 			 * Note: READTUP expects we are positioned after the initial
1047 			 * length word of the tuple, so back up to that point.
1048 			 */
1049 			if (BufFileSeek(state->myfile, 0,
1050 							-(long) tuplen,
1051 							SEEK_CUR) != 0)
1052 				ereport(ERROR,
1053 						(errcode_for_file_access(),
1054 						 errmsg("could not seek in tuplestore temporary file")));
1055 			tup = READTUP(state, tuplen);
1056 			return tup;
1057 
1058 		default:
1059 			elog(ERROR, "invalid tuplestore state");
1060 			return NULL;		/* keep compiler quiet */
1061 	}
1062 }
1063 
1064 /*
1065  * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1066  *
1067  * If successful, put tuple in slot and return true; else, clear the slot
1068  * and return false.
1069  *
1070  * If copy is true, the slot receives a copied tuple (allocated in current
1071  * memory context) that will stay valid regardless of future manipulations of
1072  * the tuplestore's state.  If copy is false, the slot may just receive a
1073  * pointer to a tuple held within the tuplestore.  The latter is more
1074  * efficient but the slot contents may be corrupted if additional writes to
1075  * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
1076  */
1077 bool
tuplestore_gettupleslot(Tuplestorestate * state,bool forward,bool copy,TupleTableSlot * slot)1078 tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1079 						bool copy, TupleTableSlot *slot)
1080 {
1081 	MinimalTuple tuple;
1082 	bool		should_free;
1083 
1084 	tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1085 
1086 	if (tuple)
1087 	{
1088 		if (copy && !should_free)
1089 		{
1090 			tuple = heap_copy_minimal_tuple(tuple);
1091 			should_free = true;
1092 		}
1093 		ExecStoreMinimalTuple(tuple, slot, should_free);
1094 		return true;
1095 	}
1096 	else
1097 	{
1098 		ExecClearTuple(slot);
1099 		return false;
1100 	}
1101 }
1102 
1103 /*
1104  * tuplestore_advance - exported function to adjust position without fetching
1105  *
1106  * We could optimize this case to avoid palloc/pfree overhead, but for the
1107  * moment it doesn't seem worthwhile.
1108  */
1109 bool
tuplestore_advance(Tuplestorestate * state,bool forward)1110 tuplestore_advance(Tuplestorestate *state, bool forward)
1111 {
1112 	void	   *tuple;
1113 	bool		should_free;
1114 
1115 	tuple = tuplestore_gettuple(state, forward, &should_free);
1116 
1117 	if (tuple)
1118 	{
1119 		if (should_free)
1120 			pfree(tuple);
1121 		return true;
1122 	}
1123 	else
1124 	{
1125 		return false;
1126 	}
1127 }
1128 
1129 /*
1130  * Advance over N tuples in either forward or back direction,
1131  * without returning any data.  N<=0 is a no-op.
1132  * Returns true if successful, false if ran out of tuples.
1133  */
1134 bool
tuplestore_skiptuples(Tuplestorestate * state,int64 ntuples,bool forward)1135 tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
1136 {
1137 	TSReadPointer *readptr = &state->readptrs[state->activeptr];
1138 
1139 	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
1140 
1141 	if (ntuples <= 0)
1142 		return true;
1143 
1144 	switch (state->status)
1145 	{
1146 		case TSS_INMEM:
1147 			if (forward)
1148 			{
1149 				if (readptr->eof_reached)
1150 					return false;
1151 				if (state->memtupcount - readptr->current >= ntuples)
1152 				{
1153 					readptr->current += ntuples;
1154 					return true;
1155 				}
1156 				readptr->current = state->memtupcount;
1157 				readptr->eof_reached = true;
1158 				return false;
1159 			}
1160 			else
1161 			{
1162 				if (readptr->eof_reached)
1163 				{
1164 					readptr->current = state->memtupcount;
1165 					readptr->eof_reached = false;
1166 					ntuples--;
1167 				}
1168 				if (readptr->current - state->memtupdeleted > ntuples)
1169 				{
1170 					readptr->current -= ntuples;
1171 					return true;
1172 				}
1173 				Assert(!state->truncated);
1174 				readptr->current = state->memtupdeleted;
1175 				return false;
1176 			}
1177 			break;
1178 
1179 		default:
1180 			/* We don't currently try hard to optimize other cases */
1181 			while (ntuples-- > 0)
1182 			{
1183 				void	   *tuple;
1184 				bool		should_free;
1185 
1186 				tuple = tuplestore_gettuple(state, forward, &should_free);
1187 
1188 				if (tuple == NULL)
1189 					return false;
1190 				if (should_free)
1191 					pfree(tuple);
1192 				CHECK_FOR_INTERRUPTS();
1193 			}
1194 			return true;
1195 	}
1196 }
1197 
1198 /*
1199  * dumptuples - remove tuples from memory and write to tape
1200  *
1201  * As a side effect, we must convert each read pointer's position from
1202  * "current" to file/offset format.  But eof_reached pointers don't
1203  * need to change state.
1204  */
1205 static void
dumptuples(Tuplestorestate * state)1206 dumptuples(Tuplestorestate *state)
1207 {
1208 	int			i;
1209 
1210 	for (i = state->memtupdeleted;; i++)
1211 	{
1212 		TSReadPointer *readptr = state->readptrs;
1213 		int			j;
1214 
1215 		for (j = 0; j < state->readptrcount; readptr++, j++)
1216 		{
1217 			if (i == readptr->current && !readptr->eof_reached)
1218 				BufFileTell(state->myfile,
1219 							&readptr->file, &readptr->offset);
1220 		}
1221 		if (i >= state->memtupcount)
1222 			break;
1223 		WRITETUP(state, state->memtuples[i]);
1224 	}
1225 	state->memtupdeleted = 0;
1226 	state->memtupcount = 0;
1227 }
1228 
1229 /*
1230  * tuplestore_rescan		- rewind the active read pointer to start
1231  */
1232 void
tuplestore_rescan(Tuplestorestate * state)1233 tuplestore_rescan(Tuplestorestate *state)
1234 {
1235 	TSReadPointer *readptr = &state->readptrs[state->activeptr];
1236 
1237 	Assert(readptr->eflags & EXEC_FLAG_REWIND);
1238 	Assert(!state->truncated);
1239 
1240 	switch (state->status)
1241 	{
1242 		case TSS_INMEM:
1243 			readptr->eof_reached = false;
1244 			readptr->current = 0;
1245 			break;
1246 		case TSS_WRITEFILE:
1247 			readptr->eof_reached = false;
1248 			readptr->file = 0;
1249 			readptr->offset = 0L;
1250 			break;
1251 		case TSS_READFILE:
1252 			readptr->eof_reached = false;
1253 			if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
1254 				ereport(ERROR,
1255 						(errcode_for_file_access(),
1256 						 errmsg("could not seek in tuplestore temporary file")));
1257 			break;
1258 		default:
1259 			elog(ERROR, "invalid tuplestore state");
1260 			break;
1261 	}
1262 }
1263 
1264 /*
1265  * tuplestore_copy_read_pointer - copy a read pointer's state to another
1266  */
1267 void
tuplestore_copy_read_pointer(Tuplestorestate * state,int srcptr,int destptr)1268 tuplestore_copy_read_pointer(Tuplestorestate *state,
1269 							 int srcptr, int destptr)
1270 {
1271 	TSReadPointer *sptr = &state->readptrs[srcptr];
1272 	TSReadPointer *dptr = &state->readptrs[destptr];
1273 
1274 	Assert(srcptr >= 0 && srcptr < state->readptrcount);
1275 	Assert(destptr >= 0 && destptr < state->readptrcount);
1276 
1277 	/* Assigning to self is a no-op */
1278 	if (srcptr == destptr)
1279 		return;
1280 
1281 	if (dptr->eflags != sptr->eflags)
1282 	{
1283 		/* Possible change of overall eflags, so copy and then recompute */
1284 		int			eflags;
1285 		int			i;
1286 
1287 		*dptr = *sptr;
1288 		eflags = state->readptrs[0].eflags;
1289 		for (i = 1; i < state->readptrcount; i++)
1290 			eflags |= state->readptrs[i].eflags;
1291 		state->eflags = eflags;
1292 	}
1293 	else
1294 		*dptr = *sptr;
1295 
1296 	switch (state->status)
1297 	{
1298 		case TSS_INMEM:
1299 		case TSS_WRITEFILE:
1300 			/* no work */
1301 			break;
1302 		case TSS_READFILE:
1303 
1304 			/*
1305 			 * This case is a bit tricky since the active read pointer's
1306 			 * position corresponds to the seek point, not what is in its
1307 			 * variables.  Assigning to the active requires a seek, and
1308 			 * assigning from the active requires a tell, except when
1309 			 * eof_reached.
1310 			 */
1311 			if (destptr == state->activeptr)
1312 			{
1313 				if (dptr->eof_reached)
1314 				{
1315 					if (BufFileSeek(state->myfile,
1316 									state->writepos_file,
1317 									state->writepos_offset,
1318 									SEEK_SET) != 0)
1319 						ereport(ERROR,
1320 								(errcode_for_file_access(),
1321 								 errmsg("could not seek in tuplestore temporary file")));
1322 				}
1323 				else
1324 				{
1325 					if (BufFileSeek(state->myfile,
1326 									dptr->file, dptr->offset,
1327 									SEEK_SET) != 0)
1328 						ereport(ERROR,
1329 								(errcode_for_file_access(),
1330 								 errmsg("could not seek in tuplestore temporary file")));
1331 				}
1332 			}
1333 			else if (srcptr == state->activeptr)
1334 			{
1335 				if (!dptr->eof_reached)
1336 					BufFileTell(state->myfile,
1337 								&dptr->file,
1338 								&dptr->offset);
1339 			}
1340 			break;
1341 		default:
1342 			elog(ERROR, "invalid tuplestore state");
1343 			break;
1344 	}
1345 }
1346 
1347 /*
1348  * tuplestore_trim	- remove all no-longer-needed tuples
1349  *
1350  * Calling this function authorizes the tuplestore to delete all tuples
1351  * before the oldest read pointer, if no read pointer is marked as requiring
1352  * REWIND capability.
1353  *
1354  * Note: this is obviously safe if no pointer has BACKWARD capability either.
1355  * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1356  * the pointer can be moved backward but not before the oldest other read
1357  * pointer.
1358  */
1359 void
tuplestore_trim(Tuplestorestate * state)1360 tuplestore_trim(Tuplestorestate *state)
1361 {
1362 	int			oldest;
1363 	int			nremove;
1364 	int			i;
1365 
1366 	/*
1367 	 * Truncation is disallowed if any read pointer requires rewind
1368 	 * capability.
1369 	 */
1370 	if (state->eflags & EXEC_FLAG_REWIND)
1371 		return;
1372 
1373 	/*
1374 	 * We don't bother trimming temp files since it usually would mean more
1375 	 * work than just letting them sit in kernel buffers until they age out.
1376 	 */
1377 	if (state->status != TSS_INMEM)
1378 		return;
1379 
1380 	/* Find the oldest read pointer */
1381 	oldest = state->memtupcount;
1382 	for (i = 0; i < state->readptrcount; i++)
1383 	{
1384 		if (!state->readptrs[i].eof_reached)
1385 			oldest = Min(oldest, state->readptrs[i].current);
1386 	}
1387 
1388 	/*
1389 	 * Note: you might think we could remove all the tuples before the oldest
1390 	 * "current", since that one is the next to be returned.  However, since
1391 	 * tuplestore_gettuple returns a direct pointer to our internal copy of
1392 	 * the tuple, it's likely that the caller has still got the tuple just
1393 	 * before "current" referenced in a slot. So we keep one extra tuple
1394 	 * before the oldest "current".  (Strictly speaking, we could require such
1395 	 * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1396 	 * efficiency we allow this one case to not use "copy".)
1397 	 */
1398 	nremove = oldest - 1;
1399 	if (nremove <= 0)
1400 		return;					/* nothing to do */
1401 
1402 	Assert(nremove >= state->memtupdeleted);
1403 	Assert(nremove <= state->memtupcount);
1404 
1405 	/* Release no-longer-needed tuples */
1406 	for (i = state->memtupdeleted; i < nremove; i++)
1407 	{
1408 		FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1409 		pfree(state->memtuples[i]);
1410 		state->memtuples[i] = NULL;
1411 	}
1412 	state->memtupdeleted = nremove;
1413 
1414 	/* mark tuplestore as truncated (used for Assert crosschecks only) */
1415 	state->truncated = true;
1416 
1417 	/*
1418 	 * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1419 	 * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
1420 	 * repeatedly memmove-ing a large pointer array.  The worst case space
1421 	 * wastage is pretty small, since it's just pointers and not whole tuples.
1422 	 */
1423 	if (nremove < state->memtupcount / 8)
1424 		return;
1425 
1426 	/*
1427 	 * Slide the array down and readjust pointers.
1428 	 *
1429 	 * In mergejoin's current usage, it's demonstrable that there will always
1430 	 * be exactly one non-removed tuple; so optimize that case.
1431 	 */
1432 	if (nremove + 1 == state->memtupcount)
1433 		state->memtuples[0] = state->memtuples[nremove];
1434 	else
1435 		memmove(state->memtuples, state->memtuples + nremove,
1436 				(state->memtupcount - nremove) * sizeof(void *));
1437 
1438 	state->memtupdeleted = 0;
1439 	state->memtupcount -= nremove;
1440 	for (i = 0; i < state->readptrcount; i++)
1441 	{
1442 		if (!state->readptrs[i].eof_reached)
1443 			state->readptrs[i].current -= nremove;
1444 	}
1445 }
1446 
1447 /*
1448  * tuplestore_in_memory
1449  *
1450  * Returns true if the tuplestore has not spilled to disk.
1451  *
1452  * XXX exposing this is a violation of modularity ... should get rid of it.
1453  */
1454 bool
tuplestore_in_memory(Tuplestorestate * state)1455 tuplestore_in_memory(Tuplestorestate *state)
1456 {
1457 	return (state->status == TSS_INMEM);
1458 }
1459 
1460 
1461 /*
1462  * Tape interface routines
1463  */
1464 
1465 static unsigned int
getlen(Tuplestorestate * state,bool eofOK)1466 getlen(Tuplestorestate *state, bool eofOK)
1467 {
1468 	unsigned int len;
1469 	size_t		nbytes;
1470 
1471 	nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
1472 	if (nbytes == sizeof(len))
1473 		return len;
1474 	if (nbytes != 0 || !eofOK)
1475 		ereport(ERROR,
1476 				(errcode_for_file_access(),
1477 				 errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
1478 						nbytes, sizeof(len))));
1479 	return 0;
1480 }
1481 
1482 
1483 /*
1484  * Routines specialized for HeapTuple case
1485  *
1486  * The stored form is actually a MinimalTuple, but for largely historical
1487  * reasons we allow COPYTUP to work from a HeapTuple.
1488  *
1489  * Since MinimalTuple already has length in its first word, we don't need
1490  * to write that separately.
1491  */
1492 
1493 static void *
copytup_heap(Tuplestorestate * state,void * tup)1494 copytup_heap(Tuplestorestate *state, void *tup)
1495 {
1496 	MinimalTuple tuple;
1497 
1498 	tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
1499 	USEMEM(state, GetMemoryChunkSpace(tuple));
1500 	return (void *) tuple;
1501 }
1502 
1503 static void
writetup_heap(Tuplestorestate * state,void * tup)1504 writetup_heap(Tuplestorestate *state, void *tup)
1505 {
1506 	MinimalTuple tuple = (MinimalTuple) tup;
1507 
1508 	/* the part of the MinimalTuple we'll write: */
1509 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1510 	unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1511 
1512 	/* total on-disk footprint: */
1513 	unsigned int tuplen = tupbodylen + sizeof(int);
1514 
1515 	BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen));
1516 	BufFileWrite(state->myfile, (void *) tupbody, tupbodylen);
1517 	if (state->backward)		/* need trailing length word? */
1518 		BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen));
1519 
1520 	FREEMEM(state, GetMemoryChunkSpace(tuple));
1521 	heap_free_minimal_tuple(tuple);
1522 }
1523 
1524 static void *
readtup_heap(Tuplestorestate * state,unsigned int len)1525 readtup_heap(Tuplestorestate *state, unsigned int len)
1526 {
1527 	unsigned int tupbodylen = len - sizeof(int);
1528 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1529 	MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1530 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1531 	size_t		nread;
1532 
1533 	USEMEM(state, GetMemoryChunkSpace(tuple));
1534 	/* read in the tuple proper */
1535 	tuple->t_len = tuplen;
1536 	nread = BufFileRead(state->myfile, (void *) tupbody, tupbodylen);
1537 	if (nread != (size_t) tupbodylen)
1538 		ereport(ERROR,
1539 				(errcode_for_file_access(),
1540 				 errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
1541 						nread, (size_t) tupbodylen)));
1542 	if (state->backward)		/* need trailing length word? */
1543 	{
1544 		nread = BufFileRead(state->myfile, (void *) &tuplen, sizeof(tuplen));
1545 		if (nread != sizeof(tuplen))
1546 			ereport(ERROR,
1547 					(errcode_for_file_access(),
1548 					 errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
1549 							nread, sizeof(tuplen))));
1550 	}
1551 	return (void *) tuple;
1552 }
1553