Make pg_receivexlog and pg_basebackup -X stream work across timeline switches.

This mirrors the changes done earlier to the server in standby mode. When
receivelog reaches the end of a timeline, as reported by the server, it
fetches the timeline history file of the next timeline, and restarts
streaming from the new timeline by issuing a new START_STREAMING command.

When pg_receivexlog crosses a timeline, it leaves the .partial suffix on the
last segment on the old timeline. This helps you to tell apart a partial
segment left in the directory because of a timeline switch, and a completed
segment. If you just follow a single server, it won't make a difference, but
it can be significant in more complicated scenarios where new WAL is still
generated on the old timeline.

This includes two small changes to the streaming replication protocol:
First, when you reach the end of timeline while streaming, the server now
sends the TLI of the next timeline in the server's history to the client.
pg_receivexlog uses that as the next timeline, so that it doesn't need to
parse the timeline history file like a standby server does. Second, when
BASE_BACKUP command sends the begin and end WAL positions, it now also sends
the timeline IDs corresponding the positions.
This commit is contained in:
Heikki Linnakangas 2013-01-17 20:23:00 +02:00
parent 8ae35e9180
commit 0b6329130e
12 changed files with 699 additions and 312 deletions

View File

@ -1418,8 +1418,10 @@ The commands accepted in walsender mode are:
<para>
After streaming all the WAL on a timeline that is not the latest one,
the server will end streaming by exiting the COPY mode. When the client
acknowledges this by also exiting COPY mode, the server responds with a
CommandComplete message, and is ready to accept a new command.
acknowledges this by also exiting COPY mode, the server sends a
single-row, single-column result set indicating the next timeline in
this server's history. That is followed by a CommandComplete message,
and the server is ready to accept a new command.
</para>
<para>
@ -1784,7 +1786,9 @@ The commands accepted in walsender mode are:
</para>
<para>
The first ordinary result set contains the starting position of the
backup, given in XLogRecPtr format as a single column in a single row.
backup, in a single row with two columns. The first column contains
the start position given in XLogRecPtr format, and the second column
contains the corresponding timeline ID.
</para>
<para>
The second ordinary result set has one row for each tablespace.
@ -1827,7 +1831,9 @@ The commands accepted in walsender mode are:
<quote>ustar interchange format</> specified in the POSIX 1003.1-2008
standard) dump of the tablespace contents, except that the two trailing
blocks of zeroes specified in the standard are omitted.
After the tar data is complete, a final ordinary result set will be sent.
After the tar data is complete, a final ordinary result set will be sent,
containing the WAL end position of the backup, in the same format as
the start position.
</para>
<para>

View File

