From 923def9a533a7d986acfb524139d8b9e5466d0a5 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Sat, 26 Mar 2022 00:45:21 +0100 Subject: [PATCH] Allow specifying column lists for logical replication This allows specifying an optional column list when adding a table to logical replication. The column list may be specified after the table name, enclosed in parentheses. Columns not included in this list are not sent to the subscriber, allowing the schema on the subscriber to be a subset of the publisher schema. For UPDATE/DELETE publications, the column list needs to cover all REPLICA IDENTITY columns. For INSERT publications, the column list is arbitrary and may omit some REPLICA IDENTITY columns. Furthermore, if the table uses REPLICA IDENTITY FULL, column list is not allowed. The column list can contain only simple column references. Complex expressions, function calls etc. are not allowed. This restriction could be relaxed in the future. During the initial table synchronization, only columns included in the column list are copied to the subscriber. If the subscription has several publications, containing the same table with different column lists, columns specified in any of the lists will be copied. This means all columns are replicated if the table has no column list at all (which is treated as column list with all columns), or when of the publications is defined as FOR ALL TABLES (possibly IN SCHEMA that matches the schema of the table). For partitioned tables, publish_via_partition_root determines whether the column list for the root or the leaf relation will be used. If the parameter is 'false' (the default), the list defined for the leaf relation is used. Otherwise, the column list for the root partition will be used. Psql commands \dRp+ and \d now display any column lists. Author: Tomas Vondra, Alvaro Herrera, Rahila Syed Reviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed, Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yu Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 15 +- doc/src/sgml/protocol.sgml | 3 +- doc/src/sgml/ref/alter_publication.sgml | 18 +- doc/src/sgml/ref/create_publication.sgml | 17 +- src/backend/catalog/pg_publication.c | 145 +++ src/backend/commands/publicationcmds.c | 265 ++++- src/backend/executor/execReplication.c | 19 +- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/equalfuncs.c | 1 + src/backend/parser/gram.y | 33 +- src/backend/replication/logical/proto.c | 61 +- src/backend/replication/logical/tablesync.c | 153 ++- src/backend/replication/pgoutput/pgoutput.c | 201 +++- src/backend/utils/cache/relcache.c | 33 +- src/bin/pg_dump/pg_dump.c | 41 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 60 + src/bin/psql/describe.c | 44 +- src/include/catalog/pg_publication.h | 12 + src/include/catalog/pg_publication_rel.h | 1 + src/include/commands/publicationcmds.h | 4 +- src/include/nodes/parsenodes.h | 1 + src/include/replication/logicalproto.h | 6 +- src/test/regress/expected/publication.out | 372 ++++++ src/test/regress/sql/publication.sql | 287 +++++ src/test/subscription/t/031_column_list.pl | 1131 +++++++++++++++++++ 26 files changed, 2833 insertions(+), 92 deletions(-) create mode 100644 src/test/subscription/t/031_column_list.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index b8c954a554..560e205b95 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -4410,7 +4410,7 @@ SCRAM-SHA-256$<iteration count>:&l This is an array of indnatts values that - indicate which table columns this index indexes. For example a value + indicate which table columns this index indexes. For example, a value of 1 3 would mean that the first and the third table columns make up the index entries. Key columns come before non-key (included) columns. A zero in this array indicates that the @@ -6291,6 +6291,19 @@ SCRAM-SHA-256$<iteration count>:&l Reference to schema + + + + prattrs int2vector + (references pg_attribute.attnum) + + + This is an array of values that indicates which table columns are + part of the publication. For example, a value of 1 3 + would mean that the first and the third table columns are published. + A null value indicates that all columns are published. + + diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 8bb34a7c5b..2fa3cedfe9 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -7016,7 +7016,8 @@ Relation - Next, the following message part appears for each column (except generated columns): + Next, the following message part appears for each column included in + the publication (except generated columns): diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index a8cc8f3dc2..40366a10fe 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -30,7 +30,7 @@ ALTER PUBLICATION name RENAME TO where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] SEQUENCE sequence_name [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] ALL SEQUENCES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -114,6 +114,14 @@ ALTER PUBLICATION name RENAME TO * can be specified after the table name to explicitly indicate that descendant tables are included. + + + + Optionally, a column list can be specified. See for details. + + + If the optional WHERE clause is specified, rows for which the expression evaluates to false or null will not be published. Note that parentheses @@ -185,7 +193,13 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete'); Add some tables to the publication: -ALTER PUBLICATION mypublication ADD TABLE users, departments; +ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments; + + + + Change the set of columns published for a table: + +ALTER PUBLICATION mypublication SET TABLE users (user_id, firstname, lastname), TABLE departments; diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index e5081eb50e..d2739968d9 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -33,7 +33,7 @@ CREATE PUBLICATION name where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] SEQUENCE sequence_name [ * ] [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] ALL SEQUENCES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -93,6 +93,13 @@ CREATE PUBLICATION name TRUNCATE commands. + + When a column list is specified, only the named columns are replicated. + If no column list is specified, all columns of the table are replicated + through this publication, including any columns added later. If a column + list is specified, it must include the replica identity columns. + + Only persistent base tables and partitioned tables can be part of a publication. Temporary tables, unlogged tables, foreign tables, @@ -348,6 +355,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABL sales: CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales; + + + + Create a publication that publishes all changes for table users, + but replicates only columns user_id and + firstname: + +CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname); diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 514a94796e..a5a54e676e 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -45,6 +45,9 @@ #include "utils/rel.h" #include "utils/syscache.h" +static void publication_translate_columns(Relation targetrel, List *columns, + int *natts, AttrNumber **attrs); + /* * Check if relation can be in given publication and throws appropriate * error if not. @@ -395,6 +398,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, Oid relid = RelationGetRelid(targetrel); Oid pubreloid; Publication *pub = GetPublication(pubid); + AttrNumber *attarray = NULL; + int natts = 0; ObjectAddress myself, referenced; List *relids = NIL; @@ -422,6 +427,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, check_publication_add_relation(targetrel); + /* + * Translate column names to attnums and make sure the column list contains + * only allowed elements (no system or generated columns etc.). Also build + * an array of attnums, for storing in the catalog. + */ + publication_translate_columns(pri->relation, pri->columns, + &natts, &attarray); + /* Form a tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -440,6 +453,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, else nulls[Anum_pg_publication_rel_prqual - 1] = true; + /* Add column list, if available */ + if (pri->columns) + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts)); + else + nulls[Anum_pg_publication_rel_prattrs - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -463,6 +482,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, DEPENDENCY_NORMAL, DEPENDENCY_NORMAL, false); + /* Add dependency on the columns, if any are listed */ + for (int i = 0; i < natts; i++) + { + ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + /* Close the table. */ table_close(rel, RowExclusiveLock); @@ -482,6 +508,125 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, return myself; } +/* qsort comparator for attnums */ +static int +compare_int16(const void *a, const void *b) +{ + int av = *(const int16 *) a; + int bv = *(const int16 *) b; + + /* this can't overflow if int is wider than int16 */ + return (av - bv); +} + +/* + * Translate a list of column names to an array of attribute numbers + * and a Bitmapset with them; verify that each attribute is appropriate + * to have in a publication column list (no system or generated attributes, + * no duplicates). Additional checks with replica identity are done later; + * see check_publication_columns. + * + * Note that the attribute numbers are *not* offset by + * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this + * is okay. + */ +static void +publication_translate_columns(Relation targetrel, List *columns, + int *natts, AttrNumber **attrs) +{ + AttrNumber *attarray = NULL; + Bitmapset *set = NULL; + ListCell *lc; + int n = 0; + TupleDesc tupdesc = RelationGetDescr(targetrel); + + /* Bail out when no column list defined. */ + if (!columns) + return; + + /* + * Translate list of columns to attnums. We prohibit system attributes and + * make sure there are no duplicate columns. + */ + attarray = palloc(sizeof(AttrNumber) * list_length(columns)); + foreach(lc, columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel))); + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference system column \"%s\" in publication column list", + colname)); + + if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference generated column \"%s\" in publication column list", + colname)); + + if (bms_is_member(attnum, set)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("duplicate column \"%s\" in publication column list", + colname)); + + set = bms_add_member(set, attnum); + attarray[n++] = attnum; + } + + /* Be tidy, so that the catalog representation is always sorted */ + qsort(attarray, n, sizeof(AttrNumber), compare_int16); + + *natts = n; + *attrs = attarray; + + bms_free(set); +} + +/* + * Transform the column list (represented by an array) to a bitmapset. + */ +Bitmapset * +pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt) +{ + Bitmapset *result = NULL; + ArrayType *arr; + int nelems; + int16 *elems; + MemoryContext oldcxt; + + /* + * If an existing bitmap was provided, use it. Otherwise just use NULL + * and build a new bitmap. + */ + if (columns) + result = columns; + + arr = DatumGetArrayTypeP(pubcols); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + /* If a memory context was specified, switch to it. */ + if (mcxt) + oldcxt = MemoryContextSwitchTo(mcxt); + + for (int i = 0; i < nelems; i++) + result = bms_add_member(result, elems[i]); + + if (mcxt) + MemoryContextSwitchTo(oldcxt); + + return result; +} + /* * Insert new publication / schema mapping. */ diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index e449e8e8f2..84e37df783 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -342,7 +342,7 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context) * Returns true if any invalid column is found. */ bool -contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, +pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot) { HeapTuple rftuple; @@ -411,6 +411,114 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, return result; } +/* + * Check if all columns referenced in the REPLICA IDENTITY are covered by + * the column list. + * + * Returns true if any replica identity column is not covered by column list. + */ +bool +pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, + bool pubviaroot) +{ + HeapTuple tuple; + Oid relid = RelationGetRelid(relation); + Oid publish_as_relid = RelationGetRelid(relation); + bool result = false; + Datum datum; + bool isnull; + + /* + * For a partition, if pubviaroot is true, find the topmost ancestor that + * is published via this publication as we need to use its column list + * for the changes. + * + * Note that even though the column list used is for an ancestor, the + * REPLICA IDENTITY used will be for the actual child table. + */ + if (pubviaroot && relation->rd_rel->relispartition) + { + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + + if (!OidIsValid(publish_as_relid)) + publish_as_relid = relid; + } + + tuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pubid)); + + if (!HeapTupleIsValid(tuple)) + return false; + + datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + + if (!isnull) + { + int x; + Bitmapset *idattrs; + Bitmapset *columns = NULL; + + /* With REPLICA IDENTITY FULL, no column list is allowed. */ + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + result = true; + + /* Transform the column list datum to a bitmapset. */ + columns = pub_collist_to_bitmapset(NULL, datum, NULL); + + /* Remember columns that are part of the REPLICA IDENTITY */ + idattrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* + * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are + * offset (to handle system columns the usual way), while column list + * does not use offset, so we can't do bms_is_subset(). Instead, we have + * to loop over the idattrs and check all of them are in the list. + */ + x = -1; + while ((x = bms_next_member(idattrs, x)) >= 0) + { + AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); + + /* + * If pubviaroot is true, we are validating the column list of the + * parent table, but the bitmap contains the replica identity + * information of the child table. The parent/child attnums may not + * match, so translate them to the parent - get the attname from + * the child, and look it up in the parent. + */ + if (pubviaroot) + { + /* attribute name in the child table */ + char *colname = get_attname(relid, attnum, false); + + /* + * Determine the attnum for the attribute name in parent (we + * are using the column list defined on the parent). + */ + attnum = get_attnum(publish_as_relid, colname); + } + + /* replica identity column, not covered by the column list */ + if (!bms_is_member(attnum, columns)) + { + result = true; + break; + } + } + + bms_free(idattrs); + bms_free(columns); + } + + ReleaseSysCache(tuple); + + return result; +} + /* check_functions_in_node callback */ static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context) @@ -652,6 +760,39 @@ TransformPubWhereClauses(List *tables, const char *queryString, } } + +/* + * Check the publication column lists expression for all relations in the list. + */ +static void +CheckPubRelationColumnList(List *tables, const char *queryString, + bool pubviaroot) +{ + ListCell *lc; + + foreach(lc, tables) + { + PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); + + if (pri->columns == NIL) + continue; + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's column list will be used. So disallow using + * the column list on partitioned table in this case. + */ + if (!pubviaroot && + pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use publication column list for relation \"%s\"", + RelationGetRelationName(pri->relation)), + errdetail("column list cannot be used for a partitioned table when %s is false.", + "publish_via_partition_root"))); + } +} + /* * Create new publication. */ @@ -808,6 +949,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); + CheckPubRelationColumnList(rels, pstate->p_sourcetext, + publish_via_partition_root); + PublicationAddRelations(puboid, rels, true, NULL); CloseRelationList(rels); } @@ -895,8 +1039,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, /* * If the publication doesn't publish changes via the root partitioned - * table, the partition's row filter will be used. So disallow using WHERE - * clause on partitioned table in this case. + * table, the partition's row filter and column list will be used. So disallow + * using WHERE clause and column lists on partitioned table in this case. */ if (!pubform->puballtables && publish_via_partition_root_given && !publish_via_partition_root) @@ -904,7 +1048,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, /* * Lock the publication so nobody else can do anything with it. This * prevents concurrent alter to add partitioned table(s) with WHERE - * clause(s) which we don't allow when not publishing via root. + * clause(s) and/or column lists which we don't allow when not + * publishing via root. */ LockDatabaseObject(PublicationRelationId, pubform->oid, 0, AccessShareLock); @@ -917,13 +1062,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, { HeapTuple rftuple; Oid relid = lfirst_oid(lc); + bool has_column_list; + bool has_row_filter; rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubform->oid)); + has_row_filter + = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL); + + has_column_list + = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL); + if (HeapTupleIsValid(rftuple) && - !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL)) + (has_row_filter || has_column_list)) { HeapTuple tuple; @@ -932,7 +1085,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, { Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple); - if (relform->relkind == RELKIND_PARTITIONED_TABLE) + if ((relform->relkind == RELKIND_PARTITIONED_TABLE) && + has_row_filter) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot set %s for publication \"%s\"", @@ -943,6 +1097,18 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, NameStr(relform->relname), "publish_via_partition_root"))); + if ((relform->relkind == RELKIND_PARTITIONED_TABLE) && + has_column_list) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set %s for publication \"%s\"", + "publish_via_partition_root = false", + stmt->pubname), + errdetail("The publication contains a column list for a partitioned table \"%s\" " + "which is not allowed when %s is false.", + NameStr(relform->relname), + "publish_via_partition_root"))); + ReleaseSysCache(tuple); } @@ -1107,6 +1273,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + PublicationAddRelations(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) @@ -1124,6 +1292,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + /* * To recreate the relation list for the publication, look for * existing relations that do not need to be dropped. @@ -1135,42 +1305,79 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PublicationRelInfo *oldrel; bool found = false; HeapTuple rftuple; - bool rfisnull = true; Node *oldrelwhereclause = NULL; + Bitmapset *oldcolumns = NULL; /* look up the cache for the old relmap */ rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(oldrelid), ObjectIdGetDatum(pubid)); + /* + * See if the existing relation currently has a WHERE clause or a + * column list. We need to compare those too. + */ if (HeapTupleIsValid(rftuple)) { + bool isnull = true; Datum whereClauseDatum; + Datum columnListDatum; + /* Load the WHERE clause for this table. */ whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, - &rfisnull); - if (!rfisnull) + &isnull); + if (!isnull) oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum)); + /* Transform the int2vector column list to a bitmap. */ + columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prattrs, + &isnull); + + if (!isnull) + oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL); + ReleaseSysCache(rftuple); } foreach(newlc, rels) { PublicationRelInfo *newpubrel; + Oid newrelid; + Bitmapset *newcolumns = NULL; newpubrel = (PublicationRelInfo *) lfirst(newlc); + newrelid = RelationGetRelid(newpubrel->relation); + + /* + * If the new publication has column list, transform it to + * a bitmap too. + */ + if (newpubrel->columns) + { + ListCell *lc; + + foreach(lc, newpubrel->columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(newrelid, colname); + + newcolumns = bms_add_member(newcolumns, attnum); + } + } /* * Check if any of the new set of relations matches with the * existing relations in the publication. Additionally, if the * relation has an associated WHERE clause, check the WHERE - * expressions also match. Drop the rest. + * expressions also match. Same for the column list. Drop the + * rest. */ if (RelationGetRelid(newpubrel->relation) == oldrelid) { - if (equal(oldrelwhereclause, newpubrel->whereClause)) + if (equal(oldrelwhereclause, newpubrel->whereClause) && + bms_equal(oldcolumns, newcolumns)) { found = true; break; @@ -1186,6 +1393,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, { oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; + oldrel->columns = NIL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1401,6 +1609,7 @@ AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, { oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; + oldrel->columns = NULL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1660,6 +1869,7 @@ OpenRelationList(List *rels, char objectType) List *result = NIL; ListCell *lc; List *relids_with_rf = NIL; + List *relids_with_collist = NIL; /* * Open, share-lock, and check all the explicitly-specified relations @@ -1710,6 +1920,13 @@ OpenRelationList(List *rels, char objectType) errmsg("conflicting or redundant WHERE clauses for table \"%s\"", RelationGetRelationName(rel)))); + /* Disallow duplicate tables if there are any with column lists. */ + if (t->columns || list_member_oid(relids_with_collist, myrelid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant column lists for table \"%s\"", + RelationGetRelationName(rel)))); + table_close(rel, ShareUpdateExclusiveLock); continue; } @@ -1717,12 +1934,16 @@ OpenRelationList(List *rels, char objectType) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; + pub_rel->columns = t->columns; result = lappend(result, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) relids_with_rf = lappend_oid(relids_with_rf, myrelid); + if (t->columns) + relids_with_collist = lappend_oid(relids_with_collist, myrelid); + /* * Add children of this rel, if requested, so that they too are added * to the publication. A partitioned table can't have any inheritance @@ -1762,6 +1983,18 @@ OpenRelationList(List *rels, char objectType) errmsg("conflicting or redundant WHERE clauses for table \"%s\"", RelationGetRelationName(rel)))); + /* + * We don't allow to specify column list for both parent + * and child table at the same time as it is not very + * clear which one should be given preference. + */ + if (childrelid != myrelid && + (t->columns || list_member_oid(relids_with_collist, childrelid))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant column lists for table \"%s\"", + RelationGetRelationName(rel)))); + continue; } @@ -1771,11 +2004,16 @@ OpenRelationList(List *rels, char objectType) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; + /* child inherits column list from parent */ + pub_rel->columns = t->columns; result = lappend(result, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) relids_with_rf = lappend_oid(relids_with_rf, childrelid); + + if (t->columns) + relids_with_collist = lappend_oid(relids_with_collist, childrelid); } } } @@ -1884,6 +2122,11 @@ PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); + if (pubrel->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 0df7cf5874..1a4fbdc38c 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -574,9 +574,6 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) if (cmd != CMD_UPDATE && cmd != CMD_DELETE) return; - if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - return; - /* * It is only safe to execute UPDATE/DELETE when all columns, referenced * in the row filters from publications which the relation is in, are @@ -596,17 +593,33 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) errmsg("cannot update table \"%s\"", RelationGetRelationName(rel)), errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot update table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column list used by the publication does not cover the replica identity."))); else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot delete from table \"%s\"", RelationGetRelationName(rel)), errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot delete from table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column list used by the publication does not cover the replica identity."))); /* If relation has replica identity we are always good. */ if (OidIsValid(RelationGetReplicaIndex(rel))) return; + /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */ + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + return; + /* * This is UPDATE/DELETE and there is no replica identity. * diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 55f720a88f..e38ff4000f 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4850,6 +4850,7 @@ _copyPublicationTable(const PublicationTable *from) COPY_NODE_FIELD(relation); COPY_NODE_FIELD(whereClause); + COPY_NODE_FIELD(columns); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 82562eb9b8..0f330e3c70 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2322,6 +2322,7 @@ _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(whereClause); + COMPARE_NODE_FIELD(columns); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index f8301541c9..945a9ada8b 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9749,13 +9749,14 @@ CreatePublicationStmt: * relation_expr here. */ PublicationObjSpec: - TABLE relation_expr OptWhereClause + TABLE relation_expr opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; - $$->pubtable->whereClause = $3; + $$->pubtable->columns = $3; + $$->pubtable->whereClause = $4; } | ALL TABLES IN_P SCHEMA ColId { @@ -9790,11 +9791,15 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA; $$->location = @5; } - | ColId OptWhereClause + | ColId opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - if ($2) + /* + * If either a row filter or column list is specified, create + * a PublicationTable object. + */ + if ($2 || $3) { /* * The OptWhereClause must be stored here but it is @@ -9804,7 +9809,8 @@ PublicationObjSpec: */ $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVar(NULL, $1, @1); - $$->pubtable->whereClause = $2; + $$->pubtable->columns = $2; + $$->pubtable->whereClause = $3; } else { @@ -9812,23 +9818,25 @@ PublicationObjSpec: } $$->location = @1; } - | ColId indirection OptWhereClause + | ColId indirection opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); - $$->pubtable->whereClause = $3; + $$->pubtable->columns = $3; + $$->pubtable->whereClause = $4; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr OptWhereClause + | extended_relation_expr opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; - $$->pubtable->whereClause = $2; + $$->pubtable->columns = $2; + $$->pubtable->whereClause = $3; } | CURRENT_SCHEMA { @@ -17524,6 +17532,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) errmsg("WHERE clause not allowed for schema"), parser_errposition(pubobj->location)); + /* Column list is not allowed on a schema object */ + if (pubobj->pubtable && pubobj->pubtable->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column specification not allowed for schema"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3dbe85d61a..18d3cbb924 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,16 +29,30 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, + Bitmapset *columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary); + bool binary, Bitmapset *columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); +/* + * Check if a column is covered by a column list. + * + * Need to be careful about NULL, which is treated as a column list covering + * all columns. + */ +static bool +column_in_column_list(int attnum, Bitmapset *columns) +{ + return (columns == NULL || bms_is_member(attnum, columns)); +} + + /* * Write BEGIN to the output stream. */ @@ -398,7 +412,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary) + TupleTableSlot *newslot, bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +424,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -443,7 +457,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary) + bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -464,11 +478,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary); + logicalrep_write_tuple(out, rel, oldslot, binary, NULL); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -537,7 +551,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary); + logicalrep_write_tuple(out, rel, oldslot, binary, NULL); } /* @@ -702,7 +716,8 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, + Bitmapset *columns) { char *relname; @@ -724,7 +739,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, columns); } /* @@ -801,7 +816,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary) + bool binary, Bitmapset *columns) { TupleDesc desc; Datum *values; @@ -813,8 +828,14 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) continue; + + if (!column_in_column_list(att->attnum, columns)) + continue; + nliveatts++; } pq_sendint16(out, nliveatts); @@ -833,6 +854,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, if (att->attisdropped || att->attgenerated) continue; + if (!column_in_column_list(att->attnum, columns)) + continue; + if (isnull[i]) { pq_sendbyte(out, LOGICALREP_COLUMN_NULL); @@ -954,7 +978,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { TupleDesc desc; int i; @@ -967,8 +991,14 @@ logicalrep_write_attrs(StringInfo out, Relation rel) /* send number of live attributes */ for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) continue; + + if (!column_in_column_list(att->attnum, columns)) + continue; + nliveatts++; } pq_sendint16(out, nliveatts); @@ -987,6 +1017,9 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; + if (!column_in_column_list(att->attnum, columns)) + continue; + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d8b12d94bc..697fb23634 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -113,6 +113,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "utils/acl.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -702,12 +703,13 @@ fetch_remote_table_info(char *nspname, char *relname, StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; int natt; ListCell *lc; bool first; + Bitmapset *included_cols = NULL; lrel->nspname = nspname; lrel->relname = relname; @@ -748,10 +750,110 @@ fetch_remote_table_info(char *nspname, char *relname, ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - /* Now fetch columns. */ + + /* + * Get column lists for each relation. + * + * For initial synchronization, column lists can be ignored in following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any column list + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as ALL TABLES IN + * SCHEMA that includes this relation + * + * We need to do this before fetching info about column names and types, + * so that we can skip columns that should not be replicated. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + WalRcvExecResult *pubres; + TupleTableSlot *slot; + Oid attrsRow[] = {INT2OID}; + StringInfoData pub_names; + bool first = true; + + initStringInfo(&pub_names); + foreach(lc, MySubscription->publications) + { + if (!first) + appendStringInfo(&pub_names, ", "); + appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc)))); + first = false; + } + + /* + * Fetch info about column lists for the relation (from all the + * publications). We unnest the int2vector values, because that + * makes it easier to combine lists by simply adding the attnums + * to a new bitmap (without having to parse the int2vector data). + * This preserves NULL values, so that if one of the publications + * has no column list, we'll know that. + */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT unnest" + " FROM pg_publication p" + " LEFT OUTER JOIN pg_publication_rel pr" + " ON (p.oid = pr.prpubid AND pr.prrelid = %u)" + " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + lrel->remoteid, + pub_names.data); + + pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrsRow), attrsRow); + + if (pubres->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s", + nspname, relname, pubres->err))); + + /* + * Merge the column lists (from different publications) by creating + * a single bitmap with all the attnums. If we find a NULL value, + * that means one of the publications has no column list for the + * table we're syncing. + */ + slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + { + Datum cfval = slot_getattr(slot, 1, &isnull); + + /* NULL means empty column list, so we're done. */ + if (isnull) + { + bms_free(included_cols); + included_cols = NULL; + break; + } + + included_cols = bms_add_member(included_cols, + DatumGetInt16(cfval)); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(pubres); + + pfree(pub_names.data); + } + + /* + * Now fetch column names and types. + */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT a.attname," + "SELECT a.attnum," + " a.attname," " a.atttypid," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" @@ -779,16 +881,35 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + /* + * Store the columns as a list of names. Ignore those that are not + * present in the column list, if there is one. + */ natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + + /* If the column is not in the column list, skip it. */ + if (included_cols != NULL && !bms_is_member(attnum, included_cols)) + { + ExecClearTuple(slot); + continue; + } + + rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); - if (DatumGetBool(slot_getattr(slot, 3, &isnull))) + + lrel->attnames[natt] = rel_colname; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ @@ -931,8 +1052,24 @@ copy_table(Relation rel) /* Regular table with no row filter */ if (lrel.relkind == RELKIND_RELATION && qual == NIL) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + + /* + * XXX Do we need to list the columns in all cases? Maybe we're replicating + * all columns? + */ + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 292e7299d8..893833ea83 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -30,6 +30,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -90,7 +91,8 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -148,9 +150,6 @@ typedef struct RelationSyncEntry */ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; EState *estate; /* executor state used for row filter */ - MemoryContext cache_expr_cxt; /* private context for exprstate and - * estate, if any */ - TupleTableSlot *new_slot; /* slot for storing new tuple */ TupleTableSlot *old_slot; /* slot for storing old tuple */ @@ -169,6 +168,19 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ AttrMap *attrmap; + + /* + * Columns included in the publication, or NULL if all columns are + * included implicitly. Note that the attnums in this bitmap are not + * shifted by FirstLowInvalidHeapAttributeNumber. + */ + Bitmapset *columns; + + /* + * Private context to store additional data for this entry - state for + * the row filter expressions, column list, etc. + */ + MemoryContext entry_cxt; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -200,6 +212,11 @@ static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, RelationSyncEntry *entry, ReorderBufferChangeType *action); +/* column list routines */ +static void pgoutput_column_list_init(PGOutputData *data, + List *publications, + RelationSyncEntry *entry); + /* * Specify output plugin callbacks */ @@ -622,11 +639,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->columns); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -639,7 +656,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -662,13 +680,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; + /* Skip this attribute if it's not present in the column list */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, columns); OutputPluginWrite(ctx, false); } @@ -722,6 +744,28 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) return DatumGetBool(ret); } +/* + * Make sure the per-entry memory context exists. + */ +static void +pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry) +{ + Relation relation; + + /* The context may already exist, in which case bail out. */ + if (entry->entry_cxt) + return; + + relation = RelationIdGetRelation(entry->publish_as_relid); + + entry->entry_cxt = AllocSetContextCreate(data->cachectx, + "entry private context", + ALLOCSET_SMALL_SIZES); + + MemoryContextCopyAndSetIdentifier(entry->entry_cxt, + RelationGetRelationName(relation)); +} + /* * Initialize the row filter. */ @@ -842,21 +886,13 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, { Relation relation = RelationIdGetRelation(entry->publish_as_relid); - Assert(entry->cache_expr_cxt == NULL); - - /* Create the memory context for row filters */ - entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx, - "Row filter expressions", - ALLOCSET_DEFAULT_SIZES); - - MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt, - RelationGetRelationName(relation)); + pgoutput_ensure_entry_cxt(data, entry); /* * Now all the filters for all pubactions are known. Combine them when * their pubactions are the same. */ - oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt); + oldctx = MemoryContextSwitchTo(entry->entry_cxt); entry->estate = create_estate_for_relation(relation); for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) { @@ -879,6 +915,105 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, } } +/* + * Initialize the column list. + */ +static void +pgoutput_column_list_init(PGOutputData *data, List *publications, + RelationSyncEntry *entry) +{ + ListCell *lc; + + /* + * Find if there are any column lists for this relation. If there are, + * build a bitmap merging all the column lists. + * + * All the given publication-table mappings must be checked. + * + * Multiple publications might have multiple column lists for this relation. + * + * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column + * list" so it takes precedence. + */ + foreach(lc, publications) + { + Publication *pub = lfirst(lc); + HeapTuple cftuple = NULL; + Datum cfdatum = 0; + + /* + * Assume there's no column list. Only if we find pg_publication_rel + * entry with a column list we'll switch it to false. + */ + bool pub_no_list = true; + + /* + * If the publication is FOR ALL TABLES then it is treated the same as if + * there are no column lists (even if other publications have a list). + */ + if (!pub->alltables) + { + /* + * Check for the presence of a column list in this publication. + * + * Note: If we find no pg_publication_rel row, it's a publication + * defined for a whole schema, so it can't have a column list, just + * like a FOR ALL TABLES publication. + */ + cftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(entry->publish_as_relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(cftuple)) + { + /* + * Lookup the column list attribute. + * + * Note: We update the pub_no_list value directly, because if + * the value is NULL, we have no list (and vice versa). + */ + cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, + Anum_pg_publication_rel_prattrs, + &pub_no_list); + + /* + * Build the column list bitmap in the per-entry context. + * + * We need to merge column lists from all publications, so we + * update the same bitmapset. If the column list is null, we + * interpret it as replicating all columns. + */ + if (!pub_no_list) /* when not null */ + { + pgoutput_ensure_entry_cxt(data, entry); + + entry->columns = pub_collist_to_bitmapset(entry->columns, + cfdatum, + entry->entry_cxt); + } + } + } + + /* + * Found a publication with no column list, so we're done. But first + * discard column list we might have from preceding publications. + */ + if (pub_no_list) + { + if (cftuple) + ReleaseSysCache(cftuple); + + bms_free(entry->columns); + entry->columns = NULL; + + break; + } + + ReleaseSysCache(cftuple); + } /* loop all subscribed publications */ + +} + /* * Initialize the slot for storing new and old tuples, and build the map that * will be used to convert the relation's tuples into the ancestor's format. @@ -1243,7 +1378,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary); + data->binary, relentry->columns); OutputPluginWrite(ctx, true); break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -1297,11 +1432,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, - new_slot, data->binary); + new_slot, data->binary, + relentry->columns); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, - old_slot, new_slot, data->binary); + old_slot, new_slot, data->binary, + relentry->columns); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, @@ -1794,8 +1931,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->new_slot = NULL; entry->old_slot = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); - entry->cache_expr_cxt = NULL; + entry->entry_cxt = NULL; entry->publish_as_relid = InvalidOid; + entry->columns = NULL; entry->attrmap = NULL; } @@ -1841,6 +1979,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->columns); + entry->columns = NULL; entry->pubactions.pubinsert = false; entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; @@ -1865,17 +2005,18 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* * Row filter cache cleanups. */ - if (entry->cache_expr_cxt) - MemoryContextDelete(entry->cache_expr_cxt); + if (entry->entry_cxt) + MemoryContextDelete(entry->entry_cxt); - entry->cache_expr_cxt = NULL; + entry->entry_cxt = NULL; entry->estate = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); /* * Build publication cache. We can't use one provided by relcache as - * relcache considers all publications given relation is in, but here - * we only need to consider ones that the subscriber requested. + * relcache considers all publications that the given relation is in, + * but here we only need to consider ones that the subscriber + * requested. */ foreach(lc, data->publications) { @@ -1946,6 +2087,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* + * If the relation is to be published, determine actions to + * publish, and list of columns, if appropriate. + * * Don't publish changes for partitioned tables, because * publishing those of its partitions suffices, unless partition * changes won't be published due to pubviaroot being set. @@ -2007,6 +2151,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* Initialize the row filter */ pgoutput_row_filter_init(data, rel_publications, entry); + + /* Initialize the column list */ + pgoutput_column_list_init(data, rel_publications, entry); } list_free(pubids); diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 4f3fe1118a..d47fac7bb9 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5575,6 +5575,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) memset(pubdesc, 0, sizeof(PublicationDesc)); pubdesc->rf_valid_for_update = true; pubdesc->rf_valid_for_delete = true; + pubdesc->cols_valid_for_update = true; + pubdesc->cols_valid_for_delete = true; return; } @@ -5587,6 +5589,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) memset(pubdesc, 0, sizeof(PublicationDesc)); pubdesc->rf_valid_for_update = true; pubdesc->rf_valid_for_delete = true; + pubdesc->cols_valid_for_update = true; + pubdesc->cols_valid_for_delete = true; /* Fetch the publication membership info. */ puboids = GetRelationPublications(relid); @@ -5657,7 +5661,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) */ if (!pubform->puballtables && (pubform->pubupdate || pubform->pubdelete) && - contain_invalid_rfcolumn(pubid, relation, ancestors, + pub_rf_contains_invalid_column(pubid, relation, ancestors, pubform->pubviaroot)) { if (pubform->pubupdate) @@ -5666,6 +5670,23 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->rf_valid_for_delete = false; } + /* + * Check if all columns are part of the REPLICA IDENTITY index or not. + * + * If the publication is FOR ALL TABLES then it means the table has no + * column list and we can skip the validation. + */ + if (!pubform->puballtables && + (pubform->pubupdate || pubform->pubdelete) && + pub_collist_contains_invalid_column(pubid, relation, ancestors, + pubform->pubviaroot)) + { + if (pubform->pubupdate) + pubdesc->cols_valid_for_update = false; + if (pubform->pubdelete) + pubdesc->cols_valid_for_delete = false; + } + ReleaseSysCache(tup); /* @@ -5677,6 +5698,16 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate && !pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete) break; + + /* + * If we know everything is replicated and the column list is invalid + * for update and delete, there is no point to check for other + * publications. + */ + if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate && + pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate && + !pubdesc->cols_valid_for_update && !pubdesc->cols_valid_for_delete) + break; } if (relation->rd_pubdesc) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 00629f0836..535b160165 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4131,6 +4131,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_prpubid; int i_prrelid; int i_prrelqual; + int i_prattrs; int i, j, ntups; @@ -4144,12 +4145,20 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " - "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual " - "FROM pg_catalog.pg_publication_rel"); + "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, " + "(CASE\n" + " WHEN pr.prattrs IS NOT NULL THEN\n" + " (SELECT array_agg(attname)\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" + " ELSE NULL END) prattrs " + "FROM pg_catalog.pg_publication_rel pr"); else appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " - "NULL AS prrelqual " + "NULL AS prrelqual, NULL AS prattrs " "FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -4160,6 +4169,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_prpubid = PQfnumber(res, "prpubid"); i_prrelid = PQfnumber(res, "prrelid"); i_prrelqual = PQfnumber(res, "prrelqual"); + i_prattrs = PQfnumber(res, "prattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4205,6 +4215,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) else pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual)); + if (!PQgetisnull(res, i, i_prattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer attribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prattrs), + &attnames, &nattnames)) + fatal("could not parse %s array", "prattrs"); + attribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(attribs, ", "); + + appendPQExpBufferStr(attribs, fmtId(attnames[k])); + } + pubrinfo[j].pubrattrs = attribs->data; + } + else + pubrinfo[j].pubrattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4300,6 +4332,9 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) appendPQExpBuffer(query, " %s", fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrattrs) + appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + if (pubrinfo->pubrelqual) { /* diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 893725d121..688093c55e 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -634,6 +634,7 @@ typedef struct _PublicationRelInfo PublicationInfo *publication; TableInfo *pubtable; char *pubrelqual; + char *pubrattrs; } PublicationRelInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 0e724b0366..af5d6fa5a3 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2438,6 +2438,28 @@ my %tests = ( unlike => { exclude_dump_test_schema => 1, }, }, + 'ALTER PUBLICATION pub1 ADD TABLE test_sixth_table (col3, col2)' => { + create_order => 52, + create_sql => + 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_sixth_table (col3, col2);', + regexp => qr/^ + \QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_sixth_table (col2, col3);\E + /xm, + like => { %full_runs, section_post_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + + 'ALTER PUBLICATION pub1 ADD TABLE test_seventh_table (col3, col2) WHERE (col1 = 1)' => { + create_order => 52, + create_sql => + 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_seventh_table (col3, col2) WHERE (col1 = 1);', + regexp => qr/^ + \QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_seventh_table (col2, col3) WHERE ((col1 = 1));\E + /xm, + like => { %full_runs, section_post_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + 'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test' => { create_order => 51, create_sql => @@ -2809,6 +2831,44 @@ my %tests = ( unlike => { exclude_dump_test_schema => 1, }, }, + 'CREATE TABLE test_sixth_table' => { + create_order => 6, + create_sql => 'CREATE TABLE dump_test.test_sixth_table ( + col1 int, + col2 text, + col3 bytea + );', + regexp => qr/^ + \QCREATE TABLE dump_test.test_sixth_table (\E + \n\s+\Qcol1 integer,\E + \n\s+\Qcol2 text,\E + \n\s+\Qcol3 bytea\E + \n\); + /xm, + like => + { %full_runs, %dump_test_schema_runs, section_pre_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + + 'CREATE TABLE test_seventh_table' => { + create_order => 6, + create_sql => 'CREATE TABLE dump_test.test_seventh_table ( + col1 int, + col2 text, + col3 bytea + );', + regexp => qr/^ + \QCREATE TABLE dump_test.test_seventh_table (\E + \n\s+\Qcol1 integer,\E + \n\s+\Qcol2 text,\E + \n\s+\Qcol3 bytea\E + \n\); + /xm, + like => + { %full_runs, %dump_test_schema_runs, section_pre_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + 'CREATE TABLE test_table_identity' => { create_order => 3, create_sql => 'CREATE TABLE dump_test.test_table_identity ( diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index b8a532a45f..4dddf08789 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -2960,6 +2960,7 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" @@ -2967,6 +2968,12 @@ describeOneTableDetails(const char *schemaname, "UNION\n" "SELECT pubname\n" " , pg_get_expr(pr.prqual, c.oid)\n" + " , (CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " (SELECT string_agg(attname, ', ')\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" + " ELSE NULL END) " "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" @@ -2974,6 +2981,7 @@ describeOneTableDetails(const char *schemaname, "UNION\n" "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" "ORDER BY 1;", @@ -2984,12 +2992,14 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" "WHERE pr.prrelid = '%s'\n" "UNION ALL\n" "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" "ORDER BY 1;", @@ -3011,6 +3021,11 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, " \"%s\"", PQgetvalue(result, i, 0)); + /* column list (if any) */ + if (!PQgetisnull(result, i, 2)) + appendPQExpBuffer(&buf, " (%s)", + PQgetvalue(result, i, 2)); + /* row filter (if any) */ if (!PQgetisnull(result, i, 1)) appendPQExpBuffer(&buf, " WHERE %s", @@ -5979,7 +5994,7 @@ listPublications(const char *pattern) */ static bool addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, - bool singlecol, printTableContent *cont) + bool as_schema, printTableContent *cont) { PGresult *res; int count = 0; @@ -5996,15 +6011,19 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, for (i = 0; i < count; i++) { - if (!singlecol) + if (as_schema) + printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); + else { printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), PQgetvalue(res, i, 1)); + + if (!PQgetisnull(res, i, 3)) + appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 3)); + if (!PQgetisnull(res, i, 2)) appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2)); } - else - printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); printTableAddFooter(cont, buf->data); } @@ -6155,11 +6174,22 @@ describePublications(const char *pattern) printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname"); if (pset.sversion >= 150000) + { appendPQExpBufferStr(&buf, ", pg_get_expr(pr.prqual, c.oid)"); + appendPQExpBufferStr(&buf, + ", (CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " pg_catalog.array_to_string(" + " ARRAY(SELECT attname\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n" + " ELSE NULL END)"); + } else appendPQExpBufferStr(&buf, - ", NULL"); + ", NULL, NULL"); appendPQExpBuffer(&buf, "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" @@ -6189,9 +6219,9 @@ describePublications(const char *pattern) if (!puballsequences) { - /* Get the tables for the specified publication */ + /* Get the sequences for the specified publication */ printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname, NULL\n" + "SELECT n.nspname, c.relname, NULL, NULL\n" "FROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 97f26208e1..186d8ea74b 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -95,6 +95,13 @@ typedef struct PublicationDesc */ bool rf_valid_for_update; bool rf_valid_for_delete; + + /* + * true if the columns are part of the replica identity or the publication actions + * do not include UPDATE or DELETE. + */ + bool cols_valid_for_update; + bool cols_valid_for_delete; } PublicationDesc; typedef struct Publication @@ -111,6 +118,7 @@ typedef struct PublicationRelInfo { Relation relation; Node *whereClause; + List *columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -137,6 +145,7 @@ extern List *GetPublicationRelations(Oid pubid, char objectType, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern void GetActionsInPublication(Oid pubid, PublicationActions *actions); extern List *GetPublicationSchemas(Oid pubid, char objectType); extern List *GetSchemaPublications(Oid schemaid, char objectType); extern List *GetSchemaPublicationRelations(Oid schemaid, char objectType, @@ -160,6 +169,9 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, char objectType, bool if_not_exists); +extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, + MemoryContext mcxt); + extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 0dd0f425db..4feb581899 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -34,6 +34,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ + int2vector prattrs; /* columns to replicate */ #endif } FormData_pg_publication_rel; diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 7813cbcb6b..ae87caf089 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -31,7 +31,9 @@ extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); extern void InvalidatePublicationRels(List *relids); -extern bool contain_invalid_rfcolumn(Oid pubid, Relation relation, +extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, + List *ancestors, bool pubviaroot); +extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot); #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index cb1fcc0ee3..5a458c42e5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3654,6 +3654,7 @@ typedef struct PublicationTable NodeTag type; RangeVar *relation; /* relation to be published */ Node *whereClause; /* qualifications */ + List *columns; /* List of columns in a publication table */ } PublicationTable; /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index fb86ca022d..13ee10fdd4 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -222,12 +222,12 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, - bool binary); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - TupleTableSlot *newslot, bool binary); + TupleTableSlot *newslot, bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -250,7 +250,7 @@ extern void logicalrep_write_sequence(StringInfo out, Relation rel, bool is_called); extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index a5a519d6c8..0308e40ba6 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -1137,6 +1137,369 @@ DROP TABLE rf_tbl_abcd_pk; DROP TABLE rf_tbl_abcd_nopk; DROP TABLE rf_tbl_abcd_part_pk; -- ====================================================== +-- fail - duplicate tables are not allowed if that table has any column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert'); +ERROR: conflicting or redundant column lists for table "testpub_tbl1" +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert'); +ERROR: conflicting or redundant column lists for table "testpub_tbl1" +RESET client_min_messages; +-- test for column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1; +CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert'); +RESET client_min_messages; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text, + d int generated always as (a + length(b)) stored); +-- error: column "x" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); +ERROR: column "x" of relation "testpub_tbl5" does not exist +-- error: replica identity "a" not included in the column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); +UPDATE testpub_tbl5 SET a = 1; +ERROR: cannot update table "testpub_tbl5" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- error: generated column "d" can't be in list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +ERROR: cannot reference generated column "d" in publication column list +-- error: system attributes "ctid" not allowed in column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); +ERROR: cannot reference system column "ctid" in publication column list +-- ok +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice +ERROR: cannot drop column c of table testpub_tbl5 because other objects depend on it +DETAIL: publication of table testpub_tbl5 in publication testpub_fortable depends on column c of table testpub_tbl5 +HINT: Use DROP ... CASCADE to drop the dependent objects too. +-- ok: for insert-only publication, any column list is acceptable +ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c); +/* not all replica identities are good enough */ +CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c); +ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +-- error: replica identity (b,c) is not covered by column list (a, c) +UPDATE testpub_tbl5 SET a = 1; +ERROR: cannot update table "testpub_tbl5" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- error: change the replica identity to "b", and column list to (a, c) +-- then update fails, because (a, c) does not cover replica identity +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +UPDATE testpub_tbl5 SET a = 1; +ERROR: cannot update table "testpub_tbl5" +DETAIL: Column list used by the publication does not cover the replica identity. +/* But if upd/del are not published, it works OK */ +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); +RESET client_min_messages; +ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok +\dRp+ testpub_table_ins + Publication testpub_table_ins + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | t | f | f +Tables: + "public.testpub_tbl5" (a) + +-- tests with REPLICA IDENTITY FULL +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); +UPDATE testpub_tbl6 SET a = 1; +ERROR: cannot update table "testpub_tbl6" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok +UPDATE testpub_tbl6 SET a = 1; +-- make sure changing the column list is propagated to the catalog +CREATE TABLE testpub_tbl7 (a int primary key, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 + Table "public.testpub_tbl7" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+----------+--------------+------------- + a | integer | | not null | | plain | | + b | text | | | | extended | | + c | text | | | | extended | | +Indexes: + "testpub_tbl7_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_fortable" (a, b) + +-- ok: the column list is the same, we should skip this table (or at least not fail) +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 + Table "public.testpub_tbl7" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+----------+--------------+------------- + a | integer | | not null | | plain | | + b | text | | | | extended | | + c | text | | | | extended | | +Indexes: + "testpub_tbl7_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_fortable" (a, b) + +-- ok: the column list changes, make sure the catalog gets updated +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c); +\d+ testpub_tbl7 + Table "public.testpub_tbl7" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+----------+--------------+------------- + a | integer | | not null | | plain | | + b | text | | | | extended | | + c | text | | | | extended | | +Indexes: + "testpub_tbl7_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_fortable" (a, c) + +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +-- ok: column list covers both "a" and "b" +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true'); +RESET client_min_messages; +-- ok: the same thing, but try plain ADD TABLE +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +UPDATE testpub_tbl8 SET a = 1; +-- failure: column list does not cover replica identity for the second partition +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +-- failure: one of the partitions has REPLICA IDENTITY FULL +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +-- add table and then try changing replica identity +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +-- failure: replica identity full can't be used with a column list +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +-- failure: replica identity has to be covered by the column list +ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey; +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +DROP TABLE testpub_tbl8; +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 (a int, b text, c text); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 (a int, b text, c text); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +-- ok: attaching first partition works, because (a) is in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0); +-- failure: second partition has replica identity (c), which si not in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1); +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +-- failure: changing replica identity to FULL for partition fails, because +-- of the column list on the parent +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_0" +DETAIL: Column list used by the publication does not cover the replica identity. +DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1; +DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list; +-- ====================================================== +-- Test combination of column list and row filter +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_both_filters; +RESET client_min_messages; +CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c)); +ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey; +ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1); +\dRp+ testpub_both_filters + Publication testpub_both_filters + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1) + +\d+ testpub_tbl_both_filters + Table "public.testpub_tbl_both_filters" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | + c | integer | | not null | | plain | | +Indexes: + "testpub_tbl_both_filters_pkey" PRIMARY KEY, btree (a, c) REPLICA IDENTITY +Publications: + "testpub_both_filters" (a, c) WHERE (c <> 1) + +DROP TABLE testpub_tbl_both_filters; +DROP PUBLICATION testpub_both_filters; +-- ====================================================== +-- More column list tests for validating column references +CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int); +CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b)); +CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a); +CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY); +ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10); +-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing) +-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b); +RESET client_min_messages; +-- ok - (a,b) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c); +-- ok - (a,b,c) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - "b" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b); +-- fail - "a" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- ok - there's no replica identity, so any column list works +-- note: it fails anyway, just a bit later because UPDATE requires RI +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +-- Case 2. REPLICA IDENTITY FULL +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" +DETAIL: Column list used by the publication does not cover the replica identity. +-- Case 3. REPLICA IDENTITY NOTHING +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +-- Case 4. REPLICA IDENTITY INDEX +ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c); +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c; +ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c); +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; +-- Tests for partitioned table +-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - cannot use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +ERROR: cannot use publication column list for relation "rf_tbl_abcd_part_pk" +DETAIL: column list cannot be used for a partitioned table when publish_via_partition_root is false. +-- ok - can use column list for partition +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is +-- used for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +ERROR: cannot set publish_via_partition_root = false for publication "testpub6" +DETAIL: The publication contains a column list for a partitioned table "rf_tbl_abcd_part_pk" which is not allowed when publish_via_partition_root is false. +-- Now change the root column list to use a column "b" +-- (which is not in the replica identity) +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b); +-- ok - we don't have column list for partitioned table. +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_part_pk_1" +DETAIL: Column list used by the publication does not cover the replica identity. +-- set PUBLISH_VIA_PARTITION_ROOT to true +-- can use column list for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_part_pk_1" +DETAIL: Column list used by the publication does not cover the replica identity. +DROP PUBLICATION testpub6; +DROP TABLE rf_tbl_abcd_pk; +DROP TABLE rf_tbl_abcd_nopk; +DROP TABLE rf_tbl_abcd_part_pk; +-- ====================================================== -- Test cache invalidation FOR ALL TABLES publication SET client_min_messages = 'ERROR'; CREATE TABLE testpub_tbl4(a int); @@ -1582,6 +1945,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes Tables from schemas: "pub_test1" +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ERROR: syntax error at or near "(" +LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); + ^ +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); +ERROR: column specification not allowed for schema +LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)... + ^ -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 151b8be14e..96b02947fa 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -568,6 +568,289 @@ DROP TABLE rf_tbl_abcd_nopk; DROP TABLE rf_tbl_abcd_part_pk; -- ====================================================== +-- fail - duplicate tables are not allowed if that table has any column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert'); +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert'); +RESET client_min_messages; + +-- test for column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1; +CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert'); +RESET client_min_messages; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text, + d int generated always as (a + length(b)) stored); +-- error: column "x" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); +-- error: replica identity "a" not included in the column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); +UPDATE testpub_tbl5 SET a = 1; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- error: generated column "d" can't be in list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +-- error: system attributes "ctid" not allowed in column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); +-- ok +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice +-- ok: for insert-only publication, any column list is acceptable +ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c); + +/* not all replica identities are good enough */ +CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c); +ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +-- error: replica identity (b,c) is not covered by column list (a, c) +UPDATE testpub_tbl5 SET a = 1; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; + +-- error: change the replica identity to "b", and column list to (a, c) +-- then update fails, because (a, c) does not cover replica identity +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +UPDATE testpub_tbl5 SET a = 1; + +/* But if upd/del are not published, it works OK */ +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); +RESET client_min_messages; +ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok +\dRp+ testpub_table_ins + +-- tests with REPLICA IDENTITY FULL +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; + +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); +UPDATE testpub_tbl6 SET a = 1; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6; + +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok +UPDATE testpub_tbl6 SET a = 1; + +-- make sure changing the column list is propagated to the catalog +CREATE TABLE testpub_tbl7 (a int primary key, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 +-- ok: the column list is the same, we should skip this table (or at least not fail) +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 +-- ok: the column list changes, make sure the catalog gets updated +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c); +\d+ testpub_tbl7 + +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; + +-- ok: column list covers both "a" and "b" +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true'); +RESET client_min_messages; + +-- ok: the same thing, but try plain ADD TABLE +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +UPDATE testpub_tbl8 SET a = 1; + +-- failure: column list does not cover replica identity for the second partition +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; + +-- failure: one of the partitions has REPLICA IDENTITY FULL +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; + +-- add table and then try changing replica identity +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); + +-- failure: replica identity full can't be used with a column list +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; + +-- failure: replica identity has to be covered by the column list +ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey; +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +UPDATE testpub_tbl8 SET a = 1; + +DROP TABLE testpub_tbl8; + +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 (a int, b text, c text); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 (a int, b text, c text); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; + +-- ok: attaching first partition works, because (a) is in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0); +-- failure: second partition has replica identity (c), which si not in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1); +UPDATE testpub_tbl8 SET a = 1; + +-- failure: changing replica identity to FULL for partition fails, because +-- of the column list on the parent +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; + +DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1; +DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list; +-- ====================================================== + +-- Test combination of column list and row filter +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_both_filters; +RESET client_min_messages; +CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c)); +ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey; +ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1); +\dRp+ testpub_both_filters +\d+ testpub_tbl_both_filters + +DROP TABLE testpub_tbl_both_filters; +DROP PUBLICATION testpub_both_filters; +-- ====================================================== + +-- More column list tests for validating column references +CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int); +CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b)); +CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a); +CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY); +ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10); + +-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing) + +-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b); +RESET client_min_messages; +-- ok - (a,b) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c); +-- ok - (a,b,c) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - "b" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b); +-- fail - "a" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; + +-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- ok - there's no replica identity, so any column list works +-- note: it fails anyway, just a bit later because UPDATE requires RI +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Case 2. REPLICA IDENTITY FULL +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Case 3. REPLICA IDENTITY NOTHING +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Case 4. REPLICA IDENTITY INDEX +ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c); +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c; +ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c); +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Tests for partitioned table + +-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - cannot use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +-- ok - can use column list for partition +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is +-- used for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- Now change the root column list to use a column "b" +-- (which is not in the replica identity) +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b); +-- ok - we don't have column list for partitioned table. +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- set PUBLISH_VIA_PARTITION_ROOT to true +-- can use column list for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; + +DROP PUBLICATION testpub6; +DROP TABLE rf_tbl_abcd_pk; +DROP TABLE rf_tbl_abcd_nopk; +DROP TABLE rf_tbl_abcd_part_pk; +-- ====================================================== + -- Test cache invalidation FOR ALL TABLES publication SET client_min_messages = 'ERROR'; CREATE TABLE testpub_tbl4(a int); @@ -809,6 +1092,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); + -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl new file mode 100644 index 0000000000..c778840f4c --- /dev/null +++ b/src/test/subscription/t/031_column_list.pl @@ -0,0 +1,1131 @@ +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test partial-column publication of tables +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +sub wait_for_subscription_sync +{ + my ($node) = @_; + + # Also wait for initial table sync to finish + my $synced_query = "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + + $node->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +} + +# setup tables on both nodes + +# tab1: simple 1:1 replication +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int) +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int) +)); + +# tab2: replication from regular to table with fewer columns +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab2 (a int PRIMARY KEY, b varchar) +)); + +# tab3: simple 1:1 replication with weird column names +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab3 ("a'" int PRIMARY KEY, "B" varchar, "c'" int) +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab3 ("a'" int PRIMARY KEY, "c'" int) +)); + +# test_part: partitioned tables, with partitioning (including multi-level +# partitioning, and fewer columns on the subscriber) +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a); + CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6); + CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a); + CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a); + CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6); + CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a); + CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10); +)); + +# tab4: table with user-defined enum types +$node_publisher->safe_psql('postgres', qq( + CREATE TYPE test_typ AS ENUM ('blue', 'red'); + CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, c int, d text); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TYPE test_typ AS ENUM ('blue', 'red'); + CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, d text); +)); + + +# TEST: create publication and subscription for some of the tables with +# column lists +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub1 + FOR TABLE tab1 (a, "B"), tab3 ("a'", "c'"), test_part (a, b), tab4 (a, b, d) + WITH (publish_via_partition_root = 'true'); +)); + +# check that we got the right prattrs values for the publication in the +# pg_publication_rel catalog (order by relname, to get stable ordering) +my $result = $node_publisher->safe_psql('postgres', qq( + SELECT relname, prattrs + FROM pg_publication_rel pb JOIN pg_class pc ON(pb.prrelid = pc.oid) + ORDER BY relname +)); + +is($result, qq(tab1|1 2 +tab3|1 3 +tab4|1 2 4 +test_part|1 2), 'publication relation updated'); + +# TEST: insert data into the tables, create subscription and see if sync +# replicates the right columns +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab1 VALUES (1, 2, 3); + INSERT INTO tab1 VALUES (4, 5, 6); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab3 VALUES (1, 2, 3); + INSERT INTO tab3 VALUES (4, 5, 6); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab4 VALUES (1, 'red', 3, 'oh my'); + INSERT INTO tab4 VALUES (2, 'blue', 4, 'hello'); +)); + +# replication of partitioned table +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part VALUES (1, 'abc', '2021-07-04 12:00:00'); + INSERT INTO test_part VALUES (2, 'bcd', '2021-07-03 11:12:13'); + INSERT INTO test_part VALUES (7, 'abc', '2021-07-04 12:00:00'); + INSERT INTO test_part VALUES (8, 'bcd', '2021-07-03 11:12:13'); +)); + +# create subscription for the publication, wait for sync to complete, +# then check the sync results +$node_subscriber->safe_psql('postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 +)); + +wait_for_subscription_sync($node_subscriber); + +# tab1: only (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1 ORDER BY a"); +is($result, qq(1|2| +4|5|), 'insert on column tab1.c is not replicated'); + +# tab3: only (a,c) is replicated +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab3 ORDER BY "a'")); +is($result, qq(1|3 +4|6), 'insert on column tab3.b is not replicated'); + +# tab4: only (a,b,d) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab4 ORDER BY a"); +is($result, qq(1|red|oh my +2|blue|hello), 'insert on column tab4.c is not replicated'); + +# test_part: (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part ORDER BY a"); +is($result, qq(1|abc +2|bcd +7|abc +8|bcd), 'insert on column test_part.c columns is not replicated'); + + +# TEST: now insert more data into the tables, and wait until we replicate +# them (not by tablesync, but regular decoding and replication) + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab1 VALUES (2, 3, 4); + INSERT INTO tab1 VALUES (5, 6, 7); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab3 VALUES (2, 3, 4); + INSERT INTO tab3 VALUES (5, 6, 7); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab4 VALUES (3, 'red', 5, 'foo'); + INSERT INTO tab4 VALUES (4, 'blue', 6, 'bar'); +)); + +# replication of partitioned table +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part VALUES (3, 'xxx', '2022-02-01 10:00:00'); + INSERT INTO test_part VALUES (4, 'yyy', '2022-03-02 15:12:13'); + INSERT INTO test_part VALUES (9, 'zzz', '2022-04-03 21:00:00'); + INSERT INTO test_part VALUES (10, 'qqq', '2022-05-04 22:12:13'); +)); + +# wait for catchup before checking the subscriber +$node_publisher->wait_for_catchup('sub1'); + +# tab1: only (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1 ORDER BY a"); +is($result, qq(1|2| +2|3| +4|5| +5|6|), 'insert on column tab1.c is not replicated'); + +# tab3: only (a,c) is replicated +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab3 ORDER BY "a'")); +is($result, qq(1|3 +2|4 +4|6 +5|7), 'insert on column tab3.b is not replicated'); + +# tab4: only (a,b,d) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab4 ORDER BY a"); +is($result, qq(1|red|oh my +2|blue|hello +3|red|foo +4|blue|bar), 'insert on column tab4.c is not replicated'); + +# test_part: (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part ORDER BY a"); +is($result, qq(1|abc +2|bcd +3|xxx +4|yyy +7|abc +8|bcd +9|zzz +10|qqq), 'insert on column test_part.c columns is not replicated'); + + +# TEST: do some updates on some of the tables, both on columns included +# in the column list and other + +# tab1: update of replicated column +$node_publisher->safe_psql('postgres', + qq(UPDATE tab1 SET "B" = 2 * "B" where a = 1)); + +# tab1: update of non-replicated column +$node_publisher->safe_psql('postgres', + qq(UPDATE tab1 SET c = 2*c where a = 4)); + +# tab3: update of non-replicated +$node_publisher->safe_psql('postgres', + qq(UPDATE tab3 SET "B" = "B" || ' updated' where "a'" = 4)); + +# tab3: update of replicated column +$node_publisher->safe_psql('postgres', + qq(UPDATE tab3 SET "c'" = 2 * "c'" where "a'" = 1)); + +# tab4 +$node_publisher->safe_psql('postgres', + qq(UPDATE tab4 SET b = 'blue', c = c * 2, d = d || ' updated' where a = 1)); + +# tab4 +$node_publisher->safe_psql('postgres', + qq(UPDATE tab4 SET b = 'red', c = c * 2, d = d || ' updated' where a = 2)); + +# wait for the replication to catch up, and check the UPDATE results got +# replicated correctly, with the right column list +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab1 ORDER BY a)); +is($result, +qq(1|4| +2|3| +4|5| +5|6|), 'only update on column tab1.b is replicated'); + +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab3 ORDER BY "a'")); +is($result, +qq(1|6 +2|4 +4|6 +5|7), 'only update on column tab3.c is replicated'); + +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab4 ORDER BY a)); + +is($result, qq(1|blue|oh my updated +2|red|hello updated +3|red|foo +4|blue|bar), 'update on column tab4.c is not replicated'); + + +# TEST: add table with a column list, insert data, replicate + +# insert some data before adding it to the publication +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab2 VALUES (1, 'abc', 3); +)); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab2 (a, b)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"); + +# wait for the tablesync to complete, add a bit more data and then check +# the results of the replication +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab2 VALUES (2, 'def', 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2 ORDER BY a"); +is($result, qq(1|abc +2|def), 'insert on column tab2.c is not replicated'); + +# do a couple updates, check the correct stuff gets replicated +$node_publisher->safe_psql('postgres', qq( + UPDATE tab2 SET c = 5 where a = 1; + UPDATE tab2 SET b = 'xyz' where a = 2; +)); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2 ORDER BY a"); +is($result, qq(1|abc +2|xyz), 'update on column tab2.c is not replicated'); + + +# TEST: add a table to two publications with different column lists, and +# create a single subscription replicating both publications +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int); + CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b); + CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d); + + -- insert a couple initial records + INSERT INTO tab5 VALUES (1, 11, 111, 1111); + INSERT INTO tab5 VALUES (2, 22, 222, 2222); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->wait_for_catchup('sub1'); + +# insert data and make sure all the columns (union of the columns lists) +# get fully replicated +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab5 VALUES (3, 33, 333, 3333); + INSERT INTO tab5 VALUES (4, 44, 444, 4444); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"), + qq(1|11|1111 +2|22|2222 +3|33|3333 +4|44|4444), + 'overlapping publications with overlapping column lists'); + +# and finally, remove the column list for one of the publications, which +# means replicating all columns (removing the column list), but first add +# the missing column to the table on subscriber +$node_publisher->safe_psql('postgres', qq( + ALTER PUBLICATION pub3 SET TABLE tab5; +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; + ALTER TABLE tab5 ADD COLUMN c INT; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab5 VALUES (5, 55, 555, 5555); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"), + qq(1|11|1111| +2|22|2222| +3|33|3333| +4|44|4444| +5|55|5555|555), + 'overlapping publications with overlapping column lists'); + +# TEST: create a table with a column list, then change the replica +# identity by replacing a primary key (but use a different column in +# the column list) +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int); + CREATE PUBLICATION pub4 FOR TABLE tab6 (a, b); + + -- initial data + INSERT INTO tab6 VALUES (1, 22, 333, 4444); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab6 VALUES (2, 33, 444, 5555); + UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab6 ORDER BY a"), + qq(1|44|| +2|66||), 'replication with the original primary key'); + +# now redefine the constraint - move the primary key to a different column +# (which is still covered by the column list, though) + +$node_publisher->safe_psql('postgres', qq( + ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey; + ALTER TABLE tab6 ADD PRIMARY KEY (b); +)); + +# we need to do the same thing on the subscriber +# XXX What would happen if this happens before the publisher ALTER? Or +# interleaved, somehow? But that seems unrelated to column lists. +$node_subscriber->safe_psql('postgres', qq( + ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey; + ALTER TABLE tab6 ADD PRIMARY KEY (b); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab6 VALUES (3, 55, 666, 8888); + UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab6 ORDER BY a"), + qq(1|88|| +2|132|| +3|110||), + 'replication with the modified primary key'); + + +# TEST: create a table with a column list, then change the replica +# identity by replacing a primary key with a key on multiple columns +# (all of them covered by the column list) +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int); + CREATE PUBLICATION pub5 FOR TABLE tab7 (a, b); + + -- some initial data + INSERT INTO tab7 VALUES (1, 22, 333, 4444); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab7 VALUES (2, 33, 444, 5555); + UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"), + qq(1|44|| +2|66||), 'replication with the original primary key'); + +# now redefine the constraint - move the primary key to a different column +# (which is not covered by the column list) +$node_publisher->safe_psql('postgres', qq( + ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey; + ALTER TABLE tab7 ADD PRIMARY KEY (a, b); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab7 VALUES (3, 55, 666, 7777); + UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"), + qq(1|88|| +2|132|| +3|110||), + 'replication with the modified primary key'); + +# now switch the primary key again to another columns not covered by the +# column list, but also generate writes between the drop and creation +# of the new constraint + +$node_publisher->safe_psql('postgres', qq( + ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey; + INSERT INTO tab7 VALUES (4, 77, 888, 9999); + -- update/delete is not allowed for tables without RI + ALTER TABLE tab7 ADD PRIMARY KEY (b, a); + UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; + DELETE FROM tab7 WHERE a = 1; +)); + +$node_publisher->safe_psql('postgres', qq( +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"), + qq(2|264|| +3|220|| +4|154||), + 'replication with the modified primary key'); + + +# TEST: partitioned tables (with publish_via_partition_root = false) +# and replica identity. The (leaf) partitions may have different RI, so +# we need to check the partition RI (with respect to the column list) +# while attaching the partition. + +# First, let's create a partitioned table with two partitions, each with +# a different RI, but a column list not covering all those RI. + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part_a (a int, b int, c int) PARTITION BY LIST (a); + + CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; + + CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; + + -- initial data, one row in each partition + INSERT INTO test_part_a VALUES (1, 3); + INSERT INTO test_part_a VALUES (6, 4); +)); + +# do the same thing on the subscriber (with the opposite column order) +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part_a (b int, a int) PARTITION BY LIST (a); + + CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; + + CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; +)); + +# create a publication replicating just the column "a", which is not enough +# for the second partition +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub6 FOR TABLE test_part_a (b, a) WITH (publish_via_partition_root = true); + ALTER PUBLICATION pub6 ADD TABLE test_part_a_1 (a); + ALTER PUBLICATION pub6 ADD TABLE test_part_a_2 (b); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part_a VALUES (2, 5); + INSERT INTO test_part_a VALUES (7, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT a, b FROM test_part_a ORDER BY a, b"), + qq(1|3 +2|5 +6|4 +7|6), + 'partitions with different replica identities not replicated correctly'); + +# This time start with a column list covering RI for all partitions, but +# then update the column list to not cover column "b" (needed by the +# second partition) + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a); + + CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey; + + CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey; + + -- initial data, one row in each partitions + INSERT INTO test_part_b VALUES (1, 1); + INSERT INTO test_part_b VALUES (6, 2); +)); + +# do the same thing on the subscriber +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a); + + CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey; + + CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey; +)); + +# create a publication replicating both columns, which is sufficient for +# both partitions +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub7 FOR TABLE test_part_b (a, b) WITH (publish_via_partition_root = true); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part_b VALUES (2, 3); + INSERT INTO test_part_b VALUES (7, 4); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_b ORDER BY a, b"), + qq(1|1 +2|3 +6|2 +7|4), + 'partitions with different replica identities not replicated correctly'); + + +# TEST: This time start with a column list covering RI for all partitions, +# but then update RI for one of the partitions to not be covered by the +# column list anymore. + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a); + + CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3); + ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey; + + CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4); + ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey; + + -- initial data, one row for each partition + INSERT INTO test_part_c VALUES (1, 3, 5); + INSERT INTO test_part_c VALUES (2, 4, 6); +)); + +# do the same thing on the subscriber +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a); + + CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3); + ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey; + + CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4); + ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey; +)); + +# create a publication replicating data through partition root, with a column +# list on the root, and then add the partitions one by one with separate +# column lists (but those are not applied) +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false); + ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a,c); + ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', qq( + DROP SUBSCRIPTION sub1; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8; +)); + +$node_publisher->wait_for_catchup('sub1'); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part_c VALUES (3, 7, 8); + INSERT INTO test_part_c VALUES (4, 9, 10); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_c ORDER BY a, b"), + qq(1||5 +2|4| +3||8 +4|9|), + 'partitions with different replica identities not replicated correctly'); + + +# create a publication not replicating data through partition root, without +# a column list on the root, and then add the partitions one by one with +# separate column lists +$node_publisher->safe_psql('postgres', qq( + DROP PUBLICATION pub8; + CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false); + ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a); + ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; + TRUNCATE test_part_c; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + TRUNCATE test_part_c; + INSERT INTO test_part_c VALUES (1, 3, 5); + INSERT INTO test_part_c VALUES (2, 4, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_c ORDER BY a, b"), + qq(1|| +2|4|), + 'partitions with different replica identities not replicated correctly'); + + +# TEST: Start with a single partition, with RI compatible with the column +# list, and then attach a partition with incompatible RI. + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a); + + CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3); + ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey; + + INSERT INTO test_part_d VALUES (1, 2); +)); + +# do the same thing on the subscriber (in fact, create both partitions right +# away, no need to delay that) +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a); + + CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3); + ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey; + + CREATE TABLE test_part_d_2 PARTITION OF test_part_d FOR VALUES IN (2,4); + ALTER TABLE test_part_d_2 ADD PRIMARY KEY (a); + ALTER TABLE test_part_d_2 REPLICA IDENTITY USING INDEX test_part_d_2_pkey; +)); + +# create a publication replicating both columns, which is sufficient for +# both partitions +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub9 FOR TABLE test_part_d (a) WITH (publish_via_partition_root = true); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part_d VALUES (3, 4); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_d ORDER BY a, b"), + qq(1| +3|), + 'partitions with different replica identities not replicated correctly'); + +# TEST: With a table included in multiple publications, we should use a +# union of the column lists. So with column lists (a,b) and (a,c) we +# should replicate (a,b,c). + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); + CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); + + -- initial data + INSERT INTO test_mix_1 VALUES (1, 2, 3); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_mix_1 VALUES (4, 5, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_1 ORDER BY a"), + qq(1|2|3 +4|5|6), + 'a mix of publications should use a union of column list'); + + +# TEST: With a table included in multiple publications, we should use a +# union of the column lists. If any of the publications is FOR ALL +# TABLES, we should replicate all columns. + +# drop unnecessary tables, so as not to interfere with the FOR ALL TABLES +$node_publisher->safe_psql('postgres', qq( + DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1, + test_part, test_part_a, test_part_b, test_part_c, test_part_d; +)); + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b); + CREATE PUBLICATION pub_mix_4 FOR ALL TABLES; + + -- initial data + INSERT INTO test_mix_2 VALUES (1, 2, 3); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_3, pub_mix_4; + ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_mix_2 VALUES (4, 5, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_2"), + qq(1|2|3 +4|5|6), + 'a mix of publications should use a union of column list'); + + +# TEST: With a table included in multiple publications, we should use a +# union of the column lists. If any of the publications is FOR ALL +# TABLES IN SCHEMA, we should replicate all columns. + +$node_subscriber->safe_psql('postgres', qq( + DROP SUBSCRIPTION sub1; + CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); +)); + +$node_publisher->safe_psql('postgres', qq( + DROP TABLE test_mix_2; + CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b); + CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public; + + -- initial data + INSERT INTO test_mix_3 VALUES (1, 2, 3); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_mix_3 VALUES (4, 5, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_3"), + qq(1|2|3 +4|5|6), + 'a mix of publications should use a union of column list'); + + +# TEST: Check handling of publish_via_partition_root - if a partition is +# published through partition root, we should only apply the column list +# defined for the whole table (not the partitions) - both during the initial +# sync and when replicating changes. This is what we do for row filters. + +$node_subscriber->safe_psql('postgres', qq( + DROP SUBSCRIPTION sub1; + + CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10); + CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20); +)); + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10); + CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20); + + CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true); + + -- initial data + INSERT INTO test_root VALUES (1, 2, 3); + INSERT INTO test_root VALUES (10, 20, 30); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_root VALUES (2, 3, 4); + INSERT INTO test_root VALUES (11, 21, 31); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_root ORDER BY a, b, c"), + qq(1|| +2|| +10|| +11||), + 'publication via partition root applies column list'); + + +# TEST: Multiple publications which publish schema of parent table and +# partition. The partition is published through two publications, once +# through a schema (so no column list) containing the parent, and then +# also directly (with a columns list). The expected outcome is there is +# no column list. + +$node_publisher->safe_psql('postgres', qq( + DROP PUBLICATION pub1, pub2, pub3, pub4, pub5, pub6, pub7, pub8; + + CREATE SCHEMA s1; + CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); + + CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1; + CREATE PUBLICATION pub2 FOR TABLE t_1(b); + + -- initial data + INSERT INTO s1.t VALUES (1, 2, 3); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE SCHEMA s1; + CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); + + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO s1.t VALUES (4, 5, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM s1.t ORDER BY a"), + qq(1|2|3 +4|5|6), + 'two publications, publishing the same relation'); + +# Now resync the subcription, but with publications in the opposite order. +# The result should be the same. + +$node_subscriber->safe_psql('postgres', qq( + TRUNCATE s1.t; + + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO s1.t VALUES (7, 8, 9); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM s1.t ORDER BY a"), + qq(7|8|9), + 'two publications, publishing the same relation'); + + +# TEST: One publication, containing both the parent and child relations. +# The expected outcome is list "a", because that's the column list defined +# for the top-most ancestor added to the publication. + +$node_publisher->safe_psql('postgres', qq( + DROP SCHEMA s1 CASCADE; + CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) + PARTITION BY RANGE (a); + CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); + + CREATE PUBLICATION pub3 FOR TABLE t_1 (a), t_2 + WITH (PUBLISH_VIA_PARTITION_ROOT); + + -- initial data + INSERT INTO t VALUES (1, 2, 3); +)); + +$node_subscriber->safe_psql('postgres', qq( + DROP SCHEMA s1 CASCADE; + CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) + PARTITION BY RANGE (a); + CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); + + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO t VALUES (4, 5, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"), + qq(1|| +4||), + 'publication containing both parent and child relation'); + + +# TEST: One publication, containing both the parent and child relations. +# The expected outcome is list "a", because that's the column list defined +# for the top-most ancestor added to the publication. +# Note: The difference from the preceding test is that in this case both +# relations have a column list defined. + +$node_publisher->safe_psql('postgres', qq( + DROP TABLE t; + CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) + PARTITION BY RANGE (a); + CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); + + CREATE PUBLICATION pub4 FOR TABLE t_1 (a), t_2 (b) + WITH (PUBLISH_VIA_PARTITION_ROOT); + + -- initial data + INSERT INTO t VALUES (1, 2, 3); +)); + +$node_subscriber->safe_psql('postgres', qq( + DROP TABLE t; + CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); + CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) + PARTITION BY RANGE (a); + CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); + + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO t VALUES (4, 5, 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"), + qq(1|| +4||), + 'publication containing both parent and child relation'); + + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +done_testing();