/*-------------------------------------------------------------------------
 *
 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
 *					  fashion and write it to a local file.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/bin/pg_basebackup/pg_recvlogical.c
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

/* local includes */
#include "streamutil.h"

#include "access/xlog_internal.h"
#include "common/file_perm.h"
#include "common/fe_memutils.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "pqexpbuffer.h"
/* Time to sleep between reconnection attempts */
#define RECONNECT_SLEEP_TIME 5
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
static int noloop = 0;
Phase 2 of pgindent updates. Change pg_bsd_indent to follow upstream rules for placement of comments to the right of code, and remove pgindent hack that caused comments following #endif to not obey the general rule. Commit e3860ffa4dd0dad0dd9eea4be9cc1412373a8c89 wasn't actually using the published version of pg_bsd_indent, but a hacked-up version that tried to minimize the amount of movement of comments to the right of code. The situation of interest is where such a comment has to be moved to the right of its default placement at column 33 because there's code there. BSD indent has always moved right in units of tab stops in such cases --- but in the previous incarnation, indent was working in 8-space tab stops, while now it knows we use 4-space tabs. So the net result is that in about half the cases, such comments are placed one tab stop left of before. This is better all around: it leaves more room on the line for comment text, and it means that in such cases the comment uniformly starts at the next 4-space tab stop after the code, rather than sometimes one and sometimes two tabs after. Also, ensure that comments following #endif are indented the same as comments following other preprocessor commands such as #else. That inconsistency turns out to have been self-inflicted damage from a poorly-thought-through post-indent "fixup" in pgindent. This patch is much less interesting than the first round of indent changes, but also bulkier, so I thought it best to separate the effects. Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:18:54 +02:00
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
static bool do_drop_slot = false;
static char *replication_slot = NULL;
/* filled pairwise with option, value. value may be NULL */
static char **options;
static size_t noptions = 0;
static const char *plugin = "test_decoding";
/*
 * Global State
 *
 * Bookkeeping shared between the streaming loop, the fsync/feedback helpers
 * and the signal handlers.
 */
/* descriptor of the output file; -1 while no output file is open */
static int outfd = -1;
/* set by the SIGINT handler, or once endpos has been reached */
static volatile sig_atomic_t time_to_abort = false;
/* set by the SIGHUP handler; makes the streaming loop close and reopen outfile */
static volatile sig_atomic_t output_reopen = false;
/* true if outfd is a regular file and not a tty; only then is fsync() called */
static bool output_isfile;
/* timestamp handed to the most recent OutputFsync() call */
static TimestampTz output_last_fsync = -1;
/* set after data has been written to outfd, cleared by OutputFsync() */
static bool output_needs_fsync = false;
/* highest LSN received so far; reported to the server as the write position */
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
/* copy of output_written_lsn taken by OutputFsync(); reported as the flush position */
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
/* Forward declarations for functions used before their definition */

/* print the command-line help text */
static void usage(void);
/* run one streaming session on the global connection until it ends */
static void StreamLogicalLog(void);
/* fsync the output, then send a status update; false on failure */
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
/* send end-of-copy to the server and, if verbose, report why we stop */
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
bool keepalive, XLogRecPtr lsn);
static void
usage(void)
{
printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
progname);
printf(_("Usage:\n"));
printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nAction to be performed:\n"));
printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
printf(_("\nOptions:\n"));
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
printf(_(" -F --fsync-interval=SECS\n"
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -o, --option=NAME[=VALUE]\n"
" pass option NAME with optional value VALUE to the\n"
" output plugin\n"));
printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=DBNAME database to connect to\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n"));
}
/*
 * Send a Standby Status Update message to server.
 *
 * conn			  connection in COPY BOTH mode to send the message on
 * now			  timestamp placed in the message's sendTime field
 * force		  send even if the positions have not changed since the last
 *				  message
 * replyRequested value for the message's "reply requested" byte
 *
 * Returns true if the message was sent, or was skipped because it would have
 * been superfluous; false (after printing an error) if sending failed.
 *
 * As a side effect, startpos is advanced to output_written_lsn whenever a
 * message is built.
 */
