Allow specifying column lists for logical replication

This allows specifying an optional column list when adding a table to
logical replication. The column list may be specified after the table
name, enclosed in parentheses. Columns not included in this list are not
sent to the subscriber, allowing the schema on the subscriber to be a
subset of the publisher schema.

For UPDATE/DELETE publications, the column list needs to cover all
REPLICA IDENTITY columns. For INSERT publications, the column list is
arbitrary and may omit some REPLICA IDENTITY columns. Furthermore, if
the table uses REPLICA IDENTITY FULL, column list is not allowed.

The column list can contain only simple column references. Complex
expressions, function calls etc. are not allowed. This restriction could
be relaxed in the future.

During the initial table synchronization, only columns included in the
column list are copied to the subscriber. If the subscription has
several publications, containing the same table with different column
lists, columns specified in any of the lists will be copied.

This means all columns are replicated if the table has no column list
at all (which is treated as column list with all columns), or when of
the publications is defined as FOR ALL TABLES (possibly IN SCHEMA that
matches the schema of the table).

For partitioned tables, publish_via_partition_root determines whether
the column list for the root or the leaf relation will be used. If the
parameter is 'false' (the default), the list defined for the leaf
relation is used. Otherwise, the column list for the root partition
will be used.

Psql commands \dRp+ and \d <table-name> now display any column lists.

Author: Tomas Vondra, Alvaro Herrera, Rahila Syed
Reviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed,
Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yu
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
This commit is contained in:
Tomas Vondra 2022-03-26 00:45:21 +01:00
parent 05843b1aa4
commit 923def9a53
26 changed files with 2833 additions and 92 deletions

View File

@ -4410,7 +4410,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
</para>
<para>
This is an array of <structfield>indnatts</structfield> values that
indicate which table columns this index indexes. For example a value
indicate which table columns this index indexes. For example, a value
of <literal>1 3</literal> would mean that the first and the third table
columns make up the index entries. Key columns come before non-key
(included) columns. A zero in this array indicates that the
@ -6291,6 +6291,19 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
Reference to schema
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>prattrs</structfield> <type>int2vector</type>
(references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attnum</structfield>)
</para>
<para>
This is an array of values that indicates which table columns are
part of the publication. For example, a value of <literal>1 3</literal>
would mean that the first and the third table columns are published.
A null value indicates that all columns are published.
</para></entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -7016,7 +7016,8 @@ Relation
</listitem>
</varlistentry>
</variablelist>
Next, the following message part appears for each column (except generated columns):
Next, the following message part appears for each column included in
the publication (except generated columns):
<variablelist>
<varlistentry>
<term>

View File

@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
@ -114,6 +114,14 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
</para>
<para>
Optionally, a column list can be specified. See <xref
linkend="sql-createpublication"/> for details.
</para>
<para>
If the optional <literal>WHERE</literal> clause is specified, rows for
which the <replaceable class="parameter">expression</replaceable>
evaluates to false or null will not be published. Note that parentheses
@ -185,7 +193,13 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
<para>
Add some tables to the publication:
<programlisting>
ALTER PUBLICATION mypublication ADD TABLE users, departments;
ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
</programlisting></para>
<para>
Change the set of columns published for a table:
<programlisting>
ALTER PUBLICATION mypublication SET TABLE users (user_id, firstname, lastname), TABLE departments;
</programlisting></para>
<para>

View File

@ -33,7 +33,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
@ -93,6 +93,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<literal>TRUNCATE</literal> commands.
</para>
<para>
When a column list is specified, only the named columns are replicated.
If no column list is specified, all columns of the table are replicated
through this publication, including any columns added later. If a column
list is specified, it must include the replica identity columns.
</para>
<para>
Only persistent base tables and partitioned tables can be part of a
publication. Temporary tables, unlogged tables, foreign tables,
@ -348,6 +355,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABL
<structname>sales</structname>:
<programlisting>
CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales;
</programlisting></para>
<para>
Create a publication that publishes all changes for table <structname>users</structname>,
but replicates only columns <structname>user_id</structname> and
<structname>firstname</structname>:
<programlisting>
CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname);
</programlisting></para>
</refsect1>

View File

@ -45,6 +45,9 @@
#include "utils/rel.h"
#include "utils/syscache.h"
static void publication_translate_columns(Relation targetrel, List *columns,
int *natts, AttrNumber **attrs);
/*
* Check if relation can be in given publication and throws appropriate
* error if not.
@ -395,6 +398,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
Oid relid = RelationGetRelid(targetrel);
Oid pubreloid;
Publication *pub = GetPublication(pubid);
AttrNumber *attarray = NULL;
int natts = 0;
ObjectAddress myself,
referenced;
List *relids = NIL;
@ -422,6 +427,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
check_publication_add_relation(targetrel);
/*
* Translate column names to attnums and make sure the column list contains
* only allowed elements (no system or generated columns etc.). Also build
* an array of attnums, for storing in the catalog.
*/
publication_translate_columns(pri->relation, pri->columns,
&natts, &attarray);
/* Form a tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
@ -440,6 +453,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
else
nulls[Anum_pg_publication_rel_prqual - 1] = true;
/* Add column list, if available */
if (pri->columns)
values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
else
nulls[Anum_pg_publication_rel_prattrs - 1] = true;
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
@ -463,6 +482,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
false);
/* Add dependency on the columns, if any are listed */
for (int i = 0; i < natts; i++)
{
ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Close the table. */
table_close(rel, RowExclusiveLock);
@ -482,6 +508,125 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
return myself;
}
/* qsort comparator for attnums */
static int
compare_int16(const void *a, const void *b)
{
int av = *(const int16 *) a;
int bv = *(const int16 *) b;
/* this can't overflow if int is wider than int16 */
return (av - bv);
}
/*
* Translate a list of column names to an array of attribute numbers
* and a Bitmapset with them; verify that each attribute is appropriate
* to have in a publication column list (no system or generated attributes,
* no duplicates). Additional checks with replica identity are done later;
* see check_publication_columns.
*
* Note that the attribute numbers are *not* offset by
* FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
* is okay.
*/
static void
publication_translate_columns(Relation targetrel, List *columns,
int *natts, AttrNumber **attrs)
{
AttrNumber *attarray = NULL;
Bitmapset *set = NULL;
ListCell *lc;
int n = 0;
TupleDesc tupdesc = RelationGetDescr(targetrel);
/* Bail out when no column list defined. */
if (!columns)
return;
/*
* Translate list of columns to attnums. We prohibit system attributes and
* make sure there are no duplicate columns.
*/
attarray = palloc(sizeof(AttrNumber) * list_length(columns));
foreach(lc, columns)
{
char *colname = strVal(lfirst(lc));
AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
colname, RelationGetRelationName(targetrel)));
if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot reference system column \"%s\" in publication column list",
colname));
if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
ereport(ERROR,
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot reference generated column \"%s\" in publication column list",
colname));
if (bms_is_member(attnum, set))
ereport(ERROR,
errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("duplicate column \"%s\" in publication column list",
colname));
set = bms_add_member(set, attnum);
attarray[n++] = attnum;
}
/* Be tidy, so that the catalog representation is always sorted */
qsort(attarray, n, sizeof(AttrNumber), compare_int16);
*natts = n;
*attrs = attarray;
bms_free(set);
}
/*
* Transform the column list (represented by an array) to a bitmapset.
*/
Bitmapset *
pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
{
Bitmapset *result = NULL;
ArrayType *arr;
int nelems;
int16 *elems;
MemoryContext oldcxt;
/*
* If an existing bitmap was provided, use it. Otherwise just use NULL
* and build a new bitmap.
*/
if (columns)
result = columns;
arr = DatumGetArrayTypeP(pubcols);
nelems = ARR_DIMS(arr)[0];
elems = (int16 *) ARR_DATA_PTR(arr);
/* If a memory context was specified, switch to it. */
if (mcxt)
oldcxt = MemoryContextSwitchTo(mcxt);
for (int i = 0; i < nelems; i++)
result = bms_add_member(result, elems[i]);
if (mcxt)
MemoryContextSwitchTo(oldcxt);
return result;
}
/*
* Insert new publication / schema mapping.
*/

