Store 2PC GID in commit/abort WAL recs for logical decoding

Store GID of 2PC in commit/abort WAL records when wal_level = logical.
This allows logical decoding to send the SAME gid to subscribers
across restarts of logical replication.

Track replica origin replay progress for 2PC.

(Edited from patch 0003 in the logical decoding 2PC series.)

Authors: Nikhil Sontakke, Stas Kelvich
Reviewed-by: Simon Riggs, Andres Freund
This commit is contained in:
Simon Riggs 2018-03-28 17:42:50 +01:00
parent 75e95dd79b
commit 1eb6d6527a
5 changed files with 230 additions and 24 deletions

View File

@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
if (parsed->xinfo & XACT_XINFO_HAS_GID)
{
int gidlen;
strcpy(parsed->twophase_gid, data);
gidlen = strlen(parsed->twophase_gid) + 1;
data += MAXALIGN(gidlen);
}
}
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
data += sizeof(xl_xact_xinfo);
}
if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
{
xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
parsed->dbId = xl_dbinfo->dbId;
parsed->tsId = xl_dbinfo->tsId;
data += sizeof(xl_xact_dbinfo);
}
if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
{
xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
if (parsed->xinfo & XACT_XINFO_HAS_GID)
{
int gidlen;
strcpy(parsed->twophase_gid, data);
gidlen = strlen(parsed->twophase_gid) + 1;
data += MAXALIGN(gidlen);
}
}
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
xl_xact_origin xl_origin;
/* we're only guaranteed 4 byte alignment, so copy onto stack */
memcpy(&xl_origin, data, sizeof(xl_origin));
parsed->origin_lsn = xl_origin.origin_lsn;
parsed->origin_timestamp = xl_origin.origin_timestamp;
data += sizeof(xl_xact_origin);
}
}

View File

@ -144,11 +144,7 @@ int max_prepared_xacts = 0;
*
* typedef struct GlobalTransactionData *GlobalTransaction appears in
* twophase.h
*
* Note that the max value of GIDSIZE must fit in the uint16 gidlen,
* specified in TwoPhaseFileHeader.
*/
#define GIDSIZE 200
typedef struct GlobalTransactionData
{
@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
bool initfileinval);
bool initfileinval,
const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileNode *rels);
RelFileNode *rels,
const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
/*
* Header for a 2PC state file
*/
#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
XLogRecPtr origin_lsn; /* lsn of this record at origin node */
TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
/*
@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
{
TwoPhaseFileHeader *hdr;
StateFileChunk *record;
bool replorigin;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
replorigin_session_origin != DoNotReplicateId);
if (replorigin)
{
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
hdr->origin_lsn = replorigin_session_origin_lsn;
hdr->origin_timestamp = replorigin_session_origin_timestamp;
}
else
{
hdr->origin_lsn = InvalidXLogRecPtr;
hdr->origin_timestamp = 0;
}
/*
* If the data size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
gxact->prepare_end_lsn);
XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
/*
 * ParsePrepareRecord
 *
 * Break the payload of a 2PC PREPARE record into its components.
 *
 * 'xlrec' points at a TwoPhaseFileHeader.  The variable-length sections are
 * read from the MAXALIGN'd offset after the header, in this order: the GID,
 * the subtransaction XIDs, the commit-time relfilenodes, the abort-time
 * relfilenodes and the shared invalidation messages.  Each section is
 * MAXALIGN-padded.
 *
 * 'info' is accepted for signature symmetry with the other record parsers
 * but is not examined here.
 *
 * On return, 'parsed' holds scalar values copied from the header plus
 * pointers that refer directly into 'xlrec'; nothing is allocated, so the
 * result is only usable while the record buffer is.  Fields of 'parsed'
 * that this function does not fill in are zeroed.
 */