static bool
sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
{
	/* positions reported in the previous message */
	static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
	static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;

	/* msgtype + write + flush + apply + sendTime + replyRequested */
	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
	int			len = 0;

	/*
	 * we normally don't want to send superfluous feedback, but if it's
	 * because of a timeout we need to, otherwise wal_sender_timeout will kill
	 * us.
	 *
	 * Feedback is superfluous only if neither the write nor the flush
	 * position moved since the last message.
	 */
	if (!force &&
		last_written_lsn == output_written_lsn &&
		last_fsync_lsn == output_fsync_lsn)
		return true;

	if (verbose)
		fprintf(stderr,
				_("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
				progname,
				(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
				(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
				replication_slot);

	replybuf[len] = 'r';
	len += 1;
	fe_sendint64(output_written_lsn, &replybuf[len]);	/* write */
	len += 8;
	fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
	len += 8;
	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);	/* apply */
	len += 8;
	fe_sendint64(now, &replybuf[len]);	/* sendTime */
	len += 8;
	replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
	len += 1;

	/* remember what we reported, and where to resume after a reconnect */
	startpos = output_written_lsn;
	last_written_lsn = output_written_lsn;
	last_fsync_lsn = output_fsync_lsn;

	if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
	{
		fprintf(stderr, _("%s: could not send feedback packet: %s"),
				progname, PQerrorMessage(conn));
		return false;
	}

	return true;
}
/*
 * atexit() callback: close the global connection if one is still open.
 */
static void
disconnect_atexit(void)
{
	if (conn == NULL)
		return;

	PQfinish(conn);
}
/*
 * Flush the output file to disk if that is needed and possible.
 *
 * The fsync timestamp and the flush LSN are updated on every call, whether
 * or not fsync() actually runs.  Returns false (after printing an error)
 * only if fsync() itself fails.
 */
static bool
OutputFsync(TimestampTz now)
{
	/* bookkeeping happens unconditionally */
	output_fsync_lsn = output_written_lsn;
	output_last_fsync = now;

	/* nothing to do if fsyncing is disabled or nothing was written */
	if (fsync_interval <= 0 || !output_needs_fsync)
		return true;

	output_needs_fsync = false;

	/* can only fsync if it's a regular file */
	if (output_isfile && fsync(outfd) != 0)
	{
		fprintf(stderr,
				_("%s: could not fsync file \"%s\": %s\n"),
				progname, outfile, strerror(errno));
		return false;
	}

	return true;
}
/*
 * Start the log streaming
 *
 * Runs one streaming session: connects if there is no connection yet, issues
 * START_REPLICATION for the configured slot, then loops receiving CopyData
 * messages.  XLogData ('w') payloads are appended to the output file, each
 * followed by a newline; keepalives ('k') update the write position and are
 * answered when the server asks for a reply.  Status updates and fsyncs are
 * issued periodically according to standby_message_timeout and
 * fsync_interval.
 *
 * The loop ends when time_to_abort is set (signal or endpos reached), when
 * the server ends the copy stream, or on error.  In every case where a
 * connection was obtained it is closed and conn reset to NULL before
 * returning; main() decides whether to reconnect.
 */
static void
StreamLogicalLog(void)
{
	PGresult   *res;
	char	   *copybuf = NULL;
	TimestampTz last_status = -1;
	int			i;
	PQExpBuffer query;

	output_written_lsn = InvalidXLogRecPtr;
	output_fsync_lsn = InvalidXLogRecPtr;

	/*
	 * Connect in replication mode to the server
	 */
	if (!conn)
		conn = GetConnection();
	if (!conn)
		/* Error message already written in GetConnection() */
		return;

	/*
	 * Start the replication
	 */
	if (verbose)
		fprintf(stderr,
				_("%s: starting log streaming at %X/%X (slot %s)\n"),
				progname, (uint32) (startpos >> 32), (uint32) startpos,
				replication_slot);

	/*
	 * Initiate the replication stream at specified location.  The query
	 * buffer is only allocated once we know we have a connection, so that
	 * the early return above cannot leak it.
	 */
	query = createPQExpBuffer();
	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
					  replication_slot, (uint32) (startpos >> 32), (uint32) startpos);

	/* print options if there are any */
	if (noptions)
		appendPQExpBufferStr(query, " (");

	for (i = 0; i < noptions; i++)
	{
		/* separator */
		if (i > 0)
			appendPQExpBufferStr(query, ", ");

		/* write option name */
		appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);

		/* write option value if specified */
		if (options[(i * 2) + 1] != NULL)
			appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
	}

	if (noptions)
		appendPQExpBufferChar(query, ')');

	res = PQexec(conn, query->data);
	if (PQresultStatus(res) != PGRES_COPY_BOTH)
	{
		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
				progname, query->data, PQresultErrorMessage(res));
		PQclear(res);
		goto error;
	}
	PQclear(res);
	resetPQExpBuffer(query);

	if (verbose)
		fprintf(stderr,
				_("%s: streaming initiated\n"),
				progname);

	while (!time_to_abort)
	{
		int			r;
		int			bytes_left;
		int			bytes_written;
		TimestampTz now;
		int			hdr_len;
		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;

		if (copybuf != NULL)
		{
			PQfreemem(copybuf);
			copybuf = NULL;
		}

		/*
		 * Potentially send a status message to the master
		 */
		now = feGetCurrentTimestamp();

		if (outfd != -1 &&
			feTimestampDifferenceExceeds(output_last_fsync, now,
										 fsync_interval))
		{
			if (!OutputFsync(now))
				goto error;
		}

		if (standby_message_timeout > 0 &&
			feTimestampDifferenceExceeds(last_status, now,
										 standby_message_timeout))
		{
			/* Time to send feedback! */
			if (!sendFeedback(conn, now, true, false))
				goto error;

			last_status = now;
		}

		/* got SIGHUP, close output file */
		if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
		{
			now = feGetCurrentTimestamp();
			if (!OutputFsync(now))
				goto error;
			close(outfd);
			outfd = -1;
		}
		output_reopen = false;

		/* open the output file, if not open yet */
		if (outfd == -1)
		{
			struct stat statbuf;

			if (strcmp(outfile, "-") == 0)
				outfd = fileno(stdout);
			else
				outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
							 S_IRUSR | S_IWUSR);
			if (outfd == -1)
			{
				fprintf(stderr,
						_("%s: could not open log file \"%s\": %s\n"),
						progname, outfile, strerror(errno));
				goto error;
			}

			if (fstat(outfd, &statbuf) != 0)
			{
				fprintf(stderr,
						_("%s: could not stat file \"%s\": %s\n"),
						progname, outfile, strerror(errno));

				/*
				 * statbuf is not valid, so we must not look at it.  Forget
				 * the descriptor so that a later attempt goes through the
				 * open-and-stat sequence again.
				 */
				if (strcmp(outfile, "-") != 0)
					close(outfd);
				outfd = -1;
				goto error;
			}

			output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
		}

		r = PQgetCopyData(conn, &copybuf, 1);
		if (r == 0)
		{
			/*
			 * In async mode, and no data available. We block on reading but
			 * not more than the specified timeout, so that we can send a
			 * response back to the client.
			 */
			fd_set		input_mask;
			TimestampTz message_target = 0;
			TimestampTz fsync_target = 0;
			struct timeval timeout;
			struct timeval *timeoutptr = NULL;

			if (PQsocket(conn) < 0)
			{
				fprintf(stderr,
						_("%s: invalid socket: %s"),
						progname, PQerrorMessage(conn));
				goto error;
			}

			FD_ZERO(&input_mask);
			FD_SET(PQsocket(conn), &input_mask);

			/* Compute when we need to wakeup to send a keepalive message. */
			if (standby_message_timeout)
				message_target = last_status + (standby_message_timeout - 1) *
					((int64) 1000);

			/* Compute when we need to wakeup to fsync the output file. */
			if (fsync_interval > 0 && output_needs_fsync)
				fsync_target = output_last_fsync + (fsync_interval - 1) *
					((int64) 1000);

			/* Now compute when to wakeup. */
			if (message_target > 0 || fsync_target > 0)
			{
				TimestampTz targettime;
				long		secs;
				int			usecs;

				/*
				 * Use the earlier of the two targets.  A target of zero
				 * means "not set" and must not win the comparison.
				 */
				targettime = message_target;

				if (fsync_target > 0 &&
					(targettime == 0 || fsync_target < targettime))
					targettime = fsync_target;

				feTimestampDifference(now,
									  targettime,
									  &secs,
									  &usecs);
				if (secs <= 0)
					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
				else
					timeout.tv_sec = secs;
				timeout.tv_usec = usecs;
				timeoutptr = &timeout;
			}

			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
			if (r == 0 || (r < 0 && errno == EINTR))
			{
				/*
				 * Got a timeout or signal. Continue the loop and either
				 * deliver a status packet to the server or just go back into
				 * blocking.
				 */
				continue;
			}
			else if (r < 0)
			{
				fprintf(stderr, _("%s: select() failed: %s\n"),
						progname, strerror(errno));
				goto error;
			}

			/* Else there is actually data on the socket */
			if (PQconsumeInput(conn) == 0)
			{
				fprintf(stderr,
						_("%s: could not receive data from WAL stream: %s"),
						progname, PQerrorMessage(conn));
				goto error;
			}
			continue;
		}

		/* End of copy stream */
		if (r == -1)
			break;

		/* Failure while reading the copy stream */
		if (r == -2)
		{
			fprintf(stderr, _("%s: could not read COPY data: %s"),
					progname, PQerrorMessage(conn));
			goto error;
		}

		/* Check the message type. */
		if (copybuf[0] == 'k')
		{
			int			pos;
			bool		replyRequested;
			XLogRecPtr	walEnd;
			bool		endposReached = false;

			/*
			 * Make sure the whole keepalive is present before reading any
			 * field of it: msgtype + walEnd + sendTime + replyRequested.
			 */
			if (r < 1 + 8 + 8 + 1)
			{
				fprintf(stderr, _("%s: streaming header too small: %d\n"),
						progname, r);
				goto error;
			}

			/*
			 * Parse the keepalive message, enclosed in the CopyData message.
			 * We just check if the server requested a reply, and ignore the
			 * rest.
			 */
			pos = 1;			/* skip msgtype 'k' */
			walEnd = fe_recvint64(&copybuf[pos]);
			output_written_lsn = Max(walEnd, output_written_lsn);

			pos += 8;			/* read walEnd */

			pos += 8;			/* skip sendTime */

			replyRequested = copybuf[pos];

			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
			{
				/*
				 * If there's nothing to read on the socket until a keepalive
				 * we know that the server has nothing to send us; and if
				 * walEnd has passed endpos, we know nothing else can have
				 * committed before endpos. So we can bail out now.
				 */
				endposReached = true;
			}

			/* Send a reply, if necessary */
			if (replyRequested || endposReached)
			{
				if (!flushAndSendFeedback(conn, &now))
					goto error;
				last_status = now;
			}

			if (endposReached)
			{
				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
				time_to_abort = true;
				break;
			}

			continue;
		}
		else if (copybuf[0] != 'w')
		{
			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
					progname, copybuf[0]);
			goto error;
		}

		/*
		 * Read the header of the XLogData message, enclosed in the CopyData
		 * message. We only need the WAL location field (dataStart), the rest
		 * of the header is ignored.
		 */
		hdr_len = 1;			/* msgtype 'w' */
		hdr_len += 8;			/* dataStart */
		hdr_len += 8;			/* walEnd */
		hdr_len += 8;			/* sendTime */
		if (r < hdr_len + 1)
		{
			fprintf(stderr, _("%s: streaming header too small: %d\n"),
					progname, r);
			goto error;
		}

		/* Extract WAL location for this block */
		cur_record_lsn = fe_recvint64(&copybuf[1]);

		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
		{
			/*
			 * We've read past our endpoint, so prepare to go away being
			 * cautious about what happens to our output data.
			 */
			if (!flushAndSendFeedback(conn, &now))
				goto error;
			prepareToTerminate(conn, endpos, false, cur_record_lsn);
			time_to_abort = true;
			break;
		}

		output_written_lsn = Max(cur_record_lsn, output_written_lsn);

		bytes_left = r - hdr_len;
		bytes_written = 0;

		/* signal that a fsync is needed */
		output_needs_fsync = true;

		while (bytes_left)
		{
			int			ret;

			ret = write(outfd,
						copybuf + hdr_len + bytes_written,
						bytes_left);

			if (ret < 0)
			{
				fprintf(stderr,
						_("%s: could not write %u bytes to log file \"%s\": %s\n"),
						progname, bytes_left, outfile,
						strerror(errno));
				goto error;
			}

			/* Write was successful, advance our position */
			bytes_written += ret;
			bytes_left -= ret;
		}

		/* each payload is terminated by a newline in the output */
		if (write(outfd, "\n", 1) != 1)
		{
			fprintf(stderr,
					_("%s: could not write %u bytes to log file \"%s\": %s\n"),
					progname, 1, outfile,
					strerror(errno));
			goto error;
		}

		if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
		{
			/* endpos was exactly the record we just processed, we're done */
			if (!flushAndSendFeedback(conn, &now))
				goto error;
			prepareToTerminate(conn, endpos, false, cur_record_lsn);
			time_to_abort = true;
			break;
		}
	}

	res = PQgetResult(conn);
	if (PQresultStatus(res) == PGRES_COPY_OUT)
	{
		/*
		 * We're doing a client-initiated clean exit and have sent CopyDone to
		 * the server. We've already sent replay confirmation and fsync'd so
		 * we can just clean up the connection now.
		 */
		PQclear(res);
		goto error;
	}
	else if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr,
				_("%s: unexpected termination of replication stream: %s"),
				progname, PQresultErrorMessage(res));
		PQclear(res);
		goto error;
	}
	PQclear(res);

	if (outfd != -1 && strcmp(outfile, "-") != 0)
	{
		TimestampTz t = feGetCurrentTimestamp();

		/* no need to jump to error on failure here, we're finishing anyway */
		OutputFsync(t);

		if (close(outfd) != 0)
			fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
					progname, outfile, strerror(errno));
	}
	outfd = -1;

	/* common exit path: release per-session resources and drop the connection */