View File

@ -342,7 +342,7 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
* Returns true if any invalid column is found.
*/
bool
contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
bool pubviaroot)
{
HeapTuple rftuple;
@ -411,6 +411,114 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
return result;
}
/*
* Check if all columns referenced in the REPLICA IDENTITY are covered by
* the column list.
*
* Returns true if any replica identity column is not covered by column list.
*/
bool
pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
bool pubviaroot)
{
HeapTuple tuple;
Oid relid = RelationGetRelid(relation);
Oid publish_as_relid = RelationGetRelid(relation);
bool result = false;
Datum datum;
bool isnull;
/*
* For a partition, if pubviaroot is true, find the topmost ancestor that
* is published via this publication as we need to use its column list
* for the changes.
*
* Note that even though the column list used is for an ancestor, the
* REPLICA IDENTITY used will be for the actual child table.
*/
if (pubviaroot && relation->rd_rel->relispartition)
{
publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
if (!OidIsValid(publish_as_relid))
publish_as_relid = relid;
}
tuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(publish_as_relid),
ObjectIdGetDatum(pubid));
if (!HeapTupleIsValid(tuple))
return false;
datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
Anum_pg_publication_rel_prattrs,
&isnull);
if (!isnull)
{
int x;
Bitmapset *idattrs;
Bitmapset *columns = NULL;
/* With REPLICA IDENTITY FULL, no column list is allowed. */
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
result = true;
/* Transform the column list datum to a bitmapset. */
columns = pub_collist_to_bitmapset(NULL, datum, NULL);
/* Remember columns that are part of the REPLICA IDENTITY */
idattrs = RelationGetIndexAttrBitmap(relation,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/*
* Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
* offset (to handle system columns the usual way), while column list
* does not use offset, so we can't do bms_is_subset(). Instead, we have
* to loop over the idattrs and check all of them are in the list.
*/
x = -1;
while ((x = bms_next_member(idattrs, x)) >= 0)
{
AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
/*
* If pubviaroot is true, we are validating the column list of the
* parent table, but the bitmap contains the replica identity
* information of the child table. The parent/child attnums may not
* match, so translate them to the parent - get the attname from
* the child, and look it up in the parent.
*/
if (pubviaroot)
{
/* attribute name in the child table */
char *colname = get_attname(relid, attnum, false);
/*
* Determine the attnum for the attribute name in parent (we
* are using the column list defined on the parent).
*/
attnum = get_attnum(publish_as_relid, colname);
}
/* replica identity column, not covered by the column list */
if (!bms_is_member(attnum, columns))
{
result = true;
break;
}
}
bms_free(idattrs);
bms_free(columns);
}
ReleaseSysCache(tuple);
return result;
}
/* check_functions_in_node callback */
static bool
contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@ -652,6 +760,39 @@ TransformPubWhereClauses(List *tables, const char *queryString,
}
}
/*
* Check the publication column lists expression for all relations in the list.
*/
static void
CheckPubRelationColumnList(List *tables, const char *queryString,
bool pubviaroot)
{
ListCell *lc;
foreach(lc, tables)
{
PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
if (pri->columns == NIL)
continue;
/*
* If the publication doesn't publish changes via the root partitioned
* table, the partition's column list will be used. So disallow using
* the column list on partitioned table in this case.
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use publication column list for relation \"%s\"",
RelationGetRelationName(pri->relation)),
errdetail("column list cannot be used for a partitioned table when %s is false.",
"publish_via_partition_root")));
}
}
/*
* Create new publication.
*/
@ -808,6 +949,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
TransformPubWhereClauses(rels, pstate->p_sourcetext,
publish_via_partition_root);
CheckPubRelationColumnList(rels, pstate->p_sourcetext,
publish_via_partition_root);
PublicationAddRelations(puboid, rels, true, NULL);
CloseRelationList(rels);
}
@ -895,8 +1039,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
/*
* If the publication doesn't publish changes via the root partitioned
* table, the partition's row filter will be used. So disallow using WHERE
* clause on partitioned table in this case.
* table, the partition's row filter and column list will be used. So disallow
* using WHERE clause and column lists on partitioned table in this case.
*/
if (!pubform->puballtables && publish_via_partition_root_given &&
!publish_via_partition_root)
@ -904,7 +1048,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
/*
* Lock the publication so nobody else can do anything with it. This
* prevents concurrent alter to add partitioned table(s) with WHERE
* clause(s) which we don't allow when not publishing via root.
* clause(s) and/or column lists which we don't allow when not
* publishing via root.
*/
LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
AccessShareLock);
@ -917,13 +1062,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
{
HeapTuple rftuple;
Oid relid = lfirst_oid(lc);
bool has_column_list;
bool has_row_filter;
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubform->oid));
has_row_filter
= !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
has_column_list
= !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
if (HeapTupleIsValid(rftuple) &&
!heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL))
(has_row_filter || has_column_list))
{
HeapTuple tuple;
@ -932,7 +1085,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
{
Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple);
if (relform->relkind == RELKIND_PARTITIONED_TABLE)
if ((relform->relkind == RELKIND_PARTITIONED_TABLE) &&
has_row_filter)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot set %s for publication \"%s\"",
@ -943,6 +1097,18 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
NameStr(relform->relname),
"publish_via_partition_root")));
if ((relform->relkind == RELKIND_PARTITIONED_TABLE) &&
has_column_list)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot set %s for publication \"%s\"",
"publish_via_partition_root = false",
stmt->pubname),
errdetail("The publication contains a column list for a partitioned table \"%s\" "
"which is not allowed when %s is false.",
NameStr(relform->relname),
"publish_via_partition_root")));
ReleaseSysCache(tuple);
}
@ -1107,6 +1273,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
PublicationAddRelations(pubid, rels, false, stmt);
}
else if (stmt->action == AP_DropObjects)
@ -1124,6 +1292,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
/*
* To recreate the relation list for the publication, look for
* existing relations that do not need to be dropped.
@ -1135,42 +1305,79 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
PublicationRelInfo *oldrel;
bool found = false;
HeapTuple rftuple;
bool rfisnull = true;
Node *oldrelwhereclause = NULL;
Bitmapset *oldcolumns = NULL;
/* look up the cache for the old relmap */
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(oldrelid),
ObjectIdGetDatum(pubid));
/*
* See if the existing relation currently has a WHERE clause or a
* column list. We need to compare those too.
*/
if (HeapTupleIsValid(rftuple))
{
bool isnull = true;
Datum whereClauseDatum;
Datum columnListDatum;
/* Load the WHERE clause for this table. */
whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
Anum_pg_publication_rel_prqual,
&rfisnull);
if (!rfisnull)
&isnull);
if (!isnull)
oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
/* Transform the int2vector column list to a bitmap. */
columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
Anum_pg_publication_rel_prattrs,
&isnull);
if (!isnull)
oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
ReleaseSysCache(rftuple);
}
foreach(newlc, rels)
{
PublicationRelInfo *newpubrel;
Oid newrelid;
Bitmapset *newcolumns = NULL;
newpubrel = (PublicationRelInfo *) lfirst(newlc);
newrelid = RelationGetRelid(newpubrel->relation);
/*
* If the new publication has column list, transform it to
* a bitmap too.
*/
if (newpubrel->columns)
{
ListCell *lc;
foreach(lc, newpubrel->columns)
{
char *colname = strVal(lfirst(lc));
AttrNumber attnum = get_attnum(newrelid, colname);
newcolumns = bms_add_member(newcolumns, attnum);
}
}
/*
* Check if any of the new set of relations matches with the
* existing relations in the publication. Additionally, if the
* relation has an associated WHERE clause, check the WHERE
* expressions also match. Drop the rest.
* expressions also match. Same for the column list. Drop the
* rest.
*/
if (RelationGetRelid(newpubrel->relation) == oldrelid)
{
if (equal(oldrelwhereclause, newpubrel->whereClause))
if (equal(oldrelwhereclause, newpubrel->whereClause) &&
bms_equal(oldcolumns, newcolumns))
{
found = true;
break;
@ -1186,6 +1393,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
{
oldrel = palloc(sizeof(PublicationRelInfo));
oldrel->whereClause = NULL;
oldrel->columns = NIL;
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);
delrels = lappend(delrels, oldrel);
@ -1401,6 +1609,7 @@ AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup,
{
oldrel = palloc(sizeof(PublicationRelInfo));
oldrel->whereClause = NULL;
oldrel->columns = NULL;
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);
delrels = lappend(delrels, oldrel);
@ -1660,6 +1869,7 @@ OpenRelationList(List *rels, char objectType)
List *result = NIL;
ListCell *lc;
List *relids_with_rf = NIL;
List *relids_with_collist = NIL;
/*
* Open, share-lock, and check all the explicitly-specified relations
@ -1710,6 +1920,13 @@ OpenRelationList(List *rels, char objectType)
errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
RelationGetRelationName(rel))));
/* Disallow duplicate tables if there are any with column lists. */
if (t->columns || list_member_oid(relids_with_collist, myrelid))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("conflicting or redundant column lists for table \"%s\"",
RelationGetRelationName(rel))));
table_close(rel, ShareUpdateExclusiveLock);
continue;
}
@ -1717,12 +1934,16 @@ OpenRelationList(List *rels, char objectType)
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
pub_rel->whereClause = t->whereClause;
pub_rel->columns = t->columns;
result = lappend(result, pub_rel);
relids = lappend_oid(relids, myrelid);
if (t->whereClause)
relids_with_rf = lappend_oid(relids_with_rf, myrelid);
if (t->columns)
relids_with_collist = lappend_oid(relids_with_collist, myrelid);
/*
* Add children of this rel, if requested, so that they too are added
* to the publication. A partitioned table can't have any inheritance
@ -1762,6 +1983,18 @@ OpenRelationList(List *rels, char objectType)
errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
RelationGetRelationName(rel))));
/*
* We don't allow to specify column list for both parent
* and child table at the same time as it is not very
* clear which one should be given preference.
*/
if (childrelid != myrelid &&
(t->columns || list_member_oid(relids_with_collist, childrelid)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("conflicting or redundant column lists for table \"%s\"",
RelationGetRelationName(rel))));
continue;
}
@ -1771,11 +2004,16 @@ OpenRelationList(List *rels, char objectType)
pub_rel->relation = rel;
/* child inherits WHERE clause from parent */
pub_rel->whereClause = t->whereClause;
/* child inherits column list from parent */
pub_rel->columns = t->columns;
result = lappend(result, pub_rel);
relids = lappend_oid(relids, childrelid);
if (t->whereClause)
relids_with_rf = lappend_oid(relids_with_rf, childrelid);
if (t->columns)
relids_with_collist = lappend_oid(relids_with_collist, childrelid);
}
}
}
@ -1884,6 +2122,11 @@ PublicationDropRelations(Oid pubid, List *rels, bool missing_ok)
Relation rel = pubrel->relation;
Oid relid = RelationGetRelid(rel);
if (pubrel->columns)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubid));

