/*------------------------------------------------------------------------- * * tuplestore.c * Generalized routines for temporary tuple storage. * * This module handles temporary storage of tuples for purposes such * as Materialize nodes, hashjoin batch files, etc. It is essentially * a dumbed-down version of tuplesort.c; it does no sorting of tuples * but can only store and regurgitate a sequence of tuples. However, * because no sort is required, it is allowed to start reading the sequence * before it has all been written. This is particularly useful for cursors, * because it allows random access within the already-scanned portion of * a query without having to process the underlying scan to completion. * Also, it is possible to support multiple independent read pointers. * * A temporary file is used to handle the data if it exceeds the * space limit specified by the caller. * * The (approximate) amount of memory allowed to the tuplestore is specified * in kilobytes by the caller. We absorb tuples and simply store them in an * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed * maxKBytes, we dump all the tuples into a temp file and then read from that * when needed. * * Upon creation, a tuplestore supports a single read pointer, numbered 0. * Additional read pointers can be created using tuplestore_alloc_read_pointer. * Mark/restore behavior is supported by copying read pointers. * * When the caller requests backward-scan capability, we write the temp file * in a format that allows either forward or backward scan. Otherwise, only * forward scan is allowed. A request for backward scan must be made before * putting any tuples into the tuplestore. Rewind is normally allowed but * can be turned off via tuplestore_set_eflags; turning off rewind for all * read pointers enables truncation of the tuplestore at the oldest read point * for minimal memory usage. 
(The caller must explicitly call tuplestore_trim * at appropriate times for truncation to actually happen.) * * Note: in TSS_WRITEFILE state, the temp file's seek position is the * current write position, and the write-position variables in the tuplestore * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's * seek position is the active read pointer's position, and that read pointer * isn't kept up to date. We update the appropriate variables using ftell() * before switching to the other state or activating a different read pointer. * * * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * src/backend/utils/sort/tuplestore.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include "access/htup_details.h" #include "commands/tablespace.h" #include "executor/executor.h" #include "miscadmin.h" #include "storage/buffile.h" #include "utils/memutils.h" #include "utils/resowner.h" /* * Possible states of a Tuplestore object. These denote the states that * persist between calls of Tuplestore routines. */ typedef enum { TSS_INMEM, /* Tuples still fit in memory */ TSS_WRITEFILE, /* Writing to temp file */ TSS_READFILE /* Reading from temp file */ } TupStoreStatus; /* * State for a single read pointer. If we are in state INMEM then all the * read pointers' "current" fields denote the read positions. In state * WRITEFILE, the file/offset fields denote the read positions. In state * READFILE, inactive read pointers have valid file/offset, but the active * read pointer implicitly has position equal to the temp file's seek position. * * Special case: if eof_reached is true, then the pointer's read position is * implicitly equal to the write position, and current/file/offset aren't * maintained. This way we need not update all the read pointers each time * we write. 
*/
typedef struct
{
	int			eflags;			/* capability flags */
	bool		eof_reached;	/* read has reached EOF */
	int			current;		/* next array index to read */
	int			file;			/* temp file# */
	off_t		offset;			/* byte offset in file */
} TSReadPointer;

/*
 * Private state of a Tuplestore operation.
 */
struct Tuplestorestate
{
	TupStoreStatus status;		/* enumerated value as shown above */
	int			eflags;			/* capability flags (OR of pointers' flags) */
	bool		backward;		/* store extra length words in file? */
	bool		interXact;		/* keep open through transactions? */
	bool		truncated;		/* tuplestore_trim has removed tuples? */
	int64		availMem;		/* remaining memory available, in bytes */
	int64		allowedMem;		/* total memory allowed, in bytes */
	int64		tuples;			/* number of tuples added */
	BufFile    *myfile;			/* underlying file, or NULL if none */
	MemoryContext context;		/* memory context for holding tuples */
	ResourceOwner resowner;		/* resowner for holding temp files */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are handling from the routines that don't need to know it.
	 * They are set up by the tuplestore_begin_xxx routines.
	 *
	 * (Although tuplestore.c currently only supports heap tuples, I've copied
	 * this part of tuplesort.c so that extension to other kinds of objects
	 * will be easy if it's ever needed.)
	 *
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
	 * assume that a single pfree() is enough to release the tuple later, so
	 * the representation must be "flat" in one palloc chunk.) state->availMem
	 * must be decreased by the amount of space used.
	 */
	void	   *(*copytup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() it, and increase state->availMem by the amount of memory space
	 * thereby released.
	 */
	void		(*writetup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create and return a
	 * palloc'd copy, and decrease state->availMem by the amount of memory
	 * space consumed.
	 */
	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);

	/*
	 * This array holds pointers to tuples in memory if we are in state INMEM.
	 * In states WRITEFILE and READFILE it's not used.
	 *
	 * When memtupdeleted > 0, the first memtupdeleted pointers are already
	 * released due to a tuplestore_trim() operation, but we haven't expended
	 * the effort to slide the remaining pointers down.  These unused pointers
	 * are set to NULL to catch any invalid accesses.  Note that memtupcount
	 * includes the deleted pointers.
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupdeleted;	/* the first N slots are currently unused */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */
	bool		growmemtuples;	/* memtuples' growth still underway? */

	/*
	 * These variables are used to keep track of the current positions.
	 *
	 * In state WRITEFILE, the current file seek position is the write point;
	 * in state READFILE, the write position is remembered in writepos_xxx.
	 * (The write position is the same as EOF, but since BufFileSeek doesn't
	 * currently implement SEEK_END, we have to remember it explicitly.)
	 */
	TSReadPointer *readptrs;	/* array of read pointers */
	int			activeptr;		/* index of the active read pointer */
	int			readptrcount;	/* number of pointers currently valid */
	int			readptrsize;	/* allocated length of readptrs array */

	int			writepos_file;	/* file# (valid if READFILE state) */
	off_t		writepos_offset;	/* offset (valid if READFILE state) */
};

/*
 * Shorthand for invoking the per-tuple-type callbacks stored in the state
 * struct.
 */
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len)	((*(state)->readtup) (state, len))

