Improve eviction algorithm in ReorderBuffer using max-heap for many subtransactions.

Previously, when selecting the transaction to evict during logical
decoding, we checked all transactions to find the largest
transaction. This could lead to significant replication lag,
especially when there are many subtransactions.

This commit improves the eviction algorithm in ReorderBuffer using the
max-heap with transaction size as the key to efficiently find the
largest transaction.

The max-heap starts out empty. While the max-heap is empty, we don't
do anything for the max-heap when updating the memory
counter. Therefore, while it is empty, we find the largest transaction
by a linear scan in O(N) time, where N is the number of transactions
including top-level transactions and subtransactions.

We build the max-heap just before selecting the largest transaction
if the number of transactions being decoded has reached the
threshold, MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap,
we also update the max-heap when updating the memory counter. The
intention is to find the largest transaction in O(1) time, at the
cost of making each memory counter update O(log N). Once the number
of transactions drops below the threshold, we reset the max-heap.

The performance benchmark results showed significant speed up (more
than x30 speed up on my machine) in decoding a transaction with 100k
subtransactions, whereas there is no visible overhead in other cases.

Reviewed-by: Amit Kapila, Hayato Kuroda, Vignesh C, Ajin Cherian,
Tomas Vondra, Shubham Khanna, Peter Smith, Álvaro Herrera,
Euler Taveira
Discussion: https://postgr.es/m/CAD21AoAfKTgrBrLq96GcTv9d6k97zaQcDM-rxfKEt4GSe0qnaQ%40mail.gmail.com
This commit is contained in:
Masahiko Sawada 2024-04-03 11:40:42 +09:00
parent 7487044d6c
commit 5bec1d6bc5
2 changed files with 214 additions and 27 deletions

View File

@ -67,6 +67,21 @@
* allocator, evicting the oldest changes would make it more likely the * allocator, evicting the oldest changes would make it more likely the
* memory gets actually freed. * memory gets actually freed.
* *
* We use a max-heap with transaction size as the key to efficiently find
* the largest transaction. While the max-heap is empty, we don't update
* the max-heap when updating the memory counter; in that state we find
* the largest transaction by a linear scan in O(N) time, where N is the
* number of transactions including top-level transactions and
* subtransactions.
*
* We build the max-heap just before selecting the largest transaction
* if the number of transactions being decoded has reached the threshold,
* MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
* update the max-heap when updating the memory counter. The intention is
* to find the largest transaction in O(1) time, at the cost of making each
* memory counter update O(log N). Once the number of transactions drops
* below the threshold, we reset the max-heap (refer to
* ReorderBufferMaybeResetMaxHeap() for details).
*
* We still rely on max_changes_in_memory when loading serialized changes * We still rely on max_changes_in_memory when loading serialized changes
* back into memory. At that point we can't use the memory limit directly * back into memory. At that point we can't use the memory limit directly
* as we load the subxacts independently. One option to deal with this * as we load the subxacts independently. One option to deal with this
@ -107,6 +122,22 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relfilenumbermap.h" #include "utils/relfilenumbermap.h"
/*
 * Threshold of the total number of top-level and sub transactions that
 * controls whether we use the max-heap for tracking their sizes. Although
 * using the max-heap to select the largest transaction is effective when
 * there are many transactions being decoded, maintaining the max-heap while
 * updating the memory statistics can be costly. Therefore, we use
 * MaxConnections as the threshold so that we use the max-heap only when
 * using subtransactions.
 *
 * NOTE(review): the reasoning above presumably relies on the number of
 * concurrently running top-level transactions being bounded by
 * MaxConnections, so that exceeding it implies subtransactions are in
 * use -- confirm.
 *
 * The macro expands to the MaxConnections identifier itself, so the
 * threshold is whatever that name evaluates to at each point of use.
 */
#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
/*
 * A macro to check if the max-heap is ready to use and needs to be updated
 * accordingly. The heap is considered ready exactly when it is non-empty.
 *
 * The replacement list is fully parenthesized so that the expansion always
 * behaves as a single operand, whatever expression it is embedded in.
 */