View File

@ -574,9 +574,6 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
return;
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
return;
/*
* It is only safe to execute UPDATE/DELETE when all columns, referenced
* in the row filters from publications which the relation is in, are
@ -596,17 +593,33 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
errmsg("cannot update table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot update table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column list used by the publication does not cover the replica identity.")));
else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot delete from table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot delete from table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column list used by the publication does not cover the replica identity.")));
/* If relation has replica identity we are always good. */
if (OidIsValid(RelationGetReplicaIndex(rel)))
return;
/* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
return;
/*
* This is UPDATE/DELETE and there is no replica identity.
*

View File

@ -4850,6 +4850,7 @@ _copyPublicationTable(const PublicationTable *from)
COPY_NODE_FIELD(relation);
COPY_NODE_FIELD(whereClause);
COPY_NODE_FIELD(columns);
return newnode;
}

View File

@ -2322,6 +2322,7 @@ _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
COMPARE_NODE_FIELD(whereClause);
COMPARE_NODE_FIELD(columns);
return true;
}

View File

@ -9749,13 +9749,14 @@ CreatePublicationStmt:
* relation_expr here.
*/
PublicationObjSpec:
TABLE relation_expr OptWhereClause
TABLE relation_expr opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_TABLE;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $2;
$$->pubtable->whereClause = $3;
$$->pubtable->columns = $3;
$$->pubtable->whereClause = $4;
}
| ALL TABLES IN_P SCHEMA ColId
{
@ -9790,11 +9791,15 @@ PublicationObjSpec:
$$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA;
$$->location = @5;
}
| ColId OptWhereClause
| ColId opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
if ($2)
/*
* If either a row filter or column list is specified, create
* a PublicationTable object.
*/
if ($2 || $3)
{
/*
* The OptWhereClause must be stored here but it is
@ -9804,7 +9809,8 @@ PublicationObjSpec:
*/
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
$$->pubtable->whereClause = $2;
$$->pubtable->columns = $2;
$$->pubtable->whereClause = $3;
}
else
{
@ -9812,23 +9818,25 @@ PublicationObjSpec:
}
$$->location = @1;
}
| ColId indirection OptWhereClause
| ColId indirection opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
$$->pubtable->whereClause = $3;
$$->pubtable->columns = $3;
$$->pubtable->whereClause = $4;
$$->location = @1;
}
/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
| extended_relation_expr OptWhereClause
| extended_relation_expr opt_column_list OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $1;
$$->pubtable->whereClause = $2;
$$->pubtable->columns = $2;
$$->pubtable->whereClause = $3;
}
| CURRENT_SCHEMA
{
@ -17524,6 +17532,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
errmsg("WHERE clause not allowed for schema"),
parser_errposition(pubobj->location));
/* Column list is not allowed on a schema object */
if (pubobj->pubtable && pubobj->pubtable->columns)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("column specification not allowed for schema"),
parser_errposition(pubobj->location));
/*
* We can distinguish between the different type of schema
* objects based on whether name and pubtable is set.

View File

@ -29,16 +29,30 @@
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_attrs(StringInfo out, Relation rel,
Bitmapset *columns);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
TupleTableSlot *slot,
bool binary);
bool binary, Bitmapset *columns);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
/*
* Check if a column is covered by a column list.
*
* Need to be careful about NULL, which is treated as a column list covering
* all columns.
*/
static bool
column_in_column_list(int attnum, Bitmapset *columns)
{
return (columns == NULL || bms_is_member(attnum, columns));
}
/*
* Write BEGIN to the output stream.
*/
@ -398,7 +412,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *newslot, bool binary)
TupleTableSlot *newslot, bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@ -410,7 +424,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newslot, binary);
logicalrep_write_tuple(out, rel, newslot, binary, columns);
}
/*
@ -443,7 +457,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary)
bool binary, Bitmapset *columns)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@ -464,11 +478,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldslot, binary);
logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newslot, binary);
logicalrep_write_tuple(out, rel, newslot, binary, columns);
}
/*
@ -537,7 +551,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldslot, binary);
logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
}
/*
@ -702,7 +716,8 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
* Write relation description to the output stream.
*/
void
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
Bitmapset *columns)
{
char *relname;
@ -724,7 +739,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
logicalrep_write_attrs(out, rel);
logicalrep_write_attrs(out, rel, columns);
}
/*
@ -801,7 +816,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
bool binary)
bool binary, Bitmapset *columns)
{
TupleDesc desc;
Datum *values;
@ -813,8 +828,14 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
for (i = 0; i < desc->natts; i++)
{
if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
Form_pg_attribute att = TupleDescAttr(desc, i);
if (att->attisdropped || att->attgenerated)
continue;
if (!column_in_column_list(att->attnum, columns))
continue;
nliveatts++;
}
pq_sendint16(out, nliveatts);
@ -833,6 +854,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
if (att->attisdropped || att->attgenerated)
continue;
if (!column_in_column_list(att->attnum, columns))
continue;
if (isnull[i])
{
pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@ -954,7 +978,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
* Write relation attribute metadata to the stream.
*/
static void
logicalrep_write_attrs(StringInfo out, Relation rel)
logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
{
TupleDesc desc;
int i;
@ -967,8 +991,14 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
/* send number of live attributes */
for (i = 0; i < desc->natts; i++)
{
if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
Form_pg_attribute att = TupleDescAttr(desc, i);
if (att->attisdropped || att->attgenerated)
continue;
if (!column_in_column_list(att->attnum, columns))
continue;
nliveatts++;
}
pq_sendint16(out, nliveatts);
@ -987,6 +1017,9 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
if (att->attisdropped || att->attgenerated)
continue;
if (!column_in_column_list(att->attnum, columns))
continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
if (replidentfull ||
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,

View File

@ -113,6 +113,7 @@
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@ -702,12 +703,13 @@ fetch_remote_table_info(char *nspname, char *relname,
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
ListCell *lc;
bool first;
Bitmapset *included_cols = NULL;
lrel->nspname = nspname;
lrel->relname = relname;
@ -748,10 +750,110 @@ fetch_remote_table_info(char *nspname, char *relname,
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
/* Now fetch columns. */
/*
* Get column lists for each relation.
*
* For initial synchronization, column lists can be ignored in following
* cases:
*
* 1) one of the subscribed publications for the table hasn't specified
* any column list
*
* 2) one of the subscribed publications has puballtables set to true
*
* 3) one of the subscribed publications is declared as ALL TABLES IN
* SCHEMA that includes this relation
*
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
{
WalRcvExecResult *pubres;
TupleTableSlot *slot;
Oid attrsRow[] = {INT2OID};
StringInfoData pub_names;
bool first = true;
initStringInfo(&pub_names);
foreach(lc, MySubscription->publications)
{
if (!first)
appendStringInfo(&pub_names, ", ");
appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
first = false;
}
/*
* Fetch info about column lists for the relation (from all the
* publications). We unnest the int2vector values, because that
* makes it easier to combine lists by simply adding the attnums
* to a new bitmap (without having to parse the int2vector data).
* This preserves NULL values, so that if one of the publications
* has no column list, we'll know that.
*/
resetStringInfo(&cmd);
appendStringInfo(&cmd,
"SELECT DISTINCT unnest"
" FROM pg_publication p"
" LEFT OUTER JOIN pg_publication_rel pr"
" ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
" LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
" LATERAL pg_get_publication_tables(p.pubname) gpt"
" WHERE gpt.relid = %u"
" AND p.pubname IN ( %s )",
lrel->remoteid,
lrel->remoteid,
pub_names.data);
pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
lengthof(attrsRow), attrsRow);
if (pubres->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
nspname, relname, pubres->err)));
/*
* Merge the column lists (from different publications) by creating
* a single bitmap with all the attnums. If we find a NULL value,
* that means one of the publications has no column list for the
* table we're syncing.
*/
slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
{
Datum cfval = slot_getattr(slot, 1, &isnull);
/* NULL means empty column list, so we're done. */
if (isnull)
{
bms_free(included_cols);
included_cols = NULL;
break;
}
included_cols = bms_add_member(included_cols,
DatumGetInt16(cfval));
ExecClearTuple(slot);
}
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(pubres);
pfree(pub_names.data);
}
/*
* Now fetch column names and types.
*/
resetStringInfo(&cmd);
appendStringInfo(&cmd,
"SELECT a.attname,"
"SELECT a.attnum,"
" a.attname,"
" a.atttypid,"
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
@ -779,16 +881,35 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
lrel->attkeys = NULL;
/*
* Store the columns as a list of names. Ignore those that are not
* present in the column list, if there is one.
*/
natt = 0;
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
lrel->attnames[natt] =
TextDatumGetCString(slot_getattr(slot, 1, &isnull));
char *rel_colname;
AttrNumber attnum;
attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
/* If the column is not in the column list, skip it. */
if (included_cols != NULL && !bms_is_member(attnum, included_cols))
{
ExecClearTuple(slot);
continue;
}
rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
lrel->attnames[natt] = rel_colname;
lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
/* Should never happen. */
@ -931,8 +1052,24 @@ copy_table(Relation rel)
/* Regular table with no row filter */
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
appendStringInfo(&cmd, "COPY %s TO STDOUT",
{
appendStringInfo(&cmd, "COPY %s (",
quote_qualified_identifier(lrel.nspname, lrel.relname));
/*
* XXX Do we need to list the columns in all cases? Maybe we're replicating
* all columns?
*/
for (int i = 0; i < lrel.natts; i++)
{
if (i > 0)
appendStringInfoString(&cmd, ", ");
appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
}
appendStringInfo(&cmd, ") TO STDOUT");
}
else
{
/*

View File

@ -30,6 +30,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@ -90,7 +91,8 @@ static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
LogicalDecodingContext *ctx,
Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
@ -148,9 +150,6 @@ typedef struct RelationSyncEntry
*/
ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
EState *estate; /* executor state used for row filter */
MemoryContext cache_expr_cxt; /* private context for exprstate and
* estate, if any */
TupleTableSlot *new_slot; /* slot for storing new tuple */
TupleTableSlot *old_slot; /* slot for storing old tuple */
@ -169,6 +168,19 @@ typedef struct RelationSyncEntry
* having identical TupleDesc.
*/
AttrMap *attrmap;
/*
* Columns included in the publication, or NULL if all columns are
* included implicitly. Note that the attnums in this bitmap are not
* shifted by FirstLowInvalidHeapAttributeNumber.
*/
Bitmapset *columns;
/*
* Private context to store additional data for this entry - state for
* the row filter expressions, column list, etc.
*/
MemoryContext entry_cxt;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
@ -200,6 +212,11 @@ static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
RelationSyncEntry *entry,
ReorderBufferChangeType *action);
/* column list routines */
static void pgoutput_column_list_init(PGOutputData *data,
List *publications,
RelationSyncEntry *entry);
/*
* Specify output plugin callbacks
*/
@ -622,11 +639,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
send_relation_and_attrs(ancestor, xid, ctx);
send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
RelationClose(ancestor);
}
send_relation_and_attrs(relation, xid, ctx);
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
if (in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
@ -639,7 +656,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
*/
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx)
LogicalDecodingContext *ctx,
Bitmapset *columns)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
@ -662,13 +680,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
if (att->atttypid < FirstGenbkiObjectId)
continue;
/* Skip this attribute if it's not present in the column list */
if (columns != NULL && !bms_is_member(att->attnum, columns))
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, xid, relation);
logicalrep_write_rel(ctx->out, xid, relation, columns);
OutputPluginWrite(ctx, false);
}
@ -722,6 +744,28 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
return DatumGetBool(ret);
}
/*
* Make sure the per-entry memory context exists.
*/
static void
pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
{
Relation relation;
/* The context may already exist, in which case bail out. */
if (entry->entry_cxt)
return;
relation = RelationIdGetRelation(entry->publish_as_relid);
entry->entry_cxt = AllocSetContextCreate(data->cachectx,
"entry private context",
ALLOCSET_SMALL_SIZES);
MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
RelationGetRelationName(relation));
}
/*
* Initialize the row filter.
*/
@ -842,21 +886,13 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
{
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
Assert(entry->cache_expr_cxt == NULL);
/* Create the memory context for row filters */
entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx,
"Row filter expressions",
ALLOCSET_DEFAULT_SIZES);
MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt,
RelationGetRelationName(relation));
pgoutput_ensure_entry_cxt(data, entry);
/*
* Now all the filters for all pubactions are known. Combine them when
* their pubactions are the same.
*/
oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt);
oldctx = MemoryContextSwitchTo(entry->entry_cxt);
entry->estate = create_estate_for_relation(relation);
for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
{
@ -879,6 +915,105 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
}
}
/*
* Initialize the column list.
*/
static void
pgoutput_column_list_init(PGOutputData *data, List *publications,
RelationSyncEntry *entry)
{
ListCell *lc;
/*
* Find if there are any column lists for this relation. If there are,
* build a bitmap merging all the column lists.
*
* All the given publication-table mappings must be checked.
*
* Multiple publications might have multiple column lists for this relation.
*
* FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
* list" so it takes precedence.
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
HeapTuple cftuple = NULL;
Datum cfdatum = 0;
/*
* Assume there's no column list. Only if we find pg_publication_rel
* entry with a column list we'll switch it to false.
*/
bool pub_no_list = true;
/*
* If the publication is FOR ALL TABLES then it is treated the same as if
* there are no column lists (even if other publications have a list).
*/
if (!pub->alltables)
{
/*
* Check for the presence of a column list in this publication.
*
* Note: If we find no pg_publication_rel row, it's a publication
* defined for a whole schema, so it can't have a column list, just
* like a FOR ALL TABLES publication.
*/
cftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(entry->publish_as_relid),
ObjectIdGetDatum(pub->oid));
if (HeapTupleIsValid(cftuple))
{
/*
* Lookup the column list attribute.
*
* Note: We update the pub_no_list value directly, because if
* the value is NULL, we have no list (and vice versa).
*/
cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
Anum_pg_publication_rel_prattrs,
&pub_no_list);
/*
* Build the column list bitmap in the per-entry context.
*
* We need to merge column lists from all publications, so we
* update the same bitmapset. If the column list is null, we
* interpret it as replicating all columns.
*/
if (!pub_no_list) /* when not null */
{
pgoutput_ensure_entry_cxt(data, entry);
entry->columns = pub_collist_to_bitmapset(entry->columns,
cfdatum,
entry->entry_cxt);
}
}
}
/*
* Found a publication with no column list, so we're done. But first
* discard column list we might have from preceding publications.
*/
if (pub_no_list)
{
if (cftuple)
ReleaseSysCache(cftuple);
bms_free(entry->columns);
entry->columns = NULL;
break;
}
ReleaseSysCache(cftuple);
} /* loop all subscribed publications */
}
/*
* Initialize the slot for storing new and old tuples, and build the map that
* will be used to convert the relation's tuples into the ancestor's format.
@ -1243,7 +1378,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
data->binary);
data->binary, relentry->columns);
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
@ -1297,11 +1432,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel,
new_slot, data->binary);
new_slot, data->binary,
relentry->columns);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel,
old_slot, new_slot, data->binary);
old_slot, new_slot, data->binary,
relentry->columns);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel,
@ -1794,8 +1931,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
entry->cache_expr_cxt = NULL;
entry->entry_cxt = NULL;
entry->publish_as_relid = InvalidOid;
entry->columns = NULL;
entry->attrmap = NULL;
}
@ -1841,6 +1979,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
bms_free(entry->columns);
entry->columns = NULL;
entry->pubactions.pubinsert = false;
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
@ -1865,17 +2005,18 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/*
* Row filter cache cleanups.
*/
if (entry->cache_expr_cxt)
MemoryContextDelete(entry->cache_expr_cxt);
if (entry->entry_cxt)
MemoryContextDelete(entry->entry_cxt);
entry->cache_expr_cxt = NULL;
entry->entry_cxt = NULL;
entry->estate = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
/*
* Build publication cache. We can't use one provided by relcache as
* relcache considers all publications given relation is in, but here
* we only need to consider ones that the subscriber requested.
* relcache considers all publications that the given relation is in,
* but here we only need to consider ones that the subscriber
* requested.
*/
foreach(lc, data->publications)
{
@ -1946,6 +2087,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
}
/*
* If the relation is to be published, determine actions to
* publish, and list of columns, if appropriate.
*
* Don't publish changes for partitioned tables, because
* publishing those of its partitions suffices, unless partition
* changes won't be published due to pubviaroot being set.
@ -2007,6 +2151,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
/* Initialize the column list */
pgoutput_column_list_init(data, rel_publications, entry);
}
list_free(pubids);

