diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 80d2d20d6c..6023e7c16f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2276,6 +2276,14 @@ RecordTransactionAbortPrepared(TransactionId xid, const char *gid) { XLogRecPtr recptr; + bool replorigin; + + /* + * Are we using the replication origins feature? Or, in other words, are + * we replaying remote actions? + */ + replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId); /* * Catch the scenario where we aborted partway through @@ -2298,6 +2306,11 @@ RecordTransactionAbortPrepared(TransactionId xid, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, xid, gid); + if (replorigin) + /* Move LSNs forward for this replication origin */ + replorigin_session_advance(replorigin_session_origin_lsn, + XactLastRecEnd); + /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 4e6a3df6b8..c83aa16f2c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5714,10 +5714,12 @@ XactLogAbortRecord(TimestampTz abort_time, xl_dbinfo.tsId = MyDatabaseTableSpace; } - /* dump transaction origin information only for abort prepared */ + /* + * Dump transaction origin information only for abort prepared. We need + * this during recovery to update the replication origin progress. + */ if ((replorigin_session_origin != InvalidRepOriginId) && - TransactionIdIsValid(twophase_xid) && - XLogLogicalInfoActive()) + TransactionIdIsValid(twophase_xid)) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; @@ -5923,7 +5925,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, * because subtransaction commit is never WAL logged. */ static void -xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) +xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, + XLogRecPtr lsn, RepOriginId origin_id) { TransactionId max_xid; @@ -5972,6 +5975,13 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts); } + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + /* recover apply progress */ + replorigin_advance(origin_id, parsed->origin_lsn, lsn, + false /* backward */ , false /* WAL */ ); + } + /* Make sure files supposed to be dropped are dropped */ DropRelationFiles(parsed->xnodes, parsed->nrels, true); } @@ -6013,7 +6023,8 @@ xact_redo(XLogReaderState *record) xl_xact_parsed_abort parsed; ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); - xact_redo_abort(&parsed, XLogRecGetXid(record)); + xact_redo_abort(&parsed, XLogRecGetXid(record), + record->EndRecPtr, XLogRecGetOrigin(record)); } else if (info == XLOG_XACT_ABORT_PREPARED) { @@ -6021,7 +6032,8 @@ xact_redo(XLogReaderState *record) xl_xact_parsed_abort parsed; ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); - xact_redo_abort(&parsed, parsed.twophase_xid); + xact_redo_abort(&parsed, parsed.twophase_xid, + record->EndRecPtr, XLogRecGetOrigin(record)); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);