930 lines
31 KiB
Perl
930 lines
31 KiB
Perl
# Copyright (c) 2023-2024, PostgreSQL Global Development Group

# logical decoding on standby : test logical decoding,
# recovery conflict and standby promotion.

use strict;
use warnings FATAL => 'all';

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Shared state used throughout the script: I/O buffers for psql and
# pg_recvlogical sessions, the current pg_recvlogical handle, and misc
# scratch variables.  The helper subs below read/write several of these.
my ($stdin, $stdout, $stderr,
	$cascading_stdout, $cascading_stderr, $subscriber_stdin,
	$subscriber_stdout, $subscriber_stderr, $ret,
	$handle, $slot);

# Test topology: primary -> standby -> cascading standby, plus a logical
# replication subscriber that connects to the standby.
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
my $node_cascading_standby =
  PostgreSQL::Test::Cluster->new('cascading_standby');
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
my $psql_timeout = IPC::Run::timer($default_timeout);
my $res;

# Name for the physical slot on primary
my $primary_slotname = 'primary_physical';
my $standby_physical_slotname = 'standby_physical';
|
|
|
|
# Wait until the given boolean SQL expression becomes true for the named
# slot's pg_replication_slots row, ensuring we've reached a quiescent state
# with respect to the slot's xmin / catalog_xmin columns.  Dies on timeout.
sub wait_for_xmins
{
	my ($node, $slotname, $check_expr) = @_;

	my $query = qq[
		SELECT $check_expr
		FROM pg_catalog.pg_replication_slots
		WHERE slot_name = '$slotname';
	];
	$node->poll_query_until('postgres', $query)
	  or die "Timed out waiting for slot xmins to advance";
}
|
|
|
|
# Create the pair of logical slots ("<prefix>inactiveslot" then
# "<prefix>activeslot") on the given standby node, for database 'testdb'.
sub create_logical_slots
{
	my ($node, $slot_prefix) = @_;

	# Creation order matters for the tests: inactive slot first.
	foreach my $suffix ('inactiveslot', 'activeslot')
	{
		$node->create_logical_slot_on_standby($node_primary,
			$slot_prefix . $suffix, 'testdb');
	}
}
|
|
|
|
# Drop the pair of logical slots created by create_logical_slots() on the
# standby.  Uses psql() rather than safe_psql() so a failed drop is not
# fatal to the script.
sub drop_logical_slots
{
	my ($slot_prefix) = @_;

	foreach my $suffix ('inactiveslot', 'activeslot')
	{
		my $slotname = $slot_prefix . $suffix;
		$node_standby->psql('postgres',
			qq[SELECT pg_drop_replication_slot('$slotname')]);
	}
}
|
|
|
|
# Acquire one of the standby logical slots created by create_logical_slots()
# by launching pg_recvlogical against "<prefix>activeslot".
# If $wait is true, poll until the slot shows an active_pid (i.e. the slot is
# really in use) and die if it never does.  If $wait is false we are testing
# a known failure scenario and return immediately.
# Returns the IPC::Run handle driving pg_recvlogical; its stdout/stderr are
# redirected into the scalar refs $to_stdout / $to_stderr.
sub make_slot_active
{
	my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;

	my $active_slot = $slot_prefix . 'activeslot';

	# Stream changes from the active slot to stdout ('-f -'), without
	# xids and skipping empty transactions, exiting on error (--no-loop).
	my @recv_cmd = (
		'pg_recvlogical',
		'-d', $node->connstr('testdb'),
		'-S', $active_slot,
		'-o', 'include-xids=0',
		'-o', 'skip-empty-xacts=1',
		'--no-loop', '--start',
		'-f', '-');

	my $slot_user_handle =
	  IPC::Run::start(\@recv_cmd, '>', $to_stdout, '2>', $to_stderr);

	if ($wait)
	{
		# make sure activeslot is in use
		$node->poll_query_until('testdb',
			qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
		) or die "slot never became active";
	}
	return $slot_user_handle;
}
|
|
|
|
# Wait for the pg_recvlogical process behind $slot_user_handle to finish and
# verify that it failed, with the global $stderr buffer matching
# $check_stderr.  Returns 0.
sub check_pg_recvlogical_stderr
{
	my ($slot_user_handle, $check_stderr) = @_;

	# our client should've terminated in response to the walsender error
	$slot_user_handle->finish;
	my $exit_status = $?;
	cmp_ok($exit_status, "!=", 0, "pg_recvlogical exited non-zero");
	like($stderr, qr/$check_stderr/, 'slot has been invalidated')
	  if $exit_status;

	return 0;
}
|
|
|
|
# Verify that both standby slots of the given prefix are gone: the
# 'activeslot' acquired by make_slot_active() and the idle 'inactiveslot'.
# Also checks that the pg_recvlogical client died with a recovery conflict.
sub check_slots_dropped
{
	my ($slot_prefix, $slot_user_handle) = @_;

	for my $suffix ('inactiveslot', 'activeslot')
	{
		is($node_standby->slot($slot_prefix . $suffix)->{'slot_type'},
			'', "$suffix on standby dropped");
	}

	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
}
|
|
|
|
# Set hot_standby_feedback on the standby and wait until the primary's
# physical slot shows the xmin / catalog_xmin state that setting implies.
# $invalidated indicates whether the logical slots have been invalidated
# (no logical slot left to hold back catalog_xmin).
sub change_hot_standby_feedback_and_wait_for_xmins
{
	my ($hsf, $invalidated) = @_;

	$node_standby->append_conf(
		'postgresql.conf', qq[
	hot_standby_feedback = $hsf
	]);

	$node_standby->reload;

	# Pick the expected quiescent state for the primary's physical slot.
	my $check_expr;
	if ($hsf && $invalidated)
	{
		# With hot_standby_feedback on, xmin should advance,
		# but catalog_xmin should still remain NULL since there is no logical slot.
		$check_expr = "xmin IS NOT NULL AND catalog_xmin IS NULL";
	}
	elsif ($hsf)
	{
		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
		$check_expr = "xmin IS NOT NULL AND catalog_xmin IS NOT NULL";
	}
	else
	{
		# Both should be NULL since hs_feedback is off
		$check_expr = "xmin IS NULL AND catalog_xmin IS NULL";
	}

	wait_for_xmins($node_primary, $primary_slotname, $check_expr);
}
|
|
|
|
# Verify that both slots of the given prefix report the expected
# conflict_reason in pg_replication_slots on the standby.
sub check_slots_conflict_reason
{
	my ($slot_prefix, $reason) = @_;

	# Same order as the original individual checks: active slot first.
	for my $suffix ('activeslot', 'inactiveslot')
	{
		my $slotname = $slot_prefix . $suffix;
		$res = $node_standby->safe_psql(
			'postgres', qq(
			 select conflict_reason from pg_replication_slots where slot_name = '$slotname';));
		is($res, "$reason", "$slotname conflict_reason is $reason");
	}
}
|
|
|
|
# Drop the slots, re-create them, change hot_standby_feedback,
# check xmin and catalog_xmin values, make slot active and reset stat.
#
# $previous_slot_prefix: prefix of the slots left over from the previous
#   scenario, to be dropped.
# $slot_prefix: prefix of the fresh slots for the next scenario.
# $hsf / $invalidated: forwarded to
#   change_hot_standby_feedback_and_wait_for_xmins().
# Side effects: sets the global $handle to the new pg_recvlogical handle.
sub reactive_slots_change_hfs_and_wait_for_xmins
{
	my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;

	# drop the logical slots
	drop_logical_slots($previous_slot_prefix);

	# create the logical slots
	create_logical_slots($node_standby, $slot_prefix);

	change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);

	$handle =
	  make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);

	# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
	$node_standby->psql('testdb', q[select pg_stat_reset();]);
}
|
|
|
|
# Verify that both slots of the given prefix were invalidated: their
# invalidation messages appear in the standby's log after byte offset
# $log_start, and pg_stat_database_conflicts records the conflict.
sub check_for_invalidation
{
	my ($slot_prefix, $log_start, $test_name) = @_;

	# Both slots must have an "invalidating obsolete replication slot"
	# message in the log.
	for my $suffix ('inactiveslot', 'activeslot')
	{
		my $slotname = $slot_prefix . $suffix;
		ok( $node_standby->log_contains(
				"invalidating obsolete replication slot \"$slotname\"",
				$log_start),
			"$suffix slot invalidation is logged $test_name");
	}

	# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
	ok( $node_standby->poll_query_until(
			'postgres',
			"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'",
			't'),
		'confl_active_logicalslot updated'
	) or die "Timed out waiting confl_active_logicalslot to be updated";
}
|
|
|
|
########################
# Initialize primary node
########################

