postgresql/src/backend/access/gist/gistvacuum.c
2017-01-03 13:48:53 -05:00

276 lines
6.7 KiB
C

/*-------------------------------------------------------------------------
*
* gistvacuum.c
* vacuuming routines for the postgres GiST index access method.
*
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gist/gistvacuum.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
/*
 * gistvacuumcleanup() -- post-VACUUM cleanup for a GiST index.
 *
 * Walks every block after the root, reports each new or deleted page to the
 * free space map, vacuums the FSM, and fills in the page counters of the
 * result struct.
 *
 * info:  vacuum parameters; the index, buffer strategy, analyze_only flag and
 *        heap tuple estimates are read from it.
 * stats: result struct from an earlier gistbulkdelete call, or NULL.
 *
 * Returns 'stats' untouched (it may be NULL) in ANALYZE-only mode.  Otherwise
 * returns 'stats' with pages_free and num_pages set, allocating a zeroed
 * struct first when the caller passed NULL.
 */
IndexBulkDeleteResult *
gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	indexrel = info->index;
	bool		lockNeeded;
	BlockNumber nblocks;
	BlockNumber cur;
	BlockNumber freeCount = 0;

	/* ANALYZE-only mode: there is nothing to do here */
	if (info->analyze_only)
		return stats;

	/* gistbulkdelete was never called, so build all-zero stats ourselves */
	if (!stats)
	{
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

		/* borrow the heap's tuple count */
		stats->estimated_count = info->estimated_count;
		stats->num_index_tuples = info->num_heap_tuples;

		/*
		 * XXX this count is wrong for a partial index.  Would it be OK to
		 * just return NULL, or is there work we must do below?
		 */
	}

	/* The extension lock is skipped for relations local to this backend. */
	lockNeeded = !RELATION_IS_LOCAL(indexrel);

	/* Fetch the current index length, under the extension lock if needed. */
	if (lockNeeded)
		LockRelationForExtension(indexrel, ExclusiveLock);
	nblocks = RelationGetNumberOfBlocks(indexrel);
	if (lockNeeded)
		UnlockRelationForExtension(indexrel, ExclusiveLock);

	/* Look for reusable pages in every block that follows the root. */
	cur = GIST_ROOT_BLKNO + 1;
	while (cur < nblocks)
	{
		Buffer		buf;
		Page		pg;
		bool		reusable;

		vacuum_delay_point();

		buf = ReadBufferExtended(indexrel, MAIN_FORKNUM, cur, RBM_NORMAL,
								 info->strategy);
		LockBuffer(buf, GIST_SHARE);
		pg = (Page) BufferGetPage(buf);

		/* new (never initialized) and deleted pages both count as free */
		reusable = PageIsNew(pg) || GistPageIsDeleted(pg);
		if (reusable)
		{
			freeCount++;
			RecordFreeIndexPage(indexrel, cur);
		}

		UnlockReleaseBuffer(buf);
		cur++;
	}

	/* Finish by vacuuming the free space map itself. */
	IndexFreeSpaceMapVacuum(indexrel);

	/* Publish the statistics; the index length is read afresh. */
	stats->pages_free = freeCount;

	if (lockNeeded)
		LockRelationForExtension(indexrel, ExclusiveLock);
	stats->num_pages = RelationGetNumberOfBlocks(indexrel);
	if (lockNeeded)
		UnlockRelationForExtension(indexrel, ExclusiveLock);

	return stats;
}
/*
 * GistBDItem -- one entry in the singly linked stack of blocks that
 * gistbulkdelete still has to visit.
 */
typedef struct GistBDItem
{
/*
 * LSN of the parent page as read when this item was queued.  A right
 * sibling queued by pushStackIfSplited inherits the value of the item it
 * was queued from.  The root item is palloc0'd, so its value is zero.
 */
GistNSN parentlsn;
/* block number of the index page to visit */
BlockNumber blkno;
/* next item on the stack, or NULL at the bottom */
struct GistBDItem *next;
} GistBDItem;
/*
 * pushStackIfSplited() -- queue the right sibling of a page that was split.
 *
 * 'page' is the page belonging to the stack item 'stack'.  When the page
 * carries the follow-right flag, or its NSN is newer than the parent LSN
 * remembered in 'stack', a new item for the page's right link is inserted
 * directly behind 'stack'.  The new item inherits the same parent LSN.
 *
 * Nothing is queued for the root block, for an item whose parent LSN is
 * invalid, or for a page without a valid right link.
 */