View File

@ -5575,6 +5575,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
memset(pubdesc, 0, sizeof(PublicationDesc));
pubdesc->rf_valid_for_update = true;
pubdesc->rf_valid_for_delete = true;
pubdesc->cols_valid_for_update = true;
pubdesc->cols_valid_for_delete = true;
return;
}
@ -5587,6 +5589,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
memset(pubdesc, 0, sizeof(PublicationDesc));
pubdesc->rf_valid_for_update = true;
pubdesc->rf_valid_for_delete = true;
pubdesc->cols_valid_for_update = true;
pubdesc->cols_valid_for_delete = true;
/* Fetch the publication membership info. */
puboids = GetRelationPublications(relid);
@ -5657,7 +5661,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
*/
if (!pubform->puballtables &&
(pubform->pubupdate || pubform->pubdelete) &&
contain_invalid_rfcolumn(pubid, relation, ancestors,
pub_rf_contains_invalid_column(pubid, relation, ancestors,
pubform->pubviaroot))
{
if (pubform->pubupdate)
@ -5666,6 +5670,23 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
pubdesc->rf_valid_for_delete = false;
}
/*
* Check if all columns are part of the REPLICA IDENTITY index or not.
*
* If the publication is FOR ALL TABLES then it means the table has no
* column list and we can skip the validation.
*/
if (!pubform->puballtables &&
(pubform->pubupdate || pubform->pubdelete) &&
pub_collist_contains_invalid_column(pubid, relation, ancestors,
pubform->pubviaroot))
{
if (pubform->pubupdate)
pubdesc->cols_valid_for_update = false;
if (pubform->pubdelete)
pubdesc->cols_valid_for_delete = false;
}
ReleaseSysCache(tup);
/*
@ -5677,6 +5698,16 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
!pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete)
break;
/*
* If we know everything is replicated and the column list is invalid
* for update and delete, there is no point to check for other
* publications.
*/
if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
!pubdesc->cols_valid_for_update && !pubdesc->cols_valid_for_delete)
break;
}
if (relation->rd_pubdesc)

