Replace binaryheap + index with pairingheap in reorderbuffer.c

A pairing heap can perform the same operations as the binary heap +
index, with as good or better algorithmic complexity, and that's an
existing data structure so that we don't need to invent anything new
compared to v16. This commit makes the new binaryheap functionality
that was added in commits b840508644 and bcb14f4abc unnecessary, but
they will be reverted separately.

Remove the optimization to only build and maintain the heap when the
amount of memory used is close to the limit, becuase the bookkeeping
overhead with the pairing heap seems to be small enough that it
doesn't matter in practice.

Reported-by: Jeff Davis
Author: Heikki Linnakangas
Reviewed-by: Michael Paquier, Hayato Kuroda, Masahiko Sawada
Discussion: https://postgr.es/m/12747c15811d94efcc5cda72d6b35c80d7bf3443.camel%40j-davis.com
This commit is contained in:
Masahiko Sawada 2024-04-11 17:04:38 +09:00
parent 942219996c
commit efb8acc0d0
2 changed files with 30 additions and 170 deletions

View File

@ -68,19 +68,9 @@
* memory gets actually freed. * memory gets actually freed.
* *
* We use a max-heap with transaction size as the key to efficiently find * We use a max-heap with transaction size as the key to efficiently find
* the largest transaction. While the max-heap is empty, we don't update * the largest transaction. We update the max-heap whenever the memory
* the max-heap when updating the memory counter. Therefore, we can get * counter is updated; however transactions with size 0 are not stored in
* the largest transaction in O(N) time, where N is the number of * the heap, because they have no changes to evict.
* transactions including top-level transactions and subtransactions.
*
* We build the max-heap just before selecting the largest transactions
* if the number of transactions being decoded is higher than the threshold,
* MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
* update the max-heap when updating the memory counter. The intention is
* to efficiently find the largest transaction in O(1) time instead of
* incurring the cost of memory counter updates (O(log N)). Once the number
* of transactions got lower than the threshold, we reset the max-heap
* (refer to ReorderBufferMaybeResetMaxHeap() for details).
* *
* We still rely on max_changes_in_memory when loading serialized changes * We still rely on max_changes_in_memory when loading serialized changes
* back into memory. At that point we can't use the memory limit directly * back into memory. At that point we can't use the memory limit directly
@ -122,23 +112,6 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relfilenumbermap.h" #include "utils/relfilenumbermap.h"
/*
* Threshold of the total number of top-level and sub transactions that
* controls whether we use the max-heap for tracking their sizes. Although
* using the max-heap to select the largest transaction is effective when
* there are many transactions being decoded, maintaining the max-heap while
* updating the memory statistics can be costly. Therefore, we use
* MaxConnections as the threshold so that we use the max-heap only when
* using subtransactions.
*/
#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
/*
* A macro to check if the max-heap is ready to use and needs to be updated
* accordingly.
*/
#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
/* entry for a hash table we use to map from xid to our transaction state */ /* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt typedef struct ReorderBufferTXNByIdEnt
{ {
@ -290,9 +263,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno); TransactionId xid, XLogSegNo segno);
static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb); static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@ -390,16 +361,8 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0; buffer->outbufsize = 0;
buffer->size = 0; buffer->size = 0;
/* /* txn_heap is ordered by transaction size */
* The binaryheap is indexed for faster manipulations. buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
*
* We allocate the initial heap size greater than
* MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
* until the threshold is exceeded.
*/
buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
ReorderBufferTXNSizeCompare,
true, NULL);
buffer->spillTxns = 0; buffer->spillTxns = 0;
buffer->spillCount = 0; buffer->spillCount = 0;
@ -1637,12 +1600,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* deallocate */ /* deallocate */
ReorderBufferReturnTXN(rb, txn); ReorderBufferReturnTXN(rb, txn);
/*
* After cleaning up one transaction, the number of transactions might get
* lower than the threshold for the max-heap.
*/
ReorderBufferMaybeResetMaxHeap(rb);
} }
/* /*
@ -3265,20 +3222,18 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
if (addition) if (addition)
{ {
Size oldsize = txn->size;
txn->size += sz; txn->size += sz;
rb->size += sz; rb->size += sz;
/* Update the total size in the top transaction. */ /* Update the total size in the top transaction. */
toptxn->total_size += sz; toptxn->total_size += sz;
/* Update the max-heap as well if necessary */ /* Update the max-heap */
if (ReorderBufferMaxHeapIsReady(rb)) if (oldsize != 0)
{ pairingheap_remove(rb->txn_heap, &txn->txn_node);
if ((txn->size - sz) == 0) pairingheap_add(rb->txn_heap, &txn->txn_node);
binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
else
binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
}
} }
else else
{ {
@ -3289,14 +3244,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */ /* Update the total size in the top transaction. */
toptxn->total_size -= sz; toptxn->total_size -= sz;
/* Update the max-heap as well if necessary */ /* Update the max-heap */
if (ReorderBufferMaxHeapIsReady(rb)) pairingheap_remove(rb->txn_heap, &txn->txn_node);
{ if (txn->size != 0)
if (txn->size == 0) pairingheap_add(rb->txn_heap, &txn->txn_node);
binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
else
binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
}
} }
Assert(txn->size <= rb->size); Assert(txn->size <= rb->size);
@ -3555,10 +3506,10 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
/* Compare two transactions by size */ /* Compare two transactions by size */
static int static int
ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg) ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{ {
ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a); const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b); const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);
if (ta->size < tb->size) if (ta->size < tb->size)
return -1; return -1;
@ -3568,106 +3519,16 @@ ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
} }
/* /*
* Build the max-heap. The heap assembly step is deferred until the end, for * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
* efficiency.
*/
static void
ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
{
HASH_SEQ_STATUS hash_seq;
ReorderBufferTXNByIdEnt *ent;
Assert(binaryheap_empty(rb->txn_heap));
hash_seq_init(&hash_seq, rb->by_txn);
while ((ent = hash_seq_search(&hash_seq)) != NULL)
{
ReorderBufferTXN *txn = ent->txn;
if (txn->size == 0)
continue;
binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
}
binaryheap_build(rb->txn_heap);
}
/*
* Reset the max-heap if the number of transactions got lower than the
* threshold.
*/
static void
ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
{
/*
* If we add and remove transactions right around the threshold, we could
* easily end up "thrashing". To avoid it, we adapt 10% of transactions to
* reset the max-heap.
*/
if (ReorderBufferMaxHeapIsReady(rb) &&
binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
binaryheap_reset(rb->txn_heap);
}
/*
* Find the largest transaction (toplevel or subxact) to evict (spill to disk)
* by doing a linear search or using the max-heap depending on the number of
* transactions in ReorderBuffer. Refer to the comments atop this file for the
* algorithm details.
*/ */
static ReorderBufferTXN * static ReorderBufferTXN *
ReorderBufferLargestTXN(ReorderBuffer *rb) ReorderBufferLargestTXN(ReorderBuffer *rb)
{ {
ReorderBufferTXN *largest = NULL; ReorderBufferTXN *largest;
if (!ReorderBufferMaxHeapIsReady(rb))
{
/*
* If the number of transactions are small, we scan all transactions
* being decoded to get the largest transaction. This saves the cost
* of building a max-heap with a small number of transactions.
*/
if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
{
HASH_SEQ_STATUS hash_seq;
ReorderBufferTXNByIdEnt *ent;
hash_seq_init(&hash_seq, rb->by_txn);
while ((ent = hash_seq_search(&hash_seq)) != NULL)
{
ReorderBufferTXN *txn = ent->txn;
/* if the current transaction is larger, remember it */
if ((!largest) || (txn->size > largest->size))
largest = txn;
}
Assert(largest);
}
else
{
/*
* There are a large number of transactions in ReorderBuffer. We
* build the max-heap for efficiently selecting the largest
* transactions.
*/
ReorderBufferBuildMaxHeap(rb);
/*
* The max-heap is ready now. We remain the max-heap at least
* until we free up enough transactions to bring the total memory
* usage below the limit. The largest transaction is selected
* below.
*/
Assert(ReorderBufferMaxHeapIsReady(rb));
}
}
/* Get the largest transaction from the max-heap */ /* Get the largest transaction from the max-heap */
if (ReorderBufferMaxHeapIsReady(rb)) largest = pairingheap_container(ReorderBufferTXN, txn_node,
largest = (ReorderBufferTXN *) pairingheap_first(rb->txn_heap));
DatumGetPointer(binaryheap_first(rb->txn_heap));
Assert(largest); Assert(largest);
Assert(largest->size > 0); Assert(largest->size > 0);
@ -3812,12 +3673,6 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
/* We must be under the memory limit now. */ /* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * 1024L); Assert(rb->size < logical_decoding_work_mem * 1024L);
/*
* After evicting some transactions, the number of transactions might get
* lower than the threshold for the max-heap.
*/
ReorderBufferMaybeResetMaxHeap(rb);
} }
/* /*

View File

@ -10,8 +10,8 @@
#define REORDERBUFFER_H #define REORDERBUFFER_H
#include "access/htup_details.h" #include "access/htup_details.h"
#include "lib/binaryheap.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "storage/sinval.h" #include "storage/sinval.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/relcache.h" #include "utils/relcache.h"
@ -402,6 +402,11 @@ typedef struct ReorderBufferTXN
*/ */
dlist_node catchange_node; dlist_node catchange_node;
/*
* A node in txn_heap
*/
pairingheap_node txn_node;
/* /*
* Size of this transaction (changes currently in memory, in bytes). * Size of this transaction (changes currently in memory, in bytes).
*/ */
@ -633,7 +638,7 @@ struct ReorderBuffer
Size size; Size size;
/* Max-heap for sizes of all top-level and sub transactions */ /* Max-heap for sizes of all top-level and sub transactions */
binaryheap *txn_heap; pairingheap *txn_heap;
/* /*
* Statistics about transactions spilled to disk. * Statistics about transactions spilled to disk.