static void
pushStackIfSplited(Page page, GistBDItem *stack)
{
	GISTPageOpaque opaque = GistPageGetOpaque(page);
	GistBDItem *sibling;

	/* the root block is never followed to the right */
	if (stack->blkno == GIST_ROOT_BLKNO)
		return;

	/* without a parent LSN there is nothing to compare against */
	if (XLogRecPtrIsInvalid(stack->parentlsn))
		return;

	/* neither flagged follow-right nor split since the parent was read */
	if (!GistFollowRight(page) && stack->parentlsn >= GistPageGetNSN(page))
		return;

	/* sanity check: there must be a right sibling to go to */
	if (opaque->rightlink == InvalidBlockNumber)
		return;

	/* split page detected, so link its right sibling in behind 'stack' */
	sibling = (GistBDItem *) palloc(sizeof(GistBDItem));
	sibling->parentlsn = stack->parentlsn;
	sibling->blkno = opaque->rightlink;
	sibling->next = stack->next;
	stack->next = sibling;
}
/*
* Bulk deletion of all index entries pointing to a set of heap tuples, plus
* a check for invalid inner tuples left over from before an upgrade.
*
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
*
* The index is traversed from the root using an explicit stack of
* GistBDItem entries.  Inner pages are read under a share lock and have all
* of their children pushed onto the stack.  Leaf pages are re-locked
* exclusively; every tuple for which the callback returns true is removed,
* and the remaining tuples are counted.
*
* info: vacuum parameters (the index and buffer strategy are read from it)
* stats: result of a previous call, or NULL on the first call
* callback, callback_state: deletion test and its opaque argument
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
* num_index_tuples is recounted from zero on every call, while
* tuples_removed accumulates across calls.
*/
IndexBulkDeleteResult *
gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
Relation rel = info->index;
GistBDItem *stack,
*ptr;
/* first time through? */
if (stats == NULL)
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* we'll re-count the tuples each time */
stats->estimated_count = false;
stats->num_index_tuples = 0;
/*
* Seed the stack with the root block.  palloc0 leaves parentlsn zeroed
* and next NULL.
*/
stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO;
/* Process stack items until none are left. */
while (stack)
{
Buffer buffer;
Page page;
OffsetNumber i,
maxoff;
IndexTuple idxtuple;
ItemId iid;
/* Read the page and take a first look at it under a share lock. */
buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno,
RBM_NORMAL, info->strategy);
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(rel, buffer);
page = (Page) BufferGetPage(buffer);
if (GistPageIsLeaf(page))
{
/* offsets of the tuples the callback asked us to remove */
OffsetNumber todelete[MaxOffsetNumber];
int ntodelete = 0;
/*
* Swap the share lock for an exclusive one.  The buffer is not
* released, but the page is unlocked in between, so its state
* has to be re-examined below.
*/
LockBuffer(buffer, GIST_UNLOCK);
LockBuffer(buffer, GIST_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page))
{
/* only the root can become non-leaf during relock */
UnlockReleaseBuffer(buffer);
/*
* Retry the same stack item; the stack is left untouched, so
* the root is read and examined again on the next iteration.
*/
continue;
}
/*
* The page may have been split after we looked at its parent;
* this has to be checked after the relock.
*/
pushStackIfSplited(page, stack);
/*
* Remove deletable tuples from page
*/
maxoff = PageGetMaxOffsetNumber(page);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
/* remember the offset if doomed, otherwise count a survivor */
if (callback(&(idxtuple->t_tid), callback_state))
todelete[ntodelete++] = i;
else
stats->num_index_tuples += 1;
}
stats->tuples_removed += ntodelete;
if (ntodelete)
{
/*
* The page change and its WAL record are made inside a single
* critical section.
*/
START_CRIT_SECTION();
MarkBufferDirty(buffer);
PageIndexMultiDelete(page, todelete, ntodelete);
GistMarkTuplesDeleted(page);
if (RelationNeedsWAL(rel))
{
XLogRecPtr recptr;
/* log the deleted offsets and stamp the page with the record's LSN */
recptr = gistXLogUpdate(buffer,
todelete, ntodelete,
NULL, 0, InvalidBuffer);
PageSetLSN(page, recptr);
}
else
/* relation is not WAL-logged: stamp the page with a fake LSN instead */
PageSetLSN(page, gistGetFakeLSN(rel));
END_CRIT_SECTION();
}
}
else
{
/* the page may have been split after we looked at its parent */
pushStackIfSplited(page, stack);
/*
* Inner page: queue every child right behind the current stack item,
* recording this page's LSN as the child's parent LSN.
*/
maxoff = PageGetMaxOffsetNumber(page);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
ptr->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
ptr->parentlsn = PageGetLSN(page);
ptr->next = stack->next;
stack->next = ptr;
/* report invalid inner tuples; they are logged, not repaired */
if (GistTupleIsInvalid(idxtuple))
ereport(LOG,
(errmsg("index \"%s\" contains an inner tuple marked as invalid",
RelationGetRelationName(rel)),
errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
errhint("Please REINDEX it.")));
}
}
UnlockReleaseBuffer(buffer);
/* pop the finished item and move on to the next one */
ptr = stack->next;
pfree(stack);
stack = ptr;
vacuum_delay_point();
}
return stats;
}