diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 0d61d98b11..386c6d7bd1 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5437,6 +5437,16 @@ SCRAM-SHA-256$<iteration count>:&l
If true, TRUNCATE operations are replicated for
tables in the publication.
+
+
+ pubviaroot
+ bool
+
+ If true, operations on a leaf partition are replicated using the
+ identity and schema of its topmost partitioned ancestor mentioned in the
+ publication instead of its own.
+
+
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index c513621470..eba331a72b 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -411,10 +411,14 @@
When replicating between partitioned tables, the actual replication
- originates from the leaf partitions on the publisher, so partitions on
- the publisher must also exist on the subscriber as valid target tables.
- (They could either be leaf partitions themselves, or they could be
- further subpartitioned, or they could even be independent tables.)
+ originates, by default, from the leaf partitions on the publisher, so
+ partitions on the publisher must also exist on the subscriber as valid
+ target tables. (They could either be leaf partitions themselves, or they
+ could be further subpartitioned, or they could even be independent
+ tables.) Publications can also specify that changes are to be replicated
+ using the identity and schema of the partitioned root table instead of
+ that of the individual leaf partitions in which the changes actually
+ originate (see ).
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index 597cb28f33..2c52a8aada 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -123,6 +123,26 @@ CREATE PUBLICATION name
+
+
+ publish_via_partition_root (boolean)
+
+
+ This parameter determines whether changes in a partitioned table (or
+ on its partitions) contained in the publication will be published
+ using the identity and schema of the partitioned table rather than
+ that of the individual partitions that are actually changed; the
+ latter is the default. Enablings this allows the changes to be
+ replicated into a non-partitioned table or a partitioned table
+ consisting of a different set of partitions.
+
+
+
+ If this is enabled, TRUNCATE operations performed
+ directly on partitions are not replicated.
+
+
+
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 500a5ae1ee..68f6887b38 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -42,8 +42,6 @@
#include "utils/rel.h"
#include "utils/syscache.h"
-static List *get_rel_publications(Oid relid);
-
/*
* Check if relation can be in given publication and throws appropriate
* error if not.
@@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel,
return myself;
}
-
-/*
- * Gets list of publication oids for a relation, plus those of ancestors,
- * if any, if the relation is a partition.
- */
+/* Gets list of publication oids for a relation */
List *
GetRelationPublications(Oid relid)
-{
- List *result = NIL;
-
- result = get_rel_publications(relid);
- if (get_rel_relispartition(relid))
- {
- List *ancestors = get_partition_ancestors(relid);
- ListCell *lc;
-
- foreach(lc, ancestors)
- {
- Oid ancestor = lfirst_oid(lc);
- List *ancestor_pubs = get_rel_publications(ancestor);
-
- result = list_concat(result, ancestor_pubs);
- }
- }
-
- return result;
-}
-
-/* Workhorse of GetRelationPublications() */
-static List *
-get_rel_publications(Oid relid)
{
List *result = NIL;
CatCList *pubrellist;
@@ -373,9 +343,13 @@ GetAllTablesPublications(void)
/*
* Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
*/
List *
-GetAllTablesPublicationRelations(void)
+GetAllTablesPublicationRelations(bool pubviaroot)
{
Relation classRel;
ScanKeyData key[1];
@@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void)
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
Oid relid = relForm->oid;
- if (is_publishable_class(relid, relForm))
+ if (is_publishable_class(relid, relForm) &&
+ !(relForm->relispartition && pubviaroot))
result = lappend_oid(result, relid);
}
table_endscan(scan);
- table_close(classRel, AccessShareLock);
+
+ if (pubviaroot)
+ {
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_PARTITIONED_TABLE));
+
+ scan = table_beginscan_catalog(classRel, 1, key);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+
+ if (is_publishable_class(relid, relForm) &&
+ !relForm->relispartition)
+ result = lappend_oid(result, relid);
+ }
+
+ table_endscan(scan);
+ table_close(classRel, AccessShareLock);
+ }
return result;
}
@@ -433,6 +430,7 @@ GetPublication(Oid pubid)
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
+ pub->pubviaroot = pubform->pubviaroot;
ReleaseSysCache(tup);
@@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* need those.
*/
if (publication->alltables)
- tables = GetAllTablesPublicationRelations();
+ tables = GetAllTablesPublicationRelations(publication->pubviaroot);
else
tables = GetPublicationRelations(publication->oid,
+ publication->pubviaroot ?
+ PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
funcctx->user_fctx = (void *) tables;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 494c0bdc28..771268f70a 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -23,6 +23,7 @@
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
+#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
@@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
static void
parse_publication_options(List *options,
bool *publish_given,
- bool *publish_insert,
- bool *publish_update,
- bool *publish_delete,
- bool *publish_truncate)
+ PublicationActions *pubactions,
+ bool *publish_via_partition_root_given,
+ bool *publish_via_partition_root)
{
ListCell *lc;
*publish_given = false;
+ *publish_via_partition_root_given = false;
- /* Defaults are true */
- *publish_insert = true;
- *publish_update = true;
- *publish_delete = true;
- *publish_truncate = true;
+ /* defaults */
+ pubactions->pubinsert = true;
+ pubactions->pubupdate = true;
+ pubactions->pubdelete = true;
+ pubactions->pubtruncate = true;
+ *publish_via_partition_root = false;
/* Parse options */
foreach(lc, options)
@@ -91,10 +93,10 @@ parse_publication_options(List *options,
* If publish option was given only the explicitly listed actions
* should be published.
*/
- *publish_insert = false;
- *publish_update = false;
- *publish_delete = false;
- *publish_truncate = false;
+ pubactions->pubinsert = false;
+ pubactions->pubupdate = false;
+ pubactions->pubdelete = false;
+ pubactions->pubtruncate = false;
*publish_given = true;
publish = defGetString(defel);
@@ -110,19 +112,28 @@ parse_publication_options(List *options,
char *publish_opt = (char *) lfirst(lc);
if (strcmp(publish_opt, "insert") == 0)
- *publish_insert = true;
+ pubactions->pubinsert = true;
else if (strcmp(publish_opt, "update") == 0)
- *publish_update = true;
+ pubactions->pubupdate = true;
else if (strcmp(publish_opt, "delete") == 0)
- *publish_delete = true;
+ pubactions->pubdelete = true;
else if (strcmp(publish_opt, "truncate") == 0)
- *publish_truncate = true;
+ pubactions->pubtruncate = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
}
}
+ else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
+ {
+ if (*publish_via_partition_root_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ *publish_via_partition_root_given = true;
+ *publish_via_partition_root = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt)
Datum values[Natts_pg_publication];
HeapTuple tup;
bool publish_given;
- bool publish_insert;
- bool publish_update;
- bool publish_delete;
- bool publish_truncate;
+ PublicationActions pubactions;
+ bool publish_via_partition_root_given;
+ bool publish_via_partition_root;
AclResult aclresult;
/* must have CREATE privilege on database */
@@ -183,9 +193,9 @@ CreatePublication(CreatePublicationStmt *stmt)
values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
parse_publication_options(stmt->options,
- &publish_given, &publish_insert,
- &publish_update, &publish_delete,
- &publish_truncate);
+ &publish_given, &pubactions,
+ &publish_via_partition_root_given,
+ &publish_via_partition_root);
puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
Anum_pg_publication_oid);
@@ -193,13 +203,15 @@ CreatePublication(CreatePublicationStmt *stmt)
values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
values[Anum_pg_publication_pubinsert - 1] =
- BoolGetDatum(publish_insert);
+ BoolGetDatum(pubactions.pubinsert);
values[Anum_pg_publication_pubupdate - 1] =
- BoolGetDatum(publish_update);
+ BoolGetDatum(pubactions.pubupdate);
values[Anum_pg_publication_pubdelete - 1] =
- BoolGetDatum(publish_delete);
+ BoolGetDatum(pubactions.pubdelete);
values[Anum_pg_publication_pubtruncate - 1] =
- BoolGetDatum(publish_truncate);
+ BoolGetDatum(pubactions.pubtruncate);
+ values[Anum_pg_publication_pubviaroot - 1] =
+ BoolGetDatum(publish_via_partition_root);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
bool replaces[Natts_pg_publication];
Datum values[Natts_pg_publication];
bool publish_given;
- bool publish_insert;
- bool publish_update;
- bool publish_delete;
- bool publish_truncate;
+ PublicationActions pubactions;
+ bool publish_via_partition_root_given;
+ bool publish_via_partition_root;
ObjectAddress obj;
Form_pg_publication pubform;
parse_publication_options(stmt->options,
- &publish_given, &publish_insert,
- &publish_update, &publish_delete,
- &publish_truncate);
+ &publish_given, &pubactions,
+ &publish_via_partition_root_given,
+ &publish_via_partition_root);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
if (publish_given)
{
- values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
+ values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
replaces[Anum_pg_publication_pubinsert - 1] = true;
- values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
+ values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
replaces[Anum_pg_publication_pubupdate - 1] = true;
- values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
+ values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
replaces[Anum_pg_publication_pubdelete - 1] = true;
- values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+ values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
replaces[Anum_pg_publication_pubtruncate - 1] = true;
}
+ if (publish_via_partition_root_given)
+ {
+ values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
+ replaces[Anum_pg_publication_pubviaroot - 1] = true;
+ }
+
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 552a70cffa..5fbf2d4367 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,6 +12,8 @@
*/
#include "postgres.h"
+#include "access/tupconvert.h"
+#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "fmgr.h"
#include "replication/logical.h"
@@ -20,6 +22,7 @@
#include "replication/pgoutput.h"
#include "utils/int8.h"
#include "utils/inval.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@@ -49,6 +52,7 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
+static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
@@ -59,9 +63,31 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
- bool schema_sent; /* did we send the schema? */
+
+ /*
+ * Did we send the schema? If ancestor relid is set, its schema must also
+ * have been sent for this to be true.
+ */
+ bool schema_sent;
+
bool replicate_valid;
PublicationActions pubactions;
+
+ /*
+ * OID of the relation to publish changes as. For a partition, this may
+ * be set to one of its ancestors whose schema will be used when
+ * replicating changes, if publish_via_partition_root is set for the
+ * publication.
+ */
+ Oid publish_as_relid;
+
+ /*
+ * Map used when replicating using an ancestor's schema to convert tuples
+ * from partition's type to the ancestor's; NULL if publish_as_relid is
+ * same as 'relid' or if unnecessary due to partition and the ancestor
+ * having identical TupleDesc.
+ */
+ TupleConversionMap *map;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
@@ -259,47 +285,71 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
- * Write the relation schema if the current schema hasn't been sent yet.
+ * Write the current schema of the relation and its ancestor (if any) if not
+ * done yet.
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
Relation relation, RelationSyncEntry *relentry)
{
- if (!relentry->schema_sent)
+ if (relentry->schema_sent)
+ return;
+
+ /* If needed, send the ancestor's schema first. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
{
- TupleDesc desc;
- int i;
+ Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ TupleDesc indesc = RelationGetDescr(relation);
+ TupleDesc outdesc = RelationGetDescr(ancestor);
+ MemoryContext oldctx;
- desc = RelationGetDescr(relation);
+ /* Map must live as long as the session does. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ relentry->map = convert_tuples_by_name(indesc, outdesc);
+ MemoryContextSwitchTo(oldctx);
+ send_relation_and_attrs(ancestor, ctx);
+ RelationClose(ancestor);
+ }
- /*
- * Write out type info if needed. We do that only for user-created
- * types. We use FirstGenbkiObjectId as the cutoff, so that we only
- * consider objects with hand-assigned OIDs to be "built in", not for
- * instance any function or type defined in the information_schema.
- * This is important because only hand-assigned OIDs can be expected
- * to remain stable across major versions.
- */
- for (i = 0; i < desc->natts; i++)
- {
- Form_pg_attribute att = TupleDescAttr(desc, i);
+ send_relation_and_attrs(relation, ctx);
+ relentry->schema_sent = true;
+}
- if (att->attisdropped || att->attgenerated)
- continue;
+/*
+ * Sends a relation
+ */
+static void
+send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+{
+ TupleDesc desc = RelationGetDescr(relation);
+ int i;
- if (att->atttypid < FirstGenbkiObjectId)
- continue;
+ /*
+ * Write out type info if needed. We do that only for user-created types.
+ * We use FirstGenbkiObjectId as the cutoff, so that we only consider
+ * objects with hand-assigned OIDs to be "built in", not for instance any
+ * function or type defined in the information_schema. This is important
+ * because only hand-assigned OIDs can be expected to remain stable across
+ * major versions.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
- OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_typ(ctx->out, att->atttypid);
- OutputPluginWrite(ctx, false);
- }
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ if (att->atttypid < FirstGenbkiObjectId)
+ continue;
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, relation);
+ logicalrep_write_typ(ctx->out, att->atttypid);
OutputPluginWrite(ctx, false);
- relentry->schema_sent = true;
}
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_rel(ctx->out, relation);
+ OutputPluginWrite(ctx, false);
}
/*
@@ -346,28 +396,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, relation,
- &change->data.tp.newtuple->tuple);
- OutputPluginWrite(ctx, true);
- break;
+ {
+ HeapTuple tuple = &change->data.tp.newtuple->tuple;
+
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ relation = RelationIdGetRelation(relentry->publish_as_relid);
+ /* Convert tuple if needed. */
+ if (relentry->map)
+ tuple = execute_attr_map_tuple(tuple, relentry->map);
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_insert(ctx->out, relation, tuple);
+ OutputPluginWrite(ctx, true);
+ break;
+ }
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple oldtuple = change->data.tp.oldtuple ?
&change->data.tp.oldtuple->tuple : NULL;
+ HeapTuple newtuple = &change->data.tp.newtuple->tuple;
+
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ relation = RelationIdGetRelation(relentry->publish_as_relid);
+ /* Convert tuples if needed. */
+ if (relentry->map)
+ {
+ oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+ newtuple = execute_attr_map_tuple(newtuple, relentry->map);
+ }
+ }
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_update(ctx->out, relation, oldtuple,
- &change->data.tp.newtuple->tuple);
+ logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
OutputPluginWrite(ctx, true);
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
+ HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
+
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ relation = RelationIdGetRelation(relentry->publish_as_relid);
+ /* Convert tuple if needed. */
+ if (relentry->map)
+ oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+ }
+
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_delete(ctx->out, relation,
- &change->data.tp.oldtuple->tuple);
+ logicalrep_write_delete(ctx->out, relation, oldtuple);
OutputPluginWrite(ctx, true);
}
else
@@ -412,10 +499,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
/*
- * Don't send partitioned tables, because partitions should be sent
- * instead.
+ * Don't send partitions if the publication wants to send only the
+ * root tables through it.
*/
- if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ if (relation->rd_rel->relispartition &&
+ relentry->publish_as_relid != relid)
continue;
relids[nrelids++] = relid;
@@ -540,12 +628,15 @@ init_rel_sync_cache(MemoryContext cachectx)
* This looks up publications that the given relation is directly or
* indirectly part of (the latter if it's really the relation's ancestor that
* is part of a publication) and fills up the found entry with the information
- * about which operations to publish.
+ * about which operations to publish and whether to use an ancestor's schema
+ * when publishing.
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Oid relid)
{
RelationSyncEntry *entry;
+ bool am_partition = get_rel_relispartition(relid);
+ char relkind = get_rel_relkind(relid);
bool found;
MemoryContext oldctx;
@@ -564,6 +655,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
{
List *pubids = GetRelationPublications(relid);
ListCell *lc;
+ Oid publish_as_relid = relid;
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -588,8 +680,56 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
foreach(lc, data->publications)
{
Publication *pub = lfirst(lc);
+ bool publish = false;
- if (pub->alltables || list_member_oid(pubids, pub->oid))
+ if (pub->alltables)
+ {
+ publish = true;
+ if (pub->pubviaroot && am_partition)
+ publish_as_relid = llast_oid(get_partition_ancestors(relid));
+ }
+
+ if (!publish)
+ {
+ bool ancestor_published = false;
+
+ /*
+ * For a partition, check if any of the ancestors are
+ * published. If so, note down the topmost ancestor that is
+ * published via this publication, which will be used as the
+ * relation via which to publish the partition's changes.
+ */
+ if (am_partition)
+ {
+ List *ancestors = get_partition_ancestors(relid);
+ ListCell *lc2;
+
+ /* Find the "topmost" ancestor that is in this publication. */
+ foreach(lc2, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc2);
+
+ if (list_member_oid(GetRelationPublications(ancestor),
+ pub->oid))
+ {
+ ancestor_published = true;
+ if (pub->pubviaroot)
+ publish_as_relid = ancestor;
+ }
+ }
+ }
+
+ if (list_member_oid(pubids, pub->oid) || ancestor_published)
+ publish = true;
+ }
+
+ /*
+ * Don't publish changes for partitioned tables, because
+ * publishing those of its partitions suffices, unless partition
+ * changes won't be published due to pubviaroot being set.
+ */
+ if (publish &&
+ (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
{
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
@@ -604,6 +744,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
list_free(pubids);
+ entry->publish_as_relid = publish_as_relid;
entry->replicate_valid = true;
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index dfd81f1320..9f1f11d0c1 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -44,6 +44,7 @@
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
+#include "catalog/partition.h"
#include "catalog/pg_am.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_attrdef.h"
@@ -5314,6 +5315,20 @@ GetRelationPublicationActions(Relation relation)
/* Fetch the publication membership info. */
puboids = GetRelationPublications(RelationGetRelid(relation));
+ if (relation->rd_rel->relispartition)
+ {
+ /* Add publications that the ancestors are in too. */
+ List *ancestors = get_partition_ancestors(RelationGetRelid(relation));
+ ListCell *lc;
+
+ foreach(lc, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc);
+
+ puboids = list_concat_unique_oid(puboids,
+ GetRelationPublications(ancestor));
+ }
+ }
puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
foreach(lc, puboids)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 408637cfec..c579227b19 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3868,6 +3868,7 @@ getPublications(Archive *fout)
int i_pubupdate;
int i_pubdelete;
int i_pubtruncate;
+ int i_pubviaroot;
int i,
ntups;
@@ -3879,18 +3880,25 @@ getPublications(Archive *fout)
resetPQExpBuffer(query);
/* Get the publications. */
- if (fout->remoteVersion >= 110000)
+ if (fout->remoteVersion >= 130000)
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"(%s p.pubowner) AS rolname, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+ "FROM pg_publication p",
+ username_subquery);
+ else if (fout->remoteVersion >= 110000)
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
"FROM pg_publication p",
username_subquery);
else
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"(%s p.pubowner) AS rolname, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
"FROM pg_publication p",
username_subquery);
@@ -3907,6 +3915,7 @@ getPublications(Archive *fout)
i_pubupdate = PQfnumber(res, "pubupdate");
i_pubdelete = PQfnumber(res, "pubdelete");
i_pubtruncate = PQfnumber(res, "pubtruncate");
+ i_pubviaroot = PQfnumber(res, "pubviaroot");
pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
@@ -3929,6 +3938,8 @@ getPublications(Archive *fout)
(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
pubinfo[i].pubtruncate =
(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
+ pubinfo[i].pubviaroot =
+ (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
if (strlen(pubinfo[i].rolname) == 0)
pg_log_warning("owner of publication \"%s\" appears to be invalid",
@@ -4005,7 +4016,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
first = false;
}
- appendPQExpBufferStr(query, "');\n");
+ appendPQExpBufferStr(query, "'");
+
+ if (pubinfo->pubviaroot)
+ appendPQExpBufferStr(query, ", publish_via_partition_root = true");
+
+ appendPQExpBufferStr(query, ");\n");
ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 3e11166615..61c909e06d 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -602,6 +602,7 @@ typedef struct _PublicationInfo
bool pubupdate;
bool pubdelete;
bool pubtruncate;
+ bool pubviaroot;
} PublicationInfo;
/*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 109245fea7..f05e914b4d 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5707,7 +5707,7 @@ listPublications(const char *pattern)
PQExpBufferData buf;
PGresult *res;
printQueryOpt myopt = pset.popt;
- static const bool translate_columns[] = {false, false, false, false, false, false, false};
+ static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -5738,6 +5738,10 @@ listPublications(const char *pattern)
appendPQExpBuffer(&buf,
",\n pubtruncate AS \"%s\"",
gettext_noop("Truncates"));
+ if (pset.sversion >= 130000)
+ appendPQExpBuffer(&buf,
+ ",\n pubviaroot AS \"%s\"",
+ gettext_noop("Via root"));
appendPQExpBufferStr(&buf,
"\nFROM pg_catalog.pg_publication\n");
@@ -5779,6 +5783,7 @@ describePublications(const char *pattern)
int i;
PGresult *res;
bool has_pubtruncate;
+ bool has_pubviaroot;
if (pset.sversion < 100000)
{
@@ -5791,6 +5796,7 @@ describePublications(const char *pattern)
}
has_pubtruncate = (pset.sversion >= 110000);
+ has_pubviaroot = (pset.sversion >= 130000);
initPQExpBuffer(&buf);
@@ -5801,6 +5807,9 @@ describePublications(const char *pattern)
if (has_pubtruncate)
appendPQExpBufferStr(&buf,
", pubtruncate");
+ if (has_pubviaroot)
+ appendPQExpBufferStr(&buf,
+ ", pubviaroot");
appendPQExpBufferStr(&buf,
"\nFROM pg_catalog.pg_publication\n");
@@ -5850,6 +5859,8 @@ describePublications(const char *pattern)
if (has_pubtruncate)
ncols++;
+ if (has_pubviaroot)
+ ncols++;
initPQExpBuffer(&title);
printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -5862,6 +5873,8 @@ describePublications(const char *pattern)
printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
if (has_pubtruncate)
printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
+ if (has_pubviaroot)
+ printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -5870,6 +5883,8 @@ describePublications(const char *pattern)
printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
if (has_pubtruncate)
printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
+ if (has_pubviaroot)
+ printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
if (!puballtables)
{
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 27381d7874..13bbddf785 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202004073
+#define CATALOG_VERSION_NO 202004074
#endif
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index bb52e8c5e0..ec02f48da0 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -52,6 +52,8 @@ CATALOG(pg_publication,6104,PublicationRelationId)
/* true if truncates are published */
bool pubtruncate;
+ /* true if partition changes are published using root schema */
+ bool pubviaroot;
} FormData_pg_publication;
/* ----------------
@@ -74,6 +76,7 @@ typedef struct Publication
Oid oid;
char *name;
bool alltables;
+ bool pubviaroot;
PublicationActions pubactions;
} Publication;
@@ -99,7 +102,7 @@ typedef enum PublicationPartOpt
extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(void);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot);
extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 2634d2c1e1..63d6ab7a4e 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -25,21 +25,23 @@ CREATE PUBLICATION testpub_xxx WITH (foo);
ERROR: unrecognized publication parameter: "foo"
CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
ERROR: unrecognized "publish" value: "cluster"
+CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
+ERROR: conflicting or redundant options
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f | t | f | f | f
- testpub_default | regress_publication_user | f | f | t | f | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f
+ testpub_default | regress_publication_user | f | f | t | f | f | f
(2 rows)
ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f | t | f | f | f
- testpub_default | regress_publication_user | f | t | t | t | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f
+ testpub_default | regress_publication_user | f | t | t | t | f | f
(2 rows)
--- adding tables
@@ -83,10 +85,10 @@ Publications:
"testpub_foralltables"
\dRp+ testpub_foralltables
- Publication testpub_foralltables
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | t | t | t | f | f
+ Publication testpub_foralltables
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | t | t | t | f | f | f
(1 row)
DROP TABLE testpub_tbl2;
@@ -98,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
RESET client_min_messages;
\dRp+ testpub3
- Publication testpub3
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f | t | t | t | t
+ Publication testpub3
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | t | f
Tables:
"public.testpub_tbl3"
"public.testpub_tbl3a"
\dRp+ testpub4
- Publication testpub4
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f | t | t | t | t
+ Publication testpub4
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | t | f
Tables:
"public.testpub_tbl3"
@@ -129,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
-- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted
- Publication testpub_forparted
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f | t | t | t | t
+ Publication testpub_forparted
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | t | f
Tables:
"public.testpub_parted"
@@ -143,6 +145,15 @@ HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
-- works again, because parent's publication is no longer considered
UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
+\dRp+ testpub_forparted
+ Publication testpub_forparted
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | t | t
+Tables:
+ "public.testpub_parted"
+
DROP TABLE testpub_parted1;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- fail - view
@@ -159,10 +170,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
ERROR: publication "testpub_fortbl" already exists
\dRp+ testpub_fortbl
- Publication testpub_fortbl
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f | t | t | t | t
+ Publication testpub_fortbl
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | t | f
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
@@ -200,10 +211,10 @@ Publications:
"testpub_fortbl"
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f | t | t | t | f
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | f | f
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
@@ -247,10 +258,10 @@ DROP TABLE testpub_parted;
DROP VIEW testpub_view;
DROP TABLE testpub_tbl1;
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes | Truncates
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f | t | t | t | f
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f | t | t | t | f | f
(1 row)
-- fail - must be owner of publication
@@ -260,20 +271,20 @@ ERROR: must be owner of publication testpub_default
RESET ROLE;
ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
\dRp testpub_foo
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
--------------+--------------------------+------------+---------+---------+---------+-----------
- testpub_foo | regress_publication_user | f | t | t | t | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpub_foo | regress_publication_user | f | t | t | t | f | f
(1 row)
-- rename back to keep the rest simple
ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
\dRp testpub_default
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
------------------+---------------------------+------------+---------+---------+---------+-----------
- testpub_default | regress_publication_user2 | f | t | t | t | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------
+ testpub_default | regress_publication_user2 | f | t | t | t | f | f
(1 row)
DROP PUBLICATION testpub_default;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 219e04129d..d844075368 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -23,6 +23,7 @@ ALTER PUBLICATION testpub_default SET (publish = update);
-- error cases
CREATE PUBLICATION testpub_xxx WITH (foo);
CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
+CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
\dRp
@@ -87,6 +88,8 @@ UPDATE testpub_parted1 SET a = 1;
ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
-- works again, because parent's publication is no longer considered
UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
+\dRp+ testpub_forparted
DROP TABLE testpub_parted1;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 5db1b21c59..208bb556ce 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 24;
+use Test::More tests => 51;
# setup
@@ -48,7 +48,6 @@ $node_subscriber1->safe_psql('postgres',
"CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
$node_subscriber1->safe_psql('postgres',
"CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
-
$node_subscriber1->safe_psql('postgres',
"ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
$node_subscriber1->safe_psql('postgres',
@@ -87,6 +86,8 @@ $node_subscriber1->poll_query_until('postgres', $synced_query)
$node_subscriber2->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
+# Tests for replication using leaf partition identity and schema
+
# insert
$node_publisher->safe_psql('postgres',
"INSERT INTO tab1 VALUES (1)");
@@ -260,3 +261,296 @@ is($result, qq(), 'truncate of tab1_1 replicated');
$result = $node_subscriber2->safe_psql('postgres',
"SELECT a FROM tab1 ORDER BY 1");
is($result, qq(), 'truncate of tab1 replicated');
+
+# Tests for replication using root table identity and schema
+
+# publisher
+$node_publisher->safe_psql('postgres',
+ "DROP PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
+# Note: tab3_1's parent is not in the publication, in which case its
+# changes are published using own identity.
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)");
+
+# subscriber 1
+$node_subscriber1->safe_psql('postgres',
+ "DROP SUBSCRIPTION sub1");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+ "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot");
+
+# subscriber 2
+$node_subscriber2->safe_psql('postgres',
+ "DROP TABLE tab1");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)");
+# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)");
+# Publication that sub2 points to now publishes via root, so must update
+# subscription target relations.
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+# Wait for initial sync of all subscriptions
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1), (0)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1), (0), (3), (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab3 VALUES (1), (0), (3), (5)");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|5), 'inserts into tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|5), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|5), 'inserts into tab3 replicated');
+
+# update (replicated as update)
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab2 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab3 SET a = 6 WHERE a = 5");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|6), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|6), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|6), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|6), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|6), 'inserts into tab3 replicated');
+
+# update (replicated as delete+insert)
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab2 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab3 SET a = 2 WHERE a = 6");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|2
+sub1_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|2
+sub1_tab3_1|3), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|2
+sub2_tab1|3), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|2
+sub2_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|2
+sub2_tab3|3), 'update of tab3 replicated');
+
+# delete
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab1");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab2");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab2");
+is($result, qq(), 'delete tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab2");
+is($result, qq(), 'delete from tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab3");
+is($result, qq(), 'delete from tab3 replicated');
+
+# truncate
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1), (2), (5)");
+# these will NOT be replicated
+$node_publisher->safe_psql('postgres',
+ "TRUNCATE tab1_2, tab2_1, tab3_1");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab2 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab1_2 NOT replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab2 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$node_publisher->safe_psql('postgres',
+ "TRUNCATE tab1, tab2, tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab1");
+is($result, qq(), 'truncate of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab3");
+is($result, qq(), 'truncate of tab3 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab3_1");
+is($result, qq(), 'truncate of tab3_1 replicated');