Add support for managing physical replication slots to pg_receivexlog.

pg_receivexlog already has the capability to use a replication slot to
reserve WAL on the upstream node. But the used slot currently has to
be created via SQL.

To allow using slots directly, without involving SQL, add
--create-slot and --drop-slot actions, analogous to the logical slot
manipulation support in pg_recvlogical.

Author: Michael Paquier
Discussion: CABUevEx+zrOHZOQg+dPapNPFRJdsk59b=TSVf30Z71GnFXhQaw@mail.gmail.com
This commit is contained in:
Andres Freund 2014-10-06 12:51:37 +02:00
parent c8b6cba84a
commit d9f38c7a55
2 changed files with 170 additions and 16 deletions

View File

@ -255,13 +255,42 @@ PostgreSQL documentation
to make sure that <application>pg_receivexlog</> cannot become the
synchronous standby through an incautious setting of
<xref linkend="guc-synchronous-standby-names">; it does not flush
data frequently enough for this to work correctly.
data frequently enough for this to work correctly. In
<option>--create-slot</option> mode, create the slot with this name.
In <option>--drop-slot</option> mode, delete the slot with this name.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
<application>pg_receivexlog</application> can perform one of the two
following actions in order to control physical replication slots:
<variablelist>
<varlistentry>
<term><option>--create-slot</option></term>
<listitem>
<para>
Create a new physical replication slot with the name specified in
<option>--slot</option>, then start to stream WAL.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--drop-slot</option></term>
<listitem>
<para>
Drop the replication slot with the name specified in
<option>--slot</option>, then exit.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
Other options are also available:

View File

@ -38,11 +38,15 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
static bool do_create_slot = false;
static bool do_drop_slot = false;
static void usage(void);
static DIR* get_destination_dir(char *dest_folder);
static void close_destination_dir(DIR *dest_dir, char *dest_folder);
static XLogRecPtr FindStreamingStart(uint32 *tli);
static void StreamLog();
static void StreamLog(void);
static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished);
@ -78,6 +82,9 @@ usage(void)
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
printf(_("\nOptional actions:\n"));
printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
@ -118,6 +125,44 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
return false;
}
/*
* Get destination directory.
*/
static DIR*
get_destination_dir(char *dest_folder)
{
DIR *dir;
Assert(dest_folder != NULL);
dir = opendir(dest_folder);
if (dir == NULL)
{
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
progname, basedir, strerror(errno));
disconnect_and_exit(1);
}
return dir;
}
/*
* Close existing directory.
*/
static void
close_destination_dir(DIR *dest_dir, char *dest_folder)
{
Assert(dest_dir != NULL && dest_folder != NULL);
if (closedir(dest_dir))
{
fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
progname, dest_folder, strerror(errno));
disconnect_and_exit(1);
}
}
/*
* Determine starting location for streaming, based on any existing xlog
* segments in the directory. We start at the end of the last one that is
@ -134,13 +179,7 @@ FindStreamingStart(uint32 *tli)
uint32 high_tli = 0;
bool high_ispartial = false;
dir = opendir(basedir);
if (dir == NULL)
{
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
progname, basedir, strerror(errno));
disconnect_and_exit(1);
}
dir = get_destination_dir(basedir);
while (errno = 0, (dirent = readdir(dir)) != NULL)
{
@ -219,12 +258,7 @@ FindStreamingStart(uint32 *tli)
disconnect_and_exit(1);
}
if (closedir(dir))
{
fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
progname, basedir, strerror(errno));
disconnect_and_exit(1);
}
close_destination_dir(dir, basedir);
if (high_segno > 0)
{
@ -344,11 +378,15 @@ main(int argc, char **argv)
{"status-interval", required_argument, NULL, 's'},
{"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
/* action */
{"create-slot", no_argument, NULL, 1},
{"drop-slot", no_argument, NULL, 2},
{NULL, 0, NULL, 0}
};
int c;
int option_index;
char *db_name;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
@ -427,6 +465,13 @@ main(int argc, char **argv)
case 'v':
verbose++;
break;
/* action */
case 1:
do_create_slot = true;
break;
case 2:
do_drop_slot = true;
break;
default:
/*
@ -451,10 +496,26 @@ main(int argc, char **argv)
exit(1);
}
if (replication_slot == NULL && (do_drop_slot || do_create_slot))
{
fprintf(stderr, _("%s: --create-slot and --drop-slot need a slot to be specified using --slot\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (do_drop_slot && do_create_slot)
{
fprintf(stderr, _("%s: cannot use --create-slot together with --drop-slot\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
/*
* Required arguments
*/
if (basedir == NULL)
if (basedir == NULL && !do_drop_slot)
{
fprintf(stderr, _("%s: no target directory specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
@ -462,10 +523,74 @@ main(int argc, char **argv)
exit(1);
}
/*
* Check existence of destination folder.
*/
if (!do_drop_slot)
{
DIR *dir = get_destination_dir(basedir);
close_destination_dir(dir, basedir);
}
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
#endif
/*
* Obtain a connection before doing anything.
*/
conn = GetConnection();
if (!conn)
/* error message already written in GetConnection() */
exit(1);
/*
* Run IDENTIFY_SYSTEM to make sure we've successfully have established a
* replication connection and haven't connected using a database specific
* connection.
*/
if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
disconnect_and_exit(1);
/*
* Check that there is a database associated with connection, none
* should be defined in this context.
*/
if (db_name)
{
fprintf(stderr,
_("%s: replication connection using slot \"%s\" is unexpectedly database specific\n"),
progname, replication_slot);
disconnect_and_exit(1);
}
/*
* Drop a replication slot.
*/
if (do_drop_slot)
{
if (verbose)
fprintf(stderr,
_("%s: dropping replication slot \"%s\"\n"),
progname, replication_slot);
if (!DropReplicationSlot(conn, replication_slot))
disconnect_and_exit(1);
disconnect_and_exit(0);
}
/* Create a replication slot */
if (do_create_slot)
{
if (verbose)
fprintf(stderr,
_("%s: creating replication slot \"%s\"\n"),
progname, replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, NULL, NULL, true))
disconnect_and_exit(1);
}
while (true)
{
StreamLog();