/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *	  the postgres vacuum cleaner
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.136 2000/01/19 22:23:00 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include <sys/types.h>
#include <sys/file.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_type.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "parser/parse_oper.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/portal.h"
#include "utils/relcache.h"
#include "utils/syscache.h"

#ifndef HAVE_GETRUSAGE
#include "rusagestub.h"
#else
#include <sys/time.h>
#include <sys/resource.h>
#endif

bool		VacuumRunning = false;

static Portal vc_portal;

static int	MESSAGE_LEVEL;		/* message level */

static TransactionId XmaxRecent;

#define swapLong(a,b)	{long tmp; tmp=a; a=b; b=tmp;}
#define swapInt(a,b)	{int tmp; tmp=a; a=b; b=tmp;}
#define swapDatum(a,b)	{Datum tmp; tmp=a; a=b; b=tmp;}
#define VacAttrStatsEqValid(stats)	( stats->f_cmpeq.fn_addr != NULL )
#define VacAttrStatsLtGtValid(stats) ( stats->f_cmplt.fn_addr != NULL && \
								   stats->f_cmpgt.fn_addr != NULL && \
								   RegProcedureIsValid(stats->outfunc) )

/* non-export function prototypes */
static void vc_init(void);
static void vc_shutdown(void);
static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
static VRelList vc_getrels(NameData *VacRelP);
static void vc_vacone(Oid relid, bool analyze, List *va_cols);
static void vc_scanheap(VRelStats *vacrelstats, Relation onerel,
			VPageList vacuum_pages, VPageList fraged_pages);
static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
		   VPageList vacuum_pages, VPageList fraged_pages,
		   int nindices, Relation *Irel);
static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
static void vc_vacpage(Page page, VPageDescr vpd);
static void vc_vaconeind(VPageList vpl, Relation indrel,
			 int num_tuples, int keep_tuples);
static void vc_scanoneind(Relation indrel, int num_tuples);
static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple tuple);
static void vc_bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int *bucket_len);
static void vc_updstats(Oid relid, int num_pages, int num_tuples,
			bool hasindex, VRelStats *vacrelstats);
static void vc_delstats(Oid relid, int attcnt, int *attnums);
static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
static void vc_reappage(VPageList vpl, VPageDescr vpc);
static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
static void vc_clsindices(int nindices, Relation *Irel);
static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
static void *vc_find_eq(void *bot, int nelem, int size, void *elm,
		   int (*compar) (const void *, const void *));
static int	vc_cmp_blk(const void *left, const void *right);
static int	vc_cmp_offno(const void *left, const void *right);
static int	vc_cmp_vtlinks(const void *left, const void *right);
static bool vc_enough_space(VPageDescr vpd,
Size len); static char *vc_show_rusage(struct rusage *ru0); void vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec) { NameData VacRel; PortalVariableMemory pmem; MemoryContext old; List *le; List *va_cols = NIL; if (va_spec != NIL && !analyze) elog(ERROR, "Can't vacuum columns, only tables. You can 'vacuum analyze' columns."); /* * We cannot run VACUUM inside a user transaction block; if we were * inside a transaction, then our commit- and start-transaction-command * calls would not have the intended effect! Furthermore, the forced * commit that occurs before truncating the relation's file would have * the effect of committing the rest of the user's transaction too, * which would certainly not be the desired behavior. */ if (IsTransactionBlock()) elog(ERROR, "VACUUM cannot run inside a BEGIN/END block"); /* initialize vacuum cleaner, particularly vc_portal */ vc_init(); if (verbose) MESSAGE_LEVEL = NOTICE; else MESSAGE_LEVEL = DEBUG; /* vacrel gets de-allocated on transaction commit, so copy it */ if (vacrel) strcpy(NameStr(VacRel), vacrel); /* must also copy the column list, if any, to safe storage */ pmem = PortalGetVariableMemory(vc_portal); old = MemoryContextSwitchTo((MemoryContext) pmem); foreach(le, va_spec) { char *col = (char *) lfirst(le); va_cols = lappend(va_cols, pstrdup(col)); } MemoryContextSwitchTo(old); /* vacuum the database */ if (vacrel) vc_vacuum(&VacRel, analyze, va_cols); else vc_vacuum(NULL, analyze, NIL); /* clean up */ vc_shutdown(); } /* * vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner. * * Formerly, there was code here to prevent more than one VACUUM from * executing concurrently in the same database. However, there's no * good reason to prevent that, and manually removing lockfiles after * a vacuum crash was a pain for dbadmins. So, forget about lockfiles, * and just rely on the exclusive lock we grab on each target table * to ensure that there aren't two VACUUMs running on the same table * at the same time. * * The strangeness with committing and starting transactions in the * init and shutdown routines is due to the fact that the vacuum cleaner * is invoked via an SQL command, and so is already executing inside * a transaction. We need to leave ourselves in a predictable state * on entry and exit to the vacuum cleaner. We commit the transaction * started in PostgresMain() inside vc_init(), and start one in * vc_shutdown() to match the commit waiting for us back in * PostgresMain(). */ static void vc_init() { char *pname; /* * Create a portal for safe memory across transactions. We need to * palloc the name space for it because our hash function expects the * name to be on a longword boundary. CreatePortal copies the name to * safe storage for us. */ pname = pstrdup(VACPNAME); vc_portal = CreatePortal(pname); pfree(pname); /* * Set flag to indicate that vc_portal must be removed after an error. * This global variable is checked in the transaction manager on xact * abort, and the routine vc_abort() is called if necessary. */ VacuumRunning = true; /* matches the StartTransaction in PostgresMain() */ CommitTransactionCommand(); } static void vc_shutdown() { /* on entry, we are not in a transaction */ /* * Flush the init file that relcache.c uses to save startup time. The * next backend startup will rebuild the init file with up-to-date * information from pg_class. This lets the optimizer see the stats * that we've collected for certain critical system indexes. See * relcache.c for more details. 
 *
 * Ignore any failure to unlink the file, since it might not be there if
 * no backend has been started since the last vacuum...
 */
	unlink(RELCACHE_INIT_FILENAME);

	/*
	 * Release our portal for cross-transaction memory.
	 */
	PortalDrop(&vc_portal);

	/* okay, we're done */
	VacuumRunning = false;

	/* matches the CommitTransaction in PostgresMain() */
	StartTransactionCommand();
}

void
vc_abort()
{
	/* Clear flag first, to avoid recursion if PortalDrop elog's */
	VacuumRunning = false;

	/*
	 * Release our portal for cross-transaction memory.
	 */
	PortalDrop(&vc_portal);
}

/*
 *	vc_vacuum() -- vacuum the database.
 *
 *		This routine builds a list of relations to vacuum, and then calls
 *		code that vacuums them one at a time.  We are careful to vacuum each
 *		relation in a separate transaction in order to avoid holding too many
 *		locks at one time.
 */
static void
vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
{
	VRelList	vrl,
				cur;

	/* get list of relations */
	vrl = vc_getrels(VacRelP);

	/* vacuum each heap relation */
	for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
		vc_vacone(cur->vrl_relid, analyze, va_cols);
}

static VRelList
vc_getrels(NameData *VacRelP)
{
	Relation	rel;
	TupleDesc	tupdesc;
	HeapScanDesc scan;
	HeapTuple	tuple;
	PortalVariableMemory portalmem;
	MemoryContext old;
	VRelList	vrl,
				cur;
	Datum		d;
	char	   *rname;
	char		rkind;
	bool		n;
	bool		found = false;
	ScanKeyData key;

	StartTransactionCommand();

	if (NameStr(*VacRelP))
	{
		/*
		 * we could use the cache here, but it is clearer to use scankeys
		 * for both vacuum cases, bjm 2000/01/19
		 */
		ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
							   F_NAMEEQ,
							   PointerGetDatum(NameStr(*VacRelP)));
	}
	else
	{
		ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
							   F_CHAREQ, CharGetDatum('r'));
	}

	portalmem = PortalGetVariableMemory(vc_portal);
	vrl = cur = (VRelList) NULL;

	rel = heap_openr(RelationRelationName, AccessShareLock);
	tupdesc = RelationGetDescr(rel);

	scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);

	while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
	{
		found = true;

		d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
		rname = (char *) d;

		d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
		rkind = DatumGetChar(d);

		if (rkind != RELKIND_RELATION)
		{
			elog(NOTICE, "Vacuum: can not process index and certain system tables");
			continue;
		}

		/* get a relation list entry for this guy */
		old = MemoryContextSwitchTo((MemoryContext) portalmem);
		if (vrl == (VRelList) NULL)
			vrl = cur = (VRelList) palloc(sizeof(VRelListData));
		else
		{
			cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
			cur = cur->vrl_next;
		}
		MemoryContextSwitchTo(old);

		cur->vrl_relid = tuple->t_data->t_oid;
		cur->vrl_next = (VRelList) NULL;
	}

	if (found == false)
		elog(NOTICE, "Vacuum: table not found");

	heap_endscan(scan);
	heap_close(rel, AccessShareLock);

	CommitTransactionCommand();

	return vrl;
}

/*
 *	vc_vacone() -- vacuum one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indices, and
 *		updates its num_pages and num_tuples statistics.
 *
 *		Doing one heap at a time incurs extra overhead, since we need to
 *		check that the heap exists again just before we vacuum it.  The
 *		reason that we do this is so that vacuuming can be spread across
 *		many small transactions.  Otherwise, two-phase locking would require
 *		us to lock the entire database during one pass of the vacuum cleaner.
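 *
 *		In outline, the per-relation flow below is (a simplified sketch,
 *		not additional code):
 *
 *			StartTransactionCommand();
 *			onerel = heap_open(relid, AccessExclusiveLock);
 *			vc_scanheap(...);				-- pass 1: collect reapable pages
 *			vc_vaconeind()/vc_scanoneind()	-- clean or just count each index
 *			vc_rpfheap() or vc_vacheap();	-- pass 2: move/free dead space
 *			vc_updstats(...);				-- refresh pg_class statistics
 *			heap_close(onerel, NoLock);		-- hold the lock until commit
 *			CommitTransactionCommand();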
*/ static void vc_vacone(Oid relid, bool analyze, List *va_cols) { HeapTuple tuple, typetuple; Relation onerel; VPageListData vacuum_pages; /* List of pages to vacuum and/or clean * indices */ VPageListData fraged_pages; /* List of pages with space enough for * re-using */ VPageDescr *vpp; Relation *Irel; int32 nindices, i; VRelStats *vacrelstats; StartTransactionCommand(); /* * Check for user-requested abort. Note we want this to be inside * a transaction, so xact.c doesn't issue useless NOTICE. */ if (QueryCancel) CancelQuery(); /* * Race condition -- if the pg_class tuple has gone away since the * last time we saw it, we don't need to vacuum it. */ tuple = SearchSysCacheTuple(RELOID, ObjectIdGetDatum(relid), 0, 0, 0); if (!HeapTupleIsValid(tuple)) { CommitTransactionCommand(); return; } /* * Open the class, get an exclusive lock on it, and check permissions. * * Note we choose to treat permissions failure as a NOTICE and keep * trying to vacuum the rest of the DB --- is this appropriate? */ onerel = heap_open(relid, AccessExclusiveLock); #ifndef NO_SECURITY if (!pg_ownercheck(GetPgUserName(), RelationGetRelationName(onerel), RELNAME)) { elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it", RelationGetRelationName(onerel)); heap_close(onerel, AccessExclusiveLock); CommitTransactionCommand(); return; } #endif /* * Set up statistics-gathering machinery. */ vacrelstats = (VRelStats *) palloc(sizeof(VRelStats)); vacrelstats->relid = relid; vacrelstats->num_pages = vacrelstats->num_tuples = 0; vacrelstats->hasindex = false; /* we can VACUUM ANALYZE any table except pg_statistic; see vc_updstats */ if (analyze && strcmp(RelationGetRelationName(onerel), StatisticRelationName) != 0) { int attr_cnt, *attnums = NULL; Form_pg_attribute *attr; attr_cnt = onerel->rd_att->natts; attr = onerel->rd_att->attrs; if (va_cols != NIL) { int tcnt = 0; List *le; if (length(va_cols) > attr_cnt) elog(ERROR, "vacuum: too many attributes specified for relation %s", RelationGetRelationName(onerel)); attnums = (int *) palloc(attr_cnt * sizeof(int)); foreach(le, va_cols) { char *col = (char *) lfirst(le); for (i = 0; i < attr_cnt; i++) { if (namestrcmp(&(attr[i]->attname), col) == 0) break; } if (i < attr_cnt) /* found */ attnums[tcnt++] = i; else { elog(ERROR, "vacuum: there is no attribute %s in %s", col, RelationGetRelationName(onerel)); } } attr_cnt = tcnt; } vacrelstats->vacattrstats = (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats)); for (i = 0; i < attr_cnt; i++) { Operator func_operator; Form_pg_operator pgopform; VacAttrStats *stats; stats = &vacrelstats->vacattrstats[i]; stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE); memmove(stats->attr, attr[((attnums) ? 
attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE); stats->best = stats->guess1 = stats->guess2 = 0; stats->max = stats->min = 0; stats->best_len = stats->guess1_len = stats->guess2_len = 0; stats->max_len = stats->min_len = 0; stats->initialized = false; stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0; stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0; func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true); if (func_operator != NULL) { pgopform = (Form_pg_operator) GETSTRUCT(func_operator); fmgr_info(pgopform->oprcode, &(stats->f_cmpeq)); } else stats->f_cmpeq.fn_addr = NULL; func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true); if (func_operator != NULL) { pgopform = (Form_pg_operator) GETSTRUCT(func_operator); fmgr_info(pgopform->oprcode, &(stats->f_cmplt)); stats->op_cmplt = oprid(func_operator); } else { stats->f_cmplt.fn_addr = NULL; stats->op_cmplt = InvalidOid; } func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true); if (func_operator != NULL) { pgopform = (Form_pg_operator) GETSTRUCT(func_operator); fmgr_info(pgopform->oprcode, &(stats->f_cmpgt)); } else stats->f_cmpgt.fn_addr = NULL; typetuple = SearchSysCacheTuple(TYPEOID, ObjectIdGetDatum(stats->attr->atttypid), 0, 0, 0); if (HeapTupleIsValid(typetuple)) { stats->outfunc = ((Form_pg_type) GETSTRUCT(typetuple))->typoutput; stats->typelem = ((Form_pg_type) GETSTRUCT(typetuple))->typelem; } else { stats->outfunc = InvalidOid; stats->typelem = InvalidOid; } } vacrelstats->va_natts = attr_cnt; /* delete existing pg_statistic rows for relation */ vc_delstats(relid, ((attnums) ? attr_cnt : 0), attnums); if (attnums) pfree(attnums); } else { vacrelstats->va_natts = 0; vacrelstats->vacattrstats = (VacAttrStats *) NULL; } GetXmaxRecent(&XmaxRecent); /* scan it */ vacuum_pages.vpl_num_pages = fraged_pages.vpl_num_pages = 0; vc_scanheap(vacrelstats, onerel, &vacuum_pages, &fraged_pages); /* Now open indices */ Irel = (Relation *) NULL; vc_getindices(vacrelstats->relid, &nindices, &Irel); if (nindices > 0) vacrelstats->hasindex = true; else vacrelstats->hasindex = false; /* Clean/scan index relation(s) */ if (Irel != (Relation *) NULL) { if (vacuum_pages.vpl_num_pages > 0) { for (i = 0; i < nindices; i++) vc_vaconeind(&vacuum_pages, Irel[i], vacrelstats->num_tuples, 0); } else /* just scan indices to update statistic */ { for (i = 0; i < nindices; i++) vc_scanoneind(Irel[i], vacrelstats->num_tuples); } } if (fraged_pages.vpl_num_pages > 0) /* Try to shrink heap */ vc_rpfheap(vacrelstats, onerel, &vacuum_pages, &fraged_pages, nindices, Irel); else { if (Irel != (Relation *) NULL) vc_clsindices(nindices, Irel); if (vacuum_pages.vpl_num_pages > 0) /* Clean pages from * vacuum_pages list */ vc_vacheap(vacrelstats, onerel, &vacuum_pages); } /* ok - free vacuum_pages list of reapped pages */ if (vacuum_pages.vpl_num_pages > 0) { vpp = vacuum_pages.vpl_pagedesc; for (i = 0; i < vacuum_pages.vpl_num_pages; i++, vpp++) pfree(*vpp); pfree(vacuum_pages.vpl_pagedesc); if (fraged_pages.vpl_num_pages > 0) pfree(fraged_pages.vpl_pagedesc); } /* update statistics in pg_class */ vc_updstats(vacrelstats->relid, vacrelstats->num_pages, vacrelstats->num_tuples, vacrelstats->hasindex, vacrelstats); /* all done with this class, but hold lock until commit */ heap_close(onerel, NoLock); /* next command frees attribute stats */ CommitTransactionCommand(); } /* * vc_scanheap() -- scan an open heap relation * * This routine sets commit times, 
constructs vacuum_pages list of * empty/uninitialized pages and pages with dead tuples and * ~LP_USED line pointers, constructs fraged_pages list of pages * appropriate for purposes of shrinking and maintains statistics * on the number of live tuples in a heap. */ static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages, VPageList fraged_pages) { int nblocks, blkno; ItemId itemid; Buffer buf; HeapTupleData tuple; Page page, tempPage = NULL; OffsetNumber offnum, maxoff; bool pgchanged, tupgone, dobufrel, notup; char *relname; VPageDescr vpc, vp; uint32 tups_vacuumed, num_tuples, nkeep, nunused, ncrash, empty_pages, new_pages, changed_pages, empty_end_pages; Size free_size, usable_free_size; Size min_tlen = MaxTupleSize; Size max_tlen = 0; int32 i; bool do_shrinking = true; VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData)); int num_vtlinks = 0; int free_vtlinks = 100; struct rusage ru0; getrusage(RUSAGE_SELF, &ru0); relname = RelationGetRelationName(onerel); elog(MESSAGE_LEVEL, "--Relation %s--", relname); tups_vacuumed = num_tuples = nkeep = nunused = ncrash = empty_pages = new_pages = changed_pages = empty_end_pages = 0; free_size = usable_free_size = 0; nblocks = RelationGetNumberOfBlocks(onerel); vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber)); vpc->vpd_offsets_used = 0; for (blkno = 0; blkno < nblocks; blkno++) { buf = ReadBuffer(onerel, blkno); page = BufferGetPage(buf); vpc->vpd_blkno = blkno; vpc->vpd_offsets_free = 0; if (PageIsNew(page)) { elog(NOTICE, "Rel %s: Uninitialized page %u - fixing", relname, blkno); PageInit(page, BufferGetPageSize(buf), 0); vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; free_size += (vpc->vpd_free - sizeof(ItemIdData)); new_pages++; empty_end_pages++; vc_reappage(vacuum_pages, vpc); WriteBuffer(buf); continue; } if (PageIsEmpty(page)) { vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; free_size += (vpc->vpd_free - sizeof(ItemIdData)); empty_pages++; empty_end_pages++; vc_reappage(vacuum_pages, vpc); ReleaseBuffer(buf); continue; } pgchanged = false; notup = true; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); /* * Collect un-used items too - it's possible to have indices * pointing here after crash. 
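 *
 * (These offsets are collected into vpd_offsets so that vc_vaconeind()
 * can later delete any index entries that still point at them.)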
*/ if (!ItemIdIsUsed(itemid)) { vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum; nunused++; continue; } tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); tuple.t_len = ItemIdGetLength(itemid); ItemPointerSet(&(tuple.t_self), blkno, offnum); tupgone = false; if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if (tuple.t_data->t_infomask & HEAP_XMIN_INVALID) tupgone = true; else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { if (TransactionIdDidCommit((TransactionId) tuple.t_data->t_cmin)) { tuple.t_data->t_infomask |= HEAP_XMIN_INVALID; tupgone = true; } else { tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED; pgchanged = true; } } else if (tuple.t_data->t_infomask & HEAP_MOVED_IN) { if (!TransactionIdDidCommit((TransactionId) tuple.t_data->t_cmin)) { tuple.t_data->t_infomask |= HEAP_XMIN_INVALID; tupgone = true; } else { tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED; pgchanged = true; } } else { if (TransactionIdDidAbort(tuple.t_data->t_xmin)) tupgone = true; else if (TransactionIdDidCommit(tuple.t_data->t_xmin)) { tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED; pgchanged = true; } else if (!TransactionIdIsInProgress(tuple.t_data->t_xmin)) { /* * Not Aborted, Not Committed, Not in Progress - * so it's from crashed process. - vadim 11/26/96 */ ncrash++; tupgone = true; } else { elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation", relname, blkno, offnum, tuple.t_data->t_xmin); do_shrinking = false; } } } /* * here we are concerned about tuples with xmin committed and * xmax unknown or committed */ if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED && !(tuple.t_data->t_infomask & HEAP_XMAX_INVALID)) { if (tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED) { if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) { pgchanged = true; tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; } else tupgone = true; } else if (TransactionIdDidAbort(tuple.t_data->t_xmax)) { tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; pgchanged = true; } else if (TransactionIdDidCommit(tuple.t_data->t_xmax)) { if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) { tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; pgchanged = true; } else tupgone = true; } else if (!TransactionIdIsInProgress(tuple.t_data->t_xmax)) { /* * Not Aborted, Not Committed, Not in Progress - so it * from crashed process. - vadim 06/02/97 */ tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; pgchanged = true; } else { elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation", relname, blkno, offnum, tuple.t_data->t_xmax); do_shrinking = false; } /* * If tuple is recently deleted then we must not remove it * from relation. */ if (tupgone && tuple.t_data->t_xmax >= XmaxRecent) { tupgone = false; nkeep++; if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)) { tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED; pgchanged = true; } /* * If we do shrinking and this tuple is updated one * then remember it to construct updated tuple * dependencies. */ if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))) { if (free_vtlinks == 0) { free_vtlinks = 1000; vtlinks = (VTupleLink) repalloc(vtlinks, (free_vtlinks + num_vtlinks) * sizeof(VTupleLinkData)); } vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid; vtlinks[num_vtlinks].this_tid = tuple.t_self; free_vtlinks--; num_vtlinks++; } } } /* * Other checks... */ if (!OidIsValid(tuple.t_data->t_oid)) { elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. 
TUPGONE %d.", relname, blkno, offnum, tupgone); } if (tupgone) { ItemId lpp; if (tempPage == (Page) NULL) { Size pageSize; pageSize = PageGetPageSize(page); tempPage = (Page) palloc(pageSize); memmove(tempPage, page, pageSize); } lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]); /* mark it unused */ lpp->lp_flags &= ~LP_USED; vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum; tups_vacuumed++; } else { num_tuples++; notup = false; if (tuple.t_len < min_tlen) min_tlen = tuple.t_len; if (tuple.t_len > max_tlen) max_tlen = tuple.t_len; vc_attrstats(onerel, vacrelstats, &tuple); } } if (pgchanged) { WriteBuffer(buf); dobufrel = false; changed_pages++; } else dobufrel = true; if (tempPage != (Page) NULL) { /* Some tuples are gone */ PageRepairFragmentation(tempPage); vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower; free_size += vpc->vpd_free; vc_reappage(vacuum_pages, vpc); pfree(tempPage); tempPage = (Page) NULL; } else if (vpc->vpd_offsets_free > 0) { /* there are only ~LP_USED line pointers */ vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; free_size += vpc->vpd_free; vc_reappage(vacuum_pages, vpc); } if (dobufrel) ReleaseBuffer(buf); if (notup) empty_end_pages++; else empty_end_pages = 0; } pfree(vpc); /* save stats in the rel list for use later */ vacrelstats->num_tuples = num_tuples; vacrelstats->num_pages = nblocks; /* vacrelstats->natts = attr_cnt;*/ if (num_tuples == 0) min_tlen = max_tlen = 0; vacrelstats->min_tlen = min_tlen; vacrelstats->max_tlen = max_tlen; vacuum_pages->vpl_empty_end_pages = empty_end_pages; fraged_pages->vpl_empty_end_pages = empty_end_pages; /* * Try to make fraged_pages keeping in mind that we can't use free * space of "empty" end-pages and last page if it reapped. */ if (do_shrinking && vacuum_pages->vpl_num_pages - empty_end_pages > 0) { int nusf; /* blocks usefull for re-using */ nusf = vacuum_pages->vpl_num_pages - empty_end_pages; if ((vacuum_pages->vpl_pagedesc[nusf - 1])->vpd_blkno == nblocks - empty_end_pages - 1) nusf--; for (i = 0; i < nusf; i++) { vp = vacuum_pages->vpl_pagedesc[i]; if (vc_enough_space(vp, min_tlen)) { vc_vpinsert(fraged_pages, vp); usable_free_size += vp->vpd_free; } } } if (usable_free_size > 0 && num_vtlinks > 0) { qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData), vc_cmp_vtlinks); vacrelstats->vtlinks = vtlinks; vacrelstats->num_vtlinks = num_vtlinks; } else { vacrelstats->vtlinks = NULL; vacrelstats->num_vtlinks = 0; pfree(vtlinks); } elog(MESSAGE_LEVEL, "Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \ Tup %u: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; \ Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. %s", nblocks, changed_pages, vacuum_pages->vpl_num_pages, empty_pages, new_pages, num_tuples, tups_vacuumed, nkeep, vacrelstats->num_vtlinks, ncrash, nunused, min_tlen, max_tlen, free_size, usable_free_size, empty_end_pages, fraged_pages->vpl_num_pages, vc_show_rusage(&ru0)); } /* vc_scanheap */ /* * vc_rpfheap() -- try to repair relation's fragmentation * * This routine marks dead tuples as unused and tries re-use dead space * by moving tuples (and inserting indices if needed). It constructs * Nvpl list of free-ed pages (moved tuples) and clean indices * for them after committing (in hack-manner - without losing locks * and freeing memory!) current transaction. It truncates relation * if some end-blocks are gone away. 
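 *
 * In outline (a simplified sketch of the code below):
 *
 *		1. scan blocks backwards from the last nonempty page, moving each
 *		   live tuple into the lowest-numbered fraged page with enough
 *		   room, marking the old copy MOVED_OFF and the new copy MOVED_IN
 *		   with our XID in t_cmin;
 *		2. TransactionIdCommit(myXID) without releasing locks, then turn
 *		   MOVED_IN into XMIN_COMMITTED and MOVED_OFF into XMIN_INVALID,
 *		   vacuum the indices for the freed pages, and smgrtruncate()
 *		   the now-empty tail of the relation.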
*/ static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages, VPageList fraged_pages, int nindices, Relation *Irel) { TransactionId myXID; CommandId myCID; Buffer buf, cur_buffer; int nblocks, blkno; Page page, ToPage = NULL; OffsetNumber offnum = 0, maxoff = 0, newoff, max_offset; ItemId itemid, newitemid; HeapTupleData tuple, newtup; TupleDesc tupdesc = NULL; Datum *idatum = NULL; char *inulls = NULL; InsertIndexResult iresult; VPageListData Nvpl; VPageDescr cur_page = NULL, last_vacuum_page, vpc, *vpp; int cur_item = 0; IndDesc *Idesc, *idcur; int last_move_dest_block = -1, last_vacuum_block, i = 0; Size tuple_len; int num_moved, num_fraged_pages, vacuumed_pages; int checked_moved, num_tuples, keep_tuples = 0; bool isempty, dowrite, chain_tuple_moved; struct rusage ru0; getrusage(RUSAGE_SELF, &ru0); myXID = GetCurrentTransactionId(); myCID = GetCurrentCommandId(); if (Irel != (Relation *) NULL) /* preparation for index' inserts */ { vc_mkindesc(onerel, nindices, Irel, &Idesc); tupdesc = RelationGetDescr(onerel); idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum)); inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls)); } Nvpl.vpl_num_pages = 0; num_fraged_pages = fraged_pages->vpl_num_pages; Assert(vacuum_pages->vpl_num_pages > vacuum_pages->vpl_empty_end_pages); vacuumed_pages = vacuum_pages->vpl_num_pages - vacuum_pages->vpl_empty_end_pages; last_vacuum_page = vacuum_pages->vpl_pagedesc[vacuumed_pages - 1]; last_vacuum_block = last_vacuum_page->vpd_blkno; cur_buffer = InvalidBuffer; num_moved = 0; vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber)); vpc->vpd_offsets_used = vpc->vpd_offsets_free = 0; /* * Scan pages backwards from the last nonempty page, trying to move * tuples down to lower pages. Quit when we reach a page that we * have moved any tuples onto. Note that if a page is still in the * fraged_pages list (list of candidate move-target pages) when we * reach it, we will remove it from the list. This ensures we never * move a tuple up to a higher page number. * * NB: this code depends on the vacuum_pages and fraged_pages lists * being in order, and on fraged_pages being a subset of vacuum_pages. 
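 *
 * Example (with made-up numbers): with num_pages = 10 and
 * vpl_empty_end_pages = 2, the loop below starts at blkno 7 and works
 * downward; once a tuple has been moved onto, say, block 3,
 * last_move_dest_block = 3 and blocks 0..3 can only receive tuples,
 * never give them up.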
*/ nblocks = vacrelstats->num_pages; for (blkno = nblocks - vacuum_pages->vpl_empty_end_pages - 1; blkno > last_move_dest_block; blkno--) { buf = ReadBuffer(onerel, blkno); page = BufferGetPage(buf); vpc->vpd_offsets_free = 0; isempty = PageIsEmpty(page); dowrite = false; if (blkno == last_vacuum_block) /* it's reapped page */ { if (last_vacuum_page->vpd_offsets_free > 0) /* there are dead tuples */ { /* on this page - clean */ Assert(!isempty); vc_vacpage(page, last_vacuum_page); dowrite = true; } else Assert(isempty); --vacuumed_pages; if (vacuumed_pages > 0) { /* get prev reapped page from vacuum_pages */ last_vacuum_page = vacuum_pages->vpl_pagedesc[vacuumed_pages - 1]; last_vacuum_block = last_vacuum_page->vpd_blkno; } else { last_vacuum_page = NULL; last_vacuum_block = -1; } if (num_fraged_pages > 0 && blkno == fraged_pages->vpl_pagedesc[num_fraged_pages-1]->vpd_blkno) { /* page is in fraged_pages too; remove it */ --num_fraged_pages; } if (isempty) { ReleaseBuffer(buf); continue; } } else Assert(!isempty); chain_tuple_moved = false; /* no one chain-tuple was moved * off this page, yet */ vpc->vpd_blkno = blkno; maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); tuple_len = tuple.t_len = ItemIdGetLength(itemid); ItemPointerSet(&(tuple.t_self), blkno, offnum); if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin"); if (tuple.t_data->t_infomask & HEAP_MOVED_IN) elog(ERROR, "HEAP_MOVED_IN was not expected"); /* * If this (chain) tuple is moved by me already then I * have to check is it in vpc or not - i.e. is it moved * while cleaning this page or some previous one. */ if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { if (keep_tuples == 0) continue; if (chain_tuple_moved) /* some chains was moved * while */ { /* cleaning this page */ Assert(vpc->vpd_offsets_free > 0); for (i = 0; i < vpc->vpd_offsets_free; i++) { if (vpc->vpd_offsets[i] == offnum) break; } if (i >= vpc->vpd_offsets_free) /* not found */ { vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum; keep_tuples--; } } else { vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum; keep_tuples--; } continue; } elog(ERROR, "HEAP_MOVED_OFF was expected"); } /* * If this tuple is in the chain of tuples created in updates * by "recent" transactions then we have to move all chain of * tuples to another places. */ if ((tuple.t_data->t_infomask & HEAP_UPDATED && tuple.t_data->t_xmin >= XmaxRecent) || (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) && !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid))))) { Buffer Cbuf = buf; Page Cpage; ItemId Citemid; ItemPointerData Ctid; HeapTupleData tp = tuple; Size tlen = tuple_len; VTupleMove vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData)); int num_vtmove = 0; int free_vtmove = 100; VPageDescr to_vpd = NULL; int to_item = 0; bool freeCbuf = false; int ti; if (vacrelstats->vtlinks == NULL) elog(ERROR, "No one parent tuple was found"); if (cur_buffer != InvalidBuffer) { WriteBuffer(cur_buffer); cur_buffer = InvalidBuffer; } /* * If this tuple is in the begin/middle of the chain then * we have to move to the end of chain. 
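 *
 * (t_ctid points from an old tuple version to its newer one, so
 * following it leads to the newest member of the chain; from there we
 * walk back to the older members through the sorted vtlinks array.)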
*/ while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) && !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid)))) { Ctid = tp.t_data->t_ctid; if (freeCbuf) ReleaseBuffer(Cbuf); freeCbuf = true; Cbuf = ReadBuffer(onerel, ItemPointerGetBlockNumber(&Ctid)); Cpage = BufferGetPage(Cbuf); Citemid = PageGetItemId(Cpage, ItemPointerGetOffsetNumber(&Ctid)); if (!ItemIdIsUsed(Citemid)) { /* * This means that in the middle of chain there was * tuple updated by older (than XmaxRecent) xaction * and this tuple is already deleted by me. Actually, * upper part of chain should be removed and seems * that this should be handled in vc_scanheap(), but * it's not implemented at the moment and so we * just stop shrinking here. */ ReleaseBuffer(Cbuf); pfree(vtmove); vtmove = NULL; elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue vc_rpfheap"); break; } tp.t_datamcxt = NULL; tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid); tp.t_self = Ctid; tlen = tp.t_len = ItemIdGetLength(Citemid); } if (vtmove == NULL) break; /* first, can chain be moved ? */ for (;;) { if (to_vpd == NULL || !vc_enough_space(to_vpd, tlen)) { /* if to_vpd no longer has enough free space to be * useful, remove it from fraged_pages list */ if (to_vpd != NULL && !vc_enough_space(to_vpd, vacrelstats->min_tlen)) { Assert(num_fraged_pages > to_item); memmove(fraged_pages->vpl_pagedesc + to_item, fraged_pages->vpl_pagedesc + to_item + 1, sizeof(VPageDescr) * (num_fraged_pages - to_item - 1)); num_fraged_pages--; } for (i = 0; i < num_fraged_pages; i++) { if (vc_enough_space(fraged_pages->vpl_pagedesc[i], tlen)) break; } /* can't move item anywhere */ if (i == num_fraged_pages) { for (i = 0; i < num_vtmove; i++) { Assert(vtmove[i].vpd->vpd_offsets_used > 0); (vtmove[i].vpd->vpd_offsets_used)--; } num_vtmove = 0; break; } to_item = i; to_vpd = fraged_pages->vpl_pagedesc[to_item]; } to_vpd->vpd_free -= MAXALIGN(tlen); if (to_vpd->vpd_offsets_used >= to_vpd->vpd_offsets_free) to_vpd->vpd_free -= MAXALIGN(sizeof(ItemIdData)); (to_vpd->vpd_offsets_used)++; if (free_vtmove == 0) { free_vtmove = 1000; vtmove = (VTupleMove) repalloc(vtmove, (free_vtmove + num_vtmove) * sizeof(VTupleMoveData)); } vtmove[num_vtmove].tid = tp.t_self; vtmove[num_vtmove].vpd = to_vpd; if (to_vpd->vpd_offsets_used == 1) vtmove[num_vtmove].cleanVpd = true; else vtmove[num_vtmove].cleanVpd = false; free_vtmove--; num_vtmove++; /* All done ? */ if (!(tp.t_data->t_infomask & HEAP_UPDATED) || tp.t_data->t_xmin < XmaxRecent) break; /* Well, try to find tuple with old row version */ for (;;) { Buffer Pbuf; Page Ppage; ItemId Pitemid; HeapTupleData Ptp; VTupleLinkData vtld, *vtlp; vtld.new_tid = tp.t_self; vtlp = (VTupleLink) vc_find_eq((void *) (vacrelstats->vtlinks), vacrelstats->num_vtlinks, sizeof(VTupleLinkData), (void *) &vtld, vc_cmp_vtlinks); if (vtlp == NULL) elog(ERROR, "Parent tuple was not found"); tp.t_self = vtlp->this_tid; Pbuf = ReadBuffer(onerel, ItemPointerGetBlockNumber(&(tp.t_self))); Ppage = BufferGetPage(Pbuf); Pitemid = PageGetItemId(Ppage, ItemPointerGetOffsetNumber(&(tp.t_self))); if (!ItemIdIsUsed(Pitemid)) elog(ERROR, "Parent itemid marked as unused"); Ptp.t_datamcxt = NULL; Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid); Assert(ItemPointerEquals(&(vtld.new_tid), &(Ptp.t_data->t_ctid))); /* * Read above about cases when !ItemIdIsUsed(Citemid) * (child item is removed)... Due to the fact that * at the moment we don't remove unuseful part of * update-chain, it's possible to get too old * parent row here. 
Like as in the case which * caused this problem, we stop shrinking here. * I could try to find real parent row but want * not to do it because of real solution will * be implemented anyway, latter, and we are too * close to 6.5 release. - vadim 06/11/99 */ if (Ptp.t_data->t_xmax != tp.t_data->t_xmin) { if (freeCbuf) ReleaseBuffer(Cbuf); freeCbuf = false; ReleaseBuffer(Pbuf); for (i = 0; i < num_vtmove; i++) { Assert(vtmove[i].vpd->vpd_offsets_used > 0); (vtmove[i].vpd->vpd_offsets_used)--; } num_vtmove = 0; elog(NOTICE, "Too old parent tuple found - can't continue vc_rpfheap"); break; } #ifdef NOT_USED /* I'm not sure that this will wotk properly... */ /* * If this tuple is updated version of row and it * was created by the same transaction then no one * is interested in this tuple - mark it as * removed. */ if (Ptp.t_data->t_infomask & HEAP_UPDATED && Ptp.t_data->t_xmin == Ptp.t_data->t_xmax) { TransactionIdStore(myXID, (TransactionId *) &(Ptp.t_data->t_cmin)); Ptp.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); Ptp.t_data->t_infomask |= HEAP_MOVED_OFF; WriteBuffer(Pbuf); continue; } #endif tp.t_datamcxt = Ptp.t_datamcxt; tp.t_data = Ptp.t_data; tlen = tp.t_len = ItemIdGetLength(Pitemid); if (freeCbuf) ReleaseBuffer(Cbuf); Cbuf = Pbuf; freeCbuf = true; break; } if (num_vtmove == 0) break; } if (freeCbuf) ReleaseBuffer(Cbuf); if (num_vtmove == 0) /* chain can't be moved */ { pfree(vtmove); break; } ItemPointerSetInvalid(&Ctid); for (ti = 0; ti < num_vtmove; ti++) { /* Get tuple from chain */ tuple.t_self = vtmove[ti].tid; Cbuf = ReadBuffer(onerel, ItemPointerGetBlockNumber(&(tuple.t_self))); Cpage = BufferGetPage(Cbuf); Citemid = PageGetItemId(Cpage, ItemPointerGetOffsetNumber(&(tuple.t_self))); tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid); tuple_len = tuple.t_len = ItemIdGetLength(Citemid); /* Get page to move in */ cur_buffer = ReadBuffer(onerel, vtmove[ti].vpd->vpd_blkno); /* * We should LockBuffer(cur_buffer) but don't, at the * moment. If you'll do LockBuffer then UNLOCK it * before index_insert: unique btree-s call heap_fetch * to get t_infomask of inserted heap tuple !!! */ ToPage = BufferGetPage(cur_buffer); /* if this page was not used before - clean it */ if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd) vc_vacpage(ToPage, vtmove[ti].vpd); heap_copytuple_with_tuple(&tuple, &newtup); RelationInvalidateHeapTuple(onerel, &tuple); TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin)); newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF); newtup.t_data->t_infomask |= HEAP_MOVED_IN; newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len, InvalidOffsetNumber, LP_USED); if (newoff == InvalidOffsetNumber) { elog(ERROR, "\ moving chain: failed to add item with len = %u to page %u", tuple_len, vtmove[ti].vpd->vpd_blkno); } newitemid = PageGetItemId(ToPage, newoff); pfree(newtup.t_data); newtup.t_datamcxt = NULL; newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid); ItemPointerSet(&(newtup.t_self), vtmove[ti].vpd->vpd_blkno, newoff); if (((int) vtmove[ti].vpd->vpd_blkno) > last_move_dest_block) last_move_dest_block = vtmove[ti].vpd->vpd_blkno; /* * Set t_ctid pointing to itself for last tuple in * chain and to next tuple in chain otherwise. 
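 *
 * (Ctid starts out invalid and the chain is moved newest-first, so the
 * newest version gets t_ctid = its own new location, and each older
 * version then points at the new location of its successor.)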
*/ if (!ItemPointerIsValid(&Ctid)) newtup.t_data->t_ctid = newtup.t_self; else newtup.t_data->t_ctid = Ctid; Ctid = newtup.t_self; TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin)); tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); tuple.t_data->t_infomask |= HEAP_MOVED_OFF; num_moved++; /* * Remember that we moved tuple from the current page * (corresponding index tuple will be cleaned). */ if (Cbuf == buf) vpc->vpd_offsets[vpc->vpd_offsets_free++] = ItemPointerGetOffsetNumber(&(tuple.t_self)); else keep_tuples++; if (Irel != (Relation *) NULL) { for (i = 0, idcur = Idesc; i < nindices; i++, idcur++) { FormIndexDatum(idcur->natts, (AttrNumber *) &(idcur->tform->indkey[0]), &newtup, tupdesc, idatum, inulls, idcur->finfoP); iresult = index_insert(Irel[i], idatum, inulls, &newtup.t_self, onerel); if (iresult) pfree(iresult); } } WriteBuffer(cur_buffer); if (Cbuf == buf) ReleaseBuffer(Cbuf); else WriteBuffer(Cbuf); } cur_buffer = InvalidBuffer; pfree(vtmove); chain_tuple_moved = true; continue; } /* try to find new page for this tuple */ if (cur_buffer == InvalidBuffer || !vc_enough_space(cur_page, tuple_len)) { if (cur_buffer != InvalidBuffer) { WriteBuffer(cur_buffer); cur_buffer = InvalidBuffer; /* * If previous target page is now too full to add * *any* tuple to it, remove it from fraged_pages. */ if (!vc_enough_space(cur_page, vacrelstats->min_tlen)) { Assert(num_fraged_pages > cur_item); memmove(fraged_pages->vpl_pagedesc + cur_item, fraged_pages->vpl_pagedesc + cur_item + 1, sizeof(VPageDescr) * (num_fraged_pages - cur_item - 1)); num_fraged_pages--; } } for (i = 0; i < num_fraged_pages; i++) { if (vc_enough_space(fraged_pages->vpl_pagedesc[i], tuple_len)) break; } if (i == num_fraged_pages) break; /* can't move item anywhere */ cur_item = i; cur_page = fraged_pages->vpl_pagedesc[cur_item]; cur_buffer = ReadBuffer(onerel, cur_page->vpd_blkno); ToPage = BufferGetPage(cur_buffer); /* if this page was not used before - clean it */ if (!PageIsEmpty(ToPage) && cur_page->vpd_offsets_used == 0) vc_vacpage(ToPage, cur_page); } /* copy tuple */ heap_copytuple_with_tuple(&tuple, &newtup); RelationInvalidateHeapTuple(onerel, &tuple); /* * Mark new tuple as moved_in by vacuum and store vacuum XID * in t_cmin !!! */ TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin)); newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF); newtup.t_data->t_infomask |= HEAP_MOVED_IN; /* add tuple to the page */ newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len, InvalidOffsetNumber, LP_USED); if (newoff == InvalidOffsetNumber) { elog(ERROR, "\ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)", tuple_len, cur_page->vpd_blkno, cur_page->vpd_free, cur_page->vpd_offsets_used, cur_page->vpd_offsets_free); } newitemid = PageGetItemId(ToPage, newoff); pfree(newtup.t_data); newtup.t_datamcxt = NULL; newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid); ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->vpd_blkno, newoff); newtup.t_self = newtup.t_data->t_ctid; /* * Mark old tuple as moved_off by vacuum and store vacuum XID * in t_cmin !!! 
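 *
 * (Both copies now carry our XID in t_cmin: the old one is flagged
 * HEAP_MOVED_OFF, the new one HEAP_MOVED_IN.  After the
 * TransactionIdCommit(myXID) hack below, the second pass turns
 * MOVED_IN into XMIN_COMMITTED and MOVED_OFF into XMIN_INVALID.)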
*/ TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin)); tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN); tuple.t_data->t_infomask |= HEAP_MOVED_OFF; cur_page->vpd_offsets_used++; num_moved++; cur_page->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower; if (((int) cur_page->vpd_blkno) > last_move_dest_block) last_move_dest_block = cur_page->vpd_blkno; vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum; /* insert index' tuples if needed */ if (Irel != (Relation *) NULL) { for (i = 0, idcur = Idesc; i < nindices; i++, idcur++) { FormIndexDatum(idcur->natts, (AttrNumber *) &(idcur->tform->indkey[0]), &newtup, tupdesc, idatum, inulls, idcur->finfoP); iresult = index_insert(Irel[i], idatum, inulls, &newtup.t_self, onerel); if (iresult) pfree(iresult); } } } /* walk along page */ if (offnum < maxoff && keep_tuples > 0) { OffsetNumber off; for (off = OffsetNumberNext(offnum); off <= maxoff; off = OffsetNumberNext(off)) { itemid = PageGetItemId(page, off); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED) continue; if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin (4)"); if (tuple.t_data->t_infomask & HEAP_MOVED_IN) elog(ERROR, "HEAP_MOVED_IN was not expected (2)"); if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { /* some chains was moved while */ if (chain_tuple_moved) { /* cleaning this page */ Assert(vpc->vpd_offsets_free > 0); for (i = 0; i < vpc->vpd_offsets_free; i++) { if (vpc->vpd_offsets[i] == off) break; } if (i >= vpc->vpd_offsets_free) /* not found */ { vpc->vpd_offsets[vpc->vpd_offsets_free++] = off; Assert(keep_tuples > 0); keep_tuples--; } } else { vpc->vpd_offsets[vpc->vpd_offsets_free++] = off; Assert(keep_tuples > 0); keep_tuples--; } } } } if (vpc->vpd_offsets_free > 0) /* some tuples were moved */ { if (chain_tuple_moved) /* else - they are ordered */ { qsort((char *) (vpc->vpd_offsets), vpc->vpd_offsets_free, sizeof(OffsetNumber), vc_cmp_offno); } vc_reappage(&Nvpl, vpc); WriteBuffer(buf); } else if (dowrite) WriteBuffer(buf); else ReleaseBuffer(buf); if (offnum <= maxoff) break; /* some item(s) left */ } /* walk along relation */ blkno++; /* new number of blocks */ if (cur_buffer != InvalidBuffer) { Assert(num_moved > 0); WriteBuffer(cur_buffer); } if (num_moved > 0) { /* * We have to commit our tuple' movings before we'll truncate * relation, but we shouldn't lose our locks. And so - quick hack: * flush buffers and record status of current transaction as * committed, and continue. 
- vadim 11/13/96 */ FlushBufferPool(); TransactionIdCommit(myXID); FlushBufferPool(); } /* * Clean uncleaned reapped pages from vacuum_pages list list and set * xmin committed for inserted tuples */ checked_moved = 0; for (i = 0, vpp = vacuum_pages->vpl_pagedesc; i < vacuumed_pages; i++, vpp++) { Assert((*vpp)->vpd_blkno < blkno); buf = ReadBuffer(onerel, (*vpp)->vpd_blkno); page = BufferGetPage(buf); if ((*vpp)->vpd_offsets_used == 0) /* this page was not used */ { if (!PageIsEmpty(page)) vc_vacpage(page, *vpp); } else /* this page was used */ { num_tuples = 0; max_offset = PageGetMaxOffsetNumber(page); for (newoff = FirstOffsetNumber; newoff <= max_offset; newoff = OffsetNumberNext(newoff)) { itemid = PageGetItemId(page, newoff); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin (2)"); if (tuple.t_data->t_infomask & HEAP_MOVED_IN) { tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED; num_tuples++; } else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) tuple.t_data->t_infomask |= HEAP_XMIN_INVALID; else elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected"); } } Assert((*vpp)->vpd_offsets_used == num_tuples); checked_moved += num_tuples; } WriteBuffer(buf); } Assert(num_moved == checked_moved); elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s", RelationGetRelationName(onerel), nblocks, blkno, num_moved, vc_show_rusage(&ru0)); if (Nvpl.vpl_num_pages > 0) { /* vacuum indices again if needed */ if (Irel != (Relation *) NULL) { VPageDescr *vpleft, *vpright, vpsave; /* re-sort Nvpl.vpl_pagedesc */ for (vpleft = Nvpl.vpl_pagedesc, vpright = Nvpl.vpl_pagedesc + Nvpl.vpl_num_pages - 1; vpleft < vpright; vpleft++, vpright--) { vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave; } Assert(keep_tuples >= 0); for (i = 0; i < nindices; i++) vc_vaconeind(&Nvpl, Irel[i], vacrelstats->num_tuples, keep_tuples); } /* clean moved tuples from last page in Nvpl list */ if (vpc->vpd_blkno == blkno - 1 && vpc->vpd_offsets_free > 0) { buf = ReadBuffer(onerel, vpc->vpd_blkno); page = BufferGetPage(buf); num_tuples = 0; for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itemid = PageGetItemId(page, offnum); if (!ItemIdIsUsed(itemid)) continue; tuple.t_datamcxt = NULL; tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) { if ((TransactionId) tuple.t_data->t_cmin != myXID) elog(ERROR, "Invalid XID in t_cmin (3)"); if (tuple.t_data->t_infomask & HEAP_MOVED_OFF) { itemid->lp_flags &= ~LP_USED; num_tuples++; } else elog(ERROR, "HEAP_MOVED_OFF was expected (2)"); } } Assert(vpc->vpd_offsets_free == num_tuples); PageRepairFragmentation(page); WriteBuffer(buf); } /* now - free new list of reapped pages */ vpp = Nvpl.vpl_pagedesc; for (i = 0; i < Nvpl.vpl_num_pages; i++, vpp++) pfree(*vpp); pfree(Nvpl.vpl_pagedesc); } /* truncate relation */ if (blkno < nblocks) { i = FlushRelationBuffers(onerel, blkno, false); if (i < 0) elog(FATAL, "VACUUM (vc_rpfheap): FlushRelationBuffers returned %d", i); blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno); Assert(blkno >= 0); vacrelstats->num_pages = blkno; /* set new number of blocks */ } if (Irel != (Relation *) NULL) /* pfree index' allocations */ { pfree(Idesc); pfree(idatum); pfree(inulls); vc_clsindices(nindices, Irel); } pfree(vpc); if 
(vacrelstats->vtlinks != NULL) pfree(vacrelstats->vtlinks); } /* vc_rpfheap */ /* * vc_vacheap() -- free dead tuples * * This routine marks dead tuples as unused and truncates relation * if there are "empty" end-blocks. */ static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages) { Buffer buf; Page page; VPageDescr *vpp; int nblocks; int i; nblocks = vacuum_pages->vpl_num_pages; nblocks -= vacuum_pages->vpl_empty_end_pages; /* nothing to do with * them */ for (i = 0, vpp = vacuum_pages->vpl_pagedesc; i < nblocks; i++, vpp++) { if ((*vpp)->vpd_offsets_free > 0) { buf = ReadBuffer(onerel, (*vpp)->vpd_blkno); page = BufferGetPage(buf); vc_vacpage(page, *vpp); WriteBuffer(buf); } } /* truncate relation if there are some empty end-pages */ if (vacuum_pages->vpl_empty_end_pages > 0) { Assert(vacrelstats->num_pages >= vacuum_pages->vpl_empty_end_pages); nblocks = vacrelstats->num_pages - vacuum_pages->vpl_empty_end_pages; elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.", RelationGetRelationName(onerel), vacrelstats->num_pages, nblocks); /* * We have to flush "empty" end-pages (if changed, but who knows it) * before truncation */ FlushBufferPool(); i = FlushRelationBuffers(onerel, nblocks, false); if (i < 0) elog(FATAL, "VACUUM (vc_vacheap): FlushRelationBuffers returned %d", i); nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks); Assert(nblocks >= 0); vacrelstats->num_pages = nblocks; /* set new number of * blocks */ } } /* vc_vacheap */ /* * vc_vacpage() -- free dead tuples on a page * and repair its fragmentation. */ static void vc_vacpage(Page page, VPageDescr vpd) { ItemId itemid; int i; /* There shouldn't be any tuples moved onto the page yet! */ Assert(vpd->vpd_offsets_used == 0); for (i = 0; i < vpd->vpd_offsets_free; i++) { itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_offsets[i] - 1]); itemid->lp_flags &= ~LP_USED; } PageRepairFragmentation(page); } /* vc_vacpage */ /* * _vc_scanoneind() -- scan one index relation to update statistic. * */ static void vc_scanoneind(Relation indrel, int num_tuples) { RetrieveIndexResult res; IndexScanDesc iscan; int nitups; int nipages; struct rusage ru0; getrusage(RUSAGE_SELF, &ru0); /* walk through the entire index */ iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL); nitups = 0; while ((res = index_getnext(iscan, ForwardScanDirection)) != (RetrieveIndexResult) NULL) { nitups++; pfree(res); } index_endscan(iscan); /* now update statistics in pg_class */ nipages = RelationGetNumberOfBlocks(indrel); vc_updstats(RelationGetRelid(indrel), nipages, nitups, false, NULL); elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u. %s", RelationGetRelationName(indrel), nipages, nitups, vc_show_rusage(&ru0)); if (nitups != num_tuples) elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u).\ \n\tRecreate the index.", RelationGetRelationName(indrel), nitups, num_tuples); } /* vc_scanoneind */ /* * vc_vaconeind() -- vacuum one index relation. * * Vpl is the VPageList of the heap we're currently vacuuming. * It's locked. Indrel is an index relation on the vacuumed heap. * We don't set locks on the index relation here, since the indexed * access methods support locking at different granularities. * We let them handle it. * * Finally, we arrange to update the index relation's statistics in * pg_class. 
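 *
 * In outline (a simplified sketch of the loop below):
 *
 *		for each entry found by a full scan of the index:
 *			if vc_tidreapped(heap TID, vpl) -> index_delete() the entry
 *			else count it as a live index tuple
 *		then update the index's pg_class row and cross-check the live
 *		count against num_tuples + keep_tuples.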
 */
static void
vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples, int keep_tuples)
{
	RetrieveIndexResult res;
	IndexScanDesc iscan;
	ItemPointer heapptr;
	int			tups_vacuumed;
	int			num_index_tuples;
	int			num_pages;
	VPageDescr	vp;
	struct rusage ru0;

	getrusage(RUSAGE_SELF, &ru0);

	/* walk through the entire index */
	iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
	tups_vacuumed = 0;
	num_index_tuples = 0;

	while ((res = index_getnext(iscan, ForwardScanDirection))
		   != (RetrieveIndexResult) NULL)
	{
		heapptr = &res->heap_iptr;

		if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
		{
#ifdef NOT_USED
			elog(DEBUG, "<%x,%x> -> <%x,%x>",
				 ItemPointerGetBlockNumber(&(res->index_iptr)),
				 ItemPointerGetOffsetNumber(&(res->index_iptr)),
				 ItemPointerGetBlockNumber(&(res->heap_iptr)),
				 ItemPointerGetOffsetNumber(&(res->heap_iptr)));
#endif
			if (vp->vpd_offsets_free == 0)
			{					/* this is EmptyPage !!! */
				elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
					 RelationGetRelationName(indrel),
					 vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
			}
			++tups_vacuumed;
			index_delete(indrel, &res->index_iptr);
		}
		else
			num_index_tuples++;

		pfree(res);
	}

	index_endscan(iscan);

	/* now update statistics in pg_class */
	num_pages = RelationGetNumberOfBlocks(indrel);
	vc_updstats(RelationGetRelid(indrel), num_pages, num_index_tuples, false, NULL);

	elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u: Deleted %u. %s",
		 RelationGetRelationName(indrel), num_pages,
		 num_index_tuples - keep_tuples, tups_vacuumed,
		 vc_show_rusage(&ru0));

	if (num_index_tuples != num_tuples + keep_tuples)
		elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u).\
\n\tRecreate the index.",
			 RelationGetRelationName(indrel), num_index_tuples, num_tuples);

}	/* vc_vaconeind */

/*
 *	vc_tidreapped() -- is a particular tid reapped?
 *
 *		vpl->VPageDescr_array is sorted in the right order.
 */
static VPageDescr
vc_tidreapped(ItemPointer itemptr, VPageList vpl)
{
	OffsetNumber ioffno;
	OffsetNumber *voff;
	VPageDescr	vp,
			   *vpp;
	VPageDescrData vpd;

	vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
	ioffno = ItemPointerGetOffsetNumber(itemptr);

	vp = &vpd;
	vpp = (VPageDescr *) vc_find_eq((void *) (vpl->vpl_pagedesc),
					vpl->vpl_num_pages, sizeof(VPageDescr), (void *) &vp,
									vc_cmp_blk);

	if (vpp == (VPageDescr *) NULL)
		return (VPageDescr) NULL;
	vp = *vpp;

	/* ok - we are on true page */

	if (vp->vpd_offsets_free == 0)
	{							/* this is EmptyPage !!! */
		return vp;
	}

	voff = (OffsetNumber *) vc_find_eq((void *) (vp->vpd_offsets),
			vp->vpd_offsets_free, sizeof(OffsetNumber), (void *) &ioffno,
									   vc_cmp_offno);

	if (voff == (OffsetNumber *) NULL)
		return (VPageDescr) NULL;

	return vp;
}	/* vc_tidreapped */

/*
 *	vc_attrstats() -- compute column statistics used by the optimizer
 *
 *	We compute the column min, max, null and non-null counts.
 *	Plus we attempt to find the count of the value that occurs most
 *	frequently in each column.  These figures are used to compute
 *	the selectivity of the column.
 *
 *	We use a three-bucket cache to get the most frequent item.
 *	The 'guess' buckets count hits.  A cache miss causes guess1
 *	to get the most hit 'guess' item in the most recent cycle, and
 *	the new item goes into guess2.  Whenever the total count of hits
 *	of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
 *
 *	This method works perfectly for columns with unique values, and columns
 *	with only two unique values, plus nulls.
 *
 *	It becomes less perfect as the number of unique values increases and
 *	their distribution in the table becomes more random.
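 *
 *	Worked example (with hypothetical column values a, b, b): 'a'
 *	initializes all three buckets; the first 'b' misses everywhere and
 *	replaces guess2; the second 'b' hits guess2, whose hit count now
 *	beats guess1's, so they swap, and since guess1_cnt then exceeds
 *	best_cnt, 'b' is promoted to 'best'.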
 *
 */
static void
vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple tuple)
{
	int			i,
				attr_cnt = vacrelstats->va_natts;
	VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
	TupleDesc	tupDesc = onerel->rd_att;
	Datum		value;
	bool		isnull;

	for (i = 0; i < attr_cnt; i++)
	{
		VacAttrStats *stats = &vacattrstats[i];
		bool		value_hit = true;

		value = heap_getattr(tuple,
							 stats->attr->attnum, tupDesc, &isnull);

		if (!VacAttrStatsEqValid(stats))
			continue;

		if (isnull)
			stats->null_cnt++;
		else
		{
			stats->nonnull_cnt++;
			if (stats->initialized == false)
			{
				vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
				/* best_cnt gets incremented later */
				vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
				stats->guess1_cnt = stats->guess1_hits = 1;
				vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
				stats->guess2_hits = 1;
				if (VacAttrStatsLtGtValid(stats))
				{
					vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
					vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
				}
				stats->initialized = true;
			}
			if (VacAttrStatsLtGtValid(stats))
			{
				if ((*fmgr_faddr(&stats->f_cmplt)) (value, stats->min))
				{
					vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
					stats->min_cnt = 0;
				}
				if ((*fmgr_faddr(&stats->f_cmpgt)) (value, stats->max))
				{
					vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
					stats->max_cnt = 0;
				}
				if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->min))
					stats->min_cnt++;
				else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->max))
					stats->max_cnt++;
			}
			if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->best))
				stats->best_cnt++;
			else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess1))
			{
				stats->guess1_cnt++;
				stats->guess1_hits++;
			}
			else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess2))
				stats->guess2_hits++;
			else
				value_hit = false;

			if (stats->guess2_hits > stats->guess1_hits)
			{
				swapDatum(stats->guess1, stats->guess2);
				swapInt(stats->guess1_len, stats->guess2_len);
				swapLong(stats->guess1_hits, stats->guess2_hits);
				stats->guess1_cnt = stats->guess1_hits;
			}
			if (stats->guess1_cnt > stats->best_cnt)
			{
				swapDatum(stats->best, stats->guess1);
				swapInt(stats->best_len, stats->guess1_len);
				swapLong(stats->best_cnt, stats->guess1_cnt);
				stats->guess1_hits = 1;
				stats->guess2_hits = 1;
			}
			if (!value_hit)
			{
				vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
				stats->guess1_hits = 1;
				stats->guess2_hits = 1;
			}
		}
	}
	return;
}

/*
 *	vc_bucketcpy() -- copy one attribute value into a statistics bucket,
 *		palloc'ing a larger bucket first if the value doesn't fit
 */
static void
vc_bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int *bucket_len)
{
	if (attr->attbyval && attr->attlen != -1)
		*bucket = value;
	else
	{
		int			len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));

		if (len > *bucket_len)
		{
			if (*bucket_len != 0)
				pfree(DatumGetPointer(*bucket));
			*bucket = PointerGetDatum(palloc(len));
			*bucket_len = len;
		}
		memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
	}
}

/*
 *	vc_updstats() -- update statistics for one relation
 *
 *	Statistics are stored in several places: the pg_class row for the
 *	relation has stats about the whole relation, the pg_attribute rows
 *	for each attribute store "disbursion", and there is a pg_statistic
 *	row for each (non-system) attribute.  (Disbursion probably ought to
 *	be moved to pg_statistic, but it's not worth doing unless there's
 *	another reason to have to change pg_attribute.)  Disbursion and
 *	pg_statistic values are only updated by VACUUM ANALYZE, but we
 *	always update the stats in pg_class.
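 *
 *	In short:
 *		pg_class.reltuples/relpages/relhasindex		-- every VACUUM
 *		pg_attribute.attdisbursion					-- VACUUM ANALYZE only
 *		pg_statistic rows (per analyzed column)		-- VACUUM ANALYZE only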
/*
 *  vc_updstats() -- update statistics for one relation
 *
 *  Statistics are stored in several places: the pg_class row for the
 *  relation has stats about the whole relation, the pg_attribute rows
 *  for each attribute store "disbursion", and there is a pg_statistic
 *  row for each (non-system) attribute.  (Disbursion probably ought to
 *  be moved to pg_statistic, but it's not worth doing unless there's
 *  another reason to have to change pg_attribute.)  Disbursion and
 *  pg_statistic values are only updated by VACUUM ANALYZE, but we
 *  always update the stats in pg_class.
 *
 *  This routine works for both index and heap relation entries in
 *  pg_class.  We violate no-overwrite semantics here by storing new
 *  values for the statistics columns directly into the pg_class
 *  tuple that's already on the page.  The reason for this is that if
 *  we updated these tuples in the usual way, vacuuming pg_class itself
 *  wouldn't work very well --- by the time we got done with a vacuum
 *  cycle, most of the tuples in pg_class would've been obsoleted.
 *  Updating pg_class's own statistics would be especially tricky.
 *  Of course, this only works for fixed-size never-null columns, but
 *  these columns are exactly that.
 *
 *  Updates of pg_attribute statistics are handled in the same way
 *  for the same reasons.
 *
 *  To keep things simple, we punt for pg_statistic, and don't try
 *  to compute or store rows for pg_statistic itself in pg_statistic.
 *  This could possibly be made to work, but it's not worth the trouble.
 */
static void
vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex,
            VRelStats *vacrelstats)
{
    Relation    rd,
                ad,
                sd;
    HeapScanDesc scan;
    HeapTupleData rtup;
    HeapTuple   ctup,
                atup,
                stup;
    Form_pg_class pgcform;
    ScanKeyData askey;
    Form_pg_attribute attp;
    Buffer      buffer;

    /*
     * update number of tuples and number of pages in pg_class
     */
    rd = heap_openr(RelationRelationName, RowExclusiveLock);

    ctup = SearchSysCacheTupleCopy(RELOID,
                                   ObjectIdGetDatum(relid),
                                   0, 0, 0);
    if (!HeapTupleIsValid(ctup))
        elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
             relid);

    /* get the buffer cache tuple */
    rtup.t_self = ctup->t_self;
    heap_fetch(rd, SnapshotNow, &rtup, &buffer);
    heap_freetuple(ctup);

    /* overwrite the existing statistics in the tuple */
    pgcform = (Form_pg_class) GETSTRUCT(&rtup);
    pgcform->reltuples = num_tuples;
    pgcform->relpages = num_pages;
    pgcform->relhasindex = hasindex;

    /* invalidate the tuple in the cache and write the buffer */
    RelationInvalidateHeapTuple(rd, &rtup);
    WriteBuffer(buffer);

    heap_close(rd, RowExclusiveLock);

    if (vacrelstats != NULL && vacrelstats->va_natts > 0)
    {
        VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
        int         natts = vacrelstats->va_natts;

        ad = heap_openr(AttributeRelationName, RowExclusiveLock);
        sd = heap_openr(StatisticRelationName, RowExclusiveLock);

        /* Find pg_attribute rows for this relation */
        ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
                               F_OIDEQ, ObjectIdGetDatum(relid));

        scan = heap_beginscan(ad, false, SnapshotNow, 1, &askey);

        while (HeapTupleIsValid(atup = heap_getnext(scan, 0)))
        {
            int         i;
            VacAttrStats *stats;

            attp = (Form_pg_attribute) GETSTRUCT(atup);
            if (attp->attnum <= 0)      /* skip system attributes for now */
                continue;

            for (i = 0; i < natts; i++)
            {
                if (attp->attnum == vacattrstats[i].attr->attnum)
                    break;
            }
            if (i >= natts)
                continue;       /* skip attr if no stats collected */
            stats = &(vacattrstats[i]);

            if (VacAttrStatsEqValid(stats))
            {
                float32data selratio;   /* average ratio of rows selected
                                         * for a random constant */

                /* Compute disbursion */
                if (stats->nonnull_cnt == 0 && stats->null_cnt == 0)
                {
                    /* empty relation, so put a dummy value in attdisbursion */
                    selratio = 0;
                }
                else if (stats->null_cnt <= 1 && stats->best_cnt == 1)
                {
                    /*
                     * looks like we have a unique-key attribute --- flag
                     * this with the special -1.0 flag value.
                     *
                     * The correct disbursion is 1.0/numberOfRows, but since
                     * the relation row count can get updated without
                     * recomputing disbursion, we want to store a "symbolic"
                     * value and figure 1.0/numberOfRows on the fly.
                     */
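                    /*
                     * For example, with reltuples = 10000 the stored -1.0
                     * works out to a disbursion of 1.0/10000 = 0.0001 at
                     * the time it is used.
                     */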
                    selratio = -1;
                }
                else
                {
                    if (VacAttrStatsLtGtValid(stats) &&
                        stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
                    {
                        /* exact result when there are just 1 or 2 values... */
                        double      min_cnt_d = stats->min_cnt,
                                    max_cnt_d = stats->max_cnt,
                                    null_cnt_d = stats->null_cnt;
                        double      total = ((double) stats->nonnull_cnt) + null_cnt_d;

                        selratio = (min_cnt_d * min_cnt_d +
                                    max_cnt_d * max_cnt_d +
                                    null_cnt_d * null_cnt_d) / (total * total);
                    }
                    else
                    {
                        double      most = (double) (stats->best_cnt > stats->null_cnt ?
                                                 stats->best_cnt : stats->null_cnt);
                        double      total = ((double) stats->nonnull_cnt) +
                                            ((double) stats->null_cnt);

                        /*
                         * we assume the counts of all values other than the
                         * most common one average 20% of the best count
                         */
                        selratio = (most * most + 0.20 * most * (total - most)) /
                                   (total * total);
                    }

                    /* Make sure calculated values are in-range */
                    if (selratio < 0.0)
                        selratio = 0.0;
                    else if (selratio > 1.0)
                        selratio = 1.0;
                }

                /* overwrite the existing statistics in the tuple */
                attp->attdisbursion = selratio;

                /* invalidate the tuple in the cache and write the buffer */
                RelationInvalidateHeapTuple(ad, atup);
                WriteNoReleaseBuffer(scan->rs_cbuf);

                /*
                 * Create pg_statistic tuples for the relation, if we have
                 * gathered the right data.  vc_delstats() previously
                 * deleted all the pg_statistic tuples for the rel, so we
                 * just have to insert new ones here.
                 *
                 * Note vc_vacone() has seen to it that we won't come here
                 * when vacuuming pg_statistic itself.
                 */
                if (VacAttrStatsLtGtValid(stats) && stats->initialized)
                {
                    float32data nullratio;
                    float32data bestratio;
                    FmgrInfo    out_function;
                    char       *out_string;
                    double      best_cnt_d = stats->best_cnt,
                                null_cnt_d = stats->null_cnt,
                                nonnull_cnt_d = stats->nonnull_cnt; /* doubles prevent
                                                                     * integer overflow */
                    Datum       values[Natts_pg_statistic];
                    char        nulls[Natts_pg_statistic];

                    nullratio = null_cnt_d / (nonnull_cnt_d + null_cnt_d);
                    bestratio = best_cnt_d / (nonnull_cnt_d + null_cnt_d);

                    fmgr_info(stats->outfunc, &out_function);

                    for (i = 0; i < Natts_pg_statistic; ++i)
                        nulls[i] = ' ';

                    /* ----------------
                     *  initialize values[]
                     * ----------------
                     */
                    i = 0;
                    values[i++] = (Datum) relid;        /* starelid */
                    values[i++] = (Datum) attp->attnum; /* staattnum */
                    values[i++] = (Datum) stats->op_cmplt;      /* staop */
                    /* hack: this code knows float4 is pass-by-ref */
                    values[i++] = PointerGetDatum(&nullratio);  /* stanullfrac */
                    values[i++] = PointerGetDatum(&bestratio);  /* stacommonfrac */
                    out_string = (char *) (*fmgr_faddr(&out_function))
                        (stats->best, stats->typelem, stats->attr->atttypmod);
                    values[i++] = PointerGetDatum(textin(out_string));  /* stacommonval */
                    pfree(out_string);
                    out_string = (char *) (*fmgr_faddr(&out_function))
                        (stats->min, stats->typelem, stats->attr->atttypmod);
                    values[i++] = PointerGetDatum(textin(out_string));  /* staloval */
                    pfree(out_string);
                    out_string = (char *) (*fmgr_faddr(&out_function))
                        (stats->max, stats->typelem, stats->attr->atttypmod);
                    values[i++] = PointerGetDatum(textin(out_string));  /* stahival */
                    pfree(out_string);

                    stup = heap_formtuple(sd->rd_att, values, nulls);

                    /* ----------------
                     *  Watch out for oversize tuple, which can happen if
                     *  all three of the saved data values are long.
                     *  Our fallback strategy is just to not store the
                     *  pg_statistic tuple at all in that case.  (We could
                     *  replace the values by NULLs and still store the
                     *  numeric stats, but presently selfuncs.c couldn't
                     *  do anything useful with that case anyway.)
                     *
                     *  We could reduce the probability of overflow, but not
                     *  prevent it, by storing the data values as compressed
                     *  text; is that worth doing?  The problem should go
                     *  away whenever long tuples get implemented...
                     * ----------------
                     */
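                    /*
                     * E.g. with the default 8K block size, three saved
                     * text values of about 3KB apiece would already
                     * exceed MaxTupleSize, and the stats row is skipped.
                     */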
                    if (MAXALIGN(stup->t_len) <= MaxTupleSize)
                    {
                        /* OK, store tuple and update indexes too */
                        Relation    irelations[Num_pg_statistic_indices];

                        heap_insert(sd, stup);
                        CatalogOpenIndices(Num_pg_statistic_indices,
                                           Name_pg_statistic_indices,
                                           irelations);
                        CatalogIndexInsert(irelations,
                                           Num_pg_statistic_indices,
                                           sd, stup);
                        CatalogCloseIndices(Num_pg_statistic_indices,
                                            irelations);
                    }

                    /* release allocated space */
                    pfree(DatumGetPointer(values[Anum_pg_statistic_stacommonval - 1]));
                    pfree(DatumGetPointer(values[Anum_pg_statistic_staloval - 1]));
                    pfree(DatumGetPointer(values[Anum_pg_statistic_stahival - 1]));
                    heap_freetuple(stup);
                }
            }
        }
        heap_endscan(scan);
        /* close rels, but hold locks till upcoming commit */
        heap_close(ad, NoLock);
        heap_close(sd, NoLock);
    }
}

/*
 *  vc_delstats() -- delete pg_statistic rows for a relation
 *
 *  If a list of attribute numbers is given, only zap stats for those attrs.
 */
static void
vc_delstats(Oid relid, int attcnt, int *attnums)
{
    Relation    pgstatistic;
    HeapScanDesc scan;
    HeapTuple   tuple;
    ScanKeyData key;

    pgstatistic = heap_openr(StatisticRelationName, RowExclusiveLock);

    ScanKeyEntryInitialize(&key, 0x0, Anum_pg_statistic_starelid,
                           F_OIDEQ, ObjectIdGetDatum(relid));
    scan = heap_beginscan(pgstatistic, false, SnapshotNow, 1, &key);

    while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
    {
        if (attcnt > 0)
        {
            Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(tuple);
            int         i;

            for (i = 0; i < attcnt; i++)
            {
                /* attnums[] holds zero-based column indexes */
                if (pgs->staattnum == attnums[i] + 1)
                    break;
            }
            if (i >= attcnt)
                continue;       /* don't delete it */
        }
        heap_delete(pgstatistic, &tuple->t_self, NULL);
    }

    heap_endscan(scan);

    /*
     * Close rel, but *keep* lock; we will need to reacquire it later,
     * so there's a possibility of deadlock against another VACUUM process
     * if we let go now.  Keeping the lock shouldn't delay any common
     * operation other than an attempted VACUUM of pg_statistic itself.
     */
    heap_close(pgstatistic, NoLock);
}
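/*
 * Hypothetical usage sketch (compiled out) for vc_delstats(): the
 * attnums[] argument carries zero-based column indexes, which the
 * function converts to one-based staattnum values internally.
 */
#ifdef NOT_USED
static void
vc_delstats_example(Oid relid)
{
    int         attnums[2];

    attnums[0] = 0;             /* first user column */
    attnums[1] = 2;             /* third user column */
    vc_delstats(relid, 2, attnums);
}
#endif   /* NOT_USED */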
/*
 *  vc_reappage() -- save a page on the array of reapped pages.
 *
 *  As a side effect of the way that the vacuuming loop for a given
 *  relation works, higher pages come after lower pages in the array
 *  (and the highest tid on a page is last).
 */
static void
vc_reappage(VPageList vpl, VPageDescr vpc)
{
    VPageDescr  newvpd;

    /* allocate a VPageDescrData entry */
    newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) +
                                 vpc->vpd_offsets_free * sizeof(OffsetNumber));

    /* fill it in */
    if (vpc->vpd_offsets_free > 0)
        memmove(newvpd->vpd_offsets, vpc->vpd_offsets,
                vpc->vpd_offsets_free * sizeof(OffsetNumber));
    newvpd->vpd_blkno = vpc->vpd_blkno;
    newvpd->vpd_free = vpc->vpd_free;
    newvpd->vpd_offsets_used = vpc->vpd_offsets_used;
    newvpd->vpd_offsets_free = vpc->vpd_offsets_free;

    /* insert this page into vpl list */
    vc_vpinsert(vpl, newvpd);

}   /* vc_reappage */

static void
vc_vpinsert(VPageList vpl, VPageDescr vpnew)
{
#define PG_NPAGEDESC 1024

    /* allocate a VPageDescr array if needed, doubling it when it fills */
    if (vpl->vpl_num_pages == 0)
    {
        vpl->vpl_pagedesc = (VPageDescr *) palloc(PG_NPAGEDESC * sizeof(VPageDescr));
        vpl->vpl_num_allocated_pages = PG_NPAGEDESC;
    }
    else if (vpl->vpl_num_pages >= vpl->vpl_num_allocated_pages)
    {
        vpl->vpl_num_allocated_pages *= 2;
        vpl->vpl_pagedesc = (VPageDescr *)
            repalloc(vpl->vpl_pagedesc,
                     vpl->vpl_num_allocated_pages * sizeof(VPageDescr));
    }
    vpl->vpl_pagedesc[vpl->vpl_num_pages] = vpnew;
    (vpl->vpl_num_pages)++;
}

/*
 *  vc_find_eq() -- binary-search a sorted array for an element equal
 *  to *elm; returns a pointer to the match, or NULL if there is none.
 */
static void *
vc_find_eq(void *bot, int nelem, int size, void *elm,
           int (*compar) (const void *, const void *))
{
    int         res;
    int         last = nelem - 1;
    int         celm = nelem / 2;
    bool        last_move,
                first_move;

    last_move = first_move = true;
    for (;;)
    {
        if (first_move == true)
        {
            res = compar(bot, elm);
            if (res > 0)
                return NULL;    /* elm is below the current range */
            if (res == 0)
                return bot;
            first_move = false;
        }
        if (last_move == true)
        {
            res = compar(elm, (void *) ((char *) bot + last * size));
            if (res > 0)
                return NULL;    /* elm is above the current range */
            if (res == 0)
                return (void *) ((char *) bot + last * size);
            last_move = false;
        }
        res = compar(elm, (void *) ((char *) bot + celm * size));
        if (res == 0)
            return (void *) ((char *) bot + celm * size);
        if (res < 0)
        {
            if (celm == 0)
                return NULL;
            last = celm - 1;
            celm = celm / 2;
            last_move = true;
            continue;
        }

        if (celm == last)
            return NULL;

        last = last - celm - 1;
        bot = (void *) ((char *) bot + (celm + 1) * size);
        celm = (last + 1) / 2;
        first_move = true;
    }

}   /* vc_find_eq */

static int
vc_cmp_blk(const void *left, const void *right)
{
    BlockNumber lblk,
                rblk;

    lblk = (*((VPageDescr *) left))->vpd_blkno;
    rblk = (*((VPageDescr *) right))->vpd_blkno;

    if (lblk < rblk)
        return -1;
    if (lblk == rblk)
        return 0;
    return 1;

}   /* vc_cmp_blk */

static int
vc_cmp_offno(const void *left, const void *right)
{
    if (*(OffsetNumber *) left < *(OffsetNumber *) right)
        return -1;
    if (*(OffsetNumber *) left == *(OffsetNumber *) right)
        return 0;
    return 1;

}   /* vc_cmp_offno */

static int
vc_cmp_vtlinks(const void *left, const void *right)
{
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
        ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
        return -1;
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
        ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
        return 1;
    /* bi_hi-es are equal */
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
        ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
        return -1;
    if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
        ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
        return 1;
    /* bi_lo-es are equal */
    if (((VTupleLink) left)->new_tid.ip_posid <
        ((VTupleLink) right)->new_tid.ip_posid)
        return -1;
    if (((VTupleLink) left)->new_tid.ip_posid >
        ((VTupleLink) right)->new_tid.ip_posid)
        return 1;
    return 0;
}
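/*
 * Illustrative sketch (compiled out) of how vc_tidreapped() above uses
 * vc_find_eq() together with these comparators; the array contents here
 * are made up.
 */
#ifdef NOT_USED
static void
vc_find_eq_example(void)
{
    OffsetNumber offsets[4] = {1, 3, 7, 9};     /* must be sorted */
    OffsetNumber target = 7;
    OffsetNumber *found;

    found = (OffsetNumber *) vc_find_eq((void *) offsets, 4,
                                        sizeof(OffsetNumber),
                                        (void *) &target, vc_cmp_offno);
    /* found now points at offsets[2]; a missing key would yield NULL */
}
#endif   /* NOT_USED */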
static void
vc_getindices(Oid relid, int *nindices, Relation **Irel)
{
    Relation    pgindex;
    Relation    irel;
    TupleDesc   tupdesc;
    HeapTuple   tuple;
    HeapScanDesc scan;
    Datum       d;
    int         i,
                k;
    bool        n;
    ScanKeyData key;
    Oid        *ioid;

    *nindices = i = 0;

    ioid = (Oid *) palloc(10 * sizeof(Oid));

    /* prepare a heap scan on the pg_index relation */
    pgindex = heap_openr(IndexRelationName, AccessShareLock);
    tupdesc = RelationGetDescr(pgindex);

    ScanKeyEntryInitialize(&key, 0x0, Anum_pg_index_indrelid,
                           F_OIDEQ, ObjectIdGetDatum(relid));

    scan = heap_beginscan(pgindex, false, SnapshotNow, 1, &key);

    while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
    {
        d = heap_getattr(tuple, Anum_pg_index_indexrelid, tupdesc, &n);
        i++;
        if (i % 10 == 0)
            ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
        ioid[i - 1] = DatumGetObjectId(d);
    }

    heap_endscan(scan);
    heap_close(pgindex, AccessShareLock);

    if (i == 0)
    {
        /* no indexes found */
        pfree(ioid);
        return;
    }

    if (Irel != (Relation **) NULL)
        *Irel = (Relation *) palloc(i * sizeof(Relation));

    for (k = 0; i > 0;)
    {
        irel = index_open(ioid[--i]);
        if (irel != (Relation) NULL)
        {
            if (Irel != (Relation **) NULL)
                (*Irel)[k] = irel;
            else
                index_close(irel);
            k++;
        }
        else
            elog(NOTICE, "CAN'T OPEN INDEX %u - SKIP IT", ioid[i]);
    }
    *nindices = k;
    pfree(ioid);

    if (Irel != (Relation **) NULL && *nindices == 0)
    {
        pfree(*Irel);
        *Irel = (Relation *) NULL;
    }

}   /* vc_getindices */

static void
vc_clsindices(int nindices, Relation *Irel)
{
    if (Irel == (Relation *) NULL)
        return;

    while (nindices--)
        index_close(Irel[nindices]);
    pfree(Irel);

}   /* vc_clsindices */

static void
vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
{
    IndDesc    *idcur;
    HeapTuple   cachetuple;
    AttrNumber *attnumP;
    int         natts;
    int         i;

    *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));

    for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
    {
        cachetuple = SearchSysCacheTupleCopy(INDEXRELID,
                          ObjectIdGetDatum(RelationGetRelid(Irel[i])),
                                             0, 0, 0);
        Assert(cachetuple);

        /*
         * we never free the copy we make, because Idesc needs it for
         * later
         */
        idcur->tform = (Form_pg_index) GETSTRUCT(cachetuple);
        for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
             natts < INDEX_MAX_KEYS && *attnumP != InvalidAttrNumber;
             attnumP++, natts++)
            ;                   /* count the index keys */
        if (idcur->tform->indproc != InvalidOid)
        {
            idcur->finfoP = &(idcur->finfo);
            FIgetnArgs(idcur->finfoP) = natts;
            natts = 1;          /* a functional index yields one value */
            FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
            *(FIgetname(idcur->finfoP)) = '\0';
        }
        else
            idcur->finfoP = (FuncIndexInfo *) NULL;

        idcur->natts = natts;
    }

}   /* vc_mkindesc */

static bool
vc_enough_space(VPageDescr vpd, Size len)
{
    len = MAXALIGN(len);

    if (len > vpd->vpd_free)
        return false;

    if (vpd->vpd_offsets_used < vpd->vpd_offsets_free)  /* there are free
                                                         * itemid(s) */
        return true;            /* and len <= free_space */

    /* no free itemids, so we would have to allocate a new one */
    if (len + MAXALIGN(sizeof(ItemIdData)) <= vpd->vpd_free)
        return true;

    return false;

}   /* vc_enough_space */
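/*
 * Worked example for vc_enough_space() above: on a platform where
 * MAXALIGN rounds up to 8 bytes, a 60-byte tuple needs MAXALIGN(60) =
 * 64 bytes.  A page with vpd_free = 70 accepts it when a reapped line
 * pointer can be re-used (vpd_offsets_used < vpd_offsets_free), but is
 * rejected otherwise, since a new line pointer pushes the requirement
 * to 64 + MAXALIGN(sizeof(ItemIdData)) = 72 bytes.
 */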
/*
 *  Compute elapsed time since the ru0 usage snapshot, and format it into
 *  a displayable string.  The result is in a static string, which is
 *  tacky, but no one ever claimed that the Postgres backend is
 *  threadable...
 */
static char *
vc_show_rusage(struct rusage *ru0)
{
    static char result[64];
    struct rusage ru1;

    getrusage(RUSAGE_SELF, &ru1);

    /* borrow a second if the microsecond fields would go negative */
    if (ru1.ru_stime.tv_usec < ru0->ru_stime.tv_usec)
    {
        ru1.ru_stime.tv_sec--;
        ru1.ru_stime.tv_usec += 1000000;
    }
    if (ru1.ru_utime.tv_usec < ru0->ru_utime.tv_usec)
    {
        ru1.ru_utime.tv_sec--;
        ru1.ru_utime.tv_usec += 1000000;
    }

    snprintf(result, sizeof(result),
             "CPU %d.%02ds/%d.%02du sec.",
             (int) (ru1.ru_stime.tv_sec - ru0->ru_stime.tv_sec),
             (int) (ru1.ru_stime.tv_usec - ru0->ru_stime.tv_usec) / 10000,
             (int) (ru1.ru_utime.tv_sec - ru0->ru_utime.tv_sec),
             (int) (ru1.ru_utime.tv_usec - ru0->ru_utime.tv_usec) / 10000);

    return result;
}
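#ifdef NOT_USED
/*
 * Hypothetical sketch (compiled out) of the timing pattern used
 * throughout this file: snapshot rusage before a phase, then report
 * the elapsed CPU time afterwards.
 */
static void
vc_rusage_example(void)
{
    struct rusage ru0;

    getrusage(RUSAGE_SELF, &ru0);
    /* ... do some expensive phase of work ... */
    elog(DEBUG, "phase done. %s", vc_show_rusage(&ru0));
}
#endif   /* NOT_USED */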