From ea5516081dcbdc146ae0b3314a88cda2c48b0ca5 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 26 May 2010 22:21:33 +0000 Subject: [PATCH] In walsender, don't sleep if there's outstanding WAL waiting to be sent, otherwise we effectively rate-limit the streaming as pointed out by Simon Riggs. Also, send the WAL in smaller chunks, to respond to signals more promptly. --- src/backend/replication/walsender.c | 193 +++++++++++++++------------- 1 file changed, 102 insertions(+), 91 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 22272205cd..0d976f5b11 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -30,7 +30,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.20 2010/05/09 18:11:55 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.21 2010/05/26 22:21:33 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -100,13 +100,19 @@ static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); -static bool XLogSend(StringInfo outMsg); +static bool XLogSend(StringInfo outMsg, bool *caughtup); static void CheckClosedConnection(void); /* * How much WAL to send in one message? Must be >= XLOG_BLCKSZ. + * + * We don't have a good idea of what a good value would be; there's some + * overhead per message in both walsender and walreceiver, but on the other + * hand sending large batches makes walsender less responsive to signals + * because signals are checked only between messages. 128kB seems like + * a reasonable guess for now. 
*/ -#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2) +#define MAX_SEND_SIZE (128 * 1024) /* Main entry point for walsender process */ int @@ -360,6 +366,7 @@ static int WalSndLoop(void) { StringInfoData output_message; + bool caughtup = false; initStringInfo(&output_message); @@ -387,7 +394,7 @@ WalSndLoop(void) */ if (ready_to_stop) { - XLogSend(&output_message); + XLogSend(&output_message, &caughtup); shutdown_requested = true; } @@ -402,31 +409,32 @@ WalSndLoop(void) } /* - * Nap for the configured time or until a message arrives. + * If we had sent all accumulated WAL in last round, nap for the + * configured time before retrying. * * On some platforms, signals won't interrupt the sleep. To ensure we * respond reasonably promptly when someone signals us, break down the * sleep into NAPTIME_PER_CYCLE increments, and check for * interrupts after each nap. */ - remain = WalSndDelay * 1000L; - while (remain > 0) + if (caughtup) { - if (got_SIGHUP || shutdown_requested || ready_to_stop) - break; + remain = WalSndDelay * 1000L; + while (remain > 0) + { + /* Check for interrupts */ + if (got_SIGHUP || shutdown_requested || ready_to_stop) + break; - /* - * Check to see whether a message from the standby or an interrupt - * from other processes has arrived. - */ - pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); - CheckClosedConnection(); + /* Sleep and check that the connection is still alive */ + pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain); + CheckClosedConnection(); - remain -= NAPTIME_PER_CYCLE; + remain -= NAPTIME_PER_CYCLE; + } } - /* Attempt to send the log once every loop */ - if (!XLogSend(&output_message)) + if (!XLogSend(&output_message, &caughtup)) goto eof; } @@ -623,15 +631,20 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) } /* - * Read all WAL that's been written (and flushed) since last cycle, and send - * it to client. 
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed), + * but not yet sent to the client, and send it. If there is no unsent WAL, + * *caughtup is set to true and nothing is sent, otherwise *caughtup is set + * to false. * * Returns true if OK, false if trouble. */ static bool -XLogSend(StringInfo outMsg) +XLogSend(StringInfo outMsg, bool *caughtup) { XLogRecPtr SendRqstPtr; + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; char activitymsg[50]; /* use volatile pointer to prevent code rearrangement */ @@ -642,85 +655,83 @@ XLogSend(StringInfo outMsg) /* Quick exit if nothing to do */ if (!XLByteLT(sentPtr, SendRqstPtr)) + { + *caughtup = true; return true; + } + /* + * Otherwise let the caller know that we're not fully caught up. Unless + * there's a huge backlog, we'll be caught up to the current WriteRecPtr + * after we've sent everything below, but more WAL could accumulate while + * we're busy sending. + */ + *caughtup = false; /* - * We gather multiple records together by issuing just one XLogRead() of a - * suitable size, and send them as one CopyData message. Repeat until - * we've sent everything we can. + * Figure out how much to send in one message. If there's less than + * MAX_SEND_SIZE bytes to send, send everything. Otherwise send + * MAX_SEND_SIZE bytes, but round to page boundary. + * + * The rounding is not only for performance reasons. Walreceiver + * relies on the fact that we never split a WAL record across two + * messages. Since a long WAL record is split at page boundary into + * continuation records, page boundary is always a safe cut-off point. + * We also assume that SendRqstPtr never points in the middle of a WAL + * record. */ - while (XLByteLT(sentPtr, SendRqstPtr)) + startptr = sentPtr; + if (startptr.xrecoff >= XLogFileSize) { - XLogRecPtr startptr; - XLogRecPtr endptr; - Size nbytes; - /* - * Figure out how much to send in one message. 
If there's less than - * MAX_SEND_SIZE bytes to send, send everything. Otherwise send - * MAX_SEND_SIZE bytes, but round to page boundary. - * - * The rounding is not only for performance reasons. Walreceiver - * relies on the fact that we never split a WAL record across two - * messages. Since a long WAL record is split at page boundary into - * continuation records, page boundary is always a safe cut-off point. - * We also assume that SendRqstPtr never points in the middle of a WAL - * record. + * crossing a logid boundary, skip the non-existent last log + * segment in previous logical log file. */ - startptr = sentPtr; - if (startptr.xrecoff >= XLogFileSize) - { - /* - * crossing a logid boundary, skip the non-existent last log - * segment in previous logical log file. - */ - startptr.xlogid += 1; - startptr.xrecoff = 0; - } - - endptr = startptr; - XLByteAdvance(endptr, MAX_SEND_SIZE); - /* round down to page boundary. */ - endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); - /* if we went beyond SendRqstPtr, back off */ - if (XLByteLT(SendRqstPtr, endptr)) - endptr = SendRqstPtr; - - /* - * OK to read and send the slice. - * - * We don't need to convert the xlogid/xrecoff from host byte order to - * network byte order because the both server can be expected to have - * the same byte order. If they have different byte order, we don't - * reach here. - */ - pq_sendbyte(outMsg, 'w'); - pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); - - if (endptr.xlogid != startptr.xlogid) - { - Assert(endptr.xlogid == startptr.xlogid + 1); - nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; - } - else - nbytes = endptr.xrecoff - startptr.xrecoff; - - sentPtr = endptr; - - /* - * Read the log directly into the output buffer to prevent extra - * memcpy calls. 
- */ - enlargeStringInfo(outMsg, nbytes); - - XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); - outMsg->len += nbytes; - outMsg->data[outMsg->len] = '\0'; - - pq_putmessage('d', outMsg->data, outMsg->len); - resetStringInfo(outMsg); + startptr.xlogid += 1; + startptr.xrecoff = 0; } + endptr = startptr; + XLByteAdvance(endptr, MAX_SEND_SIZE); + /* round down to page boundary. */ + endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + /* if we went beyond SendRqstPtr, back off */ + if (XLByteLT(SendRqstPtr, endptr)) + endptr = SendRqstPtr; + + /* + * OK to read and send the slice. + * + * We don't need to convert the xlogid/xrecoff from host byte order to + * network byte order because both servers can be expected to have + * the same byte order. If they have different byte order, we don't + * reach here. + */ + pq_sendbyte(outMsg, 'w'); + pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); + + if (endptr.xlogid != startptr.xlogid) + { + Assert(endptr.xlogid == startptr.xlogid + 1); + nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; + } + else + nbytes = endptr.xrecoff - startptr.xrecoff; + + sentPtr = endptr; + + /* + * Read the log directly into the output buffer to prevent extra + * memcpy calls. + */ + enlargeStringInfo(outMsg, nbytes); + + XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); + outMsg->len += nbytes; + outMsg->data[outMsg->len] = '\0'; + + pq_putmessage('d', outMsg->data, outMsg->len); + resetStringInfo(outMsg); + /* Update shared memory status */ SpinLockAcquire(&walsnd->mutex); walsnd->sentPtr = sentPtr;