diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 1e2604b832..70165115f5 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1423,10 +1423,15 @@ The commands accepted in walsender mode are: After streaming all the WAL on a timeline that is not the latest one, the server will end streaming by exiting the COPY mode. When the client - acknowledges this by also exiting COPY mode, the server sends a - single-row, single-column result set indicating the next timeline in - this server's history. That is followed by a CommandComplete message, - and the server is ready to accept a new command. + acknowledges this by also exiting COPY mode, the server sends a result + set with one row and two columns, indicating the next timeline in this + server's history. The first column is the next timeline's ID, and the + second column is the XLOG position where the switch happened. Usually, + the switch position is the end of the WAL that was streamed, but there + are corner cases where the server can send some WAL from the old + timeline that it has not itself replayed before promoting. Finally, the + server sends CommandComplete message, and is ready to accept a new + command. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 959f423187..f7dd61c4c7 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } else { - ptr = RecPtr; + ptr = tliRecPtr; tli = tliOfPointInHistory(tliRecPtr, expectedTLEs); if (curFileTLI > 0 && tli < curFileTLI) @@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + receivedUpto = 0; } /* * Move to XLOG_FROM_STREAM state in either case. We'll get diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e6e670e9e4..f7cc6e3c2f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli) res = PQgetResult(streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - /* Read the next timeline's ID */ - if (PQnfields(res) != 1 || PQntuples(res) != 1) + /* + * Read the next timeline's ID. The server also sends the timeline's + * starting point, but it is ignored. + */ + if (PQnfields(res) < 2 || PQntuples(res) != 1) ereport(ERROR, (errmsg("unexpected result set after end-of-streaming"))); *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index d414808c9f..e5ad84393f 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) walrcv->startTime = now; /* - * If this is the first startup of walreceiver, we initialize receivedUpto - * and latestChunkStart to receiveStart. + * If this is the first startup of walreceiver (on this timeline), + * initialize receivedUpto and latestChunkStart to the starting point. */ - if (walrcv->receiveStart == 0) + if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) { walrcv->receivedUpto = recptr; + walrcv->receivedTLI = tli; walrcv->latestChunkStart = recptr; } walrcv->receiveStart = recptr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c05bb1e081..1dcb0f57f4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd) */ if (sendTimeLineIsHistoric) { - char str[11]; - snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); + char tli_str[11]; + char startpos_str[8+1+8+1]; - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 1, 2); /* 1 field */ + snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI); + snprintf(startpos_str, sizeof(startpos_str), "%X/%X", + (uint32) (sendTimeLineValidUpto >> 32), + (uint32) sendTimeLineValidUpto); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 2, 2); /* 2 fields */ /* Field header */ pq_sendstring(&buf, "next_tli"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ /* * int8 may seem like a surprising data type for this, but in theory * int4 would not be wide enough for this, as TimeLineID is unsigned. @@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + pq_sendstring(&buf, "next_tli_startpos"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 1, 2); /* number of columns */ - pq_sendint(&buf, strlen(str), 4); /* length */ - pq_sendbytes(&buf, str, strlen(str)); + pq_sendint(&buf, 2, 2); /* number of columns */ + + pq_sendint(&buf, strlen(tli_str), 4); /* length */ + pq_sendbytes(&buf, tli_str, strlen(tli_str)); + + pq_sendint(&buf, strlen(startpos_str), 4); /* length */ + pq_sendbytes(&buf, startpos_str, strlen(startpos_str)); + pq_endmessage(&buf); } @@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup) history = readTimeLineHistory(ThisTimeLineID); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); - Assert(sentPtr <= sendTimeLineValidUpto); + Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); - /* the current send pointer should be <= the switchpoint */ - if (!(sentPtr <= sendTimeLineValidUpto)) - elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X", - sendTimeLine, - (uint32) (sendTimeLineValidUpto >> 32), - (uint32) sendTimeLineValidUpto, - (uint32) (sentPtr >> 32), - (uint32) sentPtr); - sendTimeLineIsHistoric = true; SendRqstPtr = sendTimeLineValidUpto; @@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup) /* * If this is a historic timeline and we've reached the point where we * forked to the next timeline, stop streaming. + * + * Note: We might already have sent WAL > sendTimeLineValidUpto. The + * startup process will normally replay all WAL that has been received from + * the master, before promoting, but if the WAL streaming is terminated at + * a WAL page boundary, the valid portion of the timeline might end in the + * middle of a WAL record. We might've already sent the first half of that + * partial WAL record to the cascading standby, so that sentPtr > + * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the + * partial WAL record either, so it can still follow our timeline switch. */ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { @@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup) streamingDoneSending = true; *caughtup = true; + + elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto, + (uint32) (sentPtr >> 32), (uint32) sentPtr); return; } diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index e4da799d1f..fa0ac5184c 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -83,10 +83,12 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) timeline); /* - * Note that we report the previous, not current, position here. That's - * the exact location where the timeline switch happend. After the switch, - * we restart streaming from the beginning of the segment, so xlogpos can - * smaller than prevpos if we just switched to new timeline. + * Note that we report the previous, not current, position here. After a + * timeline switch, xlogpos points to the beginning of the segment because + * that's where we always begin streaming. Reporting the end of previous + * timeline isn't totally accurate, because the next timeline can begin + * slightly before the end of the WAL that we received on the previous + * timeline, but it's close enough for reporting purposes. */ if (prevtimeline != 0 && prevtimeline != timeline) fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"), diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index f297003c62..98e874f4ff 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -37,6 +37,9 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos); +static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, + uint32 *timeline); + /* * Open a new WAL file in the specified directory. * @@ -627,26 +630,44 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * There are two possible reasons for that: a controlled shutdown, * or we reached the end of the current timeline. In case of * end-of-timeline, the server sends a result set after Copy has - * finished, containing the next timeline's ID. Read that, and - * restart streaming from the next timeline. + * finished, containing information about the next timeline. Read + * that, and restart streaming from the next timeline. In case of + * controlled shutdown, stop here. */ - if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* - * End-of-timeline. Read the next timeline's ID. + * End-of-timeline. Read the next timeline's ID and starting + * position. Usually, the starting position will match the end of + * the previous timeline, but there are corner cases like if the + * server had sent us half of a WAL record, when it was promoted. + * The new timeline will begin at the end of the last complete + * record in that case, overlapping the partial WAL record on the + * the old timeline. */ uint32 newtimeline; + bool parsed; - newtimeline = atoi(PQgetvalue(res, 0, 0)); + parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline); PQclear(res); + if (!parsed) + goto error; + /* Sanity check the values the server gave us */ if (newtimeline <= timeline) { - /* shouldn't happen */ fprintf(stderr, - "server reported unexpected next timeline %u, following timeline %u\n", - newtimeline, timeline); + _("%s: server reported unexpected next timeline %u, following timeline %u\n"), + progname, newtimeline, timeline); + goto error; + } + if (startpos > stoppos) + { + fprintf(stderr, + _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), + progname, + timeline, (uint32) (stoppos >> 32), (uint32) stoppos, + newtimeline, (uint32) (startpos >> 32), (uint32) startpos); goto error; } @@ -666,7 +687,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Always start streaming at the beginning of a segment. */ timeline = newtimeline; - startpos = stoppos - (stoppos % XLOG_SEG_SIZE); + startpos = startpos - (startpos % XLOG_SEG_SIZE); continue; } else if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -704,6 +725,50 @@ error: return false; } +/* + * Helper function to parse the result set returned by server after streaming + * has finished. On failure, prints an error to stderr and returns false. + */ +static bool +ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) +{ + uint32 startpos_xlogid, + startpos_xrecoff; + + /*---------- + * The result set consists of one row and two columns, e.g: + * + * next_tli | next_tli_startpos + * ----------+------------------- + * 4 | 0/9949AE0 + * + * next_tli is the timeline ID of the next timeline after the one that + * just finished streaming. next_tli_startpos is the XLOG position where + * the server switched to it. + *---------- + */ + if (PQnfields(res) < 2 || PQntuples(res) != 1) + { + fprintf(stderr, + _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 2); + return false; + } + + *timeline = atoi(PQgetvalue(res, 0, 0)); + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid, + &startpos_xrecoff) != 2) + { + fprintf(stderr, + _("%s: could not parse next timeline's starting point \"%s\"\n"), + progname, PQgetvalue(res, 0, 1)); + return false; + } + *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff; + + return true; +} + /* * The main loop of ReceiveXLogStream. Handles the COPY stream after * initiating streaming with the START_STREAMING command.