From 26ac2171735f288a1380e173ffbb6af1a50183d8 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sun, 3 Mar 2002 17:47:56 +0000 Subject: [PATCH] Catcaches can now store negative entries as well as positive ones, to speed up repetitive failed searches; per pghackers discussion in late January. inval.c logic substantially simplified, since we can now treat inserts and deletes alike as far as inval events are concerned. Some repair work needed in heap_create_with_catalog, which turns out to have been doing CommandCounterIncrement at a point where the new relation has non-self-consistent catalog entries. With the new inval code, that resulted in assert failures during a relcache entry rebuild. --- src/backend/access/heap/heapam.c | 14 +- src/backend/catalog/heap.c | 200 ++++--- src/backend/catalog/index.c | 67 ++- src/backend/commands/vacuum.c | 21 +- src/backend/utils/cache/catcache.c | 806 ++++++++++++++++++----------- src/backend/utils/cache/inval.c | 238 ++++----- src/backend/utils/cache/relcache.c | 16 +- src/include/storage/sinval.h | 32 +- src/include/utils/catcache.h | 25 +- src/include/utils/inval.h | 6 +- 10 files changed, 830 insertions(+), 595 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 51f1c63d70..adcf9bc6cb 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.130 2002/03/02 21:39:17 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.131 2002/03/03 17:47:53 tgl Exp $ * * * INTERFACE ROUTINES @@ -1131,12 +1131,12 @@ heap_insert(Relation relation, HeapTuple tup) WriteBuffer(buffer); /* - * If tuple is cachable, mark it for rollback from the caches in case + * If tuple is cachable, mark it for invalidation from the caches in case * we abort. Note it is OK to do this after WriteBuffer releases the * buffer, because the "tup" data structure is all in local memory, * not in the shared buffer. */ - RelationMark4RollbackHeapTuple(relation, tup); + CacheInvalidateHeapTuple(relation, tup); return tup->t_data->t_oid; } @@ -1278,7 +1278,7 @@ l1: * look at the contents of the tuple, so we need to hold our refcount * on the buffer. */ - RelationInvalidateHeapTuple(relation, &tp); + CacheInvalidateHeapTuple(relation, &tp); WriteBuffer(buffer); @@ -1585,19 +1585,19 @@ l2: * boundary. We have to do this before WriteBuffer because we need to * look at the contents of the tuple, so we need to hold our refcount. */ - RelationInvalidateHeapTuple(relation, &oldtup); + CacheInvalidateHeapTuple(relation, &oldtup); if (newbuf != buffer) WriteBuffer(newbuf); WriteBuffer(buffer); /* - * If new tuple is cachable, mark it for rollback from the caches in + * If new tuple is cachable, mark it for invalidation from the caches in * case we abort. Note it is OK to do this after WriteBuffer releases * the buffer, because the "newtup" data structure is all in local * memory, not in the shared buffer. 
*/ - RelationMark4RollbackHeapTuple(relation, newtup); + CacheInvalidateHeapTuple(relation, newtup); return HeapTupleMayBeUpdated; } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index ad192dca50..e14abe6567 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.182 2002/02/19 20:11:11 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.183 2002/03/03 17:47:54 tgl Exp $ * * * INTERFACE ROUTINES @@ -57,6 +57,7 @@ #include "storage/smgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/relcache.h" #include "utils/syscache.h" @@ -75,10 +76,10 @@ static void RelationRemoveIndexes(Relation relation); static void RelationRemoveInheritance(Relation relation); static void AddNewRelationType(char *typeName, Oid new_rel_oid, Oid new_type_oid); -static void StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin, - bool updatePgAttribute); +static void StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin); static void StoreRelCheck(Relation rel, char *ccname, char *ccbin); -static void StoreConstraints(Relation rel); +static void StoreConstraints(Relation rel, TupleDesc tupdesc); +static void SetRelationNumChecks(Relation rel, int numchecks); static void RemoveConstraints(Relation rel); static void RemoveStatistics(Relation rel); @@ -202,9 +203,6 @@ SystemAttributeByName(const char *attname, bool relhasoids) * * Remove the system relation specific code to elsewhere eventually. * - * Eventually, must place information about this temporary relation - * into the transaction context block. - * * NOTE: if istemp is TRUE then heap_create will overwrite relname with * the unique "real" name chosen for the temp relation. * @@ -799,9 +797,16 @@ heap_create_with_catalog(char *relname, * now add tuples to pg_attribute for the attributes in our new * relation. */ - AddNewAttributeTuples(new_rel_oid, tupdesc, relhasoids); + AddNewAttributeTuples(new_rel_oid, new_rel_desc->rd_att, relhasoids); - StoreConstraints(new_rel_desc); + /* + * store constraints and defaults passed in the tupdesc, if any. + * + * NB: this may do a CommandCounterIncrement and rebuild the relcache + * entry, so the relation must be valid and self-consistent at this point. + * In particular, there are not yet constraints and defaults anywhere. + */ + StoreConstraints(new_rel_desc, tupdesc); /* * We create the disk file for this relation here @@ -922,8 +927,6 @@ RelationRemoveIndexes(Relation relation) Oid indexoid = lfirsti(indexoidscan); index_drop(indexoid); - /* advance cmd counter to make catalog changes visible */ - CommandCounterIncrement(); } freeList(indexoidlist); @@ -1377,12 +1380,9 @@ heap_drop_with_catalog(const char *relname, /* * Store a default expression for column attnum of relation rel. * The expression must be presented as a nodeToString() string. - * If updatePgAttribute is true, update the pg_attribute entry - * for the column to show that a default exists. 
*/ static void -StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin, - bool updatePgAttribute) +StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin) { Node *expr; char *adsrc; @@ -1429,9 +1429,10 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin, heap_freetuple(tuple); pfree(adsrc); - if (!updatePgAttribute) - return; /* done if pg_attribute is OK */ - + /* + * Update the pg_attribute entry for the column to show that a default + * exists. + */ attrrel = heap_openr(AttributeRelationName, RowExclusiveLock); atttup = SearchSysCacheCopy(ATTNUM, ObjectIdGetDatum(RelationGetRelid(rel)), @@ -1516,33 +1517,35 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin) * NOTE: only pre-cooked expressions will be passed this way, which is to * say expressions inherited from an existing relation. Newly parsed * expressions can be added later, by direct calls to StoreAttrDefault - * and StoreRelCheck (see AddRelationRawConstraints()). We assume that - * pg_attribute and pg_class entries for the relation were already set - * to reflect the existence of these defaults/constraints. + * and StoreRelCheck (see AddRelationRawConstraints()). */ static void -StoreConstraints(Relation rel) +StoreConstraints(Relation rel, TupleDesc tupdesc) { - TupleConstr *constr = rel->rd_att->constr; + TupleConstr *constr = tupdesc->constr; int i; if (!constr) - return; + return; /* nothing to do */ /* - * deparsing of constraint expressions will fail unless the + * Deparsing of constraint expressions will fail unless the * just-created pg_attribute tuples for this relation are made - * visible. So, bump the command counter. + * visible. So, bump the command counter. CAUTION: this will + * cause a relcache entry rebuild. */ CommandCounterIncrement(); for (i = 0; i < constr->num_defval; i++) StoreAttrDefault(rel, constr->defval[i].adnum, - constr->defval[i].adbin, false); + constr->defval[i].adbin); for (i = 0; i < constr->num_check; i++) StoreRelCheck(rel, constr->check[i].ccname, constr->check[i].ccbin); + + if (constr->num_check > 0) + SetRelationNumChecks(rel, constr->num_check); } /* @@ -1580,10 +1583,6 @@ AddRelationRawConstraints(Relation rel, RangeTblEntry *rte; int numchecks; List *listptr; - Relation relrel; - Relation relidescs[Num_pg_class_indices]; - HeapTuple reltup; - Form_pg_class relStruct; /* * Get info about existing constraints. @@ -1681,7 +1680,7 @@ AddRelationRawConstraints(Relation rel, /* * OK, store it. */ - StoreAttrDefault(rel, colDef->attnum, nodeToString(expr), true); + StoreAttrDefault(rel, colDef->attnum, nodeToString(expr)); } /* @@ -1839,9 +1838,29 @@ AddRelationRawConstraints(Relation rel, * We do this even if there was no change, in order to ensure that an * SI update message is sent out for the pg_class tuple, which will * force other backends to rebuild their relcache entries for the rel. - * (Of course, for a newly created rel there is no need for an SI - * message, but for ALTER TABLE ADD ATTRIBUTE this'd be important.) + * (This is critical if we added defaults but not constraints.) */ + SetRelationNumChecks(rel, numchecks); +} + +/* + * Update the count of constraints in the relation's pg_class tuple. + * + * Caller had better hold exclusive lock on the relation. + * + * An important side effect is that a SI update message will be sent out for + * the pg_class tuple, which will force other backends to rebuild their + * relcache entries for the rel. Also, this backend will rebuild its + * own relcache entry at the next CommandCounterIncrement. 
+ */ +static void +SetRelationNumChecks(Relation rel, int numchecks) +{ + Relation relrel; + HeapTuple reltup; + Form_pg_class relStruct; + Relation relidescs[Num_pg_class_indices]; + relrel = heap_openr(RelationRelationName, RowExclusiveLock); reltup = SearchSysCacheCopy(RELOID, ObjectIdGetDatum(RelationGetRelid(rel)), @@ -1851,22 +1870,30 @@ AddRelationRawConstraints(Relation rel, RelationGetRelid(rel)); relStruct = (Form_pg_class) GETSTRUCT(reltup); - relStruct->relchecks = numchecks; + if (relStruct->relchecks != numchecks) + { + relStruct->relchecks = numchecks; - simple_heap_update(relrel, &reltup->t_self, reltup); + simple_heap_update(relrel, &reltup->t_self, reltup); - /* keep catalog indices current */ - CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, - relidescs); - CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup); - CatalogCloseIndices(Num_pg_class_indices, relidescs); + /* keep catalog indices current */ + CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, + relidescs); + CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup); + CatalogCloseIndices(Num_pg_class_indices, relidescs); + } + else + { + /* Skip the disk update, but force relcache inval anyway */ + CacheInvalidateRelcache(RelationGetRelid(rel)); + } heap_freetuple(reltup); heap_close(relrel, RowExclusiveLock); } static void -RemoveAttrDefault(Relation rel) +RemoveAttrDefaults(Relation rel) { Relation adrel; HeapScanDesc adscan; @@ -1889,7 +1916,7 @@ RemoveAttrDefault(Relation rel) } static void -RemoveRelCheck(Relation rel) +RemoveRelChecks(Relation rel) { Relation rcrel; HeapScanDesc rcscan; @@ -1923,9 +1950,6 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh) { Oid relid; Relation rcrel; - Relation relrel; - Relation inhrel; - Relation relidescs[Num_pg_class_indices]; TupleDesc tupleDesc; TupleConstr *oldconstr; int numoldchecks; @@ -1933,8 +1957,6 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh) HeapScanDesc rcscan; ScanKeyData key[2]; HeapTuple rctup; - HeapTuple reltup; - Form_pg_class relStruct; int rel_deleted = 0; int all_deleted = 0; @@ -1960,6 +1982,7 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh) foreach(child, children) { Oid childrelid = lfirsti(child); + Relation inhrel; if (childrelid == relid) continue; @@ -1969,7 +1992,17 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh) } } - /* Grab an exclusive lock on the pg_relcheck relation */ + /* + * Get number of existing constraints. + */ + tupleDesc = RelationGetDescr(rel); + oldconstr = tupleDesc->constr; + if (oldconstr) + numoldchecks = oldconstr->num_check; + else + numoldchecks = 0; + + /* Grab an appropriate lock on the pg_relcheck relation */ rcrel = heap_openr(RelCheckRelationName, RowExclusiveLock); /* @@ -2002,60 +2035,21 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh) /* Clean up after the scan */ heap_endscan(rcscan); - - /* - * Update the count of constraints in the relation's pg_class tuple. - * We do this even if there was no change, in order to ensure that an - * SI update message is sent out for the pg_class tuple, which will - * force other backends to rebuild their relcache entries for the rel. - * (Of course, for a newly created rel there is no need for an SI - * message, but for ALTER TABLE ADD ATTRIBUTE this'd be important.) - */ - - /* - * Get number of existing constraints. 
- */ - - tupleDesc = RelationGetDescr(rel); - oldconstr = tupleDesc->constr; - if (oldconstr) - numoldchecks = oldconstr->num_check; - else - numoldchecks = 0; - - /* Calculate the new number of checks in the table, fail if negative */ - numchecks = numoldchecks - rel_deleted; - - if (numchecks < 0) - elog(ERROR, "check count became negative"); - - relrel = heap_openr(RelationRelationName, RowExclusiveLock); - reltup = SearchSysCacheCopy(RELOID, - ObjectIdGetDatum(RelationGetRelid(rel)), 0, 0, 0); - - if (!HeapTupleIsValid(reltup)) - elog(ERROR, "cache lookup of relation %u failed", - RelationGetRelid(rel)); - relStruct = (Form_pg_class) GETSTRUCT(reltup); - - relStruct->relchecks = numchecks; - - simple_heap_update(relrel, &reltup->t_self, reltup); - - /* Keep catalog indices current */ - CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, - relidescs); - CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup); - CatalogCloseIndices(Num_pg_class_indices, relidescs); - - /* Clean up after the scan */ - heap_freetuple(reltup); - heap_close(relrel, RowExclusiveLock); - - /* Close the heap relation */ heap_close(rcrel, RowExclusiveLock); - /* Return the number of tuples deleted */ + if (rel_deleted) + { + /* + * Update the count of constraints in the relation's pg_class tuple. + */ + numchecks = numoldchecks - rel_deleted; + if (numchecks < 0) + elog(ERROR, "check count became negative"); + + SetRelationNumChecks(rel, numchecks); + } + + /* Return the number of tuples deleted, including all children */ return all_deleted; } @@ -2068,10 +2062,10 @@ RemoveConstraints(Relation rel) return; if (constr->num_defval > 0) - RemoveAttrDefault(rel); + RemoveAttrDefaults(rel); if (constr->num_check > 0) - RemoveRelCheck(rel); + RemoveRelChecks(rel); } static void diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index bd6bd1de38..ce63a0fd54 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.172 2002/02/19 20:11:11 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.173 2002/03/03 17:47:54 tgl Exp $ * * * INTERFACE ROUTINES @@ -800,19 +800,6 @@ index_drop(Oid indexId) simple_heap_delete(relationRelation, &tuple->t_self); heap_freetuple(tuple); - /* - * Update the pg_class tuple for the owning relation. We are - * presently too lazy to attempt to compute the new correct value of - * relhasindex (the next VACUUM will fix it if necessary). But we - * must send out a shared-cache-inval notice on the owning relation to - * ensure other backends update their relcache lists of indexes. So, - * unconditionally do setRelhasindex(true). - * - * Possible future improvement: skip the physical tuple update and just - * send out an invalidation message. - */ - setRelhasindex(heapId, true, false, InvalidOid); - heap_close(relationRelation, RowExclusiveLock); /* @@ -858,6 +845,15 @@ index_drop(Oid indexId) smgrunlink(DEFAULT_SMGR, userIndexRelation); + /* + * We are presently too lazy to attempt to compute the new correct value + * of relhasindex (the next VACUUM will fix it if necessary). So there is + * no need to update the pg_class tuple for the owning relation. + * But we must send out a shared-cache-inval notice on the owning relation + * to ensure other backends update their relcache lists of indexes. 
+ */ + CacheInvalidateRelcache(heapId); + /* * Close rels, but keep locks */ @@ -1076,7 +1072,7 @@ LockClassinfoForUpdate(Oid relid, HeapTuple rtup, } break; } - RelationInvalidateHeapTuple(relationRelation, rtup); + CacheInvalidateHeapTuple(relationRelation, rtup); if (confirmCommitted) { HeapTupleHeader th = rtup->t_data; @@ -1137,10 +1133,8 @@ IndexesAreActive(Oid relid, bool confirmCommitted) * * NOTE: an important side-effect of this operation is that an SI invalidation * message is sent out to all backends --- including me --- causing relcache - * entries to be flushed or updated with the new hasindex data. - * Therefore, we execute the update even if relhasindex has the right value - * already. Possible future improvement: skip the disk update and just send - * an SI message in that case. + * entries to be flushed or updated with the new hasindex data. This must + * happen even if we find that no change is needed in the pg_class row. * ---------------- */ void @@ -1149,6 +1143,7 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid) Relation pg_class; HeapTuple tuple; Form_pg_class classtuple; + bool dirty = false; HeapScanDesc pg_class_scan = NULL; /* @@ -1192,13 +1187,28 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid) LockBuffer(pg_class_scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE); classtuple = (Form_pg_class) GETSTRUCT(tuple); - classtuple->relhasindex = hasindex; + + if (classtuple->relhasindex != hasindex) + { + classtuple->relhasindex = hasindex; + dirty = true; + } if (isprimary) - classtuple->relhaspkey = true; + { + if (!classtuple->relhaspkey) + { + classtuple->relhaspkey = true; + dirty = true; + } + } if (OidIsValid(reltoastidxid)) { Assert(classtuple->relkind == RELKIND_TOASTVALUE); - classtuple->reltoastidxid = reltoastidxid; + if (classtuple->reltoastidxid != reltoastidxid) + { + classtuple->reltoastidxid = reltoastidxid; + dirty = true; + } } if (pg_class_scan) @@ -1210,10 +1220,10 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid) WriteNoReleaseBuffer(pg_class_scan->rs_cbuf); /* Send out shared cache inval if necessary */ if (!IsBootstrapProcessingMode()) - RelationInvalidateHeapTuple(pg_class, tuple); + CacheInvalidateHeapTuple(pg_class, tuple); BufferSync(); } - else + else if (dirty) { simple_heap_update(pg_class, &tuple->t_self, tuple); @@ -1228,6 +1238,11 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid) CatalogCloseIndices(Num_pg_class_indices, idescs); } } + else + { + /* no need to change tuple, but force relcache rebuild anyway */ + CacheInvalidateRelcache(relid); + } if (!pg_class_scan) heap_freetuple(tuple); @@ -1280,7 +1295,7 @@ setNewRelfilenode(Relation relation) classTuple = &lockTupleData; /* Send out shared cache inval if necessary */ if (!IsBootstrapProcessingMode()) - RelationInvalidateHeapTuple(pg_class, classTuple); + CacheInvalidateHeapTuple(pg_class, classTuple); /* Update the buffer in-place */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); ((Form_pg_class) GETSTRUCT(classTuple))->relfilenode = newrelfilenode; @@ -1442,7 +1457,7 @@ UpdateStats(Oid relid, double reltuples) LockBuffer(pg_class_scan->rs_cbuf, BUFFER_LOCK_UNLOCK); WriteNoReleaseBuffer(pg_class_scan->rs_cbuf); if (!IsBootstrapProcessingMode()) - RelationInvalidateHeapTuple(pg_class, tuple); + CacheInvalidateHeapTuple(pg_class, tuple); } else { diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index a88ce7099e..afcdac7fb5 100644 --- 
a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -13,7 +13,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.215 2002/03/02 21:39:23 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.216 2002/03/03 17:47:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -527,7 +527,7 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples, * no flush will occur, but no great harm is done since there are no * noncritical state updates here.) */ - RelationInvalidateHeapTuple(rd, &rtup); + CacheInvalidateHeapTuple(rd, &rtup); /* Write the buffer */ WriteBuffer(buffer); @@ -583,7 +583,7 @@ vac_update_dbstats(Oid dbid, dbform->datfrozenxid = frozenXID; /* invalidate the tuple in the cache and write the buffer */ - RelationInvalidateHeapTuple(relation, tuple); + CacheInvalidateHeapTuple(relation, tuple); WriteNoReleaseBuffer(scan->rs_cbuf); heap_endscan(scan); @@ -1796,7 +1796,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, */ heap_copytuple_with_tuple(&tuple, &newtup); - RelationInvalidateHeapTuple(onerel, &tuple); + /* + * register invalidation of source tuple in catcaches. + */ + CacheInvalidateHeapTuple(onerel, &tuple); /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */ START_CRIT_SECTION(); @@ -1953,7 +1956,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, /* copy tuple */ heap_copytuple_with_tuple(&tuple, &newtup); - RelationInvalidateHeapTuple(onerel, &tuple); + /* + * register invalidation of source tuple in catcaches. + * + * (Note: we do not need to register the copied tuple, + * because we are not changing the tuple contents and + * so there cannot be any need to flush negative + * catcache entries.) 
+ */ + CacheInvalidateHeapTuple(onerel, &tuple); /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */ START_CRIT_SECTION(); diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c index b79d671255..e04792d71f 100644 --- a/src/backend/utils/cache/catcache.c +++ b/src/backend/utils/cache/catcache.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.89 2002/03/02 21:39:32 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.90 2002/03/03 17:47:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -91,15 +91,15 @@ static const Oid eqproc[] = { #define EQPROC(SYSTEMTYPEOID) eqproc[(SYSTEMTYPEOID)-BOOLOID] -static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct); -static Index CatalogCacheComputeHashIndex(CatCache *cache, +static uint32 CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey); -static Index CatalogCacheComputeTupleHashIndex(CatCache *cache, +static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple); -static void CatalogCacheInitializeCache(CatCache *cache); #ifdef CATCACHE_STATS static void CatCachePrintStats(void); #endif +static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct); +static void CatalogCacheInitializeCache(CatCache *cache); /* @@ -136,221 +136,17 @@ GetCCHashFunc(Oid keytype) } } -#ifdef CATCACHE_STATS - -static void -CatCachePrintStats(void) -{ - CatCache *cache; - long cc_searches = 0; - long cc_hits = 0; - long cc_newloads = 0; - - elog(LOG, "Catcache stats dump: %d/%d tuples in catcaches", - CacheHdr->ch_ntup, CacheHdr->ch_maxtup); - - for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next) - { - if (cache->cc_ntup == 0 && cache->cc_searches == 0) - continue; /* don't print unused caches */ - elog(LOG, "Catcache %s/%s: %d tup, %ld srch, %ld hits, %ld loads, %ld not found", - cache->cc_relname, - cache->cc_indname, - cache->cc_ntup, - cache->cc_searches, - cache->cc_hits, - cache->cc_newloads, - cache->cc_searches - cache->cc_hits - cache->cc_newloads); - cc_searches += cache->cc_searches; - cc_hits += cache->cc_hits; - cc_newloads += cache->cc_newloads; - } - elog(LOG, "Catcache totals: %d tup, %ld srch, %ld hits, %ld loads, %ld not found", - CacheHdr->ch_ntup, - cc_searches, - cc_hits, - cc_newloads, - cc_searches - cc_hits - cc_newloads); -} - -#endif /* CATCACHE_STATS */ - - /* - * Standard routine for creating cache context if it doesn't exist yet + * CatalogCacheComputeHashValue * - * There are a lot of places (probably far more than necessary) that check - * whether CacheMemoryContext exists yet and want to create it if not. - * We centralize knowledge of exactly how to create it here. + * Compute the hash value associated with a given set of lookup keys */ -void -CreateCacheMemoryContext(void) +static uint32 +CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey) { - /* - * Purely for paranoia, check that context doesn't exist; caller - * probably did so already. - */ - if (!CacheMemoryContext) - CacheMemoryContext = AllocSetContextCreate(TopMemoryContext, - "CacheMemoryContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); -} + uint32 hashValue = 0; - -/* - * CatalogCacheInitializeCache - * - * This function does final initialization of a catcache: obtain the tuple - * descriptor and set up the hash and equality function links. We assume - * that the relcache entry can be opened at this point! 
- */ -#ifdef CACHEDEBUG -#define CatalogCacheInitializeCache_DEBUG1 \ - elog(LOG, "CatalogCacheInitializeCache: cache @%p %s", cache, \ - cache->cc_relname) - -#define CatalogCacheInitializeCache_DEBUG2 \ -do { \ - if (cache->cc_key[i] > 0) { \ - elog(LOG, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \ - i+1, cache->cc_nkeys, cache->cc_key[i], \ - tupdesc->attrs[cache->cc_key[i] - 1]->atttypid); \ - } else { \ - elog(LOG, "CatalogCacheInitializeCache: load %d/%d w/%d", \ - i+1, cache->cc_nkeys, cache->cc_key[i]); \ - } \ -} while(0) - -#else -#define CatalogCacheInitializeCache_DEBUG1 -#define CatalogCacheInitializeCache_DEBUG2 -#endif - -static void -CatalogCacheInitializeCache(CatCache *cache) -{ - Relation relation; - MemoryContext oldcxt; - TupleDesc tupdesc; - int i; - - CatalogCacheInitializeCache_DEBUG1; - - /* - * Open the relation without locking --- we only need the tupdesc, - * which we assume will never change ... - */ - relation = heap_openr(cache->cc_relname, NoLock); - Assert(RelationIsValid(relation)); - - /* - * switch to the cache context so our allocations do not vanish at the - * end of a transaction - */ - Assert(CacheMemoryContext != NULL); - - oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - - /* - * copy the relcache's tuple descriptor to permanent cache storage - */ - tupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation)); - - /* - * get the relation's relisshared flag, too - */ - cache->cc_relisshared = RelationGetForm(relation)->relisshared; - - /* - * return to the caller's memory context and close the rel - */ - MemoryContextSwitchTo(oldcxt); - - heap_close(relation, NoLock); - - CACHE3_elog(LOG, "CatalogCacheInitializeCache: %s, %d keys", - cache->cc_relname, cache->cc_nkeys); - - /* - * initialize cache's key information - */ - for (i = 0; i < cache->cc_nkeys; ++i) - { - Oid keytype; - - CatalogCacheInitializeCache_DEBUG2; - - if (cache->cc_key[i] > 0) - keytype = tupdesc->attrs[cache->cc_key[i] - 1]->atttypid; - else - { - if (cache->cc_key[i] != ObjectIdAttributeNumber) - elog(FATAL, "CatalogCacheInit: only sys attr supported is OID"); - keytype = OIDOID; - } - - cache->cc_hashfunc[i] = GetCCHashFunc(keytype); - - /* - * If GetCCHashFunc liked the type, safe to index into eqproc[] - */ - cache->cc_skey[i].sk_procedure = EQPROC(keytype); - - /* Do function lookup */ - fmgr_info_cxt(cache->cc_skey[i].sk_procedure, - &cache->cc_skey[i].sk_func, - CacheMemoryContext); - - /* Initialize sk_attno suitably for HeapKeyTest() and heap scans */ - cache->cc_skey[i].sk_attno = cache->cc_key[i]; - - CACHE4_elog(LOG, "CatalogCacheInit %s %d %p", - cache->cc_relname, - i, - cache); - } - - /* - * mark this cache fully initialized - */ - cache->cc_tupdesc = tupdesc; -} - -/* - * InitCatCachePhase2 -- external interface for CatalogCacheInitializeCache - * - * The only reason to call this routine is to ensure that the relcache - * has created entries for all the catalogs and indexes referenced by - * catcaches. Therefore, open the index too. An exception is the indexes - * on pg_am, which we don't use (cf. IndexScanOK). 
- */ -void -InitCatCachePhase2(CatCache *cache) -{ - if (cache->cc_tupdesc == NULL) - CatalogCacheInitializeCache(cache); - - if (cache->id != AMOID && - cache->id != AMNAME) - { - Relation idesc; - - idesc = index_openr(cache->cc_indname); - index_close(idesc); - } -} - -/* - * CatalogCacheComputeHashIndex - */ -static Index -CatalogCacheComputeHashIndex(CatCache *cache, ScanKey cur_skey) -{ - uint32 hashIndex = 0; - - CACHE4_elog(LOG, "CatalogCacheComputeHashIndex %s %d %p", + CACHE4_elog(DEBUG1, "CatalogCacheComputeHashValue %s %d %p", cache->cc_relname, cache->cc_nkeys, cache); @@ -358,39 +154,40 @@ CatalogCacheComputeHashIndex(CatCache *cache, ScanKey cur_skey) switch (cache->cc_nkeys) { case 4: - hashIndex ^= + hashValue ^= DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[3], cur_skey[3].sk_argument)) << 9; /* FALLTHROUGH */ case 3: - hashIndex ^= + hashValue ^= DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[2], cur_skey[2].sk_argument)) << 6; /* FALLTHROUGH */ case 2: - hashIndex ^= + hashValue ^= DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[1], cur_skey[1].sk_argument)) << 3; /* FALLTHROUGH */ case 1: - hashIndex ^= + hashValue ^= DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[0], cur_skey[0].sk_argument)); break; default: - elog(FATAL, "CCComputeHashIndex: %d cc_nkeys", cache->cc_nkeys); + elog(FATAL, "CCComputeHashValue: %d cc_nkeys", cache->cc_nkeys); break; } - hashIndex %= (uint32) cache->cc_size; - return (Index) hashIndex; + + return hashValue; } /* - * CatalogCacheComputeTupleHashIndex + * CatalogCacheComputeTupleHashValue + * + * Compute the hash value associated with a given tuple to be cached */ -static Index -CatalogCacheComputeTupleHashIndex(CatCache *cache, - HeapTuple tuple) +static uint32 +CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple) { ScanKeyData cur_skey[4]; bool isNull = false; @@ -442,16 +239,75 @@ CatalogCacheComputeTupleHashIndex(CatCache *cache, Assert(!isNull); break; default: - elog(FATAL, "CCComputeTupleHashIndex: %d cc_nkeys", + elog(FATAL, "CCComputeTupleHashValue: %d cc_nkeys", cache->cc_nkeys); break; } - return CatalogCacheComputeHashIndex(cache, cur_skey); + return CatalogCacheComputeHashValue(cache, cur_skey); } + +#ifdef CATCACHE_STATS + +static void +CatCachePrintStats(void) +{ + CatCache *cache; + long cc_searches = 0; + long cc_hits = 0; + long cc_neg_hits = 0; + long cc_newloads = 0; + long cc_invals = 0; + long cc_discards = 0; + + elog(DEBUG1, "Catcache stats dump: %d/%d tuples in catcaches", + CacheHdr->ch_ntup, CacheHdr->ch_maxtup); + + for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next) + { + if (cache->cc_ntup == 0 && cache->cc_searches == 0) + continue; /* don't print unused caches */ + elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards", + cache->cc_relname, + cache->cc_indname, + cache->cc_ntup, + cache->cc_searches, + cache->cc_hits, + cache->cc_neg_hits, + cache->cc_hits + cache->cc_neg_hits, + cache->cc_newloads, + cache->cc_searches - cache->cc_hits - cache->cc_neg_hits - cache->cc_newloads, + cache->cc_searches - cache->cc_hits - cache->cc_neg_hits, + cache->cc_invals, + cache->cc_discards); + cc_searches += cache->cc_searches; + cc_hits += cache->cc_hits; + cc_neg_hits += cache->cc_neg_hits; + cc_newloads += cache->cc_newloads; + cc_invals += cache->cc_invals; + cc_discards += cache->cc_discards; + } + elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld 
discards", + CacheHdr->ch_ntup, + cc_searches, + cc_hits, + cc_neg_hits, + cc_hits + cc_neg_hits, + cc_newloads, + cc_searches - cc_hits - cc_neg_hits - cc_newloads, + cc_searches - cc_hits - cc_neg_hits, + cc_invals, + cc_discards); +} + +#endif /* CATCACHE_STATS */ + + /* * CatCacheRemoveCTup + * + * Unlink and delete the given cache entry */ static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct) @@ -473,16 +329,26 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct) } /* - * CatalogCacheIdInvalidate() + * CatalogCacheIdInvalidate * - * Invalidate a tuple given a cache id. In this case the id should always - * be found (whether the cache has opened its relation or not). Of course, - * if the cache has yet to open its relation, there will be no tuples so - * no problem. + * Invalidate entries in the specified cache, given a hash value and + * item pointer. Positive entries are deleted if they match the item + * pointer. Negative entries must be deleted if they match the hash + * value (since we do not have the exact key of the tuple that's being + * inserted). But this should only rarely result in loss of a cache + * entry that could have been kept. + * + * Note that it's not very relevant whether the tuple identified by + * the item pointer is being inserted or deleted. We don't expect to + * find matching positive entries in the one case, and we don't expect + * to find matching negative entries in the other; but we will do the + * right things in any case. + * + * This routine is only quasi-public: it should only be used by inval.c. */ void CatalogCacheIdInvalidate(int cacheId, - Index hashIndex, + uint32 hashValue, ItemPointer pointer) { CatCache *ccp; @@ -491,37 +357,51 @@ CatalogCacheIdInvalidate(int cacheId, * sanity checks */ Assert(ItemPointerIsValid(pointer)); - CACHE1_elog(LOG, "CatalogCacheIdInvalidate: called"); + CACHE1_elog(DEBUG1, "CatalogCacheIdInvalidate: called"); /* * inspect caches to find the proper cache */ for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next) { + Index hashIndex; Dlelem *elt, *nextelt; if (cacheId != ccp->id) continue; - Assert(hashIndex < ccp->cc_size); + /* + * We don't bother to check whether the cache has finished + * initialization yet; if not, there will be no entries in it + * so no problem. + */ /* - * inspect the hash bucket until we find a match or exhaust + * inspect the proper hash bucket for matches */ + hashIndex = (Index) (hashValue % (uint32) ccp->cc_size); + for (elt = DLGetHead(&ccp->cc_bucket[hashIndex]); elt; elt = nextelt) { CatCTup *ct = (CatCTup *) DLE_VAL(elt); nextelt = DLGetSucc(elt); - if (ItemPointerEquals(pointer, &ct->tuple.t_self)) + if (hashValue != ct->hash_value) + continue; /* ignore non-matching hash values */ + + if (ct->negative || + ItemPointerEquals(pointer, &ct->tuple.t_self)) { if (ct->refcount > 0) ct->dead = true; else CatCacheRemoveCTup(ccp, ct); - CACHE1_elog(LOG, "CatalogCacheIdInvalidate: invalidated"); + CACHE1_elog(DEBUG1, "CatalogCacheIdInvalidate: invalidated"); +#ifdef CATCACHE_STATS + ccp->cc_invals++; +#endif /* could be multiple matches, so keep looking! 
*/ } } @@ -531,17 +411,33 @@ CatalogCacheIdInvalidate(int cacheId, /* ---------------------------------------------------------------- * public functions - * - * AtEOXact_CatCache - * ResetCatalogCaches - * InitCatCache - * SearchCatCache - * ReleaseCatCache - * RelationInvalidateCatalogCacheTuple * ---------------------------------------------------------------- */ +/* + * Standard routine for creating cache context if it doesn't exist yet + * + * There are a lot of places (probably far more than necessary) that check + * whether CacheMemoryContext exists yet and want to create it if not. + * We centralize knowledge of exactly how to create it here. + */ +void +CreateCacheMemoryContext(void) +{ + /* + * Purely for paranoia, check that context doesn't exist; caller + * probably did so already. + */ + if (!CacheMemoryContext) + CacheMemoryContext = AllocSetContextCreate(TopMemoryContext, + "CacheMemoryContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); +} + + /* * AtEOXact_CatCache * @@ -609,6 +505,9 @@ ResetCatalogCache(CatCache *cache) ct->dead = true; else CatCacheRemoveCTup(cache, ct); +#ifdef CATCACHE_STATS + cache->cc_invals++; +#endif } } } @@ -623,12 +522,12 @@ ResetCatalogCaches(void) { CatCache *cache; - CACHE1_elog(LOG, "ResetCatalogCaches called"); + CACHE1_elog(DEBUG1, "ResetCatalogCaches called"); for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next) ResetCatalogCache(cache); - CACHE1_elog(LOG, "end of ResetCatalogCaches call"); + CACHE1_elog(DEBUG1, "end of ResetCatalogCaches call"); } /* @@ -656,7 +555,7 @@ CatalogCacheFlushRelation(Oid relId) { CatCache *cache; - CACHE2_elog(LOG, "CatalogCacheFlushRelation called for %u", relId); + CACHE2_elog(DEBUG1, "CatalogCacheFlushRelation called for %u", relId); for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next) { @@ -691,6 +590,13 @@ CatalogCacheFlushRelation(Oid relId) nextelt = DLGetSucc(elt); + /* + * Negative entries are never considered related to a rel, + * even if the rel is part of their lookup key. + */ + if (ct->negative) + continue; + if (cache->cc_reloidattr == ObjectIdAttributeNumber) tupRelid = ct->tuple.t_data->t_oid; else @@ -711,12 +617,15 @@ CatalogCacheFlushRelation(Oid relId) ct->dead = true; else CatCacheRemoveCTup(cache, ct); +#ifdef CATCACHE_STATS + cache->cc_invals++; +#endif } } } } - CACHE1_elog(LOG, "end of CatalogCacheFlushRelation call"); + CACHE1_elog(DEBUG1, "end of CatalogCacheFlushRelation call"); } /* @@ -730,7 +639,7 @@ CatalogCacheFlushRelation(Oid relId) #ifdef CACHEDEBUG #define InitCatCache_DEBUG1 \ do { \ - elog(LOG, "InitCatCache: rel=%s id=%d nkeys=%d size=%d\n", \ + elog(DEBUG1, "InitCatCache: rel=%s id=%d nkeys=%d size=%d\n", \ cp->cc_relname, cp->id, cp->cc_nkeys, cp->cc_size); \ } while(0) @@ -791,9 +700,10 @@ InitCatCache(int id, cp->id = id; cp->cc_relname = relname; cp->cc_indname = indname; - cp->cc_reloidattr = reloidattr; + cp->cc_reloid = InvalidOid; /* temporary */ cp->cc_relisshared = false; /* temporary */ cp->cc_tupdesc = (TupleDesc) NULL; + cp->cc_reloidattr = reloidattr; cp->cc_ntup = 0; cp->cc_size = NCCBUCKETS; cp->cc_nkeys = nkeys; @@ -820,6 +730,152 @@ InitCatCache(int id, return cp; } +/* + * CatalogCacheInitializeCache + * + * This function does final initialization of a catcache: obtain the tuple + * descriptor and set up the hash and equality function links. We assume + * that the relcache entry can be opened at this point! 
+ */ +#ifdef CACHEDEBUG +#define CatalogCacheInitializeCache_DEBUG1 \ + elog(DEBUG1, "CatalogCacheInitializeCache: cache @%p %s", cache, \ + cache->cc_relname) + +#define CatalogCacheInitializeCache_DEBUG2 \ +do { \ + if (cache->cc_key[i] > 0) { \ + elog(DEBUG1, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \ + i+1, cache->cc_nkeys, cache->cc_key[i], \ + tupdesc->attrs[cache->cc_key[i] - 1]->atttypid); \ + } else { \ + elog(DEBUG1, "CatalogCacheInitializeCache: load %d/%d w/%d", \ + i+1, cache->cc_nkeys, cache->cc_key[i]); \ + } \ +} while(0) + +#else +#define CatalogCacheInitializeCache_DEBUG1 +#define CatalogCacheInitializeCache_DEBUG2 +#endif + +static void +CatalogCacheInitializeCache(CatCache *cache) +{ + Relation relation; + MemoryContext oldcxt; + TupleDesc tupdesc; + int i; + + CatalogCacheInitializeCache_DEBUG1; + + /* + * Open the relation without locking --- we only need the tupdesc, + * which we assume will never change ... + */ + relation = heap_openr(cache->cc_relname, NoLock); + Assert(RelationIsValid(relation)); + + /* + * switch to the cache context so our allocations do not vanish at the + * end of a transaction + */ + Assert(CacheMemoryContext != NULL); + + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + + /* + * copy the relcache's tuple descriptor to permanent cache storage + */ + tupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation)); + + /* + * get the relation's OID and relisshared flag, too + */ + cache->cc_reloid = RelationGetRelid(relation); + cache->cc_relisshared = RelationGetForm(relation)->relisshared; + + /* + * return to the caller's memory context and close the rel + */ + MemoryContextSwitchTo(oldcxt); + + heap_close(relation, NoLock); + + CACHE3_elog(DEBUG1, "CatalogCacheInitializeCache: %s, %d keys", + cache->cc_relname, cache->cc_nkeys); + + /* + * initialize cache's key information + */ + for (i = 0; i < cache->cc_nkeys; ++i) + { + Oid keytype; + + CatalogCacheInitializeCache_DEBUG2; + + if (cache->cc_key[i] > 0) + keytype = tupdesc->attrs[cache->cc_key[i] - 1]->atttypid; + else + { + if (cache->cc_key[i] != ObjectIdAttributeNumber) + elog(FATAL, "CatalogCacheInit: only sys attr supported is OID"); + keytype = OIDOID; + } + + cache->cc_hashfunc[i] = GetCCHashFunc(keytype); + + cache->cc_isname[i] = (keytype == NAMEOID); + + /* + * If GetCCHashFunc liked the type, safe to index into eqproc[] + */ + cache->cc_skey[i].sk_procedure = EQPROC(keytype); + + /* Do function lookup */ + fmgr_info_cxt(cache->cc_skey[i].sk_procedure, + &cache->cc_skey[i].sk_func, + CacheMemoryContext); + + /* Initialize sk_attno suitably for HeapKeyTest() and heap scans */ + cache->cc_skey[i].sk_attno = cache->cc_key[i]; + + CACHE4_elog(DEBUG1, "CatalogCacheInit %s %d %p", + cache->cc_relname, + i, + cache); + } + + /* + * mark this cache fully initialized + */ + cache->cc_tupdesc = tupdesc; +} + +/* + * InitCatCachePhase2 -- external interface for CatalogCacheInitializeCache + * + * The only reason to call this routine is to ensure that the relcache + * has created entries for all the catalogs and indexes referenced by + * catcaches. Therefore, open the index too. An exception is the indexes + * on pg_am, which we don't use (cf. IndexScanOK). 
+ */ +void +InitCatCachePhase2(CatCache *cache) +{ + if (cache->cc_tupdesc == NULL) + CatalogCacheInitializeCache(cache); + + if (cache->id != AMOID && + cache->id != AMNAME) + { + Relation idesc; + + idesc = index_openr(cache->cc_indname); + index_close(idesc); + } +} + /* * IndexScanOK @@ -874,10 +930,20 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey) } /* - * SearchCatCache + * SearchCatCache * * This call searches a system cache for a tuple, opening the relation - * if necessary (the first access to a particular cache). + * if necessary (on the first access to a particular cache). + * + * The result is NULL if not found, or a pointer to a HeapTuple in + * the cache. The caller must not modify the tuple, and must call + * ReleaseCatCache() when done with it. + * + * The search key values should be expressed as Datums of the key columns' + * datatype(s). (Pass zeroes for any unused parameters.) As a special + * exception, the passed-in key for a NAME column can be just a C string; + * the caller need not go to the trouble of converting it to a fully + * null-padded NAME. */ HeapTuple SearchCatCache(CatCache *cache, @@ -887,11 +953,13 @@ SearchCatCache(CatCache *cache, Datum v4) { ScanKeyData cur_skey[4]; - Index hash; + uint32 hashValue; + Index hashIndex; Dlelem *elt; CatCTup *ct; - HeapTuple ntp; Relation relation; + HeapTuple ntp; + int i; MemoryContext oldcxt; /* @@ -916,12 +984,13 @@ SearchCatCache(CatCache *cache, /* * find the hash bucket in which to look for the tuple */ - hash = CatalogCacheComputeHashIndex(cache, cur_skey); + hashValue = CatalogCacheComputeHashValue(cache, cur_skey); + hashIndex = (Index) (hashValue % (uint32) cache->cc_size); /* * scan the hash bucket until we find a match or exhaust our tuples */ - for (elt = DLGetHead(&cache->cc_bucket[hash]); + for (elt = DLGetHead(&cache->cc_bucket[hashIndex]); elt; elt = DLGetSucc(elt)) { @@ -932,9 +1001,11 @@ SearchCatCache(CatCache *cache, if (ct->dead) continue; /* ignore dead entries */ + if (ct->hash_value != hashValue) + continue; /* quickly skip entry if wrong hash val */ + /* - * see if the cached tuple matches our key. (should we be worried - * about time ranges? -cim 10/2/90) + * see if the cached tuple matches our key. */ HeapKeyTest(&ct->tuple, cache->cc_tupdesc, @@ -945,33 +1016,53 @@ SearchCatCache(CatCache *cache, continue; /* - * we found a tuple in the cache: bump its refcount, move it to - * the front of the LRU list, and return it. We also move it to - * the front of the list for its hashbucket, in order to speed - * subsequent searches. (The most frequently accessed elements in - * any hashbucket will tend to be near the front of the - * hashbucket's list.) + * we found a match in the cache: move it to the front of the global + * LRU list. We also move it to the front of the list for its + * hashbucket, in order to speed subsequent searches. (The most + * frequently accessed elements in any hashbucket will tend to be + * near the front of the hashbucket's list.) */ - ct->refcount++; - DLMoveToFront(&ct->lrulist_elem); DLMoveToFront(&ct->cache_elem); + /* + * If it's a positive entry, bump its refcount and return it. + * If it's negative, we can report failure to the caller. 
+ */ + if (!ct->negative) + { + ct->refcount++; + #ifdef CACHEDEBUG - CACHE3_elog(LOG, "SearchCatCache(%s): found in bucket %d", - cache->cc_relname, hash); + CACHE3_elog(DEBUG1, "SearchCatCache(%s): found in bucket %d", + cache->cc_relname, hashIndex); #endif /* CACHEDEBUG */ #ifdef CATCACHE_STATS - cache->cc_hits++; + cache->cc_hits++; #endif - return &ct->tuple; + return &ct->tuple; + } + else + { +#ifdef CACHEDEBUG + CACHE3_elog(DEBUG1, "SearchCatCache(%s): found neg entry in bucket %d", + cache->cc_relname, hashIndex); +#endif /* CACHEDEBUG */ + +#ifdef CATCACHE_STATS + cache->cc_neg_hits++; +#endif + + return NULL; + } } /* - * Tuple was not found in cache, so we have to try and retrieve it - * directly from the relation. If it's found, we add it to the cache. + * Tuple was not found in cache, so we have to try to retrieve it + * directly from the relation. If found, we will add it to the + * cache; if not found, we will add a negative cache entry instead. * * NOTE: it is possible for recursive cache lookups to occur while * reading the relation --- for example, due to shared-cache-inval @@ -987,14 +1078,18 @@ SearchCatCache(CatCache *cache, /* * open the relation associated with the cache */ - relation = heap_openr(cache->cc_relname, AccessShareLock); + relation = heap_open(cache->cc_reloid, AccessShareLock); + + /* + * Pre-create cache entry header, and mark no tuple found. + */ + ct = (CatCTup *) MemoryContextAlloc(CacheMemoryContext, sizeof(CatCTup)); + ct->negative = true; /* * Scan the relation to find the tuple. If there's an index, and if * it's safe to do so, use the index. Else do a heap scan. */ - ct = NULL; - if ((RelationGetForm(relation))->relhasindex && !IsIgnoringSystemIndexes() && IndexScanOK(cache, cur_skey)) @@ -1004,9 +1099,8 @@ SearchCatCache(CatCache *cache, RetrieveIndexResult indexRes; HeapTupleData tuple; Buffer buffer; - int i; - CACHE2_elog(LOG, "SearchCatCache(%s): performing index scan", + CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing index scan", cache->cc_relname); /* @@ -1031,8 +1125,8 @@ SearchCatCache(CatCache *cache, { /* Copy tuple into our context */ oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - ct = (CatCTup *) palloc(sizeof(CatCTup)); heap_copytuple_with_tuple(&tuple, &ct->tuple); + ct->negative = false; MemoryContextSwitchTo(oldcxt); ReleaseBuffer(buffer); break; @@ -1045,7 +1139,7 @@ SearchCatCache(CatCache *cache, { HeapScanDesc sd; - CACHE2_elog(LOG, "SearchCatCache(%s): performing heap scan", + CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing heap scan", cache->cc_relname); sd = heap_beginscan(relation, 0, SnapshotNow, @@ -1057,8 +1151,8 @@ SearchCatCache(CatCache *cache, { /* Copy tuple into our context */ oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - ct = (CatCTup *) palloc(sizeof(CatCTup)); heap_copytuple_with_tuple(ntp, &ct->tuple); + ct->negative = false; MemoryContextSwitchTo(oldcxt); /* We should not free the result of heap_getnext... */ } @@ -1072,58 +1166,140 @@ SearchCatCache(CatCache *cache, heap_close(relation, AccessShareLock); /* - * scan is complete. if tup was found, we can add it to the cache. + * scan is complete. If tuple was not found, we need to build + * a fake tuple for the negative cache entry. The fake tuple has + * the correct key columns, but nulls everywhere else. 
*/ - if (ct == NULL) - return NULL; + if (ct->negative) + { + TupleDesc tupDesc = cache->cc_tupdesc; + Datum *values; + char *nulls; + Oid negOid = InvalidOid; + + values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); + nulls = (char *) palloc(tupDesc->natts * sizeof(char)); + + memset(values, 0, tupDesc->natts * sizeof(Datum)); + memset(nulls, 'n', tupDesc->natts * sizeof(char)); + + for (i = 0; i < cache->cc_nkeys; i++) + { + int attindex = cache->cc_key[i]; + Datum keyval = cur_skey[i].sk_argument; + + if (attindex > 0) + { + /* + * Here we must be careful in case the caller passed a + * C string where a NAME is wanted: convert the given + * argument to a correctly padded NAME. Otherwise the + * memcpy() done in heap_formtuple could fall off the + * end of memory. + */ + if (cache->cc_isname[i]) + { + Name newval = (Name) palloc(NAMEDATALEN); + + namestrcpy(newval, DatumGetCString(keyval)); + keyval = NameGetDatum(newval); + } + values[attindex-1] = keyval; + nulls[attindex-1] = ' '; + } + else + { + Assert(attindex == ObjectIdAttributeNumber); + negOid = DatumGetObjectId(keyval); + } + } + + ntp = heap_formtuple(tupDesc, values, nulls); + + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + heap_copytuple_with_tuple(ntp, &ct->tuple); + ct->tuple.t_data->t_oid = negOid; + MemoryContextSwitchTo(oldcxt); + + heap_freetuple(ntp); + for (i = 0; i < cache->cc_nkeys; i++) + { + if (cache->cc_isname[i]) + pfree(DatumGetName(values[cache->cc_key[i]-1])); + } + pfree(values); + pfree(nulls); + } /* * Finish initializing the CatCTup header, and add it to the linked * lists. */ - CACHE1_elog(LOG, "SearchCatCache: found tuple"); - ct->ct_magic = CT_MAGIC; ct->my_cache = cache; DLInitElem(&ct->lrulist_elem, (void *) ct); DLInitElem(&ct->cache_elem, (void *) ct); ct->refcount = 1; /* count this first reference */ ct->dead = false; + ct->hash_value = hashValue; DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem); - DLAddHead(&cache->cc_bucket[hash], &ct->cache_elem); - -#ifdef CATCACHE_STATS - cache->cc_newloads++; -#endif + DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem); /* * If we've exceeded the desired size of the caches, try to throw away - * the least recently used entry. + * the least recently used entry. NB: the newly-built entry cannot + * get thrown away here, because it has positive refcount. */ ++cache->cc_ntup; if (++CacheHdr->ch_ntup > CacheHdr->ch_maxtup) { - for (elt = DLGetTail(&CacheHdr->ch_lrulist); - elt; - elt = DLGetPred(elt)) + Dlelem *prevelt; + + for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt) { CatCTup *oldct = (CatCTup *) DLE_VAL(elt); + prevelt = DLGetPred(elt); + if (oldct->refcount == 0) { - CACHE2_elog(LOG, "SearchCatCache(%s): Overflow, LRU removal", + CACHE2_elog(DEBUG1, "SearchCatCache(%s): Overflow, LRU removal", cache->cc_relname); +#ifdef CATCACHE_STATS + oldct->my_cache->cc_discards++; +#endif CatCacheRemoveCTup(oldct->my_cache, oldct); - break; + if (CacheHdr->ch_ntup <= CacheHdr->ch_maxtup) + break; } } } - CACHE4_elog(LOG, "SearchCatCache(%s): Contains %d/%d tuples", + CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples", cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup); - CACHE3_elog(LOG, "SearchCatCache(%s): put in bucket %d", - cache->cc_relname, hash); + + if (ct->negative) + { + CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d", + cache->cc_relname, hashIndex); + + /* + * We are not returning the new entry to the caller, so reset its + * refcount. 
Note it would be uncool to set the refcount to 0 + * before doing the extra-entry removal step above. + */ + ct->refcount = 0; /* negative entries never have refs */ + + return NULL; + } + + CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d", + cache->cc_relname, hashIndex); + +#ifdef CATCACHE_STATS + cache->cc_newloads++; +#endif return &ct->tuple; } @@ -1164,7 +1340,7 @@ ReleaseCatCache(HeapTuple tuple) * * This is part of a rather subtle chain of events, so pay attention: * - * When a tuple is updated or deleted, it cannot be flushed from the + * When a tuple is inserted or deleted, it cannot be flushed from the * catcaches immediately, for reasons explained at the top of cache/inval.c. * Instead we have to add entry(s) for the tuple to a list of pending tuple * invalidations that will be done at the end of the command or transaction. @@ -1172,15 +1348,16 @@ ReleaseCatCache(HeapTuple tuple) * The lists of tuples that need to be flushed are kept by inval.c. This * routine is a helper routine for inval.c. Given a tuple belonging to * the specified relation, find all catcaches it could be in, compute the - * correct hashindex for each such catcache, and call the specified function - * to record the cache id, hashindex, and tuple ItemPointer in inval.c's + * correct hash value for each such catcache, and call the specified function + * to record the cache id, hash value, and tuple ItemPointer in inval.c's * lists. CatalogCacheIdInvalidate will be called later, if appropriate, * using the recorded information. * * Note that it is irrelevant whether the given tuple is actually loaded * into the catcache at the moment. Even if it's not there now, it might - * be by the end of the command --- or might be in other backends' caches - * --- so we have to be prepared to flush it. + * be by the end of the command, or there might be a matching negative entry + * to flush --- or other backends' caches might have such entries --- so + * we have to make list entries to flush it later. * * Also note that it's not an error if there are no catcaches for the * specified relation. inval.c doesn't know exactly which rels have @@ -1190,11 +1367,12 @@ ReleaseCatCache(HeapTuple tuple) void PrepareToInvalidateCacheTuple(Relation relation, HeapTuple tuple, - void (*function) (int, Index, ItemPointer, Oid)) + void (*function) (int, uint32, ItemPointer, Oid)) { CatCache *ccp; + Oid reloid; - CACHE1_elog(LOG, "PrepareToInvalidateCacheTuple: called"); + CACHE1_elog(DEBUG1, "PrepareToInvalidateCacheTuple: called"); /* * sanity checks @@ -1204,25 +1382,27 @@ PrepareToInvalidateCacheTuple(Relation relation, Assert(PointerIsValid(function)); Assert(CacheHdr != NULL); + reloid = RelationGetRelid(relation); + /* ---------------- * for each cache * if the cache contains tuples from the specified relation - * compute the tuple's hash index in this cache, + * compute the tuple's hash value in this cache, * and call the passed function to register the information. * ---------------- */ for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next) { - if (strcmp(ccp->cc_relname, RelationGetRelationName(relation)) != 0) - continue; - /* Just in case cache hasn't finished initialization yet... */ if (ccp->cc_tupdesc == NULL) CatalogCacheInitializeCache(ccp); + if (ccp->cc_reloid != reloid) + continue; + (*function) (ccp->id, - CatalogCacheComputeTupleHashIndex(ccp, tuple), + CatalogCacheComputeTupleHashValue(ccp, tuple), &tuple->t_self, ccp->cc_relisshared ? 
(Oid) 0 : MyDatabaseId); } diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 0e383b32c8..6d397a7ba9 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -5,26 +5,38 @@ * * This is subtle stuff, so pay attention: * - * When a tuple is updated or deleted, our time qualification rules consider - * that it is *still valid* so long as we are in the same command, ie, - * until the next CommandCounterIncrement() or transaction commit. - * (See utils/time/tqual.c.) At the command boundary, the old tuple stops + * When a tuple is updated or deleted, our standard time qualification rules + * consider that it is *still valid* so long as we are in the same command, + * ie, until the next CommandCounterIncrement() or transaction commit. + * (See utils/time/tqual.c, and note that system catalogs are generally + * scanned under SnapshotNow rules by the system, or plain user snapshots + * for user queries.) At the command boundary, the old tuple stops * being valid and the new version, if any, becomes valid. Therefore, * we cannot simply flush a tuple from the system caches during heap_update() * or heap_delete(). The tuple is still good at that point; what's more, * even if we did flush it, it might be reloaded into the caches by a later * request in the same command. So the correct behavior is to keep a list * of outdated (updated/deleted) tuples and then do the required cache - * flushes at the next command boundary. Similarly, we need a list of - * inserted tuples (including new versions of updated tuples), which we will - * use to flush those tuples out of the caches if we abort the transaction. - * Notice that the first list lives only till command boundary, whereas the - * second lives till end of transaction. Finally, we need a third list of - * all tuples outdated in the current transaction; if we commit, we send - * those invalidation events to all other backends (via the SI message queue) - * so that they can flush obsolete entries from their caches. This list - * definitely can't be processed until after we commit, otherwise the other - * backends won't see our updated tuples as good. + * flushes at the next command boundary. We must also keep track of + * inserted tuples so that we can flush "negative" cache entries that match + * the new tuples; again, that mustn't happen until end of command. + * + * Once we have finished the command, we still need to remember inserted + * tuples (including new versions of updated tuples), so that we can flush + * them from the caches if we abort the transaction. Similarly, we'd better + * be able to flush "negative" cache entries that may have been loaded in + * place of deleted tuples, so we still need the deleted ones too. + * + * If we successfully complete the transaction, we have to broadcast all + * these invalidation events to other backends (via the SI message queue) + * so that they can flush obsolete entries from their caches. Note we have + * to record the transaction commit before sending SI messages, otherwise + * the other backends won't see our updated tuples as good. + * + * In short, we need to remember until xact end every insert or delete + * of a tuple that might be in the system caches. Updates are treated as + * two events, delete + insert, for simplicity. (There are cases where + * it'd be possible to record just one event, but we don't currently try.) 
* * We do not need to register EVERY tuple operation in this way, just those * on tuples in relations that have associated catcaches. We do, however, @@ -62,7 +74,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/cache/inval.c,v 1.48 2002/02/19 20:11:17 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/cache/inval.c,v 1.49 2002/03/03 17:47:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -99,34 +111,27 @@ typedef struct InvalidationListHeader /* * ---------------- - * Invalidation info is divided into three parts. - * 1) shared invalidation to be sent to all backends at commit - * 2) local invalidation for the transaction itself (actually, just - * for the current command within the transaction) - * 3) rollback information for the transaction itself (in case we abort) + * Invalidation info is divided into two lists: + * 1) events so far in current command, not yet reflected to caches. + * 2) events in previous commands of current transaction; these have + * been reflected to local caches, and must be either broadcast to + * other backends or rolled back from local cache when we commit + * or abort the transaction. + * + * The relcache-file-invalidated flag can just be a simple boolean, + * since we only act on it at transaction commit; we don't care which + * command of the transaction set it. * ---------------- */ -/* - * head of invalidation message list for all backends - * eaten by AtCommit_Cache() in CommitTransaction() - */ -static InvalidationListHeader GlobalInvalidMsgs; +/* head of current-command event list */ +static InvalidationListHeader CurrentCmdInvalidMsgs; + +/* head of previous-commands event list */ +static InvalidationListHeader PriorCmdInvalidMsgs; static bool RelcacheInitFileInval; /* init file must be invalidated? */ -/* - * head of invalidation message list for the current command - * eaten by AtCommit_LocalCache() in CommandCounterIncrement() - */ -static InvalidationListHeader LocalInvalidMsgs; - -/* - * head of rollback message list for abort-time processing - * eaten by AtAbort_Cache() in AbortTransaction() - */ -static InvalidationListHeader RollbackMsgs; - /* ---------------------------------------------------------------- * Invalidation list support functions @@ -204,6 +209,29 @@ FreeInvalidationMessageList(InvalidationChunk **listHdr) } } +/* + * Append one list of invalidation message chunks to another, resetting + * the source chunk-list pointer to NULL. + */ +static void +AppendInvalidationMessageList(InvalidationChunk **destHdr, + InvalidationChunk **srcHdr) +{ + InvalidationChunk *chunk = *srcHdr; + + if (chunk == NULL) + return; /* nothing to do */ + + while (chunk->next != NULL) + chunk = chunk->next; + + chunk->next = *destHdr; + + *destHdr = *srcHdr; + + *srcHdr = NULL; +} + /* * Process a list of invalidation messages. 
* @@ -238,15 +266,15 @@ FreeInvalidationMessageList(InvalidationChunk **listHdr) */ static void AddCatcacheInvalidationMessage(InvalidationListHeader *hdr, - int id, Index hashIndex, + int id, uint32 hashValue, ItemPointer tuplePtr, Oid dbId) { SharedInvalidationMessage msg; msg.cc.id = (int16) id; - msg.cc.hashIndex = (uint16) hashIndex; - msg.cc.dbId = dbId; msg.cc.tuplePtr = *tuplePtr; + msg.cc.dbId = dbId; + msg.cc.hashValue = hashValue; AddInvalidationMessage(&hdr->cclist, &msg); } @@ -271,6 +299,18 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, AddInvalidationMessage(&hdr->rclist, &msg); } +/* + * Append one list of invalidation messages to another, resetting + * the source list to empty. + */ +static void +AppendInvalidationMessages(InvalidationListHeader *dest, + InvalidationListHeader *src) +{ + AppendInvalidationMessageList(&dest->cclist, &src->cclist); + AppendInvalidationMessageList(&dest->rclist, &src->rclist); +} + /* * Reset an invalidation list to empty * @@ -318,21 +358,16 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr, /* * RegisterCatcacheInvalidation * - * Register an invalidation event for an updated/deleted catcache entry. - * We insert the event into both GlobalInvalidMsgs (for transmission - * to other backends at transaction commit) and LocalInvalidMsgs (for - * my local invalidation at end of command within xact). + * Register an invalidation event for a catcache tuple entry. */ static void RegisterCatcacheInvalidation(int cacheId, - Index hashIndex, + uint32 hashValue, ItemPointer tuplePtr, Oid dbId) { - AddCatcacheInvalidationMessage(&GlobalInvalidMsgs, - cacheId, hashIndex, tuplePtr, dbId); - AddCatcacheInvalidationMessage(&LocalInvalidMsgs, - cacheId, hashIndex, tuplePtr, dbId); + AddCatcacheInvalidationMessage(&CurrentCmdInvalidMsgs, + cacheId, hashValue, tuplePtr, dbId); } /* @@ -343,11 +378,8 @@ RegisterCatcacheInvalidation(int cacheId, static void RegisterRelcacheInvalidation(Oid dbId, Oid relId) { - AddRelcacheInvalidationMessage(&GlobalInvalidMsgs, + AddRelcacheInvalidationMessage(&CurrentCmdInvalidMsgs, dbId, relId); - AddRelcacheInvalidationMessage(&LocalInvalidMsgs, - dbId, relId); - /* * If the relation being invalidated is one of those cached in the * relcache init file, mark that we need to zap that file at commit. @@ -356,34 +388,6 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId) RelcacheInitFileInval = true; } -/* - * RegisterCatcacheRollback - * - * Register an invalidation event for an inserted catcache entry. - * This only needs to be flushed out of my local catcache, if I abort. - */ -static void -RegisterCatcacheRollback(int cacheId, - Index hashIndex, - ItemPointer tuplePtr, - Oid dbId) -{ - AddCatcacheInvalidationMessage(&RollbackMsgs, - cacheId, hashIndex, tuplePtr, dbId); -} - -/* - * RegisterRelcacheRollback - * - * As above, but register a relcache invalidation event. 
- */ -static void -RegisterRelcacheRollback(Oid dbId, Oid relId) -{ - AddRelcacheInvalidationMessage(&RollbackMsgs, - dbId, relId); -} - /* * LocalExecuteInvalidationMessage * @@ -398,7 +402,7 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) { if (msg->cc.dbId == MyDatabaseId || msg->cc.dbId == 0) CatalogCacheIdInvalidate(msg->cc.id, - msg->cc.hashIndex, + msg->cc.hashValue, &msg->cc.tuplePtr); } else if (msg->id == SHAREDINVALRELCACHE_ID) @@ -438,7 +442,7 @@ InvalidateSystemCaches(void) */ static void PrepareForTupleInvalidation(Relation relation, HeapTuple tuple, - void (*CacheIdRegisterFunc) (int, Index, + void (*CacheIdRegisterFunc) (int, uint32, ItemPointer, Oid), void (*RelationIdRegisterFunc) (Oid, Oid)) { @@ -517,16 +521,18 @@ AcceptInvalidationMessages(void) * AtEOXactInvalidationMessages * Process queued-up invalidation messages at end of transaction. * - * If isCommit, we must send out the messages in our GlobalInvalidMsgs list + * If isCommit, we must send out the messages in our PriorCmdInvalidMsgs list * to the shared invalidation message queue. Note that these will be read * not only by other backends, but also by our own backend at the next - * transaction start (via AcceptInvalidationMessages). Therefore, it's okay - * to discard any pending LocalInvalidMsgs, since these will be redundant - * with the global list. + * transaction start (via AcceptInvalidationMessages). This means that + * we can skip immediate local processing of anything that's still in + * CurrentCmdInvalidMsgs, and just send that list out too. * * If not isCommit, we are aborting, and must locally process the messages - * in our RollbackMsgs list. No messages need be sent to other backends, - * since they'll not have seen our changed tuples anyway. + * in PriorCmdInvalidMsgs. No messages need be sent to other backends, + * since they'll not have seen our changed tuples anyway. We can forget + * about CurrentCmdInvalidMsgs too, since those changes haven't touched + * the caches yet. * * In any case, reset the various lists to empty. We need not physically * free memory here, since TopTransactionContext is about to be emptied @@ -548,7 +554,10 @@ AtEOXactInvalidationMessages(bool isCommit) if (RelcacheInitFileInval) RelationCacheInitFileInvalidate(true); - ProcessInvalidationMessages(&GlobalInvalidMsgs, + AppendInvalidationMessages(&PriorCmdInvalidMsgs, + &CurrentCmdInvalidMsgs); + + ProcessInvalidationMessages(&PriorCmdInvalidMsgs, SendSharedInvalidMessage); if (RelcacheInitFileInval) @@ -556,15 +565,14 @@ AtEOXactInvalidationMessages(bool isCommit) } else { - ProcessInvalidationMessages(&RollbackMsgs, + ProcessInvalidationMessages(&PriorCmdInvalidMsgs, LocalExecuteInvalidationMessage); } RelcacheInitFileInval = false; - DiscardInvalidationMessages(&GlobalInvalidMsgs, false); - DiscardInvalidationMessages(&LocalInvalidMsgs, false); - DiscardInvalidationMessages(&RollbackMsgs, false); + DiscardInvalidationMessages(&PriorCmdInvalidMsgs, false); + DiscardInvalidationMessages(&CurrentCmdInvalidMsgs, false); } /* @@ -573,13 +581,13 @@ AtEOXactInvalidationMessages(bool isCommit) * in a transaction. * * Here, we send no messages to the shared queue, since we don't know yet if - * we will commit. But we do need to locally process the LocalInvalidMsgs - * list, so as to flush our caches of any tuples we have outdated in the - * current command. + * we will commit. 
We do need to locally process the CurrentCmdInvalidMsgs + * list, so as to flush our caches of any entries we have outdated in the + * current command. We then move the current-cmd list over to become part + * of the prior-cmds list. * * The isCommit = false case is not currently used, but may someday be * needed to support rollback to a savepoint within a transaction. - * (I suspect it needs more work first --- tgl.) * * Note: * This should be called during CommandCounterIncrement(), @@ -590,29 +598,24 @@ CommandEndInvalidationMessages(bool isCommit) { if (isCommit) { - ProcessInvalidationMessages(&LocalInvalidMsgs, + ProcessInvalidationMessages(&CurrentCmdInvalidMsgs, LocalExecuteInvalidationMessage); + AppendInvalidationMessages(&PriorCmdInvalidMsgs, + &CurrentCmdInvalidMsgs); } else { - ProcessInvalidationMessages(&RollbackMsgs, - LocalExecuteInvalidationMessage); + /* XXX what needs to be done here? */ } - - /* - * LocalInvalidMsgs list is not interesting anymore, so flush it (for - * real). Do *not* clear GlobalInvalidMsgs or RollbackMsgs. - */ - DiscardInvalidationMessages(&LocalInvalidMsgs, true); } /* - * RelationInvalidateHeapTuple + * CacheInvalidateHeapTuple * Register the given tuple for invalidation at end of command * (ie, current command is outdating this tuple). */ void -RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple) +CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple) { PrepareForTupleInvalidation(relation, tuple, RegisterCatcacheInvalidation, @@ -620,14 +623,17 @@ RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple) } /* - * RelationMark4RollbackHeapTuple - * Register the given tuple for invalidation in case of abort - * (ie, current command is creating this tuple). + * CacheInvalidateRelcache + * Register invalidation of the specified relation's relcache entry + * at end of command. + * + * This is used in places that need to force relcache rebuild but aren't + * changing any of the tuples recognized as contributors to the relcache + * entry by PrepareForTupleInvalidation. (An example is dropping an index.) */ void -RelationMark4RollbackHeapTuple(Relation relation, HeapTuple tuple) +CacheInvalidateRelcache(Oid relationId) { - PrepareForTupleInvalidation(relation, tuple, - RegisterCatcacheRollback, - RegisterRelcacheRollback); + /* See KLUGE ALERT in PrepareForTupleInvalidation */ + RegisterRelcacheInvalidation(MyDatabaseId, relationId); } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index cd5162ac81..e3b0c69372 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.152 2002/02/19 20:11:17 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.153 2002/03/03 17:47:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -2065,10 +2065,16 @@ RelationBuildLocalRelation(const char *relname, rel->rd_isnailed = true; /* - * create a new tuple descriptor from the one passed in (we do this to - * copy it into the cache context) + * create a new tuple descriptor from the one passed in. We do this + * partly to copy it into the cache context, and partly because the + * new relation can't have any defaults or constraints yet; they + * have to be added in later steps, because they require additions + * to multiple system catalogs. We can copy attnotnull constraints + * here, however. 
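A sketch of the CacheInvalidateRelcache() usage described above, for DDL that
forces a relcache rebuild without touching any catcache-visible tuples (the
dropped-index case the comment mentions). The call site and the heapId
variable are assumptions for illustration:

    /* force rebuild of the parent table's relcache entry on DROP INDEX;
     * heapId (the parent table's OID) is assumed to be in hand */
    CacheInvalidateRelcache(heapId);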
*/ - rel->rd_att = CreateTupleDescCopyConstr(tupDesc); + rel->rd_att = CreateTupleDescCopy(tupDesc); + for (i = 0; i < natts; i++) + rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull; /* * initialize relation tuple form (caller may add/override data later) @@ -2082,8 +2088,6 @@ RelationBuildLocalRelation(const char *relname, rel->rd_rel->relhasoids = true; rel->rd_rel->relnatts = natts; rel->rd_rel->reltype = InvalidOid; - if (tupDesc->constr) - rel->rd_rel->relchecks = tupDesc->constr->num_check; /* * Insert relation OID and database/tablespace ID into the right diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index af88e066e2..e679910e25 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: sinval.h,v 1.25 2001/11/05 17:46:35 momjian Exp $ + * $Id: sinval.h,v 1.26 2002/03/03 17:47:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -28,22 +28,32 @@ * are available to identify other inval message types. * * Shared-inval events are initially driven by detecting tuple inserts, - * updates and deletions in system catalogs (see RelationInvalidateHeapTuple - * and RelationMark4RollbackHeapTuple). Note that some system catalogs have - * multiple caches on them (with different indexes). On detecting a tuple - * invalidation in such a catalog, a separate catcache inval message must be - * generated for each of its caches. The catcache inval message carries the - * hash index for the target tuple, so that the catcache only needs to search - * one hash chain not all its chains. Of course this assumes that all the - * backends are using identical hashing code, but that should be OK. + * updates and deletions in system catalogs (see CacheInvalidateHeapTuple). + * An update generates two inval events, one for the old tuple and one for + * the new --- this is needed to get rid of both positive entries for the + * old tuple, and negative cache entries associated with the new tuple's + * cache key. (This could perhaps be optimized down to one event when the + * cache key is not changing, but for now we don't bother to try.) Note that + * the inval events themselves don't actually say whether the tuple is being + * inserted or deleted. + * + * Note that some system catalogs have multiple caches on them (with different + * indexes). On detecting a tuple invalidation in such a catalog, separate + * catcache inval messages must be generated for each of its caches. The + * catcache inval messages carry the hash value for the target tuple, so + * that the catcache only needs to search one hash chain not all its chains, + * and so that negative cache entries can be recognized with good accuracy. + * (Of course this assumes that all the backends are using identical hashing + * code, but that should be OK.) 
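Since the message now carries the full 32-bit hash value rather than a
precomputed bucket index, each receiving backend reduces it to a hash chain
locally. A minimal sketch, assuming a simple modulo reduction over the
cache's bucket count (the reduction step shown is an assumption, not patch
code):

    /* locate the one hash chain this inval message can affect */
    Index hashIndex = (Index) (msg->cc.hashValue % (uint32) cache->cc_size);
    /* then scan only cache->cc_bucket[hashIndex], comparing each entry's
     * stored hash_value before doing any full key comparison */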
*/ typedef struct { + /* note: field layout chosen with an eye to alignment concerns */ int16 id; /* cache ID --- must be first */ - uint16 hashIndex; /* hashchain index within this catcache */ - Oid dbId; /* database ID, or 0 if a shared relation */ ItemPointerData tuplePtr; /* tuple identifier in cached relation */ + Oid dbId; /* database ID, or 0 if a shared relation */ + uint32 hashValue; /* hash value of key for this catcache */ } SharedInvalCatcacheMsg; #define SHAREDINVALRELCACHE_ID (-1) diff --git a/src/include/utils/catcache.h b/src/include/utils/catcache.h index 6ad0d74851..cf820e37b8 100644 --- a/src/include/utils/catcache.h +++ b/src/include/utils/catcache.h @@ -13,7 +13,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: catcache.h,v 1.38 2002/02/19 20:11:19 tgl Exp $ + * $Id: catcache.h,v 1.39 2002/03/03 17:47:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -35,20 +35,28 @@ typedef struct catcache struct catcache *cc_next; /* link to next catcache */ char *cc_relname; /* name of relation the tuples come from */ char *cc_indname; /* name of index matching cache keys */ - int cc_reloidattr; /* AttrNumber of relation OID, or 0 */ + Oid cc_reloid; /* OID of relation the tuples come from */ bool cc_relisshared; /* is relation shared? */ TupleDesc cc_tupdesc; /* tuple descriptor (copied from reldesc) */ + int cc_reloidattr; /* AttrNumber of relation OID attr, or 0 */ int cc_ntup; /* # of tuples currently in this cache */ int cc_size; /* # of hash buckets in this cache */ int cc_nkeys; /* number of keys (1..4) */ int cc_key[4]; /* AttrNumber of each key */ PGFunction cc_hashfunc[4]; /* hash function to use for each key */ ScanKeyData cc_skey[4]; /* precomputed key info for heap scans */ + bool cc_isname[4]; /* flag key columns that are NAMEs */ #ifdef CATCACHE_STATS long cc_searches; /* total # searches against this cache */ long cc_hits; /* # of matches against existing entry */ + long cc_neg_hits; /* # of matches against negative entry */ long cc_newloads; /* # of successful loads of new entry */ - /* cc_searches - (cc_hits + cc_newloads) is # of failed searches */ + /* + * cc_searches - (cc_hits + cc_neg_hits + cc_newloads) is number of + * failed searches, each of which will result in loading a negative entry + */ + long cc_invals; /* # of entries invalidated from cache */ + long cc_discards; /* # of entries discarded due to overflow */ #endif Dllist cc_bucket[1]; /* hash buckets --- VARIABLE LENGTH ARRAY */ } CatCache; /* VARIABLE LENGTH STRUCT */ @@ -68,11 +76,18 @@ typedef struct catctup * A tuple marked "dead" must not be returned by subsequent searches. * However, it won't be physically deleted from the cache until its * refcount goes to zero. + * + * A negative cache entry is an assertion that there is no tuple + * matching a particular key. This is just as useful as a normal entry + * so far as avoiding catalog searches is concerned. Management of + * positive and negative entries is identical. */ Dlelem lrulist_elem; /* list member of global LRU list */ Dlelem cache_elem; /* list member of per-bucket list */ int refcount; /* number of active references */ bool dead; /* dead but not yet removed? */ + bool negative; /* negative cache entry? 
*/ + uint32 hash_value; /* hash value for this tuple's keys */ HeapTupleData tuple; /* tuple management header */ } CatCTup; @@ -104,10 +119,10 @@ extern void ReleaseCatCache(HeapTuple tuple); extern void ResetCatalogCaches(void); extern void CatalogCacheFlushRelation(Oid relId); -extern void CatalogCacheIdInvalidate(int cacheId, Index hashIndex, +extern void CatalogCacheIdInvalidate(int cacheId, uint32 hashValue, ItemPointer pointer); extern void PrepareToInvalidateCacheTuple(Relation relation, HeapTuple tuple, - void (*function) (int, Index, ItemPointer, Oid)); + void (*function) (int, uint32, ItemPointer, Oid)); #endif /* CATCACHE_H */ diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index 25fb73d570..d2286ed54e 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: inval.h,v 1.23 2001/11/05 17:46:36 momjian Exp $ + * $Id: inval.h,v 1.24 2002/03/03 17:47:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,8 +23,8 @@ extern void AtEOXactInvalidationMessages(bool isCommit); extern void CommandEndInvalidationMessages(bool isCommit); -extern void RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple); +extern void CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple); -extern void RelationMark4RollbackHeapTuple(Relation relation, HeapTuple tuple); +extern void CacheInvalidateRelcache(Oid relationId); #endif /* INVAL_H */
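For completeness, a hypothetical helper (not in the patch) showing how the
CATCACHE_STATS counters documented in catcache.h combine. Per the comment
there, searches not accounted for by hits, negative hits, or new loads are
failed searches, each of which loads a negative entry:

    #ifdef CATCACHE_STATS
    /* number of searches that matched neither a positive nor a negative
     * entry and found no tuple to load */
    static long
    catcache_failed_searches(const CatCache *cache)
    {
        return cache->cc_searches -
            (cache->cc_hits + cache->cc_neg_hits + cache->cc_newloads);
    }
    #endif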