diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1eb877e5fc..60d40d4505 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -143,16 +143,6 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; -/* - * Are we doing recovery from XLOG stream? If so, we recover without using - * offline XLOG archives even though InArchiveRecovery==true. This flag is - * used only in standby mode. - */ -static bool InStreamingRecovery = false; - -/* The current log page is partially-filled, and so needs to be read again? */ -static bool needReread = false; - /* * Local copy of SharedRecoveryInProgress variable. True actually means "not * known, need to check the shared state". @@ -457,12 +447,16 @@ static uint32 openLogOff = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. + * will be just past that page. readLen indicates how much of the current + * page has been read into readBuf. */ static int readFile = -1; static uint32 readId = 0; static uint32 readSeg = 0; static uint32 readOff = 0; +static uint32 readLen = 0; +/* Is the currently open segment being streamed from primary? 
*/ +static bool readStreamed = false; /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ static char *readBuf = NULL; @@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0; /* State information for XLOG reading */ static XLogRecPtr ReadRecPtr; /* start of last record read */ static XLogRecPtr EndRecPtr; /* end+1 of last record read */ -static XLogRecord *nextRecord = NULL; static TimeLineID lastPageTLI = 0; static XLogRecPtr minRecoveryPoint; /* local copy of @@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock); -static int XLogFileRead(uint32 log, uint32 seg, int emode); +static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, + bool fromArchive, bool notexistOk); +static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, + bool fromArchive); +static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, + bool randAccess); static void XLogFileClose(void); static bool RestoreArchivedFile(char *path, const char *xlogfname, const char *recovername, off_t expectedSize); @@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); -static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); -static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode); +static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); static List *readTimeLineHistory(TimeLineID targetTLI); @@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, static void 
WriteControlFile(void); static void ReadControlFile(void); static char *str_time(pg_time_t tnow); +static bool CheckForStandbyTrigger(void); #ifdef WAL_DEBUG static void xlog_outrec(StringInfo buf, XLogRecord *record); @@ -2586,13 +2584,72 @@ XLogFileOpen(uint32 log, uint32 seg) /* * Open a logfile segment for reading (during recovery). + * + * If fromArchive is true, the segment is retrieved from archive, otherwise + * it's read from pg_xlog. */ static int -XLogFileRead(uint32 log, uint32 seg, int emode) +XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, + bool fromArchive, bool notfoundOk) { - char path[MAXPGPATH]; char xlogfname[MAXFNAMELEN]; char activitymsg[MAXFNAMELEN + 16]; + char path[MAXPGPATH]; + int fd; + + XLogFileName(xlogfname, tli, log, seg); + + if (fromArchive) + { + /* Report recovery progress in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", + xlogfname); + set_ps_display(activitymsg, false); + + restoredFromArchive = RestoreArchivedFile(path, xlogfname, + "RECOVERYXLOG", + XLogSegSize); + if (!restoredFromArchive) + return -1; + } + else + { + XLogFilePath(path, tli, log, seg); + restoredFromArchive = false; + } + + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + if (fd >= 0) + { + /* Success! */ + curFileTLI = tli; + + /* Report recovery progress in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "recovering %s", + xlogfname); + set_ps_display(activitymsg, false); + + return fd; + } + if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" (log file %u, segment %u): %m", + path, log, seg))); + return -1; +} + +/* + * Open a logfile segment for reading (during recovery). + * + * This version searches for the segment with any TLI listed in expectedTLIs. + * If not in StandbyMode and fromArchive is true, the segment is also + * searched in pg_xlog if not found in archive. 
+ */ +static int +XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive) +{ + char path[MAXPGPATH]; ListCell *cell; int fd; @@ -2613,40 +2670,23 @@ XLogFileRead(uint32 log, uint32 seg, int emode) if (tli < curFileTLI) break; /* don't bother looking at too-old TLIs */ - XLogFileName(xlogfname, tli, log, seg); - - if (InArchiveRecovery && !InStreamingRecovery) - { - /* Report recovery progress in PS display */ - snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", - xlogfname); - set_ps_display(activitymsg, false); - - restoredFromArchive = RestoreArchivedFile(path, xlogfname, - "RECOVERYXLOG", - XLogSegSize); - } - else - XLogFilePath(path, tli, log, seg); - - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); - if (fd >= 0) - { - /* Success! */ - curFileTLI = tli; - - /* Report recovery progress in PS display */ - snprintf(activitymsg, sizeof(activitymsg), "recovering %s", - xlogfname); - set_ps_display(activitymsg, false); - + fd = XLogFileRead(log, seg, emode, tli, fromArchive, true); + if (fd != -1) return fd; + + /* + * If not in StandbyMode, fall back to searching pg_xlog. In + * StandbyMode we're streaming segments from the primary to pg_xlog, + * and we mustn't confuse the (possibly partial) segments in pg_xlog + * with complete segments ready to be applied. We rather wait for + * the records to arrive through streaming. + */ + if (!StandbyMode && fromArchive) + { + fd = XLogFileRead(log, seg, emode, tli, false, true); + if (fd != -1) + return fd; } - if (errno != ENOENT) /* unexpected failure? */ - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" (log file %u, segment %u): %m", - path, log, seg))); } /* Couldn't find it. For simplicity, complain about front timeline */ @@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) * different filename that can't be confused with regular XLOG * files. 
*/ - if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name)) + if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); @@ -3473,79 +3513,6 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) return true; } -/* - * Attempt to fetch an XLOG record. - * - * If RecPtr is not NULL, try to fetch a record at that position. Otherwise - * try to fetch a record just after the last one previously read. - * - * In standby mode, if we failed in reading a valid record and are not doing - * recovery from XLOG stream yet, we ignore the failure and start walreceiver - * process to fetch the record from the primary. Otherwise, returns NULL, - * or fails if emode is PANIC. (emode must be either PANIC or LOG.) - * - * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In - * this case, if we have to start XLOG streaming, we use RedoStartLSN as the - * streaming start position instead of RecPtr. - * - * The record is copied into readRecordBuf, so that on successful return, - * the returned record pointer always points there. - */ -static XLogRecord * -FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) -{ - if (StandbyMode && !InStreamingRecovery) - { - XLogRecord *record; - XLogRecPtr startlsn; - bool haveNextRecord = (nextRecord != NULL); - - /* An invalid record is OK here, so we set emode to DEBUG2 */ - record = ReadRecord(RecPtr, DEBUG2); - if (record != NULL) - return record; - - /* - * Start XLOG streaming if there is no more valid records available - * in the archive. - * - * We need to calculate the start position of XLOG streaming. If we - * read a record in the middle of a segment which doesn't exist in - * pg_xlog, we use the start of the segment as the start position. 
- * That prevents a broken segment (i.e., with no records in the - * first half of a segment) from being created by XLOG streaming, - * which might cause trouble later on if the segment is e.g - * archived. - */ - startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr; - if (startlsn.xrecoff % XLogSegSize != 0) - { - char xlogpath[MAXPGPATH]; - struct stat stat_buf; - uint32 log; - uint32 seg; - - XLByteToSeg(startlsn, log, seg); - XLogFilePath(xlogpath, recoveryTargetTLI, log, seg); - - if (stat(xlogpath, &stat_buf) != 0) - startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize; - } - RequestXLogStreaming(startlsn, PrimaryConnInfo); - - /* Needs to read the current page again if the next record is in it */ - needReread = haveNextRecord; - nextRecord = NULL; - - InStreamingRecovery = true; - ereport(LOG, - (errmsg("starting streaming recovery at %X/%X", - startlsn.xlogid, startlsn.xrecoff))); - } - - return ReadRecord(RecPtr, emode); -} - /* * Attempt to read an XLOG record. * @@ -3553,13 +3520,13 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) * try to read a record just after the last one previously read. * * If no valid record is available, returns NULL, or fails if emode is PANIC. - * (emode must be either PANIC, LOG or DEBUG2.) + * (emode must be either PANIC or LOG.) * * The record is copied into readRecordBuf, so that on successful return, * the returned record pointer always points there. */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, int emode_arg) +ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) { XLogRecord *record; char *buffer; @@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) bool randAccess = false; uint32 len, total_len; - uint32 targetPageOff; uint32 targetRecOff; uint32 pageHeaderSize; - XLogRecPtr receivedUpto = {0,0}; - bool finished; int emode; /* @@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) * should never hit the end of WAL because we wait for it to be streamed. 
* Therefore treat any broken WAL as PANIC, instead of failing over. */ - if (InStreamingRecovery) + if (StandbyMode) emode = PANIC; else emode = emode_arg; @@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) if (RecPtr == NULL) { RecPtr = &tmpRecPtr; - /* fast case if next record is on same page */ - if (nextRecord != NULL) - { - record = nextRecord; - goto got_record; - } /* - * Align old recptr to next page if the current page is filled and - * doesn't need to be read again. + * Align recptr to next page if no more records can fit on the + * current page. */ - if (!needReread) + if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord) + { NextLogPage(tmpRecPtr); - /* We will account for page header size below */ + /* We will account for page header size below */ + } } else { @@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) randAccess = true; /* allow curFileTLI to go backwards too */ } - if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) - { - close(readFile); - readFile = -1; - } + /* Read the page containing the record */ + if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) + return NULL; - /* Is the target record ready yet? */ - if (InStreamingRecovery) - { - receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished); - if (finished) - { - if (emode_arg == PANIC) - ereport(PANIC, - (errmsg("streaming recovery ended"))); - else - return NULL; - } - } - - XLByteToSeg(*RecPtr, readId, readSeg); - if (readFile < 0) - { - /* Now it's okay to reset curFileTLI if random fetch */ - if (randAccess) - curFileTLI = 0; - - readFile = XLogFileRead(readId, readSeg, emode); - if (readFile < 0) - goto next_record_is_invalid; - - /* - * Whenever switching to a new WAL segment, we read the first page of - * the file and validate its header, even if that's not where the - * target record is. 
This is so that we can check the additional - * identification info that is present in the first page's "long" - * header. - */ - readOff = 0; - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not read from log file %u, segment %u, offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) - goto next_record_is_invalid; - } - - targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - if (readOff != targetPageOff || needReread) - { - readOff = targetPageOff; - needReread = false; - if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not seek in log file %u, segment %u to offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not read from log file %u, segment %u, offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) - goto next_record_is_invalid; - } pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; if (targetRecOff == 0) @@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) } record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ); -got_record:; - /* * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is * required. 
@@ -3838,58 +3725,35 @@ got_record:; } buffer = readRecordBuf; - nextRecord = NULL; len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ; if (total_len > len) { /* Need to reassemble record */ XLogContRecord *contrecord; - XLogRecPtr nextpagelsn = *RecPtr; + XLogRecPtr pagelsn; uint32 gotlen = len; + /* Initialize pagelsn to the beginning of the page this record is on */ + pagelsn = *RecPtr; + pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ; + memcpy(buffer, record, len); record = (XLogRecord *) buffer; buffer += len; for (;;) { - /* Is the next page ready yet? */ - if (InStreamingRecovery) + /* Calculate pointer to beginning of next page */ + pagelsn.xrecoff += XLOG_BLCKSZ; + if (pagelsn.xrecoff >= XLogFileSize) { - if (gotlen != len) - nextpagelsn.xrecoff += XLOG_BLCKSZ; - NextLogPage(nextpagelsn); - receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished); - if (finished) - { - if (emode_arg == PANIC) - ereport(PANIC, - (errmsg("streaming recovery ended"))); - else - return NULL; - } + (pagelsn.xlogid)++; + pagelsn.xrecoff = 0; } + /* Wait for the next page to become available */ + if (!XLogPageRead(&pagelsn, emode, false, false)) + return NULL; - readOff += XLOG_BLCKSZ; - if (readOff >= XLogSegSize) - { - close(readFile); - readFile = -1; - NextLogSeg(readId, readSeg); - readFile = XLogFileRead(readId, readSeg, emode); - if (readFile < 0) - goto next_record_is_invalid; - readOff = 0; - } - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - ereport(emode, - (errcode_for_file_access(), - errmsg("could not read from log file %u, segment %u, offset %u: %m", - readId, readSeg, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) - goto next_record_is_invalid; + /* Check that the continuation record looks valid */ if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) { ereport(emode, @@ -3923,31 +3787,11 @@ got_record:; if (!RecordIsValid(record, *RecPtr, emode)) goto 
next_record_is_invalid; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); - if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize + - MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len)) - { - nextRecord = (XLogRecord *) ((char *) contrecord + - MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len)); - } EndRecPtr.xlogid = readId; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + pageHeaderSize + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len); - /* - * Check whether the current page needs to be read again. If there is no - * unread record in the current page (nextRecord == NULL), obviously we - * don't need to reread it. If we're not in streaming recovery mode yet, - * partially-filled page doesn't need to be reread because it is the - * last valid page. - */ - if (nextRecord != NULL && InStreamingRecovery && - XLByteLE(receivedUpto, EndRecPtr)) - { - nextRecord = NULL; - needReread = true; - } - ReadRecPtr = *RecPtr; /* needn't worry about XLOG SWITCH, it can't cross page boundaries */ return record; @@ -3956,26 +3800,9 @@ got_record:; /* Record does not cross a page boundary */ if (!RecordIsValid(record, *RecPtr, emode)) goto next_record_is_invalid; - if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ + - MAXALIGN(total_len)) - nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len)); EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); - /* - * Check whether the current page needs to be read again. If there is no - * unread record in the current page (nextRecord == NULL), obviously we - * don't need to reread it. If we're not in streaming recovery mode yet, - * partially-filled page doesn't need to be reread because it is the last - * valid page. 
- */ - if (nextRecord != NULL && InStreamingRecovery && - XLByteLE(receivedUpto, EndRecPtr)) - { - nextRecord = NULL; - needReread = true; - } - ReadRecPtr = *RecPtr; memcpy(buffer, record, total_len); @@ -3987,8 +3814,6 @@ got_record:; /* Pretend it extends to end of segment */ EndRecPtr.xrecoff += XLogSegSize - 1; EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize; - nextRecord = NULL; /* definitely not on same page */ - needReread = false; /* * Pretend that readBuf contains the last page of the segment. This is @@ -4005,7 +3830,6 @@ next_record_is_invalid:; close(readFile); readFile = -1; } - nextRecord = NULL; return NULL; } @@ -5730,7 +5554,7 @@ StartupXLOG(void) (errmsg("checkpoint record is at %X/%X", checkPointLoc.xlogid, checkPointLoc.xrecoff))); } - else if (InStreamingRecovery) + else if (StandbyMode) { /* * The last valid checkpoint record required for a streaming @@ -5938,12 +5762,12 @@ StartupXLOG(void) if (XLByteLT(checkPoint.redo, RecPtr)) { /* back up to find the record */ - record = FetchRecord(&(checkPoint.redo), PANIC, false); + record = ReadRecord(&(checkPoint.redo), PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = FetchRecord(NULL, LOG, false); + record = ReadRecord(NULL, LOG, false); } if (record != NULL) @@ -6096,7 +5920,7 @@ StartupXLOG(void) LastRec = ReadRecPtr; - record = FetchRecord(NULL, LOG, false); + record = ReadRecord(NULL, LOG, false); } while (record != NULL && recoveryContinue); /* @@ -6130,22 +5954,17 @@ StartupXLOG(void) /* * We are now done reading the xlog from stream. Turn off streaming - * recovery, and restart fetching the files (which would be required - * at end of recovery, e.g., timeline history file) from archive. + * recovery to force fetching the files (which would be required + * at end of recovery, e.g., timeline history file) from archive or + * pg_xlog. 
*/ - if (InStreamingRecovery) - { - /* We are no longer in streaming recovery state */ - InStreamingRecovery = false; - ereport(LOG, - (errmsg("streaming recovery complete"))); - } + StandbyMode = false; /* * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ - record = ReadRecord(&LastRec, PANIC); + record = ReadRecord(&LastRec, PANIC, false); EndOfLog = EndRecPtr; XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg); @@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) return NULL; } - record = FetchRecord(&RecPtr, LOG, true); + record = ReadRecord(&RecPtr, LOG, true); if (record == NULL) { @@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags) } LWLockRelease(ControlFileLock); - /* Are we doing recovery from XLOG stream? */ - if (!InStreamingRecovery) - InStreamingRecovery = WalRcvInProgress(); - /* * Delete old log files (those no longer needed even for previous * checkpoint/restartpoint) to prevent the disk holding the xlog from @@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags) * streaming recovery we have to or the disk will eventually fill up from * old log files streamed from master. */ - if (InStreamingRecovery && (_logId || _logSeg)) + if (WalRcvInProgress() && (_logId || _logSeg)) { XLogRecPtr endptr; @@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void) */ if (shutdown_requested) proc_exit(1); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (IsUnderPostmaster && !PostmasterIsAlive(true)) + exit(1); } /* Main entry point for startup process */ @@ -8843,3 +8665,281 @@ StartupProcessMain(void) */ proc_exit(0); } + +/* + * Read the XLOG page containing RecPtr into readBuf (if not read already). + * Returns true if successful, false otherwise or fails if emode is PANIC. 
+ * + * This is responsible for restoring files from archive as needed, as well + * as for waiting for the requested WAL record to arrive in standby mode. + */ +static bool +XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, + bool randAccess) +{ + static XLogRecPtr receivedUpto = {0, 0}; + bool switched_segment = false; + uint32 targetPageOff; + uint32 targetRecOff; + uint32 targetId; + uint32 targetSeg; + + XLByteToSeg(*RecPtr, targetId, targetSeg); + targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; + targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; + + /* Fast exit if we have read the record in the current buffer already */ + if (targetId == readId && targetSeg == readSeg && + targetPageOff == readOff && targetRecOff < readLen) + return true; + + /* + * See if we need to switch to a new segment because the requested record + * is not in the currently open one. + */ + if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) + { + close(readFile); + readFile = -1; + } + + XLByteToSeg(*RecPtr, readId, readSeg); + + /* See if we need to retrieve more data */ + if (readFile < 0 || + (readStreamed && !XLByteLT(*RecPtr, receivedUpto))) + { + if (StandbyMode) + { + bool last_restore_failed = false; + + /* + * In standby mode, wait for the requested record to become + * available, either via restore_command succeeding to restore + * the segment, or via walreceiver having streamed the record. + */ + for (;;) + { + if (WalRcvInProgress()) + { + /* + * While walreceiver is active, wait for new WAL to + * arrive from primary. + */ + receivedUpto = GetWalRcvWriteRecPtr(); + if (XLByteLT(*RecPtr, receivedUpto)) + { + /* + * Great, streamed far enough. Open the file if it's + * not open already. 
+ */ + if (readFile < 0) + { + readFile = + XLogFileRead(readId, readSeg, PANIC, + recoveryTargetTLI, false, false); + switched_segment = true; + readStreamed = true; + } + break; + } + + if (CheckForStandbyTrigger()) + goto next_record_is_invalid; + + /* + * When streaming is active, we want to react quickly when + * the next WAL record arrives, so sleep only a bit. + */ + pg_usleep(100000L); /* 100ms */ + } + else + { + /* + * Until walreceiver manages to reconnect, poll the + * archive. + */ + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + /* Reset curFileTLI if random fetch. */ + if (randAccess) + curFileTLI = 0; + readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true); + switched_segment = true; + readStreamed = false; + if (readFile != -1) + { + elog(DEBUG1, "got WAL segment from archive"); + break; + } + + /* + * If we succeeded restoring some segments from archive + * since the last connection attempt (or we haven't + * tried streaming yet), retry immediately. But if we + * haven't, assume the problem is persistent, so be + * less aggressive. + */ + if (last_restore_failed) + { + /* + * Check to see if the trigger file exists. Note that + * we do this only after failure, so when you create + * the trigger file, we still finish replaying as much + * as we can before failover. + */ + if (CheckForStandbyTrigger()) + goto next_record_is_invalid; + pg_usleep(5000000L); /* 5 seconds */ + } + last_restore_failed = true; + + /* + * Nope, not found in archive. Try to stream it. + * + * If fetching_ckpt is TRUE, RecPtr points to the initial + * checkpoint location. In that case, we use RedoStartLSN + * as the streaming start position instead of RecPtr, so + * that when we later jump backwards to start redo at + * RedoStartLSN, we will have the logs streamed already. + */ + RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr, + PrimaryConnInfo); + } + + /* + * This possibly-long loop needs to handle interrupts of startup + * process. 
+ */ + HandleStartupProcInterrupts(); + } + } + else + { + /* In archive or crash recovery. */ + if (readFile < 0) + { + /* Reset curFileTLI if random fetch. */ + if (randAccess) + curFileTLI = 0; + readFile = XLogFileReadAnyTLI(readId, readSeg, emode, + InArchiveRecovery); + switched_segment = true; + readStreamed = false; + if (readFile < 0) + return false; + } + } + } + + /* + * At this point, we have the right segment open and we know the + * requested record is in it. + */ + Assert(readFile != -1); + + /* + * If the current segment is being streamed from master, calculate + * how much of the current page we have received already. We know the + * requested record has been received, but this is for the benefit + * of future calls, to allow quick exit at the top of this function. + */ + if (readStreamed) + { + if (RecPtr->xlogid != receivedUpto.xlogid || + (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ)) + { + readLen = XLOG_BLCKSZ; + } + else + readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff; + } + else + readLen = XLOG_BLCKSZ; + + if (switched_segment && targetPageOff != 0) + { + /* + * Whenever switching to a new WAL segment, we read the first page of + * the file and validate its header, even if that's not where the + * target record is. This is so that we can check the additional + * identification info that is present in the first page's "long" + * header. 
+ */ + readOff = 0; + if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + ereport(emode, + (errcode_for_file_access(), + errmsg("could not read from log file %u, segment %u, offset %u: %m", + readId, readSeg, readOff))); + goto next_record_is_invalid; + } + if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) + goto next_record_is_invalid; + } + + /* Read the requested page */ + readOff = targetPageOff; + if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) + { + ereport(emode, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, segment %u to offset %u: %m", + readId, readSeg, readOff))); + goto next_record_is_invalid; + } + if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + ereport(emode, + (errcode_for_file_access(), + errmsg("could not read from log file %u, segment %u, offset %u: %m", + readId, readSeg, readOff))); + goto next_record_is_invalid; + } + if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) + goto next_record_is_invalid; + + Assert(targetId == readId); + Assert(targetSeg == readSeg); + Assert(targetPageOff == readOff); + Assert(targetRecOff < readLen); + + return true; + +next_record_is_invalid: + if (readFile >= 0) + close(readFile); + readFile = -1; + readStreamed = false; + readLen = 0; + + return false; +} + +/* + * Check to see if the trigger file exists. If it does, request postmaster + * to shut down walreceiver, wait for it to exit, remove the trigger + * file, and return true. 
+ */ +static bool +CheckForStandbyTrigger(void) +{ + struct stat stat_buf; + + if (TriggerFile == NULL) + return false; + + if (stat(TriggerFile, &stat_buf) == 0) + { + ereport(LOG, + (errmsg("trigger file found: %s", TriggerFile))); + ShutdownWalRcv(); + unlink(TriggerFile); + return true; + } + return false; +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 59f994bd16..6df11b8a74 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -37,7 +37,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.601 2010/01/15 09:19:02 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.602 2010/01/27 15:27:50 heikki Exp $ * * NOTES * @@ -224,9 +224,6 @@ static int Shutdown = NoShutdown; static bool FatalError = false; /* T if recovering from backend crash */ static bool RecoveryError = false; /* T if WAL recovery failed */ -/* If WalReceiverActive is true, restart walreceiver if it dies */ -static bool WalReceiverActive = false; - /* * We use a simple state machine to control startup, shutdown, and * crash recovery (which is rather like shutdown followed by startup). @@ -1469,11 +1466,6 @@ ServerLoop(void) if (PgStatPID == 0 && pmState == PM_RUN) PgStatPID = pgstat_start(); - /* If we have lost walreceiver, try to start a new one */ - if (WalReceiverPID == 0 && WalReceiverActive && - (pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT)) - WalReceiverPID = StartWalReceiver(); - /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS) WalReceiverPID == 0) { /* Startup Process wants us to start the walreceiver process. 
*/ - WalReceiverActive = true; WalReceiverPID = StartWalReceiver(); } - if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER)) - { - /* The walreceiver process doesn't want to be restarted anymore */ - WalReceiverActive = false; - } - PG_SETMASK(&UnBlockSig); errno = save_errno; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f805e673e1..4a5ba5b426 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void InitWalRcv(void); -static void WalRcvKill(int code, Datum arg); +static void WalRcvDie(int code, Datum arg); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); @@ -153,21 +152,57 @@ static struct void WalReceiverMain(void) { - sigjmp_buf local_sigjmp_buf; - MemoryContext walrcv_context; char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - /* Load the libpq-specific functions */ - load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_disconnect == NULL) - elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + /* + * WalRcv should be set up already (if we are a backend, we inherit + * this by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + Assert(walrcv != NULL); - /* Mark walreceiver in progress */ - InitWalRcv(); + /* + * Mark walreceiver as running in shared memory. 
+ * + * Do this as early as possible, so that if we fail later on, we'll + * set state to STOPPED. If we die before this, the startup process + * will keep waiting for us to start up, until it times out. + */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + switch(walrcv->walRcvState) + { + case WALRCV_STOPPING: + /* If we've already been requested to stop, don't start up. */ + walrcv->walRcvState = WALRCV_STOPPED; + /* fall through */ + + case WALRCV_STOPPED: + SpinLockRelease(&walrcv->mutex); + proc_exit(1); + break; + + case WALRCV_STARTING: + /* The usual case */ + break; + + case WALRCV_RUNNING: + /* Shouldn't happen */ + elog(PANIC, "walreceiver still running according to shared memory state"); + } + /* Advertise our PID so that the startup process can kill us */ + walrcv->pid = MyProcPid; + walrcv->walRcvState = WALRCV_RUNNING; + + /* Fetch information required to start streaming */ + strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + startpoint = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvDie, 0); /* * If possible, make this process a group leader, so that the postmaster @@ -200,81 +235,21 @@ WalReceiverMain(void) /* We allow SIGQUIT (quickdie) at all times */ sigdelset(&BlockSig, SIGQUIT); + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (walrcv_connect == NULL || walrcv_receive == NULL || + walrcv_disconnect == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + /* * Create a resource owner to keep track of our resources (not clear that * we need this, but may as well have one). */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. 
- */ - walrcv_context = AllocSetContextCreate(TopMemoryContext, - "Wal Receiver", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walrcv_context); - - /* - * If an exception is encountered, processing resumes here. - * - * This code is heavily based on bgwriter.c, q.v. - */ - if (sigsetjmp(local_sigjmp_buf, 1) != 0) - { - /* Since not using PG_TRY, must reset error stack by hand */ - error_context_stack = NULL; - - /* Reset WalRcvImmediateInterruptOK */ - DisableWalRcvImmediateExit(); - - /* Prevent interrupts while cleaning up */ - HOLD_INTERRUPTS(); - - /* Report the error to the server log */ - EmitErrorReport(); - - /* Disconnect any previous connection. */ - EnableWalRcvImmediateExit(); - walrcv_disconnect(); - DisableWalRcvImmediateExit(); - - /* - * Now return to normal top-level context and clear ErrorContext for - * next time. - */ - MemoryContextSwitchTo(walrcv_context); - FlushErrorState(); - - /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(walrcv_context); - - /* Now we can allow interrupts again */ - RESUME_INTERRUPTS(); - - /* - * Sleep at least 1 second after any error. A write error is likely - * to be repeated, and we don't want to be filling the error logs as - * fast as we can. 
- */ - pg_usleep(1000000L); - } - - /* We can now handle ereport(ERROR) */ - PG_exception_stack = &local_sigjmp_buf; - /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); - /* Fetch connection information from shared memory */ - SpinLockAcquire(&walrcv->mutex); - strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); - startpoint = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); - /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); walrcv_connect(conninfo, startpoint); @@ -330,63 +305,24 @@ WalReceiverMain(void) } } -/* Advertise our pid in shared memory, so that startup process can kill us. */ -static void -InitWalRcv(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * WalRcv should be set up already (if we are a backend, we inherit - * this by fork() or EXEC_BACKEND mechanism from the postmaster). - */ - if (walrcv == NULL) - elog(PANIC, "walreceiver control data uninitialized"); - - /* If we've already been requested to stop, don't start up */ - SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->pid == 0); - if (walrcv->walRcvState == WALRCV_STOPPED || - walrcv->walRcvState == WALRCV_STOPPING) - { - walrcv->walRcvState = WALRCV_STOPPED; - SpinLockRelease(&walrcv->mutex); - proc_exit(1); - } - walrcv->pid = MyProcPid; - SpinLockRelease(&walrcv->mutex); - - /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvKill, 0); -} - /* - * Clear our pid from shared memory at exit. + * Mark us as STOPPED in shared memory at exit. 
*/ static void -WalRcvKill(int code, Datum arg) +WalRcvDie(int code, Datum arg) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - bool stopped = false; SpinLockAcquire(&walrcv->mutex); - if (walrcv->walRcvState == WALRCV_STOPPING || - walrcv->walRcvState == WALRCV_STOPPED) - { - walrcv->walRcvState = WALRCV_STOPPED; - stopped = true; - elog(LOG, "walreceiver stopped"); - } + Assert(walrcv->walRcvState == WALRCV_RUNNING || + walrcv->walRcvState == WALRCV_STOPPING); + walrcv->walRcvState = WALRCV_STOPPED; walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); + /* Terminate the connection gracefully. */ walrcv_disconnect(); - - /* If requested to stop, tell postmaster to not restart us. */ - if (stopped) - SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); } /* SIGHUP: set flag to re-read config file at next convenient time */ diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index c1d7b55887..4fb132dcd4 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -18,6 +18,8 @@ #include #include +#include +#include #include #include @@ -30,8 +32,11 @@ WalRcvData *WalRcv = NULL; -static bool CheckForStandbyTrigger(void); -static void ShutdownWalRcv(void); +/* + * How long to wait for walreceiver to start up after requesting + * postmaster to launch it. In seconds. 
+ */ +#define WALRCV_STARTUP_TIMEOUT 10 /* Report shared memory space needed by WalRcvShmemInit */ Size @@ -62,7 +67,7 @@ WalRcvShmemInit(void) /* Initialize the data structures */ MemSet(WalRcv, 0, WalRcvShmemSize()); - WalRcv->walRcvState = WALRCV_NOT_STARTED; + WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); } @@ -73,90 +78,51 @@ WalRcvInProgress(void) /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; WalRcvState state; + pg_time_t startTime; SpinLockAcquire(&walrcv->mutex); + state = walrcv->walRcvState; + startTime = walrcv->startTime; + SpinLockRelease(&walrcv->mutex); - if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) + /* + * If it has taken too long for walreceiver to start up, give up. + * Setting the state to STOPPED ensures that if walreceiver later + * does start up after all, it will see that it's not supposed to be + * running and die without doing anything. + */ + if (state == WALRCV_STARTING) + { + pg_time_t now = (pg_time_t) time(NULL); + + if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) + { + SpinLockAcquire(&walrcv->mutex); + + if (walrcv->walRcvState == WALRCV_STARTING) + state = walrcv->walRcvState = WALRCV_STOPPED; + + SpinLockRelease(&walrcv->mutex); + } + } + + if (state != WALRCV_STOPPED) return true; else return false; } /* - * Wait for the XLOG record at given position to become available. - * - * 'recptr' indicates the byte position which caller wants to read the - * XLOG record up to. The byte position actually written and flushed - * by walreceiver is returned. It can be higher than the requested - * location, and the caller can safely read up to that point without - * calling WaitNextXLogAvailable() again. - * - * If WAL streaming is ended (because a trigger file is found), *finished - * is set to true and function returns immediately. The returned position - * can be lower than requested in that case. 
- * - * Called by the startup process during streaming recovery. + * Stop walreceiver (if running) and wait for it to die. */ -XLogRecPtr -WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished) -{ - static XLogRecPtr receivedUpto = {0, 0}; - - *finished = false; - - /* Quick exit if already known available */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; - - for (;;) - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* Update local status */ - SpinLockAcquire(&walrcv->mutex); - receivedUpto = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); - - /* If available already, leave here */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; - - /* Check to see if the trigger file exists */ - if (CheckForStandbyTrigger()) - { - *finished = true; - return receivedUpto; - } - - pg_usleep(100000L); /* 100ms */ - - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive(true)) - exit(1); - } -} - -/* - * Stop walreceiver and wait for it to die. - */ -static void +void ShutdownWalRcv(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - pid_t walrcvpid; + pid_t walrcvpid = 0; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -164,15 +130,25 @@ ShutdownWalRcv(void) * restart itself. 
*/ SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_RUNNING); - walrcv->walRcvState = WALRCV_STOPPING; - walrcvpid = walrcv->pid; + switch(walrcv->walRcvState) + { + case WALRCV_STOPPED: + break; + case WALRCV_STARTING: + walrcv->walRcvState = WALRCV_STOPPED; + break; + + case WALRCV_RUNNING: + walrcv->walRcvState = WALRCV_STOPPING; + /* fall through */ + case WALRCV_STOPPING: + walrcvpid = walrcv->pid; + break; + } SpinLockRelease(&walrcv->mutex); /* - * Pid can be 0, if no walreceiver process is active right now. - * Postmaster should restart it, and when it does, it will see the - * STOPPING state. + * Signal walreceiver process if it was still running. */ if (walrcvpid != 0) kill(walrcvpid, SIGTERM); @@ -193,30 +169,6 @@ ShutdownWalRcv(void) } } -/* - * Check to see if the trigger file exists. If it does, request postmaster - * to shut down walreceiver and wait for it to exit, and remove the trigger - * file. - */ -static bool -CheckForStandbyTrigger(void) -{ - struct stat stat_buf; - - if (TriggerFile == NULL) - return false; - - if (stat(TriggerFile, &stat_buf) == 0) - { - ereport(LOG, - (errmsg("trigger file found: %s", TriggerFile))); - ShutdownWalRcv(); - unlink(TriggerFile); - return true; - } - return false; -} - /* * Request postmaster to start walreceiver. * @@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + pg_time_t now = (pg_time_t) time(NULL); - Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); + /* + * We always start at the beginning of the segment. + * That prevents a broken segment (i.e., with no records in the + * first half of a segment) from being created by XLOG streaming, + * which might cause trouble later on if the segment is e.g + * archived. 
+ */ + if (recptr.xrecoff % XLogSegSize != 0) + recptr.xrecoff -= recptr.xrecoff % XLogSegSize; + + /* It better be stopped before we try to restart it */ + Assert(walrcv->walRcvState == WALRCV_STOPPED); - /* locking is just pro forma here; walreceiver isn't started yet */ SpinLockAcquire(&walrcv->mutex); - walrcv->receivedUpto = recptr; if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; - walrcv->walRcvState = WALRCV_RUNNING; + walrcv->walRcvState = WALRCV_STARTING; + walrcv->startTime = now; + + walrcv->receivedUpto = recptr; SpinLockRelease(&walrcv->mutex); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); @@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void) return recptr; } + diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index a645d18b5d..083eb4f07f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -5,7 +5,7 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.4 2010/01/20 18:54:27 heikki Exp $ + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -27,10 +27,10 @@ */ typedef enum { - WALRCV_NOT_STARTED, - WALRCV_RUNNING, /* walreceiver has been started */ - WALRCV_STOPPING, /* requested to stop, but still running */ - WALRCV_STOPPED /* stopped and mustn't start up again */ + WALRCV_STOPPED, /* stopped and mustn't start up again */ + WALRCV_STARTING, /* launched, but the process hasn't initialized yet */ + WALRCV_RUNNING, /* walreceiver is running */ + WALRCV_STOPPING /* requested to stop, but still running */ } WalRcvState; /* Shared memory area for management of walreceiver process */ @@ -47,6 +47,7 @@ typedef struct */ pid_t pid; WalRcvState walRcvState; + pg_time_t startTime; /* * receivedUpto-1 
is the last byte position that has been already @@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; extern void WalReceiverMain(void); extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); +extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h index 75ef17a5a0..c49c2f5fd2 100644 --- a/src/include/storage/pmsignal.h +++ b/src/include/storage/pmsignal.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.28 2010/01/15 09:19:09 heikki Exp $ + * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.29 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -30,7 +30,6 @@ typedef enum PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */ PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */ - PMSIGNAL_SHUTDOWN_WALRECEIVER, /* shut down a walreceiver */ NUM_PMSIGNALS /* Must be last value of enum! */ } PMSignalReason;