#define ReorderBufferMaxHeapIsReady(rb) (!binaryheap_empty((rb)->txn_heap))
/* entry for a hash table we use to map from xid to our transaction state */ /* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt typedef struct ReorderBufferTXNByIdEnt
@ -259,6 +290,9 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno); TransactionId xid, XLogSegNo segno);
static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb);
static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@ -293,6 +327,7 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
static Size ReorderBufferChangeSize(ReorderBufferChange *change); static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change, ReorderBufferChange *change,
ReorderBufferTXN *txn,
bool addition, Size sz); bool addition, Size sz);
/* /*
@ -355,6 +390,17 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0; buffer->outbufsize = 0;
buffer->size = 0; buffer->size = 0;
/*
* The binaryheap is indexed for faster manipulations.
*
* We allocate the initial heap size greater than
* MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
* until the threshold is exceeded.
*/
buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
ReorderBufferTXNSizeCompare,
true, NULL);
buffer->spillTxns = 0; buffer->spillTxns = 0;
buffer->spillCount = 0; buffer->spillCount = 0;
buffer->spillBytes = 0; buffer->spillBytes = 0;
@ -485,7 +531,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
{ {
/* update memory accounting info */ /* update memory accounting info */
if (upd_mem) if (upd_mem)
ReorderBufferChangeMemoryUpdate(rb, change, false, ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
ReorderBufferChangeSize(change)); ReorderBufferChangeSize(change));
/* free contained data */ /* free contained data */
@ -816,7 +862,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn->nentries_mem++; txn->nentries_mem++;
/* update memory accounting information */ /* update memory accounting information */
ReorderBufferChangeMemoryUpdate(rb, change, true, ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change)); ReorderBufferChangeSize(change));
/* process partial change */ /* process partial change */
@ -1527,7 +1573,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */ /* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn); Assert(change->txn == txn);
ReorderBufferReturnChange(rb, change, true); ReorderBufferReturnChange(rb, change, false);
} }
/* /*
@ -1586,8 +1632,17 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn)) if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn); ReorderBufferRestoreCleanup(rb, txn);
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
/* deallocate */ /* deallocate */
ReorderBufferReturnTXN(rb, txn); ReorderBufferReturnTXN(rb, txn);
/*
* After cleaning up one transaction, the number of transactions might get
* lower than the threshold for the max-heap.
*/
ReorderBufferMaybeResetMaxHeap(rb);
} }
/* /*
@ -1637,9 +1692,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* remove the change from it's containing list */ /* remove the change from it's containing list */
dlist_delete(&change->node); dlist_delete(&change->node);
ReorderBufferReturnChange(rb, change, true); ReorderBufferReturnChange(rb, change, false);
} }
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
/* /*
* Mark the transaction as streamed. * Mark the transaction as streamed.
* *
@ -3166,6 +3224,9 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
* decide if we reached the memory limit, the transaction counter allows * decide if we reached the memory limit, the transaction counter allows
* us to quickly pick the largest transaction for eviction. * us to quickly pick the largest transaction for eviction.
* *
* At least one of txn or change must be non-NULL. We update the memory
* counter of txn if it's non-NULL, otherwise that of change->txn.
*
* When streaming is enabled, we need to update the toplevel transaction * When streaming is enabled, we need to update the toplevel transaction
* counters instead - we don't really care about subtransactions as we * counters instead - we don't really care about subtransactions as we
* can't stream them individually anyway, and we only pick toplevel * can't stream them individually anyway, and we only pick toplevel
@ -3174,22 +3235,27 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
static void static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change, ReorderBufferChange *change,
ReorderBufferTXN *txn,
bool addition, Size sz) bool addition, Size sz)
{ {
ReorderBufferTXN *txn;
ReorderBufferTXN *toptxn; ReorderBufferTXN *toptxn;
Assert(change->txn); Assert(txn || change);
/* /*
* Ignore tuple CID changes, because those are not evicted when reaching * Ignore tuple CID changes, because those are not evicted when reaching
* memory limit. So we just don't count them, because it might easily * memory limit. So we just don't count them, because it might easily
* trigger a pointless attempt to spill. * trigger a pointless attempt to spill.
*/ */
if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
return; return;
txn = change->txn; if (sz == 0)
return;
if (txn == NULL)
txn = change->txn;
Assert(txn != NULL);
/* /*
* Update the total size in top level as well. This is later used to * Update the total size in top level as well. This is later used to
@ -3204,6 +3270,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */ /* Update the total size in the top transaction. */
toptxn->total_size += sz; toptxn->total_size += sz;
/* Update the max-heap as well if necessary */
if (ReorderBufferMaxHeapIsReady(rb))
{
if ((txn->size - sz) == 0)
binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
else
binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
}
} }
else else
{ {
@ -3213,6 +3288,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */ /* Update the total size in the top transaction. */
toptxn->total_size -= sz; toptxn->total_size -= sz;
/* Update the max-heap as well if necessary */
if (ReorderBufferMaxHeapIsReady(rb))
{
if (txn->size == 0)
binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
else
binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
}
} }
Assert(txn->size <= rb->size); Assert(txn->size <= rb->size);
@ -3468,34 +3552,123 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
} }
} }
/*
 * Comparator for the transaction max-heap: orders two transactions by their
 * size field.
 *
 * a and b are Datums carrying ReorderBufferTXN pointers; arg is unused.
 * Returns -1, 0 or 1 when a's size is respectively smaller than, equal to,
 * or larger than b's size.
 */
static int
ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
{
    const ReorderBufferTXN *lhs = (const ReorderBufferTXN *) DatumGetPointer(a);
    const ReorderBufferTXN *rhs = (const ReorderBufferTXN *) DatumGetPointer(b);

    /* Equal sizes compare as equal; otherwise report which side is smaller. */
    if (lhs->size == rhs->size)
        return 0;

    return (lhs->size < rhs->size) ? -1 : 1;
}
/* /*
* Find the largest transaction (toplevel or subxact) to evict (spill to disk). * Build the max-heap. The heap assembly step is deferred until the end, for
* * efficiency.
* XXX With many subtransactions this might be quite slow, because we'll have
* to walk through all of them. There are some options how we could improve
* that: (a) maintain some secondary structure with transactions sorted by
* amount of changes, (b) not looking for the entirely largest transaction,
* but e.g. for transaction using at least some fraction of the memory limit,
* and (c) evicting multiple transactions at once, e.g. to free a given portion
* of the memory limit (e.g. 50%).
*/ */
static ReorderBufferTXN * static void
ReorderBufferLargestTXN(ReorderBuffer *rb) ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
{ {
HASH_SEQ_STATUS hash_seq; HASH_SEQ_STATUS hash_seq;
ReorderBufferTXNByIdEnt *ent; ReorderBufferTXNByIdEnt *ent;
ReorderBufferTXN *largest = NULL;
Assert(binaryheap_empty(rb->txn_heap));
hash_seq_init(&hash_seq, rb->by_txn); hash_seq_init(&hash_seq, rb->by_txn);
while ((ent = hash_seq_search(&hash_seq)) != NULL) while ((ent = hash_seq_search(&hash_seq)) != NULL)
{ {
ReorderBufferTXN *txn = ent->txn; ReorderBufferTXN *txn = ent->txn;
/* if the current transaction is larger, remember it */ if (txn->size == 0)
if ((!largest) || (txn->size > largest->size)) continue;
largest = txn;
binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
} }
binaryheap_build(rb->txn_heap);
}
/*
 * Reset the max-heap once the number of transactions it tracks has dropped
 * sufficiently below the threshold.
 *
 * Nothing is done while the max-heap is not in use (i.e. it is empty).
 */
static void
ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
{
    /* The max-heap is not in use, so there is nothing to reset. */
    if (!ReorderBufferMaxHeapIsReady(rb))
        return;

    /*
     * Adding and removing transactions right around the threshold could
     * easily make us "thrash" between using and not using the max-heap. To
     * avoid that, keep the max-heap until its size has fallen below 90% of
     * the threshold, leaving a 10% margin.
     */
    if (binaryheap_size(rb->txn_heap) >= MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
        return;

    binaryheap_reset(rb->txn_heap);
}
/*
* Find the largest transaction (toplevel or subxact) to evict (spill to disk)
* by doing a linear search or using the max-heap depending on the number of
* transactions in ReorderBuffer. Refer to the comments atop this file for the
* algorithm details.
*/
static ReorderBufferTXN *
ReorderBufferLargestTXN(ReorderBuffer *rb)
{
ReorderBufferTXN *largest = NULL;
if (!ReorderBufferMaxHeapIsReady(rb))
{
/*
* If the number of transactions is small, we scan all transactions
* being decoded to get the largest transaction. This saves the cost
* of building a max-heap with a small number of transactions.
*/
if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
{
HASH_SEQ_STATUS hash_seq;
ReorderBufferTXNByIdEnt *ent;
hash_seq_init(&hash_seq, rb->by_txn);
while ((ent = hash_seq_search(&hash_seq)) != NULL)
{
ReorderBufferTXN *txn = ent->txn;
/* if the current transaction is larger, remember it */
if ((!largest) || (txn->size > largest->size))
largest = txn;
}
Assert(largest);
}
else
{
/*
* There are a large number of transactions in ReorderBuffer. We
* build the max-heap for efficiently selecting the largest
* transactions.
*/
ReorderBufferBuildMaxHeap(rb);
/*
* The max-heap is ready now. We keep the max-heap at least
* until we free up enough transactions to bring the total memory
* usage below the limit. The largest transaction is selected
* below.
*/
Assert(ReorderBufferMaxHeapIsReady(rb));
}
}
/* Get the largest transaction from the max-heap */
if (ReorderBufferMaxHeapIsReady(rb))
largest = (ReorderBufferTXN *)
DatumGetPointer(binaryheap_first(rb->txn_heap));
Assert(largest); Assert(largest);
Assert(largest->size > 0); Assert(largest->size > 0);
Assert(largest->size <= rb->size); Assert(largest->size <= rb->size);
@ -3638,6 +3811,13 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
/* We must be under the memory limit now. */ /* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * 1024L); Assert(rb->size < logical_decoding_work_mem * 1024L);
/*
* After evicting some transactions, the number of transactions might get
* lower than the threshold for the max-heap.
*/
ReorderBufferMaybeResetMaxHeap(rb);
} }
/* /*
@ -3705,11 +3885,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferSerializeChange(rb, txn, fd, change); ReorderBufferSerializeChange(rb, txn, fd, change);
dlist_delete(&change->node); dlist_delete(&change->node);
ReorderBufferReturnChange(rb, change, true); ReorderBufferReturnChange(rb, change, false);
spilled++; spilled++;
} }
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
/* update the statistics iff we have spilled anything */ /* update the statistics iff we have spilled anything */
if (spilled) if (spilled)
{ {
@ -4491,7 +4674,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
* update the accounting too (subtracting the size from the counters). And * update the accounting too (subtracting the size from the counters). And
* we don't want to underflow there. * we don't want to underflow there.
*/ */
ReorderBufferChangeMemoryUpdate(rb, change, true, ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change)); ReorderBufferChangeSize(change));
} }
@ -4903,9 +5086,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
/* subtract the old change size */ /* subtract the old change size */
ReorderBufferChangeMemoryUpdate(rb, change, false, old_size); ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
/* now add the change back, with the correct size */ /* now add the change back, with the correct size */
ReorderBufferChangeMemoryUpdate(rb, change, true, ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change)); ReorderBufferChangeSize(change));
} }

View File

@ -10,6 +10,7 @@
#define REORDERBUFFER_H #define REORDERBUFFER_H
#include "access/htup_details.h" #include "access/htup_details.h"
#include "lib/binaryheap.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -631,6 +632,9 @@ struct ReorderBuffer
/* memory accounting */ /* memory accounting */
Size size; Size size;
/* Max-heap for sizes of all top-level and sub transactions */
binaryheap *txn_heap;
/* /*
* Statistics about transactions spilled to disk. * Statistics about transactions spilled to disk.
* *