diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out index 65115c830a..c7b1f45b46 100644 --- a/contrib/test_decoding/expected/ondisk_startup.out +++ b/contrib/test_decoding/expected/ondisk_startup.out @@ -1,21 +1,30 @@ Parsed test spec with 3 sessions -starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start -step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +starting permutation: s2b s2txid s1init s3b s3txid s2alter s2c s2b s2txid s3c s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start +step s2b: BEGIN; +step s2txid: SELECT txid_current() IS NULL; ?column? f step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +step s3b: BEGIN; +step s3txid: SELECT txid_current() IS NULL; ?column? f step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int; step s2c: COMMIT; +step s2b: BEGIN; +step s2txid: SELECT txid_current() IS NULL; +?column? + +f +step s3c: COMMIT; step s1init: <... completed> ?column? init +step s2c: COMMIT; step s1insert: INSERT INTO do_write DEFAULT VALUES; step s1checkpoint: CHECKPOINT; step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec index 8223705639..12c57a813d 100644 --- a/contrib/test_decoding/specs/ondisk_startup.spec +++ b/contrib/test_decoding/specs/ondisk_startup.spec @@ -24,7 +24,8 @@ step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; } session "s2" setup { SET synchronous_commit=on; } -step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s2b" { BEGIN; } +step "s2txid" { SELECT txid_current() IS NULL; } step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; } step "s2c" { COMMIT; } @@ -32,7 +33,8 @@ step "s2c" { COMMIT; } session "s3" setup { SET synchronous_commit=on; } -step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s3b" { BEGIN; } +step "s3txid" { SELECT txid_current() IS NULL; } step "s3c" { COMMIT; } # Force usage of ondisk snapshot by starting and not finishing a @@ -40,4 +42,4 @@ step "s3c" { COMMIT; } # reached. In combination with a checkpoint forcing a snapshot to be # written and a new restart point computed that'll lead to the usage # of the snapshot. -permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" +permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 46cd5ba1f2..4fabb880ca 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -622,9 +622,6 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { int i; - SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid, - parsed->nsubxacts, parsed->subxacts); - for (i = 0; i < parsed->nsubxacts; i++) { ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 38c088f23b..10798aee44 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1768,7 +1768,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) if (TransactionIdPrecedes(txn->xid, oldestRunningXid)) { - elog(DEBUG1, "aborting old transaction %u", txn->xid); + elog(DEBUG2, "aborting old transaction %u", txn->xid); /* remove potential on-disk data, and deallocate this tx */ ReorderBufferCleanupTXN(rb, txn); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 9f0796a0ec..8001f5b1f0 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -56,23 +56,34 @@ * * * The snapbuild machinery is starting up in several stages, as illustrated - * by the following graph: + * by the following graph describing the SnapBuild->state transitions: + * * +-------------------------+ - * +----|SNAPBUILD_START |-------------+ + * +----| START |-------------+ * | +-------------------------+ | * | | | * | | | - * | running_xacts with running xacts | + * | running_xacts #1 | * | | | * | | | * | v | * | +-------------------------+ v - * | |SNAPBUILD_FULL_SNAPSHOT |------------>| + * | | BUILDING_SNAPSHOT |------------>| * | +-------------------------+ | + * | | | + * | | | + * | running_xacts #2, xacts from #1 finished | + * | | | + * | | | + * | v | + * | +-------------------------+ v + * | | FULL_SNAPSHOT |------------>| + * | +-------------------------+ | + * | | | * running_xacts | saved snapshot * with zero xacts | at running_xacts's lsn * | | | - * | all running toplevel TXNs finished | + * | running_xacts with xacts from #2 finished | * | | | * | v | * | +-------------------------+ | @@ -82,8 +93,8 @@ * Initially the machinery is in the START stage. When an xl_running_xacts * record is read that is sufficiently new (above the safe xmin horizon), * there's a state transition. If there were no running xacts when the - * runnign_xacts record was generated, we'll directly go into CONSISTENT - * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full + * running_xacts record was generated, we'll directly go into CONSISTENT + * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full * snapshot means that all transactions that start henceforth can be decoded * in their entirety, but transactions that started previously can't. In * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously @@ -183,26 +194,24 @@ struct SnapBuild ReorderBuffer *reorder; /* - * Information about initially running transactions - * - * When we start building a snapshot there already may be transactions in - * progress. Those are stored in running.xip. We don't have enough - * information about those to decode their contents, so until they are - * finished (xcnt=0) we cannot switch to a CONSISTENT state. + * Outdated: This struct isn't used for its original purpose anymore, but + * can't be removed / changed in a minor version, because it's stored + * on-disk. */ struct { /* - * As long as running.xcnt all XIDs < running.xmin and > running.xmax - * have to be checked whether they still are running. + * NB: This field is misused, until a major version can break on-disk + * compatibility. See SnapBuildNextPhaseAt() / + * SnapBuildStartNextPhaseAt(). */ - TransactionId xmin; - TransactionId xmax; + TransactionId was_xmin; + TransactionId was_xmax; - size_t xcnt; /* number of used xip entries */ - size_t xcnt_space; /* allocated size of xip */ - TransactionId *xip; /* running xacts array, xidComparator-sorted */ - } running; + size_t was_xcnt; /* number of used xip entries */ + size_t was_xcnt_space; /* allocated size of xip */ + TransactionId *was_xip; /* running xacts array, xidComparator-sorted */ + } was_running; /* * Array of transactions which could have catalog changes that committed @@ -248,12 +257,6 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* transaction state manipulation functions */ -static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); - -/* ->running manipulation */ -static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid); - /* ->committed manipulation */ static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); @@ -268,11 +271,39 @@ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr /* xlog reading helper functions for SnapBuildProcessRecord */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); +static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); /* serialization functions */ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); +/* + * Return TransactionId after which the next phase of initial snapshot + * building will happen. + */ +static inline TransactionId +SnapBuildNextPhaseAt(SnapBuild *builder) +{ + /* + * For backward compatibility reasons this has to be stored in the wrongly + * named field. Will be fixed in next major version. + */ + return builder->was_running.was_xmax; +} + +/* + * Set TransactionId after which the next phase of initial snapshot building + * will happen. + */ +static inline void +SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at) +{ + /* + * For backward compatibility reasons this has to be stored in the wrongly + * named field. Will be fixed in next major version. + */ + builder->was_running.was_xmax = at; +} /* * Allocate a new snapshot builder. @@ -682,7 +713,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) * we got into the SNAPBUILD_FULL_SNAPSHOT state. */ if (builder->state < SNAPBUILD_CONSISTENT && - SnapBuildTxnIsRunning(builder, xid)) + TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))) return false; /* @@ -750,38 +781,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1); } -/* - * Check whether `xid` is currently 'running'. - * - * Running transactions in our parlance are transactions which we didn't - * observe from the start so we can't properly decode their contents. They - * only exist after we freshly started from an < CONSISTENT snapshot. - */ -static bool -SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid) -{ - Assert(builder->state < SNAPBUILD_CONSISTENT); - Assert(TransactionIdIsNormal(builder->running.xmin)); - Assert(TransactionIdIsNormal(builder->running.xmax)); - - if (builder->running.xcnt && - NormalTransactionIdFollows(xid, builder->running.xmin) && - NormalTransactionIdPrecedes(xid, builder->running.xmax)) - { - TransactionId *search = - bsearch(&xid, builder->running.xip, builder->running.xcnt_space, - sizeof(TransactionId), xidComparator); - - if (search != NULL) - { - Assert(*search == xid); - return true; - } - } - - return false; -} - /* * Add a new Snapshot to all transactions we're decoding that currently are * in-progress so they can see new catalog contents made by the transaction @@ -903,63 +902,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) pfree(workspace); } -/* - * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with - * keeping track of the amount of running transactions. - */ -static void -SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) -{ - if (builder->state == SNAPBUILD_CONSISTENT) - return; - - /* - * NB: This handles subtransactions correctly even if we started from - * suboverflowed xl_running_xacts because we only keep track of toplevel - * transactions. Since the latter are always allocated before their - * subxids and since they end at the same time it's sufficient to deal - * with them here. - */ - if (SnapBuildTxnIsRunning(builder, xid)) - { - Assert(builder->running.xcnt > 0); - - if (!--builder->running.xcnt) - { - /* - * None of the originally running transaction is running anymore, - * so our incrementally built snapshot now is consistent. - */ - ereport(LOG, - (errmsg("logical decoding found consistent point at %X/%X", - (uint32) (lsn >> 32), (uint32) lsn), - errdetail("Transaction ID %u finished; no more running transactions.", - xid))); - builder->state = SNAPBUILD_CONSISTENT; - } - } -} - -/* - * Abort a transaction, throw away all state we kept. - */ -void -SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn, - TransactionId xid, - int nsubxacts, TransactionId *subxacts) -{ - int i; - - for (i = 0; i < nsubxacts; i++) - { - TransactionId subxid = subxacts[i]; - - SnapBuildEndTxn(builder, lsn, subxid); - } - - SnapBuildEndTxn(builder, lsn, xid); -} - /* * Handle everything that needs to be done when a transaction commits */ @@ -1003,11 +945,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, { TransactionId subxid = subxacts[nxact]; - /* - * make sure txn is not tracked in running txn's anymore, switch state - */ - SnapBuildEndTxn(builder, lsn, subxid); - /* * If we're forcing timetravel we also need visibility information * about subtransaction, so keep track of subtransaction's state. @@ -1037,12 +974,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } - /* - * Make sure toplevel txn is not tracked in running txn's anymore, switch - * state to consistent if possible. - */ - SnapBuildEndTxn(builder, lsn, xid); - if (forced_timetravel) { elog(DEBUG2, "forced transaction %u to do timetravel.", xid); @@ -1232,25 +1163,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * * a) There were no running transactions when the xl_running_xacts record * was inserted, jump to CONSISTENT immediately. We might find such a - * state we were waiting for b) or c). + * state while waiting on c)'s sub-states. * - * b) Wait for all toplevel transactions that were running to end. We - * simply track the number of in-progress toplevel transactions and - * lower it whenever one commits or aborts. When that number - * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT - * to CONSISTENT. - * NB: We need to search running.xip when seeing a transaction's end to - * make sure it's a toplevel transaction and it's been one of the - * initially running ones. - * Interestingly, in contrast to HS, this allows us not to care about - * subtransactions - and by extension suboverflowed xl_running_xacts - - * at all. - * - * c) This (in a previous run) or another decoding slot serialized a + * b) This (in a previous run) or another decoding slot serialized a * snapshot to disk that we can use. Can't use this method for the * initial snapshot when slot is being created and needs full snapshot * for export or direct use, as that snapshot will only contain catalog * modifying transactions. + * + * c) First incrementally build a snapshot for catalog tuples + * (BUILDING_SNAPSHOT), that requires all, already in-progress, + * transactions to finish. Every transaction starting after that + * (FULL_SNAPSHOT state), has enough information to be decoded. But + * for older running transactions no viable snapshot exists yet, so + * CONSISTENT will only be reached once all of those have finished. * --- */ @@ -1267,16 +1193,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn (uint32) (lsn >> 32), (uint32) lsn), errdetail_internal("initial xmin horizon of %u vs the snapshot's %u", builder->initial_xmin_horizon, running->oldestRunningXid))); + + + SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon); + return true; } /* * a) No transaction were running, we can jump to consistent. * + * This is not affected by races around xl_running_xacts, because we can + * miss transaction commits, but currently not transactions starting. + * * NB: We might have already started to incrementally assemble a snapshot, * so we need to be careful to deal with that. */ - if (running->xcnt == 0) + if (running->oldestRunningXid == running->nextXid) { if (builder->start_decoding_at == InvalidXLogRecPtr || builder->start_decoding_at <= lsn) @@ -1291,12 +1224,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn Assert(TransactionIdIsNormal(builder->xmin)); Assert(TransactionIdIsNormal(builder->xmax)); - /* no transactions running now */ - builder->running.xcnt = 0; - builder->running.xmin = InvalidTransactionId; - builder->running.xmax = InvalidTransactionId; - builder->state = SNAPBUILD_CONSISTENT; + SnapBuildStartNextPhaseAt(builder, InvalidTransactionId); ereport(LOG, (errmsg("logical decoding found consistent point at %X/%X", @@ -1305,30 +1234,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* c) valid on disk state and not building full snapshot */ + /* b) valid on disk state and not building full snapshot */ else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ return false; } - /* - * b) first encounter of a useable xl_running_xacts record. If we had - * found one earlier we would either track running transactions (i.e. - * builder->running.xcnt != 0) or be consistent (this function wouldn't - * get called). + * c) transition from START to BUILDING_SNAPSHOT. + * + * In START state, and a xl_running_xacts record with running xacts is + * encountered. In that case, switch to BUILDING_SNAPSHOT state, and + * record xl_running_xacts->nextXid. Once all running xacts have finished + * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It + * might look that we could use xl_running_xact's ->xids information to + * get there quicker, but that is problematic because transactions marked + * as running, might already have inserted their commit record - it's + * infeasible to change that with locking. */ - else if (!builder->running.xcnt) + else if (builder->state == SNAPBUILD_START) { - int off; - - /* - * We only care about toplevel xids as those are the ones we - * definitely see in the wal stream. As snapbuild.c tracks committed - * instead of running transactions we don't need to know anything - * about uncommitted subtransactions. - */ + builder->state = SNAPBUILD_BUILDING_SNAPSHOT; + SnapBuildStartNextPhaseAt(builder, running->nextXid); /* * Start with an xmin/xmax that's correct for future, when all the @@ -1342,59 +1270,57 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn Assert(TransactionIdIsNormal(builder->xmin)); Assert(TransactionIdIsNormal(builder->xmax)); - builder->running.xcnt = running->xcnt; - builder->running.xcnt_space = running->xcnt; - builder->running.xip = - MemoryContextAlloc(builder->context, - builder->running.xcnt * sizeof(TransactionId)); - memcpy(builder->running.xip, running->xids, - builder->running.xcnt * sizeof(TransactionId)); - - /* sort so we can do a binary search */ - qsort(builder->running.xip, builder->running.xcnt, - sizeof(TransactionId), xidComparator); - - builder->running.xmin = builder->running.xip[0]; - builder->running.xmax = builder->running.xip[running->xcnt - 1]; - - /* makes comparisons cheaper later */ - TransactionIdRetreat(builder->running.xmin); - TransactionIdAdvance(builder->running.xmax); - - builder->state = SNAPBUILD_FULL_SNAPSHOT; - ereport(LOG, (errmsg("logical decoding found initial starting point at %X/%X", (uint32) (lsn >> 32), (uint32) lsn), - errdetail_plural("%u transaction needs to finish.", - "%u transactions need to finish.", - builder->running.xcnt, - (uint32) builder->running.xcnt))); + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); - /* - * Iterate through all xids, wait for them to finish. - * - * This isn't required for the correctness of decoding, but to allow - * isolationtester to notice that we're currently waiting for - * something. - */ - for (off = 0; off < builder->running.xcnt; off++) - { - TransactionId xid = builder->running.xip[off]; + SnapBuildWaitSnapshot(running, running->nextXid); + } + /* + * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT. + * + * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid + * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This + * means all transactions starting afterwards have enough information to + * be decoded. Switch to FULL_SNAPSHOT. + */ + else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT && + TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder), + running->oldestRunningXid)) + { + builder->state = SNAPBUILD_FULL_SNAPSHOT; + SnapBuildStartNextPhaseAt(builder, running->nextXid); - /* - * Upper layers should prevent that we ever need to wait on - * ourselves. Check anyway, since failing to do so would either - * result in an endless wait or an Assert() failure. - */ - if (TransactionIdIsCurrentTransactionId(xid)) - elog(ERROR, "waiting for ourselves"); + ereport(LOG, + (errmsg("logical decoding found initial consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); - XactLockTableWait(xid, NULL, NULL, XLTW_None); - } + SnapBuildWaitSnapshot(running, running->nextXid); + } + /* + * c) transition from FULL_SNAPSHOT to CONSISTENT. + * + * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts' + * oldestRunningXid is >= than nextXid from when we switched to + * FULL_SNAPSHOT. This means all transactions that are currently in + * progress have a catalog snapshot, and all their changes have been + * collected. Switch to CONSISTENT. + */ + else if (builder->state == SNAPBUILD_FULL_SNAPSHOT && + TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder), + running->oldestRunningXid)) + { + builder->state = SNAPBUILD_CONSISTENT; + SnapBuildStartNextPhaseAt(builder, InvalidTransactionId); - /* nothing could have built up so far, so don't perform cleanup */ - return false; + ereport(LOG, + (errmsg("logical decoding found consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("There are no old transactions anymore."))); } /* @@ -1403,8 +1329,54 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * records so incremental cleanup can be performed. */ return true; + } +/* --- + * Iterate through xids in record, wait for all older than the cutoff to + * finish. Then, if possible, log a new xl_running_xacts record. + * + * This isn't required for the correctness of decoding, but to: + * a) allow isolationtester to notice that we're currently waiting for + * something. + * b) log a new xl_running_xacts record where it'd be helpful, without having + * to write for bgwriter or checkpointer. + * --- + */ +static void +SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) +{ + int off; + + for (off = 0; off < running->xcnt; off++) + { + TransactionId xid = running->xids[off]; + + /* + * Upper layers should prevent that we ever need to wait on + * ourselves. Check anyway, since failing to do so would either + * result in an endless wait or an Assert() failure. + */ + if (TransactionIdIsCurrentTransactionId(xid)) + elog(ERROR, "waiting for ourselves"); + + if (TransactionIdFollows(xid, cutoff)) + continue; + + XactLockTableWait(xid, NULL, NULL, XLTW_None); + } + + /* + * All transactions we needed to finish finished - try to ensure there is + * another xl_running_xacts record in a timely manner, without having to + * write for bgwriter or checkpointer to log one. During recovery we + * can't enforce that, so we'll have to wait. + */ + if (!RecoveryInProgress()) + { + LogStandbySnapshot(); + } +} /* ----------------------------------- * Snapshot serialization support @@ -1554,7 +1526,6 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not remove file \"%s\": %m", path))); needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * builder->running.xcnt_space + sizeof(TransactionId) * builder->committed.xcnt; ondisk_c = MemoryContextAllocZero(builder->context, needed_length); @@ -1573,18 +1544,14 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.context = NULL; ondisk->builder.snapshot = NULL; ondisk->builder.reorder = NULL; - ondisk->builder.running.xip = NULL; ondisk->builder.committed.xip = NULL; COMP_CRC32C(ondisk->checksum, &ondisk->builder, sizeof(SnapBuild)); - /* copy running xacts */ - sz = sizeof(TransactionId) * builder->running.xcnt_space; - memcpy(ondisk_c, builder->running.xip, sz); - COMP_CRC32C(ondisk->checksum, ondisk_c, sz); - ondisk_c += sz; + /* there shouldn't be any running xacts */ + Assert(builder->was_running.was_xcnt == 0); /* copy committed xacts */ sz = sizeof(TransactionId) * builder->committed.xcnt; @@ -1736,10 +1703,11 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); - /* restore running xacts information */ - sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space; - ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz); - readBytes = read(fd, ondisk.builder.running.xip, sz); + /* restore running xacts (dead, but kept for backward compat) */ + sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space; + ondisk.builder.was_running.was_xip = + MemoryContextAllocZero(builder->context, sz); + readBytes = read(fd, ondisk.builder.was_running.was_xip, sz); if (readBytes != sz) { CloseTransientFile(fd); @@ -1748,7 +1716,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not read file \"%s\", read %d of %d: %m", path, readBytes, (int) sz))); } - COMP_CRC32C(checksum, ondisk.builder.running.xip, sz); + COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz); /* restore committed xacts information */ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; @@ -1812,12 +1780,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip = NULL; - builder->running.xcnt = ondisk.builder.running.xcnt; - if (builder->running.xip) - pfree(builder->running.xip); - builder->running.xcnt_space = ondisk.builder.running.xcnt_space; - builder->running.xip = ondisk.builder.running.xip; - /* our snapshot is not interesting anymore, build a new one */ if (builder->snapshot != NULL) { @@ -1837,8 +1799,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) return true; snapshot_not_interesting: - if (ondisk.builder.running.xip != NULL) - pfree(ondisk.builder.running.xip); if (ondisk.builder.committed.xip != NULL) pfree(ondisk.builder.committed.xip); return false; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 1df4f37634..80e91c9c25 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -20,24 +20,30 @@ typedef enum /* * Initial state, we can't do much yet. */ - SNAPBUILD_START, + SNAPBUILD_START = -1, + + /* + * Collecting committed transactions, to build the initial catalog + * snapshot. + */ + SNAPBUILD_BUILDING_SNAPSHOT = 0, /* * We have collected enough information to decode tuples in transactions * that started after this. * * Once we reached this we start to collect changes. We cannot apply them - * yet because the might be based on transactions that were still running - * when we reached them yet. + * yet, because they might be based on transactions that were still running + * when FULL_SNAPSHOT was reached. */ - SNAPBUILD_FULL_SNAPSHOT, + SNAPBUILD_FULL_SNAPSHOT = 1, /* - * Found a point after hitting built_full_snapshot where all transactions - * that were running at that point finished. Till we reach that we hold - * off calling any commit callbacks. + * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that + * were running at that point finished. Till we reach that we hold off + * calling any commit callbacks. */ - SNAPBUILD_CONSISTENT + SNAPBUILD_CONSISTENT = 2 } SnapBuildState; /* forward declare so we don't have to expose the struct to the public */ @@ -72,9 +78,6 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts); -extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn, - TransactionId xid, int nsubxacts, - TransactionId *subxacts); extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn); extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,