Fix catalog lookup due to wrong snapshot for subtransactions during decoding.

In commit 272248a0c, we fixed the catalog lookup due to the wrong snapshot
for transactions and subtransactions during decoding. We failed to
consider the case where top-level xact is already marked as containing
catalog change but its subtransaction is not yet marked as containing
catalog change even though it contained such a change.

This can happen when during decoding, none of the WAL records from the
subtransaction was decoded and top-level xact contains a DDL.

We fix it by marking the transaction and all its subtransactions as
containing catalog changes if the top-level xact contains any catalog
change and it is present in the initial running xacts array.

This fix is required only for 14 and 15 because in prior branches we
already always mark the transaction and all its subtransactions as
containing catalog changes in the same case. In 16 and above, we preserve
the list of transaction IDs and sub-transaction IDs, that have modified
catalogs and are running during snapshot serialization, to the serialized
snapshot (see commit 7f13ac8123).

Author: Fei Changhong
Reviewed-by: Amit Kapila, Hayato Kuroda, Andy Fan
Discussion: https://postgr.es/m/18280-4c8060178cb41750@postgresql.org
This commit is contained in:
Amit Kapila 2024-01-29 10:42:41 +05:30
parent 8b34cff338
commit b793a416bf
3 changed files with 65 additions and 4 deletions

View File

@ -132,3 +132,47 @@ COMMIT
stop
(1 row)
starting permutation: s0_init s0_begin s0_savepoint s0_create_part1 s0_savepoint_release s1_checkpoint s0_create_part2 s0_commit s0_begin s0_truncate s1_checkpoint s1_get_changes s0_insert_part s1_get_changes s0_commit s1_get_changes
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
?column?
--------
init
(1 row)
step s0_begin: BEGIN;
step s0_savepoint: SAVEPOINT sp1;
step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10);
step s0_savepoint_release: RELEASE SAVEPOINT sp1;
step s1_checkpoint: CHECKPOINT;
step s0_create_part2: CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20);
step s0_commit: COMMIT;
step s0_begin: BEGIN;
step s0_truncate: TRUNCATE tbl1;
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
----
(0 rows)
step s0_insert_part: INSERT INTO tbl1_part VALUES (1);
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
----
(0 rows)
step s0_commit: COMMIT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
--------------------------------------------------
BEGIN
table public.tbl1: TRUNCATE: (no-flags)
table public.tbl1_part_p1: INSERT: val1[integer]:1
COMMIT
(4 rows)
?column?
--------
stop
(1 row)

View File

@ -3,13 +3,16 @@
setup
{
DROP TABLE IF EXISTS tbl1;
DROP TABLE IF EXISTS tbl1_part;
CREATE TABLE tbl1 (val1 integer, val2 integer);
CREATE TABLE tbl1_part (val1 integer) PARTITION BY RANGE (val1);
CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true);
}
teardown
{
DROP TABLE tbl1;
DROP TABLE tbl1_part;
DROP TABLE user_cat;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
@ -19,9 +22,13 @@ setup { SET synchronous_commit=on; }
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
step "s0_begin" { BEGIN; }
step "s0_savepoint" { SAVEPOINT sp1; }
step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; }
step "s0_truncate" { TRUNCATE tbl1; }
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
step "s0_insert_part" { INSERT INTO tbl1_part VALUES (1); }
step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); }
step "s0_create_part2" { CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20); }
step "s0_commit" { COMMIT; }
session "s1"
@ -60,3 +67,11 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_
# to skip this xact but ensure that corresponding invalidation messages
# get processed.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
# The last decoding restarts from the first checkpoint and doesn't decode
# any WAL records generated by the subtransaction that performed s0_create_part1.
# While processing the commit record for the corresponding top-level transaction
# which will be marked as containing catalog change even before commit, we ensure
# that the corresponding substransaction is also marked as containing a catalog
# modifying change.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s1_checkpoint" "s0_create_part2" "s0_commit" "s0_begin" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_insert_part" "s1_get_changes" "s0_commit" "s1_get_changes"

View File

@ -2126,11 +2126,13 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt
TransactionId *subxacts, XLogRecPtr lsn)
{
/*
* Skip if there is no initial running xacts information or the
* transaction is already marked as containing catalog changes.
* Skip if there is no initial running xacts information.
*
* Even if the transaction has been marked as containing catalog
* changes, it cannot be skipped because its subtransactions that
* modified the catalog may not be marked.
*/
if (NInitialRunningXacts == 0 ||
ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
if (NInitialRunningXacts == 0)
return;
if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,