Correct base backup throttling

Throttling for sending a base backup in walsender is broken for the case
where there is a lot of WAL traffic, because the latch used to put the
walsender to sleep is also signalled by regular WAL traffic (and each
signal causes an additional batch of data to be sent); the net effect is
that there is no or little actual throttling.  This is undesirable, so
rewrite the sleep into a loop to achieve the desired effeect.

Author: Jeff Janes, small tweaks by me
Reviewed-by: Antonin Houska
Discussion: https://postgr.es/m/CAMkU=1xH6mde-yL-Eo1TKBGNd0PB1-TMxvrNvqcAkN-qr2E9mw@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2017-09-05 16:59:39 +02:00
parent 9ae9d8c154
commit ebd346caf4
1 changed files with 26 additions and 11 deletions

View File

@ -1336,10 +1336,7 @@ _tarWriteDir(const char *pathbuf, int basepathlen, struct stat *statbuf,
static void
throttle(size_t increment)
{
TimeOffset elapsed,
elapsed_min,
sleep;
int wait_result;
TimeOffset elapsed_min;
if (throttling_counter < 0)
return;
@ -1348,14 +1345,28 @@ throttle(size_t increment)
if (throttling_counter < throttling_sample)
return;
/* Time elapsed since the last measurement (and possible wake up). */
elapsed = GetCurrentTimestamp() - throttled_last;
/* How much should have elapsed at minimum? */
elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
sleep = elapsed_min - elapsed;
/* Only sleep if the transfer is faster than it should be. */
if (sleep > 0)
/* How much time should have elapsed at minimum? */
elapsed_min = elapsed_min_unit *
(throttling_counter / throttling_sample);
/*
* Since the latch could be set repeatedly because of concurrently WAL
* activity, sleep in a loop to ensure enough time has passed.
*/
for (;;)
{
TimeOffset elapsed,
sleep;
int wait_result;
/* Time elapsed since the last measurement (and possible wake up). */
elapsed = GetCurrentTimestamp() - throttled_last;
/* sleep if the transfer is faster than it should be */
sleep = elapsed_min - elapsed;
if (sleep <= 0)
break;
ResetLatch(MyLatch);
/* We're eating a potentially set latch, so check for interrupts */
@ -1372,6 +1383,10 @@ throttle(size_t increment)
if (wait_result & WL_LATCH_SET)
CHECK_FOR_INTERRUPTS();
/* Done waiting? */
if (wait_result & WL_TIMEOUT)
break;
}
/*