# wal_level = logical is required for logical decoding on the standby;
# archiving is enabled so the standby can also restore from archive.
$node_primary->init(allows_streaming => 1, has_archiving => 1);
$node_primary->append_conf(
	'postgresql.conf', q{
wal_level = 'logical'
max_replication_slots = 4
max_wal_senders = 4
});
$node_primary->dump_info;
$node_primary->start;

$node_primary->psql('postgres', q[CREATE DATABASE testdb]);

$node_primary->safe_psql('testdb',
	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
);

# Check conflict_reason is NULL for physical slot
$res = $node_primary->safe_psql(
	'postgres', qq[
	 SELECT conflict_reason is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
);

is($res, 't', "Physical slot reports conflict_reason as NULL");

my $backup_name = 'b1';
$node_primary->backup($backup_name);

# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush
# WAL. An insert into flush_wal outside transaction does guarantee a flush.
$node_primary->psql('testdb', q[CREATE TABLE flush_wal();]);
|
|
|
#######################
# Initialize standby node
#######################

$node_standby->init_from_backup(
	$node_primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
# One extra slot vs the primary: the standby also hosts the physical slot
# used later by the cascading standby.
$node_standby->append_conf(
	'postgresql.conf',
	qq[primary_slot_name = '$primary_slotname'
max_replication_slots = 5]);
$node_standby->start;
$node_primary->wait_for_replay_catchup($node_standby);
|
|
|
|
#######################
# Initialize subscriber node
#######################
$node_subscriber->init;
$node_subscriber->start;

# Background psql session to the subscriber, driven through IPC::Run so
# SQL can be fed asynchronously (needed while CREATE SUBSCRIPTION blocks).
my %psql_subscriber = (
	'subscriber_stdin' => '',
	'subscriber_stdout' => '',
	'subscriber_stderr' => '');
$psql_subscriber{run} = IPC::Run::start(
	[ 'psql', '-XA', '-f', '-', '-d', $node_subscriber->connstr('postgres') ],
	'<',
	\$psql_subscriber{subscriber_stdin},
	'>',
	\$psql_subscriber{subscriber_stdout},
	'2>',
	\$psql_subscriber{subscriber_stderr},
	$psql_timeout);
|
|
|
|
##################################################
# Test that logical decoding on the standby
# behaves correctly.
##################################################

# create the logical slots
create_logical_slots($node_standby, 'behaves_ok_');

$node_primary->safe_psql('testdb',
	qq[CREATE TABLE decoding_test(x integer, y text);]);
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
);

$node_primary->wait_for_replay_catchup($node_standby);

my $result = $node_standby->safe_psql('testdb',
	qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]
);

