Speed up ruleutils' name de-duplication code, and fix overlength-name case.

Since commit 11e131854f, ruleutils.c has
attempted to ensure that each RTE in a query or plan tree has a unique
alias name.  However, the code that was added for this could be quite slow,
even as bad as O(N^3) if N identical RTE names must be replaced, as noted
by Jeff Janes.  Improve matters by building a transient hash table within
set_rtable_names.  The hash table in itself reduces the cost of detecting a
duplicate from O(N) to O(1), and we can save another factor of N by storing
the number of de-duplicated names already created for each entry, so that
we don't have to re-try names already created.  This way is probably a bit
slower overall for small range tables, but almost by definition, such cases
should not be a performance problem.

In principle the same problem applies to the column-name-de-duplication
code; but in practice that seems to be less of a problem, first because
N is limited since we don't support extremely wide tables, and second
because duplicate column names within an RTE are fairly rare, so that in
practice the cost is more like O(N^2) not O(N^3).  It would be very much
messier to fix the column-name code, so for now I've left that alone.

An independent problem in the same area was that the de-duplication code
paid no attention to the identifier length limit, and would happily produce
identifiers that were longer than NAMEDATALEN and wouldn't be unique after
truncation to NAMEDATALEN.  This could result in dump/reload failures, or
perhaps even views that silently behaved differently than before.  We can
fix that by shortening the base name as needed.  Fix it for both the
relation and column name cases.

In passing, check for interrupts in set_rtable_names, just in case it's
still slow enough to be an issue.

Back-patch to 9.3 where this code was introduced.
This commit is contained in:
Tom Lane 2015-11-16 13:45:17 -05:00
parent 179c97bf58
commit 8004953b5a
3 changed files with 162 additions and 47 deletions

View File