error:
	if (copybuf != NULL)
	{
		PQfreemem(copybuf);
		copybuf = NULL;
	}
	destroyPQExpBuffer(query);
	PQfinish(conn);
	conn = NULL;
}
/*
* Unfortunately we can't do sensible signal handling on windows...
*/
#ifndef WIN32
/*
 * SIGINT handler: only raises the abort flag.  The streaming loop and main()
 * check the flag and shut down at the next opportunity.
 */
static void
sigint_handler(int signum)
{
	(void) signum;				/* not needed */

	time_to_abort = true;
}
/*
 * SIGHUP handler: request that the output file be closed and reopened.  The
 * streaming loop acts on the flag.
 */
static void
sighup_handler(int signum)
{
	(void) signum;				/* not needed */

	output_reopen = true;
}
#endif
/*
 * Convert an interval given on the command line in seconds to milliseconds.
 *
 * Returns false, leaving *result_ms untouched, if the value is negative or
 * too large to be expressed in milliseconds as an int.
 */
static bool
parse_interval_ms(const char *arg, int *result_ms)
{
	int			secs = atoi(arg);

	/* check the range first: secs * 1000 must not overflow */
	if (secs < 0 || secs > INT_MAX / 1000)
		return false;

	*result_ms = secs * 1000;
	return true;
}

