Allow for pg_upgrade of attributes with missing values

Commit 16828d5c02 neglected to do this, so upgraded databases would
silently get null instead of the specified default in rows without the
attribute defined.

A new binary upgrade function is provided to perform this and pg_dump is
adjusted to output a call to the function if required in binary upgrade
mode.

Also included is code to drop missing attribute values for dropped
columns. That way if the type is later dropped the missing value won't
have a dangling reference to the type.

Finally the regression tests are adjusted to ensure that there is a row
with a missing value so that this code is exercised in upgrade testing.

Catalog version unfortunately bumped.

Regression test changes from Tom Lane.
Remainder from me, reviewed by Tom Lane, Andres Freund, Alvaro Herrera

Discussion: https://postgr.es/m/19987.1529420110@sss.pgh.pa.us
This commit is contained in:
Andrew Dunstan 2018-06-22 08:42:36 -04:00
parent 9a994e37e0
commit 2448adf29c
9 changed files with 186 additions and 7 deletions

View File

@ -1613,6 +1613,29 @@ RemoveAttributeById(Oid relid, AttrNumber attnum)
"........pg.dropped.%d........", attnum);
namestrcpy(&(attStruct->attname), newattname);
/* clear the missing value if any */
if (attStruct->atthasmissing)
{
Datum valuesAtt[Natts_pg_attribute];
bool nullsAtt[Natts_pg_attribute];
bool replacesAtt[Natts_pg_attribute];
/* update the tuple - set atthasmissing and attmissingval */
MemSet(valuesAtt, 0, sizeof(valuesAtt));
MemSet(nullsAtt, false, sizeof(nullsAtt));
MemSet(replacesAtt, false, sizeof(replacesAtt));
valuesAtt[Anum_pg_attribute_atthasmissing - 1] =
BoolGetDatum(false);
replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
valuesAtt[Anum_pg_attribute_attmissingval - 1] = (Datum) 0;
nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
valuesAtt, nullsAtt, replacesAtt);
}
CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
}
@ -2001,6 +2024,63 @@ RelationClearMissing(Relation rel)
heap_close(attr_rel, RowExclusiveLock);
}
/*
* SetAttrMissing
*
* Set the missing value of a single attribute. This should only be used by
* binary upgrade. Takes an AccessExclusive lock on the relation owning the
* attribute.
*/
void
SetAttrMissing(Oid relid, char *attname, char *value)
{
Datum valuesAtt[Natts_pg_attribute];
bool nullsAtt[Natts_pg_attribute];
bool replacesAtt[Natts_pg_attribute];
Datum missingval;
Form_pg_attribute attStruct;
Relation attrrel,
tablerel;
HeapTuple atttup,
newtup;
/* lock the table the attribute belongs to */
tablerel = heap_open(relid, AccessExclusiveLock);
/* Lock the attribute row and get the data */
attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
atttup = SearchSysCacheAttName(relid, attname);
if (!HeapTupleIsValid(atttup))
elog(ERROR, "cache lookup failed for attribute %s of relation %u",
attname, relid);
attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
/* get an array value from the value string */
missingval = OidFunctionCall3(F_ARRAY_IN,
CStringGetDatum(value),
ObjectIdGetDatum(attStruct->atttypid),
Int32GetDatum(attStruct->atttypmod));
/* update the tuple - set atthasmissing and attmissingval */
MemSet(valuesAtt, 0, sizeof(valuesAtt));
MemSet(nullsAtt, false, sizeof(nullsAtt));
MemSet(replacesAtt, false, sizeof(replacesAtt));
valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
valuesAtt, nullsAtt, replacesAtt);
CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
/* clean up */
ReleaseSysCache(atttup);
heap_close(attrrel, RowExclusiveLock);
heap_close(tablerel, AccessExclusiveLock);
}
/*
* Store a default expression for column attnum of relation rel.
*

View File

@ -12,6 +12,7 @@
#include "postgres.h"
#include "catalog/binary_upgrade.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
@ -192,3 +193,18 @@ binary_upgrade_set_record_init_privs(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
Datum
binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
{
Oid table_id = PG_GETARG_OID(0);
text *attname = PG_GETARG_TEXT_P(1);
text *value = PG_GETARG_TEXT_P(2);
char *cattname = text_to_cstring(attname);
char *cvalue = text_to_cstring(value);
CHECK_IS_BINARY_UPGRADE;
SetAttrMissing(table_id, cattname, cvalue);
PG_RETURN_VOID();
}

View File

@ -8103,6 +8103,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
int i_attoptions;
int i_attcollation;
int i_attfdwoptions;
int i_attmissingval;
PGresult *res;
int ntups;
bool hasdefaults;
@ -8132,7 +8133,34 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
resetPQExpBuffer(q);
if (fout->remoteVersion >= 100000)
if (fout->remoteVersion >= 110000)
{
/* atthasmissing and attmissingval are new in 11 */
appendPQExpBuffer(q, "SELECT a.attnum, a.attname, a.atttypmod, "
"a.attstattarget, a.attstorage, t.typstorage, "
"a.attnotnull, a.atthasdef, a.attisdropped, "
"a.attlen, a.attalign, a.attislocal, "
"pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
"array_to_string(a.attoptions, ', ') AS attoptions, "
"CASE WHEN a.attcollation <> t.typcollation "
"THEN a.attcollation ELSE 0 END AS attcollation, "
"a.attidentity, "
"pg_catalog.array_to_string(ARRAY("
"SELECT pg_catalog.quote_ident(option_name) || "
"' ' || pg_catalog.quote_literal(option_value) "
"FROM pg_catalog.pg_options_to_table(attfdwoptions) "
"ORDER BY option_name"
"), E',\n ') AS attfdwoptions ,"
"CASE WHEN a.atthasmissing AND NOT a.attisdropped "
"THEN a.attmissingval ELSE null END AS attmissingval "
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid "
"WHERE a.attrelid = '%u'::pg_catalog.oid "
"AND a.attnum > 0::pg_catalog.int2 "
"ORDER BY a.attnum",
tbinfo->dobj.catId.oid);
}
else if (fout->remoteVersion >= 100000)
{
/*
* attidentity is new in version 10.
@ -8151,7 +8179,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
"' ' || pg_catalog.quote_literal(option_value) "
"FROM pg_catalog.pg_options_to_table(attfdwoptions) "
"ORDER BY option_name"
"), E',\n ') AS attfdwoptions "
"), E',\n ') AS attfdwoptions ,"
"NULL as attmissingval "
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid "
"WHERE a.attrelid = '%u'::pg_catalog.oid "
@ -8177,7 +8206,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
"' ' || pg_catalog.quote_literal(option_value) "
"FROM pg_catalog.pg_options_to_table(attfdwoptions) "
"ORDER BY option_name"
"), E',\n ') AS attfdwoptions "
"), E',\n ') AS attfdwoptions, "
"NULL as attmissingval "
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid "
"WHERE a.attrelid = '%u'::pg_catalog.oid "
@ -8201,7 +8231,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
"array_to_string(a.attoptions, ', ') AS attoptions, "
"CASE WHEN a.attcollation <> t.typcollation "
"THEN a.attcollation ELSE 0 END AS attcollation, "
"NULL AS attfdwoptions "
"NULL AS attfdwoptions, "
"NULL as attmissingval "
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid "
"WHERE a.attrelid = '%u'::pg_catalog.oid "
@ -8219,7 +8250,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
"pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
"array_to_string(a.attoptions, ', ') AS attoptions, "
"0 AS attcollation, "
"NULL AS attfdwoptions "
"NULL AS attfdwoptions, "
"NULL as attmissingval "
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid "
"WHERE a.attrelid = '%u'::pg_catalog.oid "
@ -8236,7 +8268,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
"a.attlen, a.attalign, a.attislocal, "
"pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
"'' AS attoptions, 0 AS attcollation, "
"NULL AS attfdwoptions "
"NULL AS attfdwoptions, "
"NULL as attmissingval "
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid "
"WHERE a.attrelid = '%u'::pg_catalog.oid "
@ -8266,6 +8299,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
i_attoptions = PQfnumber(res, "attoptions");
i_attcollation = PQfnumber(res, "attcollation");
i_attfdwoptions = PQfnumber(res, "attfdwoptions");
i_attmissingval = PQfnumber(res, "attmissingval");
tbinfo->numatts = ntups;
tbinfo->attnames = (char **) pg_malloc(ntups * sizeof(char *));
@ -8282,6 +8316,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
tbinfo->attoptions = (char **) pg_malloc(ntups * sizeof(char *));
tbinfo->attcollation = (Oid *) pg_malloc(ntups * sizeof(Oid));
tbinfo->attfdwoptions = (char **) pg_malloc(ntups * sizeof(char *));
tbinfo->attmissingval = (char **) pg_malloc(ntups * sizeof(char *));
tbinfo->notnull = (bool *) pg_malloc(ntups * sizeof(bool));
tbinfo->inhNotNull = (bool *) pg_malloc(ntups * sizeof(bool));
tbinfo->attrdefs = (AttrDefInfo **) pg_malloc(ntups * sizeof(AttrDefInfo *));
@ -8309,6 +8344,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
tbinfo->attoptions[j] = pg_strdup(PQgetvalue(res, j, i_attoptions));
tbinfo->attcollation[j] = atooid(PQgetvalue(res, j, i_attcollation));
tbinfo->attfdwoptions[j] = pg_strdup(PQgetvalue(res, j, i_attfdwoptions));
tbinfo->attmissingval[j] = pg_strdup(PQgetvalue(res, j, i_attmissingval));
tbinfo->attrdefs[j] = NULL; /* fix below */
if (PQgetvalue(res, j, i_atthasdef)[0] == 't')
hasdefaults = true;
@ -15658,6 +15694,29 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
else
appendPQExpBufferStr(q, ";\n");
/*
* in binary upgrade mode, update the catalog with any missing values
* that might be present.
*/
if (dopt->binary_upgrade)
{
for (j = 0; j < tbinfo->numatts; j++)
{
if (tbinfo->attmissingval[j][0] != '\0')
{
appendPQExpBufferStr(q, "\n-- set missing value.\n");
appendPQExpBufferStr(q,
"SELECT pg_catalog.binary_upgrade_set_missing_value(");
appendStringLiteralAH(q, qualrelname, fout);
appendPQExpBufferStr(q, "::pg_catalog.regclass,");
appendStringLiteralAH(q, tbinfo->attnames[j], fout);
appendPQExpBufferStr(q, ",");
appendStringLiteralAH(q, tbinfo->attmissingval[j], fout);
appendPQExpBufferStr(q, ");\n\n");
}
}
}
/*
* To create binary-compatible heap files, we have to ensure the same
* physical column order, including dropped columns, as in the

View File

@ -316,6 +316,7 @@ typedef struct _tableInfo
char **attoptions; /* per-attribute options */
Oid *attcollation; /* per-attribute collation selection */
char **attfdwoptions; /* per-attribute fdw options */
char **attmissingval; /* per attribute missing value */
bool *notnull; /* NOT NULL constraints on attributes */
bool *inhNotNull; /* true if NOT NULL is inherited */
struct _attrDefInfo **attrdefs; /* DEFAULT expressions */

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201804191
#define CATALOG_VERSION_NO 201806221
#endif

View File

@ -105,6 +105,7 @@ extern List *AddRelationNewConstraints(Relation rel,
bool is_internal);
extern void RelationClearMissing(Relation rel);
extern void SetAttrMissing(Oid relid, char *attname, char *value);
extern Oid StoreAttrDefault(Relation rel, AttrNumber attnum,
Node *expr, bool is_internal,

View File

@ -10037,6 +10037,10 @@
proname => 'binary_upgrade_set_record_init_privs', provolatile => 'v',
proparallel => 'r', prorettype => 'void', proargtypes => 'bool',
prosrc => 'binary_upgrade_set_record_init_privs' },
{ oid => '4101', descr => 'for use by pg_upgrade',
proname => 'binary_upgrade_set_missing_value', provolatile => 'v',
proparallel => 'r', prorettype => 'void', proargtypes => 'oid text text',
prosrc => 'binary_upgrade_set_missing_value' },
# replication/origin.h
{ oid => '6003', descr => 'create a replication origin',

View File

@ -548,3 +548,14 @@ DROP TABLE has_volatile;
DROP EVENT TRIGGER has_volatile_rewrite;
DROP FUNCTION log_rewrite;
DROP SCHEMA fast_default;
-- Leave a table with an active fast default in place, for pg_upgrade testing
set search_path = public;
create table has_fast_default(f1 int);
insert into has_fast_default values(1);
alter table has_fast_default add column f2 int default 42;
table has_fast_default;
f1 | f2
----+----
1 | 42
(1 row)

View File

@ -369,3 +369,10 @@ DROP TABLE has_volatile;
DROP EVENT TRIGGER has_volatile_rewrite;
DROP FUNCTION log_rewrite;
DROP SCHEMA fast_default;
-- Leave a table with an active fast default in place, for pg_upgrade testing
set search_path = public;
create table has_fast_default(f1 int);
insert into has_fast_default values(1);
alter table has_fast_default add column f2 int default 42;
table has_fast_default;