postgresql/src/backend/parser/parse_utilcmd.c

2251 lines
64 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* parse_utilcmd.c
* Perform parse analysis work for various utility commands
*
* Formerly we did this work during parse_analyze() in analyze.c. However
* that is fairly unsafe in the presence of querytree caching, since any
* database state that we depend on in making the transformations might be
* obsolete by the time the utility command is executed; and utility commands
* have no infrastructure for holding locks or rechecking plan validity.
* Hence these functions are now called at the start of execution of their
* respective utility commands.
*
* NOTE: in general we must avoid scribbling on the passed-in raw parse
* tree, since it might be in a plan cache. The simplest solution is
* a quick copyObject() call before manipulating the query tree.
*
*
2009-01-01 18:24:05 +01:00
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.27 2009/10/12 19:49:24 adunstan Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/reloptions.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_type.h"
#include "commands/comment.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/analyze.h"
#include "parser/parse_clause.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "parser/parse_utilcmd.h"
#include "parser/parser.h"
#include "rewrite/rewriteManip.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
/* State shared by transformCreateStmt and its subroutines */
typedef struct
{
const char *stmtType; /* "CREATE TABLE" or "ALTER TABLE" */
RangeVar *relation; /* relation to create */
Relation rel; /* opened/locked rel, if ALTER */
List *inhRelations; /* relations to inherit from */
bool isalter; /* true if altering existing table */
bool hasoids; /* does relation have an OID column? */
List *columns; /* ColumnDef items */
List *ckconstraints; /* CHECK constraints */
List *fkconstraints; /* FOREIGN KEY constraints */
List *ixconstraints; /* index-creating constraints */
List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */
List *blist; /* "before list" of things to do before
* creating the table */
List *alist; /* "after list" of things to do after creating
* the table */
IndexStmt *pkey; /* PRIMARY KEY index, if any */
} CreateStmtContext;
/* State shared by transformCreateSchemaStmt and its subroutines */
typedef struct
{
const char *stmtType; /* "CREATE SCHEMA" or "ALTER SCHEMA" */
char *schemaname; /* name of schema */
char *authid; /* owner of schema */
List *sequences; /* CREATE SEQUENCE items */
List *tables; /* CREATE TABLE items */
List *views; /* CREATE VIEW items */
List *indexes; /* CREATE INDEX items */
List *triggers; /* CREATE TRIGGER items */
List *grants; /* GRANT items */
} CreateSchemaStmtContext;
static void transformColumnDefinition(ParseState *pstate,
CreateStmtContext *cxt,
ColumnDef *column);
static void transformTableConstraint(ParseState *pstate,
CreateStmtContext *cxt,
Constraint *constraint);
static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
InhRelation *inhrelation);
static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
2007-11-15 22:14:46 +01:00
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
Relation parent_index, AttrNumber *attmap);
static List *get_opclass(Oid opclass, Oid actual_datatype);
static void transformIndexConstraints(ParseState *pstate,
CreateStmtContext *cxt);
static IndexStmt *transformIndexConstraint(Constraint *constraint,
2007-11-15 22:14:46 +01:00
CreateStmtContext *cxt);
static void transformFKConstraints(ParseState *pstate,
CreateStmtContext *cxt,
bool skipValidation,
bool isAddConstraint);
static void transformConstraintAttrs(ParseState *pstate, List *constraintList);
static void transformColumnType(ParseState *pstate, ColumnDef *column);
static void setSchemaName(char *context_schema, char **stmt_schema_name);
/*
* transformCreateStmt -
* parse analysis for CREATE TABLE
*
* Returns a List of utility commands to be done in sequence. One of these
* will be the transformed CreateStmt, but there may be additional actions
* to be done before and after the actual DefineRelation() call.
*
* SQL92 allows constraints to be scattered all over, so thumb through
* the columns and collect all constraints into one place.
* If there are any implied indices (e.g. UNIQUE or PRIMARY KEY)
* then expand those into multiple IndexStmt blocks.
* - thomas 1997-12-02
*/
List *
transformCreateStmt(CreateStmt *stmt, const char *queryString)
{
ParseState *pstate;
CreateStmtContext cxt;
List *result;
List *save_alist;
ListCell *elements;
/*
2007-11-15 22:14:46 +01:00
* We must not scribble on the passed-in CreateStmt, so copy it. (This is
* overkill, but easy.)
*/
stmt = (CreateStmt *) copyObject(stmt);
Fix a couple of misbehaviors rooted in the fact that the default creation namespace isn't necessarily first in the search path (there could be implicit schemas ahead of it). Examples are test=# set search_path TO s1; test=# create view pg_timezone_names as select * from pg_timezone_names(); ERROR: "pg_timezone_names" is already a view test=# create table pg_class (f1 int primary key); ERROR: permission denied: "pg_class" is a system catalog You'd expect these commands to create the requested objects in s1, since names beginning with pg_ aren't supposed to be reserved anymore. What is happening is that we create the requested base table and then execute additional commands (here, CREATE RULE or CREATE INDEX), and that code is passed the same RangeVar that was in the original command. Since that RangeVar has schemaname = NULL, the secondary commands think they should do a path search, and that means they find system catalogs that are implicitly in front of s1 in the search path. This is perilously close to being a security hole: if the secondary command failed to apply a permission check then it'd be possible for unprivileged users to make schema modifications to system catalogs. But as far as I can find, there is no code path in which a check doesn't occur. Which makes it just a weird corner-case bug for people who are silly enough to want to name their tables the same as a system catalog. The relevant code has changed quite a bit since 8.2, which means this patch wouldn't work as-is in the back branches. Since it's a corner case no one has reported from the field, I'm not going to bother trying to back-patch.
2007-08-27 05:36:08 +02:00
/*
* If the target relation name isn't schema-qualified, make it so. This
* prevents some corner cases in which added-on rewritten commands might
2007-11-15 22:14:46 +01:00
* think they should apply to other relations that have the same name and
* are earlier in the search path. "istemp" is equivalent to a
Fix a couple of misbehaviors rooted in the fact that the default creation namespace isn't necessarily first in the search path (there could be implicit schemas ahead of it). Examples are test=# set search_path TO s1; test=# create view pg_timezone_names as select * from pg_timezone_names(); ERROR: "pg_timezone_names" is already a view test=# create table pg_class (f1 int primary key); ERROR: permission denied: "pg_class" is a system catalog You'd expect these commands to create the requested objects in s1, since names beginning with pg_ aren't supposed to be reserved anymore. What is happening is that we create the requested base table and then execute additional commands (here, CREATE RULE or CREATE INDEX), and that code is passed the same RangeVar that was in the original command. Since that RangeVar has schemaname = NULL, the secondary commands think they should do a path search, and that means they find system catalogs that are implicitly in front of s1 in the search path. This is perilously close to being a security hole: if the secondary command failed to apply a permission check then it'd be possible for unprivileged users to make schema modifications to system catalogs. But as far as I can find, there is no code path in which a check doesn't occur. Which makes it just a weird corner-case bug for people who are silly enough to want to name their tables the same as a system catalog. The relevant code has changed quite a bit since 8.2, which means this patch wouldn't work as-is in the back branches. Since it's a corner case no one has reported from the field, I'm not going to bother trying to back-patch.
2007-08-27 05:36:08 +02:00
* specification of pg_temp, so no need for anything extra in that case.
*/
if (stmt->relation->schemaname == NULL && !stmt->relation->istemp)
{
2007-11-15 22:14:46 +01:00
Oid namespaceid = RangeVarGetCreationNamespace(stmt->relation);
Fix a couple of misbehaviors rooted in the fact that the default creation namespace isn't necessarily first in the search path (there could be implicit schemas ahead of it). Examples are test=# set search_path TO s1; test=# create view pg_timezone_names as select * from pg_timezone_names(); ERROR: "pg_timezone_names" is already a view test=# create table pg_class (f1 int primary key); ERROR: permission denied: "pg_class" is a system catalog You'd expect these commands to create the requested objects in s1, since names beginning with pg_ aren't supposed to be reserved anymore. What is happening is that we create the requested base table and then execute additional commands (here, CREATE RULE or CREATE INDEX), and that code is passed the same RangeVar that was in the original command. Since that RangeVar has schemaname = NULL, the secondary commands think they should do a path search, and that means they find system catalogs that are implicitly in front of s1 in the search path. This is perilously close to being a security hole: if the secondary command failed to apply a permission check then it'd be possible for unprivileged users to make schema modifications to system catalogs. But as far as I can find, there is no code path in which a check doesn't occur. Which makes it just a weird corner-case bug for people who are silly enough to want to name their tables the same as a system catalog. The relevant code has changed quite a bit since 8.2, which means this patch wouldn't work as-is in the back branches. Since it's a corner case no one has reported from the field, I'm not going to bother trying to back-patch.
2007-08-27 05:36:08 +02:00
stmt->relation->schemaname = get_namespace_name(namespaceid);
}
/* Set up pstate */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
cxt.stmtType = "CREATE TABLE";
cxt.relation = stmt->relation;
cxt.rel = NULL;
cxt.inhRelations = stmt->inhRelations;
cxt.isalter = false;
cxt.columns = NIL;
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
cxt.inh_indexes = NIL;
cxt.blist = NIL;
cxt.alist = NIL;
cxt.pkey = NULL;
cxt.hasoids = interpretOidsOption(stmt->options);
/*
* Run through each primary element in the table creation clause. Separate
* column defs from constraints, and do preliminary analysis.
*/
foreach(elements, stmt->tableElts)
{
Node *element = lfirst(elements);
switch (nodeTag(element))
{
case T_ColumnDef:
transformColumnDefinition(pstate, &cxt,
(ColumnDef *) element);
break;
case T_Constraint:
transformTableConstraint(pstate, &cxt,
(Constraint *) element);
break;
case T_InhRelation:
transformInhRelation(pstate, &cxt,
(InhRelation *) element);
break;
default:
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(element));
break;
}
}
/*
* transformIndexConstraints wants cxt.alist to contain only index
* statements, so transfer anything we already have into save_alist.
*/
save_alist = cxt.alist;
cxt.alist = NIL;
Assert(stmt->constraints == NIL);
/*
* Postprocess constraints that give rise to index definitions.
*/
transformIndexConstraints(pstate, &cxt);
/*
* Postprocess foreign-key constraints.
*/
transformFKConstraints(pstate, &cxt, true, false);
/*
* Output results.
*/
stmt->tableElts = cxt.columns;
stmt->constraints = cxt.ckconstraints;
result = lappend(cxt.blist, stmt);
result = list_concat(result, cxt.alist);
result = list_concat(result, save_alist);
return result;
}
/*
* transformColumnDefinition -
* transform a single ColumnDef within CREATE TABLE
* Also used in ALTER TABLE ADD COLUMN
*/
static void
transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
ColumnDef *column)
{
bool is_serial;
bool saw_nullable;
bool saw_default;
Constraint *constraint;
ListCell *clist;
cxt->columns = lappend(cxt->columns, column);
/* Check for SERIAL pseudo-types */
is_serial = false;
if (list_length(column->typeName->names) == 1 &&
!column->typeName->pct_type)
{
char *typname = strVal(linitial(column->typeName->names));
if (strcmp(typname, "serial") == 0 ||
strcmp(typname, "serial4") == 0)
{
is_serial = true;
column->typeName->names = NIL;
column->typeName->typeOid = INT4OID;
}
else if (strcmp(typname, "bigserial") == 0 ||
strcmp(typname, "serial8") == 0)
{
is_serial = true;
column->typeName->names = NIL;
column->typeName->typeOid = INT8OID;
}
/*
* We have to reject "serial[]" explicitly, because once we've set
* typeid, LookupTypeName won't notice arrayBounds. We don't need any
* special coding for serial(typmod) though.
*/
if (is_serial && column->typeName->arrayBounds != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array of serial is not implemented"),
parser_errposition(pstate, column->typeName->location)));
}
/* Do necessary work on the column type declaration */
transformColumnType(pstate, column);
/* Special actions for SERIAL pseudo-types */
if (is_serial)
{
Oid snamespaceid;
char *snamespace;
char *sname;
char *qstring;
A_Const *snamenode;
TypeCast *castnode;
FuncCall *funccallnode;
CreateSeqStmt *seqstmt;
AlterSeqStmt *altseqstmt;
List *attnamelist;
/*
* Determine namespace and name to use for the sequence.
*
* Although we use ChooseRelationName, it's not guaranteed that the
* selected sequence name won't conflict; given sufficiently long
* field names, two different serial columns in the same table could
* be assigned the same sequence name, and we'd not notice since we
* aren't creating the sequence quite yet. In practice this seems
* quite unlikely to be a problem, especially since few people would
* need two serial columns in one table.
*/
if (cxt->rel)
snamespaceid = RelationGetNamespace(cxt->rel);
else
snamespaceid = RangeVarGetCreationNamespace(cxt->relation);
snamespace = get_namespace_name(snamespaceid);
sname = ChooseRelationName(cxt->relation->relname,
column->colname,
"seq",
snamespaceid);
ereport(NOTICE,
(errmsg("%s will create implicit sequence \"%s\" for serial column \"%s.%s\"",
cxt->stmtType, sname,
cxt->relation->relname, column->colname)));
/*
* Build a CREATE SEQUENCE command to create the sequence object, and
* add it to the list of things to be done before this CREATE/ALTER
* TABLE.
*/
seqstmt = makeNode(CreateSeqStmt);
seqstmt->sequence = makeRangeVar(snamespace, sname, -1);
seqstmt->options = NIL;
cxt->blist = lappend(cxt->blist, seqstmt);
/*
* Build an ALTER SEQUENCE ... OWNED BY command to mark the sequence
* as owned by this column, and add it to the list of things to be
* done after this CREATE/ALTER TABLE.
*/
altseqstmt = makeNode(AlterSeqStmt);
altseqstmt->sequence = makeRangeVar(snamespace, sname, -1);
attnamelist = list_make3(makeString(snamespace),
makeString(cxt->relation->relname),
makeString(column->colname));
altseqstmt->options = list_make1(makeDefElem("owned_by",
(Node *) attnamelist));
cxt->alist = lappend(cxt->alist, altseqstmt);
/*
* Create appropriate constraints for SERIAL. We do this in full,
* rather than shortcutting, so that we will detect any conflicting
* constraints the user wrote (like a different DEFAULT).
*
* Create an expression tree representing the function call
* nextval('sequencename'). We cannot reduce the raw tree to cooked
* form until after the sequence is created, but there's no need to do
* so.
*/
qstring = quote_qualified_identifier(snamespace, sname);
snamenode = makeNode(A_Const);
snamenode->val.type = T_String;
snamenode->val.val.str = qstring;
snamenode->location = -1;
castnode = makeNode(TypeCast);
castnode->typeName = SystemTypeName("regclass");
castnode->arg = (Node *) snamenode;
castnode->location = -1;
funccallnode = makeNode(FuncCall);
funccallnode->funcname = SystemFuncName("nextval");
funccallnode->args = list_make1(castnode);
funccallnode->agg_star = false;
funccallnode->agg_distinct = false;
funccallnode->func_variadic = false;
funccallnode->over = NULL;
funccallnode->location = -1;
constraint = makeNode(Constraint);
constraint->contype = CONSTR_DEFAULT;
constraint->location = -1;
constraint->raw_expr = (Node *) funccallnode;
constraint->cooked_expr = NULL;
column->constraints = lappend(column->constraints, constraint);
constraint = makeNode(Constraint);
constraint->contype = CONSTR_NOTNULL;
constraint->location = -1;
column->constraints = lappend(column->constraints, constraint);
}
/* Process column constraints, if any... */
transformConstraintAttrs(pstate, column->constraints);
saw_nullable = false;
saw_default = false;
foreach(clist, column->constraints)
{
constraint = lfirst(clist);
Assert(IsA(constraint, Constraint));
switch (constraint->contype)
{
case CONSTR_NULL:
if (saw_nullable && column->is_not_null)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(pstate,
constraint->location)));
column->is_not_null = FALSE;
saw_nullable = true;
break;
case CONSTR_NOTNULL:
if (saw_nullable && !column->is_not_null)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(pstate,
constraint->location)));
column->is_not_null = TRUE;
saw_nullable = true;
break;
case CONSTR_DEFAULT:
if (saw_default)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple default values specified for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(pstate,
constraint->location)));
column->raw_default = constraint->raw_expr;
Assert(constraint->cooked_expr == NULL);
saw_default = true;
break;
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
if (constraint->keys == NIL)
constraint->keys = list_make1(makeString(column->colname));
cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
break;
case CONSTR_CHECK:
cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
break;
case CONSTR_FOREIGN:
/*
* Fill in the current attribute's name and throw it into the
* list of FK constraints to be processed later.
*/
constraint->fk_attrs = list_make1(makeString(column->colname));
cxt->fkconstraints = lappend(cxt->fkconstraints, constraint);
break;
case CONSTR_ATTR_DEFERRABLE:
case CONSTR_ATTR_NOT_DEFERRABLE:
case CONSTR_ATTR_DEFERRED:
case CONSTR_ATTR_IMMEDIATE:
/* transformConstraintAttrs took care of these */
break;
default:
elog(ERROR, "unrecognized constraint type: %d",
constraint->contype);
break;
}
}
}
/*
* transformTableConstraint
* transform a Constraint node within CREATE TABLE or ALTER TABLE
*/
static void
transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt,
Constraint *constraint)
{
switch (constraint->contype)
{
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
break;
case CONSTR_CHECK:
cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
break;
case CONSTR_FOREIGN:
cxt->fkconstraints = lappend(cxt->fkconstraints, constraint);
break;
case CONSTR_NULL:
case CONSTR_NOTNULL:
case CONSTR_DEFAULT:
case CONSTR_ATTR_DEFERRABLE:
case CONSTR_ATTR_NOT_DEFERRABLE:
case CONSTR_ATTR_DEFERRED:
case CONSTR_ATTR_IMMEDIATE:
elog(ERROR, "invalid context for constraint type %d",
constraint->contype);
break;
default:
elog(ERROR, "unrecognized constraint type: %d",
constraint->contype);
break;
}
}
/*
* transformInhRelation
*
* Change the LIKE <subtable> portion of a CREATE TABLE statement into
* column definitions which recreate the user defined column portions of
* <subtable>.
*/
static void
transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
InhRelation *inhRelation)
{
AttrNumber parent_attno;
Relation relation;
TupleDesc tupleDesc;
TupleConstr *constr;
AclResult aclresult;
char *comment;
relation = parserOpenTable(pstate, inhRelation->relation, AccessShareLock);
if (relation->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("inherited relation \"%s\" is not a table",
inhRelation->relation->relname)));
/*
* Check for SELECT privilages
*/
aclresult = pg_class_aclcheck(RelationGetRelid(relation), GetUserId(),
ACL_SELECT);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(relation));
tupleDesc = RelationGetDescr(relation);
constr = tupleDesc->constr;
/*
2007-11-15 22:14:46 +01:00
* Insert the copied attributes into the cxt for the new table definition.
*/
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
char *attributeName = NameStr(attribute->attname);
ColumnDef *def;
/*
* Ignore dropped columns in the parent.
*/
if (attribute->attisdropped)
continue;
/*
* Create a new column, which is marked as NOT inherited.
*
* For constraints, ONLY the NOT NULL constraint is inherited by the
* new column definition per SQL99.
*/
def = makeNode(ColumnDef);
def->colname = pstrdup(attributeName);
def->typeName = makeTypeNameFromOid(attribute->atttypid,
attribute->atttypmod);
def->inhcount = 0;
def->is_local = true;
def->is_not_null = attribute->attnotnull;
def->raw_default = NULL;
def->cooked_default = NULL;
def->constraints = NIL;
/*
* Add to column list
*/
cxt->columns = lappend(cxt->columns, def);
/*
* Copy default, if present and the default has been requested
*/
if (attribute->atthasdef &&
(inhRelation->options & CREATE_TABLE_LIKE_DEFAULTS))
{
Node *this_default = NULL;
AttrDefault *attrdef;
int i;
/* Find default in constraint structure */
Assert(constr != NULL);
attrdef = constr->defval;
for (i = 0; i < constr->num_defval; i++)
{
if (attrdef[i].adnum == parent_attno)
{
this_default = stringToNode(attrdef[i].adbin);
break;
}
}
Assert(this_default != NULL);
/*
* If default expr could contain any vars, we'd need to fix 'em,
* but it can't; so default is ready to apply to child.
*/
def->cooked_default = this_default;
}
/* Likewise, copy storage if requested */
if (inhRelation->options & CREATE_TABLE_LIKE_STORAGE)
def->storage = attribute->attstorage;
/* Likewise, copy comment if requested */
if ((inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) &&
(comment = GetComment(attribute->attrelid, RelationRelationId,
attribute->attnum)) != NULL)
{
CommentStmt *stmt = makeNode(CommentStmt);
stmt->objtype = OBJECT_COLUMN;
stmt->objname = list_make3(makeString(cxt->relation->schemaname),
makeString(cxt->relation->relname),
makeString(def->colname));
stmt->objargs = NIL;
stmt->comment = comment;
cxt->alist = lappend(cxt->alist, stmt);
}
}
/*
2007-11-15 22:14:46 +01:00
* Copy CHECK constraints if requested, being careful to adjust attribute
* numbers
*/
if ((inhRelation->options & CREATE_TABLE_LIKE_CONSTRAINTS) && tupleDesc->constr)
{
AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
int ccnum;
for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
{
char *ccname = tupleDesc->constr->check[ccnum].ccname;
char *ccbin = tupleDesc->constr->check[ccnum].ccbin;
Node *ccbin_node = stringToNode(ccbin);
Constraint *n = makeNode(Constraint);
change_varattnos_of_a_node(ccbin_node, attmap);
n->contype = CONSTR_CHECK;
n->location = -1;
n->conname = pstrdup(ccname);
n->raw_expr = NULL;
n->cooked_expr = nodeToString(ccbin_node);
cxt->ckconstraints = lappend(cxt->ckconstraints, n);
/* Copy comment on constraint */
if ((inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) &&
(comment = GetComment(GetConstraintByName(RelationGetRelid(
relation), n->conname), ConstraintRelationId, 0)) != NULL)
{
CommentStmt *stmt = makeNode(CommentStmt);
stmt->objtype = OBJECT_CONSTRAINT;
stmt->objname = list_make3(makeString(cxt->relation->schemaname),
makeString(cxt->relation->relname),
makeString(n->conname));
stmt->objargs = NIL;
stmt->comment = comment;
cxt->alist = lappend(cxt->alist, stmt);
}
}
}
/*
* Likewise, copy indexes if requested
*/
if ((inhRelation->options & CREATE_TABLE_LIKE_INDEXES) &&
relation->rd_rel->relhasindex)
{
AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
List *parent_indexes;
ListCell *l;
parent_indexes = RelationGetIndexList(relation);
foreach(l, parent_indexes)
{
2007-11-15 22:14:46 +01:00
Oid parent_index_oid = lfirst_oid(l);
Relation parent_index;
IndexStmt *index_stmt;
parent_index = index_open(parent_index_oid, AccessShareLock);
/* Build CREATE INDEX statement to recreate the parent_index */
index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap);
/* Copy comment on index */
if (inhRelation->options & CREATE_TABLE_LIKE_COMMENTS)
{
CommentStmt *stmt;
ListCell *lc;
int i;
comment = GetComment(parent_index_oid, RelationRelationId, 0);
if (comment != NULL)
{
/* Assign name for index because CommentStmt requires name. */
if (index_stmt->idxname == NULL)
index_stmt->idxname = chooseIndexName(cxt->relation, index_stmt);
stmt = makeNode(CommentStmt);
stmt->objtype = OBJECT_INDEX;
stmt->objname = list_make2(makeString(cxt->relation->schemaname),
makeString(index_stmt->idxname));
stmt->objargs = NIL;
stmt->comment = comment;
cxt->alist = lappend(cxt->alist, stmt);
}
/* Copy comment on index's columns */
i = 0;
foreach(lc, index_stmt->indexParams)
{
char *attname;
i++;
comment = GetComment(parent_index_oid, RelationRelationId, i);
if (comment == NULL)
continue;
/* Assign name for index because CommentStmt requires name. */
if (index_stmt->idxname == NULL)
index_stmt->idxname = chooseIndexName(cxt->relation, index_stmt);
attname = ((IndexElem *) lfirst(lc))->name;
/* expression index has a dummy column name */
if (attname == NULL)
{
attname = palloc(NAMEDATALEN);
sprintf(attname, "pg_expression_%d", i);
}
stmt = makeNode(CommentStmt);
stmt->objtype = OBJECT_COLUMN;
stmt->objname = list_make3(
makeString(cxt->relation->schemaname),
makeString(index_stmt->idxname),
makeString(attname));
stmt->objargs = NIL;
stmt->comment = comment;
cxt->alist = lappend(cxt->alist, stmt);
}
}
/* Save it in the inh_indexes list for the time being */
cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
index_close(parent_index, AccessShareLock);
}
}
/*
* Close the parent rel, but keep our AccessShareLock on it until xact
* commit. That will prevent someone else from deleting or ALTERing the
* parent before the child is committed.
*/
heap_close(relation, NoLock);
}
/*
* chooseIndexName
*
* Set name to unnamed index. See also the same logic in DefineIndex.
*/
static char *
chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt)
{
Oid namespaceId;
namespaceId = RangeVarGetCreationNamespace(relation);
if (index_stmt->primary)
{
/* no need for column list with pkey */
return ChooseRelationName(relation->relname, NULL,
"pkey", namespaceId);
}
else
{
IndexElem *iparam = (IndexElem *) linitial(index_stmt->indexParams);
return ChooseRelationName(relation->relname, iparam->name,
"key", namespaceId);
}
}
/*
* Generate an IndexStmt node using information from an already existing index
* "source_idx". Attribute numbers should be adjusted according to attmap.
*/
static IndexStmt *
generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
AttrNumber *attmap)
{
Oid source_relid = RelationGetRelid(source_idx);
2007-11-15 22:14:46 +01:00
HeapTuple ht_idxrel;
HeapTuple ht_idx;
2007-11-15 22:14:46 +01:00
Form_pg_class idxrelrec;
Form_pg_index idxrec;
2007-11-15 22:14:46 +01:00
Form_pg_am amrec;
oidvector *indclass;
IndexStmt *index;
List *indexprs;
2007-11-15 22:14:46 +01:00
ListCell *indexpr_item;
Oid indrelid;
int keyno;
Oid keycoltype;
Datum datum;
2007-11-15 22:14:46 +01:00
bool isnull;
/*
* Fetch pg_class tuple of source index. We can't use the copy in the
* relcache entry because it doesn't include optional fields.
*/
ht_idxrel = SearchSysCache(RELOID,
ObjectIdGetDatum(source_relid),
0, 0, 0);
if (!HeapTupleIsValid(ht_idxrel))
elog(ERROR, "cache lookup failed for relation %u", source_relid);
idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel);
/* Fetch pg_index tuple for source index from relcache entry */
ht_idx = source_idx->rd_indextuple;
idxrec = (Form_pg_index) GETSTRUCT(ht_idx);
indrelid = idxrec->indrelid;
/* Fetch pg_am tuple for source index from relcache entry */
amrec = source_idx->rd_am;
/* Must get indclass the hard way, since it's not stored in relcache */
datum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
indclass = (oidvector *) DatumGetPointer(datum);
/* Begin building the IndexStmt */
index = makeNode(IndexStmt);
index->relation = cxt->relation;
index->accessMethod = pstrdup(NameStr(amrec->amname));
if (OidIsValid(idxrelrec->reltablespace))
index->tableSpace = get_tablespace_name(idxrelrec->reltablespace);
else
index->tableSpace = NULL;
index->unique = idxrec->indisunique;
index->primary = idxrec->indisprimary;
index->concurrent = false;
/*
* We don't try to preserve the name of the source index; instead, just
* let DefineIndex() choose a reasonable name.
*/
index->idxname = NULL;
/*
* If the index is marked PRIMARY, it's certainly from a constraint; else,
* if it's not marked UNIQUE, it certainly isn't. If it is or might be
* from a constraint, we have to fetch the constraint to check for
* deferrability attributes.
*/
if (index->primary || index->unique)
{
Oid constraintId = get_index_constraint(source_relid);
if (OidIsValid(constraintId))
{
HeapTuple ht_constr;
Form_pg_constraint conrec;
ht_constr = SearchSysCache(CONSTROID,
ObjectIdGetDatum(constraintId),
0, 0, 0);
if (!HeapTupleIsValid(ht_constr))
elog(ERROR, "cache lookup failed for constraint %u",
constraintId);
conrec = (Form_pg_constraint) GETSTRUCT(ht_constr);
index->isconstraint = true;
index->deferrable = conrec->condeferrable;
index->initdeferred = conrec->condeferred;
ReleaseSysCache(ht_constr);
}
else
index->isconstraint = false;
}
else
index->isconstraint = false;
/* Get the index expressions, if any */
datum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indexprs, &isnull);
if (!isnull)
{
char *exprsString;
exprsString = TextDatumGetCString(datum);
indexprs = (List *) stringToNode(exprsString);
}
else
indexprs = NIL;
/* Build the list of IndexElem */
index->indexParams = NIL;
indexpr_item = list_head(indexprs);
for (keyno = 0; keyno < idxrec->indnatts; keyno++)
{
2007-11-15 22:14:46 +01:00
IndexElem *iparam;
AttrNumber attnum = idxrec->indkey.values[keyno];
int16 opt = source_idx->rd_indoption[keyno];
iparam = makeNode(IndexElem);
if (AttributeNumberIsValid(attnum))
{
/* Simple index column */
char *attname;
attname = get_relid_attribute_name(indrelid, attnum);
keycoltype = get_atttype(indrelid, attnum);
iparam->name = attname;
iparam->expr = NULL;
}
else
{
/* Expressional index */
Node *indexkey;
if (indexpr_item == NULL)
elog(ERROR, "too few entries in indexprs list");
indexkey = (Node *) lfirst(indexpr_item);
indexpr_item = lnext(indexpr_item);
/* OK to modify indexkey since we are working on a private copy */
change_varattnos_of_a_node(indexkey, attmap);
iparam->name = NULL;
iparam->expr = indexkey;
keycoltype = exprType(indexkey);
}
/* Add the operator class name, if non-default */
iparam->opclass = get_opclass(indclass->values[keyno], keycoltype);
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
/* Adjust options if necessary */
if (amrec->amcanorder)
{
/*
* If it supports sort ordering, copy DESC and NULLS opts. Don't
* set non-default settings unnecessarily, though, so as to
* improve the chance of recognizing equivalence to constraint
* indexes.
*/
if (opt & INDOPTION_DESC)
{
iparam->ordering = SORTBY_DESC;
if ((opt & INDOPTION_NULLS_FIRST) == 0)
iparam->nulls_ordering = SORTBY_NULLS_LAST;
}
else
{
if (opt & INDOPTION_NULLS_FIRST)
iparam->nulls_ordering = SORTBY_NULLS_FIRST;
}
}
index->indexParams = lappend(index->indexParams, iparam);
}
/* Copy reloptions if any */
datum = SysCacheGetAttr(RELOID, ht_idxrel,
Anum_pg_class_reloptions, &isnull);
if (!isnull)
index->options = untransformRelOptions(datum);
/* If it's a partial index, decompile and append the predicate */
datum = SysCacheGetAttr(INDEXRELID, ht_idx,
Anum_pg_index_indpred, &isnull);
if (!isnull)
{
char *pred_str;
/* Convert text string to node tree */
pred_str = TextDatumGetCString(datum);
index->whereClause = (Node *) stringToNode(pred_str);
/* Adjust attribute numbers */
change_varattnos_of_a_node(index->whereClause, attmap);
}
/* Clean up */
ReleaseSysCache(ht_idxrel);
return index;
}
/*
* get_opclass - fetch name of an index operator class
*
* If the opclass is the default for the given actual_datatype, then
* the return value is NIL.
*/
static List *
get_opclass(Oid opclass, Oid actual_datatype)
{
2007-11-15 22:14:46 +01:00
HeapTuple ht_opc;
Form_pg_opclass opc_rec;
List *result = NIL;
ht_opc = SearchSysCache(CLAOID,
ObjectIdGetDatum(opclass),
0, 0, 0);
if (!HeapTupleIsValid(ht_opc))
elog(ERROR, "cache lookup failed for opclass %u", opclass);
opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
if (GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
{
/* For simplicity, we always schema-qualify the name */
2007-11-15 22:14:46 +01:00
char *nsp_name = get_namespace_name(opc_rec->opcnamespace);
char *opc_name = pstrdup(NameStr(opc_rec->opcname));
result = list_make2(makeString(nsp_name), makeString(opc_name));
}
ReleaseSysCache(ht_opc);
return result;
}
/*
* transformIndexConstraints
* Handle UNIQUE and PRIMARY KEY constraints, which create indexes.
* We also merge in any index definitions arising from
* LIKE ... INCLUDING INDEXES.
*/
static void
transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
{
IndexStmt *index;
List *indexlist = NIL;
ListCell *lc;
/*
* Run through the constraints that need to generate an index. For PRIMARY
* KEY, mark each column as NOT NULL and create an index. For UNIQUE,
* create an index as for PRIMARY KEY, but do not insist on NOT NULL.
*/
foreach(lc, cxt->ixconstraints)
{
Constraint *constraint = (Constraint *) lfirst(lc);
Assert(IsA(constraint, Constraint));
Assert(constraint->contype == CONSTR_PRIMARY ||
constraint->contype == CONSTR_UNIQUE);
index = transformIndexConstraint(constraint, cxt);
indexlist = lappend(indexlist, index);
}
/* Add in any indexes defined by LIKE ... INCLUDING INDEXES */
foreach(lc, cxt->inh_indexes)
{
index = (IndexStmt *) lfirst(lc);
if (index->primary)
{
if (cxt->pkey != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->pkey = index;
}
indexlist = lappend(indexlist, index);
}
/*
* Scan the index list and remove any redundant index specifications. This
* can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A
* strict reading of SQL92 would suggest raising an error instead, but
* that strikes me as too anal-retentive. - tgl 2001-02-14
*
* XXX in ALTER TABLE case, it'd be nice to look for duplicate
* pre-existing indexes, too.
*/
Assert(cxt->alist == NIL);
if (cxt->pkey != NULL)
{
/* Make sure we keep the PKEY index in preference to others... */
cxt->alist = list_make1(cxt->pkey);
}
foreach(lc, indexlist)
{
bool keep = true;
ListCell *k;
index = lfirst(lc);
/* if it's pkey, it's already in cxt->alist */
if (index == cxt->pkey)
continue;
foreach(k, cxt->alist)
{
IndexStmt *priorindex = lfirst(k);
if (equal(index->indexParams, priorindex->indexParams) &&
equal(index->whereClause, priorindex->whereClause) &&
strcmp(index->accessMethod, priorindex->accessMethod) == 0 &&
index->deferrable == priorindex->deferrable &&
index->initdeferred == priorindex->initdeferred)
{
priorindex->unique |= index->unique;
/*
* If the prior index is as yet unnamed, and this one is
* named, then transfer the name to the prior index. This
* ensures that if we have named and unnamed constraints,
* we'll use (at least one of) the names for the index.
*/
if (priorindex->idxname == NULL)
priorindex->idxname = index->idxname;
keep = false;
break;
}
}
if (keep)
cxt->alist = lappend(cxt->alist, index);
}
}
/*
* transformIndexConstraint
* Transform one UNIQUE or PRIMARY KEY constraint for
* transformIndexConstraints.
*/
static IndexStmt *
transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
2007-11-15 22:14:46 +01:00
IndexStmt *index;
ListCell *keys;
IndexElem *iparam;
index = makeNode(IndexStmt);
index->unique = true;
index->primary = (constraint->contype == CONSTR_PRIMARY);
if (index->primary)
{
if (cxt->pkey != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2007-11-15 22:14:46 +01:00
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->pkey = index;
/*
* In ALTER TABLE case, a primary index might already exist, but
* DefineIndex will check for it.
*/
}
index->isconstraint = true;
index->deferrable = constraint->deferrable;
index->initdeferred = constraint->initdeferred;
if (constraint->conname != NULL)
index->idxname = pstrdup(constraint->conname);
else
2007-11-15 22:14:46 +01:00
index->idxname = NULL; /* DefineIndex will choose name */
index->relation = cxt->relation;
index->accessMethod = DEFAULT_INDEX_TYPE;
index->options = constraint->options;
index->tableSpace = constraint->indexspace;
index->indexParams = NIL;
index->whereClause = NULL;
index->concurrent = false;
/*
2007-11-15 22:14:46 +01:00
* Make sure referenced keys exist. If we are making a PRIMARY KEY index,
* also make sure they are NOT NULL, if possible. (Although we could leave
* it to DefineIndex to mark the columns NOT NULL, it's more efficient to
* get it right the first time.)
*/
foreach(keys, constraint->keys)
{
char *key = strVal(lfirst(keys));
bool found = false;
ColumnDef *column = NULL;
ListCell *columns;
foreach(columns, cxt->columns)
{
column = (ColumnDef *) lfirst(columns);
Assert(IsA(column, ColumnDef));
if (strcmp(column->colname, key) == 0)
{
found = true;
break;
}
}
if (found)
{
/* found column in the new table; force it to be NOT NULL */
if (constraint->contype == CONSTR_PRIMARY)
column->is_not_null = TRUE;
}
else if (SystemAttributeByName(key, cxt->hasoids) != NULL)
{
/*
2007-11-15 22:14:46 +01:00
* column will be a system column in the new table, so accept it.
* System columns can't ever be null, so no need to worry about
* PRIMARY/NOT NULL constraint.
*/
found = true;
}
else if (cxt->inhRelations)
{
/* try inherited tables */
ListCell *inher;
foreach(inher, cxt->inhRelations)
{
RangeVar *inh = (RangeVar *) lfirst(inher);
Relation rel;
int count;
Assert(IsA(inh, RangeVar));
rel = heap_openrv(inh, AccessShareLock);
if (rel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2007-11-15 22:14:46 +01:00
errmsg("inherited relation \"%s\" is not a table",
inh->relname)));
for (count = 0; count < rel->rd_att->natts; count++)
{
Form_pg_attribute inhattr = rel->rd_att->attrs[count];
char *inhname = NameStr(inhattr->attname);
if (inhattr->attisdropped)
continue;
if (strcmp(key, inhname) == 0)
{
found = true;
/*
2007-11-15 22:14:46 +01:00
* We currently have no easy way to force an inherited
* column to be NOT NULL at creation, if its parent
* wasn't so already. We leave it to DefineIndex to
* fix things up in this case.
*/
break;
}
}
heap_close(rel, NoLock);
if (found)
break;
}
}
/*
* In the ALTER TABLE case, don't complain about index keys not
2007-11-15 22:14:46 +01:00
* created in the command; they may well exist already. DefineIndex
* will complain about them if not, and will also take care of marking
* them NOT NULL.
*/
if (!found && !cxt->isalter)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
key)));
/* Check for PRIMARY KEY(foo, foo) */
foreach(columns, index->indexParams)
{
iparam = (IndexElem *) lfirst(columns);
if (iparam->name && strcmp(key, iparam->name) == 0)
{
if (index->primary)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in primary key constraint",
key)));
else
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
2007-11-15 22:14:46 +01:00
errmsg("column \"%s\" appears twice in unique constraint",
key)));
}
}
/* OK, add it to the index definition */
iparam = makeNode(IndexElem);
iparam->name = pstrdup(key);
iparam->expr = NULL;
iparam->opclass = NIL;
iparam->ordering = SORTBY_DEFAULT;
iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
index->indexParams = lappend(index->indexParams, iparam);
}
return index;
}
/*
* transformFKConstraints
* handle FOREIGN KEY constraints
*/
static void
transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt,
bool skipValidation, bool isAddConstraint)
{
ListCell *fkclist;
if (cxt->fkconstraints == NIL)
return;
/*
* If CREATE TABLE or adding a column with NULL default, we can safely
* skip validation of the constraint.
*/
if (skipValidation)
{
foreach(fkclist, cxt->fkconstraints)
{
Constraint *constraint = (Constraint *) lfirst(fkclist);
constraint->skip_validation = true;
}
}
/*
* For CREATE TABLE or ALTER TABLE ADD COLUMN, gin up an ALTER TABLE ADD
* CONSTRAINT command to execute after the basic command is complete. (If
* called from ADD CONSTRAINT, that routine will add the FK constraints to
* its own subcommand list.)
*
* Note: the ADD CONSTRAINT command must also execute after any index
* creation commands. Thus, this should run after
* transformIndexConstraints, so that the CREATE INDEX commands are
* already in cxt->alist.
*/
if (!isAddConstraint)
{
AlterTableStmt *alterstmt = makeNode(AlterTableStmt);
alterstmt->relation = cxt->relation;
alterstmt->cmds = NIL;
alterstmt->relkind = OBJECT_TABLE;
foreach(fkclist, cxt->fkconstraints)
{
Constraint *constraint = (Constraint *) lfirst(fkclist);
AlterTableCmd *altercmd = makeNode(AlterTableCmd);
altercmd->subtype = AT_ProcessedConstraint;
altercmd->name = NULL;
altercmd->def = (Node *) constraint;
alterstmt->cmds = lappend(alterstmt->cmds, altercmd);
}
cxt->alist = lappend(cxt->alist, alterstmt);
}
}
/*
* transformIndexStmt - parse analysis for CREATE INDEX
*
* Note: this is a no-op for an index not using either index expressions or
2007-11-15 22:14:46 +01:00
* a predicate expression. There are several code paths that create indexes
* without bothering to call this, because they know they don't have any
* such expressions to deal with.
*/
IndexStmt *
transformIndexStmt(IndexStmt *stmt, const char *queryString)
{
Relation rel;
ParseState *pstate;
RangeTblEntry *rte;
ListCell *l;
/*
2007-11-15 22:14:46 +01:00
* We must not scribble on the passed-in IndexStmt, so copy it. (This is
* overkill, but easy.)
*/
stmt = (IndexStmt *) copyObject(stmt);
/*
2007-11-15 22:14:46 +01:00
* Open the parent table with appropriate locking. We must do this
* because addRangeTableEntry() would acquire only AccessShareLock,
2007-11-15 22:14:46 +01:00
* leaving DefineIndex() needing to do a lock upgrade with consequent risk
* of deadlock. Make sure this stays in sync with the type of lock
* DefineIndex() wants.
*/
rel = heap_openrv(stmt->relation,
2007-11-15 22:14:46 +01:00
(stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
/* Set up pstate */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
/*
2007-11-15 22:14:46 +01:00
* Put the parent table into the rtable so that the expressions can refer
* to its fields without qualification.
*/
rte = addRangeTableEntry(pstate, stmt->relation, NULL, false, true);
/* no to join list, yes to namespaces */
addRTEtoQuery(pstate, rte, false, true, true);
/* take care of the where clause */
if (stmt->whereClause)
stmt->whereClause = transformWhereClause(pstate,
stmt->whereClause,
"WHERE");
/* take care of any index expressions */
foreach(l, stmt->indexParams)
{
IndexElem *ielem = (IndexElem *) lfirst(l);
if (ielem->expr)
{
ielem->expr = transformExpr(pstate, ielem->expr);
/*
* We check only that the result type is legitimate; this is for
* consistency with what transformWhereClause() checks for the
* predicate. DefineIndex() will make more checks.
*/
if (expression_returns_set(ielem->expr))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("index expression cannot return a set")));
}
}
/*
* Check that only the base rel is mentioned.
*/
if (list_length(pstate->p_rtable) != 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("index expressions and predicates can refer only to the table being indexed")));
free_parsestate(pstate);
/* Close relation, but keep the lock */
heap_close(rel, NoLock);
return stmt;
}
/*
* transformRuleStmt -
* transform a CREATE RULE Statement. The action is a list of parse
* trees which is transformed into a list of query trees, and we also
* transform the WHERE clause if any.
*
* actions and whereClause are output parameters that receive the
* transformed results.
*
* Note that we must not scribble on the passed-in RuleStmt, so we do
* copyObject() on the actions and WHERE clause.
*/
void
transformRuleStmt(RuleStmt *stmt, const char *queryString,
List **actions, Node **whereClause)
{
Relation rel;
ParseState *pstate;
RangeTblEntry *oldrte;
RangeTblEntry *newrte;
/*
* To avoid deadlock, make sure the first thing we do is grab
* AccessExclusiveLock on the target relation. This will be needed by
* DefineQueryRewrite(), and we don't want to grab a lesser lock
* beforehand.
*/
rel = heap_openrv(stmt->relation, AccessExclusiveLock);
/* Set up pstate */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
/*
* NOTE: 'OLD' must always have a varno equal to 1 and 'NEW' equal to 2.
* Set up their RTEs in the main pstate for use in parsing the rule
* qualification.
*/
oldrte = addRangeTableEntryForRelation(pstate, rel,
makeAlias("*OLD*", NIL),
false, false);
newrte = addRangeTableEntryForRelation(pstate, rel,
makeAlias("*NEW*", NIL),
false, false);
/* Must override addRangeTableEntry's default access-check flags */
oldrte->requiredPerms = 0;
newrte->requiredPerms = 0;
/*
* They must be in the namespace too for lookup purposes, but only add the
* one(s) that are relevant for the current kind of rule. In an UPDATE
* rule, quals must refer to OLD.field or NEW.field to be unambiguous, but
* there's no need to be so picky for INSERT & DELETE. We do not add them
* to the joinlist.
*/
switch (stmt->event)
{
case CMD_SELECT:
addRTEtoQuery(pstate, oldrte, false, true, true);
break;
case CMD_UPDATE:
addRTEtoQuery(pstate, oldrte, false, true, true);
addRTEtoQuery(pstate, newrte, false, true, true);
break;
case CMD_INSERT:
addRTEtoQuery(pstate, newrte, false, true, true);
break;
case CMD_DELETE:
addRTEtoQuery(pstate, oldrte, false, true, true);
break;
default:
elog(ERROR, "unrecognized event type: %d",
(int) stmt->event);
break;
}
/* take care of the where clause */
*whereClause = transformWhereClause(pstate,
2007-11-15 22:14:46 +01:00
(Node *) copyObject(stmt->whereClause),
"WHERE");
if (list_length(pstate->p_rtable) != 2) /* naughty, naughty... */
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("rule WHERE condition cannot contain references to other relations")));
/* aggregates not allowed (but subselects are okay) */
if (pstate->p_hasAggs)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("cannot use aggregate function in rule WHERE condition")));
if (pstate->p_hasWindowFuncs)
ereport(ERROR,
(errcode(ERRCODE_WINDOWING_ERROR),
errmsg("cannot use window function in rule WHERE condition")));
/*
* 'instead nothing' rules with a qualification need a query rangetable so
* the rewrite handler can add the negated rule qualification to the
* original query. We create a query with the new command type CMD_NOTHING
* here that is treated specially by the rewrite system.
*/
if (stmt->actions == NIL)
{
Query *nothing_qry = makeNode(Query);
nothing_qry->commandType = CMD_NOTHING;
nothing_qry->rtable = pstate->p_rtable;
2007-11-15 22:14:46 +01:00
nothing_qry->jointree = makeFromExpr(NIL, NULL); /* no join wanted */
*actions = list_make1(nothing_qry);
}
else
{
ListCell *l;
List *newactions = NIL;
/*
* transform each statement, like parse_sub_analyze()
*/
foreach(l, stmt->actions)
{
Node *action = (Node *) lfirst(l);
ParseState *sub_pstate = make_parsestate(NULL);
Query *sub_qry,
*top_subqry;
bool has_old,
has_new;
/*
2007-11-15 22:14:46 +01:00
* Since outer ParseState isn't parent of inner, have to pass down
* the query text by hand.
*/
sub_pstate->p_sourcetext = queryString;
/*
* Set up OLD/NEW in the rtable for this statement. The entries
* are added only to relnamespace, not varnamespace, because we
* don't want them to be referred to by unqualified field names
* nor "*" in the rule actions. We decide later whether to put
* them in the joinlist.
*/
oldrte = addRangeTableEntryForRelation(sub_pstate, rel,
makeAlias("*OLD*", NIL),
false, false);
newrte = addRangeTableEntryForRelation(sub_pstate, rel,
makeAlias("*NEW*", NIL),
false, false);
oldrte->requiredPerms = 0;
newrte->requiredPerms = 0;
addRTEtoQuery(sub_pstate, oldrte, false, true, false);
addRTEtoQuery(sub_pstate, newrte, false, true, false);
/* Transform the rule action statement */
top_subqry = transformStmt(sub_pstate,
(Node *) copyObject(action));
/*
* We cannot support utility-statement actions (eg NOTIFY) with
* nonempty rule WHERE conditions, because there's no way to make
* the utility action execute conditionally.
*/
if (top_subqry->commandType == CMD_UTILITY &&
*whereClause != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("rules with WHERE conditions can only have SELECT, INSERT, UPDATE, or DELETE actions")));
/*
* If the action is INSERT...SELECT, OLD/NEW have been pushed down
* into the SELECT, and that's what we need to look at. (Ugly
* kluge ... try to fix this when we redesign querytrees.)
*/
sub_qry = getInsertSelectQuery(top_subqry, NULL);
/*
* If the sub_qry is a setop, we cannot attach any qualifications
* to it, because the planner won't notice them. This could
* perhaps be relaxed someday, but for now, we may as well reject
* such a rule immediately.
*/
if (sub_qry->setOperations != NULL && *whereClause != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("conditional UNION/INTERSECT/EXCEPT statements are not implemented")));
/*
* Validate action's use of OLD/NEW, qual too
*/
has_old =
rangeTableEntry_used((Node *) sub_qry, PRS2_OLD_VARNO, 0) ||
rangeTableEntry_used(*whereClause, PRS2_OLD_VARNO, 0);
has_new =
rangeTableEntry_used((Node *) sub_qry, PRS2_NEW_VARNO, 0) ||
rangeTableEntry_used(*whereClause, PRS2_NEW_VARNO, 0);
switch (stmt->event)
{
case CMD_SELECT:
if (has_old)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("ON SELECT rule cannot use OLD")));
if (has_new)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("ON SELECT rule cannot use NEW")));
break;
case CMD_UPDATE:
/* both are OK */
break;
case CMD_INSERT:
if (has_old)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("ON INSERT rule cannot use OLD")));
break;
case CMD_DELETE:
if (has_new)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("ON DELETE rule cannot use NEW")));
break;
default:
elog(ERROR, "unrecognized event type: %d",
(int) stmt->event);
break;
}
/*
* For efficiency's sake, add OLD to the rule action's jointree
* only if it was actually referenced in the statement or qual.
*
* For INSERT, NEW is not really a relation (only a reference to
* the to-be-inserted tuple) and should never be added to the
* jointree.
*
* For UPDATE, we treat NEW as being another kind of reference to
* OLD, because it represents references to *transformed* tuples
* of the existing relation. It would be wrong to enter NEW
* separately in the jointree, since that would cause a double
* join of the updated relation. It's also wrong to fail to make
* a jointree entry if only NEW and not OLD is mentioned.
*/
if (has_old || (has_new && stmt->event == CMD_UPDATE))
{
/*
* If sub_qry is a setop, manipulating its jointree will do no
* good at all, because the jointree is dummy. (This should be
* a can't-happen case because of prior tests.)
*/
if (sub_qry->setOperations != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("conditional UNION/INTERSECT/EXCEPT statements are not implemented")));
/* hack so we can use addRTEtoQuery() */
sub_pstate->p_rtable = sub_qry->rtable;
sub_pstate->p_joinlist = sub_qry->jointree->fromlist;
addRTEtoQuery(sub_pstate, oldrte, true, false, false);
sub_qry->jointree->fromlist = sub_pstate->p_joinlist;
}
newactions = lappend(newactions, top_subqry);
free_parsestate(sub_pstate);
}
*actions = newactions;
}
free_parsestate(pstate);
/* Close relation, but keep the exclusive lock */
heap_close(rel, NoLock);
}
/*
* transformAlterTableStmt -
* parse analysis for ALTER TABLE
*
* Returns a List of utility commands to be done in sequence. One of these
* will be the transformed AlterTableStmt, but there may be additional actions
* to be done before and after the actual AlterTable() call.
*/
List *
transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
{
Relation rel;
ParseState *pstate;
CreateStmtContext cxt;
List *result;
List *save_alist;
ListCell *lcmd,
*l;
List *newcmds = NIL;
bool skipValidation = true;
AlterTableCmd *newcmd;
/*
2007-11-15 22:14:46 +01:00
* We must not scribble on the passed-in AlterTableStmt, so copy it. (This
* is overkill, but easy.)
*/
stmt = (AlterTableStmt *) copyObject(stmt);
/*
2007-11-15 22:14:46 +01:00
* Acquire exclusive lock on the target relation, which will be held until
* end of transaction. This ensures any decisions we make here based on
* the state of the relation will still be good at execution. We must get
* exclusive lock now because execution will; taking a lower grade lock
* now and trying to upgrade later risks deadlock.
*/
rel = relation_openrv(stmt->relation, AccessExclusiveLock);
/* Set up pstate */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
cxt.stmtType = "ALTER TABLE";
cxt.relation = stmt->relation;
cxt.rel = rel;
cxt.inhRelations = NIL;
cxt.isalter = true;
cxt.hasoids = false; /* need not be right */
cxt.columns = NIL;
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
cxt.inh_indexes = NIL;
cxt.blist = NIL;
cxt.alist = NIL;
cxt.pkey = NULL;
/*
* The only subtypes that currently require parse transformation handling
* are ADD COLUMN and ADD CONSTRAINT. These largely re-use code from
* CREATE TABLE.
*/
foreach(lcmd, stmt->cmds)
{
AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
switch (cmd->subtype)
{
case AT_AddColumn:
case AT_AddColumnToView:
{
ColumnDef *def = (ColumnDef *) cmd->def;
Assert(IsA(def, ColumnDef));
transformColumnDefinition(pstate, &cxt, def);
/*
* If the column has a non-null default, we can't skip
* validation of foreign keys.
*/
if (def->raw_default != NULL)
skipValidation = false;
/*
* All constraints are processed in other ways. Remove the
* original list
*/
def->constraints = NIL;
newcmds = lappend(newcmds, cmd);
break;
}
case AT_AddConstraint:
/*
* The original AddConstraint cmd node doesn't go to newcmds
*/
if (IsA(cmd->def, Constraint))
{
transformTableConstraint(pstate, &cxt,
(Constraint *) cmd->def);
if (((Constraint *) cmd->def)->contype == CONSTR_FOREIGN)
skipValidation = false;
}
else
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(cmd->def));
break;
case AT_ProcessedConstraint:
/*
* Already-transformed ADD CONSTRAINT, so just make it look
* like the standard case.
*/
cmd->subtype = AT_AddConstraint;
newcmds = lappend(newcmds, cmd);
break;
default:
newcmds = lappend(newcmds, cmd);
break;
}
}
/*
* transformIndexConstraints wants cxt.alist to contain only index
* statements, so transfer anything we already have into save_alist
* immediately.
*/
save_alist = cxt.alist;
cxt.alist = NIL;
/* Postprocess index and FK constraints */
transformIndexConstraints(pstate, &cxt);
transformFKConstraints(pstate, &cxt, skipValidation, true);
/*
* Push any index-creation commands into the ALTER, so that they can be
* scheduled nicely by tablecmds.c. Note that tablecmds.c assumes that
* the IndexStmt attached to an AT_AddIndex subcommand has already been
* through transformIndexStmt.
*/
foreach(l, cxt.alist)
{
Node *idxstmt = (Node *) lfirst(l);
Assert(IsA(idxstmt, IndexStmt));
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_AddIndex;
newcmd->def = (Node *) transformIndexStmt((IndexStmt *) idxstmt,
queryString);
newcmds = lappend(newcmds, newcmd);
}
cxt.alist = NIL;
/* Append any CHECK or FK constraints to the commands list */
foreach(l, cxt.ckconstraints)
{
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_AddConstraint;
newcmd->def = (Node *) lfirst(l);
newcmds = lappend(newcmds, newcmd);
}
foreach(l, cxt.fkconstraints)
{
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_AddConstraint;
newcmd->def = (Node *) lfirst(l);
newcmds = lappend(newcmds, newcmd);
}
/* Close rel but keep lock */
relation_close(rel, NoLock);
/*
* Output results.
*/
stmt->cmds = newcmds;
result = lappend(cxt.blist, stmt);
result = list_concat(result, cxt.alist);
result = list_concat(result, save_alist);
return result;
}
/*
* Preprocess a list of column constraint clauses
* to attach constraint attributes to their primary constraint nodes
* and detect inconsistent/misplaced constraint attributes.
*
* NOTE: currently, attributes are only supported for FOREIGN KEY, UNIQUE,
* and PRIMARY KEY constraints, but someday they ought to be supported
* for other constraint types.
*/
static void
transformConstraintAttrs(ParseState *pstate, List *constraintList)
{
Constraint *lastprimarycon = NULL;
bool saw_deferrability = false;
bool saw_initially = false;
ListCell *clist;
#define SUPPORTS_ATTRS(node) \
((node) != NULL && \
((node)->contype == CONSTR_PRIMARY || \
(node)->contype == CONSTR_UNIQUE || \
(node)->contype == CONSTR_FOREIGN))
foreach(clist, constraintList)
{
Constraint *con = (Constraint *) lfirst(clist);
if (!IsA(con, Constraint))
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(con));
switch (con->contype)
{
case CONSTR_ATTR_DEFERRABLE:
if (!SUPPORTS_ATTRS(lastprimarycon))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced DEFERRABLE clause"),
parser_errposition(pstate, con->location)));
if (saw_deferrability)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple DEFERRABLE/NOT DEFERRABLE clauses not allowed"),
parser_errposition(pstate, con->location)));
saw_deferrability = true;
lastprimarycon->deferrable = true;
break;
case CONSTR_ATTR_NOT_DEFERRABLE:
if (!SUPPORTS_ATTRS(lastprimarycon))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced NOT DEFERRABLE clause"),
parser_errposition(pstate, con->location)));
if (saw_deferrability)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple DEFERRABLE/NOT DEFERRABLE clauses not allowed"),
parser_errposition(pstate, con->location)));
saw_deferrability = true;
lastprimarycon->deferrable = false;
if (saw_initially &&
lastprimarycon->initdeferred)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("constraint declared INITIALLY DEFERRED must be DEFERRABLE"),
parser_errposition(pstate, con->location)));
break;
case CONSTR_ATTR_DEFERRED:
if (!SUPPORTS_ATTRS(lastprimarycon))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced INITIALLY DEFERRED clause"),
parser_errposition(pstate, con->location)));
if (saw_initially)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple INITIALLY IMMEDIATE/DEFERRED clauses not allowed"),
parser_errposition(pstate, con->location)));
saw_initially = true;
lastprimarycon->initdeferred = true;
/*
* If only INITIALLY DEFERRED appears, assume DEFERRABLE
*/
if (!saw_deferrability)
lastprimarycon->deferrable = true;
else if (!lastprimarycon->deferrable)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("constraint declared INITIALLY DEFERRED must be DEFERRABLE"),
parser_errposition(pstate, con->location)));
break;
case CONSTR_ATTR_IMMEDIATE:
if (!SUPPORTS_ATTRS(lastprimarycon))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced INITIALLY IMMEDIATE clause"),
parser_errposition(pstate, con->location)));
if (saw_initially)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple INITIALLY IMMEDIATE/DEFERRED clauses not allowed"),
parser_errposition(pstate, con->location)));
saw_initially = true;
lastprimarycon->initdeferred = false;
break;
default:
/* Otherwise it's not an attribute */
lastprimarycon = con;
/* reset flags for new primary node */
saw_deferrability = false;
saw_initially = false;
break;
}
}
}
/*
* Special handling of type definition for a column
*/
static void
transformColumnType(ParseState *pstate, ColumnDef *column)
{
/*
* All we really need to do here is verify that the type is valid.
*/
Type ctype = typenameType(pstate, column->typeName, NULL);
ReleaseSysCache(ctype);
}
/*
* transformCreateSchemaStmt -
* analyzes the CREATE SCHEMA statement
*
* Split the schema element list into individual commands and place
* them in the result list in an order such that there are no forward
* references (e.g. GRANT to a table created later in the list). Note
* that the logic we use for determining forward references is
* presently quite incomplete.
*
* SQL92 also allows constraints to make forward references, so thumb through
* the table columns and move forward references to a posterior alter-table
* command.
*
* The result is a list of parse nodes that still need to be analyzed ---
* but we can't analyze the later commands until we've executed the earlier
* ones, because of possible inter-object references.
*
* Note: this breaks the rules a little bit by modifying schema-name fields
* within passed-in structs. However, the transformation would be the same
* if done over, so it should be all right to scribble on the input to this
* extent.
*/
List *
transformCreateSchemaStmt(CreateSchemaStmt *stmt)
{
CreateSchemaStmtContext cxt;
List *result;
ListCell *elements;
cxt.stmtType = "CREATE SCHEMA";
cxt.schemaname = stmt->schemaname;
cxt.authid = stmt->authid;
cxt.sequences = NIL;
cxt.tables = NIL;
cxt.views = NIL;
cxt.indexes = NIL;
cxt.triggers = NIL;
cxt.grants = NIL;
/*
* Run through each schema element in the schema element list. Separate
* statements by type, and do preliminary analysis.
*/
foreach(elements, stmt->schemaElts)
{
Node *element = lfirst(elements);
switch (nodeTag(element))
{
case T_CreateSeqStmt:
{
CreateSeqStmt *elp = (CreateSeqStmt *) element;
setSchemaName(cxt.schemaname, &elp->sequence->schemaname);
cxt.sequences = lappend(cxt.sequences, element);
}
break;
case T_CreateStmt:
{
CreateStmt *elp = (CreateStmt *) element;
setSchemaName(cxt.schemaname, &elp->relation->schemaname);
/*
* XXX todo: deal with constraints
*/
cxt.tables = lappend(cxt.tables, element);
}
break;
case T_ViewStmt:
{
ViewStmt *elp = (ViewStmt *) element;
setSchemaName(cxt.schemaname, &elp->view->schemaname);
/*
* XXX todo: deal with references between views
*/
cxt.views = lappend(cxt.views, element);
}
break;
case T_IndexStmt:
{
IndexStmt *elp = (IndexStmt *) element;
setSchemaName(cxt.schemaname, &elp->relation->schemaname);
cxt.indexes = lappend(cxt.indexes, element);
}
break;
case T_CreateTrigStmt:
{
CreateTrigStmt *elp = (CreateTrigStmt *) element;
setSchemaName(cxt.schemaname, &elp->relation->schemaname);
cxt.triggers = lappend(cxt.triggers, element);
}
break;
case T_GrantStmt:
cxt.grants = lappend(cxt.grants, element);
break;
default:
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(element));
}
}
result = NIL;
result = list_concat(result, cxt.sequences);
result = list_concat(result, cxt.tables);
result = list_concat(result, cxt.views);
result = list_concat(result, cxt.indexes);
result = list_concat(result, cxt.triggers);
result = list_concat(result, cxt.grants);
return result;
}
/*
* setSchemaName
* Set or check schema name in an element of a CREATE SCHEMA command
*/
static void
setSchemaName(char *context_schema, char **stmt_schema_name)
{
if (*stmt_schema_name == NULL)
*stmt_schema_name = context_schema;
else if (strcmp(context_schema, *stmt_schema_name) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_SCHEMA_DEFINITION),
errmsg("CREATE specifies a schema (%s) "
"different from the one being created (%s)",
*stmt_schema_name, context_schema)));
}