Fix dblink_build_sql_insert() and related functions to handle dropped

columns correctly.  In passing, get rid of some dead logic in the
underlying get_sql_insert() etc functions --- there is no caller that
will pass null value-arrays to them.

Per bug report from Robert Voinea.
This commit is contained in:
Tom Lane 2010-06-15 19:04:15 +00:00
parent 4a96908575
commit 3b3706d2cf
3 changed files with 102 additions and 33 deletions

View File

@ -8,7 +8,7 @@
* Darko Prenosil <Darko.Prenosil@finteh.hr> * Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in> * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
* *
* $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.96 2010/06/15 16:22:19 tgl Exp $ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.97 2010/06/15 19:04:15 tgl Exp $
* Copyright (c) 2001-2010, PostgreSQL Global Development Group * Copyright (c) 2001-2010, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED; * ALL RIGHTS RESERVED;
* *
@ -1741,7 +1741,7 @@ get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals
appendStringInfo(&buf, ") VALUES("); appendStringInfo(&buf, ") VALUES(");
/* /*
* remember attvals are 1 based * Note: i is physical column number (counting from 0).
*/ */
needComma = false; needComma = false;
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
@ -1752,12 +1752,9 @@ get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals
if (needComma) if (needComma)
appendStringInfo(&buf, ","); appendStringInfo(&buf, ",");
if (tgt_pkattvals != NULL) key = get_attnum_pk_pos(pkattnums, pknumatts, i);
key = get_attnum_pk_pos(pkattnums, pknumatts, i);
else
key = -1;
if (key > -1) if (key >= 0)
val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL; val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
else else
val = SPI_getvalue(tuple, tupdesc, i + 1); val = SPI_getvalue(tuple, tupdesc, i + 1);
@ -1802,10 +1799,6 @@ get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals
appendStringInfoString(&buf, appendStringInfoString(&buf,
quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname))); quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
if (tgt_pkattvals == NULL)
/* internal error */
elog(ERROR, "target key array must not be NULL");
if (tgt_pkattvals[i] != NULL) if (tgt_pkattvals[i] != NULL)
appendStringInfo(&buf, " = %s", appendStringInfo(&buf, " = %s",
quote_literal_cstr(tgt_pkattvals[i])); quote_literal_cstr(tgt_pkattvals[i]));
@ -1845,6 +1838,9 @@ get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals
appendStringInfo(&buf, "UPDATE %s SET ", relname); appendStringInfo(&buf, "UPDATE %s SET ", relname);
/*
* Note: i is physical column number (counting from 0).
*/
needComma = false; needComma = false;
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
{ {
@ -1857,12 +1853,9 @@ get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals
appendStringInfo(&buf, "%s = ", appendStringInfo(&buf, "%s = ",
quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname))); quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
if (tgt_pkattvals != NULL) key = get_attnum_pk_pos(pkattnums, pknumatts, i);
key = get_attnum_pk_pos(pkattnums, pknumatts, i);
else
key = -1;
if (key > -1) if (key >= 0)
val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL; val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
else else
val = SPI_getvalue(tuple, tupdesc, i + 1); val = SPI_getvalue(tuple, tupdesc, i + 1);
@ -1889,16 +1882,10 @@ get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals
appendStringInfo(&buf, "%s", appendStringInfo(&buf, "%s",
quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname))); quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
if (tgt_pkattvals != NULL) val = tgt_pkattvals[i];
val = tgt_pkattvals[i] ? pstrdup(tgt_pkattvals[i]) : NULL;
else
val = SPI_getvalue(tuple, tupdesc, pkattnum + 1);
if (val != NULL) if (val != NULL)
{
appendStringInfo(&buf, " = %s", quote_literal_cstr(val)); appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
pfree(val);
}
else else
appendStringInfo(&buf, " IS NULL"); appendStringInfo(&buf, " IS NULL");
} }
@ -1964,18 +1951,12 @@ get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pk
{ {
char *relname; char *relname;
TupleDesc tupdesc; TupleDesc tupdesc;
int natts;
StringInfoData buf; StringInfoData buf;
int ret; int ret;
HeapTuple tuple; HeapTuple tuple;
int i; int i;
initStringInfo(&buf);
/* get relation name including any needed schema prefix and quoting */
relname = generate_relation_name(rel);
tupdesc = rel->rd_att;
/* /*
* Connect to SPI manager * Connect to SPI manager
*/ */
@ -1983,11 +1964,36 @@ get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pk
/* internal error */ /* internal error */
elog(ERROR, "SPI connect failure - returned %d", ret); elog(ERROR, "SPI connect failure - returned %d", ret);
initStringInfo(&buf);
/* get relation name including any needed schema prefix and quoting */
relname = generate_relation_name(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
/* /*
* Build sql statement to look up tuple of interest Use src_pkattvals as * Build sql statement to look up tuple of interest, ie, the one matching
* the criteria. * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
* generate a result tuple that matches the table's physical structure,
* with NULLs for any dropped columns. Otherwise we have to deal with
* two different tupdescs and everything's very confusing.
*/ */
appendStringInfo(&buf, "SELECT * FROM %s WHERE ", relname); appendStringInfoString(&buf, "SELECT ");
for (i = 0; i < natts; i++)
{
if (i > 0)
appendStringInfoString(&buf, ", ");
if (tupdesc->attrs[i]->attisdropped)
appendStringInfoString(&buf, "NULL");
else
appendStringInfoString(&buf,
quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
}
appendStringInfo(&buf, " FROM %s WHERE ", relname);
for (i = 0; i < pknumatts; i++) for (i = 0; i < pknumatts; i++)
{ {

View File

@ -887,3 +887,40 @@ SELECT dblink_disconnect();
OK OK
(1 row) (1 row)
-- test dropped columns in dblink_build_sql_insert, dblink_build_sql_update
CREATE TEMP TABLE test_dropped
(
col1 INT NOT NULL DEFAULT 111,
id SERIAL PRIMARY KEY,
col2 INT NOT NULL DEFAULT 112,
col2b INT NOT NULL DEFAULT 113
);
NOTICE: CREATE TABLE will create implicit sequence "test_dropped_id_seq" for serial column "test_dropped.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "test_dropped_pkey" for table "test_dropped"
INSERT INTO test_dropped VALUES(default);
ALTER TABLE test_dropped
DROP COLUMN col1,
DROP COLUMN col2,
ADD COLUMN col3 VARCHAR(10) NOT NULL DEFAULT 'foo',
ADD COLUMN col4 INT NOT NULL DEFAULT 42;
SELECT dblink_build_sql_insert('test_dropped', '2', 1,
ARRAY['1'::TEXT], ARRAY['2'::TEXT]);
dblink_build_sql_insert
---------------------------------------------------------------------------
INSERT INTO test_dropped(id,col2b,col3,col4) VALUES('2','113','foo','42')
(1 row)
SELECT dblink_build_sql_update('test_dropped', '2', 1,
ARRAY['1'::TEXT], ARRAY['2'::TEXT]);
dblink_build_sql_update
-------------------------------------------------------------------------------------------
UPDATE test_dropped SET id = '2', col2b = '113', col3 = 'foo', col4 = '42' WHERE id = '2'
(1 row)
SELECT dblink_build_sql_delete('test_dropped', '2', 1,
ARRAY['2'::TEXT]);
dblink_build_sql_delete
-----------------------------------------
DELETE FROM test_dropped WHERE id = '2'
(1 row)

View File

@ -412,3 +412,29 @@ SELECT notify_name, be_pid = (select t.be_pid from dblink('select pg_backend_pid
SELECT * from dblink_get_notify(); SELECT * from dblink_get_notify();
SELECT dblink_disconnect(); SELECT dblink_disconnect();
-- test dropped columns in dblink_build_sql_insert, dblink_build_sql_update
CREATE TEMP TABLE test_dropped
(
col1 INT NOT NULL DEFAULT 111,
id SERIAL PRIMARY KEY,
col2 INT NOT NULL DEFAULT 112,
col2b INT NOT NULL DEFAULT 113
);
INSERT INTO test_dropped VALUES(default);
ALTER TABLE test_dropped
DROP COLUMN col1,
DROP COLUMN col2,
ADD COLUMN col3 VARCHAR(10) NOT NULL DEFAULT 'foo',
ADD COLUMN col4 INT NOT NULL DEFAULT 42;
SELECT dblink_build_sql_insert('test_dropped', '2', 1,
ARRAY['1'::TEXT], ARRAY['2'::TEXT]);
SELECT dblink_build_sql_update('test_dropped', '2', 1,
ARRAY['1'::TEXT], ARRAY['2'::TEXT]);
SELECT dblink_build_sql_delete('test_dropped', '2', 1,
ARRAY['2'::TEXT]);