Simplify the code for logical tape read buffers.

Pass the buffer size as argument to LogicalTapeRewindForRead, rather than
setting it earlier with the separate LogicTapeAssignReadBufferSize call.
This way, the buffer size is set closer to where it's actually used, which
makes the code easier to understand.

This makes the calculation for how much memory to use for the buffers less
precise. We now use the same amount of memory for every tape, rounded down
to the nearest BLCKSZ boundary, instead of using one more block for some
tapes, to get the total up to exact amount of memory available. That should
be OK, merging isn't too sensitive to the exact amount of memory used.

Reviewed by Peter Geoghegan

Discussion: <0f607c4b-df23-353e-bf56-c0389d28495f@iki.fi>
This commit is contained in:
Heikki Linnakangas 2016-10-12 12:05:45 +03:00
parent 2f1eaf87e8
commit b75f467b6e
3 changed files with 139 additions and 200 deletions

View File

@ -141,14 +141,6 @@ typedef struct LogicalTape
long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
/*
* Desired buffer size to use when reading. To keep things simple, we use
* a single-block buffer when writing, or when reading a frozen tape. But
* when we are reading and will only read forwards, we allocate a larger
* buffer, determined by read_buffer_size.
*/
int read_buffer_size;
} LogicalTape;
/*
@ -609,7 +601,6 @@ LogicalTapeSetCreate(int ntapes)
lt->lastBlockBytes = 0;
lt->buffer = NULL;
lt->buffer_size = 0;
lt->read_buffer_size = BLCKSZ;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
@ -739,13 +730,20 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
}
/*
* Rewind logical tape and switch from writing to reading or vice versa.
* Rewind logical tape and switch from writing to reading.
*
* Unless the tape has been "frozen" in read state, forWrite must be the
* opposite of the previous tape state.
* The tape must currently be in writing state, or "frozen" in read state.
*
* 'buffer_size' specifies how much memory to use for the read buffer. It
* does not include the memory needed for the indirect blocks. Regardless
* of the argument, the actual amount of memory used is between BLCKSZ and
* MaxAllocSize, and is a multiple of BLCKSZ. The given value is rounded
* down and truncated to fit those constraints, if necessary. If the tape
* is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ byte
* buffer is used.
*/
void
LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
{
LogicalTape *lt;
long datablocknum;
@ -753,88 +751,112 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
if (!forWrite)
/*
* Round and cap buffer_size if needed.
*/
if (lt->frozen)
buffer_size = BLCKSZ;
else
{
if (lt->writing)
{
/*
* Completion of a write phase. Flush last partial data block,
* flush any partial indirect blocks, rewind for normal
* (destructive) read.
*/
if (lt->dirty)
ltsDumpBuffer(lts, lt);
lt->lastBlockBytes = lt->nbytes;
lt->writing = false;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
}
else
{
/*
* This is only OK if tape is frozen; we rewind for (another) read
* pass.
*/
Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
/* need at least one block */
if (buffer_size < BLCKSZ)
buffer_size = BLCKSZ;
/* Allocate a read buffer (unless the tape is empty) */
if (lt->buffer)
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
if (datablocknum != -1L)
{
lt->buffer = palloc(lt->read_buffer_size);
lt->buffer_size = lt->read_buffer_size;
}
/*
* palloc() larger than MaxAllocSize would fail (a multi-gigabyte
* buffer is unlikely to be helpful, anyway)
*/
if (buffer_size > MaxAllocSize)
buffer_size = MaxAllocSize;
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
ltsReadFillBuffer(lts, lt, datablocknum);
/* round down to BLCKSZ boundary */
buffer_size -= buffer_size % BLCKSZ;
}
if (lt->writing)
{
/*
* Completion of a write phase. Flush last partial data block, flush
* any partial indirect blocks, rewind for normal (destructive) read.
*/
if (lt->dirty)
ltsDumpBuffer(lts, lt);
lt->lastBlockBytes = lt->nbytes;
lt->writing = false;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
}
else
{
/*
* Completion of a read phase. Rewind and prepare for write.
*
* NOTE: we assume the caller has read the tape to the end; otherwise
* untouched data and indirect blocks will not have been freed. We
* could add more code to free any unread blocks, but in current usage
* of this module it'd be useless code.
* This is only OK if tape is frozen; we rewind for (another) read
* pass.
*/
IndirectBlock *ib,
*nextib;
Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
Assert(!lt->writing && !lt->frozen);
/* Must truncate the indirect-block hierarchy down to one level. */
if (lt->indirect)
{
for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
{
nextib = ib->nextup;
pfree(ib);
}
lt->indirect->nextSlot = 0;
lt->indirect->nextup = NULL;
}
lt->writing = true;
lt->dirty = false;
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
/* Allocate a read buffer (unless the tape is empty) */
if (lt->buffer)
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
if (datablocknum != -1L)
{
lt->buffer = palloc(buffer_size);
lt->buffer_size = buffer_size;
}
if (lt->buffer)
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
ltsReadFillBuffer(lts, lt, datablocknum);
}
/*
* Rewind logical tape and switch from reading to writing.
*
* NOTE: we assume the caller has read the tape to the end; otherwise
* untouched data and indirect blocks will not have been freed. We
* could add more code to free any unread blocks, but in current usage
* of this module it'd be useless code.
*/
void
LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
{
LogicalTape *lt;
IndirectBlock *ib,
*nextib;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(!lt->writing && !lt->frozen);
/* Must truncate the indirect-block hierarchy down to one level. */
if (lt->indirect)
{
for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
{
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
nextib = ib->nextup;
pfree(ib);
}
lt->indirect->nextSlot = 0;
lt->indirect->nextup = NULL;
}
lt->writing = true;
lt->dirty = false;
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (lt->buffer)
{
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
}
}
@ -1105,28 +1127,3 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
return lts->nFileBlocks;
}
/*
* Set buffer size to use, when reading from given tape.
*/
void
LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
{
LogicalTape *lt;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
/*
* The buffer size must be a multiple of BLCKSZ in size, so round the
* given value down to nearest BLCKSZ. Make sure we have at least one
* page. Also, don't go above MaxAllocSize, to avoid erroring out. A
* multi-gigabyte buffer is unlikely to be helpful, anyway.
*/
if (avail_mem < BLCKSZ)
avail_mem = BLCKSZ;
if (avail_mem > MaxAllocSize)
avail_mem = MaxAllocSize;
avail_mem -= avail_mem % BLCKSZ;
lt->read_buffer_size = avail_mem;
}

View File

@ -366,6 +366,9 @@ struct Tuplesortstate
char *slabMemoryEnd; /* end of slab memory arena */
SlabSlot *slabFreeHead; /* head of free list */
/* Buffer size to use for reading input tapes, during merge. */
size_t read_buffer_size;
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX, that
* came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
@ -579,7 +582,6 @@ static bool useselection(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void init_tape_buffers(Tuplesortstate *state, int numInputTapes);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
@ -2056,7 +2058,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* end of the sort anyway, but better to release the
* memory early.
*/
LogicalTapeRewind(state->tapeset, srcTape, true);
LogicalTapeRewindForWrite(state->tapeset, srcTape);
return true;
}
newtup.tupindex = srcTape;
@ -2511,72 +2513,6 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
state->slabAllocatorUsed = true;
}
/*
* Divide all remaining work memory (availMem) as read buffers, for all
* the tapes that will be used during the merge.
*
* We use the number of possible *input* tapes here, rather than maxTapes,
* for the calculation. At all times, we'll be reading from at most
* numInputTapes tapes, and one tape is used for output (unless we do an
* on-the-fly final merge, in which case we don't have an output tape).
*/
static void
init_tape_buffers(Tuplesortstate *state, int numInputTapes)
{
int64 availBlocks;
int64 blocksPerTape;
int remainder;
int tapenum;
/*
* Divide availMem evenly among the number of input tapes.
*/
availBlocks = state->availMem / BLCKSZ;
blocksPerTape = availBlocks / numInputTapes;
remainder = availBlocks % numInputTapes;
USEMEM(state, availBlocks * BLCKSZ);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
(availBlocks * BLCKSZ) / 1024, numInputTapes);
#endif
/*
* Use one page per tape, even if we are out of memory.
* tuplesort_merge_order() should've chosen the number of tapes so that
* this can't happen, but better safe than sorry. (This also protects
* from a negative availMem.)
*/
if (blocksPerTape < 1)
{
blocksPerTape = 1;
remainder = 0;
}
/*
* Set the buffers for the tapes.
*
* In a multi-phase merge, the tape that is initially used as an output
* tape, will later be rewound and read from, and should also use a large
* buffer at that point. So we must loop up to maxTapes, not just
* numInputTapes!
*
* If there are fewer runs than tapes, we will set the buffer size also
* for tapes that will go completely unused, but that's harmless.
* LogicalTapeAssignReadBufferSize() doesn't allocate the buffer
* immediately, it just sets the size that will be used, when the tape is
* rewound for read, and the tape isn't empty.
*/
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
{
int64 numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0);
LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
numBlocks * BLCKSZ);
}
}
/*
* mergeruns -- merge all the completed initial runs.
*
@ -2679,25 +2615,32 @@ mergeruns(Tuplesortstate *state)
}
/*
* Use all the spare memory we have available for read buffers for the
* tapes.
* Use all the spare memory we have available for read buffers among the
* input tapes.
*
* We do this only after checking for the case that we produced only one
* initial run, because there is no need to use a large read buffer when
* we're reading from a single tape. With one tape, the I/O pattern will
* be the same regardless of the buffer size.
*
* We don't try to "rebalance" the amount of memory among tapes, when we
* start a new merge phase, even if some tapes can be inactive in the
* phase. That would be hard, because logtape.c doesn't know where one
* run ends and another begins. When a new merge phase begins, and a tape
* doesn't participate in it, its buffer nevertheless already contains
* tuples from the next run on same tape, so we cannot release the buffer.
* That's OK in practice, merge performance isn't that sensitive to the
* amount of buffers used, and most merge phases use all or almost all
* tapes, anyway.
* We don't try to "rebalance" the memory among tapes, when we start a new
* merge phase, even if some tapes are inactive in the new phase. That
* would be hard, because logtape.c doesn't know where one run ends and
* another begins. When a new merge phase begins, and a tape doesn't
* participate in it, its buffer nevertheless already contains tuples from
* the next run on same tape, so we cannot release the buffer. That's OK
* in practice, merge performance isn't that sensitive to the amount of
* buffers used, and most merge phases use all or almost all tapes,
* anyway.
*/
init_tape_buffers(state, numInputTapes);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
(state->availMem) / 1024, numInputTapes);
#endif
state->read_buffer_size = state->availMem / numInputTapes;
USEMEM(state, state->availMem);
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
@ -2709,7 +2652,7 @@ mergeruns(Tuplesortstate *state)
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
for (;;)
{
@ -2772,11 +2715,10 @@ mergeruns(Tuplesortstate *state)
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
false);
LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
state->read_buffer_size);
/* rewind used-up input tape P, and prepare it for write pass */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
true);
LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
state->tp_runs[state->tapeRange - 1] = 0;
/*
@ -2812,7 +2754,7 @@ mergeruns(Tuplesortstate *state)
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
{
if (tapenum != state->result_tape)
LogicalTapeRewind(state->tapeset, tapenum, true);
LogicalTapeRewindForWrite(state->tapeset, tapenum);
}
}
@ -3174,9 +3116,9 @@ tuplesort_rescan(Tuplesortstate *state)
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
LogicalTapeRewind(state->tapeset,
state->result_tape,
false);
LogicalTapeRewindForRead(state->tapeset,
state->result_tape,
0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;

View File

@ -31,7 +31,9 @@ extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size);
extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size);
extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
size_t buffer_size);
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
@ -39,8 +41,6 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum,
size_t bufsize);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */