diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2251b022da..8a22836406 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -25,11 +25,28 @@ * what keeps the XID considered running by TransactionIdIsInProgress. * It is also convenient as a PGPROC to hook the gxact's locks to. * - * In order to survive crashes and shutdowns, all prepared - * transactions must be stored in permanent storage. This includes - * locking information, pending notifications etc. All that state - * information is written to the per-transaction state file in - * the pg_twophase directory. + * Information to recover prepared transactions in case of crash is + * now stored in WAL for the common case. In some cases there will be + * an extended period between preparing a GXACT and commit/abort, in + * which case we need to separately record prepared transaction data + * in permanent storage. This includes locking information, pending + * notifications etc. All that state information is written to the + * per-transaction state file in the pg_twophase directory. + * All prepared transactions will be written prior to shutdown. + * + * Life track of state data is following: + * + * * On PREPARE TRANSACTION backend writes state data only to the WAL and + * stores pointer to the start of the WAL record in + * gxact->prepare_start_lsn. + * * If COMMIT occurs before checkpoint then backend reads data from WAL + * using prepare_start_lsn. + * * On checkpoint state data copied to files in pg_twophase directory and + * fsynced + * * If COMMIT happens after checkpoint then backend reads state data from + * files + * * In case of crash replay will move data from xlog to files, if that + * hasn't happened before. XXX TODO - move to shmem in replay also * *------------------------------------------------------------------------- */ @@ -51,6 +68,7 @@ #include "access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" +#include "access/xlogreader.h" #include "catalog/pg_type.h" #include "catalog/storage.h" #include "funcapi.h" @@ -117,10 +135,21 @@ typedef struct GlobalTransactionData int pgprocno; /* ID of associated dummy PGPROC */ BackendId dummyBackendId; /* similar to backend id for backends */ TimestampTz prepared_at; /* time of preparation */ - XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */ + + /* + * Note that we need to keep track of two LSNs for each GXACT. + * We keep track of the start LSN because this is the address we must + * use to read state data back from WAL when committing a prepared GXACT. + * We keep track of the end LSN because that is the LSN we need to wait + * for prior to commit. + */ + XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */ + XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ + Oid owner; /* ID of user that executed the xact */ BackendId locking_backend; /* backend currently working on the xact */ bool valid; /* TRUE if PGPROC entry is in proc array */ + bool ondisk; /* TRUE if prepare state file is on disk */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ } GlobalTransactionData; @@ -166,6 +195,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); +static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); /* * Initialization of shared memory @@ -398,11 +428,13 @@ MarkAsPreparing(TransactionId xid, const char *gid, pgxact->nxids = 0; gxact->prepared_at = prepared_at; - /* initialize LSN to 0 (start of WAL) */ - gxact->prepare_lsn = 0; + /* initialize LSN to InvalidXLogRecPtr */ + gxact->prepare_start_lsn = InvalidXLogRecPtr; + gxact->prepare_end_lsn = InvalidXLogRecPtr; gxact->owner = owner; gxact->locking_backend = MyBackendId; gxact->valid = false; + gxact->ondisk = false; strcpy(gxact->gid, gid); /* And insert it into the active array */ @@ -578,41 +610,6 @@ RemoveGXact(GlobalTransaction gxact) elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); } -/* - * TransactionIdIsPrepared - * True iff transaction associated with the identifier is prepared - * for two-phase commit - * - * Note: only gxacts marked "valid" are considered; but notice we do not - * check the locking status. - * - * This is not currently exported, because it is only needed internally. - */ -static bool -TransactionIdIsPrepared(TransactionId xid) -{ - bool result = false; - int i; - - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); - - for (i = 0; i < TwoPhaseState->numPrepXacts; i++) - { - GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - - if (gxact->valid && pgxact->xid == xid) - { - result = true; - break; - } - } - - LWLockRelease(TwoPhaseStateLock); - - return result; -} - /* * Returns an array of all prepared transactions for the user-level * function pg_prepared_xact. @@ -1013,21 +1010,13 @@ StartPrepare(GlobalTransaction gxact) } /* - * Finish preparing state file. - * - * Calculates CRC and writes state file to WAL and in pg_twophase directory. + * Finish preparing state data and writing it to WAL. */ void EndPrepare(GlobalTransaction gxact) { - PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; - TransactionId xid = pgxact->xid; TwoPhaseFileHeader *hdr; - char path[MAXPGPATH]; StateFileChunk *record; - pg_crc32c statefile_crc; - pg_crc32c bogus_crc; - int fd; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1039,8 +1028,9 @@ EndPrepare(GlobalTransaction gxact) hdr->total_len = records.total_len + sizeof(pg_crc32c); /* - * If the file size exceeds MaxAllocSize, we won't be able to read it in - * ReadTwoPhaseFile. Check for that now, rather than fail at commit time. + * If the data size exceeds MaxAllocSize, we won't be able to read it in + * ReadTwoPhaseFile. Check for that now, rather than fail in the case + * where we write data to file and then re-read at commit time. */ if (hdr->total_len > MaxAllocSize) ereport(ERROR, @@ -1048,70 +1038,8 @@ EndPrepare(GlobalTransaction gxact) errmsg("two-phase state file maximum length exceeded"))); /* - * Create the 2PC state file. - */ - TwoPhaseFilePath(path, xid); - - fd = OpenTransientFile(path, - O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, - S_IRUSR | S_IWUSR); - if (fd < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not create two-phase state file \"%s\": %m", - path))); - - /* Write data to file, and calculate CRC as we pass over it */ - INIT_CRC32C(statefile_crc); - - for (record = records.head; record != NULL; record = record->next) - { - COMP_CRC32C(statefile_crc, record->data, record->len); - if ((write(fd, record->data, record->len)) != record->len) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write two-phase state file: %m"))); - } - } - - FIN_CRC32C(statefile_crc); - - /* - * Write a deliberately bogus CRC to the state file; this is just paranoia - * to catch the case where four more bytes will run us out of disk space. - */ - bogus_crc = ~statefile_crc; - - if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write two-phase state file: %m"))); - } - - /* Back up to prepare for rewriting the CRC */ - if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in two-phase state file: %m"))); - } - - /* - * The state file isn't valid yet, because we haven't written the correct - * CRC yet. Before we do that, insert entry in WAL and flush it to disk. - * - * Between the time we have written the WAL entry and the time we write - * out the correct state file CRC, we have an inconsistency: the xact is - * prepared according to WAL but not according to our on-disk state. We - * use a critical section to force a PANIC if we are unable to complete - * the write --- then, WAL replay should repair the inconsistency. The - * odds of a PANIC actually occurring should be very tiny given that we - * were able to write the bogus CRC above. + * Now writing 2PC state data to WAL. We let the WAL's CRC protection + * cover us, so no need to calculate a separate CRC. * * We have to set delayChkpt here, too; otherwise a checkpoint starting * immediately after the WAL record is inserted could complete without @@ -1131,24 +1059,13 @@ EndPrepare(GlobalTransaction gxact) XLogBeginInsert(); for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); - gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); - XLogFlush(gxact->prepare_lsn); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ - /* write correct CRC and close file */ - if ((write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write two-phase state file: %m"))); - } - - if (CloseTransientFile(fd) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close two-phase state file: %m"))); + /* Store record's start location to read that later on Commit */ + gxact->prepare_start_lsn = ProcLastRecPtr; /* * Mark the prepared transaction as valid. As soon as xact.c marks @@ -1186,7 +1103,7 @@ EndPrepare(GlobalTransaction gxact) * Note that at this stage we have marked the prepare, but still show as * running in the procarray (twice!) and continue to hold locks. */ - SyncRepWaitForLSN(gxact->prepare_lsn); + SyncRepWaitForLSN(gxact->prepare_end_lsn); records.tail = records.head = NULL; records.num_chunks = 0; @@ -1315,6 +1232,57 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } + +/* + * Reads 2PC data from xlog. During checkpoint this data will be moved to + * twophase files and ReadTwoPhaseFile should be used instead. + * + * Note clearly that this function accesses WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do. It does not run during + * crash recovery or standby processing. + */ +static void +XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) +{ + XLogRecord *record; + XLogReaderState *xlogreader; + char *errormsg; + + Assert(!RecoveryInProgress()); + + xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor."))); + + record = XLogReadRecord(xlogreader, lsn, &errormsg); + if (record == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read two-phase state from xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (XLogRecGetRmid(xlogreader) != RM_XACT_ID || + (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("expected two-phase state data is not present in xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (len != NULL) + *len = XLogRecGetDataLen(xlogreader); + + *buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader)); + memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader)); + + XLogReaderFree(xlogreader); +} + + /* * Confirms an xid is prepared, during recovery */ @@ -1375,14 +1343,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit) xid = pgxact->xid; /* - * Read and validate the state file + * Read and validate 2PC state data. + * State data will typically be stored in WAL files if the LSN is after the + * last checkpoint record, or moved to disk if for some reason they have + * lived for a long time. */ - buf = ReadTwoPhaseFile(xid, true); - if (buf == NULL) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("two-phase state file for transaction %u is corrupt", - xid))); + if (gxact->ondisk) + buf = ReadTwoPhaseFile(xid, true); + else + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + /* * Disassemble the header area @@ -1482,9 +1452,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit) AtEOXact_PgStat(isCommit); /* - * And now we can clean up our mess. + * And now we can clean up any files we may have left. */ - RemoveTwoPhaseFile(xid, true); + if (gxact->ondisk) + RemoveTwoPhaseFile(xid, true); RemoveGXact(gxact); MyLockedGxact = NULL; @@ -1493,8 +1464,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) } /* - * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile) - * and call the indicated callbacks for each 2PC record. + * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record. */ static void ProcessRecords(char *bufptr, TransactionId xid, @@ -1539,7 +1509,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) } /* - * Recreates a state file. This is used in WAL replay. + * Recreates a state file. This is used in WAL replay and during + * checkpoint creation. * * Note: content and len don't include CRC. */ @@ -1610,97 +1581,71 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) * This is deliberately run as late as possible in the checkpoint sequence, * because GXACTs ordinarily have short lifespans, and so it is quite * possible that GXACTs that were valid at checkpoint start will no longer - * exist if we wait a little bit. + * exist if we wait a little bit. With typical checkpoint settings this + * will be about 3 minutes for an online checkpoint, so as a result we + * we expect that there will be no GXACTs that need to be copied to disk. * - * If a GXACT remains valid across multiple checkpoints, it'll be fsynced - * each time. This is considered unusual enough that we don't bother to - * expend any extra code to avoid the redundant fsyncs. (They should be - * reasonably cheap anyway, since they won't cause I/O.) + * If a GXACT remains valid across multiple checkpoints, it will already + * be on disk so we don't bother to repeat that write. */ void CheckPointTwoPhase(XLogRecPtr redo_horizon) { - TransactionId *xids; - int nxids; - char path[MAXPGPATH]; int i; + int serialized_xacts = 0; - /* - * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab - * it just long enough to make a list of the XIDs that require fsyncing, - * and then do the I/O afterwards. - * - * This approach creates a race condition: someone else could delete a - * GXACT between the time we release TwoPhaseStateLock and the time we try - * to open its state file. We handle this by special-casing ENOENT - * failures: if we see that, we verify that the GXACT is no longer valid, - * and if so ignore the failure. - */ if (max_prepared_xacts <= 0) return; /* nothing to do */ TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START(); - xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId)); - nxids = 0; - + /* + * We are expecting there to be zero GXACTs that need to be + * copied to disk, so we perform all I/O while holding + * TwoPhaseStateLock for simplicity. This prevents any new xacts + * from preparing while this occurs, which shouldn't be a problem + * since the presence of long-lived prepared xacts indicates the + * transaction manager isn't active. + * + * It's also possible to move I/O out of the lock, but on + * every error we should check whether somebody commited our + * transaction in different backend. Let's leave this optimisation + * for future, if somebody will spot that this place cause + * bottleneck. + * + * Note that it isn't possible for there to be a GXACT with + * a prepare_end_lsn set prior to the last checkpoint yet + * is marked invalid, because of the efforts with delayChkpt. + */ LWLockAcquire(TwoPhaseStateLock, LW_SHARED); - for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; if (gxact->valid && - gxact->prepare_lsn <= redo_horizon) - xids[nxids++] = pgxact->xid; - } + !gxact->ondisk && + gxact->prepare_end_lsn <= redo_horizon) + { + char *buf; + int len; + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); + RecreateTwoPhaseFile(pgxact->xid, buf, len); + gxact->ondisk = true; + pfree(buf); + serialized_xacts++; + } + } LWLockRelease(TwoPhaseStateLock); - for (i = 0; i < nxids; i++) - { - TransactionId xid = xids[i]; - int fd; - - TwoPhaseFilePath(path, xid); - - fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0); - if (fd < 0) - { - if (errno == ENOENT) - { - /* OK if gxact is no longer valid */ - if (!TransactionIdIsPrepared(xid)) - continue; - /* Restore errno in case it was changed */ - errno = ENOENT; - } - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open two-phase state file \"%s\": %m", - path))); - } - - if (pg_fsync(fd) != 0) - { - CloseTransientFile(fd); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not fsync two-phase state file \"%s\": %m", - path))); - } - - if (CloseTransientFile(fd) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close two-phase state file \"%s\": %m", - path))); - } - - pfree(xids); - TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE(); + + if (log_checkpoints && serialized_xacts > 0) + ereport(LOG, + (errmsg("%u two-phase state files were written " + "for long-running prepared transactions", + serialized_xacts))); } /* @@ -2029,17 +1974,11 @@ RecoverPreparedTransactions(void) /* * Recreate its GXACT and dummy PGPROC - * - * Note: since we don't have the PREPARE record's WAL location at - * hand, we leave prepare_lsn zeroes. This means the GXACT will - * be fsync'd on every future checkpoint. We assume this - * situation is infrequent enough that the performance cost is - * negligible (especially since we know the state file has already - * been fsynced). */ gxact = MarkAsPreparing(xid, hdr->gid, hdr->prepared_at, hdr->owner, hdr->database); + gxact->ondisk = true; GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7d5d493cdc..a2846c41b5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -321,8 +321,7 @@ static TimeLineID curFileTLI; * stored here. The parallel leader advances its own copy, when necessary, * in WaitForParallelWorkersToFinish. */ -static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; - +XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 3de337a207..ecd30ce3ce 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -86,6 +86,7 @@ typedef enum RECOVERY_TARGET_IMMEDIATE } RecoveryTargetType; +extern XLogRecPtr ProcLastRecPtr; extern XLogRecPtr XactLastRecEnd; extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;