diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 390963d6b5..bc7cbb25fc 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -564,7 +564,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; + Snapshot snapshot = NULL; xl_logical_message *message; if (info != XLOG_LOGICAL_MESSAGE) @@ -594,7 +594,17 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuildXactNeedsSkip(builder, buf->origptr))) return; - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + /* + * If this is a non-transactional change, get the snapshot we're expected + * to use. We only get here when the snapshot is consistent, and the + * change is not meant to be skipped. + * + * For transactional changes we don't need a snapshot, we'll use the + * regular snapshot maintained by ReorderBuffer. We just leave it NULL. + */ + if (!message->transactional) + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, message->transactional, message->message, /* first part of message is diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e3b67e4447..b1882ae5ec 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -828,6 +828,13 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Assert(xid != InvalidTransactionId); + /* + * We don't expect snapshots for transactional changes - we'll use the + * snapshot derived later during apply (unless the change gets + * skipped). + */ + Assert(!snapshot); + oldcontext = MemoryContextSwitchTo(rb->context); change = ReorderBufferGetChange(rb); @@ -846,6 +853,9 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, ReorderBufferTXN *txn = NULL; volatile Snapshot snapshot_now = snapshot; + /* Non-transactional changes require a valid snapshot. */ + Assert(snapshot_now); + if (xid != InvalidTransactionId) txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);