/*
 * Entry point: parse the command line, check that the requested combination
 * of actions and options makes sense, connect to the server, then drop
 * and/or create the slot as requested and, with --start, stream from it.
 *
 * While streaming, a lost connection is retried every RECONNECT_SLEEP_TIME
 * seconds unless --no-loop was given.  Exits with status 0 on success or
 * after a requested stop, 1 on error.
 */
int
main(int argc, char **argv)
{
	static struct option long_options[] = {
/* general options */
		{"file", required_argument, NULL, 'f'},
		{"fsync-interval", required_argument, NULL, 'F'},
		{"no-loop", no_argument, NULL, 'n'},
		{"verbose", no_argument, NULL, 'v'},
		{"version", no_argument, NULL, 'V'},
		{"help", no_argument, NULL, '?'},
/* connection options */
		{"dbname", required_argument, NULL, 'd'},
		{"host", required_argument, NULL, 'h'},
		{"port", required_argument, NULL, 'p'},
		{"username", required_argument, NULL, 'U'},
		{"no-password", no_argument, NULL, 'w'},
		{"password", no_argument, NULL, 'W'},
/* replication options */
		{"startpos", required_argument, NULL, 'I'},
		{"endpos", required_argument, NULL, 'E'},
		{"option", required_argument, NULL, 'o'},
		{"plugin", required_argument, NULL, 'P'},
		{"status-interval", required_argument, NULL, 's'},
		{"slot", required_argument, NULL, 'S'},
/* action */
		{"create-slot", no_argument, NULL, 1},
		{"start", no_argument, NULL, 2},
		{"drop-slot", no_argument, NULL, 3},
		{"if-not-exists", no_argument, NULL, 4},
		{NULL, 0, NULL, 0}
	};
	int			c;
	int			option_index;
	uint32		hi,
				lo;
	char	   *db_name;

	progname = get_progname(argv[0]);
	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));

	/* --help and --version are only recognized as the first argument */
	if (argc > 1)
	{
		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
		{
			usage();
			exit(0);
		}
		else if (strcmp(argv[1], "-V") == 0 ||
				 strcmp(argv[1], "--version") == 0)
		{
			puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
			exit(0);
		}
	}

	while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
							long_options, &option_index)) != -1)
	{
		switch (c)
		{
/* general options */
			case 'f':
				outfile = pg_strdup(optarg);
				break;
			case 'F':
				if (!parse_interval_ms(optarg, &fsync_interval))
				{
					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
							progname, optarg);
					exit(1);
				}
				break;
			case 'n':
				noloop = 1;
				break;
			case 'v':
				verbose++;
				break;
/* connection options */
			case 'd':
				dbname = pg_strdup(optarg);
				break;
			case 'h':
				dbhost = pg_strdup(optarg);
				break;
			case 'p':
				if (atoi(optarg) <= 0)
				{
					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
							progname, optarg);
					exit(1);
				}
				dbport = pg_strdup(optarg);
				break;
			case 'U':
				dbuser = pg_strdup(optarg);
				break;
			case 'w':
				dbgetpassword = -1;
				break;
			case 'W':
				dbgetpassword = 1;
				break;