@ -38,6 +38,7 @@
#include "commands/tablespace.h"
#include "executor/spi.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@ -55,6 +56,7 @@
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/ruleutils.h"
@ -267,6 +269,15 @@ typedef struct
#define deparse_columns_fetch(rangetable_index, dpns) \
((deparse_columns *) list_nth((dpns)->rtable_columns, (rangetable_index)-1))
/*
* Entry in set_rtable_names' hash table
*/
typedef struct
{
char name[NAMEDATALEN]; /* Hash key --- must be first */
int counter; /* Largest addition used so far for name */
} NameHashEntry;
/* ----------
* Global data
@ -312,8 +323,6 @@ static void print_function_rettype(StringInfo buf, HeapTuple proctup);
static void print_function_trftypes(StringInfo buf, HeapTuple proctup);
static void set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
Bitmapset *rels_used);
static bool refname_is_unique(char *refname, deparse_namespace *dpns,
List *parent_namespaces);
static void set_deparse_for_query(deparse_namespace *dpns, Query *query,
List *parent_namespaces);
static void set_simple_column_names(deparse_namespace *dpns);
@ -2676,15 +2685,61 @@ static void
set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
Bitmapset *rels_used)
{
HASHCTL hash_ctl;
HTAB *names_hash;
NameHashEntry *hentry;
bool found;
int rtindex;
ListCell *lc;
int rtindex = 1;
dpns->rtable_names = NIL;
/* nothing more to do if empty rtable */
if (dpns->rtable == NIL)
return;
/*
* We use a hash table to hold known names, so that this process is O(N)
* not O(N^2) for N names.
*/
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = NAMEDATALEN;
hash_ctl.entrysize = sizeof(NameHashEntry);
hash_ctl.hcxt = CurrentMemoryContext;
names_hash = hash_create("set_rtable_names names",
list_length(dpns->rtable),
&hash_ctl,
HASH_ELEM | HASH_CONTEXT);
/* Preload the hash table with names appearing in parent_namespaces */
foreach(lc, parent_namespaces)
{
deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc);
ListCell *lc2;
foreach(lc2, olddpns->rtable_names)
{
char *oldname = (char *) lfirst(lc2);
if (oldname == NULL)
continue;
hentry = (NameHashEntry *) hash_search(names_hash,
oldname,
HASH_ENTER,
&found);
/* we do not complain about duplicate names in parent namespaces */
hentry->counter = 0;
}
}
/* Now we can scan the rtable */
rtindex = 1;
foreach(lc, dpns->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
char *refname;
/* Just in case this takes an unreasonable amount of time ... */
CHECK_FOR_INTERRUPTS();
if (rels_used && !bms_is_member(rtindex, rels_used))
{
/* Ignore unreferenced RTE */
@ -2712,56 +2767,62 @@ set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
}
/*
* If the selected name isn't unique, append digits to make it so
* If the selected name isn't unique, append digits to make it so, and
* make a new hash entry for it once we've got a unique name. For a
* very long input name, we might have to truncate to stay within
* NAMEDATALEN.
*/
if (refname &&
!refname_is_unique(refname, dpns, parent_namespaces))
if (refname)
{
char *modname = (char *) palloc(strlen(refname) + 32);
int i = 0;
do
hentry = (NameHashEntry *) hash_search(names_hash,
refname,
HASH_ENTER,
&found);
if (found)
{
sprintf(modname, "%s_%d", refname, ++i);
} while (!refname_is_unique(modname, dpns, parent_namespaces));
refname = modname;
/* Name already in use, must choose a new one */
int refnamelen = strlen(refname);
char *modname = (char *) palloc(refnamelen + 16);
NameHashEntry *hentry2;
do
{
hentry->counter++;
for (;;)
{
/*
* We avoid using %.*s here because it can misbehave
* if the data is not valid in what libc thinks is the
* prevailing encoding.
*/
memcpy(modname, refname, refnamelen);
sprintf(modname + refnamelen, "_%d", hentry->counter);
if (strlen(modname) < NAMEDATALEN)
break;
/* drop chars from refname to keep all the digits */
refnamelen = pg_mbcliplen(refname, refnamelen,
refnamelen - 1);
}
hentry2 = (NameHashEntry *) hash_search(names_hash,
modname,
HASH_ENTER,
&found);
} while (found);
hentry2->counter = 0; /* init new hash entry */
refname = modname;
}
else
{
/* Name not previously used, need only initialize hentry */
hentry->counter = 0;
}
}
dpns->rtable_names = lappend(dpns->rtable_names, refname);
rtindex++;
}
}
/*
* refname_is_unique: is refname distinct from all already-chosen RTE names?
*/
static bool
refname_is_unique(char *refname, deparse_namespace *dpns,
List *parent_namespaces)
{
ListCell *lc;
foreach(lc, dpns->rtable_names)
{
char *oldname = (char *) lfirst(lc);
if (oldname && strcmp(oldname, refname) == 0)
return false;
}
foreach(lc, parent_namespaces)
{
deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc);
ListCell *lc2;
foreach(lc2, olddpns->rtable_names)
{
char *oldname = (char *) lfirst(lc2);
if (oldname && strcmp(oldname, refname) == 0)
return false;
}
}
return true;
hash_destroy(names_hash);
}
/*
@ -3589,16 +3650,34 @@ make_colname_unique(char *colname, deparse_namespace *dpns,
deparse_columns *colinfo)
{
/*
* If the selected name isn't unique, append digits to make it so
* If the selected name isn't unique, append digits to make it so. For a
* very long input name, we might have to truncate to stay within
* NAMEDATALEN.
*/
if (!colname_is_unique(colname, dpns, colinfo))
{
char *modname = (char *) palloc(strlen(colname) + 32);
int colnamelen = strlen(colname);
char *modname = (char *) palloc(colnamelen + 16);
int i = 0;
do
{
sprintf(modname, "%s_%d", colname, ++i);
i++;
for (;;)
{
/*
* We avoid using %.*s here because it can misbehave if the
* data is not valid in what libc thinks is the prevailing
* encoding.
*/
memcpy(modname, colname, colnamelen);
sprintf(modname + colnamelen, "_%d", i);
if (strlen(modname) < NAMEDATALEN)
break;
/* drop chars from colname to keep all the digits */
colnamelen = pg_mbcliplen(colname, colnamelen,
colnamelen - 1);
}
} while (!colname_is_unique(modname, dpns, colinfo));
colname = modname;
}

View File

@ -1475,6 +1475,33 @@ select * from int8_tbl i where i.* in (values(i.*::int8_tbl));
4567890123456789 | -4567890123456789
(5 rows)
-- check unique-ification of overlength names
create view tt18v as
select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy
union all
select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz;
NOTICE: identifier "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy" will be truncated to "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
NOTICE: identifier "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz" will be truncated to "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
select pg_get_viewdef('tt18v', true);
pg_get_viewdef
-----------------------------------------------------------------------------------
SELECT xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q1, +
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q2 +
FROM int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +
UNION ALL +
SELECT xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q1, +
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q2 +
FROM int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx;
(1 row)
explain (costs off) select * from tt18v;
QUERY PLAN
--------------------------------------------------------------------------------------------
Append
-> Seq Scan on int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-> Seq Scan on int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_1
(3 rows)
-- clean up all the random objects we made above
set client_min_messages = warning;
DROP SCHEMA temp_view_test CASCADE;

View File

@ -487,6 +487,15 @@ select * from tt17v;
select pg_get_viewdef('tt17v', true);
select * from int8_tbl i where i.* in (values(i.*::int8_tbl));
-- check unique-ification of overlength names
create view tt18v as
select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy
union all
select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz;
select pg_get_viewdef('tt18v', true);
explain (costs off) select * from tt18v;
-- clean up all the random objects we made above
set client_min_messages = warning;
DROP SCHEMA temp_view_test CASCADE;