diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index a260881517..d28e13b4d8 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -31,12 +31,23 @@ static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; +static bool still_sending = true; /* feedback still needs to be sent? */ + static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); +static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr blockpos, int64 *last_status); +static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr *blockpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + char *partial_suffix); +static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, + XLogRecPtr blockpos, char *basedir, char *partial_suffix, + XLogRecPtr *stoppos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); @@ -740,16 +751,13 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; - bool still_sending = true; + + still_sending = true; while (1) { int r; - int xlogoff; - int bytes_left; - int bytes_written; int64 now; - int hdr_len; long sleeptime; /* @@ -818,198 +826,26 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; if (r == -2) { - PGresult *res = PQgetResult(conn); - - /* - * The server closed its end of the copy stream. If we haven't - * closed ours already, we need to do so now, unless the server - * threw an error, in which case we don't. - */ - if (still_sending) - { - if (!close_walfile(basedir, partial_suffix, blockpos)) - { - /* Error message written in close_walfile() */ - PQclear(res); - goto error; - } - if (PQresultStatus(res) == PGRES_COPY_IN) - { - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) - { - fprintf(stderr, - _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); - PQclear(res); - goto error; - } - PQclear(res); - res = PQgetResult(conn); - } - still_sending = false; - } - if (copybuf != NULL) - PQfreemem(copybuf); - copybuf = NULL; - *stoppos = blockpos; - return res; + PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, + basedir, partial_suffix, stoppos); + if (res == NULL) + goto error; + else + return res; } /* Check the message type. */ if (copybuf[0] == 'k') { - int pos; - bool replyRequested; - - /* - * Parse the keepalive message, enclosed in the CopyData message. - * We just check if the server requested a reply, and ignore the - * rest. - */ - pos = 1; /* skip msgtype 'k' */ - pos += 8; /* skip walEnd */ - pos += 8; /* skip sendTime */ - - if (r < pos + 1) - { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); + if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, + &last_status)) goto error; - } - replyRequested = copybuf[pos]; - - /* If the server requested an immediate reply, send one. */ - if (replyRequested && still_sending) - { - now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, blockpos, now, false)) - goto error; - last_status = now; - } } else if (copybuf[0] == 'w') { - /* - * Once we've decided we don't want to receive any more, just - * ignore any subsequent XLogData messages. - */ - if (!still_sending) - continue; - - /* - * Read the header of the XLogData message, enclosed in the - * CopyData message. We only need the WAL location field - * (dataStart), the rest of the header is ignored. - */ - hdr_len = 1; /* msgtype 'w' */ - hdr_len += 8; /* dataStart */ - hdr_len += 8; /* walEnd */ - hdr_len += 8; /* sendTime */ - if (r < hdr_len) - { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); + if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, + timeline, basedir, stream_stop, partial_suffix)) goto error; - } - blockpos = fe_recvint64(©buf[1]); - - /* Extract WAL location for this block */ - xlogoff = blockpos % XLOG_SEG_SIZE; - - /* - * Verify that the initial location in the stream matches where we - * think we are. - */ - if (walfile == -1) - { - /* No file open yet */ - if (xlogoff != 0) - { - fprintf(stderr, - _("%s: received transaction log record for offset %u with no file open\n"), - progname, xlogoff); - goto error; - } - } - else - { - /* More data in existing segment */ - /* XXX: store seek value don't reseek all the time */ - if (lseek(walfile, 0, SEEK_CUR) != xlogoff) - { - fprintf(stderr, - _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); - goto error; - } - } - - bytes_left = r - hdr_len; - bytes_written = 0; - - while (bytes_left) - { - int bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach - * XLOG_SEG_SIZE. - */ - if (xlogoff + bytes_left > XLOG_SEG_SIZE) - bytes_to_write = XLOG_SEG_SIZE - xlogoff; - else - bytes_to_write = bytes_left; - - if (walfile == -1) - { - if (!open_walfile(blockpos, timeline, - basedir, partial_suffix)) - { - /* Error logged by open_walfile */ - goto error; - } - } - - if (write(walfile, - copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) - { - fprintf(stderr, - _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), - progname, bytes_to_write, current_walfile_name, - strerror(errno)); - goto error; - } - - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - blockpos += bytes_to_write; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if (blockpos % XLOG_SEG_SIZE == 0) - { - if (!close_walfile(basedir, partial_suffix, blockpos)) - /* Error message written in close_walfile() */ - goto error; - - xlogoff = 0; - - if (still_sending && stream_stop(blockpos, timeline, true)) - { - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) - { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); - goto error; - } - still_sending = false; - break; /* ignore the rest of this XLogData packet */ - } - } - } - /* No more data left to write, receive next copy packet */ } else { @@ -1135,3 +971,225 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer) *buffer = copybuf; return rawlen; } + +/* + * Process the keepalive message. + */ +static bool +ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr blockpos, int64 *last_status) +{ + int pos; + bool replyRequested; + int64 now; + + /* + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. + */ + pos = 1; /* skip msgtype 'k' */ + pos += 8; /* skip walEnd */ + pos += 8; /* skip sendTime */ + + if (len < pos + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, len); + return false; + } + replyRequested = copybuf[pos]; + + /* If the server requested an immediate reply, send one. */ + if (replyRequested && still_sending) + { + now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, blockpos, now, false)) + return false; + *last_status = now; + } + + return true; +} + +/* + * Process XLogData message. + */ +static bool +ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr *blockpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + char *partial_suffix) +{ + int xlogoff; + int bytes_left; + int bytes_written; + int hdr_len; + + /* + * Once we've decided we don't want to receive any more, just + * ignore any subsequent XLogData messages. + */ + if (!(still_sending)) + return true; + + /* + * Read the header of the XLogData message, enclosed in the + * CopyData message. We only need the WAL location field + * (dataStart), the rest of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (len < hdr_len) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, len); + return false; + } + *blockpos = fe_recvint64(©buf[1]); + + /* Extract WAL location for this block */ + xlogoff = *blockpos % XLOG_SEG_SIZE; + + /* + * Verify that the initial location in the stream matches where we + * think we are. + */ + if (walfile == -1) + { + /* No file open yet */ + if (xlogoff != 0) + { + fprintf(stderr, + _("%s: received transaction log record for offset %u with no file open\n"), + progname, xlogoff); + return false; + } + } + else + { + /* More data in existing segment */ + /* XXX: store seek value don't reseek all the time */ + if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + { + fprintf(stderr, + _("%s: got WAL data offset %08x, expected %08x\n"), + progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + return false; + } + } + + bytes_left = len - hdr_len; + bytes_written = 0; + + while (bytes_left) + { + int bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach + * XLOG_SEG_SIZE. + */ + if (xlogoff + bytes_left > XLOG_SEG_SIZE) + bytes_to_write = XLOG_SEG_SIZE - xlogoff; + else + bytes_to_write = bytes_left; + + if (walfile == -1) + { + if (!open_walfile(*blockpos, timeline, + basedir, partial_suffix)) + { + /* Error logged by open_walfile */ + return false; + } + } + + if (write(walfile, + copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) + { + fprintf(stderr, + _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), + progname, bytes_to_write, current_walfile_name, + strerror(errno)); + return false; + } + + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + *blockpos += bytes_to_write; + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? */ + if (*blockpos % XLOG_SEG_SIZE == 0) + { + if (!close_walfile(basedir, partial_suffix, *blockpos)) + /* Error message written in close_walfile() */ + return false; + + xlogoff = 0; + + if (still_sending && stream_stop(*blockpos, timeline, true)) + { + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + still_sending = false; + return true; /* ignore the rest of this XLogData packet */ + } + } + } + /* No more data left to write, receive next copy packet */ + + return true; +} + +/* + * Handle end of the copy stream. + */ +static PGresult * +HandleEndOfCopyStream(PGconn *conn, char *copybuf, + XLogRecPtr blockpos, char *basedir, char *partial_suffix, + XLogRecPtr *stoppos) +{ + PGresult *res = PQgetResult(conn); + + /* + * The server closed its end of the copy stream. If we haven't + * closed ours already, we need to do so now, unless the server + * threw an error, in which case we don't. + */ + if (still_sending) + { + if (!close_walfile(basedir, partial_suffix, blockpos)) + { + /* Error message written in close_walfile() */ + PQclear(res); + return NULL; + } + if (PQresultStatus(res) == PGRES_COPY_IN) + { + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, + _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + PQclear(res); + return NULL; + } + res = PQgetResult(conn); + } + still_sending = false; + } + if (copybuf != NULL) + PQfreemem(copybuf); + *stoppos = blockpos; + return res; +}