From e709596b25bd184d6566dfff240e3f672a548afe Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 17 Mar 2023 08:29:41 +0530 Subject: [PATCH] Add macros for ReorderBufferTXN toptxn. Currently, there are quite a few places in reorderbuffer.c that tries to access top-transaction for a subtransaction. This makes the code to access top-transaction consistent and easier to follow. Author: Peter Smith Reviewed-by: Vignesh C, Sawada Masahiko Discussion: https://postgr.es/m/CAHut+PuCznOyTqBQwjRUu-ibG-=KHyCv-0FTcWQtZUdR88umfg@mail.gmail.com --- contrib/test_decoding/test_decoding.c | 4 +- .../replication/logical/reorderbuffer.c | 46 ++++++++----------- src/backend/replication/pgoutput/pgoutput.c | 6 +-- src/include/replication/reorderbuffer.h | 18 ++++++++ 4 files changed, 41 insertions(+), 33 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index b7e6048647..628c6a2595 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, * maintain the output_plugin_private only under the toptxn so if this is * not the toptxn then fetch the toptxn. */ - ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn; + ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); TestDecodingTxnData *txndata = toptxn->output_plugin_private; bool xact_wrote_changes = txndata->xact_wrote_changes; - if (txn->toptxn == NULL) + if (rbtxn_is_toptxn(txn)) { Assert(txn->output_plugin_private != NULL); pfree(txndata); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2d17c551a8..9f44974473 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, return; /* Get the top transaction. */ - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; + toptxn = rbtxn_get_toptxn(txn); /* * Indicate a partial change for toast inserts. The change will be @@ -809,13 +806,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, change->action == REORDER_BUFFER_CHANGE_TRUNCATE || change->action == REORDER_BUFFER_CHANGE_MESSAGE) { - ReorderBufferTXN *toptxn; - - /* get the top transaction */ - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; + ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE; } @@ -1655,9 +1646,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* * Mark the transaction as streamed. * - * The toplevel transaction, identified by (toptxn==NULL), is marked as - * streamed always, even if it does not contain any changes (that is, when - * all the changes are in subtransactions). + * The top-level transaction, is marked as streamed always, even if it + * does not contain any changes (that is, when all the changes are in + * subtransactions). * * For subtransactions, we only mark them as streamed when there are * changes in them. @@ -1667,7 +1658,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) + if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; if (txn_prepared) @@ -3207,10 +3198,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, * Update the total size in top level as well. This is later used to * compute the decoding stats. */ - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; + toptxn = rbtxn_get_toptxn(txn); if (addition) { @@ -3295,8 +3283,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, * so that we can execute them all together. See comments atop this * function. */ - if (txn->toptxn) - txn = txn->toptxn; + txn = rbtxn_get_toptxn(txn); Assert(nmsgs > 0); @@ -3354,7 +3341,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { ReorderBufferTXN *txn; - ReorderBufferTXN *toptxn; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); @@ -3370,11 +3356,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, * conveniently check just top-level transaction and decide whether to * build the hash table or not. */ - toptxn = txn->toptxn; - if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn)) + if (rbtxn_is_subtxn(txn)) { - toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; - dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); + + if (!rbtxn_has_catalog_changes(toptxn)) + { + toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + } } } @@ -3619,7 +3609,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) { /* we know there has to be one, because the size is not zero */ - Assert(txn && !txn->toptxn); + Assert(txn && rbtxn_is_toptxn(txn)); Assert(txn->total_size > 0); Assert(rb->size >= txn->total_size); @@ -4007,7 +3997,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) bool txn_is_streamed; /* We can never reach here for a subtransaction. */ - Assert(txn->toptxn == NULL); + Assert(rbtxn_is_toptxn(txn)); /* * We can't make any assumptions about base snapshot here, similar to what diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 00a2d73dab..3a2d2e357e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, if (in_streaming) xid = change->txn->xid; - if (change->txn->toptxn) - topxid = change->txn->toptxn->xid; + if (rbtxn_is_subtxn(change->txn)) + topxid = rbtxn_get_toptxn(change->txn)->xid; else topxid = xid; @@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, Assert(!in_streaming); /* determine the toplevel transaction */ - toptxn = (txn->toptxn) ? txn->toptxn : txn; + toptxn = rbtxn_get_toptxn(txn); Assert(rbtxn_is_streamed(toptxn)); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 215d1494e9..e37f5120eb 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -249,6 +249,24 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ ) +/* Is this a top-level transaction? */ +#define rbtxn_is_toptxn(txn) \ +( \ + (txn)->toptxn == NULL \ +) + +/* Is this a subtransaction? */ +#define rbtxn_is_subtxn(txn) \ +( \ + (txn)->toptxn != NULL \ +) + +/* Get the top-level transaction of this (sub)transaction. */ +#define rbtxn_get_toptxn(txn) \ +( \ + rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \ +) + typedef struct ReorderBufferTXN { /* See above */