From ac9099fc1dd460bffaafec19272159dd7bc86f5b Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sun, 4 Apr 2021 14:28:35 -0400 Subject: [PATCH] Fix confusion in SP-GiST between attribute type and leaf storage type. According to the documentation, the attType passed to the opclass config function (and also relied on by the core code) is the type of the heap column or expression being indexed. But what was actually being passed was the type stored for the index column. This made no difference for user-defined SP-GiST opclasses, because we weren't allowing the STORAGE clause of CREATE OPCLASS to be used, so the two types would be the same. But it's silly not to allow that, seeing that the built-in poly_ops opclass has a different value for opckeytype than opcintype, and that if you want to do lossy storage then the types must really be different. (Thus, user-defined opclasses doing lossy storage had to lie about what type is in the index.) Hence, remove the restriction, and make sure that we use the input column type not opckeytype where relevant. For reasons of backwards compatibility with existing user-defined opclasses, we can't quite insist that the specified leafType match the STORAGE clause; instead just add an amvalidate() warning if they don't match. Also fix some bugs that would only manifest when trying to return index entries when attType is different from attLeafType. It's not too surprising that these have not been reported, because the only usual reason for such a difference is to store the leaf value lossily, rendering index-only scans impossible. Add a src/test/modules module to exercise cases where attType is different from attLeafType and yet index-only scan is supported. Discussion: https://postgr.es/m/3728741.1617381471@sss.pgh.pa.us --- doc/src/sgml/ref/create_opclass.sgml | 2 +- doc/src/sgml/spgist.sgml | 74 ++- src/backend/access/spgist/spgscan.c | 31 +- src/backend/access/spgist/spgutils.c | 84 ++- src/backend/access/spgist/spgvalidate.c | 27 +- src/include/access/spgist_private.h | 6 +- src/test/modules/Makefile | 1 + src/test/modules/spgist_name_ops/.gitignore | 4 + src/test/modules/spgist_name_ops/Makefile | 23 + src/test/modules/spgist_name_ops/README | 8 + .../expected/spgist_name_ops.out | 95 ++++ .../spgist_name_ops/spgist_name_ops--1.0.sql | 54 ++ .../modules/spgist_name_ops/spgist_name_ops.c | 501 ++++++++++++++++++ .../spgist_name_ops/spgist_name_ops.control | 4 + .../spgist_name_ops/sql/spgist_name_ops.sql | 38 ++ src/test/regress/expected/rangetypes.out | 19 + src/test/regress/sql/rangetypes.sql | 10 + 17 files changed, 937 insertions(+), 44 deletions(-) create mode 100644 src/test/modules/spgist_name_ops/.gitignore create mode 100644 src/test/modules/spgist_name_ops/Makefile create mode 100644 src/test/modules/spgist_name_ops/README create mode 100644 src/test/modules/spgist_name_ops/expected/spgist_name_ops.out create mode 100644 src/test/modules/spgist_name_ops/spgist_name_ops--1.0.sql create mode 100644 src/test/modules/spgist_name_ops/spgist_name_ops.c create mode 100644 src/test/modules/spgist_name_ops/spgist_name_ops.control create mode 100644 src/test/modules/spgist_name_ops/sql/spgist_name_ops.sql diff --git a/doc/src/sgml/ref/create_opclass.sgml b/doc/src/sgml/ref/create_opclass.sgml index 2d75a1c0b0..4f1bfae822 100644 --- a/doc/src/sgml/ref/create_opclass.sgml +++ b/doc/src/sgml/ref/create_opclass.sgml @@ -234,7 +234,7 @@ CREATE OPERATOR CLASS name [ DEFAUL The data type actually stored in the index. Normally this is the same as the column data type, but some index methods - (currently GiST, GIN and BRIN) allow it to be different. The + (currently GiST, GIN, SP-GiST and BRIN) allow it to be different. The STORAGE clause must be omitted unless the index method allows a different type to be used. If the column data_type is specified diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml index ea88ae45e5..054234784f 100644 --- a/doc/src/sgml/spgist.sgml +++ b/doc/src/sgml/spgist.sgml @@ -205,10 +205,12 @@ - Leaf tuples of an SP-GiST tree contain values of the - same data type as the indexed column. Leaf tuples at the root level will - always contain the original indexed data value, but leaf tuples at lower - levels might contain only a compressed representation, such as a suffix. + Leaf tuples of an SP-GiST tree usually contain values + of the same data type as the indexed column, although it is also possible + for them to contain lossy representations of the indexed column. + Leaf tuples stored at the root level will directly represent + the original indexed data value, but leaf tuples at lower + levels might contain only a partial value, such as a suffix. In that case the operator class support functions must be able to reconstruct the original value using information accumulated from the inner tuples that are passed through to reach the leaf level. @@ -330,19 +332,29 @@ typedef struct spgConfigOut - leafType is typically the same as - attType. For the reasons of backward - compatibility, method config can - leave leafType uninitialized; that would - give the same effect as setting leafType equal - to attType. When attType - and leafType are different, then optional + leafType should match the index storage type + defined by the operator class's opckeytype + catalog entry. + (Note that opckeytype can be zero, + implying the storage type is the same as the operator class's input + type, which is the most common situation.) + For reasons of backward compatibility, the config + method can set leafType to some other value, + and that value will be used; but this is deprecated since the index + contents are then incorrectly identified in the catalogs. + Also, it's permissible to + leave leafType uninitialized (zero); + that is interpreted as meaning the index storage type derived from + opckeytype. + + + + When attType + and leafType are different, the optional method compress must be provided. Method compress is responsible for transformation of datums to be indexed from attType to leafType. - Note: both consistent functions will get scankeys - unchanged, without transformation using compress. @@ -677,8 +689,7 @@ typedef struct spgInnerConsistentOut reconstructedValue is the value reconstructed for the parent tuple; it is (Datum) 0 at the root level or if the inner_consistent function did not provide a value at the - parent level. reconstructedValue is always of - spgConfigOut.leafType type. + parent level. traversalValue is a pointer to any traverse data passed down from the previous call of inner_consistent on the parent index tuple, or NULL at the root level. @@ -713,9 +724,14 @@ typedef struct spgInnerConsistentOut necessarily so, so an array is used.) If value reconstruction is needed, set reconstructedValues to an array of the values - of spgConfigOut.leafType type reconstructed for each child node to be visited; otherwise, leave reconstructedValues as NULL. + The reconstructed values are assumed to be of type + spgConfigOut.leafType. + (However, since the core system will do nothing with them except + possibly copy them, it is sufficient for them to have the + same typlen and typbyval + properties as leafType.) If ordered search is performed, set distances to an array of distance values according to orderbys array (nodes with lowest distances will be processed first). Leave it @@ -797,8 +813,7 @@ typedef struct spgLeafConsistentOut reconstructedValue is the value reconstructed for the parent tuple; it is (Datum) 0 at the root level or if the inner_consistent function did not provide a value at the - parent level. reconstructedValue is always of - spgConfigOut.leafType type. + parent level. traversalValue is a pointer to any traverse data passed down from the previous call of inner_consistent on the parent index tuple, or NULL at the root level. @@ -816,8 +831,8 @@ typedef struct spgLeafConsistentOut The function must return true if the leaf tuple matches the query, or false if not. In the true case, if returnData is true then - leafValue must be set to the value of - spgConfigIn.attType type + leafValue must be set to the value (of type + spgConfigIn.attType) originally supplied to be indexed for this leaf tuple. Also, recheck may be set to true if the match is uncertain and so the operator(s) must be re-applied to the actual @@ -834,7 +849,7 @@ typedef struct spgLeafConsistentOut - The optional user-defined method are: + The optional user-defined methods are: @@ -842,15 +857,22 @@ typedef struct spgLeafConsistentOut Datum compress(Datum in) - Converts the data item into a format suitable for physical storage in - a leaf tuple of index page. It accepts + Converts a data item into a format suitable for physical storage in + a leaf tuple of the index. It accepts a value of type spgConfigIn.attType - value and returns - spgConfigOut.leafType - value. Output value should not be toasted. + and returns a value of type + spgConfigOut.leafType. + The output value must not contain an out-of-line TOAST pointer. + + + + Note: the compress method is only applied to + values to be stored. The consistent methods receive query scankeys + unchanged, without transformation using compress. + options diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index 20e67c3f7d..06011d2c02 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -81,7 +81,10 @@ pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a, static void spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item) { - if (!so->state.attLeafType.attbyval && + /* value is of type attType if isLeaf, else of type attLeafType */ + /* (no, that is not backwards; yes, it's confusing) */ + if (!(item->isLeaf ? so->state.attType.attbyval : + so->state.attLeafType.attbyval) && DatumGetPointer(item->value) != NULL) pfree(DatumGetPointer(item->value)); @@ -296,6 +299,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) { IndexScanDesc scan; SpGistScanOpaque so; + TupleDesc outTupDesc; int i; scan = RelationGetIndexScan(rel, keysz, orderbysz); @@ -314,8 +318,21 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) "SP-GiST traversal-value context", ALLOCSET_DEFAULT_SIZES); - /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */ - so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel); + /* + * Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan. + * (It's rather annoying to do this work when it might be wasted, but for + * most opclasses we can re-use the index reldesc instead of making one.) + */ + if (so->state.attType.type == + TupleDescAttr(RelationGetDescr(rel), 0)->atttypid) + outTupDesc = RelationGetDescr(rel); + else + { + outTupDesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(outTupDesc, 1, NULL, + so->state.attType.type, -1, 0); + } + so->indexTupDesc = scan->xs_hitupdesc = outTupDesc; /* Allocate various arrays needed for order-by scans */ if (scan->numberOfOrderBys > 0) @@ -447,9 +464,10 @@ spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr, item->level = level; item->heapPtr = *heapPtr; /* copy value to queue cxt out of tmp cxt */ + /* caution: "leafValue" is of type attType not leafType */ item->value = isnull ? (Datum) 0 : - datumCopy(leafValue, so->state.attLeafType.attbyval, - so->state.attLeafType.attlen); + datumCopy(leafValue, so->state.attType.attbyval, + so->state.attType.attlen); item->traversalValue = NULL; item->isLeaf = true; item->recheck = recheck; @@ -497,6 +515,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, in.nkeys = so->numberOfKeys; in.orderbys = so->orderByData; in.norderbys = so->numberOfNonNullOrderBys; + Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */ in.reconstructedValue = item->value; in.traversalValue = item->traversalValue; in.level = item->level; @@ -563,6 +582,7 @@ spgInitInnerConsistentIn(spgInnerConsistentIn *in, in->orderbys = so->orderByData; in->nkeys = so->numberOfKeys; in->norderbys = so->numberOfNonNullOrderBys; + Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */ in->reconstructedValue = item->value; in->traversalMemoryContext = so->traversalCxt; in->traversalValue = item->traversalValue; @@ -589,6 +609,7 @@ spgMakeInnerItem(SpGistScanOpaque so, : parentItem->level; /* Must copy value out of temp context */ + /* (recall that reconstructed values are of type leafType) */ item->value = out->reconstructedValues ? datumCopy(out->reconstructedValues[i], so->state.attLeafType.attbyval, diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 949352ada7..f2ba72b5aa 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "catalog/pg_amop.h" #include "commands/vacuum.h" +#include "nodes/nodeFuncs.h" #include "storage/bufmgr.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" @@ -53,7 +54,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amoptionalkey = true; amroutine->amsearcharray = false; amroutine->amsearchnulls = true; - amroutine->amstorage = false; + amroutine->amstorage = true; amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; @@ -89,6 +90,66 @@ spghandler(PG_FUNCTION_ARGS) PG_RETURN_POINTER(amroutine); } +/* + * GetIndexInputType + * Determine the nominal input data type for an index column + * + * We define the "nominal" input type as the associated opclass's opcintype, + * or if that is a polymorphic type, the base type of the heap column or + * expression that is the index's input. The reason for preferring the + * opcintype is that non-polymorphic opclasses probably don't want to hear + * about binary-compatible input types. For instance, if a text opclass + * is being used with a varchar heap column, we want to report "text" not + * "varchar". Likewise, opclasses don't want to hear about domain types, + * so if we do consult the actual input type, we make sure to flatten domains. + * + * At some point maybe this should go somewhere else, but it's not clear + * if any other index AMs have a use for it. + */ +static Oid +GetIndexInputType(Relation index, AttrNumber indexcol) +{ + Oid opcintype; + AttrNumber heapcol; + List *indexprs; + ListCell *indexpr_item; + + Assert(index->rd_index != NULL); + Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts); + opcintype = index->rd_opcintype[indexcol - 1]; + if (!IsPolymorphicType(opcintype)) + return opcintype; + heapcol = index->rd_index->indkey.values[indexcol - 1]; + if (heapcol != 0) /* Simple index column? */ + return getBaseType(get_atttype(index->rd_index->indrelid, heapcol)); + + /* + * If the index expressions are already cached, skip calling + * RelationGetIndexExpressions, as it will make a copy which is overkill. + * We're not going to modify the trees, and we're not going to do anything + * that would invalidate the relcache entry before we're done. + */ + if (index->rd_indexprs) + indexprs = index->rd_indexprs; + else + indexprs = RelationGetIndexExpressions(index); + indexpr_item = list_head(indexprs); + for (int i = 1; i <= index->rd_index->indnkeyatts; i++) + { + if (index->rd_index->indkey.values[i - 1] == 0) + { + /* expression column */ + if (indexpr_item == NULL) + elog(ERROR, "wrong number of index expressions"); + if (i == indexcol) + return getBaseType(exprType((Node *) lfirst(indexpr_item))); + indexpr_item = lnext(indexprs, indexpr_item); + } + } + elog(ERROR, "wrong number of index expressions"); + return InvalidOid; /* keep compiler quiet */ +} + /* Fill in a SpGistTypeDesc struct with info about the specified data type */ static void fillTypeDesc(SpGistTypeDesc *desc, Oid type) @@ -121,11 +182,11 @@ spgGetCache(Relation index) Assert(index->rd_att->natts == 1); /* - * Get the actual data type of the indexed column from the index - * tupdesc. We pass this to the opclass config function so that + * Get the actual (well, nominal) data type of the column being + * indexed. We pass this to the opclass config function so that * polymorphic opclasses are possible. */ - atttype = TupleDescAttr(index->rd_att, 0)->atttypid; + atttype = GetIndexInputType(index, 1); /* Call the config function to get config info for the opclass */ in.attType = atttype; @@ -136,11 +197,21 @@ spgGetCache(Relation index) PointerGetDatum(&in), PointerGetDatum(&cache->config)); + /* + * If leafType isn't specified, use the declared index column type, + * which index.c will have derived from the opclass's opcintype. + * (Although we now make spgvalidate.c warn if these aren't the same, + * old user-defined opclasses may not set the STORAGE parameter + * correctly, so believe leafType if it's given.) + */ + if (!OidIsValid(cache->config.leafType)) + cache->config.leafType = + TupleDescAttr(RelationGetDescr(index), 0)->atttypid; + /* Get the information we need about each relevant datatype */ fillTypeDesc(&cache->attType, atttype); - if (OidIsValid(cache->config.leafType) && - cache->config.leafType != atttype) + if (cache->config.leafType != atttype) { if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC))) ereport(ERROR, @@ -151,6 +222,7 @@ spgGetCache(Relation index) } else { + /* Save lookups in this common case */ cache->attLeafType = cache->attType; } diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c index 8bc3889a4d..472a28b808 100644 --- a/src/backend/access/spgist/spgvalidate.c +++ b/src/backend/access/spgist/spgvalidate.c @@ -43,6 +43,7 @@ spgvalidate(Oid opclassoid) Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; + Oid opckeytype; char *opclassname; HeapTuple familytup; Form_pg_opfamily familyform; @@ -57,6 +58,7 @@ spgvalidate(Oid opclassoid) spgConfigOut configOut; Oid configOutLefttype = InvalidOid; Oid configOutRighttype = InvalidOid; + Oid configOutLeafType = InvalidOid; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -66,6 +68,7 @@ spgvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opckeytype = classform->opckeytype; opclassname = NameStr(classform->opcname); /* Fetch opfamily information */ @@ -118,13 +121,31 @@ spgvalidate(Oid opclassoid) configOutLefttype = procform->amproclefttype; configOutRighttype = procform->amprocrighttype; + /* Default leaf type is opckeytype or input type */ + if (OidIsValid(opckeytype)) + configOutLeafType = opckeytype; + else + configOutLeafType = procform->amproclefttype; + + /* If some other leaf datum type is specified, warn */ + if (OidIsValid(configOut.leafType) && + configOutLeafType != configOut.leafType) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("SP-GiST leaf data type %s does not match declared type %s", + format_type_be(configOut.leafType), + format_type_be(configOutLeafType)))); + result = false; + configOutLeafType = configOut.leafType; + } + /* * When leaf and attribute types are the same, compress * function is not required and we set corresponding bit in * functionset for later group consistency check. */ - if (!OidIsValid(configOut.leafType) || - configOut.leafType == configIn.attType) + if (configOutLeafType == configIn.attType) { foreach(lc, grouplist) { @@ -156,7 +177,7 @@ spgvalidate(Oid opclassoid) ok = false; else ok = check_amproc_signature(procform->amproc, - configOut.leafType, true, + configOutLeafType, true, 1, 1, procform->amproclefttype); break; case SPGIST_OPTIONS_PROC: diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h index b6f95421d8..980aa7766b 100644 --- a/src/include/access/spgist_private.h +++ b/src/include/access/spgist_private.h @@ -151,8 +151,8 @@ typedef struct SpGistState typedef struct SpGistSearchItem { pairingheap_node phNode; /* pairing heap node */ - Datum value; /* value reconstructed from parent or - * leafValue if heaptuple */ + Datum value; /* value reconstructed from parent, or + * leafValue if isLeaf */ void *traversalValue; /* opclass-specific traverse value */ int level; /* level of items on this page */ ItemPointerData heapPtr; /* heap info, if heap tuple */ @@ -208,7 +208,7 @@ typedef struct SpGistScanOpaqueData /* These fields are only used in amgettuple scans: */ bool want_itup; /* are we reconstructing tuples? */ - TupleDesc indexTupDesc; /* if so, tuple descriptor for them */ + TupleDesc indexTupDesc; /* if so, descriptor for reconstructed tuples */ int nPtrs; /* number of TIDs found on current page */ int iPtr; /* index for scanning through same */ ItemPointerData heapPtrs[MaxIndexTuplesPerPage]; /* TIDs from cur page */ diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 93e7829c67..dffc79b2d9 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -13,6 +13,7 @@ SUBDIRS = \ libpq_pipeline \ plsample \ snapshot_too_old \ + spgist_name_ops \ test_bloomfilter \ test_ddl_deparse \ test_extensions \ diff --git a/src/test/modules/spgist_name_ops/.gitignore b/src/test/modules/spgist_name_ops/.gitignore new file mode 100644 index 0000000000..5dcb3ff972 --- /dev/null +++ b/src/test/modules/spgist_name_ops/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/spgist_name_ops/Makefile b/src/test/modules/spgist_name_ops/Makefile new file mode 100644 index 0000000000..05b2464b27 --- /dev/null +++ b/src/test/modules/spgist_name_ops/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/spgist_name_ops/Makefile + +MODULE_big = spgist_name_ops +OBJS = \ + $(WIN32RES) \ + spgist_name_ops.o +PGFILEDESC = "spgist_name_ops - test opclass for SP-GiST" + +EXTENSION = spgist_name_ops +DATA = spgist_name_ops--1.0.sql + +REGRESS = spgist_name_ops + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/spgist_name_ops +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/spgist_name_ops/README b/src/test/modules/spgist_name_ops/README new file mode 100644 index 0000000000..119208b568 --- /dev/null +++ b/src/test/modules/spgist_name_ops/README @@ -0,0 +1,8 @@ +spgist_name_ops implements an SP-GiST operator class that indexes +columns of type "name", but with storage identical to that used +by SP-GiST text_ops. + +This is not terribly useful in itself, perhaps, but it allows +testing cases where the indexed data type is different from the leaf +data type and yet we can reconstruct the original indexed value. +That situation is not tested by any built-in SP-GiST opclass. diff --git a/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out b/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out new file mode 100644 index 0000000000..87b8bd0d5e --- /dev/null +++ b/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out @@ -0,0 +1,95 @@ +create extension spgist_name_ops; +select opcname, amvalidate(opc.oid) +from pg_opclass opc join pg_am am on am.oid = opcmethod +where amname = 'spgist' and opcname = 'name_ops'; + opcname | amvalidate +----------+------------ + name_ops | t +(1 row) + +-- warning expected here +select opcname, amvalidate(opc.oid) +from pg_opclass opc join pg_am am on am.oid = opcmethod +where amname = 'spgist' and opcname = 'name_ops_old'; +INFO: SP-GiST leaf data type text does not match declared type name + opcname | amvalidate +--------------+------------ + name_ops_old | f +(1 row) + +create table t(f1 name); +create index on t using spgist(f1); +\d+ t_f1_idx + Index "public.t_f1_idx" + Column | Type | Key? | Definition | Storage | Stats target +--------+------+------+------------+----------+-------------- + f1 | text | yes | f1 | extended | +spgist, for table "public.t" + +insert into t select proname from pg_proc; +vacuum analyze t; +explain (costs off) +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Sort + Sort Key: f1 + -> Index Only Scan using t_f1_idx on t + Index Cond: ((f1 > 'binary_upgrade_set_n'::name) AND (f1 < 'binary_upgrade_set_p'::name)) +(4 rows) + +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; + f1 +------------------------------------------------------ + binary_upgrade_set_next_array_pg_type_oid + binary_upgrade_set_next_heap_pg_class_oid + binary_upgrade_set_next_index_pg_class_oid + binary_upgrade_set_next_multirange_array_pg_type_oid + binary_upgrade_set_next_multirange_pg_type_oid + binary_upgrade_set_next_pg_authid_oid + binary_upgrade_set_next_pg_enum_oid + binary_upgrade_set_next_pg_type_oid + binary_upgrade_set_next_toast_pg_class_oid +(9 rows) + +drop index t_f1_idx; +create index on t using spgist(f1 name_ops_old); +\d+ t_f1_idx + Index "public.t_f1_idx" + Column | Type | Key? | Definition | Storage | Stats target +--------+------+------+------------+---------+-------------- + f1 | name | yes | f1 | plain | +spgist, for table "public.t" + +explain (costs off) +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Sort + Sort Key: f1 + -> Index Only Scan using t_f1_idx on t + Index Cond: ((f1 > 'binary_upgrade_set_n'::name) AND (f1 < 'binary_upgrade_set_p'::name)) +(4 rows) + +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; + f1 +------------------------------------------------------ + binary_upgrade_set_next_array_pg_type_oid + binary_upgrade_set_next_heap_pg_class_oid + binary_upgrade_set_next_index_pg_class_oid + binary_upgrade_set_next_multirange_array_pg_type_oid + binary_upgrade_set_next_multirange_pg_type_oid + binary_upgrade_set_next_pg_authid_oid + binary_upgrade_set_next_pg_enum_oid + binary_upgrade_set_next_pg_type_oid + binary_upgrade_set_next_toast_pg_class_oid +(9 rows) + diff --git a/src/test/modules/spgist_name_ops/spgist_name_ops--1.0.sql b/src/test/modules/spgist_name_ops/spgist_name_ops--1.0.sql new file mode 100644 index 0000000000..432216ee6d --- /dev/null +++ b/src/test/modules/spgist_name_ops/spgist_name_ops--1.0.sql @@ -0,0 +1,54 @@ +/* src/test/modules/spgist_name_ops/spgist_name_ops--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION spgist_name_ops" to load this file. \quit + +CREATE FUNCTION spgist_name_config(internal, internal) +RETURNS void IMMUTABLE PARALLEL SAFE STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION spgist_name_choose(internal, internal) +RETURNS void IMMUTABLE PARALLEL SAFE STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION spgist_name_inner_consistent(internal, internal) +RETURNS void IMMUTABLE PARALLEL SAFE STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION spgist_name_leaf_consistent(internal, internal) +RETURNS boolean IMMUTABLE PARALLEL SAFE STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION spgist_name_compress(name) +RETURNS text IMMUTABLE PARALLEL SAFE STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE OPERATOR CLASS name_ops DEFAULT FOR TYPE name +USING spgist AS + OPERATOR 1 < , + OPERATOR 2 <= , + OPERATOR 3 = , + OPERATOR 4 >= , + OPERATOR 5 > , + FUNCTION 1 spgist_name_config(internal, internal), + FUNCTION 2 spgist_name_choose(internal, internal), + FUNCTION 3 spg_text_picksplit(internal, internal), + FUNCTION 4 spgist_name_inner_consistent(internal, internal), + FUNCTION 5 spgist_name_leaf_consistent(internal, internal), + FUNCTION 6 spgist_name_compress(name), + STORAGE text; + +-- Also test old-style where the STORAGE clause is disallowed +CREATE OPERATOR CLASS name_ops_old FOR TYPE name +USING spgist AS + OPERATOR 1 < , + OPERATOR 2 <= , + OPERATOR 3 = , + OPERATOR 4 >= , + OPERATOR 5 > , + FUNCTION 1 spgist_name_config(internal, internal), + FUNCTION 2 spgist_name_choose(internal, internal), + FUNCTION 3 spg_text_picksplit(internal, internal), + FUNCTION 4 spgist_name_inner_consistent(internal, internal), + FUNCTION 5 spgist_name_leaf_consistent(internal, internal), + FUNCTION 6 spgist_name_compress(name); diff --git a/src/test/modules/spgist_name_ops/spgist_name_ops.c b/src/test/modules/spgist_name_ops/spgist_name_ops.c new file mode 100644 index 0000000000..63d3faaef4 --- /dev/null +++ b/src/test/modules/spgist_name_ops/spgist_name_ops.c @@ -0,0 +1,501 @@ +/*-------------------------------------------------------------------------- + * + * spgist_name_ops.c + * Test opclass for SP-GiST + * + * This indexes input values of type "name", but the index storage is "text", + * with the same choices as made in the core SP-GiST text_ops opclass. + * Much of the code is identical to src/backend/access/spgist/spgtextproc.c, + * which see for a more detailed header comment. + * + * Unlike spgtextproc.c, we don't bother with collation-aware logic. + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/test/modules/spgist_name_ops/spgist_name_ops.c + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/spgist.h" +#include "catalog/pg_type.h" +#include "utils/datum.h" + +PG_MODULE_MAGIC; + + +PG_FUNCTION_INFO_V1(spgist_name_config); +Datum +spgist_name_config(PG_FUNCTION_ARGS) +{ + /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */ + spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1); + + cfg->prefixType = TEXTOID; + cfg->labelType = INT2OID; + cfg->leafType = TEXTOID; + cfg->canReturnData = true; + cfg->longValuesOK = true; /* suffixing will shorten long values */ + PG_RETURN_VOID(); +} + +/* + * Form a text datum from the given not-necessarily-null-terminated string, + * using short varlena header format if possible + */ +static Datum +formTextDatum(const char *data, int datalen) +{ + char *p; + + p = (char *) palloc(datalen + VARHDRSZ); + + if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX) + { + SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT); + if (datalen) + memcpy(p + VARHDRSZ_SHORT, data, datalen); + } + else + { + SET_VARSIZE(p, datalen + VARHDRSZ); + memcpy(p + VARHDRSZ, data, datalen); + } + + return PointerGetDatum(p); +} + +/* + * Find the length of the common prefix of a and b + */ +static int +commonPrefix(const char *a, const char *b, int lena, int lenb) +{ + int i = 0; + + while (i < lena && i < lenb && *a == *b) + { + a++; + b++; + i++; + } + + return i; +} + +/* + * Binary search an array of int16 datums for a match to c + * + * On success, *i gets the match location; on failure, it gets where to insert + */ +static bool +searchChar(Datum *nodeLabels, int nNodes, int16 c, int *i) +{ + int StopLow = 0, + StopHigh = nNodes; + + while (StopLow < StopHigh) + { + int StopMiddle = (StopLow + StopHigh) >> 1; + int16 middle = DatumGetInt16(nodeLabels[StopMiddle]); + + if (c < middle) + StopHigh = StopMiddle; + else if (c > middle) + StopLow = StopMiddle + 1; + else + { + *i = StopMiddle; + return true; + } + } + + *i = StopHigh; + return false; +} + +PG_FUNCTION_INFO_V1(spgist_name_choose); +Datum +spgist_name_choose(PG_FUNCTION_ARGS) +{ + spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0); + spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1); + Name inName = DatumGetName(in->datum); + char *inStr = NameStr(*inName); + int inSize = strlen(inStr); + char *prefixStr = NULL; + int prefixSize = 0; + int commonLen = 0; + int16 nodeChar = 0; + int i = 0; + + /* Check for prefix match, set nodeChar to first byte after prefix */ + if (in->hasPrefix) + { + text *prefixText = DatumGetTextPP(in->prefixDatum); + + prefixStr = VARDATA_ANY(prefixText); + prefixSize = VARSIZE_ANY_EXHDR(prefixText); + + commonLen = commonPrefix(inStr + in->level, + prefixStr, + inSize - in->level, + prefixSize); + + if (commonLen == prefixSize) + { + if (inSize - in->level > commonLen) + nodeChar = *(unsigned char *) (inStr + in->level + commonLen); + else + nodeChar = -1; + } + else + { + /* Must split tuple because incoming value doesn't match prefix */ + out->resultType = spgSplitTuple; + + if (commonLen == 0) + { + out->result.splitTuple.prefixHasPrefix = false; + } + else + { + out->result.splitTuple.prefixHasPrefix = true; + out->result.splitTuple.prefixPrefixDatum = + formTextDatum(prefixStr, commonLen); + } + out->result.splitTuple.prefixNNodes = 1; + out->result.splitTuple.prefixNodeLabels = + (Datum *) palloc(sizeof(Datum)); + out->result.splitTuple.prefixNodeLabels[0] = + Int16GetDatum(*(unsigned char *) (prefixStr + commonLen)); + + out->result.splitTuple.childNodeN = 0; + + if (prefixSize - commonLen == 1) + { + out->result.splitTuple.postfixHasPrefix = false; + } + else + { + out->result.splitTuple.postfixHasPrefix = true; + out->result.splitTuple.postfixPrefixDatum = + formTextDatum(prefixStr + commonLen + 1, + prefixSize - commonLen - 1); + } + + PG_RETURN_VOID(); + } + } + else if (inSize > in->level) + { + nodeChar = *(unsigned char *) (inStr + in->level); + } + else + { + nodeChar = -1; + } + + /* Look up nodeChar in the node label array */ + if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i)) + { + /* + * Descend to existing node. (If in->allTheSame, the core code will + * ignore our nodeN specification here, but that's OK. We still have + * to provide the correct levelAdd and restDatum values, and those are + * the same regardless of which node gets chosen by core.) + */ + int levelAdd; + + out->resultType = spgMatchNode; + out->result.matchNode.nodeN = i; + levelAdd = commonLen; + if (nodeChar >= 0) + levelAdd++; + out->result.matchNode.levelAdd = levelAdd; + if (inSize - in->level - levelAdd > 0) + out->result.matchNode.restDatum = + formTextDatum(inStr + in->level + levelAdd, + inSize - in->level - levelAdd); + else + out->result.matchNode.restDatum = + formTextDatum(NULL, 0); + } + else if (in->allTheSame) + { + /* + * Can't use AddNode action, so split the tuple. The upper tuple has + * the same prefix as before and uses a dummy node label -2 for the + * lower tuple. The lower tuple has no prefix and the same node + * labels as the original tuple. + * + * Note: it might seem tempting to shorten the upper tuple's prefix, + * if it has one, then use its last byte as label for the lower tuple. + * But that doesn't win since we know the incoming value matches the + * whole prefix: we'd just end up splitting the lower tuple again. + */ + out->resultType = spgSplitTuple; + out->result.splitTuple.prefixHasPrefix = in->hasPrefix; + out->result.splitTuple.prefixPrefixDatum = in->prefixDatum; + out->result.splitTuple.prefixNNodes = 1; + out->result.splitTuple.prefixNodeLabels = (Datum *) palloc(sizeof(Datum)); + out->result.splitTuple.prefixNodeLabels[0] = Int16GetDatum(-2); + out->result.splitTuple.childNodeN = 0; + out->result.splitTuple.postfixHasPrefix = false; + } + else + { + /* Add a node for the not-previously-seen nodeChar value */ + out->resultType = spgAddNode; + out->result.addNode.nodeLabel = Int16GetDatum(nodeChar); + out->result.addNode.nodeN = i; + } + + PG_RETURN_VOID(); +} + +/* The picksplit function is identical to the core opclass, so just use that */ + +PG_FUNCTION_INFO_V1(spgist_name_inner_consistent); +Datum +spgist_name_inner_consistent(PG_FUNCTION_ARGS) +{ + spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); + spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); + text *reconstructedValue; + text *reconstrText; + int maxReconstrLen; + text *prefixText = NULL; + int prefixSize = 0; + int i; + + /* + * Reconstruct values represented at this tuple, including parent data, + * prefix of this tuple if any, and the node label if it's non-dummy. + * in->level should be the length of the previously reconstructed value, + * and the number of bytes added here is prefixSize or prefixSize + 1. + * + * Recall that reconstructedValues are assumed to be the same type as leaf + * datums, so we must use "text" not "name" for them. + * + * Note: we assume that in->reconstructedValue isn't toasted and doesn't + * have a short varlena header. This is okay because it must have been + * created by a previous invocation of this routine, and we always emit + * long-format reconstructed values. + */ + reconstructedValue = (text *) DatumGetPointer(in->reconstructedValue); + Assert(reconstructedValue == NULL ? in->level == 0 : + VARSIZE_ANY_EXHDR(reconstructedValue) == in->level); + + maxReconstrLen = in->level + 1; + if (in->hasPrefix) + { + prefixText = DatumGetTextPP(in->prefixDatum); + prefixSize = VARSIZE_ANY_EXHDR(prefixText); + maxReconstrLen += prefixSize; + } + + reconstrText = palloc(VARHDRSZ + maxReconstrLen); + SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen); + + if (in->level) + memcpy(VARDATA(reconstrText), + VARDATA(reconstructedValue), + in->level); + if (prefixSize) + memcpy(((char *) VARDATA(reconstrText)) + in->level, + VARDATA_ANY(prefixText), + prefixSize); + /* last byte of reconstrText will be filled in below */ + + /* + * Scan the child nodes. For each one, complete the reconstructed value + * and see if it's consistent with the query. If so, emit an entry into + * the output arrays. + */ + out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes); + out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes); + out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes); + out->nNodes = 0; + + for (i = 0; i < in->nNodes; i++) + { + int16 nodeChar = DatumGetInt16(in->nodeLabels[i]); + int thisLen; + bool res = true; + int j; + + /* If nodeChar is a dummy value, don't include it in data */ + if (nodeChar <= 0) + thisLen = maxReconstrLen - 1; + else + { + ((unsigned char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar; + thisLen = maxReconstrLen; + } + + for (j = 0; j < in->nkeys; j++) + { + StrategyNumber strategy = in->scankeys[j].sk_strategy; + Name inName; + char *inStr; + int inSize; + int r; + + inName = DatumGetName(in->scankeys[j].sk_argument); + inStr = NameStr(*inName); + inSize = strlen(inStr); + + r = memcmp(VARDATA(reconstrText), inStr, + Min(inSize, thisLen)); + + switch (strategy) + { + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + if (r > 0) + res = false; + break; + case BTEqualStrategyNumber: + if (r != 0 || inSize < thisLen) + res = false; + break; + case BTGreaterEqualStrategyNumber: + case BTGreaterStrategyNumber: + if (r < 0) + res = false; + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[j].sk_strategy); + break; + } + + if (!res) + break; /* no need to consider remaining conditions */ + } + + if (res) + { + out->nodeNumbers[out->nNodes] = i; + out->levelAdds[out->nNodes] = thisLen - in->level; + SET_VARSIZE(reconstrText, VARHDRSZ + thisLen); + out->reconstructedValues[out->nNodes] = + datumCopy(PointerGetDatum(reconstrText), false, -1); + out->nNodes++; + } + } + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(spgist_name_leaf_consistent); +Datum +spgist_name_leaf_consistent(PG_FUNCTION_ARGS) +{ + spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0); + spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1); + int level = in->level; + text *leafValue, + *reconstrValue = NULL; + char *fullValue; + int fullLen; + bool res; + int j; + + /* all tests are exact */ + out->recheck = false; + + leafValue = DatumGetTextPP(in->leafDatum); + + /* As above, in->reconstructedValue isn't toasted or short. */ + if (DatumGetPointer(in->reconstructedValue)) + reconstrValue = (text *) DatumGetPointer(in->reconstructedValue); + + Assert(reconstrValue == NULL ? level == 0 : + VARSIZE_ANY_EXHDR(reconstrValue) == level); + + /* Reconstruct the Name represented by this leaf tuple */ + fullValue = palloc0(NAMEDATALEN); + fullLen = level + VARSIZE_ANY_EXHDR(leafValue); + Assert(fullLen < NAMEDATALEN); + if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0) + { + memcpy(fullValue, VARDATA(reconstrValue), + VARSIZE_ANY_EXHDR(reconstrValue)); + } + else + { + if (level) + memcpy(fullValue, VARDATA(reconstrValue), level); + if (VARSIZE_ANY_EXHDR(leafValue) > 0) + memcpy(fullValue + level, VARDATA_ANY(leafValue), + VARSIZE_ANY_EXHDR(leafValue)); + } + out->leafValue = PointerGetDatum(fullValue); + + /* Perform the required comparison(s) */ + res = true; + for (j = 0; j < in->nkeys; j++) + { + StrategyNumber strategy = in->scankeys[j].sk_strategy; + Name queryName = DatumGetName(in->scankeys[j].sk_argument); + char *queryStr = NameStr(*queryName); + int queryLen = strlen(queryStr); + int r; + + /* Non-collation-aware comparison */ + r = memcmp(fullValue, queryStr, Min(queryLen, fullLen)); + + if (r == 0) + { + if (queryLen > fullLen) + r = -1; + else if (queryLen < fullLen) + r = 1; + } + + switch (strategy) + { + case BTLessStrategyNumber: + res = (r < 0); + break; + case BTLessEqualStrategyNumber: + res = (r <= 0); + break; + case BTEqualStrategyNumber: + res = (r == 0); + break; + case BTGreaterEqualStrategyNumber: + res = (r >= 0); + break; + case BTGreaterStrategyNumber: + res = (r > 0); + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[j].sk_strategy); + res = false; + break; + } + + if (!res) + break; /* no need to consider remaining conditions */ + } + + PG_RETURN_BOOL(res); +} + +PG_FUNCTION_INFO_V1(spgist_name_compress); +Datum +spgist_name_compress(PG_FUNCTION_ARGS) +{ + Name inName = PG_GETARG_NAME(0); + char *inStr = NameStr(*inName); + + PG_RETURN_DATUM(formTextDatum(inStr, strlen(inStr))); +} diff --git a/src/test/modules/spgist_name_ops/spgist_name_ops.control b/src/test/modules/spgist_name_ops/spgist_name_ops.control new file mode 100644 index 0000000000..f02df7ef85 --- /dev/null +++ b/src/test/modules/spgist_name_ops/spgist_name_ops.control @@ -0,0 +1,4 @@ +comment = 'Test opclass for SP-GiST' +default_version = '1.0' +module_pathname = '$libdir/spgist_name_ops' +relocatable = true diff --git a/src/test/modules/spgist_name_ops/sql/spgist_name_ops.sql b/src/test/modules/spgist_name_ops/sql/spgist_name_ops.sql new file mode 100644 index 0000000000..5c97e248a4 --- /dev/null +++ b/src/test/modules/spgist_name_ops/sql/spgist_name_ops.sql @@ -0,0 +1,38 @@ +create extension spgist_name_ops; + +select opcname, amvalidate(opc.oid) +from pg_opclass opc join pg_am am on am.oid = opcmethod +where amname = 'spgist' and opcname = 'name_ops'; + +-- warning expected here +select opcname, amvalidate(opc.oid) +from pg_opclass opc join pg_am am on am.oid = opcmethod +where amname = 'spgist' and opcname = 'name_ops_old'; + +create table t(f1 name); +create index on t using spgist(f1); +\d+ t_f1_idx + +insert into t select proname from pg_proc; +vacuum analyze t; + +explain (costs off) +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; + +drop index t_f1_idx; + +create index on t using spgist(f1 name_ops_old); +\d+ t_f1_idx + +explain (costs off) +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; +select * from t + where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p' + order by 1; diff --git a/src/test/regress/expected/rangetypes.out b/src/test/regress/expected/rangetypes.out index 05b882fde9..e6ca99d43d 100644 --- a/src/test/regress/expected/rangetypes.out +++ b/src/test/regress/expected/rangetypes.out @@ -1413,12 +1413,31 @@ RESET enable_bitmapscan; create table test_range_elem(i int4); create index test_range_elem_idx on test_range_elem (i); insert into test_range_elem select i from generate_series(1,100) i; +SET enable_seqscan = f; select count(*) from test_range_elem where i <@ int4range(10,50); count ------- 40 (1 row) +-- also test spgist index on anyrange expression +create index on test_range_elem using spgist(int4range(i,i+10)); +explain (costs off) +select count(*) from test_range_elem where int4range(i,i+10) <@ int4range(10,30); + QUERY PLAN +------------------------------------------------------------------------- + Aggregate + -> Index Scan using test_range_elem_int4range_idx on test_range_elem + Index Cond: (int4range(i, (i + 10)) <@ '[10,30)'::int4range) +(3 rows) + +select count(*) from test_range_elem where int4range(i,i+10) <@ int4range(10,30); + count +------- + 11 +(1 row) + +RESET enable_seqscan; drop table test_range_elem; -- -- Btree_gist is not included by default, so to test exclusion diff --git a/src/test/regress/sql/rangetypes.sql b/src/test/regress/sql/rangetypes.sql index e1b8917c0c..cb5353de6f 100644 --- a/src/test/regress/sql/rangetypes.sql +++ b/src/test/regress/sql/rangetypes.sql @@ -375,8 +375,18 @@ create table test_range_elem(i int4); create index test_range_elem_idx on test_range_elem (i); insert into test_range_elem select i from generate_series(1,100) i; +SET enable_seqscan = f; + select count(*) from test_range_elem where i <@ int4range(10,50); +-- also test spgist index on anyrange expression +create index on test_range_elem using spgist(int4range(i,i+10)); +explain (costs off) +select count(*) from test_range_elem where int4range(i,i+10) <@ int4range(10,30); +select count(*) from test_range_elem where int4range(i,i+10) <@ int4range(10,30); + +RESET enable_seqscan; + drop table test_range_elem; --