Recent changes in memory management in tuplesort.c had a problem: the

case where we run low on array slots before we run low on memory is much
more probable than I had thought, and so it's important to treat each
tape fairly in that case.  To fix this, track per-tape slot allocations
just like we track per-tape space allocation.  Also, in the FINALMERGE
code path avoid scanning all the input tapes when we really only need to
read from one.  This should fix poor behavior with very large work_mem
as exhibited by Stefan Kaltenbrunner.
I didn't do anything about putting an upper bound on the number of tapes,
but maybe we should still consider that.
This commit is contained in:
Tom Lane 2006-03-10 23:19:00 +00:00
parent 9f6192490e
commit c65ab0bfa9
1 changed files with 102 additions and 82 deletions

View File

@ -91,7 +91,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.64 2006/03/08 16:59:03 tgl Exp $
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.65 2006/03/10 23:19:00 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -266,24 +266,22 @@ struct Tuplesortstate
* have not yet exhausted that run. mergenext[i] is the memtuples index
* of the next pre-read tuple (next to be loaded into the heap) for tape
* i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
* points to the last pre-read tuple from each tape. mergeavailmem[i] is
* the amount of unused space allocated for tape i. mergefreelist and
* mergefirstfree keep track of unused locations in the memtuples[] array.
* The memtuples[].tupindex fields link together pre-read tuples for each
* tape as well as recycled locations in mergefreelist. It is OK to use 0
* as a null link in these lists, because memtuples[0] is part of the
* merge heap and is never a pre-read tuple. mergeslotsfree counts the
* total number of free memtuples[] slots, both those in the freelist and
* those beyond mergefirstfree.
* points to the last pre-read tuple from each tape. mergeavailslots[i]
* is the number of unused memtuples[] slots reserved for tape i, and
* mergeavailmem[i] is the amount of unused space allocated for tape i.
* mergefreelist and mergefirstfree keep track of unused locations in the
* memtuples[] array. The memtuples[].tupindex fields link together
* pre-read tuples for each tape as well as recycled locations in
* mergefreelist. It is OK to use 0 as a null link in these lists, because
* memtuples[0] is part of the merge heap and is never a pre-read tuple.
*/
bool *mergeactive; /* Active input run source? */
bool *mergeactive; /* active input run source? */
int *mergenext; /* first preread tuple for each source */
int *mergelast; /* last preread tuple for each source */
long *mergeavailmem; /* availMem for prereading tapes */
long spacePerTape; /* actual per-tape target usage */
int *mergeavailslots; /* slots left for prereading each tape */
long *mergeavailmem; /* availMem for prereading each tape */
int mergefreelist; /* head of freelist of recycled slots */
int mergefirstfree; /* first slot never used in this merge */
int mergeslotsfree; /* number of free slots during merge */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
@ -406,6 +404,7 @@ static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex);
@ -1118,8 +1117,11 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
{
/*
* out of preloaded data on this tape, try to read more
*
* Unlike mergeonerun(), we only preload from the single
* tape that's run dry. See mergepreread() comments.
*/
mergepreread(state);
mergeprereadone(state, srcTape);
/*
* if still no data, we've reached end of run on this tape
@ -1136,7 +1138,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
/* put the now-unused memtuples entry on the freelist */
newtup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeslotsfree++;
state->mergeavailslots[srcTape]++;
return true;
}
return false;
@ -1290,6 +1292,7 @@ inittapes(Tuplesortstate *state)
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
@ -1561,7 +1564,7 @@ mergeonerun(Tuplesortstate *state)
/* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeslotsfree++;
state->mergeavailslots[srcTape]++;
}
/*
@ -1592,21 +1595,15 @@ beginmerge(Tuplesortstate *state)
int activeTapes;
int tapenum;
int srcTape;
int slotsPerTape;
long spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
/* Clear merge-pass state variables */
memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive));
memset(state->mergenext, 0, state->maxTapes * sizeof(*state->mergenext));
memset(state->mergelast, 0, state->maxTapes * sizeof(*state->mergelast));
memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = state->maxTapes; /* 1st slot avail for preread */
state->mergeslotsfree = state->memtupsize - state->mergefirstfree;
Assert(state->mergeslotsfree >= state->maxTapes);
/* Adjust run counts and mark the active tapes */
memset(state->mergeactive, 0,
state->maxTapes * sizeof(*state->mergeactive));
activeTapes = 0;
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
@ -1623,16 +1620,29 @@ beginmerge(Tuplesortstate *state)
}
state->activeTapes = activeTapes;
/* Clear merge-pass state variables */
memset(state->mergenext, 0,
state->maxTapes * sizeof(*state->mergenext));
memset(state->mergelast, 0,
state->maxTapes * sizeof(*state->mergelast));
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
/*
* Initialize space allocation to let each active input tape have an equal
* share of preread space.
*/
Assert(activeTapes > 0);
state->spacePerTape = state->availMem / activeTapes;
slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
Assert(slotsPerTape > 0);
spacePerTape = state->availMem / activeTapes;
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (state->mergeactive[srcTape])
state->mergeavailmem[srcTape] = state->spacePerTape;
{
state->mergeavailslots[srcTape] = slotsPerTape;
state->mergeavailmem[srcTape] = spacePerTape;
}
}
/*
@ -1657,7 +1667,7 @@ beginmerge(Tuplesortstate *state)
/* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeslotsfree++;
state->mergeavailslots[srcTape]++;
}
}
}
@ -1670,73 +1680,83 @@ beginmerge(Tuplesortstate *state)
* active source tape until the tape's run is exhausted or it has used up
* its fair share of available memory. In any case, we guarantee that there
* is at least one preread tuple available from each unexhausted input tape.
*
* We invoke this routine at the start of a merge pass for initial load,
* and then whenever any tape's preread data runs out. Note that we load
* as much data as possible from all tapes, not just the one that ran out.
* This is because logtape.c works best with a usage pattern that alternates
* between reading a lot of data and writing a lot of data, so whenever we
* are forced to read, we should fill working memory completely.
*
* In FINALMERGE state, we *don't* use this routine, but instead just preread
* from the single tape that ran dry. There's no read/write alternation in
* that state and so no point in scanning through all the tapes to fix one.
* (Moreover, there may be quite a lot of inactive tapes in that state, since
* we might have had many fewer runs than tapes. In a regular tape-to-tape
* merge we can expect most of the tapes to be active.)
*/
static void
mergepreread(Tuplesortstate *state)
{
int srcTape;
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
mergeprereadone(state, srcTape);
}
/*
* mergeprereadone - load tuples from one merge input tape
*
* Read tuples from the specified tape until it has used up its free memory
* or array slots; but ensure that we have at least one tuple, if any are
* to be had.
*/
static void
mergeprereadone(Tuplesortstate *state, int srcTape)
{
unsigned int tuplen;
SortTuple stup;
int tupIndex;
long priorAvail,
spaceUsed;
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
if (!state->mergeactive[srcTape])
return; /* tape's run is already exhausted */
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
state->mergenext[srcTape] == 0)
{
if (!state->mergeactive[srcTape])
continue;
/*
* Skip reading from any tape that still has at least half of its
* target memory filled with tuples (threshold fraction may need
* adjustment?). This avoids reading just a few tuples when the
* incoming runs are not being consumed evenly.
*/
if (state->mergenext[srcTape] != 0 &&
state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
continue;
/*
* Read tuples from this tape until it has used up its free memory,
* or we are low on memtuples slots; but ensure that we have at least
* one tuple.
*/
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
while ((!LACKMEM(state) && state->mergeslotsfree > state->tapeRange) ||
state->mergenext[srcTape] == 0)
/* read next tuple, if any */
if ((tuplen = getlen(state, srcTape, true)) == 0)
{
/* read next tuple, if any */
if ((tuplen = getlen(state, srcTape, true)) == 0)
{
state->mergeactive[srcTape] = false;
break;
}
READTUP(state, &stup, srcTape, tuplen);
/* find a free slot in memtuples[] for it */
tupIndex = state->mergefreelist;
if (tupIndex)
state->mergefreelist = state->memtuples[tupIndex].tupindex;
else
{
tupIndex = state->mergefirstfree++;
Assert(tupIndex < state->memtupsize);
}
state->mergeslotsfree--;
/* store tuple, append to list for its tape */
stup.tupindex = 0;
state->memtuples[tupIndex] = stup;
if (state->mergelast[srcTape])
state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
else
state->mergenext[srcTape] = tupIndex;
state->mergelast[srcTape] = tupIndex;
state->mergeactive[srcTape] = false;
break;
}
/* update per-tape and global availmem counts */
spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
state->mergeavailmem[srcTape] = state->availMem;
state->availMem = priorAvail - spaceUsed;
READTUP(state, &stup, srcTape, tuplen);
/* find a free slot in memtuples[] for it */
tupIndex = state->mergefreelist;
if (tupIndex)
state->mergefreelist = state->memtuples[tupIndex].tupindex;
else
{
tupIndex = state->mergefirstfree++;
Assert(tupIndex < state->memtupsize);
}
state->mergeavailslots[srcTape]--;
/* store tuple, append to list for its tape */
stup.tupindex = 0;
state->memtuples[tupIndex] = stup;
if (state->mergelast[srcTape])
state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
else
state->mergenext[srcTape] = tupIndex;
state->mergelast[srcTape] = tupIndex;
}
/* update per-tape and global availmem counts */
spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
state->mergeavailmem[srcTape] = state->availMem;
state->availMem = priorAvail - spaceUsed;
}
/*