@ -545,22 +545,26 @@ tliOfPointInHistory(XLogRecPtr ptr, List *history)
}
/*
* Returns the point in history where we branched off the given timeline.
* Returns InvalidXLogRecPtr if the timeline is current (= we have not
* branched off from it), and throws an error if the timeline is not part of
* this server's history.
* Returns the point in history where we branched off the given timeline,
* and the timeline we branched to (*nextTLI). Returns InvalidXLogRecPtr if
* the timeline is current, ie. we have not branched off from it, and throws
* an error if the timeline is not part of this server's history.
*/
XLogRecPtr
tliSwitchPoint(TimeLineID tli, List *history)
tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
{
ListCell *cell;
if (nextTLI)
*nextTLI = 0;
foreach (cell, history)
{
TimeLineHistoryEntry *tle = (TimeLineHistoryEntry *) lfirst(cell);
if (tle->tli == tli)
return tle->end;
if (nextTLI)
*nextTLI = tle->tli;
}
ereport(ERROR,

View File

@ -4930,7 +4930,7 @@ StartupXLOG(void)
* tliSwitchPoint will throw an error if the checkpoint's timeline
* is not in expectedTLEs at all.
*/
switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs);
switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
ereport(FATAL,
(errmsg("requested timeline %u is not a child of this server's history",
recoveryTargetTLI),
@ -7870,16 +7870,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
* non-exclusive backups active at the same time, and they don't conflict
* with an exclusive backup either.
*
* Returns the minimum WAL position that must be present to restore from this
* backup, and the corresponding timeline ID in *starttli_p.
*
* Every successfully started non-exclusive backup must be stopped by calling
* do_pg_stop_backup() or do_pg_abort_backup().
*/
XLogRecPtr
do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
char **labelfile)
{
bool exclusive = (labelfile == NULL);
bool backup_started_in_recovery = false;
XLogRecPtr checkpointloc;
XLogRecPtr startpoint;
TimeLineID starttli;
pg_time_t stamp_time;
char strfbuf[128];
char xlogfilename[MAXFNAMELEN];
@ -8021,6 +8026,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
LWLockAcquire(ControlFileLock, LW_SHARED);
checkpointloc = ControlFile->checkPoint;
startpoint = ControlFile->checkPointCopy.redo;
starttli = ControlFile->checkPointCopy.ThisTimeLineID;
checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
LWLockRelease(ControlFileLock);
@ -8154,6 +8160,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
/*
* We're done. As a convenience, return the starting WAL location.
*/
if (starttli_p)
*starttli_p = starttli;
return startpoint;
}
@ -8190,14 +8198,18 @@ pg_start_backup_callback(int code, Datum arg)
* If labelfile is NULL, this stops an exclusive backup. Otherwise this stops
* the non-exclusive backup specified by 'labelfile'.
*
* Returns the last WAL position that must be present to restore from this
* backup, and the corresponding timeline ID in *stoptli_p.
*/
XLogRecPtr
do_pg_stop_backup(char *labelfile, bool waitforarchive)
do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
{
bool exclusive = (labelfile == NULL);
bool backup_started_in_recovery = false;
XLogRecPtr startpoint;
XLogRecPtr stoppoint;
TimeLineID stoptli;
XLogRecData rdata;
pg_time_t stamp_time;
char strfbuf[128];
@ -8401,8 +8413,11 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
LWLockAcquire(ControlFileLock, LW_SHARED);
stoppoint = ControlFile->minRecoveryPoint;
stoptli = ControlFile->minRecoveryPointTLI;
LWLockRelease(ControlFileLock);
if (stoptli_p)
*stoptli_p = stoptli;
return stoppoint;
}
@ -8414,6 +8429,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
rdata.buffer = InvalidBuffer;
rdata.next = NULL;
stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
stoptli = ThisTimeLineID;
/*
* Force a switch to a new xlog segment file, so that the backup is valid
@ -8529,6 +8545,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
/*
* We're done. As a convenience, return the ending WAL location.
*/
if (stoptli_p)
*stoptli_p = stoptli;
return stoppoint;
}

View File

@ -56,7 +56,7 @@ pg_start_backup(PG_FUNCTION_ARGS)
backupidstr = text_to_cstring(backupid);
startpoint = do_pg_start_backup(backupidstr, fast, NULL);
startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL);
snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X",
(uint32) (startpoint >> 32), (uint32) startpoint);
@ -82,7 +82,7 @@ pg_stop_backup(PG_FUNCTION_ARGS)
XLogRecPtr stoppoint;
char stopxlogstr[MAXFNAMELEN];
stoppoint = do_pg_stop_backup(NULL, true);
stoppoint = do_pg_stop_backup(NULL, true, NULL);
snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X",
(uint32) (stoppoint >> 32), (uint32) stoppoint);

View File

