diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index ca6f6d57d3..8e35c432f5 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -205,7 +205,7 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ - bool assigned; /* assigned to top-level XID */ + bool topXidLogged; /* for a subxact: is top-level XID logged? */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -238,7 +238,7 @@ typedef struct SerializedTransactionState static TransactionStateData TopTransactionStateData = { .state = TRANS_DEFAULT, .blockState = TBLOCK_DEFAULT, - .assigned = false, + .topXidLogged = false, }; /* @@ -529,6 +529,56 @@ MarkCurrentTransactionIdLoggedIfAny(void) CurrentTransactionState->didLogXid = true; } +/* + * IsSubxactTopXidLogPending + * + * This is used to decide whether we need to WAL log the top-level XID for + * operation in a subtransaction. We require that for logical decoding, see + * LogicalDecodingProcessRecord. + * + * This returns true if wal_level >= logical and we are inside a valid + * subtransaction, for which the assignment was not yet written to any WAL + * record. + */ +bool +IsSubxactTopXidLogPending(void) +{ + /* check whether it is already logged */ + if (CurrentTransactionState->topXidLogged) + return false; + + /* wal_level has to be logical */ + if (!XLogLogicalInfoActive()) + return false; + + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have a XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + return true; +} + +/* + * MarkSubxactTopXidLogged + * + * Remember that the top transaction id for the current subtransaction is WAL + * logged now. + */ +void +MarkSubxactTopXidLogged(void) +{ + Assert(IsSubxactTopXidLogPending()); + + CurrentTransactionState->topXidLogged = true; +} /* * GetStableLatestTransactionId @@ -5174,7 +5224,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; - s->assigned = false; + s->topXidLogged = false; CurrentTransactionState = s; @@ -6106,50 +6156,3 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } - -/* - * IsSubTransactionAssignmentPending - * - * This is used to decide whether we need to WAL log the top-level XID for - * operation in a subtransaction. We require that for logical decoding, see - * LogicalDecodingProcessRecord. - * - * This returns true if wal_level >= logical and we are inside a valid - * subtransaction, for which the assignment was not yet written to any WAL - * record. - */ -bool -IsSubTransactionAssignmentPending(void) -{ - /* wal_level has to be logical */ - if (!XLogLogicalInfoActive()) - return false; - - /* we need to be in a transaction state */ - if (!IsTransactionState()) - return false; - - /* it has to be a subtransaction */ - if (!IsSubTransaction()) - return false; - - /* the subtransaction has to have a XID assigned */ - if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) - return false; - - /* and it should not be already 'assigned' */ - return !CurrentTransactionState->assigned; -} - -/* - * MarkSubTransactionAssigned - * - * Mark the subtransaction assignment as completed. - */ -void -MarkSubTransactionAssigned(void) -{ - Assert(IsSubTransactionAssignmentPending()); - - CurrentTransactionState->assigned = true; -} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f547efd294..6aca1fd75d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -999,6 +999,9 @@ static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); * 'flags' gives more in-depth control on the record being inserted. See * XLogSetRecordFlags() for details. * + * 'topxid_included' tells whether the top-transaction id is logged along with + * current subtransaction. See XLogRecordAssemble(). + * * The first XLogRecData in the chain must be for the record header, and its * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and * xl_crc fields in the header, the rest of the header must already be filled @@ -1014,7 +1017,8 @@ XLogRecPtr XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn, uint8 flags, - int num_fpi) + int num_fpi, + bool topxid_included) { XLogCtlInsert *Insert = &XLogCtl->Insert; pg_crc32c rdata_crc; @@ -1169,6 +1173,13 @@ XLogInsertRecord(XLogRecData *rdata, END_CRIT_SECTION(); + /* + * Mark top transaction id is logged (if needed) so that we should not try + * to log it again with the next WAL record in the current subtransaction. + */ + if (topxid_included) + MarkSubxactTopXidLogged(); + /* * Update shared LogwrtRqst.Write, if we crossed page boundary. */ diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index b492c656d7..689384a411 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -123,7 +123,8 @@ static MemoryContext xloginsert_cxt; static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, int *num_fpi); + XLogRecPtr *fpw_lsn, int *num_fpi, + bool *topxid_included); static bool XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length, char *dest, uint16 *dlen); @@ -209,10 +210,6 @@ XLogResetInsertion(void) { int i; - /* reset the subxact assignment flag (if needed) */ - if (curinsert_flags & XLOG_INCLUDE_XID) - MarkSubTransactionAssigned(); - for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -409,8 +406,6 @@ XLogRegisterBufData(uint8 block_id, char *data, int len) * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for * durability, which allows to avoid triggering WAL archiving and other * background activity. - * - XLOG_INCLUDE_XID, a message-passing hack between XLogRecordAssemble - * and XLogResetInsertion. */ void XLogSetRecordFlags(uint8 flags) @@ -465,6 +460,7 @@ XLogInsert(RmgrId rmid, uint8 info) { XLogRecPtr RedoRecPtr; bool doPageWrites; + bool topxid_included = false; XLogRecPtr fpw_lsn; XLogRecData *rdt; int num_fpi = 0; @@ -477,9 +473,10 @@ XLogInsert(RmgrId rmid, uint8 info) GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, - &fpw_lsn, &num_fpi); + &fpw_lsn, &num_fpi, &topxid_included); - EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi); + EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi, + topxid_included); } while (EndPos == InvalidXLogRecPtr); XLogResetInsertion(); @@ -498,11 +495,14 @@ XLogInsert(RmgrId rmid, uint8 info) * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This * signals that the assembled record is only good for insertion on the * assumption that the RedoRecPtr and doPageWrites values were up-to-date. + * + * *topxid_included is set if the topmost transaction ID is logged with the + * current subtransaction. */ static XLogRecData * XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, int *num_fpi) + XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included) { XLogRecData *rdt; uint32 total_len = 0; @@ -788,12 +788,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, } /* followed by toplevel XID, if not already included in previous record */ - if (IsSubTransactionAssignmentPending()) + if (IsSubxactTopXidLogPending()) { TransactionId xid = GetTopTransactionIdIfAny(); - /* update the flag (later used by XLogResetInsertion) */ - XLogSetRecordFlags(XLOG_INCLUDE_XID); + /* Set the flag that the top xid is included in the WAL */ + *topxid_included = true; *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; memcpy(scratch, &xid, sizeof(TransactionId)); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 134f6862da..5546d334ad 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -433,8 +433,8 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); -extern bool IsSubTransactionAssignmentPending(void); -extern void MarkSubTransactionAssigned(void); +extern bool IsSubxactTopXidLogPending(void); +extern void MarkSubxactTopXidLogged(void); extern int xactGetCommittedChildren(TransactionId **ptr); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 5e2c94a05f..c0a560204b 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -212,7 +212,6 @@ extern bool XLOG_DEBUG; */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ -#define XLOG_INCLUDE_XID 0x04 /* WAL-internal message-passing hack */ /* Checkpoint statistics */ @@ -258,7 +257,8 @@ struct XLogRecData; extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, uint8 flags, - int num_fpi); + int num_fpi, + bool topxid_included); extern void XLogFlush(XLogRecPtr RecPtr); extern bool XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr);