Attempt to fix error recovery in COPY BOTH mode.

Previously, libpq and the backend had opposite ideas about whether
it was necessary for the client to send a CopyDone message after
receiving an ErrorResponse, making it impossible to cleanly exit
COPY BOTH mode.  Fix libpq so that works correctly, adopting the
backend's notion that an ErrorResponse kills the copy in both
directions.

Adjust receivelog.c to avoid a degradation in the quality of the
resulting error messages.  libpqwalreceiver.c is already doing
the right thing, so no adjustment needed there.

Add an explicit statement to the documentation explaining how
this part of the protocol is supposed to work, in the hopes of
avoiding future confusion in this area.

Since the consequences of all this confusion are very limited,
especially in the back-branches where no client ever attempts
to exit COPY BOTH mode without closing the connection entirely,
no back-patch.
This commit is contained in:
Robert Haas 2013-04-29 06:29:32 -04:00
parent 43e7a66849
commit 91fa8532f4
3 changed files with 51 additions and 36 deletions

View File

@ -1031,8 +1031,13 @@
goes into copy-in mode, and the server may not send any more CopyData
messages. After both sides have sent a CopyDone message, the copy mode
is terminated, and the backend reverts to the command-processing mode.
See <xref linkend="protocol-replication"> for more information on the
subprotocol transmitted over copy-both mode.
In the event of a backend-detected error during copy-both mode,
the backend will issue an ErrorResponse message, discard frontend messages
until a Sync message is received, and then issue ReadyForQuery and return
to normal processing. The frontend should treat receipt of ErrorResponse
as terminating the copy in both directions; no CopyDone should be sent
in this case. See <xref linkend="protocol-replication"> for more
information on the subprotocol transmitted over copy-both mode.
</para>
<para>

View File

@ -32,10 +32,10 @@
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos);
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
/*
* Open a new WAL file in the specified directory.
@ -615,9 +615,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
/* Stream the WAL */
if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
&stoppos))
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
&stoppos);
if (res == NULL)
goto error;
/*
@ -630,7 +631,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* restart streaming from the next timeline.
*/
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
@ -708,10 +708,11 @@ error:
* The main loop of ReceiveXLogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*
* If the COPY ends normally, returns true and sets *stoppos to the last
* byte written. On error, returns false.
* If the COPY ends (not necessarily successfully) due a message from the
* server, returns a PGresult and sets sets *stoppos to the last byte written.
* On any other sort of error, returns NULL.
*/
static bool
static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
@ -832,9 +833,12 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
if (r == -1)
{
PGresult *res = PQgetResult(conn);
/*
* The server closed its end of the copy stream. Close ours
* if we haven't done so already, and exit.
* The server closed its end of the copy stream. If we haven't
* closed ours already, we need to do so now, unless the server
* threw an error, in which case we don't.
*/
if (still_sending)
{
@ -843,18 +847,23 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Error message written in close_walfile() */
goto error;
}
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
if (PQresultStatus(res) == PGRES_COPY_IN)
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
goto error;
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr,
_("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
goto error;
}
res = PQgetResult(conn);
}
still_sending = false;
}
if (copybuf != NULL)
PQfreemem(copybuf);
*stoppos = blockpos;
return true;
return res;
}
if (r == -2)
{
@ -1030,5 +1039,5 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
error:
if (copybuf != NULL)
PQfreemem(copybuf);
return false;
return NULL;
}

View File

@ -1466,7 +1466,23 @@ getCopyDataMessage(PGconn *conn)
break;
case 'd': /* Copy Data, pass it back to caller */
return msgLength;
case 'c':
/*
* If this is a CopyDone message, exit COPY_OUT mode and let
* caller read status with PQgetResult(). If we're in
* COPY_BOTH mode, return to COPY_IN mode.
*/
if (conn->asyncStatus == PGASYNC_COPY_BOTH)
conn->asyncStatus = PGASYNC_COPY_IN;
else
conn->asyncStatus = PGASYNC_BUSY;
return -1;
default: /* treat as end of copy */
/*
* Any other message terminates either COPY_IN or COPY_BOTH
* mode.
*/
conn->asyncStatus = PGASYNC_BUSY;
return -1;
}
@ -1499,22 +1515,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
*/
msgLength = getCopyDataMessage(conn);
if (msgLength < 0)
{
/*
* On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
* read status with PQgetResult(). The normal case is that it's
* Copy Done, but we let parseInput read that. If error, we
* expect the state was already changed.
*/
if (msgLength == -1)
{
if (conn->asyncStatus == PGASYNC_COPY_BOTH)
conn->asyncStatus = PGASYNC_COPY_IN;
else
conn->asyncStatus = PGASYNC_BUSY;
}
return msgLength; /* end-of-copy or error */
}
if (msgLength == 0)
{
/* Don't block if async read requested */