View File

@ -4131,6 +4131,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
int i_prpubid;
int i_prrelid;
int i_prrelqual;
int i_prattrs;
int i,
j,
ntups;
@ -4144,12 +4145,20 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid, "
"pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual "
"FROM pg_catalog.pg_publication_rel");
"pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
"(CASE\n"
" WHEN pr.prattrs IS NOT NULL THEN\n"
" (SELECT array_agg(attname)\n"
" FROM\n"
" pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
" pg_catalog.pg_attribute\n"
" WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
" ELSE NULL END) prattrs "
"FROM pg_catalog.pg_publication_rel pr");
else
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid, "
"NULL AS prrelqual "
"NULL AS prrelqual, NULL AS prattrs "
"FROM pg_catalog.pg_publication_rel");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@ -4160,6 +4169,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
i_prpubid = PQfnumber(res, "prpubid");
i_prrelid = PQfnumber(res, "prrelid");
i_prrelqual = PQfnumber(res, "prrelqual");
i_prattrs = PQfnumber(res, "prattrs");
/* this allocation may be more than we need */
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@ -4205,6 +4215,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
else
pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
if (!PQgetisnull(res, i, i_prattrs))
{
char **attnames;
int nattnames;
PQExpBuffer attribs;
if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
&attnames, &nattnames))
fatal("could not parse %s array", "prattrs");
attribs = createPQExpBuffer();
for (int k = 0; k < nattnames; k++)
{
if (k > 0)
appendPQExpBufferStr(attribs, ", ");
appendPQExpBufferStr(attribs, fmtId(attnames[k]));
}
pubrinfo[j].pubrattrs = attribs->data;
}
else
pubrinfo[j].pubrattrs = NULL;
/* Decide whether we want to dump it */
selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
@ -4300,6 +4332,9 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
appendPQExpBuffer(query, " %s",
fmtQualifiedDumpable(tbinfo));
if (pubrinfo->pubrattrs)
appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
if (pubrinfo->pubrelqual)
{
/*

View File

@ -634,6 +634,7 @@ typedef struct _PublicationRelInfo
PublicationInfo *publication;
TableInfo *pubtable;
char *pubrelqual;
char *pubrattrs;
} PublicationRelInfo;
/*

View File

@ -2438,6 +2438,28 @@ my %tests = (
unlike => { exclude_dump_test_schema => 1, },
},
'ALTER PUBLICATION pub1 ADD TABLE test_sixth_table (col3, col2)' => {
create_order => 52,
create_sql =>
'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_sixth_table (col3, col2);',
regexp => qr/^
\QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_sixth_table (col2, col3);\E
/xm,
like => { %full_runs, section_post_data => 1, },
unlike => { exclude_dump_test_schema => 1, },
},
'ALTER PUBLICATION pub1 ADD TABLE test_seventh_table (col3, col2) WHERE (col1 = 1)' => {
create_order => 52,
create_sql =>
'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_seventh_table (col3, col2) WHERE (col1 = 1);',
regexp => qr/^
\QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_seventh_table (col2, col3) WHERE ((col1 = 1));\E
/xm,
like => { %full_runs, section_post_data => 1, },
unlike => { exclude_dump_test_schema => 1, },
},
'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test' => {
create_order => 51,
create_sql =>
@ -2809,6 +2831,44 @@ my %tests = (
unlike => { exclude_dump_test_schema => 1, },
},
'CREATE TABLE test_sixth_table' => {
create_order => 6,
create_sql => 'CREATE TABLE dump_test.test_sixth_table (
col1 int,
col2 text,
col3 bytea
);',
regexp => qr/^
\QCREATE TABLE dump_test.test_sixth_table (\E
\n\s+\Qcol1 integer,\E
\n\s+\Qcol2 text,\E
\n\s+\Qcol3 bytea\E
\n\);
/xm,
like =>
{ %full_runs, %dump_test_schema_runs, section_pre_data => 1, },
unlike => { exclude_dump_test_schema => 1, },
},
'CREATE TABLE test_seventh_table' => {
create_order => 6,
create_sql => 'CREATE TABLE dump_test.test_seventh_table (
col1 int,
col2 text,
col3 bytea
);',
regexp => qr/^
\QCREATE TABLE dump_test.test_seventh_table (\E
\n\s+\Qcol1 integer,\E
\n\s+\Qcol2 text,\E
\n\s+\Qcol3 bytea\E
\n\);
/xm,
like =>
{ %full_runs, %dump_test_schema_runs, section_pre_data => 1, },
unlike => { exclude_dump_test_schema => 1, },
},
'CREATE TABLE test_table_identity' => {
create_order => 3,
create_sql => 'CREATE TABLE dump_test.test_table_identity (

View File

@ -2960,6 +2960,7 @@ describeOneTableDetails(const char *schemaname,
printfPQExpBuffer(&buf,
"SELECT pubname\n"
" , NULL\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
" JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n"
" JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n"
@ -2967,6 +2968,12 @@ describeOneTableDetails(const char *schemaname,
"UNION\n"
"SELECT pubname\n"
" , pg_get_expr(pr.prqual, c.oid)\n"
" , (CASE WHEN pr.prattrs IS NOT NULL THEN\n"
" (SELECT string_agg(attname, ', ')\n"
" FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
" pg_catalog.pg_attribute\n"
" WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
" ELSE NULL END) "
"FROM pg_catalog.pg_publication p\n"
" JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
" JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n"
@ -2974,6 +2981,7 @@ describeOneTableDetails(const char *schemaname,
"UNION\n"
"SELECT pubname\n"
" , NULL\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
"ORDER BY 1;",
@ -2984,12 +2992,14 @@ describeOneTableDetails(const char *schemaname,
printfPQExpBuffer(&buf,
"SELECT pubname\n"
" , NULL\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
"WHERE pr.prrelid = '%s'\n"
"UNION ALL\n"
"SELECT pubname\n"
" , NULL\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
"ORDER BY 1;",
@ -3011,6 +3021,11 @@ describeOneTableDetails(const char *schemaname,
printfPQExpBuffer(&buf, " \"%s\"",
PQgetvalue(result, i, 0));
/* column list (if any) */
if (!PQgetisnull(result, i, 2))
appendPQExpBuffer(&buf, " (%s)",
PQgetvalue(result, i, 2));
/* row filter (if any) */
if (!PQgetisnull(result, i, 1))
appendPQExpBuffer(&buf, " WHERE %s",
@ -5979,7 +5994,7 @@ listPublications(const char *pattern)
*/
static bool
addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
bool singlecol, printTableContent *cont)
bool as_schema, printTableContent *cont)
{
PGresult *res;
int count = 0;
@ -5996,15 +6011,19 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
for (i = 0; i < count; i++)
{
if (!singlecol)
if (as_schema)
printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
else
{
printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1));
if (!PQgetisnull(res, i, 3))
appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 3));
if (!PQgetisnull(res, i, 2))
appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2));
}
else
printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
printTableAddFooter(cont, buf->data);
}
@ -6155,11 +6174,22 @@ describePublications(const char *pattern)
printfPQExpBuffer(&buf,
"SELECT n.nspname, c.relname");
if (pset.sversion >= 150000)
{
appendPQExpBufferStr(&buf,
", pg_get_expr(pr.prqual, c.oid)");
appendPQExpBufferStr(&buf,
", (CASE WHEN pr.prattrs IS NOT NULL THEN\n"
" pg_catalog.array_to_string("
" ARRAY(SELECT attname\n"
" FROM\n"
" pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
" pg_catalog.pg_attribute\n"
" WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
" ELSE NULL END)");
}
else
appendPQExpBufferStr(&buf,
", NULL");
", NULL, NULL");
appendPQExpBuffer(&buf,
"\nFROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
@ -6189,9 +6219,9 @@ describePublications(const char *pattern)
if (!puballsequences)
{
/* Get the tables for the specified publication */
/* Get the sequences for the specified publication */
printfPQExpBuffer(&buf,
"SELECT n.nspname, c.relname, NULL\n"
"SELECT n.nspname, c.relname, NULL, NULL\n"
"FROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
" pg_catalog.pg_publication_rel pr\n"

View File

@ -95,6 +95,13 @@ typedef struct PublicationDesc
*/
bool rf_valid_for_update;
bool rf_valid_for_delete;
/*
* true if the columns are part of the replica identity or the publication actions
* do not include UPDATE or DELETE.
*/
bool cols_valid_for_update;
bool cols_valid_for_delete;
} PublicationDesc;
typedef struct Publication
@ -111,6 +118,7 @@ typedef struct PublicationRelInfo
{
Relation relation;
Node *whereClause;
List *columns;
} PublicationRelInfo;
extern Publication *GetPublication(Oid pubid);
@ -137,6 +145,7 @@ extern List *GetPublicationRelations(Oid pubid, char objectType,
PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
extern void GetActionsInPublication(Oid pubid, PublicationActions *actions);
extern List *GetPublicationSchemas(Oid pubid, char objectType);
extern List *GetSchemaPublications(Oid schemaid, char objectType);
extern List *GetSchemaPublicationRelations(Oid schemaid, char objectType,
@ -160,6 +169,9 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
char objectType,
bool if_not_exists);
extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols,
MemoryContext mcxt);
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);

View File

@ -34,6 +34,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
#ifdef CATALOG_VARLEN /* variable-length fields start here */
pg_node_tree prqual; /* qualifications */
int2vector prattrs; /* columns to replicate */
#endif
} FormData_pg_publication_rel;

