postgresql/src/backend/replication/walreceiver.c

830 lines
24 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* walreceiver.c
*
* The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
* is the process in the standby server that takes charge of receiving
* XLOG records from a primary server during streaming replication.
*
* When the startup process determines that it's time to start streaming,
* it instructs postmaster to start walreceiver. Walreceiver first connects
* to the primary server (it will be served by a walsender process
* in the primary server), and then keeps receiving XLOG records and
* writing them to the disk as long as the connection is alive. As XLOG
* records are received and flushed to disk, it updates the
* WalRcv->receivedUpto variable in shared memory, to inform the startup
* process of how far it can proceed with XLOG replay.
*
* Normal termination is by SIGTERM, which instructs the walreceiver to
* exit(0). Emergency termination is by SIGQUIT; like any postmaster child
* process, the walreceiver will simply abort and exit on SIGQUIT. A close
* of the connection and a FATAL error are treated not as a crash but as
* normal operation.
*
* This file contains the server-facing parts of walreceiver. The libpq-
* specific parts are in the libpqwalreceiver module. It's loaded
* dynamically to avoid linking the server with libpq.
*
* Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
2010-09-20 22:08:53 +02:00
* src/backend/replication/walreceiver.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/guc.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Fix management of pendingOpsTable in auxiliary processes. mdinit() was misusing IsBootstrapProcessingMode() to decide whether to create an fsync pending-operations table in the current process. This led to creating a table not only in the startup and checkpointer processes as intended, but also in the bgwriter process, not to mention other auxiliary processes such as walwriter and walreceiver. Creation of the table in the bgwriter is fatal, because it absorbs fsync requests that should have gone to the checkpointer; instead they just sit in bgwriter local memory and are never acted on. So writes performed by the bgwriter were not being fsync'd which could result in data loss after an OS crash. I think there is no live bug with respect to walwriter and walreceiver because those never perform any writes of shared buffers; but the potential is there for future breakage in those processes too. To fix, make AuxiliaryProcessMain() export the current process's AuxProcType as a global variable, and then make mdinit() test directly for the types of aux process that should have a pendingOpsTable. Having done that, we might as well also get rid of the random bool flags such as am_walreceiver that some of the aux processes had grown. (Note that we could not have fixed the bug by examining those variables in mdinit(), because it's called from BaseInit() which is run by AuxiliaryProcessMain() before entering any of the process-type-specific code.) Back-patch to 9.2, where the problem was introduced by the split-up of bgwriter and checkpointer processes. The bogus pendingOpsTable exists in walwriter and walreceiver processes in earlier branches, but absent any evidence that it causes actual problems there, I'll leave the older branches alone.
2012-07-18 21:28:10 +02:00
/* GUC variables */
int wal_receiver_status_interval;
int wal_receiver_timeout;
bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
/*
* These variables are used similarly to openLogFile/SegNo/Off,
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
* corresponding the filename of recvFile, used for error messages.
*/
static int recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;
/*
* Flags set by interrupt handlers of walreceiver for later service in the
* main loop.
*/
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;
/*
* LogstreamResult indicates the byte positions that we have already
* written/fsynced.
*/
static struct
{
XLogRecPtr Write; /* last byte + 1 written out in the standby */
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
2011-04-10 17:42:00 +02:00
static StandbyReplyMessage reply_message;
static StandbyHSFeedbackMessage feedback_message;
/*
* About SIGTERM handling:
*
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
* check it in the main loop, because we perform some blocking operations
* like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
* sets got_SIGTERM flag, which is checked in the main loop when convenient.
*
* This is very much like what regular backends do with ImmediateInterruptOK,
* ProcessInterrupts() etc.
*/
static volatile bool WalRcvImmediateInterruptOK = false;
/* Prototypes for private functions */
static void ProcessWalRcvInterrupts(void);
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(void);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
static void
ProcessWalRcvInterrupts(void)
{
/*
2010-02-26 03:01:40 +01:00
* Although walreceiver interrupt handling doesn't use the same scheme as
* regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
* any incoming signals on Win32.
*/
CHECK_FOR_INTERRUPTS();
if (got_SIGTERM)
{
WalRcvImmediateInterruptOK = false;
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating walreceiver process due to administrator command")));
}
}
static void
EnableWalRcvImmediateExit(void)
{
WalRcvImmediateInterruptOK = true;
ProcessWalRcvInterrupts();
}
static void
DisableWalRcvImmediateExit(void)
{
WalRcvImmediateInterruptOK = false;
ProcessWalRcvInterrupts();
}
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
2010-02-26 03:01:40 +01:00
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz last_recv_timestamp;
bool ping_sent;
/*
2010-02-26 03:01:40 +01:00
* WalRcv should be set up already (if we are a backend, we inherit this
* by fork() or EXEC_BACKEND mechanism from the postmaster).
*/
Assert(walrcv != NULL);
/*
* Mark walreceiver as running in shared memory.
*
2010-02-26 03:01:40 +01:00
* Do this as early as possible, so that if we fail later on, we'll set
* state to STOPPED. If we die before this, the startup process will keep
* waiting for us to start up, until it times out.
*/
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->pid == 0);
2010-02-26 03:01:40 +01:00
switch (walrcv->walRcvState)
{
case WALRCV_STOPPING:
/* If we've already been requested to stop, don't start up. */
walrcv->walRcvState = WALRCV_STOPPED;
/* fall through */
case WALRCV_STOPPED:
SpinLockRelease(&walrcv->mutex);
proc_exit(1);
break;
case WALRCV_STARTING:
/* The usual case */
break;
case WALRCV_RUNNING:
/* Shouldn't happen */
elog(PANIC, "walreceiver still running according to shared memory state");
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_RUNNING;
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
/*
* If possible, make this process a group leader, so that the postmaster
* can signal any child processes too. (walreceiver probably never has
* any child processes, but for consistency we make all postmaster child
* processes do this.)
*/
#ifdef HAVE_SETSID
if (setsid() < 0)
elog(FATAL, "setsid() failed: %m");
#endif
/* Properly accept or ignore signals the postmaster might send us */
2010-02-26 03:01:40 +01:00
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
* file */
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, SIG_IGN);
pqsignal(SIGUSR2, SIG_IGN);
/* Reset some signals that are accepted by postmaster but not here */
pqsignal(SIGCHLD, SIG_DFL);
pqsignal(SIGTTIN, SIG_DFL);
pqsignal(SIGTTOU, SIG_DFL);
pqsignal(SIGCONT, SIG_DFL);
pqsignal(SIGWINCH, SIG_DFL);
/* We allow SIGQUIT (quickdie) at all times */
sigdelset(&BlockSig, SIGQUIT);
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
* Create a resource owner to keep track of our resources (not clear that
* we need this, but may as well have one).
*/
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
/* Initialize LogstreamResult, reply_message and feedback_message */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
MemSet(&reply_message, 0, sizeof(reply_message));
MemSet(&feedback_message, 0, sizeof(feedback_message));
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
/* Loop until end-of-streaming or error */
for (;;)
{
2010-02-26 03:01:40 +01:00
unsigned char type;
char *buf;
int len;
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive())
exit(1);
/*
* Exit walreceiver if we're not in recovery. This should not happen,
* but cross-check the status here.
*/
if (!RecoveryInProgress())
ereport(FATAL,
2010-03-21 01:17:59 +01:00
(errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/* Wait a while for data to arrive */
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
/* Something was received from master, so reset timeout */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
/* Accept the received data, and process it */
XLogWalRcvProcessMsg(type, buf, len);
/* Receive any more data we can without sleeping */
2010-02-26 03:01:40 +01:00
while (walrcv_receive(0, &type, &buf, &len))
{
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
XLogWalRcvProcessMsg(type, buf, len);
}
/* Let the master know that we received some data. */
XLogWalRcvSendReply(false, false);
/*
2010-02-26 03:01:40 +01:00
* If we've written some records, flush them to disk and let the
* startup process and primary server know about them.
*/
XLogWalRcvFlush(false);
}
else
{
/*
* We didn't receive anything new. If we haven't heard anything
* from the server for more than wal_receiver_timeout / 2,
* ping the server. Also, if it's been longer than
* wal_receiver_status_interval since the last update we sent,
* send a status update to the master anyway, to report any
* progress in applying WAL.
*/
bool requestReply = false;
/*
* Check if time since last receive from standby has reached the
* configured limit.
*/
if (wal_receiver_timeout > 0)
{
TimestampTz now = GetCurrentTimestamp();
TimestampTz timeout;
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
wal_receiver_timeout);
if (now >= timeout)
ereport(ERROR,
(errmsg("terminating walreceiver due to timeout")));
/*
* We didn't receive anything new, for half of receiver
* replication timeout. Ping the server.
*/
if (!ping_sent)
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
(wal_receiver_timeout/2));
if (now >= timeout)
{
requestReply = true;
ping_sent = true;
}
}
}
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback();
}
}
}
/*
* Mark us as STOPPED in shared memory at exit.
*/
static void
WalRcvDie(int code, Datum arg)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_RUNNING ||
walrcv->walRcvState == WALRCV_STOPPING);
walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
if (walrcv_disconnect != NULL)
walrcv_disconnect();
}
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalRcvSigHupHandler(SIGNAL_ARGS)
{
got_SIGHUP = true;
}
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
static void
WalRcvShutdownHandler(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGTERM = true;
/* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts();
errno = save_errno;
}
/*
* WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
*
* Some backend has bought the farm, so we need to stop what we're doing and
* exit.
*/
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
PG_SETMASK(&BlockSig);
/*
* We DO NOT want to run proc_exit() callbacks -- we're here because
* shared memory may be corrupted, so we don't want to try to clean up our
* transaction. Just nail the windows shut and get out of town. Now that
* there's an atexit callback to prevent third-party code from breaking
* things by calling exit() directly, we have to reset the callbacks
* explicitly to make this work as intended.
*/
on_exit_reset();
/*
* Note we do exit(2) not exit(0). This is to force the postmaster into a
* system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
* backend. This is necessary precisely because we don't clean up our
* shared memory state. (The "dead man switch" mechanism in pmsignal.c
2010-02-26 03:01:40 +01:00
* should ensure the postmaster sees this as a crash, too, but no harm in
* being doubly sure.)
*/
exit(2);
}
/*
* Accept the message from XLOG stream, and process it.
*/
static void
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
switch (type)
{
2010-02-26 03:01:40 +01:00
case 'w': /* WAL records */
{
WalDataMessageHeader msghdr;
if (len < sizeof(WalDataMessageHeader))
2010-02-26 03:01:40 +01:00
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
XLogWalRcvWrite(buf, len, msghdr.dataStart);
2010-02-26 03:01:40 +01:00
break;
}
case 'k': /* Keepalive */
{
PrimaryKeepaliveMessage keepalive;
if (len != sizeof(PrimaryKeepaliveMessage))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid keepalive message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
/* If the primary requested a reply, send one immediately */
if (keepalive.replyRequested)
XLogWalRcvSendReply(true, false);
break;
}
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid replication message type %d",
type)));
}
}
/*
* Write XLOG data to disk.
*/
static void
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
2010-02-26 03:01:40 +01:00
int startoff;
int byteswritten;
while (nbytes > 0)
{
2010-02-26 03:01:40 +01:00
int segbytes;
if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
{
2010-02-26 03:01:40 +01:00
bool use_existent;
/*
2010-02-26 03:01:40 +01:00
* fsync() and close current file before we switch to next one. We
* would otherwise have to reopen this file to fsync it later
*/
if (recvFile >= 0)
{
XLogWalRcvFlush(false);
/*
* XLOG segment files will be re-read by recovery in startup
* process soon, so we don't advise the OS to release cache
* pages associated with the file like XLogFileClose() does.
*/
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
XLogFileNameP(recvFileTLI, recvSegNo))));
}
recvFile = -1;
/* Create/use new log file */
XLByteToSeg(recptr, recvSegNo);
use_existent = true;
recvFile = XLogFileInit(recvSegNo, &use_existent, true);
recvFileTLI = ThisTimeLineID;
recvOff = 0;
}
/* Calculate the start offset of the received logs */
startoff = recptr % XLogSegSize;
if (startoff + nbytes > XLogSegSize)
segbytes = XLogSegSize - startoff;
else
segbytes = nbytes;
/* Need to seek in the file? */
if (recvOff != startoff)
{
if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not seek in log segment %s, to offset %u: %m",
XLogFileNameP(recvFileTLI, recvSegNo),
startoff)));
recvOff = startoff;
}
/* OK to write the logs */
errno = 0;
byteswritten = write(recvFile, buf, segbytes);
if (byteswritten <= 0)
{
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
XLogFileNameP(recvFileTLI, recvSegNo),
recvOff, (unsigned long) segbytes)));
}
/* Update state for write */
XLByteAdvance(recptr, byteswritten);
recvOff += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
2010-02-26 03:01:40 +01:00
LogstreamResult.Write = recptr;
}
}
/*
* Flush the log to disk.
*
* If we're in the midst of dying, it's unwise to do anything that might throw
* an error, so we skip sending a reply in that case.
*/
static void
XLogWalRcvFlush(bool dying)
{
if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
issue_xlog_fsync(recvFile, recvSegNo);
LogstreamResult.Flush = LogstreamResult.Write;
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
if (XLByteLT(walrcv->receivedUpto, LogstreamResult.Flush))
{
walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush;
}
SpinLockRelease(&walrcv->mutex);
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
if (AllowCascadeReplication())
WalSndWakeup();
/* Report XLOG streaming progress in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
(uint32) (LogstreamResult.Write >> 32),
(uint32) LogstreamResult.Write);
set_ps_display(activitymsg, false);
}
/* Also let the master know that we made some progress */
if (!dying)
XLogWalRcvSendReply(false, false);
}
}
/*
* Send reply message to primary, indicating our current XLOG positions, oldest
* xmin and the current time.
*
* If 'force' is not set, the message is only sent if enough time has
2012-10-15 12:01:31 +02:00
* passed since last status update to reach wal_receiver_status_interval.
* If wal_receiver_status_interval is disabled altogether and 'force' is
* false, this is a no-op.
*
* If 'requestReply' is true, requests the server to reply immediately upon
* receiving this message. This is used for heartbearts, when approaching
* wal_receiver_timeout.
*/
static void
XLogWalRcvSendReply(bool force, bool requestReply)
{
char buf[sizeof(StandbyReplyMessage) + 1];
2011-04-10 17:42:00 +02:00
TimestampTz now;
/*
* If the user doesn't want status to be reported to the master, be sure
* to exit before doing anything at all.
*/
if (!force && wal_receiver_status_interval <= 0)
return;
/* Get current timestamp. */
now = GetCurrentTimestamp();
/*
* We can compare the write and flush positions to the last message we
* sent without taking any lock, but the apply position requires a spin
* lock, so we don't check that unless something else has changed or 10
* seconds have passed. This means that the apply log position will
* appear, from the master's point of view, to lag slightly, but since
* this is only for reporting purposes and only on idle systems, that's
* probably OK.
*/
if (!force
&& XLByteEQ(reply_message.write, LogstreamResult.Write)
&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
2011-04-10 17:42:00 +02:00
wal_receiver_status_interval * 1000))
return;
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now;
reply_message.replyRequested = requestReply;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
(uint32) (reply_message.write >> 32), (uint32) reply_message.write,
(uint32) (reply_message.flush >> 32), (uint32) reply_message.flush,
(uint32) (reply_message.apply >> 32), (uint32) reply_message.apply);
/* Prepend with the message type and send it. */
buf[0] = 'r';
memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
}
/*
* Send hot standby feedback message to primary, plus the current time,
* in case they don't have a watch.
*/
static void
XLogWalRcvSendHSFeedback(void)
{
2011-04-10 17:42:00 +02:00
char buf[sizeof(StandbyHSFeedbackMessage) + 1];
TimestampTz now;
TransactionId nextXid;
uint32 nextEpoch;
TransactionId xmin;
/*
* If the user doesn't want status to be reported to the master, be sure
* to exit before doing anything at all.
*/
if (wal_receiver_status_interval <= 0 || !hot_standby_feedback)
return;
/* Get current timestamp. */
now = GetCurrentTimestamp();
/*
* Send feedback at most once per wal_receiver_status_interval.
*/
if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
2011-04-10 17:42:00 +02:00
wal_receiver_status_interval * 1000))
return;
/*
2011-04-10 17:42:00 +02:00
* If Hot Standby is not yet active there is nothing to send. Check this
* after the interval has expired to reduce number of calls.
*/
if (!HotStandbyActive())
return;
/*
2011-04-10 17:42:00 +02:00
* Make the expensive call to get the oldest xmin once we are certain
* everything else has been checked.
*/
xmin = GetOldestXmin(true, false);
/*
2011-04-10 17:42:00 +02:00
* Get epoch and adjust if nextXid and oldestXmin are different sides of
* the epoch boundary.
*/
GetNextXidAndEpoch(&nextXid, &nextEpoch);
if (nextXid < xmin)
nextEpoch--;
/*
* Always send feedback message.
*/
feedback_message.sendTime = now;
feedback_message.xmin = xmin;
feedback_message.epoch = nextEpoch;
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
2011-04-10 17:42:00 +02:00
feedback_message.xmin,
feedback_message.epoch);
/* Prepend with the message type and send it. */
buf[0] = 'h';
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
}
/*
* Keep track of important messages from primary.
*/
static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
if (XLByteLT(walrcv->latestWalEnd, walEnd))
walrcv->latestWalEndTime = sendTime;
walrcv->latestWalEnd = walEnd;
walrcv->lastMsgSendTime = sendTime;
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
2012-01-13 14:21:45 +01:00
if (log_min_messages <= DEBUG2)
elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
timestamptz_to_str(sendTime),
timestamptz_to_str(lastMsgReceiptTime),
GetReplicationApplyDelay(),
GetReplicationTransferLatency());
}