From 808969d0e7ac2d2fdbd915c6a6ac9ec68b6f63f9 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 3 Feb 2010 09:47:19 +0000 Subject: [PATCH] Add a message type header to the CopyData messages sent from primary to standby in streaming replication. While we only have one message type at the moment, adding a message type header makes this easier to extend. --- doc/src/sgml/protocol.sgml | 61 +++++++++++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 34 +++++------ src/backend/replication/walreceiver.c | 49 ++++++++++++--- src/backend/replication/walsender.c | 3 +- src/include/replication/walreceiver.h | 5 +- 5 files changed, 117 insertions(+), 35 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 54e03998ff..845e418130 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1,4 +1,4 @@ - + Frontend/Backend Protocol @@ -4179,12 +4179,65 @@ The commands accepted in walsender mode are: already been recycled. On success, server responds with a CopyOutResponse message, and backend starts to stream WAL as CopyData messages. + The payload in CopyData message consists of the following format. - The payload in each CopyData message consists of an XLogRecPtr, - indicating the starting point of the WAL in the message, immediately - followed by the WAL data itself. + + + + XLogData (B) + + + + + + + Byte1('w') + + + + Identifies the message as WAL data. + + + + + + Int32 + + + + The log file number of the LSN, indicating the starting point of + the WAL in the message. + + + + + + Int32 + + + + The byte offset of the LSN, indicating the starting point of + the WAL in the message. + + + + + + Byten + + + + Data that forms part of WAL data stream. + + + + + + + + A single WAL record is never split across two CopyData messages. When diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index b7a24e56f5..039370a851 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.2 2010/01/20 11:58:44 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -48,8 +48,8 @@ static char *recvBuf = NULL; /* Prototypes for interface functions */ static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); -static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, - int *len); +static bool libpqrcv_receive(int timeout, unsigned char *type, + char **buffer, int *len); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ @@ -236,13 +236,13 @@ libpqrcv_disconnect(void) } /* - * Receive any WAL records available from XLOG stream, blocking for + * Receive a message available from XLOG stream, blocking for * maximum of 'timeout' ms. * * Returns: * - * True if data was received. *recptr, *buffer and *len are set to - * the WAL location of the received data, buffer holding it, and length, + * True if data was received. *type, *buffer and *len are set to + * the type of the received data, buffer holding it, and length, * respectively. * * False if no data was available within timeout, or wait was interrupted @@ -254,7 +254,7 @@ libpqrcv_disconnect(void) * ereports on error. */ static bool -libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) +libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) { int rawlen; @@ -275,14 +275,14 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) if (PQconsumeInput(streamConn) == 0) ereport(ERROR, - (errmsg("could not read xlog records: %s", + (errmsg("could not receive data from XLOG stream: %s", PQerrorMessage(streamConn)))); } justconnected = false; /* Receive CopyData message */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); - if (rawlen == 0) /* no records available yet, then return */ + if (rawlen == 0) /* no data available yet, then return */ return false; if (rawlen == -1) /* end-of-streaming or error */ { @@ -297,22 +297,18 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) } PQclear(res); ereport(ERROR, - (errmsg("could not read xlog records: %s", + (errmsg("could not receive data from XLOG stream: %s", PQerrorMessage(streamConn)))); } if (rawlen < -1) ereport(ERROR, - (errmsg("could not read xlog records: %s", + (errmsg("could not receive data from XLOG stream: %s", PQerrorMessage(streamConn)))); - if (rawlen < sizeof(XLogRecPtr)) - ereport(ERROR, - (errmsg("invalid WAL message received from primary"))); - - /* Return received WAL records to caller */ - *recptr = *((XLogRecPtr *) recvBuf); - *buffer = recvBuf + sizeof(XLogRecPtr); - *len = rawlen - sizeof(XLogRecPtr); + /* Return received messages to caller */ + *type = *((unsigned char *) recvBuf); + *buffer = recvBuf + sizeof(*type); + *len = rawlen - sizeof(*type); return true; } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 4a5ba5b426..a2f15a9a03 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ static void WalRcvDie(int code, Datum arg); +static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); @@ -258,7 +259,7 @@ WalReceiverMain(void) /* Loop until end-of-streaming or error */ for (;;) { - XLogRecPtr recptr; + unsigned char type; char *buf; int len; @@ -287,17 +288,17 @@ WalReceiverMain(void) } /* Wait a while for data to arrive */ - if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len)) + if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { - /* Write received WAL records to disk */ - XLogWalRcvWrite(buf, len, recptr); + /* Accept the received data, and process it */ + XLogWalRcvProcessMsg(type, buf, len); - /* Receive any more WAL records we can without sleeping */ - while(walrcv_receive(0, &recptr, &buf, &len)) - XLogWalRcvWrite(buf, len, recptr); + /* Receive any more data we can without sleeping */ + while(walrcv_receive(0, &type, &buf, &len)) + XLogWalRcvProcessMsg(type, buf, len); /* - * Now that we've written some records, flush them to disk and + * If we've written some records, flush them to disk and * let the startup process know about them. */ XLogWalRcvFlush(); @@ -375,6 +376,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) exit(2); } +/* + * Accept the message from XLOG stream, and process it. + */ +static void +XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) +{ + switch (type) + { + case 'w': /* WAL records */ + { + XLogRecPtr recptr; + + if (len < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); + + recptr = *((XLogRecPtr *) buf); + buf += sizeof(XLogRecPtr); + len -= sizeof(XLogRecPtr); + XLogWalRcvWrite(buf, len, recptr); + break; + } + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid replication message type %d", + type))); + } +} + /* * Write XLOG data to disk. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0eb074ffe7..0115b70fa2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -30,7 +30,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.4 2010/01/27 16:41:09 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.5 2010/02/03 09:47:19 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -659,6 +659,7 @@ XLogSend(StringInfo outMsg) * have the same byte order. If they have different byte order, we * don't reach here. */ + pq_sendbyte(outMsg, 'w'); pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); if (endptr.xlogid != startptr.xlogid) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 083eb4f07f..bf7ad41b06 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -5,7 +5,7 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $ + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.6 2010/02/03 09:47:19 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -66,7 +66,8 @@ extern WalRcvData *WalRcv; typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); extern PGDLLIMPORT walrcv_connect_type walrcv_connect; -typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len); +typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, + char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef void (*walrcv_disconnect_type) (void);