diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 251ba46da5..c1c66848f3 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; -static bool in_streaming; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("streaming requested, but not supported by output plugin"))); - /* Also remember we're currently not streaming any transaction. */ - in_streaming = false; - /* * Here, we just check whether the two-phase option is passed by * plugin and decide whether to enable it at later point of time. It @@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool schema_sent; TransactionId xid = InvalidTransactionId; TransactionId topxid = InvalidTransactionId; @@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * If we're not in a streaming block, just use InvalidTransactionId and * the write methods will not include it. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; if (rbtxn_is_subtxn(change->txn)) @@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * doing that we need to study its impact on the case where we have a mix * of streaming and non-streaming transactions. */ - if (in_streaming) + if (data->in_streaming) schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); else schema_sent = relentry->schema_sent; @@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, send_relation_and_attrs(relation, xid, ctx, relentry->columns); - if (in_streaming) + if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); else relentry->schema_sent = true; @@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * their association and on aborts, it can discard the corresponding * changes. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; relentry = get_rel_sync_entry(data, relation); @@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; /* Remember the xid for the change in streaming mode. See pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; old = MemoryContextSwitchTo(data->context); @@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Remember the xid for the message in streaming mode. See * pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = txn->xid; /* @@ -1743,10 +1740,11 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool send_replication_origin = txn->origin_id != InvalidRepOriginId; /* we can't nest streaming of transactions */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* * If we already sent the first stream for this transaction then don't @@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); /* we're streaming a chunk of transaction now */ - in_streaming = true; + data->in_streaming = true; } /* @@ -1774,15 +1772,17 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + /* we should be streaming a transaction */ - Assert(in_streaming); + Assert(data->in_streaming); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); /* we've stopped streaming a transaction */ - in_streaming = false; + data->in_streaming = false; } /* @@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, * The abort should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* determine the toplevel transaction */ toptxn = rbtxn_get_toptxn(txn); @@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); Assert(rbtxn_is_streamed(txn)); OutputPluginUpdateProgress(ctx, false); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index b3f9a01629..cee209e4cc 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -21,6 +21,9 @@ typedef struct PGOutputData * allocations */ MemoryContext cachectx; /* private memory context for cache data */ + bool in_streaming; /* true if we are streaming a chunk of + * transaction */ + /* client-supplied info: */ uint32 protocol_version; List *publication_names;