Speedup 2PC by skipping two phase state files in normal path

2PC state info is written only to WAL at PREPARE, then read back from WAL at
COMMIT PREPARED/ABORT PREPARED. Prepared transactions that live past one bufmgr
checkpoint cycle will be written to disk in the same form as previously. Crash
recovery path is not altered. Measured performance gains of 50-100% for short
2PC transactions by completely avoiding writing files and fsyncing. Other
optimizations still available, further patches in related areas expected.

Stas Kelvich and heavily edited by Simon Riggs

Based upon earlier ideas and patches by Michael Paquier and Heikki Linnakangas,
a concrete example of how Postgres-XC has fed back ideas into PostgreSQL.

Reviewed by Michael Paquier, Jeff Janes and Andres Freund
Performance testing by Jesper Pedersen
This commit is contained in:
Simon Riggs 2016-01-20 18:40:44 -08:00
parent d0f2f53cd6
commit 978b2f65aa
3 changed files with 162 additions and 223 deletions

View File

@ -25,11 +25,28 @@
* what keeps the XID considered running by TransactionIdIsInProgress.
* It is also convenient as a PGPROC to hook the gxact's locks to.
*
* In order to survive crashes and shutdowns, all prepared
* transactions must be stored in permanent storage. This includes
* locking information, pending notifications etc. All that state
* information is written to the per-transaction state file in
* the pg_twophase directory.
* Information to recover prepared transactions in case of crash is
* now stored in WAL for the common case. In some cases there will be
* an extended period between preparing a GXACT and commit/abort, in
* which case we need to separately record prepared transaction data
* in permanent storage. This includes locking information, pending
* notifications etc. All that state information is written to the
* per-transaction state file in the pg_twophase directory.
* All prepared transactions will be written prior to shutdown.
*
* The life cycle of state data is as follows:
*
* * On PREPARE TRANSACTION the backend writes state data only to the WAL
* and stores a pointer to the start of the WAL record in
* gxact->prepare_start_lsn.
* * If COMMIT occurs before a checkpoint, the backend reads the data from
* WAL using prepare_start_lsn.
* * On checkpoint, state data is copied to files in the pg_twophase
* directory and fsynced.
* * If COMMIT happens after a checkpoint, the backend reads state data from
* the files.
* * In case of a crash, replay will move data from xlog to files, if that
* hasn't happened before. XXX TODO - move to shmem in replay also
*
*-------------------------------------------------------------------------
*/
@ -51,6 +68,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "access/xlogreader.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
#include "funcapi.h"
@ -117,10 +135,21 @@ typedef struct GlobalTransactionData
int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
/*
* Note that we need to keep track of two LSNs for each GXACT.
* We keep track of the start LSN because this is the address we must
* use to read state data back from WAL when committing a prepared GXACT.
* We keep track of the end LSN because that is the LSN we need to wait
* for prior to commit.
*/
XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
Oid owner; /* ID of user that executed the xact */
BackendId locking_backend; /* backend currently working on the xact */
bool valid; /* TRUE if PGPROC entry is in proc array */
bool ondisk; /* TRUE if prepare state file is on disk */
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
} GlobalTransactionData;
@ -166,6 +195,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
/*
* Initialization of shared memory
@ -398,11 +428,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
/* initialize LSN to 0 (start of WAL) */
gxact->prepare_lsn = 0;
/* initialize LSN to InvalidXLogRecPtr */
gxact->prepare_start_lsn = InvalidXLogRecPtr;
gxact->prepare_end_lsn = InvalidXLogRecPtr;
gxact->owner = owner;
gxact->locking_backend = MyBackendId;
gxact->valid = false;
gxact->ondisk = false;
strcpy(gxact->gid, gid);
/* And insert it into the active array */
@ -578,41 +610,6 @@ RemoveGXact(GlobalTransaction gxact)
elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
}
/*
 * TransactionIdIsPrepared
 *		Report whether the given XID belongs to a transaction that is
 *		currently prepared for two-phase commit.
 *
 * Only gxacts flagged "valid" are taken into account; the locking status
 * of a gxact is deliberately not examined.
 *
 * Kept file-local, since nothing outside this module needs it at present.
 */