/*
 * Memory accounting helpers: availMem goes negative once more than
 * allowedMem has been charged, which is what LACKMEM tests for.
 */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

/*--------------------
 *
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero).
 * The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->backward is true, then the stored representation of
 * the tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * state->backward is not set, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplestorestate record, if needed. They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.
There is no error return from either writetup * or readtup; they should ereport() on failure. * * * NOTES about memory consumption calculations: * * We count space allocated for tuples against the maxKBytes limit, * plus the space used by the variable-size array memtuples. * Fixed-size space (primarily the BufFile I/O buffer) is not counted. * We don't worry about the size of the read pointer array, either. * * Note that we count actual space used (as shown by GetMemoryChunkSpace) * rather than the originally-requested size. This is important since * palloc can add substantial overhead. It's not a complete answer since * we won't count any wasted space in palloc allocation blocks, but it's * a lot better than what we were doing before 7.3. * *-------------------- */ static Tuplestorestate *tuplestore_begin_common(int eflags, bool interXact, int maxKBytes); static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple); static void dumptuples(Tuplestorestate *state); static unsigned int getlen(Tuplestorestate *state, bool eofOK); static void *copytup_heap(Tuplestorestate *state, void *tup); static void writetup_heap(Tuplestorestate *state, void *tup); static void *readtup_heap(Tuplestorestate *state, unsigned int len); /* * tuplestore_begin_xxx * * Initialize for a tuple store operation. */ static Tuplestorestate * tuplestore_begin_common(int eflags, bool interXact, int maxKBytes) { Tuplestorestate *state; state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate)); state->status = TSS_INMEM; state->eflags = eflags; state->interXact = interXact; state->truncated = false; state->allowedMem = maxKBytes * 1024L; state->availMem = state->allowedMem; state->myfile = NULL; state->context = CurrentMemoryContext; state->resowner = CurrentResourceOwner; state->memtupdeleted = 0; state->memtupcount = 0; state->tuples = 0; /* * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; * see comments in grow_memtuples(). 
*/ state->memtupsize = Max(16384 / sizeof(void *), ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1); state->growmemtuples = true; state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); state->activeptr = 0; state->readptrcount = 1; state->readptrsize = 8; /* arbitrary */ state->readptrs = (TSReadPointer *) palloc(state->readptrsize * sizeof(TSReadPointer)); state->readptrs[0].eflags = eflags; state->readptrs[0].eof_reached = false; state->readptrs[0].current = 0; return state; } /* * tuplestore_begin_heap * * Create a new tuplestore; other types of tuple stores (other than * "heap" tuple stores, for heap tuples) are possible, but not presently * implemented. * * randomAccess: if true, both forward and backward accesses to the * tuple store are allowed. * * interXact: if true, the files used for on-disk storage persist beyond the * end of the current transaction. NOTE: It's the caller's responsibility to * create such a tuplestore in a memory context and resource owner that will * also survive transaction boundaries, and to ensure the tuplestore is closed * when it's no longer wanted. * * maxKBytes: how much data to store in memory (any data beyond this * amount is paged to disk). When in doubt, use work_mem. */ Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) { Tuplestorestate *state; int eflags; /* * This interpretation of the meaning of randomAccess is compatible with * the pre-8.3 behavior of tuplestores. */ eflags = randomAccess ? (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : (EXEC_FLAG_REWIND); state = tuplestore_begin_common(eflags, interXact, maxKBytes); state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; return state; } /* * tuplestore_set_eflags * * Set the capability flags for read pointer 0 at a finer grain than is * allowed by tuplestore_begin_xxx. 
This must be called before inserting
 * any data into the tuplestore.
 *
 * eflags is a bitmask following the meanings used for executor node
 * startup flags (see executor.h).  tuplestore pays attention to these bits:
 *		EXEC_FLAG_REWIND		need rewind to start
 *		EXEC_FLAG_BACKWARD		need backward fetch
 * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
 * is set per "randomAccess" in the tuplestore_begin_xxx call.
 *
 * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
 * but not further than the truncation point (the furthest-back read pointer
 * position at the time of the last tuplestore_trim call).
 */
void
tuplestore_set_eflags(Tuplestorestate *state, int eflags)
{
	int			combined;
	int			ptrno;

	/* Only legal while the store is still empty and purely in memory */
	if (state->status != TSS_INMEM || state->memtupcount != 0)
		elog(ERROR, "too late to call tuplestore_set_eflags");

	state->readptrs[0].eflags = eflags;

	/* The store-wide flags are the union over every read pointer */
	combined = eflags;
	for (ptrno = 1; ptrno < state->readptrcount; ptrno++)
		combined |= state->readptrs[ptrno].eflags;

	state->eflags = combined;
}

/*
 * tuplestore_alloc_read_pointer - allocate another read pointer.
 *
 * Returns the pointer's index.
 *
 * The new pointer initially copies the position of read pointer 0.
 * It can have its own eflags, but if any data has been inserted into
 * the tuplestore, these eflags must not represent an increase in
 * requirements.
*/
int
tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
{
	int			newidx;
	TSReadPointer *fresh;

	/*
	 * Once data is present, a new pointer may not ask for any capability
	 * that the store as a whole does not already provide.
	 */
	if ((state->status != TSS_INMEM || state->memtupcount != 0) &&
		(state->eflags | eflags) != state->eflags)
		elog(ERROR, "too late to require new tuplestore eflags");

	/* Double the pointer array when it is full */
	if (state->readptrcount >= state->readptrsize)
	{
		int			doubled = state->readptrsize * 2;

		state->readptrs = (TSReadPointer *)
			repalloc(state->readptrs, doubled * sizeof(TSReadPointer));
		state->readptrsize = doubled;
	}

	/* Start from a copy of pointer 0, then override the flags */
	newidx = state->readptrcount;
	fresh = &state->readptrs[newidx];
	*fresh = state->readptrs[0];
	fresh->eflags = eflags;

	state->eflags |= eflags;
	state->readptrcount = newidx + 1;

	return newidx;
}

/*
 * tuplestore_clear
 *
 *	Empty the tuplestore: close any temp file, free all in-memory tuples,
 *	and put every read pointer back at the start.
 */
void
tuplestore_clear(Tuplestorestate *state)
{
	int			slot;
	int			ptrno;

	/* Get rid of the temp file, if there is one */
	if (state->myfile != NULL)
		BufFileClose(state->myfile);
	state->myfile = NULL;

	/* Release each live tuple, crediting its space back to availMem */
	if (state->memtuples != NULL)
	{
		slot = state->memtupdeleted;
		while (slot < state->memtupcount)
		{
			void	   *victim = state->memtuples[slot];

			FREEMEM(state, GetMemoryChunkSpace(victim));
			pfree(victim);
			slot++;
		}
	}

	/* Return to the empty, in-memory state */
	state->tuples = 0;
	state->memtupcount = 0;
	state->memtupdeleted = 0;
	state->truncated = false;
	state->status = TSS_INMEM;

	for (ptrno = 0; ptrno < state->readptrcount; ptrno++)
	{
		state->readptrs[ptrno].eof_reached = false;
		state->readptrs[ptrno].current = 0;
	}
}

/*
 * tuplestore_end
 *
 *	Release resources and clean up.
*/ void tuplestore_end(Tuplestorestate *state) { int i; if (state->myfile) BufFileClose(state->myfile); if (state->memtuples) { for (i = state->memtupdeleted; i < state->memtupcount; i++) pfree(state->memtuples[i]); pfree(state->memtuples); } pfree(state->readptrs); pfree(state); } /* * tuplestore_select_read_pointer - make the specified read pointer active */ void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr) { TSReadPointer *readptr; TSReadPointer *oldptr; Assert(ptr >= 0 && ptr < state->readptrcount); /* No work if already active */ if (ptr == state->activeptr) return; readptr = &state->readptrs[ptr]; oldptr = &state->readptrs[state->activeptr]; switch (state->status) { case TSS_INMEM: case TSS_WRITEFILE: /* no work */ break; case TSS_READFILE: /* * First, save the current read position in the pointer about to * become inactive. */ if (!oldptr->eof_reached) BufFileTell(state->myfile, &oldptr->file, &oldptr->offset); /* * We have to make the temp file's seek position equal to the * logical position of the new read pointer. In eof_reached * state, that's the EOF, which we have available from the saved * write position. */ if (readptr->eof_reached) { if (BufFileSeek(state->myfile, state->writepos_file, state->writepos_offset, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in tuplestore temporary file"))); } else { if (BufFileSeek(state->myfile, readptr->file, readptr->offset, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in tuplestore temporary file"))); } break; default: elog(ERROR, "invalid tuplestore state"); break; } state->activeptr = ptr; } /* * tuplestore_tuple_count * * Returns the number of tuples added since creation or the last * tuplestore_clear(). */ int64 tuplestore_tuple_count(Tuplestorestate *state) { return state->tuples; } /* * tuplestore_ateof * * Returns the active read pointer's eof_reached state. 
*/
bool
tuplestore_ateof(Tuplestorestate *state)
{
	return state->readptrs[state->activeptr].eof_reached;
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.  We
 * must not exceed INT_MAX tuples in memory or the caller-provided memory
 * limit.  Return true if we were able to enlarge the array, false if not.
 *
 * Normally, at each increment we double the size of the array.  When doing
 * that would exceed a limit, we attempt one last, smaller increase (and then
 * clear the growmemtuples flag so we don't try any more).  That allows us to
 * use memory as fully as permitted; sticking to the pure doubling rule could
 * result in almost half going unused.  Because availMem moves around with
 * tuple addition/removal, we need some rule to prevent making repeated small
 * increases in memtupsize, which would just be useless thrashing.  The
 * growmemtuples flag accomplishes that and also prevents useless
 * recalculations in this function.
 *
 * On any path that declines to reallocate (the "noalloc" exit), the
 * growmemtuples flag is cleared so that later calls return false at once.
 */
static bool
grow_memtuples(Tuplestorestate *state)
{
	int			newmemtupsize;
	int			memtupsize = state->memtupsize;
	int64		memNowUsed = state->allowedMem - state->availMem;

	/* Forget it if we've already maxed out memtuples, per comment above */
	if (!state->growmemtuples)
		return false;

	/* Select new value of memtupsize */
	if (memNowUsed <= state->availMem)
	{
		/*
		 * We've used no more than half of allowedMem; double our usage,
		 * clamping at INT_MAX tuples.
		 */
		if (memtupsize < INT_MAX / 2)
			newmemtupsize = memtupsize * 2;
		else
		{
			newmemtupsize = INT_MAX;
			state->growmemtuples = false;
		}
	}
	else
	{
		/*
		 * This will be the last increment of memtupsize.  Abandon doubling
		 * strategy and instead increase as much as we safely can.
		 *
		 * To stay within allowedMem, we can't increase memtupsize by more
		 * than availMem / sizeof(void *) elements. In practice, we want to
		 * increase it by considerably less, because we need to leave some
		 * space for the tuples to which the new array slots will refer.  We
		 * assume the new tuples will be about the same size as the tuples
		 * we've already seen, and thus we can extrapolate from the space
		 * consumption so far to estimate an appropriate new size for the
		 * memtuples array.  The optimal value might be higher or lower than
		 * this estimate, but it's hard to know that in advance.  We again
		 * clamp at INT_MAX tuples.
		 *
		 * This calculation is safe against enlarging the array so much that
		 * LACKMEM becomes true, because the memory currently used includes
		 * the present array; thus, there would be enough allowedMem for the
		 * new array elements even if no other memory were currently used.
		 *
		 * We do the arithmetic in float8, because otherwise the product of
		 * memtupsize and allowedMem could overflow.  Any inaccuracy in the
		 * result should be insignificant; but even if we computed a
		 * completely insane result, the checks below will prevent anything
		 * really bad from happening.
		 */
		double		grow_ratio;

		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
		if (memtupsize * grow_ratio < INT_MAX)
			newmemtupsize = (int) (memtupsize * grow_ratio);
		else
			newmemtupsize = INT_MAX;

		/* We won't make any further enlargement attempts */
		state->growmemtuples = false;
	}

	/* Must enlarge array by at least one element, else report failure */
	if (newmemtupsize <= memtupsize)
		goto noalloc;

	/*
	 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
	 * to ensure our request won't be rejected.  Note that we can easily
	 * exhaust address space before facing this outcome.  (This is presently
	 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
	 * don't rely on that at this distance.)
	 */
	if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
	{
		newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
		state->growmemtuples = false;	/* can't grow any more */
	}

	/*
	 * We need to be sure that we do not cause LACKMEM to become true, else
	 * the space management algorithm will go nuts.  The code above should
	 * never generate a dangerous request, but to be safe, check explicitly
	 * that the array growth fits within availMem.  (We could still cause
	 * LACKMEM if the memory chunk overhead associated with the memtuples
	 * array were to increase.  That shouldn't happen because we chose the
	 * initial array size large enough to ensure that palloc will be treating
	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
	 * explicitly below just in case.)
	 */
	if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
		goto noalloc;

	/*
	 * OK, do it.  The old array's space is credited back first and the new
	 * array's actual chunk size is charged afterwards, so availMem reflects
	 * what the allocator really handed out.
	 */
	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
	state->memtupsize = newmemtupsize;
	state->memtuples = (void **)
		repalloc_huge(state->memtuples,
					  state->memtupsize * sizeof(void *));
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
	if (LACKMEM(state))
		elog(ERROR, "unexpected out-of-memory situation in tuplestore");
	return true;

noalloc:
	/* If for any reason we didn't realloc, shut off future attempts */
	state->growmemtuples = false;
	return false;
}

/*
 * Accept one tuple and append it to the tuplestore.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 *
 * If the active read pointer is currently "at EOF", it remains so (the read
 * pointer implicitly advances along with the write pointer); otherwise the
 * read pointer is unchanged.  Non-active read pointers do not move, which
 * means they are certain to not be "at EOF" immediately after puttuple.
 * This curious-seeming behavior is for the convenience of nodeMaterial.c and
 * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
 * steps.
 *
 * tuplestore_puttupleslot() is a convenience routine to collect data from
 * a TupleTableSlot without an extra copy operation.
*/
void
tuplestore_puttupleslot(Tuplestorestate *state,
						TupleTableSlot *slot)
{
	MinimalTuple tuple;
	/* Allocate the copy in the tuplestore's own context */
	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);

	/*
	 * Form a MinimalTuple in working memory
	 */
	tuple = ExecCopySlotMinimalTuple(slot);
	USEMEM(state, GetMemoryChunkSpace(tuple));

	tuplestore_puttuple_common(state, (void *) tuple);

	MemoryContextSwitchTo(oldcxt);
}

/*
 * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
 * deprecated, but not worth getting rid of in view of the number of callers.
 */
void
tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
{
	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);

	/*
	 * Copy the tuple.  (Must do this even in WRITEFILE case.  Note that
	 * COPYTUP includes USEMEM, so we needn't do that here.)
	 */
	tuple = COPYTUP(state, tuple);

	tuplestore_puttuple_common(state, (void *) tuple);

	MemoryContextSwitchTo(oldcxt);
}

