-- Test streaming of two-phase commits
--
-- Two scenarios run against the same logical slot:
--   1. a prepared transaction whose changes are streamed at PREPARE time, so
--      COMMIT PREPARED adds only the commit record;
--   2. a prepared transaction whose gid contains '_nodecode', which is not
--      decoded at PREPARE time and shows up as a plain BEGIN ... COMMIT once
--      it is committed.
--
-- NOTE(review): this looks like a psql transcript (statements followed by
-- their output). If it is a regression-test expected file, any comment change
-- here must be mirrored in the matching input script -- TODO confirm.
SET synchronous_commit = on;
-- NOTE(review): the trailing "false, true" arguments are presumably
-- temporary = false and two-phase = true; confirm against the
-- pg_create_logical_replication_slot documentation.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 ?column? 
----------
 init
(1 row)

CREATE TABLE stream_test(data text);
-- consume DDL, so that the CREATE TABLE above does not appear in later results
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data 
------
(0 rows)

-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
BEGIN;
SAVEPOINT s1;
-- transactional message; it is reported first in the decoded output below
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
 ?column? 
----------
 msg5
(1 row)

-- The 35 wide rows and the TRUNCATE are undone by ROLLBACK TO s1; only the 20
-- short rows inserted afterwards appear as changes in the decoded output.
-- NOTE(review): the wide rows are presumably there to make the transaction
-- large enough to be streamed -- TODO confirm.
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts made after ROLLBACK TO s1:
-- 1 message + block open + 20 streamed changes + block close + prepare = 24 rows
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                           data                           
----------------------------------------------------------
 streaming message: transactional: 1 prefix: test, sz: 50
 opening a streamed block for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 streaming change for transaction
 closing a streamed block for transaction
 preparing streamed transaction 'test1'
(24 rows)

COMMIT PREPARED 'test1';
-- should show only the COMMIT PREPARED; the transaction's changes were
-- already emitted at PREPARE time by the previous call
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
          data           
-------------------------
 COMMIT PREPARED 'test1'
(1 row)

-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
 ?column? 
----------
 msg5
(1 row)

-- same workload as the first scenario; only the gid used at PREPARE differs
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show the inserts made after ROLLBACK TO s1; only the message
-- is reported at this point
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                           data                           
----------------------------------------------------------
 streaming message: transactional: 1 prefix: test, sz: 50
(1 row)

COMMIT PREPARED 'test1_nodecode';
-- should show the inserts, closed by a plain COMMIT rather than a
-- COMMIT PREPARED: BEGIN + 20 inserts + COMMIT = 22 rows
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                             
-------------------------------------------------------------
 BEGIN
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
 table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
 COMMIT
(22 rows)

-- clean up: drop the test table and the replication slot
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot 
--------------------------
 
(1 row)
