From 8004953b5a2449c26c4e082771276b2f8629d153 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 16 Nov 2015 13:45:17 -0500 Subject: [PATCH] Speed up ruleutils' name de-duplication code, and fix overlength-name case. Since commit 11e131854f8231a21613f834c40fe9d046926387, ruleutils.c has attempted to ensure that each RTE in a query or plan tree has a unique alias name. However, the code that was added for this could be quite slow, even as bad as O(N^3) if N identical RTE names must be replaced, as noted by Jeff Janes. Improve matters by building a transient hash table within set_rtable_names. The hash table in itself reduces the cost of detecting a duplicate from O(N) to O(1), and we can save another factor of N by storing the number of de-duplicated names already created for each entry, so that we don't have to re-try names already created. This way is probably a bit slower overall for small range tables, but almost by definition, such cases should not be a performance problem. In principle the same problem applies to the column-name-de-duplication code; but in practice that seems to be less of a problem, first because N is limited since we don't support extremely wide tables, and second because duplicate column names within an RTE are fairly rare, so that in practice the cost is more like O(N^2) not O(N^3). It would be very much messier to fix the column-name code, so for now I've left that alone. An independent problem in the same area was that the de-duplication code paid no attention to the identifier length limit, and would happily produce identifiers that were longer than NAMEDATALEN and wouldn't be unique after truncation to NAMEDATALEN. This could result in dump/reload failures, or perhaps even views that silently behaved differently than before. We can fix that by shortening the base name as needed. Fix it for both the relation and column name cases. In passing, check for interrupts in set_rtable_names, just in case it's still slow enough to be an issue. Back-patch to 9.3 where this code was introduced. --- src/backend/utils/adt/ruleutils.c | 173 ++++++++++++++++------ src/test/regress/expected/create_view.out | 27 ++++ src/test/regress/sql/create_view.sql | 9 ++ 3 files changed, 162 insertions(+), 47 deletions(-) diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 3388f7c17e..771b14b48a 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -38,6 +38,7 @@ #include "commands/tablespace.h" #include "executor/spi.h" #include "funcapi.h" +#include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -55,6 +56,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/hsearch.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/ruleutils.h" @@ -267,6 +269,15 @@ typedef struct #define deparse_columns_fetch(rangetable_index, dpns) \ ((deparse_columns *) list_nth((dpns)->rtable_columns, (rangetable_index)-1)) +/* + * Entry in set_rtable_names' hash table + */ +typedef struct +{ + char name[NAMEDATALEN]; /* Hash key --- must be first */ + int counter; /* Largest addition used so far for name */ +} NameHashEntry; + /* ---------- * Global data @@ -312,8 +323,6 @@ static void print_function_rettype(StringInfo buf, HeapTuple proctup); static void print_function_trftypes(StringInfo buf, HeapTuple proctup); static void set_rtable_names(deparse_namespace *dpns, List *parent_namespaces, Bitmapset *rels_used); -static bool refname_is_unique(char *refname, deparse_namespace *dpns, - List *parent_namespaces); static void set_deparse_for_query(deparse_namespace *dpns, Query *query, List *parent_namespaces); static void set_simple_column_names(deparse_namespace *dpns); @@ -2676,15 +2685,61 @@ static void set_rtable_names(deparse_namespace *dpns, List *parent_namespaces, Bitmapset *rels_used) { + HASHCTL hash_ctl; + HTAB *names_hash; + NameHashEntry *hentry; + bool found; + int rtindex; ListCell *lc; - int rtindex = 1; dpns->rtable_names = NIL; + /* nothing more to do if empty rtable */ + if (dpns->rtable == NIL) + return; + + /* + * We use a hash table to hold known names, so that this process is O(N) + * not O(N^2) for N names. + */ + MemSet(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(NameHashEntry); + hash_ctl.hcxt = CurrentMemoryContext; + names_hash = hash_create("set_rtable_names names", + list_length(dpns->rtable), + &hash_ctl, + HASH_ELEM | HASH_CONTEXT); + /* Preload the hash table with names appearing in parent_namespaces */ + foreach(lc, parent_namespaces) + { + deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc); + ListCell *lc2; + + foreach(lc2, olddpns->rtable_names) + { + char *oldname = (char *) lfirst(lc2); + + if (oldname == NULL) + continue; + hentry = (NameHashEntry *) hash_search(names_hash, + oldname, + HASH_ENTER, + &found); + /* we do not complain about duplicate names in parent namespaces */ + hentry->counter = 0; + } + } + + /* Now we can scan the rtable */ + rtindex = 1; foreach(lc, dpns->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); char *refname; + /* Just in case this takes an unreasonable amount of time ... */ + CHECK_FOR_INTERRUPTS(); + if (rels_used && !bms_is_member(rtindex, rels_used)) { /* Ignore unreferenced RTE */ @@ -2712,56 +2767,62 @@ set_rtable_names(deparse_namespace *dpns, List *parent_namespaces, } /* - * If the selected name isn't unique, append digits to make it so + * If the selected name isn't unique, append digits to make it so, and + * make a new hash entry for it once we've got a unique name. For a + * very long input name, we might have to truncate to stay within + * NAMEDATALEN. */ - if (refname && - !refname_is_unique(refname, dpns, parent_namespaces)) + if (refname) { - char *modname = (char *) palloc(strlen(refname) + 32); - int i = 0; - - do + hentry = (NameHashEntry *) hash_search(names_hash, + refname, + HASH_ENTER, + &found); + if (found) { - sprintf(modname, "%s_%d", refname, ++i); - } while (!refname_is_unique(modname, dpns, parent_namespaces)); - refname = modname; + /* Name already in use, must choose a new one */ + int refnamelen = strlen(refname); + char *modname = (char *) palloc(refnamelen + 16); + NameHashEntry *hentry2; + + do + { + hentry->counter++; + for (;;) + { + /* + * We avoid using %.*s here because it can misbehave + * if the data is not valid in what libc thinks is the + * prevailing encoding. + */ + memcpy(modname, refname, refnamelen); + sprintf(modname + refnamelen, "_%d", hentry->counter); + if (strlen(modname) < NAMEDATALEN) + break; + /* drop chars from refname to keep all the digits */ + refnamelen = pg_mbcliplen(refname, refnamelen, + refnamelen - 1); + } + hentry2 = (NameHashEntry *) hash_search(names_hash, + modname, + HASH_ENTER, + &found); + } while (found); + hentry2->counter = 0; /* init new hash entry */ + refname = modname; + } + else + { + /* Name not previously used, need only initialize hentry */ + hentry->counter = 0; + } } dpns->rtable_names = lappend(dpns->rtable_names, refname); rtindex++; } -} -/* - * refname_is_unique: is refname distinct from all already-chosen RTE names? - */ -static bool -refname_is_unique(char *refname, deparse_namespace *dpns, - List *parent_namespaces) -{ - ListCell *lc; - - foreach(lc, dpns->rtable_names) - { - char *oldname = (char *) lfirst(lc); - - if (oldname && strcmp(oldname, refname) == 0) - return false; - } - foreach(lc, parent_namespaces) - { - deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc); - ListCell *lc2; - - foreach(lc2, olddpns->rtable_names) - { - char *oldname = (char *) lfirst(lc2); - - if (oldname && strcmp(oldname, refname) == 0) - return false; - } - } - return true; + hash_destroy(names_hash); } /* @@ -3589,16 +3650,34 @@ make_colname_unique(char *colname, deparse_namespace *dpns, deparse_columns *colinfo) { /* - * If the selected name isn't unique, append digits to make it so + * If the selected name isn't unique, append digits to make it so. For a + * very long input name, we might have to truncate to stay within + * NAMEDATALEN. */ if (!colname_is_unique(colname, dpns, colinfo)) { - char *modname = (char *) palloc(strlen(colname) + 32); + int colnamelen = strlen(colname); + char *modname = (char *) palloc(colnamelen + 16); int i = 0; do { - sprintf(modname, "%s_%d", colname, ++i); + i++; + for (;;) + { + /* + * We avoid using %.*s here because it can misbehave if the + * data is not valid in what libc thinks is the prevailing + * encoding. + */ + memcpy(modname, colname, colnamelen); + sprintf(modname + colnamelen, "_%d", i); + if (strlen(modname) < NAMEDATALEN) + break; + /* drop chars from colname to keep all the digits */ + colnamelen = pg_mbcliplen(colname, colnamelen, + colnamelen - 1); + } } while (!colname_is_unique(modname, dpns, colinfo)); colname = modname; } diff --git a/src/test/regress/expected/create_view.out b/src/test/regress/expected/create_view.out index 3e4d424eca..ae50c94607 100644 --- a/src/test/regress/expected/create_view.out +++ b/src/test/regress/expected/create_view.out @@ -1475,6 +1475,33 @@ select * from int8_tbl i where i.* in (values(i.*::int8_tbl)); 4567890123456789 | -4567890123456789 (5 rows) +-- check unique-ification of overlength names +create view tt18v as + select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy + union all + select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz; +NOTICE: identifier "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy" will be truncated to "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" +NOTICE: identifier "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz" will be truncated to "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" +select pg_get_viewdef('tt18v', true); + pg_get_viewdef +----------------------------------------------------------------------------------- + SELECT xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q1, + + xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q2 + + FROM int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + + UNION ALL + + SELECT xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q1, + + xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q2 + + FROM int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx; +(1 row) + +explain (costs off) select * from tt18v; + QUERY PLAN +-------------------------------------------------------------------------------------------- + Append + -> Seq Scan on int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + -> Seq Scan on int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_1 +(3 rows) + -- clean up all the random objects we made above set client_min_messages = warning; DROP SCHEMA temp_view_test CASCADE; diff --git a/src/test/regress/sql/create_view.sql b/src/test/regress/sql/create_view.sql index bcee90d614..58d361df64 100644 --- a/src/test/regress/sql/create_view.sql +++ b/src/test/regress/sql/create_view.sql @@ -487,6 +487,15 @@ select * from tt17v; select pg_get_viewdef('tt17v', true); select * from int8_tbl i where i.* in (values(i.*::int8_tbl)); +-- check unique-ification of overlength names + +create view tt18v as + select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy + union all + select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz; +select pg_get_viewdef('tt18v', true); +explain (costs off) select * from tt18v; + -- clean up all the random objects we made above set client_min_messages = warning; DROP SCHEMA temp_view_test CASCADE;