# test if basic decoding works: 10 INSERTs plus the surrounding
# BEGIN/COMMIT of the two transactions above.
is(scalar(my @foobar = split /^/m, $result),
	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');

# Insert some rows and verify that we get the same results from pg_recvlogical
# and the SQL interface.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

my $expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT};

$node_primary->wait_for_replay_catchup($node_standby);

# peek (not get) so the same changes can be consumed again below
my $stdout_sql = $node_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected, 'got expected output from SQL decoding session');

my $endpos = $node_standby->safe_psql('testdb',
	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);

# Insert some rows after $endpos, which we won't read.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
);

$node_primary->wait_for_replay_catchup($node_standby);

my $stdout_recv = $node_standby->pg_recvlogical_upto(
	'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
	'include-xids' => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, $expected,
	'got same expected output from pg_recvlogical decoding session');

# Wait for the walsender of the previous session to release the slot.
$node_standby->poll_query_until('testdb',
	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)"
) or die "slot never became inactive";

# A second run up to the same endpos must yield nothing: the previous
# pg_recvlogical session confirmed (consumed) those changes.
$stdout_recv = $node_standby->pg_recvlogical_upto(
	'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
	'include-xids' => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, '', 'pg_recvlogical acknowledged changes');

$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');

# Wait for catchup to ensure that the new database is visible to other sessions
# on the standby.
$node_primary->wait_for_replay_catchup($node_standby);

($result, $stdout, $stderr) = $node_standby->psql('otherdb',
	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);
ok( $stderr =~
	  m/replication slot "behaves_ok_activeslot" was not created in this database/,
	"replaying logical slot from another database fails");
|
|
|
|
##################################################
# Test that we can subscribe on the standby with the publication
# created on the primary.
##################################################

