diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index d32626677d..1e2604b832 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1031,8 +1031,13 @@ goes into copy-in mode, and the server may not send any more CopyData messages. After both sides have sent a CopyDone message, the copy mode is terminated, and the backend reverts to the command-processing mode. - See for more information on the - subprotocol transmitted over copy-both mode. + In the event of a backend-detected error during copy-both mode, + the backend will issue an ErrorResponse message, discard frontend messages + until a Sync message is received, and then issue ReadyForQuery and return + to normal processing. The frontend should treat receipt of ErrorResponse + as terminating the copy in both directions; no CopyDone should be sent + in this case. See for more + information on the subprotocol transmitted over copy-both mode. diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index b681efc7e6..f297003c62 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -32,10 +32,10 @@ static int walfile = -1; static char current_walfile_name[MAXPGPATH] = ""; -static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos); +static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, + uint32 timeline, char *basedir, + stream_stop_callback stream_stop, int standby_message_timeout, + char *partial_suffix, XLogRecPtr *stoppos); /* * Open a new WAL file in the specified directory. @@ -615,9 +615,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); /* Stream the WAL */ - if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, - standby_message_timeout, partial_suffix, - &stoppos)) + res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, + standby_message_timeout, partial_suffix, + &stoppos); + if (res == NULL) goto error; /* @@ -630,7 +631,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * restart streaming from the next timeline. */ - res = PQgetResult(conn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -708,10 +708,11 @@ error: * The main loop of ReceiveXLogStream. Handles the COPY stream after * initiating streaming with the START_STREAMING command. * - * If the COPY ends normally, returns true and sets *stoppos to the last - * byte written. On error, returns false. + * If the COPY ends (not necessarily successfully) due a message from the + * server, returns a PGresult and sets sets *stoppos to the last byte written. + * On any other sort of error, returns NULL. */ -static bool +static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, @@ -832,9 +833,12 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } if (r == -1) { + PGresult *res = PQgetResult(conn); + /* - * The server closed its end of the copy stream. Close ours - * if we haven't done so already, and exit. + * The server closed its end of the copy stream. If we haven't + * closed ours already, we need to do so now, unless the server + * threw an error, in which case we don't. */ if (still_sending) { @@ -843,18 +847,23 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Error message written in close_walfile() */ goto error; } - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + if (PQresultStatus(res) == PGRES_COPY_IN) { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); - goto error; + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, + _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + res = PQgetResult(conn); } still_sending = false; } if (copybuf != NULL) PQfreemem(copybuf); *stoppos = blockpos; - return true; + return res; } if (r == -2) { @@ -1030,5 +1039,5 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, error: if (copybuf != NULL) PQfreemem(copybuf); - return false; + return NULL; } diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 7fa090adf3..a7d4f40d38 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1466,7 +1466,23 @@ getCopyDataMessage(PGconn *conn) break; case 'd': /* Copy Data, pass it back to caller */ return msgLength; + case 'c': + /* + * If this is a CopyDone message, exit COPY_OUT mode and let + * caller read status with PQgetResult(). If we're in + * COPY_BOTH mode, return to COPY_IN mode. + */ + if (conn->asyncStatus == PGASYNC_COPY_BOTH) + conn->asyncStatus = PGASYNC_COPY_IN; + else + conn->asyncStatus = PGASYNC_BUSY; + return -1; default: /* treat as end of copy */ + /* + * Any other message terminates either COPY_IN or COPY_BOTH + * mode. + */ + conn->asyncStatus = PGASYNC_BUSY; return -1; } @@ -1499,22 +1515,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) */ msgLength = getCopyDataMessage(conn); if (msgLength < 0) - { - /* - * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller - * read status with PQgetResult(). The normal case is that it's - * Copy Done, but we let parseInput read that. If error, we - * expect the state was already changed. - */ - if (msgLength == -1) - { - if (conn->asyncStatus == PGASYNC_COPY_BOTH) - conn->asyncStatus = PGASYNC_COPY_IN; - else - conn->asyncStatus = PGASYNC_BUSY; - } return msgLength; /* end-of-copy or error */ - } if (msgLength == 0) { /* Don't block if async read requested */