void
ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
{
	TwoPhaseFileHeader *hdr;
	char	   *bufptr;
	size_t		gidcopy;

	hdr = (TwoPhaseFileHeader *) xlrec;
	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));

	/* Don't hand back indeterminate values in fields we never assign. */
	memset(parsed, 0, sizeof(*parsed));

	parsed->origin_lsn = hdr->origin_lsn;
	parsed->origin_timestamp = hdr->origin_timestamp;
	parsed->twophase_xid = hdr->xid;
	parsed->dbId = hdr->database;
	parsed->nsubxacts = hdr->nsubxacts;
	parsed->nrels = hdr->ncommitrels;
	parsed->nabortrels = hdr->nabortrels;
	parsed->nmsgs = hdr->ninvalmsgs;

	/*
	 * gidlen comes straight from the record, so never let it drive a copy
	 * past the fixed-size destination, and always leave the result
	 * NUL-terminated (strncpy alone does not guarantee that).
	 */
	gidcopy = hdr->gidlen;
	if (gidcopy > sizeof(parsed->twophase_gid) - 1)
		gidcopy = sizeof(parsed->twophase_gid) - 1;
	strncpy(parsed->twophase_gid, bufptr, gidcopy);
	parsed->twophase_gid[gidcopy] = '\0';

	/* Skip by the on-disk length, not the (possibly clamped) copy length. */
	bufptr += MAXALIGN(hdr->gidlen);

	parsed->subxacts = (TransactionId *) bufptr;
	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));

	parsed->xnodes = (RelFileNode *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));

	parsed->abortnodes = (RelFileNode *) bufptr;
	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));

	parsed->msgs = (SharedInvalidationMessage *) bufptr;
	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
}
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
hdr->initfileinval);
hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
hdr->nabortrels, abortrels,
gid);
ProcArrayRemove(proc, latestXid);
@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
if (buf == NULL)
continue;
PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
PrepareRedoAdd(buf, InvalidXLogRecPtr,
InvalidXLogRecPtr, InvalidRepOriginId);
}
}
LWLockRelease(TwoPhaseStateLock);
@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
bool initfileinval,
const char *gid)
{
XLogRecPtr recptr;
TimestampTz committs = GetCurrentTimestamp();
@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
ninvalmsgs, invalmsgs,
initfileinval, false,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
xid);
xid, gid);
if (replorigin)
@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
RelFileNode *rels)
RelFileNode *rels,
const char *gid)
{
XLogRecPtr recptr;
@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
nchildren, children,
nrels, rels,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
xid);
xid, gid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
* data, the entry is marked as located on disk.
*/
void
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
XLogRecPtr end_lsn, RepOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
if (origin_id != InvalidRepOriginId)
{
/* recover apply progress */
replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
false /* backward */ , false /* WAL */ );
}
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
}

View File