# Create a table on the primary
$node_primary->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Create a table (same structure) on the subscriber node
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Create a publication on the primary
$node_primary->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub for table tab_rep");

$node_primary->wait_for_replay_catchup($node_standby);

# Subscribe on the standby
my $standby_connstr = $node_standby->connstr . ' dbname=postgres';

# Not using safe_psql() here as it would wait for activity on the primary
# and we wouldn't be able to launch pg_log_standby_snapshot() on the primary
# while waiting.
# psql_subscriber() allows to not wait synchronously.
$psql_subscriber{subscriber_stdin} .= qq[CREATE SUBSCRIPTION tap_sub
	CONNECTION '$standby_connstr'
	PUBLICATION tap_pub
	WITH (copy_data = off);];
$psql_subscriber{subscriber_stdin} .= "\n";

$psql_subscriber{run}->pump_nb();

# Speed up the subscription creation
$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");

# Explicitly shut down psql instance gracefully - to avoid hangs
# or worse on windows
$psql_subscriber{subscriber_stdin} .= "\\q\n";
$psql_subscriber{run}->finish;

$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');

# Insert some rows on the primary
$node_primary->safe_psql('postgres',
	qq[INSERT INTO tab_rep select generate_series(1,10);]);

$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->wait_for_catchup('tap_sub');

# Check that the subscriber can see the rows inserted in the primary
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
is($result, qq(10), 'check replicated inserts after subscription on standby');

# We do not need the subscription and the subscriber anymore
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
$node_subscriber->stop;
|
|
|
|
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 1: hot_standby_feedback off and vacuum FULL
##################################################

# One way to produce recovery conflict is to create/drop a relation and
# launch a vacuum full on pg_class with hot_standby_feedback turned off on
# the standby.
reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_',
	0, 1);

# This should trigger the conflict
$node_primary->safe_psql(
	'testdb', qq[
  CREATE TABLE conflict_test(x integer, y text);
  DROP TABLE conflict_test;
  VACUUM full pg_class;
  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
]);

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts.
# Search the log from the start (offset 1): nothing earlier can match.
check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');

# Verify conflict_reason is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('vacuum_full_', 'rows_removed');

$handle =
  make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
);

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
|
|
|
|
##################################################
# Verify that invalidated logical slots stay invalidated across a restart.
##################################################
$node_standby->restart;

# Verify conflict_reason is retained across a restart.
check_slots_conflict_reason('vacuum_full_', 'rows_removed');
|
|
|
|
##################################################
# Verify that invalidated logical slots do not lead to retaining WAL.
##################################################

# Get the restart_lsn from an invalidated slot
my $restart_lsn = $node_standby->safe_psql('postgres',
	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflict_reason is not null;"
);

chomp($restart_lsn);

# As pg_walfile_name() can not be executed on the standby,
# get the WAL file name associated to this lsn from the primary
my $walfile_name = $node_primary->safe_psql('postgres',
	"SELECT pg_walfile_name('$restart_lsn')");

chomp($walfile_name);

# Generate some activity and switch WAL file on the primary
$node_primary->advance_wal(1);
$node_primary->safe_psql('postgres', "checkpoint;");

# Wait for the standby to catch up
$node_primary->wait_for_replay_catchup($node_standby);

# Request a checkpoint on the standby to trigger the WAL file(s) removal
$node_standby->safe_psql('postgres', 'checkpoint;');

# Verify that the WAL file has not been retained on the standby:
# an invalidated slot must not hold back WAL recycling.
my $standby_walfile = $node_standby->data_dir . '/pg_wal/' . $walfile_name;
ok(!-f "$standby_walfile",
	"invalidated logical slots do not lead to retaining WAL");
|
|
|
|
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 2: conflict due to row removal with hot_standby_feedback off.
##################################################

# get the position to search from in the standby logfile
my $logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to create/drop a relation and
# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_',
	0, 1);

# This should trigger the conflict
$node_primary->safe_psql(
	'testdb', qq[
  CREATE TABLE conflict_test(x integer, y text);
  DROP TABLE conflict_test;
  VACUUM pg_class;
  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
]);

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');

# Verify conflict_reason is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('row_removal_', 'rows_removed');

