From 83fd4532a72179c370e318075a10e0e2aa832024 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Wed, 8 Apr 2020 09:59:27 +0200 Subject: [PATCH] Allow publishing partition changes via ancestors To control whether partition changes are replicated using their own identity and schema or an ancestor's, add a new parameter that can be set per publication named 'publish_via_partition_root'. This allows replicating a partitioned table into a different partition structure on the subscriber. Author: Amit Langote Reviewed-by: Rafia Sabih Reviewed-by: Peter Eisentraut Reviewed-by: Petr Jelinek Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 10 + doc/src/sgml/logical-replication.sgml | 12 +- doc/src/sgml/ref/create_publication.sgml | 20 ++ src/backend/catalog/pg_publication.c | 70 ++--- src/backend/commands/publicationcmds.c | 95 ++++--- src/backend/replication/pgoutput/pgoutput.c | 223 ++++++++++++--- src/backend/utils/cache/relcache.c | 15 + src/bin/pg_dump/pg_dump.c | 24 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 17 +- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_publication.h | 5 +- src/test/regress/expected/publication.out | 103 ++++--- src/test/regress/sql/publication.sql | 3 + src/test/subscription/t/013_partition.pl | 298 +++++++++++++++++++- 15 files changed, 724 insertions(+), 174 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 0d61d98b11..386c6d7bd1 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5437,6 +5437,16 @@ SCRAM-SHA-256$<iteration count>:&l If true, TRUNCATE operations are replicated for tables in the publication. + + + pubviaroot + bool + + If true, operations on a leaf partition are replicated using the + identity and schema of its topmost partitioned ancestor mentioned in the + publication instead of its own. 
+ + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index c513621470..eba331a72b 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -411,10 +411,14 @@ When replicating between partitioned tables, the actual replication - originates from the leaf partitions on the publisher, so partitions on - the publisher must also exist on the subscriber as valid target tables. - (They could either be leaf partitions themselves, or they could be - further subpartitioned, or they could even be independent tables.) + originates, by default, from the leaf partitions on the publisher, so + partitions on the publisher must also exist on the subscriber as valid + target tables. (They could either be leaf partitions themselves, or they + could be further subpartitioned, or they could even be independent + tables.) Publications can also specify that changes are to be replicated + using the identity and schema of the partitioned root table instead of + that of the individual leaf partitions in which the changes actually + originate (see <xref linkend="sql-createpublication"/>). diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 597cb28f33..2c52a8aada 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -123,6 +123,26 @@ CREATE PUBLICATION name + + + publish_via_partition_root (boolean) + + + This parameter determines whether changes in a partitioned table (or + on its partitions) contained in the publication will be published + using the identity and schema of the partitioned table rather than + that of the individual partitions that are actually changed; the + latter is the default. Enabling this allows the changes to be + replicated into a non-partitioned table or a partitioned table + consisting of a different set of partitions. + + + + If this is enabled, TRUNCATE operations performed + directly on partitions are not replicated. 
+ + + diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 500a5ae1ee..68f6887b38 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -42,8 +42,6 @@ #include "utils/rel.h" #include "utils/syscache.h" -static List *get_rel_publications(Oid relid); - /* * Check if relation can be in given publication and throws appropriate * error if not. @@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel, return myself; } - -/* - * Gets list of publication oids for a relation, plus those of ancestors, - * if any, if the relation is a partition. - */ +/* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) -{ - List *result = NIL; - - result = get_rel_publications(relid); - if (get_rel_relispartition(relid)) - { - List *ancestors = get_partition_ancestors(relid); - ListCell *lc; - - foreach(lc, ancestors) - { - Oid ancestor = lfirst_oid(lc); - List *ancestor_pubs = get_rel_publications(ancestor); - - result = list_concat(result, ancestor_pubs); - } - } - - return result; -} - -/* Workhorse of GetRelationPublications() */ -static List * -get_rel_publications(Oid relid) { List *result = NIL; CatCList *pubrellist; @@ -373,9 +343,13 @@ GetAllTablesPublications(void) /* * Gets list of all relation published by FOR ALL TABLES publication(s). + * + * If the publication publishes partition changes via their respective root + * partitioned tables, we must exclude partitions in favor of including the + * root partitioned tables. 
*/ List * -GetAllTablesPublicationRelations(void) +GetAllTablesPublicationRelations(bool pubviaroot) { Relation classRel; ScanKeyData key[1]; @@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; - if (is_publishable_class(relid, relForm)) + if (is_publishable_class(relid, relForm) && + !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); } table_endscan(scan); - table_close(classRel, AccessShareLock); + + if (pubviaroot) + { + ScanKeyInit(&key[0], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_PARTITIONED_TABLE)); + + scan = table_beginscan_catalog(classRel, 1, key); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm) && + !relForm->relispartition) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + } return result; } @@ -433,6 +430,7 @@ GetPublication(Oid pubid) pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; + pub->pubviaroot = pubform->pubviaroot; ReleaseSysCache(tup); @@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->alltables) - tables = GetAllTablesPublicationRelations(); + tables = GetAllTablesPublicationRelations(publication->pubviaroot); else tables = GetPublicationRelations(publication->oid, + publication->pubviaroot ? 
+ PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); funcctx->user_fctx = (void *) tables; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 494c0bdc28..771268f70a 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -23,6 +23,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/partition.h" #include "catalog/pg_inherits.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" @@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); static void parse_publication_options(List *options, bool *publish_given, - bool *publish_insert, - bool *publish_update, - bool *publish_delete, - bool *publish_truncate) + PublicationActions *pubactions, + bool *publish_via_partition_root_given, + bool *publish_via_partition_root) { ListCell *lc; *publish_given = false; + *publish_via_partition_root_given = false; - /* Defaults are true */ - *publish_insert = true; - *publish_update = true; - *publish_delete = true; - *publish_truncate = true; + /* defaults */ + pubactions->pubinsert = true; + pubactions->pubupdate = true; + pubactions->pubdelete = true; + pubactions->pubtruncate = true; + *publish_via_partition_root = false; /* Parse options */ foreach(lc, options) @@ -91,10 +93,10 @@ parse_publication_options(List *options, * If publish option was given only the explicitly listed actions * should be published. 
*/ - *publish_insert = false; - *publish_update = false; - *publish_delete = false; - *publish_truncate = false; + pubactions->pubinsert = false; + pubactions->pubupdate = false; + pubactions->pubdelete = false; + pubactions->pubtruncate = false; *publish_given = true; publish = defGetString(defel); @@ -110,19 +112,28 @@ parse_publication_options(List *options, char *publish_opt = (char *) lfirst(lc); if (strcmp(publish_opt, "insert") == 0) - *publish_insert = true; + pubactions->pubinsert = true; else if (strcmp(publish_opt, "update") == 0) - *publish_update = true; + pubactions->pubupdate = true; else if (strcmp(publish_opt, "delete") == 0) - *publish_delete = true; + pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) - *publish_truncate = true; + pubactions->pubtruncate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt))); } } + else if (strcmp(defel->defname, "publish_via_partition_root") == 0) + { + if (*publish_via_partition_root_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + *publish_via_partition_root_given = true; + *publish_via_partition_root = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt) Datum values[Natts_pg_publication]; HeapTuple tup; bool publish_given; - bool publish_insert; - bool publish_update; - bool publish_delete; - bool publish_truncate; + PublicationActions pubactions; + bool publish_via_partition_root_given; + bool publish_via_partition_root; AclResult aclresult; /* must have CREATE privilege on database */ @@ -183,9 +193,9 @@ CreatePublication(CreatePublicationStmt *stmt) values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId()); parse_publication_options(stmt->options, - &publish_given, &publish_insert, - &publish_update, &publish_delete, - &publish_truncate); + 
&publish_given, &pubactions, + &publish_via_partition_root_given, + &publish_via_partition_root); puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId, Anum_pg_publication_oid); @@ -193,13 +203,15 @@ CreatePublication(CreatePublicationStmt *stmt) values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); values[Anum_pg_publication_pubinsert - 1] = - BoolGetDatum(publish_insert); + BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = - BoolGetDatum(publish_update); + BoolGetDatum(pubactions.pubupdate); values[Anum_pg_publication_pubdelete - 1] = - BoolGetDatum(publish_delete); + BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = - BoolGetDatum(publish_truncate); + BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubviaroot - 1] = + BoolGetDatum(publish_via_partition_root); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, bool replaces[Natts_pg_publication]; Datum values[Natts_pg_publication]; bool publish_given; - bool publish_insert; - bool publish_update; - bool publish_delete; - bool publish_truncate; + PublicationActions pubactions; + bool publish_via_partition_root_given; + bool publish_via_partition_root; ObjectAddress obj; Form_pg_publication pubform; parse_publication_options(stmt->options, - &publish_given, &publish_insert, - &publish_update, &publish_delete, - &publish_truncate); + &publish_given, &pubactions, + &publish_via_partition_root_given, + &publish_via_partition_root); /* Everything ok, form a new tuple. 
*/ memset(values, 0, sizeof(values)); @@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, if (publish_given) { - values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert); + values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); replaces[Anum_pg_publication_pubinsert - 1] = true; - values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update); + values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate); replaces[Anum_pg_publication_pubupdate - 1] = true; - values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); + values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete); replaces[Anum_pg_publication_pubdelete - 1] = true; - values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); + values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; } + if (publish_via_partition_root_given) + { + values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + replaces[Anum_pg_publication_pubviaroot - 1] = true; + } + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 552a70cffa..5fbf2d4367 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,6 +12,8 @@ */ #include "postgres.h" +#include "access/tupconvert.h" +#include "catalog/partition.h" #include "catalog/pg_publication.h" #include "fmgr.h" #include "replication/logical.h" @@ -20,6 +22,7 @@ #include "replication/pgoutput.h" #include "utils/int8.h" #include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -49,6 +52,7 @@ static bool publications_valid; static List 
*LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); +static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -59,9 +63,31 @@ static void publication_invalidation_cb(Datum arg, int cacheid, typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - bool schema_sent; /* did we send the schema? */ + + /* + * Did we send the schema? If ancestor relid is set, its schema must also + * have been sent for this to be true. + */ + bool schema_sent; + bool replicate_valid; PublicationActions pubactions; + + /* + * OID of the relation to publish changes as. For a partition, this may + * be set to one of its ancestors whose schema will be used when + * replicating changes, if publish_via_partition_root is set for the + * publication. + */ + Oid publish_as_relid; + + /* + * Map used when replicating using an ancestor's schema to convert tuples + * from partition's type to the ancestor's; NULL if publish_as_relid is + * same as 'relid' or if unnecessary due to partition and the ancestor + * having identical TupleDesc. + */ + TupleConversionMap *map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -259,47 +285,71 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Write the relation schema if the current schema hasn't been sent yet. + * Write the current schema of the relation and its ancestor (if any) if not + * done yet. */ static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry) { - if (!relentry->schema_sent) + if (relentry->schema_sent) + return; + + /* If needed, send the ancestor's schema first. 
*/ + if (relentry->publish_as_relid != RelationGetRelid(relation)) { - TupleDesc desc; - int i; + Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + MemoryContext oldctx; - desc = RelationGetDescr(relation); + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + relentry->map = convert_tuples_by_name(indesc, outdesc); + MemoryContextSwitchTo(oldctx); + send_relation_and_attrs(ancestor, ctx); + RelationClose(ancestor); + } - /* - * Write out type info if needed. We do that only for user-created - * types. We use FirstGenbkiObjectId as the cutoff, so that we only - * consider objects with hand-assigned OIDs to be "built in", not for - * instance any function or type defined in the information_schema. - * This is important because only hand-assigned OIDs can be expected - * to remain stable across major versions. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); + send_relation_and_attrs(relation, ctx); + relentry->schema_sent = true; +} - if (att->attisdropped || att->attgenerated) - continue; +/* + * Sends a relation + */ +static void +send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; - if (att->atttypid < FirstGenbkiObjectId) - continue; + /* + * Write out type info if needed. We do that only for user-created types. + * We use FirstGenbkiObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. This is important + * because only hand-assigned OIDs can be expected to remain stable across + * major versions. 
+ */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atttypid < FirstGenbkiObjectId) + continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_typ(ctx->out, att->atttypid); OutputPluginWrite(ctx, false); - relentry->schema_sent = true; } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); } /* @@ -346,28 +396,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, - &change->data.tp.newtuple->tuple); - OutputPluginWrite(ctx, true); - break; + { + HeapTuple tuple = &change->data.tp.newtuple->tuple; + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->publish_as_relid); + /* Convert tuple if needed. */ + if (relentry->map) + tuple = execute_attr_map_tuple(tuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, tuple); + OutputPluginWrite(ctx, true); + break; + } case REORDER_BUFFER_CHANGE_UPDATE: { HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + HeapTuple newtuple = &change->data.tp.newtuple->tuple; + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->publish_as_relid); + /* Convert tuples if needed. 
*/ + if (relentry->map) + { + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + newtuple = execute_attr_map_tuple(newtuple, relentry->map); + } + } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, - &change->data.tp.newtuple->tuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); OutputPluginWrite(ctx, true); break; } case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { + HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->publish_as_relid); + /* Convert tuple if needed. */ + if (relentry->map) + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + } + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, - &change->data.tp.oldtuple->tuple); + logicalrep_write_delete(ctx->out, relation, oldtuple); OutputPluginWrite(ctx, true); } else @@ -412,10 +499,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; /* - * Don't send partitioned tables, because partitions should be sent - * instead. + * Don't send partitions if the publication wants to send only the + * root tables through it. */ - if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (relation->rd_rel->relispartition && + relentry->publish_as_relid != relid) continue; relids[nrelids++] = relid; @@ -540,12 +628,15 @@ init_rel_sync_cache(MemoryContext cachectx) * This looks up publications that the given relation is directly or * indirectly part of (the latter if it's really the relation's ancestor that * is part of a publication) and fills up the found entry with the information - * about which operations to publish. + * about which operations to publish and whether to use an ancestor's schema + * when publishing. 
*/ static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid) { RelationSyncEntry *entry; + bool am_partition = get_rel_relispartition(relid); + char relkind = get_rel_relkind(relid); bool found; MemoryContext oldctx; @@ -564,6 +655,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { List *pubids = GetRelationPublications(relid); ListCell *lc; + Oid publish_as_relid = relid; /* Reload publications if needed before use. */ if (!publications_valid) @@ -588,8 +680,56 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) foreach(lc, data->publications) { Publication *pub = lfirst(lc); + bool publish = false; - if (pub->alltables || list_member_oid(pubids, pub->oid)) + if (pub->alltables) + { + publish = true; + if (pub->pubviaroot && am_partition) + publish_as_relid = llast_oid(get_partition_ancestors(relid)); + } + + if (!publish) + { + bool ancestor_published = false; + + /* + * For a partition, check if any of the ancestors are + * published. If so, note down the topmost ancestor that is + * published via this publication, which will be used as the + * relation via which to publish the partition's changes. + */ + if (am_partition) + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc2; + + /* Find the "topmost" ancestor that is in this publication. */ + foreach(lc2, ancestors) + { + Oid ancestor = lfirst_oid(lc2); + + if (list_member_oid(GetRelationPublications(ancestor), + pub->oid)) + { + ancestor_published = true; + if (pub->pubviaroot) + publish_as_relid = ancestor; + } + } + } + + if (list_member_oid(pubids, pub->oid) || ancestor_published) + publish = true; + } + + /* + * Don't publish changes for partitioned tables, because + * publishing those of its partitions suffices, unless partition + * changes won't be published due to pubviaroot being set. 
+ */ + if (publish && + (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; @@ -604,6 +744,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) list_free(pubids); + entry->publish_as_relid = publish_as_relid; entry->replicate_valid = true; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index dfd81f1320..9f1f11d0c1 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -44,6 +44,7 @@ #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/partition.h" #include "catalog/pg_am.h" #include "catalog/pg_amproc.h" #include "catalog/pg_attrdef.h" @@ -5314,6 +5315,20 @@ GetRelationPublicationActions(Relation relation) /* Fetch the publication membership info. */ puboids = GetRelationPublications(RelationGetRelid(relation)); + if (relation->rd_rel->relispartition) + { + /* Add publications that the ancestors are in too. */ + List *ancestors = get_partition_ancestors(RelationGetRelid(relation)); + ListCell *lc; + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + + puboids = list_concat_unique_oid(puboids, + GetRelationPublications(ancestor)); + } + } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); foreach(lc, puboids) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 408637cfec..c579227b19 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3868,6 +3868,7 @@ getPublications(Archive *fout) int i_pubupdate; int i_pubdelete; int i_pubtruncate; + int i_pubviaroot; int i, ntups; @@ -3879,18 +3880,25 @@ getPublications(Archive *fout) resetPQExpBuffer(query); /* Get the publications. 
*/ - if (fout->remoteVersion >= 110000) + if (fout->remoteVersion >= 130000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "FROM pg_publication p", + username_subquery); + else if (fout->remoteVersion >= 110000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot " "FROM pg_publication p", username_subquery); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot " "FROM pg_publication p", username_subquery); @@ -3907,6 +3915,7 @@ getPublications(Archive *fout) i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); + i_pubviaroot = PQfnumber(res, "pubviaroot"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3929,6 +3938,8 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); pubinfo[i].pubtruncate = (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); + pubinfo[i].pubviaroot = + (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); if (strlen(pubinfo[i].rolname) == 0) pg_log_warning("owner of publication \"%s\" appears to be invalid", @@ -4005,7 +4016,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) first = false; } - appendPQExpBufferStr(query, "');\n"); + appendPQExpBufferStr(query, "'"); + + if (pubinfo->pubviaroot) + appendPQExpBufferStr(query, ", publish_via_partition_root = true"); + + appendPQExpBufferStr(query, ");\n"); ArchiveEntry(fout, 
pubinfo->dobj.catId, pubinfo->dobj.dumpId, ARCHIVE_OPTS(.tag = pubinfo->dobj.name, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 3e11166615..61c909e06d 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -602,6 +602,7 @@ typedef struct _PublicationInfo bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubviaroot; } PublicationInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 109245fea7..f05e914b4d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5707,7 +5707,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5738,6 +5738,10 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubtruncate AS \"%s\"", gettext_noop("Truncates")); + if (pset.sversion >= 130000) + appendPQExpBuffer(&buf, + ",\n pubviaroot AS \"%s\"", + gettext_noop("Via root")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5779,6 +5783,7 @@ describePublications(const char *pattern) int i; PGresult *res; bool has_pubtruncate; + bool has_pubviaroot; if (pset.sversion < 100000) { @@ -5791,6 +5796,7 @@ describePublications(const char *pattern) } has_pubtruncate = (pset.sversion >= 110000); + has_pubviaroot = (pset.sversion >= 130000); initPQExpBuffer(&buf); @@ -5801,6 +5807,9 @@ describePublications(const char *pattern) if (has_pubtruncate) appendPQExpBufferStr(&buf, ", pubtruncate"); + if (has_pubviaroot) + appendPQExpBufferStr(&buf, + ", pubviaroot"); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5850,6 +5859,8 @@ describePublications(const char *pattern) if (has_pubtruncate) ncols++; + if (has_pubviaroot) + ncols++; initPQExpBuffer(&title); 
printfPQExpBuffer(&title, _("Publication %s"), pubname); @@ -5862,6 +5873,8 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); if (has_pubtruncate) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); + if (has_pubviaroot) + printTableAddHeader(&cont, gettext_noop("Via root"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); @@ -5870,6 +5883,8 @@ describePublications(const char *pattern) printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); if (has_pubtruncate) printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); + if (has_pubviaroot) + printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); if (!puballtables) { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 27381d7874..13bbddf785 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202004073 +#define CATALOG_VERSION_NO 202004074 #endif diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index bb52e8c5e0..ec02f48da0 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -52,6 +52,8 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if truncates are published */ bool pubtruncate; + /* true if partition changes are published using root schema */ + bool pubviaroot; } FormData_pg_publication; /* ---------------- @@ -74,6 +76,7 @@ typedef struct Publication Oid oid; char *name; bool alltables; + bool pubviaroot; PublicationActions pubactions; } Publication; @@ -99,7 +102,7 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(void); +extern List 
*GetAllTablesPublicationRelations(bool pubviaroot); extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 2634d2c1e1..63d6ab7a4e 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -25,21 +25,23 @@ CREATE PUBLICATION testpub_xxx WITH (foo); ERROR: unrecognized publication parameter: "foo" CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); ERROR: unrecognized "publish" value: "cluster" +CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0'); +ERROR: conflicting or redundant options \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------+--------------------------+------------+---------+---------+---------+----------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------+--------------------------+------------+---------+---------+---------+-----------+---------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------+--------------------------+------------+---------+---------+---------+----------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | 
Via root +--------------------+--------------------------+------------+---------+---------+---------+-----------+---------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f | f (2 rows) --- adding tables @@ -83,10 +85,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | t | t | t | f | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | t | t | t | f | f | f (1 row) DROP TABLE testpub_tbl2; @@ -98,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + 
regress_publication_user | f | t | t | t | t | f Tables: "public.testpub_tbl3" @@ -129,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); -- only parent is listed as being in publication, not the partition ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f Tables: "public.testpub_parted" @@ -143,6 +145,15 @@ HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; -- works again, because parent's publication is no longer considered UPDATE testpub_parted1 SET a = 1; +ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | t +Tables: + "public.testpub_parted" + DROP TABLE testpub_parted1; DROP PUBLICATION testpub_forparted, testpub_forparted1; -- fail - view @@ -159,10 +170,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- 
- regress_publication_user | f | t | t | t | t + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -200,10 +211,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | f | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -247,10 +258,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | f | f (1 row) -- fail - must be owner of publication @@ -260,20 +271,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates --------------+--------------------------+------------+---------+---------+---------+----------- - testpub_foo | regress_publication_user | f | t | t | t | f 
+ List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +-------------+--------------------------+------------+---------+---------+---------+-----------+---------- + testpub_foo | regress_publication_user | f | t | t | t | f | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates ------------------+---------------------------+------------+---------+---------+---------+----------- - testpub_default | regress_publication_user2 | f | t | t | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +-----------------+---------------------------+------------+---------+---------+---------+-----------+---------- + testpub_default | regress_publication_user2 | f | t | t | t | f | f (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 219e04129d..d844075368 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -23,6 +23,7 @@ ALTER PUBLICATION testpub_default SET (publish = update); -- error cases CREATE PUBLICATION testpub_xxx WITH (foo); CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); +CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0'); \dRp @@ -87,6 +88,8 @@ UPDATE testpub_parted1 SET a = 1; ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; -- works again, because parent's publication is no longer considered UPDATE testpub_parted1 SET a = 1; +ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); +\dRp+ testpub_forparted DROP TABLE testpub_parted1; DROP PUBLICATION testpub_forparted, testpub_forparted1; diff 
--git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 5db1b21c59..208bb556ce 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 24; +use Test::More tests => 51; # setup @@ -48,7 +48,6 @@ $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"); - $node_subscriber1->safe_psql('postgres', "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); $node_subscriber1->safe_psql('postgres', @@ -87,6 +86,8 @@ $node_subscriber1->poll_query_until('postgres', $synced_query) $node_subscriber2->poll_query_until('postgres', $synced_query) or die "Timed out while waiting for subscriber to synchronize data"; +# Tests for replication using leaf partition identity and schema + # insert $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1)"); @@ -260,3 +261,296 @@ is($result, qq(), 'truncate of tab1_1 replicated'); $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); + +# Tests for replication using root table identity and schema + +# publisher +$node_publisher->safe_psql('postgres', + "DROP PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, b 
text)" PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)"); +# Note: tab3_1's parent is not in the publication, in which case its +# changes are published using its own identity. +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)"); + +# subscriber 1 +$node_subscriber1->safe_psql('postgres', + "DROP SUBSCRIPTION sub1"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)"); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot"); + +# subscriber 2 +$node_subscriber2->safe_psql('postgres', + "DROP TABLE tab1"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)"); +# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher. 
+$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)"); +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"); +# Publication that sub2 points to now publishes via root, so must update +# subscription target relations. +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + +# Wait for initial sync of all subscriptions +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (0)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (0), (3), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1), (0), (3), (5)"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is($result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|3 +sub1_tab2|5), 'inserts into tab2 replicated'); + +$result = 
$node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is($result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|3 +sub1_tab3_1|5), 'inserts into tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is($result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|3 +sub2_tab1|5), 'inserts into tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is($result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|3 +sub2_tab2|5), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is($result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|3 +sub2_tab3|5), 'inserts into tab3 replicated'); + +# update (replicated as update) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', + "UPDATE tab3 SET a = 6 WHERE a = 5"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is($result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|3 +sub1_tab2|6), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is($result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|3 +sub1_tab3_1|6), 'update of tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is($result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|3 +sub2_tab1|6), 'update of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is($result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|3 +sub2_tab2|6), 'update of tab2 replicated'); + +$result = 
$node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is($result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|3 +sub2_tab3|6), 'update of tab3 replicated'); + +# update (replicated as delete+insert) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 2 WHERE a = 6"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET a = 2 WHERE a = 6"); +$node_publisher->safe_psql('postgres', + "UPDATE tab3 SET a = 2 WHERE a = 6"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is($result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|2 +sub1_tab2|3), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is($result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|2 +sub1_tab3_1|3), 'update of tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is($result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|2 +sub2_tab1|3), 'update of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is($result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|2 +sub2_tab2|3), 'update of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is($result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|2 +sub2_tab3|3), 'update of tab3 replicated'); + +# delete +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab2"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab2"); +is($result, qq(), 'delete tab2 replicated'); + +$result = 
$node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab1"); +is($result, qq(), 'delete from tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab2"); +is($result, qq(), 'delete from tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab3"); +is($result, qq(), 'delete from tab3 replicated'); + +# truncate +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (5)"); +# these will NOT be replicated +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1_2, tab2_1, tab3_1"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab2 ORDER BY 1"); +is($result, qq(1 +2 +5), 'truncate of tab2_1 NOT replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(1 +2 +5), 'truncate of tab1_2 NOT replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab2 ORDER BY 1"); +is($result, qq(1 +2 +5), 'truncate of tab2_1 NOT replicated'); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1, tab2, tab3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab2"); +is($result, qq(), 'truncate of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab1"); +is($result, qq(), 'truncate of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab2"); +is($result, qq(), 'truncate of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab3"); +is($result, qq(), 'truncate of tab3 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab3_1"); 
+is($result, qq(), 'truncate of tab3_1 replicated');