Refactor LogicalTapeSet/LogicalTape interface.
All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now take a LogicalTape as argument, instead of LogicalTapeSet+tape number. You can create any number of LogicalTapes in a single LogicalTapeSet, and you don't need to decide the number upfront, when you create the tape set. This makes the tape management in hash agg spilling in nodeAgg.c simpler. Discussion: https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi Reviewed-by: Peter Geoghegan, Zhihong Yu, John Naylor
This commit is contained in:
parent
409f9ca447
commit
c4649cce39
|
@ -208,7 +208,16 @@
|
|||
*
|
||||
* Spilled data is written to logical tapes. These provide better control
|
||||
* over memory usage, disk space, and the number of files than if we were
|
||||
* to use a BufFile for each spill.
|
||||
* to use a BufFile for each spill. We don't know the number of tapes needed
|
||||
* at the start of the algorithm (because it can recurse), so a tape set is
|
||||
* allocated at the beginning, and individual tapes are created as needed.
|
||||
* As a particular tape is read, logtape.c recycles its disk space. When a
|
||||
* tape is read to completion, it is destroyed entirely.
|
||||
*
|
||||
* Tapes' buffers can take up substantial memory when many tapes are open at
|
||||
* once. We only need one tape open at a time in read mode (using a buffer
|
||||
* that's a multiple of BLCKSZ); but we need one tape open in write mode (each
|
||||
* requiring a buffer of size BLCKSZ) for each partition.
|
||||
*
|
||||
* Note that it's possible for transition states to start small but then
|
||||
* grow very large; for instance in the case of ARRAY_AGG. In such cases,
|
||||
|
@ -311,27 +320,6 @@
|
|||
*/
|
||||
#define CHUNKHDRSZ 16
|
||||
|
||||
/*
|
||||
* Track all tapes needed for a HashAgg that spills. We don't know the maximum
|
||||
* number of tapes needed at the start of the algorithm (because it can
|
||||
* recurse), so one tape set is allocated and extended as needed for new
|
||||
* tapes. When a particular tape is already read, rewind it for write mode and
|
||||
* put it in the free list.
|
||||
*
|
||||
* Tapes' buffers can take up substantial memory when many tapes are open at
|
||||
* once. We only need one tape open at a time in read mode (using a buffer
|
||||
* that's a multiple of BLCKSZ); but we need one tape open in write mode (each
|
||||
* requiring a buffer of size BLCKSZ) for each partition.
|
||||
*/
|
||||
typedef struct HashTapeInfo
|
||||
{
|
||||
LogicalTapeSet *tapeset;
|
||||
int ntapes;
|
||||
int *freetapes;
|
||||
int nfreetapes;
|
||||
int freetapes_alloc;
|
||||
} HashTapeInfo;
|
||||
|
||||
/*
|
||||
* Represents partitioned spill data for a single hashtable. Contains the
|
||||
* necessary information to route tuples to the correct partition, and to
|
||||
|
@ -343,9 +331,8 @@ typedef struct HashTapeInfo
|
|||
*/
|
||||
typedef struct HashAggSpill
|
||||
{
|
||||
LogicalTapeSet *tapeset; /* borrowed reference to tape set */
|
||||
int npartitions; /* number of partitions */
|
||||
int *partitions; /* spill partition tape numbers */
|
||||
LogicalTape **partitions; /* spill partition tapes */
|
||||
int64 *ntuples; /* number of tuples in each partition */
|
||||
uint32 mask; /* mask to find partition from hash value */
|
||||
int shift; /* after masking, shift by this amount */
|
||||
|
@ -365,8 +352,7 @@ typedef struct HashAggBatch
|
|||
{
|
||||
int setno; /* grouping set */
|
||||
int used_bits; /* number of bits of hash already used */
|
||||
LogicalTapeSet *tapeset; /* borrowed reference to tape set */
|
||||
int input_tapenum; /* input partition tape */
|
||||
LogicalTape *input_tape; /* input partition tape */
|
||||
int64 input_tuples; /* number of tuples in this batch */
|
||||
double input_card; /* estimated group cardinality */
|
||||
} HashAggBatch;
|
||||
|
@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
|
|||
int npartitions);
|
||||
static void hashagg_finish_initial_spills(AggState *aggstate);
|
||||
static void hashagg_reset_spill_state(AggState *aggstate);
|
||||
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
|
||||
int input_tapenum, int setno,
|
||||
static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
|
||||
int64 input_tuples, double input_card,
|
||||
int used_bits);
|
||||
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
|
||||
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
|
||||
static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
|
||||
int used_bits, double input_groups,
|
||||
double hashentrysize);
|
||||
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
||||
TupleTableSlot *slot, uint32 hash);
|
||||
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
|
||||
int setno);
|
||||
static void hashagg_tapeinfo_init(AggState *aggstate);
|
||||
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
|
||||
int ndest);
|
||||
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
|
||||
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
|
||||
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
|
||||
AggState *aggstate, EState *estate,
|
||||
|
@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
|
|||
|
||||
if (!aggstate->hash_ever_spilled)
|
||||
{
|
||||
Assert(aggstate->hash_tapeinfo == NULL);
|
||||
Assert(aggstate->hash_tapeset == NULL);
|
||||
Assert(aggstate->hash_spills == NULL);
|
||||
|
||||
aggstate->hash_ever_spilled = true;
|
||||
|
||||
hashagg_tapeinfo_init(aggstate);
|
||||
aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
|
||||
|
||||
aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
|
||||
|
||||
|
@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
|
|||
AggStatePerHash perhash = &aggstate->perhash[setno];
|
||||
HashAggSpill *spill = &aggstate->hash_spills[setno];
|
||||
|
||||
hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
|
||||
hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
|
||||
perhash->aggnode->numGroups,
|
||||
aggstate->hashentrysize);
|
||||
}
|
||||
|
@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
|
|||
aggstate->hash_mem_peak = total_mem;
|
||||
|
||||
/* update disk usage */
|
||||
if (aggstate->hash_tapeinfo != NULL)
|
||||
if (aggstate->hash_tapeset != NULL)
|
||||
{
|
||||
uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
|
||||
uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
|
||||
|
||||
if (aggstate->hash_disk_used < disk_used)
|
||||
aggstate->hash_disk_used = disk_used;
|
||||
|
@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
|
|||
TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
|
||||
|
||||
if (spill->partitions == NULL)
|
||||
hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
|
||||
hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
|
||||
perhash->aggnode->numGroups,
|
||||
aggstate->hashentrysize);
|
||||
|
||||
|
@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate)
|
|||
HashAggBatch *batch;
|
||||
AggStatePerHash perhash;
|
||||
HashAggSpill spill;
|
||||
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
|
||||
LogicalTapeSet *tapeset = aggstate->hash_tapeset;
|
||||
bool spill_initialized = false;
|
||||
|
||||
if (aggstate->hash_batches == NIL)
|
||||
|
@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate)
|
|||
* that we don't assign tapes that will never be used.
|
||||
*/
|
||||
spill_initialized = true;
|
||||
hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
|
||||
hashagg_spill_init(&spill, tapeset, batch->used_bits,
|
||||
batch->input_card, aggstate->hashentrysize);
|
||||
}
|
||||
/* no memory for a new group, spill */
|
||||
|
@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate)
|
|||
ResetExprContext(aggstate->tmpcontext);
|
||||
}
|
||||
|
||||
hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
|
||||
LogicalTapeClose(batch->input_tape);
|
||||
|
||||
/* change back to phase 0 */
|
||||
aggstate->current_phase = 0;
|
||||
|
@ -2884,67 +2865,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize HashTapeInfo
|
||||
*/
|
||||
static void
|
||||
hashagg_tapeinfo_init(AggState *aggstate)
|
||||
{
|
||||
HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
|
||||
int init_tapes = 16; /* expanded dynamically */
|
||||
|
||||
tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
|
||||
tapeinfo->ntapes = init_tapes;
|
||||
tapeinfo->nfreetapes = init_tapes;
|
||||
tapeinfo->freetapes_alloc = init_tapes;
|
||||
tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
|
||||
for (int i = 0; i < init_tapes; i++)
|
||||
tapeinfo->freetapes[i] = i;
|
||||
|
||||
aggstate->hash_tapeinfo = tapeinfo;
|
||||
}
|
||||
|
||||
/*
|
||||
* Assign unused tapes to spill partitions, extending the tape set if
|
||||
* necessary.
|
||||
*/
|
||||
static void
|
||||
hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
|
||||
int npartitions)
|
||||
{
|
||||
int partidx = 0;
|
||||
|
||||
/* use free tapes if available */
|
||||
while (partidx < npartitions && tapeinfo->nfreetapes > 0)
|
||||
partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
|
||||
|
||||
if (partidx < npartitions)
|
||||
{
|
||||
LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
|
||||
|
||||
while (partidx < npartitions)
|
||||
partitions[partidx++] = tapeinfo->ntapes++;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* After a tape has already been written to and then read, this function
|
||||
* rewinds it for writing and adds it to the free list.
|
||||
*/
|
||||
static void
|
||||
hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
|
||||
{
|
||||
/* rewinding frees the buffer while not in use */
|
||||
LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
|
||||
if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
|
||||
{
|
||||
tapeinfo->freetapes_alloc <<= 1;
|
||||
tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
|
||||
tapeinfo->freetapes_alloc * sizeof(int));
|
||||
}
|
||||
tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
|
||||
}
|
||||
|
||||
/*
|
||||
* hashagg_spill_init
|
||||
*
|
||||
|
@ -2952,7 +2872,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
|
|||
* of partitions to create, and initializes them.
|
||||
*/
|
||||
static void
|
||||
hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
|
||||
hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
|
||||
double input_groups, double hashentrysize)
|
||||
{
|
||||
int npartitions;
|
||||
|
@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
|
|||
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
|
||||
used_bits, &partition_bits);
|
||||
|
||||
spill->partitions = palloc0(sizeof(int) * npartitions);
|
||||
spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
|
||||
spill->ntuples = palloc0(sizeof(int64) * npartitions);
|
||||
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
|
||||
|
||||
hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
|
||||
for (int i = 0; i < npartitions; i++)
|
||||
spill->partitions[i] = LogicalTapeCreate(tapeset);
|
||||
|
||||
spill->tapeset = tapeinfo->tapeset;
|
||||
spill->shift = 32 - used_bits - partition_bits;
|
||||
spill->mask = (npartitions - 1) << spill->shift;
|
||||
spill->npartitions = npartitions;
|
||||
|
@ -2986,11 +2906,10 @@ static Size
|
|||
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
||||
TupleTableSlot *inputslot, uint32 hash)
|
||||
{
|
||||
LogicalTapeSet *tapeset = spill->tapeset;
|
||||
TupleTableSlot *spillslot;
|
||||
int partition;
|
||||
MinimalTuple tuple;
|
||||
int tapenum;
|
||||
LogicalTape *tape;
|
||||
int total_written = 0;
|
||||
bool shouldFree;
|
||||
|
||||
|
@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
|||
*/
|
||||
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
|
||||
|
||||
tapenum = spill->partitions[partition];
|
||||
tape = spill->partitions[partition];
|
||||
|
||||
LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
|
||||
LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
|
||||
total_written += sizeof(uint32);
|
||||
|
||||
LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
|
||||
LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
|
||||
total_written += tuple->t_len;
|
||||
|
||||
if (shouldFree)
|
||||
|
@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
|||
* be done.
|
||||
*/
|
||||
static HashAggBatch *
|
||||
hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
|
||||
hashagg_batch_new(LogicalTape *input_tape, int setno,
|
||||
int64 input_tuples, double input_card, int used_bits)
|
||||
{
|
||||
HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
|
||||
|
||||
batch->setno = setno;
|
||||
batch->used_bits = used_bits;
|
||||
batch->tapeset = tapeset;
|
||||
batch->input_tapenum = tapenum;
|
||||
batch->input_tape = input_tape;
|
||||
batch->input_tuples = input_tuples;
|
||||
batch->input_card = input_card;
|
||||
|
||||
|
@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
|
|||
static MinimalTuple
|
||||
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
|
||||
{
|
||||
LogicalTapeSet *tapeset = batch->tapeset;
|
||||
int tapenum = batch->input_tapenum;
|
||||
LogicalTape *tape = batch->input_tape;
|
||||
MinimalTuple tuple;
|
||||
uint32 t_len;
|
||||
size_t nread;
|
||||
uint32 hash;
|
||||
|
||||
nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
|
||||
nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
|
||||
if (nread == 0)
|
||||
return NULL;
|
||||
if (nread != sizeof(uint32))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
|
||||
tapenum, sizeof(uint32), nread)));
|
||||
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
|
||||
tape, sizeof(uint32), nread)));
|
||||
if (hashp != NULL)
|
||||
*hashp = hash;
|
||||
|
||||
nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
|
||||
nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
|
||||
if (nread != sizeof(uint32))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
|
||||
tapenum, sizeof(uint32), nread)));
|
||||
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
|
||||
tape, sizeof(uint32), nread)));
|
||||
|
||||
tuple = (MinimalTuple) palloc(t_len);
|
||||
tuple->t_len = t_len;
|
||||
|
||||
nread = LogicalTapeRead(tapeset, tapenum,
|
||||
nread = LogicalTapeRead(tape,
|
||||
(void *) ((char *) tuple + sizeof(uint32)),
|
||||
t_len - sizeof(uint32));
|
||||
if (nread != t_len - sizeof(uint32))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
|
||||
tapenum, t_len - sizeof(uint32), nread)));
|
||||
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
|
||||
tape, t_len - sizeof(uint32), nread)));
|
||||
|
||||
return tuple;
|
||||
}
|
||||
|
@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
|
|||
|
||||
for (i = 0; i < spill->npartitions; i++)
|
||||
{
|
||||
LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
|
||||
int tapenum = spill->partitions[i];
|
||||
LogicalTape *tape = spill->partitions[i];
|
||||
HashAggBatch *new_batch;
|
||||
double cardinality;
|
||||
|
||||
|
@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
|
|||
freeHyperLogLog(&spill->hll_card[i]);
|
||||
|
||||
/* rewinding frees the buffer while not in use */
|
||||
LogicalTapeRewindForRead(tapeset, tapenum,
|
||||
HASHAGG_READ_BUFFER_SIZE);
|
||||
LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
|
||||
|
||||
new_batch = hashagg_batch_new(tapeset, tapenum, setno,
|
||||
new_batch = hashagg_batch_new(tape, setno,
|
||||
spill->ntuples[i], cardinality,
|
||||
used_bits);
|
||||
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
|
||||
|
@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate)
|
|||
aggstate->hash_batches = NIL;
|
||||
|
||||
/* close tape set */
|
||||
if (aggstate->hash_tapeinfo != NULL)
|
||||
if (aggstate->hash_tapeset != NULL)
|
||||
{
|
||||
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
|
||||
|
||||
LogicalTapeSetClose(tapeinfo->tapeset);
|
||||
pfree(tapeinfo->freetapes);
|
||||
pfree(tapeinfo);
|
||||
aggstate->hash_tapeinfo = NULL;
|
||||
LogicalTapeSetClose(aggstate->hash_tapeset);
|
||||
aggstate->hash_tapeset = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -9,8 +9,7 @@
|
|||
* there is an annoying problem: the peak space usage is at least twice
|
||||
* the volume of actual data to be sorted. (This must be so because each
|
||||
* datum will appear in both the input and output tapes of the final
|
||||
* merge pass. For seven-tape polyphase merge, which is otherwise a
|
||||
* pretty good algorithm, peak usage is more like 4x actual data volume.)
|
||||
* merge pass.)
|
||||
*
|
||||
* We can work around this problem by recognizing that any one tape
|
||||
* dataset (with the possible exception of the final output) is written
|
||||
|
@ -137,6 +136,8 @@ typedef struct TapeBlockTrailer
|
|||
*/
|
||||
typedef struct LogicalTape
|
||||
{
|
||||
LogicalTapeSet *tapeSet; /* tape set this tape is part of */
|
||||
|
||||
bool writing; /* T while in write phase */
|
||||
bool frozen; /* T if blocks should not be freed when read */
|
||||
bool dirty; /* does buffer need to be written? */
|
||||
|
@ -180,11 +181,14 @@ typedef struct LogicalTape
|
|||
* This data structure represents a set of related "logical tapes" sharing
|
||||
* space in a single underlying file. (But that "file" may be multiple files
|
||||
* if needed to escape OS limits on file size; buffile.c handles that for us.)
|
||||
* The number of tapes is fixed at creation.
|
||||
* Tapes belonging to a tape set can be created and destroyed on-the-fly, on
|
||||
* demand.
|
||||
*/
|
||||
struct LogicalTapeSet
|
||||
{
|
||||
BufFile *pfile; /* underlying file for whole tape set */
|
||||
SharedFileSet *fileset;
|
||||
int worker; /* worker # if shared, -1 for leader/serial */
|
||||
|
||||
/*
|
||||
* File size tracking. nBlocksWritten is the size of the underlying file,
|
||||
|
@ -213,22 +217,16 @@ struct LogicalTapeSet
|
|||
long nFreeBlocks; /* # of currently free blocks */
|
||||
Size freeBlocksLen; /* current allocated length of freeBlocks[] */
|
||||
bool enable_prealloc; /* preallocate write blocks? */
|
||||
|
||||
/* The array of logical tapes. */
|
||||
int nTapes; /* # of logical tapes in set */
|
||||
LogicalTape *tapes; /* has nTapes nentries */
|
||||
};
|
||||
|
||||
static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
|
||||
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
|
||||
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
|
||||
static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
|
||||
static long ltsGetFreeBlock(LogicalTapeSet *lts);
|
||||
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
|
||||
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
|
||||
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
|
||||
SharedFileSet *fileset);
|
||||
static void ltsInitTape(LogicalTape *lt);
|
||||
static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
|
||||
static void ltsInitReadBuffer(LogicalTape *lt);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -304,7 +302,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
|
|||
* Returns true if anything was read, 'false' on EOF.
|
||||
*/
|
||||
static bool
|
||||
ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
|
||||
ltsReadFillBuffer(LogicalTape *lt)
|
||||
{
|
||||
lt->pos = 0;
|
||||
lt->nbytes = 0;
|
||||
|
@ -321,9 +319,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
|
|||
datablocknum += lt->offsetBlockNumber;
|
||||
|
||||
/* Read the block */
|
||||
ltsReadBlock(lts, datablocknum, (void *) thisbuf);
|
||||
ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
|
||||
if (!lt->frozen)
|
||||
ltsReleaseBlock(lts, datablocknum);
|
||||
ltsReleaseBlock(lt->tapeSet, datablocknum);
|
||||
lt->curBlockNumber = lt->nextBlockNumber;
|
||||
|
||||
lt->nbytes += TapeBlockGetNBytes(thisbuf);
|
||||
|
@ -531,100 +529,188 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
|
|||
}
|
||||
|
||||
/*
|
||||
* Claim ownership of a set of logical tapes from existing shared BufFiles.
|
||||
* Lazily allocate and initialize the read buffer. This avoids waste when many
|
||||
* tapes are open at once, but not all are active between rewinding and
|
||||
* reading.
|
||||
*/
|
||||
static void
|
||||
ltsInitReadBuffer(LogicalTape *lt)
|
||||
{
|
||||
Assert(lt->buffer_size > 0);
|
||||
lt->buffer = palloc(lt->buffer_size);
|
||||
|
||||
/* Read the first block, or reset if tape is empty */
|
||||
lt->nextBlockNumber = lt->firstBlockNumber;
|
||||
lt->pos = 0;
|
||||
lt->nbytes = 0;
|
||||
ltsReadFillBuffer(lt);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a tape set, backed by a temporary underlying file.
|
||||
*
|
||||
* The tape set is initially empty. Use LogicalTapeCreate() to create
|
||||
* tapes in it.
|
||||
*
|
||||
* Serial callers pass NULL argument for shared, and -1 for worker. Parallel
|
||||
* worker callers pass a shared file handle and their own worker number.
|
||||
*
|
||||
* Leader callers pass a shared file handle and -1 for worker. After creating
|
||||
* the tape set, use LogicalTapeImport() to import the worker tapes into it.
|
||||
*
|
||||
* Currently, the leader will only import worker tapes into the set, it does
|
||||
* not create tapes of its own, although in principle that should work.
|
||||
*/
|
||||
LogicalTapeSet *
|
||||
LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
|
||||
{
|
||||
LogicalTapeSet *lts;
|
||||
|
||||
/*
|
||||
* Create top-level struct. Per-tape LogicalTape structs are allocated separately, as tapes are created.
|
||||
*/
|
||||
lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
|
||||
lts->nBlocksAllocated = 0L;
|
||||
lts->nBlocksWritten = 0L;
|
||||
lts->nHoleBlocks = 0L;
|
||||
lts->forgetFreeSpace = false;
|
||||
lts->freeBlocksLen = 32; /* reasonable initial guess */
|
||||
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
|
||||
lts->nFreeBlocks = 0;
|
||||
lts->enable_prealloc = preallocate;
|
||||
|
||||
lts->fileset = fileset;
|
||||
lts->worker = worker;
|
||||
|
||||
/*
|
||||
* Create temp BufFile storage as required.
|
||||
*
|
||||
* In leader, we hijack the BufFile of the first tape that's imported, and
|
||||
* concatenate the BufFiles of any subsequent tapes to that. Hence don't
|
||||
* create a BufFile here. Things are simpler for the worker case and the
|
||||
* serial case, though. They are generally very similar -- workers use a
|
||||
* shared fileset, whereas serial sorts use a conventional serial BufFile.
|
||||
*/
|
||||
if (fileset && worker == -1)
|
||||
lts->pfile = NULL;
|
||||
else if (fileset)
|
||||
{
|
||||
char filename[MAXPGPATH];
|
||||
|
||||
pg_itoa(worker, filename);
|
||||
lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
|
||||
}
|
||||
else
|
||||
lts->pfile = BufFileCreateTemp(false);
|
||||
|
||||
return lts;
|
||||
}
|
||||
|
||||
/*
|
||||
* Claim ownership of a logical tape from an existing shared BufFile.
|
||||
*
|
||||
* Caller should be leader process. Though tapes are marked as frozen in
|
||||
* workers, they are not frozen when opened within leader, since unfrozen tapes
|
||||
* use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
|
||||
* for random access.)
|
||||
*/
|
||||
static void
|
||||
ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
|
||||
SharedFileSet *fileset)
|
||||
LogicalTape *
|
||||
LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
|
||||
{
|
||||
LogicalTape *lt = NULL;
|
||||
long tapeblocks = 0L;
|
||||
long nphysicalblocks = 0L;
|
||||
int i;
|
||||
LogicalTape *lt;
|
||||
long tapeblocks;
|
||||
char filename[MAXPGPATH];
|
||||
BufFile *file;
|
||||
int64 filesize;
|
||||
|
||||
/* Should have at least one worker tape, plus leader's tape */
|
||||
Assert(lts->nTapes >= 2);
|
||||
lt = ltsCreateTape(lts);
|
||||
|
||||
/*
|
||||
* Build concatenated view of all BufFiles, remembering the block number
|
||||
* where each source file begins. No changes are needed for leader/last
|
||||
* tape.
|
||||
* Build concatenated view of all BufFiles, remembering the block number
|
||||
* where each source file begins.
|
||||
*/
|
||||
for (i = 0; i < lts->nTapes - 1; i++)
|
||||
pg_itoa(worker, filename);
|
||||
file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
|
||||
filesize = BufFileSize(file);
|
||||
|
||||
/*
|
||||
* Stash first BufFile, and concatenate subsequent BufFiles to that. Store
|
||||
* block offset into each tape as we go.
|
||||
*/
|
||||
lt->firstBlockNumber = shared->firstblocknumber;
|
||||
if (lts->pfile == NULL)
|
||||
{
|
||||
char filename[MAXPGPATH];
|
||||
BufFile *file;
|
||||
int64 filesize;
|
||||
|
||||
lt = &lts->tapes[i];
|
||||
|
||||
pg_itoa(i, filename);
|
||||
file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
|
||||
filesize = BufFileSize(file);
|
||||
|
||||
/*
|
||||
* Stash first BufFile, and concatenate subsequent BufFiles to that.
|
||||
* Store block offset into each tape as we go.
|
||||
*/
|
||||
lt->firstBlockNumber = shared[i].firstblocknumber;
|
||||
if (i == 0)
|
||||
{
|
||||
lts->pfile = file;
|
||||
lt->offsetBlockNumber = 0L;
|
||||
}
|
||||
else
|
||||
{
|
||||
lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
|
||||
}
|
||||
/* Don't allocate more for read buffer than could possibly help */
|
||||
lt->max_size = Min(MaxAllocSize, filesize);
|
||||
tapeblocks = filesize / BLCKSZ;
|
||||
nphysicalblocks += tapeblocks;
|
||||
lts->pfile = file;
|
||||
lt->offsetBlockNumber = 0L;
|
||||
}
|
||||
else
|
||||
{
|
||||
lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
|
||||
}
|
||||
/* Don't allocate more for read buffer than could possibly help */
|
||||
lt->max_size = Min(MaxAllocSize, filesize);
|
||||
tapeblocks = filesize / BLCKSZ;
|
||||
|
||||
/*
|
||||
* Set # of allocated blocks, as well as # blocks written. Use extent of
|
||||
* new BufFile space (from 0 to end of last worker's tape space) for this.
|
||||
* Allocated/written blocks should include space used by holes left
|
||||
* between concatenated BufFiles.
|
||||
* Update # of allocated blocks and # blocks written to reflect the
|
||||
* imported BufFile. Allocated/written blocks include space used by holes
|
||||
* left between concatenated BufFiles. Also track the number of hole
|
||||
* blocks so that we can later work backwards to calculate the number of
|
||||
* physical blocks for instrumentation.
|
||||
*/
|
||||
lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
|
||||
|
||||
lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
|
||||
lts->nBlocksWritten = lts->nBlocksAllocated;
|
||||
|
||||
/*
|
||||
* Compute number of hole blocks so that we can later work backwards, and
|
||||
* instrument number of physical blocks. We don't simply use physical
|
||||
* blocks directly for instrumentation because this would break if we ever
|
||||
* subsequently wrote to the leader tape.
|
||||
*
|
||||
* Working backwards like this keeps our options open. If shared BufFiles
|
||||
* ever support being written to post-export, logtape.c can automatically
|
||||
* take advantage of that. We'd then support writing to the leader tape
|
||||
* while recycling space from worker tapes, because the leader tape has a
|
||||
* zero offset (write routines won't need to have extra logic to apply an
|
||||
* offset).
|
||||
*
|
||||
* The only thing that currently prevents writing to the leader tape from
|
||||
* working is the fact that BufFiles opened using BufFileOpenFileSet() are
|
||||
* read-only by definition, but that could be changed if it seemed
|
||||
* worthwhile. For now, writing to the leader tape will raise a "Bad file
|
||||
* descriptor" error, so tuplesort must avoid writing to the leader tape
|
||||
* altogether.
|
||||
*/
|
||||
lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
|
||||
return lt;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize per-tape struct. Note we allocate the I/O buffer lazily.
|
||||
* Close a logical tape set and release all resources.
|
||||
*
|
||||
* NOTE: This doesn't close any of the tapes! You must close them
|
||||
* first, or you can let them be destroyed along with the memory context.
|
||||
*/
|
||||
static void
|
||||
ltsInitTape(LogicalTape *lt)
|
||||
void
|
||||
LogicalTapeSetClose(LogicalTapeSet *lts)
|
||||
{
|
||||
BufFileClose(lts->pfile);
|
||||
pfree(lts->freeBlocks);
|
||||
pfree(lts);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a logical tape in the given tapeset.
|
||||
*
|
||||
* The tape is initialized in write state.
|
||||
*/
|
||||
LogicalTape *
|
||||
LogicalTapeCreate(LogicalTapeSet *lts)
|
||||
{
|
||||
/*
|
||||
* The only thing that currently prevents creating new tapes in leader is
|
||||
* the fact that BufFiles opened using BufFileOpenFileSet() are read-only
|
||||
* by definition, but that could be changed if it seemed worthwhile. For
|
||||
* now, writing to the leader tape will raise a "Bad file descriptor"
|
||||
* error, so tuplesort must avoid writing to the leader tape altogether.
|
||||
*/
|
||||
if (lts->fileset && lts->worker == -1)
|
||||
elog(ERROR, "cannot create new tapes in leader process");
|
||||
|
||||
return ltsCreateTape(lts);
|
||||
}
|
||||
|
||||
static LogicalTape *
|
||||
ltsCreateTape(LogicalTapeSet *lts)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
|
||||
/*
|
||||
* Create per-tape struct. Note we allocate the I/O buffer lazily.
|
||||
*/
|
||||
lt = palloc(sizeof(LogicalTape));
|
||||
lt->tapeSet = lts;
|
||||
lt->writing = true;
|
||||
lt->frozen = false;
|
||||
lt->dirty = false;
|
||||
|
@ -641,114 +727,23 @@ ltsInitTape(LogicalTape *lt)
|
|||
lt->prealloc = NULL;
|
||||
lt->nprealloc = 0;
|
||||
lt->prealloc_size = 0;
|
||||
|
||||
return lt;
|
||||
}
|
||||
|
||||
/*
|
||||
* Lazily allocate and initialize the read buffer. This avoids waste when many
|
||||
* tapes are open at once, but not all are active between rewinding and
|
||||
* reading.
|
||||
*/
|
||||
static void
|
||||
ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
|
||||
{
|
||||
Assert(lt->buffer_size > 0);
|
||||
lt->buffer = palloc(lt->buffer_size);
|
||||
|
||||
/* Read the first block, or reset if tape is empty */
|
||||
lt->nextBlockNumber = lt->firstBlockNumber;
|
||||
lt->pos = 0;
|
||||
lt->nbytes = 0;
|
||||
ltsReadFillBuffer(lts, lt);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a set of logical tapes in a temporary underlying file.
|
||||
* Close a logical tape.
|
||||
*
|
||||
* Each tape is initialized in write state. Serial callers pass ntapes,
|
||||
* NULL argument for shared, and -1 for worker. Parallel worker callers
|
||||
* pass ntapes, a shared file handle, NULL shared argument, and their own
|
||||
* worker number. Leader callers, which claim shared worker tapes here,
|
||||
* must supply non-sentinel values for all arguments except worker number,
|
||||
* which should be -1.
|
||||
*
|
||||
* Leader caller is passing back an array of metadata each worker captured
|
||||
* when LogicalTapeFreeze() was called for their final result tapes. Passed
|
||||
* tapes array is actually sized ntapes - 1, because it includes only
|
||||
* worker tapes, whereas leader requires its own leader tape. Note that we
|
||||
* rely on the assumption that reclaimed worker tapes will only be read
|
||||
* from once by leader, and never written to again (tapes are initialized
|
||||
* for writing, but that's only to be consistent). Leader may not write to
|
||||
* its own tape purely due to a restriction in the shared buffile
|
||||
* infrastructure that may be lifted in the future.
|
||||
*/
|
||||
LogicalTapeSet *
|
||||
LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
|
||||
SharedFileSet *fileset, int worker)
|
||||
{
|
||||
LogicalTapeSet *lts;
|
||||
int i;
|
||||
|
||||
/*
|
||||
* Create top-level struct including per-tape LogicalTape structs.
|
||||
*/
|
||||
Assert(ntapes > 0);
|
||||
lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
|
||||
lts->nBlocksAllocated = 0L;
|
||||
lts->nBlocksWritten = 0L;
|
||||
lts->nHoleBlocks = 0L;
|
||||
lts->forgetFreeSpace = false;
|
||||
lts->freeBlocksLen = 32; /* reasonable initial guess */
|
||||
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
|
||||
lts->nFreeBlocks = 0;
|
||||
lts->enable_prealloc = preallocate;
|
||||
lts->nTapes = ntapes;
|
||||
lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
|
||||
|
||||
for (i = 0; i < ntapes; i++)
|
||||
ltsInitTape(&lts->tapes[i]);
|
||||
|
||||
/*
|
||||
* Create temp BufFile storage as required.
|
||||
*
|
||||
* Leader concatenates worker tapes, which requires special adjustment to
|
||||
* final tapeset data. Things are simpler for the worker case and the
|
||||
* serial case, though. They are generally very similar -- workers use a
|
||||
* shared fileset, whereas serial sorts use a conventional serial BufFile.
|
||||
*/
|
||||
if (shared)
|
||||
ltsConcatWorkerTapes(lts, shared, fileset);
|
||||
else if (fileset)
|
||||
{
|
||||
char filename[MAXPGPATH];
|
||||
|
||||
pg_itoa(worker, filename);
|
||||
lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
|
||||
}
|
||||
else
|
||||
lts->pfile = BufFileCreateTemp(false);
|
||||
|
||||
return lts;
|
||||
}
|
||||
|
||||
/*
|
||||
* Close a logical tape set and release all resources.
|
||||
* Note: This doesn't return any blocks to the free list! You must read
|
||||
* the tape to the end first, to reuse the space. In current use, though,
|
||||
* we only close tapes after fully reading them.
|
||||
*/
|
||||
void
|
||||
LogicalTapeSetClose(LogicalTapeSet *lts)
|
||||
LogicalTapeClose(LogicalTape *lt)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
int i;
|
||||
|
||||
BufFileClose(lts->pfile);
|
||||
for (i = 0; i < lts->nTapes; i++)
|
||||
{
|
||||
lt = &lts->tapes[i];
|
||||
if (lt->buffer)
|
||||
pfree(lt->buffer);
|
||||
}
|
||||
pfree(lts->tapes);
|
||||
pfree(lts->freeBlocks);
|
||||
pfree(lts);
|
||||
if (lt->buffer)
|
||||
pfree(lt->buffer);
|
||||
pfree(lt);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -772,14 +767,11 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
|
|||
* There are no error returns; we ereport() on failure.
|
||||
*/
|
||||
void
|
||||
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
|
||||
void *ptr, size_t size)
|
||||
LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
LogicalTapeSet *lts = lt->tapeSet;
|
||||
size_t nthistime;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
Assert(lt->writing);
|
||||
Assert(lt->offsetBlockNumber == 0L);
|
||||
|
||||
|
@ -818,11 +810,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
|
|||
* First allocate the next block, so that we can store it in the
|
||||
* 'next' pointer of this block.
|
||||
*/
|
||||
nextBlockNumber = ltsGetBlock(lts, lt);
|
||||
nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
|
||||
|
||||
/* set the next-pointer and dump the current block. */
|
||||
TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
|
||||
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
|
||||
ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
|
||||
|
||||
/* initialize the prev-pointer of the next block */
|
||||
TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
|
||||
|
@ -860,12 +852,9 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
|
|||
* byte buffer is used.
|
||||
*/
|
||||
void
|
||||
LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
|
||||
LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
LogicalTapeSet *lts = lt->tapeSet;
|
||||
|
||||
/*
|
||||
* Round and cap buffer_size if needed.
|
||||
|
@ -907,7 +896,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
|
|||
lt->buffer_size - lt->nbytes);
|
||||
|
||||
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
|
||||
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
|
||||
ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
|
||||
}
|
||||
lt->writing = false;
|
||||
}
|
||||
|
@ -939,61 +928,28 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Rewind logical tape and switch from reading to writing.
|
||||
*
|
||||
* NOTE: we assume the caller has read the tape to the end; otherwise
|
||||
* untouched data will not have been freed. We could add more code to free
|
||||
* any unread blocks, but in current usage of this module it'd be useless
|
||||
* code.
|
||||
*/
|
||||
void
|
||||
LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
|
||||
Assert(!lt->writing && !lt->frozen);
|
||||
lt->writing = true;
|
||||
lt->dirty = false;
|
||||
lt->firstBlockNumber = -1L;
|
||||
lt->curBlockNumber = -1L;
|
||||
lt->pos = 0;
|
||||
lt->nbytes = 0;
|
||||
if (lt->buffer)
|
||||
pfree(lt->buffer);
|
||||
lt->buffer = NULL;
|
||||
lt->buffer_size = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Read from a logical tape.
|
||||
*
|
||||
* Early EOF is indicated by return value less than #bytes requested.
|
||||
*/
|
||||
size_t
|
||||
LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
|
||||
void *ptr, size_t size)
|
||||
LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
size_t nread = 0;
|
||||
size_t nthistime;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
Assert(!lt->writing);
|
||||
|
||||
if (lt->buffer == NULL)
|
||||
ltsInitReadBuffer(lts, lt);
|
||||
ltsInitReadBuffer(lt);
|
||||
|
||||
while (size > 0)
|
||||
{
|
||||
if (lt->pos >= lt->nbytes)
|
||||
{
|
||||
/* Try to load more data into buffer. */
|
||||
if (!ltsReadFillBuffer(lts, lt))
|
||||
if (!ltsReadFillBuffer(lt))
|
||||
break; /* EOF */
|
||||
}
|
||||
|
||||
|
@ -1031,12 +987,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
|
|||
* Serial sorts should set share to NULL.
|
||||
*/
|
||||
void
|
||||
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
|
||||
LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
LogicalTapeSet *lts = lt->tapeSet;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
Assert(lt->writing);
|
||||
Assert(lt->offsetBlockNumber == 0L);
|
||||
|
||||
|
@ -1058,8 +1012,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
|
|||
lt->buffer_size - lt->nbytes);
|
||||
|
||||
TapeBlockSetNBytes(lt->buffer, lt->nbytes);
|
||||
ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
|
||||
lt->writing = false;
|
||||
ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
|
||||
}
|
||||
lt->writing = false;
|
||||
lt->frozen = true;
|
||||
|
@ -1086,7 +1039,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
|
|||
|
||||
if (lt->firstBlockNumber == -1L)
|
||||
lt->nextBlockNumber = -1L;
|
||||
ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
|
||||
ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
|
||||
if (TapeBlockIsLast(lt->buffer))
|
||||
lt->nextBlockNumber = -1L;
|
||||
else
|
||||
|
@ -1101,25 +1054,6 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Add additional tapes to this tape set. Not intended to be used when any
|
||||
* tapes are frozen.
|
||||
*/
|
||||
void
|
||||
LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
|
||||
{
|
||||
int i;
|
||||
int nTapesOrig = lts->nTapes;
|
||||
|
||||
lts->nTapes += nAdditional;
|
||||
|
||||
lts->tapes = (LogicalTape *) repalloc(lts->tapes,
|
||||
lts->nTapes * sizeof(LogicalTape));
|
||||
|
||||
for (i = nTapesOrig; i < lts->nTapes; i++)
|
||||
ltsInitTape(&lts->tapes[i]);
|
||||
}
|
||||
|
||||
/*
|
||||
* Backspace the tape a given number of bytes. (We also support a more
|
||||
* general seek interface, see below.)
|
||||
|
@ -1134,18 +1068,15 @@ LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
|
|||
* that case.
|
||||
*/
|
||||
size_t
|
||||
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
|
||||
LogicalTapeBackspace(LogicalTape *lt, size_t size)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
size_t seekpos = 0;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
Assert(lt->frozen);
|
||||
Assert(lt->buffer_size == BLCKSZ);
|
||||
|
||||
if (lt->buffer == NULL)
|
||||
ltsInitReadBuffer(lts, lt);
|
||||
ltsInitReadBuffer(lt);
|
||||
|
||||
/*
|
||||
* Easy case for seek within current block.
|
||||
|
@ -1175,7 +1106,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
|
|||
return seekpos;
|
||||
}
|
||||
|
||||
ltsReadBlock(lts, prev, (void *) lt->buffer);
|
||||
ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
|
||||
|
||||
if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
|
||||
elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
|
||||
|
@ -1208,23 +1139,18 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
|
|||
* LogicalTapeTell().
|
||||
*/
|
||||
void
|
||||
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
|
||||
long blocknum, int offset)
|
||||
LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
Assert(lt->frozen);
|
||||
Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
|
||||
Assert(lt->buffer_size == BLCKSZ);
|
||||
|
||||
if (lt->buffer == NULL)
|
||||
ltsInitReadBuffer(lts, lt);
|
||||
ltsInitReadBuffer(lt);
|
||||
|
||||
if (blocknum != lt->curBlockNumber)
|
||||
{
|
||||
ltsReadBlock(lts, blocknum, (void *) lt->buffer);
|
||||
ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
|
||||
lt->curBlockNumber = blocknum;
|
||||
lt->nbytes = TapeBlockPayloadSize;
|
||||
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
|
||||
|
@ -1242,16 +1168,10 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
|
|||
* the position for a seek after freezing. Not clear if anyone needs that.
|
||||
*/
|
||||
void
|
||||
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
|
||||
long *blocknum, int *offset)
|
||||
LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
|
||||
{
|
||||
LogicalTape *lt;
|
||||
|
||||
Assert(tapenum >= 0 && tapenum < lts->nTapes);
|
||||
lt = &lts->tapes[tapenum];
|
||||
|
||||
if (lt->buffer == NULL)
|
||||
ltsInitReadBuffer(lts, lt);
|
||||
ltsInitReadBuffer(lt);
|
||||
|
||||
Assert(lt->offsetBlockNumber == 0L);
|
||||
|
||||
|
@ -1271,13 +1191,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
|
|||
long
|
||||
LogicalTapeSetBlocks(LogicalTapeSet *lts)
|
||||
{
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
for (int i = 0; i < lts->nTapes; i++)
|
||||
{
|
||||
LogicalTape *lt = &lts->tapes[i];
|
||||
|
||||
Assert(!lt->writing || lt->buffer == NULL);
|
||||
}
|
||||
#endif
|
||||
return lts->nBlocksWritten - lts->nHoleBlocks;
|
||||
}
|
||||
|
|
|
@ -262,6 +262,7 @@ struct Tuplesortstate
|
|||
MemoryContext sortcontext; /* memory context holding most sort data */
|
||||
MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
|
||||
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
|
||||
LogicalTape **tapes;
|
||||
|
||||
/*
|
||||
* These function pointers decouple the routines that must know what kind
|
||||
|
@ -290,7 +291,7 @@ struct Tuplesortstate
|
|||
* SortTuple struct!), and increase state->availMem by the amount of
|
||||
* memory space thereby released.
|
||||
*/
|
||||
void (*writetup) (Tuplesortstate *state, int tapenum,
|
||||
void (*writetup) (Tuplesortstate *state, LogicalTape *tape,
|
||||
SortTuple *stup);
|
||||
|
||||
/*
|
||||
|
@ -299,7 +300,7 @@ struct Tuplesortstate
|
|||
* from the slab memory arena, or is palloc'd, see readtup_alloc().
|
||||
*/
|
||||
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len);
|
||||
LogicalTape *tape, unsigned int len);
|
||||
|
||||
/*
|
||||
* This array holds the tuples now in sort memory. If we are in state
|
||||
|
@ -393,7 +394,7 @@ struct Tuplesortstate
|
|||
* the next tuple to return. (In the tape case, the tape's current read
|
||||
* position is also critical state.)
|
||||
*/
|
||||
int result_tape; /* actual tape number of finished output */
|
||||
LogicalTape *result_tape; /* tape of finished output */
|
||||
int current; /* array index (only used if SORTEDINMEM) */
|
||||
bool eof_reached; /* reached EOF (needed for cursors) */
|
||||
|
||||
|
@ -599,9 +600,9 @@ struct Sharedsort
|
|||
*/
|
||||
|
||||
/* When using this macro, beware of double evaluation of len */
|
||||
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
|
||||
#define LogicalTapeReadExact(tape, ptr, len) \
|
||||
do { \
|
||||
if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
|
||||
if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
|
||||
elog(ERROR, "unexpected end of data"); \
|
||||
} while(0)
|
||||
|
||||
|
@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
|
|||
static void mergeruns(Tuplesortstate *state);
|
||||
static void mergeonerun(Tuplesortstate *state);
|
||||
static void beginmerge(Tuplesortstate *state);
|
||||
static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
|
||||
static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
|
||||
static void dumptuples(Tuplesortstate *state, bool alltuples);
|
||||
static void make_bounded_heap(Tuplesortstate *state);
|
||||
static void sort_bounded_heap(Tuplesortstate *state);
|
||||
|
@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
|
|||
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
|
||||
static void tuplesort_heap_delete_top(Tuplesortstate *state);
|
||||
static void reversedirection(Tuplesortstate *state);
|
||||
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
|
||||
static void markrunend(Tuplesortstate *state, int tapenum);
|
||||
static unsigned int getlen(LogicalTape *tape, bool eofOK);
|
||||
static void markrunend(LogicalTape *tape);
|
||||
static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
|
||||
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
|
||||
Tuplesortstate *state);
|
||||
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
|
||||
static void writetup_heap(Tuplesortstate *state, int tapenum,
|
||||
static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
|
||||
SortTuple *stup);
|
||||
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len);
|
||||
LogicalTape *tape, unsigned int len);
|
||||
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
|
||||
Tuplesortstate *state);
|
||||
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
|
||||
static void writetup_cluster(Tuplesortstate *state, int tapenum,
|
||||
static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
|
||||
SortTuple *stup);
|
||||
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len);
|
||||
LogicalTape *tape, unsigned int len);
|
||||
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
|
||||
Tuplesortstate *state);
|
||||
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
|
||||
Tuplesortstate *state);
|
||||
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
|
||||
static void writetup_index(Tuplesortstate *state, int tapenum,
|
||||
static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
|
||||
SortTuple *stup);
|
||||
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len);
|
||||
LogicalTape *tape, unsigned int len);
|
||||
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
|
||||
Tuplesortstate *state);
|
||||
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
|
||||
static void writetup_datum(Tuplesortstate *state, int tapenum,
|
||||
static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
|
||||
SortTuple *stup);
|
||||
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len);
|
||||
LogicalTape *tape, unsigned int len);
|
||||
static int worker_get_identifier(Tuplesortstate *state);
|
||||
static void worker_freeze_result_tape(Tuplesortstate *state);
|
||||
static void worker_nomergeruns(Tuplesortstate *state);
|
||||
|
@ -888,7 +889,7 @@ tuplesort_begin_batch(Tuplesortstate *state)
|
|||
* inittapes(), if needed
|
||||
*/
|
||||
|
||||
state->result_tape = -1; /* flag that result tape has not been formed */
|
||||
state->result_tape = NULL; /* flag that result tape has not been formed */
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
@ -2221,7 +2222,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
|
|||
if (state->eof_reached)
|
||||
return false;
|
||||
|
||||
if ((tuplen = getlen(state, state->result_tape, true)) != 0)
|
||||
if ((tuplen = getlen(state->result_tape, true)) != 0)
|
||||
{
|
||||
READTUP(state, stup, state->result_tape, tuplen);
|
||||
|
||||
|
@ -2254,8 +2255,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
|
|||
* end of file; back up to fetch last tuple's ending length
|
||||
* word. If seek fails we must have a completely empty file.
|
||||
*/
|
||||
nmoved = LogicalTapeBackspace(state->tapeset,
|
||||
state->result_tape,
|
||||
nmoved = LogicalTapeBackspace(state->result_tape,
|
||||
2 * sizeof(unsigned int));
|
||||
if (nmoved == 0)
|
||||
return false;
|
||||
|
@ -2269,20 +2269,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
|
|||
* Back up and fetch previously-returned tuple's ending length
|
||||
* word. If seek fails, assume we are at start of file.
|
||||
*/
|
||||
nmoved = LogicalTapeBackspace(state->tapeset,
|
||||
state->result_tape,
|
||||
nmoved = LogicalTapeBackspace(state->result_tape,
|
||||
sizeof(unsigned int));
|
||||
if (nmoved == 0)
|
||||
return false;
|
||||
else if (nmoved != sizeof(unsigned int))
|
||||
elog(ERROR, "unexpected tape position");
|
||||
tuplen = getlen(state, state->result_tape, false);
|
||||
tuplen = getlen(state->result_tape, false);
|
||||
|
||||
/*
|
||||
* Back up to get ending length word of tuple before it.
|
||||
*/
|
||||
nmoved = LogicalTapeBackspace(state->tapeset,
|
||||
state->result_tape,
|
||||
nmoved = LogicalTapeBackspace(state->result_tape,
|
||||
tuplen + 2 * sizeof(unsigned int));
|
||||
if (nmoved == tuplen + sizeof(unsigned int))
|
||||
{
|
||||
|
@ -2299,15 +2297,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
|
|||
elog(ERROR, "bogus tuple length in backward scan");
|
||||
}
|
||||
|
||||
tuplen = getlen(state, state->result_tape, false);
|
||||
tuplen = getlen(state->result_tape, false);
|
||||
|
||||
/*
|
||||
* Now we have the length of the prior tuple, back up and read it.
|
||||
* Note: READTUP expects we are positioned after the initial
|
||||
* length word of the tuple, so back up to that point.
|
||||
*/
|
||||
nmoved = LogicalTapeBackspace(state->tapeset,
|
||||
state->result_tape,
|
||||
nmoved = LogicalTapeBackspace(state->result_tape,
|
||||
tuplen);
|
||||
if (nmoved != tuplen)
|
||||
elog(ERROR, "bogus tuple length in backward scan");
|
||||
|
@ -2365,11 +2362,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
|
|||
tuplesort_heap_delete_top(state);
|
||||
|
||||
/*
|
||||
* Rewind to free the read buffer. It'd go away at the
|
||||
* end of the sort anyway, but better to release the
|
||||
* memory early.
|
||||
* Close the tape. It'd go away at the end of the sort
|
||||
* anyway, but better to release the memory early.
|
||||
*/
|
||||
LogicalTapeRewindForWrite(state->tapeset, srcTape);
|
||||
LogicalTapeClose(state->tapes[srcTape]);
|
||||
return true;
|
||||
}
|
||||
newtup.srctape = srcTape;
|
||||
|
@ -2667,9 +2663,12 @@ inittapes(Tuplesortstate *state, bool mergeruns)
|
|||
/* Create the tape set and allocate the per-tape data arrays */
|
||||
inittapestate(state, maxTapes);
|
||||
state->tapeset =
|
||||
LogicalTapeSetCreate(maxTapes, false, NULL,
|
||||
LogicalTapeSetCreate(false,
|
||||
state->shared ? &state->shared->fileset : NULL,
|
||||
state->worker);
|
||||
state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
|
||||
for (j = 0; j < maxTapes; j++)
|
||||
state->tapes[j] = LogicalTapeCreate(state->tapeset);
|
||||
|
||||
state->currentRun = 0;
|
||||
|
||||
|
@ -2919,7 +2918,7 @@ mergeruns(Tuplesortstate *state)
|
|||
|
||||
/* End of step D2: rewind all output tapes to prepare for merging */
|
||||
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
|
||||
LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
|
||||
LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
|
@ -2981,11 +2980,14 @@ mergeruns(Tuplesortstate *state)
|
|||
/* Step D6: decrease level */
|
||||
if (--state->Level == 0)
|
||||
break;
|
||||
|
||||
/* rewind output tape T to use as new input */
|
||||
LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
|
||||
LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
|
||||
state->read_buffer_size);
|
||||
/* rewind used-up input tape P, and prepare it for write pass */
|
||||
LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
|
||||
|
||||
/* close used-up input tape P, and create a new one for write pass */
|
||||
LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
|
||||
state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
|
||||
state->tp_runs[state->tapeRange - 1] = 0;
|
||||
|
||||
/*
|
||||
|
@ -3013,18 +3015,21 @@ mergeruns(Tuplesortstate *state)
|
|||
* output tape while rewinding it. The last iteration of step D6 would be
|
||||
* a waste of cycles anyway...
|
||||
*/
|
||||
state->result_tape = state->tp_tapenum[state->tapeRange];
|
||||
state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
|
||||
if (!WORKER(state))
|
||||
LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
|
||||
LogicalTapeFreeze(state->result_tape, NULL);
|
||||
else
|
||||
worker_freeze_result_tape(state);
|
||||
state->status = TSS_SORTEDONTAPE;
|
||||
|
||||
/* Release the read buffers of all the other tapes, by rewinding them. */
|
||||
/* Close all the other tapes, to release their read buffers. */
|
||||
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
|
||||
{
|
||||
if (tapenum != state->result_tape)
|
||||
LogicalTapeRewindForWrite(state->tapeset, tapenum);
|
||||
if (state->tapes[tapenum] != state->result_tape)
|
||||
{
|
||||
LogicalTapeClose(state->tapes[tapenum]);
|
||||
state->tapes[tapenum] = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3037,7 +3042,8 @@ mergeruns(Tuplesortstate *state)
|
|||
static void
|
||||
mergeonerun(Tuplesortstate *state)
|
||||
{
|
||||
int destTape = state->tp_tapenum[state->tapeRange];
|
||||
int destTapeNum = state->tp_tapenum[state->tapeRange];
|
||||
LogicalTape *destTape = state->tapes[destTapeNum];
|
||||
int srcTape;
|
||||
|
||||
/*
|
||||
|
@ -3080,7 +3086,7 @@ mergeonerun(Tuplesortstate *state)
|
|||
* When the heap empties, we're done. Write an end-of-run marker on the
|
||||
* output tape, and increment its count of real runs.
|
||||
*/
|
||||
markrunend(state, destTape);
|
||||
markrunend(destTape);
|
||||
state->tp_runs[state->tapeRange]++;
|
||||
|
||||
#ifdef TRACE_SORT
|
||||
|
@ -3146,17 +3152,18 @@ beginmerge(Tuplesortstate *state)
|
|||
* Returns false on EOF.
|
||||
*/
|
||||
static bool
|
||||
mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
|
||||
mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
|
||||
{
|
||||
LogicalTape *srcTape = state->tapes[srcTapeIndex];
|
||||
unsigned int tuplen;
|
||||
|
||||
if (!state->mergeactive[srcTape])
|
||||
if (!state->mergeactive[srcTapeIndex])
|
||||
return false; /* tape's run is already exhausted */
|
||||
|
||||
/* read next tuple, if any */
|
||||
if ((tuplen = getlen(state, srcTape, true)) == 0)
|
||||
if ((tuplen = getlen(srcTape, true)) == 0)
|
||||
{
|
||||
state->mergeactive[srcTape] = false;
|
||||
state->mergeactive[srcTapeIndex] = false;
|
||||
return false;
|
||||
}
|
||||
READTUP(state, stup, srcTape, tuplen);
|
||||
|
@ -3173,6 +3180,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
|
|||
static void
|
||||
dumptuples(Tuplesortstate *state, bool alltuples)
|
||||
{
|
||||
LogicalTape *destTape;
|
||||
int memtupwrite;
|
||||
int i;
|
||||
|
||||
|
@ -3239,10 +3247,10 @@ dumptuples(Tuplesortstate *state, bool alltuples)
|
|||
#endif
|
||||
|
||||
memtupwrite = state->memtupcount;
|
||||
destTape = state->tapes[state->tp_tapenum[state->destTape]];
|
||||
for (i = 0; i < memtupwrite; i++)
|
||||
{
|
||||
WRITETUP(state, state->tp_tapenum[state->destTape],
|
||||
&state->memtuples[i]);
|
||||
WRITETUP(state, destTape, &state->memtuples[i]);
|
||||
state->memtupcount--;
|
||||
}
|
||||
|
||||
|
@ -3255,7 +3263,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
|
|||
*/
|
||||
MemoryContextReset(state->tuplecontext);
|
||||
|
||||
markrunend(state, state->tp_tapenum[state->destTape]);
|
||||
markrunend(destTape);
|
||||
state->tp_runs[state->destTape]++;
|
||||
state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
|
||||
|
||||
|
@ -3289,9 +3297,7 @@ tuplesort_rescan(Tuplesortstate *state)
|
|||
state->markpos_eof = false;
|
||||
break;
|
||||
case TSS_SORTEDONTAPE:
|
||||
LogicalTapeRewindForRead(state->tapeset,
|
||||
state->result_tape,
|
||||
0);
|
||||
LogicalTapeRewindForRead(state->result_tape, 0);
|
||||
state->eof_reached = false;
|
||||
state->markpos_block = 0L;
|
||||
state->markpos_offset = 0;
|
||||
|
@ -3322,8 +3328,7 @@ tuplesort_markpos(Tuplesortstate *state)
|
|||
state->markpos_eof = state->eof_reached;
|
||||
break;
|
||||
case TSS_SORTEDONTAPE:
|
||||
LogicalTapeTell(state->tapeset,
|
||||
state->result_tape,
|
||||
LogicalTapeTell(state->result_tape,
|
||||
&state->markpos_block,
|
||||
&state->markpos_offset);
|
||||
state->markpos_eof = state->eof_reached;
|
||||
|
@ -3354,8 +3359,7 @@ tuplesort_restorepos(Tuplesortstate *state)
|
|||
state->eof_reached = state->markpos_eof;
|
||||
break;
|
||||
case TSS_SORTEDONTAPE:
|
||||
LogicalTapeSeek(state->tapeset,
|
||||
state->result_tape,
|
||||
LogicalTapeSeek(state->result_tape,
|
||||
state->markpos_block,
|
||||
state->markpos_offset);
|
||||
state->eof_reached = state->markpos_eof;
|
||||
|
@ -3697,11 +3701,11 @@ reversedirection(Tuplesortstate *state)
|
|||
*/
|
||||
|
||||
static unsigned int
|
||||
getlen(Tuplesortstate *state, int tapenum, bool eofOK)
|
||||
getlen(LogicalTape *tape, bool eofOK)
|
||||
{
|
||||
unsigned int len;
|
||||
|
||||
if (LogicalTapeRead(state->tapeset, tapenum,
|
||||
if (LogicalTapeRead(tape,
|
||||
&len, sizeof(len)) != sizeof(len))
|
||||
elog(ERROR, "unexpected end of tape");
|
||||
if (len == 0 && !eofOK)
|
||||
|
@ -3710,11 +3714,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
|
|||
}
|
||||
|
||||
static void
|
||||
markrunend(Tuplesortstate *state, int tapenum)
|
||||
markrunend(LogicalTape *tape)
|
||||
{
|
||||
unsigned int len = 0;
|
||||
|
||||
LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
|
||||
LogicalTapeWrite(tape, (void *) &len, sizeof(len));
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -3892,7 +3896,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
|
|||
}
|
||||
|
||||
static void
|
||||
writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
||||
writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
|
||||
{
|
||||
MinimalTuple tuple = (MinimalTuple) stup->tuple;
|
||||
|
||||
|
@ -3903,13 +3907,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
|||
/* total on-disk footprint: */
|
||||
unsigned int tuplen = tupbodylen + sizeof(int);
|
||||
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) tupbody, tupbodylen);
|
||||
LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
|
||||
|
||||
if (!state->slabAllocatorUsed)
|
||||
{
|
||||
|
@ -3920,7 +3921,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
|||
|
||||
static void
|
||||
readtup_heap(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len)
|
||||
LogicalTape *tape, unsigned int len)
|
||||
{
|
||||
unsigned int tupbodylen = len - sizeof(int);
|
||||
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
|
||||
|
@ -3930,11 +3931,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
|
|||
|
||||
/* read in the tuple proper */
|
||||
tuple->t_len = tuplen;
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
tupbody, tupbodylen);
|
||||
LogicalTapeReadExact(tape, tupbody, tupbodylen);
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
&tuplen, sizeof(tuplen));
|
||||
LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
|
||||
stup->tuple = (void *) tuple;
|
||||
/* set up first-column key value */
|
||||
htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
|
||||
|
@ -4135,21 +4134,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
|
|||
}
|
||||
|
||||
static void
|
||||
writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
||||
writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
|
||||
{
|
||||
HeapTuple tuple = (HeapTuple) stup->tuple;
|
||||
unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
|
||||
|
||||
/* We need to store t_self, but not other fields of HeapTupleData */
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
&tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
&tuple->t_self, sizeof(ItemPointerData));
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
tuple->t_data, tuple->t_len);
|
||||
LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
|
||||
LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
&tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
|
||||
|
||||
if (!state->slabAllocatorUsed)
|
||||
{
|
||||
|
@ -4160,7 +4155,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
|||
|
||||
static void
|
||||
readtup_cluster(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int tuplen)
|
||||
LogicalTape *tape, unsigned int tuplen)
|
||||
{
|
||||
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
|
||||
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
|
||||
|
@ -4169,16 +4164,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
|
|||
/* Reconstruct the HeapTupleData header */
|
||||
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
|
||||
tuple->t_len = t_len;
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
&tuple->t_self, sizeof(ItemPointerData));
|
||||
LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
|
||||
/* We don't currently bother to reconstruct t_tableOid */
|
||||
tuple->t_tableOid = InvalidOid;
|
||||
/* Read in the tuple body */
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
tuple->t_data, tuple->t_len);
|
||||
LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
&tuplen, sizeof(tuplen));
|
||||
LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
|
||||
stup->tuple = (void *) tuple;
|
||||
/* set up first-column key value, if it's a simple column */
|
||||
if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
|
||||
|
@ -4392,19 +4384,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
|
|||
}
|
||||
|
||||
static void
|
||||
writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
||||
writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
|
||||
{
|
||||
IndexTuple tuple = (IndexTuple) stup->tuple;
|
||||
unsigned int tuplen;
|
||||
|
||||
tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) tuple, IndexTupleSize(tuple));
|
||||
LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) &tuplen, sizeof(tuplen));
|
||||
LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
|
||||
|
||||
if (!state->slabAllocatorUsed)
|
||||
{
|
||||
|
@ -4415,16 +4404,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
|||
|
||||
static void
|
||||
readtup_index(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len)
|
||||
LogicalTape *tape, unsigned int len)
|
||||
{
|
||||
unsigned int tuplen = len - sizeof(unsigned int);
|
||||
IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
|
||||
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
tuple, tuplen);
|
||||
LogicalTapeReadExact(tape, tuple, tuplen);
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
&tuplen, sizeof(tuplen));
|
||||
LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
|
||||
stup->tuple = (void *) tuple;
|
||||
/* set up first-column key value */
|
||||
stup->datum1 = index_getattr(tuple,
|
||||
|
@ -4466,7 +4453,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
|
|||
}
|
||||
|
||||
static void
|
||||
writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
||||
writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
|
||||
{
|
||||
void *waddr;
|
||||
unsigned int tuplen;
|
||||
|
@ -4491,13 +4478,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
|||
|
||||
writtenlen = tuplen + sizeof(unsigned int);
|
||||
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) &writtenlen, sizeof(writtenlen));
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
waddr, tuplen);
|
||||
LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
|
||||
LogicalTapeWrite(tape, waddr, tuplen);
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeWrite(state->tapeset, tapenum,
|
||||
(void *) &writtenlen, sizeof(writtenlen));
|
||||
LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
|
||||
|
||||
if (!state->slabAllocatorUsed && stup->tuple)
|
||||
{
|
||||
|
@ -4508,7 +4492,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
|
|||
|
||||
static void
|
||||
readtup_datum(Tuplesortstate *state, SortTuple *stup,
|
||||
int tapenum, unsigned int len)
|
||||
LogicalTape *tape, unsigned int len)
|
||||
{
|
||||
unsigned int tuplen = len - sizeof(unsigned int);
|
||||
|
||||
|
@ -4522,8 +4506,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
|
|||
else if (!state->tuples)
|
||||
{
|
||||
Assert(tuplen == sizeof(Datum));
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
&stup->datum1, tuplen);
|
||||
LogicalTapeReadExact(tape, &stup->datum1, tuplen);
|
||||
stup->isnull1 = false;
|
||||
stup->tuple = NULL;
|
||||
}
|
||||
|
@ -4531,16 +4514,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
|
|||
{
|
||||
void *raddr = readtup_alloc(state, tuplen);
|
||||
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
raddr, tuplen);
|
||||
LogicalTapeReadExact(tape, raddr, tuplen);
|
||||
stup->datum1 = PointerGetDatum(raddr);
|
||||
stup->isnull1 = false;
|
||||
stup->tuple = raddr;
|
||||
}
|
||||
|
||||
if (state->randomAccess) /* need trailing length word? */
|
||||
LogicalTapeReadExact(state->tapeset, tapenum,
|
||||
&tuplen, sizeof(tuplen));
|
||||
LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -4652,7 +4633,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
|
|||
TapeShare output;
|
||||
|
||||
Assert(WORKER(state));
|
||||
Assert(state->result_tape != -1);
|
||||
Assert(state->result_tape != NULL);
|
||||
Assert(state->memtupcount == 0);
|
||||
|
||||
/*
|
||||
|
@ -4668,7 +4649,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
|
|||
* Parallel worker requires result tape metadata, which is to be stored in
|
||||
* shared memory for leader
|
||||
*/
|
||||
LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
|
||||
LogicalTapeFreeze(state->result_tape, &output);
|
||||
|
||||
/* Store properties of output tape, and update finished worker count */
|
||||
SpinLockAcquire(&shared->mutex);
|
||||
|
@ -4687,9 +4668,9 @@ static void
|
|||
worker_nomergeruns(Tuplesortstate *state)
|
||||
{
|
||||
Assert(WORKER(state));
|
||||
Assert(state->result_tape == -1);
|
||||
Assert(state->result_tape == NULL);
|
||||
|
||||
state->result_tape = state->tp_tapenum[state->destTape];
|
||||
state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
|
||||
worker_freeze_result_tape(state);
|
||||
}
|
||||
|
||||
|
@ -4733,9 +4714,13 @@ leader_takeover_tapes(Tuplesortstate *state)
|
|||
* randomAccess is disallowed for parallel sorts.
|
||||
*/
|
||||
inittapestate(state, nParticipants + 1);
|
||||
state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
|
||||
shared->tapes, &shared->fileset,
|
||||
state->tapeset = LogicalTapeSetCreate(false,
|
||||
&shared->fileset,
|
||||
state->worker);
|
||||
state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
|
||||
for (j = 0; j < nParticipants; j++)
|
||||
state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
|
||||
/* tapes[nParticipants] represents the "leader tape", which is not used */
|
||||
|
||||
/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
|
||||
state->currentRun = nParticipants;
|
||||
|
|
|
@ -41,6 +41,7 @@ struct ExprContext;
|
|||
struct RangeTblEntry; /* avoid including parsenodes.h here */
|
||||
struct ExprEvalStep; /* avoid including execExpr.h everywhere */
|
||||
struct CopyMultiInsertBuffer;
|
||||
struct LogicalTapeSet;
|
||||
|
||||
|
||||
/* ----------------
|
||||
|
@ -2316,7 +2317,7 @@ typedef struct AggState
|
|||
bool table_filled; /* hash table filled yet? */
|
||||
int num_hashes;
|
||||
MemoryContext hash_metacxt; /* memory for hash table itself */
|
||||
struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
|
||||
struct LogicalTapeSet *hash_tapeset; /* tape set for hash spill tapes */
|
||||
struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
|
||||
* exists only during first pass */
|
||||
TupleTableSlot *hash_spill_rslot; /* for reading spill files */
|
||||
|
|
|
@ -18,9 +18,13 @@
|
|||
|
||||
#include "storage/sharedfileset.h"
|
||||
|
||||
/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
|
||||
|
||||
/*
|
||||
* LogicalTapeSet and LogicalTape are opaque types whose details are not
|
||||
* known outside logtape.c.
|
||||
*/
|
||||
typedef struct LogicalTapeSet LogicalTapeSet;
|
||||
typedef struct LogicalTape LogicalTape;
|
||||
|
||||
|
||||
/*
|
||||
* The approach tuplesort.c takes to parallel external sorts is that workers,
|
||||
|
@ -54,27 +58,20 @@ typedef struct TapeShare
|
|||
* prototypes for functions in logtape.c
|
||||
*/
|
||||
|
||||
extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate,
|
||||
TapeShare *shared,
|
||||
extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate,
|
||||
SharedFileSet *fileset, int worker);
|
||||
extern void LogicalTapeClose(LogicalTape *lt);
|
||||
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
|
||||
extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
|
||||
extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared);
|
||||
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
|
||||
extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
|
||||
void *ptr, size_t size);
|
||||
extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
|
||||
void *ptr, size_t size);
|
||||
extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
|
||||
size_t buffer_size);
|
||||
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
|
||||
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
|
||||
TapeShare *share);
|
||||
extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional);
|
||||
extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
|
||||
size_t size);
|
||||
extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
|
||||
long blocknum, int offset);
|
||||
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
|
||||
long *blocknum, int *offset);
|
||||
extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
|
||||
extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
|
||||
extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
|
||||
extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share);
|
||||
extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
|
||||
extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
|
||||
extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
|
||||
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
|
||||
|
||||
#endif /* LOGTAPE_H */
|
||||
|
|
Loading…
Reference in New Issue