Fix race condition leading to hanging logical slot creation.

The snapshot assembly during the creation of logical slots relied
waiting for transactions in xl_running_xacts to end, by checking for
their commit/abort records.  Unfortunately, despite locking, it is
possible to see an xl_running_xact record listing transactions as
ready, that have already WAL-logged an commit/abort record, as the
locking just prevents the ProcArray to be adjusted, and the commit
record has to be logged first.

That lead to either delayed or hanging snapshot creation, because
snapbuild.c would wait "forever" to see commit/abort records for some
transactions.  That hang resolved only if a xl_running_xacts record
without any running transactions happened to be logged, far from
certain on a busy server.

It's impractical to prevent that via more heavyweight locking, the
likelihood of deadlocks and significantly increased contention would
be too big.

Instead change the initial snapshot creation to be solely based on
tracking the oldest running transaction via
xl_running_xacts->oldestRunningXid - that actually ends up
significantly simplifying the code.  That has two disadvantages:
1) Because we cannot fully "trust" the contents of xl_running_xacts,
   we cannot use it to build the initial snapshot.  Instead we have to
   wait twice for all running transactions to finish.
2) Previously a slot, unless the race occurred, could be created when
   the all transaction perceived as running based on commit/abort
   records, now we have to wait for the next xl_running_xacts record.
To address that, trigger logging new xl_running_xacts record from
within snapbuild.c exactly when necessary.

Unfortunately snabuild.c's SnapBuild is stored on disk, one of the
stupider ideas of a certain Mr Freund, so we can't change it in a
minor release.  As this is going to be backpatched, we have to hack
around a bit to keep on-disk compatibility.  A later commit will
rejigger that on master.

Author: Andres Freund, based on a quite different patch from Petr Jelinek
Analyzed-By: Petr Jelinek
Reviewed-By: Petr Jelinek
Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb214@2ndquadrant.com
Backpatch: 9.4-, where logical decoding has been introduced
This commit is contained in:
Andres Freund 2017-05-13 14:21:00 -07:00
parent 9aab83fc50
commit 955a684e04
6 changed files with 220 additions and 249 deletions

View File

@ -1,21 +1,30 @@
Parsed test spec with 3 sessions
starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
starting permutation: s2b s2txid s1init s3b s3txid s2alter s2c s2b s2txid s3c s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
step s2b: BEGIN;
step s2txid: SELECT txid_current() IS NULL;
?column?
f
step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
step s3b: BEGIN;
step s3txid: SELECT txid_current() IS NULL;
?column?
f
step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
step s2c: COMMIT;
step s2b: BEGIN;
step s2txid: SELECT txid_current() IS NULL;
?column?
f
step s3c: COMMIT;
step s1init: <... completed>
?column?
init
step s2c: COMMIT;
step s1insert: INSERT INTO do_write DEFAULT VALUES;
step s1checkpoint: CHECKPOINT;
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');

View File

@ -24,7 +24,8 @@ step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
session "s2"
setup { SET synchronous_commit=on; }
step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
step "s2b" { BEGIN; }
step "s2txid" { SELECT txid_current() IS NULL; }
step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
step "s2c" { COMMIT; }
@ -32,7 +33,8 @@ step "s2c" { COMMIT; }
session "s3"
setup { SET synchronous_commit=on; }
step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
step "s3b" { BEGIN; }
step "s3txid" { SELECT txid_current() IS NULL; }
step "s3c" { COMMIT; }
# Force usage of ondisk snapshot by starting and not finishing a
@ -40,4 +42,4 @@ step "s3c" { COMMIT; }
# reached. In combination with a checkpoint forcing a snapshot to be
# written and a new restart point computed that'll lead to the usage
# of the snapshot.
permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"

View File

@ -622,9 +622,6 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
int i;
SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
parsed->nsubxacts, parsed->subxacts);
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],

View File

@ -1725,7 +1725,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
{
elog(DEBUG1, "aborting old transaction %u", txn->xid);
elog(DEBUG2, "aborting old transaction %u", txn->xid);
/* remove potential on-disk data, and deallocate this tx */
ReorderBufferCleanupTXN(rb, txn);

View File

@ -56,23 +56,34 @@
*
*
* The snapbuild machinery is starting up in several stages, as illustrated
* by the following graph:
* by the following graph describing the SnapBuild->state transitions:
*
* +-------------------------+
* +----|SNAPBUILD_START |-------------+
* +----| START |-------------+
* | +-------------------------+ |
* | | |
* | | |
* | running_xacts with running xacts |
* | running_xacts #1 |
* | | |
* | | |
* | v |
* | +-------------------------+ v
* | |SNAPBUILD_FULL_SNAPSHOT |------------>|
* | | BUILDING_SNAPSHOT |------------>|
* | +-------------------------+ |
* | | |
* | | |
* | running_xacts #2, xacts from #1 finished |
* | | |
* | | |
* | v |
* | +-------------------------+ v
* | | FULL_SNAPSHOT |------------>|
* | +-------------------------+ |
* | | |
* running_xacts | saved snapshot
* with zero xacts | at running_xacts's lsn
* | | |
* | all running toplevel TXNs finished |
* | running_xacts with xacts from #2 finished |
* | | |
* | v |
* | +-------------------------+ |
@ -83,7 +94,7 @@
* record is read that is sufficiently new (above the safe xmin horizon),
* there's a state transition. If there were no running xacts when the
* running_xacts record was generated, we'll directly go into CONSISTENT
* state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
* state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
* snapshot means that all transactions that start henceforth can be decoded
* in their entirety, but transactions that started previously can't. In
* FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
@ -184,26 +195,24 @@ struct SnapBuild
ReorderBuffer *reorder;
/*
* Information about initially running transactions
*
* When we start building a snapshot there already may be transactions in
* progress. Those are stored in running.xip. We don't have enough
* information about those to decode their contents, so until they are
* finished (xcnt=0) we cannot switch to a CONSISTENT state.
* Outdated: This struct isn't used for its original purpose anymore, but
* can't be removed / changed in a minor version, because it's stored
* on-disk.
*/
struct
{
/*
* As long as running.xcnt all XIDs < running.xmin and > running.xmax
* have to be checked whether they still are running.
* NB: This field is misused, until a major version can break on-disk
* compatibility. See SnapBuildNextPhaseAt() /
* SnapBuildStartNextPhaseAt().
*/
TransactionId xmin;
TransactionId xmax;
TransactionId was_xmin;
TransactionId was_xmax;
size_t xcnt; /* number of used xip entries */
size_t xcnt_space; /* allocated size of xip */
TransactionId *xip; /* running xacts array, xidComparator-sorted */
} running;
size_t was_xcnt; /* number of used xip entries */
size_t was_xcnt_space; /* allocated size of xip */
TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
} was_running;
/*
* Array of transactions which could have catalog changes that committed
@ -249,12 +258,6 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
/* transaction state manipulation functions */
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
/* ->running manipulation */
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
@ -269,11 +272,39 @@ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr
/* xlog reading helper functions for SnapBuildProcessRecord */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
/*
* Return TransactionId after which the next phase of initial snapshot
* building will happen.
*/
static inline TransactionId
SnapBuildNextPhaseAt(SnapBuild *builder)
{
/*
* For backward compatibility reasons this has to be stored in the wrongly
* named field. Will be fixed in next major version.
*/
return builder->was_running.was_xmax;
}
/*
* Set TransactionId after which the next phase of initial snapshot building
* will happen.
*/
static inline void
SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
{
/*
* For backward compatibility reasons this has to be stored in the wrongly
* named field. Will be fixed in next major version.
*/
builder->was_running.was_xmax = at;
}
/*
* Allocate a new snapshot builder.
@ -700,7 +731,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
* we got into the SNAPBUILD_FULL_SNAPSHOT state.
*/
if (builder->state < SNAPBUILD_CONSISTENT &&
SnapBuildTxnIsRunning(builder, xid))
TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
return false;
/*
@ -768,38 +799,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
}
/*
* Check whether `xid` is currently 'running'.
*
* Running transactions in our parlance are transactions which we didn't
* observe from the start so we can't properly decode their contents. They
* only exist after we freshly started from an < CONSISTENT snapshot.
*/
static bool
SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
{
Assert(builder->state < SNAPBUILD_CONSISTENT);
Assert(TransactionIdIsNormal(builder->running.xmin));
Assert(TransactionIdIsNormal(builder->running.xmax));
if (builder->running.xcnt &&
NormalTransactionIdFollows(xid, builder->running.xmin) &&
NormalTransactionIdPrecedes(xid, builder->running.xmax))
{
TransactionId *search =
bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
sizeof(TransactionId), xidComparator);
if (search != NULL)
{
Assert(*search == xid);
return true;
}
}
return false;
}
/*
* Add a new Snapshot to all transactions we're decoding that currently are
* in-progress so they can see new catalog contents made by the transaction
@ -921,63 +920,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
pfree(workspace);
}
/*
* Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
* keeping track of the amount of running transactions.
*/
static void
SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
{
if (builder->state == SNAPBUILD_CONSISTENT)
return;
/*
* NB: This handles subtransactions correctly even if we started from
* suboverflowed xl_running_xacts because we only keep track of toplevel
* transactions. Since the latter are always allocated before their
* subxids and since they end at the same time it's sufficient to deal
* with them here.
*/
if (SnapBuildTxnIsRunning(builder, xid))
{
Assert(builder->running.xcnt > 0);
if (!--builder->running.xcnt)
{
/*
* None of the originally running transaction is running anymore,
* so our incrementally built snapshot now is consistent.
*/
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("Transaction ID %u finished; no more running transactions.",
xid)));
builder->state = SNAPBUILD_CONSISTENT;
}
}
}
/*
* Abort a transaction, throw away all state we kept.
*/
void
SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid,
int nsubxacts, TransactionId *subxacts)
{
int i;
for (i = 0; i < nsubxacts; i++)
{
TransactionId subxid = subxacts[i];
SnapBuildEndTxn(builder, lsn, subxid);
}
SnapBuildEndTxn(builder, lsn, xid);
}
/*
* Handle everything that needs to be done when a transaction commits
*/
@ -1021,11 +963,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
{
TransactionId subxid = subxacts[nxact];
/*
* make sure txn is not tracked in running txn's anymore, switch state
*/
SnapBuildEndTxn(builder, lsn, subxid);
/*
* If we're forcing timetravel we also need visibility information
* about subtransaction, so keep track of subtransaction's state.
@ -1055,12 +992,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
}
/*
* Make sure toplevel txn is not tracked in running txn's anymore, switch
* state to consistent if possible.
*/
SnapBuildEndTxn(builder, lsn, xid);
if (forced_timetravel)
{
elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
@ -1250,25 +1181,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
*
* a) There were no running transactions when the xl_running_xacts record
* was inserted, jump to CONSISTENT immediately. We might find such a
* state we were waiting for b) or c).
* state while waiting on c)'s sub-states.
*
* b) Wait for all toplevel transactions that were running to end. We
* simply track the number of in-progress toplevel transactions and
* lower it whenever one commits or aborts. When that number
* (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
* to CONSISTENT.
* NB: We need to search running.xip when seeing a transaction's end to
* make sure it's a toplevel transaction and it's been one of the
* initially running ones.
* Interestingly, in contrast to HS, this allows us not to care about
* subtransactions - and by extension suboverflowed xl_running_xacts -
* at all.
*
* c) This (in a previous run) or another decoding slot serialized a
* b) This (in a previous run) or another decoding slot serialized a
* snapshot to disk that we can use. Can't use this method for the
* initial snapshot when slot is being created and needs full snapshot
* for export or direct use, as that snapshot will only contain catalog
* modifying transactions.
*
* c) First incrementally build a snapshot for catalog tuples
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
* transactions to finish. Every transaction starting after that
* (FULL_SNAPSHOT state), has enough information to be decoded. But
* for older running transactions no viable snapshot exists yet, so
* CONSISTENT will only be reached once all of those have finished.
* ---
*/
@ -1285,16 +1211,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
(uint32) (lsn >> 32), (uint32) lsn),
errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
builder->initial_xmin_horizon, running->oldestRunningXid)));
SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
return true;
}
/*
* a) No transaction were running, we can jump to consistent.
*
* This is not affected by races around xl_running_xacts, because we can
* miss transaction commits, but currently not transactions starting.
*
* NB: We might have already started to incrementally assemble a snapshot,
* so we need to be careful to deal with that.
*/
if (running->xcnt == 0)
if (running->oldestRunningXid == running->nextXid)
{
if (builder->start_decoding_at == InvalidXLogRecPtr ||
builder->start_decoding_at <= lsn)
@ -1309,12 +1242,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
/* no transactions running now */
builder->running.xcnt = 0;
builder->running.xmin = InvalidTransactionId;
builder->running.xmax = InvalidTransactionId;
builder->state = SNAPBUILD_CONSISTENT;
SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
@ -1323,30 +1252,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
return false;
}
/* c) valid on disk state and not building full snapshot */
/* b) valid on disk state and not building full snapshot */
else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{
/* there won't be any state to cleanup */
return false;
}
/*
* b) first encounter of a useable xl_running_xacts record. If we had
* found one earlier we would either track running transactions (i.e.
* builder->running.xcnt != 0) or be consistent (this function wouldn't
* get called).
* c) transition from START to BUILDING_SNAPSHOT.
*
* In START state, and a xl_running_xacts record with running xacts is
* encountered. In that case, switch to BUILDING_SNAPSHOT state, and
* record xl_running_xacts->nextXid. Once all running xacts have finished
* (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
* might look that we could use xl_running_xact's ->xids information to
* get there quicker, but that is problematic because transactions marked
* as running, might already have inserted their commit record - it's
* infeasible to change that with locking.
*/
else if (!builder->running.xcnt)
else if (builder->state == SNAPBUILD_START)
{
int off;
/*
* We only care about toplevel xids as those are the ones we
* definitely see in the wal stream. As snapbuild.c tracks committed
* instead of running transactions we don't need to know anything
* about uncommitted subtransactions.
*/
builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
SnapBuildStartNextPhaseAt(builder, running->nextXid);
/*
* Start with an xmin/xmax that's correct for future, when all the
@ -1360,59 +1288,57 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmin));
Assert(TransactionIdIsNormal(builder->xmax));
builder->running.xcnt = running->xcnt;
builder->running.xcnt_space = running->xcnt;
builder->running.xip =
MemoryContextAlloc(builder->context,
builder->running.xcnt * sizeof(TransactionId));
memcpy(builder->running.xip, running->xids,
builder->running.xcnt * sizeof(TransactionId));
/* sort so we can do a binary search */
qsort(builder->running.xip, builder->running.xcnt,
sizeof(TransactionId), xidComparator);
builder->running.xmin = builder->running.xip[0];
builder->running.xmax = builder->running.xip[running->xcnt - 1];
/* makes comparisons cheaper later */
TransactionIdRetreat(builder->running.xmin);
TransactionIdAdvance(builder->running.xmax);
builder->state = SNAPBUILD_FULL_SNAPSHOT;
ereport(LOG,
(errmsg("logical decoding found initial starting point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail_plural("%u transaction needs to finish.",
"%u transactions need to finish.",
builder->running.xcnt,
(uint32) builder->running.xcnt)));
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
running->xcnt, running->nextXid)));
/*
* Iterate through all xids, wait for them to finish.
*
* This isn't required for the correctness of decoding, but to allow
* isolationtester to notice that we're currently waiting for
* something.
*/
for (off = 0; off < builder->running.xcnt; off++)
{
TransactionId xid = builder->running.xip[off];
SnapBuildWaitSnapshot(running, running->nextXid);
}
/*
* c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
*
* In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
* is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
* means all transactions starting afterwards have enough information to
* be decoded. Switch to FULL_SNAPSHOT.
*/
else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
running->oldestRunningXid))
{
builder->state = SNAPBUILD_FULL_SNAPSHOT;
SnapBuildStartNextPhaseAt(builder, running->nextXid);
/*
* Upper layers should prevent that we ever need to wait on
* ourselves. Check anyway, since failing to do so would either
* result in an endless wait or an Assert() failure.
*/
if (TransactionIdIsCurrentTransactionId(xid))
elog(ERROR, "waiting for ourselves");
ereport(LOG,
(errmsg("logical decoding found initial consistent point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
running->xcnt, running->nextXid)));
XactLockTableWait(xid, NULL, NULL, XLTW_None);
}
SnapBuildWaitSnapshot(running, running->nextXid);
}
/*
* c) transition from FULL_SNAPSHOT to CONSISTENT.
*
* In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
* oldestRunningXid is >= than nextXid from when we switched to
* FULL_SNAPSHOT. This means all transactions that are currently in
* progress have a catalog snapshot, and all their changes have been
* collected. Switch to CONSISTENT.
*/
else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
running->oldestRunningXid))
{
builder->state = SNAPBUILD_CONSISTENT;
SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
/* nothing could have built up so far, so don't perform cleanup */
return false;
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("There are no old transactions anymore.")));
}
/*
@ -1421,8 +1347,54 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* records so incremental cleanup can be performed.
*/
return true;
}
/* ---
* Iterate through xids in record, wait for all older than the cutoff to
* finish. Then, if possible, log a new xl_running_xacts record.
*
* This isn't required for the correctness of decoding, but to:
* a) allow isolationtester to notice that we're currently waiting for
* something.
* b) log a new xl_running_xacts record where it'd be helpful, without having
* to write for bgwriter or checkpointer.
* ---
*/
static void
SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
{
int off;
for (off = 0; off < running->xcnt; off++)
{
TransactionId xid = running->xids[off];
/*
* Upper layers should prevent that we ever need to wait on
* ourselves. Check anyway, since failing to do so would either
* result in an endless wait or an Assert() failure.
*/
if (TransactionIdIsCurrentTransactionId(xid))
elog(ERROR, "waiting for ourselves");
if (TransactionIdFollows(xid, cutoff))
continue;
XactLockTableWait(xid, NULL, NULL, XLTW_None);
}
/*
* All transactions we needed to finish finished - try to ensure there is
* another xl_running_xacts record in a timely manner, without having to
* write for bgwriter or checkpointer to log one. During recovery we
* can't enforce that, so we'll have to wait.
*/
if (!RecoveryInProgress())
{
LogStandbySnapshot();
}
}
/* -----------------------------------
* Snapshot serialization support
@ -1572,7 +1544,6 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
errmsg("could not remove file \"%s\": %m", path)));
needed_length = sizeof(SnapBuildOnDisk) +
sizeof(TransactionId) * builder->running.xcnt_space +
sizeof(TransactionId) * builder->committed.xcnt;
ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
@ -1591,18 +1562,14 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
ondisk->builder.context = NULL;
ondisk->builder.snapshot = NULL;
ondisk->builder.reorder = NULL;
ondisk->builder.running.xip = NULL;
ondisk->builder.committed.xip = NULL;
COMP_CRC32C(ondisk->checksum,
&ondisk->builder,
sizeof(SnapBuild));
/* copy running xacts */
sz = sizeof(TransactionId) * builder->running.xcnt_space;
memcpy(ondisk_c, builder->running.xip, sz);
COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
/* there shouldn't be any running xacts */
Assert(builder->was_running.was_xcnt == 0);
/* copy committed xacts */
sz = sizeof(TransactionId) * builder->committed.xcnt;
@ -1762,11 +1729,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
/* restore running xacts information */
sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
/* restore running xacts (dead, but kept for backward compat) */
sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
ondisk.builder.was_running.was_xip =
MemoryContextAllocZero(builder->context, sz);
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, ondisk.builder.running.xip, sz);
readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
pgstat_report_wait_end();
if (readBytes != sz)
{
@ -1776,7 +1744,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
errmsg("could not read file \"%s\", read %d of %d: %m",
path, readBytes, (int) sz)));
}
COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
/* restore committed xacts information */
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
@ -1842,12 +1810,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
ondisk.builder.committed.xip = NULL;
builder->running.xcnt = ondisk.builder.running.xcnt;
if (builder->running.xip)
pfree(builder->running.xip);
builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
builder->running.xip = ondisk.builder.running.xip;
/* our snapshot is not interesting anymore, build a new one */
if (builder->snapshot != NULL)
{
@ -1867,8 +1829,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
return true;
snapshot_not_interesting:
if (ondisk.builder.running.xip != NULL)
pfree(ondisk.builder.running.xip);
if (ondisk.builder.committed.xip != NULL)
pfree(ondisk.builder.committed.xip);
return false;

View File

@ -20,24 +20,30 @@ typedef enum
/*
* Initial state, we can't do much yet.
*/
SNAPBUILD_START,
SNAPBUILD_START = -1,
/*
* Collecting committed transactions, to build the initial catalog
* snapshot.
*/
SNAPBUILD_BUILDING_SNAPSHOT = 0,
/*
* We have collected enough information to decode tuples in transactions
* that started after this.
*
* Once we reached this we start to collect changes. We cannot apply them
* yet because the might be based on transactions that were still running
* when we reached them yet.
* yet, because they might be based on transactions that were still running
* when FULL_SNAPSHOT was reached.
*/
SNAPBUILD_FULL_SNAPSHOT,
SNAPBUILD_FULL_SNAPSHOT = 1,
/*
* Found a point after hitting built_full_snapshot where all transactions
* that were running at that point finished. Till we reach that we hold
* off calling any commit callbacks.
* Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
* were running at that point finished. Till we reach that we hold off
* calling any commit callbacks.
*/
SNAPBUILD_CONSISTENT
SNAPBUILD_CONSISTENT = 2
} SnapBuildState;
/* forward declare so we don't have to expose the struct to the public */
@ -73,9 +79,6 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,
TransactionId *subxacts);
extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,
TransactionId *subxacts);
extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
XLogRecPtr lsn);
extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,