diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 7982e16abc..6e6678cfff 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -222,7 +222,7 @@ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); static char *ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool overwriteOK, bool setParent, - TransactionId *result, TransactionId *maxsubxid); + bool setNextXid); static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); @@ -1744,7 +1744,7 @@ restoreTwoPhaseData(void) buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr, true, false, false, - NULL, NULL); + false); if (buf == NULL) continue; @@ -1786,7 +1786,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) { TransactionId origNextXid = ShmemVariableCache->nextXid; TransactionId result = origNextXid; - TransactionId maxsubxid = origNextXid; TransactionId *xids = NULL; int nxids = 0; int allocsize = 0; @@ -1806,11 +1805,18 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, false, false, - &result, &maxsubxid); + true); if (buf == NULL) continue; + /* + * OK, we think this file is valid. Incorporate xid into the + * running-minimum result. + */ + if (TransactionIdPrecedes(xid, result)) + result = xid; + if (xids_p) { if (nxids == allocsize) @@ -1839,15 +1845,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) *nxids_p = nxids; } - /* update nextXid if needed */ - if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid)) - { - LWLockAcquire(XidGenLock, LW_EXCLUSIVE); - ShmemVariableCache->nextXid = maxsubxid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - LWLockRelease(XidGenLock); - } - return result; } @@ -1884,7 +1881,7 @@ StandbyRecoverPreparedTransactions(bool overwriteOK) buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, overwriteOK, true, - NULL, NULL); + false); if (buf != NULL) pfree(buf); } @@ -1924,7 +1921,7 @@ RecoverPreparedTransactions(void) buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, gxact->ondisk, false, false, - NULL, NULL); + false); if (buf == NULL) continue; @@ -2012,20 +2009,16 @@ RecoverPreparedTransactions(void) * If setParent is true, then use the overwriteOK parameter to set up * subtransaction parent linkages. * - * If result and maxsubxid are not NULL, fill them up with smallest - * running transaction id (lesser than ShmemVariableCache->nextXid) - * and largest subtransaction id for this transaction respectively. + * If setNextXid is true, set ShmemVariableCache->nextXid to the newest + * value scanned. */ static char * ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool overwriteOK, - bool setParent, TransactionId *result, - TransactionId *maxsubxid) + bool setParent, bool setNextXid) { TransactionId origNextXid = ShmemVariableCache->nextXid; - TransactionId res = InvalidTransactionId; - TransactionId maxsub = InvalidTransactionId; TransactionId *subxids; char *buf; TwoPhaseFileHeader *hdr; @@ -2034,11 +2027,6 @@ ProcessTwoPhaseBuffer(TransactionId xid, if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); - if (result) - res = *result; - if (maxsubxid) - maxsub = *maxsubxid; - /* Already processed? */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { @@ -2120,13 +2108,6 @@ ProcessTwoPhaseBuffer(TransactionId xid, return NULL; } - /* - * OK, we think this buffer is valid. Incorporate xid into the - * running-minimum result. - */ - if (TransactionIdPrecedes(xid, res)) - res = xid; - /* * Examine subtransaction XIDs ... they should all follow main * XID, and they may force us to advance nextXid. @@ -2139,17 +2120,31 @@ ProcessTwoPhaseBuffer(TransactionId xid, TransactionId subxid = subxids[i]; Assert(TransactionIdFollows(subxid, xid)); - if (TransactionIdFollowsOrEquals(subxid, maxsub)) - maxsub = subxid; + + /* update nextXid if needed */ + if (setNextXid && + TransactionIdFollowsOrEquals(subxid, + ShmemVariableCache->nextXid)) + { + /* + * We don't expect anyone else to modify nextXid, hence we don't + * need to hold a lock while examining it. We still acquire the + * lock to modify it, though, so we recheck. + */ + LWLockAcquire(XidGenLock, LW_EXCLUSIVE); + if (TransactionIdFollowsOrEquals(subxid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = subxid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } + LWLockRelease(XidGenLock); + } + if (setParent) SubTransSetParent(xid, subxid, overwriteOK); } - if (result) - *result = res; - if (maxsubxid) - *maxsubxid = maxsub; - return buf; }