Simplify tape block format.

No more indirect blocks. The blocks form a linked list instead.

This saves some memory, because we don't need to have a buffer in memory to
hold the indirect block (or blocks). To reflect that, TAPE_BUFFER_OVERHEAD
is reduced from 3 to 1 buffer, which allows using more memory for building
the initial runs.

Reviewed by Peter Geoghegan and Robert Haas.

Discussion: https://www.postgresql.org/message-id/34678beb-938e-646e-db9f-a7def5c44ada%40iki.fi
This commit is contained in:
Heikki Linnakangas 2016-12-22 18:45:00 +02:00
parent b86515da1a
commit 01ec25631f
3 changed files with 218 additions and 478 deletions

View File

@ -31,15 +31,8 @@
* in BLCKSZ-size blocks. Space allocation boils down to keeping track
* of which blocks in the underlying file belong to which logical tape,
* plus any blocks that are free (recycled and not yet reused).
* The blocks in each logical tape are remembered using a method borrowed
* from the Unix HFS filesystem: we store data block numbers in an
* "indirect block". If an indirect block fills up, we write it out to
* the underlying file and remember its location in a second-level indirect
* block. In the same way second-level blocks are remembered in third-
* level blocks, and so on if necessary (of course we're talking huge
* amounts of data here). The topmost indirect block of a given logical
* tape is never actually written out to the physical file, but all lower-
* level indirect blocks will be.
* The blocks in each logical tape form a chain, with a prev- and next-
* pointer in each block.
*
* The initial write pass is guaranteed to fill the underlying file
* perfectly sequentially, no matter how data is divided into logical tapes.
@ -87,58 +80,65 @@
#include "utils/memutils.h"
/*
* Block indexes are "long"s, so we can fit this many per indirect block.
* NB: we assume this is an exact fit!
* A TapeBlockTrailer is stored at the end of each BLCKSZ block.
*
* The first block of a tape has prev == -1. The last block of a tape
* stores the number of valid bytes on the block, negated, in 'next'.
* Therefore next < 0 indicates the last block.
*/
#define BLOCKS_PER_INDIR_BLOCK ((int) (BLCKSZ / sizeof(long)))
/*
* We use a struct like this for each active indirection level of each
* logical tape. If the indirect block is not the highest level of its
* tape, the "nextup" link points to the next higher level. Only the
* "ptrs" array is written out if we have to dump the indirect block to
* disk. If "ptrs" is not completely full, we store -1L in the first
* unused slot at completion of the write phase for the logical tape.
*/
typedef struct IndirectBlock
typedef struct TapeBlockTrailer
{
int nextSlot; /* next pointer slot to write or read */
struct IndirectBlock *nextup; /* parent indirect level, or NULL if
* top */
long ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */
} IndirectBlock;
long prev; /* previous block on this tape, or -1 on first
* block */
long next; /* next block on this tape, or # of valid
* bytes on last block (if < 0) */
} TapeBlockTrailer;
#define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))
#define TapeBlockGetTrailer(buf) \
((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
#define TapeBlockGetNBytes(buf) \
(TapeBlockIsLast(buf) ? \
(- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
#define TapeBlockSetNBytes(buf, nbytes) \
(TapeBlockGetTrailer(buf)->next = -(nbytes))
/*
* This data structure represents a single "logical tape" within the set
* of logical tapes stored in the same file. We must keep track of the
* current partially-read-or-written data block as well as the active
* indirect block level(s).
* of logical tapes stored in the same file.
*
* While writing, we hold the current partially-written data block in the
* buffer. While reading, we can hold multiple blocks in the buffer. Note
* that we don't retain the trailer of a block when it's read into the
* buffer. The buffer therefore contains one large contiguous chunk of data
* from the tape.
*/
typedef struct LogicalTape
{
IndirectBlock *indirect; /* bottom of my indirect-block hierarchy */
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
/*
* The total data volume in the logical tape is numFullBlocks * BLCKSZ +
* lastBlockBytes. BUT: we do not update lastBlockBytes during writing,
* only at completion of a write phase.
* Block numbers of the first, current, and next block of the tape.
*
* The "current" block number is only valid when writing, or reading from
* a frozen tape. (When reading from an unfrozen tape, we use a larger
* read buffer that holds multiple blocks, so the "current" block is
* ambiguous.)
*/
long numFullBlocks; /* number of complete blocks in log tape */
int lastBlockBytes; /* valid bytes in last (incomplete) block */
long firstBlockNumber;
long curBlockNumber;
long nextBlockNumber;
/*
* Buffer for current data block. Note we don't bother to store the
* actual file block number of the data block (during the write phase it
* hasn't been assigned yet, and during read we don't care anymore). But
* we do need the relative block number so we can detect end-of-tape while
* reading.
* Buffer for current data block(s).
*/
char *buffer; /* physical buffer (separately palloc'd) */
int buffer_size; /* allocated size of the buffer */
long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
} LogicalTape;
@ -182,19 +182,6 @@ static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
long blocknum);
static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool freezing);
static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect);
static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool frozen);
static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect);
static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
/*
@ -237,46 +224,40 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
/*
* Read as many blocks as we can into the per-tape buffer.
*
* The caller can specify the next physical block number to read, in
* datablocknum, or -1 to fetch the next block number from the internal block.
* If datablocknum == -1, the caller must've already set curBlockNumber.
*
* Returns true if anything was read, 'false' on EOF.
*/
static bool
ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
lt->pos = 0;
lt->nbytes = 0;
do
{
/* Fetch next block number (unless provided by caller) */
if (datablocknum == -1)
{
datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
if (datablocknum == -1L)
break; /* EOF */
lt->curBlockNumber++;
}
char *thisbuf = lt->buffer + lt->nbytes;
/* Fetch next block number */
if (lt->nextBlockNumber == -1L)
break; /* EOF */
/* Read the block */
ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes));
ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
if (!lt->frozen)
ltsReleaseBlock(lts, datablocknum);
ltsReleaseBlock(lts, lt->nextBlockNumber);
lt->curBlockNumber = lt->nextBlockNumber;
if (lt->curBlockNumber < lt->numFullBlocks)
lt->nbytes += BLCKSZ;
else
lt->nbytes += TapeBlockGetNBytes(thisbuf);
if (TapeBlockIsLast(thisbuf))
{
lt->nextBlockNumber = -1L;
/* EOF */
lt->nbytes += lt->lastBlockBytes;
break;
}
else
lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
/* Advance to next block, if we have buffer space left */
datablocknum = -1;
} while (lt->nbytes < lt->buffer_size);
} while (lt->buffer_size - lt->nbytes > BLCKSZ);
return (lt->nbytes > 0);
}
@ -360,203 +341,6 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
lts->blocksSorted = false;
}
/*
* These routines manipulate indirect-block hierarchies. All are recursive
* so that they don't have any specific limit on the depth of hierarchy.
*/
/*
* Record a data block number in a logical tape's lowest indirect block,
* or record an indirect block's number in the next higher indirect level.
*/
static void
ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
long blocknum)
{
if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
{
/*
* This indirect block is full, so dump it out and recursively save
* its address in the next indirection level. Create a new
* indirection level if there wasn't one before.
*/
long indirblock = ltsGetFreeBlock(lts);
ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
if (indirect->nextup == NULL)
{
indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
indirect->nextup->nextSlot = 0;
indirect->nextup->nextup = NULL;
}
ltsRecordBlockNum(lts, indirect->nextup, indirblock);
/*
* Reset to fill another indirect block at this level.
*/
indirect->nextSlot = 0;
}
indirect->ptrs[indirect->nextSlot++] = blocknum;
}
/*
* Reset a logical tape's indirect-block hierarchy after a write pass
* to prepare for reading. We dump out partly-filled blocks except
* at the top of the hierarchy, and we rewind each level to the start.
* This call returns the first data block number, or -1L if the tape
* is empty.
*
* Unless 'freezing' is true, release indirect blocks to the free pool after
* reading them.
*/
static long
ltsRewindIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool freezing)
{
/* Handle case of never-written-to tape */
if (indirect == NULL)
return -1L;
/* Insert sentinel if block is not full */
if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
indirect->ptrs[indirect->nextSlot] = -1L;
/*
* If block is not topmost, write it out, and recurse to obtain address of
* first block in this hierarchy level. Read that one in.
*/
if (indirect->nextup != NULL)
{
long indirblock = ltsGetFreeBlock(lts);
ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
ltsRecordBlockNum(lts, indirect->nextup, indirblock);
indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
Assert(indirblock != -1L);
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
if (!freezing)
ltsReleaseBlock(lts, indirblock);
}
/*
* Reset my next-block pointer, and then fetch a block number if any.
*/
indirect->nextSlot = 0;
if (indirect->ptrs[0] == -1L)
return -1L;
return indirect->ptrs[indirect->nextSlot++];
}
/*
* Rewind a previously-frozen indirect-block hierarchy for another read pass.
* This call returns the first data block number, or -1L if the tape
* is empty.
*/
static long
ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect)
{
/* Handle case of never-written-to tape */
if (indirect == NULL)
return -1L;
/*
* If block is not topmost, recurse to obtain address of first block in
* this hierarchy level. Read that one in.
*/
if (indirect->nextup != NULL)
{
long indirblock;
indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
Assert(indirblock != -1L);
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
}
/*
* Reset my next-block pointer, and then fetch a block number if any.
*/
indirect->nextSlot = 0;
if (indirect->ptrs[0] == -1L)
return -1L;
return indirect->ptrs[indirect->nextSlot++];
}
/*
* Obtain next data block number in the forward direction, or -1L if no more.
*
* Unless 'frozen' is true, release indirect blocks to the free pool after
* reading them.
*/
static long
ltsRecallNextBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool frozen)
{
/* Handle case of never-written-to tape */
if (indirect == NULL)
return -1L;
if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
indirect->ptrs[indirect->nextSlot] == -1L)
{
long indirblock;
if (indirect->nextup == NULL)
return -1L; /* nothing left at this level */
indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
if (indirblock == -1L)
return -1L; /* nothing left at this level */
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
if (!frozen)
ltsReleaseBlock(lts, indirblock);
indirect->nextSlot = 0;
}
if (indirect->ptrs[indirect->nextSlot] == -1L)
return -1L;
return indirect->ptrs[indirect->nextSlot++];
}
/*
* Obtain next data block number in the reverse direction, or -1L if no more.
*
* Note this fetches the block# before the one last returned, no matter which
* direction of call returned that one. If we fail, no change in state.
*
* This routine can only be used in 'frozen' state, so there's no need to
* pass a parameter telling whether to release blocks ... we never do.
*/
static long
ltsRecallPrevBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect)
{
/* Handle case of never-written-to tape */
if (indirect == NULL)
return -1L;
if (indirect->nextSlot <= 1)
{
long indirblock;
if (indirect->nextup == NULL)
return -1L; /* nothing left at this level */
indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
if (indirblock == -1L)
return -1L; /* nothing left at this level */
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
/*
* The previous block would only have been written out if full, so we
* need not search it for a -1 sentinel.
*/
indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
}
indirect->nextSlot--;
return indirect->ptrs[indirect->nextSlot - 1];
}
/*
* Create a set of logical tapes in a temporary underlying file.
*
@ -585,23 +369,21 @@ LogicalTapeSetCreate(int ntapes)
lts->nTapes = ntapes;
/*
* Initialize per-tape structs. Note we allocate the I/O buffer and
* first-level indirect block for a tape only when it is first actually
* written to. This avoids wasting memory space when tuplesort.c
* overestimates the number of tapes needed.
* Initialize per-tape structs. Note we allocate the I/O buffer and the
* first block for a tape only when it is first actually written to. This
* avoids wasting memory space when tuplesort.c overestimates the number
* of tapes needed.
*/
for (i = 0; i < ntapes; i++)
{
lt = &lts->tapes[i];
lt->indirect = NULL;
lt->writing = true;
lt->frozen = false;
lt->dirty = false;
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->firstBlockNumber = -1L;
lt->curBlockNumber = -1L;
lt->buffer = NULL;
lt->buffer_size = 0;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
}
@ -615,19 +397,12 @@ void
LogicalTapeSetClose(LogicalTapeSet *lts)
{
LogicalTape *lt;
IndirectBlock *ib,
*nextib;
int i;
BufFileClose(lts->pfile);
for (i = 0; i < lts->nTapes; i++)
{
lt = &lts->tapes[i];
for (ib = lt->indirect; ib != NULL; ib = nextib)
{
nextib = ib->nextup;
pfree(ib);
}
if (lt->buffer)
pfree(lt->buffer);
}
@ -650,21 +425,6 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
lts->forgetFreeSpace = true;
}
/*
* Dump the dirty buffer of a logical tape.
*/
static void
ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
long datablock = ltsGetFreeBlock(lts);
Assert(lt->dirty);
ltsWriteBlock(lts, datablock, (void *) lt->buffer);
ltsRecordBlockNum(lts, lt->indirect, datablock);
lt->dirty = false;
/* Caller must do other state update as needed */
}
/*
* Write to a logical tape.
*
@ -681,39 +441,55 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
lt = &lts->tapes[tapenum];
Assert(lt->writing);
/* Allocate data buffer and first indirect block on first write */
/* Allocate data buffer and first block on first write */
if (lt->buffer == NULL)
{
lt->buffer = (char *) palloc(BLCKSZ);
lt->buffer_size = BLCKSZ;
}
if (lt->indirect == NULL)
if (lt->curBlockNumber == -1)
{
lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
lt->indirect->nextSlot = 0;
lt->indirect->nextup = NULL;
Assert(lt->firstBlockNumber == -1);
Assert(lt->pos == 0);
lt->curBlockNumber = ltsGetFreeBlock(lts);
lt->firstBlockNumber = lt->curBlockNumber;
TapeBlockGetTrailer(lt->buffer)->prev = -1L;
}
Assert(lt->buffer_size == BLCKSZ);
while (size > 0)
{
if (lt->pos >= BLCKSZ)
if (lt->pos >= TapeBlockPayloadSize)
{
/* Buffer full, dump it out */
if (lt->dirty)
ltsDumpBuffer(lts, lt);
else
long nextBlockNumber;
if (!lt->dirty)
{
/* Hmm, went directly from reading to writing? */
elog(ERROR, "invalid logtape state: should be dirty");
}
lt->numFullBlocks++;
lt->curBlockNumber++;
/*
* First allocate the next block, so that we can store it in the
* 'next' pointer of this block.
*/
nextBlockNumber = ltsGetFreeBlock(lts);
/* set the next-pointer and dump the current block. */
TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
/* initialize the prev-pointer of the next block */
TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
lt->curBlockNumber = nextBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
}
nthistime = BLCKSZ - lt->pos;
nthistime = TapeBlockPayloadSize - lt->pos;
if (nthistime > size)
nthistime = size;
Assert(nthistime > 0);
@ -734,19 +510,17 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
*
* The tape must currently be in writing state, or "frozen" in read state.
*
* 'buffer_size' specifies how much memory to use for the read buffer. It
* does not include the memory needed for the indirect blocks. Regardless
* of the argument, the actual amount of memory used is between BLCKSZ and
* MaxAllocSize, and is a multiple of BLCKSZ. The given value is rounded
* down and truncated to fit those constraints, if necessary. If the tape
* is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ byte
* buffer is used.
* 'buffer_size' specifies how much memory to use for the read buffer.
* Regardless of the argument, the actual amount of memory used is between
* BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
* rounded down and truncated to fit those constraints, if necessary. If the
* tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
* byte buffer is used.
*/
void
LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
{
LogicalTape *lt;
long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
@ -776,14 +550,15 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
if (lt->writing)
{
/*
* Completion of a write phase. Flush last partial data block, flush
* any partial indirect blocks, rewind for normal (destructive) read.
* Completion of a write phase. Flush last partial data block, and
* rewind for normal (destructive) read.
*/
if (lt->dirty)
ltsDumpBuffer(lts, lt);
lt->lastBlockBytes = lt->nbytes;
{
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
}
lt->writing = false;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
}
else
{
@ -792,7 +567,6 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
* pass.
*/
Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
/* Allocate a read buffer (unless the tape is empty) */
@ -800,18 +574,17 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
if (datablocknum != -1L)
if (lt->firstBlockNumber != -1L)
{
lt->buffer = palloc(buffer_size);
lt->buffer_size = buffer_size;
}
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->nextBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
ltsReadFillBuffer(lts, lt, datablocknum);
ltsReadFillBuffer(lts, lt);
}
/*
@ -826,38 +599,21 @@ void
LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
{
LogicalTape *lt;
IndirectBlock *ib,
*nextib;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(!lt->writing && !lt->frozen);
/* Must truncate the indirect-block hierarchy down to one level. */
if (lt->indirect)
{
for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
{
nextib = ib->nextup;
pfree(ib);
}
lt->indirect->nextSlot = 0;
lt->indirect->nextup = NULL;
}
lt->writing = true;
lt->dirty = false;
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->curBlockNumber = 0L;
lt->firstBlockNumber = -1L;
lt->curBlockNumber = -1L;
lt->pos = 0;
lt->nbytes = 0;
if (lt->buffer)
{
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
}
lt->buffer = NULL;
lt->buffer_size = 0;
}
/*
@ -882,7 +638,7 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
if (!ltsReadFillBuffer(lts, lt, -1))
if (!ltsReadFillBuffer(lts, lt))
break; /* EOF */
}
@ -917,22 +673,23 @@ void
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
{
LogicalTape *lt;
long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->writing);
/*
* Completion of a write phase. Flush last partial data block, flush any
* partial indirect blocks, rewind for nondestructive read.
* Completion of a write phase. Flush last partial data block, and rewind
* for nondestructive read.
*/
if (lt->dirty)
ltsDumpBuffer(lts, lt);
lt->lastBlockBytes = lt->nbytes;
{
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
lt->writing = false;
}
lt->writing = false;
lt->frozen = true;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
/*
* The seek and backspace functions assume a single block read buffer.
@ -950,15 +707,18 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
}
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->curBlockNumber = lt->firstBlockNumber;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
{
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
}
if (lt->firstBlockNumber == -1L)
lt->nextBlockNumber = -1L;
ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
if (TapeBlockIsLast(lt->buffer))
lt->nextBlockNumber = -1L;
else
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
lt->nbytes = TapeBlockGetNBytes(lt->buffer);
}
/*
@ -969,15 +729,16 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
* random access during write, and an unfrozen read tape may have
* already discarded the desired data!
*
* Return value is TRUE if seek successful, FALSE if there isn't that much
* data before the current point (in which case there's no state change).
* Returns the number of bytes backed up. It can be less than the
* requested amount, if there isn't that much data before the current
* position. The tape is positioned to the beginning of the tape in
* that case.
*/
bool
size_t
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
{
LogicalTape *lt;
long nblocks;
int newpos;
size_t seekpos = 0;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
@ -990,45 +751,50 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
if (size <= (size_t) lt->pos)
{
lt->pos -= (int) size;
return true;
return size;
}
/*
* Not-so-easy case. Figure out whether it's possible at all.
* Not-so-easy case, have to walk back the chain of blocks. This
* implementation would be pretty inefficient for long seeks, but we
* really aren't doing that (a seek over one tuple is typical).
*/
size -= (size_t) lt->pos; /* part within this block */
nblocks = size / BLCKSZ;
size = size % BLCKSZ;
if (size)
seekpos = (size_t) lt->pos; /* part within this block */
while (size > seekpos)
{
nblocks++;
newpos = (int) (BLCKSZ - size);
}
else
newpos = 0;
if (nblocks > lt->curBlockNumber)
return false; /* a seek too far... */
long prev = TapeBlockGetTrailer(lt->buffer)->prev;
/*
* OK, we need to back up nblocks blocks. This implementation would be
* pretty inefficient for long seeks, but we really aren't expecting that
* (a seek over one tuple is typical).
*/
while (nblocks-- > 0)
{
long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
if (datablocknum == -1L)
elog(ERROR, "unexpected end of tape");
lt->curBlockNumber--;
if (nblocks == 0)
if (prev == -1L)
{
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
lt->nbytes = BLCKSZ;
/* Tried to back up beyond the beginning of tape. */
if (lt->curBlockNumber != lt->firstBlockNumber)
elog(ERROR, "unexpected end of tape");
lt->pos = 0;
return seekpos;
}
ltsReadBlock(lts, prev, (void *) lt->buffer);
if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
prev,
TapeBlockGetTrailer(lt->buffer)->next,
lt->curBlockNumber);
lt->nbytes = TapeBlockPayloadSize;
lt->curBlockNumber = prev;
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
seekpos += TapeBlockPayloadSize;
}
lt->pos = newpos;
return true;
/*
* 'seekpos' can now be greater than 'size', because it points to the
* beginning of the target block. The difference is the position within
* the block.
*/
lt->pos = seekpos - size;
return size;
}
/*
@ -1036,10 +802,10 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
*
* *Only* a frozen-for-read tape can be seeked.
*
* Return value is TRUE if seek successful, FALSE if there isn't that much
* data in the tape (in which case there's no state change).
* Must be called with a block/offset previously returned by
* LogicalTapeTell().
*/
bool
void
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset)
{
@ -1048,53 +814,20 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= BLCKSZ);
Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
Assert(lt->buffer_size == BLCKSZ);
/*
* Easy case for seek within current block.
*/
if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
if (blocknum != lt->curBlockNumber)
{
lt->pos = offset;
return true;
ltsReadBlock(lts, blocknum, (void *) lt->buffer);
lt->curBlockNumber = blocknum;
lt->nbytes = TapeBlockPayloadSize;
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
}
/*
* Not-so-easy case. Figure out whether it's possible at all.
*/
if (blocknum < 0 || blocknum > lt->numFullBlocks ||
(blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
return false;
/*
* OK, advance or back up to the target block. This implementation would
* be pretty inefficient for long seeks, but we really aren't expecting
* that (a seek over one tuple is typical).
*/
while (lt->curBlockNumber > blocknum)
{
long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
if (datablocknum == -1L)
elog(ERROR, "unexpected end of tape");
if (--lt->curBlockNumber == blocknum)
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
}
while (lt->curBlockNumber < blocknum)
{
long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
lt->frozen);
if (datablocknum == -1L)
elog(ERROR, "unexpected end of tape");
if (++lt->curBlockNumber == blocknum)
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
}
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
if (offset > lt->nbytes)
elog(ERROR, "invalid tape seek position");
lt->pos = offset;
return true;
}
/*

View File

@ -240,16 +240,16 @@ typedef enum
* Parameters for calculation of number of tapes to use --- see inittapes()
* and tuplesort_merge_order().
*
* In this calculation we assume that each tape will cost us about 3 blocks
* worth of buffer space (which is an underestimate for very large data
* volumes, but it's probably close enough --- see logtape.c).
* In this calculation we assume that each tape will cost us about one block's
* worth of buffer space. This ignores the overhead of all the other data
* structures needed for each tape, but it's probably close enough.
*
* MERGE_BUFFER_SIZE is how much data we'd like to read from each input
* tape during a preread cycle (see discussion at top of file).
*/
#define MINORDER 6 /* minimum merge order */
#define MAXORDER 500 /* maximum merge order */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
/*
@ -1849,6 +1849,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
SortTuple *stup)
{
unsigned int tuplen;
size_t nmoved;
switch (state->status)
{
@ -1948,10 +1949,13 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* end of file; back up to fetch last tuple's ending length
* word. If seek fails we must have a completely empty file.
*/
if (!LogicalTapeBackspace(state->tapeset,
state->result_tape,
2 * sizeof(unsigned int)))
nmoved = LogicalTapeBackspace(state->tapeset,
state->result_tape,
2 * sizeof(unsigned int));
if (nmoved == 0)
return false;
else if (nmoved != 2 * sizeof(unsigned int))
elog(ERROR, "unexpected tape position");
state->eof_reached = false;
}
else
@ -1960,31 +1964,34 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Back up and fetch previously-returned tuple's ending length
* word. If seek fails, assume we are at start of file.
*/
if (!LogicalTapeBackspace(state->tapeset,
state->result_tape,
sizeof(unsigned int)))
nmoved = LogicalTapeBackspace(state->tapeset,
state->result_tape,
sizeof(unsigned int));
if (nmoved == 0)
return false;
else if (nmoved != sizeof(unsigned int))
elog(ERROR, "unexpected tape position");
tuplen = getlen(state, state->result_tape, false);
/*
* Back up to get ending length word of tuple before it.
*/
if (!LogicalTapeBackspace(state->tapeset,
state->result_tape,
tuplen + 2 * sizeof(unsigned int)))
nmoved = LogicalTapeBackspace(state->tapeset,
state->result_tape,
tuplen + 2 * sizeof(unsigned int));
if (nmoved == tuplen + sizeof(unsigned int))
{
/*
* If that fails, presumably the prev tuple is the first
* in the file. Back up so that it becomes next to read
* in forward direction (not obviously right, but that is
* what in-memory case does).
* We backed up over the previous tuple, but there was no
* ending length word before it. That means that the prev
* tuple is the first tuple in the file. It is now the
* next to read in forward direction (not obviously right,
* but that is what in-memory case does).
*/
if (!LogicalTapeBackspace(state->tapeset,
state->result_tape,
tuplen + sizeof(unsigned int)))
elog(ERROR, "bogus tuple length in backward scan");
return false;
}
else if (nmoved != tuplen + 2 * sizeof(unsigned int))
elog(ERROR, "bogus tuple length in backward scan");
}
tuplen = getlen(state, state->result_tape, false);
@ -1994,9 +2001,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
if (!LogicalTapeBackspace(state->tapeset,
state->result_tape,
tuplen))
nmoved = LogicalTapeBackspace(state->tapeset,
state->result_tape,
tuplen);
if (nmoved != tuplen)
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
@ -3183,11 +3191,10 @@ tuplesort_restorepos(Tuplesortstate *state)
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
if (!LogicalTapeSeek(state->tapeset,
state->result_tape,
state->markpos_block,
state->markpos_offset))
elog(ERROR, "tuplesort_restorepos failed");
LogicalTapeSeek(state->tapeset,
state->result_tape,
state->markpos_block,
state->markpos_offset);
state->eof_reached = state->markpos_eof;
break;
default:

View File

@ -35,9 +35,9 @@ extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
size_t buffer_size);
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);