diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index b8c954a554..560e205b95 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -4410,7 +4410,7 @@ SCRAM-SHA-256$<iteration count>:&l This is an array of indnatts values that - indicate which table columns this index indexes. For example a value + indicate which table columns this index indexes. For example, a value of 1 3 would mean that the first and the third table columns make up the index entries. Key columns come before non-key (included) columns. A zero in this array indicates that the @@ -6291,6 +6291,19 @@ SCRAM-SHA-256$<iteration count>:&l Reference to schema + + + + prattrs int2vector + (references pg_attribute.attnum) + + + This is an array of values that indicates which table columns are + part of the publication. For example, a value of 1 3 + would mean that the first and the third table columns are published. + A null value indicates that all columns are published. + + diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 8bb34a7c5b..2fa3cedfe9 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -7016,7 +7016,8 @@ Relation - Next, the following message part appears for each column (except generated columns): + Next, the following message part appears for each column included in + the publication (except generated columns): diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index a8cc8f3dc2..40366a10fe 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -30,7 +30,7 @@ ALTER PUBLICATION name RENAME TO where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] SEQUENCE sequence_name [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... 
] ALL SEQUENCES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -114,6 +114,14 @@ ALTER PUBLICATION name RENAME TO * can be specified after the table name to explicitly indicate that descendant tables are included. + + + + Optionally, a column list can be specified. See CREATE PUBLICATION for details. + + + If the optional WHERE clause is specified, rows for which the expression evaluates to false or null will not be published. Note that parentheses @@ -185,7 +193,13 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete'); Add some tables to the publication: -ALTER PUBLICATION mypublication ADD TABLE users, departments; +ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments; + + + + Change the set of columns published for a table: + +ALTER PUBLICATION mypublication SET TABLE users (user_id, firstname, lastname), TABLE departments; diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index e5081eb50e..d2739968d9 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -33,7 +33,7 @@ CREATE PUBLICATION name where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] SEQUENCE sequence_name [ * ] [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] ALL SEQUENCES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -93,6 +93,13 @@ CREATE PUBLICATION name TRUNCATE commands. + + When a column list is specified, only the named columns are replicated. + If no column list is specified, all columns of the table are replicated + through this publication, including any columns added later. If a column + list is specified, it must include the replica identity columns. + + Only persistent base tables and partitioned tables can be part of a publication. 
Temporary tables, unlogged tables, foreign tables, @@ -348,6 +355,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABL sales: CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales; + + + + Create a publication that publishes all changes for table users, + but replicates only columns user_id and + firstname: + +CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname); diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 514a94796e..a5a54e676e 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -45,6 +45,9 @@ #include "utils/rel.h" #include "utils/syscache.h" +static void publication_translate_columns(Relation targetrel, List *columns, + int *natts, AttrNumber **attrs); + /* * Check if relation can be in given publication and throws appropriate * error if not. @@ -395,6 +398,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, Oid relid = RelationGetRelid(targetrel); Oid pubreloid; Publication *pub = GetPublication(pubid); + AttrNumber *attarray = NULL; + int natts = 0; ObjectAddress myself, referenced; List *relids = NIL; @@ -422,6 +427,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, check_publication_add_relation(targetrel); + /* + * Translate column names to attnums and make sure the column list contains + * only allowed elements (no system or generated columns etc.). Also build + * an array of attnums, for storing in the catalog. + */ + publication_translate_columns(pri->relation, pri->columns, + &natts, &attarray); + /* Form a tuple. 
*/ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -440,6 +453,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, else nulls[Anum_pg_publication_rel_prqual - 1] = true; + /* Add column list, if available */ + if (pri->columns) + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts)); + else + nulls[Anum_pg_publication_rel_prattrs - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -463,6 +482,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, DEPENDENCY_NORMAL, DEPENDENCY_NORMAL, false); + /* Add dependency on the columns, if any are listed */ + for (int i = 0; i < natts; i++) + { + ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + /* Close the table. */ table_close(rel, RowExclusiveLock); @@ -482,6 +508,125 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, return myself; } +/* qsort comparator for attnums */ +static int +compare_int16(const void *a, const void *b) +{ + int av = *(const int16 *) a; + int bv = *(const int16 *) b; + + /* this can't overflow if int is wider than int16 */ + return (av - bv); +} + +/* + * Translate a list of column names to a sorted array of attribute + * numbers; verify that each attribute is appropriate + * to have in a publication column list (no system or generated attributes, + * no duplicates). Additional checks with replica identity are done later; + * see pub_collist_contains_invalid_column. + * + * Note that the attribute numbers are *not* offset by + * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this + * is okay. 
+ */ +static void +publication_translate_columns(Relation targetrel, List *columns, + int *natts, AttrNumber **attrs) +{ + AttrNumber *attarray = NULL; + Bitmapset *set = NULL; + ListCell *lc; + int n = 0; + TupleDesc tupdesc = RelationGetDescr(targetrel); + + /* Bail out when no column list defined. */ + if (!columns) + return; + + /* + * Translate list of columns to attnums. We prohibit system attributes and + * make sure there are no duplicate columns. + */ + attarray = palloc(sizeof(AttrNumber) * list_length(columns)); + foreach(lc, columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel))); + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference system column \"%s\" in publication column list", + colname)); + + if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference generated column \"%s\" in publication column list", + colname)); + + if (bms_is_member(attnum, set)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("duplicate column \"%s\" in publication column list", + colname)); + + set = bms_add_member(set, attnum); + attarray[n++] = attnum; + } + + /* Be tidy, so that the catalog representation is always sorted */ + qsort(attarray, n, sizeof(AttrNumber), compare_int16); + + *natts = n; + *attrs = attarray; + + bms_free(set); +} + +/* + * Transform the column list (represented by an array) to a bitmapset. 
+ */ +Bitmapset * +pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt) +{ + Bitmapset *result = NULL; + ArrayType *arr; + int nelems; + int16 *elems; + MemoryContext oldcxt; + + /* + * If an existing bitmap was provided, use it. Otherwise just use NULL + * and build a new bitmap. + */ + if (columns) + result = columns; + + arr = DatumGetArrayTypeP(pubcols); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + /* If a memory context was specified, switch to it. */ + if (mcxt) + oldcxt = MemoryContextSwitchTo(mcxt); + + for (int i = 0; i < nelems; i++) + result = bms_add_member(result, elems[i]); + + if (mcxt) + MemoryContextSwitchTo(oldcxt); + + return result; +} + /* * Insert new publication / schema mapping. */ diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index e449e8e8f2..84e37df783 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -342,7 +342,7 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context) * Returns true if any invalid column is found. */ bool -contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, +pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot) { HeapTuple rftuple; @@ -411,6 +411,114 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, return result; } +/* + * Check if all columns referenced in the REPLICA IDENTITY are covered by + * the column list. + * + * Returns true if any replica identity column is not covered by column list. 
+ */ +bool +pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, + bool pubviaroot) +{ + HeapTuple tuple; + Oid relid = RelationGetRelid(relation); + Oid publish_as_relid = RelationGetRelid(relation); + bool result = false; + Datum datum; + bool isnull; + + /* + * For a partition, if pubviaroot is true, find the topmost ancestor that + * is published via this publication as we need to use its column list + * for the changes. + * + * Note that even though the column list used is for an ancestor, the + * REPLICA IDENTITY used will be for the actual child table. + */ + if (pubviaroot && relation->rd_rel->relispartition) + { + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + + if (!OidIsValid(publish_as_relid)) + publish_as_relid = relid; + } + + tuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pubid)); + + if (!HeapTupleIsValid(tuple)) + return false; + + datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + + if (!isnull) + { + int x; + Bitmapset *idattrs; + Bitmapset *columns = NULL; + + /* With REPLICA IDENTITY FULL, no column list is allowed. */ + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + result = true; + + /* Transform the column list datum to a bitmapset. */ + columns = pub_collist_to_bitmapset(NULL, datum, NULL); + + /* Remember columns that are part of the REPLICA IDENTITY */ + idattrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* + * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are + * offset (to handle system columns the usual way), while column list + * does not use offset, so we can't do bms_is_subset(). Instead, we have + * to loop over the idattrs and check all of them are in the list. 
+ */ + x = -1; + while ((x = bms_next_member(idattrs, x)) >= 0) + { + AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); + + /* + * If pubviaroot is true, we are validating the column list of the + * parent table, but the bitmap contains the replica identity + * information of the child table. The parent/child attnums may not + * match, so translate them to the parent - get the attname from + * the child, and look it up in the parent. + */ + if (pubviaroot) + { + /* attribute name in the child table */ + char *colname = get_attname(relid, attnum, false); + + /* + * Determine the attnum for the attribute name in parent (we + * are using the column list defined on the parent). + */ + attnum = get_attnum(publish_as_relid, colname); + } + + /* replica identity column, not covered by the column list */ + if (!bms_is_member(attnum, columns)) + { + result = true; + break; + } + } + + bms_free(idattrs); + bms_free(columns); + } + + ReleaseSysCache(tuple); + + return result; +} + /* check_functions_in_node callback */ static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context) @@ -652,6 +760,39 @@ TransformPubWhereClauses(List *tables, const char *queryString, } } + +/* + * Check the publication column lists expression for all relations in the list. + */ +static void +CheckPubRelationColumnList(List *tables, const char *queryString, + bool pubviaroot) +{ + ListCell *lc; + + foreach(lc, tables) + { + PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); + + if (pri->columns == NIL) + continue; + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's column list will be used. So disallow using + * the column list on partitioned table in this case. 
+ */ + if (!pubviaroot && + pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use publication column list for relation \"%s\"", + RelationGetRelationName(pri->relation)), + errdetail("column list cannot be used for a partitioned table when %s is false.", + "publish_via_partition_root"))); + } +} + /* * Create new publication. */ @@ -808,6 +949,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); + CheckPubRelationColumnList(rels, pstate->p_sourcetext, + publish_via_partition_root); + PublicationAddRelations(puboid, rels, true, NULL); CloseRelationList(rels); } @@ -895,8 +1039,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, /* * If the publication doesn't publish changes via the root partitioned - * table, the partition's row filter will be used. So disallow using WHERE - * clause on partitioned table in this case. + * table, the partition's row filter and column list will be used. So disallow + * using WHERE clause and column lists on partitioned table in this case. */ if (!pubform->puballtables && publish_via_partition_root_given && !publish_via_partition_root) @@ -904,7 +1048,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, /* * Lock the publication so nobody else can do anything with it. This * prevents concurrent alter to add partitioned table(s) with WHERE - * clause(s) which we don't allow when not publishing via root. + * clause(s) and/or column lists which we don't allow when not + * publishing via root. 
*/ LockDatabaseObject(PublicationRelationId, pubform->oid, 0, AccessShareLock); @@ -917,13 +1062,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, { HeapTuple rftuple; Oid relid = lfirst_oid(lc); + bool has_column_list; + bool has_row_filter; rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubform->oid)); + has_row_filter + = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL); + + has_column_list + = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL); + if (HeapTupleIsValid(rftuple) && - !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL)) + (has_row_filter || has_column_list)) { HeapTuple tuple; @@ -932,7 +1085,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, { Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple); - if (relform->relkind == RELKIND_PARTITIONED_TABLE) + if ((relform->relkind == RELKIND_PARTITIONED_TABLE) && + has_row_filter) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot set %s for publication \"%s\"", @@ -943,6 +1097,18 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, NameStr(relform->relname), "publish_via_partition_root"))); + if ((relform->relkind == RELKIND_PARTITIONED_TABLE) && + has_column_list) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set %s for publication \"%s\"", + "publish_via_partition_root = false", + stmt->pubname), + errdetail("The publication contains a column list for a partitioned table \"%s\" " + "which is not allowed when %s is false.", + NameStr(relform->relname), + "publish_via_partition_root"))); + ReleaseSysCache(tuple); } @@ -1107,6 +1273,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + PublicationAddRelations(pubid, rels, false, stmt); } else if 
(stmt->action == AP_DropObjects) @@ -1124,6 +1292,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + /* * To recreate the relation list for the publication, look for * existing relations that do not need to be dropped. @@ -1135,42 +1305,79 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PublicationRelInfo *oldrel; bool found = false; HeapTuple rftuple; - bool rfisnull = true; Node *oldrelwhereclause = NULL; + Bitmapset *oldcolumns = NULL; /* look up the cache for the old relmap */ rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(oldrelid), ObjectIdGetDatum(pubid)); + /* + * See if the existing relation currently has a WHERE clause or a + * column list. We need to compare those too. + */ if (HeapTupleIsValid(rftuple)) { + bool isnull = true; Datum whereClauseDatum; + Datum columnListDatum; + /* Load the WHERE clause for this table. */ whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, - &rfisnull); - if (!rfisnull) + &isnull); + if (!isnull) oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum)); + /* Transform the int2vector column list to a bitmap. */ + columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prattrs, + &isnull); + + if (!isnull) + oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL); + ReleaseSysCache(rftuple); } foreach(newlc, rels) { PublicationRelInfo *newpubrel; + Oid newrelid; + Bitmapset *newcolumns = NULL; newpubrel = (PublicationRelInfo *) lfirst(newlc); + newrelid = RelationGetRelid(newpubrel->relation); + + /* + * If the new publication has column list, transform it to + * a bitmap too. 
+ */ + if (newpubrel->columns) + { + ListCell *lc; + + foreach(lc, newpubrel->columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(newrelid, colname); + + newcolumns = bms_add_member(newcolumns, attnum); + } + } /* * Check if any of the new set of relations matches with the * existing relations in the publication. Additionally, if the * relation has an associated WHERE clause, check the WHERE - * expressions also match. Drop the rest. + * expressions also match. Same for the column list. Drop the + * rest. */ if (RelationGetRelid(newpubrel->relation) == oldrelid) { - if (equal(oldrelwhereclause, newpubrel->whereClause)) + if (equal(oldrelwhereclause, newpubrel->whereClause) && + bms_equal(oldcolumns, newcolumns)) { found = true; break; @@ -1186,6 +1393,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, { oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; + oldrel->columns = NIL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1401,6 +1609,7 @@ AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup, { oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; + oldrel->columns = NULL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1660,6 +1869,7 @@ OpenRelationList(List *rels, char objectType) List *result = NIL; ListCell *lc; List *relids_with_rf = NIL; + List *relids_with_collist = NIL; /* * Open, share-lock, and check all the explicitly-specified relations @@ -1710,6 +1920,13 @@ OpenRelationList(List *rels, char objectType) errmsg("conflicting or redundant WHERE clauses for table \"%s\"", RelationGetRelationName(rel)))); + /* Disallow duplicate tables if there are any with column lists. 
*/ + if (t->columns || list_member_oid(relids_with_collist, myrelid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant column lists for table \"%s\"", + RelationGetRelationName(rel)))); + table_close(rel, ShareUpdateExclusiveLock); continue; } @@ -1717,12 +1934,16 @@ OpenRelationList(List *rels, char objectType) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; + pub_rel->columns = t->columns; result = lappend(result, pub_rel); relids = lappend_oid(relids, myrelid); if (t->whereClause) relids_with_rf = lappend_oid(relids_with_rf, myrelid); + if (t->columns) + relids_with_collist = lappend_oid(relids_with_collist, myrelid); + /* * Add children of this rel, if requested, so that they too are added * to the publication. A partitioned table can't have any inheritance @@ -1762,6 +1983,18 @@ OpenRelationList(List *rels, char objectType) errmsg("conflicting or redundant WHERE clauses for table \"%s\"", RelationGetRelationName(rel)))); + /* + * We don't allow to specify column list for both parent + * and child table at the same time as it is not very + * clear which one should be given preference. 
+ */ + if (childrelid != myrelid && + (t->columns || list_member_oid(relids_with_collist, childrelid))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant column lists for table \"%s\"", + RelationGetRelationName(rel)))); + continue; } @@ -1771,11 +2004,16 @@ OpenRelationList(List *rels, char objectType) pub_rel->relation = rel; /* child inherits WHERE clause from parent */ pub_rel->whereClause = t->whereClause; + /* child inherits column list from parent */ + pub_rel->columns = t->columns; result = lappend(result, pub_rel); relids = lappend_oid(relids, childrelid); if (t->whereClause) relids_with_rf = lappend_oid(relids_with_rf, childrelid); + + if (t->columns) + relids_with_collist = lappend_oid(relids_with_collist, childrelid); } } } @@ -1884,6 +2122,11 @@ PublicationDropRelations(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); + if (pubrel->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column list must not be specified in ALTER PUBLICATION ... 
DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 0df7cf5874..1a4fbdc38c 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -574,9 +574,6 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) if (cmd != CMD_UPDATE && cmd != CMD_DELETE) return; - if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - return; - /* * It is only safe to execute UPDATE/DELETE when all columns, referenced * in the row filters from publications which the relation is in, are @@ -596,17 +593,33 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) errmsg("cannot update table \"%s\"", RelationGetRelationName(rel)), errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot update table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column list used by the publication does not cover the replica identity."))); else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot delete from table \"%s\"", RelationGetRelationName(rel)), errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot delete from table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column list used by the publication does not cover the replica identity."))); /* If relation has replica identity we are always good. */ if (OidIsValid(RelationGetReplicaIndex(rel))) return; + /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. 
*/ + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + return; + /* * This is UPDATE/DELETE and there is no replica identity. * diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 55f720a88f..e38ff4000f 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4850,6 +4850,7 @@ _copyPublicationTable(const PublicationTable *from) COPY_NODE_FIELD(relation); COPY_NODE_FIELD(whereClause); + COPY_NODE_FIELD(columns); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 82562eb9b8..0f330e3c70 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2322,6 +2322,7 @@ _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(whereClause); + COMPARE_NODE_FIELD(columns); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index f8301541c9..945a9ada8b 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9749,13 +9749,14 @@ CreatePublicationStmt: * relation_expr here. */ PublicationObjSpec: - TABLE relation_expr OptWhereClause + TABLE relation_expr opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; - $$->pubtable->whereClause = $3; + $$->pubtable->columns = $3; + $$->pubtable->whereClause = $4; } | ALL TABLES IN_P SCHEMA ColId { @@ -9790,11 +9791,15 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA; $$->location = @5; } - | ColId OptWhereClause + | ColId opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - if ($2) + /* + * If either a row filter or column list is specified, create + * a PublicationTable object. 
+ */ + if ($2 || $3) { /* * The OptWhereClause must be stored here but it is @@ -9804,7 +9809,8 @@ PublicationObjSpec: */ $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVar(NULL, $1, @1); - $$->pubtable->whereClause = $2; + $$->pubtable->columns = $2; + $$->pubtable->whereClause = $3; } else { @@ -9812,23 +9818,25 @@ PublicationObjSpec: } $$->location = @1; } - | ColId indirection OptWhereClause + | ColId indirection opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); - $$->pubtable->whereClause = $3; + $$->pubtable->columns = $3; + $$->pubtable->whereClause = $4; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr OptWhereClause + | extended_relation_expr opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; - $$->pubtable->whereClause = $2; + $$->pubtable->columns = $2; + $$->pubtable->whereClause = $3; } | CURRENT_SCHEMA { @@ -17524,6 +17532,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) errmsg("WHERE clause not allowed for schema"), parser_errposition(pubobj->location)); + /* Column list is not allowed on a schema object */ + if (pubobj->pubtable && pubobj->pubtable->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column specification not allowed for schema"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. 
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3dbe85d61a..18d3cbb924 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,16 +29,30 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, + Bitmapset *columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary); + bool binary, Bitmapset *columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); +/* + * Check if a column is covered by a column list. + * + * Need to be careful about NULL, which is treated as a column list covering + * all columns. + */ +static bool +column_in_column_list(int attnum, Bitmapset *columns) +{ + return (columns == NULL || bms_is_member(attnum, columns)); +} + + /* * Write BEGIN to the output stream. 
*/ @@ -398,7 +412,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary) + TupleTableSlot *newslot, bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +424,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -443,7 +457,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary) + bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -464,11 +478,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary); + logicalrep_write_tuple(out, rel, oldslot, binary, NULL); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -537,7 +551,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary); + logicalrep_write_tuple(out, rel, oldslot, binary, NULL); } /* @@ -702,7 +716,8 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) * Write relation description to the output stream. 
*/ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, + Bitmapset *columns) { char *relname; @@ -724,7 +739,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, columns); } /* @@ -801,7 +816,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary) + bool binary, Bitmapset *columns) { TupleDesc desc; Datum *values; @@ -813,8 +828,14 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) continue; + + if (!column_in_column_list(att->attnum, columns)) + continue; + nliveatts++; } pq_sendint16(out, nliveatts); @@ -833,6 +854,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, if (att->attisdropped || att->attgenerated) continue; + if (!column_in_column_list(att->attnum, columns)) + continue; + if (isnull[i]) { pq_sendbyte(out, LOGICALREP_COLUMN_NULL); @@ -954,7 +978,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. 
*/ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { TupleDesc desc; int i; @@ -967,8 +991,14 @@ logicalrep_write_attrs(StringInfo out, Relation rel) /* send number of live attributes */ for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) continue; + + if (!column_in_column_list(att->attnum, columns)) + continue; + nliveatts++; } pq_sendint16(out, nliveatts); @@ -987,6 +1017,9 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; + if (!column_in_column_list(att->attnum, columns)) + continue; + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d8b12d94bc..697fb23634 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -113,6 +113,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "utils/acl.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -702,12 +703,13 @@ fetch_remote_table_info(char *nspname, char *relname, StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; int natt; ListCell *lc; bool first; + Bitmapset *included_cols = NULL; lrel->nspname = nspname; lrel->relname = relname; @@ -748,10 +750,110 @@ fetch_remote_table_info(char *nspname, char *relname, ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - /* Now fetch 
columns. */ + + /* + * Get column lists for each relation. + * + * For initial synchronization, column lists can be ignored in the following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any column list + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as ALL TABLES IN + * SCHEMA that includes this relation + * + * We need to do this before fetching info about column names and types, + * so that we can skip columns that should not be replicated. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + WalRcvExecResult *pubres; + TupleTableSlot *slot; + Oid attrsRow[] = {INT2OID}; + StringInfoData pub_names; + bool first = true; + + initStringInfo(&pub_names); + foreach(lc, MySubscription->publications) + { + if (!first) + appendStringInfo(&pub_names, ", "); + appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc)))); + first = false; + } + + /* + * Fetch info about column lists for the relation (from all the + * publications). We unnest the int2vector values, because that + * makes it easier to combine lists by simply adding the attnums + * to a new bitmap (without having to parse the int2vector data). + * This preserves NULL values, so that if one of the publications + * has no column list, we'll know that.
+ */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT unnest" + " FROM pg_publication p" + " LEFT OUTER JOIN pg_publication_rel pr" + " ON (p.oid = pr.prpubid AND pr.prrelid = %u)" + " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + lrel->remoteid, + pub_names.data); + + pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrsRow), attrsRow); + + if (pubres->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s", + nspname, relname, pubres->err))); + + /* + * Merge the column lists (from different publications) by creating + * a single bitmap with all the attnums. If we find a NULL value, + * that means one of the publications has no column list for the + * table we're syncing. + */ + slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + { + Datum cfval = slot_getattr(slot, 1, &isnull); + + /* NULL means no column list (all columns), so we're done. */ + if (isnull) + { + bms_free(included_cols); + included_cols = NULL; + break; + } + + included_cols = bms_add_member(included_cols, + DatumGetInt16(cfval)); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(pubres); + + pfree(pub_names.data); + } + + /* + * Now fetch column names and types. + */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT a.attname," + "SELECT a.attnum," + " a.attname," " a.atttypid," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" @@ -779,16 +881,35 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + /* + * Store the columns as a list of names.
Ignore those that are not + * present in the column list, if there is one. + */ natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + + /* If the column is not in the column list, skip it. */ + if (included_cols != NULL && !bms_is_member(attnum, included_cols)) + { + ExecClearTuple(slot); + continue; + } + + rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); - if (DatumGetBool(slot_getattr(slot, 3, &isnull))) + + lrel->attnames[natt] = rel_colname; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ @@ -931,8 +1052,24 @@ copy_table(Relation rel) /* Regular table with no row filter */ if (lrel.relkind == RELKIND_RELATION && qual == NIL) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + + /* + * XXX Do we need to list the columns in all cases? Maybe we're replicating + * all columns? 
+ */ + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 292e7299d8..893833ea83 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -30,6 +30,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -90,7 +91,8 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -148,9 +150,6 @@ typedef struct RelationSyncEntry */ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; EState *estate; /* executor state used for row filter */ - MemoryContext cache_expr_cxt; /* private context for exprstate and - * estate, if any */ - TupleTableSlot *new_slot; /* slot for storing new tuple */ TupleTableSlot *old_slot; /* slot for storing old tuple */ @@ -169,6 +168,19 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ AttrMap *attrmap; + + /* + * Columns included in the publication, or NULL if all columns are + * included implicitly. Note that the attnums in this bitmap are not + * shifted by FirstLowInvalidHeapAttributeNumber. + */ + Bitmapset *columns; + + /* + * Private context to store additional data for this entry - state for + * the row filter expressions, column list, etc. 
+ */ + MemoryContext entry_cxt; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -200,6 +212,11 @@ static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, RelationSyncEntry *entry, ReorderBufferChangeType *action); +/* column list routines */ +static void pgoutput_column_list_init(PGOutputData *data, + List *publications, + RelationSyncEntry *entry); + /* * Specify output plugin callbacks */ @@ -622,11 +639,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->columns); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -639,7 +656,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -662,13 +680,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; + /* Skip this attribute if it's not present in the column list */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, columns); OutputPluginWrite(ctx, false); } @@ -722,6 +744,28 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) return DatumGetBool(ret); } +/* + * Make sure the per-entry memory context exists. 
+ */ +static void +pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry) +{ + Relation relation; + + /* The context may already exist, in which case bail out. */ + if (entry->entry_cxt) + return; + + relation = RelationIdGetRelation(entry->publish_as_relid); + + entry->entry_cxt = AllocSetContextCreate(data->cachectx, + "entry private context", + ALLOCSET_SMALL_SIZES); + + MemoryContextCopyAndSetIdentifier(entry->entry_cxt, + RelationGetRelationName(relation)); +} + /* * Initialize the row filter. */ @@ -842,21 +886,13 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, { Relation relation = RelationIdGetRelation(entry->publish_as_relid); - Assert(entry->cache_expr_cxt == NULL); - - /* Create the memory context for row filters */ - entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx, - "Row filter expressions", - ALLOCSET_DEFAULT_SIZES); - - MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt, - RelationGetRelationName(relation)); + pgoutput_ensure_entry_cxt(data, entry); /* * Now all the filters for all pubactions are known. Combine them when * their pubactions are the same. */ - oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt); + oldctx = MemoryContextSwitchTo(entry->entry_cxt); entry->estate = create_estate_for_relation(relation); for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) { @@ -879,6 +915,105 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, } } +/* + * Initialize the column list. + */ +static void +pgoutput_column_list_init(PGOutputData *data, List *publications, + RelationSyncEntry *entry) +{ + ListCell *lc; + + /* + * Find if there are any column lists for this relation. If there are, + * build a bitmap merging all the column lists. + * + * All the given publication-table mappings must be checked. + * + * Multiple publications might have multiple column lists for this relation. 
+ * + * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA imply "don't use column + * list" so they take precedence. + */ + foreach(lc, publications) + { + Publication *pub = lfirst(lc); + HeapTuple cftuple = NULL; + Datum cfdatum = 0; + + /* + * Assume there's no column list. Only if we find a pg_publication_rel + * entry with a column list we'll switch it to false. + */ + bool pub_no_list = true; + + /* + * If the publication is FOR ALL TABLES then it is treated the same as if + * there are no column lists (even if other publications have a list). + */ + if (!pub->alltables) + { + /* + * Check for the presence of a column list in this publication. + * + * Note: If we find no pg_publication_rel row, it's a publication + * defined for a whole schema, so it can't have a column list, just + * like a FOR ALL TABLES publication. + */ + cftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(entry->publish_as_relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(cftuple)) + { + /* + * Lookup the column list attribute. + * + * Note: We update the pub_no_list value directly, because if + * the value is NULL, we have no list (and vice versa). + */ + cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, + Anum_pg_publication_rel_prattrs, + &pub_no_list); + + /* + * Build the column list bitmap in the per-entry context. + * + * We need to merge column lists from all publications, so we + * update the same bitmapset. If the column list is null, we + * interpret it as replicating all columns. + */ + if (!pub_no_list) /* when not null */ + { + pgoutput_ensure_entry_cxt(data, entry); + + entry->columns = pub_collist_to_bitmapset(entry->columns, + cfdatum, + entry->entry_cxt); + } + } + } + + /* + * Found a publication with no column list, so we're done. But first + * discard any column list we might have from preceding publications.
+ */ + if (pub_no_list) + { + if (cftuple) + ReleaseSysCache(cftuple); + + bms_free(entry->columns); + entry->columns = NULL; + + break; + } + + ReleaseSysCache(cftuple); + } /* loop all subscribed publications */ + +} + /* * Initialize the slot for storing new and old tuples, and build the map that * will be used to convert the relation's tuples into the ancestor's format. @@ -1243,7 +1378,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary); + data->binary, relentry->columns); OutputPluginWrite(ctx, true); break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -1297,11 +1432,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, - new_slot, data->binary); + new_slot, data->binary, + relentry->columns); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, - old_slot, new_slot, data->binary); + old_slot, new_slot, data->binary, + relentry->columns); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, @@ -1794,8 +1931,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->new_slot = NULL; entry->old_slot = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); - entry->cache_expr_cxt = NULL; + entry->entry_cxt = NULL; entry->publish_as_relid = InvalidOid; + entry->columns = NULL; entry->attrmap = NULL; } @@ -1841,6 +1979,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->columns); + entry->columns = NULL; entry->pubactions.pubinsert = false; entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; @@ -1865,17 +2005,18 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* * Row filter cache cleanups. 
*/ - if (entry->cache_expr_cxt) - MemoryContextDelete(entry->cache_expr_cxt); + if (entry->entry_cxt) + MemoryContextDelete(entry->entry_cxt); - entry->cache_expr_cxt = NULL; + entry->entry_cxt = NULL; entry->estate = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); /* * Build publication cache. We can't use one provided by relcache as - * relcache considers all publications given relation is in, but here - * we only need to consider ones that the subscriber requested. + * relcache considers all publications that the given relation is in, + * but here we only need to consider ones that the subscriber + * requested. */ foreach(lc, data->publications) { @@ -1946,6 +2087,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* + * If the relation is to be published, determine actions to + * publish, and list of columns, if appropriate. + * * Don't publish changes for partitioned tables, because * publishing those of its partitions suffices, unless partition * changes won't be published due to pubviaroot being set. 
@@ -2007,6 +2151,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* Initialize the row filter */ pgoutput_row_filter_init(data, rel_publications, entry); + + /* Initialize the column list */ + pgoutput_column_list_init(data, rel_publications, entry); } list_free(pubids); diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 4f3fe1118a..d47fac7bb9 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5575,6 +5575,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) memset(pubdesc, 0, sizeof(PublicationDesc)); pubdesc->rf_valid_for_update = true; pubdesc->rf_valid_for_delete = true; + pubdesc->cols_valid_for_update = true; + pubdesc->cols_valid_for_delete = true; return; } @@ -5587,6 +5589,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) memset(pubdesc, 0, sizeof(PublicationDesc)); pubdesc->rf_valid_for_update = true; pubdesc->rf_valid_for_delete = true; + pubdesc->cols_valid_for_update = true; + pubdesc->cols_valid_for_delete = true; /* Fetch the publication membership info. */ puboids = GetRelationPublications(relid); @@ -5657,7 +5661,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) */ if (!pubform->puballtables && (pubform->pubupdate || pubform->pubdelete) && - contain_invalid_rfcolumn(pubid, relation, ancestors, + pub_rf_contains_invalid_column(pubid, relation, ancestors, pubform->pubviaroot)) { if (pubform->pubupdate) @@ -5666,6 +5670,23 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->rf_valid_for_delete = false; } + /* + * Check if all columns are part of the REPLICA IDENTITY index or not. + * + * If the publication is FOR ALL TABLES then it means the table has no + * column list and we can skip the validation. 
+ */ + if (!pubform->puballtables && + (pubform->pubupdate || pubform->pubdelete) && + pub_collist_contains_invalid_column(pubid, relation, ancestors, + pubform->pubviaroot)) + { + if (pubform->pubupdate) + pubdesc->cols_valid_for_update = false; + if (pubform->pubdelete) + pubdesc->cols_valid_for_delete = false; + } + ReleaseSysCache(tup); /* @@ -5677,6 +5698,16 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate && !pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete) break; + + /* + * If we know everything is replicated and the column list is invalid + * for update and delete, there is no point to check for other + * publications. + */ + if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate && + pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate && + !pubdesc->cols_valid_for_update && !pubdesc->cols_valid_for_delete) + break; } if (relation->rd_pubdesc) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 00629f0836..535b160165 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4131,6 +4131,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_prpubid; int i_prrelid; int i_prrelqual; + int i_prattrs; int i, j, ntups; @@ -4144,12 +4145,20 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " - "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual " - "FROM pg_catalog.pg_publication_rel"); + "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, " + "(CASE\n" + " WHEN pr.prattrs IS NOT NULL THEN\n" + " (SELECT array_agg(attname)\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" + " ELSE NULL 
END) prattrs " + "FROM pg_catalog.pg_publication_rel pr"); else appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " - "NULL AS prrelqual " + "NULL AS prrelqual, NULL AS prattrs " "FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -4160,6 +4169,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_prpubid = PQfnumber(res, "prpubid"); i_prrelid = PQfnumber(res, "prrelid"); i_prrelqual = PQfnumber(res, "prrelqual"); + i_prattrs = PQfnumber(res, "prattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4205,6 +4215,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) else pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual)); + if (!PQgetisnull(res, i, i_prattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer attribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prattrs), + &attnames, &nattnames)) + fatal("could not parse %s array", "prattrs"); + attribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(attribs, ", "); + + appendPQExpBufferStr(attribs, fmtId(attnames[k])); + } + pubrinfo[j].pubrattrs = attribs->data; + } + else + pubrinfo[j].pubrattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4300,6 +4332,9 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) appendPQExpBuffer(query, " %s", fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrattrs) + appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + if (pubrinfo->pubrelqual) { /* diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 893725d121..688093c55e 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -634,6 +634,7 @@ typedef struct _PublicationRelInfo PublicationInfo *publication; TableInfo *pubtable; char *pubrelqual; + char 
*pubrattrs; } PublicationRelInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 0e724b0366..af5d6fa5a3 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2438,6 +2438,28 @@ my %tests = ( unlike => { exclude_dump_test_schema => 1, }, }, + 'ALTER PUBLICATION pub1 ADD TABLE test_sixth_table (col3, col2)' => { + create_order => 52, + create_sql => + 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_sixth_table (col3, col2);', + regexp => qr/^ + \QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_sixth_table (col2, col3);\E + /xm, + like => { %full_runs, section_post_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + + 'ALTER PUBLICATION pub1 ADD TABLE test_seventh_table (col3, col2) WHERE (col1 = 1)' => { + create_order => 52, + create_sql => + 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_seventh_table (col3, col2) WHERE (col1 = 1);', + regexp => qr/^ + \QALTER PUBLICATION pub1 ADD TABLE ONLY dump_test.test_seventh_table (col2, col3) WHERE ((col1 = 1));\E + /xm, + like => { %full_runs, section_post_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + 'ALTER PUBLICATION pub3 ADD ALL TABLES IN SCHEMA dump_test' => { create_order => 51, create_sql => @@ -2809,6 +2831,44 @@ my %tests = ( unlike => { exclude_dump_test_schema => 1, }, }, + 'CREATE TABLE test_sixth_table' => { + create_order => 6, + create_sql => 'CREATE TABLE dump_test.test_sixth_table ( + col1 int, + col2 text, + col3 bytea + );', + regexp => qr/^ + \QCREATE TABLE dump_test.test_sixth_table (\E + \n\s+\Qcol1 integer,\E + \n\s+\Qcol2 text,\E + \n\s+\Qcol3 bytea\E + \n\); + /xm, + like => + { %full_runs, %dump_test_schema_runs, section_pre_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + + 'CREATE TABLE test_seventh_table' => { + create_order => 6, + create_sql => 'CREATE TABLE dump_test.test_seventh_table ( + col1 int, + col2 text, + col3 bytea + );', + regexp => qr/^ 
+ \QCREATE TABLE dump_test.test_seventh_table (\E + \n\s+\Qcol1 integer,\E + \n\s+\Qcol2 text,\E + \n\s+\Qcol3 bytea\E + \n\); + /xm, + like => + { %full_runs, %dump_test_schema_runs, section_pre_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + 'CREATE TABLE test_table_identity' => { create_order => 3, create_sql => 'CREATE TABLE dump_test.test_table_identity ( diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index b8a532a45f..4dddf08789 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -2960,6 +2960,7 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" @@ -2967,6 +2968,12 @@ describeOneTableDetails(const char *schemaname, "UNION\n" "SELECT pubname\n" " , pg_get_expr(pr.prqual, c.oid)\n" + " , (CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " (SELECT string_agg(attname, ', ')\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" + " ELSE NULL END) " "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" @@ -2974,6 +2981,7 @@ describeOneTableDetails(const char *schemaname, "UNION\n" "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" "ORDER BY 1;", @@ -2984,12 +2992,14 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, "SELECT pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" "WHERE pr.prrelid = '%s'\n" "UNION ALL\n" "SELECT 
pubname\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" "ORDER BY 1;", @@ -3011,6 +3021,11 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, " \"%s\"", PQgetvalue(result, i, 0)); + /* column list (if any) */ + if (!PQgetisnull(result, i, 2)) + appendPQExpBuffer(&buf, " (%s)", + PQgetvalue(result, i, 2)); + /* row filter (if any) */ if (!PQgetisnull(result, i, 1)) appendPQExpBuffer(&buf, " WHERE %s", @@ -5979,7 +5994,7 @@ listPublications(const char *pattern) */ static bool addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, - bool singlecol, printTableContent *cont) + bool as_schema, printTableContent *cont) { PGresult *res; int count = 0; @@ -5996,15 +6011,19 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, for (i = 0; i < count; i++) { - if (!singlecol) + if (as_schema) + printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); + else { printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), PQgetvalue(res, i, 1)); + + if (!PQgetisnull(res, i, 3)) + appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 3)); + if (!PQgetisnull(res, i, 2)) appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2)); } - else - printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); printTableAddFooter(cont, buf->data); } @@ -6155,11 +6174,22 @@ describePublications(const char *pattern) printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname"); if (pset.sversion >= 150000) + { appendPQExpBufferStr(&buf, ", pg_get_expr(pr.prqual, c.oid)"); + appendPQExpBufferStr(&buf, + ", (CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " pg_catalog.array_to_string(" + " ARRAY(SELECT attname\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n" + " ELSE NULL END)"); + } else appendPQExpBufferStr(&buf, - ", NULL"); + 
", NULL, NULL"); appendPQExpBuffer(&buf, "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" @@ -6189,9 +6219,9 @@ describePublications(const char *pattern) if (!puballsequences) { - /* Get the tables for the specified publication */ + /* Get the sequences for the specified publication */ printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname, NULL\n" + "SELECT n.nspname, c.relname, NULL, NULL\n" "FROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 97f26208e1..186d8ea74b 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -95,6 +95,13 @@ typedef struct PublicationDesc */ bool rf_valid_for_update; bool rf_valid_for_delete; + + /* + * true if the columns are part of the replica identity or the publication actions + * do not include UPDATE or DELETE. + */ + bool cols_valid_for_update; + bool cols_valid_for_delete; } PublicationDesc; typedef struct Publication @@ -111,6 +118,7 @@ typedef struct PublicationRelInfo { Relation relation; Node *whereClause; + List *columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -137,6 +145,7 @@ extern List *GetPublicationRelations(Oid pubid, char objectType, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern void GetActionsInPublication(Oid pubid, PublicationActions *actions); extern List *GetPublicationSchemas(Oid pubid, char objectType); extern List *GetSchemaPublications(Oid schemaid, char objectType); extern List *GetSchemaPublicationRelations(Oid schemaid, char objectType, @@ -160,6 +169,9 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, char objectType, bool if_not_exists); +extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, + MemoryContext mcxt); + extern 
Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 0dd0f425db..4feb581899 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -34,6 +34,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ + int2vector prattrs; /* columns to replicate */ #endif } FormData_pg_publication_rel; diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 7813cbcb6b..ae87caf089 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -31,7 +31,9 @@ extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); extern void InvalidatePublicationRels(List *relids); -extern bool contain_invalid_rfcolumn(Oid pubid, Relation relation, +extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, + List *ancestors, bool pubviaroot); +extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot); #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index cb1fcc0ee3..5a458c42e5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3654,6 +3654,7 @@ typedef struct PublicationTable NodeTag type; RangeVar *relation; /* relation to be published */ Node *whereClause; /* qualifications */ + List *columns; /* List of columns in a publication table */ } PublicationTable; /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index fb86ca022d..13ee10fdd4 100644 --- 
a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -222,12 +222,12 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, - bool binary); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - TupleTableSlot *newslot, bool binary); + TupleTableSlot *newslot, bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -250,7 +250,7 @@ extern void logicalrep_write_sequence(StringInfo out, Relation rel, bool is_called); extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index a5a519d6c8..0308e40ba6 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -1137,6 +1137,369 @@ DROP TABLE rf_tbl_abcd_pk; DROP TABLE rf_tbl_abcd_nopk; DROP TABLE rf_tbl_abcd_part_pk; -- ====================================================== +-- fail - duplicate tables are not allowed if that table has any column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert'); +ERROR: conflicting or redundant column lists for table "testpub_tbl1" +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 
(a) WITH (publish = 'insert'); +ERROR: conflicting or redundant column lists for table "testpub_tbl1" +RESET client_min_messages; +-- test for column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1; +CREATE PUBLICATION testpub_fortable_insert WITH (publish = 'insert'); +RESET client_min_messages; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text, + d int generated always as (a + length(b)) stored); +-- error: column "x" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); +ERROR: column "x" of relation "testpub_tbl5" does not exist +-- error: replica identity "a" not included in the column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); +UPDATE testpub_tbl5 SET a = 1; +ERROR: cannot update table "testpub_tbl5" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- error: generated column "d" can't be in list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +ERROR: cannot reference generated column "d" in publication column list +-- error: system attributes "ctid" not allowed in column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); +ERROR: cannot reference system column "ctid" in publication column list +-- ok +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice +ERROR: cannot drop column c of table testpub_tbl5 because other objects depend on it +DETAIL: publication of table testpub_tbl5 in publication testpub_fortable depends on column c of table testpub_tbl5 +HINT: Use DROP ... CASCADE to drop the dependent objects too. 
+-- ok: for insert-only publication, any column list is acceptable +ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c); +/* not all replica identities are good enough */ +CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c); +ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +-- error: replica identity (b,c) is not covered by column list (a, c) +UPDATE testpub_tbl5 SET a = 1; +ERROR: cannot update table "testpub_tbl5" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- error: change the replica identity to "b", and column list to (a, c) +-- then update fails, because (a, c) does not cover replica identity +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +UPDATE testpub_tbl5 SET a = 1; +ERROR: cannot update table "testpub_tbl5" +DETAIL: Column list used by the publication does not cover the replica identity. 
+/* But if upd/del are not published, it works OK */ +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); +RESET client_min_messages; +ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok +\dRp+ testpub_table_ins + Publication testpub_table_ins + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | f | f | t | f | f +Tables: + "public.testpub_tbl5" (a) + +-- tests with REPLICA IDENTITY FULL +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); +UPDATE testpub_tbl6 SET a = 1; +ERROR: cannot update table "testpub_tbl6" +DETAIL: Column list used by the publication does not cover the replica identity. 
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok +UPDATE testpub_tbl6 SET a = 1; +-- make sure changing the column list is propagated to the catalog +CREATE TABLE testpub_tbl7 (a int primary key, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 + Table "public.testpub_tbl7" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+----------+--------------+------------- + a | integer | | not null | | plain | | + b | text | | | | extended | | + c | text | | | | extended | | +Indexes: + "testpub_tbl7_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_fortable" (a, b) + +-- ok: the column list is the same, we should skip this table (or at least not fail) +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 + Table "public.testpub_tbl7" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+----------+--------------+------------- + a | integer | | not null | | plain | | + b | text | | | | extended | | + c | text | | | | extended | | +Indexes: + "testpub_tbl7_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_fortable" (a, b) + +-- ok: the column list changes, make sure the catalog gets updated +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c); +\d+ testpub_tbl7 + Table "public.testpub_tbl7" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+----------+--------------+------------- + a | integer | | not null | | plain | | + b | text | | | | extended | | + c | text | | | | extended | | +Indexes: + "testpub_tbl7_pkey" PRIMARY KEY, btree (a) +Publications: + "testpub_fortable" (a, c) + +-- column list for partitioned tables has to cover 
replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +-- ok: column list covers both "a" and "b" +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true'); +RESET client_min_messages; +-- ok: the same thing, but try plain ADD TABLE +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +UPDATE testpub_tbl8 SET a = 1; +-- failure: column list does not cover replica identity for the second partition +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +-- failure: one of the partitions has REPLICA IDENTITY FULL +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. 
+ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +-- add table and then try changing replica identity +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +-- failure: replica identity full can't be used with a column list +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +-- failure: replica identity has to be covered by the column list +ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey; +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +DROP TABLE testpub_tbl8; +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 (a int, b text, c text); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 (a int, b text, c text); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +-- ok: attaching first partition works, because (a) is in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0); +-- failure: second partition has replica identity (c), which si not in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES 
WITH (modulus 2, remainder 1); +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_1" +DETAIL: Column list used by the publication does not cover the replica identity. +-- failure: changing replica identity to FULL for partition fails, because +-- of the column list on the parent +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; +ERROR: cannot update table "testpub_tbl8_0" +DETAIL: Column list used by the publication does not cover the replica identity. +DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1; +DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list; +-- ====================================================== +-- Test combination of column list and row filter +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_both_filters; +RESET client_min_messages; +CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c)); +ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey; +ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1); +\dRp+ testpub_both_filters + Publication testpub_both_filters + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Sequences | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-----------+---------- + regress_publication_user | f | f | t | t | t | t | t | f +Tables: + "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1) + +\d+ testpub_tbl_both_filters + Table "public.testpub_tbl_both_filters" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + a | integer | | not null | | plain | | + b | integer | | | | plain | | + c | integer | | not null | | plain | | +Indexes: + 
"testpub_tbl_both_filters_pkey" PRIMARY KEY, btree (a, c) REPLICA IDENTITY +Publications: + "testpub_both_filters" (a, c) WHERE (c <> 1) + +DROP TABLE testpub_tbl_both_filters; +DROP PUBLICATION testpub_both_filters; +-- ====================================================== +-- More column list tests for validating column references +CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int); +CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b)); +CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a); +CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY); +ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10); +-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing) +-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b); +RESET client_min_messages; +-- ok - (a,b) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c); +-- ok - (a,b,c) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - "b" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b); +-- fail - "a" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +-- 1b. 
REPLICA IDENTITY is DEFAULT and table has no PK +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- ok - there's no replica identity, so any column list works +-- note: it fails anyway, just a bit later because UPDATE requires RI +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +-- Case 2. REPLICA IDENTITY FULL +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" +DETAIL: Column list used by the publication does not cover the replica identity. +-- Case 3. REPLICA IDENTITY NOTHING +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. 
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +-- Case 4. REPLICA IDENTITY INDEX +ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c); +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c; +ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c); +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_pk" +DETAIL: Column list used by the publication does not cover the replica identity. 
+ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_nopk" +DETAIL: Column list used by the publication does not cover the replica identity. +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; +-- Tests for partitioned table +-- set PUBLISH_VIA_PARTITION_ROOT to false and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - cannot use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +ERROR: cannot use publication column list for relation "rf_tbl_abcd_part_pk" +DETAIL: column list cannot be used for a partitioned table when publish_via_partition_root is false. 
+-- ok - can use column list for partition +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is +-- used for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +ERROR: cannot set publish_via_partition_root = false for publication "testpub6" +DETAIL: The publication contains a column list for a partitioned table "rf_tbl_abcd_part_pk" which is not allowed when publish_via_partition_root is false. +-- Now change the root column list to use a column "b" +-- (which is not in the replica identity) +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b); +-- ok - we don't have column list for partitioned table. +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_part_pk_1" +DETAIL: Column list used by the publication does not cover the replica identity. +-- set PUBLISH_VIA_PARTITION_ROOT to true +-- can use column list for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; +ERROR: cannot update table "rf_tbl_abcd_part_pk_1" +DETAIL: Column list used by the publication does not cover the replica identity. 
+DROP PUBLICATION testpub6; +DROP TABLE rf_tbl_abcd_pk; +DROP TABLE rf_tbl_abcd_nopk; +DROP TABLE rf_tbl_abcd_part_pk; +-- ====================================================== -- Test cache invalidation FOR ALL TABLES publication SET client_min_messages = 'ERROR'; CREATE TABLE testpub_tbl4(a int); @@ -1582,6 +1945,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes Tables from schemas: "pub_test1" +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ERROR: syntax error at or near "(" +LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); + ^ +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); +ERROR: column specification not allowed for schema +LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)... + ^ -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 151b8be14e..96b02947fa 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -568,6 +568,289 @@ DROP TABLE rf_tbl_abcd_nopk; DROP TABLE rf_tbl_abcd_part_pk; -- ====================================================== +-- fail - duplicate tables are not allowed if that table has any column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1 (a), testpub_tbl1 WITH (publish = 'insert'); +CREATE PUBLICATION testpub_dups FOR TABLE testpub_tbl1, testpub_tbl1 (a) WITH (publish = 'insert'); +RESET client_min_messages; + +-- test for column lists +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_fortable FOR TABLE testpub_tbl1; +CREATE PUBLICATION 
testpub_fortable_insert WITH (publish = 'insert'); +RESET client_min_messages; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text, + d int generated always as (a + length(b)) stored); +-- error: column "x" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); +-- error: replica identity "a" not included in the column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); +UPDATE testpub_tbl5 SET a = 1; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +-- error: generated column "d" can't be in list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +-- error: system attributes "ctid" not allowed in column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); +-- ok +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice +-- ok: for insert-only publication, any column list is acceptable +ALTER PUBLICATION testpub_fortable_insert ADD TABLE testpub_tbl5 (b, c); + +/* not all replica identities are good enough */ +CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c); +ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +-- error: replica identity (b,c) is not covered by column list (a, c) +UPDATE testpub_tbl5 SET a = 1; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; + +-- error: change the replica identity to "b", and column list to (a, c) +-- then update fails, because (a, c) does not cover replica identity +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); +UPDATE testpub_tbl5 SET a = 1; + +/* But if upd/del are not published, it works OK */ +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); +RESET client_min_messages; +ALTER PUBLICATION 
testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok +\dRp+ testpub_table_ins + +-- tests with REPLICA IDENTITY FULL +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; + +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); +UPDATE testpub_tbl6 SET a = 1; +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl6; + +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok +UPDATE testpub_tbl6 SET a = 1; + +-- make sure changing the column list is propagated to the catalog +CREATE TABLE testpub_tbl7 (a int primary key, b text, c text); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 +-- ok: the column list is the same, we should skip this table (or at least not fail) +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, b); +\d+ testpub_tbl7 +-- ok: the column list changes, make sure the catalog gets updated +ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl7 (a, c); +\d+ testpub_tbl7 + +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 0); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 PARTITION OF testpub_tbl8 FOR VALUES WITH (modulus 2, remainder 1); +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (b); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; + +-- ok: column list covers both "a" and "b" +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_col_list FOR TABLE testpub_tbl8 (a, b) WITH (publish_via_partition_root = 'true'); +RESET client_min_messages; + +-- ok: the same thing, but try plain 
ADD TABLE +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +UPDATE testpub_tbl8 SET a = 1; + +-- failure: column list does not cover replica identity for the second partition +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; + +-- failure: one of the partitions has REPLICA IDENTITY FULL +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, c); +UPDATE testpub_tbl8 SET a = 1; +ALTER PUBLICATION testpub_col_list DROP TABLE testpub_tbl8; + +-- add table and then try changing replica identity +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); + +-- failure: replica identity full can't be used with a column list +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; + +-- failure: replica identity has to be covered by the column list +ALTER TABLE testpub_tbl8_1 DROP CONSTRAINT testpub_tbl8_1_pkey; +ALTER TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; +UPDATE testpub_tbl8 SET a = 1; + +DROP TABLE testpub_tbl8; + +-- column list for partitioned tables has to cover replica identities for +-- all child relations +CREATE TABLE testpub_tbl8 (a int, b text, c text) PARTITION BY HASH (a); +ALTER PUBLICATION testpub_col_list ADD TABLE testpub_tbl8 (a, b); +-- first partition has replica identity "a" +CREATE TABLE testpub_tbl8_0 (a int, b text, c text); +ALTER TABLE testpub_tbl8_0 ADD PRIMARY KEY (a); +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY USING INDEX testpub_tbl8_0_pkey; +-- second partition has replica identity "b" +CREATE TABLE testpub_tbl8_1 (a int, b text, c text); +ALTER 
TABLE testpub_tbl8_1 ADD PRIMARY KEY (c); +ALTER TABLE testpub_tbl8_1 REPLICA IDENTITY USING INDEX testpub_tbl8_1_pkey; + +-- ok: attaching first partition works, because (a) is in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_0 FOR VALUES WITH (modulus 2, remainder 0); +-- failure: second partition has replica identity (c), which si not in column list +ALTER TABLE testpub_tbl8 ATTACH PARTITION testpub_tbl8_1 FOR VALUES WITH (modulus 2, remainder 1); +UPDATE testpub_tbl8 SET a = 1; + +-- failure: changing replica identity to FULL for partition fails, because +-- of the column list on the parent +ALTER TABLE testpub_tbl8_0 REPLICA IDENTITY FULL; +UPDATE testpub_tbl8 SET a = 1; + +DROP TABLE testpub_tbl5, testpub_tbl6, testpub_tbl7, testpub_tbl8, testpub_tbl8_1; +DROP PUBLICATION testpub_table_ins, testpub_fortable, testpub_fortable_insert, testpub_col_list; +-- ====================================================== + +-- Test combination of column list and row filter +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_both_filters; +RESET client_min_messages; +CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c)); +ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey; +ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1); +\dRp+ testpub_both_filters +\d+ testpub_tbl_both_filters + +DROP TABLE testpub_tbl_both_filters; +DROP PUBLICATION testpub_both_filters; +-- ====================================================== + +-- More column list tests for validating column references +CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int); +CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b)); +CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a); +CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY); +ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION 
rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10); + +-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing) + +-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK. +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk (a, b); +RESET client_min_messages; +-- ok - (a,b) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c); +-- ok - (a,b,c) coverts all PK cols +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - "b" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (b); +-- fail - "a" is missing from the column list +UPDATE rf_tbl_abcd_pk SET a = 1; + +-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- ok - there's no replica identity, so any column list works +-- note: it fails anyway, just a bit later because UPDATE requires RI +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Case 2. REPLICA IDENTITY FULL +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a, b, c, d); +-- fail - with REPLICA IDENTITY FULL no column list is allowed +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Case 3. 
REPLICA IDENTITY NOTHING +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING; +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a, b, c, d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (d); +-- ok - REPLICA IDENTITY NOTHING means all column lists are valid +-- it still fails later because without RI we can't replicate updates +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Case 4. REPLICA IDENTITY INDEX +ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c); +ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c; +ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL; +CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c); +ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_pk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (a); +-- fail - column list "a" does not cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk (c); +-- ok - column list "c" does cover the REPLICA IDENTITY INDEX on "c" +UPDATE rf_tbl_abcd_nopk SET a = 1; + +-- Tests for partitioned table + +-- set PUBLISH_VIA_PARTITION_ROOT to 
false and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - cannot use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +-- ok - can use column list for partition +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- set PUBLISH_VIA_PARTITION_ROOT to true and test column list for partitioned +-- table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (a); +-- ok - "a" is a PK col +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any column list is +-- used for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- Now change the root column list to use a column "b" +-- (which is not in the replica identity) +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 (b); +-- ok - we don't have column list for partitioned table. 
+ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; +-- set PUBLISH_VIA_PARTITION_ROOT to true +-- can use column list for partitioned table +ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1); +-- ok - can use column list for partitioned table +ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk (b); +-- fail - "b" is not in REPLICA IDENTITY INDEX +UPDATE rf_tbl_abcd_part_pk SET a = 1; + +DROP PUBLICATION testpub6; +DROP TABLE rf_tbl_abcd_pk; +DROP TABLE rf_tbl_abcd_nopk; +DROP TABLE rf_tbl_abcd_part_pk; +-- ====================================================== + -- Test cache invalidation FOR ALL TABLES publication SET client_min_messages = 'ERROR'; CREATE TABLE testpub_tbl4(a int); @@ -809,6 +1092,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); + -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl new file mode 100644 index 0000000000..c778840f4c --- /dev/null +++ b/src/test/subscription/t/031_column_list.pl @@ -0,0 +1,1131 @@ +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test partial-column publication of tables +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# create publisher node +my $node_publisher = 
PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +sub wait_for_subscription_sync +{ + my ($node) = @_; + + # Poll $node until no pg_subscription_rel row has srsubstate other than 'r' or 's'; die if the poll gives up + my $synced_query = "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + + $node->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +} + +# setup tables on both nodes + +# tab1: simple 1:1 replication +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int) +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int) +)); + +# tab2: replication from a regular table to a table with fewer columns +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab2 (a int PRIMARY KEY, b varchar) +)); + +# tab3: weird column names, and the subscriber table lacks column "B" +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab3 ("a'" int PRIMARY KEY, "B" varchar, "c'" int) +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab3 ("a'" int PRIMARY KEY, "c'" int) +)); + +# test_part: partitioned tables, with partitioning (including multi-level +# partitioning, and fewer columns on the subscriber) +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a); + CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6); + CREATE
TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a); + CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a); + CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6); + CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a); + CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10); +)); + +# tab4: table with user-defined enum types +$node_publisher->safe_psql('postgres', qq( + CREATE TYPE test_typ AS ENUM ('blue', 'red'); + CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, c int, d text); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TYPE test_typ AS ENUM ('blue', 'red'); + CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, d text); +)); + + +# TEST: create publication and subscription for some of the tables with +# column lists +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub1 + FOR TABLE tab1 (a, "B"), tab3 ("a'", "c'"), test_part (a, b), tab4 (a, b, d) + WITH (publish_via_partition_root = 'true'); +)); + +# check that we got the right prattrs values for the publication in the +# pg_publication_rel catalog (order by relname, to get stable ordering) +my $result = $node_publisher->safe_psql('postgres', qq( + SELECT relname, prattrs + FROM pg_publication_rel pb JOIN pg_class pc ON(pb.prrelid = pc.oid) + ORDER BY relname +)); + +is($result, qq(tab1|1 2 +tab3|1 3 +tab4|1 2 4 +test_part|1 2), 'publication relation updated'); + +# TEST: insert data into the tables, create subscription and see if sync +# replicates the right columns +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab1 VALUES (1, 2, 3); + INSERT INTO tab1 VALUES (4, 5, 6); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab3 VALUES (1, 2, 3); + INSERT INTO 
tab3 VALUES (4, 5, 6); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab4 VALUES (1, 'red', 3, 'oh my'); + INSERT INTO tab4 VALUES (2, 'blue', 4, 'hello'); +)); + +# replication of partitioned table +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part VALUES (1, 'abc', '2021-07-04 12:00:00'); + INSERT INTO test_part VALUES (2, 'bcd', '2021-07-03 11:12:13'); + INSERT INTO test_part VALUES (7, 'abc', '2021-07-04 12:00:00'); + INSERT INTO test_part VALUES (8, 'bcd', '2021-07-03 11:12:13'); +)); + +# create subscription for the publication, wait for sync to complete, +# then check the sync results +$node_subscriber->safe_psql('postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 +)); + +wait_for_subscription_sync($node_subscriber); + +# tab1: only (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1 ORDER BY a"); +is($result, qq(1|2| +4|5|), 'insert on column tab1.c is not replicated'); + +# tab3: only (a,c) is replicated +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab3 ORDER BY "a'")); +is($result, qq(1|3 +4|6), 'insert on column tab3.b is not replicated'); + +# tab4: only (a,b,d) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab4 ORDER BY a"); +is($result, qq(1|red|oh my +2|blue|hello), 'insert on column tab4.c is not replicated'); + +# test_part: (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part ORDER BY a"); +is($result, qq(1|abc +2|bcd +7|abc +8|bcd), 'insert on column test_part.c columns is not replicated'); + + +# TEST: now insert more data into the tables, and wait until we replicate +# them (not by tablesync, but regular decoding and replication) + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab1 VALUES (2, 3, 4); + INSERT INTO tab1 VALUES (5, 6, 7); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab3 VALUES 
(2, 3, 4); + INSERT INTO tab3 VALUES (5, 6, 7); +)); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab4 VALUES (3, 'red', 5, 'foo'); + INSERT INTO tab4 VALUES (4, 'blue', 6, 'bar'); +)); + +# replication of partitioned table +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part VALUES (3, 'xxx', '2022-02-01 10:00:00'); + INSERT INTO test_part VALUES (4, 'yyy', '2022-03-02 15:12:13'); + INSERT INTO test_part VALUES (9, 'zzz', '2022-04-03 21:00:00'); + INSERT INTO test_part VALUES (10, 'qqq', '2022-05-04 22:12:13'); +)); + +# wait for catchup before checking the subscriber +$node_publisher->wait_for_catchup('sub1'); + +# tab1: only (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab1 ORDER BY a"); +is($result, qq(1|2| +2|3| +4|5| +5|6|), 'insert on column tab1.c is not replicated'); + +# tab3: only (a,c) is replicated +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab3 ORDER BY "a'")); +is($result, qq(1|3 +2|4 +4|6 +5|7), 'insert on column tab3.b is not replicated'); + +# tab4: only (a,b,d) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab4 ORDER BY a"); +is($result, qq(1|red|oh my +2|blue|hello +3|red|foo +4|blue|bar), 'insert on column tab4.c is not replicated'); + +# test_part: (a,b) is replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_part ORDER BY a"); +is($result, qq(1|abc +2|bcd +3|xxx +4|yyy +7|abc +8|bcd +9|zzz +10|qqq), 'insert on column test_part.c columns is not replicated'); + + +# TEST: do some updates on some of the tables, both on columns included +# in the column list and other + +# tab1: update of replicated column +$node_publisher->safe_psql('postgres', + qq(UPDATE tab1 SET "B" = 2 * "B" where a = 1)); + +# tab1: update of non-replicated column +$node_publisher->safe_psql('postgres', + qq(UPDATE tab1 SET c = 2*c where a = 4)); + +# tab3: update of non-replicated 
+$node_publisher->safe_psql('postgres', + qq(UPDATE tab3 SET "B" = "B" || ' updated' where "a'" = 4)); + +# tab3: update of replicated column +$node_publisher->safe_psql('postgres', + qq(UPDATE tab3 SET "c'" = 2 * "c'" where "a'" = 1)); + +# tab4 +$node_publisher->safe_psql('postgres', + qq(UPDATE tab4 SET b = 'blue', c = c * 2, d = d || ' updated' where a = 1)); + +# tab4 +$node_publisher->safe_psql('postgres', + qq(UPDATE tab4 SET b = 'red', c = c * 2, d = d || ' updated' where a = 2)); + +# wait for the replication to catch up, and check the UPDATE results got +# replicated correctly, with the right column list +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab1 ORDER BY a)); +is($result, +qq(1|4| +2|3| +4|5| +5|6|), 'only update on column tab1.b is replicated'); + +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab3 ORDER BY "a'")); +is($result, +qq(1|6 +2|4 +4|6 +5|7), 'only update on column tab3.c is replicated'); + +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT * FROM tab4 ORDER BY a)); + +is($result, qq(1|blue|oh my updated +2|red|hello updated +3|red|foo +4|blue|bar), 'update on column tab4.c is not replicated'); + + +# TEST: add table with a column list, insert data, replicate + +# insert some data before adding it to the publication +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab2 VALUES (1, 'abc', 3); +)); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab2 (a, b)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"); + +# wait for the tablesync to complete, add a bit more data and then check +# the results of the replication +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab2 VALUES (2, 'def', 6); +)); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + 
"SELECT * FROM tab2 ORDER BY a"); +is($result, qq(1|abc +2|def), 'insert on column tab2.c is not replicated'); + +# do a couple updates, check the correct stuff gets replicated +$node_publisher->safe_psql('postgres', qq( + UPDATE tab2 SET c = 5 where a = 1; + UPDATE tab2 SET b = 'xyz' where a = 2; +)); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab2 ORDER BY a"); +is($result, qq(1|abc +2|xyz), 'update on column tab2.c is not replicated'); + + +# TEST: add a table to two publications with different column lists, and +# create a single subscription replicating both publications +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int); + CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b); + CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d); + + -- insert a couple initial records + INSERT INTO tab5 VALUES (1, 11, 111, 1111); + INSERT INTO tab5 VALUES (2, 22, 222, 2222); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->wait_for_catchup('sub1'); + +# insert data and make sure all the columns (union of the columns lists) +# get fully replicated +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab5 VALUES (3, 33, 333, 3333); + INSERT INTO tab5 VALUES (4, 44, 444, 4444); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"), + qq(1|11|1111 +2|22|2222 +3|33|3333 +4|44|4444), + 'overlapping publications with overlapping column lists'); + +# and finally, remove the column list for one of the publications, which +# means replicating all columns (removing the column list), but first add +# the missing column to the table on subscriber 
+$node_publisher->safe_psql('postgres', qq( + ALTER PUBLICATION pub3 SET TABLE tab5; +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; + ALTER TABLE tab5 ADD COLUMN c INT; +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab5 VALUES (5, 55, 555, 5555); +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"), + qq(1|11|1111| +2|22|2222| +3|33|3333| +4|44|4444| +5|55|5555|555), + 'overlapping publications with overlapping column lists'); + +# TEST: create a table with a column list, then change the replica +# identity by replacing a primary key (but use a different column in +# the column list) +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int); + CREATE PUBLICATION pub4 FOR TABLE tab6 (a, b); + + -- initial data + INSERT INTO tab6 VALUES (1, 22, 333, 4444); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab6 VALUES (2, 33, 444, 5555); + UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab6 ORDER BY a"), + qq(1|44|| +2|66||), 'replication with the original primary key'); + +# now redefine the constraint - move the primary key to a different column +# (which is still covered by the column list, though) + +$node_publisher->safe_psql('postgres', qq( + ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey; + ALTER TABLE tab6 ADD PRIMARY KEY (b); +)); + +# we need to do the same thing on the subscriber +# XXX What would happen if this happens before the publisher 
ALTER? Or +# interleaved, somehow? But that seems unrelated to column lists. +$node_subscriber->safe_psql('postgres', qq( + ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey; + ALTER TABLE tab6 ADD PRIMARY KEY (b); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab6 VALUES (3, 55, 666, 8888); + UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab6 ORDER BY a"), + qq(1|88|| +2|132|| +3|110||), + 'replication with the modified primary key'); + + +# TEST: create a table with a column list, then change the replica +# identity by replacing a primary key with a key on multiple columns +# (all of them covered by the column list) +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int); + CREATE PUBLICATION pub5 FOR TABLE tab7 (a, b); + + -- some initial data + INSERT INTO tab7 VALUES (1, 22, 333, 4444); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int); +)); + +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab7 VALUES (2, 33, 444, 5555); + UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"), + qq(1|44|| +2|66||), 'replication with the original primary key'); + +# now redefine the constraint - replace the primary key with a key on both +# columns (a, b), which is still covered by the column list +$node_publisher->safe_psql('postgres', qq( + ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey; + ALTER TABLE tab7 ADD PRIMARY KEY (a, b); +)); + 
+$node_publisher->safe_psql('postgres', qq( + INSERT INTO tab7 VALUES (3, 55, 666, 7777); + UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"), + qq(1|88|| +2|132|| +3|110||), + 'replication with the modified primary key'); + +# now switch the primary key again, to the columns (b, a) which are still +# covered by the column list, but also generate writes between the drop +# and creation of the new constraint + +$node_publisher->safe_psql('postgres', qq( + ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey; + INSERT INTO tab7 VALUES (4, 77, 888, 9999); + -- update/delete is not allowed for tables without RI + ALTER TABLE tab7 ADD PRIMARY KEY (b, a); + UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; + DELETE FROM tab7 WHERE a = 1; +)); + +$node_publisher->safe_psql('postgres', qq( +)); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab7 ORDER BY a"), + qq(2|264|| +3|220|| +4|154||), + 'replication with the modified primary key'); + + +# TEST: partitioned tables (with publish_via_partition_root = true) +# and replica identity. The (leaf) partitions may have different RI, so +# we need to check the partition RI (with respect to the column list) +# while attaching the partition. + +# First, let's create a partitioned table with two partitions, each with +# a different RI, and a root column list covering all those RI.
+ +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part_a (a int, b int, c int) PARTITION BY LIST (a); + + CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; + + CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; + + -- initial data, one row in each partition + INSERT INTO test_part_a VALUES (1, 3); + INSERT INTO test_part_a VALUES (6, 4); +)); + +# do the same thing on the subscriber (opposite column order, no column c) +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part_a (b int, a int) PARTITION BY LIST (a); + + CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; + + CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; +)); + +# create a publication replicating columns (b, a) through the partition root, +# then also add the partitions with their own column lists (a) and (b) +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub6 FOR TABLE test_part_a (b, a) WITH (publish_via_partition_root = true); + ALTER PUBLICATION pub6 ADD TABLE test_part_a_1 (a); + ALTER PUBLICATION pub6 ADD TABLE test_part_a_2 (b); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', qq( + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6 +)); + +wait_for_subscription_sync($node_subscriber); + +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_part_a VALUES (2, 5); + INSERT INTO test_part_a VALUES (7, 6); +)); + 
+$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres',"SELECT a, b FROM test_part_a ORDER BY a, b"), + qq(1|3 +2|5 +6|4 +7|6), + 'partitions with different replica identities not replicated correctly'); + +# This time start with a column list covering RI for all partitions, but +# then update the column list to not cover column "b" (needed by the +# second partition) + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a); + + CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey; + + CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey; + + -- initial data, one row in each partitions + INSERT INTO test_part_b VALUES (1, 1); + INSERT INTO test_part_b VALUES (6, 2); +)); + +# do the same thing on the subscriber +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a); + + CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5); + ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a); + ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey; + + CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10); + ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b); + ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey; +)); + +# create a publication replicating both columns, which is sufficient for +# both partitions +$node_publisher->safe_psql('postgres', qq( + CREATE PUBLICATION pub7 FOR TABLE test_part_b (a, b) WITH (publish_via_partition_root = true); +)); + +# add the publication to our subscription, wait for sync to complete +$node_subscriber->safe_psql('postgres', 
qq(
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_part_b VALUES (2, 3);
+	INSERT INTO test_part_b VALUES (7, 4);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_b ORDER BY a, b"),
+	qq(1|1
+2|3
+6|2
+7|4),
+	'partitions with different replica identities not replicated correctly');
+
+
+# TEST: Partitions with different replica identities (a and b), published
+# directly rather than via the partition root, each partition with its own
+# column list covering its replica identity.
+
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a);
+
+	CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3);
+	ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a);
+	ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey;
+
+	CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4);
+	ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b);
+	ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey;
+
+	-- initial data, one row for each partition
+	INSERT INTO test_part_c VALUES (1, 3, 5);
+	INSERT INTO test_part_c VALUES (2, 4, 6);
+));
+
+# do the same thing on the subscriber
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a);
+
+	CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3);
+	ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a);
+	ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey;
+
+	CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4);
+	ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b);
+	ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey;
+));
+
+# create a publication not replicating data through partition root, without
+# a column list on the root, and then add the partitions one by one with
+# separate column lists, (a,c) and (a,b)
+$node_publisher->safe_psql('postgres', qq(
+	CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false);
+	ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a,c);
+	ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+	DROP SUBSCRIPTION sub1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_part_c VALUES (3, 7, 8);
+	INSERT INTO test_part_c VALUES (4, 9, 10);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_c ORDER BY a, b"),
+	qq(1||5
+2|4|
+3||8
+4|9|),
+	'partitions with different replica identities not replicated correctly');
+
+
+# create a publication not replicating data through partition root, without
+# a column list on the root, and then add the partitions one by one with
+# separate column lists
+$node_publisher->safe_psql('postgres', qq(
+	DROP PUBLICATION pub8;
+	CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false);
+	ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a);
+	ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
+	TRUNCATE test_part_c;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	TRUNCATE test_part_c;
+	INSERT INTO test_part_c VALUES (1, 3, 5);
+	INSERT INTO test_part_c VALUES (2, 4, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_c ORDER BY a, b"),
+	qq(1||
+2|4|),
+	'partitions with different replica identities not replicated correctly');
+
+
+# TEST: Partitioned table published via the root with column list (a), with
+# a single partition on the publisher whose RI is covered by that list.
+
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a);
+
+	CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3);
+	ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a);
+	ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey;
+
+	INSERT INTO test_part_d VALUES (1, 2);
+));
+
+# do the same thing on the subscriber (in fact, create both partitions right
+# away, no need to delay that)
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a);
+
+	CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3);
+	ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a);
+	ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey;
+
+	CREATE TABLE test_part_d_2 PARTITION OF test_part_d FOR VALUES IN (2,4);
+	ALTER TABLE test_part_d_2 ADD PRIMARY KEY (a);
+	ALTER TABLE test_part_d_2 REPLICA IDENTITY USING INDEX test_part_d_2_pkey;
+));
+
+# create a publication replicating only column a, which covers the replica
+# identity of the partition, through the partition root
+$node_publisher->safe_psql('postgres', qq(
+	CREATE PUBLICATION pub9 FOR TABLE test_part_d (a) WITH (publish_via_partition_root = true);
+));
+
+# add the publication to our subscription, wait for sync to complete
+$node_subscriber->safe_psql('postgres', qq(
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_part_d VALUES (3, 4);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_d ORDER BY a, b"),
+	qq(1|
+3|),
+	'partitions with different replica identities not replicated correctly');
+
+# TEST: With a table included in multiple publications, we should use a
+# union of the column lists. So with column lists (a,b) and (a,c) we
+# should replicate (a,b,c).
+
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
+	CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b);
+	CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c);
+
+	-- initial data
+	INSERT INTO test_mix_1 VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int);
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_mix_1 VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_1 ORDER BY a"),
+	qq(1|2|3
+4|5|6),
+	'a mix of publications should use a union of column list');
+
+
+# TEST: With a table included in multiple publications, we should use a
+# union of the column lists. If any of the publications is FOR ALL
+# TABLES, we should replicate all columns.
+
+# drop unnecessary tables, so as not to interfere with the FOR ALL TABLES
+$node_publisher->safe_psql('postgres', qq(
+	DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1,
+			   test_part, test_part_a, test_part_b, test_part_c, test_part_d;
+));
+
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
+	CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b);
+	CREATE PUBLICATION pub_mix_4 FOR ALL TABLES;
+
+	-- initial data
+	INSERT INTO test_mix_2 VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int);
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_3, pub_mix_4;
+	ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_mix_2 VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_2 ORDER BY a"), # fixed row order for the comparison
+	qq(1|2|3
+4|5|6),
+	'a mix of publications should use a union of column list');
+
+
+# TEST: With a table included in multiple publications, we should use a
+# union of the column lists. If any of the publications is FOR ALL
+# TABLES IN SCHEMA, we should replicate all columns.
+
+$node_subscriber->safe_psql('postgres', qq(
+	DROP SUBSCRIPTION sub1;
+	CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+	DROP TABLE test_mix_2;
+	CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int);
+	CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b);
+	CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public;
+
+	-- initial data
+	INSERT INTO test_mix_3 VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_mix_3 VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_3 ORDER BY a"), # fixed row order for the comparison
+	qq(1|2|3
+4|5|6),
+	'a mix of publications should use a union of column list');
+
+
+# TEST: Check handling of publish_via_partition_root - if a partition is
+# published through partition root, we should only apply the column list
+# defined for the whole table (not the partitions) - both during the initial
+# sync and when replicating changes. This is what we do for row filters.
+
+$node_subscriber->safe_psql('postgres', qq(
+	DROP SUBSCRIPTION sub1;
+
+	CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
+	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
+));
+
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
+	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
+
+	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+
+	-- initial data
+	INSERT INTO test_root VALUES (1, 2, 3);
+	INSERT INTO test_root VALUES (10, 20, 30);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_root VALUES (2, 3, 4);
+	INSERT INTO test_root VALUES (11, 21, 31);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_root ORDER BY a, b, c"),
+	qq(1||
+2||
+10||
+11||),
+	'publication via partition root applies column list');
+
+
+# TEST: Multiple publications which publish schema of parent table and
+# partition. The partition is published through two publications, once
+# through a schema (so no column list) containing the parent, and then
+# also directly (with a column list). The expected outcome is that no
+# column list is applied.
+
+$node_publisher->safe_psql('postgres', qq(
+	DROP PUBLICATION pub1, pub2, pub3, pub4, pub5, pub6, pub7, pub8;
+
+	CREATE SCHEMA s1;
+	CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);
+
+	CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1;
+	CREATE PUBLICATION pub2 FOR TABLE t_1(b);
+
+	-- initial data
+	INSERT INTO s1.t VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE SCHEMA s1;
+	CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10);
+
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO s1.t VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM s1.t ORDER BY a"),
+	qq(1|2|3
+4|5|6),
+	'two publications, publishing the same relation');
+
+# Now resync the subscription, but with publications in the opposite order.
+# The result should be the same (all columns are replicated).
+
+$node_subscriber->safe_psql('postgres', qq(
+	TRUNCATE s1.t;
+
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO s1.t VALUES (7, 8, 9);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM s1.t ORDER BY a"),
+	qq(7|8|9),
+	'two publications, publishing the same relation');
+
+
+# TEST: One publication, containing both the parent and child relations.
+# The expected outcome is list "a", because that's the column list defined
+# for the top-most ancestor added to the publication.
+
+$node_publisher->safe_psql('postgres', qq(
+	DROP SCHEMA s1 CASCADE;
+	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+		PARTITION BY RANGE (a);
+	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+	CREATE PUBLICATION pub3 FOR TABLE t_1 (a), t_2
+		WITH (PUBLISH_VIA_PARTITION_ROOT);
+
+	-- initial data
+	INSERT INTO t VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	DROP SCHEMA s1 CASCADE;
+	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+		PARTITION BY RANGE (a);
+	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO t VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"),
+	qq(1||
+4||),
+	'publication containing both parent and child relation'); # only column a arrives; b and c are NULL
+
+
+# TEST: One publication, containing both the parent and child relations.
+# The expected outcome is column list "a", because that's the column list
+# defined for the top-most ancestor added to the publication.
+# Note: The difference from the preceding test is that in this case both
+# relations have a column list defined.
+
+$node_publisher->safe_psql('postgres', qq(
+	DROP TABLE t;
+	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+		PARTITION BY RANGE (a);
+	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+	CREATE PUBLICATION pub4 FOR TABLE t_1 (a), t_2 (b)
+		WITH (PUBLISH_VIA_PARTITION_ROOT);
+
+	-- initial data
+	INSERT INTO t VALUES (1, 2, 3);
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	DROP TABLE t;
+	CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a);
+	CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10)
+		PARTITION BY RANGE (a);
+	CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10);
+
+	ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
+));
+
+wait_for_subscription_sync($node_subscriber);
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO t VALUES (4, 5, 6);
+));
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"),
+	qq(1||
+4||),
+	'publication containing both parent and child relation'); # only column a arrives; t_2's list (b) is not applied
+
+
+$node_subscriber->stop('fast'); # end of test: shut down both nodes
+$node_publisher->stop('fast');
+
+done_testing();