pg_basebackup: Add option to create replication slot

When requesting a particular replication slot, the new pg_basebackup
option -C/--create-slot creates it before starting to replicate from it.

Further refactor the slot creation logic to include the temporary slot
creation logic into the same function.  Add new arguments is_temporary
and preserve_wal to CreateReplicationSlot().  Print in --verbose mode
that a slot has been created.

Author: Michael Banck <michael.banck@credativ.de>
This commit is contained in:
Peter Eisentraut 2017-09-26 16:07:52 -04:00
parent 59597e6485
commit 3709ca1cf0
9 changed files with 112 additions and 41 deletions

View File

@ -382,6 +382,18 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>-C</option></term>
<term><option>--create-slot</option></term>
<listitem>
<para>
This option causes the replication slot specified by the
option <literal>--slot</literal> to be created before starting the
backup. In this case, an error is raised if the slot already exists.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>-l <replaceable class="parameter">label</replaceable></option></term> <term><option>-l <replaceable class="parameter">label</replaceable></option></term>
<term><option>--label=<replaceable class="parameter">label</replaceable></option></term> <term><option>--label=<replaceable class="parameter">label</replaceable></option></term>
@ -462,6 +474,10 @@ PostgreSQL documentation
the server does not remove any necessary WAL data in the time between the server does not remove any necessary WAL data in the time between
the end of the base backup and the start of streaming replication. the end of the base backup and the start of streaming replication.
</para> </para>
<para>
The specified replication slot has to exist unless the
option <option>-C</option> is also used.
</para>
<para> <para>
If this option is not specified and the server supports temporary If this option is not specified and the server supports temporary
replication slots (version 10 and later), then a temporary replication replication slots (version 10 and later), then a temporary replication

View File

@ -93,6 +93,8 @@ static pg_time_t last_progress_report = 0;
static int32 maxrate = 0; /* no limit by default */ static int32 maxrate = 0; /* no limit by default */
static char *replication_slot = NULL; static char *replication_slot = NULL;
static bool temp_replication_slot = true; static bool temp_replication_slot = true;
static bool create_slot = false;
static bool no_slot = false;
static bool success = false; static bool success = false;
static bool made_new_pgdata = false; static bool made_new_pgdata = false;
@ -346,6 +348,7 @@ usage(void)
printf(_("\nGeneral options:\n")); printf(_("\nGeneral options:\n"));
printf(_(" -c, --checkpoint=fast|spread\n" printf(_(" -c, --checkpoint=fast|spread\n"
" set fast or spread checkpointing\n")); " set fast or spread checkpointing\n"));
printf(_(" -C, --create-slot create replication slot\n"));
printf(_(" -l, --label=LABEL set backup label\n")); printf(_(" -l, --label=LABEL set backup label\n"));
printf(_(" -n, --no-clean do not clean up after errors\n")); printf(_(" -n, --no-clean do not clean up after errors\n"));
printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n")); printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n"));
@ -466,7 +469,6 @@ typedef struct
char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */ char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */
char *sysidentifier; char *sysidentifier;
int timeline; int timeline;
bool temp_slot;
} logstreamer_param; } logstreamer_param;
static int static int
@ -492,9 +494,6 @@ LogStreamerMain(logstreamer_param *param)
stream.mark_done = true; stream.mark_done = true;
stream.partial_suffix = NULL; stream.partial_suffix = NULL;
stream.replication_slot = replication_slot; stream.replication_slot = replication_slot;
stream.temp_slot = param->temp_slot;
if (stream.temp_slot && !stream.replication_slot)
stream.replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
if (format == 'p') if (format == 'p')
stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync); stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
@ -583,9 +582,29 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
/* Temporary replication slots are only supported in 10 and newer */ /* Temporary replication slots are only supported in 10 and newer */
if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS) if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
param->temp_slot = false; temp_replication_slot = false;
/*
* Create replication slot if requested
*/
if (temp_replication_slot && !replication_slot)
replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
if (temp_replication_slot || create_slot)
{
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
temp_replication_slot, true, true, false))
disconnect_and_exit(1);
if (verbose)
{
if (temp_replication_slot)
fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
progname, replication_slot);
else else
param->temp_slot = temp_replication_slot; fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
progname, replication_slot);
}
}
if (format == 'p') if (format == 'p')
{ {
@ -2079,6 +2098,7 @@ main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'}, {"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'}, {"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'}, {"checkpoint", required_argument, NULL, 'c'},
{"create-slot", no_argument, NULL, 'C'},
{"max-rate", required_argument, NULL, 'r'}, {"max-rate", required_argument, NULL, 'r'},
{"write-recovery-conf", no_argument, NULL, 'R'}, {"write-recovery-conf", no_argument, NULL, 'R'},
{"slot", required_argument, NULL, 'S'}, {"slot", required_argument, NULL, 'S'},
@ -2105,7 +2125,6 @@ main(int argc, char **argv)
int c; int c;
int option_index; int option_index;
bool no_slot = false;
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup")); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
@ -2127,11 +2146,14 @@ main(int argc, char **argv)
atexit(cleanup_directories_atexit); atexit(cleanup_directories_atexit);
while ((c = getopt_long(argc, argv, "D:F:r:RT:X:l:nNzZ:d:c:h:p:U:s:S:wWvP", while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1) long_options, &option_index)) != -1)
{ {
switch (c) switch (c)
{ {
case 'C':
create_slot = true;
break;
case 'D': case 'D':
basedir = pg_strdup(optarg); basedir = pg_strdup(optarg);
break; break;
@ -2348,6 +2370,29 @@ main(int argc, char **argv)
temp_replication_slot = false; temp_replication_slot = false;
} }
if (create_slot)
{
if (!replication_slot)
{
fprintf(stderr,
_("%s: --create-slot needs a slot to be specified using --slot\n"),
progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (no_slot)
{
fprintf(stderr,
_("%s: --create-slot and --no-slot are incompatible options\n"),
progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
}
if (xlog_dir) if (xlog_dir)
{ {
if (format != 'p') if (format != 'p')

View File

@ -431,7 +431,6 @@ StreamLog(void)
stream.do_sync); stream.do_sync);
stream.partial_suffix = ".partial"; stream.partial_suffix = ".partial";
stream.replication_slot = replication_slot; stream.replication_slot = replication_slot;
stream.temp_slot = false;
ReceiveXlogStream(conn, &stream); ReceiveXlogStream(conn, &stream);
@ -728,7 +727,7 @@ main(int argc, char **argv)
_("%s: creating replication slot \"%s\"\n"), _("%s: creating replication slot \"%s\"\n"),
progname, replication_slot); progname, replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, NULL, true, if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
slot_exists_ok)) slot_exists_ok))
disconnect_and_exit(1); disconnect_and_exit(1);
disconnect_and_exit(0); disconnect_and_exit(0);

View File

@ -979,8 +979,8 @@ main(int argc, char **argv)
_("%s: creating replication slot \"%s\"\n"), _("%s: creating replication slot \"%s\"\n"),
progname, replication_slot); progname, replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, plugin, if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
false, slot_exists_ok)) false, false, slot_exists_ok))
disconnect_and_exit(1); disconnect_and_exit(1);
startpos = InvalidXLogRecPtr; startpos = InvalidXLogRecPtr;
} }

View File

@ -522,24 +522,6 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
PQclear(res); PQclear(res);
} }
/*
* Create temporary replication slot if one is needed
*/
if (stream->temp_slot)
{
snprintf(query, sizeof(query),
"CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
stream->replication_slot);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
progname, stream->replication_slot, PQerrorMessage(conn));
PQclear(res);
return false;
}
}
/* /*
* initialize flush position to starting point, it's the caller's * initialize flush position to starting point, it's the caller's
* responsibility that that's sane. * responsibility that that's sane.

View File

@ -47,7 +47,6 @@ typedef struct StreamCtl
WalWriteMethod *walmethod; /* How to write the WAL */ WalWriteMethod *walmethod; /* How to write the WAL */
char *partial_suffix; /* Suffix appended to partially received files */ char *partial_suffix; /* Suffix appended to partially received files */
char *replication_slot; /* Replication slot to use, or NULL */ char *replication_slot; /* Replication slot to use, or NULL */
bool temp_slot; /* Create temporary replication slot */
} StreamCtl; } StreamCtl;

View File

@ -398,7 +398,8 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
*/ */
bool bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
bool is_physical, bool slot_exists_ok) bool is_temporary, bool is_physical, bool reserve_wal,
bool slot_exists_ok)
{ {
PQExpBuffer query; PQExpBuffer query;
PGresult *res; PGresult *res;
@ -410,13 +411,18 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
Assert(slot_name != NULL); Assert(slot_name != NULL);
/* Build query */ /* Build query */
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
if (is_temporary)
appendPQExpBuffer(query, " TEMPORARY");
if (is_physical) if (is_physical)
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", {
slot_name); appendPQExpBuffer(query, " PHYSICAL");
if (reserve_wal)
appendPQExpBuffer(query, " RESERVE_WAL");
}
else else
{ {
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
slot_name, plugin);
if (PQserverVersion(conn) >= 100000) if (PQserverVersion(conn) >= 100000)
/* pg_recvlogical doesn't use an exported snapshot, so suppress */ /* pg_recvlogical doesn't use an exported snapshot, so suppress */
appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT"); appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");

View File

@ -33,7 +33,8 @@ extern PGconn *GetConnection(void);
/* Replication commands */ /* Replication commands */
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
const char *plugin, bool is_physical, const char *plugin, bool is_temporary,
bool is_physical, bool reserve_wal,
bool slot_exists_ok); bool slot_exists_ok);
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
extern bool RunIdentifySystem(PGconn *conn, char **sysid, extern bool RunIdentifySystem(PGconn *conn, char **sysid,

View File

@ -4,7 +4,7 @@ use Cwd;
use Config; use Config;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 72; use Test::More tests => 78;
program_help_ok('pg_basebackup'); program_help_ok('pg_basebackup');
program_version_ok('pg_basebackup'); program_version_ok('pg_basebackup');
@ -259,9 +259,32 @@ $node->command_fails(
[ 'pg_basebackup', '-D', [ 'pg_basebackup', '-D',
"$tempdir/backupxs_sl_fail", '-X', "$tempdir/backupxs_sl_fail", '-X',
'stream', '-S', 'stream', '-S',
'slot1' ], 'slot0' ],
'pg_basebackup fails with nonexistent replication slot'); 'pg_basebackup fails with nonexistent replication slot');
$node->command_fails(
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ],
'pg_basebackup -C fails without slot name');
$node->command_fails(
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '--no-slot' ],
'pg_basebackup fails with -C -S --no-slot');
$node->command_ok(
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0' ],
'pg_basebackup -C runs');
is($node->safe_psql('postgres', q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'}),
'slot0',
'replication slot was created');
isnt($node->safe_psql('postgres', q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'}),
'',
'restart LSN of new slot is not null');
$node->command_fails(
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0' ],
'pg_basebackup fails with -C -S and a previously existing slot');
$node->safe_psql('postgres', $node->safe_psql('postgres',
q{SELECT * FROM pg_create_physical_replication_slot('slot1')}); q{SELECT * FROM pg_create_physical_replication_slot('slot1')});
my $lsn = $node->safe_psql('postgres', my $lsn = $node->safe_psql('postgres',