@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces);
static void base_backup_cleanup(int code, Datum arg);
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const void *a, const void *b);
/* Was the backup currently in-progress initiated in recovery mode? */
@ -94,13 +94,16 @@ static void
perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
{
XLogRecPtr startptr;
TimeLineID starttli;
XLogRecPtr endptr;
TimeLineID endtli;
char *labelfile;
backup_started_in_recovery = RecoveryInProgress();
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
SendXlogRecPtrResult(startptr);
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
&labelfile);
SendXlogRecPtrResult(startptr, starttli);
PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
{
@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
}
PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
endptr = do_pg_stop_backup(labelfile, !opt->nowait);
endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
if (opt->includewal)
{
@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* Send CopyDone message for the last tar file */
pq_putemptymessage('c');
}
SendXlogRecPtrResult(endptr);
SendXlogRecPtrResult(endptr, endtli);
}
/*
@ -635,17 +638,15 @@ SendBackupHeader(List *tablespaces)
* XlogRecPtr record (in text format)
*/
static void
SendXlogRecPtrResult(XLogRecPtr ptr)
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
{
StringInfoData buf;
char str[MAXFNAMELEN];
snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_sendint(&buf, 1, 2); /* 1 field */
pq_sendint(&buf, 2, 2); /* 2 fields */
/* Field header */
/* Field headers */
pq_sendstring(&buf, "recptr");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
@ -653,11 +654,29 @@ SendXlogRecPtrResult(XLogRecPtr ptr)
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_sendstring(&buf, "tli");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
/*
* int8 may seem like a surprising data type for this, but in thory int4
* would not be wide enough for this, as TimeLineID is unsigned.
*/
pq_sendint(&buf, INT8OID, 4); /* type oid */
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 1, 2); /* number of columns */
pq_sendint(&buf, 2, 2); /* number of columns */
snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
pq_sendint(&buf, strlen(str), 4); /* length */
pq_sendbytes(&buf, str, strlen(str));
snprintf(str, sizeof(str), "%u", tli);
pq_sendint(&buf, strlen(str), 4); /* length */
pq_sendbytes(&buf, str, strlen(str));
pq_endmessage(&buf);

View File

@ -117,6 +117,7 @@ static uint32 sendOff = 0;
* history forked off from that timeline at sendTimeLineValidUpto.
*/
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
* requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
/*
@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd)
streamingDoneSending = streamingDoneReceiving = false;
/* If there is nothing to stream, don't even enter COPY mode */
if (!sendTimeLineIsHistoric ||
cmd->startpoint < sendTimeLineValidUpto)
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/*
* When we first start replication the standby will be behind the primary.
@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd)
if (walsender_ready_to_stop)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
Assert(streamingDoneSending && streamingDoneReceiving);
}
/* Get out of COPY mode (CommandComplete). */
EndCommand("COPY 0", DestRemote);
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
*/
if (sendTimeLineIsHistoric)
{
char str[11];
snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_sendint(&buf, 1, 2); /* 1 field */
/* Field header */
pq_sendstring(&buf, "next_tli");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
/*
* int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned.
*/
pq_sendint(&buf, INT8OID, 4); /* type oid */
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 1, 2); /* number of columns */
pq_sendint(&buf, strlen(str), 4); /* length */
pq_sendbytes(&buf, str, strlen(str));
pq_endmessage(&buf);
}
/* Send CommandComplete message */
pq_puttextmessage('C', "START_STREAMING");
}
/*
@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup)
List *history;
history = readTimeLineHistory(ThisTimeLineID);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sentPtr <= sendTimeLineValidUpto);
Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history);
/* the current send pointer should be <= the switchpoint */

View File

