diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 40e4298cf4..96bcc3a63b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + reply_time + timestamp with time zone + Send time of last reply message received from standby server + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 715995dd88..8630542bb3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 46edb525e8..d1a8113cb6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void) applyLag; bool clearLagTimes; TimestampTz now; + TimestampTz replyTime; static bool fullyAppliedLastTime = false; @@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", - (uint32) (writePtr >> 32), (uint32) writePtr, - (uint32) (flushPtr >> 32), (uint32) flushPtr, - (uint32) (applyPtr >> 32), (uint32) applyPtr, - replyRequested ? " (reply requested)" : ""); + if (log_min_messages <= DEBUG2) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s", + (uint32) (writePtr >> 32), (uint32) writePtr, + (uint32) (flushPtr >> 32), (uint32) flushPtr, + (uint32) (applyPtr >> 32), (uint32) applyPtr, + replyRequested ? " (reply requested)" : "", + replyTimeStr); + + pfree(replyTimeStr); + } /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); @@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + walsnd->replyTime = replyTime; SpinLockRelease(&walsnd->mutex); } @@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void) uint32 feedbackEpoch; TransactionId feedbackCatalogXmin; uint32 feedbackCatalogEpoch; + TimestampTz replyTime; /* * Decipher the reply message. The caller already consumed the msgtype * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation * of this message. */ - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", - feedbackXmin, - feedbackEpoch, - feedbackCatalogXmin, - feedbackCatalogEpoch); + if (log_min_messages <= DEBUG2) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s", + feedbackXmin, + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch, + replyTimeStr); + + pfree(replyTimeStr); + } + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + } /* * Unset WalSender's xmins if the feedback message values are invalid. @@ -2265,6 +2302,7 @@ InitWalSenderSlot(void) walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; + walsnd->replyTime = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + TimestampTz replyTime; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; + replyTime = walsnd->replyTime; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + if (replyTime == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(replyTime); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index be72bddd17..e16ec9dd77 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201811201 +#define CATALOG_VERSION_NO 201812091 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 034a41eb55..f79fcfe029 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5023,9 +5023,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 4b90477936..53314b1fae 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -75,6 +75,11 @@ typedef struct WalSnd * SyncRepLock. */ int sync_standby_priority; + + /* + * Timestamp of the last message received from standby. + */ + TimestampTz replyTime; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 735dd37acf..b68b8d273f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid, w.flush_lag, w.replay_lag, w.sync_priority, - w.sync_state + w.sync_state, + w.reply_time FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl,