|
|
|
@ -56,23 +56,34 @@
|
|
|
|
|
*
|
|
|
|
|
*
|
|
|
|
|
* The snapbuild machinery is starting up in several stages, as illustrated
|
|
|
|
|
* by the following graph:
|
|
|
|
|
* by the following graph describing the SnapBuild->state transitions:
|
|
|
|
|
*
|
|
|
|
|
* +-------------------------+
|
|
|
|
|
* +----|SNAPBUILD_START |-------------+
|
|
|
|
|
* +----| START |-------------+
|
|
|
|
|
* | +-------------------------+ |
|
|
|
|
|
* | | |
|
|
|
|
|
* | | |
|
|
|
|
|
* | running_xacts with running xacts |
|
|
|
|
|
* | running_xacts #1 |
|
|
|
|
|
* | | |
|
|
|
|
|
* | | |
|
|
|
|
|
* | v |
|
|
|
|
|
* | +-------------------------+ v
|
|
|
|
|
* | |SNAPBUILD_FULL_SNAPSHOT |------------>|
|
|
|
|
|
* | | BUILDING_SNAPSHOT |------------>|
|
|
|
|
|
* | +-------------------------+ |
|
|
|
|
|
* | | |
|
|
|
|
|
* | | |
|
|
|
|
|
* | running_xacts #2, xacts from #1 finished |
|
|
|
|
|
* | | |
|
|
|
|
|
* | | |
|
|
|
|
|
* | v |
|
|
|
|
|
* | +-------------------------+ v
|
|
|
|
|
* | | FULL_SNAPSHOT |------------>|
|
|
|
|
|
* | +-------------------------+ |
|
|
|
|
|
* | | |
|
|
|
|
|
* running_xacts | saved snapshot
|
|
|
|
|
* with zero xacts | at running_xacts's lsn
|
|
|
|
|
* | | |
|
|
|
|
|
* | all running toplevel TXNs finished |
|
|
|
|
|
* | running_xacts with xacts from #2 finished |
|
|
|
|
|
* | | |
|
|
|
|
|
* | v |
|
|
|
|
|
* | +-------------------------+ |
|
|
|
|
@ -83,7 +94,7 @@
|
|
|
|
|
* record is read that is sufficiently new (above the safe xmin horizon),
|
|
|
|
|
* there's a state transition. If there were no running xacts when the
|
|
|
|
|
* running_xacts record was generated, we'll directly go into CONSISTENT
|
|
|
|
|
* state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full
|
|
|
|
|
* state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
|
|
|
|
|
* snapshot means that all transactions that start henceforth can be decoded
|
|
|
|
|
* in their entirety, but transactions that started previously can't. In
|
|
|
|
|
* FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
|
|
|
|
@ -184,26 +195,24 @@ struct SnapBuild
|
|
|
|
|
ReorderBuffer *reorder;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Information about initially running transactions
|
|
|
|
|
*
|
|
|
|
|
* When we start building a snapshot there already may be transactions in
|
|
|
|
|
* progress. Those are stored in running.xip. We don't have enough
|
|
|
|
|
* information about those to decode their contents, so until they are
|
|
|
|
|
* finished (xcnt=0) we cannot switch to a CONSISTENT state.
|
|
|
|
|
* Outdated: This struct isn't used for its original purpose anymore, but
|
|
|
|
|
* can't be removed / changed in a minor version, because it's stored
|
|
|
|
|
* on-disk.
|
|
|
|
|
*/
|
|
|
|
|
struct
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* As long as running.xcnt all XIDs < running.xmin and > running.xmax
|
|
|
|
|
* have to be checked whether they still are running.
|
|
|
|
|
* NB: This field is misused, until a major version can break on-disk
|
|
|
|
|
* compatibility. See SnapBuildNextPhaseAt() /
|
|
|
|
|
* SnapBuildStartNextPhaseAt().
|
|
|
|
|
*/
|
|
|
|
|
TransactionId xmin;
|
|
|
|
|
TransactionId xmax;
|
|
|
|
|
TransactionId was_xmin;
|
|
|
|
|
TransactionId was_xmax;
|
|
|
|
|
|
|
|
|
|
size_t xcnt; /* number of used xip entries */
|
|
|
|
|
size_t xcnt_space; /* allocated size of xip */
|
|
|
|
|
TransactionId *xip; /* running xacts array, xidComparator-sorted */
|
|
|
|
|
} running;
|
|
|
|
|
size_t was_xcnt; /* number of used xip entries */
|
|
|
|
|
size_t was_xcnt_space; /* allocated size of xip */
|
|
|
|
|
TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
|
|
|
|
|
} was_running;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Array of transactions which could have catalog changes that committed
|
|
|
|
@ -249,12 +258,6 @@ struct SnapBuild
|
|
|
|
|
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
|
|
|
|
|
static bool ExportInProgress = false;
|
|
|
|
|
|
|
|
|
|
/* transaction state manipulation functions */
|
|
|
|
|
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
|
|
|
|
|
|
|
|
|
|
/* ->running manipulation */
|
|
|
|
|
static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid);
|
|
|
|
|
|
|
|
|
|
/* ->committed manipulation */
|
|
|
|
|
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
|
|
|
|
|
|
|
|
|
@ -269,11 +272,39 @@ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr
|
|
|
|
|
|
|
|
|
|
/* xlog reading helper functions for SnapBuildProcessRecord */
|
|
|
|
|
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
|
|
|
|
|
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
|
|
|
|
|
|
|
|
|
|
/* serialization functions */
|
|
|
|
|
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
|
|
|
|
|
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Return TransactionId after which the next phase of initial snapshot
|
|
|
|
|
* building will happen.
|
|
|
|
|
*/
|
|
|
|
|
static inline TransactionId
|
|
|
|
|
SnapBuildNextPhaseAt(SnapBuild *builder)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* For backward compatibility reasons this has to be stored in the wrongly
|
|
|
|
|
* named field. Will be fixed in next major version.
|
|
|
|
|
*/
|
|
|
|
|
return builder->was_running.was_xmax;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set TransactionId after which the next phase of initial snapshot building
|
|
|
|
|
* will happen.
|
|
|
|
|
*/
|
|
|
|
|
static inline void
|
|
|
|
|
SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* For backward compatibility reasons this has to be stored in the wrongly
|
|
|
|
|
* named field. Will be fixed in next major version.
|
|
|
|
|
*/
|
|
|
|
|
builder->was_running.was_xmax = at;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Allocate a new snapshot builder.
|
|
|
|
@ -700,7 +731,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
|
|
|
|
|
* we got into the SNAPBUILD_FULL_SNAPSHOT state.
|
|
|
|
|
*/
|
|
|
|
|
if (builder->state < SNAPBUILD_CONSISTENT &&
|
|
|
|
|
SnapBuildTxnIsRunning(builder, xid))
|
|
|
|
|
TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -768,38 +799,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
|
|
|
|
|
ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check whether `xid` is currently 'running'.
|
|
|
|
|
*
|
|
|
|
|
* Running transactions in our parlance are transactions which we didn't
|
|
|
|
|
* observe from the start so we can't properly decode their contents. They
|
|
|
|
|
* only exist after we freshly started from an < CONSISTENT snapshot.
|
|
|
|
|
*/
|
|
|
|
|
static bool
|
|
|
|
|
SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid)
|
|
|
|
|
{
|
|
|
|
|
Assert(builder->state < SNAPBUILD_CONSISTENT);
|
|
|
|
|
Assert(TransactionIdIsNormal(builder->running.xmin));
|
|
|
|
|
Assert(TransactionIdIsNormal(builder->running.xmax));
|
|
|
|
|
|
|
|
|
|
if (builder->running.xcnt &&
|
|
|
|
|
NormalTransactionIdFollows(xid, builder->running.xmin) &&
|
|
|
|
|
NormalTransactionIdPrecedes(xid, builder->running.xmax))
|
|
|
|
|
{
|
|
|
|
|
TransactionId *search =
|
|
|
|
|
bsearch(&xid, builder->running.xip, builder->running.xcnt_space,
|
|
|
|
|
sizeof(TransactionId), xidComparator);
|
|
|
|
|
|
|
|
|
|
if (search != NULL)
|
|
|
|
|
{
|
|
|
|
|
Assert(*search == xid);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Add a new Snapshot to all transactions we're decoding that currently are
|
|
|
|
|
* in-progress so they can see new catalog contents made by the transaction
|
|
|
|
@ -921,63 +920,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
|
|
|
|
|
pfree(workspace);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with
|
|
|
|
|
* keeping track of the amount of running transactions.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
|
|
|
|
|
{
|
|
|
|
|
if (builder->state == SNAPBUILD_CONSISTENT)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* NB: This handles subtransactions correctly even if we started from
|
|
|
|
|
* suboverflowed xl_running_xacts because we only keep track of toplevel
|
|
|
|
|
* transactions. Since the latter are always allocated before their
|
|
|
|
|
* subxids and since they end at the same time it's sufficient to deal
|
|
|
|
|
* with them here.
|
|
|
|
|
*/
|
|
|
|
|
if (SnapBuildTxnIsRunning(builder, xid))
|
|
|
|
|
{
|
|
|
|
|
Assert(builder->running.xcnt > 0);
|
|
|
|
|
|
|
|
|
|
if (!--builder->running.xcnt)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* None of the originally running transaction is running anymore,
|
|
|
|
|
* so our incrementally built snapshot now is consistent.
|
|
|
|
|
*/
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("logical decoding found consistent point at %X/%X",
|
|
|
|
|
(uint32) (lsn >> 32), (uint32) lsn),
|
|
|
|
|
errdetail("Transaction ID %u finished; no more running transactions.",
|
|
|
|
|
xid)));
|
|
|
|
|
builder->state = SNAPBUILD_CONSISTENT;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Abort a transaction, throw away all state we kept.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
|
|
|
|
|
TransactionId xid,
|
|
|
|
|
int nsubxacts, TransactionId *subxacts)
|
|
|
|
|
{
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < nsubxacts; i++)
|
|
|
|
|
{
|
|
|
|
|
TransactionId subxid = subxacts[i];
|
|
|
|
|
|
|
|
|
|
SnapBuildEndTxn(builder, lsn, subxid);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SnapBuildEndTxn(builder, lsn, xid);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Handle everything that needs to be done when a transaction commits
|
|
|
|
|
*/
|
|
|
|
@ -1021,11 +963,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
|
|
|
|
|
{
|
|
|
|
|
TransactionId subxid = subxacts[nxact];
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* make sure txn is not tracked in running txn's anymore, switch state
|
|
|
|
|
*/
|
|
|
|
|
SnapBuildEndTxn(builder, lsn, subxid);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If we're forcing timetravel we also need visibility information
|
|
|
|
|
* about subtransaction, so keep track of subtransaction's state.
|
|
|
|
@ -1055,12 +992,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Make sure toplevel txn is not tracked in running txn's anymore, switch
|
|
|
|
|
* state to consistent if possible.
|
|
|
|
|
*/
|
|
|
|
|
SnapBuildEndTxn(builder, lsn, xid);
|
|
|
|
|
|
|
|
|
|
if (forced_timetravel)
|
|
|
|
|
{
|
|
|
|
|
elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
|
|
|
|
@ -1250,25 +1181,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
|
|
|
|
*
|
|
|
|
|
* a) There were no running transactions when the xl_running_xacts record
|
|
|
|
|
* was inserted, jump to CONSISTENT immediately. We might find such a
|
|
|
|
|
* state we were waiting for b) or c).
|
|
|
|
|
* state while waiting on c)'s sub-states.
|
|
|
|
|
*
|
|
|
|
|
* b) Wait for all toplevel transactions that were running to end. We
|
|
|
|
|
* simply track the number of in-progress toplevel transactions and
|
|
|
|
|
* lower it whenever one commits or aborts. When that number
|
|
|
|
|
* (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
|
|
|
|
|
* to CONSISTENT.
|
|
|
|
|
* NB: We need to search running.xip when seeing a transaction's end to
|
|
|
|
|
* make sure it's a toplevel transaction and it's been one of the
|
|
|
|
|
* initially running ones.
|
|
|
|
|
* Interestingly, in contrast to HS, this allows us not to care about
|
|
|
|
|
* subtransactions - and by extension suboverflowed xl_running_xacts -
|
|
|
|
|
* at all.
|
|
|
|
|
*
|
|
|
|
|
* c) This (in a previous run) or another decoding slot serialized a
|
|
|
|
|
* b) This (in a previous run) or another decoding slot serialized a
|
|
|
|
|
* snapshot to disk that we can use. Can't use this method for the
|
|
|
|
|
* initial snapshot when slot is being created and needs full snapshot
|
|
|
|
|
* for export or direct use, as that snapshot will only contain catalog
|
|
|
|
|
* modifying transactions.
|
|
|
|
|
*
|
|
|
|
|
* c) First incrementally build a snapshot for catalog tuples
|
|
|
|
|
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
|
|
|
|
|
* transactions to finish. Every transaction starting after that
|
|
|
|
|
* (FULL_SNAPSHOT state), has enough information to be decoded. But
|
|
|
|
|
* for older running transactions no viable snapshot exists yet, so
|
|
|
|
|
* CONSISTENT will only be reached once all of those have finished.
|
|
|
|
|
* ---
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
@ -1285,16 +1211,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
|
|
|
|
(uint32) (lsn >> 32), (uint32) lsn),
|
|
|
|
|
errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
|
|
|
|
|
builder->initial_xmin_horizon, running->oldestRunningXid)));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* a) No transaction were running, we can jump to consistent.
|
|
|
|
|
*
|
|
|
|
|
* This is not affected by races around xl_running_xacts, because we can
|
|
|
|
|
* miss transaction commits, but currently not transactions starting.
|
|
|
|
|
*
|
|
|
|
|
* NB: We might have already started to incrementally assemble a snapshot,
|
|
|
|
|
* so we need to be careful to deal with that.
|
|
|
|
|
*/
|
|
|
|
|
if (running->xcnt == 0)
|
|
|
|
|
if (running->oldestRunningXid == running->nextXid)
|
|
|
|
|
{
|
|
|
|
|
if (builder->start_decoding_at == InvalidXLogRecPtr ||
|
|
|
|
|
builder->start_decoding_at <= lsn)
|
|
|
|
@ -1309,12 +1242,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
|
|
|
|
Assert(TransactionIdIsNormal(builder->xmin));
|
|
|
|
|
Assert(TransactionIdIsNormal(builder->xmax));
|
|
|
|
|
|
|
|
|
|
/* no transactions running now */
|
|
|
|
|
builder->running.xcnt = 0;
|
|
|
|
|
builder->running.xmin = InvalidTransactionId;
|
|
|
|
|
builder->running.xmax = InvalidTransactionId;
|
|
|
|
|
|
|
|
|
|
builder->state = SNAPBUILD_CONSISTENT;
|
|
|
|
|
SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
|
|
|
|
|
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("logical decoding found consistent point at %X/%X",
|
|
|
|
@ -1323,30 +1252,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
/* c) valid on disk state and not building full snapshot */
|
|
|
|
|
/* b) valid on disk state and not building full snapshot */
|
|
|
|
|
else if (!builder->building_full_snapshot &&
|
|
|
|
|
SnapBuildRestore(builder, lsn))
|
|
|
|
|
{
|
|
|
|
|
/* there won't be any state to cleanup */
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* b) first encounter of a useable xl_running_xacts record. If we had
|
|
|
|
|
* found one earlier we would either track running transactions (i.e.
|
|
|
|
|
* builder->running.xcnt != 0) or be consistent (this function wouldn't
|
|
|
|
|
* get called).
|
|
|
|
|
* c) transition from START to BUILDING_SNAPSHOT.
|
|
|
|
|
*
|
|
|
|
|
* In START state, and a xl_running_xacts record with running xacts is
|
|
|
|
|
* encountered. In that case, switch to BUILDING_SNAPSHOT state, and
|
|
|
|
|
* record xl_running_xacts->nextXid. Once all running xacts have finished
|
|
|
|
|
* (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
|
|
|
|
|
* might look that we could use xl_running_xact's ->xids information to
|
|
|
|
|
* get there quicker, but that is problematic because transactions marked
|
|
|
|
|
* as running, might already have inserted their commit record - it's
|
|
|
|
|
* infeasible to change that with locking.
|
|
|
|
|
*/
|
|
|
|
|
else if (!builder->running.xcnt)
|
|
|
|
|
else if (builder->state == SNAPBUILD_START)
|
|
|
|
|
{
|
|
|
|
|
int off;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We only care about toplevel xids as those are the ones we
|
|
|
|
|
* definitely see in the wal stream. As snapbuild.c tracks committed
|
|
|
|
|
* instead of running transactions we don't need to know anything
|
|
|
|
|
* about uncommitted subtransactions.
|
|
|
|
|
*/
|
|
|
|
|
builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
|
|
|
|
|
SnapBuildStartNextPhaseAt(builder, running->nextXid);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Start with an xmin/xmax that's correct for future, when all the
|
|
|
|
@ -1360,59 +1288,57 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
|
|
|
|
Assert(TransactionIdIsNormal(builder->xmin));
|
|
|
|
|
Assert(TransactionIdIsNormal(builder->xmax));
|
|
|
|
|
|
|
|
|
|
builder->running.xcnt = running->xcnt;
|
|
|
|
|
builder->running.xcnt_space = running->xcnt;
|
|
|
|
|
builder->running.xip =
|
|
|
|
|
MemoryContextAlloc(builder->context,
|
|
|
|
|
builder->running.xcnt * sizeof(TransactionId));
|
|
|
|
|
memcpy(builder->running.xip, running->xids,
|
|
|
|
|
builder->running.xcnt * sizeof(TransactionId));
|
|
|
|
|
|
|
|
|
|
/* sort so we can do a binary search */
|
|
|
|
|
qsort(builder->running.xip, builder->running.xcnt,
|
|
|
|
|
sizeof(TransactionId), xidComparator);
|
|
|
|
|
|
|
|
|
|
builder->running.xmin = builder->running.xip[0];
|
|
|
|
|
builder->running.xmax = builder->running.xip[running->xcnt - 1];
|
|
|
|
|
|
|
|
|
|
/* makes comparisons cheaper later */
|
|
|
|
|
TransactionIdRetreat(builder->running.xmin);
|
|
|
|
|
TransactionIdAdvance(builder->running.xmax);
|
|
|
|
|
|
|
|
|
|
builder->state = SNAPBUILD_FULL_SNAPSHOT;
|
|
|
|
|
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("logical decoding found initial starting point at %X/%X",
|
|
|
|
|
(uint32) (lsn >> 32), (uint32) lsn),
|
|
|
|
|
errdetail_plural("%u transaction needs to finish.",
|
|
|
|
|
"%u transactions need to finish.",
|
|
|
|
|
builder->running.xcnt,
|
|
|
|
|
(uint32) builder->running.xcnt)));
|
|
|
|
|
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
|
|
|
|
|
running->xcnt, running->nextXid)));
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Iterate through all xids, wait for them to finish.
|
|
|
|
|
*
|
|
|
|
|
* This isn't required for the correctness of decoding, but to allow
|
|
|
|
|
* isolationtester to notice that we're currently waiting for
|
|
|
|
|
* something.
|
|
|
|
|
*/
|
|
|
|
|
for (off = 0; off < builder->running.xcnt; off++)
|
|
|
|
|
{
|
|
|
|
|
TransactionId xid = builder->running.xip[off];
|
|
|
|
|
SnapBuildWaitSnapshot(running, running->nextXid);
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
* c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
|
|
|
|
|
*
|
|
|
|
|
* In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
|
|
|
|
|
* is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
|
|
|
|
|
* means all transactions starting afterwards have enough information to
|
|
|
|
|
* be decoded. Switch to FULL_SNAPSHOT.
|
|
|
|
|
*/
|
|
|
|
|
else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
|
|
|
|
|
TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
|
|
|
|
|
running->oldestRunningXid))
|
|
|
|
|
{
|
|
|
|
|
builder->state = SNAPBUILD_FULL_SNAPSHOT;
|
|
|
|
|
SnapBuildStartNextPhaseAt(builder, running->nextXid);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Upper layers should prevent that we ever need to wait on
|
|
|
|
|
* ourselves. Check anyway, since failing to do so would either
|
|
|
|
|
* result in an endless wait or an Assert() failure.
|
|
|
|
|
*/
|
|
|
|
|
if (TransactionIdIsCurrentTransactionId(xid))
|
|
|
|
|
elog(ERROR, "waiting for ourselves");
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("logical decoding found initial consistent point at %X/%X",
|
|
|
|
|
(uint32) (lsn >> 32), (uint32) lsn),
|
|
|
|
|
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
|
|
|
|
|
running->xcnt, running->nextXid)));
|
|
|
|
|
|
|
|
|
|
XactLockTableWait(xid, NULL, NULL, XLTW_None);
|
|
|
|
|
}
|
|
|
|
|
SnapBuildWaitSnapshot(running, running->nextXid);
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
* c) transition from FULL_SNAPSHOT to CONSISTENT.
|
|
|
|
|
*
|
|
|
|
|
* In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
|
|
|
|
|
* oldestRunningXid is >= than nextXid from when we switched to
|
|
|
|
|
* FULL_SNAPSHOT. This means all transactions that are currently in
|
|
|
|
|
* progress have a catalog snapshot, and all their changes have been
|
|
|
|
|
* collected. Switch to CONSISTENT.
|
|
|
|
|
*/
|
|
|
|
|
else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
|
|
|
|
|
TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
|
|
|
|
|
running->oldestRunningXid))
|
|
|
|
|
{
|
|
|
|
|
builder->state = SNAPBUILD_CONSISTENT;
|
|
|
|
|
SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
|
|
|
|
|
|
|
|
|
|
/* nothing could have built up so far, so don't perform cleanup */
|
|
|
|
|
return false;
|
|
|
|
|
ereport(LOG,
|
|
|
|
|
(errmsg("logical decoding found consistent point at %X/%X",
|
|
|
|
|
(uint32) (lsn >> 32), (uint32) lsn),
|
|
|
|
|
errdetail("There are no old transactions anymore.")));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -1421,8 +1347,54 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
|
|
|
|
* records so incremental cleanup can be performed.
|
|
|
|
|
*/
|
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* ---
|
|
|
|
|
* Iterate through xids in record, wait for all older than the cutoff to
|
|
|
|
|
* finish. Then, if possible, log a new xl_running_xacts record.
|
|
|
|
|
*
|
|
|
|
|
* This isn't required for the correctness of decoding, but to:
|
|
|
|
|
* a) allow isolationtester to notice that we're currently waiting for
|
|
|
|
|
* something.
|
|
|
|
|
* b) log a new xl_running_xacts record where it'd be helpful, without having
|
|
|
|
|
* to write for bgwriter or checkpointer.
|
|
|
|
|
* ---
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
|
|
|
|
|
{
|
|
|
|
|
int off;
|
|
|
|
|
|
|
|
|
|
for (off = 0; off < running->xcnt; off++)
|
|
|
|
|
{
|
|
|
|
|
TransactionId xid = running->xids[off];
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Upper layers should prevent that we ever need to wait on
|
|
|
|
|
* ourselves. Check anyway, since failing to do so would either
|
|
|
|
|
* result in an endless wait or an Assert() failure.
|
|
|
|
|
*/
|
|
|
|
|
if (TransactionIdIsCurrentTransactionId(xid))
|
|
|
|
|
elog(ERROR, "waiting for ourselves");
|
|
|
|
|
|
|
|
|
|
if (TransactionIdFollows(xid, cutoff))
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
XactLockTableWait(xid, NULL, NULL, XLTW_None);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* All transactions we needed to finish finished - try to ensure there is
|
|
|
|
|
* another xl_running_xacts record in a timely manner, without having to
|
|
|
|
|
* write for bgwriter or checkpointer to log one. During recovery we
|
|
|
|
|
* can't enforce that, so we'll have to wait.
|
|
|
|
|
*/
|
|
|
|
|
if (!RecoveryInProgress())
|
|
|
|
|
{
|
|
|
|
|
LogStandbySnapshot();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* -----------------------------------
|
|
|
|
|
* Snapshot serialization support
|
|
|
|
@ -1572,7 +1544,6 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|
|
|
|
errmsg("could not remove file \"%s\": %m", path)));
|
|
|
|
|
|
|
|
|
|
needed_length = sizeof(SnapBuildOnDisk) +
|
|
|
|
|
sizeof(TransactionId) * builder->running.xcnt_space +
|
|
|
|
|
sizeof(TransactionId) * builder->committed.xcnt;
|
|
|
|
|
|
|
|
|
|
ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
|
|
|
|
@ -1591,18 +1562,14 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|
|
|
|
ondisk->builder.context = NULL;
|
|
|
|
|
ondisk->builder.snapshot = NULL;
|
|
|
|
|
ondisk->builder.reorder = NULL;
|
|
|
|
|
ondisk->builder.running.xip = NULL;
|
|
|
|
|
ondisk->builder.committed.xip = NULL;
|
|
|
|
|
|
|
|
|
|
COMP_CRC32C(ondisk->checksum,
|
|
|
|
|
&ondisk->builder,
|
|
|
|
|
sizeof(SnapBuild));
|
|
|
|
|
|
|
|
|
|
/* copy running xacts */
|
|
|
|
|
sz = sizeof(TransactionId) * builder->running.xcnt_space;
|
|
|
|
|
memcpy(ondisk_c, builder->running.xip, sz);
|
|
|
|
|
COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
|
|
|
|
|
ondisk_c += sz;
|
|
|
|
|
/* there shouldn't be any running xacts */
|
|
|
|
|
Assert(builder->was_running.was_xcnt == 0);
|
|
|
|
|
|
|
|
|
|
/* copy committed xacts */
|
|
|
|
|
sz = sizeof(TransactionId) * builder->committed.xcnt;
|
|
|
|
@ -1762,11 +1729,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
|
|
|
|
}
|
|
|
|
|
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
|
|
|
|
|
|
|
|
|
|
/* restore running xacts information */
|
|
|
|
|
sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
|
|
|
|
|
ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
|
|
|
|
|
/* restore running xacts (dead, but kept for backward compat) */
|
|
|
|
|
sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
|
|
|
|
|
ondisk.builder.was_running.was_xip =
|
|
|
|
|
MemoryContextAllocZero(builder->context, sz);
|
|
|
|
|
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
|
|
|
|
|
readBytes = read(fd, ondisk.builder.running.xip, sz);
|
|
|
|
|
readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
|
|
|
|
|
pgstat_report_wait_end();
|
|
|
|
|
if (readBytes != sz)
|
|
|
|
|
{
|
|
|
|
@ -1776,7 +1744,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
|
|
|
|
errmsg("could not read file \"%s\", read %d of %d: %m",
|
|
|
|
|
path, readBytes, (int) sz)));
|
|
|
|
|
}
|
|
|
|
|
COMP_CRC32C(checksum, ondisk.builder.running.xip, sz);
|
|
|
|
|
COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
|
|
|
|
|
|
|
|
|
|
/* restore committed xacts information */
|
|
|
|
|
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
|
|
|
|
@ -1842,12 +1810,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
|
|
|
|
}
|
|
|
|
|
ondisk.builder.committed.xip = NULL;
|
|
|
|
|
|
|
|
|
|
builder->running.xcnt = ondisk.builder.running.xcnt;
|
|
|
|
|
if (builder->running.xip)
|
|
|
|
|
pfree(builder->running.xip);
|
|
|
|
|
builder->running.xcnt_space = ondisk.builder.running.xcnt_space;
|
|
|
|
|
builder->running.xip = ondisk.builder.running.xip;
|
|
|
|
|
|
|
|
|
|
/* our snapshot is not interesting anymore, build a new one */
|
|
|
|
|
if (builder->snapshot != NULL)
|
|
|
|
|
{
|
|
|
|
@ -1867,8 +1829,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
snapshot_not_interesting:
|
|
|
|
|
if (ondisk.builder.running.xip != NULL)
|
|
|
|
|
pfree(ondisk.builder.running.xip);
|
|
|
|
|
if (ondisk.builder.committed.xip != NULL)
|
|
|
|
|
pfree(ondisk.builder.committed.xip);
|
|
|
|
|
return false;
|
|
|
|
|