/* replication options */
			case 'I':
				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
				{
					fprintf(stderr,
							_("%s: could not parse start position \"%s\"\n"),
							progname, optarg);
					exit(1);
				}
				startpos = ((uint64) hi) << 32 | lo;
				break;
			case 'E':
				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
				{
					fprintf(stderr,
							_("%s: could not parse end position \"%s\"\n"),
							progname, optarg);
					exit(1);
				}
				endpos = ((uint64) hi) << 32 | lo;
				break;
			case 'o':
				{
					char	   *data = pg_strdup(optarg);
					char	   *val = strchr(data, '=');

					if (val != NULL)
					{
						/* remove =; separate data from val */
						*val = '\0';
						val++;
					}

					/* options holds name/value pairs; val may be NULL */
					noptions += 1;
					options = pg_realloc(options, sizeof(char *) * noptions * 2);

					options[(noptions - 1) * 2] = data;
					options[(noptions - 1) * 2 + 1] = val;
				}

				break;
			case 'P':
				plugin = pg_strdup(optarg);
				break;
			case 's':
				if (!parse_interval_ms(optarg, &standby_message_timeout))
				{
					fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
							progname, optarg);
					exit(1);
				}
				break;
			case 'S':
				replication_slot = pg_strdup(optarg);
				break;
