diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f2694db873..ecb2c3a6d3 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.10 2010/04/20 22:55:03 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -76,9 +76,15 @@ static uint32 recvOff = 0; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; -static void ProcessWalRcvInterrupts(void); -static void EnableWalRcvImmediateExit(void); -static void DisableWalRcvImmediateExit(void); +/* + * LogstreamResult indicates the byte positions that we have already + * written/fsynced. + */ +static struct +{ + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; /* * About SIGTERM handling: @@ -98,6 +104,21 @@ static void DisableWalRcvImmediateExit(void); */ static volatile bool WalRcvImmediateInterruptOK = false; +/* Prototypes for private functions */ +static void ProcessWalRcvInterrupts(void); +static void EnableWalRcvImmediateExit(void); +static void DisableWalRcvImmediateExit(void); +static void WalRcvDie(int code, Datum arg); +static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalRcvFlush(void); + +/* Signal handlers */ +static void WalRcvSigHupHandler(SIGNAL_ARGS); +static void WalRcvShutdownHandler(SIGNAL_ARGS); +static void WalRcvQuickDieHandler(SIGNAL_ARGS); + + static void ProcessWalRcvInterrupts(void) { @@ -118,47 +139,25 @@ ProcessWalRcvInterrupts(void) } static void -EnableWalRcvImmediateExit() +EnableWalRcvImmediateExit(void) { WalRcvImmediateInterruptOK = true; ProcessWalRcvInterrupts(); } static void -DisableWalRcvImmediateExit() +DisableWalRcvImmediateExit(void) { WalRcvImmediateInterruptOK = false; ProcessWalRcvInterrupts(); } -/* Signal handlers */ -static void WalRcvSigHupHandler(SIGNAL_ARGS); -static void WalRcvShutdownHandler(SIGNAL_ARGS); -static void WalRcvQuickDieHandler(SIGNAL_ARGS); - -/* Prototypes for private functions */ -static void WalRcvDie(int code, Datum arg); -static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); -static void XLogWalRcvFlush(void); - -/* - * LogstreamResult indicates the byte positions that we have already - * written/fsynced. - */ -static struct -{ - XLogRecPtr Write; /* last byte + 1 written out in the standby */ - XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ -} LogstreamResult; - /* Main entry point for walreceiver process */ void WalReceiverMain(void) { char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; - /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -398,19 +397,21 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) if (len < sizeof(XLogRecPtr)) ereport(ERROR, - (errmsg("invalid WAL message received from primary"))); + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid WAL message received from primary"))); - recptr = *((XLogRecPtr *) buf); + memcpy(&recptr, buf, sizeof(XLogRecPtr)); buf += sizeof(XLogRecPtr); len -= sizeof(XLogRecPtr); + XLogWalRcvWrite(buf, len, recptr); break; } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid replication message type %d", - type))); + errmsg_internal("invalid replication message type %d", + type))); } }