@ -243,7 +243,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
true))
NULL))
/*
* Any errors will already have been reported in the function process,
@ -1220,7 +1220,7 @@ BaseBackup(void)
{
PGresult *res;
char *sysidentifier;
uint32 timeline;
uint32 starttli;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
@ -1259,7 +1259,6 @@ BaseBackup(void)
disconnect_and_exit(1);
}
sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
timeline = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
/*
@ -1291,18 +1290,24 @@ BaseBackup(void)
progname, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1)
if (PQntuples(res) != 1 || PQnfields(res) < 2)
{
fprintf(stderr, _("%s: no start point returned from server\n"),
progname);
fprintf(stderr,
_("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 2);
disconnect_and_exit(1);
}
strcpy(xlogstart, PQgetvalue(res, 0, 0));
if (verbose && includewal)
fprintf(stderr, "transaction log start point: %s\n", xlogstart);
starttli = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
MemSet(xlogend, 0, sizeof(xlogend));
if (verbose && includewal)
fprintf(stderr, _("transaction log start point: %s on timeline %u\n"),
xlogstart, starttli);
/*
* Get the header
*/
@ -1358,7 +1363,7 @@ BaseBackup(void)
if (verbose)
fprintf(stderr, _("%s: starting background WAL receiver\n"),
progname);
StartLogStreamer(xlogstart, timeline, sysidentifier);
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
/*

View File

@ -39,8 +39,7 @@ volatile bool time_to_abort = false;
static void usage(void);
static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos,
uint32 currenttimeline);
static XLogRecPtr FindStreamingStart(uint32 *tli);
static void StreamLog();
static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished);
@ -70,14 +69,31 @@ usage(void)
}
static bool
stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
{
static uint32 prevtimeline = 0;
static XLogRecPtr prevpos = InvalidXLogRecPtr;
/* we assume that we get called once at the end of each segment */
if (verbose && segment_finished)
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
progname,
(uint32) (segendpos >> 32), (uint32) segendpos,
progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
timeline);
/*
* Note that we report the previous, not current, position here. That's
* the exact location where the timeline switch happend. After the switch,
* we restart streaming from the beginning of the segment, so xlogpos can
* smaller than prevpos if we just switched to new timeline.
*/
if (prevtimeline != 0 && prevtimeline != timeline)
fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
progname, timeline,
(uint32) (prevpos >> 32), (uint32) prevpos);
prevtimeline = timeline;
prevpos = xlogpos;
if (time_to_abort)
{
fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
@ -88,20 +104,19 @@ stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
}
/*
* Determine starting location for streaming, based on:
* 1. If there are existing xlog segments, start at the end of the last one
* that is complete (size matches XLogSegSize)
* 2. If no valid xlog exists, start from the beginning of the current
* WAL segment.
* Determine starting location for streaming, based on any existing xlog
* segments in the directory. We start at the end of the last one that is
* complete (size matches XLogSegSize), on the timeline with highest ID.
*
* If there are no WAL files in the directory, returns InvalidXLogRecPtr.
*/
static XLogRecPtr
FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
FindStreamingStart(uint32 *tli)
{
DIR *dir;
struct dirent *dirent;
int i;
bool b;
XLogSegNo high_segno = 0;
uint32 high_tli = 0;
dir = opendir(basedir);
if (dir == NULL)
@ -120,26 +135,13 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
seg;
XLogSegNo segno;
if (strcmp(dirent->d_name, ".") == 0 ||
strcmp(dirent->d_name, "..") == 0)
continue;
/* xlog files are always 24 characters */
if (strlen(dirent->d_name) != 24)
continue;
/* Filenames are always made out of 0-9 and A-F */
b = false;
for (i = 0; i < 24; i++)
{
if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
!(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
{
b = true;
break;
}
}
if (b)
/*
* Check if the filename looks like an xlog file, or a .partial file.
* Xlog files are always 24 characters, and .partial files are 32
* characters.
*/
if (strlen(dirent->d_name) != 24 ||
!strspn(dirent->d_name, "0123456789ABCDEF") == 24)
continue;
/*
@ -154,10 +156,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
}
segno = ((uint64) log) << 32 | seg;
/* Ignore any files that are for another timeline */
if (tli != currenttimeline)
continue;
/* Check if this is a completed segment or not */
snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
if (stat(fullpath, &statbuf) != 0)
@ -170,9 +168,10 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
if (statbuf.st_size == XLOG_SEG_SIZE)
{
/* Completed segment */
if (segno > high_segno)
if (segno > high_segno || (segno == high_segno && tli > high_tli))
{
high_segno = segno;
high_tli = tli;
continue;
}
}
@ -199,10 +198,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
*tli = high_tli;
return high_ptr;
}
else
return currentpos;
return InvalidXLogRecPtr;
}
/*
@ -212,8 +212,10 @@ static void
StreamLog(void)
{
PGresult *res;
uint32 timeline;
XLogRecPtr startpos;
uint32 starttli;
XLogRecPtr serverpos;
uint32 servertli;
uint32 hi,
lo;
@ -243,7 +245,7 @@ StreamLog(void)
progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1);
}
timeline = atoi(PQgetvalue(res, 0, 1));
servertli = atoi(PQgetvalue(res, 0, 1));
if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
@ -251,13 +253,18 @@ StreamLog(void)
progname, PQgetvalue(res, 0, 2));
disconnect_and_exit(1);
}
startpos = ((uint64) hi) << 32 | lo;
serverpos = ((uint64) hi) << 32 | lo;
PQclear(res);
/*
* Figure out where to start streaming.
*/
startpos = FindStreamingStart(startpos, timeline);
startpos = FindStreamingStart(&starttli);
if (startpos == InvalidXLogRecPtr)
{
startpos = serverpos;
starttli = servertli;
}
/*
* Always start streaming at the beginning of a segment
@ -271,10 +278,10 @@ StreamLog(void)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
progname, (uint32) (startpos >> 32), (uint32) startpos,
timeline);
starttli);
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
stop_streaming, standby_message_timeout, false);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial");
PQfinish(conn);
}

View File

@ -28,19 +28,24 @@
#include "streamutil.h"
/* fd for currently open WAL file */
/* fd and filename for currently open WAL file */
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos);
/*
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
* enough room in this buffer...
* Open a new WAL file in the specified directory.
*
* The file will be padded to 16Mb with zeroes.
* The file will be padded to 16Mb with zeroes. The base filename (without
* partial_suffix) is stored in current_walfile_name.
*/
static int
static bool
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
char *namebuf)
char *partial_suffix)
{
int f;
char fn[MAXPGPATH];
@ -50,16 +55,17 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
XLogFileName(namebuf, timeline, segno);
XLogFileName(current_walfile_name, timeline, segno);
snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
partial_suffix ? partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
fprintf(stderr,
_("%s: could not open transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno));
return -1;
return false;
}
/*
@ -72,17 +78,21 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
_("%s: could not stat transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno));
close(f);
return -1;
return false;
}
if (statbuf.st_size == XLogSegSize)
return f; /* File is open and ready to use */
{
/* File is open and ready to use */
walfile = f;
return true;
}
if (statbuf.st_size != 0)
{
fprintf(stderr,
_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
progname, fn, (int) statbuf.st_size, XLogSegSize);
close(f);
return -1;
return false;
}
/* New, empty, file. So pad it to 16Mb with zeroes */
@ -97,7 +107,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
free(zerobuf);
close(f);
unlink(fn);
return -1;
return false;
}
}
free(zerobuf);
@ -108,42 +118,45 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno));
close(f);
return -1;
return false;
}
return f;
walfile = f;
return true;
}
/*
* Close the current WAL file, and rename it to the correct filename if it's
* complete.
*
* If segment_complete is true, rename the current WAL file even if we've not
* completed writing the whole segment.
* Close the current WAL file (if open), and rename it to the correct
* filename if it's complete. On failure, prints an error message to stderr
* and returns false, otherwise returns true.
*/
static bool
close_walfile(char *basedir, char *walname, bool segment_complete)
close_walfile(char *basedir, char *partial_suffix)
{
off_t currpos = lseek(walfile, 0, SEEK_CUR);
off_t currpos;
if (walfile == -1)
return true;
currpos = lseek(walfile, 0, SEEK_CUR);
if (currpos == -1)
{
fprintf(stderr,
_("%s: could not determine seek position in file \"%s\": %s\n"),
progname, walname, strerror(errno));
progname, current_walfile_name, strerror(errno));
return false;
}
if (fsync(walfile) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, walname, strerror(errno));
progname, current_walfile_name, strerror(errno));
return false;
}
if (close(walfile) != 0)
{
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, walname, strerror(errno));
progname, current_walfile_name, strerror(errno));
walfile = -1;
return false;
}
@ -153,24 +166,24 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
* Rename the .partial file only if we've completed writing the whole
* segment or segment_complete is true.
*/
if (currpos == XLOG_SEG_SIZE || segment_complete)
if (currpos == XLOG_SEG_SIZE && partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
progname, walname, strerror(errno));
progname, current_walfile_name, strerror(errno));
return false;
}
}
else
else if (partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s\", segment is not complete\n"),
progname, walname);
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
return true;
}
@ -233,6 +246,123 @@ localTimestampDifferenceExceeds(int64 start_time,
return (diff >= msec * INT64CONST(1000));
}
/*
* Check if a timeline history file exists.
*/
static bool
existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
{
char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
int fd;
/*
* Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it.
*/
if (tli == 1)
return true;
TLHistoryFileName(histfname, tli);
snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
{
if (errno != ENOENT)
fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s"),
progname, path, strerror(errno));
return false;
}
else
{
close(fd);
return true;
}
}
static bool
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
{
int size = strlen(content);
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
char histfname[MAXFNAMELEN];
int fd;
/*
* Check that the server's idea of how timeline history files should be
* named matches ours.
*/
TLHistoryFileName(histfname, tli);
if (strcmp(histfname, filename) != 0)
{
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s"),
progname, tli, filename);
return false;
}
/*
* Write into a temp file name.
*/
snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
unlink(tmppath);
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
{
fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s"),
progname, tmppath, strerror(errno));
return false;
}
errno = 0;
if ((int) write(fd, content, size) != size)
{
int save_errno = errno;
/*
* If we fail to make the file, delete it to release disk space
*/
unlink(tmppath);
errno = save_errno;
fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s"),
progname, tmppath, strerror(errno));
return false;
}
if (fsync(fd) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false;
}
if (close(fd) != 0)
{
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false;
}
/*
* Now move the completed history file into place with its final name.
*/
snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
if (rename(tmppath, path) < 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
progname, tmppath, path, strerror(errno));
return false;
}
return true;
}
/*
* Converts an int64 to network byte order.
*/
@ -314,7 +444,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
* (by sending an extra IDENTIFY_SYSTEM command)
*
* All received segments will be written to the directory
* specified by basedir.
* specified by basedir. This will also fetch any missing timeline history
* files.
*
* The stream_stop callback will be called every time data
* is received, and whenever a segment is completed. If it returns
@ -327,20 +458,22 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
* This message will only contain the write location, and never
* flush or replay.
*
* If 'partial_suffix' is not NULL, files are initially created with the
* given suffix, and the suffix is removed once the file is finished. That
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, bool rename_partial)
int standby_message_timeout, char *partial_suffix)
{
char query[128];
char current_walfile_name[MAXPGPATH];
PGresult *res;
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = InvalidXLogRecPtr;
XLogRecPtr stoppos;
/*
* The message format used in streaming replication changed in 9.3, so we
@ -359,7 +492,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (sysidentifier != NULL)
{
/* Validate system identifier and timeline hasn't changed */
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
@ -385,33 +518,184 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
if (timeline != atoi(PQgetvalue(res, 0, 1)))
if (timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: timeline does not match between base backup and streaming connection\n"),
progname);
_("%s: starting timeline %u is not present in the server\n"),
progname, timeline);
PQclear(res);
return false;
}
PQclear(res);
}
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %X/%X",
(uint32) (startpos >> 32), (uint32) startpos);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
while (1)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "START_REPLICATION", PQresultErrorMessage(res));
PQclear(res);
return false;
}
PQclear(res);
/*
* Fetch the timeline history file for this timeline, if we don't
* have it already.
*/
if (!existsTimeLineHistoryFile(basedir, timeline))
{
snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
/* FIXME: we might send it ok, but get an error */
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
PQclear(res);
return false;
}
/*
* The response to TIMELINE_HISTORY is a single row result set
* with two fields: filename and content
*/
if (PQnfields(res) != 2 || PQntuples(res) != 1)
{
fprintf(stderr,
_("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 2);
}
/* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
PQgetvalue(res, 0, 1));
PQclear(res);
}
/*
* Before we start streaming from the requested location, check
* if the callback tells us to stop here.
*/
if (stream_stop(startpos, timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "START_REPLICATION", PQresultErrorMessage(res));
PQclear(res);
return false;
}
PQclear(res);
/* Stream the WAL */
if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
&stoppos))
goto error;
/*
* Streaming finished.
*
* There are two possible reasons for that: a controlled shutdown,
* or we reached the end of the current timeline. In case of
* end-of-timeline, the server sends a result set after Copy has
* finished, containing the next timeline's ID. Read that, and
* restart streaming from the next timeline.
*/
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
* End-of-timeline. Read the next timeline's ID.
*/
uint32 newtimeline;
newtimeline = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
if (newtimeline <= timeline)
{
/* shouldn't happen */
fprintf(stderr,
"server reported unexpected next timeline %u, following timeline %u\n",
newtimeline, timeline);
goto error;
}
/* Read the final result, which should be CommandComplete. */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
PQclear(res);
/*
* Loop back to start streaming from the new timeline.
* Always start streaming at the beginning of a segment.
*/
timeline = newtimeline;
startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/*
* End of replication (ie. controlled shut down of the server).
*
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
if (stream_stop(stoppos, timeline, false))
return true;
else
{
fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
progname);
goto error;
}
}
else
{
/* Server returned an error. */
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
}
error:
if (walfile != -1 && close(walfile) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno));
walfile = -1;
return false;
}
/*
* The main loop of ReceiveXLogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*
* If the COPY ends normally, returns true and sets *stoppos to the last
* byte written. On error, returns false.
*/
static bool
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = startpos;
bool still_sending = true;
/*
* Receive the actual xlog data
*/
while (1)
{
int r;
@ -430,20 +714,27 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming, or abort at this point.
*/
if (stream_stop && stream_stop(blockpos, timeline, false))
if (still_sending && stream_stop(blockpos, timeline, false))
{
if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
rename_partial))
if (!close_walfile(basedir, partial_suffix))
{
/* Potential error message is written by close_walfile */
goto error;
return true;
}
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
goto error;
}
still_sending = false;
}
/*
* Potentially send a status message to the master
*/
now = localGetCurrentTimestamp();
if (standby_message_timeout > 0 &&
if (still_sending && standby_message_timeout > 0 &&
localTimestampDifferenceExceeds(last_status, now,
standby_message_timeout))
{
@ -457,9 +748,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (r == 0)
{
/*
* In async mode, and no data available. We block on reading but
* not more than the specified timeout, so that we can send a
* response back to the client.
* No data available. Wait for some to appear, but not longer
* than the specified timeout, so that we can ping the server.
*/
fd_set input_mask;
struct timeval timeout;
@ -467,7 +757,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
FD_ZERO(&input_mask);
FD_SET(PQsocket(conn), &input_mask);
if (standby_message_timeout)
if (standby_message_timeout && still_sending)
{
int64 targettime;
long secs;
@ -493,8 +783,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
{
/*
* Got a timeout or signal. Continue the loop and either
* deliver a status packet to the server or just go back into
* blocking.
* deliver a status packet to the server or just go back
* into blocking.
*/
continue;
}
@ -515,8 +805,31 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
continue;
}
if (r == -1)
/* End of copy stream */
break;
{
/*
* The server closed its end of the copy stream. Close ours
* if we haven't done so already, and exit.
*/
if (still_sending)
{
if (!close_walfile(basedir, partial_suffix))
{
/* Error message written in close_walfile() */
goto error;
}
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
goto error;
}
still_sending = false;
}
if (copybuf != NULL)
PQfreemem(copybuf);
*stoppos = blockpos;
return true;
}
if (r == -2)
{
fprintf(stderr, _("%s: could not read COPY data: %s"),
@ -548,174 +861,148 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
replyRequested = copybuf[pos];
/* If the server requested an immediate reply, send one. */
if (replyRequested)
if (replyRequested && still_sending)
{
now = localGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
}
continue;
}
else if (copybuf[0] != 'w')
else if (copybuf[0] == 'w')
{
/*
* Once we've decided we don't want to receive any more, just
* ignore any subsequent XLogData messages.
*/
if (!still_sending)
continue;
/*
* Read the header of the XLogData message, enclosed in the
* CopyData message. We only need the WAL location field
* (dataStart), the rest of the header is ignored.
*/
hdr_len = 1; /* msgtype 'w' */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
if (r < hdr_len + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error;
}
blockpos = recvint64(&copybuf[1]);
/* Extract WAL location for this block */
xlogoff = blockpos % XLOG_SEG_SIZE;
/*
* Verify that the initial location in the stream matches where
* we think we are.
*/
if (walfile == -1)
{
/* No file open yet */
if (xlogoff != 0)
{
fprintf(stderr,
_("%s: received transaction log record for offset %u with no file open\n"),
progname, xlogoff);
goto error;
}
}
else
{
/* More data in existing segment */
/* XXX: store seek value don't reseek all the time */
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
}
bytes_left = r - hdr_len;
bytes_written = 0;
while (bytes_left)
{
int bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach
* XLOG_SEG_SIZE.
*/
if (xlogoff + bytes_left > XLOG_SEG_SIZE)
bytes_to_write = XLOG_SEG_SIZE - xlogoff;
else
bytes_to_write = bytes_left;
if (walfile == -1)
{
if (!open_walfile(blockpos, timeline,
basedir, partial_suffix))
{
/* Error logged by open_walfile */
goto error;
}
}
if (write(walfile,
copybuf + hdr_len + bytes_written,
bytes_to_write) != bytes_to_write)
{
fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name,
strerror(errno));
goto error;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
blockpos += bytes_to_write;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
if (!close_walfile(basedir, partial_suffix))
/* Error message written in close_walfile() */
goto error;
xlogoff = 0;
if (still_sending && stream_stop(blockpos, timeline, false))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
goto error;
}
still_sending = false;
break; /* ignore the rest of this XLogData packet */
}
}
}
/* No more data left to write, receive next copy packet */
}
else
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
goto error;
}
/*
* Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
* of the header is ignored.
*/
hdr_len = 1; /* msgtype 'w' */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
if (r < hdr_len + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error;
}
blockpos = recvint64(&copybuf[1]);
/* Extract WAL location for this block */
xlogoff = blockpos % XLOG_SEG_SIZE;
/*
* Verify that the initial location in the stream matches where we
* think we are.
*/
if (walfile == -1)
{
/* No file open yet */
if (xlogoff != 0)
{
fprintf(stderr,
_("%s: received transaction log record for offset %u with no file open\n"),
progname, xlogoff);
goto error;
}
}
else
{
/* More data in existing segment */
/* XXX: store seek value don't reseek all the time */
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
}
bytes_left = r - hdr_len;
bytes_written = 0;
while (bytes_left)
{
int bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach
* XLOG_SEG_SIZE.
*/
if (xlogoff + bytes_left > XLOG_SEG_SIZE)
bytes_to_write = XLOG_SEG_SIZE - xlogoff;
else
bytes_to_write = bytes_left;
if (walfile == -1)
{
walfile = open_walfile(blockpos, timeline,
basedir, current_walfile_name);
if (walfile == -1)
/* Error logged by open_walfile */
goto error;
}
if (write(walfile,
copybuf + hdr_len + bytes_written,
bytes_to_write) != bytes_to_write)
{
fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name,
strerror(errno));
goto error;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
blockpos += bytes_to_write;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
if (!close_walfile(basedir, current_walfile_name, false))
/* Error message written in close_walfile() */
goto error;
xlogoff = 0;
if (stream_stop != NULL)
{
/*
* Callback when the segment finished, and return if it
* told us to.
*/
if (stream_stop(blockpos, timeline, true))
return true;
}
}
}
/* No more data left to write, start receiving next copy packet */
}
/*
* The only way to get out of the loop is if the server shut down the
* replication stream. If it's a controlled shutdown, the server will send
* a shutdown message, and we'll return the latest xlog location that has
* been streamed.
*/
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
PQclear(res);
/* Complain if we've not reached stop point yet */
if (stream_stop != NULL && !stream_stop(blockpos, timeline, false))
{
fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
progname);
goto error;
}
if (copybuf != NULL)
PQfreemem(copybuf);
if (walfile != -1 && close(walfile) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno));
walfile = -1;
return true;
error:
if (copybuf != NULL)
PQfreemem(copybuf);
if (walfile != -1 && close(walfile) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno));
walfile = -1;
return false;
}

View File

@ -13,4 +13,4 @@ extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
bool rename_partial);
char *partial_suffix);

View File

@ -37,6 +37,7 @@ extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size);
extern bool tliInHistory(TimeLineID tli, List *expectedTLIs);
extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history);
extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history);
extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history,
TimeLineID *nextTLI);
#endif /* TIMELINE_H */

View File

@ -317,8 +317,10 @@ extern void SetWalWriterSleeping(bool sleeping);
/*
* Starting/stopping a base backup
*/
extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile);
extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive);
extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
TimeLineID *starttli_p, char **labelfile);
extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
TimeLineID *stoptli_p);
extern void do_pg_abort_backup(void);
/* File path names (all relative to $PGDATA) */