/*
 * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
 * This avoids an extra tuple-construction operation.
 */
void
tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
					 Datum *values, bool *isnull)
{
	MinimalTuple tuple;
	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);

	tuple = heap_form_minimal_tuple(tdesc, values, isnull);
	USEMEM(state, GetMemoryChunkSpace(tuple));

	tuplestore_puttuple_common(state, (void *) tuple);

	MemoryContextSwitchTo(oldcxt);
}

/*
 * Common code for the tuplestore_putXXX routines.
 *
 * "tuple" must already be in the tuplestore's storage format, with its
 * space charged to availMem by the caller.  It is either stashed in the
 * memtuples[] array or handed to WRITETUP, depending on the current state.
 * Overflowing the array or the memory budget while in TSS_INMEM state
 * triggers the switch to a temp file.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	TSReadPointer *readptr;
	int			i;
	ResourceOwner oldowner;

	state->tuples++;

	switch (state->status)
	{
		case TSS_INMEM:

			/*
			 * Update read pointers as needed; see API spec above.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->current = state->memtupcount;
				}
			}

			/*
			 * Grow the array as needed.  Note that we try to grow the array
			 * when there is still one free slot remaining --- if we fail,
			 * there'll still be room to store the incoming tuple, and then
			 * we'll switch to tape-based operation.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);
				Assert(state->memtupcount < state->memtupsize);
			}

			/* Stash the tuple in the in-memory array */
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.  Make sure that
			 * the temp file(s) are created in suitable temp tablespaces.
			 */
			PrepareTempTablespaces();

			/* associate the file with the store's resource owner */
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;

			state->myfile = BufFileCreateTemp(state->interXact);

			CurrentResourceOwner = oldowner;

			/*
			 * Freeze the decision about whether trailing length words will be
			 * used.  We can't change this choice once data is on tape, even
			 * though callers might drop the requirement.
			 */
			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
		case TSS_WRITEFILE:

			/*
			 * Update read pointers as needed; see API spec above. Note:
			 * BufFileTell is quite cheap, so not worth trying to avoid
			 * multiple calls.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					BufFileTell(state->myfile,
								&readptr->file,
								&readptr->offset);
				}
			}

			WRITETUP(state, tuple);
			break;
		case TSS_READFILE:

			/*
			 * Switch from reading to writing.  The active pointer's position
			 * is saved first (unless it is at EOF), then the file is moved
			 * back to the remembered write position.
			 */
			if (!state->readptrs[state->activeptr].eof_reached)
				BufFileTell(state->myfile,
							&state->readptrs[state->activeptr].file,
							&state->readptrs[state->activeptr].offset);
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek in tuplestore temporary file")));
			state->status = TSS_WRITEFILE;

			/*
			 * Update read pointers as needed; see API spec above.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->file = state->writepos_file;
					readptr->offset = state->writepos_offset;
				}
			}

			WRITETUP(state, tuple);
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 *
 * Backward scan is only allowed if randomAccess was set true or
 * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
 *
 * Note: on the early "return NULL" path in TSS_WRITEFILE state,
 * *should_free is not assigned, so callers must test the result for
 * NULL before looking at it.
 */
