diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 370429d746..2683385ca6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -211,7 +211,7 @@ typedef struct #define LAG_TRACKER_BUFFER_SIZE 8192 /* A mechanism for tracking replication lag. */ -static struct +typedef struct { XLogRecPtr last_lsn; WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; @@ -220,6 +220,8 @@ static struct WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; } LagTracker; +static LagTracker *lag_tracker; + /* Signal handlers */ static void WalSndLastCycleHandler(SIGNAL_ARGS); @@ -282,7 +284,7 @@ InitWalSender(void) SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); /* Initialize empty timestamp buffer for lag tracking. */ - memset(&LagTracker, 0, sizeof(LagTracker)); + lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); } /* @@ -3439,9 +3441,9 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) * If the lsn hasn't advanced since last time, then do nothing. This way * we only record a new sample when new WAL has been written. */ - if (LagTracker.last_lsn == lsn) + if (lag_tracker->last_lsn == lsn) return; - LagTracker.last_lsn = lsn; + lag_tracker->last_lsn = lsn; /* * If advancing the write head of the circular buffer would crash into any @@ -3449,11 +3451,11 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) * slowest reader (presumably apply) is the one that controls the release * of space. */ - new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE; + new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE; buffer_full = false; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i) { - if (new_write_head == LagTracker.read_heads[i]) + if (new_write_head == lag_tracker->read_heads[i]) buffer_full = true; } @@ -3464,17 +3466,17 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) */ if (buffer_full) { - new_write_head = LagTracker.write_head; - if (LagTracker.write_head > 0) - LagTracker.write_head--; + new_write_head = lag_tracker->write_head; + if (lag_tracker->write_head > 0) + lag_tracker->write_head--; else - LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1; + lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1; } /* Store a sample at the current write head position. */ - LagTracker.buffer[LagTracker.write_head].lsn = lsn; - LagTracker.buffer[LagTracker.write_head].time = local_flush_time; - LagTracker.write_head = new_write_head; + lag_tracker->buffer[lag_tracker->write_head].lsn = lsn; + lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time; + lag_tracker->write_head = new_write_head; } /* @@ -3496,14 +3498,14 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) TimestampTz time = 0; /* Read all unread samples up to this LSN or end of buffer. */ - while (LagTracker.read_heads[head] != LagTracker.write_head && - LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn) + while (lag_tracker->read_heads[head] != lag_tracker->write_head && + lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn) { - time = LagTracker.buffer[LagTracker.read_heads[head]].time; - LagTracker.last_read[head] = - LagTracker.buffer[LagTracker.read_heads[head]]; - LagTracker.read_heads[head] = - (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; + time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; + lag_tracker->last_read[head] = + lag_tracker->buffer[lag_tracker->read_heads[head]]; + lag_tracker->read_heads[head] = + (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; } /* @@ -3513,8 +3515,8 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) * interpolation at the beginning of the next burst of WAL after a period * of idleness. */ - if (LagTracker.read_heads[head] == LagTracker.write_head) - LagTracker.last_read[head].time = 0; + if (lag_tracker->read_heads[head] == lag_tracker->write_head) + lag_tracker->last_read[head].time = 0; if (time > now) { @@ -3532,17 +3534,17 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) * eventually start moving again and cross one of our samples before * we can show the lag increasing. */ - if (LagTracker.read_heads[head] == LagTracker.write_head) + if (lag_tracker->read_heads[head] == lag_tracker->write_head) { /* There are no future samples, so we can't interpolate. */ return -1; } - else if (LagTracker.last_read[head].time != 0) + else if (lag_tracker->last_read[head].time != 0) { /* We can interpolate between last_read and the next sample. */ double fraction; - WalTimeSample prev = LagTracker.last_read[head]; - WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]]; + WalTimeSample prev = lag_tracker->last_read[head]; + WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]]; if (lsn < prev.lsn) { @@ -3579,7 +3581,7 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) * standby reaches the future sample the best we can do is report * the hypothetical lag if that sample were to be replayed now. */ - time = LagTracker.buffer[LagTracker.read_heads[head]].time; + time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; } }