Move tracking of in_streaming to PGOutputData

"in_streaming" is a flag used to track if an instance of pgoutput is
streaming changes.  When pgoutput is started, the flag was always reset,
switched it back and forth in the stream start/stop callbacks.

Before this commit, it was a global variable, which is confusing as it
is actually attached to a state of PGOutputData.  Per my analysis, using
a global variable did not lead to an active bug like in 54ccfd6586,
but it makes the code more consistent.  Note that we cannot backpatch
this change anyway as it requires the addition of a new field to
PGOutputData, exposed in pgoutput.h.

Author: Hou Zhijie
Reviewed-by: Amit Kapila, Michael Paquier, Peter Smith
Discussion: https://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
This commit is contained in:
Michael Paquier 2023-09-28 09:33:51 +09:00
parent ebf76f2753
commit 9210afd3bc
2 changed files with 21 additions and 16 deletions

View File

@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
/* Also remember we're currently not streaming any transaction. */
in_streaming = false;
/*
* Here, we just check whether the two-phase option is passed by
* plugin and decide whether to enable it at later point of time. It
@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
if (in_streaming)
if (data->in_streaming)
xid = change->txn->xid;
if (rbtxn_is_subtxn(change->txn))
@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* doing that we need to study its impact on the case where we have a mix
* of streaming and non-streaming transactions.
*/
if (in_streaming)
if (data->in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
if (in_streaming)
if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
else
relentry->schema_sent = true;
@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* their association and on aborts, it can discard the corresponding
* changes.
*/
if (in_streaming)
if (data->in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
if (data->in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
if (in_streaming)
if (data->in_streaming)
xid = txn->xid;
/*
@ -1743,10 +1740,11 @@ static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
Assert(!in_streaming);
Assert(!data->in_streaming);
/*
* If we already sent the first stream for this transaction then don't
@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
/* we're streaming a chunk of transaction now */
in_streaming = true;
data->in_streaming = true;
}
/*
@ -1774,15 +1772,17 @@ static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
/* we should be streaming a transaction */
Assert(in_streaming);
Assert(data->in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
in_streaming = false;
data->in_streaming = false;
}
/*
@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
* The abort should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
Assert(!data->in_streaming);
/* determine the toplevel transaction */
toptxn = rbtxn_get_toptxn(txn);
@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
/*
* The commit should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
Assert(!data->in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx, false);

View File

@ -21,6 +21,9 @@ typedef struct PGOutputData
* allocations */
MemoryContext cachectx; /* private memory context for cache data */
bool in_streaming; /* true if we are streaming a chunk of
* transaction */
/* client-supplied info: */
uint32 protocol_version;
List *publication_names;