Add macros for ReorderBufferTXN toptxn.
Currently, there are quite a few places in reorderbuffer.c that tries to access top-transaction for a subtransaction. This makes the code to access top-transaction consistent and easier to follow. Author: Peter Smith Reviewed-by: Vignesh C, Sawada Masahiko Discussion: https://postgr.es/m/CAHut+PuCznOyTqBQwjRUu-ibG-=KHyCv-0FTcWQtZUdR88umfg@mail.gmail.com
This commit is contained in:
parent
eb7d043c9b
commit
e709596b25
|
@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
|
|||
* maintain the output_plugin_private only under the toptxn so if this is
|
||||
* not the toptxn then fetch the toptxn.
|
||||
*/
|
||||
ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
|
||||
ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
|
||||
TestDecodingTxnData *txndata = toptxn->output_plugin_private;
|
||||
bool xact_wrote_changes = txndata->xact_wrote_changes;
|
||||
|
||||
if (txn->toptxn == NULL)
|
||||
if (rbtxn_is_toptxn(txn))
|
||||
{
|
||||
Assert(txn->output_plugin_private != NULL);
|
||||
pfree(txndata);
|
||||
|
|
|
@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
|||
return;
|
||||
|
||||
/* Get the top transaction. */
|
||||
if (txn->toptxn != NULL)
|
||||
toptxn = txn->toptxn;
|
||||
else
|
||||
toptxn = txn;
|
||||
toptxn = rbtxn_get_toptxn(txn);
|
||||
|
||||
/*
|
||||
* Indicate a partial change for toast inserts. The change will be
|
||||
|
@ -809,13 +806,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
|
|||
change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
|
||||
change->action == REORDER_BUFFER_CHANGE_MESSAGE)
|
||||
{
|
||||
ReorderBufferTXN *toptxn;
|
||||
|
||||
/* get the top transaction */
|
||||
if (txn->toptxn != NULL)
|
||||
toptxn = txn->toptxn;
|
||||
else
|
||||
toptxn = txn;
|
||||
ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
|
||||
|
||||
toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
|
||||
}
|
||||
|
@ -1655,9 +1646,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
|
|||
/*
|
||||
* Mark the transaction as streamed.
|
||||
*
|
||||
* The toplevel transaction, identified by (toptxn==NULL), is marked as
|
||||
* streamed always, even if it does not contain any changes (that is, when
|
||||
* all the changes are in subtransactions).
|
||||
* The top-level transaction, is marked as streamed always, even if it
|
||||
* does not contain any changes (that is, when all the changes are in
|
||||
* subtransactions).
|
||||
*
|
||||
* For subtransactions, we only mark them as streamed when there are
|
||||
* changes in them.
|
||||
|
@ -1667,7 +1658,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
|
|||
* about the toplevel xact (we send the XID in all messages), but we never
|
||||
* stream XIDs of empty subxacts.
|
||||
*/
|
||||
if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
|
||||
if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
|
||||
txn->txn_flags |= RBTXN_IS_STREAMED;
|
||||
|
||||
if (txn_prepared)
|
||||
|
@ -3207,10 +3198,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
|
|||
* Update the total size in top level as well. This is later used to
|
||||
* compute the decoding stats.
|
||||
*/
|
||||
if (txn->toptxn != NULL)
|
||||
toptxn = txn->toptxn;
|
||||
else
|
||||
toptxn = txn;
|
||||
toptxn = rbtxn_get_toptxn(txn);
|
||||
|
||||
if (addition)
|
||||
{
|
||||
|
@ -3295,8 +3283,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
|
|||
* so that we can execute them all together. See comments atop this
|
||||
* function.
|
||||
*/
|
||||
if (txn->toptxn)
|
||||
txn = txn->toptxn;
|
||||
txn = rbtxn_get_toptxn(txn);
|
||||
|
||||
Assert(nmsgs > 0);
|
||||
|
||||
|
@ -3354,7 +3341,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
|
|||
XLogRecPtr lsn)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
ReorderBufferTXN *toptxn;
|
||||
|
||||
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
|
||||
|
||||
|
@ -3370,11 +3356,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
|
|||
* conveniently check just top-level transaction and decide whether to
|
||||
* build the hash table or not.
|
||||
*/
|
||||
toptxn = txn->toptxn;
|
||||
if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
|
||||
if (rbtxn_is_subtxn(txn))
|
||||
{
|
||||
toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
|
||||
dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
|
||||
ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
|
||||
|
||||
if (!rbtxn_has_catalog_changes(toptxn))
|
||||
{
|
||||
toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
|
||||
dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3619,7 +3609,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
|
|||
(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
|
||||
{
|
||||
/* we know there has to be one, because the size is not zero */
|
||||
Assert(txn && !txn->toptxn);
|
||||
Assert(txn && rbtxn_is_toptxn(txn));
|
||||
Assert(txn->total_size > 0);
|
||||
Assert(rb->size >= txn->total_size);
|
||||
|
||||
|
@ -4007,7 +3997,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
|||
bool txn_is_streamed;
|
||||
|
||||
/* We can never reach here for a subtransaction. */
|
||||
Assert(txn->toptxn == NULL);
|
||||
Assert(rbtxn_is_toptxn(txn));
|
||||
|
||||
/*
|
||||
* We can't make any assumptions about base snapshot here, similar to what
|
||||
|
|
|
@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
|
|||
if (in_streaming)
|
||||
xid = change->txn->xid;
|
||||
|
||||
if (change->txn->toptxn)
|
||||
topxid = change->txn->toptxn->xid;
|
||||
if (rbtxn_is_subtxn(change->txn))
|
||||
topxid = rbtxn_get_toptxn(change->txn)->xid;
|
||||
else
|
||||
topxid = xid;
|
||||
|
||||
|
@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
|
|||
Assert(!in_streaming);
|
||||
|
||||
/* determine the toplevel transaction */
|
||||
toptxn = (txn->toptxn) ? txn->toptxn : txn;
|
||||
toptxn = rbtxn_get_toptxn(txn);
|
||||
|
||||
Assert(rbtxn_is_streamed(toptxn));
|
||||
|
||||
|
|
|
@ -249,6 +249,24 @@ typedef struct ReorderBufferChange
|
|||
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
|
||||
)
|
||||
|
||||
/* Is this a top-level transaction? */
|
||||
#define rbtxn_is_toptxn(txn) \
|
||||
( \
|
||||
(txn)->toptxn == NULL \
|
||||
)
|
||||
|
||||
/* Is this a subtransaction? */
|
||||
#define rbtxn_is_subtxn(txn) \
|
||||
( \
|
||||
(txn)->toptxn != NULL \
|
||||
)
|
||||
|
||||
/* Get the top-level transaction of this (sub)transaction. */
|
||||
#define rbtxn_get_toptxn(txn) \
|
||||
( \
|
||||
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
|
||||
)
|
||||
|
||||
typedef struct ReorderBufferTXN
|
||||
{
|
||||
/* See above */
|
||||
|
|
Loading…
Reference in New Issue