static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
					bool *should_free)
{
	TSReadPointer *readptr = &state->readptrs[state->activeptr];
	unsigned int tuplen;
	void	   *tup;

	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

	switch (state->status)
	{
		case TSS_INMEM:
			/* In-memory tuples stay owned by the tuplestore */
			*should_free = false;
			if (forward)
			{
				if (readptr->eof_reached)
					return NULL;
				if (readptr->current < state->memtupcount)
				{
					/* We have another tuple, so return it */
					return state->memtuples[readptr->current++];
				}
				readptr->eof_reached = true;
				return NULL;
			}
			else
			{
				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else tuple before last returned.
				 */
				if (readptr->eof_reached)
				{
					readptr->current = state->memtupcount;
					readptr->eof_reached = false;
				}
				else
				{
					if (readptr->current <= state->memtupdeleted)
					{
						Assert(!state->truncated);
						return NULL;
					}
					readptr->current--; /* last returned tuple */
				}
				if (readptr->current <= state->memtupdeleted)
				{
					Assert(!state->truncated);
					return NULL;
				}
				return state->memtuples[readptr->current - 1];
			}
			break;

		case TSS_WRITEFILE:
			/* Skip state change if we'll just return NULL */
			if (readptr->eof_reached && forward)
				return NULL;

			/*
			 * Switch from writing to reading.  The write position is saved
			 * first; an at-EOF pointer needs no seek because the file is
			 * already positioned there.
			 */
			BufFileTell(state->myfile,
						&state->writepos_file, &state->writepos_offset);
			if (!readptr->eof_reached)
				if (BufFileSeek(state->myfile,
								readptr->file, readptr->offset,
								SEEK_SET) != 0)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not seek in tuplestore temporary file")));
			state->status = TSS_READFILE;
			/* FALLTHROUGH */

		case TSS_READFILE:
			/* Tuples read back from the file are fresh palloc'd copies */
			*should_free = true;
			if (forward)
			{
				if ((tuplen = getlen(state, true)) != 0)
				{
					tup = READTUP(state, tuplen);
					return tup;
				}
				else
				{
					readptr->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
			 * if all tuples are fetched already then we return last tuple,
			 * else tuple before last returned.
			 *
			 * Back up to fetch previously-returned tuple's ending length
			 * word. If seek fails, assume we are at start of file.
			 */
			if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
							SEEK_CUR) != 0)
			{
				/* even a failed backwards fetch gets you out of eof state */
				readptr->eof_reached = false;
				Assert(!state->truncated);
				return NULL;
			}
			tuplen = getlen(state, false);

			if (readptr->eof_reached)
			{
				readptr->eof_reached = false;
				/* We will return the tuple returned before returning NULL */
			}
			else
			{
				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (BufFileSeek(state->myfile, 0,
								-(long) (tuplen + 2 * sizeof(unsigned int)),
								SEEK_CUR) != 0)
				{
					/*
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
					 */
					if (BufFileSeek(state->myfile, 0,
									-(long) (tuplen + sizeof(unsigned int)),
									SEEK_CUR) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file")));
					Assert(!state->truncated);
					return NULL;
				}
				tuplen = getlen(state, false);
			}

			/*
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
			 */
			if (BufFileSeek(state->myfile, 0,
							-(long) tuplen,
							SEEK_CUR) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek in tuplestore temporary file")));
			tup = READTUP(state, tuplen);
			return tup;

		default:
			elog(ERROR, "invalid tuplestore state");
			return NULL;		/* keep compiler quiet */
	}
}

/*
 * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
 *
 * If successful, put tuple in slot and return true; else, clear the slot
 * and return false.
 *
 * If copy is true, the slot receives a copied tuple (allocated in current
 * memory context) that will stay valid regardless of future manipulations of
 * the tuplestore's state.  If copy is false, the slot may just receive a
 * pointer to a tuple held within the tuplestore.  The latter is more
 * efficient but the slot contents may be corrupted if additional writes to
 * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
*/
bool
tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
						bool copy, TupleTableSlot *slot)
{
	bool		must_free;
	MinimalTuple fetched;

	fetched = (MinimalTuple) tuplestore_gettuple(state, forward, &must_free);

	/* No tuple in the requested direction: leave the slot empty */
	if (fetched == NULL)
	{
		ExecClearTuple(slot);
		return false;
	}

	/*
	 * The caller wants a copy, but the fetched tuple is not flagged as ours
	 * to free: duplicate it and mark the duplicate as freeable.
	 */
	if (copy && !must_free)
	{
		fetched = heap_copy_minimal_tuple(fetched);
		must_free = true;
	}

	ExecStoreMinimalTuple(fetched, slot, must_free);
	return true;
}

/*
 * tuplestore_advance - exported function to adjust position without fetching
 *
 * Fetches one tuple in the requested direction and throws it away.
 * Returns true if a tuple was stepped over, false if none was available.
 *
 * We could optimize this case to avoid palloc/pfree overhead, but for the
 * moment it doesn't seem worthwhile.
 */
bool
tuplestore_advance(Tuplestorestate *state, bool forward)
{
	bool		must_free;
	void	   *skipped = tuplestore_gettuple(state, forward, &must_free);

	if (skipped == NULL)
		return false;

	/* Release the tuple only when tuplestore_gettuple said we should */
	if (must_free)
		pfree(skipped);
	return true;
}

/*
 * Advance over N tuples in either forward or back direction,
 * without returning any data.  N<=0 is a no-op.
 * Returns true if successful, false if ran out of tuples.
*/
bool
tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
{
	TSReadPointer *readptr = &state->readptrs[state->activeptr];

	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

	if (ntuples <= 0)
		return true;

	if (state->status != TSS_INMEM)
	{
		/* We don't currently try hard to optimize other cases */
		for (; ntuples > 0; ntuples--)
		{
			bool		must_free;
			void	   *skipped;

			skipped = tuplestore_gettuple(state, forward, &must_free);
			if (skipped == NULL)
				return false;
			if (must_free)
				pfree(skipped);
			CHECK_FOR_INTERRUPTS();
		}
		return true;
	}

	/* In-memory case: just do arithmetic on the "current" index */
	if (forward)
	{
		if (readptr->eof_reached)
			return false;

		if (state->memtupcount - readptr->current < ntuples)
		{
			/* Fewer than ntuples remain: stop at the end and flag EOF */
			readptr->current = state->memtupcount;
			readptr->eof_reached = true;
			return false;
		}

		readptr->current += ntuples;
		return true;
	}

	/*
	 * Backward.  Leaving EOF state counts as one step: "current" is set to
	 * the end of the array and one requested tuple is consumed.
	 */
	if (readptr->eof_reached)
	{
		readptr->current = state->memtupcount;
		readptr->eof_reached = false;
		ntuples--;
	}

	if (readptr->current - state->memtupdeleted <= ntuples)
	{
		/* Ran off the front: stop at the first undeleted position */
		Assert(!state->truncated);
		readptr->current = state->memtupdeleted;
		return false;
	}

	readptr->current -= ntuples;
	return true;
}

/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * As a side effect, we must convert each read pointer's position from
 * "current" to file/offset format.  But eof_reached pointers don't
 * need to change state.
*/ static void dumptuples(Tuplestorestate *state) { int i; for (i = state->memtupdeleted;; i++) { TSReadPointer *readptr = state->readptrs; int j; for (j = 0; j < state->readptrcount; readptr++, j++) { if (i == readptr->current && !readptr->eof_reached) BufFileTell(state->myfile, &readptr->file, &readptr->offset); } if (i >= state->memtupcount) break; WRITETUP(state, state->memtuples[i]); } state->memtupdeleted = 0; state->memtupcount = 0; } /* * tuplestore_rescan - rewind the active read pointer to start */ void tuplestore_rescan(Tuplestorestate *state) { TSReadPointer *readptr = &state->readptrs[state->activeptr]; Assert(readptr->eflags & EXEC_FLAG_REWIND); Assert(!state->truncated); switch (state->status) { case TSS_INMEM: readptr->eof_reached = false; readptr->current = 0; break; case TSS_WRITEFILE: readptr->eof_reached = false; readptr->file = 0; readptr->offset = 0L; break; case TSS_READFILE: readptr->eof_reached = false; if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in tuplestore temporary file"))); break; default: elog(ERROR, "invalid tuplestore state"); break; } } /* * tuplestore_copy_read_pointer - copy a read pointer's state to another */ void tuplestore_copy_read_pointer(Tuplestorestate *state, int srcptr, int destptr) { TSReadPointer *sptr = &state->readptrs[srcptr]; TSReadPointer *dptr = &state->readptrs[destptr]; Assert(srcptr >= 0 && srcptr < state->readptrcount); Assert(destptr >= 0 && destptr < state->readptrcount); /* Assigning to self is a no-op */ if (srcptr == destptr) return; if (dptr->eflags != sptr->eflags) { /* Possible change of overall eflags, so copy and then recompute */ int eflags; int i; *dptr = *sptr; eflags = state->readptrs[0].eflags; for (i = 1; i < state->readptrcount; i++) eflags |= state->readptrs[i].eflags; state->eflags = eflags; } else *dptr = *sptr; switch (state->status) { case TSS_INMEM: case TSS_WRITEFILE: /* no work */ break; case TSS_READFILE: 
/* * This case is a bit tricky since the active read pointer's * position corresponds to the seek point, not what is in its * variables. Assigning to the active requires a seek, and * assigning from the active requires a tell, except when * eof_reached. */ if (destptr == state->activeptr) { if (dptr->eof_reached) { if (BufFileSeek(state->myfile, state->writepos_file, state->writepos_offset, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in tuplestore temporary file"))); } else { if (BufFileSeek(state->myfile, dptr->file, dptr->offset, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in tuplestore temporary file"))); } } else if (srcptr == state->activeptr) { if (!dptr->eof_reached) BufFileTell(state->myfile, &dptr->file, &dptr->offset); } break; default: elog(ERROR, "invalid tuplestore state"); break; } } /* * tuplestore_trim - remove all no-longer-needed tuples * * Calling this function authorizes the tuplestore to delete all tuples * before the oldest read pointer, if no read pointer is marked as requiring * REWIND capability. * * Note: this is obviously safe if no pointer has BACKWARD capability either. * If a pointer is marked as BACKWARD but not REWIND capable, it means that * the pointer can be moved backward but not before the oldest other read * pointer. */ void tuplestore_trim(Tuplestorestate *state) { int oldest; int nremove; int i; /* * Truncation is disallowed if any read pointer requires rewind * capability. */ if (state->eflags & EXEC_FLAG_REWIND) return; /* * We don't bother trimming temp files since it usually would mean more * work than just letting them sit in kernel buffers until they age out. 
*/ if (state->status != TSS_INMEM) return; /* Find the oldest read pointer */ oldest = state->memtupcount; for (i = 0; i < state->readptrcount; i++) { if (!state->readptrs[i].eof_reached) oldest = Min(oldest, state->readptrs[i].current); } /* * Note: you might think we could remove all the tuples before the oldest * "current", since that one is the next to be returned. However, since * tuplestore_gettuple returns a direct pointer to our internal copy of * the tuple, it's likely that the caller has still got the tuple just * before "current" referenced in a slot. So we keep one extra tuple * before the oldest "current". (Strictly speaking, we could require such * callers to use the "copy" flag to tuplestore_gettupleslot, but for * efficiency we allow this one case to not use "copy".) */ nremove = oldest - 1; if (nremove <= 0) return; /* nothing to do */ Assert(nremove >= state->memtupdeleted); Assert(nremove <= state->memtupcount); /* Release no-longer-needed tuples */ for (i = state->memtupdeleted; i < nremove; i++) { FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i])); pfree(state->memtuples[i]); state->memtuples[i] = NULL; } state->memtupdeleted = nremove; /* mark tuplestore as truncated (used for Assert crosschecks only) */ state->truncated = true; /* * If nremove is less than 1/8th memtupcount, just stop here, leaving the * "deleted" slots as NULL. This prevents us from expending O(N^2) time * repeatedly memmove-ing a large pointer array. The worst case space * wastage is pretty small, since it's just pointers and not whole tuples. */ if (nremove < state->memtupcount / 8) return; /* * Slide the array down and readjust pointers. * * In mergejoin's current usage, it's demonstrable that there will always * be exactly one non-removed tuple; so optimize that case. 
*/ if (nremove + 1 == state->memtupcount) state->memtuples[0] = state->memtuples[nremove]; else memmove(state->memtuples, state->memtuples + nremove, (state->memtupcount - nremove) * sizeof(void *)); state->memtupdeleted = 0; state->memtupcount -= nremove; for (i = 0; i < state->readptrcount; i++) { if (!state->readptrs[i].eof_reached) state->readptrs[i].current -= nremove; } } /* * tuplestore_in_memory * * Returns true if the tuplestore has not spilled to disk. * * XXX exposing this is a violation of modularity ... should get rid of it. */ bool tuplestore_in_memory(Tuplestorestate *state) { return (state->status == TSS_INMEM); } /* * Tape interface routines */ static unsigned int getlen(Tuplestorestate *state, bool eofOK) { unsigned int len; size_t nbytes; nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len)); if (nbytes == sizeof(len)) return len; if (nbytes != 0 || !eofOK) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes", nbytes, sizeof(len)))); return 0; } /* * Routines specialized for HeapTuple case * * The stored form is actually a MinimalTuple, but for largely historical * reasons we allow COPYTUP to work from a HeapTuple. * * Since MinimalTuple already has length in its first word, we don't need * to write that separately. 
*/