$handle =
  make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"row_removal_activeslot\""
);
|
|
|
|
##################################################
# Recovery conflict: Same as Scenario 2 but on a shared catalog table
# Scenario 3: conflict due to row removal with hot_standby_feedback off.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to create/drop a role and
# launch a vacuum on the shared catalog pg_authid with hot_standby_feedback
# turned off on the standby.
reactive_slots_change_hfs_and_wait_for_xmins('row_removal_',
	'shared_row_removal_', 0, 1);

# Trigger the conflict
$node_primary->safe_psql(
	'testdb', qq[
  CREATE ROLE create_trash;
  DROP ROLE create_trash;
  VACUUM pg_authid;
  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
]);

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('shared_row_removal_', $logstart,
	'with vacuum on pg_authid');

# Verify conflict_reason is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('shared_row_removal_', 'rows_removed');

$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
	\$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
);
|
|
|
|
##################################################
# Recovery conflict: Same as Scenario 2 but on a non catalog table
# Scenario 4: No conflict expected.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_',
	'no_conflict_', 0, 1);

# This should not trigger a conflict: conflict_test is a plain user table,
# so vacuuming it removes no catalog rows the logical slots depend on.
$node_primary->safe_psql(
	'testdb', qq[
  CREATE TABLE conflict_test(x integer, y text);
  INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
  UPDATE conflict_test set x=1, y=1;
  VACUUM conflict_test;
  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
]);
$node_primary->wait_for_replay_catchup($node_standby);
|
|
|
|
# message should not be issued
#
# Match the server's actual wording, "invalidating obsolete replication
# slot", as grepped for in check_for_invalidation().  The previous pattern
# ("invalidating obsolete slot") could never appear in the log, so these
# negative checks passed vacuously and could not catch a spurious
# invalidation.
ok( !$node_standby->log_contains(
		"invalidating obsolete replication slot \"no_conflict_inactiveslot\"",
		$logstart),
	'inactiveslot slot invalidation is not logged with vacuum on conflict_test'
);

ok( !$node_standby->log_contains(
		"invalidating obsolete replication slot \"no_conflict_activeslot\"",
		$logstart),
	'activeslot slot invalidation is not logged with vacuum on conflict_test'
);
|
|
|
|
# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
ok( $node_standby->poll_query_until(
		'postgres',
		"select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'",
		't'),
	'confl_active_logicalslot not updated'
) or die "Timed out waiting confl_active_logicalslot to be updated";

# Verify slots are reported as non conflicting in pg_replication_slots
is( $node_standby->safe_psql(
		'postgres',
		q[select bool_or(conflicting) from
		  (select conflict_reason is not NULL as conflicting
		   from pg_replication_slots WHERE slot_type = 'logical')]),
	'f',
	'Logical slots are reported as non conflicting');

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 0);

# Restart the standby node to ensure no slots are still active
$node_standby->restart;
|
|
|
|
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 5: conflict due to on-access pruning.
# (Banner previously said "Scenario 4", duplicating the no-conflict
# scenario's number.)
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to trigger an on-access pruning
# on a relation marked as user_catalog_table.
reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0,
	0);

# This should trigger the conflict: repeated same-page updates on a
# low-fillfactor user_catalog_table lead to on-access (HOT) pruning.
$node_primary->safe_psql('testdb',
	qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]
);
$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('pruning_', $logstart, 'with on-access pruning');

# Verify conflict_reason is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('pruning_', 'rows_removed');

$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"pruning_activeslot\"");

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
|
|
|
|
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 6: incorrect wal_level on primary.
# (Banner previously said "Scenario 5"; renumbered to follow the pruning
# scenario.)
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# drop the logical slots
drop_logical_slots('pruning_');

# create the logical slots
create_logical_slots($node_standby, 'wal_level_');

$handle =
  make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);

# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
$node_standby->psql('testdb', q[select pg_stat_reset();]);

# Make primary wal_level replica. This will trigger slot conflict.
$node_primary->append_conf(
	'postgresql.conf', q[
wal_level = 'replica'
]);
$node_primary->restart;

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('wal_level_', $logstart, 'due to wal_level');

# Verify conflict_reason is 'wal_level_insufficient' in pg_replication_slots
check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');

$handle =
  make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
# We are not able to read from the slot as it requires wal_level >= logical on the primary server
check_pg_recvlogical_stderr($handle,
	"logical decoding on standby requires wal_level >= logical on the primary"
);

# Restore primary wal_level
$node_primary->append_conf(
	'postgresql.conf', q[
wal_level = 'logical'
]);
$node_primary->restart;
$node_primary->wait_for_replay_catchup($node_standby);

