diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index dc4f9b7018..1d75cf5af0 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -42,3 +42,48 @@ COMMIT stop (1 row) + +starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_insert2 s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO user_cat VALUES (1); +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +table public.user_cat: INSERT: val1[integer]:1 +COMMIT +(4 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index 2971ddc69c..2ad1edeaa8 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -4,11 +4,13 @@ setup { DROP TABLE IF EXISTS tbl1; CREATE TABLE tbl1 (val1 integer, val2 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); } teardown { DROP TABLE tbl1; + DROP TABLE user_cat; SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); } @@ -19,6 +21,7 @@ step "s0_begin" { BEGIN; } step "s0_savepoint" { SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_insert2" { INSERT INTO user_cat VALUES (1); } step "s0_commit" { COMMIT; } session "s1" @@ -37,3 +40,16 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_ # record written by bgwriter. One might think we can either stop the bgwriter or # increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# Test that we can handle the case where there is no association between top-level +# transaction and its subtransactions. The last decoding restarts from the first +# checkpoint, decodes NEW_CID generated by "s0_insert2", and marks the subtransaction +# as containing catalog changes while adding tuple cids to its top-level transaction. +# During that, both transaction entries are created in ReorderBuffer as top-level +# transactions and have the same LSN. We check if the assertion check for the order +# of transaction LSNs in AssertTXNLsnOrder() is skipped since we are still before the +# LSN at which we start replaying the contents of transactions. Besides, when decoding +# the commit record of the top-level transaction, we must force the top-level +# transaction to do timetravel since one of its subtransactions has been marked as +# containing catalog changes. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8971df140b..5d42283b56 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -878,10 +878,24 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb) { #ifdef USE_ASSERT_CHECKING + LogicalDecodingContext *ctx = rb->private_data; dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; + /* + * Skip the verification if we don't reach the LSN at which we start + * decoding the contents of transactions yet because until we reach the + * LSN, we could have transactions that don't have the association between + * the top-level transaction and subtransaction yet and consequently have + * the same LSN. We don't guarantee this association until we try to + * decode the actual contents of transaction. The ordering of the records + * prior to the start_decoding_at LSN should have been checked before the + * restart. + */ + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr)) + return; + dlist_foreach(iter, &rb->toplevel_by_lsn) { ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 385817e295..80b7d86e79 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1101,6 +1101,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, else if (sub_needs_timetravel) { /* track toplevel txn as well, subxact alone isn't meaningful */ + elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions", + xid); + needs_timetravel = true; SnapBuildAddCommittedTxn(builder, xid); } else if (needs_timetravel)