Fix multiple assignments to a column of a domain type.

We allow INSERT and UPDATE commands to assign to the same column more than
once, as long as the assignments are to subfields or elements rather than
the whole column.  However, this failed when the target column was a domain
over array rather than plain array.  Fix by teaching process_matched_tle()
to look through CoerceToDomain nodes, and add relevant test cases.

Also add a group of test cases exercising domains over array of composite.
It's doubtless accidental that CREATE DOMAIN allows this case while not
allowing straight domain over composite; but it does, so we'd better make
sure we don't break it.  (I could not find any documentation mentioning
either side of that, so no doc changes.)

It's been like this for a long time, so back-patch to all supported
branches.

Discussion: https://postgr.es/m/4206.1499798337@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2017-07-11 16:48:59 -04:00
parent c0077f7383
commit 1233680611
3 changed files with 185 additions and 4 deletions

View File

@ -878,6 +878,7 @@ process_matched_tle(TargetEntry *src_tle,
const char *attrName) const char *attrName)
{ {
TargetEntry *result; TargetEntry *result;
CoerceToDomain *coerce_expr = NULL;
Node *src_expr; Node *src_expr;
Node *prior_expr; Node *prior_expr;
Node *src_input; Node *src_input;
@ -914,10 +915,30 @@ process_matched_tle(TargetEntry *src_tle,
* For FieldStore, instead of nesting we can generate a single * For FieldStore, instead of nesting we can generate a single
* FieldStore with multiple target fields. We must nest when * FieldStore with multiple target fields. We must nest when
* ArrayRefs are involved though. * ArrayRefs are involved though.
*
* As a further complication, the destination column might be a domain,
* resulting in each assignment containing a CoerceToDomain node over a
* FieldStore or ArrayRef. These should have matching target domains,
* so we strip them and reconstitute a single CoerceToDomain over the
* combined FieldStore/ArrayRef nodes. (Notice that this has the result
* that the domain's checks are applied only after we do all the field or
* element updates, not after each one. This is arguably desirable.)
*---------- *----------
*/ */
src_expr = (Node *) src_tle->expr; src_expr = (Node *) src_tle->expr;
prior_expr = (Node *) prior_tle->expr; prior_expr = (Node *) prior_tle->expr;
if (src_expr && IsA(src_expr, CoerceToDomain) &&
prior_expr && IsA(prior_expr, CoerceToDomain) &&
((CoerceToDomain *) src_expr)->resulttype ==
((CoerceToDomain *) prior_expr)->resulttype)
{
/* we assume without checking that resulttypmod/resultcollid match */
coerce_expr = (CoerceToDomain *) src_expr;
src_expr = (Node *) ((CoerceToDomain *) src_expr)->arg;
prior_expr = (Node *) ((CoerceToDomain *) prior_expr)->arg;
}
src_input = get_assignment_input(src_expr); src_input = get_assignment_input(src_expr);
prior_input = get_assignment_input(prior_expr); prior_input = get_assignment_input(prior_expr);
if (src_input == NULL || if (src_input == NULL ||
@ -986,6 +1007,16 @@ process_matched_tle(TargetEntry *src_tle,
newexpr = NULL; newexpr = NULL;
} }
if (coerce_expr)
{
/* put back the CoerceToDomain */
CoerceToDomain *newcoerce = makeNode(CoerceToDomain);
memcpy(newcoerce, coerce_expr, sizeof(CoerceToDomain));
newcoerce->arg = (Expr *) newexpr;
newexpr = (Node *) newcoerce;
}
result = flatCopyTargetEntry(src_tle); result = flatCopyTargetEntry(src_tle);
result->expr = (Expr *) newexpr; result->expr = (Expr *) newexpr;
return result; return result;

View File

@ -107,6 +107,7 @@ INSERT INTO domarrtest values ('{2,2}', '{{"a"},{"c"}}');
INSERT INTO domarrtest values (NULL, '{{"a","b","c"},{"d","e","f"}}'); INSERT INTO domarrtest values (NULL, '{{"a","b","c"},{"d","e","f"}}');
INSERT INTO domarrtest values (NULL, '{{"toolong","b","c"},{"d","e","f"}}'); INSERT INTO domarrtest values (NULL, '{{"toolong","b","c"},{"d","e","f"}}');
ERROR: value too long for type character varying(4) ERROR: value too long for type character varying(4)
INSERT INTO domarrtest (testint4arr[1], testint4arr[3]) values (11,22);
select * from domarrtest; select * from domarrtest;
testint4arr | testchar4arr testint4arr | testchar4arr
---------------+--------------------- ---------------+---------------------
@ -115,7 +116,8 @@ select * from domarrtest;
{2,2} | {{a,b},{c,d},{e,f}} {2,2} | {{a,b},{c,d},{e,f}}
{2,2} | {{a},{c}} {2,2} | {{a},{c}}
| {{a,b,c},{d,e,f}} | {{a,b,c},{d,e,f}}
(5 rows) {11,NULL,22} |
(6 rows)
select testint4arr[1], testchar4arr[2:2] from domarrtest; select testint4arr[1], testchar4arr[2:2] from domarrtest;
testint4arr | testchar4arr testint4arr | testchar4arr
@ -125,7 +127,8 @@ select testint4arr[1], testchar4arr[2:2] from domarrtest;
2 | {{c,d}} 2 | {{c,d}}
2 | {{c}} 2 | {{c}}
| {{d,e,f}} | {{d,e,f}}
(5 rows) 11 |
(6 rows)
select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest; select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest;
array_dims | array_dims array_dims | array_dims
@ -135,7 +138,8 @@ select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest;
[1:2] | [1:3][1:2] [1:2] | [1:3][1:2]
[1:2] | [1:2][1:1] [1:2] | [1:2][1:1]
| [1:2][1:3] | [1:2][1:3]
(5 rows) [1:3] |
(6 rows)
COPY domarrtest FROM stdin; COPY domarrtest FROM stdin;
COPY domarrtest FROM stdin; -- fail COPY domarrtest FROM stdin; -- fail
@ -149,9 +153,21 @@ select * from domarrtest;
{2,2} | {{a,b},{c,d},{e,f}} {2,2} | {{a,b},{c,d},{e,f}}
{2,2} | {{a},{c}} {2,2} | {{a},{c}}
| {{a,b,c},{d,e,f}} | {{a,b,c},{d,e,f}}
{11,NULL,22} |
{3,4} | {q,w,e} {3,4} | {q,w,e}
| |
(7 rows) (8 rows)
update domarrtest set
testint4arr[1] = testint4arr[1] + 1,
testint4arr[3] = testint4arr[3] - 1
where testchar4arr is null;
select * from domarrtest where testchar4arr is null;
testint4arr | testchar4arr
------------------+--------------
{12,NULL,21} |
{NULL,NULL,NULL} |
(2 rows)
drop table domarrtest; drop table domarrtest;
drop domain domainint4arr restrict; drop domain domainint4arr restrict;
@ -182,6 +198,92 @@ select pg_typeof('{1,2,3}'::dia || 42); -- should be int[] not dia
(1 row) (1 row)
drop domain dia; drop domain dia;
-- Test domains over arrays of composite
create type comptype as (r float8, i float8);
create domain dcomptypea as comptype[];
create table dcomptable (d1 dcomptypea unique);
insert into dcomptable values (array[row(1,2)]::dcomptypea);
insert into dcomptable values (array[row(3,4), row(5,6)]::comptype[]);
insert into dcomptable values (array[row(7,8)::comptype, row(9,10)::comptype]);
insert into dcomptable values (array[row(1,2)]::dcomptypea); -- fail on uniqueness
ERROR: duplicate key value violates unique constraint "dcomptable_d1_key"
DETAIL: Key (d1)=({"(1,2)"}) already exists.
insert into dcomptable (d1[1]) values(row(9,10));
insert into dcomptable (d1[1].r) values(11);
select * from dcomptable;
d1
--------------------
{"(1,2)"}
{"(3,4)","(5,6)"}
{"(7,8)","(9,10)"}
{"(9,10)"}
{"(11,)"}
(5 rows)
select d1[2], d1[1].r, d1[1].i from dcomptable;
d1 | r | i
--------+----+----
| 1 | 2
(5,6) | 3 | 4
(9,10) | 7 | 8
| 9 | 10
| 11 |
(5 rows)
update dcomptable set d1[2] = row(d1[2].i, d1[2].r);
select * from dcomptable;
d1
--------------------
{"(1,2)","(,)"}
{"(3,4)","(6,5)"}
{"(7,8)","(10,9)"}
{"(9,10)","(,)"}
{"(11,)","(,)"}
(5 rows)
update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0;
select * from dcomptable;
d1
--------------------
{"(11,)","(,)"}
{"(2,2)","(,)"}
{"(4,4)","(6,5)"}
{"(8,8)","(10,9)"}
{"(10,10)","(,)"}
(5 rows)
alter domain dcomptypea add constraint c1 check (value[1].r <= value[1].i);
alter domain dcomptypea add constraint c2 check (value[1].r > value[1].i); -- fail
ERROR: column "d1" of table "dcomptable" contains values that violate the new constraint
select array[row(2,1)]::dcomptypea; -- fail
ERROR: value for domain dcomptypea violates check constraint "c1"
insert into dcomptable values (array[row(1,2)]::comptype[]);
insert into dcomptable values (array[row(2,1)]::comptype[]); -- fail
ERROR: value for domain dcomptypea violates check constraint "c1"
insert into dcomptable (d1[1].r) values(99);
insert into dcomptable (d1[1].r, d1[1].i) values(99, 100);
insert into dcomptable (d1[1].r, d1[1].i) values(100, 99); -- fail
ERROR: value for domain dcomptypea violates check constraint "c1"
update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0; -- fail
ERROR: value for domain dcomptypea violates check constraint "c1"
update dcomptable set d1[1].r = d1[1].r - 1 where d1[1].i > 0;
select * from dcomptable;
d1
--------------------
{"(11,)","(,)"}
{"(99,)"}
{"(1,2)","(,)"}
{"(3,4)","(6,5)"}
{"(7,8)","(10,9)"}
{"(9,10)","(,)"}
{"(0,2)"}
{"(98,100)"}
(8 rows)
drop table dcomptable;
drop type comptype cascade;
NOTICE: drop cascades to type dcomptypea
-- Test not-null restrictions
create domain dnotnull varchar(15) NOT NULL; create domain dnotnull varchar(15) NOT NULL;
create domain dnull varchar(15); create domain dnull varchar(15);
create domain dcheck varchar(15) NOT NULL CHECK (VALUE = 'a' OR VALUE = 'c' OR VALUE = 'd'); create domain dcheck varchar(15) NOT NULL CHECK (VALUE = 'a' OR VALUE = 'c' OR VALUE = 'd');

View File

@ -85,6 +85,7 @@ INSERT INTO domarrtest values ('{2,2}', '{{"a","b"},{"c","d"},{"e","f"}}');
INSERT INTO domarrtest values ('{2,2}', '{{"a"},{"c"}}'); INSERT INTO domarrtest values ('{2,2}', '{{"a"},{"c"}}');
INSERT INTO domarrtest values (NULL, '{{"a","b","c"},{"d","e","f"}}'); INSERT INTO domarrtest values (NULL, '{{"a","b","c"},{"d","e","f"}}');
INSERT INTO domarrtest values (NULL, '{{"toolong","b","c"},{"d","e","f"}}'); INSERT INTO domarrtest values (NULL, '{{"toolong","b","c"},{"d","e","f"}}');
INSERT INTO domarrtest (testint4arr[1], testint4arr[3]) values (11,22);
select * from domarrtest; select * from domarrtest;
select testint4arr[1], testchar4arr[2:2] from domarrtest; select testint4arr[1], testchar4arr[2:2] from domarrtest;
select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest; select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest;
@ -100,6 +101,13 @@ COPY domarrtest FROM stdin; -- fail
select * from domarrtest; select * from domarrtest;
update domarrtest set
testint4arr[1] = testint4arr[1] + 1,
testint4arr[3] = testint4arr[3] - 1
where testchar4arr is null;
select * from domarrtest where testchar4arr is null;
drop table domarrtest; drop table domarrtest;
drop domain domainint4arr restrict; drop domain domainint4arr restrict;
drop domain domainchar4arr restrict; drop domain domainchar4arr restrict;
@ -111,6 +119,46 @@ select pg_typeof('{1,2,3}'::dia);
select pg_typeof('{1,2,3}'::dia || 42); -- should be int[] not dia select pg_typeof('{1,2,3}'::dia || 42); -- should be int[] not dia
drop domain dia; drop domain dia;
-- Test domains over arrays of composite
create type comptype as (r float8, i float8);
create domain dcomptypea as comptype[];
create table dcomptable (d1 dcomptypea unique);
insert into dcomptable values (array[row(1,2)]::dcomptypea);
insert into dcomptable values (array[row(3,4), row(5,6)]::comptype[]);
insert into dcomptable values (array[row(7,8)::comptype, row(9,10)::comptype]);
insert into dcomptable values (array[row(1,2)]::dcomptypea); -- fail on uniqueness
insert into dcomptable (d1[1]) values(row(9,10));
insert into dcomptable (d1[1].r) values(11);
select * from dcomptable;
select d1[2], d1[1].r, d1[1].i from dcomptable;
update dcomptable set d1[2] = row(d1[2].i, d1[2].r);
select * from dcomptable;
update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0;
select * from dcomptable;
alter domain dcomptypea add constraint c1 check (value[1].r <= value[1].i);
alter domain dcomptypea add constraint c2 check (value[1].r > value[1].i); -- fail
select array[row(2,1)]::dcomptypea; -- fail
insert into dcomptable values (array[row(1,2)]::comptype[]);
insert into dcomptable values (array[row(2,1)]::comptype[]); -- fail
insert into dcomptable (d1[1].r) values(99);
insert into dcomptable (d1[1].r, d1[1].i) values(99, 100);
insert into dcomptable (d1[1].r, d1[1].i) values(100, 99); -- fail
update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0; -- fail
update dcomptable set d1[1].r = d1[1].r - 1 where d1[1].i > 0;
select * from dcomptable;
drop table dcomptable;
drop type comptype cascade;
-- Test not-null restrictions
create domain dnotnull varchar(15) NOT NULL; create domain dnotnull varchar(15) NOT NULL;
create domain dnull varchar(15); create domain dnull varchar(15);
create domain dcheck varchar(15) NOT NULL CHECK (VALUE = 'a' OR VALUE = 'c' OR VALUE = 'd'); create domain dcheck varchar(15) NOT NULL CHECK (VALUE = 'a' OR VALUE = 'c' OR VALUE = 'd');