Rewrite gather-write patch into something less obviously bolted on

after the fact.  Fix bug with incorrect test for whether we are at end
of logfile segment.  Arrange for writes triggered by XLogInsert's
is-cache-more-than-half-full test to synchronize with the cache boundaries,
so that in long transactions we tend to write alternating halves of the
cache rather than randomly chosen portions of it; this saves one more
write syscall per cache load.
This commit is contained in:
Tom Lane 2005-08-22 23:59:04 +00:00
parent ae94f10397
commit 9052537325
1 changed files with 143 additions and 124 deletions

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -70,19 +70,19 @@
* default method. We assume that fsync() is always available, and that
* configure determined whether fdatasync() is.
*/
#ifdef O_SYNC
#define CMP_OPEN_SYNC_FLAG O_SYNC
#if defined(O_SYNC)
#define BARE_OPEN_SYNC_FLAG O_SYNC
#elif defined(O_FSYNC)
#define CMP_OPEN_SYNC_FLAG O_FSYNC
#define BARE_OPEN_SYNC_FLAG O_FSYNC
#endif
#ifdef CMP_OPEN_SYNC_FLAG
#define OPEN_SYNC_FLAG (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
#ifdef BARE_OPEN_SYNC_FLAG
#define OPEN_SYNC_FLAG (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
#endif
#ifdef O_DSYNC
#ifdef OPEN_SYNC_FLAG
#if defined(O_DSYNC)
#if defined(OPEN_SYNC_FLAG)
/* O_DSYNC is distinct? */
#if O_DSYNC != CMP_OPEN_SYNC_FLAG
#if O_DSYNC != BARE_OPEN_SYNC_FLAG
#define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT)
#endif
#else /* !defined(OPEN_SYNC_FLAG) */
@ -91,7 +91,7 @@
#endif
#endif
#ifdef OPEN_DATASYNC_FLAG
#if defined(OPEN_DATASYNC_FLAG)
#define DEFAULT_SYNC_METHOD_STR "open_datasync"
#define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN
#define DEFAULT_SYNC_FLAGBIT OPEN_DATASYNC_FLAG
@ -469,7 +469,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static bool XLogCheckBuffer(XLogRecData *rdata,
XLogRecPtr *lsn, BkpBlock *bkpb);
static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
@ -497,18 +497,6 @@ static void ReadControlFile(void);
static char *str_time(time_t tnow);
static void issue_xlog_fsync(void);
/* XLog gather-write stuff */
typedef struct XLogPages
{
char *head; /* Start of first page to write */
Size size; /* Total bytes to write == count(pages) * BLCKSZ */
uint32 offset; /* Starting offset in xlog segment file */
} XLogPages;
static void XLogPageReset(XLogPages *pages);
static void XLogPageWrite(XLogPages *pages, int index);
static void XLogPageFlush(XLogPages *pages, int index);
#ifdef WAL_DEBUG
static void xlog_outrec(char *buf, XLogRecord *record);
#endif
@ -726,9 +714,17 @@ begin:;
{
if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
{
/*
* Since the amount of data we write here is completely optional
* anyway, tell XLogWrite it can be "flexible" and stop at a
* convenient boundary. This allows writes triggered by this
* mechanism to synchronize with the cache boundaries, so that
* in a long transaction we'll basically dump alternating halves
* of the buffer array.
*/
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
XLogWrite(LogwrtRqst);
XLogWrite(LogwrtRqst, true);
LWLockRelease(WALWriteLock);
}
}
@ -1219,7 +1215,7 @@ AdvanceXLInsertBuffer(void)
WriteRqst.Write = OldPageRqstPtr;
WriteRqst.Flush.xlogid = 0;
WriteRqst.Flush.xrecoff = 0;
XLogWrite(WriteRqst);
XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
}
@ -1279,16 +1275,24 @@ AdvanceXLInsertBuffer(void)
/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
* If flexible == TRUE, we don't have to write as far as WriteRqst, but
* may stop at any convenient boundary (such as a cache or logfile boundary).
* This option allows us to avoid uselessly issuing multiple writes when a
* single one would do.
*
* Must be called with WALWriteLock held.
*/
static void
XLogWrite(XLogwrtRqst WriteRqst)
XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage;
bool finishing_seg;
bool use_existent;
int currentIndex = Write->curridx;
XLogPages pages;
int curridx;
int npages;
int startidx;
uint32 startoffset;
/* We should always be inside a critical section here */
Assert(CritSectionCount > 0);
@ -1299,7 +1303,27 @@ XLogWrite(XLogwrtRqst WriteRqst)
*/
LogwrtResult = Write->LogwrtResult;
XLogPageReset(&pages);
/*
* Since successive pages in the xlog cache are consecutively allocated,
* we can usually gather multiple pages together and issue just one
* write() call. npages is the number of pages we have determined can
* be written together; startidx is the cache block index of the first
* one, and startoffset is the file offset at which it should go.
* The latter two variables are only valid when npages > 0, but we must
* initialize all of them to keep the compiler quiet.
*/
npages = 0;
startidx = 0;
startoffset = 0;
/*
* Within the loop, curridx is the cache block index of the page to
* consider writing. We advance Write->curridx only after successfully
* writing pages. (Right now, this refinement is useless since we are
* going to PANIC if any error occurs anyway; but someday it may come
* in useful.)
*/
curridx = Write->curridx;
while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
{
@ -1309,22 +1333,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
* end of the last page that's been initialized by
* AdvanceXLInsertBuffer.
*/
if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
XLogCtl->xlblocks[currentIndex].xlogid,
XLogCtl->xlblocks[currentIndex].xrecoff);
XLogCtl->xlblocks[curridx].xlogid,
XLogCtl->xlblocks[curridx].xrecoff);
/* Advance LogwrtResult.Write to end of current buffer page */
LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
LogwrtResult.Write = XLogCtl->xlblocks[curridx];
ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
{
/*
* Switch to new logfile segment.
* Switch to new logfile segment. We cannot have any pending
* pages here (since we dump what we have at segment end).
*/
XLogPageFlush(&pages, currentIndex);
Assert(npages == 0);
if (openLogFile >= 0)
{
if (close(openLogFile))
@ -1391,6 +1416,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
LWLockRelease(ControlFileLock);
}
/* Make sure we have the current logfile open */
if (openLogFile < 0)
{
XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
@ -1398,27 +1424,83 @@ XLogWrite(XLogwrtRqst WriteRqst)
openLogOff = 0;
}
/* Add a page to buffer */
XLogPageWrite(&pages, currentIndex);
/* Add current page to the set of pending pages-to-dump */
if (npages == 0)
{
/* first of group */
startidx = curridx;
startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
}
npages++;
/*
* If we just wrote the whole last page of a logfile segment,
* fsync the segment immediately. This avoids having to go back
* and re-open prior segments when an fsync request comes along
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage.
* Dump the set if this will be the last loop iteration, or if
* we are at the last page of the cache area (since the next page
* won't be contiguous in memory), or if we are at the end of the
* logfile segment.
*/
if (openLogOff + pages.size >= XLogSegSize && !ispartialpage)
{
XLogPageFlush(&pages, currentIndex);
issue_xlog_fsync();
LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */
finishing_seg = !ispartialpage &&
(startoffset + npages * BLCKSZ) >= XLogSegSize;
if (XLogArchivingActive())
XLogArchiveNotifySeg(openLogId, openLogSeg);
if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
curridx == XLogCtl->XLogCacheBlck ||
finishing_seg)
{
char *from;
Size nbytes;
/* Need to seek in the file? */
if (openLogOff != startoffset)
{
if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not seek in log file %u, "
"segment %u to offset %u: %m",
openLogId, openLogSeg, startoffset)));
openLogOff = startoffset;
}
/* OK to write the page(s) */
from = XLogCtl->pages + startidx * (Size) BLCKSZ;
nbytes = npages * (Size) BLCKSZ;
errno = 0;
if (write(openLogFile, from, nbytes) != nbytes)
{
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log file %u, segment %u "
"at offset %u length %lu: %m",
openLogId, openLogSeg,
openLogOff, (unsigned long) nbytes)));
}
/* Update state for write */
openLogOff += nbytes;
Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
npages = 0;
/*
* If we just wrote the whole last page of a logfile segment,
* fsync the segment immediately. This avoids having to go back
* and re-open prior segments when an fsync request comes along
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage.
*/
if (finishing_seg)
{
issue_xlog_fsync();
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
XLogArchiveNotifySeg(openLogId, openLogSeg);
}
}
if (ispartialpage)
@ -1427,9 +1509,15 @@ XLogWrite(XLogwrtRqst WriteRqst)
LogwrtResult.Write = WriteRqst.Write;
break;
}
currentIndex = NextBufIdx(currentIndex);
curridx = NextBufIdx(curridx);
/* If flexible, break out of loop as soon as we wrote something */
if (flexible && npages == 0)
break;
}
XLogPageFlush(&pages, currentIndex);
Assert(npages == 0);
Assert(curridx == Write->curridx);
/*
* If asked to flush, do so
@ -1572,7 +1660,7 @@ XLogFlush(XLogRecPtr record)
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = record;
}
XLogWrite(WriteRqst);
XLogWrite(WriteRqst, false);
}
LWLockRelease(WALWriteLock);
}
@ -5898,72 +5986,3 @@ remove_backup_label(void)
errmsg("could not remove file \"%s\": %m",
BACKUP_LABEL_FILE)));
}
/* XLog gather-write stuff */
static void
XLogPageReset(XLogPages *pages)
{
memset(pages, 0, sizeof(*pages));
}
static void
XLogPageWrite(XLogPages *pages, int index)
{
char *page = XLogCtl->pages + index * (Size) BLCKSZ;
Size size = BLCKSZ;
uint32 offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
if (pages->head + pages->size == page &&
pages->offset + pages->size == offset)
{ /* Pages are continuous. Append new page. */
pages->size += size;
}
else
{ /* Pages are not continuous. Flush and clear. */
XLogPageFlush(pages, PrevBufIdx(index));
pages->head = page;
pages->size = size;
pages->offset = offset;
}
}
static void
XLogPageFlush(XLogPages *pages, int index)
{
if (!pages->head)
{ /* Nothing to write */
XLogCtl->Write.curridx = index;
return;
}
/* Need to seek in the file? */
if (openLogOff != pages->offset)
{
openLogOff = pages->offset;
if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not seek in log file %u, segment %u to offset %u: %m",
openLogId, openLogSeg, openLogOff)));
}
/* OK to write the page */
errno = 0;
if (write(openLogFile, pages->head, pages->size) != pages->size)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log file %u, segment %u length %u at offset %u: %m",
openLogId, openLogSeg,
(unsigned int) pages->size, openLogOff)));
}
openLogOff += pages->size;
XLogCtl->Write.curridx = index;
XLogPageReset(pages);
}