Adjust time qual checking code so that we always check TransactionIdIsInProgress

before we check commit/abort status.  Formerly this was done in some paths
but not all, with the result that a transaction might be considered
committed for some purposes before it became committed for others.
Per example found by Jan Wieck.
This commit is contained in:
Tom Lane 2005-05-07 21:22:01 +00:00
parent 4cd4ed0cc2
commit b72e5fa17b
1 changed files with 97 additions and 84 deletions

View File

@ -11,12 +11,28 @@
* "hint" status bits if we see that the inserting or deleting transaction * "hint" status bits if we see that the inserting or deleting transaction
* has now committed or aborted. * has now committed or aborted.
* *
* NOTE: must check TransactionIdIsInProgress (which looks in PGPROC array)
* before TransactionIdDidCommit/TransactionIdDidAbort (which look in
* pg_clog). Otherwise we have a race condition: we might decide that a
* just-committed transaction crashed, because none of the tests succeed.
* xact.c is careful to record commit/abort in pg_clog before it unsets
* MyProc->xid in PGPROC array. That fixes that problem, but it also
* means there is a window where TransactionIdIsInProgress and
* TransactionIdDidCommit will both return true. If we check only
* TransactionIdDidCommit, we could consider a tuple committed when a
* later GetSnapshotData call will still think the originating transaction
* is in progress, which leads to application-level inconsistency. The
* upshot is that we gotta check TransactionIdIsInProgress first in all
* code paths, except for a few cases where we are looking at
* subtransactions of our own main transaction and so there can't be any
* race condition.
*
* *
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.87 2005/04/28 21:47:16 tgl Exp $ * $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.88 2005/05/07 21:22:01 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -147,20 +163,20 @@ HeapTupleSatisfiesItself(HeapTupleHeader tuple, Buffer buffer)
return false; return false;
} }
else if (!TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple))) else if (TransactionIdIsInProgress(HeapTupleHeaderGetXmin(tuple)))
{
if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(tuple)))
{
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
}
return false; return false;
} else if (TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple)))
else
{ {
tuple->t_infomask |= HEAP_XMIN_COMMITTED; tuple->t_infomask |= HEAP_XMIN_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer); SetBufferCommitInfoNeedsSave(buffer);
} }
else
{
/* it must have aborted or crashed */
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
return false;
}
} }
/* by here, the inserting transaction has committed */ /* by here, the inserting transaction has committed */
@ -189,13 +205,14 @@ HeapTupleSatisfiesItself(HeapTupleHeader tuple, Buffer buffer)
return false; return false;
} }
if (TransactionIdIsInProgress(HeapTupleHeaderGetXmax(tuple)))
return true;
if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple))) if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple)))
{ {
if (TransactionIdDidAbort(HeapTupleHeaderGetXmax(tuple))) /* it must have aborted or crashed */
{ tuple->t_infomask |= HEAP_XMAX_INVALID;
tuple->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(buffer);
SetBufferCommitInfoNeedsSave(buffer);
}
return true; return true;
} }
@ -330,20 +347,20 @@ HeapTupleSatisfiesNow(HeapTupleHeader tuple, Buffer buffer)
else else
return false; /* deleted before scan started */ return false; /* deleted before scan started */
} }
else if (!TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple))) else if (TransactionIdIsInProgress(HeapTupleHeaderGetXmin(tuple)))
{
if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(tuple)))
{
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
}
return false; return false;
} else if (TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple)))
else
{ {
tuple->t_infomask |= HEAP_XMIN_COMMITTED; tuple->t_infomask |= HEAP_XMIN_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer); SetBufferCommitInfoNeedsSave(buffer);
} }
else
{
/* it must have aborted or crashed */
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
return false;
}
} }
/* by here, the inserting transaction has committed */ /* by here, the inserting transaction has committed */
@ -375,13 +392,14 @@ HeapTupleSatisfiesNow(HeapTupleHeader tuple, Buffer buffer)
return false; /* deleted before scan started */ return false; /* deleted before scan started */
} }
if (TransactionIdIsInProgress(HeapTupleHeaderGetXmax(tuple)))
return true;
if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple))) if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple)))
{ {
if (TransactionIdDidAbort(HeapTupleHeaderGetXmax(tuple))) /* it must have aborted or crashed */
{ tuple->t_infomask |= HEAP_XMAX_INVALID;
tuple->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(buffer);
SetBufferCommitInfoNeedsSave(buffer);
}
return true; return true;
} }
@ -569,20 +587,20 @@ HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid,
return HeapTupleInvisible; /* updated before scan return HeapTupleInvisible; /* updated before scan
* started */ * started */
} }
else if (!TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple))) else if (TransactionIdIsInProgress(HeapTupleHeaderGetXmin(tuple)))
{
if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(tuple)))
{
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
}
return HeapTupleInvisible; return HeapTupleInvisible;
} else if (TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple)))
else
{ {
tuple->t_infomask |= HEAP_XMIN_COMMITTED; tuple->t_infomask |= HEAP_XMIN_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer); SetBufferCommitInfoNeedsSave(buffer);
} }
else
{
/* it must have aborted or crashed */
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
return HeapTupleInvisible;
}
} }
/* by here, the inserting transaction has committed */ /* by here, the inserting transaction has committed */
@ -620,16 +638,15 @@ HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid,
return HeapTupleInvisible; /* updated before scan started */ return HeapTupleInvisible; /* updated before scan started */
} }
if (TransactionIdIsInProgress(HeapTupleHeaderGetXmax(tuple)))
return HeapTupleBeingUpdated;
if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple))) if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple)))
{ {
if (TransactionIdDidAbort(HeapTupleHeaderGetXmax(tuple))) /* it must have aborted or crashed */
{ tuple->t_infomask |= HEAP_XMAX_INVALID;
tuple->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(buffer);
SetBufferCommitInfoNeedsSave(buffer); return HeapTupleMayBeUpdated;
return HeapTupleMayBeUpdated;
}
/* running xact */
return HeapTupleBeingUpdated; /* in updation by other */
} }
/* xmax transaction committed */ /* xmax transaction committed */
@ -735,23 +752,24 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple, Buffer buffer)
return false; return false;
} }
else if (!TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple))) else if (TransactionIdIsInProgress(HeapTupleHeaderGetXmin(tuple)))
{ {
if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(tuple)))
{
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
return false;
}
SnapshotDirty->xmin = HeapTupleHeaderGetXmin(tuple); SnapshotDirty->xmin = HeapTupleHeaderGetXmin(tuple);
/* XXX shouldn't we fall through to look at xmax? */ /* XXX shouldn't we fall through to look at xmax? */
return true; /* in insertion by other */ return true; /* in insertion by other */
} }
else else if (TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple)))
{ {
tuple->t_infomask |= HEAP_XMIN_COMMITTED; tuple->t_infomask |= HEAP_XMIN_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer); SetBufferCommitInfoNeedsSave(buffer);
} }
else
{
/* it must have aborted or crashed */
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
return false;
}
} }
/* by here, the inserting transaction has committed */ /* by here, the inserting transaction has committed */
@ -781,17 +799,18 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple, Buffer buffer)
return false; return false;
} }
if (TransactionIdIsInProgress(HeapTupleHeaderGetXmax(tuple)))
{
SnapshotDirty->xmax = HeapTupleHeaderGetXmax(tuple);
return true;
}
if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple))) if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple)))
{ {
if (TransactionIdDidAbort(HeapTupleHeaderGetXmax(tuple))) /* it must have aborted or crashed */
{ tuple->t_infomask |= HEAP_XMAX_INVALID;
tuple->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(buffer);
SetBufferCommitInfoNeedsSave(buffer); return true;
return true;
}
/* running xact */
SnapshotDirty->xmax = HeapTupleHeaderGetXmax(tuple);
return true; /* in updation by other */
} }
/* xmax transaction committed */ /* xmax transaction committed */
@ -907,20 +926,20 @@ HeapTupleSatisfiesSnapshot(HeapTupleHeader tuple, Snapshot snapshot,
else else
return false; /* deleted before scan started */ return false; /* deleted before scan started */
} }
else if (!TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple))) else if (TransactionIdIsInProgress(HeapTupleHeaderGetXmin(tuple)))
{
if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(tuple)))
{
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
}
return false; return false;
} else if (TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple)))
else
{ {
tuple->t_infomask |= HEAP_XMIN_COMMITTED; tuple->t_infomask |= HEAP_XMIN_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer); SetBufferCommitInfoNeedsSave(buffer);
} }
else
{
/* it must have aborted or crashed */
tuple->t_infomask |= HEAP_XMIN_INVALID;
SetBufferCommitInfoNeedsSave(buffer);
return false;
}
} }
/* /*
@ -982,13 +1001,14 @@ HeapTupleSatisfiesSnapshot(HeapTupleHeader tuple, Snapshot snapshot,
return false; /* deleted before scan started */ return false; /* deleted before scan started */
} }
if (TransactionIdIsInProgress(HeapTupleHeaderGetXmax(tuple)))
return true;
if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple))) if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple)))
{ {
if (TransactionIdDidAbort(HeapTupleHeaderGetXmax(tuple))) /* it must have aborted or crashed */
{ tuple->t_infomask |= HEAP_XMAX_INVALID;
tuple->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(buffer);
SetBufferCommitInfoNeedsSave(buffer);
}
return true; return true;
} }
@ -1057,13 +1077,6 @@ HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
* *
* If the inserting transaction aborted, then the tuple was never visible * If the inserting transaction aborted, then the tuple was never visible
* to any other transaction, so we can delete it immediately. * to any other transaction, so we can delete it immediately.
*
* NOTE: must check TransactionIdIsInProgress (which looks in PROC array)
* before TransactionIdDidCommit/TransactionIdDidAbort (which look in
* pg_clog). Otherwise we have a race condition where we might decide
* that a just-committed transaction crashed, because none of the
* tests succeed. xact.c is careful to record commit/abort in pg_clog
* before it unsets MyProc->xid in PROC array.
*/ */
if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED)) if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
{ {