diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3d72a162eb..f87020c909 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1359,14 +1359,18 @@ The commands accepted in walsender mode are: has already been recycled. On success, server responds with a CopyBothResponse message, and then starts to stream WAL to the frontend. WAL will continue to be streamed until the connection is broken; - no further commands will be accepted. + no further commands will be accepted. If the WAL sender process is + terminated normally (during postmaster shutdown), it will send a + CommandComplete message before exiting. This might not happen during an + abnormal shutdown, of course. WAL data is sent as a series of CopyData messages. (This allows other information to be intermixed; in particular the server can send an ErrorResponse message if it encounters a failure after beginning - to stream.) The payload in each CopyData message follows this format: + to stream.) The payload of each CopyData message from server to the + client contains a message of one of the following formats: @@ -1390,34 +1394,32 @@ The commands accepted in walsender mode are: - Byte8 + Int64 - The starting point of the WAL data in this message, given in - XLogRecPtr format. + The starting point of the WAL data in this message. - Byte8 + Int64 - The current end of WAL on the server, given in - XLogRecPtr format. + The current end of WAL on the server. - Byte8 + Int64 - The server's system clock at the time of transmission, - given in TimestampTz format. + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. @@ -1429,42 +1431,19 @@ The commands accepted in walsender mode are: A section of the WAL data stream. + + A single WAL record is never split across two XLogData messages. + When a WAL record crosses a WAL page boundary, and is therefore + already split using continuation records, it can be split at the page + boundary. In other words, the first main WAL record and its + continuation records can be sent in different XLogData messages. + - - - - A single WAL record is never split across two CopyData messages. - When a WAL record crosses a WAL page boundary, and is therefore - already split using continuation records, it can be split at the page - boundary. In other words, the first main WAL record and its - continuation records can be sent in different CopyData messages. - - - Note that all fields within the WAL data and the above-described header - will be in the sending server's native format. Endianness, and the - format for the timestamp, are unpredictable unless the receiver has - verified that the sender's system identifier matches its own - pg_control contents. - - - If the WAL sender process is terminated normally (during postmaster - shutdown), it will send a CommandComplete message before exiting. - This might not happen during an abnormal shutdown, of course. - - - - The receiving process can send replies back to the sender at any time, - using one of the following message formats (also in the payload of a - CopyData message): - - - - Primary keepalive message (B) @@ -1484,23 +1463,33 @@ The commands accepted in walsender mode are: - Byte8 + Int64 - The current end of WAL on the server, given in - XLogRecPtr format. + The current end of WAL on the server. - Byte8 + Int64 - The server's system clock at the time of transmission, - given in TimestampTz format. + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + Byte1 + + + + 1 means that the client should reply to this message as soon as + possible, to avoid a timeout disconnect. 0 otherwise. @@ -1511,6 +1500,12 @@ The commands accepted in walsender mode are: + + The receiving process can send replies back to the sender at any time, + using one of the following message formats (also in the payload of a + CopyData message): + + @@ -1532,45 +1527,56 @@ The commands accepted in walsender mode are: - Byte8 + Int64 The location of the last WAL byte + 1 received and written to disk - in the standby, in XLogRecPtr format. + in the standby. - Byte8 + Int64 The location of the last WAL byte + 1 flushed to disk in - the standby, in XLogRecPtr format. + the standby. - Byte8 + Int64 - The location of the last WAL byte + 1 applied in the standby, in - XLogRecPtr format. + The location of the last WAL byte + 1 applied in the standby. - Byte8 + Int64 - The server's system clock at the time of transmission, - given in TimestampTz format. + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + Byte1 + + + + If 1, the client requests the server to reply to this message + immediately. This can be used to ping the server, to test if + the connection is still healthy. @@ -1602,28 +1608,29 @@ The commands accepted in walsender mode are: - Byte8 + Int64 - The server's system clock at the time of transmission, - given in TimestampTz format. + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. - Byte4 + Int32 - The standby's current xmin. + The standby's current xmin. This may be 0, if the standby does not + support feedback, or is not yet in Hot Standby state. - Byte4 + Int32 diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1accdccea..62135037f1 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -39,9 +39,9 @@ #include #include "access/xlog_internal.h" +#include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" -#include "replication/walprotocol.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" @@ -93,8 +93,8 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; -static StandbyReplyMessage reply_message; -static StandbyHSFeedbackMessage feedback_message; +static StringInfoData reply_message; +static StringInfoData incoming_message; /* * About SIGTERM handling: @@ -279,10 +279,10 @@ WalReceiverMain(void) walrcv_connect(conninfo, startpoint); DisableWalRcvImmediateExit(); - /* Initialize LogstreamResult, reply_message and feedback_message */ + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); - MemSet(&reply_message, 0, sizeof(reply_message)); - MemSet(&feedback_message, 0, sizeof(feedback_message)); + initStringInfo(&reply_message); + initStringInfo(&incoming_message); /* Initialize the last recv timestamp */ last_recv_timestamp = GetCurrentTimestamp(); @@ -480,41 +480,58 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { + int hdrlen; + XLogRecPtr dataStart; + XLogRecPtr walEnd; + TimestampTz sendTime; + bool replyRequested; + + resetStringInfo(&incoming_message); + switch (type) { case 'w': /* WAL records */ { - WalDataMessageHeader msghdr; - - if (len < sizeof(WalDataMessageHeader)) + /* copy message to StringInfo */ + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); + if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); - /* memcpy is required here for alignment reasons */ - memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); + appendBinaryStringInfo(&incoming_message, buf, hdrlen); - ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime); + /* read the fields */ + dataStart = pq_getmsgint64(&incoming_message); + walEnd = pq_getmsgint64(&incoming_message); + sendTime = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); + ProcessWalSndrMessage(walEnd, sendTime); - buf += sizeof(WalDataMessageHeader); - len -= sizeof(WalDataMessageHeader); - XLogWalRcvWrite(buf, len, msghdr.dataStart); + buf += hdrlen; + len -= hdrlen; + XLogWalRcvWrite(buf, len, dataStart); break; } case 'k': /* Keepalive */ { - PrimaryKeepaliveMessage keepalive; - - if (len != sizeof(PrimaryKeepaliveMessage)) + /* copy message to StringInfo */ + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary"))); - /* memcpy is required here for alignment reasons */ - memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage)); + appendBinaryStringInfo(&incoming_message, buf, hdrlen); - ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime); + /* read the fields */ + walEnd = pq_getmsgint64(&incoming_message); + sendTime = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); + replyRequested = pq_getmsgbyte(&incoming_message); + + ProcessWalSndrMessage(walEnd, sendTime); /* If the primary requested a reply, send one immediately */ - if (keepalive.replyRequested) + if (replyRequested) XLogWalRcvSendReply(true, false); break; } @@ -685,7 +702,10 @@ XLogWalRcvFlush(bool dying) static void XLogWalRcvSendReply(bool force, bool requestReply) { - char buf[sizeof(StandbyReplyMessage) + 1]; + static XLogRecPtr writePtr = 0; + static XLogRecPtr flushPtr = 0; + XLogRecPtr applyPtr; + static TimestampTz sendTime = 0; TimestampTz now; /* @@ -708,28 +728,34 @@ XLogWalRcvSendReply(bool force, bool requestReply) * probably OK. */ if (!force - && XLByteEQ(reply_message.write, LogstreamResult.Write) - && XLByteEQ(reply_message.flush, LogstreamResult.Flush) - && !TimestampDifferenceExceeds(reply_message.sendTime, now, + && XLByteEQ(writePtr, LogstreamResult.Write) + && XLByteEQ(flushPtr, LogstreamResult.Flush) + && !TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return; + sendTime = now; /* Construct a new message */ - reply_message.write = LogstreamResult.Write; - reply_message.flush = LogstreamResult.Flush; - reply_message.apply = GetXLogReplayRecPtr(NULL); - reply_message.sendTime = now; - reply_message.replyRequested = requestReply; + writePtr = LogstreamResult.Write; + flushPtr = LogstreamResult.Flush; + applyPtr = GetXLogReplayRecPtr(NULL); - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", - (uint32) (reply_message.write >> 32), (uint32) reply_message.write, - (uint32) (reply_message.flush >> 32), (uint32) reply_message.flush, - (uint32) (reply_message.apply >> 32), (uint32) reply_message.apply); + resetStringInfo(&reply_message); + pq_sendbyte(&reply_message, 'r'); + pq_sendint64(&reply_message, writePtr); + pq_sendint64(&reply_message, flushPtr); + pq_sendint64(&reply_message, applyPtr); + pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendbyte(&reply_message, requestReply ? 1 : 0); - /* Prepend with the message type and send it. */ - buf[0] = 'r'; - memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); - walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); + /* Send it */ + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s", + (uint32) (writePtr >> 32), (uint32) writePtr, + (uint32) (flushPtr >> 32), (uint32) flushPtr, + (uint32) (applyPtr >> 32), (uint32) applyPtr, + requestReply ? " (reply requested)" : ""); + + walrcv_send(reply_message.data, reply_message.len); } /* @@ -739,11 +765,11 @@ XLogWalRcvSendReply(bool force, bool requestReply) static void XLogWalRcvSendHSFeedback(void) { - char buf[sizeof(StandbyHSFeedbackMessage) + 1]; TimestampTz now; TransactionId nextXid; uint32 nextEpoch; TransactionId xmin; + static TimestampTz sendTime = 0; /* * If the user doesn't want status to be reported to the master, be sure @@ -758,9 +784,10 @@ XLogWalRcvSendHSFeedback(void) /* * Send feedback at most once per wal_receiver_status_interval. */ - if (!TimestampDifferenceExceeds(feedback_message.sendTime, now, + if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return; + sendTime = now; /* * If Hot Standby is not yet active there is nothing to send. Check this @@ -783,25 +810,23 @@ XLogWalRcvSendHSFeedback(void) if (nextXid < xmin) nextEpoch--; - /* - * Always send feedback message. - */ - feedback_message.sendTime = now; - feedback_message.xmin = xmin; - feedback_message.epoch = nextEpoch; - elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u", - feedback_message.xmin, - feedback_message.epoch); + xmin, nextEpoch); - /* Prepend with the message type and send it. */ - buf[0] = 'h'; - memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); - walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); + /* Construct the the message and send it. */ + resetStringInfo(&reply_message); + pq_sendbyte(&reply_message, 'h'); + pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint(&reply_message, xmin, 4); + pq_sendint(&reply_message, nextEpoch, 4); + walrcv_send(reply_message.data, reply_message.len); } /* - * Keep track of important messages from primary. + * Update shared memory status upon receiving a message from primary. + * + * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest + * message, reported by primary. */ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2af38f1cbe..8774d7e822 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -48,7 +48,6 @@ #include "nodes/replnodes.h" #include "replication/basebackup.h" #include "replication/syncrep.h" -#include "replication/walprotocol.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/walsender_private.h" @@ -66,6 +65,16 @@ #include "utils/timeout.h" #include "utils/timestamp.h" +/* + * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. + * + * We don't have a good idea of what a good value would be; there's some + * overhead per message in both walsender and walreceiver, but on the other + * hand sending large batches makes walsender less responsive to signals + * because signals are checked only between messages. 128kB (with + * default 8k blocks) seems like a reasonable guess for now. + */ +#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* Array of WalSnds in shared memory */ WalSndCtlData *WalSndCtl = NULL; @@ -103,13 +112,10 @@ static uint32 sendOff = 0; */ static XLogRecPtr sentPtr = 0; -/* Buffer for processing reply messages. */ +/* Buffers for constructing outgoing messages and processing reply messages. */ +static StringInfoData output_message; static StringInfoData reply_message; -/* - * Buffer for constructing outgoing messages. - * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes) - */ -static char *output_message; +static StringInfoData tmpbuf; /* * Timestamp of the last receipt of the reply from the standby. @@ -526,17 +532,26 @@ ProcessStandbyMessage(void) static void ProcessStandbyReplyMessage(void) { - StandbyReplyMessage reply; + XLogRecPtr writePtr, + flushPtr, + applyPtr; + bool replyRequested; - pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage)); + /* the caller already consumed the msgtype byte */ + writePtr = pq_getmsgint64(&reply_message); + flushPtr = pq_getmsgint64(&reply_message); + applyPtr = pq_getmsgint64(&reply_message); + (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X", - (uint32) (reply.write >> 32), (uint32) reply.write, - (uint32) (reply.flush >> 32), (uint32) reply.flush, - (uint32) (reply.apply >> 32), (uint32) reply.apply); + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", + (uint32) (writePtr >> 32), (uint32) writePtr, + (uint32) (flushPtr >> 32), (uint32) flushPtr, + (uint32) (applyPtr >> 32), (uint32) applyPtr, + replyRequested ? " (reply requested)" : ""); /* Send a reply if the standby requested one. */ - if (reply.replyRequested) + if (replyRequested) WalSndKeepalive(false); /* @@ -548,9 +563,9 @@ ProcessStandbyReplyMessage(void) volatile WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); - walsnd->write = reply.write; - walsnd->flush = reply.flush; - walsnd->apply = reply.apply; + walsnd->write = writePtr; + walsnd->flush = flushPtr; + walsnd->apply = applyPtr; SpinLockRelease(&walsnd->mutex); } @@ -564,20 +579,25 @@ ProcessStandbyReplyMessage(void) static void ProcessStandbyHSFeedbackMessage(void) { - StandbyHSFeedbackMessage reply; TransactionId nextXid; uint32 nextEpoch; + TransactionId feedbackXmin; + uint32 feedbackEpoch; - /* Decipher the reply message */ - pq_copymsgbytes(&reply_message, (char *) &reply, - sizeof(StandbyHSFeedbackMessage)); + /* + * Decipher the reply message. The caller already consumed the msgtype + * byte. + */ + (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + feedbackXmin = pq_getmsgint(&reply_message, 4); + feedbackEpoch = pq_getmsgint(&reply_message, 4); elog(DEBUG2, "hot standby feedback xmin %u epoch %u", - reply.xmin, - reply.epoch); + feedbackXmin, + feedbackEpoch); /* Ignore invalid xmin (can't actually happen with current walreceiver) */ - if (!TransactionIdIsNormal(reply.xmin)) + if (!TransactionIdIsNormal(feedbackXmin)) return; /* @@ -589,18 +609,18 @@ ProcessStandbyHSFeedbackMessage(void) */ GetNextXidAndEpoch(&nextXid, &nextEpoch); - if (reply.xmin <= nextXid) + if (feedbackXmin <= nextXid) { - if (reply.epoch != nextEpoch) + if (feedbackEpoch != nextEpoch) return; } else { - if (reply.epoch + 1 != nextEpoch) + if (feedbackEpoch + 1 != nextEpoch) return; } - if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid)) + if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid)) return; /* epoch OK, but it's wrapped around */ /* @@ -610,9 +630,9 @@ ProcessStandbyHSFeedbackMessage(void) * cleanup conflicts on the standby server. * * There is a small window for a race condition here: although we just - * checked that reply.xmin precedes nextXid, the nextXid could have gotten + * checked that feedbackXmin precedes nextXid, the nextXid could have gotten * advanced between our fetching it and applying the xmin below, perhaps - * far enough to make reply.xmin wrap around. In that case the xmin we + * far enough to make feedbackXmin wrap around. In that case the xmin we * set here would be "in the future" and have no effect. No point in * worrying about this since it's too late to save the desired data * anyway. Assuming that the standby sends us an increasing sequence of @@ -625,7 +645,7 @@ ProcessStandbyHSFeedbackMessage(void) * safe, and if we're moving it backwards, well, the data is at risk * already since a VACUUM could have just finished calling GetOldestXmin.) */ - MyPgXact->xmin = reply.xmin; + MyPgXact->xmin = feedbackXmin; } /* Main loop of walsender process that streams the WAL over Copy messages. */ @@ -635,17 +655,12 @@ WalSndLoop(void) bool caughtup = false; /* - * Allocate buffer that will be used for each output message. We do this - * just once to reduce palloc overhead. The buffer must be made large - * enough for maximum-sized messages. - */ - output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE); - - /* - * Allocate buffer that will be used for processing reply messages. As - * above, do this just once to reduce palloc overhead. + * Allocate buffers that will be used for each outgoing and incoming + * message. We do this just once to reduce palloc overhead. */ + initStringInfo(&output_message); initStringInfo(&reply_message); + initStringInfo(&tmpbuf); /* Initialize the last reply timestamp */ last_reply_timestamp = GetCurrentTimestamp(); @@ -1048,7 +1063,6 @@ XLogSend(bool *caughtup) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; - WalDataMessageHeader msghdr; /* * Attempt to send all data that's already been written out and fsync'd to @@ -1125,25 +1139,31 @@ XLogSend(bool *caughtup) /* * OK to read and send the slice. */ - output_message[0] = 'w'; + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 'w'); + + pq_sendint64(&output_message, startptr); /* dataStart */ + pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, 0); /* sendtime, filled in last */ /* * Read the log directly into the output buffer to avoid extra memcpy * calls. */ - XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); + enlargeStringInfo(&output_message, nbytes); + XLogRead(&output_message.data[output_message.len], startptr, nbytes); + output_message.len += nbytes; + output_message.data[output_message.len] = '\0'; /* - * We fill the message header last so that the send timestamp is taken as - * late as possible. + * Fill the send timestamp last, so that it is taken as late as possible. */ - msghdr.dataStart = startptr; - msghdr.walEnd = SendRqstPtr; - msghdr.sendTime = GetCurrentTimestamp(); + resetStringInfo(&tmpbuf); + pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp()); + memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], + tmpbuf.data, sizeof(int64)); - memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader)); - - pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes); + pq_putmessage_noblock('d', output_message.data, output_message.len); sentPtr = endptr; @@ -1518,19 +1538,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) static void WalSndKeepalive(bool requestReply) { - PrimaryKeepaliveMessage keepalive_message; - - /* Construct a new message */ - keepalive_message.walEnd = sentPtr; - keepalive_message.sendTime = GetCurrentTimestamp(); - keepalive_message.replyRequested = requestReply; - elog(DEBUG2, "sending replication keepalive"); - /* Prepend with the message type and send it. */ - output_message[0] = 'k'; - memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); - pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1); + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 'k'); + pq_sendint64(&output_message, sentPtr); + pq_sendint64(&output_message, GetCurrentIntegerTimestamp()); + pq_sendbyte(&output_message, requestReply ? 1 : 0); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock('d', output_message.data, output_message.len); } /* diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 50ef8976be..6ff7385233 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1285,6 +1285,50 @@ GetCurrentTimestamp(void) return result; } +/* + * GetCurrentIntegerTimestamp -- get the current operating system time as int64 + * + * Result is the number of milliseconds since the Postgres epoch. If compiled + * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(), + * and is implemented as a macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +int64 +GetCurrentIntegerTimestamp(void) +{ + int64 result; + struct timeval tp; + + gettimeofday(&tp, NULL); + + result = (int64) tp.tv_sec - + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + result = (result * USECS_PER_SEC) + tp.tv_usec; + + return result; +} +#endif + +/* + * IntegetTimestampToTimestampTz -- convert an int64 timestamp to native format + * + * When compiled with --enable-integer-datetimes, this is implemented as a + * no-op macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +TimestampTz +IntegerTimestampToTimestampTz(int64 timestamp) +{ + TimestampTz result; + + result = timestamp / USECS_PER_SEC; + result += (timestamp % USECS_PER_SEC) / 1000000.0; + + return result; +} +#endif + /* * TimestampDifference -- convert the difference between two timestamps * into integer seconds and microseconds diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 404ff91715..aed90954e6 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -21,7 +21,6 @@ #include "postgres.h" #include "libpq-fe.h" #include "access/xlog_internal.h" -#include "replication/walprotocol.h" #include "utils/datetime.h" #include "utils/timestamp.h" @@ -34,14 +33,9 @@ #include -/* Size of the streaming replication protocol headers */ -#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader)) -#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage)) - /* fd for currently open WAL file */ static int walfile = -1; - /* * Open a new WAL file in the specified directory. Store the name * (not including the full directory) in namebuf. Assumes there is @@ -189,37 +183,34 @@ close_walfile(char *basedir, char *walname, bool segment_complete) /* * Local version of GetCurrentTimestamp(), since we are not linked with - * backend code. + * backend code. The protocol always uses integer timestamps, regardless of + * server setting. */ -static TimestampTz +static int64 localGetCurrentTimestamp(void) { - TimestampTz result; + int64 result; struct timeval tp; gettimeofday(&tp, NULL); - result = (TimestampTz) tp.tv_sec - + result = (int64) tp.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); -#ifdef HAVE_INT64_TIMESTAMP result = (result * USECS_PER_SEC) + tp.tv_usec; -#else - result = result + (tp.tv_usec / 1000000.0); -#endif return result; } /* - * Local version of TimestampDifference(), since we are not - * linked with backend code. + * Local version of TimestampDifference(), since we are not linked with + * backend code. */ static void -localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, +localTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs) { - TimestampTz diff = stop_time - start_time; + int64 diff = stop_time - start_time; if (diff <= 0) { @@ -228,13 +219,8 @@ localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, } else { -#ifdef HAVE_INT64_TIMESTAMP *secs = (long) (diff / USECS_PER_SEC); *microsecs = (int) (diff % USECS_PER_SEC); -#else - *secs = (long) diff; - *microsecs = (int) ((diff - *secs) * 1000000.0); -#endif } } @@ -243,17 +229,86 @@ localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, * linked with backend code. */ static bool -localTimestampDifferenceExceeds(TimestampTz start_time, - TimestampTz stop_time, +localTimestampDifferenceExceeds(int64 start_time, + int64 stop_time, int msec) { - TimestampTz diff = stop_time - start_time; + int64 diff = stop_time - start_time; -#ifdef HAVE_INT64_TIMESTAMP return (diff >= msec * INT64CONST(1000)); -#else - return (diff * 1000.0 >= msec); -#endif +} + +/* + * Converts an int64 to network byte order. + */ +static void +sendint64(int64 i, char *buf) +{ + uint32 n32; + + /* High order half first, since we're doing MSB-first */ + n32 = (uint32) (i >> 32); + n32 = htonl(n32); + memcpy(&buf[0], &n32, 4); + + /* Now the low order half */ + n32 = (uint32) i; + n32 = htonl(n32); + memcpy(&buf[4], &n32, 4); +} + +/* + * Converts an int64 from network byte order to native format. + */ +static int64 +recvint64(char *buf) +{ + int64 result; + uint32 h32; + uint32 l32; + + memcpy(&h32, buf, 4); + memcpy(&l32, buf + 4, 4); + h32 = ntohl(h32); + l32 = ntohl(l32); + + result = h32; + result <<= 32; + result |= l32; + + return result; +} + +/* + * Send a Standby Status Update message to server. + */ +static bool +sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now) +{ + char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + int len = 0; + + replybuf[len] = 'r'; + len += 1; + sendint64(blockpos, &replybuf[len]); /* write */ + len += 8; + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + len += 8; + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + len += 8; + sendint64(now, &replybuf[len]); /* sendTime */ + len += 8; + replybuf[len] = 0; /* replyRequested */ + len += 1; + + if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send feedback packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + + return true; } /* @@ -382,24 +437,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, standby_message_timeout)) { /* Time to send feedback! */ - char replybuf[sizeof(StandbyReplyMessage) + 1]; - StandbyReplyMessage *replymsg; - - replymsg = (StandbyReplyMessage *) (replybuf + 1); - replymsg->write = blockpos; - replymsg->flush = InvalidXLogRecPtr; - replymsg->apply = InvalidXLogRecPtr; - replymsg->sendTime = now; - replybuf[0] = 'r'; - - if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 || - PQflush(conn)) - { - fprintf(stderr, _("%s: could not send feedback packet: %s"), - progname, PQerrorMessage(conn)); + if (!sendFeedback(conn, blockpos, now)) goto error; - } - last_status = now; } @@ -419,12 +458,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, FD_SET(PQsocket(conn), &input_mask); if (standby_message_timeout) { - TimestampTz targettime; + int64 targettime; long secs; int usecs; - targettime = TimestampTzPlusMilliseconds(last_status, - standby_message_timeout - 1); + targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); localTimestampDifference(now, targettime, &secs, @@ -474,19 +512,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, PQerrorMessage(conn)); goto error; } + + /* Check the message type. */ if (copybuf[0] == 'k') { /* * keepalive message, sent in 9.2 and newer. We just ignore this * message completely, but need to skip past it in the stream. */ - if (r != STREAMING_KEEPALIVE_SIZE) - { - fprintf(stderr, - _("%s: keepalive message has incorrect size %d\n"), - progname, r); - goto error; - } continue; } else if (copybuf[0] != 'w') @@ -495,15 +528,22 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, copybuf[0]); goto error; } + + /* + * Read the header of the XLogData message, enclosed in the CopyData + * message. We only need the WAL location field (dataStart), the rest + * of the header is ignored. + */ +#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */) if (r < STREAMING_HEADER_SIZE + 1) { fprintf(stderr, _("%s: streaming header too small: %d\n"), progname, r); goto error; } + blockpos = recvint64(©buf[1]); /* Extract WAL location for this block */ - memcpy(&blockpos, copybuf + 1, 8); xlogoff = blockpos % XLOG_SEG_SIZE; /* diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h deleted file mode 100644 index 396d006ea7..0000000000 --- a/src/include/replication/walprotocol.h +++ /dev/null @@ -1,128 +0,0 @@ -/*------------------------------------------------------------------------- - * - * walprotocol.h - * Definitions relevant to the streaming WAL transmission protocol. - * - * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group - * - * src/include/replication/walprotocol.h - * - *------------------------------------------------------------------------- - */ -#ifndef _WALPROTOCOL_H -#define _WALPROTOCOL_H - -#include "access/xlogdefs.h" -#include "datatype/timestamp.h" - - -/* - * All messages from WalSender must contain these fields to allow us to - * correctly calculate the replication delay. - */ -typedef struct -{ - /* Current end of WAL on the sender */ - XLogRecPtr walEnd; - - /* Sender's system clock at the time of transmission */ - TimestampTz sendTime; - - /* - * If replyRequested is set, the client should reply immediately to this - * message, to avoid a timeout disconnect. - */ - bool replyRequested; -} WalSndrMessage; - - -/* - * Header for a WAL data message (message type 'w'). This is wrapped within - * a CopyData message at the FE/BE protocol level. - * - * The header is followed by actual WAL data. Note that the data length is - * not specified in the header --- it's just whatever remains in the message. - * - * walEnd and sendTime are not essential data, but are provided in case - * the receiver wants to adjust its behavior depending on how far behind - * it is. - */ -typedef struct -{ - /* WAL start location of the data included in this message */ - XLogRecPtr dataStart; - - /* Current end of WAL on the sender */ - XLogRecPtr walEnd; - - /* Sender's system clock at the time of transmission */ - TimestampTz sendTime; -} WalDataMessageHeader; - -/* - * Keepalive message from primary (message type 'k'). (lowercase k) - * This is wrapped within a CopyData message at the FE/BE protocol level. - * - * Note that the data length is not specified here. - */ -typedef WalSndrMessage PrimaryKeepaliveMessage; - -/* - * Reply message from standby (message type 'r'). This is wrapped within - * a CopyData message at the FE/BE protocol level. - * - * Note that the data length is not specified here. - */ -typedef struct -{ - /* - * The xlog locations that have been written, flushed, and applied by - * standby-side. These may be invalid if the standby-side is unable to or - * chooses not to report these. - */ - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; - - /* Sender's system clock at the time of transmission */ - TimestampTz sendTime; - - /* - * If replyRequested is set, the server should reply immediately to this - * message, to avoid a timeout disconnect. - */ - bool replyRequested; -} StandbyReplyMessage; - -/* - * Hot Standby feedback from standby (message type 'h'). This is wrapped within - * a CopyData message at the FE/BE protocol level. - * - * Note that the data length is not specified here. - */ -typedef struct -{ - /* - * The current xmin and epoch from the standby, for Hot Standby feedback. - * This may be invalid if the standby-side does not support feedback, or - * Hot Standby is not yet available. - */ - TransactionId xmin; - uint32 epoch; - - /* Sender's system clock at the time of transmission */ - TimestampTz sendTime; -} StandbyHSFeedbackMessage; - -/* - * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. - * - * We don't have a good idea of what a good value would be; there's some - * overhead per message in both walsender and walreceiver, but on the other - * hand sending large batches makes walsender less responsive to signals - * because signals are checked only between messages. 128kB (with - * default 8k blocks) seems like a reasonable guess for now. - */ -#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) - -#endif /* _WALPROTOCOL_H */ diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index e7cdb417e5..b4b402f018 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -206,13 +206,24 @@ extern Datum generate_series_timestamptz(PG_FUNCTION_ARGS); /* Internal routines (not fmgr-callable) */ extern TimestampTz GetCurrentTimestamp(void); - extern void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs); extern bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec); +/* + * Prototypes for functions to deal with integer timestamps, when the native + * format is float timestamps. + */ +#ifndef HAVE_INT64_TIMESTAMP +extern int64 GetCurrentIntegerTimestamp(void); +extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp); +#else +#define GetCurrentIntegerTimestamp() GetCurrentTimestamp() +#define IntegerTimestampToTimestampTz(timestamp) (timestamp) +#endif + extern TimestampTz time_t_to_timestamptz(pg_time_t tm); extern pg_time_t timestamptz_to_time_t(TimestampTz t);