Allow PostgresNode.pm tests to wait for catchup

Add methods to the core test framework PostgresNode.pm to allow us to
test that standby nodes have caught up with the master, as well as
basic LSN handling.  Used in tests recovery/t/001_stream_rep.pl and
recovery/t/004_timeline_switch.pl

Craig Ringer, reviewed by Aleksander Alekseev and Simon Riggs
This commit is contained in:
Simon Riggs 2017-01-04 16:50:23 +00:00
parent 579f700911
commit fb093e4cb3
3 changed files with 175 additions and 25 deletions

View File

@ -1121,7 +1121,6 @@ sub psql
my $exc_save = $@;
if ($exc_save)
{
# IPC::Run::run threw an exception. re-throw unless it's a
# timeout, which we'll handle by testing is_expired
die $exc_save
@ -1173,7 +1172,7 @@ sub psql
if $ret == 1;
die "connection error: '$$stderr'\nwhile running '@psql_params'"
if $ret == 2;
die "error running SQL: '$$stderr'\nwhile running '@psql_params'"
die "error running SQL: '$$stderr'\nwhile running '@psql_params' with sql '$sql'"
if $ret == 3;
die "psql returns $ret: '$$stderr'\nwhile running '@psql_params'";
}
@ -1325,6 +1324,175 @@ sub run_log
TestLib::run_log(@_);
}
=pod $node->lsn(mode)
Look up xlog positions on the server:
* insert position (master only, error on replica)
* write position (master only, error on replica)
* flush position
* receive position (always undef on master)
* replay position
mode must be specified.
=cut
sub lsn
{
my ($self, $mode) = @_;
my %modes = ('insert' => 'pg_current_xlog_insert_location()',
'flush' => 'pg_current_xlog_flush_location()',
'write' => 'pg_current_xlog_location()',
'receive' => 'pg_last_xlog_receive_location()',
'replay' => 'pg_last_xlog_replay_location()');
$mode = '<undef>' if !defined($mode);
die "unknown mode for 'lsn': '$mode', valid modes are " . join(', ', keys %modes)
if !defined($modes{$mode});
my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}");
chomp($result);
if ($result eq '')
{
return undef;
}
else
{
return $result;
}
}
=pod $node->wait_for_catchup(standby_name, mode, target_lsn)
Wait for the node with application_name standby_name (usually from node->name)
until its replication position in pg_stat_replication equals or passes the
upstream's xlog insert point at the time this function is called. By default
the replay_location is waited for, but 'mode' may be specified to wait for any
of sent|write|flush|replay.
If there is no active replication connection from this peer, waits until
poll_query_until timeout.
Requires that the 'postgres' db exists and is accessible.
target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
This is not a test. It die()s on failure.
=cut
sub wait_for_catchup
{
my ($self, $standby_name, $mode, $target_lsn) = @_;
$mode = defined($mode) ? $mode : 'replay';
my %valid_modes = ( 'sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1 );
die "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode});
# Allow passing of a PostgresNode instance as shorthand
if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") )
{
$standby_name = $standby_name->name;
}
die 'target_lsn must be specified' unless defined($target_lsn);
print "Waiting for replication conn " . $standby_name . "'s " . $mode . "_location to pass " . $target_lsn . " on " . $self->name . "\n";
my $query = qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
$self->poll_query_until('postgres', $query)
or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)');
print "done\n";
}
=pod $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
Wait for the named replication slot to equal or pass the supplied target_lsn.
The position used is the restart_lsn unless mode is given, in which case it may
be 'restart' or 'confirmed_flush'.
Requires that the 'postgres' db exists and is accessible.
This is not a test. It die()s on failure.
If the slot is not active, will time out after poll_query_until's timeout.
target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
Note that for logical slots, restart_lsn is held down by the oldest in-progress tx.
=cut
sub wait_for_slot_catchup
{
my ($self, $slot_name, $mode, $target_lsn) = @_;
$mode = defined($mode) ? $mode : 'restart';
if (!($mode eq 'restart' || $mode eq 'confirmed_flush'))
{
die "valid modes are restart, confirmed_flush";
}
die 'target lsn must be specified' unless defined($target_lsn);
print "Waiting for replication slot " . $slot_name . "'s " . $mode . "_lsn to pass " . $target_lsn . " on " . $self->name . "\n";
my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
$self->poll_query_until('postgres', $query)
or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)');
print "done\n";
}
=pod $node->query_hash($dbname, $query, @columns)
Execute $query on $dbname, replacing any appearance of the string __COLUMNS__
within the query with a comma-separated list of @columns.
If __COLUMNS__ does not appear in the query, its result columns must EXACTLY
match the order and number (but not necessarily alias) of supplied @columns.
The query must return zero or one rows.
Return a hash-ref representation of the results of the query, with any empty
or null results as defined keys with an empty-string value. There is no way
to differentiate between null and empty-string result fields.
If the query returns zero rows, return a hash with all columns empty. There
is no way to differentiate between zero rows returned and a row with only
null columns.
=cut
sub query_hash
{
my ($self, $dbname, $query, @columns) = @_;
die 'calls in array context for multi-row results not supported yet' if (wantarray);
# Replace __COLUMNS__ if found
substr($query, index($query, '__COLUMNS__'), length('__COLUMNS__')) = join(', ', @columns)
if index($query, '__COLUMNS__') >= 0;
my $result = $self->safe_psql($dbname, $query);
# hash slice, see http://stackoverflow.com/a/16755894/398670 .
#
# Fills the hash with empty strings produced by x-operator element
# duplication if result is an empty row
#
my %val;
@val{@columns} = $result ne '' ? split(qr/\|/, $result) : ('',) x scalar(@columns);
return \%val;
}
=pod $node->slot(slot_name)
Return hash-ref of replication slot data for the named slot, or a hash-ref with
all values '' if not found. Does not differentiate between null and empty string
for fields, no field is ever undef.
The restart_lsn and confirmed_flush_lsn fields are returned verbatim, and also
as a 2-list of [highword, lowword] integer. Since we rely on Perl 5.8.8 we can't
"use bigint", it's from 5.20, and we can't assume we have Math::Bigint from CPAN
either.
=cut
sub slot
{
my ($self, $slot_name) = @_;
my @columns = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn');
return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
}
=pod
=back

