From 7f13ac812313666a2fbb8dacfbee67e78d2ba0bc Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 11 Aug 2022 10:09:24 +0530 Subject: [PATCH] Fix catalog lookup with the wrong snapshot during logical decoding. Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION records to know if the transaction has modified the catalog, and that information is not serialized to snapshot. Therefore, after the restart, if the logical decoding decodes only the commit record of the transaction that has actually modified a catalog, we will miss adding its XID to the snapshot. Thus, we will end up looking at catalogs with the wrong snapshot. To fix this problem, this change adds the list of transaction IDs and sub-transaction IDs, that have modified catalogs and are running during snapshot serialization, to the serialized snapshot. After restart or otherwise, when we restore from such a serialized snapshot, the corresponding list is restored in memory. Now, when decoding a COMMIT record, we check both the list and the ReorderBuffer to see if the transaction has modified catalogs. Since this adds additional information to the serialized snapshot, we cannot backpatch it. For back branches, we took another approach. We remember the last-running-xacts list of the decoded RUNNING_XACTS record after restoring the previously serialized snapshot. Then, we mark the transaction as containing catalog changes if it's in the list of initial running transactions and its commit record has XACT_XINFO_HAS_INVALS. This doesn't require any file format changes but the transaction will end up being added to the snapshot even if it has only relcache invalidations. But that won't be a problem since we use snapshot built during decoding only to read system catalogs. This commit bumps SNAPBUILD_VERSION because of a change in SnapBuild. Reported-by: Mike Oh Author: Masahiko Sawada Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi Backpatch-through: 10 Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com --- contrib/test_decoding/Makefile | 2 +- .../expected/catalog_change_snapshot.out | 44 +++ .../specs/catalog_change_snapshot.spec | 39 +++ src/backend/replication/logical/decode.c | 3 +- .../replication/logical/reorderbuffer.c | 71 ++++- src/backend/replication/logical/snapbuild.c | 277 ++++++++++++------ src/include/replication/reorderbuffer.h | 12 + src/include/replication/snapbuild.h | 2 +- 8 files changed, 355 insertions(+), 95 deletions(-) create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index b220906479..c7ce603706 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error + twophase_snapshot slot_creation_error catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 0000000000..dc4f9b7018 --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000000..2971ddc69c --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c5c6a2ba68..1667d720b1 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -628,7 +628,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, - parsed->nsubxacts, parsed->subxacts); + parsed->nsubxacts, parsed->subxacts, + parsed->xinfo); /* ---- * Check whether we are interested in this specific transaction, and tell diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 88a37fde72..1c21a1d14b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -349,6 +349,8 @@ ReorderBufferAllocate(void) buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; + buffer->catchange_ntxns = 0; + buffer->outbuf = NULL; buffer->outbufsize = 0; buffer->size = 0; @@ -366,6 +368,7 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); dlist_init(&buffer->txns_by_base_snapshot_lsn); + dlist_init(&buffer->catchange_txns); /* * Ensure there's no stale data from prior uses of this slot, in case some @@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Remove TXN from its containing list. + * Remove TXN from its containing lists. * * Note: if txn is known as subxact, we are deleting the TXN from its * parent's list of known subxacts; this leaves the parent's nsubxacts * count too high, but we don't care. Otherwise, we are deleting the TXN - * from the LSN-ordered list of toplevel TXNs. + * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the + * list of catalog modifying transactions as well. */ dlist_delete(&txn->node); + if (rbtxn_has_catalog_changes(txn)) + { + dlist_delete(&txn->catchange_node); + rb->catchange_ntxns--; + + Assert(rb->catchange_ntxns >= 0); + } /* now remove reference from buffer */ hash_search(rb->by_txn, @@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + if (!rbtxn_has_catalog_changes(txn)) + { + txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dlist_push_tail(&rb->catchange_txns, &txn->catchange_node); + rb->catchange_ntxns++; + } /* * Mark top-level transaction as having catalog changes too if one of its @@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, * conveniently check just top-level transaction and decide whether to * build the hash table or not. */ - if (txn->toptxn != NULL) - txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + toptxn = txn->toptxn; + if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn)) + { + toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + rb->catchange_ntxns++; + } +} + +/* + * Return palloc'ed array of the transactions that have changed catalogs. + * The returned array is sorted in xidComparator order. + * + * The caller must free the returned array when done with it. + */ +TransactionId * +ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb) +{ + dlist_iter iter; + TransactionId *xids = NULL; + size_t xcnt = 0; + + /* Quick return if the list is empty */ + if (rb->catchange_ntxns == 0) + { + Assert(dlist_is_empty(&rb->catchange_txns)); + return NULL; + } + + /* Initialize XID array */ + xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns); + dlist_foreach(iter, &rb->catchange_txns) + { + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, + catchange_node, + iter.cur); + + Assert(rbtxn_has_catalog_changes(txn)); + + xids[xcnt++] = txn->xid; + } + + qsort(xids, xcnt, sizeof(TransactionId), xidComparator); + + Assert(xcnt == rb->catchange_ntxns); + return xids; } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 73c0f15214..1ff2c12240 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -241,6 +241,33 @@ struct SnapBuild */ TransactionId *xip; } committed; + + /* + * Array of transactions and subtransactions that had modified catalogs + * and were running when the snapshot was serialized. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that + * the logical decoding decodes only the commit record of the transaction + * after restoring the previously serialized snapshot in which case we + * will miss adding the xid to the snapshot and end up looking at the + * catalogs with the wrong snapshot. + * + * Now to avoid the above problem, we serialize the transactions that had + * modified the catalogs and are still running at the time of snapshot + * serialization. We fill this array while restoring the snapshot and then + * refer it while decoding commit to ensure if the xact has modified the + * catalog. We discard this array when all the xids in the list become old + * enough to matter. See SnapBuildPurgeOlderTxn for details. + */ + struct + { + /* number of transactions */ + size_t xcnt; + + /* This array must be sorted in xidComparator order */ + TransactionId *xip; + } catchange; }; /* @@ -250,8 +277,8 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* ->committed and ->catchange manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap); static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, + uint32 xinfo); + /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); @@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof /* serialization functions */ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); /* * Allocate a new snapshot builder. @@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); builder->committed.includes_all_transactions = true; + builder->catchange.xcnt = 0; + builder->catchange.xip = NULL; + builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; @@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed or containing catalog + * changes that are smaller than ->xmin. Those won't ever get checked via + * the ->committed or ->catchange array, respectively. The committed xids will + * get checked via the clog machinery. + * + * We can ideally remove the transaction from catchange array once it is + * finished (committed/aborted) but that could be costly as we need to maintain + * the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* + * Either all the xacts got purged or none. It is only possible to + * partially remove the xids from this array if one or more of the xids + * are still running but not all. That can happen if we start decoding + * from a point (LSN where the snapshot state became consistent) where all + * the xacts in this were running and then at least one of those got + * committed and a few are still running. We will never start from such a + * point because we won't move the slot's restart_lsn past the point where + * the oldest running transaction's restart_decoding_lsn is. + */ + if (builder->catchange.xcnt == 0 || + TransactionIdFollowsOrEquals(builder->catchange.xip[0], + builder->xmin)) + return; + + Assert(TransactionIdFollows(builder->xmin, + builder->catchange.xip[builder->catchange.xcnt - 1])); + pfree(builder->catchange.xip); + builder->catchange.xip = NULL; + builder->catchange.xcnt = 0; + + elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u", + builder->xmin); } /* @@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) */ void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, - int nsubxacts, TransactionId *subxacts) + int nsubxacts, TransactionId *subxacts, uint32 xinfo) { int nxact; @@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, * Add subtransaction to base snapshot if catalog modifying, we don't * distinguish to toplevel transactions there. */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) + if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo)) { sub_needs_timetravel = true; needs_snapshot = true; @@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } /* if top-level modified catalog, it'll need a snapshot */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } +/* + * Check the reorder buffer and the snapshot to see if the given transaction has + * modified catalogs. + */ +static inline bool +SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, + uint32 xinfo) +{ + if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + return true; + + /* + * The transactions that have changed catalogs must have invalidation + * info. + */ + if (!(xinfo & XACT_XINFO_HAS_INVALS)) + return false; + + /* Check the catchange XID array */ + return ((builder->catchange.xcnt > 0) && + (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt, + sizeof(TransactionId), xidComparator) != NULL)); +} /* ----------------------------------- * Snapshot building functions dealing with xlog records @@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) * * struct SnapBuildOnDisk; * TransactionId * committed.xcnt; (*not xcnt_space*) + * TransactionId * catchange.xcnt; * */ typedef struct SnapBuildOnDisk @@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 4 +#define SNAPBUILD_VERSION 5 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; + TransactionId *catchange_xip = NULL; + MemoryContext old_ctx; + size_t catchange_xcnt; char *ondisk_c; int fd; char tmppath[MAXPGPATH]; @@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", tmppath))); - needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * builder->committed.xcnt; + old_ctx = MemoryContextSwitchTo(builder->context); - ondisk_c = MemoryContextAllocZero(builder->context, needed_length); + /* Get the catalog modifying transactions that are yet not committed */ + catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder); + catchange_xcnt = builder->reorder->catchange_ntxns; + + needed_length = sizeof(SnapBuildOnDisk) + + sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt); + + ondisk_c = palloc0(needed_length); ondisk = (SnapBuildOnDisk *) ondisk_c; ondisk->magic = SNAPBUILD_MAGIC; ondisk->version = SNAPBUILD_VERSION; @@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.snapshot = NULL; ondisk->builder.reorder = NULL; ondisk->builder.committed.xip = NULL; + ondisk->builder.catchange.xip = NULL; + /* update catchange only on disk data */ + ondisk->builder.catchange.xcnt = catchange_xcnt; COMP_CRC32C(ondisk->checksum, &ondisk->builder, sizeof(SnapBuild)); /* copy committed xacts */ - sz = sizeof(TransactionId) * builder->committed.xcnt; - memcpy(ondisk_c, builder->committed.xip, sz); - COMP_CRC32C(ondisk->checksum, ondisk_c, sz); - ondisk_c += sz; + if (builder->committed.xcnt > 0) + { + sz = sizeof(TransactionId) * builder->committed.xcnt; + memcpy(ondisk_c, builder->committed.xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } + + /* copy catalog modifying xacts */ + if (catchange_xcnt > 0) + { + sz = sizeof(TransactionId) * catchange_xcnt; + memcpy(ondisk_c, catchange_xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } FIN_CRC32C(ondisk->checksum); @@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) */ builder->last_serialized_snapshot = lsn; + MemoryContextSwitchTo(old_ctx); + out: ReorderBufferSetRestartPoint(builder->reorder, builder->last_serialized_snapshot); /* be tidy */ if (ondisk) pfree(ondisk); + if (catchange_xip) + pfree(catchange_xip); } /* @@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) int fd; char path[MAXPGPATH]; Size sz; - int readBytes; pg_crc32c checksum; /* no point in loading a snapshot if we're already there */ @@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* read statically sized portion of snapshot */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize); - pgstat_report_wait_end(); - if (readBytes != SnapBuildOnDiskConstantSize) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, - (Size) SnapBuildOnDiskConstantSize))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path); if (ondisk.magic != SNAPBUILD_MAGIC) ereport(ERROR, @@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); - pgstat_report_wait_end(); - if (readBytes != sizeof(SnapBuild)) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sizeof(SnapBuild)))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path); COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); /* restore committed xacts information */ - sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; - ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, ondisk.builder.committed.xip, sz); - pgstat_report_wait_end(); - if (readBytes != sz) + if (ondisk.builder.committed.xcnt > 0) { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sz))); + sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; + ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); + } + + /* restore catalog modifying xacts information */ + if (ondisk.builder.catchange.xcnt > 0) + { + sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt; + ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz); } - COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); if (CloseTransientFile(fd) != 0) ereport(ERROR, @@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip = NULL; + /* set catalog modifying transactions */ + if (builder->catchange.xip) + pfree(builder->catchange.xip); + builder->catchange.xcnt = ondisk.builder.catchange.xcnt; + builder->catchange.xip = ondisk.builder.catchange.xip; + ondisk.builder.catchange.xip = NULL; + /* our snapshot is not interesting anymore, build a new one */ if (builder->snapshot != NULL) { @@ -1906,9 +1975,43 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) snapshot_not_interesting: if (ondisk.builder.committed.xip != NULL) pfree(ondisk.builder.committed.xip); + if (ondisk.builder.catchange.xip != NULL) + pfree(ondisk.builder.catchange.xip); return false; } +/* + * Read the contents of the serialized snapshot to 'dest'. + */ +static void +SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path) +{ + int readBytes; + + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, dest, size); + pgstat_report_wait_end(); + if (readBytes != size) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sizeof(SnapBuild)))); + } +} + /* * Remove all serialized snapshots that are not required anymore because no * slot can need them. This doesn't actually have to run during a checkpoint, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 2c9206ace4..8695901ba7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -380,6 +380,11 @@ typedef struct ReorderBufferTXN */ dlist_node node; + /* + * A node in the list of catalog modifying transactions + */ + dlist_node catchange_node; + /* * Size of this transaction (changes currently in memory, in bytes). */ @@ -526,6 +531,12 @@ struct ReorderBuffer */ dlist_head txns_by_base_snapshot_lsn; + /* + * Transactions and subtransactions that have modified system catalogs. + */ + dlist_head catchange_txns; + int catchange_ntxns; + /* * one-entry sized cache for by_txn. Very frequently the same txn gets * looked up over and over again. @@ -677,6 +688,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); +extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb); extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index d179251aad..e6adea24f2 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -82,7 +82,7 @@ extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, - TransactionId *subxacts); + TransactionId *subxacts, uint32 xinfo); extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn); extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,