/* action */
			case 1:
				do_create_slot = true;
				break;
			case 2:
				do_start_slot = true;
				break;
			case 3:
				do_drop_slot = true;
				break;
			case 4:
				slot_exists_ok = true;
				break;

			default:

				/*
				 * getopt_long already emitted a complaint
				 */
				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
						progname);
				exit(1);
		}
	}

	/*
	 * Any non-option arguments?
	 */
	if (optind < argc)
	{
		fprintf(stderr,
				_("%s: too many command-line arguments (first is \"%s\")\n"),
				progname, argv[optind]);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	/*
	 * Required arguments
	 */
	if (replication_slot == NULL)
	{
		fprintf(stderr, _("%s: no slot specified\n"), progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (do_start_slot && outfile == NULL)
	{
		fprintf(stderr, _("%s: no target file specified\n"), progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (!do_drop_slot && dbname == NULL)
	{
		fprintf(stderr, _("%s: no database specified\n"), progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (!do_drop_slot && !do_create_slot && !do_start_slot)
	{
		fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (do_drop_slot && (do_create_slot || do_start_slot))
	{
		fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
	{
		fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (endpos != InvalidXLogRecPtr && !do_start_slot)
	{
		fprintf(stderr,
				_("%s: --endpos may only be specified with --start\n"),
				progname);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

#ifndef WIN32
	pqsignal(SIGINT, sigint_handler);
	pqsignal(SIGHUP, sighup_handler);
#endif

	/*
	 * Obtain a connection to server. This is not really necessary but it
	 * helps to get more precise error messages about authentication, required
	 * GUC parameters and such.
	 */
	conn = GetConnection();
	if (!conn)
		/* Error message already written in GetConnection() */
		exit(1);
	atexit(disconnect_atexit);

	/*
	 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
	 * replication connection.
	 */
	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
		exit(1);

	if (db_name == NULL)
	{
		fprintf(stderr,
				_("%s: could not establish database-specific replication connection\n"),
				progname);
		exit(1);
	}

	/*
	 * Set umask so that directories/files are created with the same
	 * permissions as directories/files in the source data directory.
	 *
	 * pg_mode_mask is set to owner-only by default and then updated in
	 * GetConnection() where we get the mode from the server-side with
	 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
	 */
	umask(pg_mode_mask);

	/* Drop a replication slot. */
	if (do_drop_slot)
	{
		if (verbose)
			fprintf(stderr,
					_("%s: dropping replication slot \"%s\"\n"),
					progname, replication_slot);

		if (!DropReplicationSlot(conn, replication_slot))
			exit(1);
	}

	/* Create a replication slot. */
	if (do_create_slot)
	{
		if (verbose)
			fprintf(stderr,
					_("%s: creating replication slot \"%s\"\n"),
					progname, replication_slot);

		if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
								   false, false, slot_exists_ok))
			exit(1);
		startpos = InvalidXLogRecPtr;
	}

	if (!do_start_slot)
		exit(0);

	/* Stream loop */
	while (true)
	{
		StreamLogicalLog();
		if (time_to_abort)
		{
			/*
			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
			 * not an error, so exit without an errorcode.
			 */
			exit(0);
		}
		else if (noloop)
		{
			fprintf(stderr, _("%s: disconnected\n"), progname);
			exit(1);
		}
		else
		{
			fprintf(stderr,
			/* translator: check source for value for %d */
					_("%s: disconnected; waiting %d seconds to try again\n"),
					progname, RECONNECT_SLEEP_TIME);
			pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
		}
	}
}
/*
 * Flush the output to disk and then report our position to the server.
 *
 * *now is passed to OutputFsync() as the fsync timestamp.  If the flush
 * succeeds, *now is refreshed to the current time, and that value is used as
 * the send time of the feedback message.
 *
 * Returns true only if both the flush and the feedback succeeded.
 */
static bool
flushAndSendFeedback(PGconn *conn, TimestampTz *now)
{
	/* flush data to disk, so that we send a recent flush pointer */
	bool		flushed = OutputFsync(*now);

	if (flushed)
	{
		*now = feGetCurrentTimestamp();
		return sendFeedback(conn, *now, true, false) ? true : false;
	}

	return false;
}
/*
 * Try to inform the server about our upcoming demise, but don't wait around or
 * retry on failure.
 *
 * conn		 connection currently in COPY BOTH mode
 * endpos	 the requested end position, used only for the verbose message
 * keepalive true if endpos was reached via a keepalive message, false if it
 *			 was reached via a data record
 * lsn		 position of that record; only reported when keepalive is false
 */
static void
prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
{
	/* best effort only: failures are deliberately ignored */
	(void) PQputCopyEnd(conn, NULL);
	(void) PQflush(conn);

	if (verbose)
	{
		/* marked for translation like every other message in this file */
		if (keepalive)
			fprintf(stderr, _("%s: endpos %X/%X reached by keepalive\n"),
					progname,
					(uint32) (endpos >> 32), (uint32) endpos);
		else
			fprintf(stderr, _("%s: endpos %X/%X reached by record at %X/%X\n"),
					progname, (uint32) (endpos >> 32), (uint32) (endpos),
					(uint32) (lsn >> 32), (uint32) lsn);
	}
}