Use FullTransactionId for the transaction stack.

Provide GetTopFullTransactionId() and GetCurrentFullTransactionId().
The intended users of these interfaces are access methods that use
xids for visibility checks but don't want to have to go back and
"freeze" existing references some time later before the 32 bit xid
counter wraps around.

Use a new struct to serialize the transaction state for parallel
query, because FullTransactionId doesn't fit into the previous
serialization scheme very well.

Author: Thomas Munro
Reviewed-by: Heikki Linnakangas
Discussion: https://postgr.es/m/CAA4eK1%2BMv%2Bmb0HFfWM9Srtc6MVe160WFurXV68iAFMcagRZ0dQ%40mail.gmail.com
This commit is contained in:
Thomas Munro 2019-03-28 10:59:19 +13:00
parent 2fc7af5e96
commit ad308058cc
5 changed files with 170 additions and 81 deletions

View File

@ -35,7 +35,8 @@ VariableCache ShmemVariableCache = NULL;
/*
* Allocate the next XID for a new transaction or subtransaction.
* Allocate the next FullTransactionId for a new transaction or
* subtransaction.
*
* The new XID is also stored into MyPgXact before returning.
*
@ -44,9 +45,10 @@ VariableCache ShmemVariableCache = NULL;
* does something. So it is safe to do a database lookup if we want to
* issue a warning about XID wrap.
*/
TransactionId
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
FullTransactionId full_xid;
TransactionId xid;
/*
@ -64,7 +66,7 @@ GetNewTransactionId(bool isSubXact)
{
Assert(!isSubXact);
MyPgXact->xid = BootstrapTransactionId;
return BootstrapTransactionId;
return FullTransactionIdFromEpochAndXid(0, BootstrapTransactionId);
}
/* safety check, we should never get this far in a HS standby */
@ -73,7 +75,8 @@ GetNewTransactionId(bool isSubXact)
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
full_xid = ShmemVariableCache->nextFullXid;
xid = XidFromFullTransactionId(full_xid);
/*----------
* Check to see if it's safe to assign another XID. This protects against
@ -232,7 +235,7 @@ GetNewTransactionId(bool isSubXact)
LWLockRelease(XidGenLock);
return xid;
return full_xid;
}
/*

View File

@ -105,7 +105,7 @@ int synchronous_commit = SYNCHRONOUS_COMMIT_ON;
* The XIDs are stored sorted in numerical order (not logical order) to make
* lookups as fast as possible.
*/
TransactionId XactTopTransactionId = InvalidTransactionId;
FullTransactionId XactTopFullTransactionId = {InvalidTransactionId};
int nParallelCurrentXids = 0;
TransactionId *ParallelCurrentXids;
@ -171,7 +171,7 @@ typedef enum TBlockState
*/
typedef struct TransactionStateData
{
TransactionId transactionId; /* my XID, or Invalid if none */
FullTransactionId fullTransactionId; /* my FullTransactionId */
SubTransactionId subTransactionId; /* my subxact ID */
char *name; /* savepoint name, if any */
int savepointLevel; /* savepoint level */
@ -196,6 +196,25 @@ typedef struct TransactionStateData
typedef TransactionStateData *TransactionState;
/*
* Serialized representation used to transmit transaction state to parallel
* workers though shared memory.
*/
typedef struct SerializedTransactionState
{
int xactIsoLevel;
bool xactDeferrable;
FullTransactionId topFullTransactionId;
FullTransactionId currentFullTransactionId;
CommandId currentCommandId;
int nParallelCurrentXids;
TransactionId parallelCurrentXids[FLEXIBLE_ARRAY_MEMBER];
} SerializedTransactionState;
/* The size of SerializedTransactionState, not including the final array. */
#define SerializedTransactionStateHeaderSize \
offsetof(SerializedTransactionState, parallelCurrentXids)
/*
* CurrentTransactionState always points to the current transaction state
* block. It will point to TopTransactionStateData when not in a
@ -372,9 +391,9 @@ IsAbortedTransactionBlockState(void)
TransactionId
GetTopTransactionId(void)
{
if (!TransactionIdIsValid(XactTopTransactionId))
if (!FullTransactionIdIsValid(XactTopFullTransactionId))
AssignTransactionId(&TopTransactionStateData);
return XactTopTransactionId;
return XidFromFullTransactionId(XactTopFullTransactionId);
}
/*
@ -387,7 +406,7 @@ GetTopTransactionId(void)
TransactionId
GetTopTransactionIdIfAny(void)
{
return XactTopTransactionId;
return XidFromFullTransactionId(XactTopFullTransactionId);
}
/*
@ -402,9 +421,9 @@ GetCurrentTransactionId(void)
{
TransactionState s = CurrentTransactionState;
if (!TransactionIdIsValid(s->transactionId))
if (!FullTransactionIdIsValid(s->fullTransactionId))
AssignTransactionId(s);
return s->transactionId;
return XidFromFullTransactionId(s->fullTransactionId);
}
/*
@ -417,7 +436,66 @@ GetCurrentTransactionId(void)
TransactionId
GetCurrentTransactionIdIfAny(void)
{
return CurrentTransactionState->transactionId;
return XidFromFullTransactionId(CurrentTransactionState->fullTransactionId);
}
/*
* GetTopFullTransactionId
*
* This will return the FullTransactionId of the main transaction, assigning
* one if it's not yet set. Be careful to call this only inside a valid xact.
*/
FullTransactionId
GetTopFullTransactionId(void)
{
if (!FullTransactionIdIsValid(XactTopFullTransactionId))
AssignTransactionId(&TopTransactionStateData);
return XactTopFullTransactionId;
}
/*
* GetTopFullTransactionIdIfAny
*
* This will return the FullTransactionId of the main transaction, if one is
* assigned. It will return InvalidFullTransactionId if we are not currently
* inside a transaction, or inside a transaction that hasn't yet been assigned
* one.
*/
FullTransactionId
GetTopFullTransactionIdIfAny(void)
{
return XactTopFullTransactionId;
}
/*
* GetCurrentFullTransactionId
*
* This will return the FullTransactionId of the current transaction (main or
* sub transaction), assigning one if it's not yet set. Be careful to call
* this only inside a valid xact.
*/
FullTransactionId
GetCurrentFullTransactionId(void)
{
TransactionState s = CurrentTransactionState;
if (!FullTransactionIdIsValid(s->fullTransactionId))
AssignTransactionId(s);
return s->fullTransactionId;
}
/*
* GetCurrentFullTransactionIdIfAny
*
* This will return the FullTransactionId of the current sub xact, if one is
* assigned. It will return InvalidFullTransactionId if we are not currently
* inside a transaction, or inside a transaction that hasn't been assigned one
* yet.
*/
FullTransactionId
GetCurrentFullTransactionIdIfAny(void)
{
return CurrentTransactionState->fullTransactionId;
}
/*
@ -428,7 +506,7 @@ GetCurrentTransactionIdIfAny(void)
void
MarkCurrentTransactionIdLoggedIfAny(void)
{
if (TransactionIdIsValid(CurrentTransactionState->transactionId))
if (FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId))
CurrentTransactionState->didLogXid = true;
}
@ -463,7 +541,7 @@ GetStableLatestTransactionId(void)
/*
* AssignTransactionId
*
* Assigns a new permanent XID to the given TransactionState.
* Assigns a new permanent FullTransactionId to the given TransactionState.
* We do not assign XIDs to transactions until/unless this is called.
* Also, any parent TransactionStates that don't yet have XIDs are assigned
* one; this maintains the invariant that a child transaction has an XID
@ -477,7 +555,7 @@ AssignTransactionId(TransactionState s)
bool log_unknown_top = false;
/* Assert that caller didn't screw up */
Assert(!TransactionIdIsValid(s->transactionId));
Assert(!FullTransactionIdIsValid(s->fullTransactionId));
Assert(s->state == TRANS_INPROGRESS);
/*
@ -493,14 +571,14 @@ AssignTransactionId(TransactionState s)
* if we're at the bottom of a huge stack of subtransactions none of which
* have XIDs yet.
*/
if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
if (isSubXact && !FullTransactionIdIsValid(s->parent->fullTransactionId))
{
TransactionState p = s->parent;
TransactionState *parents;
size_t parentOffset = 0;
parents = palloc(sizeof(TransactionState) * s->nestingLevel);
while (p != NULL && !TransactionIdIsValid(p->transactionId))
while (p != NULL && !FullTransactionIdIsValid(p->fullTransactionId))
{
parents[parentOffset++] = p;
p = p->parent;
@ -531,26 +609,28 @@ AssignTransactionId(TransactionState s)
log_unknown_top = true;
/*
* Generate a new Xid and record it in PG_PROC and pg_subtrans.
* Generate a new FullTransactionId and record its xid in PG_PROC and
* pg_subtrans.
*
* NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
* shared storage other than PG_PROC; because if there's no room for it in
* PG_PROC, the subtrans entry is needed to ensure that other backends see
* the Xid as "running". See GetNewTransactionId.
*/
s->transactionId = GetNewTransactionId(isSubXact);
s->fullTransactionId = GetNewTransactionId(isSubXact);
if (!isSubXact)
XactTopTransactionId = s->transactionId;
XactTopFullTransactionId = s->fullTransactionId;
if (isSubXact)
SubTransSetParent(s->transactionId, s->parent->transactionId);
SubTransSetParent(XidFromFullTransactionId(s->fullTransactionId),
XidFromFullTransactionId(s->parent->fullTransactionId));
/*
* If it's a top-level transaction, the predicate locking system needs to
* be told about it too.
*/
if (!isSubXact)
RegisterPredicateLockingXid(s->transactionId);
RegisterPredicateLockingXid(XidFromFullTransactionId(s->fullTransactionId));
/*
* Acquire lock on the transaction XID. (We assume this cannot block.) We
@ -560,7 +640,7 @@ AssignTransactionId(TransactionState s)
currentOwner = CurrentResourceOwner;
CurrentResourceOwner = s->curTransactionOwner;
XactLockTableInsert(s->transactionId);
XactLockTableInsert(XidFromFullTransactionId(s->fullTransactionId));
CurrentResourceOwner = currentOwner;
@ -584,7 +664,7 @@ AssignTransactionId(TransactionState s)
*/
if (isSubXact && XLogStandbyInfoActive())
{
unreportedXids[nUnreportedXids] = s->transactionId;
unreportedXids[nUnreportedXids] = XidFromFullTransactionId(s->fullTransactionId);
nUnreportedXids++;
/*
@ -832,9 +912,9 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
if (s->state == TRANS_ABORT)
continue;
if (!TransactionIdIsValid(s->transactionId))
if (!FullTransactionIdIsValid(s->fullTransactionId))
continue; /* it can't have any child XIDs either */
if (TransactionIdEquals(xid, s->transactionId))
if (TransactionIdEquals(xid, XidFromFullTransactionId(s->fullTransactionId)))
return true;
/* As the childXids array is ordered, we can use binary search */
low = 0;
@ -1495,7 +1575,7 @@ AtSubCommit_childXids(void)
* all XIDs already in the array belong to subtransactions started and
* subcommitted before us, so their XIDs must precede ours.
*/
s->parent->childXids[s->parent->nChildXids] = s->transactionId;
s->parent->childXids[s->parent->nChildXids] = XidFromFullTransactionId(s->fullTransactionId);
if (s->nChildXids > 0)
memcpy(&s->parent->childXids[s->parent->nChildXids + 1],
@ -1809,7 +1889,7 @@ StartTransaction(void)
s = &TopTransactionStateData;
CurrentTransactionState = s;
Assert(XactTopTransactionId == InvalidTransactionId);
Assert(!FullTransactionIdIsValid(XactTopFullTransactionId));
/* check the current transaction state */
Assert(s->state == TRANS_DEFAULT);
@ -1821,7 +1901,7 @@ StartTransaction(void)
* flags are fetched below.
*/
s->state = TRANS_START;
s->transactionId = InvalidTransactionId; /* until assigned */
s->fullTransactionId = InvalidFullTransactionId; /* until assigned */
/*
* initialize current transaction state fields
@ -2165,7 +2245,7 @@ CommitTransaction(void)
AtCommit_Memory();
s->transactionId = InvalidTransactionId;
s->fullTransactionId = InvalidFullTransactionId;
s->subTransactionId = InvalidSubTransactionId;
s->nestingLevel = 0;
s->gucNestLevel = 0;
@ -2173,7 +2253,7 @@ CommitTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
XactTopTransactionId = InvalidTransactionId;
XactTopFullTransactionId = InvalidFullTransactionId;
nParallelCurrentXids = 0;
/*
@ -2448,7 +2528,7 @@ PrepareTransaction(void)
AtCommit_Memory();
s->transactionId = InvalidTransactionId;
s->fullTransactionId = InvalidFullTransactionId;
s->subTransactionId = InvalidSubTransactionId;
s->nestingLevel = 0;
s->gucNestLevel = 0;
@ -2456,7 +2536,7 @@ PrepareTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
XactTopTransactionId = InvalidTransactionId;
XactTopFullTransactionId = InvalidFullTransactionId;
nParallelCurrentXids = 0;
/*
@ -2686,7 +2766,7 @@ CleanupTransaction(void)
AtCleanup_Memory(); /* and transaction memory */
s->transactionId = InvalidTransactionId;
s->fullTransactionId = InvalidFullTransactionId;
s->subTransactionId = InvalidSubTransactionId;
s->nestingLevel = 0;
s->gucNestLevel = 0;
@ -2695,7 +2775,7 @@ CleanupTransaction(void)
s->maxChildXids = 0;
s->parallelModeLevel = 0;
XactTopTransactionId = InvalidTransactionId;
XactTopFullTransactionId = InvalidFullTransactionId;
nParallelCurrentXids = 0;
/*
@ -4693,7 +4773,7 @@ CommitSubTransaction(void)
*/
/* Post-commit cleanup */
if (TransactionIdIsValid(s->transactionId))
if (FullTransactionIdIsValid(s->fullTransactionId))
AtSubCommit_childXids();
AfterTriggerEndSubXact(true);
AtSubCommit_Portals(s->subTransactionId,
@ -4718,8 +4798,8 @@ CommitSubTransaction(void)
* The only lock we actually release here is the subtransaction XID lock.
*/
CurrentResourceOwner = s->curTransactionOwner;
if (TransactionIdIsValid(s->transactionId))
XactLockTableDelete(s->transactionId);
if (FullTransactionIdIsValid(s->fullTransactionId))
XactLockTableDelete(XidFromFullTransactionId(s->fullTransactionId));
/*
* Other locks should get transferred to their parent resource owner.
@ -4872,7 +4952,7 @@ AbortSubTransaction(void)
(void) RecordTransactionAbort(true);
/* Post-abort cleanup */
if (TransactionIdIsValid(s->transactionId))
if (FullTransactionIdIsValid(s->fullTransactionId))
AtSubAbort_childXids();
CallSubXactCallbacks(SUBXACT_EVENT_ABORT_SUB, s->subTransactionId,
@ -4985,7 +5065,7 @@ PushTransaction(void)
* We can now stack a minimally valid subtransaction without fear of
* failure.
*/
s->transactionId = InvalidTransactionId; /* until assigned */
s->fullTransactionId = InvalidFullTransactionId; /* until assigned */
s->subTransactionId = currentSubTransactionId;
s->parent = p;
s->nestingLevel = p->nestingLevel + 1;
@ -5052,18 +5132,17 @@ Size
EstimateTransactionStateSpace(void)
{
TransactionState s;
Size nxids = 6; /* iso level, deferrable, top & current XID,
* command counter, XID count */
Size nxids = 0;
Size size = SerializedTransactionStateHeaderSize;
for (s = CurrentTransactionState; s != NULL; s = s->parent)
{
if (TransactionIdIsValid(s->transactionId))
if (FullTransactionIdIsValid(s->fullTransactionId))
nxids = add_size(nxids, 1);
nxids = add_size(nxids, s->nChildXids);
}
nxids = add_size(nxids, nParallelCurrentXids);
return mul_size(nxids, sizeof(TransactionId));
return add_size(size, sizeof(SerializedTransactionState) * nxids);
}
/*
@ -5072,14 +5151,10 @@ EstimateTransactionStateSpace(void)
* needed by a parallel worker.
*
* We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs
* associated with this transaction. The first eight bytes of the result
* contain XactDeferrable and XactIsoLevel; the next twelve bytes contain the
* XID of the top-level transaction, the XID of the current transaction
* (or, in each case, InvalidTransactionId if none), and the current command
* counter. After that, the next 4 bytes contain a count of how many
* additional XIDs follow; this is followed by all of those XIDs one after
* another. We emit the XIDs in sorted order for the convenience of the
* receiving process.
* associated with this transaction. These are serialized into a
* caller-supplied buffer big enough to hold the number of bytes reported by
* EstimateTransactionStateSpace(). We emit the XIDs in sorted order for the
* convenience of the receiving process.
*/
void
SerializeTransactionState(Size maxsize, char *start_address)
@ -5087,16 +5162,17 @@ SerializeTransactionState(Size maxsize, char *start_address)
TransactionState s;
Size nxids = 0;
Size i = 0;
Size c = 0;
TransactionId *workspace;
TransactionId *result = (TransactionId *) start_address;
SerializedTransactionState *result;
result[c++] = (TransactionId) XactIsoLevel;
result[c++] = (TransactionId) XactDeferrable;
result[c++] = XactTopTransactionId;
result[c++] = CurrentTransactionState->transactionId;
result[c++] = (TransactionId) currentCommandId;
Assert(maxsize >= c * sizeof(TransactionId));
result = (SerializedTransactionState *) start_address;
result->xactIsoLevel = XactIsoLevel;
result->xactDeferrable = XactDeferrable;
result->topFullTransactionId = XactTopFullTransactionId;
result->currentFullTransactionId =
CurrentTransactionState->fullTransactionId;
result->currentCommandId = currentCommandId;
/*
* If we're running in a parallel worker and launching a parallel worker
@ -5105,9 +5181,8 @@ SerializeTransactionState(Size maxsize, char *start_address)
*/
if (nParallelCurrentXids > 0)
{
result[c++] = nParallelCurrentXids;
Assert(maxsize >= (nParallelCurrentXids + c) * sizeof(TransactionId));
memcpy(&result[c], ParallelCurrentXids,
result->nParallelCurrentXids = nParallelCurrentXids;
memcpy(&result->parallelCurrentXids[0], ParallelCurrentXids,
nParallelCurrentXids * sizeof(TransactionId));
return;
}
@ -5118,18 +5193,19 @@ SerializeTransactionState(Size maxsize, char *start_address)
*/
for (s = CurrentTransactionState; s != NULL; s = s->parent)
{
if (TransactionIdIsValid(s->transactionId))
if (FullTransactionIdIsValid(s->fullTransactionId))
nxids = add_size(nxids, 1);
nxids = add_size(nxids, s->nChildXids);
}
Assert((c + 1 + nxids) * sizeof(TransactionId) <= maxsize);
Assert(SerializedTransactionStateHeaderSize + nxids * sizeof(TransactionId)
<= maxsize);
/* Copy them to our scratch space. */
workspace = palloc(nxids * sizeof(TransactionId));
for (s = CurrentTransactionState; s != NULL; s = s->parent)
{
if (TransactionIdIsValid(s->transactionId))
workspace[i++] = s->transactionId;
if (FullTransactionIdIsValid(s->fullTransactionId))
workspace[i++] = XidFromFullTransactionId(s->fullTransactionId);
memcpy(&workspace[i], s->childXids,
s->nChildXids * sizeof(TransactionId));
i += s->nChildXids;
@ -5140,8 +5216,9 @@ SerializeTransactionState(Size maxsize, char *start_address)
qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
/* Copy data into output area. */
result[c++] = (TransactionId) nxids;
memcpy(&result[c], workspace, nxids * sizeof(TransactionId));
result->nParallelCurrentXids = nxids;
memcpy(&result->parallelCurrentXids[0], workspace,
nxids * sizeof(TransactionId));
}
/*
@ -5152,18 +5229,20 @@ SerializeTransactionState(Size maxsize, char *start_address)
void
StartParallelWorkerTransaction(char *tstatespace)
{
TransactionId *tstate = (TransactionId *) tstatespace;
SerializedTransactionState *tstate;
Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT);
StartTransaction();
XactIsoLevel = (int) tstate[0];
XactDeferrable = (bool) tstate[1];
XactTopTransactionId = tstate[2];
CurrentTransactionState->transactionId = tstate[3];
currentCommandId = tstate[4];
nParallelCurrentXids = (int) tstate[5];
ParallelCurrentXids = &tstate[6];
tstate = (SerializedTransactionState *) tstatespace;
XactIsoLevel = tstate->xactIsoLevel;
XactDeferrable = tstate->xactDeferrable;
XactTopFullTransactionId = tstate->topFullTransactionId;
CurrentTransactionState->fullTransactionId =
tstate->currentFullTransactionId;
currentCommandId = tstate->currentCommandId;
nParallelCurrentXids = tstate->nParallelCurrentXids;
ParallelCurrentXids = &tstate->parallelCurrentXids[0];
CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
}
@ -5222,7 +5301,7 @@ ShowTransactionStateRec(const char *str, TransactionState s)
PointerIsValid(s->name) ? s->name : "unnamed",
BlockStateAsString(s->blockState),
TransStateAsString(s->state),
(unsigned int) s->transactionId,
(unsigned int) XidFromFullTransactionId(s->fullTransactionId),
(unsigned int) s->subTransactionId,
(unsigned int) currentCommandId,
currentCommandIdUsed ? " (used)" : "",

View File

@ -49,6 +49,7 @@
#define U64FromFullTransactionId(x) ((x).value)
#define FullTransactionIdPrecedes(a, b) ((a).value < (b).value)
#define FullTransactionIdIsValid(x) TransactionIdIsValid(XidFromFullTransactionId(x))
#define InvalidFullTransactionId FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
/*
* A 64 bit value that contains an epoch and a TransactionId. This is
@ -221,7 +222,7 @@ extern TransactionId TransactionIdLatest(TransactionId mainxid,
extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
/* in transam/varsup.c */
extern TransactionId GetNewTransactionId(bool isSubXact);
extern FullTransactionId GetNewTransactionId(bool isSubXact);
extern void AdvanceNextFullTransactionIdPastXid(TransactionId xid);
extern FullTransactionId ReadNextFullTransactionId(void);
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,

View File

@ -14,6 +14,7 @@
#ifndef XACT_H
#define XACT_H
#include "access/transam.h"
#include "access/xlogreader.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
@ -355,6 +356,10 @@ extern TransactionId GetCurrentTransactionId(void);
extern TransactionId GetCurrentTransactionIdIfAny(void);
extern TransactionId GetStableLatestTransactionId(void);
extern SubTransactionId GetCurrentSubTransactionId(void);
extern FullTransactionId GetTopFullTransactionId(void);
extern FullTransactionId GetTopFullTransactionIdIfAny(void);
extern FullTransactionId GetCurrentFullTransactionId(void);
extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
extern void MarkCurrentTransactionIdLoggedIfAny(void);
extern bool SubTransactionIsActive(SubTransactionId subxid);
extern CommandId GetCurrentCommandId(bool used);

View File

@ -2107,6 +2107,7 @@ SeqTableData
SerCommitSeqNo
SerializedReindexState
SerializedSnapshotData
SerializedTransactionState
Session
SessionBackupState
SetConstraintState