View File

@ -40,16 +40,8 @@ $node_master->safe_psql('postgres',
"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
# Wait for standbys to catch up
my $applname_1 = $node_standby_1->name;
my $applname_2 = $node_standby_2->name;
my $caughtup_query =
"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
$node_master->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for standby 1 to catch up";
$caughtup_query =
"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
$node_standby_1->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for standby 2 to catch up";
$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
my $result =
$node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");

View File

@ -32,14 +32,9 @@ $node_standby_2->start;
# Create some content on master
$node_master->safe_psql('postgres',
"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
my $until_lsn =
$node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
# Wait until standby has replayed enough data on standby 1
my $caughtup_query =
"SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
$node_standby_1->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for standby to catch up";
$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('write'));
# Stop and remove master, and promote standby 1, switching it to a new timeline
$node_master->teardown_node;
@ -50,7 +45,7 @@ rmtree($node_standby_2->data_dir . '/recovery.conf');
my $connstr_1 = $node_standby_1->connstr;
$node_standby_2->append_conf(
'recovery.conf', qq(
primary_conninfo='$connstr_1'
primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}'
standby_mode=on
recovery_target_timeline='latest'
));
@ -60,12 +55,7 @@ $node_standby_2->restart;
# to ensure that the timeline switch has been done.
$node_standby_1->safe_psql('postgres',
"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
$until_lsn = $node_standby_1->safe_psql('postgres',
"SELECT pg_current_xlog_location();");
$caughtup_query =
"SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
$node_standby_2->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for standby to catch up";
$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('write'));
my $result =
$node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");