/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *	  the postgres vacuum cleaner
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.203 2001/07/12 04:11:13 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

/*
 * System headers.  (The header names inside the angle brackets were lost
 * when this file was extracted; the set below is the usual one for this
 * file and may need adjustment.)
 */
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/index.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/temprel.h"
#include "pgstat.h"


typedef struct VRelListData
{
	Oid			vrl_relid;
	struct VRelListData *vrl_next;
} VRelListData;

typedef VRelListData *VRelList;

typedef struct VacPageData
{
	BlockNumber blkno;			/* BlockNumber of this Page */
	Size		free;			/* FreeSpace on this Page */
	uint16		offsets_used;	/* Number of OffNums used by vacuum */
	uint16		offsets_free;	/* Number of OffNums free or to be free */
	OffsetNumber offsets[1];	/* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
	BlockNumber empty_end_pages;	/* Number of "empty" end-pages */
	int			num_pages;			/* Number of pages in pagedesc */
	int			num_allocated_pages;	/* Number of allocated pages in
										 * pagedesc */
	VacPage    *pagedesc;			/* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

typedef struct VTupleLinkData
{
	ItemPointerData new_tid;
	ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
	ItemPointerData tid;		/* tuple ID */
	VacPage		vacpage;		/* where to move */
	bool		cleanVpd;		/* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

typedef struct VRelStats
{
	BlockNumber rel_pages;
	double		rel_tuples;
	Size		min_tlen;
	Size		max_tlen;
	bool		hasindex;
	int			num_vtlinks;
	VTupleLink	vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int	MESSAGE_LEVEL;		/* message level */

static TransactionId XmaxRecent;


/* non-export function prototypes */
static void vacuum_init(void);
static void vacuum_shutdown(void);
static VRelList getrels(Name VacRelP, const char *stmttype);
static void vacuum_rel(Oid relid, VacuumStmt *vacstmt);
static void full_vacuum_rel(Relation onerel);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindices, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
			 double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
static void
vac_update_fsm(Relation onerel, VacPageList fraged_pages, BlockNumber rel_pages); static VacPage copy_vac_page(VacPage vacpage); static void vpage_insert(VacPageList vacpagelist, VacPage vpnew); static void get_indices(Relation relation, int *nindices, Relation **Irel); static void close_indices(int nindices, Relation *Irel); static bool is_partial_index(Relation indrel); static void *vac_bsearch(const void *key, const void *base, size_t nelem, size_t size, int (*compar) (const void *, const void *)); static int vac_cmp_blk(const void *left, const void *right); static int vac_cmp_offno(const void *left, const void *right); static int vac_cmp_vtlinks(const void *left, const void *right); static bool enough_space(VacPage vacpage, Size len); /**************************************************************************** * * * Code common to all flavors of VACUUM and ANALYZE * * * **************************************************************************** */ /* * Primary entry point for VACUUM and ANALYZE commands. */ void vacuum(VacuumStmt *vacstmt) { const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE"; NameData VacRel; Name VacRelName; VRelList vrl, cur; /* * We cannot run VACUUM inside a user transaction block; if we were * inside a transaction, then our commit- and * start-transaction-command calls would not have the intended effect! * Furthermore, the forced commit that occurs before truncating the * relation's file would have the effect of committing the rest of the * user's transaction too, which would certainly not be the desired * behavior. */ if (IsTransactionBlock()) elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype); /* * Send info about dead objects to the statistics collector */ pgstat_vacuum_tabstat(); if (vacstmt->verbose) MESSAGE_LEVEL = NOTICE; else MESSAGE_LEVEL = DEBUG; /* * Create special memory context for cross-transaction storage. * * Since it is a child of QueryContext, it will go away eventually even * if we suffer an error; there's no need for special abort cleanup * logic. */ vac_context = AllocSetContextCreate(QueryContext, "Vacuum", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); /* Convert vacrel, which is just a string, to a Name */ if (vacstmt->vacrel) { namestrcpy(&VacRel, vacstmt->vacrel); VacRelName = &VacRel; } else VacRelName = NULL; /* Build list of relations to process (note this lives in vac_context) */ vrl = getrels(VacRelName, stmttype); /* * Start up the vacuum cleaner. */ vacuum_init(); /* * Process each selected relation. We are careful to process * each relation in a separate transaction in order to avoid holding * too many locks at one time. Also, if we are doing VACUUM ANALYZE, * the ANALYZE part runs as a separate transaction from the VACUUM * to further reduce locking. */ for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next) { if (vacstmt->vacuum) vacuum_rel(cur->vrl_relid, vacstmt); if (vacstmt->analyze) analyze_rel(cur->vrl_relid, vacstmt); } /* * If we did a complete vacuum, then flush the init file that relcache.c * uses to save startup time. The next backend startup will rebuild the * init file with up-to-date information from pg_class. This lets the * optimizer see the stats that we've collected for certain critical * system indexes. See relcache.c for more details. * * Ignore any failure to unlink the file, since it might not be there if * no backend has been started since the last vacuum. 
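 * ("Complete vacuum" here means that no specific table was named, i.e.
 * vacstmt->vacrel is NULL; a single-table VACUUM leaves the init file alone.)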
*/ if (vacstmt->vacrel == NULL) unlink(RELCACHE_INIT_FILENAME); /* clean up */ vacuum_shutdown(); } /* * vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner. * * Formerly, there was code here to prevent more than one VACUUM from * executing concurrently in the same database. However, there's no * good reason to prevent that, and manually removing lockfiles after * a vacuum crash was a pain for dbadmins. So, forget about lockfiles, * and just rely on the locks we grab on each target table * to ensure that there aren't two VACUUMs running on the same table * at the same time. * * The strangeness with committing and starting transactions in the * init and shutdown routines is due to the fact that the vacuum cleaner * is invoked via an SQL command, and so is already executing inside * a transaction. We need to leave ourselves in a predictable state * on entry and exit to the vacuum cleaner. We commit the transaction * started in PostgresMain() inside vacuum_init(), and start one in * vacuum_shutdown() to match the commit waiting for us back in * PostgresMain(). */ static void vacuum_init(void) { /* matches the StartTransaction in PostgresMain() */ CommitTransactionCommand(); } static void vacuum_shutdown(void) { /* on entry, we are not in a transaction */ /* matches the CommitTransaction in PostgresMain() */ StartTransactionCommand(); /* * Clean up working storage --- note we must do this after * StartTransactionCommand, else we might be trying to delete the * active context! */ MemoryContextDelete(vac_context); vac_context = NULL; } /* * Build a list of VRelListData nodes for each relation to be processed * * The list is built in vac_context so that it will survive across our * per-relation transactions. */ static VRelList getrels(Name VacRelP, const char *stmttype) { Relation rel; TupleDesc tupdesc; HeapScanDesc scan; HeapTuple tuple; VRelList vrl, cur; Datum d; char *rname; char rkind; bool n; ScanKeyData key; if (VacRelP) { /* * we could use the cache here, but it is clearer to use scankeys * for both vacuum cases, bjm 2000/01/19 */ char *nontemp_relname; /* We must re-map temp table names bjm 2000-04-06 */ nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP)); if (nontemp_relname == NULL) nontemp_relname = NameStr(*VacRelP); ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname, F_NAMEEQ, PointerGetDatum(nontemp_relname)); } else { /* find all plain relations listed in pg_class */ ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind, F_CHAREQ, CharGetDatum(RELKIND_RELATION)); } vrl = cur = (VRelList) NULL; rel = heap_openr(RelationRelationName, AccessShareLock); tupdesc = RelationGetDescr(rel); scan = heap_beginscan(rel, false, SnapshotNow, 1, &key); while (HeapTupleIsValid(tuple = heap_getnext(scan, 0))) { d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n); rname = (char *) DatumGetName(d); d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n); rkind = DatumGetChar(d); if (rkind != RELKIND_RELATION) { elog(NOTICE, "%s: can not process indexes, views or special system tables", stmttype); continue; } /* Make a relation list entry for this guy */ if (vrl == (VRelList) NULL) vrl = cur = (VRelList) MemoryContextAlloc(vac_context, sizeof(VRelListData)); else { cur->vrl_next = (VRelList) MemoryContextAlloc(vac_context, sizeof(VRelListData)); cur = cur->vrl_next; } cur->vrl_relid = tuple->t_data->t_oid; cur->vrl_next = (VRelList) NULL; } heap_endscan(scan); heap_close(rel, AccessShareLock); if (vrl == NULL) elog(NOTICE, "%s: table not found", 
stmttype); return vrl; } /* * vac_update_relstats() -- update statistics for one relation * * Update the whole-relation statistics that are kept in its pg_class * row. There are additional stats that will be updated if we are * doing ANALYZE, but we always update these stats. This routine works * for both index and heap relation entries in pg_class. * * We violate no-overwrite semantics here by storing new values for the * statistics columns directly into the pg_class tuple that's already on * the page. The reason for this is that if we updated these tuples in * the usual way, vacuuming pg_class itself wouldn't work very well --- * by the time we got done with a vacuum cycle, most of the tuples in * pg_class would've been obsoleted. Of course, this only works for * fixed-size never-null columns, but these are. * * This routine is shared by full VACUUM, lazy VACUUM, and stand-alone * ANALYZE. */ void vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples, bool hasindex) { Relation rd; HeapTupleData rtup; HeapTuple ctup; Form_pg_class pgcform; Buffer buffer; /* * update number of tuples and number of pages in pg_class */ rd = heap_openr(RelationRelationName, RowExclusiveLock); ctup = SearchSysCache(RELOID, ObjectIdGetDatum(relid), 0, 0, 0); if (!HeapTupleIsValid(ctup)) elog(ERROR, "pg_class entry for relid %u vanished during vacuuming", relid); /* get the buffer cache tuple */ rtup.t_self = ctup->t_self; ReleaseSysCache(ctup); heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL); /* overwrite the existing statistics in the tuple */ pgcform = (Form_pg_class) GETSTRUCT(&rtup); pgcform->relpages = (int32) num_pages; pgcform->reltuples = num_tuples; pgcform->relhasindex = hasindex; /* invalidate the tuple in the cache and write the buffer */ RelationInvalidateHeapTuple(rd, &rtup); WriteBuffer(buffer); heap_close(rd, RowExclusiveLock); } /**************************************************************************** * * * Code common to both flavors of VACUUM * * * **************************************************************************** */ /* XXX Temporary placeholder */ static void lazy_vacuum_rel(Relation onerel) { full_vacuum_rel(onerel); } /* * vacuum_rel() -- vacuum one heap relation * * Doing one heap at a time incurs extra overhead, since we need to * check that the heap exists again just before we vacuum it. The * reason that we do this is so that vacuuming can be spread across * many small transactions. Otherwise, two-phase locking would require * us to lock the entire database during one pass of the vacuum cleaner. * * At entry and exit, we are not inside a transaction. */ static void vacuum_rel(Oid relid, VacuumStmt *vacstmt) { LOCKMODE lmode; Relation onerel; LockRelId onerelid; Oid toast_relid; /* Begin a transaction for vacuuming this relation */ StartTransactionCommand(); /* * Check for user-requested abort. Note we want this to be inside a * transaction, so xact.c doesn't issue useless NOTICE. */ CHECK_FOR_INTERRUPTS(); /* * Race condition -- if the pg_class tuple has gone away since the * last time we saw it, we don't need to vacuum it. */ if (!SearchSysCacheExists(RELOID, ObjectIdGetDatum(relid), 0, 0, 0)) { CommitTransactionCommand(); return; } /* * Determine the type of lock we want --- hard exclusive lock for a * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent * vacuum. Either way, we can be sure that no other backend is vacuuming * the same table. */ lmode = vacstmt->full ? 
AccessExclusiveLock : ShareUpdateExclusiveLock; /* * Open the class, get an appropriate lock on it, and check permissions. * * We allow the user to vacuum a table if he is superuser, the table * owner, or the database owner (but in the latter case, only if it's * not a shared relation). pg_ownercheck includes the superuser case. * * Note we choose to treat permissions failure as a NOTICE and keep * trying to vacuum the rest of the DB --- is this appropriate? */ onerel = heap_open(relid, lmode); if (! (pg_ownercheck(GetUserId(), RelationGetRelationName(onerel), RELNAME) || (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared))) { elog(NOTICE, "Skipping \"%s\" --- only table or database owner can VACUUM it", RelationGetRelationName(onerel)); heap_close(onerel, lmode); CommitTransactionCommand(); return; } /* * Get a session-level lock too. This will protect our access to the * relation across multiple transactions, so that we can vacuum the * relation's TOAST table (if any) secure in the knowledge that no one * is deleting the parent relation. * * NOTE: this cannot block, even if someone else is waiting for access, * because the lock manager knows that both lock requests are from the * same process. */ onerelid = onerel->rd_lockInfo.lockRelId; LockRelationForSession(&onerelid, lmode); /* * Remember the relation's TOAST relation for later */ toast_relid = onerel->rd_rel->reltoastrelid; /* * Do the actual work --- either FULL or "lazy" vacuum */ if (vacstmt->full) full_vacuum_rel(onerel); else lazy_vacuum_rel(onerel); /* all done with this class, but hold lock until commit */ heap_close(onerel, NoLock); /* * Complete the transaction and free all temporary memory used. */ CommitTransactionCommand(); /* * If the relation has a secondary toast rel, vacuum that too while we * still hold the session lock on the master table. Note however that * "analyze" will not get done on the toast table. This is good, * because the toaster always uses hardcoded index access and statistics * are totally unimportant for toast relations. */ if (toast_relid != InvalidOid) vacuum_rel(toast_relid, vacstmt); /* * Now release the session-level lock on the master table. */ UnlockRelationForSession(&onerelid, lmode); } /**************************************************************************** * * * Code for VACUUM FULL (only) * * * **************************************************************************** */ /* * full_vacuum_rel() -- perform FULL VACUUM for one heap relation * * This routine vacuums a single heap, cleans out its indices, and * updates its num_pages and num_tuples statistics. * * At entry, we have already established a transaction and opened * and locked the relation. */ static void full_vacuum_rel(Relation onerel) { VacPageListData vacuum_pages; /* List of pages to vacuum and/or * clean indices */ VacPageListData fraged_pages; /* List of pages with space enough * for re-using */ Relation *Irel; int32 nindices, i; VRelStats *vacrelstats; bool reindex = false; if (IsIgnoringSystemIndexes() && IsSystemRelationName(RelationGetRelationName(onerel))) reindex = true; GetXmaxRecent(&XmaxRecent); /* * Set up statistics-gathering machinery. 
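 * vacrelstats is a VRelStats (declared near the top of this file): it
 * accumulates the page and tuple counts, the min/max tuple lengths, and the
 * update-chain links (vtlinks) that scan_heap collects; repair_frag and the
 * final vac_update_relstats call consume them.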
*/ vacrelstats = (VRelStats *) palloc(sizeof(VRelStats)); vacrelstats->rel_pages = 0; vacrelstats->rel_tuples = 0; vacrelstats->hasindex = false; /* scan the heap */ vacuum_pages.num_pages = fraged_pages.num_pages = 0; scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages); /* Now open all indices of the relation */ nindices = 0; Irel = (Relation *) NULL; get_indices(onerel, &nindices, &Irel); if (!Irel) reindex = false; else if (!RelationGetForm(onerel)->relhasindex) reindex = true; if (nindices > 0) vacrelstats->hasindex = true; #ifdef NOT_USED /* * reindex in VACUUM is dangerous under WAL. ifdef out until it * becomes safe. */ if (reindex) { close_indices(nindices, Irel); Irel = (Relation *) NULL; activate_indexes_of_a_table(RelationGetRelid(onerel), false); } #endif /* NOT_USED */ /* Clean/scan index relation(s) */ if (Irel != (Relation *) NULL) { if (vacuum_pages.num_pages > 0) { for (i = 0; i < nindices; i++) vacuum_index(&vacuum_pages, Irel[i], vacrelstats->rel_tuples, 0); } else { /* just scan indices to update statistic */ for (i = 0; i < nindices; i++) scan_index(Irel[i], vacrelstats->rel_tuples); } } if (fraged_pages.num_pages > 0) { /* Try to shrink heap */ repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages, nindices, Irel); close_indices(nindices, Irel); } else { close_indices(nindices, Irel); if (vacuum_pages.num_pages > 0) { /* Clean pages from vacuum_pages list */ vacuum_heap(vacrelstats, onerel, &vacuum_pages); } else { /* * Flush dirty pages out to disk. We must do this even if we * didn't do anything else, because we want to ensure that all * tuples have correct on-row commit status on disk (see * bufmgr.c's comments for FlushRelationBuffers()). */ i = FlushRelationBuffers(onerel, vacrelstats->rel_pages); if (i < 0) elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d", i); } } #ifdef NOT_USED if (reindex) activate_indexes_of_a_table(RelationGetRelid(onerel), true); #endif /* NOT_USED */ /* update shared free space map with final free space info */ vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages); /* update statistics in pg_class */ vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages, vacrelstats->rel_tuples, vacrelstats->hasindex); } /* * scan_heap() -- scan an open heap relation * * This routine sets commit status bits, constructs vacuum_pages (list * of pages we need to compact free space on and/or clean indexes of * deleted tuples), constructs fraged_pages (list of pages with free * space that tuples could be moved into), and calculates statistics * on the number of live tuples in the heap. 
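 * It also tracks empty_end_pages, the run of empty (or uninitialized) pages
 * at the very end of the relation; repair_frag and vacuum_heap later use
 * that count to decide how many blocks can be truncated away.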
*/ static void scan_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages) { BlockNumber nblocks, blkno; ItemId itemid; Buffer buf; HeapTupleData tuple; OffsetNumber offnum, maxoff; bool pgchanged, tupgone, notup; char *relname; VacPage vacpage, vacpagecopy; BlockNumber empty_pages, new_pages, changed_pages, empty_end_pages; double num_tuples, tups_vacuumed, nkeep, nunused; double free_size, usable_free_size; Size min_tlen = MaxTupleSize; Size max_tlen = 0; int i; bool do_shrinking = true; VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData)); int num_vtlinks = 0; int free_vtlinks = 100; VacRUsage ru0; vac_init_rusage(&ru0); relname = RelationGetRelationName(onerel); elog(MESSAGE_LEVEL, "--Relation %s--", relname); empty_pages = new_pages = changed_pages = empty_end_pages = 0; num_tuples = tups_vacuumed = nkeep = nunused = 0; free_size = 0; nblocks = RelationGetNumberOfBlocks(onerel); /* * We initially create each VacPage item in a maximal-sized workspace, * then copy the workspace into a just-large-enough copy. */ vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber)); for (blkno = 0; blkno < nblocks; blkno++) { Page page, tempPage = NULL; bool do_reap, do_frag; buf = ReadBuffer(onerel, blkno); page = BufferGetPage(buf); vacpage->blkno = blkno; vacpage->offsets_used = 0; vacpage->offsets_free = 0; if (PageIsNew(page)) { elog(NOTICE, "Rel %s: Uninitialized page %u - fixing", relname, blkno); PageInit(page, BufferGetPageSize(buf), 0); vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; free_size += (vacpage->free - sizeof(ItemIdData)); new_pages++; empty_end_pages++; vacpagecopy = copy_vac_page(vacpage); vpage_insert(vacuum_pages, vacpagecopy); vpage_insert(fraged_pages, vacpagecopy); WriteBuffer(buf); continue; } if (PageIsEmpty(page)) { vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; free_size += (vacpage->free - sizeof(ItemIdData)); empty_pages++; empty_end_pages++; vacpagecopy = copy_vac_page(vacpage); vpage_insert(vacuum_pages, vacpagecopy); vpage_insert(fraged_pages, vacpagecopy); ReleaseBuffer(buf); continue; } pgchanged = false; notup = true; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { uint16 sv_infomask; itemid = PageGetItemId(page, offnum); /* * Collect un-used items too - it's possible to have indices * pointing here after crash. */ if (!ItemIdIsUsed(itemid)) { vacpage->offsets[vacpage->offsets_free++] = offnum; nunused += 1; continue; } tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); tuple.t_len = ItemIdGetLength(itemid); ItemPointerSet(&(tuple.t_self), blkno, offnum); tupgone = false; sv_infomask = tuple.t_data->t_infomask; switch (HeapTupleSatisfiesVacuum(tuple.t_data, XmaxRecent)) { case HEAPTUPLE_DEAD: tupgone = true; /* we can delete the tuple */ break; case HEAPTUPLE_LIVE: break; case HEAPTUPLE_RECENTLY_DEAD: /* * If tuple is recently deleted then we must not remove * it from relation. */ nkeep += 1; /* * If we do shrinking and this tuple is updated one * then remember it to construct updated tuple * dependencies. 
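 * Each vtlinks entry pairs new_tid (the t_ctid of the newer tuple version)
 * with this_tid (the tuple's own TID).  The array is qsort'ed by new_tid
 * once the scan is done, so that repair_frag can binary-search for a
 * tuple's parent when it has to move an entire update chain.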
*/ if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))) { if (free_vtlinks == 0) { free_vtlinks = 1000; vtlinks = (VTupleLink) repalloc(vtlinks, (free_vtlinks + num_vtlinks) * sizeof(VTupleLinkData)); } vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid; vtlinks[num_vtlinks].this_tid = tuple.t_self; free_vtlinks--; num_vtlinks++; } break; case HEAPTUPLE_INSERT_IN_PROGRESS: /* * This should not happen, since we hold exclusive lock * on the relation; shouldn't we raise an error? */ elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation", relname, blkno, offnum, tuple.t_data->t_xmin); do_shrinking = false; break; case HEAPTUPLE_DELETE_IN_PROGRESS: /* * This should not happen, since we hold exclusive lock * on the relation; shouldn't we raise an error? */ elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation", relname, blkno, offnum, tuple.t_data->t_xmax); do_shrinking = false; break; default: elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result"); break; } /* check for hint-bit update by HeapTupleSatisfiesVacuum */ if (sv_infomask != tuple.t_data->t_infomask) pgchanged = true; /* * Other checks... */ if (!OidIsValid(tuple.t_data->t_oid)) elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.", relname, blkno, offnum, (int) tupgone); if (tupgone) { ItemId lpp; /* * Here we are building a temporary copy of the page with * dead tuples removed. Below we will apply * PageRepairFragmentation to the copy, so that we can * determine how much space will be available after * removal of dead tuples. But note we are NOT changing * the real page yet... */ if (tempPage == (Page) NULL) { Size pageSize; pageSize = PageGetPageSize(page); tempPage = (Page) palloc(pageSize); memcpy(tempPage, page, pageSize); } /* mark it unused on the temp page */ lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]); lpp->lp_flags &= ~LP_USED; vacpage->offsets[vacpage->offsets_free++] = offnum; tups_vacuumed += 1; } else { num_tuples += 1; notup = false; if (tuple.t_len < min_tlen) min_tlen = tuple.t_len; if (tuple.t_len > max_tlen) max_tlen = tuple.t_len; } } /* scan along page */ if (tempPage != (Page) NULL) { /* Some tuples are removable; figure free space after removal */ PageRepairFragmentation(tempPage, NULL); vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower; pfree(tempPage); do_reap = true; } else { /* Just use current available space */ vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; /* Need to reap the page if it has ~LP_USED line pointers */ do_reap = (vacpage->offsets_free > 0); } free_size += vacpage->free; /* * Add the page to fraged_pages if it has a useful amount of free * space. "Useful" means enough for a minimal-sized tuple. * But we don't know that accurately near the start of the relation, * so add pages unconditionally if they have >= BLCKSZ/10 free space. 
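 * (With the default 8K block size, that unconditional threshold works out
 * to 8192/10 = 819 bytes.)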
*/ do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10); if (do_reap || do_frag) { vacpagecopy = copy_vac_page(vacpage); if (do_reap) vpage_insert(vacuum_pages, vacpagecopy); if (do_frag) vpage_insert(fraged_pages, vacpagecopy); } if (notup) empty_end_pages++; else empty_end_pages = 0; if (pgchanged) { WriteBuffer(buf); changed_pages++; } else ReleaseBuffer(buf); } pfree(vacpage); /* save stats in the rel list for use later */ vacrelstats->rel_tuples = num_tuples; vacrelstats->rel_pages = nblocks; if (num_tuples == 0) min_tlen = max_tlen = 0; vacrelstats->min_tlen = min_tlen; vacrelstats->max_tlen = max_tlen; vacuum_pages->empty_end_pages = empty_end_pages; fraged_pages->empty_end_pages = empty_end_pages; /* * Clear the fraged_pages list if we found we couldn't shrink. * Else, remove any "empty" end-pages from the list, and compute * usable free space = free space in remaining pages. */ if (do_shrinking) { Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages); fraged_pages->num_pages -= empty_end_pages; usable_free_size = 0; for (i = 0; i < fraged_pages->num_pages; i++) usable_free_size += fraged_pages->pagedesc[i]->free; } else { fraged_pages->num_pages = 0; usable_free_size = 0; } if (usable_free_size > 0 && num_vtlinks > 0) { qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData), vac_cmp_vtlinks); vacrelstats->vtlinks = vtlinks; vacrelstats->num_vtlinks = num_vtlinks; } else { vacrelstats->vtlinks = NULL; vacrelstats->num_vtlinks = 0; pfree(vtlinks); } elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \ Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \ Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u. %s", nblocks, changed_pages, vacuum_pages->num_pages, empty_pages, new_pages, num_tuples, tups_vacuumed, nkeep, vacrelstats->num_vtlinks, nunused, (unsigned long) min_tlen, (unsigned long) max_tlen, free_size, usable_free_size, empty_end_pages, fraged_pages->num_pages, vac_show_rusage(&ru0)); } /* * repair_frag() -- try to repair relation's fragmentation * * This routine marks dead tuples as unused and tries re-use dead space * by moving tuples (and inserting indices if needed). It constructs * Nvacpagelist list of free-ed pages (moved tuples) and clean indices * for them after committing (in hack-manner - without losing locks * and freeing memory!) current transaction. It truncates relation * if some end-blocks are gone away. */ static void repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindices, Relation *Irel) { TransactionId myXID; CommandId myCID; Buffer buf, cur_buffer; BlockNumber nblocks, blkno; BlockNumber last_move_dest_block = 0, last_vacuum_block; Page page, ToPage = NULL; OffsetNumber offnum, maxoff, newoff, max_offset; ItemId itemid, newitemid; HeapTupleData tuple, newtup; TupleDesc tupdesc; ResultRelInfo *resultRelInfo; EState *estate; TupleTable tupleTable; TupleTableSlot *slot; VacPageListData Nvacpagelist; VacPage cur_page = NULL, last_vacuum_page, vacpage, *curpage; int cur_item = 0; int i; Size tuple_len; int num_moved, num_fraged_pages, vacuumed_pages; int checked_moved, num_tuples, keep_tuples = 0; bool isempty, dowrite, chain_tuple_moved; VacRUsage ru0; vac_init_rusage(&ru0); myXID = GetCurrentTransactionId(); myCID = GetCurrentCommandId(); tupdesc = RelationGetDescr(onerel); /* * We need a ResultRelInfo and an EState so we can use the regular * executor's index-entry-making machinery. 
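 * ExecOpenIndices fills in the index descriptors, so ExecInsertIndexTuples,
 * called for each tuple we move, can build the corresponding index entries.
 */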
*/ resultRelInfo = makeNode(ResultRelInfo); resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ resultRelInfo->ri_RelationDesc = onerel; resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */ ExecOpenIndices(resultRelInfo); estate = CreateExecutorState(); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; estate->es_result_relation_info = resultRelInfo; /* Set up a dummy tuple table too */ tupleTable = ExecCreateTupleTable(1); slot = ExecAllocTableSlot(tupleTable); ExecSetSlotDescriptor(slot, tupdesc, false); Nvacpagelist.num_pages = 0; num_fraged_pages = fraged_pages->num_pages; Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages); vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages; if (vacuumed_pages > 0) { /* get last reaped page from vacuum_pages */ last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1]; last_vacuum_block = last_vacuum_page->blkno; } else { last_vacuum_page = NULL; last_vacuum_block = InvalidBlockNumber; } cur_buffer = InvalidBuffer; num_moved = 0; vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber)); vacpage->offsets_used = vacpage->offsets_free = 0; /* * Scan pages backwards from the last nonempty page, trying to move * tuples down to lower pages. Quit when we reach a page that we have * moved any tuples onto, or the first page if we haven't moved anything, * or when we find a page we cannot completely empty (this last condition * is handled by "break" statements within the loop). * * NB: this code depends on the vacuum_pages and fraged_pages lists being * in order by blkno. */ nblocks = vacrelstats->rel_pages; for (blkno = nblocks - vacuum_pages->empty_end_pages - 1; blkno > last_move_dest_block; blkno--) { /* * Forget fraged_pages pages at or after this one; they're no longer * useful as move targets, since we only want to move down. Note * that since we stop the outer loop at last_move_dest_block, pages * removed here cannot have had anything moved onto them already. * * Also note that we don't change the stored fraged_pages list, * only our local variable num_fraged_pages; so the forgotten pages * are still available to be loaded into the free space map later. */ while (num_fraged_pages > 0 && fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno) { Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0); --num_fraged_pages; } /* * Process this page of relation. */ buf = ReadBuffer(onerel, blkno); page = BufferGetPage(buf); vacpage->offsets_free = 0; isempty = PageIsEmpty(page); dowrite = false; /* Is the page in the vacuum_pages list? 
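 * (vacuum_pages is ordered by block number and we scan the relation
 * backwards, so comparing against last_vacuum_block, and stepping
 * last_vacuum_page down each time we pass a reaped page, is sufficient.)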
*/ if (blkno == last_vacuum_block) { if (last_vacuum_page->offsets_free > 0) { /* there are dead tuples on this page - clean them */ Assert(!isempty); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); vacuum_page(onerel, buf, last_vacuum_page); LockBuffer(buf, BUFFER_LOCK_UNLOCK); dowrite = true; } else Assert(isempty); --vacuumed_pages; if (vacuumed_pages > 0) { /* get prev reaped page from vacuum_pages */ last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1]; last_vacuum_block = last_vacuum_page->blkno; } else { last_vacuum_page = NULL; last_vacuum_block = InvalidBlockNumber; } if (isempty) { ReleaseBuffer(buf); continue; } } else Assert(!isempty); chain_tuple_moved = false; /* no one chain-tuple was moved * off this page, yet */ vacpage->blkno = blkno; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); tuple_len = tuple.t_len = ItemIdGetLength(itemid); ItemPointerSet(&(tuple.t_self), blkno, offnum); if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin"); if (tuple.t_data->t_infomask & HEAP_MOVED_IN) elog(ERROR, "HEAP_MOVED_IN was not expected"); /* * If this (chain) tuple is moved by me already then I * have to check is it in vacpage or not - i.e. is it * moved while cleaning this page or some previous one. */ if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { if (keep_tuples == 0) continue; if (chain_tuple_moved) /* some chains was moved * while */ { /* cleaning this page */ Assert(vacpage->offsets_free > 0); for (i = 0; i < vacpage->offsets_free; i++) { if (vacpage->offsets[i] == offnum) break; } if (i >= vacpage->offsets_free) /* not found */ { vacpage->offsets[vacpage->offsets_free++] = offnum; keep_tuples--; } } else { vacpage->offsets[vacpage->offsets_free++] = offnum; keep_tuples--; } continue; } elog(ERROR, "HEAP_MOVED_OFF was expected"); } /* * If this tuple is in the chain of tuples created in updates * by "recent" transactions then we have to move all chain of * tuples to another places. */ if ((tuple.t_data->t_infomask & HEAP_UPDATED && !TransactionIdPrecedes(tuple.t_data->t_xmin, XmaxRecent)) || (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) && !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid))))) { Buffer Cbuf = buf; Page Cpage; ItemId Citemid; ItemPointerData Ctid; HeapTupleData tp = tuple; Size tlen = tuple_len; VTupleMove vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData)); int num_vtmove = 0; int free_vtmove = 100; VacPage to_vacpage = NULL; int to_item = 0; bool freeCbuf = false; int ti; if (vacrelstats->vtlinks == NULL) elog(ERROR, "No one parent tuple was found"); if (cur_buffer != InvalidBuffer) { WriteBuffer(cur_buffer); cur_buffer = InvalidBuffer; } /* * If this tuple is in the begin/middle of the chain then * we have to move to the end of chain. 
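 * We do that by following t_ctid forward, buffer by buffer, until we reach
 * a tuple whose xmax is invalid or whose t_ctid points at itself - that is
 * the newest member of the update chain.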
*/ while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) && !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid)))) { Ctid = tp.t_data->t_ctid; if (freeCbuf) ReleaseBuffer(Cbuf); freeCbuf = true; Cbuf = ReadBuffer(onerel, ItemPointerGetBlockNumber(&Ctid)); Cpage = BufferGetPage(Cbuf); Citemid = PageGetItemId(Cpage, ItemPointerGetOffsetNumber(&Ctid)); if (!ItemIdIsUsed(Citemid)) { /* * This means that in the middle of chain there * was tuple updated by older (than XmaxRecent) * xaction and this tuple is already deleted by * me. Actually, upper part of chain should be * removed and seems that this should be handled * in scan_heap(), but it's not implemented at the * moment and so we just stop shrinking here. */ ReleaseBuffer(Cbuf); pfree(vtmove); vtmove = NULL; elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag"); break; } tp.t_datamcxt = NULL; tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid); tp.t_self = Ctid; tlen = tp.t_len = ItemIdGetLength(Citemid); } if (vtmove == NULL) break; /* first, can chain be moved ? */ for (;;) { if (to_vacpage == NULL || !enough_space(to_vacpage, tlen)) { for (i = 0; i < num_fraged_pages; i++) { if (enough_space(fraged_pages->pagedesc[i], tlen)) break; } if (i == num_fraged_pages) { /* can't move item anywhere */ for (i = 0; i < num_vtmove; i++) { Assert(vtmove[i].vacpage->offsets_used > 0); (vtmove[i].vacpage->offsets_used)--; } num_vtmove = 0; break; } to_item = i; to_vacpage = fraged_pages->pagedesc[to_item]; } to_vacpage->free -= MAXALIGN(tlen); if (to_vacpage->offsets_used >= to_vacpage->offsets_free) to_vacpage->free -= MAXALIGN(sizeof(ItemIdData)); (to_vacpage->offsets_used)++; if (free_vtmove == 0) { free_vtmove = 1000; vtmove = (VTupleMove) repalloc(vtmove, (free_vtmove + num_vtmove) * sizeof(VTupleMoveData)); } vtmove[num_vtmove].tid = tp.t_self; vtmove[num_vtmove].vacpage = to_vacpage; if (to_vacpage->offsets_used == 1) vtmove[num_vtmove].cleanVpd = true; else vtmove[num_vtmove].cleanVpd = false; free_vtmove--; num_vtmove++; /* All done ? */ if (!(tp.t_data->t_infomask & HEAP_UPDATED) || TransactionIdPrecedes(tp.t_data->t_xmin, XmaxRecent)) break; /* Well, try to find tuple with old row version */ for (;;) { Buffer Pbuf; Page Ppage; ItemId Pitemid; HeapTupleData Ptp; VTupleLinkData vtld, *vtlp; vtld.new_tid = tp.t_self; vtlp = (VTupleLink) vac_bsearch((void *) &vtld, (void *) (vacrelstats->vtlinks), vacrelstats->num_vtlinks, sizeof(VTupleLinkData), vac_cmp_vtlinks); if (vtlp == NULL) elog(ERROR, "Parent tuple was not found"); tp.t_self = vtlp->this_tid; Pbuf = ReadBuffer(onerel, ItemPointerGetBlockNumber(&(tp.t_self))); Ppage = BufferGetPage(Pbuf); Pitemid = PageGetItemId(Ppage, ItemPointerGetOffsetNumber(&(tp.t_self))); if (!ItemIdIsUsed(Pitemid)) elog(ERROR, "Parent itemid marked as unused"); Ptp.t_datamcxt = NULL; Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid); Assert(ItemPointerEquals(&(vtld.new_tid), &(Ptp.t_data->t_ctid))); /* * Read above about cases when * !ItemIdIsUsed(Citemid) (child item is * removed)... Due to the fact that at the moment * we don't remove unuseful part of update-chain, * it's possible to get too old parent row here. * Like as in the case which caused this problem, * we stop shrinking here. I could try to find * real parent row but want not to do it because * of real solution will be implemented anyway, * latter, and we are too close to 6.5 release. 
- * vadim 06/11/99 */ if (!(TransactionIdEquals(Ptp.t_data->t_xmax, tp.t_data->t_xmin))) { if (freeCbuf) ReleaseBuffer(Cbuf); freeCbuf = false; ReleaseBuffer(Pbuf); for (i = 0; i < num_vtmove; i++) { Assert(vtmove[i].vacpage->offsets_used > 0); (vtmove[i].vacpage->offsets_used)--; } num_vtmove = 0; elog(NOTICE, "Too old parent tuple found - can't continue repair_frag"); break; } #ifdef NOT_USED /* I'm not sure that this will wotk * properly... */ /* * If this tuple is updated version of row and it * was created by the same transaction then no one * is interested in this tuple - mark it as * removed. */ if (Ptp.t_data->t_infomask & HEAP_UPDATED && TransactionIdEquals(Ptp.t_data->t_xmin, Ptp.t_data->t_xmax)) { TransactionIdStore(myXID, (TransactionId *) &(Ptp.t_data->t_cmin)); Ptp.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); Ptp.t_data->t_infomask |= HEAP_MOVED_OFF; WriteBuffer(Pbuf); continue; } #endif tp.t_datamcxt = Ptp.t_datamcxt; tp.t_data = Ptp.t_data; tlen = tp.t_len = ItemIdGetLength(Pitemid); if (freeCbuf) ReleaseBuffer(Cbuf); Cbuf = Pbuf; freeCbuf = true; break; } if (num_vtmove == 0) break; } if (freeCbuf) ReleaseBuffer(Cbuf); if (num_vtmove == 0) /* chain can't be moved */ { pfree(vtmove); break; } ItemPointerSetInvalid(&Ctid); for (ti = 0; ti < num_vtmove; ti++) { VacPage destvacpage = vtmove[ti].vacpage; /* Get page to move from */ tuple.t_self = vtmove[ti].tid; Cbuf = ReadBuffer(onerel, ItemPointerGetBlockNumber(&(tuple.t_self))); /* Get page to move to */ cur_buffer = ReadBuffer(onerel, destvacpage->blkno); LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); if (cur_buffer != Cbuf) LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE); ToPage = BufferGetPage(cur_buffer); Cpage = BufferGetPage(Cbuf); Citemid = PageGetItemId(Cpage, ItemPointerGetOffsetNumber(&(tuple.t_self))); tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid); tuple_len = tuple.t_len = ItemIdGetLength(Citemid); /* * make a copy of the source tuple, and then mark the * source tuple MOVED_OFF. */ heap_copytuple_with_tuple(&tuple, &newtup); RelationInvalidateHeapTuple(onerel, &tuple); /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */ START_CRIT_SECTION(); TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin)); tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); tuple.t_data->t_infomask |= HEAP_MOVED_OFF; /* * If this page was not used before - clean it. * * NOTE: a nasty bug used to lurk here. It is possible * for the source and destination pages to be the same * (since this tuple-chain member can be on a page * lower than the one we're currently processing in * the outer loop). If that's true, then after * vacuum_page() the source tuple will have been * moved, and tuple.t_data will be pointing at * garbage. Therefore we must do everything that uses * tuple.t_data BEFORE this step!! * * This path is different from the other callers of * vacuum_page, because we have already incremented * the vacpage's offsets_used field to account for the * tuple(s) we expect to move onto the page. Therefore * vacuum_page's check for offsets_used == 0 is wrong. * But since that's a good debugging check for all * other callers, we work around it here rather than * remove it. 
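 * The workaround is simply to save offsets_used, zero it for the duration
 * of the vacuum_page() call, and restore it afterwards.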
*/ if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd) { int sv_offsets_used = destvacpage->offsets_used; destvacpage->offsets_used = 0; vacuum_page(onerel, cur_buffer, destvacpage); destvacpage->offsets_used = sv_offsets_used; } /* * Update the state of the copied tuple, and store it * on the destination page. */ TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin)); newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF); newtup.t_data->t_infomask |= HEAP_MOVED_IN; newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len, InvalidOffsetNumber, LP_USED); if (newoff == InvalidOffsetNumber) { elog(STOP, "moving chain: failed to add item with len = %lu to page %u", (unsigned long) tuple_len, destvacpage->blkno); } newitemid = PageGetItemId(ToPage, newoff); pfree(newtup.t_data); newtup.t_datamcxt = NULL; newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid); ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff); { XLogRecPtr recptr = log_heap_move(onerel, Cbuf, tuple.t_self, cur_buffer, &newtup); if (Cbuf != cur_buffer) { PageSetLSN(Cpage, recptr); PageSetSUI(Cpage, ThisStartUpID); } PageSetLSN(ToPage, recptr); PageSetSUI(ToPage, ThisStartUpID); } END_CRIT_SECTION(); if (destvacpage->blkno > last_move_dest_block) last_move_dest_block = destvacpage->blkno; /* * Set new tuple's t_ctid pointing to itself for last * tuple in chain, and to next tuple in chain * otherwise. */ if (!ItemPointerIsValid(&Ctid)) newtup.t_data->t_ctid = newtup.t_self; else newtup.t_data->t_ctid = Ctid; Ctid = newtup.t_self; num_moved++; /* * Remember that we moved tuple from the current page * (corresponding index tuple will be cleaned). */ if (Cbuf == buf) vacpage->offsets[vacpage->offsets_free++] = ItemPointerGetOffsetNumber(&(tuple.t_self)); else keep_tuples++; LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK); if (cur_buffer != Cbuf) LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK); /* Create index entries for the moved tuple */ if (resultRelInfo->ri_NumIndices > 0) { ExecStoreTuple(&newtup, slot, InvalidBuffer, false); ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true); } WriteBuffer(cur_buffer); WriteBuffer(Cbuf); } cur_buffer = InvalidBuffer; pfree(vtmove); chain_tuple_moved = true; continue; } /* try to find new page for this tuple */ if (cur_buffer == InvalidBuffer || !enough_space(cur_page, tuple_len)) { if (cur_buffer != InvalidBuffer) { WriteBuffer(cur_buffer); cur_buffer = InvalidBuffer; } for (i = 0; i < num_fraged_pages; i++) { if (enough_space(fraged_pages->pagedesc[i], tuple_len)) break; } if (i == num_fraged_pages) break; /* can't move item anywhere */ cur_item = i; cur_page = fraged_pages->pagedesc[cur_item]; cur_buffer = ReadBuffer(onerel, cur_page->blkno); LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); ToPage = BufferGetPage(cur_buffer); /* if this page was not used before - clean it */ if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0) vacuum_page(onerel, cur_buffer, cur_page); } else LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); /* copy tuple */ heap_copytuple_with_tuple(&tuple, &newtup); RelationInvalidateHeapTuple(onerel, &tuple); /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */ START_CRIT_SECTION(); /* * Mark new tuple as moved_in by vacuum and store vacuum XID * in t_cmin !!! 
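 * (There is no spare header field for the moving transaction's XID, so it
 * is overlaid on t_cmin; the HEAP_MOVED_IN / HEAP_MOVED_OFF bits mark that
 * overlay, and the later passes below check t_cmin against our own XID
 * before trusting it.)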
*/ TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin)); newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF); newtup.t_data->t_infomask |= HEAP_MOVED_IN; /* add tuple to the page */ newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len, InvalidOffsetNumber, LP_USED); if (newoff == InvalidOffsetNumber) { elog(STOP, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)", (unsigned long) tuple_len, cur_page->blkno, (unsigned long) cur_page->free, cur_page->offsets_used, cur_page->offsets_free); } newitemid = PageGetItemId(ToPage, newoff); pfree(newtup.t_data); newtup.t_datamcxt = NULL; newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid); ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff); newtup.t_self = newtup.t_data->t_ctid; /* * Mark old tuple as moved_off by vacuum and store vacuum XID * in t_cmin !!! */ TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin)); tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); tuple.t_data->t_infomask |= HEAP_MOVED_OFF; { XLogRecPtr recptr = log_heap_move(onerel, buf, tuple.t_self, cur_buffer, &newtup); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); PageSetLSN(ToPage, recptr); PageSetSUI(ToPage, ThisStartUpID); } END_CRIT_SECTION(); cur_page->offsets_used++; num_moved++; cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower; if (cur_page->blkno > last_move_dest_block) last_move_dest_block = cur_page->blkno; vacpage->offsets[vacpage->offsets_free++] = offnum; LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buf, BUFFER_LOCK_UNLOCK); /* insert index' tuples if needed */ if (resultRelInfo->ri_NumIndices > 0) { ExecStoreTuple(&newtup, slot, InvalidBuffer, false); ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true); } } /* walk along page */ if (offnum < maxoff && keep_tuples > 0) { OffsetNumber off; for (off = OffsetNumberNext(offnum); off <= maxoff; off = OffsetNumberNext(off)) { itemid = PageGetItemId(page, off); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED) continue; if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin (4)"); if (tuple.t_data->t_infomask & HEAP_MOVED_IN) elog(ERROR, "HEAP_MOVED_IN was not expected (2)"); if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { /* some chains was moved while */ if (chain_tuple_moved) { /* cleaning this page */ Assert(vacpage->offsets_free > 0); for (i = 0; i < vacpage->offsets_free; i++) { if (vacpage->offsets[i] == off) break; } if (i >= vacpage->offsets_free) /* not found */ { vacpage->offsets[vacpage->offsets_free++] = off; Assert(keep_tuples > 0); keep_tuples--; } } else { vacpage->offsets[vacpage->offsets_free++] = off; Assert(keep_tuples > 0); keep_tuples--; } } } } if (vacpage->offsets_free > 0) /* some tuples were moved */ { if (chain_tuple_moved) /* else - they are ordered */ { qsort((char *) (vacpage->offsets), vacpage->offsets_free, sizeof(OffsetNumber), vac_cmp_offno); } vpage_insert(&Nvacpagelist, copy_vac_page(vacpage)); WriteBuffer(buf); } else if (dowrite) WriteBuffer(buf); else ReleaseBuffer(buf); if (offnum <= maxoff) break; /* some item(s) left */ } /* walk along relation */ blkno++; /* new number of blocks */ if (cur_buffer != InvalidBuffer) { Assert(num_moved > 0); WriteBuffer(cur_buffer); } if (num_moved > 0) { /* * We 
have to commit our tuple movings before we truncate the * relation. Ideally we should do Commit/StartTransactionCommand * here, relying on the session-level table lock to protect our * exclusive access to the relation. However, that would require * a lot of extra code to close and re-open the relation, indices, * etc. For now, a quick hack: record status of current * transaction as committed, and continue. */ RecordTransactionCommit(); } /* * We are not going to move any more tuples across pages, but we still * need to apply vacuum_page to compact free space in the remaining * pages in vacuum_pages list. Note that some of these pages may also * be in the fraged_pages list, and may have had tuples moved onto them; * if so, we already did vacuum_page and needn't do it again. */ for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++) { Assert((*curpage)->blkno < blkno); if ((*curpage)->offsets_used == 0) { /* this page was not used as a move target, so must clean it */ buf = ReadBuffer(onerel, (*curpage)->blkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); if (!PageIsEmpty(page)) vacuum_page(onerel, buf, *curpage); LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); } } /* * Now scan all the pages that we moved tuples onto and update * tuple status bits. This is not really necessary, but will save time * for future transactions examining these tuples. * * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from * pages that were move source pages but not move dest pages. One also * wonders whether it wouldn't be better to skip this step and let the * tuple status updates happen someplace that's not holding an exclusive * lock on the relation. */ checked_moved = 0; for (i = 0, curpage = fraged_pages->pagedesc; i < num_fraged_pages; i++, curpage++) { Assert((*curpage)->blkno < blkno); if ((*curpage)->blkno > last_move_dest_block) break; /* no need to scan any further */ if ((*curpage)->offsets_used == 0) continue; /* this page was never used as a move dest */ buf = ReadBuffer(onerel, (*curpage)->blkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); num_tuples = 0; max_offset = PageGetMaxOffsetNumber(page); for (newoff = FirstOffsetNumber; newoff <= max_offset; newoff = OffsetNumberNext(newoff)) { itemid = PageGetItemId(page, newoff); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin (2)"); if (tuple.t_data->t_infomask & HEAP_MOVED_IN) { tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED; num_tuples++; } else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) tuple.t_data->t_infomask |= HEAP_XMIN_INVALID; else elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected"); } } LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); Assert((*curpage)->offsets_used == num_tuples); checked_moved += num_tuples; } Assert(num_moved == checked_moved); elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s", RelationGetRelationName(onerel), nblocks, blkno, num_moved, vac_show_rusage(&ru0)); /* * Reflect the motion of system tuples to catalog cache here. 
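 * (The RelationInvalidateHeapTuple calls made for each moved tuple only
 * queue catalog-cache invalidations; the command-counter bump below is
 * presumably what lets them take effect locally, which matters when the
 * table being shrunk is itself a system catalog.)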
*/ CommandCounterIncrement(); if (Nvacpagelist.num_pages > 0) { /* vacuum indices again if needed */ if (Irel != (Relation *) NULL) { VacPage *vpleft, *vpright, vpsave; /* re-sort Nvacpagelist.pagedesc */ for (vpleft = Nvacpagelist.pagedesc, vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1; vpleft < vpright; vpleft++, vpright--) { vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave; } Assert(keep_tuples >= 0); for (i = 0; i < nindices; i++) vacuum_index(&Nvacpagelist, Irel[i], vacrelstats->rel_tuples, keep_tuples); } /* clean moved tuples from last page in Nvacpagelist list */ if (vacpage->blkno == (blkno - 1) && vacpage->offsets_free > 0) { OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)]; OffsetNumber *unused = unbuf; int uncnt; buf = ReadBuffer(onerel, vacpage->blkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); num_tuples = 0; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin (3)"); if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { itemid->lp_flags &= ~LP_USED; num_tuples++; } else elog(ERROR, "HEAP_MOVED_OFF was expected (2)"); } } Assert(vacpage->offsets_free == num_tuples); START_CRIT_SECTION(); uncnt = PageRepairFragmentation(page, unused); { XLogRecPtr recptr; recptr = log_heap_clean(onerel, buf, (char *) unused, (char *) (&(unused[uncnt])) - (char *) unused); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); } END_CRIT_SECTION(); LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); } /* now - free new list of reaped pages */ curpage = Nvacpagelist.pagedesc; for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++) pfree(*curpage); pfree(Nvacpagelist.pagedesc); } /* * Flush dirty pages out to disk. We do this unconditionally, even if * we don't need to truncate, because we want to ensure that all * tuples have correct on-row commit status on disk (see bufmgr.c's * comments for FlushRelationBuffers()). */ i = FlushRelationBuffers(onerel, blkno); if (i < 0) elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d", i); /* truncate relation, if needed */ if (blkno < nblocks) { blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno); onerel->rd_nblocks = blkno; /* update relcache immediately */ onerel->rd_targblock = InvalidBlockNumber; vacrelstats->rel_pages = blkno; /* set new number of blocks */ } /* clean up */ pfree(vacpage); if (vacrelstats->vtlinks != NULL) pfree(vacrelstats->vtlinks); ExecDropTupleTable(tupleTable, true); ExecCloseIndices(resultRelInfo); } /* * vacuum_heap() -- free dead tuples * * This routine marks dead tuples as unused and truncates relation * if there are "empty" end-blocks. 
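 * This is the no-move path: full_vacuum_rel calls it when there are reaped
 * pages to clean but no fraged_pages to move tuples into.  Each listed page
 * is cleaned with vacuum_page(), dirty buffers are flushed, and the trailing
 * empty_end_pages blocks are chopped off with smgrtruncate().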
*/ static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages) { Buffer buf; VacPage *vacpage; BlockNumber relblocks; int nblocks; int i; nblocks = vacuum_pages->num_pages; nblocks -= vacuum_pages->empty_end_pages; /* nothing to do with them */ for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++) { if ((*vacpage)->offsets_free > 0) { buf = ReadBuffer(onerel, (*vacpage)->blkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); vacuum_page(onerel, buf, *vacpage); LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); } } /* * Flush dirty pages out to disk. We do this unconditionally, even if * we don't need to truncate, because we want to ensure that all * tuples have correct on-row commit status on disk (see bufmgr.c's * comments for FlushRelationBuffers()). */ Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages); relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages; i = FlushRelationBuffers(onerel, relblocks); if (i < 0) elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d", i); /* truncate relation if there are some empty end-pages */ if (vacuum_pages->empty_end_pages > 0) { elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.", RelationGetRelationName(onerel), vacrelstats->rel_pages, relblocks); relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks); onerel->rd_nblocks = relblocks; /* update relcache immediately */ onerel->rd_targblock = InvalidBlockNumber; vacrelstats->rel_pages = relblocks; /* set new number of * blocks */ } } /* * vacuum_page() -- free dead tuples on a page * and repair its fragmentation. */ static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage) { OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)]; OffsetNumber *unused = unbuf; int uncnt; Page page = BufferGetPage(buffer); ItemId itemid; int i; /* There shouldn't be any tuples moved onto the page yet! */ Assert(vacpage->offsets_used == 0); START_CRIT_SECTION(); for (i = 0; i < vacpage->offsets_free; i++) { itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]); itemid->lp_flags &= ~LP_USED; } uncnt = PageRepairFragmentation(page, unused); { XLogRecPtr recptr; recptr = log_heap_clean(onerel, buffer, (char *) unused, (char *) (&(unused[uncnt])) - (char *) unused); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); } END_CRIT_SECTION(); } /* * scan_index() -- scan one index relation to update statistic. */ static void scan_index(Relation indrel, double num_tuples) { RetrieveIndexResult res; IndexScanDesc iscan; BlockNumber nipages; double nitups; VacRUsage ru0; vac_init_rusage(&ru0); /* walk through the entire index */ iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL); nitups = 0; while ((res = index_getnext(iscan, ForwardScanDirection)) != (RetrieveIndexResult) NULL) { nitups += 1; pfree(res); } index_endscan(iscan); /* now update statistics in pg_class */ nipages = RelationGetNumberOfBlocks(indrel); vac_update_relstats(RelationGetRelid(indrel), nipages, nitups, false); elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f. %s", RelationGetRelationName(indrel), nipages, nitups, vac_show_rusage(&ru0)); /* * Check for tuple count mismatch. If the index is partial, then * it's OK for it to have fewer tuples than the heap; else we got trouble. */ if (nitups != num_tuples) { if (nitups > num_tuples || ! 
/*
 *  scan_index() -- scan one index relation to update pg_class statistics.
 */
static void
scan_index(Relation indrel, double num_tuples)
{
    RetrieveIndexResult res;
    IndexScanDesc iscan;
    BlockNumber nipages;
    double      nitups;
    VacRUsage   ru0;

    vac_init_rusage(&ru0);

    /* walk through the entire index */
    iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
    nitups = 0;

    while ((res = index_getnext(iscan, ForwardScanDirection))
           != (RetrieveIndexResult) NULL)
    {
        nitups += 1;
        pfree(res);
    }

    index_endscan(iscan);

    /* now update statistics in pg_class */
    nipages = RelationGetNumberOfBlocks(indrel);
    vac_update_relstats(RelationGetRelid(indrel), nipages, nitups, false);

    elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f. %s",
         RelationGetRelationName(indrel), nipages, nitups,
         vac_show_rusage(&ru0));

    /*
     * Check for tuple count mismatch.  If the index is partial, then it's
     * OK for it to have fewer tuples than the heap; else we've got trouble.
     */
    if (nitups != num_tuples)
    {
        if (nitups > num_tuples ||
            !is_partial_index(indrel))
            elog(NOTICE, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\n\tRecreate the index.",
                 RelationGetRelationName(indrel), nitups, num_tuples);
    }
}

/*
 *  vacuum_index() -- vacuum one index relation.
 *
 *      vacpagelist is the VacPageList of the heap we're currently vacuuming.
 *      It's locked.  indrel is an index relation on the vacuumed heap.
 *      We don't set locks on the index relation here, since the indexed
 *      access methods support locking at different granularities.
 *      We let them handle it.
 *
 *      Finally, we arrange to update the index relation's statistics in
 *      pg_class.
 */
static void
vacuum_index(VacPageList vacpagelist, Relation indrel,
             double num_tuples, int keep_tuples)
{
    RetrieveIndexResult res;
    IndexScanDesc iscan;
    ItemPointer heapptr;
    int         tups_vacuumed;
    BlockNumber num_pages;
    double      num_index_tuples;
    VacPage     vp;
    VacRUsage   ru0;

    vac_init_rusage(&ru0);

    /* walk through the entire index */
    iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
    tups_vacuumed = 0;
    num_index_tuples = 0;

    while ((res = index_getnext(iscan, ForwardScanDirection))
           != (RetrieveIndexResult) NULL)
    {
        heapptr = &res->heap_iptr;

        if ((vp = tid_reaped(heapptr, vacpagelist)) != (VacPage) NULL)
        {
#ifdef NOT_USED
            elog(DEBUG, "<%x,%x> -> <%x,%x>",
                 ItemPointerGetBlockNumber(&(res->index_iptr)),
                 ItemPointerGetOffsetNumber(&(res->index_iptr)),
                 ItemPointerGetBlockNumber(&(res->heap_iptr)),
                 ItemPointerGetOffsetNumber(&(res->heap_iptr)));
#endif
            if (vp->offsets_free == 0)
            {
                elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
                     RelationGetRelationName(indrel),
                     vp->blkno, ItemPointerGetOffsetNumber(heapptr));
            }
            ++tups_vacuumed;
            index_delete(indrel, &res->index_iptr);
        }
        else
            num_index_tuples += 1;

        pfree(res);
    }

    index_endscan(iscan);

    /* now update statistics in pg_class */
    num_pages = RelationGetNumberOfBlocks(indrel);
    vac_update_relstats(RelationGetRelid(indrel), num_pages,
                        num_index_tuples, false);

    elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f: Deleted %u. %s",
         RelationGetRelationName(indrel), num_pages,
         num_index_tuples - keep_tuples, tups_vacuumed,
         vac_show_rusage(&ru0));

    /*
     * Check for tuple count mismatch.  If the index is partial, then it's
     * OK for it to have fewer tuples than the heap; else we've got trouble.
     */
    if (num_index_tuples != num_tuples + keep_tuples)
    {
        if (num_index_tuples > num_tuples + keep_tuples ||
            !is_partial_index(indrel))
            elog(NOTICE, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f).\n\tRecreate the index.",
                 RelationGetRelationName(indrel),
                 num_index_tuples, num_tuples);
    }
}
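/*
 * Worked example of the consistency check above (illustrative numbers, not
 * from the original code): if the heap scan counted num_tuples = 1000 live
 * tuples and repair_frag passed keep_tuples = 4 moved tuples whose old index
 * entries were deliberately kept, a complete (non-partial) index should end
 * up retaining exactly 1004 entries.  Retaining more is always reported;
 * retaining fewer is tolerated only for a partial index.
 */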
/*
 *  tid_reaped() -- is a particular tid reaped?
 *
 *      vacpagelist->pagedesc is sorted in the right (ascending blkno) order.
 */
static VacPage
tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
{
    OffsetNumber ioffno;
    OffsetNumber *voff;
    VacPage     vp,
               *vpp;
    VacPageData vacpage;

    vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
    ioffno = ItemPointerGetOffsetNumber(itemptr);

    vp = &vacpage;
    vpp = (VacPage *) vac_bsearch((void *) &vp,
                                  (void *) (vacpagelist->pagedesc),
                                  vacpagelist->num_pages,
                                  sizeof(VacPage),
                                  vac_cmp_blk);

    if (vpp == (VacPage *) NULL)
        return (VacPage) NULL;

    /* ok - we are on a partially or fully reaped page */
    vp = *vpp;

    if (vp->offsets_free == 0)
    {
        /* this is EmptyPage, so claim all tuples on it are reaped!!! */
        return vp;
    }

    voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
                                        (void *) (vp->offsets),
                                        vp->offsets_free,
                                        sizeof(OffsetNumber),
                                        vac_cmp_offno);

    if (voff == (OffsetNumber *) NULL)
        return (VacPage) NULL;

    /* tid is reaped */
    return vp;
}

/*
 * Update the shared Free Space Map with the info we now have about
 * free space in the relation, discarding any old info the map may have.
 */
static void
vac_update_fsm(Relation onerel, VacPageList fraged_pages,
               BlockNumber rel_pages)
{
    int         nPages = fraged_pages->num_pages;
    int         i;
    BlockNumber *pages;
    Size       *spaceAvail;

    /* +1 to avoid palloc(0) */
    pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
    spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));

    for (i = 0; i < nPages; i++)
    {
        pages[i] = fraged_pages->pagedesc[i]->blkno;
        spaceAvail[i] = fraged_pages->pagedesc[i]->free;

        /*
         * fraged_pages may contain entries for pages that we later decided
         * to truncate from the relation; don't enter them into the map!
         */
        if (pages[i] >= rel_pages)
        {
            nPages = i;
            break;
        }
    }

    MultiRecordFreeSpace(&onerel->rd_node,
                         0, MaxBlockNumber,
                         nPages, pages, spaceAvail);

    pfree(pages);
    pfree(spaceAvail);
}

/* Copy a VacPage structure */
static VacPage
copy_vac_page(VacPage vacpage)
{
    VacPage     newvacpage;

    /* allocate a VacPageData entry */
    newvacpage = (VacPage) palloc(sizeof(VacPageData) +
                                  vacpage->offsets_free * sizeof(OffsetNumber));

    /* fill it in */
    if (vacpage->offsets_free > 0)
        memcpy(newvacpage->offsets, vacpage->offsets,
               vacpage->offsets_free * sizeof(OffsetNumber));
    newvacpage->blkno = vacpage->blkno;
    newvacpage->free = vacpage->free;
    newvacpage->offsets_used = vacpage->offsets_used;
    newvacpage->offsets_free = vacpage->offsets_free;

    return newvacpage;
}

/*
 * Add a VacPage pointer to a VacPageList.
 *
 *      As a side effect of the way that scan_heap works,
 *      higher pages come after lower pages in the array
 *      (and highest tid on a page is last).
 */
static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{
#define PG_NPAGEDESC 1024

    /* allocate a VacPage entry if needed */
    if (vacpagelist->num_pages == 0)
    {
        vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
        vacpagelist->num_allocated_pages = PG_NPAGEDESC;
    }
    else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
    {
        vacpagelist->num_allocated_pages *= 2;
        vacpagelist->pagedesc = (VacPage *)
            repalloc(vacpagelist->pagedesc,
                     vacpagelist->num_allocated_pages * sizeof(VacPage));
    }
    vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
    (vacpagelist->num_pages)++;
}

/*
 * vac_bsearch: just like standard C library routine bsearch(),
 * except that we first test to see whether the target key is outside
 * the range of the table entries.  This case is handled relatively slowly
 * by the normal binary search algorithm (ie, no faster than any other key)
 * but it occurs often enough in VACUUM to be worth optimizing.
 */
static void *
vac_bsearch(const void *key, const void *base,
            size_t nelem, size_t size,
            int (*compar) (const void *, const void *))
{
    int         res;
    const void *last;

    if (nelem == 0)
        return NULL;
    res = compar(key, base);
    if (res < 0)
        return NULL;
    if (res == 0)
        return (void *) base;
    if (nelem > 1)
    {
        last = (const void *) ((const char *) base + (nelem - 1) * size);
        res = compar(key, last);
        if (res > 0)
            return NULL;
        if (res == 0)
            return (void *) last;
    }
    if (nelem <= 2)
        return NULL;            /* already checked 'em all */
    return bsearch(key, base, nelem, size, compar);
}
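/*
 * Illustrative sketch (an assumption, not original code): looking up a block
 * number in a sorted VacPageList with vac_bsearch(), the same way
 * tid_reaped() does.  vac_cmp_blk() compares VacPage pointers, so the search
 * key passed in must itself be the address of a VacPage pointer.
 */
#ifdef NOT_USED
static VacPage
find_vac_page_example(VacPageList vacpagelist, BlockNumber blkno)
{
    VacPageData keydata;
    VacPage     key = &keydata;
    VacPage    *found;

    keydata.blkno = blkno;
    found = (VacPage *) vac_bsearch((void *) &key,
                                    (void *) (vacpagelist->pagedesc),
                                    vacpagelist->num_pages,
                                    sizeof(VacPage),
                                    vac_cmp_blk);
    return (found != (VacPage *) NULL) ? *found : (VacPage) NULL;
}
#endif   /* NOT_USED */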
/*
 * Comparator routines for use with qsort() and bsearch().
 */
static int
vac_cmp_blk(const void *left, const void *right)
{
    BlockNumber lblk,
                rblk;

    lblk = (*((VacPage *) left))->blkno;
    rblk = (*((VacPage *) right))->blkno;

    if (lblk < rblk)
        return -1;
    if (lblk == rblk)
        return 0;
    return 1;
}

static int
vac_cmp_offno(const void *left, const void *right)
{
    if (*(OffsetNumber *) left < *(OffsetNumber *) right)
        return -1;
    if (*(OffsetNumber *) left == *(OffsetNumber *) right)
        return 0;
    return 1;
}

static int
vac_cmp_vtlinks(const void *left, const void *right)
{
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
        ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
        return -1;
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
        ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
        return 1;
    /* bi_hi-es are equal */
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
        ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
        return -1;
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
        ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
        return 1;
    /* bi_lo-es are equal */
    if (((VTupleLink) left)->new_tid.ip_posid <
        ((VTupleLink) right)->new_tid.ip_posid)
        return -1;
    if (((VTupleLink) left)->new_tid.ip_posid >
        ((VTupleLink) right)->new_tid.ip_posid)
        return 1;
    return 0;
}

static void
get_indices(Relation relation, int *nindices, Relation **Irel)
{
    List       *indexoidlist,
               *indexoidscan;
    int         i;

    indexoidlist = RelationGetIndexList(relation);

    *nindices = length(indexoidlist);

    if (*nindices > 0)
        *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
    else
        *Irel = NULL;

    i = 0;
    foreach(indexoidscan, indexoidlist)
    {
        Oid         indexoid = lfirsti(indexoidscan);

        (*Irel)[i] = index_open(indexoid);
        i++;
    }

    freeList(indexoidlist);
}

static void
close_indices(int nindices, Relation *Irel)
{
    if (Irel == (Relation *) NULL)
        return;

    while (nindices--)
        index_close(Irel[nindices]);
    pfree(Irel);
}

static bool
is_partial_index(Relation indrel)
{
    bool        result;
    HeapTuple   cachetuple;
    Form_pg_index indexStruct;

    cachetuple = SearchSysCache(INDEXRELID,
                                ObjectIdGetDatum(RelationGetRelid(indrel)),
                                0, 0, 0);
    if (!HeapTupleIsValid(cachetuple))
        elog(ERROR, "is_partial_index: index %u not found",
             RelationGetRelid(indrel));
    indexStruct = (Form_pg_index) GETSTRUCT(cachetuple);

    result = (VARSIZE(&indexStruct->indpred) != 0);

    ReleaseSysCache(cachetuple);
    return result;
}

static bool
enough_space(VacPage vacpage, Size len)
{
    len = MAXALIGN(len);

    if (len > vacpage->free)
        return false;

    if (vacpage->offsets_used < vacpage->offsets_free)  /* there are free
                                                         * itemid(s) */
        return true;            /* and len <= free_space */

    /* ok. noff_used >= noff_free and so we'll have to allocate new itemid */
    if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
        return true;

    return false;
}

/*
 * Initialize usage snapshot.
 */
void
vac_init_rusage(VacRUsage *ru0)
{
    struct timezone tz;

    getrusage(RUSAGE_SELF, &ru0->ru);
    gettimeofday(&ru0->tv, &tz);
}
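/*
 * Worked example for enough_space() above (illustrative, assumed numbers:
 * the default 8K BLCKSZ with MAXALIGN of 8, so MAXALIGN(sizeof(ItemIdData))
 * is 8).  A page reporting free = 40 bytes and no recyclable line pointers
 * (offsets_used >= offsets_free) accepts a 32-byte tuple, since
 * MAXALIGN(32) + 8 = 40 <= 40, but rejects a 36-byte tuple, since
 * MAXALIGN(36) = 40 and 40 + 8 > 40.  With at least one recyclable line
 * pointer the 36-byte tuple would fit, because only MAXALIGN(len) <= free
 * is required in that case.
 */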
/*
 * Compute elapsed time since ru0 usage snapshot, and format into
 * a displayable string.  Result is in a static string, which is
 * tacky, but no one ever claimed that the Postgres backend is
 * threadable...
 */
const char *
vac_show_rusage(VacRUsage *ru0)
{
    static char result[100];
    VacRUsage   ru1;

    vac_init_rusage(&ru1);

    if (ru1.tv.tv_usec < ru0->tv.tv_usec)
    {
        ru1.tv.tv_sec--;
        ru1.tv.tv_usec += 1000000;
    }
    if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
    {
        ru1.ru.ru_stime.tv_sec--;
        ru1.ru.ru_stime.tv_usec += 1000000;
    }
    if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
    {
        ru1.ru.ru_utime.tv_sec--;
        ru1.ru.ru_utime.tv_usec += 1000000;
    }

    snprintf(result, sizeof(result),
             "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
             (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
             (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
             (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
             (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
             (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
             (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);

    return result;
}
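/*
 * Illustrative sketch (an assumption, not original code): the usage pattern
 * the two routines above are built for -- snapshot resource usage before a
 * phase, then report CPU and elapsed time when logging its completion, as
 * scan_heap, scan_index and vacuum_index do.  The phase itself is left as a
 * placeholder comment.
 */
#ifdef NOT_USED
static void
timed_phase_example(Relation onerel)
{
    VacRUsage   ru0;

    vac_init_rusage(&ru0);

    /* ... do some per-relation vacuum work here ... */

    elog(MESSAGE_LEVEL, "Rel %s: example phase done. %s",
         RelationGetRelationName(onerel),
         vac_show_rusage(&ru0));
}
#endif   /* NOT_USED */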