diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a5f9b81a82..65a91a8014 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -51,7 +51,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(REGRESSCHECKS) ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer + oldest_xmin snapshot_transfer subxact_without_top isolationcheck: | submake-isolation submake-test_decoding temp-install $(pg_isolation_regress_check) \ diff --git a/contrib/test_decoding/expected/subxact_without_top.out b/contrib/test_decoding/expected/subxact_without_top.out new file mode 100644 index 0000000000..99ce998822 --- /dev/null +++ b/contrib/test_decoding/expected/subxact_without_top.out @@ -0,0 +1,39 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_begin s0_first_subxact s2_checkpoint s1_begin s1_dml s0_many_subxacts s0_commit s2_checkpoint s2_get_changes_suppress_output s2_get_changes_suppress_output s1_commit s2_get_changes +step s0_begin: BEGIN; +step s0_first_subxact: + DO LANGUAGE plpgsql $$ + BEGIN + BEGIN + INSERT INTO harvest VALUES (41); + EXCEPTION WHEN OTHERS THEN RAISE; + END; + END $$; + +step s2_checkpoint: CHECKPOINT; +step s1_begin: BEGIN; +step s1_dml: INSERT INTO harvest VALUES (43); +step s0_many_subxacts: select subxacts(); +subxacts + + +step s0_commit: COMMIT; +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_suppress_output: SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n; +n + + +step s2_get_changes_suppress_output: SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n; +n + +step s1_commit: COMMIT; +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.harvest: INSERT: apples[integer]:43 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/subxact_without_top.spec b/contrib/test_decoding/specs/subxact_without_top.spec new file mode 100644 index 0000000000..76688c75e0 --- /dev/null +++ b/contrib/test_decoding/specs/subxact_without_top.spec @@ -0,0 +1,63 @@ +# Test decoding of subtransactions whose top-transaction is before restart +# point. Such transactions won't be streamed as we stream only complete +# transactions, but it is good to test that they don't cause any problem. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact + CREATE TABLE harvest(apples integer); + CREATE OR REPLACE FUNCTION subxacts() returns void as $$ + BEGIN + FOR i in 1 .. 128 LOOP + BEGIN + INSERT INTO harvest VALUES (42); + EXCEPTION + WHEN OTHERS THEN + RAISE; + END; + END LOOP; + END; $$LANGUAGE 'plpgsql'; +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_begin" { BEGIN; } +step "s0_first_subxact" { + DO LANGUAGE plpgsql $$ + BEGIN + BEGIN + INSERT INTO harvest VALUES (41); + EXCEPTION WHEN OTHERS THEN RAISE; + END; + END $$; +} +step "s0_many_subxacts" { select subxacts(); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_begin" { BEGIN; } +step "s1_dml" { INSERT INTO harvest VALUES (43); } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET synchronous_commit=on; } +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } +step "s2_get_changes_suppress_output" { SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n; } + +# The first checkpoint establishes the potential restart point (aka +# restart_lsn) for the slot after the initial subxact. The second checkpoint +# followed by get_changes will ensure that the potential restart point will +# become the actual restart point. We do get_changes twice because if one +# more xl_running_xacts record had slipped before our s0_commit, then the +# potential restart point won't become actual restart point. The s1's open +# transaction till get_changes holds the potential restart point to our first +# checkpoint location. +permutation "s0_begin" "s0_first_subxact" "s2_checkpoint" "s1_begin" "s1_dml" "s0_many_subxacts" "s0_commit" "s2_checkpoint" "s2_get_changes_suppress_output" "s2_get_changes_suppress_output" "s1_commit" "s2_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index cbef6f0f44..1a4b87c419 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -779,9 +779,6 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); - if (new_top && !new_sub) - elog(ERROR, "subtransaction logged without previous top-level txn record"); - if (!new_sub) { if (subtxn->is_known_as_subxact)