diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 774520752f..caa6960b95 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -52,12 +52,17 @@ * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO * policy for free blocks would be better?) * + * To further make the I/Os more sequential, we can use a larger buffer + * when reading, and read multiple blocks from the same tape in one go, + * whenever the buffer becomes empty. LogicalTapeAssignReadBufferSize() + * can be used to set the size of the read buffer. + * * To support the above policy of writing to the lowest free block, * ltsGetFreeBlock sorts the list of free block numbers into decreasing * order each time it is asked for a block and the list isn't currently * sorted. This is an efficient way to handle it because we expect cycles * of releasing many blocks followed by re-using many blocks, due to - * tuplesort.c's "preread" behavior. + * the larger read buffer. * * Since all the bookkeeping and buffer memory is allocated with palloc(), * and the underlying file(s) are made with OpenTemporaryFile, all resources @@ -79,6 +84,7 @@ #include "storage/buffile.h" #include "utils/logtape.h" +#include "utils/memutils.h" /* * Block indexes are "long"s, so we can fit this many per indirect block. @@ -131,9 +137,18 @@ typedef struct LogicalTape * reading. */ char *buffer; /* physical buffer (separately palloc'd) */ + int buffer_size; /* allocated size of the buffer */ long curBlockNumber; /* this block's logical blk# within tape */ int pos; /* next read/write position in buffer */ int nbytes; /* total # of valid bytes in buffer */ + + /* + * Desired buffer size to use when reading. To keep things simple, we use + * a single-block buffer when writing, or when reading a frozen tape. But + * when we are reading and will only read forwards, we allocate a larger + * buffer, determined by read_buffer_size. + */ + int read_buffer_size; } LogicalTape; /* @@ -227,6 +242,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer) blocknum))); } +/* + * Read as many blocks as we can into the per-tape buffer. + * + * The caller can specify the next physical block number to read, in + * datablocknum, or -1 to fetch the next block number from the indirect block. + * If datablocknum == -1, the caller must've already set curBlockNumber. + * + * Returns true if anything was read, false on EOF. + */ +static bool +ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum) +{ + lt->pos = 0; + lt->nbytes = 0; + + do + { + /* Fetch next block number (unless provided by caller) */ + if (datablocknum == -1) + { + datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen); + if (datablocknum == -1L) + break; /* EOF */ + lt->curBlockNumber++; + } + + /* Read the block */ + ltsReadBlock(lts, datablocknum, (void *) (lt->buffer + lt->nbytes)); + if (!lt->frozen) + ltsReleaseBlock(lts, datablocknum); + + if (lt->curBlockNumber < lt->numFullBlocks) + lt->nbytes += BLCKSZ; + else + { + /* EOF */ + lt->nbytes += lt->lastBlockBytes; + break; + } + + /* Advance to next block, if we have buffer space left */ + datablocknum = -1; + } while (lt->nbytes < lt->buffer_size); + + return (lt->nbytes > 0); +} + /* * qsort comparator for sorting freeBlocks[] into decreasing order.
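* * As a worked example of that policy: if blocks 7, 3 and 1 have been released, the array is kept sorted as {7, 3, 1}, and the next request for a free block is satisfied with block 1, the lowest free block, keeping writes close to the start of the file.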
*/ @@ -546,6 +608,8 @@ LogicalTapeSetCreate(int ntapes) lt->numFullBlocks = 0L; lt->lastBlockBytes = 0; lt->buffer = NULL; + lt->buffer_size = 0; + lt->read_buffer_size = BLCKSZ; lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; @@ -628,7 +692,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, /* Allocate data buffer and first indirect block on first write */ if (lt->buffer == NULL) + { lt->buffer = (char *) palloc(BLCKSZ); + lt->buffer_size = BLCKSZ; + } if (lt->indirect == NULL) { lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock)); @@ -636,6 +703,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, lt->indirect->nextup = NULL; } + Assert(lt->buffer_size == BLCKSZ); while (size > 0) { if (lt->pos >= BLCKSZ) @@ -709,18 +777,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) Assert(lt->frozen); datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); } + + /* Allocate a read buffer */ + if (lt->buffer) + pfree(lt->buffer); + lt->buffer = palloc(lt->read_buffer_size); + lt->buffer_size = lt->read_buffer_size; + /* Read the first block, or reset if tape is empty */ lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; if (datablocknum != -1L) - { - ltsReadBlock(lts, datablocknum, (void *) lt->buffer); - if (!lt->frozen) - ltsReleaseBlock(lts, datablocknum); - lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? - BLCKSZ : lt->lastBlockBytes; - } + ltsReadFillBuffer(lts, lt, datablocknum); } else { @@ -754,6 +823,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + if (lt->buffer) + { + pfree(lt->buffer); + lt->buffer = NULL; + lt->buffer_size = 0; + } } } @@ -779,20 +855,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum, if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ - long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, - lt->frozen); - - if (datablocknum == -1L) + if (!ltsReadFillBuffer(lts, lt, -1)) break; /* EOF */ - lt->curBlockNumber++; - lt->pos = 0; - ltsReadBlock(lts, datablocknum, (void *) lt->buffer); - if (!lt->frozen) - ltsReleaseBlock(lts, datablocknum); - lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? - BLCKSZ : lt->lastBlockBytes; - if (lt->nbytes <= 0) - break; /* EOF (possible here?) */ } nthistime = lt->nbytes - lt->pos; @@ -842,6 +906,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum) lt->writing = false; lt->frozen = true; datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true); + + /* + * The seek and backspace functions assume a single-block read buffer. + * That's OK with current usage. A larger buffer is helpful to make the + * read pattern of the backing file look more sequential to the OS when + * we're reading from multiple tapes. But at the end of a sort, when a + * tape is frozen, we only read from a single tape anyway. + */ + if (!lt->buffer || lt->buffer_size != BLCKSZ) + { + if (lt->buffer) + pfree(lt->buffer); + lt->buffer = palloc(BLCKSZ); + lt->buffer_size = BLCKSZ; + } + /* Read the first block, or reset if tape is empty */ lt->curBlockNumber = 0L; lt->pos = 0; @@ -875,6 +955,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = &lts->tapes[tapenum]; Assert(lt->frozen); + Assert(lt->buffer_size == BLCKSZ); /* * Easy case for seek within current block.
@@ -941,6 +1022,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, lt = &lts->tapes[tapenum]; Assert(lt->frozen); Assert(offset >= 0 && offset <= BLCKSZ); + Assert(lt->buffer_size == BLCKSZ); /* * Easy case for seek within current block. @@ -1002,6 +1084,10 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum, Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = &lts->tapes[tapenum]; + + /* With a larger buffer, 'pos' wouldn't be the same as offset within page */ + Assert(lt->buffer_size == BLCKSZ); + *blocknum = lt->curBlockNumber; *offset = lt->pos; } @@ -1014,3 +1100,28 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts) { return lts->nFileBlocks; } + +/* + * Set the buffer size to use when reading from the given tape. + */ +void +LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem) +{ + LogicalTape *lt; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = &lts->tapes[tapenum]; + + /* + * The buffer size must be a multiple of BLCKSZ, so round the given value + * down to the nearest BLCKSZ. Make sure we have at least one page. + * Also, don't go above MaxAllocSize, to avoid erroring out. A + * multi-gigabyte buffer is unlikely to be helpful, anyway. + */ + if (avail_mem < BLCKSZ) + avail_mem = BLCKSZ; + if (avail_mem > MaxAllocSize) + avail_mem = MaxAllocSize; + avail_mem -= avail_mem % BLCKSZ; + lt->read_buffer_size = avail_mem; +} diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index 16ceb30b27..20cfb0b139 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -74,7 +74,7 @@ * the merge is complete. The basic merge algorithm thus needs very little * memory --- only M tuples for an M-way merge, and M is constrained to a * small number. However, we can still make good use of our full workMem - * allocation by pre-reading additional tuples from each source tape. Without + * allocation by pre-reading additional blocks from each source tape. Without * prereading, our access pattern to the temporary file would be very erratic; * on average we'd read one block from each of M source tapes during the same * time that we're writing M blocks to the output tape, so there is no @@ -84,10 +84,10 @@ * worse when it comes time to read that tape. A straightforward merge pass * thus ends up doing a lot of waiting for disk seeks. We can improve matters * by prereading from each source tape sequentially, loading about workMem/M - * bytes from each tape in turn. Then we run the merge algorithm, writing but - * not reading until one of the preloaded tuple series runs out. Then we - * switch back to preread mode, fill memory again, and repeat. This approach - * helps to localize both read and write accesses. + * bytes from each tape in turn, and making the sequential blocks immediately + * available for reuse. This approach helps to localize both read and write + * accesses. The pre-reading is handled by logtape.c; we just tell it how + * much memory to use for the buffers. * * When the caller requests random access to the sort result, we form * the final sorted run on a logical tape which is then "frozen", so @@ -162,9 +162,9 @@ bool optimize_bounded_sort = true; * The objects we actually sort are SortTuple structs. These contain * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple), * which is a separate palloc chunk --- we assume it is just one chunk and - * can be freed by a simple pfree() (except during final on-the-fly merge, - * when memory is used in batch).
SortTuples also contain the tuple's - * first key column in Datum/nullflag format, and an index integer. + * can be freed by a simple pfree() (except during merge, when we use a + * simple slab allocator). SortTuples also contain the tuple's first key + * column in Datum/nullflag format, and an index integer. * * Storing the first key column lets us save heap_getattr or index_getattr * calls during tuple comparisons. We could extract and save all the key @@ -191,9 +191,8 @@ bool optimize_bounded_sort = true; * it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement * selection is always abandoned after the first run; no other run number * should be represented here. During merge passes, we re-use it to hold the - * input tape number that each tuple in the heap was read from, or to hold the - * index of the next tuple pre-read from the same tape in the case of pre-read - * entries. tupindex goes unused if the sort occurs entirely in memory. + * input tape number that each tuple in the heap was read from. tupindex goes + * unused if the sort occurs entirely in memory. */ typedef struct { @@ -203,6 +202,24 @@ typedef struct int tupindex; /* see notes above */ } SortTuple; +/* + * During merge, we use a pre-allocated set of fixed-size slots to hold + * tuples, to avoid palloc/pfree overhead. + * + * Merge doesn't require a lot of memory, so we can afford to waste some + * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the + * palloc() overhead is not significant anymore. + * + * 'nextfree' is valid when this chunk is in the free list. When in use, the + * slot holds a tuple. + */ +#define SLAB_SLOT_SIZE 1024 + +typedef union SlabSlot +{ + union SlabSlot *nextfree; + char buffer[SLAB_SLOT_SIZE]; +} SlabSlot; /* * Possible states of a Tuplesort object. These denote the states that @@ -288,41 +305,28 @@ struct Tuplesortstate /* * Function to write a stored tuple onto tape. The representation of the * tuple on tape need not be the same as it is in memory; requirements on - * the tape representation are given below. After writing the tuple, - * pfree() the out-of-line data (not the SortTuple struct!), and increase - * state->availMem by the amount of memory space thereby released. + * the tape representation are given below. Unless the slab allocator is + * used, after writing the tuple, pfree() the out-of-line data (not the + * SortTuple struct!), and increase state->availMem by the amount of + * memory space thereby released. */ void (*writetup) (Tuplesortstate *state, int tapenum, SortTuple *stup); /* * Function to read a stored tuple from tape back into memory. 'len' is - * the already-read length of the stored tuple. Create a palloc'd copy, - * initialize tuple/datum1/isnull1 in the target SortTuple struct, and - * decrease state->availMem by the amount of memory space consumed. (See - * batchUsed notes for details on how memory is handled when incremental - * accounting is abandoned.) + * the already-read length of the stored tuple. The tuple is allocated + * from the slab memory arena, or is palloc'd; see readtup_alloc(). */ void (*readtup) (Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); - /* - * Function to move a caller tuple. This is usually implemented as a - * memmove() shim, but function may also perform additional fix-up of - * caller tuple where needed. Batch memory support requires the movement - * of caller tuples from one location in memory to another.
- */ - void (*movetup) (void *dest, void *src, unsigned int len); - /* * This array holds the tuples now in sort memory. If we are in state * INITIAL, the tuples are in no particular order; if we are in state * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS * and FINALMERGE, the tuples are organized in "heap" order per Algorithm - * H. (Note that memtupcount only counts the tuples that are part of the - * heap --- during merge passes, memtuples[] entries beyond tapeRange are - * never in the heap and are used to hold pre-read tuples.) In state - * SORTEDONTAPE, the array is not used. + * H. In state SORTEDONTAPE, the array is not used. */ SortTuple *memtuples; /* array of SortTuple structs */ int memtupcount; /* number of tuples currently present */ @@ -330,13 +334,45 @@ struct Tuplesortstate bool growmemtuples; /* memtuples' growth still underway? */ /* - * Memory for tuples is sometimes allocated in batch, rather than - * incrementally. This implies that incremental memory accounting has - * been abandoned. Currently, this only happens for the final on-the-fly - * merge step. Large batch allocations can store tuples (e.g. - * IndexTuples) without palloc() fragmentation and other overhead. + * Memory for tuples is sometimes allocated using a simple slab allocator, + * rather than with palloc(). Currently, we switch to slab allocation + * when we start merging. Merging only needs to keep a small, fixed + * number of tuples in memory at any time, so we can avoid the + * palloc/pfree overhead by recycling a fixed number of fixed-size slots + * to hold the tuples. + * + * For the slab, we use one large allocation, divided into slots of + * SLAB_SLOT_SIZE bytes each. The allocation is sized to have one slot + * per tape, plus one additional slot. We need that many slots to hold + * all the tuples kept in the heap during merge, plus the one last + * returned from the sort with tuplesort_gettuple. + * + * Initially, all the slots are kept in a linked list of free slots. When + * a tuple is read from a tape, it is put in the next available slot, if + * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd + * instead. + * + * When we're done processing a tuple, we return the slot to the free + * list, or pfree() the tuple if it was palloc'd. We know that a tuple + * was allocated from the slab if its pointer value is between + * slabMemoryBegin and slabMemoryEnd. + * + * When the slab allocator is used, the USEMEM/LACKMEM mechanism of + * tracking memory usage is not used. */ - bool batchUsed; + bool slabAllocatorUsed; + + char *slabMemoryBegin; /* beginning of slab memory arena */ + char *slabMemoryEnd; /* end of slab memory arena */ + SlabSlot *slabFreeHead; /* head of free list */ + + /* + * When we return to the caller, in tuplesort_gettuple_XXX, a tuple that + * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE + * modes), we remember the tuple in 'lastReturnedTuple', so that we can + * recycle its memory on the next gettuple call. + */ + void *lastReturnedTuple; /* * While building initial runs, this indicates if the replacement @@ -358,42 +394,11 @@ struct Tuplesortstate */ /* - * These variables are only used during merge passes. mergeactive[i] is - * true if we are reading an input run from (actual) tape number i and - * have not yet exhausted that run. mergenext[i] is the memtuples index - * of the next pre-read tuple (next to be loaded into the heap) for tape - * i, or 0 if we are out of pre-read tuples.
mergelast[i] similarly - * points to the last pre-read tuple from each tape. mergeavailslots[i] - * is the number of unused memtuples[] slots reserved for tape i, and - * mergeavailmem[i] is the amount of unused space allocated for tape i. - * mergefreelist and mergefirstfree keep track of unused locations in the - * memtuples[] array. The memtuples[].tupindex fields link together - * pre-read tuples for each tape as well as recycled locations in - * mergefreelist. It is OK to use 0 as a null link in these lists, because - * memtuples[0] is part of the merge heap and is never a pre-read tuple. + * This variable is only used during merge passes. mergeactive[i] is true + * if we are reading an input run from (actual) tape number i and have not + * yet exhausted that run. */ bool *mergeactive; /* active input run source? */ - int *mergenext; /* first preread tuple for each source */ - int *mergelast; /* last preread tuple for each source */ - int *mergeavailslots; /* slots left for prereading each tape */ - int64 *mergeavailmem; /* availMem for prereading each tape */ - int mergefreelist; /* head of freelist of recycled slots */ - int mergefirstfree; /* first slot never used in this merge */ - - /* - * Per-tape batch state, when final on-the-fly merge consumes memory from - * just a few large allocations. - * - * Aside from the general benefits of performing fewer individual retail - * palloc() calls, this also helps make merging more cache efficient, - * since each tape's tuples must naturally be accessed sequentially (in - * sorted order). - */ - int64 spacePerTape; /* Space (memory) for tuples (not slots) */ - char **mergetuples; /* Each tape's memory allocation */ - char **mergecurrent; /* Current offset into each tape's memory */ - char **mergetail; /* Last item's start point for each tape */ - char **mergeoverflow; /* Retail palloc() "overflow" for each tape */ /* * Variables for Algorithm D. Note that destTape is a "logical" tape @@ -481,12 +486,34 @@ struct Tuplesortstate #endif }; +/* + * Is the given tuple allocated from the slab memory arena? + */ +#define IS_SLAB_SLOT(state, tuple) \ + ((char *) (tuple) >= (state)->slabMemoryBegin && \ + (char *) (tuple) < (state)->slabMemoryEnd) + +/* + * Return the given tuple to the slab memory free list, or free it + * if it was palloc'd. 
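+ * + * For example, the merge loop in mergeonerun() recycles the slot of the + * tuple it has just written out: + * + * WRITETUP(state, destTape, &state->memtuples[0]); + * RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); + * + * which makes the slot immediately available to the next READTUP() from + * the same tape.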
+ */ +#define RELEASE_SLAB_SLOT(state, tuple) \ + do { \ + SlabSlot *buf = (SlabSlot *) tuple; \ + \ + if (IS_SLAB_SLOT((state), buf)) \ + { \ + buf->nextfree = (state)->slabFreeHead; \ + (state)->slabFreeHead = buf; \ + } else \ + pfree(buf); \ + } while(0) + #define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state)) #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) -#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len)) -#define LACKMEM(state) ((state)->availMem < 0 && !(state)->batchUsed) +#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) #define USEMEM(state,amt) ((state)->availMem -= (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt)) @@ -551,18 +578,12 @@ static bool consider_abort_common(Tuplesortstate *state); static bool useselection(Tuplesortstate *state); static void inittapes(Tuplesortstate *state); static void selectnewtape(Tuplesortstate *state); +static void init_slab_allocator(Tuplesortstate *state, int numSlots); +static void init_tape_buffers(Tuplesortstate *state, int numInputTapes); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); -static void beginmerge(Tuplesortstate *state, bool finalMergeBatch); -static void batchmemtuples(Tuplesortstate *state); -static void mergebatch(Tuplesortstate *state, int64 spacePerTape); -static void mergebatchone(Tuplesortstate *state, int srcTape, - SortTuple *stup, bool *should_free); -static void mergebatchfreetape(Tuplesortstate *state, int srcTape, - SortTuple *rtup, bool *should_free); -static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen); -static void mergepreread(Tuplesortstate *state); -static void mergeprereadone(Tuplesortstate *state, int srcTape); +static void beginmerge(Tuplesortstate *state); +static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); static void dumptuples(Tuplesortstate *state, bool alltuples); static void dumpbatch(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); @@ -576,7 +597,7 @@ static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex); static void reversedirection(Tuplesortstate *state); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate *state, int tapenum); -static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen); +static void *readtup_alloc(Tuplesortstate *state, Size tuplen); static int comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); @@ -584,7 +605,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void movetup_heap(void *dest, void *src, unsigned int len); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); @@ -592,7 +612,6 @@ static void writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void movetup_cluster(void *dest, void *src, 
unsigned int len); static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, @@ -602,7 +621,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void movetup_index(void *dest, void *src, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); @@ -610,7 +628,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); -static void movetup_datum(void *dest, void *src, unsigned int len); static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); /* @@ -705,7 +722,7 @@ tuplesort_begin_common(int workMem, bool randomAccess) ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1); state->growmemtuples = true; - state->batchUsed = false; + state->slabAllocatorUsed = false; state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); @@ -762,7 +779,6 @@ tuplesort_begin_heap(TupleDesc tupDesc, state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; - state->movetup = movetup_heap; state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ state->abbrevNext = 10; @@ -835,7 +851,6 @@ tuplesort_begin_cluster(TupleDesc tupDesc, state->copytup = copytup_cluster; state->writetup = writetup_cluster; state->readtup = readtup_cluster; - state->movetup = movetup_cluster; state->abbrevNext = 10; state->indexInfo = BuildIndexInfo(indexRel); @@ -927,7 +942,6 @@ tuplesort_begin_index_btree(Relation heapRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; - state->movetup = movetup_index; state->abbrevNext = 10; state->heapRel = heapRel; @@ -995,7 +1009,6 @@ tuplesort_begin_index_hash(Relation heapRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; - state->movetup = movetup_index; state->heapRel = heapRel; state->indexRel = indexRel; @@ -1038,7 +1051,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, state->copytup = copytup_datum; state->writetup = writetup_datum; state->readtup = readtup_datum; - state->movetup = movetup_datum; state->abbrevNext = 10; state->datumType = datumType; @@ -1838,7 +1850,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, { case TSS_SORTEDINMEM: Assert(forward || state->randomAccess); - Assert(!state->batchUsed); + Assert(!state->slabAllocatorUsed); *should_free = false; if (forward) { @@ -1883,15 +1895,35 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, case TSS_SORTEDONTAPE: Assert(forward || state->randomAccess); - Assert(!state->batchUsed); - *should_free = true; + Assert(state->slabAllocatorUsed); + + /* + * The slot that held the tuple that we returned in the previous + * gettuple call can now be reused.
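+ * (The caller is expected to be done with the previously-returned + * tuple by the time it asks for the next one; the READTUP() below + * will typically pop this same slot off the head of the free list.)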
*/ + if (state->lastReturnedTuple) + { + RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); + state->lastReturnedTuple = NULL; + } + if (forward) { if (state->eof_reached) return false; + if ((tuplen = getlen(state, state->result_tape, true)) != 0) { READTUP(state, stup, state->result_tape, tuplen); + + /* + * Remember the tuple we return, so that we can recycle + * its memory on the next call. (This can be NULL in the + * !state->tuples case.) + */ + state->lastReturnedTuple = stup->tuple; + + *should_free = false; return true; } else @@ -1965,74 +1997,70 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, tuplen)) elog(ERROR, "bogus tuple length in backward scan"); READTUP(state, stup, state->result_tape, tuplen); + + /* + * Remember the tuple we return, so that we can recycle its memory + * on the next call. (This can be NULL in the Datum case.) + */ + state->lastReturnedTuple = stup->tuple; + + *should_free = false; return true; case TSS_FINALMERGE: Assert(forward); - Assert(state->batchUsed || !state->tuples); - /* For now, assume tuple is stored in tape's batch memory */ + /* We are managing memory ourselves, with the slab allocator. */ + Assert(state->slabAllocatorUsed); *should_free = false; + /* + * The slab slot holding the tuple that we returned in the previous + * gettuple call can now be reused. + */ + if (state->lastReturnedTuple) + { + RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); + state->lastReturnedTuple = NULL; + } + /* * This code should match the inner loop of mergeonerun(). */ if (state->memtupcount > 0) { int srcTape = state->memtuples[0].tupindex; - int tupIndex; - SortTuple *newtup; + SortTuple newtup; + + *stup = state->memtuples[0]; /* - * Returned tuple is still counted in our memory space most of - * the time. See mergebatchone() for discussion of why caller - * may occasionally be required to free returned tuple, and - * how preread memory is managed with regard to edge cases - * more generally. + * Remember the tuple we return, so that we can recycle its + * memory on the next call. (This can be NULL in the Datum + * case.) */ - *stup = state->memtuples[0]; - if ((tupIndex = state->mergenext[srcTape]) == 0) + state->lastReturnedTuple = stup->tuple; + + /* + * Pull the next tuple from the tape, and replace the returned + * tuple at the top of the heap with it. + */ + if (!mergereadnext(state, srcTape, &newtup)) { /* - * out of preloaded data on this tape, try to read more - * - * Unlike mergeonerun(), we only preload from the single - * tape that's run dry, though not before preparing its - * batch memory for a new round of sequential consumption. - * See mergepreread() comments. + * If there is no more data, we've reached the end of the + * run on this tape. Remove the top node from the heap. */ - if (state->batchUsed) - mergebatchone(state, srcTape, stup, should_free); - - mergeprereadone(state, srcTape); + tuplesort_heap_delete_top(state, false); /* - * if still no data, we've reached end of run on this tape + * Rewind to free the read buffer. It'd go away at the + * end of the sort anyway, but better to release the + * memory early.
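+ * (Rewinding a tape for writing releases its read buffer; see the + * forWrite branch of LogicalTapeRewind in logtape.c.)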
*/ - if ((tupIndex = state->mergenext[srcTape]) == 0) - { - /* Remove the top node from the heap */ - tuplesort_heap_delete_top(state, false); - /* Free tape's buffer, avoiding dangling pointer */ - if (state->batchUsed) - mergebatchfreetape(state, srcTape, stup, should_free); - return true; - } + LogicalTapeRewind(state->tapeset, srcTape, true); + return true; } - - /* - * pull next preread tuple from list, and replace the returned - * tuple at top of the heap with it. - */ - newtup = &state->memtuples[tupIndex]; - state->mergenext[srcTape] = newtup->tupindex; - if (state->mergenext[srcTape] == 0) - state->mergelast[srcTape] = 0; - newtup->tupindex = srcTape; - tuplesort_heap_replace_top(state, newtup, false); - /* put the now-unused memtuples entry on the freelist */ - newtup->tupindex = state->mergefreelist; - state->mergefreelist = tupIndex; - state->mergeavailslots[srcTape]++; + newtup.tupindex = srcTape; + tuplesort_heap_replace_top(state, &newtup, false); return true; } return false; @@ -2317,13 +2345,6 @@ inittapes(Tuplesortstate *state) /* Compute number of tapes to use: merge order plus 1 */ maxTapes = tuplesort_merge_order(state->allowedMem) + 1; - /* - * We must have at least 2*maxTapes slots in the memtuples[] array, else - * we'd not have room for merge heap plus preread. It seems unlikely that - * this case would ever occur, but be safe. - */ - maxTapes = Min(maxTapes, state->memtupsize / 2); - state->maxTapes = maxTapes; state->tapeRange = maxTapes - 1; @@ -2334,13 +2355,13 @@ inittapes(Tuplesortstate *state) #endif /* - * Decrease availMem to reflect the space needed for tape buffers; but - * don't decrease it to the point that we have no room for tuples. (That - * case is only likely to occur if sorting pass-by-value Datums; in all - * other scenarios the memtuples[] array is unlikely to occupy more than - * half of allowedMem. In the pass-by-value case it's not important to - * account for tuple space, so we don't care if LACKMEM becomes - * inaccurate.) + * Decrease availMem to reflect the space needed for tape buffers, when + * writing the initial runs; but don't decrease it to the point that we + * have no room for tuples. (That case is only likely to occur if sorting + * pass-by-value Datums; in all other scenarios the memtuples[] array is + * unlikely to occupy more than half of allowedMem. In the pass-by-value + * case it's not important to account for tuple space, so we don't care if + * LACKMEM becomes inaccurate.) 
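+ * + * (That is, tapeSpace is charged against availMem with USEMEM() only + * when doing so still leaves room for the memtuples array.)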
*/ tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD; @@ -2359,14 +2380,6 @@ inittapes(Tuplesortstate *state) state->tapeset = LogicalTapeSetCreate(maxTapes); state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); - state->mergenext = (int *) palloc0(maxTapes * sizeof(int)); - state->mergelast = (int *) palloc0(maxTapes * sizeof(int)); - state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int)); - state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64)); - state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *)); - state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *)); - state->mergetail = (char **) palloc0(maxTapes * sizeof(char *)); - state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *)); state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); @@ -2465,6 +2478,105 @@ selectnewtape(Tuplesortstate *state) state->destTape = 0; } +/* + * Initialize the slab allocation arena, for the given number of slots. + */ +static void +init_slab_allocator(Tuplesortstate *state, int numSlots) +{ + if (numSlots > 0) + { + char *p; + int i; + + state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE); + state->slabMemoryEnd = state->slabMemoryBegin + + numSlots * SLAB_SLOT_SIZE; + state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin; + USEMEM(state, numSlots * SLAB_SLOT_SIZE); + + p = state->slabMemoryBegin; + for (i = 0; i < numSlots - 1; i++) + { + ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE); + p += SLAB_SLOT_SIZE; + } + ((SlabSlot *) p)->nextfree = NULL; + } + else + { + state->slabMemoryBegin = state->slabMemoryEnd = NULL; + state->slabFreeHead = NULL; + } + state->slabAllocatorUsed = true; +} + +/* + * Divide all remaining work memory (availMem) into read buffers for all + * the tapes that will be used during the merge. + * + * We use the number of possible *input* tapes here, rather than maxTapes, + * for the calculation. At all times, we'll be reading from at most + * numInputTapes tapes, and one tape is used for output (unless we do an + * on-the-fly final merge, in which case we don't have an output tape). + */ +static void +init_tape_buffers(Tuplesortstate *state, int numInputTapes) +{ + int64 availBlocks; + int64 blocksPerTape; + int remainder; + int tapenum; + + /* + * Divide availMem evenly among the input tapes. + */ + availBlocks = state->availMem / BLCKSZ; + blocksPerTape = availBlocks / numInputTapes; + remainder = availBlocks % numInputTapes; + USEMEM(state, availBlocks * BLCKSZ); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", + (availBlocks * BLCKSZ) / 1024, numInputTapes); +#endif + + /* + * Use one page per tape, even if we are out of memory. + * tuplesort_merge_order() should've chosen the number of tapes so that + * this can't happen, but better safe than sorry. (This also protects + * against a negative availMem.) + */ + if (blocksPerTape < 1) + { + blocksPerTape = 1; + remainder = 0; + } + + /* + * Set the buffers for the tapes. + * + * In a multi-phase merge, the tape that is initially used as an output + * tape will later be rewound and read from, and should also use a large + * buffer at that point. So we must loop up to maxTapes, not just + * numInputTapes!
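+ * + * As a concrete example: with the default 8 kB BLCKSZ, 8 MB of remaining + * availMem and 6 input tapes gives 1024 blocks to hand out, so + * blocksPerTape is 170 and the remaining 4 blocks go one each to the + * first 4 tapes.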
+ * + * If there are fewer runs than tapes, we will also set the buffer size + * for tapes that will go completely unused, but that's harmless. + * LogicalTapeAssignReadBufferSize() doesn't allocate the buffer + * immediately; it just sets the size that will be used when the tape is + * rewound for reading (and the tape isn't empty). + */ + for (tapenum = 0; tapenum < state->maxTapes; tapenum++) + { + int64 numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0); + + LogicalTapeAssignReadBufferSize(state->tapeset, tapenum, + numBlocks * BLCKSZ); + } +} + /* * mergeruns -- merge all the completed initial runs. * @@ -2478,6 +2590,8 @@ mergeruns(Tuplesortstate *state) svTape, svRuns, svDummy; + int numTapes; + int numInputTapes; Assert(state->status == TSS_BUILDRUNS); Assert(state->memtupcount == 0); @@ -2498,6 +2612,56 @@ mergeruns(Tuplesortstate *state) state->sortKeys->abbrev_full_comparator = NULL; } + /* + * Reset tuple memory. We've freed all the tuples that we previously + * allocated. We will use the slab allocator from now on. + */ + MemoryContextDelete(state->tuplecontext); + state->tuplecontext = NULL; + + /* + * We no longer need a large memtuples array. (We will allocate a smaller + * one for the heap later.) + */ + FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); + pfree(state->memtuples); + state->memtuples = NULL; + + /* + * If we had fewer runs than tapes, refund the memory that we imagined we + * would need for the tape buffers of the unused tapes. + * + * numTapes and numInputTapes reflect the actual number of tapes we will + * use. Note that the output tape's tape number is maxTapes - 1, so the + * tape numbers of the used tapes are not consecutive, and you cannot just + * loop from 0 to numTapes to visit all used tapes! + */ + if (state->Level == 1) + { + numInputTapes = state->currentRun; + numTapes = numInputTapes + 1; + FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD); + } + else + { + numInputTapes = state->tapeRange; + numTapes = state->maxTapes; + } + + /* + * Initialize the slab allocator. We need one slab slot per input tape, + * for the tuples in the heap, plus one to hold the tuple last returned + * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, + * however, we don't need to allocate anything.) + * + * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism + * to track memory usage of individual tuples. + */ + if (state->tuples) + init_slab_allocator(state, numInputTapes + 1); + else + init_slab_allocator(state, 0); /* * If we produced only one initial run (quite likely if the total data * volume is between 1X and 2X workMem when replacement selection is used, @@ -2514,6 +2678,35 @@ mergeruns(Tuplesortstate *state) return; } + /* + * Use all the spare memory we have available as read buffers for the + * tapes. + * + * We do this only after checking for the case that we produced only one + * initial run, because there is no need to use a large read buffer when + * we're reading from a single tape. With one tape, the I/O pattern will + * be the same regardless of the buffer size. + * + * We don't try to "rebalance" the amount of memory among tapes when we + * start a new merge phase, even if some tapes are inactive in that + * phase. That would be hard, because logtape.c doesn't know where one + * run ends and another begins.
When a new merge phase begins, and a tape + * doesn't participate in it, its buffer nevertheless already contains + * tuples from the next run on the same tape, so we cannot release the + * buffer. That's OK in practice; merge performance isn't that sensitive + * to the amount of buffer memory used, and most merge phases use all or + * almost all tapes, anyway. + */ + init_tape_buffers(state, numInputTapes); + + /* + * Allocate a new 'memtuples' array, for the heap. It will hold one tuple + * from each input tape. + */ + state->memtupsize = numInputTapes; + state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple)); + USEMEM(state, GetMemoryChunkSpace(state->memtuples)); + /* End of step D2: rewind all output tapes to prepare for merging */ for (tapenum = 0; tapenum < state->tapeRange; tapenum++) LogicalTapeRewind(state->tapeset, tapenum, false); @@ -2544,7 +2737,7 @@ mergeruns(Tuplesortstate *state) /* Tell logtape.c we won't be writing anymore */ LogicalTapeSetForgetFreeSpace(state->tapeset); /* Initialize for the final merge pass */ - beginmerge(state, state->tuples); + beginmerge(state); state->status = TSS_FINALMERGE; return; } @@ -2614,6 +2807,13 @@ mergeruns(Tuplesortstate *state) state->result_tape = state->tp_tapenum[state->tapeRange]; LogicalTapeFreeze(state->tapeset, state->result_tape); state->status = TSS_SORTEDONTAPE; + + /* Release the read buffers of all the other tapes, by rewinding them. */ + for (tapenum = 0; tapenum < state->maxTapes; tapenum++) + { + if (tapenum != state->result_tape) + LogicalTapeRewind(state->tapeset, tapenum, true); + } } /* @@ -2627,16 +2827,12 @@ mergeonerun(Tuplesortstate *state) { int destTape = state->tp_tapenum[state->tapeRange]; int srcTape; - int tupIndex; - SortTuple *tup; - int64 priorAvail, - spaceFreed; /* * Start the merge by loading one tuple from each active source tape into * the heap. We can also decrease the input run/dummy run counts. */ - beginmerge(state, false); + beginmerge(state); /* * Execute merge by repeatedly extracting lowest tuple in heap, writing it @@ -2645,50 +2841,28 @@ mergeonerun(Tuplesortstate *state) */ while (state->memtupcount > 0) { + SortTuple stup; + /* write the tuple to destTape */ - priorAvail = state->availMem; srcTape = state->memtuples[0].tupindex; WRITETUP(state, destTape, &state->memtuples[0]); - /* writetup adjusted total free space, now fix per-tape space */ - spaceFreed = state->availMem - priorAvail; - state->mergeavailmem[srcTape] += spaceFreed; - if ((tupIndex = state->mergenext[srcTape]) == 0) - { - /* out of preloaded data on this tape, try to read more */ - mergepreread(state); - /* if still no data, we've reached end of run on this tape */ - if ((tupIndex = state->mergenext[srcTape]) == 0) - { - /* remove the written-out tuple from the heap */ - tuplesort_heap_delete_top(state, false); - continue; - } - } + + /* recycle the slot of the tuple we just wrote out, for the next read */ + RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); /* - * pull next preread tuple from list, and replace the written-out - * tuple in the heap with it. + * pull next tuple from the tape, and replace the written-out tuple in + * the heap with it.
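+ * If the tape is exhausted instead, just remove the written-out tuple + * from the heap; the heap shrinks as input runs run dry.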
*/ - tup = &state->memtuples[tupIndex]; - state->mergenext[srcTape] = tup->tupindex; - if (state->mergenext[srcTape] == 0) - state->mergelast[srcTape] = 0; - tup->tupindex = srcTape; - tuplesort_heap_replace_top(state, tup, false); - /* put the now-unused memtuples entry on the freelist */ - tup->tupindex = state->mergefreelist; - state->mergefreelist = tupIndex; - state->mergeavailslots[srcTape]++; - } + if (mergereadnext(state, srcTape, &stup)) + { + stup.tupindex = srcTape; + tuplesort_heap_replace_top(state, &stup, false); - /* - * Reset tuple memory. We've freed all of the tuples that we previously - * allocated, but AllocSetFree will have put those chunks of memory on - * particular free lists, bucketed by size class. Thus, although all of - * that memory is free, it is effectively fragmented. Resetting the - * context gets us out from under that problem. - */ - MemoryContextReset(state->tuplecontext); + } + else + tuplesort_heap_delete_top(state, false); + } /* * When the heap empties, we're done. Write an end-of-run marker on the @@ -2711,18 +2885,13 @@ mergeonerun(Tuplesortstate *state) * which tapes contain active input runs in mergeactive[]. Then, load * as many tuples as we can from each active input tape, and finally * fill the merge heap with the first tuple from each active tape. - * - * finalMergeBatch indicates if this is the beginning of a final on-the-fly - * merge where a batched allocation of tuple memory is required. */ static void -beginmerge(Tuplesortstate *state, bool finalMergeBatch) +beginmerge(Tuplesortstate *state) { int activeTapes; int tapenum; int srcTape; - int slotsPerTape; - int64 spacePerTape; /* Heap should be empty here */ Assert(state->memtupcount == 0); @@ -2744,519 +2913,44 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch) activeTapes++; } } - state->activeTapes = activeTapes; - - /* Clear merge-pass state variables */ - memset(state->mergenext, 0, - state->maxTapes * sizeof(*state->mergenext)); - memset(state->mergelast, 0, - state->maxTapes * sizeof(*state->mergelast)); - state->mergefreelist = 0; /* nothing in the freelist */ - state->mergefirstfree = activeTapes; /* 1st slot avail for preread */ - - if (finalMergeBatch) - { - /* Free outright buffers for tape never actually allocated */ - FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD); - - /* - * Grow memtuples one last time, since the palloc() overhead no longer - * incurred can make a big difference - */ - batchmemtuples(state); - } - - /* - * Initialize space allocation to let each active input tape have an equal - * share of preread space. - */ Assert(activeTapes > 0); - slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes; - Assert(slotsPerTape > 0); - spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes); - for (srcTape = 0; srcTape < state->maxTapes; srcTape++) - { - if (state->mergeactive[srcTape]) - { - state->mergeavailslots[srcTape] = slotsPerTape; - state->mergeavailmem[srcTape] = spacePerTape; - } - } - - /* - * Preallocate tuple batch memory for each tape. This is the memory used - * for tuples themselves (not SortTuples), so it's never used by - * pass-by-value datum sorts. Memory allocation is performed here at most - * once per sort, just in advance of the final on-the-fly merge step. 
- */ - if (finalMergeBatch) - mergebatch(state, spacePerTape); - - /* - * Preread as many tuples as possible (and at least one) from each active - * tape - */ - mergepreread(state); + state->activeTapes = activeTapes; /* Load the merge heap with the first tuple from each input tape */ for (srcTape = 0; srcTape < state->maxTapes; srcTape++) { - int tupIndex = state->mergenext[srcTape]; - SortTuple *tup; + SortTuple tup; - if (tupIndex) + if (mergereadnext(state, srcTape, &tup)) { - tup = &state->memtuples[tupIndex]; - state->mergenext[srcTape] = tup->tupindex; - if (state->mergenext[srcTape] == 0) - state->mergelast[srcTape] = 0; - tup->tupindex = srcTape; - tuplesort_heap_insert(state, tup, false); - /* put the now-unused memtuples entry on the freelist */ - tup->tupindex = state->mergefreelist; - state->mergefreelist = tupIndex; - state->mergeavailslots[srcTape]++; - -#ifdef TRACE_SORT - if (trace_sort && finalMergeBatch) - { - int64 perTapeKB = (spacePerTape + 1023) / 1024; - int64 usedSpaceKB; - int usedSlots; - - /* - * Report how effective batchmemtuples() was in balancing the - * number of slots against the need for memory for the - * underlying tuples (e.g. IndexTuples). The big preread of - * all tapes when switching to FINALMERGE state should be - * fairly representative of memory utilization during the - * final merge step, and in any case is the only point at - * which all tapes are guaranteed to have depleted either - * their batch memory allowance or slot allowance. Ideally, - * both will be completely depleted for every tape by now. - */ - usedSpaceKB = (state->mergecurrent[srcTape] - - state->mergetuples[srcTape] + 1023) / 1024; - usedSlots = slotsPerTape - state->mergeavailslots[srcTape]; - - elog(LOG, "tape %d initially used " INT64_FORMAT " KB of " - INT64_FORMAT " KB batch (%2.3f) and %d out of %d slots " - "(%2.3f)", srcTape, - usedSpaceKB, perTapeKB, - (double) usedSpaceKB / (double) perTapeKB, - usedSlots, slotsPerTape, - (double) usedSlots / (double) slotsPerTape); - } -#endif + tup.tupindex = srcTape; + tuplesort_heap_insert(state, &tup, false); } } } /* - * batchmemtuples - grow memtuples without palloc overhead + * mergereadnext - read next tuple from one merge input tape * - * When called, availMem should be approximately the amount of memory we'd - * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots) - * that were allocated with palloc() overhead, and in doing so use up all - * allocated slots. However, though slots and tuple memory is in balance - * following the last grow_memtuples() call, that's predicated on the observed - * average tuple size for the "final" grow_memtuples() call, which includes - * palloc overhead. During the final merge pass, where we will arrange to - * squeeze out the palloc overhead, we might need more slots in the memtuples - * array. - * - * To make that happen, arrange for the amount of remaining memory to be - * exactly equal to the palloc overhead multiplied by the current size of - * the memtuples array, force the grow_memtuples flag back to true (it's - * probably but not necessarily false on entry to this routine), and then - * call grow_memtuples. This simulates loading enough tuples to fill the - * whole memtuples array and then having some space left over because of the - * elided palloc overhead. 
We expect that grow_memtuples() will conclude that - * it can't double the size of the memtuples array but that it can increase - * it by some percentage; but if it does decide to double it, that just means - * that we've never managed to use many slots in the memtuples array, in which - * case doubling it shouldn't hurt anything anyway. + * Returns false on EOF. */ -static void -batchmemtuples(Tuplesortstate *state) -{ - int64 refund; - int64 availMemLessRefund; - int memtupsize = state->memtupsize; - - /* Caller error if we have no tapes */ - Assert(state->activeTapes > 0); - - /* For simplicity, assume no memtuples are actually currently counted */ - Assert(state->memtupcount == 0); - - /* - * Refund STANDARDCHUNKHEADERSIZE per tuple. - * - * This sometimes fails to make memory use perfectly balanced, but it - * should never make the situation worse. Note that Assert-enabled builds - * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE. - */ - refund = memtupsize * STANDARDCHUNKHEADERSIZE; - availMemLessRefund = state->availMem - refund; - - /* - * We need to be sure that we do not cause LACKMEM to become true, else - * the batch allocation size could be calculated as negative, causing - * havoc. Hence, if availMemLessRefund is negative at this point, we must - * do nothing. Moreover, if it's positive but rather small, there's - * little point in proceeding because we could only increase memtuples by - * a small amount, not worth the cost of the repalloc's. We somewhat - * arbitrarily set the threshold at ALLOCSET_DEFAULT_INITSIZE per tape. - * (Note that this does not represent any assumption about tuple sizes.) - */ - if (availMemLessRefund <= - (int64) state->activeTapes * ALLOCSET_DEFAULT_INITSIZE) - return; - - /* - * To establish balanced memory use after refunding palloc overhead, - * temporarily have our accounting indicate that we've allocated all - * memory we're allowed to less that refund, and call grow_memtuples() to - * have it increase the number of slots. - */ - state->growmemtuples = true; - USEMEM(state, availMemLessRefund); - (void) grow_memtuples(state); - state->growmemtuples = false; - /* availMem must stay accurate for spacePerTape calculation */ - FREEMEM(state, availMemLessRefund); - if (LACKMEM(state)) - elog(ERROR, "unexpected out-of-memory situation in tuplesort"); - -#ifdef TRACE_SORT - if (trace_sort) - { - Size OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024; - Size NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024; - - elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge", - (double) NewKb / (double) OldKb, - memtupsize, OldKb, - state->memtupsize, NewKb); - } -#endif -} - -/* - * mergebatch - initialize tuple memory in batch - * - * This allows sequential access to sorted tuples buffered in memory from - * tapes/runs on disk during a final on-the-fly merge step. Note that the - * memory is not used for SortTuples, but for the underlying tuples (e.g. - * MinimalTuples). - * - * Note that when batch memory is used, there is a simple division of space - * into large buffers (one per active tape). The conventional incremental - * memory accounting (calling USEMEM() and FREEMEM()) is abandoned. Instead, - * when each tape's memory budget is exceeded, a retail palloc() "overflow" is - * performed, which is then immediately detected in a way that is analogous to - * LACKMEM(). This keeps each tape's use of memory fair, which is always a - * goal. 
- */ -static void -mergebatch(Tuplesortstate *state, int64 spacePerTape) -{ - int srcTape; - - Assert(state->activeTapes > 0); - Assert(state->tuples); - - /* - * For the purposes of tuplesort's memory accounting, the batch allocation - * is special, and regular memory accounting through USEMEM() calls is - * abandoned (see mergeprereadone()). - */ - for (srcTape = 0; srcTape < state->maxTapes; srcTape++) - { - char *mergetuples; - - if (!state->mergeactive[srcTape]) - continue; - - /* Allocate buffer for each active tape */ - mergetuples = MemoryContextAllocHuge(state->tuplecontext, - spacePerTape); - - /* Initialize state for tape */ - state->mergetuples[srcTape] = mergetuples; - state->mergecurrent[srcTape] = mergetuples; - state->mergetail[srcTape] = mergetuples; - state->mergeoverflow[srcTape] = NULL; - } - - state->batchUsed = true; - state->spacePerTape = spacePerTape; -} - -/* - * mergebatchone - prepare batch memory for one merge input tape - * - * This is called following the exhaustion of preread tuples for one input - * tape. All that actually occurs is that the state for the source tape is - * reset to indicate that all memory may be reused. - * - * This routine must deal with fixing up the tuple that is about to be returned - * to the client, due to "overflow" allocations. - */ -static void -mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup, - bool *should_free) -{ - Assert(state->batchUsed); - - /* - * Tuple about to be returned to caller ("stup") is final preread tuple - * from tape, just removed from the top of the heap. Special steps around - * memory management must be performed for that tuple, to make sure it - * isn't overwritten early. - */ - if (!state->mergeoverflow[srcTape]) - { - Size tupLen; - - /* - * Mark tuple buffer range for reuse, but be careful to move final, - * tail tuple to start of space for next run so that it's available to - * caller when stup is returned, and remains available at least until - * the next tuple is requested. - */ - tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape]; - state->mergecurrent[srcTape] = state->mergetuples[srcTape]; - MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape], - tupLen); - - /* Make SortTuple at top of the merge heap point to new tuple */ - rtup->tuple = (void *) state->mergecurrent[srcTape]; - - state->mergetail[srcTape] = state->mergecurrent[srcTape]; - state->mergecurrent[srcTape] += tupLen; - } - else - { - /* - * Handle an "overflow" retail palloc. - * - * This is needed when we run out of tuple memory for the tape. - */ - state->mergecurrent[srcTape] = state->mergetuples[srcTape]; - state->mergetail[srcTape] = state->mergetuples[srcTape]; - - if (rtup->tuple) - { - Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]); - /* Caller should free palloc'd tuple */ - *should_free = true; - } - state->mergeoverflow[srcTape] = NULL; - } -} - -/* - * mergebatchfreetape - handle final clean-up for batch memory once tape is - * about to become exhausted - * - * All tuples are returned from tape, but a single final tuple, *rtup, is to be - * passed back to caller. Free tape's batch allocation buffer while ensuring - * that the final tuple is managed appropriately. 
- */ -static void -mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup, - bool *should_free) -{ - Assert(state->batchUsed); - Assert(state->status == TSS_FINALMERGE); - - /* - * Tuple may or may not already be an overflow allocation from - * mergebatchone() - */ - if (!*should_free && rtup->tuple) - { - /* - * Final tuple still in tape's batch allocation. - * - * Return palloc()'d copy to caller, and have it freed in a similar - * manner to overflow allocation. Otherwise, we'd free batch memory - * and pass back a pointer to garbage. Note that we deliberately - * allocate this in the parent tuplesort context, to be on the safe - * side. - */ - Size tuplen; - void *oldTuple = rtup->tuple; - - tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape]; - rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen); - MOVETUP(rtup->tuple, oldTuple, tuplen); - *should_free = true; - } - - /* Free spacePerTape-sized buffer */ - pfree(state->mergetuples[srcTape]); -} - -/* - * mergebatchalloc - allocate memory for one tuple using a batch memory - * "logical allocation". - * - * This is used for the final on-the-fly merge phase only. READTUP() routines - * receive memory from here in place of palloc() and USEMEM() calls. - * - * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted, - * contiguous order (while allowing safe reuse of memory made available to - * each tape). This maximizes locality of access as tuples are returned by - * final merge. - * - * Caller must not subsequently attempt to free memory returned here. In - * general, only mergebatch* functions know about how memory returned from - * here should be freed, and this function's caller must ensure that batch - * memory management code will definitely have the opportunity to do the right - * thing during the final on-the-fly merge. - */ -static void * -mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen) -{ - Size reserve_tuplen = MAXALIGN(tuplen); - char *ret; - - /* Should overflow at most once before mergebatchone() call: */ - Assert(state->mergeoverflow[tapenum] == NULL); - Assert(state->batchUsed); - - /* It should be possible to use precisely spacePerTape memory at once */ - if (state->mergecurrent[tapenum] + reserve_tuplen <= - state->mergetuples[tapenum] + state->spacePerTape) - { - /* - * Usual case -- caller is returned pointer into its tape's buffer, - * and an offset from that point is recorded as where tape has - * consumed up to for current round of preloading. - */ - ret = state->mergetail[tapenum] = state->mergecurrent[tapenum]; - state->mergecurrent[tapenum] += reserve_tuplen; - } - else - { - /* - * Allocate memory, and record as tape's overflow allocation. This - * will be detected quickly, in a similar fashion to a LACKMEM() - * condition, and should not happen again before a new round of - * preloading for caller's tape. Note that we deliberately allocate - * this in the parent tuplesort context, to be on the safe side. - * - * Sometimes, this does not happen because merging runs out of slots - * before running out of memory. - */ - ret = state->mergeoverflow[tapenum] = - MemoryContextAlloc(state->sortcontext, tuplen); - } - - return ret; -} - -/* - * mergepreread - load tuples from merge input tapes - * - * This routine exists to improve sequentiality of reads during a merge pass, - * as explained in the header comments of this file. 
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active. Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
-    int srcTape;
-
-    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-        mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
- *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
- */
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+/*
+ * mergereadnext - read next tuple from one merge input tape
+ *
+ * Returns false on EOF.
+ */
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 {
     unsigned int tuplen;
-    SortTuple stup;
-    int tupIndex;
-    int64 priorAvail,
-          spaceUsed;
 
     if (!state->mergeactive[srcTape])
-        return;                 /* tape's run is already exhausted */
+        return false;           /* tape's run is already exhausted */
 
-    /*
-     * Manage per-tape availMem. Only actually matters when batch memory not
-     * in use.
-     */
-    priorAvail = state->availMem;
-    state->availMem = state->mergeavailmem[srcTape];
-
-    /*
-     * When batch memory is used if final on-the-fly merge, only mergeoverflow
-     * test is relevant; otherwise, only LACKMEM() test is relevant.
-     */
-    while ((state->mergeavailslots[srcTape] > 0 &&
-            state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
-           state->mergenext[srcTape] == 0)
+    /* read next tuple, if any */
+    if ((tuplen = getlen(state, srcTape, true)) == 0)
     {
-        /* read next tuple, if any */
-        if ((tuplen = getlen(state, srcTape, true)) == 0)
-        {
-            state->mergeactive[srcTape] = false;
-            break;
-        }
-        READTUP(state, &stup, srcTape, tuplen);
-        /* find a free slot in memtuples[] for it */
-        tupIndex = state->mergefreelist;
-        if (tupIndex)
-            state->mergefreelist = state->memtuples[tupIndex].tupindex;
-        else
-        {
-            tupIndex = state->mergefirstfree++;
-            Assert(tupIndex < state->memtupsize);
-        }
-        state->mergeavailslots[srcTape]--;
-        /* store tuple, append to list for its tape */
-        stup.tupindex = 0;
-        state->memtuples[tupIndex] = stup;
-        if (state->mergelast[srcTape])
-            state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
-        else
-            state->mergenext[srcTape] = tupIndex;
-        state->mergelast[srcTape] = tupIndex;
+        state->mergeactive[srcTape] = false;
+        return false;
     }
-    /* update per-tape and global availmem counts */
-    spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
-    state->mergeavailmem[srcTape] = state->availMem;
-    state->availMem = priorAvail - spaceUsed;
+    READTUP(state, stup, srcTape, tuplen);
+
+    return true;
 }
 
 /*
@@ -3441,9 +3135,9 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
     /*
      * Reset tuple memory. We've freed all of the tuples that we previously
      * allocated. It's important to avoid fragmentation when there is a stark
-     * change in allocation patterns due to the use of batch memory.
-     * Fragmentation due to AllocSetFree's bucketing by size class might be
-     * particularly bad if this step wasn't taken.
+     * change in the sizes of incoming tuples. Fragmentation due to
+     * AllocSetFree's bucketing by size class might be particularly bad if
+     * this step wasn't taken.
      */
     MemoryContextReset(state->tuplecontext);
 
@@ -3901,38 +3595,31 @@ markrunend(Tuplesortstate *state, int tapenum)
 }
 
 /*
- * Get memory for tuple from within READTUP() routine. Allocate
- * memory and account for that, or consume from tape's batch
- * allocation.
+ * Get memory for tuple from within READTUP() routine.
  *
- * Memory returned here in the final on-the-fly merge case is recycled
- * from tape's batch allocation. Otherwise, callers must pfree() or
- * reset tuple child memory context, and account for that with a
- * FREEMEM(). Currently, this only ever needs to happen in WRITETUP()
- * routines.
+ * We use the next free slot from the slab allocator, or palloc() if the
+ * tuple is too large for that.
 */
 static void *
-readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+readtup_alloc(Tuplesortstate *state, Size tuplen)
 {
-    if (state->batchUsed)
-    {
-        /*
-         * No USEMEM() call, because during final on-the-fly merge accounting
-         * is based on tape-private state. ("Overflow" allocations are
-         * detected as an indication that a new round or preloading is
-         * required. Preloading marks existing contents of tape's batch buffer
-         * for reuse.)
-         */
-        return mergebatchalloc(state, tapenum, tuplen);
-    }
+    SlabSlot *buf;
+
+    /*
+     * We pre-allocate enough slots in the slab arena that we should never run
+     * out.
+     */
+    Assert(state->slabFreeHead);
+
+    if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+        return MemoryContextAlloc(state->sortcontext, tuplen);
     else
     {
-        char *ret;
+        buf = state->slabFreeHead;
+        /* Reuse this slot */
+        state->slabFreeHead = buf->nextfree;
 
-        /* Batch allocation yet to be performed */
-        ret = MemoryContextAlloc(state->tuplecontext, tuplen);
-        USEMEM(state, GetMemoryChunkSpace(ret));
-        return ret;
+        return buf;
     }
 }
 
@@ -4101,8 +3788,11 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
         LogicalTapeWrite(state->tapeset, tapenum,
                          (void *) &tuplen, sizeof(tuplen));
 
-    FREEMEM(state, GetMemoryChunkSpace(tuple));
-    heap_free_minimal_tuple(tuple);
+    if (!state->slabAllocatorUsed)
+    {
+        FREEMEM(state, GetMemoryChunkSpace(tuple));
+        heap_free_minimal_tuple(tuple);
+    }
 }
 
 static void
@@ -4111,7 +3801,7 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
     unsigned int tupbodylen = len - sizeof(int);
     unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-    MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
+    MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
     char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
     HeapTupleData htup;
 
@@ -4132,12 +3822,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
                     &stup->isnull1);
 }
 
-static void
-movetup_heap(void *dest, void *src, unsigned int len)
-{
-    memmove(dest, src, len);
-}
-
 /*
  * Routines specialized for the CLUSTER case (HeapTuple data, with
  * comparisons per a btree index definition)
@@ -4344,8 +4028,11 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
         LogicalTapeWrite(state->tapeset, tapenum,
                          &tuplen, sizeof(tuplen));
 
-    FREEMEM(state, GetMemoryChunkSpace(tuple));
-    heap_freetuple(tuple);
+    if (!state->slabAllocatorUsed)
+    {
+        FREEMEM(state, GetMemoryChunkSpace(tuple));
+        heap_freetuple(tuple);
+    }
 }
 
 static void
@@ -4354,7 +4041,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 {
     unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
     HeapTuple tuple = (HeapTuple) readtup_alloc(state,
-                                                tapenum,
                                                 t_len + HEAPTUPLESIZE);
 
     /* Reconstruct the HeapTupleData header */
@@ -4379,19 +4065,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                     &stup->isnull1);
 }
 
-static void
-movetup_cluster(void *dest, void *src, unsigned int len)
-{
-    HeapTuple tuple;
-
-    memmove(dest, src, len);
-
-    /* Repoint the HeapTupleData header */
-    tuple = (HeapTuple) dest;
-    tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
-}
-
-
 /*
  * Routines specialized for IndexTuple case
 *
@@ -4659,8 +4332,11 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
         LogicalTapeWrite(state->tapeset, tapenum,
                          (void *) &tuplen, sizeof(tuplen));
 
-    FREEMEM(state, GetMemoryChunkSpace(tuple));
-    pfree(tuple);
+    if (!state->slabAllocatorUsed)
+    {
+        FREEMEM(state, GetMemoryChunkSpace(tuple));
+        pfree(tuple);
+    }
 }
 
 static void
@@ -4668,7 +4344,7 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
               int tapenum, unsigned int len)
 {
     unsigned int tuplen = len - sizeof(unsigned int);
-    IndexTuple tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
+    IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
     LogicalTapeReadExact(state->tapeset, tapenum,
                          tuple, tuplen);
@@ -4683,12 +4359,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
                     &stup->isnull1);
 }
 
-static void
-movetup_index(void *dest, void *src, unsigned int len)
-{
-    memmove(dest, src, len);
-}
-
 /*
  * Routines specialized for DatumTuple case
 */
@@ -4755,7 +4425,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
         LogicalTapeWrite(state->tapeset, tapenum,
                          (void *) &writtenlen, sizeof(writtenlen));
 
-    if (stup->tuple)
+    if (!state->slabAllocatorUsed && stup->tuple)
     {
         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
         pfree(stup->tuple);
@@ -4785,7 +4455,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
     }
     else
     {
-        void *raddr = readtup_alloc(state, tapenum, tuplen);
+        void *raddr = readtup_alloc(state, tuplen);
 
         LogicalTapeReadExact(state->tapeset, tapenum,
                              raddr, tuplen);
@@ -4799,12 +4469,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          &tuplen, sizeof(tuplen));
 }
 
-static void
-movetup_datum(void *dest, void *src, unsigned int len)
-{
-    memmove(dest, src, len);
-}
-
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
 */
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index fa1e992082..362a6196dc 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -39,6 +39,8 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
                long blocknum, int offset);
 extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
                long *blocknum, int *offset);
+extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum,
+                                            size_t bufsize);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif   /* LOGTAPE_H */
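
A note on the slab allocator introduced above: the scheme that the new
readtup_alloc() implements boils down to a single-size free list with a
palloc() fallback for oversized tuples. Popping a slot is O(1), and because
every slot has the same size, the repeated allocate/release cycles of the
merge phase cannot fragment memory the way retail palloc()/pfree() of
variable-sized tuples can. The standalone sketch below shows the same
mechanism; the names (SlotPool, SLOT_SIZE, pool_*) are illustrative rather
than taken from the patch, plain malloc() stands in for PostgreSQL memory
contexts, and error handling is omitted.

#include <stdio.h>
#include <stdlib.h>

#define SLOT_SIZE 1024          /* stand-in for SLAB_SLOT_SIZE */

typedef union SlabSlot
{
    union SlabSlot *nextfree;   /* link while the slot sits on the free list */
    char buffer[SLOT_SIZE];     /* payload while the slot is in use */
} SlabSlot;

typedef struct SlotPool
{
    SlabSlot *begin;            /* start of the single arena allocation */
    SlabSlot *end;              /* one past the last slot */
    SlabSlot *freehead;         /* head of the free list */
} SlotPool;

/* Carve one big allocation into nslots slots and chain them all up. */
static void
pool_init(SlotPool *pool, int nslots)
{
    int i;

    pool->begin = malloc(nslots * sizeof(SlabSlot));
    pool->end = pool->begin + nslots;
    for (i = 0; i < nslots - 1; i++)
        pool->begin[i].nextfree = &pool->begin[i + 1];
    pool->begin[nslots - 1].nextfree = NULL;
    pool->freehead = pool->begin;
}

/*
 * Pop the next free slot, or fall back to plain malloc() when the request
 * doesn't fit in a slot -- the "overflow" case in readtup_alloc() above.
 */
static void *
pool_alloc(SlotPool *pool, size_t len)
{
    SlabSlot *slot;

    if (len > SLOT_SIZE || pool->freehead == NULL)
        return malloc(len);
    slot = pool->freehead;
    pool->freehead = slot->nextfree;
    return slot;
}

/*
 * Release a chunk: arena slots go back on the free list, overflow chunks
 * are simply free()'d. A pointer range check tells the two cases apart.
 */
static void
pool_release(SlotPool *pool, void *chunk)
{
    SlabSlot *slot = (SlabSlot *) chunk;

    if (slot >= pool->begin && slot < pool->end)
    {
        slot->nextfree = pool->freehead;
        pool->freehead = slot;
    }
    else
        free(chunk);
}

int
main(void)
{
    SlotPool pool;
    void *small;
    void *large;

    pool_init(&pool, 8);                    /* e.g. one slot per input tape, plus spares */
    small = pool_alloc(&pool, 100);         /* served from the arena */
    large = pool_alloc(&pool, 64 * 1024);   /* overflows to malloc() */
    pool_release(&pool, small);             /* back on the free list */
    pool_release(&pool, large);             /* free()'d */
    free(pool.begin);
    printf("slab sketch ran to completion\n");
    return 0;
}

The range check in pool_release() is one way to distinguish arena slots from
overflow chunks when both can reach the same release path, which is the
distinction the writetup_*() changes above must also respect. On the
logtape.c side, note that LogicalTapeAssignReadBufferSize() has to be called
before the tape is rewound for reading, since LogicalTapeRewind() is where
the buffer of the requested size is actually allocated; a caller would
presumably divide its memory budget evenly across the merge's input tapes
and hand each tape its share, rounded to a multiple of BLCKSZ.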