diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b3ee7fa7ea..bd4c3cf325 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -191,6 +191,7 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool assigned; /* assigned to top-level XID */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -223,6 +224,7 @@ typedef struct SerializedTransactionState static TransactionStateData TopTransactionStateData = { .state = TRANS_DEFAULT, .blockState = TBLOCK_DEFAULT, + .assigned = false, }; /* @@ -5120,6 +5122,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->assigned = false; CurrentTransactionState = s; @@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * IsSubTransactionAssignmentPending + * + * This is used to decide whether we need to WAL log the top-level XID for + * operation in a subtransaction. We require that for logical decoding, see + * LogicalDecodingProcessRecord. + * + * This returns true if wal_level >= logical and we are inside a valid + * subtransaction, for which the assignment was not yet written to any WAL + * record. + */ +bool +IsSubTransactionAssignmentPending(void) +{ + /* wal_level has to be logical */ + if (!XLogLogicalInfoActive()) + return false; + + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have a XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + /* and it should not be already 'assigned' */ + return !CurrentTransactionState->assigned; +} + +/* + * MarkSubTransactionAssigned + * + * Mark the subtransaction assignment as completed. + */ +void +MarkSubTransactionAssigned(void) +{ + Assert(IsSubTransactionAssignmentPending()); + + CurrentTransactionState->assigned = true; +} diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index b21679f09e..c526bb1928 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -89,11 +89,13 @@ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfXLogTransactionId) /* * An array of XLogRecData structs, to hold registered data. @@ -195,6 +197,10 @@ XLogResetInsertion(void) { int i; + /* reset the subxact assignment flag (if needed) */ + if (curinsert_flags & XLOG_INCLUDE_XID) + MarkSubTransactionAssigned(); + for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -398,7 +404,7 @@ void XLogSetRecordFlags(uint8 flags) { Assert(begininsert_called); - curinsert_flags = flags; + curinsert_flags |= flags; } /* @@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(replorigin_session_origin); } + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubTransactionAssignmentPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* update the flag (later used by XLogResetInsertion) */ + XLogSetRecordFlags(XLOG_INCLUDE_XID); + + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index cb76be4f46..a757baccfc 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) state->decoded_record = record; state->record_origin = InvalidRepOriginId; + state->toplevel_xid = InvalidTransactionId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) { COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); } + else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) + { + COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3abf8..0c0c371739 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -94,11 +94,27 @@ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { XLogRecordBuffer buf; + TransactionId txid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; buf.record = record; + txid = XLogRecGetTopXid(record); + + /* + * If the top-level xid is valid, we need to assign the subxact to the + * top-level xact. We need to do this for all records, hence we do it + * before the switch. + */ + if (TransactionIdIsValid(txid)) + { + ReorderBufferAssignChild(ctx->reorder, + txid, + record->decoded_record->xl_xid, + buf.origptr); + } + /* cast so we get a warning when new rmgrs are added */ switch ((RmgrId) XLogRecGetRmid(record)) { @@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If the snapshot isn't yet fully built, we cannot decode anything, so * bail out. - * - * However, it's critical to process XLOG_XACT_ASSIGNMENT records even - * when the snapshot is being built: it is possible to get later records - * that require subxids to be properly assigned. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && - info != XLOG_XACT_ASSIGNMENT) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) @@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_ASSIGNMENT: - { - xl_xact_assignment *xlrec; - int i; - TransactionId *sub_xid; - xlrec = (xl_xact_assignment *) XLogRecGetData(r); - - sub_xid = &xlrec->xsub[0]; - - for (i = 0; i < xlrec->nsubxacts; i++) - { - ReorderBufferAssignChild(reorder, xlrec->xtop, - *(sub_xid++), buf->origptr); - } - break; - } + /* + * We assign subxact to the toplevel xact while processing each + * record if required. So, we don't need to do anything here. + * See LogicalDecodingProcessRecord. + */ + break; case XLOG_XACT_PREPARE: /* diff --git a/src/include/access/xact.h b/src/include/access/xact.h index db191879b9..aef8555367 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); +extern bool IsSubTransactionAssignmentPending(void); +extern void MarkSubTransactionAssigned(void); + extern int xactGetCommittedChildren(TransactionId **ptr); extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 5b14334887..d8391aa378 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -237,6 +237,7 @@ extern bool XLOG_DEBUG; */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ +#define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */ /* Checkpoint statistics */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 88f3d76700..b9490a3afe 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -31,7 +31,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index b0f2a6ed43..b976882229 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -191,6 +191,8 @@ struct XLogReaderState RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of top-level transaction */ + /* information about blocks referenced by the record. */ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetOrigin(decoder) ((decoder)->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index acd9af0194..2f0c8bf589 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_ORIGIN 253 +#define XLR_BLOCK_ID_TOPLEVEL_XID 252 #endif /* XLOGRECORD_H */