From b5ec22bf5e8f19964f7c7d7ef357ff947a38c7fc Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Thu, 9 Sep 2021 23:58:05 +0900 Subject: [PATCH] Fix issue with WAL archiving in standby. Previously, walreceiver always closed the currently-opened WAL segment and created its archive notification file, after it finished writing the current segment up and received any WAL data that should be written into the next segment. If walreceiver exited just before any WAL data in the next segment arrived at standby, it did not create the archive notification file of the current segment even though that's known completed. This behavior could cause WAL archiving of the segment to be delayed until subsequent restartpoints or checkpoints created its notification file. To fix the issue, this commit changes walreceiver so that it creates an archive notification file of a current WAL segment immediately if that's known completed before receiving next WAL data. Back-patch to all supported branches. Reported-by: Kyotaro Horiguchi Author: Fujii Masao Reviewed-by: Kyotaro Horiguchi Discussion: https://postgr.es/m/20200630.165503.1465894182551545886.horikyota.ntt@gmail.com --- src/backend/replication/walreceiver.c | 99 +++++++++++++++++---------- 1 file changed, 62 insertions(+), 37 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index faeea9f0cc..4831a259c4 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -125,6 +125,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); +static void XLogWalRcvClose(XLogRecPtr recptr); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -883,47 +884,16 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int segbytes; - if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + /* Close the current segment if it's completed */ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + XLogWalRcvClose(recptr); + + if (recvFile < 0) { - bool use_existent; - - /* - * fsync() and close current file before we switch to next one. We - * would otherwise have to reopen this file to fsync it later - */ - if (recvFile >= 0) - { - char xlogfname[MAXFNAMELEN]; - - XLogWalRcvFlush(false); - - XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); - - /* - * XLOG segment files will be re-read by recovery in startup - * process soon, so we don't advise the OS to release cache - * pages associated with the file like XLogFileClose() does. - */ - if (close(recvFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log segment %s: %m", - xlogfname))); - - /* - * Create .done file forcibly to prevent the streamed segment - * from being archived later. - */ - if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) - XLogArchiveForceDone(xlogfname); - else - XLogArchiveNotify(xlogfname); - } - recvFile = -1; + bool use_existent = true; /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); - use_existent = true; recvFile = XLogFileInit(recvSegNo, &use_existent, true); recvFileTLI = ThisTimeLineID; } @@ -970,6 +940,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* Update shared-memory status */ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + + /* + * Close the current segment if it's fully written up in the last cycle of + * the loop, to create its archive notification file soon. Otherwise WAL + * archiving of the segment will be delayed until any data in the next + * segment is received and written. + */ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + XLogWalRcvClose(recptr); } /* @@ -1023,6 +1002,52 @@ XLogWalRcvFlush(bool dying) } } +/* + * Close the current segment. + * + * Flush the segment to disk before closing it. Otherwise we have to + * reopen and fsync it later. + * + * Create an archive notification file since the segment is known completed. + */ +static void +XLogWalRcvClose(XLogRecPtr recptr) +{ + char xlogfname[MAXFNAMELEN]; + + Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)); + + /* + * fsync() and close current file before we switch to next one. We would + * otherwise have to reopen this file to fsync it later + */ + XLogWalRcvFlush(false); + + XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); + + /* + * XLOG segment files will be re-read by recovery in startup process soon, + * so we don't advise the OS to release cache pages associated with the + * file like XLogFileClose() does. + */ + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + + /* + * Create .done file forcibly to prevent the streamed segment from being + * archived later. + */ + if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) + XLogArchiveForceDone(xlogfname); + else + XLogArchiveNotify(xlogfname); + + recvFile = -1; +} + /* * Send reply message to primary, indicating our current WAL locations, oldest * xmin and the current time.