$handle =
  make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
# as the slot has been invalidated we should not be able to read
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"wal_level_activeslot\""
);
|
|
|
|
##################################################
# DROP DATABASE should drop its slots, including active slots.
##################################################

# drop the logical slots
drop_logical_slots('wal_level_');

# create the logical slots
create_logical_slots($node_standby, 'drop_db_');

$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);

# Create a slot on a database that would not be dropped. This slot should not
# get dropped.
$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot',
	'postgres');

# dropdb on the primary to verify slots are dropped on standby
$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);

$node_primary->wait_for_replay_catchup($node_standby);

is( $node_standby->safe_psql(
		'postgres',
		q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]),
	'f',
	'database dropped on standby');

# Use the prefix the slots were actually created with ('drop_db_').
# Passing 'drop_db' made check_slots_dropped() look up nonexistent slot
# names (drop_dbinactiveslot/drop_dbactiveslot), whose empty slot_type made
# the "dropped" assertions pass vacuously.
check_slots_dropped('drop_db_', $handle);

is($node_standby->slot('otherslot')->{'slot_type'},
	'logical', 'otherslot on standby not dropped');

# Cleanup : manually drop the slot that was not dropped.
$node_standby->psql('postgres',
	q[SELECT pg_drop_replication_slot('otherslot')]);
|
|
|
|
##################################################
# Test standby promotion and logical decoding behavior
# after the standby gets promoted.
##################################################

$node_standby->reload;

# Recreate testdb (dropped by the previous scenario) and the test table.
$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
$node_primary->safe_psql('testdb',
	qq[CREATE TABLE decoding_test(x integer, y text);]);

# Wait for the standby to catchup before initializing the cascading standby
$node_primary->wait_for_replay_catchup($node_standby);

# Create a physical replication slot on the standby.
# Keep this step after the "Verify that invalidated logical slots do not lead
# to retaining WAL" test (as the physical slot on the standby could prevent the
# WAL file removal).
$node_standby->safe_psql('testdb',
	qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]
);

# Initialize cascading standby node
$node_standby->backup($backup_name);
$node_cascading_standby->init_from_backup(
	$node_standby, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$node_cascading_standby->append_conf(
	'postgresql.conf',
	qq[primary_slot_name = '$standby_physical_slotname'
hot_standby_feedback = on]);
$node_cascading_standby->start;

# create the logical slots
create_logical_slots($node_standby, 'promotion_');

# Wait for the cascading standby to catchup before creating the slots
$node_standby->wait_for_replay_catchup($node_cascading_standby,
	$node_primary);

# create the logical slots on the cascading standby too
create_logical_slots($node_cascading_standby, 'promotion_');

# Make slots actives
$handle =
  make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
my $cascading_handle =
  make_slot_active($node_cascading_standby, 'promotion_', 1,
	\$cascading_stdout, \$cascading_stderr);

# Insert some rows before the promotion
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

# Wait for both standbys to catchup
$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->wait_for_replay_catchup($node_cascading_standby,
	$node_primary);

# promote
$node_standby->promote;

# insert some rows on promoted standby
$node_standby->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
);

# Wait for the cascading standby to catchup
$node_standby->wait_for_replay_catchup($node_cascading_standby);

# Two transactions: rows 1-4 decoded from before the promotion, rows 5-7
# from after it.
$expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT
BEGIN
table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
COMMIT};

# check that we are decoding pre and post promotion inserted rows
$stdout_sql = $node_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected,
	'got expected output from SQL decoding session on promoted standby');

# check that we are decoding pre and post promotion inserted rows
# with pg_recvlogical that has started before the promotion
my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);

ok(pump_until($handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
	'got 2 COMMIT from pg_recvlogical output');

chomp($stdout);
is($stdout, $expected,
	'got same expected output from pg_recvlogical decoding session');

# check that we are decoding pre and post promotion inserted rows on the cascading standby
$stdout_sql = $node_cascading_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected,
	'got expected output from SQL decoding session on cascading standby');

# check that we are decoding pre and post promotion inserted rows
# with pg_recvlogical that has started before the promotion on the cascading standby
ok( pump_until(
		$cascading_handle, $pump_timeout,
		\$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
	'got 2 COMMIT from pg_recvlogical output');

chomp($cascading_stdout);
is($cascading_stdout, $expected,
	'got same expected output from pg_recvlogical decoding session on cascading standby'
);

done_testing();
|