Fix catalog lookup with the wrong snapshot during logical decoding.

Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know if the transaction has modified the catalog, and that
information is not serialized to snapshot. Therefore, after the restart,
if the logical decoding decodes only the commit record of the transaction
that has actually modified a catalog, we will miss adding its XID to the
snapshot. Thus, we will end up looking at catalogs with the wrong
snapshot.

To fix this problem, this change adds the list of transaction IDs and
sub-transaction IDs, that have modified catalogs and are running during
snapshot serialization, to the serialized snapshot. After restart or
otherwise, when we restore from such a serialized snapshot, the
corresponding list is restored in memory. Now, when decoding a COMMIT
record, we check both the list and the ReorderBuffer to see if the
transaction has modified catalogs.

Since this adds additional information to the serialized snapshot, we
cannot backpatch it. For back branches, we took another approach.
We remember the last-running-xacts list of the decoded RUNNING_XACTS
record after restoring the previously serialized snapshot. Then, we mark
the transaction as containing catalog changes if it's in the list of
initial running transactions and its commit record has
XACT_XINFO_HAS_INVALS. This doesn't require any file format changes but
the transaction will end up being added to the snapshot even if it has
only relcache invalidations. But that won't be a problem since we use
snapshot built during decoding only to read system catalogs.

This commit bumps SNAPBUILD_VERSION because of a change in SnapBuild.

Reported-by: Mike Oh
Author: Masahiko Sawada
Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi
Backpatch-through: 10
Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com
This commit is contained in:
Amit Kapila 2022-08-11 10:09:24 +05:30
parent 37a6e5df37
commit 7f13ac8123
8 changed files with 355 additions and 95 deletions

View File

@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error
twophase_snapshot slot_creation_error catalog_change_snapshot
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf

View File

@ -0,0 +1,44 @@
Parsed test spec with 2 sessions
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
?column?
--------
init
(1 row)
step s0_begin: BEGIN;
step s0_savepoint: SAVEPOINT sp1;
step s0_truncate: TRUNCATE tbl1;
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
----
(0 rows)
step s0_commit: COMMIT;
step s0_begin: BEGIN;
step s0_insert: INSERT INTO tbl1 VALUES (1);
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
---------------------------------------
BEGIN
table public.tbl1: TRUNCATE: (no-flags)
COMMIT
(3 rows)
step s0_commit: COMMIT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
-------------------------------------------------------------
BEGIN
table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
COMMIT
(3 rows)
?column?
--------
stop
(1 row)

View File

