From 9210afd3bcd65feccb883ace4ed6dcef6a684585 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Thu, 28 Sep 2023 09:33:51 +0900 Subject: [PATCH] Move tracking of in_streaming to PGOutputData "in_streaming" is a flag used to track if an instance of pgoutput is streaming changes. When pgoutput is started, the flag was always reset, switched it back and forth in the stream start/stop callbacks. Before this commit, it was a global variable, which is confusing as it is actually attached to a state of PGOutputData. Per my analysis, using a global variable did not lead to an active bug like in 54ccfd65868c, but it makes the code more consistent. Note that we cannot backpatch this change anyway as it requires the addition of a new field to PGOutputData, exposed in pgoutput.h. Author: Hou Zhijie Reviewed-by: Amit Kapila, Michael Paquier, Peter Smith Discussion: https://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com --- src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++---------- src/include/replication/pgoutput.h | 3 ++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 251ba46da5..c1c66848f3 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; -static bool in_streaming; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("streaming requested, but not supported by output plugin"))); - /* Also remember we're currently not streaming any transaction. */ - in_streaming = false; - /* * Here, we just check whether the two-phase option is passed by * plugin and decide whether to enable it at later point of time. It @@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool schema_sent; TransactionId xid = InvalidTransactionId; TransactionId topxid = InvalidTransactionId; @@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * If we're not in a streaming block, just use InvalidTransactionId and * the write methods will not include it. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; if (rbtxn_is_subtxn(change->txn)) @@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * doing that we need to study its impact on the case where we have a mix * of streaming and non-streaming transactions. */ - if (in_streaming) + if (data->in_streaming) schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); else schema_sent = relentry->schema_sent; @@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, send_relation_and_attrs(relation, xid, ctx, relentry->columns); - if (in_streaming) + if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); else relentry->schema_sent = true; @@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * their association and on aborts, it can discard the corresponding * changes. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; relentry = get_rel_sync_entry(data, relation); @@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; /* Remember the xid for the change in streaming mode. See pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; old = MemoryContextSwitchTo(data->context); @@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Remember the xid for the message in streaming mode. See * pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = txn->xid; /* @@ -1743,10 +1740,11 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool send_replication_origin = txn->origin_id != InvalidRepOriginId; /* we can't nest streaming of transactions */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* * If we already sent the first stream for this transaction then don't @@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); /* we're streaming a chunk of transaction now */ - in_streaming = true; + data->in_streaming = true; } /* @@ -1774,15 +1772,17 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + /* we should be streaming a transaction */ - Assert(in_streaming); + Assert(data->in_streaming); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); /* we've stopped streaming a transaction */ - in_streaming = false; + data->in_streaming = false; } /* @@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, * The abort should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* determine the toplevel transaction */ toptxn = rbtxn_get_toptxn(txn); @@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); Assert(rbtxn_is_streamed(txn)); OutputPluginUpdateProgress(ctx, false); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index b3f9a01629..cee209e4cc 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -21,6 +21,9 @@ typedef struct PGOutputData * allocations */ MemoryContext cachectx; /* private memory context for cache data */ + bool in_streaming; /* true if we are streaming a chunk of + * transaction */ + /* client-supplied info: */ uint32 protocol_version; List *publication_names;