@ -1226,7 +1226,7 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
InvalidTransactionId /* plain commit */ );
InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact)
XactLogAbortRecord(xact_time,
nchildren, children,
nrels, rels,
MyXactFlags, InvalidTransactionId);
MyXactFlags, InvalidTransactionId,
NULL);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@ -5234,7 +5235,8 @@ XactLogCommitRecord(TimestampTz commit_time,
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
int xactflags, TransactionId twophase_xid)
int xactflags, TransactionId twophase_xid,
const char *twophase_gid)
{
xl_xact_commit xlrec;
xl_xact_xinfo xl_xinfo;
@ -5246,6 +5248,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_origin xl_origin;
uint8 info;
int gidlen = 0;
Assert(CritSectionCount > 0);
@ -5308,6 +5311,13 @@ XactLogCommitRecord(TimestampTz commit_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
Assert(twophase_gid != NULL);
if (XLogLogicalInfoActive())
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
gidlen = strlen(twophase_gid) + 1; /* include '\0' */
}
}
/* dump transaction origin information */
@ -5358,7 +5368,16 @@ XactLogCommitRecord(TimestampTz commit_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
{
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
{
static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
XLogRegisterData((char*) twophase_gid, gidlen);
if (MAXALIGN(gidlen) != gidlen)
XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
}
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@ -5379,15 +5398,19 @@ XLogRecPtr
XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
int xactflags, TransactionId twophase_xid)
int xactflags, TransactionId twophase_xid,
const char *twophase_gid)
{
xl_xact_abort xlrec;
xl_xact_xinfo xl_xinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_twophase xl_twophase;
xl_xact_dbinfo xl_dbinfo;
xl_xact_origin xl_origin;
uint8 info;
int gidlen = 0;
Assert(CritSectionCount > 0);
@ -5423,6 +5446,31 @@ XactLogAbortRecord(TimestampTz abort_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
Assert(twophase_gid != NULL);
if (XLogLogicalInfoActive())
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
gidlen = strlen(twophase_gid) + 1; /* include '\0' */
}
}
if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
xl_dbinfo.dbId = MyDatabaseId;
xl_dbinfo.tsId = MyDatabaseTableSpace;
}
/* dump transaction origin information only for abort prepared */
if ( (replorigin_session_origin != InvalidRepOriginId) &&
TransactionIdIsValid(twophase_xid) &&
XLogLogicalInfoActive())
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
xl_origin.origin_lsn = replorigin_session_origin_lsn;
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
@ -5437,6 +5485,10 @@ XactLogAbortRecord(TimestampTz abort_time,
if (xl_xinfo.xinfo != 0)
XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
{
XLogRegisterData((char *) (&xl_subxacts),
@ -5454,7 +5506,22 @@ XactLogAbortRecord(TimestampTz abort_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
{
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
{
static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
XLogRegisterData((char*) twophase_gid, gidlen);
if (MAXALIGN(gidlen) != gidlen)
XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
}
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
if (TransactionIdIsValid(twophase_xid))
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_XACT_ID, info);
}
@ -5777,7 +5844,8 @@ xact_redo(XLogReaderState *record)
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoAdd(XLogRecGetData(record),
record->ReadRecPtr,
record->EndRecPtr);
record->EndRecPtr,
XLogRecGetOrigin(record));
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ASSIGNMENT)

View File

@ -15,6 +15,7 @@
#define TWOPHASE_H
#include "access/xlogdefs.h"
#include "access/xact.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
int *nxids_p);
extern void ParsePrepareRecord(uint8 info, char *xlrec,
xl_xact_parsed_prepare *parsed);
extern void StandbyRecoverPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
@ -54,7 +57,7 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
XLogRecPtr end_lsn);
XLogRecPtr end_lsn, RepOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
#endif /* TWOPHASE_H */

View File

@ -21,6 +21,13 @@
#include "storage/sinval.h"
#include "utils/datetime.h"
/*
* Maximum size of Global Transaction ID (including '\0').
*
* Note that the max value of GIDSIZE must fit in the uint16 gidlen,
* specified in TwoPhaseFileHeader.
*/
#define GIDSIZE 200
/*
* Xact isolation levels
@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
#define XACT_XINFO_HAS_AE_LOCKS (1U << 6)
#define XACT_XINFO_HAS_GID (1U << 7)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
@ -286,7 +294,6 @@ typedef struct xl_xact_abort
typedef struct xl_xact_parsed_commit
{
TimestampTz xact_time;
uint32 xinfo;
Oid dbId; /* MyDatabaseId */
@ -302,16 +309,24 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
char twophase_gid[GIDSIZE]; /* only for 2PC */
int nabortrels; /* only for 2PC */
RelFileNode *abortnodes; /* only for 2PC */
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
typedef struct xl_xact_parsed_abort
{
TimestampTz xact_time;
uint32 xinfo;
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
int nsubxacts;
TransactionId *subxacts;
@ -319,6 +334,10 @@ typedef struct xl_xact_parsed_abort
RelFileNode *xnodes;
TransactionId twophase_xid; /* only for 2PC */
char twophase_gid[GIDSIZE]; /* only for 2PC */
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_abort;
@ -386,12 +405,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
int xactflags,
TransactionId twophase_xid);
TransactionId twophase_xid,
const char *twophase_gid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
int xactflags, TransactionId twophase_xid);
int xactflags, TransactionId twophase_xid,
const char *twophase_gid);
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */