From 8fccf75834c11798e3b13a4cebedfb8ae683857a Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 13 Oct 2020 08:30:35 +0530 Subject: [PATCH] Add tests for logical replication spilled stats. Commit 9868167500 added a mechanism to track statistics corresponding to the spilling of changes from ReorderBuffer but didn't add any tests. Author: Amit Kapila and Sawada Masahiko Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/stats.out | 109 +++++++++++++++++++++++ contrib/test_decoding/sql/stats.sql | 62 +++++++++++++ 3 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/stats.out create mode 100644 contrib/test_decoding/sql/stats.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f23f15b04d..9a4c76f013 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream + spill slot truncate stream stats ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out new file mode 100644 index 0000000000..c4464f49da --- /dev/null +++ b/contrib/test_decoding/expected/stats.out @@ -0,0 +1,109 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE stats_test(data text); +-- function to wait for counters to advance +CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$ +DECLARE + start_time timestamptz := clock_timestamp(); + updated bool; +BEGIN + -- we don't want to wait forever; loop will exit after 30 seconds + FOR i IN 1 .. 300 LOOP + + -- check to see if all updates have been reset/updated + SELECT CASE WHEN check_reset THEN (spill_txns = 0) + ELSE (spill_txns > 0) + END + INTO updated + FROM pg_stat_replication_slots WHERE name='regression_slot'; + + exit WHEN updated; + + -- wait a little + perform pg_sleep_for('100 milliseconds'); + + -- reset stats snapshot so we can test again + perform pg_stat_clear_snapshot(); + + END LOOP; + + -- report time waited in postmaster log (where it won't change test output) + RAISE LOG 'wait_for_decode_stats delayed % seconds', + extract(epoch from clock_timestamp() - start_time); +END +$$ LANGUAGE plpgsql; +-- spilling the xact +BEGIN; +INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i); +COMMIT; +SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL); + count +------- + 5006 +(1 row) + +-- check stats, wait for stats collector to update. +SELECT wait_for_decode_stats(false); + wait_for_decode_stats +----------------------- + +(1 row) + +SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots; + name | spill_txns | spill_count +-----------------+------------+------------- + regression_slot | 1 | 12 +(1 row) + +-- reset the slot stats, and wait for stats collector to reset +SELECT pg_stat_reset_replication_slot('regression_slot'); + pg_stat_reset_replication_slot +-------------------------------- + +(1 row) + +SELECT wait_for_decode_stats(true); + wait_for_decode_stats +----------------------- + +(1 row) + +SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots; + name | spill_txns | spill_count +-----------------+------------+------------- + regression_slot | 0 | 0 +(1 row) + +-- decode and check stats again. +SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL); + count +------- + 5006 +(1 row) + +SELECT wait_for_decode_stats(false); + wait_for_decode_stats +----------------------- + +(1 row) + +SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots; + name | spill_txns | spill_count +-----------------+------------+------------- + regression_slot | 1 | 12 +(1 row) + +DROP FUNCTION wait_for_decode_stats(bool); +DROP TABLE stats_test; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql new file mode 100644 index 0000000000..7d406f0b11 --- /dev/null +++ b/contrib/test_decoding/sql/stats.sql @@ -0,0 +1,62 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE stats_test(data text); + +-- function to wait for counters to advance +CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$ +DECLARE + start_time timestamptz := clock_timestamp(); + updated bool; +BEGIN + -- we don't want to wait forever; loop will exit after 30 seconds + FOR i IN 1 .. 300 LOOP + + -- check to see if all updates have been reset/updated + SELECT CASE WHEN check_reset THEN (spill_txns = 0) + ELSE (spill_txns > 0) + END + INTO updated + FROM pg_stat_replication_slots WHERE name='regression_slot'; + + exit WHEN updated; + + -- wait a little + perform pg_sleep_for('100 milliseconds'); + + -- reset stats snapshot so we can test again + perform pg_stat_clear_snapshot(); + + END LOOP; + + -- report time waited in postmaster log (where it won't change test output) + RAISE LOG 'wait_for_decode_stats delayed % seconds', + extract(epoch from clock_timestamp() - start_time); +END +$$ LANGUAGE plpgsql; + +-- spilling the xact +BEGIN; +INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i); +COMMIT; +SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL); + +-- check stats, wait for stats collector to update. +SELECT wait_for_decode_stats(false); +SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots; + +-- reset the slot stats, and wait for stats collector to reset +SELECT pg_stat_reset_replication_slot('regression_slot'); +SELECT wait_for_decode_stats(true); +SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots; + +-- decode and check stats again. +SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL); +SELECT wait_for_decode_stats(false); +SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots; + +DROP FUNCTION wait_for_decode_stats(bool); +DROP TABLE stats_test; +SELECT pg_drop_replication_slot('regression_slot');