diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index b220906479..c7ce603706 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error + twophase_snapshot slot_creation_error catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 0000000000..dc4f9b7018 --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000000..2971ddc69c --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c5c6a2ba68..1667d720b1 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -628,7 +628,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, - parsed->nsubxacts, parsed->subxacts); + parsed->nsubxacts, parsed->subxacts, + parsed->xinfo); /* ---- * Check whether we are interested in this specific transaction, and tell diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 88a37fde72..1c21a1d14b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -349,6 +349,8 @@ ReorderBufferAllocate(void) buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; + buffer->catchange_ntxns = 0; + buffer->outbuf = NULL; buffer->outbufsize = 0; buffer->size = 0; @@ -366,6 +368,7 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); dlist_init(&buffer->txns_by_base_snapshot_lsn); + dlist_init(&buffer->catchange_txns); /* * Ensure there's no stale data from prior uses of this slot, in case some @@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Remove TXN from its containing list. + * Remove TXN from its containing lists. * * Note: if txn is known as subxact, we are deleting the TXN from its * parent's list of known subxacts; this leaves the parent's nsubxacts * count too high, but we don't care. Otherwise, we are deleting the TXN - * from the LSN-ordered list of toplevel TXNs. + * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the + * list of catalog modifying transactions as well. */ dlist_delete(&txn->node); + if (rbtxn_has_catalog_changes(txn)) + { + dlist_delete(&txn->catchange_node); + rb->catchange_ntxns--; + + Assert(rb->catchange_ntxns >= 0); + } /* now remove reference from buffer */ hash_search(rb->by_txn, @@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + if (!rbtxn_has_catalog_changes(txn)) + { + txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dlist_push_tail(&rb->catchange_txns, &txn->catchange_node); + rb->catchange_ntxns++; + } /* * Mark top-level transaction as having catalog changes too if one of its @@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, * conveniently check just top-level transaction and decide whether to * build the hash table or not. */ - if (txn->toptxn != NULL) - txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + toptxn = txn->toptxn; + if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn)) + { + toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + rb->catchange_ntxns++; + } +} + +/* + * Return palloc'ed array of the transactions that have changed catalogs. + * The returned array is sorted in xidComparator order. + * + * The caller must free the returned array when done with it. + */ +TransactionId * +ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb) +{ + dlist_iter iter; + TransactionId *xids = NULL; + size_t xcnt = 0; + + /* Quick return if the list is empty */ + if (rb->catchange_ntxns == 0) + { + Assert(dlist_is_empty(&rb->catchange_txns)); + return NULL; + } + + /* Initialize XID array */ + xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns); + dlist_foreach(iter, &rb->catchange_txns) + { + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, + catchange_node, + iter.cur); + + Assert(rbtxn_has_catalog_changes(txn)); + + xids[xcnt++] = txn->xid; + } + + qsort(xids, xcnt, sizeof(TransactionId), xidComparator); + + Assert(xcnt == rb->catchange_ntxns); + return xids; } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 73c0f15214..1ff2c12240 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -241,6 +241,33 @@ struct SnapBuild */ TransactionId *xip; } committed; + + /* + * Array of transactions and subtransactions that had modified catalogs + * and were running when the snapshot was serialized. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that + * the logical decoding decodes only the commit record of the transaction + * after restoring the previously serialized snapshot in which case we + * will miss adding the xid to the snapshot and end up looking at the + * catalogs with the wrong snapshot. + * + * Now to avoid the above problem, we serialize the transactions that had + * modified the catalogs and are still running at the time of snapshot + * serialization. We fill this array while restoring the snapshot and then + * refer it while decoding commit to ensure if the xact has modified the + * catalog. We discard this array when all the xids in the list become old + * enough to matter. See SnapBuildPurgeOlderTxn for details. + */ + struct + { + /* number of transactions */ + size_t xcnt; + + /* This array must be sorted in xidComparator order */ + TransactionId *xip; + } catchange; }; /* @@ -250,8 +277,8 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* ->committed and ->catchange manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap); static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, + uint32 xinfo); + /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); @@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof /* serialization functions */ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); /* * Allocate a new snapshot builder. @@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); builder->committed.includes_all_transactions = true; + builder->catchange.xcnt = 0; + builder->catchange.xip = NULL; + builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; @@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed or containing catalog + * changes that are smaller than ->xmin. Those won't ever get checked via + * the ->committed or ->catchange array, respectively. The committed xids will + * get checked via the clog machinery. + * + * We can ideally remove the transaction from catchange array once it is + * finished (committed/aborted) but that could be costly as we need to maintain + * the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* + * Either all the xacts got purged or none. It is only possible to + * partially remove the xids from this array if one or more of the xids + * are still running but not all. That can happen if we start decoding + * from a point (LSN where the snapshot state became consistent) where all + * the xacts in this were running and then at least one of those got + * committed and a few are still running. We will never start from such a + * point because we won't move the slot's restart_lsn past the point where + * the oldest running transaction's restart_decoding_lsn is. + */ + if (builder->catchange.xcnt == 0 || + TransactionIdFollowsOrEquals(builder->catchange.xip[0], + builder->xmin)) + return; + + Assert(TransactionIdFollows(builder->xmin, + builder->catchange.xip[builder->catchange.xcnt - 1])); + pfree(builder->catchange.xip); + builder->catchange.xip = NULL; + builder->catchange.xcnt = 0; + + elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u", + builder->xmin); } /* @@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) */ void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, - int nsubxacts, TransactionId *subxacts) + int nsubxacts, TransactionId *subxacts, uint32 xinfo) { int nxact; @@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, * Add subtransaction to base snapshot if catalog modifying, we don't * distinguish to toplevel transactions there. */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) + if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo)) { sub_needs_timetravel = true; needs_snapshot = true; @@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } /* if top-level modified catalog, it'll need a snapshot */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } +/* + * Check the reorder buffer and the snapshot to see if the given transaction has + * modified catalogs. + */ +static inline bool +SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, + uint32 xinfo) +{ + if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + return true; + + /* + * The transactions that have changed catalogs must have invalidation + * info. + */ + if (!(xinfo & XACT_XINFO_HAS_INVALS)) + return false; + + /* Check the catchange XID array */ + return ((builder->catchange.xcnt > 0) && + (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt, + sizeof(TransactionId), xidComparator) != NULL)); +} /* ----------------------------------- * Snapshot building functions dealing with xlog records @@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) * * struct SnapBuildOnDisk; * TransactionId * committed.xcnt; (*not xcnt_space*) + * TransactionId * catchange.xcnt; * */ typedef struct SnapBuildOnDisk @@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 4 +#define SNAPBUILD_VERSION 5 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; + TransactionId *catchange_xip = NULL; + MemoryContext old_ctx; + size_t catchange_xcnt; char *ondisk_c; int fd; char tmppath[MAXPGPATH]; @@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", tmppath))); - needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * builder->committed.xcnt; + old_ctx = MemoryContextSwitchTo(builder->context); - ondisk_c = MemoryContextAllocZero(builder->context, needed_length); + /* Get the catalog modifying transactions that are yet not committed */ + catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder); + catchange_xcnt = builder->reorder->catchange_ntxns; + + needed_length = sizeof(SnapBuildOnDisk) + + sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt); + + ondisk_c = palloc0(needed_length); ondisk = (SnapBuildOnDisk *) ondisk_c; ondisk->magic = SNAPBUILD_MAGIC; ondisk->version = SNAPBUILD_VERSION; @@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.snapshot = NULL; ondisk->builder.reorder = NULL; ondisk->builder.committed.xip = NULL; + ondisk->builder.catchange.xip = NULL; + /* update catchange only on disk data */ + ondisk->builder.catchange.xcnt = catchange_xcnt; COMP_CRC32C(ondisk->checksum, &ondisk->builder, sizeof(SnapBuild)); /* copy committed xacts */ - sz = sizeof(TransactionId) * builder->committed.xcnt; - memcpy(ondisk_c, builder->committed.xip, sz); - COMP_CRC32C(ondisk->checksum, ondisk_c, sz); - ondisk_c += sz; + if (builder->committed.xcnt > 0) + { + sz = sizeof(TransactionId) * builder->committed.xcnt; + memcpy(ondisk_c, builder->committed.xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } + + /* copy catalog modifying xacts */ + if (catchange_xcnt > 0) + { + sz = sizeof(TransactionId) * catchange_xcnt; + memcpy(ondisk_c, catchange_xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } FIN_CRC32C(ondisk->checksum); @@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) */ builder->last_serialized_snapshot = lsn; + MemoryContextSwitchTo(old_ctx); + out: ReorderBufferSetRestartPoint(builder->reorder, builder->last_serialized_snapshot); /* be tidy */ if (ondisk) pfree(ondisk); + if (catchange_xip) + pfree(catchange_xip); } /* @@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) int fd; char path[MAXPGPATH]; Size sz; - int readBytes; pg_crc32c checksum; /* no point in loading a snapshot if we're already there */ @@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* read statically sized portion of snapshot */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize); - pgstat_report_wait_end(); - if (readBytes != SnapBuildOnDiskConstantSize) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, - (Size) SnapBuildOnDiskConstantSize))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path); if (ondisk.magic != SNAPBUILD_MAGIC) ereport(ERROR, @@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); - pgstat_report_wait_end(); - if (readBytes != sizeof(SnapBuild)) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sizeof(SnapBuild)))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path); COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); /* restore committed xacts information */ - sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; - ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, ondisk.builder.committed.xip, sz); - pgstat_report_wait_end(); - if (readBytes != sz) + if (ondisk.builder.committed.xcnt > 0) { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sz))); + sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; + ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); + } + + /* restore catalog modifying xacts information */ + if (ondisk.builder.catchange.xcnt > 0) + { + sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt; + ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz); } - COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); if (CloseTransientFile(fd) != 0) ereport(ERROR, @@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip = NULL; + /* set catalog modifying transactions */ + if (builder->catchange.xip) + pfree(builder->catchange.xip); + builder->catchange.xcnt = ondisk.builder.catchange.xcnt; + builder->catchange.xip = ondisk.builder.catchange.xip; + ondisk.builder.catchange.xip = NULL; + /* our snapshot is not interesting anymore, build a new one */ if (builder->snapshot != NULL) { @@ -1906,9 +1975,43 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) snapshot_not_interesting: if (ondisk.builder.committed.xip != NULL) pfree(ondisk.builder.committed.xip); + if (ondisk.builder.catchange.xip != NULL) + pfree(ondisk.builder.catchange.xip); return false; } +/* + * Read the contents of the serialized snapshot to 'dest'. + */ +static void +SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path) +{ + int readBytes; + + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, dest, size); + pgstat_report_wait_end(); + if (readBytes != size) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sizeof(SnapBuild)))); + } +} + /* * Remove all serialized snapshots that are not required anymore because no * slot can need them. This doesn't actually have to run during a checkpoint, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 2c9206ace4..8695901ba7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -380,6 +380,11 @@ typedef struct ReorderBufferTXN */ dlist_node node; + /* + * A node in the list of catalog modifying transactions + */ + dlist_node catchange_node; + /* * Size of this transaction (changes currently in memory, in bytes). */ @@ -526,6 +531,12 @@ struct ReorderBuffer */ dlist_head txns_by_base_snapshot_lsn; + /* + * Transactions and subtransactions that have modified system catalogs. + */ + dlist_head catchange_txns; + int catchange_ntxns; + /* * one-entry sized cache for by_txn. Very frequently the same txn gets * looked up over and over again. @@ -677,6 +688,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); +extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb); extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index d179251aad..e6adea24f2 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -82,7 +82,7 @@ extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, - TransactionId *subxacts); + TransactionId *subxacts, uint32 xinfo); extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn); extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,