@ -0,0 +1,39 @@
# Test decoding only the commit record of the transaction that have
# modified catalogs.
setup
{
DROP TABLE IF EXISTS tbl1;
CREATE TABLE tbl1 (val1 integer, val2 integer);
}
teardown
{
DROP TABLE tbl1;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
session "s0"
setup { SET synchronous_commit=on; }
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
step "s0_begin" { BEGIN; }
step "s0_savepoint" { SAVEPOINT sp1; }
step "s0_truncate" { TRUNCATE tbl1; }
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
step "s0_commit" { COMMIT; }
session "s1"
setup { SET synchronous_commit=on; }
step "s1_checkpoint" { CHECKPOINT; }
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
# during the first checkpoint execution. This transaction must be marked as
# containing catalog changes while decoding the COMMIT record and the decoding
# of the INSERT record must read the pg_class with the correct historic snapshot.
#
# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
# record written by bgwriter. One might think we can either stop the bgwriter or
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"

View File

@ -628,7 +628,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
parsed->nsubxacts, parsed->subxacts);
parsed->nsubxacts, parsed->subxacts,
parsed->xinfo);
/* ----
* Check whether we are interested in this specific transaction, and tell

View File

@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
buffer->catchange_ntxns = 0;
buffer->outbuf = NULL;
buffer->outbufsize = 0;
buffer->size = 0;
@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
dlist_init(&buffer->toplevel_by_lsn);
dlist_init(&buffer->txns_by_base_snapshot_lsn);
dlist_init(&buffer->catchange_txns);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
* Remove TXN from its containing list.
* Remove TXN from its containing lists.
*
* Note: if txn is known as subxact, we are deleting the TXN from its
* parent's list of known subxacts; this leaves the parent's nsubxacts
* count too high, but we don't care. Otherwise, we are deleting the TXN
* from the LSN-ordered list of toplevel TXNs.
* from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
* list of catalog modifying transactions as well.
*/
dlist_delete(&txn->node);
if (rbtxn_has_catalog_changes(txn))
{
dlist_delete(&txn->catchange_node);
rb->catchange_ntxns--;
Assert(rb->catchange_ntxns >= 0);
}
/* now remove reference from buffer */
hash_search(rb->by_txn,
@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn)
{
ReorderBufferTXN *txn;
ReorderBufferTXN *toptxn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
if (!rbtxn_has_catalog_changes(txn))
{
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
rb->catchange_ntxns++;
}
/*
* Mark top-level transaction as having catalog changes too if one of its
@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
* conveniently check just top-level transaction and decide whether to
* build the hash table or not.
*/
if (txn->toptxn != NULL)
txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
toptxn = txn->toptxn;
if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
{
toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
rb->catchange_ntxns++;
}
}
/*
* Return palloc'ed array of the transactions that have changed catalogs.
* The returned array is sorted in xidComparator order.
*
* The caller must free the returned array when done with it.
*/
TransactionId *
ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
{
dlist_iter iter;
TransactionId *xids = NULL;
size_t xcnt = 0;
/* Quick return if the list is empty */
if (rb->catchange_ntxns == 0)
{
Assert(dlist_is_empty(&rb->catchange_txns));
return NULL;
}
/* Initialize XID array */
xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
dlist_foreach(iter, &rb->catchange_txns)
{
ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
catchange_node,
iter.cur);
Assert(rbtxn_has_catalog_changes(txn));
xids[xcnt++] = txn->xid;
}
qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
Assert(xcnt == rb->catchange_ntxns);
return xids;
}
/*

View File

@ -241,6 +241,33 @@ struct SnapBuild
*/
TransactionId *xip;
} committed;
/*
* Array of transactions and subtransactions that had modified catalogs
* and were running when the snapshot was serialized.
*
* We normally rely on some WAL record types such as HEAP2_NEW_CID to know
* if the transaction has changed the catalog. But it could happen that
* the logical decoding decodes only the commit record of the transaction
* after restoring the previously serialized snapshot in which case we
* will miss adding the xid to the snapshot and end up looking at the
* catalogs with the wrong snapshot.
*
* Now to avoid the above problem, we serialize the transactions that had
* modified the catalogs and are still running at the time of snapshot
* serialization. We fill this array while restoring the snapshot and then
* refer it while decoding commit to ensure if the xact has modified the
* catalog. We discard this array when all the xids in the list become old
* enough to matter. See SnapBuildPurgeOlderTxn for details.
*/
struct
{
/* number of transactions */
size_t xcnt;
/* This array must be sorted in xidComparator order */
TransactionId *xip;
} catchange;
};
/*
@ -250,8 +277,8 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
/* ->committed and ->catchange manipulation */
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap);
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
uint32 xinfo);
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof
/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
/*
* Allocate a new snapshot builder.
@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
builder->committed.includes_all_transactions = true;
builder->catchange.xcnt = 0;
builder->catchange.xip = NULL;
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
}
/*
* Remove knowledge about transactions we treat as committed that are smaller
* than ->xmin. Those won't ever get checked via the ->committed array but via
* the clog machinery, so we don't need to waste memory on them.
* Remove knowledge about transactions we treat as committed or containing catalog
* changes that are smaller than ->xmin. Those won't ever get checked via
* the ->committed or ->catchange array, respectively. The committed xids will
* get checked via the clog machinery.
*
* We can ideally remove the transaction from catchange array once it is
* finished (committed/aborted) but that could be costly as we need to maintain
* the xids order in the array.
*/
static void
SnapBuildPurgeCommittedTxn(SnapBuild *builder)
SnapBuildPurgeOlderTxn(SnapBuild *builder)
{
int off;
TransactionId *workspace;
@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
builder->committed.xcnt = surviving_xids;
pfree(workspace);
/*
* Either all the xacts got purged or none. It is only possible to
* partially remove the xids from this array if one or more of the xids
* are still running but not all. That can happen if we start decoding
* from a point (LSN where the snapshot state became consistent) where all
* the xacts in this were running and then at least one of those got
* committed and a few are still running. We will never start from such a
* point because we won't move the slot's restart_lsn past the point where
* the oldest running transaction's restart_decoding_lsn is.
*/
if (builder->catchange.xcnt == 0 ||
TransactionIdFollowsOrEquals(builder->catchange.xip[0],
builder->xmin))
return;
Assert(TransactionIdFollows(builder->xmin,
builder->catchange.xip[builder->catchange.xcnt - 1]));
pfree(builder->catchange.xip);
builder->catchange.xip = NULL;
builder->catchange.xcnt = 0;
elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
builder->xmin);
}
/*
@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
*/
void
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
int nsubxacts, TransactionId *subxacts)
int nsubxacts, TransactionId *subxacts, uint32 xinfo)
{
int nxact;
@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
* Add subtransaction to base snapshot if catalog modifying, we don't
* distinguish to toplevel transactions there.
*/
if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
{
sub_needs_timetravel = true;
needs_snapshot = true;
@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
/* if top-level modified catalog, it'll need a snapshot */
if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
{
elog(DEBUG2, "found top level transaction %u, with catalog changes",
xid);
@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
}
/*
* Check the reorder buffer and the snapshot to see if the given transaction has
* modified catalogs.
*/
static inline bool
SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
uint32 xinfo)
{
if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
return true;
/*
* The transactions that have changed catalogs must have invalidation
* info.
*/
if (!(xinfo & XACT_XINFO_HAS_INVALS))
return false;
/* Check the catchange XID array */
return ((builder->catchange.xcnt > 0) &&
(bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
sizeof(TransactionId), xidComparator) != NULL));
}
/* -----------------------------------
* Snapshot building functions dealing with xlog records
@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
builder->xmin = running->oldestRunningXid;
/* Remove transactions we don't need to keep track off anymore */
SnapBuildPurgeCommittedTxn(builder);
SnapBuildPurgeOlderTxn(builder);
/*
* Advance the xmin limit for the current replication slot, to allow
@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
*
* struct SnapBuildOnDisk;
* TransactionId * committed.xcnt; (*not xcnt_space*)
* TransactionId * catchange.xcnt;
*
*/
typedef struct SnapBuildOnDisk
@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 4
#define SNAPBUILD_VERSION 5
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
Size needed_length;
SnapBuildOnDisk *ondisk = NULL;
TransactionId *catchange_xip = NULL;
MemoryContext old_ctx;
size_t catchange_xcnt;
char *ondisk_c;
int fd;
char tmppath[MAXPGPATH];
@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", tmppath)));
needed_length = sizeof(SnapBuildOnDisk) +
sizeof(TransactionId) * builder->committed.xcnt;
old_ctx = MemoryContextSwitchTo(builder->context);
ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
/* Get the catalog modifying transactions that are yet not committed */
catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
catchange_xcnt = builder->reorder->catchange_ntxns;
needed_length = sizeof(SnapBuildOnDisk) +
sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
ondisk_c = palloc0(needed_length);
ondisk = (SnapBuildOnDisk *) ondisk_c;
ondisk->magic = SNAPBUILD_MAGIC;
ondisk->version = SNAPBUILD_VERSION;
@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
ondisk->builder.snapshot = NULL;
ondisk->builder.reorder = NULL;
ondisk->builder.committed.xip = NULL;
ondisk->builder.catchange.xip = NULL;
/* update catchange only on disk data */
ondisk->builder.catchange.xcnt = catchange_xcnt;
COMP_CRC32C(ondisk->checksum,
&ondisk->builder,
sizeof(SnapBuild));
/* copy committed xacts */
sz = sizeof(TransactionId) * builder->committed.xcnt;
memcpy(ondisk_c, builder->committed.xip, sz);
COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
if (builder->committed.xcnt > 0)
{
sz = sizeof(TransactionId) * builder->committed.xcnt;
memcpy(ondisk_c, builder->committed.xip, sz);
COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
}
/* copy catalog modifying xacts */
if (catchange_xcnt > 0)
{
sz = sizeof(TransactionId) * catchange_xcnt;
memcpy(ondisk_c, catchange_xip, sz);
COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
}
FIN_CRC32C(ondisk->checksum);
@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
*/
builder->last_serialized_snapshot = lsn;
MemoryContextSwitchTo(old_ctx);
out:
ReorderBufferSetRestartPoint(builder->reorder,
builder->last_serialized_snapshot);
/* be tidy */
if (ondisk)
pfree(ondisk);
if (catchange_xip)
pfree(catchange_xip);
}
/*
@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
int fd;
char path[MAXPGPATH];
Size sz;
int readBytes;
pg_crc32c checksum;
/* no point in loading a snapshot if we're already there */
@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
/* read statically sized portion of snapshot */
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
pgstat_report_wait_end();
if (readBytes != SnapBuildOnDiskConstantSize)
{
int save_errno = errno;
CloseTransientFile(fd);
if (readBytes < 0)
{
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", path)));
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read file \"%s\": read %d of %zu",
path, readBytes,
(Size) SnapBuildOnDiskConstantSize)));
}
SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
if (ondisk.magic != SNAPBUILD_MAGIC)
ereport(ERROR,
@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
pgstat_report_wait_end();
if (readBytes != sizeof(SnapBuild))
{
int save_errno = errno;
CloseTransientFile(fd);
if (readBytes < 0)
{
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", path)));
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read file \"%s\": read %d of %zu",
path, readBytes, sizeof(SnapBuild))));
}
SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
/* restore committed xacts information */
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, ondisk.builder.committed.xip, sz);
pgstat_report_wait_end();
if (readBytes != sz)
if (ondisk.builder.committed.xcnt > 0)
{
int save_errno = errno;
CloseTransientFile(fd);
if (readBytes < 0)
{
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", path)));
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read file \"%s\": read %d of %zu",
path, readBytes, sz)));
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
}
/* restore catalog modifying xacts information */
if (ondisk.builder.catchange.xcnt > 0)
{
sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
}
COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
if (CloseTransientFile(fd) != 0)
ereport(ERROR,
@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
ondisk.builder.committed.xip = NULL;
/* set catalog modifying transactions */
if (builder->catchange.xip)
pfree(builder->catchange.xip);
builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
builder->catchange.xip = ondisk.builder.catchange.xip;
ondisk.builder.catchange.xip = NULL;
/* our snapshot is not interesting anymore, build a new one */
if (builder->snapshot != NULL)
{
@ -1906,9 +1975,43 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
snapshot_not_interesting:
if (ondisk.builder.committed.xip != NULL)
pfree(ondisk.builder.committed.xip);
if (ondisk.builder.catchange.xip != NULL)
pfree(ondisk.builder.catchange.xip);
return false;
}
/*
* Read the contents of the serialized snapshot to 'dest'.
*/
static void
SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
{
int readBytes;
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, dest, size);
pgstat_report_wait_end();
if (readBytes != size)
{
int save_errno = errno;
CloseTransientFile(fd);
if (readBytes < 0)
{
errno = save_errno;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", path)));
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read file \"%s\": read %d of %zu",
path, readBytes, sizeof(SnapBuild))));
}
}
/*
* Remove all serialized snapshots that are not required anymore because no
* slot can need them. This doesn't actually have to run during a checkpoint,

View File

@ -380,6 +380,11 @@ typedef struct ReorderBufferTXN
*/
dlist_node node;
/*
* A node in the list of catalog modifying transactions
*/
dlist_node catchange_node;
/*
* Size of this transaction (changes currently in memory, in bytes).
*/
@ -526,6 +531,12 @@ struct ReorderBuffer
*/
dlist_head txns_by_base_snapshot_lsn;
/*
* Transactions and subtransactions that have modified system catalogs.
*/
dlist_head catchange_txns;
int catchange_ntxns;
/*
* one-entry sized cache for by_txn. Very frequently the same txn gets
* looked up over and over again.
@ -677,6 +688,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);

View File

@ -82,7 +82,7 @@ extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,
TransactionId *subxacts);
TransactionId *subxacts, uint32 xinfo);
extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
XLogRecPtr lsn);
extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,