diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2ca1c14549..7f01a83c4f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -962,25 +962,9 @@ RecordTransactionCommit(void) /* * Begin commit critical section and insert the commit XLOG record. */ - XLogRecData rdata[4]; - int lastrdata = 0; - xl_xact_commit xlrec; - /* Tell bufmgr and smgr to prepare for commit */ BufmgrCommit(); - /* - * Set flags required for recovery processing of commits. - */ - xlrec.xinfo = 0; - if (RelcacheInitFileInval) - xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; - if (forceSyncCommit) - xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; - - xlrec.dbId = MyDatabaseId; - xlrec.tsId = MyDatabaseTableSpace; - /* * Mark ourselves as within our "commit critical section". This * forces any concurrent checkpoint to wait until we've updated @@ -1002,43 +986,88 @@ RecordTransactionCommit(void) MyProc->inCommit = true; SetCurrentTransactionStopTimestamp(); - xlrec.xact_time = xactStopTimestamp; - xlrec.nrels = nrels; - xlrec.nsubxacts = nchildren; - xlrec.nmsgs = nmsgs; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactCommit; - rdata[0].buffer = InvalidBuffer; - /* dump rels to delete */ - if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } - /* dump committed child Xids */ - if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = (char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } - /* dump shared cache invalidation messages */ - if (nmsgs > 0) - { - rdata[lastrdata].next = &(rdata[3]); - rdata[3].data = (char *) invalMessages; - rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage); - rdata[3].buffer = InvalidBuffer; - lastrdata = 3; - } - rdata[lastrdata].next = NULL; - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); + /* + * Do we need the long commit record? If not, use the compact format. + */ + if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit) + { + XLogRecData rdata[4]; + int lastrdata = 0; + xl_xact_commit xlrec; + /* + * Set flags required for recovery processing of commits. + */ + xlrec.xinfo = 0; + if (RelcacheInitFileInval) + xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; + if (forceSyncCommit) + xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; + + xlrec.dbId = MyDatabaseId; + xlrec.tsId = MyDatabaseTableSpace; + + xlrec.xact_time = xactStopTimestamp; + xlrec.nrels = nrels; + xlrec.nsubxacts = nchildren; + xlrec.nmsgs = nmsgs; + rdata[0].data = (char *) (&xlrec); + rdata[0].len = MinSizeOfXactCommit; + rdata[0].buffer = InvalidBuffer; + /* dump rels to delete */ + if (nrels > 0) + { + rdata[0].next = &(rdata[1]); + rdata[1].data = (char *) rels; + rdata[1].len = nrels * sizeof(RelFileNode); + rdata[1].buffer = InvalidBuffer; + lastrdata = 1; + } + /* dump committed child Xids */ + if (nchildren > 0) + { + rdata[lastrdata].next = &(rdata[2]); + rdata[2].data = (char *) children; + rdata[2].len = nchildren * sizeof(TransactionId); + rdata[2].buffer = InvalidBuffer; + lastrdata = 2; + } + /* dump shared cache invalidation messages */ + if (nmsgs > 0) + { + rdata[lastrdata].next = &(rdata[3]); + rdata[3].data = (char *) invalMessages; + rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage); + rdata[3].buffer = InvalidBuffer; + lastrdata = 3; + } + rdata[lastrdata].next = NULL; + + (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); + } + else + { + XLogRecData rdata[2]; + int lastrdata = 0; + xl_xact_commit_compact xlrec; + xlrec.xact_time = xactStopTimestamp; + xlrec.nsubxacts = nchildren; + rdata[0].data = (char *) (&xlrec); + rdata[0].len = MinSizeOfXactCommitCompact; + rdata[0].buffer = InvalidBuffer; + /* dump committed child Xids */ + if (nchildren > 0) + { + rdata[0].next = &(rdata[1]); + rdata[1].data = (char *) children; + rdata[1].len = nchildren * sizeof(TransactionId); + rdata[1].buffer = InvalidBuffer; + lastrdata = 1; + } + rdata[lastrdata].next = NULL; + + (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata); + } } /* @@ -4441,19 +4470,17 @@ xactGetCommittedChildren(TransactionId **ptr) * actions for which the order of execution is critical. */ static void -xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) +xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, + TransactionId *sub_xids, int nsubxacts, + SharedInvalidationMessage *inval_msgs, int nmsgs, + RelFileNode *xnodes, int nrels, + Oid dbId, Oid tsId, + uint32 xinfo) { - TransactionId *sub_xids; - SharedInvalidationMessage *inval_msgs; TransactionId max_xid; int i; - /* subxid array follows relfilenodes */ - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - /* invalidation messages array follows subxids */ - inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]); - - max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids); + max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids); /* * Make sure nextXid is beyond any XID mentioned in the record. @@ -4476,7 +4503,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) /* * Mark the transaction committed in pg_clog. */ - TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids); + TransactionIdCommitTree(xid, nsubxacts, sub_xids); } else { @@ -4500,41 +4527,41 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) * bits set on changes made by transactions that haven't yet * recovered. It's unlikely but it's good to be safe. */ - TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn); + TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn); /* * We must mark clog before we update the ProcArray. */ - ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); + ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as * occurs in CommitTransaction(). */ - ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs, - XactCompletionRelcacheInitFileInval(xlrec), - xlrec->dbId, xlrec->tsId); + ProcessCommittedInvalidationMessages(inval_msgs, nmsgs, + XactCompletionRelcacheInitFileInval(xinfo), + dbId, tsId); /* * Release locks, if any. We do this for both two phase and normal one * phase transactions. In effect we are ignoring the prepare phase and * just going straight to lock release. */ - StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids); + StandbyReleaseLockTree(xid, nsubxacts, sub_xids); } /* Make sure files supposed to be dropped are dropped */ - for (i = 0; i < xlrec->nrels; i++) + for (i = 0; i < nrels; i++) { - SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId); + SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) { if (smgrexists(srel, fork)) { - XLogDropRelation(xlrec->xnodes[i], fork); + XLogDropRelation(xnodes[i], fork); smgrdounlink(srel, fork, true); } } @@ -4553,8 +4580,46 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) * to reduce that problem window, for any user that requested * ForceSyncCommit(). */ - if (XactCompletionForceSyncCommit(xlrec)) + if (XactCompletionForceSyncCommit(xinfo)) XLogFlush(lsn); + +} +/* + * Utility function to call xact_redo_commit_internal after breaking down xlrec + */ +static void +xact_redo_commit(xl_xact_commit *xlrec, + TransactionId xid, XLogRecPtr lsn) +{ + TransactionId *subxacts; + SharedInvalidationMessage *inval_msgs; + + /* subxid array follows relfilenodes */ + subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + /* invalidation messages array follows subxids */ + inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); + + xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts, + inval_msgs, xlrec->nmsgs, + xlrec->xnodes, xlrec->nrels, + xlrec->dbId, + xlrec->tsId, + xlrec->xinfo); +} + +/* + * Utility function to call xact_redo_commit_internal for compact form of message. + */ +static void +xact_redo_commit_compact(xl_xact_commit_compact *xlrec, + TransactionId xid, XLogRecPtr lsn) +{ + xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts, + NULL, 0, /* inval msgs */ + NULL, 0, /* relfilenodes */ + InvalidOid, /* dbId */ + InvalidOid, /* tsId */ + 0); /* xinfo */ } /* @@ -4655,7 +4720,13 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record) /* Backup blocks are not used in xact records */ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - if (info == XLOG_XACT_COMMIT) + if (info == XLOG_XACT_COMMIT_COMPACT) + { + xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record); + + xact_redo_commit_compact(xlrec, record->xl_xid, lsn); + } + else if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); @@ -4703,9 +4774,9 @@ static void xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) { int i; - TransactionId *xacts; + TransactionId *subxacts; - xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels]; + subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels]; appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); @@ -4724,15 +4795,15 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) { appendStringInfo(buf, "; subxacts:"); for (i = 0; i < xlrec->nsubxacts; i++) - appendStringInfo(buf, " %u", xacts[i]); + appendStringInfo(buf, " %u", subxacts[i]); } if (xlrec->nmsgs > 0) { SharedInvalidationMessage *msgs; - msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts]; + msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts]; - if (XactCompletionRelcacheInitFileInval(xlrec)) + if (XactCompletionRelcacheInitFileInval(xlrec->xinfo)) appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", xlrec->dbId, xlrec->tsId); @@ -4758,6 +4829,21 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) } } +static void +xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec) +{ + int i; + + appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); + + if (xlrec->nsubxacts > 0) + { + appendStringInfo(buf, "; subxacts:"); + for (i = 0; i < xlrec->nsubxacts; i++) + appendStringInfo(buf, " %u", xlrec->subxacts[i]); + } +} + static void xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec) { @@ -4802,7 +4888,14 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; - if (info == XLOG_XACT_COMMIT) + if (info == XLOG_XACT_COMMIT_COMPACT) + { + xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec; + + appendStringInfo(buf, "commit: "); + xact_desc_commit_compact(buf, xlrec); + } + else if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) rec; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4952d223cd..178a458eb7 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5593,7 +5593,14 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID) return false; record_info = record->xl_info & ~XLR_INFO_MASK; - if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) + if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT) + { + xl_xact_commit_compact *recordXactCommitData; + + recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record); + recordXtime = recordXactCommitData->xact_time; + } + else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) { xl_xact_commit *recordXactCommitData; @@ -5680,7 +5687,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) recoveryStopTime = recordXtime; recoveryStopAfter = *includeThis; - if (record_info == XLOG_XACT_COMMIT) + if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT) { if (recoveryStopAfter) ereport(LOG, diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cb440d41f1..c585752b00 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -106,6 +106,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 +#define XLOG_XACT_COMMIT_COMPACT 0x60 typedef struct xl_xact_assignment { @@ -116,6 +117,16 @@ typedef struct xl_xact_assignment #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) +typedef struct xl_xact_commit_compact +{ + TimestampTz xact_time; /* time of commit */ + int nsubxacts; /* number of subtransaction XIDs */ + /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ + TransactionId subxacts[1]; /* VARIABLE LENGTH ARRAY */ +} xl_xact_commit_compact; + +#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts) + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -145,8 +156,8 @@ typedef struct xl_xact_commit #define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 /* Access macros for above flags */ -#define XactCompletionRelcacheInitFileInval(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) -#define XactCompletionForceSyncCommit(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) +#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) +#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) typedef struct xl_xact_abort { diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 34316fffeb..4eaa243948 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -71,7 +71,7 @@ typedef struct XLogContRecord /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD067 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD068 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData {