2014-03-18 17:19:57 +01:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
2014-03-18 20:03:17 +01:00
|
|
|
* pg_recvlogical.c - receive data from a logical decoding slot in a streaming
|
|
|
|
* fashion and write it to a local file.
|
2014-03-18 17:19:57 +01:00
|
|
|
*
|
|
|
|
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
|
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* src/bin/pg_basebackup/pg_recvlogical.c
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "postgres_fe.h"
|
|
|
|
|
|
|
|
#include <dirent.h>
|
|
|
|
#include <sys/stat.h>
|
|
|
|
#include <unistd.h>
|
|
|
|
|
|
|
|
/* local includes */
|
|
|
|
#include "streamutil.h"
|
|
|
|
|
|
|
|
#include "access/xlog_internal.h"
|
|
|
|
#include "common/fe_memutils.h"
|
|
|
|
#include "getopt_long.h"
|
|
|
|
#include "libpq-fe.h"
|
|
|
|
#include "libpq/pqsignal.h"
|
|
|
|
#include "pqexpbuffer.h"
|
|
|
|
|
|
|
|
|
|
|
|
/* Time to sleep between reconnection attempts */
|
|
|
|
#define RECONNECT_SLEEP_TIME 5
|
|
|
|
|
|
|
|
/* Global Options */
|
2014-05-06 18:12:18 +02:00
|
|
|
static char *outfile = NULL;
|
|
|
|
static int verbose = 0;
|
|
|
|
static int noloop = 0;
|
|
|
|
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
|
|
|
|
static int fsync_interval = 10 * 1000; /* 10 sec = default */
|
2014-03-18 17:19:57 +01:00
|
|
|
static XLogRecPtr startpos = InvalidXLogRecPtr;
|
2014-05-06 18:12:18 +02:00
|
|
|
static bool do_create_slot = false;
|
|
|
|
static bool do_start_slot = false;
|
|
|
|
static bool do_drop_slot = false;
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
/* filled pairwise with option, value. value may be NULL */
|
2014-05-06 18:12:18 +02:00
|
|
|
static char **options;
|
|
|
|
static size_t noptions = 0;
|
2014-03-18 17:19:57 +01:00
|
|
|
static const char *plugin = "test_decoding";
|
|
|
|
|
|
|
|
/* Global State */
|
2014-05-06 18:12:18 +02:00
|
|
|
static int outfd = -1;
|
2014-03-18 17:19:57 +01:00
|
|
|
static volatile sig_atomic_t time_to_abort = false;
|
|
|
|
static volatile sig_atomic_t output_reopen = false;
|
2014-05-06 18:12:18 +02:00
|
|
|
static int64 output_last_fsync = -1;
|
2014-05-15 18:43:37 +02:00
|
|
|
static bool output_needs_fsync = false;
|
2014-03-18 17:19:57 +01:00
|
|
|
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
|
|
|
|
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
|
|
|
|
|
|
|
|
static void usage(void);
|
2014-09-29 15:35:40 +02:00
|
|
|
static void StreamLogicalLog(void);
|
2014-03-18 17:19:57 +01:00
|
|
|
static void disconnect_and_exit(int code);
|
|
|
|
|
|
|
|
static void
|
|
|
|
usage(void)
|
|
|
|
{
|
2014-10-12 07:45:25 +02:00
|
|
|
printf(_("%s receives PostgreSQL logical change streams.\n\n"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname);
|
|
|
|
printf(_("Usage:\n"));
|
|
|
|
printf(_(" %s [OPTION]...\n"), progname);
|
2014-10-12 07:45:25 +02:00
|
|
|
printf(_("\nAction to be performed:\n"));
|
|
|
|
printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
|
|
|
|
printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
|
|
|
|
printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
|
2014-03-18 17:19:57 +01:00
|
|
|
printf(_("\nOptions:\n"));
|
2014-10-12 07:45:25 +02:00
|
|
|
printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
|
2014-05-25 18:47:05 +02:00
|
|
|
printf(_(" -F --fsync-interval=SECS\n"
|
2014-10-12 07:45:25 +02:00
|
|
|
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
|
|
|
|
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
|
2014-03-18 17:19:57 +01:00
|
|
|
printf(_(" -n, --no-loop do not loop on connection lost\n"));
|
2014-10-12 07:45:25 +02:00
|
|
|
printf(_(" -o, --option=NAME[=VALUE]\n"
|
|
|
|
" pass option NAME with optional value VALUE to the\n"
|
|
|
|
" output plugin\n"));
|
|
|
|
printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
|
|
|
|
printf(_(" -s, --status-interval=SECS\n"
|
|
|
|
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
|
|
|
|
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
|
2014-03-18 17:19:57 +01:00
|
|
|
printf(_(" -v, --verbose output verbose messages\n"));
|
|
|
|
printf(_(" -V, --version output version information, then exit\n"));
|
|
|
|
printf(_(" -?, --help show this help, then exit\n"));
|
|
|
|
printf(_("\nConnection options:\n"));
|
|
|
|
printf(_(" -d, --dbname=DBNAME database to connect to\n"));
|
|
|
|
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
|
|
|
|
printf(_(" -p, --port=PORT database server port number\n"));
|
|
|
|
printf(_(" -U, --username=NAME connect as specified database user\n"));
|
|
|
|
printf(_(" -w, --no-password never prompt for password\n"));
|
|
|
|
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
|
|
|
|
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Send a Standby Status Update message to server.
|
|
|
|
*/
|
|
|
|
static bool
|
|
|
|
sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
|
|
|
|
{
|
|
|
|
static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
|
|
|
|
static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
|
|
|
|
|
|
|
|
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
|
|
|
|
int len = 0;
|
|
|
|
|
|
|
|
/*
|
2014-03-18 20:03:17 +01:00
|
|
|
* we normally don't want to send superfluous feedbacks, but if it's
|
2014-05-06 18:12:18 +02:00
|
|
|
* because of a timeout we need to, otherwise wal_sender_timeout will kill
|
|
|
|
* us.
|
2014-03-18 17:19:57 +01:00
|
|
|
*/
|
|
|
|
if (!force &&
|
|
|
|
last_written_lsn == output_written_lsn &&
|
|
|
|
last_fsync_lsn != output_fsync_lsn)
|
|
|
|
return true;
|
|
|
|
|
|
|
|
if (verbose)
|
|
|
|
fprintf(stderr,
|
2014-05-06 18:12:18 +02:00
|
|
|
_("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname,
|
2014-05-06 18:12:18 +02:00
|
|
|
(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
|
2014-03-18 17:19:57 +01:00
|
|
|
(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
|
|
|
|
replication_slot);
|
|
|
|
|
|
|
|
replybuf[len] = 'r';
|
|
|
|
len += 1;
|
2014-05-06 18:12:18 +02:00
|
|
|
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
|
2014-03-18 17:19:57 +01:00
|
|
|
len += 8;
|
|
|
|
fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
|
|
|
|
len += 8;
|
2014-05-06 18:12:18 +02:00
|
|
|
fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
|
2014-03-18 17:19:57 +01:00
|
|
|
len += 8;
|
2014-05-06 18:12:18 +02:00
|
|
|
fe_sendint64(now, &replybuf[len]); /* sendTime */
|
2014-03-18 17:19:57 +01:00
|
|
|
len += 8;
|
|
|
|
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
|
|
|
|
len += 1;
|
|
|
|
|
|
|
|
startpos = output_written_lsn;
|
|
|
|
last_written_lsn = output_written_lsn;
|
|
|
|
last_fsync_lsn = output_fsync_lsn;
|
|
|
|
|
|
|
|
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: could not send feedback packet: %s"),
|
|
|
|
progname, PQerrorMessage(conn));
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
disconnect_and_exit(int code)
|
|
|
|
{
|
|
|
|
if (conn != NULL)
|
|
|
|
PQfinish(conn);
|
|
|
|
|
|
|
|
exit(code);
|
|
|
|
}
|
|
|
|
|
|
|
|
static bool
|
|
|
|
OutputFsync(int64 now)
|
|
|
|
{
|
|
|
|
output_last_fsync = now;
|
|
|
|
|
|
|
|
output_fsync_lsn = output_written_lsn;
|
|
|
|
|
|
|
|
if (fsync_interval <= 0)
|
|
|
|
return true;
|
|
|
|
|
2014-05-15 18:43:37 +02:00
|
|
|
if (!output_needs_fsync)
|
2014-03-18 17:19:57 +01:00
|
|
|
return true;
|
|
|
|
|
2014-05-15 18:43:37 +02:00
|
|
|
output_needs_fsync = false;
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
/* Accept EINVAL, in case output is writing to a pipe or similar. */
|
|
|
|
if (fsync(outfd) != 0 && errno != EINVAL)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: could not fsync log file \"%s\": %s\n"),
|
|
|
|
progname, outfile, strerror(errno));
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Start the log streaming
|
|
|
|
*/
|
|
|
|
static void
|
2014-09-29 15:35:40 +02:00
|
|
|
StreamLogicalLog(void)
|
2014-03-18 17:19:57 +01:00
|
|
|
{
|
|
|
|
PGresult *res;
|
|
|
|
char *copybuf = NULL;
|
|
|
|
int64 last_status = -1;
|
|
|
|
int i;
|
|
|
|
PQExpBuffer query;
|
|
|
|
|
|
|
|
output_written_lsn = InvalidXLogRecPtr;
|
|
|
|
output_fsync_lsn = InvalidXLogRecPtr;
|
|
|
|
|
|
|
|
query = createPQExpBuffer();
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Connect in replication mode to the server
|
|
|
|
*/
|
|
|
|
if (!conn)
|
|
|
|
conn = GetConnection();
|
|
|
|
if (!conn)
|
|
|
|
/* Error message already written in GetConnection() */
|
|
|
|
return;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Start the replication
|
|
|
|
*/
|
|
|
|
if (verbose)
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: starting log streaming at %X/%X (slot %s)\n"),
|
|
|
|
progname, (uint32) (startpos >> 32), (uint32) startpos,
|
|
|
|
replication_slot);
|
|
|
|
|
|
|
|
/* Initiate the replication stream at specified location */
|
|
|
|
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
|
2014-05-06 18:12:18 +02:00
|
|
|
replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
/* print options if there are any */
|
|
|
|
if (noptions)
|
|
|
|
appendPQExpBufferStr(query, " (");
|
|
|
|
|
|
|
|
for (i = 0; i < noptions; i++)
|
|
|
|
{
|
|
|
|
/* separator */
|
|
|
|
if (i > 0)
|
|
|
|
appendPQExpBufferStr(query, ", ");
|
|
|
|
|
|
|
|
/* write option name */
|
|
|
|
appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
|
|
|
|
|
|
|
|
/* write option value if specified */
|
|
|
|
if (options[(i * 2) + 1] != NULL)
|
|
|
|
appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (noptions)
|
|
|
|
appendPQExpBufferChar(query, ')');
|
|
|
|
|
|
|
|
res = PQexec(conn, query->data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COPY_BOTH)
|
|
|
|
{
|
2014-05-15 13:49:11 +02:00
|
|
|
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname, query->data, PQresultErrorMessage(res));
|
|
|
|
PQclear(res);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
resetPQExpBuffer(query);
|
|
|
|
|
|
|
|
if (verbose)
|
|
|
|
fprintf(stderr,
|
2014-05-15 13:49:11 +02:00
|
|
|
_("%s: streaming initiated\n"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname);
|
|
|
|
|
|
|
|
while (!time_to_abort)
|
|
|
|
{
|
|
|
|
int r;
|
|
|
|
int bytes_left;
|
|
|
|
int bytes_written;
|
|
|
|
int64 now;
|
|
|
|
int hdr_len;
|
|
|
|
|
|
|
|
if (copybuf != NULL)
|
|
|
|
{
|
|
|
|
PQfreemem(copybuf);
|
|
|
|
copybuf = NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Potentially send a status message to the master
|
|
|
|
*/
|
|
|
|
now = feGetCurrentTimestamp();
|
|
|
|
|
|
|
|
if (outfd != -1 &&
|
|
|
|
feTimestampDifferenceExceeds(output_last_fsync, now,
|
|
|
|
fsync_interval))
|
|
|
|
{
|
|
|
|
if (!OutputFsync(now))
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (standby_message_timeout > 0 &&
|
|
|
|
feTimestampDifferenceExceeds(last_status, now,
|
|
|
|
standby_message_timeout))
|
|
|
|
{
|
|
|
|
/* Time to send feedback! */
|
|
|
|
if (!sendFeedback(conn, now, true, false))
|
|
|
|
goto error;
|
|
|
|
|
|
|
|
last_status = now;
|
|
|
|
}
|
|
|
|
|
2014-05-15 18:43:37 +02:00
|
|
|
/* got SIGHUP, close output file */
|
|
|
|
if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
|
|
|
|
{
|
|
|
|
now = feGetCurrentTimestamp();
|
|
|
|
if (!OutputFsync(now))
|
|
|
|
goto error;
|
|
|
|
close(outfd);
|
|
|
|
outfd = -1;
|
|
|
|
}
|
|
|
|
output_reopen = false;
|
|
|
|
|
2014-05-16 09:10:45 +02:00
|
|
|
/* open the output file, if not open yet */
|
|
|
|
if (outfd == -1)
|
|
|
|
{
|
|
|
|
if (strcmp(outfile, "-") == 0)
|
|
|
|
outfd = fileno(stdout);
|
|
|
|
else
|
|
|
|
outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
|
|
|
|
S_IRUSR | S_IWUSR);
|
|
|
|
if (outfd == -1)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: could not open log file \"%s\": %s\n"),
|
|
|
|
progname, outfile, strerror(errno));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-03-18 17:19:57 +01:00
|
|
|
r = PQgetCopyData(conn, ©buf, 1);
|
|
|
|
if (r == 0)
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* In async mode, and no data available. We block on reading but
|
|
|
|
* not more than the specified timeout, so that we can send a
|
|
|
|
* response back to the client.
|
|
|
|
*/
|
|
|
|
fd_set input_mask;
|
|
|
|
int64 message_target = 0;
|
|
|
|
int64 fsync_target = 0;
|
|
|
|
struct timeval timeout;
|
2014-03-18 19:54:00 +01:00
|
|
|
struct timeval *timeoutptr = NULL;
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
FD_ZERO(&input_mask);
|
|
|
|
FD_SET(PQsocket(conn), &input_mask);
|
|
|
|
|
|
|
|
/* Compute when we need to wakeup to send a keepalive message. */
|
|
|
|
if (standby_message_timeout)
|
|
|
|
message_target = last_status + (standby_message_timeout - 1) *
|
|
|
|
((int64) 1000);
|
|
|
|
|
|
|
|
/* Compute when we need to wakeup to fsync the output file. */
|
2014-05-15 18:43:37 +02:00
|
|
|
if (fsync_interval > 0 && output_needs_fsync)
|
2014-03-18 17:19:57 +01:00
|
|
|
fsync_target = output_last_fsync + (fsync_interval - 1) *
|
|
|
|
((int64) 1000);
|
|
|
|
|
|
|
|
/* Now compute when to wakeup. */
|
|
|
|
if (message_target > 0 || fsync_target > 0)
|
|
|
|
{
|
|
|
|
int64 targettime;
|
|
|
|
long secs;
|
|
|
|
int usecs;
|
|
|
|
|
|
|
|
targettime = message_target;
|
|
|
|
|
|
|
|
if (fsync_target > 0 && fsync_target < targettime)
|
|
|
|
targettime = fsync_target;
|
|
|
|
|
|
|
|
feTimestampDifference(now,
|
|
|
|
targettime,
|
|
|
|
&secs,
|
|
|
|
&usecs);
|
|
|
|
if (secs <= 0)
|
|
|
|
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
|
|
|
|
else
|
|
|
|
timeout.tv_sec = secs;
|
|
|
|
timeout.tv_usec = usecs;
|
|
|
|
timeoutptr = &timeout;
|
|
|
|
}
|
|
|
|
|
|
|
|
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
|
|
|
|
if (r == 0 || (r < 0 && errno == EINTR))
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* Got a timeout or signal. Continue the loop and either
|
|
|
|
* deliver a status packet to the server or just go back into
|
|
|
|
* blocking.
|
|
|
|
*/
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
else if (r < 0)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: select() failed: %s\n"),
|
|
|
|
progname, strerror(errno));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Else there is actually data on the socket */
|
|
|
|
if (PQconsumeInput(conn) == 0)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: could not receive data from WAL stream: %s"),
|
|
|
|
progname, PQerrorMessage(conn));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* End of copy stream */
|
|
|
|
if (r == -1)
|
|
|
|
break;
|
|
|
|
|
|
|
|
/* Failure while reading the copy stream */
|
|
|
|
if (r == -2)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: could not read COPY data: %s"),
|
|
|
|
progname, PQerrorMessage(conn));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Check the message type. */
|
|
|
|
if (copybuf[0] == 'k')
|
|
|
|
{
|
|
|
|
int pos;
|
|
|
|
bool replyRequested;
|
|
|
|
XLogRecPtr walEnd;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Parse the keepalive message, enclosed in the CopyData message.
|
|
|
|
* We just check if the server requested a reply, and ignore the
|
|
|
|
* rest.
|
|
|
|
*/
|
|
|
|
pos = 1; /* skip msgtype 'k' */
|
|
|
|
walEnd = fe_recvint64(©buf[pos]);
|
|
|
|
output_written_lsn = Max(walEnd, output_written_lsn);
|
|
|
|
|
|
|
|
pos += 8; /* read walEnd */
|
|
|
|
|
|
|
|
pos += 8; /* skip sendTime */
|
|
|
|
|
|
|
|
if (r < pos + 1)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: streaming header too small: %d\n"),
|
|
|
|
progname, r);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
replyRequested = copybuf[pos];
|
|
|
|
|
|
|
|
/* If the server requested an immediate reply, send one. */
|
|
|
|
if (replyRequested)
|
|
|
|
{
|
|
|
|
/* fsync data, so we send a recent flush pointer */
|
|
|
|
if (!OutputFsync(now))
|
|
|
|
goto error;
|
|
|
|
|
|
|
|
now = feGetCurrentTimestamp();
|
|
|
|
if (!sendFeedback(conn, now, true, false))
|
|
|
|
goto error;
|
|
|
|
last_status = now;
|
|
|
|
}
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
else if (copybuf[0] != 'w')
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
|
|
|
|
progname, copybuf[0]);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Read the header of the XLogData message, enclosed in the CopyData
|
|
|
|
* message. We only need the WAL location field (dataStart), the rest
|
|
|
|
* of the header is ignored.
|
|
|
|
*/
|
|
|
|
hdr_len = 1; /* msgtype 'w' */
|
|
|
|
hdr_len += 8; /* dataStart */
|
|
|
|
hdr_len += 8; /* walEnd */
|
|
|
|
hdr_len += 8; /* sendTime */
|
|
|
|
if (r < hdr_len + 1)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: streaming header too small: %d\n"),
|
|
|
|
progname, r);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Extract WAL location for this block */
|
|
|
|
{
|
|
|
|
XLogRecPtr temp = fe_recvint64(©buf[1]);
|
|
|
|
|
|
|
|
output_written_lsn = Max(temp, output_written_lsn);
|
|
|
|
}
|
|
|
|
|
|
|
|
bytes_left = r - hdr_len;
|
|
|
|
bytes_written = 0;
|
|
|
|
|
|
|
|
/* signal that a fsync is needed */
|
2014-05-15 18:43:37 +02:00
|
|
|
output_needs_fsync = true;
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
while (bytes_left)
|
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
|
|
|
ret = write(outfd,
|
|
|
|
copybuf + hdr_len + bytes_written,
|
|
|
|
bytes_left);
|
|
|
|
|
|
|
|
if (ret < 0)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: could not write %u bytes to log file \"%s\": %s\n"),
|
|
|
|
progname, bytes_left, outfile,
|
|
|
|
strerror(errno));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Write was successful, advance our position */
|
|
|
|
bytes_written += ret;
|
|
|
|
bytes_left -= ret;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (write(outfd, "\n", 1) != 1)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: could not write %u bytes to log file \"%s\": %s\n"),
|
|
|
|
progname, 1, outfile,
|
|
|
|
strerror(errno));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res = PQgetResult(conn);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: unexpected termination of replication stream: %s"),
|
|
|
|
progname, PQresultErrorMessage(res));
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
|
|
|
|
if (outfd != -1 && strcmp(outfile, "-") != 0)
|
|
|
|
{
|
2014-05-06 18:12:18 +02:00
|
|
|
int64 t = feGetCurrentTimestamp();
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
/* no need to jump to error on failure here, we're finishing anyway */
|
|
|
|
OutputFsync(t);
|
|
|
|
|
|
|
|
if (close(outfd) != 0)
|
|
|
|
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
|
|
|
|
progname, outfile, strerror(errno));
|
|
|
|
}
|
|
|
|
outfd = -1;
|
|
|
|
error:
|
2014-05-05 15:20:12 +02:00
|
|
|
if (copybuf != NULL)
|
|
|
|
{
|
|
|
|
PQfreemem(copybuf);
|
|
|
|
copybuf = NULL;
|
|
|
|
}
|
2014-03-18 17:19:57 +01:00
|
|
|
destroyPQExpBuffer(query);
|
|
|
|
PQfinish(conn);
|
|
|
|
conn = NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Unfortunately we can't do sensible signal handling on windows...
|
|
|
|
*/
|
|
|
|
#ifndef WIN32
|
|
|
|
|
|
|
|
/*
|
|
|
|
* When sigint is called, just tell the system to exit at the next possible
|
|
|
|
* moment.
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
sigint_handler(int signum)
|
|
|
|
{
|
|
|
|
time_to_abort = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Trigger the output file to be reopened.
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
sighup_handler(int signum)
|
|
|
|
{
|
|
|
|
output_reopen = true;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
int
|
|
|
|
main(int argc, char **argv)
|
|
|
|
{
|
|
|
|
static struct option long_options[] = {
|
|
|
|
/* general options */
|
|
|
|
{"file", required_argument, NULL, 'f'},
|
2014-05-25 18:47:05 +02:00
|
|
|
{"fsync-interval", required_argument, NULL, 'F'},
|
2014-03-18 17:19:57 +01:00
|
|
|
{"no-loop", no_argument, NULL, 'n'},
|
|
|
|
{"verbose", no_argument, NULL, 'v'},
|
|
|
|
{"version", no_argument, NULL, 'V'},
|
|
|
|
{"help", no_argument, NULL, '?'},
|
|
|
|
/* connnection options */
|
|
|
|
{"dbname", required_argument, NULL, 'd'},
|
|
|
|
{"host", required_argument, NULL, 'h'},
|
|
|
|
{"port", required_argument, NULL, 'p'},
|
|
|
|
{"username", required_argument, NULL, 'U'},
|
|
|
|
{"no-password", no_argument, NULL, 'w'},
|
|
|
|
{"password", no_argument, NULL, 'W'},
|
|
|
|
/* replication options */
|
2014-05-25 18:47:05 +02:00
|
|
|
{"startpos", required_argument, NULL, 'I'},
|
2014-03-18 17:19:57 +01:00
|
|
|
{"option", required_argument, NULL, 'o'},
|
|
|
|
{"plugin", required_argument, NULL, 'P'},
|
|
|
|
{"status-interval", required_argument, NULL, 's'},
|
|
|
|
{"slot", required_argument, NULL, 'S'},
|
|
|
|
/* action */
|
2014-10-06 12:11:52 +02:00
|
|
|
{"create-slot", no_argument, NULL, 1},
|
2014-03-18 17:19:57 +01:00
|
|
|
{"start", no_argument, NULL, 2},
|
2014-10-06 12:11:52 +02:00
|
|
|
{"drop-slot", no_argument, NULL, 3},
|
2014-03-18 17:19:57 +01:00
|
|
|
{NULL, 0, NULL, 0}
|
|
|
|
};
|
|
|
|
int c;
|
|
|
|
int option_index;
|
|
|
|
uint32 hi,
|
|
|
|
lo;
|
2014-10-01 17:22:21 +02:00
|
|
|
char *db_name;
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
progname = get_progname(argv[0]);
|
|
|
|
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
|
|
|
|
|
|
|
|
if (argc > 1)
|
|
|
|
{
|
|
|
|
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
|
|
|
|
{
|
|
|
|
usage();
|
|
|
|
exit(0);
|
|
|
|
}
|
|
|
|
else if (strcmp(argv[1], "-V") == 0 ||
|
|
|
|
strcmp(argv[1], "--version") == 0)
|
|
|
|
{
|
|
|
|
puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
|
|
|
|
exit(0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-05-25 18:47:05 +02:00
|
|
|
while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
|
2014-03-18 17:19:57 +01:00
|
|
|
long_options, &option_index)) != -1)
|
|
|
|
{
|
|
|
|
switch (c)
|
|
|
|
{
|
|
|
|
/* general options */
|
|
|
|
case 'f':
|
|
|
|
outfile = pg_strdup(optarg);
|
|
|
|
break;
|
2014-05-25 18:47:05 +02:00
|
|
|
case 'F':
|
|
|
|
fsync_interval = atoi(optarg) * 1000;
|
|
|
|
if (fsync_interval < 0)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
|
|
|
|
progname, optarg);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
break;
|
2014-03-18 17:19:57 +01:00
|
|
|
case 'n':
|
|
|
|
noloop = 1;
|
|
|
|
break;
|
|
|
|
case 'v':
|
|
|
|
verbose++;
|
|
|
|
break;
|
|
|
|
/* connnection options */
|
|
|
|
case 'd':
|
|
|
|
dbname = pg_strdup(optarg);
|
|
|
|
break;
|
|
|
|
case 'h':
|
|
|
|
dbhost = pg_strdup(optarg);
|
|
|
|
break;
|
|
|
|
case 'p':
|
|
|
|
if (atoi(optarg) <= 0)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
|
|
|
|
progname, optarg);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
dbport = pg_strdup(optarg);
|
|
|
|
break;
|
|
|
|
case 'U':
|
|
|
|
dbuser = pg_strdup(optarg);
|
|
|
|
break;
|
|
|
|
case 'w':
|
|
|
|
dbgetpassword = -1;
|
|
|
|
break;
|
|
|
|
case 'W':
|
|
|
|
dbgetpassword = 1;
|
|
|
|
break;
|
|
|
|
/* replication options */
|
2014-05-25 18:47:05 +02:00
|
|
|
case 'I':
|
|
|
|
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: could not parse start position \"%s\"\n"),
|
|
|
|
progname, optarg);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
startpos = ((uint64) hi) << 32 | lo;
|
|
|
|
break;
|
2014-03-18 17:19:57 +01:00
|
|
|
case 'o':
|
|
|
|
{
|
2014-05-06 18:12:18 +02:00
|
|
|
char *data = pg_strdup(optarg);
|
|
|
|
char *val = strchr(data, '=');
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
if (val != NULL)
|
|
|
|
{
|
|
|
|
/* remove =; separate data from val */
|
|
|
|
*val = '\0';
|
|
|
|
val++;
|
|
|
|
}
|
|
|
|
|
|
|
|
noptions += 1;
|
2014-05-06 18:12:18 +02:00
|
|
|
options = pg_realloc(options, sizeof(char *) * noptions * 2);
|
2014-03-18 17:19:57 +01:00
|
|
|
|
|
|
|
options[(noptions - 1) * 2] = data;
|
|
|
|
options[(noptions - 1) * 2 + 1] = val;
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
case 'P':
|
|
|
|
plugin = pg_strdup(optarg);
|
|
|
|
break;
|
|
|
|
case 's':
|
|
|
|
standby_message_timeout = atoi(optarg) * 1000;
|
|
|
|
if (standby_message_timeout < 0)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
|
|
|
|
progname, optarg);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case 'S':
|
|
|
|
replication_slot = pg_strdup(optarg);
|
|
|
|
break;
|
|
|
|
/* action */
|
|
|
|
case 1:
|
|
|
|
do_create_slot = true;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
do_start_slot = true;
|
|
|
|
break;
|
|
|
|
case 3:
|
|
|
|
do_drop_slot = true;
|
|
|
|
break;
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
|
|
|
/*
|
|
|
|
* getopt_long already emitted a complaint
|
|
|
|
*/
|
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Any non-option arguments?
|
|
|
|
*/
|
|
|
|
if (optind < argc)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: too many command-line arguments (first is \"%s\")\n"),
|
|
|
|
progname, argv[optind]);
|
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Required arguments
|
|
|
|
*/
|
|
|
|
if (replication_slot == NULL)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: no slot specified\n"), progname);
|
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (do_start_slot && outfile == NULL)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: no target file specified\n"), progname);
|
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!do_drop_slot && dbname == NULL)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: no database specified\n"), progname);
|
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!do_drop_slot && !do_create_slot && !do_start_slot)
|
|
|
|
{
|
|
|
|
fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
|
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (do_drop_slot && (do_create_slot || do_start_slot))
|
|
|
|
{
|
2014-10-06 12:11:52 +02:00
|
|
|
fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname);
|
2014-03-18 17:19:57 +01:00
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
2014-08-12 11:12:16 +02:00
|
|
|
if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
|
2014-03-18 17:19:57 +01:00
|
|
|
{
|
2014-10-06 12:11:52 +02:00
|
|
|
fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
|
2014-03-18 17:19:57 +01:00
|
|
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
|
|
|
progname);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifndef WIN32
|
|
|
|
pqsignal(SIGINT, sigint_handler);
|
|
|
|
pqsignal(SIGHUP, sighup_handler);
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/*
|
2014-10-01 17:22:21 +02:00
|
|
|
* Obtain a connection to server. This is not really necessary but it
|
|
|
|
* helps to get more precise error messages about authentification,
|
|
|
|
* required GUC parameters and such.
|
2014-03-18 17:19:57 +01:00
|
|
|
*/
|
2014-10-01 17:22:21 +02:00
|
|
|
conn = GetConnection();
|
|
|
|
if (!conn)
|
|
|
|
/* Error message already written in GetConnection() */
|
|
|
|
exit(1);
|
2014-03-18 17:19:57 +01:00
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
/*
|
|
|
|
* Run IDENTIFY_SYSTEM to make sure we connected using a database specific
|
|
|
|
* replication connection.
|
|
|
|
*/
|
|
|
|
if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
|
|
|
|
disconnect_and_exit(1);
|
2014-03-18 17:19:57 +01:00
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
if (db_name == NULL)
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
_("%s: failed to establish database specific replication connection\n"),
|
|
|
|
progname);
|
|
|
|
disconnect_and_exit(1);
|
2014-03-18 17:19:57 +01:00
|
|
|
}
|
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
/* Drop a replication slot. */
|
2014-03-18 17:19:57 +01:00
|
|
|
if (do_drop_slot)
|
|
|
|
{
|
|
|
|
if (verbose)
|
|
|
|
fprintf(stderr,
|
2014-09-29 15:35:40 +02:00
|
|
|
_("%s: dropping replication slot \"%s\"\n"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname, replication_slot);
|
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
if (!DropReplicationSlot(conn, replication_slot))
|
2014-03-18 17:19:57 +01:00
|
|
|
disconnect_and_exit(1);
|
|
|
|
}
|
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
/* Create a replication slot. */
|
2014-03-18 17:19:57 +01:00
|
|
|
if (do_create_slot)
|
|
|
|
{
|
|
|
|
if (verbose)
|
|
|
|
fprintf(stderr,
|
2014-09-29 15:35:40 +02:00
|
|
|
_("%s: creating replication slot \"%s\"\n"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname, replication_slot);
|
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
if (!CreateReplicationSlot(conn, replication_slot, plugin,
|
|
|
|
&startpos, false))
|
2014-03-18 17:19:57 +01:00
|
|
|
disconnect_and_exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!do_start_slot)
|
|
|
|
disconnect_and_exit(0);
|
|
|
|
|
2014-10-01 17:22:21 +02:00
|
|
|
/* Stream loop */
|
2014-03-18 17:19:57 +01:00
|
|
|
while (true)
|
|
|
|
{
|
2014-10-01 17:22:21 +02:00
|
|
|
StreamLogicalLog();
|
2014-03-18 17:19:57 +01:00
|
|
|
if (time_to_abort)
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* We've been Ctrl-C'ed. That's not an error, so exit without an
|
|
|
|
* errorcode.
|
|
|
|
*/
|
|
|
|
disconnect_and_exit(0);
|
|
|
|
}
|
|
|
|
else if (noloop)
|
|
|
|
{
|
2014-05-15 13:49:11 +02:00
|
|
|
fprintf(stderr, _("%s: disconnected\n"), progname);
|
2014-03-18 17:19:57 +01:00
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
fprintf(stderr,
|
|
|
|
/* translator: check source for value for %d */
|
2014-05-15 13:49:11 +02:00
|
|
|
_("%s: disconnected; waiting %d seconds to try again\n"),
|
2014-03-18 17:19:57 +01:00
|
|
|
progname, RECONNECT_SLEEP_TIME);
|
|
|
|
pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|