-- predictability SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding'); ?column? ---------- init (1 row) SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true); ?column? ---------- init (1 row) SELECT pg_drop_replication_slot('regression_slot_p'); pg_drop_replication_slot -------------------------- (1 row) SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false); ?column? ---------- init (1 row) SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true); ?column? ---------- init (1 row) SELECT pg_create_logical_replication_slot('foo', 'nonexistent'); ERROR: could not access file "nonexistent": No such file or directory -- here we want to start a new session and wait till old one is gone select pg_backend_pid() as oldpid \gset \c - SET synchronous_commit = on; do 'declare c int = 0; begin while (select count(*) from pg_replication_slots where active_pid = ' :'oldpid' ') > 0 loop c := c + 1; perform pg_sleep(0.01); end loop; raise log ''slot test looped % times'', c; end'; -- should fail because the temporary slots were dropped automatically SELECT pg_drop_replication_slot('regression_slot_t'); ERROR: replication slot "regression_slot_t" does not exist SELECT pg_drop_replication_slot('regression_slot_t2'); ERROR: replication slot "regression_slot_t2" does not exist -- permanent slot has survived SELECT pg_drop_replication_slot('regression_slot_p'); pg_drop_replication_slot -------------------------- (1 row) -- test switching between slots in a session SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); ?column? ---------- init (1 row) CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); BEGIN; INSERT INTO replication_example(somedata, text) VALUES (1, 1); INSERT INTO replication_example(somedata, text) VALUES (1, 2); COMMIT; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true); ?column? ---------- init (1 row) INSERT INTO replication_example(somedata, text) VALUES (1, 3); SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data --------------------------------------------------------------------------------------------------------- BEGIN table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:'1' table public.replication_example: INSERT: id[integer]:2 somedata[integer]:1 text[character varying]:'2' COMMIT BEGIN table public.replication_example: INSERT: id[integer]:3 somedata[integer]:1 text[character varying]:'3' COMMIT (7 rows) SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data --------------------------------------------------------------------------------------------------------- BEGIN table public.replication_example: INSERT: id[integer]:3 somedata[integer]:1 text[character varying]:'3' COMMIT (3 rows) INSERT INTO replication_example(somedata, text) VALUES (1, 4); INSERT INTO replication_example(somedata, text) VALUES (1, 5); SELECT pg_current_wal_lsn() AS wal_lsn \gset INSERT INTO replication_example(somedata, text) VALUES (1, 6); SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn()); slot_name ------------------ regression_slot2 (1 row) SELECT :'wal_lsn' = :'end_lsn'; ?column? ---------- t (1 row) SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data --------------------------------------------------------------------------------------------------------- BEGIN table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6' COMMIT (3 rows) SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------ (0 rows) DROP TABLE replication_example; -- error SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); ERROR: replication slot "regression_slot1" already exists -- both should error as they should be dropped on error SELECT pg_drop_replication_slot('regression_slot1'); ERROR: replication slot "regression_slot1" does not exist SELECT pg_drop_replication_slot('regression_slot2'); ERROR: replication slot "regression_slot2" does not exist -- slot advance with physical slot, error with non-reserved slot SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3'); slot_name ------------------ regression_slot3 (1 row) SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN ERROR: invalid target WAL LSN SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error ERROR: cannot advance replication slot that has not previously reserved WAL SELECT pg_drop_replication_slot('regression_slot3'); pg_drop_replication_slot -------------------------- (1 row) -- -- Test copy functions for logical replication slots -- -- Create and copy logical slots SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); ?column? ---------- init (1 row) SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); ?column? ---------- copy (1 row) SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput'); ?column? ---------- copy (1 row) SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput'); ?column? ---------- copy (1 row) -- Check all copied slots status SELECT o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn WHERE o.slot_name != c.slot_name ORDER BY o.slot_name, c.slot_name; slot_name | plugin | temporary | slot_name | plugin | temporary ------------+---------------+-----------+---------------------------------+---------------+----------- orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f (3 rows) -- Now we have maximum 4 replication slots. Check slots are properly -- released even when raise error during creating the target slot. SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error ERROR: all replication slots are in use HINT: Free one or increase max_replication_slots. -- temporary slots were dropped automatically SELECT pg_drop_replication_slot('orig_slot1'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_drop_replication_slot('copied_slot1_no_change'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); pg_drop_replication_slot -------------------------- (1 row) -- Test based on the temporary logical slot SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); ?column? ---------- init (1 row) SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); ?column? ---------- copy (1 row) SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput'); ?column? ---------- copy (1 row) SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput'); ?column? ---------- copy (1 row) -- Check all copied slots status SELECT o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn WHERE o.slot_name != c.slot_name ORDER BY o.slot_name, c.slot_name; slot_name | plugin | temporary | slot_name | plugin | temporary ------------+---------------+-----------+---------------------------------+---------------+----------- orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t (3 rows) -- Cannot copy a logical slot to a physical slot SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error ERROR: cannot copy physical replication slot "orig_slot2" as a logical replication slot -- temporary slots were dropped automatically SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); pg_drop_replication_slot -------------------------- (1 row) -- -- Test copy functions for physical replication slots -- -- Create and copy physical slots SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); ?column? ---------- init (1 row) SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); ?column? ---------- init (1 row) SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); ?column? ---------- copy (1 row) SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); ?column? ---------- copy (1 row) -- Check all copied slots status. Since all slots don't reserve WAL we check only other fields. SELECT slot_name, slot_type, temporary FROM pg_replication_slots; slot_name | slot_type | temporary ------------------------+-----------+----------- orig_slot1 | physical | f orig_slot2 | physical | f copied_slot1_no_change | physical | f copied_slot1_temp | physical | t (4 rows) -- Cannot copy a physical slot to a logical slot SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error ERROR: cannot copy logical replication slot "orig_slot1" as a physical replication slot -- Cannot copy a physical slot that doesn't reserve WAL SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error ERROR: cannot copy a replication slot that doesn't reserve WAL -- temporary slots were dropped automatically SELECT pg_drop_replication_slot('orig_slot1'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_drop_replication_slot('orig_slot2'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_drop_replication_slot('copied_slot1_no_change'); pg_drop_replication_slot -------------------------- (1 row) -- Test based on the temporary physical slot SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); ?column? ---------- init (1 row) SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); ?column? ---------- copy (1 row) SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); ?column? ---------- copy (1 row) -- Check all copied slots status SELECT o.slot_name, o.temporary, c.slot_name, c.temporary FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn WHERE o.slot_name != c.slot_name ORDER BY o.slot_name, c.slot_name; slot_name | temporary | slot_name | temporary ------------+-----------+------------------------+----------- orig_slot2 | t | copied_slot2_no_change | t orig_slot2 | t | copied_slot2_notemp | f (2 rows) SELECT pg_drop_replication_slot('orig_slot2'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_drop_replication_slot('copied_slot2_no_change'); pg_drop_replication_slot -------------------------- (1 row) SELECT pg_drop_replication_slot('copied_slot2_notemp'); pg_drop_replication_slot -------------------------- (1 row)