View File

@ -31,7 +31,9 @@ extern void RemovePublicationSchemaById(Oid psoid);
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
extern void InvalidatePublicationRels(List *relids);
extern bool contain_invalid_rfcolumn(Oid pubid, Relation relation,
extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation,
List *ancestors, bool pubviaroot);
extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation,
List *ancestors, bool pubviaroot);
#endif /* PUBLICATIONCMDS_H */

View File

@ -3654,6 +3654,7 @@ typedef struct PublicationTable
NodeTag type;
RangeVar *relation; /* relation to be published */
Node *whereClause; /* qualifications */
List *columns; /* List of columns in a publication table */
} PublicationTable;
/*

View File

@ -222,12 +222,12 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel,
TupleTableSlot *newslot,
bool binary);
bool binary, Bitmapset *columns);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel,
TupleTableSlot *oldslot,
TupleTableSlot *newslot, bool binary);
TupleTableSlot *newslot, bool binary, Bitmapset *columns);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
@ -250,7 +250,7 @@ extern void logicalrep_write_sequence(StringInfo out, Relation rel,
bool is_called);
extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
Relation rel, Bitmapset *columns);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);

View File

@ -1137,6 +1137,369 @@ DROP TABLE rf_tbl_abcd_pk;
DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
-- fail - duplicate tables are not allowed if that table has any column lists
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert');
ERROR: conflicting or redundant column lists for table "testpub_tbl1"
CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert');
ERROR: conflicting or redundant column lists for table "testpub_tbl1"
RESET client_min_messages;
-- test for column lists
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1;
CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert');
RESET client_min_messages;
CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text,
d int generated always as (a + length(b)) stored);
-- error: column "x" does not exist
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
ERROR: column "x" of relation "testpub_tbl5" does not exist
-- error: replica identity "a" not included in the column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
UPDATE testpub_tbl5 SET a = 1;
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
-- error: generated column "d" can't be in list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
ERROR: cannot reference generated column "d" in publication column list
-- error: system attributes "ctid" not allowed in column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
ERROR: cannot reference system column "ctid" in publication column list
-- ok
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice
ERROR: cannot drop column c of table testpub_tbl5 because other objects depend on it
DETAIL: publication of table testpub_tbl5 in publication testpub_fortable depends on column c of table testpub_tbl5
HINT: Use DROP ... CASCADE to drop the dependent objects too.
-- ok: for insert-only publication, any column list is acceptable
ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c);
/* not all replica identities are good enough */
CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c);
ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL;
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
-- error: replica identity (b,c) is not covered by column list (a, c)
UPDATE testpub_tbl5 SET a = 1;
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
-- error: change the replica identity to "b", and column list to (a, c)
-- then update fails, because (a, c) does not cover replica identity
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
UPDATE testpub_tbl5 SET a = 1;
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
/* But if upd/del are not published, it works OK */
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
RESET client_min_messages;
ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok
\dRp+ testpub_table_ins
Publication testpub_table_ins
Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root
--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+----------
regress_publication_user | f | f | t | f | f | t | f | f
Tables:
"public.testpub_tbl5" (a)
-- tests with REPLICA IDENTITY FULL
CREATE TABLE testpub_tbl6 (a int, b text, c text);
ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);
UPDATE testpub_tbl6 SET a = 1;
ERROR: cannot update table "testpub_tbl6"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
UPDATE testpub_tbl6 SET a = 1;
-- make sure changing the column list is propagated to the catalog
CREATE TABLE testpub_tbl7 (a int primary key, b text, c text);
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b);
\d+ testpub_tbl7
Table "public.testpub_tbl7"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
a | integer | | not null | | plain | |
b | text | | | | extended | |
c | text | | | | extended | |
Indexes:
"testpub_tbl7_pkey" PRIMARY KEY, btree (a)
Publications:
"testpub_fortable" (a, b)
-- ok: the column list is the same, we should skip this table (or at least not fail)
ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b);
\d+ testpub_tbl7
Table "public.testpub_tbl7"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
a | integer | | not null | | plain | |
b | text | | | | extended | |
c | text | | | | extended | |
Indexes:
"testpub_tbl7_pkey" PRIMARY KEY, btree (a)
Publications:
"testpub_fortable" (a, b)
-- ok: the column list changes, make sure the catalog gets updated
ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c);
\d+ testpub_tbl7
Table "public.testpub_tbl7"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
a | integer | | not null | | plain | |
b | text | | | | extended | |
c | text | | | | extended | |
Indexes:
"testpub_tbl7_pkey" PRIMARY KEY, btree (a)
Publications:
"testpub_fortable" (a, c)
-- column list for partitioned tables has to cover replica identities for
-- all child relations
CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
-- first partition has replica identity "a"
CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0);
ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
-- second partition has replica identity "b"
CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1);
ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b);
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
-- ok: column list covers both "a" and "b"
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true');
RESET client_min_messages;
-- ok: the same thing, but try plain ADD TABLE
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
UPDATE testpub_tbl8 SET a = 1;
-- failure: column list does not cover replica identity for the second partition
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
UPDATE testpub_tbl8 SET a = 1;
ERROR: cannot update table "testpub_tbl8_1"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
-- failure: one of the partitions has REPLICA IDENTITY FULL
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
UPDATE testpub_tbl8 SET a = 1;
ERROR: cannot update table "testpub_tbl8_1"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
-- add table and then try changing replica identity
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
-- failure: replica identity full can't be used with a column list
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
UPDATE testpub_tbl8 SET a = 1;
ERROR: cannot update table "testpub_tbl8_1"
DETAIL: Column list used by the publication does not cover the replica identity.
-- failure: replica identity has to be covered by the column list
ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey;
ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
UPDATE testpub_tbl8 SET a = 1;
ERROR: cannot update table "testpub_tbl8_1"
DETAIL: Column list used by the publication does not cover the replica identity.
DROP TABLE testpub_tbl8;
-- column list for partitioned tables has to cover replica identities for
-- all child relations
CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
-- first partition has replica identity "a"
CREATE TABLE testpub_tbl8_0 (a int, b text, c text);
ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
-- second partition has replica identity "b"
CREATE TABLE testpub_tbl8_1 (a int, b text, c text);
ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
-- ok: attaching first partition works, because (a) is in column list
ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0);
-- failure: second partition has replica identity (c), which si not in column list
ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1);
UPDATE testpub_tbl8 SET a = 1;
ERROR: cannot update table "testpub_tbl8_1"
DETAIL: Column list used by the publication does not cover the replica identity.
-- failure: changing replica identity to FULL for partition fails, because
-- of the column list on the parent
ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL;
UPDATE testpub_tbl8 SET a = 1;
ERROR: cannot update table "testpub_tbl8_0"
DETAIL: Column list used by the publication does not cover the replica identity.
DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1;
DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list;
-- ======================================================
-- Test combination of column list and row filter
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_both_filters;
RESET client_min_messages;
CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
\dRp+ testpub_both_filters
Publication testpub_both_filters
Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root
--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+----------
regress_publication_user | f | f | t | t | t | t | t | f
Tables:
"public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1)
\d+ testpub_tbl_both_filters
Table "public.testpub_tbl_both_filters"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+---------+--------------+-------------
a | integer | | not null | | plain | |
b | integer | | | | plain | |
c | integer | | not null | | plain | |
Indexes:
"testpub_tbl_both_filters_pkey" PRIMARY KEY, btree (a, c) REPLICA IDENTITY
Publications:
"testpub_both_filters" (a, c) WHERE (c <> 1)
DROP TABLE testpub_tbl_both_filters;
DROP PUBLICATION testpub_both_filters;
-- ======================================================
-- More column list tests for validating column references
CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int);
CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b));
CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a);
CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY);
ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10);
-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing)
-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK.
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b);
RESET client_min_messages;
-- ok - (a,b) coverts all PK cols
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c);
-- ok - (a,b,c) coverts all PK cols
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
-- fail - "b" is missing from the column list
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b);
-- fail - "a" is missing from the column list
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column list used by the publication does not cover the replica identity.
-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
-- ok - there's no replica identity, so any column list works
-- note: it fails anyway, just a bit later because UPDATE requires RI
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
-- Case 2. REPLICA IDENTITY FULL
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
-- fail - with REPLICA IDENTITY FULL no column list is allowed
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d);
-- fail - with REPLICA IDENTITY FULL no column list is allowed
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk"
DETAIL: Column list used by the publication does not cover the replica identity.
-- Case 3. REPLICA IDENTITY NOTHING
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
-- it still fails later because without RI we can't replicate updates
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d);
-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
-- it still fails later because without RI we can't replicate updates
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d);
-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
-- it still fails later because without RI we can't replicate updates
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
-- Case 4. REPLICA IDENTITY INDEX
ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c);
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c;
ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c);
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c);
-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Tests for partitioned table
-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - cannot use column list for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
ERROR: cannot use publication column list for relation "rf_tbl_abcd_part_pk"
DETAIL: column list cannot be used for a partitioned table when publish_via_partition_root is false.
-- ok - can use column list for partition
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use column list for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is
-- used for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
ERROR: cannot set publish_via_partition_root = false for publication "testpub6"
DETAIL: The publication contains a column list for a partitioned table "rf_tbl_abcd_part_pk" which is not allowed when publish_via_partition_root is false.
-- Now change the root column list to use a column "b"
-- (which is not in the replica identity)
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b);
-- ok - we don't have column list for partitioned table.
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_part_pk_1"
DETAIL: Column list used by the publication does not cover the replica identity.
-- set PUBLISH_VIA_PARTITION_ROOT to true
-- can use column list for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use column list for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_part_pk_1"
DETAIL: Column list used by the publication does not cover the replica identity.
DROP PUBLICATION testpub6;
DROP TABLE rf_tbl_abcd_pk;
DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
CREATE TABLE testpub_tbl4(a int);
@ -1582,6 +1945,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
Tables from schemas:
"pub_test1"
-- Verify that it fails to add a schema with a column specification
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
ERROR: syntax error at or near "("
LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
^
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
ERROR: column specification not allowed for schema
LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
^
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;

