/*-------------------------------------------------------------------------
 *
 * bufmgr.c
 *	  buffer manager interface routines
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.160 2004/02/12 20:07:26 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * BufferAlloc() -- lookup a buffer in the buffer table.  If
 *		it isn't there add it, but do not read data into memory.
 *		This is used when we are about to reinitialize the
 *		buffer so we don't care what the current disk contents are.
 *		BufferAlloc() also pins the new buffer in memory.
 *
 * ReadBuffer() -- like BufferAlloc() but reads the data
 *		on a buffer cache miss.
 *
 * ReleaseBuffer() -- unpin the buffer
 *
 * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
 *		but don't unpin.  The disk IO is delayed until buffer
 *		replacement.
 *
 * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
 *
 * BufferSync() -- flush all dirty buffers in the buffer pool.
 *
 * InitBufferPool() -- Init the buffer module.
 *
 * See other files:
 *		freelist.c -- chooses victim for buffer replacement
 *		buf_table.c -- manages the buffer lookup table
 */
#include "postgres.h"

#include <sys/types.h>
#include <sys/file.h>
#include <math.h>
#include <signal.h>

#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/relcache.h"
#include "pgstat.h"

#define BufferGetLSN(bufHdr) \
	(*((XLogRecPtr *) MAKE_PTR((bufHdr)->data)))


/* GUC variables */
bool		zero_damaged_pages = false;
int			BgWriterDelay = 200;
int			BgWriterPercent = 1;
int			BgWriterMaxpages = 100;

static void WaitIO(BufferDesc *buf);
static void StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf);
static void ContinueBufferIO(BufferDesc *buf, bool forInput);
static void buffer_write_error_callback(void *arg);

/*
 * Macro : BUFFER_IS_BROKEN
 *		Note that a write error doesn't mean the buffer is broken.
 */
#define BUFFER_IS_BROKEN(buf) \
	((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))

static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
				   bool bufferLockHeld);
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
			bool *foundPtr);
static void BufferReplace(BufferDesc *bufHdr);

#ifdef NOT_USED
void		PrintBufferDescs(void);
#endif

static void write_buffer(Buffer buffer, bool release);

/*
 * ReadBuffer -- returns a buffer containing the requested
 *		block of the requested relation.  If the blknum
 *		requested is P_NEW, extend the relation file and
 *		allocate a new block.  (Caller is responsible for
 *		ensuring that only one backend tries to extend a
 *		relation at the same time!)
 *
 * Returns: the buffer number for the buffer containing
 *		the block read, or InvalidBuffer on an error.  If successful,
 *		the returned buffer has been pinned.
 *
 * We assume that reln has already been opened when this
 * function is called.
 *
 * Note: a side effect of a P_NEW call is to update reln->rd_nblocks.
 */
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
	return ReadBufferInternal(reln, blockNum, false);
}
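#ifdef NOT_USED
/*
 * Usage sketch (illustrative only, never called): the canonical
 * read-lock-inspect-release cycle for a shared buffer.  The helper name
 * and the elided page inspection are hypothetical; error handling is
 * omitted.
 */
static void
read_buffer_usage_example(Relation reln, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(reln, blkno);
	Page		page;

	LockBuffer(buf, BUFFER_LOCK_SHARE);		/* content lock for reading */
	page = BufferGetPage(buf);
	/* ... examine tuples on "page" here ... */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);	/* drop content lock, keep pin */
	ReleaseBuffer(buf);						/* drop the pin */
}
#endif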
/*
 * ReadBufferInternal -- internal version of ReadBuffer with more options
 *
 * bufferLockHeld: if true, caller already acquired the bufmgr lock.
 * (This is assumed never to be true if dealing with a local buffer!)
 */
static Buffer
ReadBufferInternal(Relation reln, BlockNumber blockNum,
				   bool bufferLockHeld)
{
	BufferDesc *bufHdr;
	bool		found;
	bool		isExtend;
	bool		isLocalBuf;

	isExtend = (blockNum == P_NEW);
	isLocalBuf = reln->rd_istemp;

	/* Open it at the smgr level if not already done */
	if (reln->rd_smgr == NULL)
		reln->rd_smgr = smgropen(reln->rd_node);

	if (isLocalBuf)
	{
		ReadLocalBufferCount++;
		pgstat_count_buffer_read(&reln->pgstat_info, reln);
		/* Substitute proper block number if caller asked for P_NEW */
		if (isExtend)
		{
			blockNum = reln->rd_nblocks;
			reln->rd_nblocks++;
		}
		bufHdr = LocalBufferAlloc(reln, blockNum, &found);
		if (found)
		{
			LocalBufferHitCount++;
			pgstat_count_buffer_hit(&reln->pgstat_info, reln);
		}
	}
	else
	{
		ReadBufferCount++;
		pgstat_count_buffer_read(&reln->pgstat_info, reln);
		/* Substitute proper block number if caller asked for P_NEW */
		if (isExtend)
		{
			/* must be sure we have accurate file length! */
			blockNum = reln->rd_nblocks = smgrnblocks(reln->rd_smgr);
			reln->rd_nblocks++;
		}

		/*
		 * Look up the buffer.  BM_IO_IN_PROGRESS is set if the requested
		 * block is not currently in memory.
		 */
		if (!bufferLockHeld)
			LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		bufHdr = BufferAlloc(reln, blockNum, &found);
		if (found)
		{
			BufferHitCount++;
			pgstat_count_buffer_hit(&reln->pgstat_info, reln);
		}
	}

	/* At this point we do NOT hold the bufmgr lock. */

	if (!bufHdr)
		return InvalidBuffer;

	/* if it's already in the buffer pool, we're done */
	if (found)
	{
		/* That is, we're done if we expected to be able to find it ... */
		if (!isExtend)
			return BufferDescriptorGetBuffer(bufHdr);

		/*
		 * If we found a buffer when we were expecting to extend the
		 * relation, the implication is that a buffer was already created
		 * for the next page position, but then smgrextend failed to write
		 * the page.  We'd better try the smgrextend again.  But since
		 * BufferAlloc won't have done StartBufferIO, we must do that
		 * first.
		 */
		if (!isLocalBuf)
		{
			LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
			StartBufferIO(bufHdr, false);
			LWLockRelease(BufMgrLock);
		}
	}

	/*
	 * If we have gotten to this point, the relation must be open in the
	 * smgr.
	 */
	if (isExtend)
	{
		/* new buffers are zero-filled */
		MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
		smgrextend(reln->rd_smgr, blockNum,
				   (char *) MAKE_PTR(bufHdr->data));
	}
	else
	{
		smgrread(reln->rd_smgr, blockNum,
				 (char *) MAKE_PTR(bufHdr->data));
		/* check for garbage data */
		if (!PageHeaderIsValid((PageHeader) MAKE_PTR(bufHdr->data)))
		{
			/*
			 * During WAL recovery, the first access to any data page
			 * should overwrite the whole page from the WAL; so a
			 * clobbered page header is not reason to fail.  Hence, when
			 * InRecovery we may always act as though zero_damaged_pages
			 * is ON.
			 */
			if (zero_damaged_pages || InRecovery)
			{
				ereport(WARNING,
						(errcode(ERRCODE_DATA_CORRUPTED),
						 errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
								blockNum, RelationGetRelationName(reln))));
				MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_DATA_CORRUPTED),
						 errmsg("invalid page header in block %u of relation \"%s\"",
								blockNum, RelationGetRelationName(reln))));
		}
	}

	if (isLocalBuf)
	{
		/* No shared buffer state to update... */
		return BufferDescriptorGetBuffer(bufHdr);
	}

	/* lock buffer manager again to update IO IN PROGRESS */
	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

	/* I/O succeeded: clear the flags and finish the buffer update */
	bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);

	/* If anyone was waiting for IO to complete, wake them up now */
	TerminateBufferIO(bufHdr);

	LWLockRelease(BufMgrLock);

	return BufferDescriptorGetBuffer(bufHdr);
}
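#ifdef NOT_USED
/*
 * Usage sketch (illustrative only): extending a relation through the
 * P_NEW path handled above.  ReadBuffer(reln, P_NEW) zero-fills and
 * smgrextends a new block; per ReadBuffer's contract the caller must
 * ensure only one backend extends the relation at a time, and must then
 * initialize the page.  The zero special-space size is an assumption
 * appropriate for heap-like pages.
 */
static Buffer
extend_relation_example(Relation reln)
{
	Buffer		buf = ReadBuffer(reln, P_NEW);

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
	PageInit(BufferGetPage(buf), BufferGetPageSize(buf), 0);
	return buf;					/* still pinned and exclusively locked */
}
#endif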
/*
 * BufferAlloc -- Get a buffer from the buffer pool but don't
 *		read it.  If successful, the returned buffer is pinned.
 *
 * Returns: descriptor for buffer
 *
 * BufMgrLock must be held at entry.  When this routine returns,
 * the BufMgrLock is guaranteed NOT to be held.
 */
static BufferDesc *
BufferAlloc(Relation reln,
			BlockNumber blockNum,
			bool *foundPtr)
{
	BufferDesc *buf,
			   *buf2;
	BufferTag	newTag;			/* identity of requested block */
	bool		inProgress;		/* buffer undergoing IO */

	/* create a new tag so we can lookup the buffer */
	/* assume that the relation is already open */
	INIT_BUFFERTAG(&newTag, reln, blockNum);

	/* see if the block is in the buffer pool already */
	buf = StrategyBufferLookup(&newTag, false);
	if (buf != NULL)
	{
		/*
		 * Found it.  Now, (a) pin the buffer so no one steals it from the
		 * buffer pool, (b) check BM_IO_IN_PROGRESS; someone may be
		 * faulting the buffer into the buffer pool.
		 */
		PinBuffer(buf);
		inProgress = (buf->flags & BM_IO_IN_PROGRESS);

		*foundPtr = TRUE;
		if (inProgress)			/* confirm end of IO */
		{
			WaitIO(buf);
			inProgress = (buf->flags & BM_IO_IN_PROGRESS);
		}
		if (BUFFER_IS_BROKEN(buf))
		{
			/*
			 * I couldn't understand the following old comment.  If
			 * there's no IO for the buffer and the buffer is BROKEN, it
			 * should be read again.  So start a new buffer IO here.
			 *
			 * weird race condition:
			 *
			 * We were waiting for someone else to read the buffer.  While
			 * we were waiting, the reader failed in some way, so the
			 * contents of the buffer are still invalid.  By saying that
			 * we didn't find it, we can make the caller reinitialize the
			 * buffer.  If two processes are waiting for this block, both
			 * will read the block.  The second one to finish may
			 * overwrite any updates made by the first.  (Assume higher
			 * level synchronization prevents this from happening).
			 *
			 * This is never going to happen, don't worry about it.
			 */
			*foundPtr = FALSE;
			StartBufferIO(buf, true);
		}

		LWLockRelease(BufMgrLock);

		return buf;
	}

	*foundPtr = FALSE;

	/*
	 * Didn't find it in the buffer pool.  We'll have to initialize a new
	 * buffer.  First, grab one from the free list.  If it's dirty, flush
	 * it to disk.  Remember to unlock BufMgrLock while doing the IOs.
	 */
	inProgress = FALSE;
	for (buf = NULL; buf == NULL;)
	{
		buf = StrategyGetBuffer();

		/* StrategyGetBuffer will elog if it can't find a free buffer */
		Assert(buf);

		/*
		 * There should be exactly one pin on the buffer after it is
		 * allocated -- ours.  If it had a pin it wouldn't have been on
		 * the free list.  No one else could have pinned it between
		 * StrategyGetBuffer and here because we have the BufMgrLock.
		 */
		Assert(buf->refcount == 0);
		buf->refcount = 1;
		PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;

		if (buf->flags & BM_DIRTY || buf->cntxDirty)
		{
			/*
			 * Skip buffers with a previous write error.
			 */
			if ((buf->flags & BM_IO_ERROR) != 0)
			{
				UnpinBuffer(buf);
				buf = NULL;
				continue;
			}

			/*
			 * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
			 * with the contents of the buffer while we write it out.  We
			 * don't really care if they try to read it, but if they can
			 * complete a BufferAlloc on it they can then scribble into
			 * it, and we'd really like to avoid that while we are
			 * flushing the buffer.  Setting this flag should block them
			 * in WaitIO until we're done.
			 */
			inProgress = TRUE;

			/*
			 * All code paths that acquire this lock pin the buffer first;
			 * since no one had it pinned (it just came off the free
			 * list), no one else can have this lock.
			 */
			StartBufferIO(buf, false);

			/*
			 * Write the buffer out, being careful to release BufMgrLock
			 * before starting the I/O.
			 */
			BufferReplace(buf);

			/*
			 * BM_JUST_DIRTIED is cleared by BufferReplace and shouldn't
			 * be set by anyone.		- vadim 01/17/97
			 */
			if (buf->flags & BM_JUST_DIRTIED)
			{
				elog(PANIC, "content of block %u of %u/%u changed while flushing",
					 buf->tag.blockNum,
					 buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
			}

			buf->flags &= ~BM_DIRTY;
			buf->cntxDirty = false;

			/*
			 * Somebody could have pinned the buffer while we were doing
			 * the I/O and had given up the BufMgrLock (though they would
			 * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
			 * That's why this is a loop -- if so, we need to clear the
			 * I/O flags, remove our pin and start all over again.
			 *
			 * People may be making buffers free at any time, so there's
			 * no reason to think that we have an immediate disaster on
			 * our hands.
			 */
			if (buf && buf->refcount > 1)
			{
				inProgress = FALSE;
				buf->flags &= ~BM_IO_IN_PROGRESS;
				TerminateBufferIO(buf);
				UnpinBuffer(buf);
				buf = NULL;
			}

			/*
			 * Somebody could have allocated another buffer for the same
			 * block we are about to read in.  (While we flush out the
			 * dirty buffer, we don't hold the lock and someone could have
			 * allocated another buffer for the same block.  The problem
			 * is we haven't gotten around to inserting the new tag into
			 * the buffer table.  So we need to check here.  -ay 3/95)
			 */
			buf2 = StrategyBufferLookup(&newTag, true);
			if (buf2 != NULL)
			{
				/*
				 * Found it.  Someone has already done what we're about
				 * to do.  We'll just handle this as if it were found in
				 * the buffer pool in the first place.
				 */
				if (buf != NULL)
				{
					buf->flags &= ~BM_IO_IN_PROGRESS;
					TerminateBufferIO(buf);
					/* give up old buffer since we don't need it any more */
					UnpinBuffer(buf);
				}

				PinBuffer(buf2);
				inProgress = (buf2->flags & BM_IO_IN_PROGRESS);

				*foundPtr = TRUE;
				if (inProgress)
				{
					WaitIO(buf2);
					inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
				}

				if (BUFFER_IS_BROKEN(buf2))
				{
					*foundPtr = FALSE;
					StartBufferIO(buf2, true);
				}

				LWLockRelease(BufMgrLock);

				return buf2;
			}
		}
	}

	/*
	 * At this point we should have the sole pin on a non-dirty buffer and
	 * we may or may not already have the BM_IO_IN_PROGRESS flag set.
	 */

	/*
	 * Tell the buffer replacement strategy that we are replacing the
	 * buffer content.  Then rename the buffer.
	 */
	StrategyReplaceBuffer(buf, reln, blockNum);
	INIT_BUFFERTAG(&(buf->tag), reln, blockNum);

	/*
	 * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
	 * so no one fiddles with them until the read completes.  If this
	 * routine has been called simply to allocate a buffer, no I/O will be
	 * attempted, so the flag isn't set.
	 */
	if (!inProgress)
		StartBufferIO(buf, true);
	else
		ContinueBufferIO(buf, true);

	LWLockRelease(BufMgrLock);

	return buf;
}
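#ifdef NOT_USED
/*
 * Usage sketch (illustrative only): the standard modify-and-dirty cycle
 * through WriteBuffer, which is defined just below.  The caller pins and
 * exclusive-locks the page, changes it (normally writing WAL for the
 * change first --- see LockBuffer's comments on cntxDirty), and then
 * WriteBuffer marks the buffer dirty and drops the pin; the physical
 * write happens later, at replacement or checkpoint time.
 */
static void
modify_page_example(Relation reln, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(reln, blkno);

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
	/* ... scribble on BufferGetPage(buf), write WAL for the change ... */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buf);			/* mark dirty + ReleaseBuffer */
}
#endif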
/*
 * write_buffer -- common functionality for
 *				   WriteBuffer and WriteNoReleaseBuffer
 */
static void
write_buffer(Buffer buffer, bool release)
{
	BufferDesc *bufHdr;

	if (BufferIsLocal(buffer))
	{
		WriteLocalBuffer(buffer, release);
		return;
	}

	if (BAD_BUFFER_ID(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

	bufHdr = &BufferDescriptors[buffer - 1];

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
	Assert(bufHdr->refcount > 0);

	/*
	 * If the buffer is not dirty yet, do vacuum cost accounting.
	 */
	if (!(bufHdr->flags & BM_DIRTY) && VacuumCostActive)
		VacuumCostBalance += VacuumCostPageDirty;

	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);

	if (release)
		UnpinBuffer(bufHdr);
	LWLockRelease(BufMgrLock);
}

/*
 * WriteBuffer
 *
 *		Marks buffer contents as dirty (actual write happens later).
 *
 * Assume that buffer is pinned.  Assume that reln is valid.
 *
 * Side Effects:
 *		Pin count is decremented.
 */
void
WriteBuffer(Buffer buffer)
{
	write_buffer(buffer, true);
}

/*
 * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
 *						   when the operation is complete.
 */
void
WriteNoReleaseBuffer(Buffer buffer)
{
	write_buffer(buffer, false);
}

/*
 * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
 *		to save a lock release/acquire.
 *
 * Also, if the passed buffer is valid and already contains the desired block
 * number, we simply return it without ever acquiring the lock at all.
 * Since the passed buffer must be pinned, it's OK to examine its block
 * number without getting the lock first.
 *
 * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
 * buffer actually needs to be released.  This case is the same as ReadBuffer,
 * but can save some tests in the caller.
 *
 * Also note: while it will work to call this routine with blockNum == P_NEW,
 * it's best to avoid doing so, since that would result in calling
 * smgrnblocks() while holding the bufmgr lock, hence some loss of
 * concurrency.
 */
Buffer
ReleaseAndReadBuffer(Buffer buffer,
					 Relation relation,
					 BlockNumber blockNum)
{
	BufferDesc *bufHdr;

	if (BufferIsValid(buffer))
	{
		if (BufferIsLocal(buffer))
		{
			Assert(LocalRefCount[-buffer - 1] > 0);
			bufHdr = &LocalBufferDescriptors[-buffer - 1];
			if (bufHdr->tag.blockNum == blockNum &&
				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
				return buffer;
			LocalRefCount[-buffer - 1]--;
		}
		else
		{
			Assert(PrivateRefCount[buffer - 1] > 0);
			bufHdr = &BufferDescriptors[buffer - 1];
			if (bufHdr->tag.blockNum == blockNum &&
				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
				return buffer;
			if (PrivateRefCount[buffer - 1] > 1)
				PrivateRefCount[buffer - 1]--;
			else
			{
				LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
				UnpinBuffer(bufHdr);
				return ReadBufferInternal(relation, blockNum, true);
			}
		}
	}

	return ReadBufferInternal(relation, blockNum, false);
}

/*
 * BufferSync -- Write all dirty buffers in the pool.
 *
 * This is called at checkpoint time to write out all dirty shared buffers,
 * and by the background writer process to write out some of them.
 */
int
BufferSync(int percent, int maxpages)
{
	int			i;
	BufferDesc *bufHdr;
	ErrorContextCallback errcontext;
	int			num_buffer_dirty;
	int		   *buffer_dirty;

	/* Setup error traceback support for ereport() */
	errcontext.callback = buffer_write_error_callback;
	errcontext.arg = NULL;
	errcontext.previous = error_context_stack;
	error_context_stack = &errcontext;

	/*
	 * Get a list of all currently dirty buffers and how many there are.
	 * We do not flush buffers that get dirtied after we started.  They
	 * have to wait until the next checkpoint.
	 */
	buffer_dirty = (int *) palloc(NBuffers * sizeof(int));

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
	num_buffer_dirty = StrategyDirtyBufferList(buffer_dirty, NBuffers);
	LWLockRelease(BufMgrLock);

	/*
	 * If called by the background writer, we are usually asked to write
	 * out only some percentage of the dirty buffers now, to prevent an IO
	 * storm at checkpoint time.
	 */
	if (percent > 0 && num_buffer_dirty > 10)
	{
		Assert(percent <= 100);
		num_buffer_dirty = (num_buffer_dirty * percent) / 100;
		if (maxpages > 0 && num_buffer_dirty > maxpages)
			num_buffer_dirty = maxpages;
	}

	for (i = 0; i < num_buffer_dirty; i++)
	{
		Buffer		buffer;
		XLogRecPtr	recptr;
		SMgrRelation reln;

		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

		bufHdr = &BufferDescriptors[buffer_dirty[i]];
		errcontext.arg = bufHdr;

		if (!(bufHdr->flags & BM_VALID))
		{
			LWLockRelease(BufMgrLock);
			continue;
		}

		/*
		 * We can check bufHdr->cntxDirty here *without* holding any lock
		 * on buffer context as long as we set this flag in access methods
		 * *before* logging changes with XLogInsert(): if someone sets
		 * cntxDirty just after our check, we need not worry, because our
		 * checkpoint.redo pointer precedes the log record for the
		 * upcoming changes, and so we are not required to write such a
		 * dirty buffer.
		 */
		if (!(bufHdr->flags & BM_DIRTY) && !(bufHdr->cntxDirty))
		{
			LWLockRelease(BufMgrLock);
			continue;
		}

		/*
		 * IO synchronization.  Note that we do it with the buffer
		 * unpinned to avoid conflicts with FlushRelationBuffers.
		 */
		if (bufHdr->flags & BM_IO_IN_PROGRESS)
		{
			WaitIO(bufHdr);
			if (!(bufHdr->flags & BM_VALID) ||
				(!(bufHdr->flags & BM_DIRTY) && !(bufHdr->cntxDirty)))
			{
				LWLockRelease(BufMgrLock);
				continue;
			}
		}

		/*
		 * Here: no one is doing IO for this buffer and it's dirty.  Pin
		 * the buffer now and set the IO state for it *before* acquiring
		 * the shlock, to avoid conflicts with FlushRelationBuffers.
		 */
		PinBuffer(bufHdr);
		StartBufferIO(bufHdr, false);	/* output IO start */

		buffer = BufferDescriptorGetBuffer(bufHdr);

		LWLockRelease(BufMgrLock);

		/*
		 * Protect buffer content against concurrent update
		 */
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		/*
		 * Force XLOG flush for the buffer's LSN
		 */
		recptr = BufferGetLSN(bufHdr);
		XLogFlush(recptr);

		/*
		 * Now it's safe to write the buffer to disk.  Note that no one
		 * else can write it while we are busy with locking and log
		 * flushing, because we set the IO flag.
		 */
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		Assert(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty);
		bufHdr->flags &= ~BM_JUST_DIRTIED;
		LWLockRelease(BufMgrLock);

		/* Find smgr relation for buffer */
		reln = smgropen(bufHdr->tag.rnode);

		/* And write... */
		smgrwrite(reln,
				  bufHdr->tag.blockNum,
				  (char *) MAKE_PTR(bufHdr->data));

		/*
		 * Note that it's safe to change cntxDirty here because we protect
		 * it from upper-level writers with the share lock, and from other
		 * bufmgr routines with BM_IO_IN_PROGRESS.
		 */
		bufHdr->cntxDirty = false;

		/*
		 * Release the per-buffer readlock, reacquire BufMgrLock.
		 */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		BufferFlushCount++;

		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

		bufHdr->flags &= ~BM_IO_IN_PROGRESS;	/* mark IO finished */
		TerminateBufferIO(bufHdr);		/* Sync IO finished */

		/*
		 * If this buffer was marked by someone as DIRTY while we were
		 * flushing it out we must not clear DIRTY flag - vadim 01/17/97
		 */
		if (!(bufHdr->flags & BM_JUST_DIRTIED))
			bufHdr->flags &= ~BM_DIRTY;

		UnpinBuffer(bufHdr);

		LWLockRelease(BufMgrLock);
	}

	pfree(buffer_dirty);

	/* Pop the error context stack */
	error_context_stack = errcontext.previous;

	return num_buffer_dirty;
}
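#ifdef NOT_USED
/*
 * Sketch (illustrative only) of BufferSync's trimming arithmetic: with
 * 1000 dirty buffers, BgWriterPercent = 1 and BgWriterMaxpages = 100, a
 * background-writer round writes (1000 * 1) / 100 = 10 pages; the
 * maxpages cap only kicks in for much larger dirty lists.  A checkpoint
 * passes percent = -1 and so skips the trimming entirely.
 */
static int
bgwriter_write_target_example(int num_dirty)
{
	if (BgWriterPercent > 0 && num_dirty > 10)
	{
		num_dirty = (num_dirty * BgWriterPercent) / 100;
		if (BgWriterMaxpages > 0 && num_dirty > BgWriterMaxpages)
			num_dirty = BgWriterMaxpages;
	}
	return num_dirty;
}
#endif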
/*
 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
 *
 * Should be entered with buffer manager lock held; releases it before
 * waiting and re-acquires it afterwards.
 */
static void
WaitIO(BufferDesc *buf)
{
	/*
	 * Changed to wait until there's no IO - Inoue 01/13/2000
	 *
	 * Note this is *necessary* because an error abort in the process doing
	 * I/O could release the io_in_progress_lock prematurely.  See
	 * AbortBufferIO.
	 */
	while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
	{
		LWLockRelease(BufMgrLock);
		LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
		LWLockRelease(buf->io_in_progress_lock);
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
	}
}


long		NDirectFileRead;	/* some I/O's are direct file access.
								 * bypass bufmgr */
long		NDirectFileWrite;	/* e.g., I/O in psort and hashjoin. */


/*
 * Return a palloc'd string containing buffer usage statistics.
 */
char *
ShowBufferUsage(void)
{
	StringInfoData str;
	float		hitrate;
	float		localhitrate;

	initStringInfo(&str);

	if (ReadBufferCount == 0)
		hitrate = 0.0;
	else
		hitrate = (float) BufferHitCount * 100.0 / ReadBufferCount;

	if (ReadLocalBufferCount == 0)
		localhitrate = 0.0;
	else
		localhitrate = (float) LocalBufferHitCount * 100.0 / ReadLocalBufferCount;

	appendStringInfo(&str,
					 "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
					 ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
	appendStringInfo(&str,
					 "!\tLocal blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
					 ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
	appendStringInfo(&str,
					 "!\tDirect blocks: %10ld read, %10ld written\n",
					 NDirectFileRead, NDirectFileWrite);

	return str.data;
}

void
ResetBufferUsage(void)
{
	BufferHitCount = 0;
	ReadBufferCount = 0;
	BufferFlushCount = 0;
	LocalBufferHitCount = 0;
	ReadLocalBufferCount = 0;
	LocalBufferFlushCount = 0;
	NDirectFileRead = 0;
	NDirectFileWrite = 0;
}

/*
 * AtEOXact_Buffers - clean up at end of transaction.
 *
 * During abort, we need to release any buffer pins we're holding
 * (this cleans up in case ereport interrupted a routine that pins a
 * buffer).  During commit, we shouldn't need to do that, but check
 * anyway to see if anyone leaked a buffer reference count.
 */
void
AtEOXact_Buffers(bool isCommit)
{
	int			i;

	for (i = 0; i < NBuffers; i++)
	{
		if (PrivateRefCount[i] != 0)
		{
			BufferDesc *buf = &(BufferDescriptors[i]);

			if (isCommit)
				elog(WARNING,
					 "buffer refcount leak: [%03d] (bufNext=%d, "
					 "rel=%u/%u, blockNum=%u, flags=0x%x, refcount=%d %ld)",
					 i, buf->bufNext,
					 buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
					 buf->tag.blockNum, buf->flags,
					 buf->refcount, PrivateRefCount[i]);

			PrivateRefCount[i] = 1;		/* make sure we release shared pin */
			LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
			UnpinBuffer(buf);
			LWLockRelease(BufMgrLock);
			Assert(PrivateRefCount[i] == 0);
		}
	}

	AtEOXact_LocalBuffers(isCommit);
}

/*
 * FlushBufferPool
 *
 * Flush all dirty blocks in the buffer pool to disk at checkpoint time.
 * Local relations do not participate in checkpoints, so they don't need
 * to be flushed.
 */
void
FlushBufferPool(void)
{
	BufferSync(-1, -1);
	smgrsync();
}

/*
 * BufferBackgroundWriter
 *
 * Periodically flushes dirty blocks from the buffer pool to keep the LRU
 * list clean, so that regular backends seldom need to write pages
 * themselves.
 */
void
BufferBackgroundWriter(void)
{
	if (BgWriterPercent == 0)
		return;

	/*
	 * Loop forever
	 */
	for (;;)
	{
		int			n;
		long		udelay;

		/*
		 * Call BufferSync() with instructions to keep just the LRU heads
		 * clean.
		 */
		n = BufferSync(BgWriterPercent, BgWriterMaxpages);

		/*
		 * Whatever signal is sent to us, let's just die gallantly.  If it
		 * wasn't meant that way, the postmaster will reincarnate us.
		 */
		if (InterruptPending)
			return;

		/*
		 * Whenever we have nothing to do, close all smgr files.  This is
		 * so we won't hang onto smgr references to deleted files
		 * indefinitely.  XXX this is a bogus, temporary solution.  'Twould
		 * be much better to do this once per checkpoint, but the bgwriter
		 * doesn't yet know anything about checkpoints.
		 */
		if (n == 0)
			smgrcloseall();

		/*
		 * Nap for the configured time, or sleep for 10 seconds if there
		 * was nothing to do at all.
		 *
		 * On some platforms, signals won't interrupt the sleep.  To ensure
		 * we respond reasonably promptly when the postmaster signals us,
		 * break down the sleep into 1-second increments, and check for
		 * interrupts after each nap.
		 */
		udelay = ((n > 0) ? BgWriterDelay : 10000) * 1000L;
		while (udelay > 1000000L)
		{
			pg_usleep(1000000L);
			udelay -= 1000000L;
			if (InterruptPending)
				return;
		}
		pg_usleep(udelay);
		if (InterruptPending)
			return;
	}
}

/*
 * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
 */
void
BufmgrCommit(void)
{
	/* Nothing to do in bufmgr anymore... */

	smgrcommit();
}

/*
 * BufferGetBlockNumber
 *		Returns the block number associated with a buffer.
 *
 * Note:
 *		Assumes that the buffer is valid and pinned, else the
 *		value may be obsolete immediately...
 */
BlockNumber
BufferGetBlockNumber(Buffer buffer)
{
	Assert(BufferIsPinned(buffer));

	if (BufferIsLocal(buffer))
		return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
	else
		return BufferDescriptors[buffer - 1].tag.blockNum;
}

/*
 * BufferReplace
 *
 * Write out the buffer corresponding to 'bufHdr'.
 *
 * BufMgrLock must be held at entry, and the buffer must be pinned.
 */
static void
BufferReplace(BufferDesc *bufHdr)
{
	SMgrRelation reln;
	XLogRecPtr	recptr;
	ErrorContextCallback errcontext;

	/* To check if block content changed while flushing. - vadim 01/17/97 */
	bufHdr->flags &= ~BM_JUST_DIRTIED;

	LWLockRelease(BufMgrLock);

	/* Setup error traceback support for ereport() */
	errcontext.callback = buffer_write_error_callback;
	errcontext.arg = bufHdr;
	errcontext.previous = error_context_stack;
	error_context_stack = &errcontext;

	/*
	 * No need to lock buffer context - no one should be able to end
	 * ReadBuffer
	 */
	recptr = BufferGetLSN(bufHdr);
	XLogFlush(recptr);

	/* Find smgr relation for buffer */
	reln = smgropen(bufHdr->tag.rnode);

	/* And write... */
	smgrwrite(reln,
			  bufHdr->tag.blockNum,
			  (char *) MAKE_PTR(bufHdr->data));

	/* Pop the error context stack */
	error_context_stack = errcontext.previous;

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

	BufferFlushCount++;
}

/*
 * RelationGetNumberOfBlocks
 *		Determines the current number of pages in the relation.
 *		Side effect: relation->rd_nblocks is updated.
 */
BlockNumber
RelationGetNumberOfBlocks(Relation relation)
{
	/*
	 * relation->rd_nblocks should be accurate already if the relation is
	 * new or temp, because no one else should be modifying it.  Otherwise
	 * we need to ask the smgr for the current physical file length.
	 *
	 * Don't call smgr on a view or a composite type, either.
	 */
	if (relation->rd_rel->relkind == RELKIND_VIEW ||
		relation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
		relation->rd_nblocks = 0;
	else if (!relation->rd_isnew && !relation->rd_istemp)
	{
		/* Open it at the smgr level if not already done */
		if (relation->rd_smgr == NULL)
			relation->rd_smgr = smgropen(relation->rd_node);

		relation->rd_nblocks = smgrnblocks(relation->rd_smgr);
	}

	return relation->rd_nblocks;
}
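#ifdef NOT_USED
/*
 * Usage sketch (illustrative only): a sequential-scan skeleton showing
 * RelationGetNumberOfBlocks together with ReleaseAndReadBuffer, which
 * drops the previous pin and takes the next one under a single BufMgrLock
 * acquisition.  The helper name and the elided page processing are
 * hypothetical.
 */
static void
scan_relation_example(Relation reln)
{
	BlockNumber nblocks = RelationGetNumberOfBlocks(reln);
	BlockNumber blkno;
	Buffer		buf = InvalidBuffer;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		buf = ReleaseAndReadBuffer(buf, reln, blkno);
		LockBuffer(buf, BUFFER_LOCK_SHARE);
		/* ... process BufferGetPage(buf) ... */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	}
	if (BufferIsValid(buf))
		ReleaseBuffer(buf);
}
#endif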
/*
 * RelationUpdateNumberOfBlocks
 *		Forcibly update relation->rd_nblocks.
 *
 * If the relcache drops an entry for a temp relation, it must call this
 * routine after recreating the relcache entry, so that rd_nblocks is
 * re-sync'd with reality.  See RelationGetNumberOfBlocks.
 */
void
RelationUpdateNumberOfBlocks(Relation relation)
{
	if (relation->rd_rel->relkind == RELKIND_VIEW ||
		relation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
		relation->rd_nblocks = 0;
	else
	{
		/* Open it at the smgr level if not already done */
		if (relation->rd_smgr == NULL)
			relation->rd_smgr = smgropen(relation->rd_node);

		relation->rd_nblocks = smgrnblocks(relation->rd_smgr);
	}
}

/* ---------------------------------------------------------------------
 *		DropRelationBuffers
 *
 *		This function removes all the buffered pages for a relation
 *		from the buffer pool.  Dirty pages are simply dropped, without
 *		bothering to write them out first.  This is NOT rollback-able,
 *		and so should be used only with extreme caution!
 *
 *		We assume that the caller holds an exclusive lock on the relation,
 *		which should assure that no new buffers will be acquired for the
 *		rel meanwhile.
 * --------------------------------------------------------------------
 */
void
DropRelationBuffers(Relation rel)
{
	DropRelFileNodeBuffers(rel->rd_node, rel->rd_istemp);
}

/* ---------------------------------------------------------------------
 *		DropRelFileNodeBuffers
 *
 *		This is the same as DropRelationBuffers, except that the target
 *		relation is specified by RelFileNode and temp status.
 *
 *		This is NOT rollback-able.  One legitimate use is to clear the
 *		buffer cache of buffers for a relation that is being deleted
 *		during transaction abort.
 * --------------------------------------------------------------------
 */
void
DropRelFileNodeBuffers(RelFileNode rnode, bool istemp)
{
	int			i;
	BufferDesc *bufHdr;

	if (istemp)
	{
		for (i = 0; i < NLocBuffer; i++)
		{
			bufHdr = &LocalBufferDescriptors[i];
			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
			{
				bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
				bufHdr->cntxDirty = false;
				LocalRefCount[i] = 0;
				bufHdr->tag.rnode.relNode = InvalidOid;
			}
		}
		return;
	}

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

	for (i = 1; i <= NBuffers; i++)
	{
		bufHdr = &BufferDescriptors[i - 1];
recheck:
		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode))
		{
			/*
			 * If there is I/O in progress, better wait till it's done;
			 * don't want to delete the relation out from under someone
			 * who's just trying to flush the buffer!
			 */
			if (bufHdr->flags & BM_IO_IN_PROGRESS)
			{
				WaitIO(bufHdr);

				/*
				 * By now, the buffer very possibly belongs to some other
				 * rel, so check again before proceeding.
				 */
				goto recheck;
			}
			/* Now we can do what we came for */
			bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
			bufHdr->cntxDirty = false;

			/*
			 * Release any refcount we may have.  If someone else has a
			 * pin on the buffer, we are in trouble.
			 */
			if (bufHdr->refcount != 0)
			{
				/* the sole pin should be ours */
				if (bufHdr->refcount != 1 || PrivateRefCount[i - 1] == 0)
					elog(FATAL, "block %u of %u/%u is still referenced (private %ld, global %d)",
						 bufHdr->tag.blockNum,
						 bufHdr->tag.rnode.tblNode,
						 bufHdr->tag.rnode.relNode,
						 PrivateRefCount[i - 1], bufHdr->refcount);
				/* Make sure it will be released */
				PrivateRefCount[i - 1] = 1;
				UnpinBuffer(bufHdr);
			}

			/*
			 * And mark the buffer as no longer occupied by this rel.
			 */
			StrategyInvalidateBuffer(bufHdr);
		}
	}

	LWLockRelease(BufMgrLock);
}
/* ---------------------------------------------------------------------
 *		DropBuffers
 *
 *		This function removes all the buffers in the buffer cache for a
 *		particular database.  Dirty pages are simply dropped, without
 *		bothering to write them out first.  This is used when we destroy
 *		a database, to avoid trying to flush data to disk when the
 *		directory tree no longer exists.  Implementation is pretty
 *		similar to DropRelationBuffers() which is for destroying just one
 *		relation.
 * --------------------------------------------------------------------
 */
void
DropBuffers(Oid dbid)
{
	int			i;
	BufferDesc *bufHdr;

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

	for (i = 1; i <= NBuffers; i++)
	{
		bufHdr = &BufferDescriptors[i - 1];
recheck:

		/*
		 * Currently the database OID is the tblNode, but this will
		 * probably change in the future, at which point this function
		 * could be used to drop a tablespace's buffers as well.
		 */
		if (bufHdr->tag.rnode.tblNode == dbid)
		{
			/*
			 * If there is I/O in progress, better wait till it's done;
			 * don't want to delete the database out from under someone
			 * who's just trying to flush the buffer!
			 */
			if (bufHdr->flags & BM_IO_IN_PROGRESS)
			{
				WaitIO(bufHdr);

				/*
				 * By now, the buffer very possibly belongs to some other
				 * DB, so check again before proceeding.
				 */
				goto recheck;
			}
			/* Now we can do what we came for */
			bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
			bufHdr->cntxDirty = false;

			/*
			 * The buffer should not be pinned, provided the caller has
			 * checked that no backends are running in that database.
			 */
			Assert(bufHdr->refcount == 0);

			/*
			 * And mark the buffer as no longer occupied by this page.
			 */
			StrategyInvalidateBuffer(bufHdr);
		}
	}

	LWLockRelease(BufMgrLock);
}

/* -----------------------------------------------------------------
 *		PrintBufferDescs
 *
 *		this function prints all the buffer descriptors, for debugging
 *		use only.
 * -----------------------------------------------------------------
 */
#ifdef NOT_USED
void
PrintBufferDescs(void)
{
	int			i;
	BufferDesc *buf = BufferDescriptors;

	if (IsUnderPostmaster)
	{
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		for (i = 0; i < NBuffers; ++i, ++buf)
		{
			elog(LOG,
				 "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, "
				 "blockNum=%u, flags=0x%x, refcount=%d %ld)",
				 i, buf->freeNext, buf->freePrev,
				 buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				 buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i]);
		}
		LWLockRelease(BufMgrLock);
	}
	else
	{
		/* interactive backend */
		for (i = 0; i < NBuffers; ++i, ++buf)
		{
			printf("[%-2d] (%u/%u, %u) flags=0x%x, refcnt=%d %ld)\n",
				   i, buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				   buf->tag.blockNum,
				   buf->flags, buf->refcount, PrivateRefCount[i]);
		}
	}
}
#endif

#ifdef NOT_USED
void
PrintPinnedBufs(void)
{
	int			i;
	BufferDesc *buf = BufferDescriptors;

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
	for (i = 0; i < NBuffers; ++i, ++buf)
	{
		if (PrivateRefCount[i] > 0)
			elog(WARNING,
				 "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, "
				 "blockNum=%u, flags=0x%x, refcount=%d %ld)",
				 i, buf->freeNext, buf->freePrev,
				 buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				 buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i]);
	}
	LWLockRelease(BufMgrLock);
}
#endif
/* ---------------------------------------------------------------------
 *		FlushRelationBuffers
 *
 *		This function writes all dirty pages of a relation out to disk.
 *		Furthermore, pages that have blocknumber >= firstDelBlock are
 *		actually removed from the buffer pool.  An error code is returned
 *		if we fail to dump a dirty buffer or if we find one of
 *		the target pages is pinned into the cache.
 *
 *		This is called by DROP TABLE to clear buffers for the relation
 *		from the buffer pool.  Note that we must write dirty buffers,
 *		rather than just dropping the changes, because our transaction
 *		might abort later on; we want to roll back safely in that case.
 *
 *		This is also called by VACUUM before truncating the relation to
 *		the given number of blocks.  It might seem unnecessary for VACUUM
 *		to write dirty pages before firstDelBlock, since VACUUM should
 *		already have committed its changes.  However, it is possible for
 *		there still to be dirty pages: if some page had unwritten on-row
 *		tuple status updates from a prior transaction, and VACUUM had no
 *		additional changes to make to that page, then VACUUM won't have
 *		written it.  This is harmless in most cases but will break
 *		pg_upgrade, which relies on VACUUM to ensure that *all* tuples
 *		have correct on-row status.  So, we check and flush all dirty
 *		pages of the rel regardless of block number.
 *
 *		In all cases, the caller should be holding AccessExclusiveLock on
 *		the target relation to ensure that no other backend is busy
 *		reading more blocks of the relation (or might do so before we
 *		commit).
 *
 *		Formerly, we considered it an error condition if we found dirty
 *		buffers here.  However, since BufferSync no longer forces out all
 *		dirty buffers at every xact commit, it's possible for dirty
 *		buffers to still be present in the cache due to failure of an
 *		earlier transaction.  So, we must flush dirty buffers without
 *		complaint.
 *
 *		Returns: 0 - Ok, -1 - FAILED TO WRITE DIRTY BUFFER, -2 - PINNED
 *
 *		XXX currently it sequentially searches the buffer pool, should be
 *		changed to more clever ways of searching.
 * --------------------------------------------------------------------
 */
int
FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
{
	int			i;
	BufferDesc *bufHdr;
	XLogRecPtr	recptr;
	ErrorContextCallback errcontext;

	/* Setup error traceback support for ereport() */
	errcontext.callback = buffer_write_error_callback;
	errcontext.arg = NULL;
	errcontext.previous = error_context_stack;
	error_context_stack = &errcontext;

	if (rel->rd_istemp)
	{
		for (i = 0; i < NLocBuffer; i++)
		{
			bufHdr = &LocalBufferDescriptors[i];
			errcontext.arg = bufHdr;
			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
			{
				if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
				{
					/* Open it at the smgr level if not already done */
					if (rel->rd_smgr == NULL)
						rel->rd_smgr = smgropen(rel->rd_node);

					smgrwrite(rel->rd_smgr,
							  bufHdr->tag.blockNum,
							  (char *) MAKE_PTR(bufHdr->data));

					bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
					bufHdr->cntxDirty = false;
				}
				if (LocalRefCount[i] > 0)
				{
					error_context_stack = errcontext.previous;
					elog(WARNING,
						 "FlushRelationBuffers(\"%s\" (local), %u): block %u is referenced (%ld)",
						 RelationGetRelationName(rel), firstDelBlock,
						 bufHdr->tag.blockNum, LocalRefCount[i]);
					return (-2);
				}
				if (bufHdr->tag.blockNum >= firstDelBlock)
					bufHdr->tag.rnode.relNode = InvalidOid;
			}
		}

		/* Pop the error context stack */
		error_context_stack = errcontext.previous;

		return 0;
	}

	LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

	for (i = 0; i < NBuffers; i++)
	{
		bufHdr = &BufferDescriptors[i];
		errcontext.arg = bufHdr;
		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
		{
			if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
			{
				PinBuffer(bufHdr);
				if (bufHdr->flags & BM_IO_IN_PROGRESS)
					WaitIO(bufHdr);
				LWLockRelease(BufMgrLock);

				/*
				 * Force XLOG flush for the buffer's LSN
				 */
				recptr = BufferGetLSN(bufHdr);
				XLogFlush(recptr);

				/*
				 * Now it's safe to write the buffer to disk
				 */
				LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
				if (bufHdr->flags & BM_IO_IN_PROGRESS)
					WaitIO(bufHdr);

				if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
				{
					bufHdr->flags &= ~BM_JUST_DIRTIED;
					StartBufferIO(bufHdr, false);	/* output IO start */

					LWLockRelease(BufMgrLock);

					/* Open it at the smgr level if not already done */
					if (rel->rd_smgr == NULL)
						rel->rd_smgr = smgropen(rel->rd_node);
					smgrwrite(rel->rd_smgr,
							  bufHdr->tag.blockNum,
							  (char *) MAKE_PTR(bufHdr->data));

					BufferFlushCount++;

					LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
					bufHdr->flags &= ~BM_IO_IN_PROGRESS;
					TerminateBufferIO(bufHdr);
					Assert(!(bufHdr->flags & BM_JUST_DIRTIED));
					bufHdr->flags &= ~BM_DIRTY;

					/*
					 * Note that it's safe to change cntxDirty here
					 * because we protect it from upper-level writers with
					 * the AccessExclusiveLock, and from other bufmgr
					 * routines with BM_IO_IN_PROGRESS
					 */
					bufHdr->cntxDirty = false;
				}
				UnpinBuffer(bufHdr);
			}
			if (bufHdr->refcount != 0)
			{
				LWLockRelease(BufMgrLock);
				error_context_stack = errcontext.previous;
				elog(WARNING,
					 "FlushRelationBuffers(\"%s\", %u): block %u is referenced (private %ld, global %d)",
					 RelationGetRelationName(rel), firstDelBlock,
					 bufHdr->tag.blockNum,
					 PrivateRefCount[i], bufHdr->refcount);
				return -2;
			}
			if (bufHdr->tag.blockNum >= firstDelBlock)
				StrategyInvalidateBuffer(bufHdr);
		}
	}

	LWLockRelease(BufMgrLock);

	/* Pop the error context stack */
	error_context_stack = errcontext.previous;

	return 0;
}
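#ifdef NOT_USED
/*
 * Usage sketch (illustrative only): the VACUUM-style truncation protocol
 * described in FlushRelationBuffers' header comment.  The caller must
 * hold AccessExclusiveLock on the relation; the physical file truncation
 * that would follow the flush happens at the smgr level and is elided
 * here.
 */
static void
truncate_relation_example(Relation rel, BlockNumber new_nblocks)
{
	if (FlushRelationBuffers(rel, new_nblocks) < 0)
		elog(ERROR, "FlushRelationBuffers returned an error");
	/* ... now truncate the physical file at the smgr level ... */
}
#endif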
/*
 * ReleaseBuffer -- remove the pin on a buffer without
 *		marking it dirty.
 */
int
ReleaseBuffer(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (BufferIsLocal(buffer))
	{
		Assert(LocalRefCount[-buffer - 1] > 0);
		LocalRefCount[-buffer - 1]--;
		return STATUS_OK;
	}

	if (BAD_BUFFER_ID(buffer))
		return STATUS_ERROR;

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);

	if (PrivateRefCount[buffer - 1] > 1)
		PrivateRefCount[buffer - 1]--;
	else
	{
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		UnpinBuffer(bufHdr);
		LWLockRelease(BufMgrLock);
	}

	return STATUS_OK;
}

#ifdef NOT_USED
void
IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
{
	IncrBufferRefCount(buffer);
	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr,
				"PIN(Incr) %d rel = %u/%u, blockNum = %u, "
				"refcount = %ld, file: %s, line: %d\n",
				buffer,
				buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
}
#endif

#ifdef NOT_USED
void
ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
{
	ReleaseBuffer(buffer);
	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr,
				"UNPIN(Rel) %d rel = %u/%u, blockNum = %u, "
				"refcount = %ld, file: %s, line: %d\n",
				buffer,
				buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
}
#endif

#ifdef NOT_USED
Buffer
ReleaseAndReadBuffer_Debug(char *file,
						   int line,
						   Buffer buffer,
						   Relation relation,
						   BlockNumber blockNum)
{
	bool		bufferValid;
	Buffer		b;

	bufferValid = BufferIsValid(buffer);
	b = ReleaseAndReadBuffer(buffer, relation, blockNum);
	if (ShowPinTrace && bufferValid && !BufferIsLocal(buffer) &&
		is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[buffer - 1];

		fprintf(stderr,
				"UNPIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, "
				"refcount = %ld, file: %s, line: %d\n",
				buffer,
				buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				buf->tag.blockNum,
				PrivateRefCount[buffer - 1], file, line);
	}
	if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
	{
		BufferDesc *buf = &BufferDescriptors[b - 1];

		fprintf(stderr,
				"PIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, "
				"refcount = %ld, file: %s, line: %d\n",
				b,
				buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
				buf->tag.blockNum,
				PrivateRefCount[b - 1], file, line);
	}
	return b;
}
#endif

/*
 * SetBufferCommitInfoNeedsSave
 *
 *	Mark a buffer dirty when we have updated tuple commit-status bits in
 *	it.
 *
 * This is similar to WriteNoReleaseBuffer, except that we have not made a
 * critical change that has to be flushed to disk before xact commit --- the
 * status-bit update could be redone by someone else just as easily.
 *
 * This routine might get called many times on the same page, if we are
 * making the first scan after commit of an xact that added/deleted many
 * tuples.  So, be as quick as we can if the buffer is already dirty.
 */
void
SetBufferCommitInfoNeedsSave(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (BufferIsLocal(buffer))
	{
		WriteLocalBuffer(buffer, false);
		return;
	}

	if (BAD_BUFFER_ID(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

	bufHdr = &BufferDescriptors[buffer - 1];

	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
		(BM_DIRTY | BM_JUST_DIRTIED))
	{
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		Assert(bufHdr->refcount > 0);
		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		LWLockRelease(BufMgrLock);
	}
}

/*
 * Release buffer context locks for shared buffers.
 *
 * Used to clean up after errors.
 */
void
UnlockBuffers(void)
{
	BufferDesc *buf;
	int			i;

	for (i = 0; i < NBuffers; i++)
	{
		bits8		buflocks = BufferLocks[i];

		if (buflocks == 0)
			continue;

		Assert(BufferIsValid(i + 1));
		buf = &(BufferDescriptors[i]);

		HOLD_INTERRUPTS();		/* don't want to die() partway through... */

		/*
		 * The buffer's cntx_lock has already been released by lwlock.c.
		 */
		if (buflocks & BL_PIN_COUNT_LOCK)
		{
			LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

			/*
			 * Don't complain if flag bit not set; it could have been
			 * reset but we got a cancel/die interrupt before getting the
			 * signal.
			 */
			if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
				buf->wait_backend_id == MyBackendId)
				buf->flags &= ~BM_PIN_COUNT_WAITER;
			LWLockRelease(BufMgrLock);
			ProcCancelWaitForSignal();
		}

		BufferLocks[i] = 0;

		RESUME_INTERRUPTS();
	}
}

/*
 * Acquire or release the cntx_lock for the buffer.
 */
void
LockBuffer(Buffer buffer, int mode)
{
	BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return;

	buf = &(BufferDescriptors[buffer - 1]);

	if (mode == BUFFER_LOCK_UNLOCK)
		LWLockRelease(buf->cntx_lock);
	else if (mode == BUFFER_LOCK_SHARE)
		LWLockAcquire(buf->cntx_lock, LW_SHARED);
	else if (mode == BUFFER_LOCK_EXCLUSIVE)
	{
		LWLockAcquire(buf->cntx_lock, LW_EXCLUSIVE);

		/*
		 * This is not the best place to set the cntxDirty flag (e.g.,
		 * indexes do not always change the buffer they lock in exclusive
		 * mode).  But please remember that it's critical to set cntxDirty
		 * *before* logging changes with XLogInsert() - see comments in
		 * BufferSync().
		 */
		buf->cntxDirty = true;
	}
	else
		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
}

/*
 * Acquire the cntx_lock for the buffer, but only if we don't have to wait.
 *
 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
 */
bool
ConditionalLockBuffer(Buffer buffer)
{
	BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return true;			/* act as though we got it */

	buf = &(BufferDescriptors[buffer - 1]);

	if (LWLockConditionalAcquire(buf->cntx_lock, LW_EXCLUSIVE))
	{
		/*
		 * This is not the best place to set the cntxDirty flag (e.g.,
		 * indexes do not always change the buffer they lock in exclusive
		 * mode).  But please remember that it's critical to set cntxDirty
		 * *before* logging changes with XLogInsert() - see comments in
		 * BufferSync().
		 */
		buf->cntxDirty = true;
		return true;
	}
	return false;
}
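#ifdef NOT_USED
/*
 * Usage sketch (illustrative only): opportunistic exclusive locking with
 * ConditionalLockBuffer.  The caller must already hold a pin; if the
 * conditional acquire fails, the caller simply retries the work later
 * instead of risking a deadlock by waiting.
 */
static bool
try_modify_page_example(Buffer buf)
{
	if (!ConditionalLockBuffer(buf))
		return false;			/* contended; caller tries again later */
	/* ... modify BufferGetPage(buf), write WAL for the change ... */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	return true;
}
#endif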
/*
 * LockBufferForCleanup - lock a buffer in preparation for deleting items
 *
 * Items may be deleted from a disk page only when the caller (a) holds an
 * exclusive lock on the buffer and (b) has observed that no other backend
 * holds a pin on the buffer.  If there is a pin, then the other backend
 * might have a pointer into the buffer (for example, a heapscan reference
 * to an item --- see README for more details).  It's OK if a pin is added
 * after the cleanup starts, however; the newly-arrived backend will be
 * unable to look at the page until we release the exclusive lock.
 *
 * To implement this protocol, a would-be deleter must pin the buffer and
 * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
 * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
 * it has successfully observed pin count = 1.
 */
void
LockBufferForCleanup(Buffer buffer)
{
	BufferDesc *bufHdr;
	bits8	   *buflock;

	Assert(BufferIsValid(buffer));

	if (BufferIsLocal(buffer))
	{
		/* There should be exactly one pin */
		if (LocalRefCount[-buffer - 1] != 1)
			elog(ERROR, "incorrect local pin count: %ld",
				 LocalRefCount[-buffer - 1]);
		/* Nobody else to wait for */
		return;
	}

	/* There should be exactly one local pin */
	if (PrivateRefCount[buffer - 1] != 1)
		elog(ERROR, "incorrect local pin count: %ld",
			 PrivateRefCount[buffer - 1]);

	bufHdr = &BufferDescriptors[buffer - 1];
	buflock = &(BufferLocks[buffer - 1]);

	for (;;)
	{
		/* Try to acquire lock */
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		Assert(bufHdr->refcount > 0);
		if (bufHdr->refcount == 1)
		{
			/* Successfully acquired exclusive lock with pincount 1 */
			LWLockRelease(BufMgrLock);
			return;
		}
		/* Failed, so mark myself as waiting for pincount 1 */
		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
		{
			LWLockRelease(BufMgrLock);
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			elog(ERROR, "multiple backends attempting to wait for pincount 1");
		}
		bufHdr->wait_backend_id = MyBackendId;
		bufHdr->flags |= BM_PIN_COUNT_WAITER;
		*buflock |= BL_PIN_COUNT_LOCK;
		LWLockRelease(BufMgrLock);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

		/* Wait to be signaled by UnpinBuffer() */
		ProcWaitForSignal();
		*buflock &= ~BL_PIN_COUNT_LOCK;

		/* Loop back and try again */
	}
}
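#ifdef NOT_USED
/*
 * Usage sketch (illustrative only): the would-be deleter protocol from
 * LockBufferForCleanup's comment above --- pin first, then call
 * LockBufferForCleanup, which returns only once we hold the exclusive
 * content lock with pin count 1.  The helper name and the elided item
 * deletion are hypothetical.
 */
static void
delete_items_example(Relation reln, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(reln, blkno);	/* take our pin */

	LockBufferForCleanup(buf);
	/* ... safe to delete items from BufferGetPage(buf) here ... */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buf);
}
#endif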
/*
 *	Functions for IO error handling
 *
 *	Note: we assume that nested buffer IO never occurs; i.e., at most one
 *	io_in_progress lock is held per process.
 */
static BufferDesc *InProgressBuf = NULL;
static bool IsForInput;

/*
 * Function: StartBufferIO
 *	(Assumptions)
 *	My process is executing no IO
 *	BufMgrLock is held
 *	BM_IO_IN_PROGRESS mask is not set for the buffer
 *	The buffer is Pinned
 *
 * Because BufMgrLock is held, we are already in an interrupt holdoff here,
 * and do not need another.
 */
static void
StartBufferIO(BufferDesc *buf, bool forInput)
{
	Assert(!InProgressBuf);
	Assert(!(buf->flags & BM_IO_IN_PROGRESS));
	buf->flags |= BM_IO_IN_PROGRESS;

	LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

	InProgressBuf = buf;
	IsForInput = forInput;
}

/*
 * Function: TerminateBufferIO
 *	(Assumptions)
 *	My process is executing IO for the buffer
 *	BufMgrLock is held
 *	The buffer is Pinned
 *
 * Because BufMgrLock is held, we are already in an interrupt holdoff here,
 * and do not need another.
 */
static void
TerminateBufferIO(BufferDesc *buf)
{
	Assert(buf == InProgressBuf);
	LWLockRelease(buf->io_in_progress_lock);
	InProgressBuf = NULL;
}

/*
 * Function: ContinueBufferIO
 *	(Assumptions)
 *	My process is executing IO for the buffer
 *	BufMgrLock is held
 *	The buffer is Pinned
 *
 * Because BufMgrLock is held, we are already in an interrupt holdoff here,
 * and do not need another.
 */
static void
ContinueBufferIO(BufferDesc *buf, bool forInput)
{
	Assert(buf == InProgressBuf);
	Assert(buf->flags & BM_IO_IN_PROGRESS);
	IsForInput = forInput;
}

#ifdef NOT_USED
void
InitBufferIO(void)
{
	InProgressBuf = NULL;
}
#endif

/*
 *	Clean up any active buffer I/O after an error.
 *	BufMgrLock isn't held when this function is called.
 *
 *	If I/O was in progress, we always set BM_IO_ERROR.
 */
void
AbortBufferIO(void)
{
	BufferDesc *buf = InProgressBuf;

	if (buf)
	{
		/*
		 * Since LWLockReleaseAll has already been called, we're not
		 * holding the buffer's io_in_progress_lock.  We have to
		 * re-acquire it so that we can use TerminateBufferIO.  Anyone
		 * who's executing WaitIO on the buffer will be in a busy spin
		 * until we succeed in doing this.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		Assert(buf->flags & BM_IO_IN_PROGRESS);
		if (IsForInput)
		{
			Assert(!(buf->flags & BM_DIRTY) && !(buf->cntxDirty));
			/* Don't consider the buffer contents valid anymore */
			StrategyInvalidateBuffer(buf);
		}
		else
		{
			Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
			/* Issue notice if this is not the first failure... */
			if (buf->flags & BM_IO_ERROR)
			{
				ereport(WARNING,
						(errcode(ERRCODE_IO_ERROR),
						 errmsg("could not write block %u of %u/%u",
								buf->tag.blockNum,
								buf->tag.rnode.tblNode,
								buf->tag.rnode.relNode),
						 errdetail("Multiple failures --- write error may be permanent.")));
			}
			buf->flags |= BM_DIRTY;
		}
		buf->flags |= BM_IO_ERROR;
		buf->flags &= ~BM_IO_IN_PROGRESS;
		TerminateBufferIO(buf);
		LWLockRelease(BufMgrLock);
	}
}

RelFileNode
BufferGetFileNode(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (BufferIsLocal(buffer))
		bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
	else
		bufHdr = &BufferDescriptors[buffer - 1];

	return (bufHdr->tag.rnode);
}

/*
 * Error context callback for errors occurring during buffer writes.
 */
static void
buffer_write_error_callback(void *arg)
{
	BufferDesc *bufHdr = (BufferDesc *) arg;

	if (bufHdr != NULL)
		errcontext("writing block %u of relation %u/%u",
				   bufHdr->tag.blockNum,
				   bufHdr->tag.rnode.tblNode,
				   bufHdr->tag.rnode.relNode);
}