diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 5aac85d2ee..c052df242f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -41,7 +41,6 @@ void _PG_init(void); /* Current connection to the primary, if any */ static PGconn *streamConn = NULL; -static bool justconnected = false; /* Buffer for currently read records */ static char *recvBuf = NULL; @@ -168,7 +167,6 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) } PQclear(res); - justconnected = true; ereport(LOG, (errmsg("streaming replication successfully connected to primary"))); @@ -321,7 +319,6 @@ libpqrcv_disconnect(void) { PQfinish(streamConn); streamConn = NULL; - justconnected = false; } /* @@ -351,28 +348,30 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) PQfreemem(recvBuf); recvBuf = NULL; - /* - * If the caller requested to block, wait for data to arrive. But if this - * is the first call after connecting, don't wait, because there might - * already be some data in libpq buffer that we haven't returned to - * caller. - */ - if (timeout > 0 && !justconnected) + /* Try to receive a CopyData message */ + rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + if (rawlen == 0) { - if (!libpq_select(timeout)) - return false; + /* + * No data available yet. If the caller requested to block, wait for + * more data to arrive. + */ + if (timeout > 0) + { + if (!libpq_select(timeout)) + return false; + } if (PQconsumeInput(streamConn) == 0) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", PQerrorMessage(streamConn)))); - } - justconnected = false; - /* Receive CopyData message */ - rawlen = PQgetCopyData(streamConn, &recvBuf, 1); - if (rawlen == 0) /* no data available yet, then return */ - return false; + /* Now that we've consumed some input, try again */ + rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + if (rawlen == 0) + return false; + } if (rawlen == -1) /* end-of-streaming or error */ { PGresult *res;