postgresql/contrib/test_decoding/expected/twophase_stream.out
Amit Kapila 19890a064e Add option to enable two_phase commits via pg_create_logical_replication_slot.
Commit 0aa8a01d04 extends the output plugin API to allow decoding of
prepared xacts and allowed the user to enable/disable the two-phase option
via pg_logical_slot_get_changes(). This can lead to a problem such that
the first time when it gets changes via pg_logical_slot_get_changes()
without two_phase option enabled it will not get the prepared even though
prepare is after consistent snapshot. Now next time during getting changes,
if the two_phase option is enabled it can skip prepare because by that
time start decoding point has been moved. So the user will only get commit
prepared.

Allow to enable/disable this option at the create slot time and default
will be false. It will break the existing slots which is fine in a major
release.

Author: Ajin Cherian
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
2021-03-03 07:34:11 +05:30

126 lines
4.9 KiB
Plaintext

-- Test streaming of two-phase commits
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
?column?
----------
init
(1 row)
CREATE TABLE stream_test(data text);
-- consume DDL
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
preparing streamed transaction 'test1'
(24 rows)
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------
COMMIT PREPARED 'test1'
(1 row)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show inserts after a ROLLBACK
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
(1 row)
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
COMMIT
(22 rows)
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)