From a5a02a744555789ab8390dbf57271e9d07127602 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 13 Jan 2011 17:51:28 +0200 Subject: [PATCH] Fix the logic in libpqrcv_receive() to determine if there's any incoming data that can be read without blocking. It used to conclude that there isn't, even though there was data in the socket receive buffer. That lead walreceiver to flush the WAL after every received chunk, potentially causing big performance issues. Backpatch to 9.0, because the performance impact can be very significant. --- .../libpqwalreceiver/libpqwalreceiver.c | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 5aac85d2ee..c052df242f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -41,7 +41,6 @@ void _PG_init(void); /* Current connection to the primary, if any */ static PGconn *streamConn = NULL; -static bool justconnected = false; /* Buffer for currently read records */ static char *recvBuf = NULL; @@ -168,7 +167,6 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) } PQclear(res); - justconnected = true; ereport(LOG, (errmsg("streaming replication successfully connected to primary"))); @@ -321,7 +319,6 @@ libpqrcv_disconnect(void) { PQfinish(streamConn); streamConn = NULL; - justconnected = false; } /* @@ -351,28 +348,30 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) PQfreemem(recvBuf); recvBuf = NULL; - /* - * If the caller requested to block, wait for data to arrive. But if this - * is the first call after connecting, don't wait, because there might - * already be some data in libpq buffer that we haven't returned to - * caller. - */ - if (timeout > 0 && !justconnected) + /* Try to receive a CopyData message */ + rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + if (rawlen == 0) { - if (!libpq_select(timeout)) - return false; + /* + * No data available yet. If the caller requested to block, wait for + * more data to arrive. + */ + if (timeout > 0) + { + if (!libpq_select(timeout)) + return false; + } if (PQconsumeInput(streamConn) == 0) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", PQerrorMessage(streamConn)))); - } - justconnected = false; - /* Receive CopyData message */ - rawlen = PQgetCopyData(streamConn, &recvBuf, 1); - if (rawlen == 0) /* no data available yet, then return */ - return false; + /* Now that we've consumed some input, try again */ + rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + if (rawlen == 0) + return false; + } if (rawlen == -1) /* end-of-streaming or error */ { PGresult *res;