mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-09-30 12:41:19 +02:00
Code review for commit 05a7be935
.
Avoid having walreceiver code know explicitly about the precision and underlying datatype of TimestampTz. (There is still one calculation that knows that, which should be replaced with use of TimestampDifferenceMilliseconds; but we need to figure out what to do about overflow cases first.) In support of this, provide a TimestampTzPlusSeconds macro, as well as TIMESTAMP_INFINITY and TIMESTAMP_MINUS_INFINITY macros. (We could have used the existing DT_NOEND and DT_NOBEGIN symbols, but I judged those too opaque and confusing.) Move GetCurrentTimestamp calls so that it's more obvious that we are not using stale values of "now" anyplace. This doesn't result in net more calls, and might indeed make for net fewer. Avoid having a dummy value in the WalRcvWakeupReason enum, so that we can hope for the compiler to catch overlooked switch cases. Nathan Bossart and Tom Lane Discussion: https://postgr.es/m/20230125235004.GA1327755@nathanxps13
This commit is contained in:
parent
e35bb9f158
commit
24ff700f6a
@ -122,8 +122,8 @@ typedef enum WalRcvWakeupReason
|
|||||||
WALRCV_WAKEUP_TERMINATE,
|
WALRCV_WAKEUP_TERMINATE,
|
||||||
WALRCV_WAKEUP_PING,
|
WALRCV_WAKEUP_PING,
|
||||||
WALRCV_WAKEUP_REPLY,
|
WALRCV_WAKEUP_REPLY,
|
||||||
WALRCV_WAKEUP_HSFEEDBACK,
|
WALRCV_WAKEUP_HSFEEDBACK
|
||||||
NUM_WALRCV_WAKEUPS
|
#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
|
||||||
} WalRcvWakeupReason;
|
} WalRcvWakeupReason;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -206,8 +206,6 @@ WalReceiverMain(void)
|
|||||||
*/
|
*/
|
||||||
Assert(walrcv != NULL);
|
Assert(walrcv != NULL);
|
||||||
|
|
||||||
now = GetCurrentTimestamp();
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Mark walreceiver as running in shared memory.
|
* Mark walreceiver as running in shared memory.
|
||||||
*
|
*
|
||||||
@ -261,6 +259,7 @@ WalReceiverMain(void)
|
|||||||
Assert(!is_temp_slot || (slotname[0] == '\0'));
|
Assert(!is_temp_slot || (slotname[0] == '\0'));
|
||||||
|
|
||||||
/* Initialise to a sanish value */
|
/* Initialise to a sanish value */
|
||||||
|
now = GetCurrentTimestamp();
|
||||||
walrcv->lastMsgSendTime =
|
walrcv->lastMsgSendTime =
|
||||||
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
|
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
|
||||||
|
|
||||||
@ -464,6 +463,7 @@ WalReceiverMain(void)
|
|||||||
{
|
{
|
||||||
ConfigReloadPending = false;
|
ConfigReloadPending = false;
|
||||||
ProcessConfigFile(PGC_SIGHUP);
|
ProcessConfigFile(PGC_SIGHUP);
|
||||||
|
/* recompute wakeup times */
|
||||||
now = GetCurrentTimestamp();
|
now = GetCurrentTimestamp();
|
||||||
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
|
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
|
||||||
WalRcvComputeNextWakeup(i, now);
|
WalRcvComputeNextWakeup(i, now);
|
||||||
@ -472,7 +472,6 @@ WalReceiverMain(void)
|
|||||||
|
|
||||||
/* See if we can read data immediately */
|
/* See if we can read data immediately */
|
||||||
len = walrcv_receive(wrconn, &buf, &wait_fd);
|
len = walrcv_receive(wrconn, &buf, &wait_fd);
|
||||||
now = GetCurrentTimestamp();
|
|
||||||
if (len != 0)
|
if (len != 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@ -487,6 +486,7 @@ WalReceiverMain(void)
|
|||||||
* Something was received from primary, so adjust
|
* Something was received from primary, so adjust
|
||||||
* the ping and terminate wakeup times.
|
* the ping and terminate wakeup times.
|
||||||
*/
|
*/
|
||||||
|
now = GetCurrentTimestamp();
|
||||||
WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
|
WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
|
||||||
now);
|
now);
|
||||||
WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
|
WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
|
||||||
@ -506,7 +506,6 @@ WalReceiverMain(void)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
len = walrcv_receive(wrconn, &buf, &wait_fd);
|
len = walrcv_receive(wrconn, &buf, &wait_fd);
|
||||||
now = GetCurrentTimestamp();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Let the primary know that we received some data. */
|
/* Let the primary know that we received some data. */
|
||||||
@ -525,7 +524,7 @@ WalReceiverMain(void)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
/* Find the soonest wakeup time, to limit our nap. */
|
/* Find the soonest wakeup time, to limit our nap. */
|
||||||
nextWakeup = PG_INT64_MAX;
|
nextWakeup = TIMESTAMP_INFINITY;
|
||||||
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
|
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
|
||||||
nextWakeup = Min(wakeup[i], nextWakeup);
|
nextWakeup = Min(wakeup[i], nextWakeup);
|
||||||
|
|
||||||
@ -536,6 +535,7 @@ WalReceiverMain(void)
|
|||||||
* millisecond to avoid waking up too early and spinning until
|
* millisecond to avoid waking up too early and spinning until
|
||||||
* one of the wakeup times.
|
* one of the wakeup times.
|
||||||
*/
|
*/
|
||||||
|
now = GetCurrentTimestamp();
|
||||||
nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
|
nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -556,7 +556,6 @@ WalReceiverMain(void)
|
|||||||
wait_fd,
|
wait_fd,
|
||||||
nap,
|
nap,
|
||||||
WAIT_EVENT_WAL_RECEIVER_MAIN);
|
WAIT_EVENT_WAL_RECEIVER_MAIN);
|
||||||
now = GetCurrentTimestamp();
|
|
||||||
if (rc & WL_LATCH_SET)
|
if (rc & WL_LATCH_SET)
|
||||||
{
|
{
|
||||||
ResetLatch(MyLatch);
|
ResetLatch(MyLatch);
|
||||||
@ -592,19 +591,20 @@ WalReceiverMain(void)
|
|||||||
* Check if time since last receive from primary has
|
* Check if time since last receive from primary has
|
||||||
* reached the configured limit.
|
* reached the configured limit.
|
||||||
*/
|
*/
|
||||||
|
now = GetCurrentTimestamp();
|
||||||
if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
|
if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
errmsg("terminating walreceiver due to timeout")));
|
errmsg("terminating walreceiver due to timeout")));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We didn't receive anything new, for half of receiver
|
* If we didn't receive anything new for half of receiver
|
||||||
* replication timeout. Ping the server.
|
* replication timeout, then ping the server.
|
||||||
*/
|
*/
|
||||||
if (now >= wakeup[WALRCV_WAKEUP_PING])
|
if (now >= wakeup[WALRCV_WAKEUP_PING])
|
||||||
{
|
{
|
||||||
requestReply = true;
|
requestReply = true;
|
||||||
wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
|
wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
|
||||||
}
|
}
|
||||||
|
|
||||||
XLogWalRcvSendReply(requestReply, requestReply);
|
XLogWalRcvSendReply(requestReply, requestReply);
|
||||||
@ -1266,7 +1266,6 @@ static void
|
|||||||
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
|
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
|
||||||
{
|
{
|
||||||
WalRcvData *walrcv = WalRcv;
|
WalRcvData *walrcv = WalRcv;
|
||||||
|
|
||||||
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
|
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
|
||||||
|
|
||||||
/* Update shared-memory status */
|
/* Update shared-memory status */
|
||||||
@ -1310,7 +1309,10 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
|
|||||||
/*
|
/*
|
||||||
* Compute the next wakeup time for a given wakeup reason. Can be called to
|
* Compute the next wakeup time for a given wakeup reason. Can be called to
|
||||||
* initialize a wakeup time, to adjust it for the next wakeup, or to
|
* initialize a wakeup time, to adjust it for the next wakeup, or to
|
||||||
* reinitialize it when GUCs have changed.
|
* reinitialize it when GUCs have changed. We ask the caller to pass in the
|
||||||
|
* value of "now" because this frequently avoids multiple calls of
|
||||||
|
* GetCurrentTimestamp(). It had better be a reasonably up-to-date value
|
||||||
|
* though.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
|
WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
|
||||||
@ -1319,30 +1321,29 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
|
|||||||
{
|
{
|
||||||
case WALRCV_WAKEUP_TERMINATE:
|
case WALRCV_WAKEUP_TERMINATE:
|
||||||
if (wal_receiver_timeout <= 0)
|
if (wal_receiver_timeout <= 0)
|
||||||
wakeup[reason] = PG_INT64_MAX;
|
wakeup[reason] = TIMESTAMP_INFINITY;
|
||||||
else
|
else
|
||||||
wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
|
wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
|
||||||
break;
|
break;
|
||||||
case WALRCV_WAKEUP_PING:
|
case WALRCV_WAKEUP_PING:
|
||||||
if (wal_receiver_timeout <= 0)
|
if (wal_receiver_timeout <= 0)
|
||||||
wakeup[reason] = PG_INT64_MAX;
|
wakeup[reason] = TIMESTAMP_INFINITY;
|
||||||
else
|
else
|
||||||
wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
|
wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
|
||||||
break;
|
break;
|
||||||
case WALRCV_WAKEUP_HSFEEDBACK:
|
case WALRCV_WAKEUP_HSFEEDBACK:
|
||||||
if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
|
if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
|
||||||
wakeup[reason] = PG_INT64_MAX;
|
wakeup[reason] = TIMESTAMP_INFINITY;
|
||||||
else
|
else
|
||||||
wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
|
wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
|
||||||
break;
|
break;
|
||||||
case WALRCV_WAKEUP_REPLY:
|
case WALRCV_WAKEUP_REPLY:
|
||||||
if (wal_receiver_status_interval <= 0)
|
if (wal_receiver_status_interval <= 0)
|
||||||
wakeup[reason] = PG_INT64_MAX;
|
wakeup[reason] = TIMESTAMP_INFINITY;
|
||||||
else
|
else
|
||||||
wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
|
wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
break;
|
||||||
|
/* there's intentionally no default: here */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,10 +143,17 @@ struct pg_itm_in
|
|||||||
#define TZDISP_LIMIT ((MAX_TZDISP_HOUR + 1) * SECS_PER_HOUR)
|
#define TZDISP_LIMIT ((MAX_TZDISP_HOUR + 1) * SECS_PER_HOUR)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DT_NOBEGIN represents timestamp -infinity; DT_NOEND represents +infinity
|
* We reserve the minimum and maximum integer values to represent
|
||||||
|
* timestamp (or timestamptz) -infinity and +infinity.
|
||||||
*/
|
*/
|
||||||
#define DT_NOBEGIN PG_INT64_MIN
|
#define TIMESTAMP_MINUS_INFINITY PG_INT64_MIN
|
||||||
#define DT_NOEND PG_INT64_MAX
|
#define TIMESTAMP_INFINITY PG_INT64_MAX
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Historically these alias for infinity have been used.
|
||||||
|
*/
|
||||||
|
#define DT_NOBEGIN TIMESTAMP_MINUS_INFINITY
|
||||||
|
#define DT_NOEND TIMESTAMP_INFINITY
|
||||||
|
|
||||||
#define TIMESTAMP_NOBEGIN(j) \
|
#define TIMESTAMP_NOBEGIN(j) \
|
||||||
do {(j) = DT_NOBEGIN;} while (0)
|
do {(j) = DT_NOBEGIN;} while (0)
|
||||||
|
@ -81,7 +81,9 @@ IntervalPGetDatum(const Interval *X)
|
|||||||
#define INTERVAL_PRECISION(t) ((t) & INTERVAL_PRECISION_MASK)
|
#define INTERVAL_PRECISION(t) ((t) & INTERVAL_PRECISION_MASK)
|
||||||
#define INTERVAL_RANGE(t) (((t) >> 16) & INTERVAL_RANGE_MASK)
|
#define INTERVAL_RANGE(t) (((t) >> 16) & INTERVAL_RANGE_MASK)
|
||||||
|
|
||||||
|
/* Macros for doing timestamp arithmetic without assuming timestamp's units */
|
||||||
#define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
|
#define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
|
||||||
|
#define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))
|
||||||
|
|
||||||
|
|
||||||
/* Set at postmaster start */
|
/* Set at postmaster start */
|
||||||
|
Loading…
Reference in New Issue
Block a user