diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 6cc06b24e0..316301061d 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -141,14 +141,6 @@ typedef struct LogicalTape long curBlockNumber; /* this block's logical blk# within tape */ int pos; /* next read/write position in buffer */ int nbytes; /* total # of valid bytes in buffer */ - - /* - * Desired buffer size to use when reading. To keep things simple, we use - * a single-block buffer when writing, or when reading a frozen tape. But - * when we are reading and will only read forwards, we allocate a larger - * buffer, determined by read_buffer_size. - */ - int read_buffer_size; } LogicalTape; /* @@ -609,7 +601,6 @@ LogicalTapeSetCreate(int ntapes) lt->lastBlockBytes = 0; lt->buffer = NULL; lt->buffer_size = 0; - lt->read_buffer_size = BLCKSZ; lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; @@ -739,13 +730,20 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, } /* - * Rewind logical tape and switch from writing to reading or vice versa. + * Rewind logical tape and switch from writing to reading. * - * Unless the tape has been "frozen" in read state, forWrite must be the - * opposite of the previous tape state. + * The tape must currently be in writing state, or "frozen" in read state. + * + * 'buffer_size' specifies how much memory to use for the read buffer. It + * does not include the memory needed for the indirect blocks. Regardless + * of the argument, the actual amount of memory used is between BLCKSZ and + * MaxAllocSize, and is a multiple of BLCKSZ. The given value is rounded + * down and truncated to fit those constraints, if necessary. If the tape + * is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ byte + * buffer is used. 
*/ void -LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) +LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size) { LogicalTape *lt; long datablocknum; @@ -753,88 +751,112 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = &lts->tapes[tapenum]; - if (!forWrite) + /* + * Round and cap buffer_size if needed. + */ + if (lt->frozen) + buffer_size = BLCKSZ; + else { - if (lt->writing) - { - /* - * Completion of a write phase. Flush last partial data block, - * flush any partial indirect blocks, rewind for normal - * (destructive) read. - */ - if (lt->dirty) - ltsDumpBuffer(lts, lt); - lt->lastBlockBytes = lt->nbytes; - lt->writing = false; - datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false); - } - else - { - /* - * This is only OK if tape is frozen; we rewind for (another) read - * pass. - */ - Assert(lt->frozen); - datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); - } + /* need at least one block */ + if (buffer_size < BLCKSZ) + buffer_size = BLCKSZ; - /* Allocate a read buffer (unless the tape is empty) */ - if (lt->buffer) - pfree(lt->buffer); - lt->buffer = NULL; - lt->buffer_size = 0; - if (datablocknum != -1L) - { - lt->buffer = palloc(lt->read_buffer_size); - lt->buffer_size = lt->read_buffer_size; - } + /* + * palloc() larger than MaxAllocSize would fail (a multi-gigabyte + * buffer is unlikely to be helpful, anyway) + */ + if (buffer_size > MaxAllocSize) + buffer_size = MaxAllocSize; - /* Read the first block, or reset if tape is empty */ - lt->curBlockNumber = 0L; - lt->pos = 0; - lt->nbytes = 0; - if (datablocknum != -1L) - ltsReadFillBuffer(lts, lt, datablocknum); + /* round down to BLCKSZ boundary */ + buffer_size -= buffer_size % BLCKSZ; + } + + if (lt->writing) + { + /* + * Completion of a write phase. Flush last partial data block, flush + * any partial indirect blocks, rewind for normal (destructive) read. 
+ */ + if (lt->dirty) + ltsDumpBuffer(lts, lt); + lt->lastBlockBytes = lt->nbytes; + lt->writing = false; + datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false); } else { /* - * Completion of a read phase. Rewind and prepare for write. - * - * NOTE: we assume the caller has read the tape to the end; otherwise - * untouched data and indirect blocks will not have been freed. We - * could add more code to free any unread blocks, but in current usage - * of this module it'd be useless code. + * This is only OK if tape is frozen; we rewind for (another) read + * pass. */ - IndirectBlock *ib, - *nextib; + Assert(lt->frozen); + datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); + } - Assert(!lt->writing && !lt->frozen); - /* Must truncate the indirect-block hierarchy down to one level. */ - if (lt->indirect) - { - for (ib = lt->indirect->nextup; ib != NULL; ib = nextib) - { - nextib = ib->nextup; - pfree(ib); - } - lt->indirect->nextSlot = 0; - lt->indirect->nextup = NULL; - } - lt->writing = true; - lt->dirty = false; - lt->numFullBlocks = 0L; - lt->lastBlockBytes = 0; - lt->curBlockNumber = 0L; - lt->pos = 0; - lt->nbytes = 0; + /* Allocate a read buffer (unless the tape is empty) */ + if (lt->buffer) + pfree(lt->buffer); + lt->buffer = NULL; + lt->buffer_size = 0; + if (datablocknum != -1L) + { + lt->buffer = palloc(buffer_size); + lt->buffer_size = buffer_size; + } - if (lt->buffer) + /* Read the first block, or reset if tape is empty */ + lt->curBlockNumber = 0L; + lt->pos = 0; + lt->nbytes = 0; + if (datablocknum != -1L) + ltsReadFillBuffer(lts, lt, datablocknum); +} + +/* + * Rewind logical tape and switch from reading to writing. + * + * NOTE: we assume the caller has read the tape to the end; otherwise + * untouched data and indirect blocks will not have been freed. We + * could add more code to free any unread blocks, but in current usage + * of this module it'd be useless code. 
+ */ +void +LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum) +{ + LogicalTape *lt; + IndirectBlock *ib, + *nextib; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = &lts->tapes[tapenum]; + + Assert(!lt->writing && !lt->frozen); + /* Must truncate the indirect-block hierarchy down to one level. */ + if (lt->indirect) + { + for (ib = lt->indirect->nextup; ib != NULL; ib = nextib) { - pfree(lt->buffer); - lt->buffer = NULL; - lt->buffer_size = 0; + nextib = ib->nextup; + pfree(ib); } + lt->indirect->nextSlot = 0; + lt->indirect->nextup = NULL; + } + lt->writing = true; + lt->dirty = false; + lt->numFullBlocks = 0L; + lt->lastBlockBytes = 0; + lt->curBlockNumber = 0L; + lt->pos = 0; + lt->nbytes = 0; + + if (lt->buffer) + { + pfree(lt->buffer); + lt->buffer = NULL; + lt->buffer_size = 0; } } @@ -1105,28 +1127,3 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts) { return lts->nFileBlocks; } - -/* - * Set buffer size to use, when reading from given tape. - */ -void -LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem) -{ - LogicalTape *lt; - - Assert(tapenum >= 0 && tapenum < lts->nTapes); - lt = &lts->tapes[tapenum]; - - /* - * The buffer size must be a multiple of BLCKSZ in size, so round the - * given value down to nearest BLCKSZ. Make sure we have at least one - * page. Also, don't go above MaxAllocSize, to avoid erroring out. A - * multi-gigabyte buffer is unlikely to be helpful, anyway. 
- */ - if (avail_mem < BLCKSZ) - avail_mem = BLCKSZ; - if (avail_mem > MaxAllocSize) - avail_mem = MaxAllocSize; - avail_mem -= avail_mem % BLCKSZ; - lt->read_buffer_size = avail_mem; -} diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index dd83e7a8f2..f6eb30c2ce 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -366,6 +366,9 @@ struct Tuplesortstate char *slabMemoryEnd; /* end of slab memory arena */ SlabSlot *slabFreeHead; /* head of free list */ + /* Buffer size to use for reading input tapes, during merge. */ + size_t read_buffer_size; + /* * When we return a tuple to the caller in tuplesort_gettuple_XXX, that * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE @@ -579,7 +582,6 @@ static bool useselection(Tuplesortstate *state); static void inittapes(Tuplesortstate *state); static void selectnewtape(Tuplesortstate *state); static void init_slab_allocator(Tuplesortstate *state, int numSlots); -static void init_tape_buffers(Tuplesortstate *state, int numInputTapes); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); @@ -2056,7 +2058,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, * end of the sort anyway, but better to release the * memory early. */ - LogicalTapeRewind(state->tapeset, srcTape, true); + LogicalTapeRewindForWrite(state->tapeset, srcTape); return true; } newtup.tupindex = srcTape; @@ -2511,72 +2513,6 @@ init_slab_allocator(Tuplesortstate *state, int numSlots) state->slabAllocatorUsed = true; } -/* - * Divide all remaining work memory (availMem) as read buffers, for all - * the tapes that will be used during the merge. - * - * We use the number of possible *input* tapes here, rather than maxTapes, - * for the calculation. 
At all times, we'll be reading from at most - * numInputTapes tapes, and one tape is used for output (unless we do an - * on-the-fly final merge, in which case we don't have an output tape). - */ -static void -init_tape_buffers(Tuplesortstate *state, int numInputTapes) -{ - int64 availBlocks; - int64 blocksPerTape; - int remainder; - int tapenum; - - /* - * Divide availMem evenly among the number of input tapes. - */ - availBlocks = state->availMem / BLCKSZ; - blocksPerTape = availBlocks / numInputTapes; - remainder = availBlocks % numInputTapes; - USEMEM(state, availBlocks * BLCKSZ); - -#ifdef TRACE_SORT - if (trace_sort) - elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", - (availBlocks * BLCKSZ) / 1024, numInputTapes); -#endif - - /* - * Use one page per tape, even if we are out of memory. - * tuplesort_merge_order() should've chosen the number of tapes so that - * this can't happen, but better safe than sorry. (This also protects - * from a negative availMem.) - */ - if (blocksPerTape < 1) - { - blocksPerTape = 1; - remainder = 0; - } - - /* - * Set the buffers for the tapes. - * - * In a multi-phase merge, the tape that is initially used as an output - * tape, will later be rewound and read from, and should also use a large - * buffer at that point. So we must loop up to maxTapes, not just - * numInputTapes! - * - * If there are fewer runs than tapes, we will set the buffer size also - * for tapes that will go completely unused, but that's harmless. - * LogicalTapeAssignReadBufferSize() doesn't allocate the buffer - * immediately, it just sets the size that will be used, when the tape is - * rewound for read, and the tape isn't empty. - */ - for (tapenum = 0; tapenum < state->maxTapes; tapenum++) - { - int64 numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0); - - LogicalTapeAssignReadBufferSize(state->tapeset, tapenum, - numBlocks * BLCKSZ); - } -} - /* * mergeruns -- merge all the completed initial runs. 
* @@ -2679,25 +2615,32 @@ mergeruns(Tuplesortstate *state) } /* - * Use all the spare memory we have available for read buffers for the - * tapes. + * Use all the spare memory we have available for read buffers among the + * input tapes. * * We do this only after checking for the case that we produced only one * initial run, because there is no need to use a large read buffer when * we're reading from a single tape. With one tape, the I/O pattern will * be the same regardless of the buffer size. * - * We don't try to "rebalance" the amount of memory among tapes, when we - * start a new merge phase, even if some tapes can be inactive in the - * phase. That would be hard, because logtape.c doesn't know where one - * run ends and another begins. When a new merge phase begins, and a tape - * doesn't participate in it, its buffer nevertheless already contains - * tuples from the next run on same tape, so we cannot release the buffer. - * That's OK in practice, merge performance isn't that sensitive to the - * amount of buffers used, and most merge phases use all or almost all - * tapes, anyway. + * We don't try to "rebalance" the memory among tapes, when we start a new + * merge phase, even if some tapes are inactive in the new phase. That + * would be hard, because logtape.c doesn't know where one run ends and + * another begins. When a new merge phase begins, and a tape doesn't + * participate in it, its buffer nevertheless already contains tuples from + * the next run on same tape, so we cannot release the buffer. That's OK + * in practice, merge performance isn't that sensitive to the amount of + * buffers used, and most merge phases use all or almost all tapes, + * anyway. 
*/ - init_tape_buffers(state, numInputTapes); +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", + (state->availMem) / 1024, numInputTapes); +#endif + + state->read_buffer_size = Max(state->availMem / numInputTapes, 0); /* availMem can be negative; clamp so it never wraps to a huge size_t */ + USEMEM(state, state->read_buffer_size * numInputTapes); /* charge the per-tape budget just computed, not a possibly negative availMem */ /* * Allocate a new 'memtuples' array, for the heap. It will hold one tuple @@ -2709,7 +2652,7 @@ mergeruns(Tuplesortstate *state) /* End of step D2: rewind all output tapes to prepare for merging */ for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - LogicalTapeRewind(state->tapeset, tapenum, false); + LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size); for (;;) { @@ -2772,11 +2715,10 @@ mergeruns(Tuplesortstate *state) if (--state->Level == 0) break; /* rewind output tape T to use as new input */ - LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange], - false); + LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], + state->read_buffer_size); /* rewind used-up input tape P, and prepare it for write pass */ - LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1], - true); + LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); state->tp_runs[state->tapeRange - 1] = 0; /* @@ -2812,7 +2754,7 @@ mergeruns(Tuplesortstate *state) for (tapenum = 0; tapenum < state->maxTapes; tapenum++) { if (tapenum != state->result_tape) - LogicalTapeRewind(state->tapeset, tapenum, true); + LogicalTapeRewindForWrite(state->tapeset, tapenum); } } @@ -3174,9 +3116,9 @@ tuplesort_rescan(Tuplesortstate *state) state->markpos_eof = false; break; case TSS_SORTEDONTAPE: - LogicalTapeRewind(state->tapeset, - state->result_tape, - false); + LogicalTapeRewindForRead(state->tapeset, + state->result_tape, + 0); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h 
index 362a6196dc..d7dccb85be 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -31,7 +31,9 @@ extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size); extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size); -extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite); +extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, + size_t buffer_size); +extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum); extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum); extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size); @@ -39,8 +41,6 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset); extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset); -extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, - size_t bufsize); extern long LogicalTapeSetBlocks(LogicalTapeSet *lts); #endif /* LOGTAPE_H */