From 4fbe6096b954092abc460adaddd365d5049d40f1 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 21 Oct 2022 09:52:44 +0530 Subject: [PATCH] Fix executing invalidation messages generated by subtransactions during decoding. This problem has been introduced by commit 272248a0c1 where we started assigning the subtransactions to the top-level transaction when we mark both the top-level transaction and its subtransactions as containing catalog changes. After we assign subtransactions to the top-level transaction, we were not allowed to execute any invalidations associated with it when we decide to skip the transaction. The reason to assign the subtransactions to the top-level transaction was to avoid the assertion failure in AssertTXNLsnOrder() as they have the same LSN when we sometimes start accumulating transaction changes for partial transactions after the restart. Now that with commit 64ff0fe4e8, we skip this assertion check until we reach the LSN at which we start decoding the contents of the transaction, so, there is no reason for such an assignment anymore. The assignment change was introduced in 15 and prior versions but this bug doesn't exist in branches prior to 14 since we don't add invalidation messages to subtransactions. We decided to backpatch through 11 for consistency but not for 10 since its final release is near. Reported-by: Kuroda Hayato Author: Masahiko Sawada Reviewed-by: Amit Kapila Backpatch-through: 11 Discussion: https://postgr.es/m/TYAPR01MB58660803BCAA7849C8584AA4F57E9%40TYAPR01MB5866.jpnprd01.prod.outlook.com Discussion: https://postgr.es/m/a89b46b6-0239-2fd5-71a9-b19b1f7a7145%40enterprisedb.com --- .../expected/catalog_change_snapshot.out | 45 +++++++++++++++++++ .../specs/catalog_change_snapshot.spec | 7 +++ src/backend/replication/logical/snapbuild.c | 3 -- 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index 1d75cf5af0..b33e49c0b1 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -87,3 +87,48 @@ COMMIT stop (1 row) + +starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_truncate s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_truncate: TRUNCATE tbl1; +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(4 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index 2ad1edeaa8..770dbd642d 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -53,3 +53,10 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s # transaction to do timetravel since one of its subtransactions has been marked as # containing catalog changes. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# The last decoding restarts from the first checkpoint and adds invalidation +# messages generated by "s0_truncate" to the subtransaction. While +# processing the commit record for the top-level transaction, we decide +# to skip this xact but ensure that corresponding invalidation messages +# get processed. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 8ba634aade..977440b942 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -2113,9 +2113,6 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); for (int i = 0; i < subxcnt; i++) - { - ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); - } } }