diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 5cf28d4df4..98a827e0b6 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -68,19 +68,9 @@ * memory gets actually freed. * * We use a max-heap with transaction size as the key to efficiently find - * the largest transaction. While the max-heap is empty, we don't update - * the max-heap when updating the memory counter. Therefore, we can get - * the largest transaction in O(N) time, where N is the number of - * transactions including top-level transactions and subtransactions. - * - * We build the max-heap just before selecting the largest transactions - * if the number of transactions being decoded is higher than the threshold, - * MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also - * update the max-heap when updating the memory counter. The intention is - * to efficiently find the largest transaction in O(1) time instead of - * incurring the cost of memory counter updates (O(log N)). Once the number - * of transactions got lower than the threshold, we reset the max-heap - * (refer to ReorderBufferMaybeResetMaxHeap() for details). + * the largest transaction. We update the max-heap whenever the memory + * counter is updated; however, transactions with size 0 are not stored in + * the heap, because they have no changes to evict. * * We still rely on max_changes_in_memory when loading serialized changes * back into memory. At that point we can't use the memory limit directly @@ -122,23 +112,6 @@ #include "utils/rel.h" #include "utils/relfilenumbermap.h" -/* - * Threshold of the total number of top-level and sub transactions that - * controls whether we use the max-heap for tracking their sizes. 
Although - * using the max-heap to select the largest transaction is effective when - * there are many transactions being decoded, maintaining the max-heap while - * updating the memory statistics can be costly. Therefore, we use - * MaxConnections as the threshold so that we use the max-heap only when - * using subtransactions. - */ -#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections - -/* - * A macro to check if the max-heap is ready to use and needs to be updated - * accordingly. - */ -#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap) - /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt { @@ -290,9 +263,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); -static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb); -static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb); -static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg); +static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -390,16 +361,8 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; - /* - * The binaryheap is indexed for faster manipulations. - * - * We allocate the initial heap size greater than - * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used - * until the threshold is exceeded. 
- */ - buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2, - ReorderBufferTXNSizeCompare, - true, NULL); + /* txn_heap is ordered by transaction size */ + buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL); buffer->spillTxns = 0; buffer->spillCount = 0; @@ -1637,12 +1600,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* deallocate */ ReorderBufferReturnTXN(rb, txn); - - /* - * After cleaning up one transaction, the number of transactions might get - * lower than the threshold for the max-heap. - */ - ReorderBufferMaybeResetMaxHeap(rb); } /* @@ -3265,20 +3222,18 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, if (addition) { + Size oldsize = txn->size; + txn->size += sz; rb->size += sz; /* Update the total size in the top transaction. */ toptxn->total_size += sz; - /* Update the max-heap as well if necessary */ - if (ReorderBufferMaxHeapIsReady(rb)) - { - if ((txn->size - sz) == 0) - binaryheap_add(rb->txn_heap, PointerGetDatum(txn)); - else - binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn)); - } + /* Update the max-heap */ + if (oldsize != 0) + pairingheap_remove(rb->txn_heap, &txn->txn_node); + pairingheap_add(rb->txn_heap, &txn->txn_node); } else { @@ -3289,14 +3244,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. 
*/ toptxn->total_size -= sz; - /* Update the max-heap as well if necessary */ - if (ReorderBufferMaxHeapIsReady(rb)) - { - if (txn->size == 0) - binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn)); - else - binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn)); - } + /* Update the max-heap */ + pairingheap_remove(rb->txn_heap, &txn->txn_node); + if (txn->size != 0) + pairingheap_add(rb->txn_heap, &txn->txn_node); } Assert(txn->size <= rb->size); @@ -3555,10 +3506,10 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) /* Compare two transactions by size */ static int -ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg) +ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg) { - ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a); - ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b); + const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a); + const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b); if (ta->size < tb->size) return -1; @@ -3568,106 +3519,16 @@ ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg) } /* - * Build the max-heap. The heap assembly step is deferred until the end, for - * efficiency. - */ -static void -ReorderBufferBuildMaxHeap(ReorderBuffer *rb) -{ - HASH_SEQ_STATUS hash_seq; - ReorderBufferTXNByIdEnt *ent; - - Assert(binaryheap_empty(rb->txn_heap)); - - hash_seq_init(&hash_seq, rb->by_txn); - while ((ent = hash_seq_search(&hash_seq)) != NULL) - { - ReorderBufferTXN *txn = ent->txn; - - if (txn->size == 0) - continue; - - binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn)); - } - - binaryheap_build(rb->txn_heap); -} - -/* - * Reset the max-heap if the number of transactions got lower than the - * threshold. 
- */ -static void -ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb) -{ - /* - * If we add and remove transactions right around the threshold, we could - * easily end up "thrashing". To avoid it, we adapt 10% of transactions to - * reset the max-heap. - */ - if (ReorderBufferMaxHeapIsReady(rb) && - binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9) - binaryheap_reset(rb->txn_heap); -} - -/* - * Find the largest transaction (toplevel or subxact) to evict (spill to disk) - * by doing a linear search or using the max-heap depending on the number of - * transactions in ReorderBuffer. Refer to the comments atop this file for the - * algorithm details. + * Find the largest transaction (toplevel or subxact) to evict (spill to disk). */ static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb) { - ReorderBufferTXN *largest = NULL; - - if (!ReorderBufferMaxHeapIsReady(rb)) - { - /* - * If the number of transactions are small, we scan all transactions - * being decoded to get the largest transaction. This saves the cost - * of building a max-heap with a small number of transactions. - */ - if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD) - { - HASH_SEQ_STATUS hash_seq; - ReorderBufferTXNByIdEnt *ent; - - hash_seq_init(&hash_seq, rb->by_txn); - while ((ent = hash_seq_search(&hash_seq)) != NULL) - { - ReorderBufferTXN *txn = ent->txn; - - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; - } - - Assert(largest); - } - else - { - /* - * There are a large number of transactions in ReorderBuffer. We - * build the max-heap for efficiently selecting the largest - * transactions. - */ - ReorderBufferBuildMaxHeap(rb); - - /* - * The max-heap is ready now. We remain the max-heap at least - * until we free up enough transactions to bring the total memory - * usage below the limit. The largest transaction is selected - * below. 
- */ - Assert(ReorderBufferMaxHeapIsReady(rb)); - } - } + ReorderBufferTXN *largest; /* Get the largest transaction from the max-heap */ - if (ReorderBufferMaxHeapIsReady(rb)) - largest = (ReorderBufferTXN *) - DatumGetPointer(binaryheap_first(rb->txn_heap)); + largest = pairingheap_container(ReorderBufferTXN, txn_node, + pairingheap_first(rb->txn_heap)); Assert(largest); Assert(largest->size > 0); @@ -3812,12 +3673,6 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) /* We must be under the memory limit now. */ Assert(rb->size < logical_decoding_work_mem * 1024L); - /* - * After evicting some transactions, the number of transactions might get - * lower than the threshold for the max-heap. - */ - ReorderBufferMaybeResetMaxHeap(rb); - } /* diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index a5aec01c2f..851a001c8b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,8 +10,8 @@ #define REORDERBUFFER_H #include "access/htup_details.h" -#include "lib/binaryheap.h" #include "lib/ilist.h" +#include "lib/pairingheap.h" #include "storage/sinval.h" #include "utils/hsearch.h" #include "utils/relcache.h" @@ -402,6 +402,11 @@ typedef struct ReorderBufferTXN */ dlist_node catchange_node; + /* + * A node in txn_heap + */ + pairingheap_node txn_node; + /* * Size of this transaction (changes currently in memory, in bytes). */ @@ -633,7 +638,7 @@ struct ReorderBuffer Size size; /* Max-heap for sizes of all top-level and sub transactions */ - binaryheap *txn_heap; + pairingheap *txn_heap; /* * Statistics about transactions spilled to disk.