static bool
TransactionIdIsPrepared(TransactionId xid)
{
	bool		found = false;
	int			idx = 0;

	/* The array is only read here, so the lock is taken in shared mode. */
	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);

	while (!found && idx < TwoPhaseState->numPrepXacts)
	{
		GlobalTransaction candidate = TwoPhaseState->prepXacts[idx++];

		/* Entries not yet (or no longer) valid never count as prepared. */
		if (!candidate->valid)
			continue;

		found = (ProcGlobal->allPgXact[candidate->pgprocno].xid == xid);
	}

	LWLockRelease(TwoPhaseStateLock);

	return found;
}
/*
* Returns an array of all prepared transactions for the user-level
* function pg_prepared_xact.
@ -1013,21 +1010,13 @@ StartPrepare(GlobalTransaction gxact)
}
/*
* Finish preparing state file.
*
* Calculates CRC and writes state file to WAL and in pg_twophase directory.
* Finish preparing state data and writing it to WAL.
*/
void
EndPrepare(GlobalTransaction gxact)
{
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
StateFileChunk *record;
pg_crc32c statefile_crc;
pg_crc32c bogus_crc;
int fd;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@ -1039,8 +1028,9 @@ EndPrepare(GlobalTransaction gxact)
hdr->total_len = records.total_len + sizeof(pg_crc32c);
/*
* If the file size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
* If the data size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
* where we write data to file and then re-read at commit time.
*/
if (hdr->total_len > MaxAllocSize)
ereport(ERROR,
@ -1048,70 +1038,8 @@ EndPrepare(GlobalTransaction gxact)
errmsg("two-phase state file maximum length exceeded")));
/*
* Create the 2PC state file.
*/
TwoPhaseFilePath(path, xid);
fd = OpenTransientFile(path,
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
S_IRUSR | S_IWUSR);
if (fd < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not create two-phase state file \"%s\": %m",
path)));
/* Write data to file, and calculate CRC as we pass over it */
INIT_CRC32C(statefile_crc);
for (record = records.head; record != NULL; record = record->next)
{
COMP_CRC32C(statefile_crc, record->data, record->len);
if ((write(fd, record->data, record->len)) != record->len)
{
CloseTransientFile(fd);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write two-phase state file: %m")));
}
}
FIN_CRC32C(statefile_crc);
/*
* Write a deliberately bogus CRC to the state file; this is just paranoia
* to catch the case where four more bytes will run us out of disk space.
*/
bogus_crc = ~statefile_crc;
if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
{
CloseTransientFile(fd);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write two-phase state file: %m")));
}
/* Back up to prepare for rewriting the CRC */
if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
{
CloseTransientFile(fd);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in two-phase state file: %m")));
}
/*
* The state file isn't valid yet, because we haven't written the correct
* CRC yet. Before we do that, insert entry in WAL and flush it to disk.
*
* Between the time we have written the WAL entry and the time we write
* out the correct state file CRC, we have an inconsistency: the xact is
* prepared according to WAL but not according to our on-disk state. We
* use a critical section to force a PANIC if we are unable to complete
* the write --- then, WAL replay should repair the inconsistency. The
* odds of a PANIC actually occurring should be very tiny given that we
* were able to write the bogus CRC above.
* Now writing 2PC state data to WAL. We let the WAL's CRC protection
* cover us, so no need to calculate a separate CRC.
*
* We have to set delayChkpt here, too; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@ -1131,24 +1059,13 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
XLogFlush(gxact->prepare_lsn);
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
/* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
{
CloseTransientFile(fd);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write two-phase state file: %m")));
}
if (CloseTransientFile(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close two-phase state file: %m")));
/* Store record's start location to read that later on Commit */
gxact->prepare_start_lsn = ProcLastRecPtr;
/*
* Mark the prepared transaction as valid. As soon as xact.c marks
@ -1186,7 +1103,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks.
*/
SyncRepWaitForLSN(gxact->prepare_lsn);
SyncRepWaitForLSN(gxact->prepare_end_lsn);
records.tail = records.head = NULL;
records.num_chunks = 0;
@ -1315,6 +1232,57 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
/*
 * XlogReadTwoPhaseData
 *		Fetch the 2PC state data of a prepared transaction from xlog.
 *
 * Reads the WAL record that starts at 'lsn' and verifies that it is an
 * XLOG_XACT_PREPARE record of the RM_XACT_ID resource manager.  A palloc'd
 * copy of the record's data is returned in *buf; if 'len' is not NULL, the
 * length of that data is stored in *len.  All failures are reported with
 * ereport(ERROR).
 *
 * During checkpoint this data will be moved to twophase files and
 * ReadTwoPhaseFile should be used instead.
 *
 * Note clearly that this function accesses WAL during normal operation,
 * similarly to the way WALSender or Logical Decoding would do.  It does not
 * run during crash recovery or standby processing.
 */
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
{
	XLogRecord *record;
	XLogReaderState *xlogreader;
	char	   *errormsg = NULL;
	uint32		datalen;

	Assert(!RecoveryInProgress());

	xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
	if (!xlogreader)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory"),
				 errdetail("Failed while allocating an XLog reading processor.")));

	record = XLogReadRecord(xlogreader, lsn, &errormsg);
	if (record == NULL)
	{
		/*
		 * Pass along the reader's own explanation, when it supplied one, so
		 * that the failure can actually be diagnosed.
		 */
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read two-phase state from xlog at %X/%X",
						(uint32) (lsn >> 32),
						(uint32) lsn),
				 errormsg ? errdetail_internal("%s", errormsg) : 0));
	}

	/* Make sure that what we found really is a PREPARE record. */
	if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
		(XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("expected two-phase state data is not present in xlog at %X/%X",
						(uint32) (lsn >> 32),
						(uint32) lsn)));

	/* Look the length up once and use it for both the report and the copy. */
	datalen = XLogRecGetDataLen(xlogreader);

	if (len != NULL)
		*len = datalen;

	/*
	 * Copy the data out before freeing the reader; the copy is what the
	 * caller receives.
	 */
	*buf = palloc(datalen);
	memcpy(*buf, XLogRecGetData(xlogreader), datalen);

	XLogReaderFree(xlogreader);
}
/*
* Confirms an xid is prepared, during recovery
*/
@ -1375,14 +1343,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact->xid;
/*
* Read and validate the state file
* Read and validate 2PC state data.
* State data will typically be stored in WAL files if the LSN is after the
* last checkpoint record, or moved to disk if for some reason they have
* lived for a long time.
*/
buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("two-phase state file for transaction %u is corrupt",
xid)));
if (gxact->ondisk)
buf = ReadTwoPhaseFile(xid, true);
else
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
/*
* Disassemble the header area
@ -1482,9 +1452,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat(isCommit);
/*
* And now we can clean up our mess.
* And now we can clean up any files we may have left.
*/
RemoveTwoPhaseFile(xid, true);
if (gxact->ondisk)
RemoveTwoPhaseFile(xid, true);
RemoveGXact(gxact);
MyLockedGxact = NULL;
@ -1493,8 +1464,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
}
/*
* Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
* and call the indicated callbacks for each 2PC record.
* Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
*/
static void
ProcessRecords(char *bufptr, TransactionId xid,
@ -1539,7 +1509,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
* Recreates a state file. This is used in WAL replay.
* Recreates a state file. This is used in WAL replay and during
* checkpoint creation.
*
* Note: content and len don't include CRC.
*/
@ -1610,97 +1581,71 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
* This is deliberately run as late as possible in the checkpoint sequence,
* because GXACTs ordinarily have short lifespans, and so it is quite
* possible that GXACTs that were valid at checkpoint start will no longer
* exist if we wait a little bit.
* exist if we wait a little bit. With typical checkpoint settings this
* will be about 3 minutes for an online checkpoint, so as a result we
* expect that there will be no GXACTs that need to be copied to disk.
*
* If a GXACT remains valid across multiple checkpoints, it'll be fsynced
* each time. This is considered unusual enough that we don't bother to
* expend any extra code to avoid the redundant fsyncs. (They should be
* reasonably cheap anyway, since they won't cause I/O.)
* If a GXACT remains valid across multiple checkpoints, it will already
* be on disk so we don't bother to repeat that write.
*/
void
CheckPointTwoPhase(XLogRecPtr redo_horizon)
{
TransactionId *xids;
int nxids;
char path[MAXPGPATH];
int i;
int serialized_xacts = 0;
/*
* We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
* it just long enough to make a list of the XIDs that require fsyncing,
* and then do the I/O afterwards.
*
* This approach creates a race condition: someone else could delete a
* GXACT between the time we release TwoPhaseStateLock and the time we try
* to open its state file. We handle this by special-casing ENOENT
* failures: if we see that, we verify that the GXACT is no longer valid,
* and if so ignore the failure.
*/
if (max_prepared_xacts <= 0)
return; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
nxids = 0;
/*
* We are expecting there to be zero GXACTs that need to be
* copied to disk, so we perform all I/O while holding
* TwoPhaseStateLock for simplicity. This prevents any new xacts
* from preparing while this occurs, which shouldn't be a problem
* since the presence of long-lived prepared xacts indicates the
* transaction manager isn't active.
*
* It's also possible to move I/O out of the lock, but then on
* every error we would have to check whether somebody committed our
* transaction in a different backend. Let's leave this optimisation
* for the future, in case somebody finds that this place causes a
* bottleneck.
*
* Note that it isn't possible for there to be a GXACT with
* a prepare_end_lsn set prior to the last checkpoint yet
* is marked invalid, because of the efforts with delayChkpt.
*/
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
if (gxact->valid &&
gxact->prepare_lsn <= redo_horizon)
xids[nxids++] = pgxact->xid;
}
!gxact->ondisk &&
gxact->prepare_end_lsn <= redo_horizon)
{
char *buf;
int len;
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
RecreateTwoPhaseFile(pgxact->xid, buf, len);
gxact->ondisk = true;
pfree(buf);
serialized_xacts++;
}
}
LWLockRelease(TwoPhaseStateLock);
for (i = 0; i < nxids; i++)
{
TransactionId xid = xids[i];
int fd;
TwoPhaseFilePath(path, xid);
fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
if (fd < 0)
{
if (errno == ENOENT)
{
/* OK if gxact is no longer valid */
if (!TransactionIdIsPrepared(xid))
continue;
/* Restore errno in case it was changed */
errno = ENOENT;
}
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open two-phase state file \"%s\": %m",
path)));
}
if (pg_fsync(fd) != 0)
{
CloseTransientFile(fd);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not fsync two-phase state file \"%s\": %m",
path)));
}
if (CloseTransientFile(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close two-phase state file \"%s\": %m",
path)));
}
pfree(xids);
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
if (log_checkpoints && serialized_xacts > 0)
ereport(LOG,
(errmsg("%u two-phase state files were written "
"for long-running prepared transactions",
serialized_xacts)));
}
/*
@ -2029,17 +1974,11 @@ RecoverPreparedTransactions(void)
/*
* Recreate its GXACT and dummy PGPROC
*
* Note: since we don't have the PREPARE record's WAL location at
* hand, we leave prepare_lsn zeroes. This means the GXACT will
* be fsync'd on every future checkpoint. We assume this
* situation is infrequent enough that the performance cost is
* negligible (especially since we know the state file has already
* been fsynced).
*/
gxact = MarkAsPreparing(xid, hdr->gid,
hdr->prepared_at,
hdr->owner, hdr->database);
gxact->ondisk = true;
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
MarkAsPrepared(gxact);

View File

@ -321,8 +321,7 @@ static TimeLineID curFileTLI;
* stored here. The parallel leader advances its own copy, when necessary,
* in WaitForParallelWorkersToFinish.
*/
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;

View File

@ -86,6 +86,7 @@ typedef enum
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
extern XLogRecPtr ProcLastRecPtr;
extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;