View File

@ -568,6 +568,289 @@ DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
-- fail - duplicate tables are not allowed if that table has any column lists
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert');
CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert');
RESET client_min_messages;
-- test for column lists
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1;
CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert');
RESET client_min_messages;
CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text,
d int generated always as (a + length(b)) stored);
-- error: column "x" does not exist
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
-- error: replica identity "a" not included in the column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
UPDATE testpub_tbl5 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
-- error: generated column "d" can't be in list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-- error: system attributes "ctid" not allowed in column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
-- ok
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice
-- ok: for insert-only publication, any column list is acceptable
ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c);
/* not all replica identities are good enough */
CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c);
ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL;
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
-- error: replica identity (b,c) is not covered by column list (a, c)
UPDATE testpub_tbl5 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
-- error: change the replica identity to "b", and column list to (a, c)
-- then update fails, because (a, c) does not cover replica identity
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
UPDATE testpub_tbl5 SET a = 1;
/* But if upd/del are not published, it works OK */
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
RESET client_min_messages;
ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok
\dRp+ testpub_table_ins
-- tests with REPLICA IDENTITY FULL
CREATE TABLE testpub_tbl6 (a int, b text, c text);
ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);
UPDATE testpub_tbl6 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
UPDATE testpub_tbl6 SET a = 1;
-- make sure changing the column list is propagated to the catalog
CREATE TABLE testpub_tbl7 (a int primary key, b text, c text);
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b);
\d+ testpub_tbl7
-- ok: the column list is the same, we should skip this table (or at least not fail)
ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b);
\d+ testpub_tbl7
-- ok: the column list changes, make sure the catalog gets updated
ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c);
\d+ testpub_tbl7
-- column list for partitioned tables has to cover replica identities for
-- all child relations
CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
-- first partition has replica identity "a"
CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0);
ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
-- second partition has replica identity "b"
CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1);
ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b);
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
-- ok: column list covers both "a" and "b"
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true');
RESET client_min_messages;
-- ok: the same thing, but try plain ADD TABLE
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
UPDATE testpub_tbl8 SET a = 1;
-- failure: column list does not cover replica identity for the second partition
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
UPDATE testpub_tbl8 SET a = 1;
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
-- failure: one of the partitions has REPLICA IDENTITY FULL
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c);
UPDATE testpub_tbl8 SET a = 1;
ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8;
-- add table and then try changing replica identity
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
-- failure: replica identity full can't be used with a column list
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL;
UPDATE testpub_tbl8 SET a = 1;
-- failure: replica identity has to be covered by the column list
ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey;
ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
UPDATE testpub_tbl8 SET a = 1;
DROP TABLE testpub_tbl8;
-- column list for partitioned tables has to cover replica identities for
-- all child relations
CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a);
ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b);
-- first partition has replica identity "a"
CREATE TABLE testpub_tbl8_0 (a int, b text, c text);
ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a);
ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey;
-- second partition has replica identity "b"
CREATE TABLE testpub_tbl8_1 (a int, b text, c text);
ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c);
ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey;
-- ok: attaching first partition works, because (a) is in column list
ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0);
-- failure: second partition has replica identity (c), which si not in column list
ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1);
UPDATE testpub_tbl8 SET a = 1;
-- failure: changing replica identity to FULL for partition fails, because
-- of the column list on the parent
ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL;
UPDATE testpub_tbl8 SET a = 1;
DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1;
DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list;
-- ======================================================
-- Test combination of column list and row filter
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_both_filters;
RESET client_min_messages;
CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
\dRp+ testpub_both_filters
\d+ testpub_tbl_both_filters
DROP TABLE testpub_tbl_both_filters;
DROP PUBLICATION testpub_both_filters;
-- ======================================================
-- More column list tests for validating column references
CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int);
CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b));
CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a);
CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY);
ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10);
-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing)
-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK.
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b);
RESET client_min_messages;
-- ok - (a,b) coverts all PK cols
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c);
-- ok - (a,b,c) coverts all PK cols
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
-- fail - "b" is missing from the column list
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b);
-- fail - "a" is missing from the column list
UPDATE rf_tbl_abcd_pk SET a = 1;
-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
-- ok - there's no replica identity, so any column list works
-- note: it fails anyway, just a bit later because UPDATE requires RI
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 2. REPLICA IDENTITY FULL
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
-- fail - with REPLICA IDENTITY FULL no column list is allowed
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d);
-- fail - with REPLICA IDENTITY FULL no column list is allowed
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 3. REPLICA IDENTITY NOTHING
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
-- it still fails later because without RI we can't replicate updates
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d);
-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
-- it still fails later because without RI we can't replicate updates
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d);
-- ok - REPLICA IDENTITY NOTHING means all column lists are valid
-- it still fails later because without RI we can't replicate updates
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 4. REPLICA IDENTITY INDEX
ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c);
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c;
ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c);
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a);
-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c);
-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a);
-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_nopk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c);
-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c"
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Tests for partitioned table
-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - cannot use column list for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
-- ok - can use column list for partition
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use column list for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is
-- used for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- Now change the root column list to use a column "b"
-- (which is not in the replica identity)
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b);
-- ok - we don't have column list for partitioned table.
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- set PUBLISH_VIA_PARTITION_ROOT to true
-- can use column list for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use column list for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
DROP PUBLICATION testpub6;
DROP TABLE rf_tbl_abcd_pk;
DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
CREATE TABLE testpub_tbl4(a int);
@ -809,6 +1092,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
\dRp+ testpub1_forschema
-- Verify that it fails to add a schema with a column specification
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
-- cleanup pub_test1 schema for invalidation tests
ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;

File diff suppressed because it is too large Load Diff