/*
 * copytup_heap - make a MinimalTuple copy of a HeapTuple and account for
 * the space it occupies.
 */
static void *
copytup_heap(Tuplestorestate *state, void *tup)
{
	MinimalTuple copied = minimal_tuple_from_heap_tuple((HeapTuple) tup);

	USEMEM(state, GetMemoryChunkSpace(copied));
	return (void *) copied;
}

/*
 * writetup_heap - write one MinimalTuple to the temp file, then release the
 * in-memory copy.
 *
 * On-disk layout: a leading length word, the tuple body (everything past
 * MINIMAL_TUPLE_DATA_OFFSET), and, when state->backward is set, the same
 * length word again as a trailer.
 */
static void
writetup_heap(Tuplestorestate *state, void *tup)
{
	MinimalTuple tuple = (MinimalTuple) tup;
	char	   *body = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
	unsigned int bodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;

	/* the stored length covers the body plus the leading length word */
	unsigned int lenword = bodylen + sizeof(int);

	BufFileWrite(state->myfile, (void *) &lenword, sizeof(lenword));
	BufFileWrite(state->myfile, (void *) body, bodylen);
	if (state->backward)
	{
		/* trailing length word */
		BufFileWrite(state->myfile, (void *) &lenword, sizeof(lenword));
	}

	FREEMEM(state, GetMemoryChunkSpace(tuple));
	heap_free_minimal_tuple(tuple);
}

/*
 * readtup_heap - read back one tuple whose leading length word (len) has
 * already been consumed by the caller.
 *
 * Returns a freshly palloc'd MinimalTuple; its space is charged via USEMEM.
 */
static void *
readtup_heap(Tuplestorestate *state, unsigned int len)
{
	unsigned int bodylen = len - sizeof(int);
	unsigned int fulllen = bodylen + MINIMAL_TUPLE_DATA_OFFSET;
	MinimalTuple result = (MinimalTuple) palloc(fulllen);
	char	   *body = (char *) result + MINIMAL_TUPLE_DATA_OFFSET;
	size_t		got;

	USEMEM(state, GetMemoryChunkSpace(result));

	/* t_len is not stored in the file, so set it from the computed length */
	result->t_len = fulllen;

	got = BufFileRead(state->myfile, (void *) body, bodylen);
	if (got != (size_t) bodylen)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
						got, (size_t) bodylen)));

	if (state->backward)
	{
		/* Step over the trailing length word; its value is not used */
		unsigned int trailer;

		got = BufFileRead(state->myfile, (void *) &trailer, sizeof(trailer));
		if (got != sizeof(trailer))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
							got, sizeof(trailer))));
	}

	return (void *) result;
}