diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 3e87978cef..b036b6d524 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -14,53 +14,188 @@ */ #include "postgres.h" +#include "access/transam.h" #include "access/xact.h" #include "catalog/catalog.h" #include "storage/sinval.h" #include "utils/timestamp.h" +/* + * Parse the WAL format of a xact commit and abort records into a easier to + * understand format. + * + * This routines are in xactdesc.c because they're accessed in backend (when + * replaying WAL) and frontend (pg_xlogdump) code. This file is the only xact + * specific one shared between both. They're complicated enough that + * duplication would be bothersome. + */ + +void +ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed) +{ + char *data = ((char *) xlrec) + MinSizeOfXactCommit; + + memset(parsed, 0, sizeof(*parsed)); + + parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */ + + parsed->xact_time = xlrec->xact_time; + + if (info & XLOG_XACT_HAS_INFO) + { + xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data; + + parsed->xinfo = xl_xinfo->xinfo; + + data += sizeof(xl_xact_xinfo); + } + + if (parsed->xinfo & XACT_XINFO_HAS_DBINFO) + { + xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data; + + parsed->dbId = xl_dbinfo->dbId; + parsed->tsId = xl_dbinfo->tsId; + + data += sizeof(xl_xact_dbinfo); + } + + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) + { + xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; + + parsed->nsubxacts = xl_subxacts->nsubxacts; + parsed->subxacts = xl_subxacts->subxacts; + + data += MinSizeOfXactSubxacts; + data += parsed->nsubxacts * sizeof(TransactionId); + } + + if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES) + { + xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data; + + parsed->nrels = xl_relfilenodes->nrels; + parsed->xnodes = xl_relfilenodes->xnodes; + + data += MinSizeOfXactRelfilenodes; + data += xl_relfilenodes->nrels * sizeof(RelFileNode); + } + + if (parsed->xinfo & XACT_XINFO_HAS_INVALS) + { + xl_xact_invals *xl_invals = (xl_xact_invals *) data; + + parsed->nmsgs = xl_invals->nmsgs; + parsed->msgs = xl_invals->msgs; + + data += MinSizeOfXactInvals; + data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage); + } + + if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE) + { + xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data; + + parsed->twophase_xid = xl_twophase->xid; + + data += sizeof(xl_xact_twophase); + } +} + +void +ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) +{ + char *data = ((char *) xlrec) + MinSizeOfXactAbort; + + memset(parsed, 0, sizeof(*parsed)); + + parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */ + + parsed->xact_time = xlrec->xact_time; + + if (info & XLOG_XACT_HAS_INFO) + { + xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data; + + parsed->xinfo = xl_xinfo->xinfo; + + data += sizeof(xl_xact_xinfo); + } + + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) + { + xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; + + parsed->nsubxacts = xl_subxacts->nsubxacts; + parsed->subxacts = xl_subxacts->subxacts; + + data += MinSizeOfXactSubxacts; + data += parsed->nsubxacts * sizeof(TransactionId); + } + + if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES) + { + xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data; + + parsed->nrels = xl_relfilenodes->nrels; + parsed->xnodes = xl_relfilenodes->xnodes; + + data += MinSizeOfXactRelfilenodes; + data += xl_relfilenodes->nrels * sizeof(RelFileNode); + } + + if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE) + { + xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data; + + parsed->twophase_xid = xl_twophase->xid; + + data += sizeof(xl_xact_twophase); + } +} static void -xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) +xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec) { + xl_xact_parsed_commit parsed; int i; - TransactionId *subxacts; - subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels]; + ParseCommitRecord(info, xlrec, &parsed); + + /* If this is a prepared xact, show the xid of the original xact */ + if (TransactionIdIsValid(parsed.twophase_xid)) + appendStringInfo(buf, "%u: ", parsed.twophase_xid); appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); - if (xlrec->nrels > 0) + if (parsed.nrels > 0) { appendStringInfoString(buf, "; rels:"); - for (i = 0; i < xlrec->nrels; i++) + for (i = 0; i < parsed.nrels; i++) { - char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM); + char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM); appendStringInfo(buf, " %s", path); pfree(path); } } - if (xlrec->nsubxacts > 0) + if (parsed.nsubxacts > 0) { appendStringInfoString(buf, "; subxacts:"); - for (i = 0; i < xlrec->nsubxacts; i++) - appendStringInfo(buf, " %u", subxacts[i]); + for (i = 0; i < parsed.nsubxacts; i++) + appendStringInfo(buf, " %u", parsed.subxacts[i]); } - if (xlrec->nmsgs > 0) + if (parsed.nmsgs > 0) { - SharedInvalidationMessage *msgs; - - msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts]; - - if (XactCompletionRelcacheInitFileInval(xlrec->xinfo)) + if (XactCompletionRelcacheInitFileInval(parsed.xinfo)) appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", - xlrec->dbId, xlrec->tsId); + parsed.dbId, parsed.tsId); appendStringInfoString(buf, "; inval msgs:"); - for (i = 0; i < xlrec->nmsgs; i++) + for (i = 0; i < parsed.nmsgs; i++) { - SharedInvalidationMessage *msg = &msgs[i]; + SharedInvalidationMessage *msg = &parsed.msgs[i]; if (msg->id >= 0) appendStringInfo(buf, " catcache %d", msg->id); @@ -80,48 +215,41 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) appendStringInfo(buf, " unknown id %d", msg->id); } } + + if (XactCompletionForceSyncCommit(parsed.xinfo)) + appendStringInfo(buf, "; sync"); } static void -xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec) +xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec) { + xl_xact_parsed_abort parsed; int i; - appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); + ParseAbortRecord(info, xlrec, &parsed); - if (xlrec->nsubxacts > 0) - { - appendStringInfoString(buf, "; subxacts:"); - for (i = 0; i < xlrec->nsubxacts; i++) - appendStringInfo(buf, " %u", xlrec->subxacts[i]); - } -} - -static void -xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec) -{ - int i; + /* If this is a prepared xact, show the xid of the original xact */ + if (TransactionIdIsValid(parsed.twophase_xid)) + appendStringInfo(buf, "%u: ", parsed.twophase_xid); appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); - if (xlrec->nrels > 0) + if (parsed.nrels > 0) { appendStringInfoString(buf, "; rels:"); - for (i = 0; i < xlrec->nrels; i++) + for (i = 0; i < parsed.nrels; i++) { - char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM); + char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM); appendStringInfo(buf, " %s", path); pfree(path); } } - if (xlrec->nsubxacts > 0) - { - TransactionId *xacts = (TransactionId *) - &xlrec->xnodes[xlrec->nrels]; + if (parsed.nsubxacts > 0) + { appendStringInfoString(buf, "; subxacts:"); - for (i = 0; i < xlrec->nsubxacts; i++) - appendStringInfo(buf, " %u", xacts[i]); + for (i = 0; i < parsed.nsubxacts; i++) + appendStringInfo(buf, " %u", parsed.subxacts[i]); } } @@ -140,39 +268,19 @@ void xact_desc(StringInfo buf, XLogReaderState *record) { char *rec = XLogRecGetData(record); - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK; - if (info == XLOG_XACT_COMMIT_COMPACT) - { - xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec; - - xact_desc_commit_compact(buf, xlrec); - } - else if (info == XLOG_XACT_COMMIT) + if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED) { xl_xact_commit *xlrec = (xl_xact_commit *) rec; - xact_desc_commit(buf, xlrec); + xact_desc_commit(buf, XLogRecGetInfo(record), xlrec); } - else if (info == XLOG_XACT_ABORT) + else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) { xl_xact_abort *xlrec = (xl_xact_abort *) rec; - xact_desc_abort(buf, xlrec); - } - else if (info == XLOG_XACT_COMMIT_PREPARED) - { - xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec; - - appendStringInfo(buf, "%u: ", xlrec->xid); - xact_desc_commit(buf, &xlrec->crec); - } - else if (info == XLOG_XACT_ABORT_PREPARED) - { - xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec; - - appendStringInfo(buf, "%u: ", xlrec->xid); - xact_desc_abort(buf, &xlrec->arec); + xact_desc_abort(buf, XLogRecGetInfo(record), xlrec); } else if (info == XLOG_XACT_ASSIGNMENT) { @@ -193,7 +301,7 @@ xact_identify(uint8 info) { const char *id = NULL; - switch (info & ~XLR_INFO_MASK) + switch (info & XLOG_XACT_OPMASK) { case XLOG_XACT_COMMIT: id = "COMMIT"; @@ -213,9 +321,6 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; - case XLOG_XACT_COMMIT_COMPACT: - id = "COMMIT_COMPACT"; - break; } return id; diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 6edc22704c..4075a6f743 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2079,7 +2079,6 @@ RecordTransactionCommitPrepared(TransactionId xid, SharedInvalidationMessage *invalmsgs, bool initfileinval) { - xl_xact_commit_prepared xlrec; XLogRecPtr recptr; START_CRIT_SECTION(); @@ -2088,36 +2087,11 @@ RecordTransactionCommitPrepared(TransactionId xid, MyPgXact->delayChkpt = true; /* Emit the XLOG commit record */ - xlrec.xid = xid; - - xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0; - - xlrec.crec.dbId = MyDatabaseId; - xlrec.crec.tsId = MyDatabaseTableSpace; - - xlrec.crec.xact_time = GetCurrentTimestamp(); - xlrec.crec.nrels = nrels; - xlrec.crec.nsubxacts = nchildren; - xlrec.crec.nmsgs = ninvalmsgs; - - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared); - - /* dump rels to delete */ - if (nrels > 0) - XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); - - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - - /* dump cache invalidation messages */ - if (ninvalmsgs > 0) - XLogRegisterData((char *) invalmsgs, - ninvalmsgs * sizeof(SharedInvalidationMessage)); - - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED); + recptr = XactLogCommitRecord(GetCurrentTimestamp(), + nchildren, children, nrels, rels, + ninvalmsgs, invalmsgs, + initfileinval, false, + xid); /* * We don't currently try to sleep before flush here ... nor is there any @@ -2160,7 +2134,6 @@ RecordTransactionAbortPrepared(TransactionId xid, int nrels, RelFileNode *rels) { - xl_xact_abort_prepared xlrec; XLogRecPtr recptr; /* @@ -2174,24 +2147,10 @@ RecordTransactionAbortPrepared(TransactionId xid, START_CRIT_SECTION(); /* Emit the XLOG abort record */ - xlrec.xid = xid; - xlrec.arec.xact_time = GetCurrentTimestamp(); - xlrec.arec.nrels = nrels; - xlrec.arec.nsubxacts = nchildren; - - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared); - - /* dump rels to delete */ - if (nrels > 0) - XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); - - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED); + recptr = XactLogAbortRecord(GetCurrentTimestamp(), + nchildren, children, + nrels, rels, + xid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 89769eac07..1495bb499f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1068,70 +1068,11 @@ RecordTransactionCommit(void) SetCurrentTransactionStopTimestamp(); - /* - * Do we need the long commit record? If not, use the compact format. - * - * For now always use the non-compact version if wal_level=logical, so - * we can hide commits from other databases. TODO: In the future we - * should merge compact and non-compact commits and use a flags - * variable to determine if it contains subxacts, relations or - * invalidation messages, that's more extensible and degrades more - * gracefully. Till then, it's just 20 bytes of overhead. - */ - if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit || - XLogLogicalInfoActive()) - { - xl_xact_commit xlrec; - - /* - * Set flags required for recovery processing of commits. - */ - xlrec.xinfo = 0; - if (RelcacheInitFileInval) - xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; - if (forceSyncCommit) - xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; - - xlrec.dbId = MyDatabaseId; - xlrec.tsId = MyDatabaseTableSpace; - - xlrec.xact_time = xactStopTimestamp; - xlrec.nrels = nrels; - xlrec.nsubxacts = nchildren; - xlrec.nmsgs = nmsgs; - - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit); - /* dump rels to delete */ - if (nrels > 0) - XLogRegisterData((char *) rels, - nrels * sizeof(RelFileNode)); - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - /* dump shared cache invalidation messages */ - if (nmsgs > 0) - XLogRegisterData((char *) invalMessages, - nmsgs * sizeof(SharedInvalidationMessage)); - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT); - } - else - { - xl_xact_commit_compact xlrec; - - xlrec.xact_time = xactStopTimestamp; - xlrec.nsubxacts = nchildren; - - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact); - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT); - } + XactLogCommitRecord(xactStopTimestamp, + nchildren, children, nrels, rels, + nmsgs, invalMessages, + RelcacheInitFileInval, forceSyncCommit, + InvalidTransactionId /* plain commit */); } /* @@ -1424,7 +1365,7 @@ RecordTransactionAbort(bool isSubXact) RelFileNode *rels; int nchildren; TransactionId *children; - xl_xact_abort xlrec; + TimestampTz xact_time; /* * If we haven't been assigned an XID, nobody will care whether we aborted @@ -1464,28 +1405,17 @@ RecordTransactionAbort(bool isSubXact) /* Write the ABORT record */ if (isSubXact) - xlrec.xact_time = GetCurrentTimestamp(); + xact_time = GetCurrentTimestamp(); else { SetCurrentTransactionStopTimestamp(); - xlrec.xact_time = xactStopTimestamp; + xact_time = xactStopTimestamp; } - xlrec.nrels = nrels; - xlrec.nsubxacts = nchildren; - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort); - - /* dump rels to delete */ - if (nrels > 0) - XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); - - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT); + XactLogAbortRecord(xact_time, + nchildren, children, + nrels, rels, + InvalidTransactionId); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -4659,23 +4589,229 @@ xactGetCommittedChildren(TransactionId **ptr) * XLOG support routines */ + +/* + * Log the commit record for a plain or twophase transaction commit. + * + * A 2pc commit will be emitted when twophase_xid is valid, a plain one + * otherwise. + */ +XLogRecPtr +XactLogCommitRecord(TimestampTz commit_time, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNode *rels, + int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInval, bool forceSync, + TransactionId twophase_xid) +{ + xl_xact_commit xlrec; + xl_xact_xinfo xl_xinfo; + xl_xact_dbinfo xl_dbinfo; + xl_xact_subxacts xl_subxacts; + xl_xact_relfilenodes xl_relfilenodes; + xl_xact_invals xl_invals; + xl_xact_twophase xl_twophase; + + uint8 info; + + Assert(CritSectionCount > 0); + + xl_xinfo.xinfo = 0; + + /* decide between a plain and 2pc commit */ + if (!TransactionIdIsValid(twophase_xid)) + info = XLOG_XACT_COMMIT; + else + info = XLOG_XACT_COMMIT_PREPARED; + + /* First figure out and collect all the information needed */ + + xlrec.xact_time = commit_time; + + if (relcacheInval) + xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; + if (forceSyncCommit) + xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; + + /* + * Relcache invalidations requires information about the current database + * and so does logical decoding. + */ + if (nmsgs > 0 || XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + if (nsubxacts > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS; + xl_subxacts.nsubxacts = nsubxacts; + } + + if (nrels > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES; + xl_relfilenodes.nrels = nrels; + } + + if (nmsgs > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS; + xl_invals.nmsgs = nmsgs; + } + + if (TransactionIdIsValid(twophase_xid)) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; + xl_twophase.xid = twophase_xid; + } + + if (xl_xinfo.xinfo != 0) + info |= XLOG_XACT_HAS_INFO; + + /* Then include all the collected data into the commit record. */ + + XLogBeginInsert(); + + XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit)); + + if (xl_xinfo.xinfo != 0) + XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) + { + XLogRegisterData((char *) (&xl_subxacts), + MinSizeOfXactSubxacts); + XLogRegisterData((char *) subxacts, + nsubxacts * sizeof(TransactionId)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) + { + XLogRegisterData((char *) (&xl_relfilenodes), + MinSizeOfXactRelfilenodes); + XLogRegisterData((char *) rels, + nrels * sizeof(RelFileNode)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS) + { + XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals); + XLogRegisterData((char *) msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + + return XLogInsert(RM_XACT_ID, info); +} + +/* + * Log the commit record for a plain or twophase transaction abort. + * + * A 2pc abort will be emitted when twophase_xid is valid, a plain one + * otherwise. + */ +XLogRecPtr +XactLogAbortRecord(TimestampTz abort_time, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNode *rels, + TransactionId twophase_xid) +{ + xl_xact_abort xlrec; + xl_xact_xinfo xl_xinfo; + xl_xact_subxacts xl_subxacts; + xl_xact_relfilenodes xl_relfilenodes; + xl_xact_twophase xl_twophase; + + uint8 info; + + Assert(CritSectionCount > 0); + + xl_xinfo.xinfo = 0; + + /* decide between a plain and 2pc abort */ + if (!TransactionIdIsValid(twophase_xid)) + info = XLOG_XACT_ABORT; + else + info = XLOG_XACT_ABORT_PREPARED; + + + /* First figure out and collect all the information needed */ + + xlrec.xact_time = abort_time; + + if (nsubxacts > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS; + xl_subxacts.nsubxacts = nsubxacts; + } + + if (nrels > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES; + xl_relfilenodes.nrels = nrels; + } + + if (TransactionIdIsValid(twophase_xid)) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; + xl_twophase.xid = twophase_xid; + } + + if (xl_xinfo.xinfo != 0) + info |= XLOG_XACT_HAS_INFO; + + /* Then include all the collected data into the abort record. */ + + XLogBeginInsert(); + + XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort); + + if (xl_xinfo.xinfo != 0) + XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) + { + XLogRegisterData((char *) (&xl_subxacts), + MinSizeOfXactSubxacts); + XLogRegisterData((char *) subxacts, + nsubxacts * sizeof(TransactionId)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) + { + XLogRegisterData((char *) (&xl_relfilenodes), + MinSizeOfXactRelfilenodes); + XLogRegisterData((char *) rels, + nrels * sizeof(RelFileNode)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + + return XLogInsert(RM_XACT_ID, info); +} + /* * Before 9.0 this was a fairly short function, but now it performs many * actions for which the order of execution is critical. */ static void -xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, - TimestampTz commit_time, - TransactionId *sub_xids, int nsubxacts, - SharedInvalidationMessage *inval_msgs, int nmsgs, - RelFileNode *xnodes, int nrels, - Oid dbId, Oid tsId, - uint32 xinfo) +xact_redo_commit(xl_xact_parsed_commit *parsed, + TransactionId xid, + XLogRecPtr lsn) { TransactionId max_xid; int i; - max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids); + max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); /* * Make sure nextXid is beyond any XID mentioned in the record. @@ -4694,15 +4830,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, } /* Set the transaction commit timestamp and metadata */ - TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids, - commit_time, InvalidCommitTsNodeId, false); + TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, + parsed->xact_time, InvalidCommitTsNodeId, + false); if (standbyState == STANDBY_DISABLED) { /* * Mark the transaction committed in pg_clog. */ - TransactionIdCommitTree(xid, nsubxacts, sub_xids); + TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts); } else { @@ -4726,21 +4863,24 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, * bits set on changes made by transactions that haven't yet * recovered. It's unlikely but it's good to be safe. */ - TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn); + TransactionIdAsyncCommitTree( + xid, parsed->nsubxacts, parsed->subxacts, lsn); /* * We must mark clog before we update the ProcArray. */ - ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid); + ExpireTreeKnownAssignedTransactionIds( + xid, parsed->nsubxacts, parsed->subxacts, max_xid); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as * occurs in CommitTransaction(). */ - ProcessCommittedInvalidationMessages(inval_msgs, nmsgs, - XactCompletionRelcacheInitFileInval(xinfo), - dbId, tsId); + ProcessCommittedInvalidationMessages( + parsed->msgs, parsed->nmsgs, + XactCompletionRelcacheInitFileInval(parsed->xinfo), + parsed->dbId, parsed->tsId); /* * Release locks, if any. We do this for both two phase and normal one @@ -4753,7 +4893,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, } /* Make sure files supposed to be dropped are dropped */ - if (nrels > 0) + if (parsed->nrels > 0) { /* * First update minimum recovery point to cover this WAL record. Once @@ -4772,13 +4912,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, */ XLogFlush(lsn); - for (i = 0; i < nrels; i++) + for (i = 0; i < parsed->nrels; i++) { - SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId); + SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) - XLogDropRelation(xnodes[i], fork); + XLogDropRelation(parsed->xnodes[i], fork); smgrdounlink(srel, true); smgrclose(srel); } @@ -4796,51 +4936,11 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, * minRecoveryPoint during recovery) helps to reduce that problem window, * for any user that requested ForceSyncCommit(). */ - if (XactCompletionForceSyncCommit(xinfo)) + if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); } -/* - * Utility function to call xact_redo_commit_internal after breaking down xlrec - */ -static void -xact_redo_commit(xl_xact_commit *xlrec, - TransactionId xid, XLogRecPtr lsn) -{ - TransactionId *subxacts; - SharedInvalidationMessage *inval_msgs; - - /* subxid array follows relfilenodes */ - subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - /* invalidation messages array follows subxids */ - inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - - xact_redo_commit_internal(xid, lsn, xlrec->xact_time, - subxacts, xlrec->nsubxacts, - inval_msgs, xlrec->nmsgs, - xlrec->xnodes, xlrec->nrels, - xlrec->dbId, - xlrec->tsId, - xlrec->xinfo); -} - -/* - * Utility function to call xact_redo_commit_internal for compact form of message. - */ -static void -xact_redo_commit_compact(xl_xact_commit_compact *xlrec, - TransactionId xid, XLogRecPtr lsn) -{ - xact_redo_commit_internal(xid, lsn, xlrec->xact_time, - xlrec->subxacts, xlrec->nsubxacts, - NULL, 0, /* inval msgs */ - NULL, 0, /* relfilenodes */ - InvalidOid, /* dbId */ - InvalidOid, /* tsId */ - 0); /* xinfo */ -} - /* * Be careful with the order of execution, as with xact_redo_commit(). * The two functions are similar but differ in key places. @@ -4851,14 +4951,10 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec, * because subtransaction commit is never WAL logged. */ static void -xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) +xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) { - TransactionId *sub_xids; - TransactionId max_xid; - int i; - - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids); + int i; + TransactionId max_xid; /* * Make sure nextXid is beyond any XID mentioned in the record. @@ -4867,6 +4963,10 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) * hold a lock while checking this. We still acquire the lock to modify * it, though. */ + max_xid = TransactionIdLatest(xid, + parsed->nsubxacts, + parsed->subxacts); + if (TransactionIdFollowsOrEquals(max_xid, ShmemVariableCache->nextXid)) { @@ -4879,7 +4979,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) if (standbyState == STANDBY_DISABLED) { /* Mark the transaction aborted in pg_clog, no need for async stuff */ - TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); + TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts); } else { @@ -4895,12 +4995,13 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) RecordKnownAssignedTransactionIds(max_xid); /* Mark the transaction aborted in pg_clog, no need for async stuff */ - TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); + TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts); /* * We must update the ProcArray after we have marked clog. */ - ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); + ExpireTreeKnownAssignedTransactionIds( + xid, parsed->nsubxacts, parsed->subxacts, max_xid); /* * There are no flat files that need updating, nor invalidation @@ -4910,17 +5011,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) /* * Release locks, if any. There are no invalidations to send. */ - StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids); + StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts); } /* Make sure files supposed to be dropped are dropped */ - for (i = 0; i < xlrec->nrels; i++) + for (i = 0; i < parsed->nrels; i++) { - SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId); + SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) - XLogDropRelation(xlrec->xnodes[i], fork); + XLogDropRelation(parsed->xnodes[i], fork); smgrdounlink(srel, true); smgrclose(srel); } @@ -4929,28 +5030,52 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) void xact_redo(XLogReaderState *record) { - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK; /* Backup blocks are not used in xact records */ Assert(!XLogRecHasAnyBlockRefs(record)); - if (info == XLOG_XACT_COMMIT_COMPACT) - { - xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record); - - xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr); - } - else if (info == XLOG_XACT_COMMIT) + if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; - xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr); + ParseCommitRecord(XLogRecGetInfo(record), xlrec, + &parsed); + + if (info == XLOG_XACT_COMMIT) + { + Assert(!TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_commit(&parsed, XLogRecGetXid(record), + record->EndRecPtr); + } + else + { + Assert(TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_commit(&parsed, parsed.twophase_xid, + record->EndRecPtr); + RemoveTwoPhaseFile(parsed.twophase_xid, false); + } } - else if (info == XLOG_XACT_ABORT) + else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; - xact_redo_abort(xlrec, XLogRecGetXid(record)); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, + &parsed); + + if (info == XLOG_XACT_ABORT) + { + Assert(!TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_abort(&parsed, XLogRecGetXid(record)); + } + else + { + Assert(TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_abort(&parsed, parsed.twophase_xid); + RemoveTwoPhaseFile(parsed.twophase_xid, false); + } } else if (info == XLOG_XACT_PREPARE) { @@ -4958,20 +5083,6 @@ xact_redo(XLogReaderState *record) RecreateTwoPhaseFile(XLogRecGetXid(record), XLogRecGetData(record), XLogRecGetDataLen(record)); } - else if (info == XLOG_XACT_COMMIT_PREPARED) - { - xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); - - xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr); - RemoveTwoPhaseFile(xlrec->xid, false); - } - else if (info == XLOG_XACT_ABORT_PREPARED) - { - xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record); - - xact_redo_abort(&xlrec->arec, xlrec->xid); - RemoveTwoPhaseFile(xlrec->xid, false); - } else if (info == XLOG_XACT_ASSIGNMENT) { xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 72af2c4330..e2d187f74d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5168,39 +5168,27 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog) static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime) { - uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 xact_info = info & XLOG_XACT_OPMASK; uint8 rmid = XLogRecGetRmid(record); - if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) + if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT) { *recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time; return true; } - if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT) - { - *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time; - return true; - } - if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) + if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT || + xact_info == XLOG_XACT_COMMIT_PREPARED)) { *recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time; return true; } - if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED) - { - *recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time; - return true; - } - if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT) + if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT || + xact_info == XLOG_XACT_ABORT_PREPARED)) { *recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time; return true; } - if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED) - { - *recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time; - return true; - } return false; } @@ -5216,7 +5204,7 @@ static bool recoveryStopsBefore(XLogReaderState *record) { bool stopsHere = false; - uint8 record_info; + uint8 xact_info; bool isCommit; TimestampTz recordXtime = 0; TransactionId recordXid; @@ -5237,27 +5225,40 @@ recoveryStopsBefore(XLogReaderState *record) /* Otherwise we only consider stopping before COMMIT or ABORT records. */ if (XLogRecGetRmid(record) != RM_XACT_ID) return false; - record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; - if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT) + xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK; + + if (xact_info == XLOG_XACT_COMMIT) { isCommit = true; recordXid = XLogRecGetXid(record); } - else if (record_info == XLOG_XACT_COMMIT_PREPARED) + else if (xact_info == XLOG_XACT_COMMIT_PREPARED) { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; + isCommit = true; - recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid; + ParseCommitRecord(XLogRecGetInfo(record), + xlrec, + &parsed); + recordXid = parsed.twophase_xid; } - else if (record_info == XLOG_XACT_ABORT) + else if (xact_info == XLOG_XACT_ABORT) { isCommit = false; recordXid = XLogRecGetXid(record); } - else if (record_info == XLOG_XACT_ABORT_PREPARED) + else if (xact_info == XLOG_XACT_ABORT_PREPARED) { - isCommit = false; - recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid; + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; + + isCommit = true; + ParseAbortRecord(XLogRecGetInfo(record), + xlrec, + &parsed); + recordXid = parsed.twophase_xid; } else return false; @@ -5325,11 +5326,12 @@ recoveryStopsBefore(XLogReaderState *record) static bool recoveryStopsAfter(XLogReaderState *record) { - uint8 record_info; + uint8 info; + uint8 xact_info; uint8 rmid; TimestampTz recordXtime; - record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; rmid = XLogRecGetRmid(record); /* @@ -5337,7 +5339,7 @@ recoveryStopsAfter(XLogReaderState *record) * the first one. */ if (recoveryTarget == RECOVERY_TARGET_NAME && - rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) + rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT) { xl_restore_point *recordRestorePointData; @@ -5358,12 +5360,15 @@ recoveryStopsAfter(XLogReaderState *record) } } - if (rmid == RM_XACT_ID && - (record_info == XLOG_XACT_COMMIT_COMPACT || - record_info == XLOG_XACT_COMMIT || - record_info == XLOG_XACT_COMMIT_PREPARED || - record_info == XLOG_XACT_ABORT || - record_info == XLOG_XACT_ABORT_PREPARED)) + if (rmid != RM_XACT_ID) + return false; + + xact_info = info & XLOG_XACT_OPMASK; + + if (xact_info == XLOG_XACT_COMMIT || + xact_info == XLOG_XACT_COMMIT_PREPARED || + xact_info == XLOG_XACT_ABORT || + xact_info == XLOG_XACT_ABORT_PREPARED) { TransactionId recordXid; @@ -5372,10 +5377,26 @@ recoveryStopsAfter(XLogReaderState *record) SetLatestXTime(recordXtime); /* Extract the XID of the committed/aborted transaction */ - if (record_info == XLOG_XACT_COMMIT_PREPARED) - recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid; - else if (record_info == XLOG_XACT_ABORT_PREPARED) - recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid; + if (xact_info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; + + ParseCommitRecord(XLogRecGetInfo(record), + xlrec, + &parsed); + recordXid = parsed.twophase_xid; + } + else if (xact_info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; + + ParseAbortRecord(XLogRecGetInfo(record), + xlrec, + &parsed); + recordXid = parsed.twophase_xid; + } else recordXid = XLogRecGetXid(record); @@ -5396,17 +5417,16 @@ recoveryStopsAfter(XLogReaderState *record) recoveryStopTime = recordXtime; recoveryStopName[0] = '\0'; - if (record_info == XLOG_XACT_COMMIT_COMPACT || - record_info == XLOG_XACT_COMMIT || - record_info == XLOG_XACT_COMMIT_PREPARED) + if (xact_info == XLOG_XACT_COMMIT || + xact_info == XLOG_XACT_COMMIT_PREPARED) { ereport(LOG, (errmsg("recovery stopping after commit of transaction %u, time %s", recoveryStopXid, timestamptz_to_str(recoveryStopTime)))); } - else if (record_info == XLOG_XACT_ABORT || - record_info == XLOG_XACT_ABORT_PREPARED) + else if (xact_info == XLOG_XACT_ABORT || + xact_info == XLOG_XACT_ABORT_PREPARED) { ereport(LOG, (errmsg("recovery stopping after abort of transaction %u, time %s", @@ -5494,7 +5514,7 @@ SetRecoveryPause(bool recoveryPause) static bool recoveryApplyDelay(XLogReaderState *record) { - uint8 record_info; + uint8 xact_info; TimestampTz xtime; long secs; int microsecs; @@ -5511,11 +5531,13 @@ recoveryApplyDelay(XLogReaderState *record) * so there is already opportunity for issues caused by early conflicts on * standbys. */ - record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; - if (!(XLogRecGetRmid(record) == RM_XACT_ID && - (record_info == XLOG_XACT_COMMIT_COMPACT || - record_info == XLOG_XACT_COMMIT || - record_info == XLOG_XACT_COMMIT_PREPARED))) + if (XLogRecGetRmid(record) != RM_XACT_ID) + return false; + + xact_info = XLogRecGetInfo(record) & XLOG_XACT_COMMIT; + + if (xact_info != XLOG_XACT_COMMIT && + xact_info != XLOG_XACT_COMMIT_PREPARED) return false; if (!getRecordTimestamp(record, &xtime)) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index e7614bd515..eb7293f2f3 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - TransactionId xid, Oid dboid, - TimestampTz commit_time, - int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msg); -static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, - TransactionId xid, TransactionId *sub_xids, int nsubxacts); + xl_xact_parsed_commit *parsed, TransactionId xid); +static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_abort *parsed, TransactionId xid); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -188,7 +185,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; XLogReaderState *r = buf->record; - uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; /* no point in doing anything yet, data could not be decoded anyway */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) @@ -197,87 +194,41 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) switch (info) { case XLOG_XACT_COMMIT: - { - xl_xact_commit *xlrec; - TransactionId *subxacts = NULL; - SharedInvalidationMessage *invals = NULL; - - xlrec = (xl_xact_commit *) XLogRecGetData(r); - - subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - - DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId, - xlrec->xact_time, - xlrec->nsubxacts, subxacts, - xlrec->nmsgs, invals); - - break; - } case XLOG_XACT_COMMIT_PREPARED: { - xl_xact_commit_prepared *prec; xl_xact_commit *xlrec; - TransactionId *subxacts; - SharedInvalidationMessage *invals = NULL; + xl_xact_parsed_commit parsed; + TransactionId xid; - /* Prepared commits contain a normal commit record... */ - prec = (xl_xact_commit_prepared *) XLogRecGetData(r); - xlrec = &prec->crec; + xlrec = (xl_xact_commit *) XLogRecGetData(r); + ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); - subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; - DecodeCommit(ctx, buf, prec->xid, xlrec->dbId, - xlrec->xact_time, - xlrec->nsubxacts, subxacts, - xlrec->nmsgs, invals); - - break; - } - case XLOG_XACT_COMMIT_COMPACT: - { - xl_xact_commit_compact *xlrec; - - xlrec = (xl_xact_commit_compact *) XLogRecGetData(r); - - DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid, - xlrec->xact_time, - xlrec->nsubxacts, xlrec->subxacts, - 0, NULL); + DecodeCommit(ctx, buf, &parsed, xid); break; } case XLOG_XACT_ABORT: - { - xl_xact_abort *xlrec; - TransactionId *sub_xids; - - xlrec = (xl_xact_abort *) XLogRecGetData(r); - - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - - DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r), - sub_xids, xlrec->nsubxacts); - break; - } case XLOG_XACT_ABORT_PREPARED: { - xl_xact_abort_prepared *prec; xl_xact_abort *xlrec; - TransactionId *sub_xids; + xl_xact_parsed_abort parsed; + TransactionId xid; - /* prepared abort contain a normal commit abort... */ - prec = (xl_xact_abort_prepared *) XLogRecGetData(r); - xlrec = &prec->arec; + xlrec = (xl_xact_abort *) XLogRecGetData(r); + ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; - /* r->xl_xid is committed in a separate record */ - DecodeAbort(ctx, buf->origptr, prec->xid, - sub_xids, xlrec->nsubxacts); + DecodeAbort(ctx, buf, &parsed, xid); break; } - case XLOG_XACT_ASSIGNMENT: { xl_xact_assignment *xlrec; @@ -477,10 +428,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - TransactionId xid, Oid dboid, - TimestampTz commit_time, - int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msgs) + xl_xact_parsed_commit *parsed, TransactionId xid) { int i; @@ -489,15 +437,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * transaction's contents, since the various caches need to always be * consistent. */ - if (ninval_msgs > 0) + if (parsed->nmsgs > 0) { ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - ninval_msgs, msgs); + parsed->nmsgs, parsed->msgs); ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, - nsubxacts, sub_xids); + parsed->nsubxacts, parsed->subxacts); /* ---- * Check whether we are interested in this specific transaction, and tell @@ -524,12 +472,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * --- */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (dboid != InvalidOid && dboid != ctx->slot->data.database)) + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database)) { - for (i = 0; i < nsubxacts; i++) + for (i = 0; i < parsed->nsubxacts; i++) { - ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr); - sub_xids++; + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); } ReorderBufferForget(ctx->reorder, xid, buf->origptr); @@ -537,16 +484,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* tell the reorderbuffer about the surviving subtransactions */ - for (i = 0; i < nsubxacts; i++) + for (i = 0; i < parsed->nsubxacts; i++) { - ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids, + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], buf->origptr, buf->endptr); - sub_xids++; } /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time); + parsed->xact_time); } /* @@ -554,20 +500,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * snapbuild.c and reorderbuffer.c */ static void -DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, - TransactionId *sub_xids, int nsubxacts) +DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_abort *parsed, TransactionId xid) { int i; - SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids); + SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid, + parsed->nsubxacts, parsed->subxacts); - for (i = 0; i < nsubxacts; i++) + for (i = 0; i < parsed->nsubxacts; i++) { - ReorderBufferAbort(ctx->reorder, *sub_xids, lsn); - sub_xids++; + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); } - ReorderBufferAbort(ctx->reorder, xid, lsn); + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); } /* diff --git a/src/include/access/xact.h b/src/include/access/xact.h index d7e5f6447c..fdf3ea3228 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -18,6 +18,7 @@ #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "storage/relfilenode.h" +#include "storage/sinval.h" #include "utils/datetime.h" @@ -103,8 +104,8 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, */ /* - * XLOG allows to store some information in high 4 bits of log - * record xl_info field + * XLOG allows to store some information in high 4 bits of log record xl_info + * field. We use 3 for the opcode, and one about an optional flag variable. */ #define XLOG_XACT_COMMIT 0x00 #define XLOG_XACT_PREPARE 0x10 @@ -112,7 +113,41 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -#define XLOG_XACT_COMMIT_COMPACT 0x60 +/* free opcode 0x60 */ +/* free opcode 0x70 */ + +/* mask for filtering opcodes out of xl_info */ +#define XLOG_XACT_OPMASK 0x70 + +/* does this record have a 'xinfo' field or not */ +#define XLOG_XACT_HAS_INFO 0x80 + +/* + * The following flags, stored in xinfo, determine which information is + * contained in commit/abort records. + */ +#define XACT_XINFO_HAS_DBINFO (1U << 0) +#define XACT_XINFO_HAS_SUBXACTS (1U << 1) +#define XACT_XINFO_HAS_RELFILENODES (1U << 2) +#define XACT_XINFO_HAS_INVALS (1U << 3) +#define XACT_XINFO_HAS_TWOPHASE (1U << 4) + +/* + * Also stored in xinfo, these indicating a variety of additional actions that + * need to occur when emulating transaction effects during recovery. + * + * They are named XactCompletion... to differentiate them from + * EOXact... routines which run at the end of the original transaction + * completion. + */ +#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) +#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) + +/* Access macros for above flags */ +#define XactCompletionRelcacheInitFileInval(xinfo) \ + (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)) +#define XactCompletionForceSyncCommit(xinfo) \ + (!!(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)) typedef struct xl_xact_assignment { @@ -123,85 +158,130 @@ typedef struct xl_xact_assignment #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) -typedef struct xl_xact_commit_compact -{ - TimestampTz xact_time; /* time of commit */ - int nsubxacts; /* number of subtransaction XIDs */ - /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ - TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER]; -} xl_xact_commit_compact; +/* + * Commit and abort records can contain a lot of information. But a large + * portion of the records won't need all possible pieces of information. So we + * only include what's needed. + * + * A minimal commit/abort record only consists out of a xl_xact_commit/abort + * struct. The presence of additional information is indicated by bits set in + * 'xl_xact_xinfo->xinfo'. The presence of the xinfo field itself is signalled + * by a set XLOG_XACT_HAS_INFO bit in the xl_info field. + * + * NB: All the individual data chunks should be be sized to multiples of + * sizeof(int) and only require int32 alignment. + */ -#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts) +/* sub-records for commit/abort */ + +typedef struct xl_xact_xinfo +{ + /* + * Even though we right now only require 1 byte of space in xinfo we use + * four so following records don't have to care about alignment. Commit + * records can be large, so copying large portions isn't attractive. + */ + uint32 xinfo; +} xl_xact_xinfo; + +typedef struct xl_xact_dbinfo +{ + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ +} xl_xact_dbinfo; + +typedef struct xl_xact_subxacts +{ + int nsubxacts; /* number of subtransaction XIDs */ + TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER]; +} xl_xact_subxacts; +#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts) + +typedef struct xl_xact_relfilenodes +{ + int nrels; /* number of subtransaction XIDs */ + RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER]; +} xl_xact_relfilenodes; +#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes) + +typedef struct xl_xact_invals +{ + int nmsgs; /* number of shared inval msgs */ + SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; +} xl_xact_invals; +#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs) + +typedef struct xl_xact_twophase +{ + TransactionId xid; +} xl_xact_twophase; +#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs) typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ - uint32 xinfo; /* info flags */ - int nrels; /* number of RelFileNodes */ - int nsubxacts; /* number of subtransaction XIDs */ - int nmsgs; /* number of shared inval msgs */ - Oid dbId; /* MyDatabaseId */ - Oid tsId; /* MyDatabaseTableSpace */ - /* Array of RelFileNode(s) to drop at commit */ - RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER]; - /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ - /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */ + + /* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */ + /* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */ + /* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */ + /* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */ + /* xl_xact_invals follows if XINFO_HAS_INVALS */ + /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ } xl_xact_commit; - -#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes) - -/* - * These flags are set in the xinfo fields of WAL commit records, - * indicating a variety of additional actions that need to occur - * when emulating transaction effects during recovery. - * They are named XactCompletion... to differentiate them from - * EOXact... routines which run at the end of the original - * transaction completion. - */ -#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01 -#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 - -/* Access macros for above flags */ -#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) -#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) +#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) typedef struct xl_xact_abort { TimestampTz xact_time; /* time of abort */ - int nrels; /* number of RelFileNodes */ - int nsubxacts; /* number of subtransaction XIDs */ - /* Array of RelFileNode(s) to drop at abort */ - RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER]; - /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */ + + /* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */ + /* No db_info required */ + /* xl_xact_subxacts follows if HAS_SUBXACT */ + /* xl_xact_relfilenodes follows if HAS_RELFILENODES */ + /* No invalidation messages needed. */ + /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ } xl_xact_abort; - -/* Note the intentional lack of an invalidation message array c.f. commit */ - -#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes) +#define MinSizeOfXactAbort sizeof(xl_xact_abort) /* - * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records - * except that we have to store the XID of the prepared transaction explicitly - * --- the XID in the record header will be invalid. + * Commit/Abort records in the above form are a bit verbose to parse, so + * there's a deconstructed versions generated by ParseCommit/AbortRecord() for + * easier consumption. */ - -typedef struct xl_xact_commit_prepared +typedef struct xl_xact_parsed_commit { - TransactionId xid; /* XID of prepared xact */ - xl_xact_commit crec; /* COMMIT record */ - /* MORE DATA FOLLOWS AT END OF STRUCT */ -} xl_xact_commit_prepared; + TimestampTz xact_time; -#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes) + uint32 xinfo; -typedef struct xl_xact_abort_prepared + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + + int nsubxacts; + TransactionId *subxacts; + + int nrels; + RelFileNode *xnodes; + + int nmsgs; + SharedInvalidationMessage *msgs; + + TransactionId twophase_xid; /* only for 2PC */ +} xl_xact_parsed_commit; + +typedef struct xl_xact_parsed_abort { - TransactionId xid; /* XID of prepared xact */ - xl_xact_abort arec; /* ABORT record */ - /* MORE DATA FOLLOWS AT END OF STRUCT */ -} xl_xact_abort_prepared; + TimestampTz xact_time; + uint32 xinfo; -#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes) + int nsubxacts; + TransactionId *subxacts; + + int nrels; + RelFileNode *xnodes; + + TransactionId twophase_xid; /* only for 2PC */ +} xl_xact_parsed_abort; /* ---------------- @@ -256,8 +336,25 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); extern int xactGetCommittedChildren(TransactionId **ptr); +extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNode *rels, + int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInval, bool forceSync, + TransactionId twophase_xid); + +extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNode *rels, + TransactionId twophase_xid); extern void xact_redo(XLogReaderState *record); + +/* xactdesc.c */ extern void xact_desc(StringInfo buf, XLogReaderState *record); extern const char *xact_identify(uint8 info); +/* also in xactdesc.c, so they can be shared between front/backend code */ +extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed); +extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed); + #endif /* XACT_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index f1be598d8f..12a1b6173f 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -31,7 +31,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD082 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData {