-- predictability SET synchronous_commit = on; -- superuser required by default CREATE ROLE regress_origin_replication REPLICATION; SET ROLE regress_origin_replication; SELECT pg_replication_origin_advance('regress_test_decoding: perm', '0/1'); ERROR: permission denied for function pg_replication_origin_advance SELECT pg_replication_origin_create('regress_test_decoding: perm'); ERROR: permission denied for function pg_replication_origin_create SELECT pg_replication_origin_drop('regress_test_decoding: perm'); ERROR: permission denied for function pg_replication_origin_drop SELECT pg_replication_origin_oid('regress_test_decoding: perm'); ERROR: permission denied for function pg_replication_origin_oid SELECT pg_replication_origin_progress('regress_test_decoding: perm', false); ERROR: permission denied for function pg_replication_origin_progress SELECT pg_replication_origin_session_is_setup(); ERROR: permission denied for function pg_replication_origin_session_is_setup SELECT pg_replication_origin_session_progress(false); ERROR: permission denied for function pg_replication_origin_session_progress SELECT pg_replication_origin_session_reset(); ERROR: permission denied for function pg_replication_origin_session_reset SELECT pg_replication_origin_session_setup('regress_test_decoding: perm'); ERROR: permission denied for function pg_replication_origin_session_setup SELECT pg_replication_origin_xact_reset(); ERROR: permission denied for function pg_replication_origin_xact_reset SELECT pg_replication_origin_xact_setup('0/1', '2013-01-01 00:00'); ERROR: permission denied for function pg_replication_origin_xact_setup SELECT pg_show_replication_origin_status(); ERROR: permission denied for function pg_show_replication_origin_status RESET ROLE; DROP ROLE regress_origin_replication; CREATE TABLE origin_tbl(id serial primary key, data text); CREATE TABLE target_tbl(id serial primary key, data text); SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); pg_replication_origin_create ------------------------------ 1 (1 row) -- ensure duplicate creations fail SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index" DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists. --ensure deletions work (once) SELECT pg_replication_origin_create('regress_test_decoding: temp'); pg_replication_origin_create ------------------------------ 2 (1 row) SELECT pg_replication_origin_drop('regress_test_decoding: temp'); pg_replication_origin_drop ---------------------------- (1 row) SELECT pg_replication_origin_drop('regress_test_decoding: temp'); ERROR: replication origin "regress_test_decoding: temp" does not exist -- specifying reserved origin names is not supported SELECT pg_replication_origin_create('any'); ERROR: replication origin name "any" is reserved DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved. SELECT pg_replication_origin_create('none'); ERROR: replication origin name "none" is reserved DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved. SELECT pg_replication_origin_create('pg_replication_origin'); ERROR: replication origin name "pg_replication_origin" is reserved DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved. -- various failure checks for undefined slots select pg_replication_origin_advance('regress_test_decoding: temp', '0/1'); ERROR: replication origin "regress_test_decoding: temp" does not exist select pg_replication_origin_session_setup('regress_test_decoding: temp'); ERROR: replication origin "regress_test_decoding: temp" does not exist select pg_replication_origin_progress('regress_test_decoding: temp', true); ERROR: replication origin "regress_test_decoding: temp" does not exist SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); ?column? ---------- init (1 row) -- origin tx INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again'); INSERT INTO target_tbl(data) SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- as is normal, the insert into target_tbl shows up SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- BEGIN table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN' table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again''' table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT' COMMIT (5 rows) INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again'); -- mark session as replaying SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot'); pg_replication_origin_session_setup ------------------------------------- (1 row) -- ensure we prevent duplicate setup SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot'); ERROR: cannot setup replication origin when one is already setup SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded'); ?column? ---------- (1 row) BEGIN; -- setup transaction origin SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); pg_replication_origin_xact_setup ---------------------------------- (1 row) INSERT INTO target_tbl(data) SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); COMMIT; -- check replication progress for the session is correct SELECT pg_replication_origin_session_progress(false); pg_replication_origin_session_progress ---------------------------------------- 0/AABBCCDD (1 row) SELECT pg_replication_origin_session_progress(true); pg_replication_origin_session_progress ---------------------------------------- 0/AABBCCDD (1 row) SELECT pg_replication_origin_session_reset(); pg_replication_origin_session_reset ------------------------------------- (1 row) SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status; local_id | external_id | remote_lsn | ?column? ----------+----------------------------------------+------------+---------- 1 | regress_test_decoding: regression_slot | 0/AABBCCDD | t (1 row) -- check replication progress identified by name is correct SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', false); pg_replication_origin_progress -------------------------------- 0/AABBCCDD (1 row) SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', true); pg_replication_origin_progress -------------------------------- 0/AABBCCDD (1 row) -- ensure reset requires previously setup state SELECT pg_replication_origin_session_reset(); ERROR: no replication origin is configured -- and magically the replayed xact will be filtered! SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); data ------ (0 rows) --but new original changes still show up INSERT INTO origin_tbl(data) VALUES ('will be replicated'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); data -------------------------------------------------------------------------------- BEGIN table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated' COMMIT (3 rows) SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot'); pg_replication_origin_drop ---------------------------- (1 row) -- Set of transactions with no origin LSNs and commit timestamps set for -- this session. SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_no_lsn', 'test_decoding'); ?column? ---------- init (1 row) SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_no_lsn'); pg_replication_origin_create ------------------------------ 1 (1 row) -- mark session as replaying SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_no_lsn'); pg_replication_origin_session_setup ------------------------------------- (1 row) -- Simple transactions BEGIN; INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit'); COMMIT; BEGIN; INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback'); ROLLBACK; -- 2PC transactions BEGIN; INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit prepared'); PREPARE TRANSACTION 'replorigin_prepared'; COMMIT PREPARED 'replorigin_prepared'; BEGIN; INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback prepared'); PREPARE TRANSACTION 'replorigin_prepared'; ROLLBACK PREPARED 'replorigin_prepared'; SELECT local_id, external_id, remote_lsn <> '0/0' AS valid_remote_lsn, local_lsn <> '0/0' AS valid_local_lsn FROM pg_replication_origin_status; local_id | external_id | valid_remote_lsn | valid_local_lsn ----------+-----------------------------------------------+------------------+----------------- 1 | regress_test_decoding: regression_slot_no_lsn | f | t (1 row) SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); data ------------------------------------------------------------------------------------- BEGIN table public.origin_tbl: INSERT: id[integer]:4 data[text]:'no_lsn, commit' COMMIT BEGIN table public.origin_tbl: INSERT: id[integer]:6 data[text]:'no_lsn, commit prepared' COMMIT (6 rows) -- Clean up SELECT pg_replication_origin_session_reset(); pg_replication_origin_session_reset ------------------------------------- (1 row) SELECT pg_drop_replication_slot('regression_slot_no_lsn'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn'); pg_replication_origin_drop ---------------------------- (1 row)