Allow specifying row filters for logical replication of tables.

This feature adds row filtering for publication tables. When a publication
is defined or modified, an optional WHERE clause can be specified. Rows
that don't satisfy this WHERE clause will be filtered out. This allows a
set of tables to be partially replicated. The row filter is per table. A
new row filter can be added simply by specifying a WHERE clause after the
table name. The WHERE clause must be enclosed by parentheses.

The row filter WHERE clause for a table added to a publication that
publishes UPDATE and/or DELETE operations must contain only columns that
are covered by REPLICA IDENTITY. The row filter WHERE clause for a table
added to a publication that publishes INSERT can use any column. If the
row filter evaluates to NULL, it is regarded as "false". The WHERE clause
only allows simple expressions that don't have user-defined functions,
user-defined operators, user-defined types, user-defined collations,
non-immutable built-in functions, or references to system columns. These
restrictions could be addressed in the future.

If you choose to do the initial table synchronization, only data that
satisfies the row filters is copied to the subscriber. If the subscription
has several publications in which a table has been published with
different WHERE clauses, rows that satisfy ANY of the expressions will be
copied. If a subscriber is a pre-15 version, the initial table
synchronization won't use row filters even if they are defined in the
publisher.

The row filters are applied before publishing the changes. If the
subscription has several publications in which the same table has been
published with different filters (for the same publish operation), those
expressions get OR'ed together so that rows satisfying any of the
expressions will be replicated.

This means all the other filters become redundant if (a) one of the
publications have no filter at all, (b) one of the publications was
created using FOR ALL TABLES, (c) one of the publications was created
using FOR ALL TABLES IN SCHEMA and the table belongs to that same schema.

If your publication contains a partitioned table, the publication
parameter publish_via_partition_root determines if it uses the partition's
row filter (if the parameter is false, the default) or the root
partitioned table's row filter.

Psql commands \dRp+ and \d <table-name> will display any row filters.

Author: Hou Zhijie, Euler Taveira, Peter Smith, Ajin Cherian
Reviewed-by: Greg Nancarrow, Haiying Tang, Amit Kapila, Tomas Vondra, Dilip Kumar, Vignesh C, Alvaro Herrera, Andres Freund, Wei Wang
Discussion: https://www.postgresql.org/message-id/flat/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com
This commit is contained in:
Amit Kapila 2022-02-22 07:54:12 +05:30
parent ebf6c5249b
commit 52e4f0cd47
33 changed files with 3120 additions and 243 deletions

View File

@ -6325,6 +6325,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
Reference to relation
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>prqual</structfield> <type>pg_node_tree</type>
</para>
<para>Expression tree (in <function>nodeToString()</function>
representation) for the relation's publication qualifying condition. Null
if there is no publication qualifying condition.</para></entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@ -52,7 +52,9 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
remove one or more tables/schemas from the publication. Note that adding
tables/schemas to a publication that is already subscribed to will require an
<literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the
subscribing side in order to become effective.
subscribing side in order to become effective. Note also that the combination
of <literal>DROP</literal> with a <literal>WHERE</literal> clause is not
allowed.
</para>
<para>
@ -110,6 +112,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
If the optional <literal>WHERE</literal> clause is specified, rows for
which the <replaceable class="parameter">expression</replaceable>
evaluates to false or null will not be published. Note that parentheses
are required around the expression. The
<replaceable class="parameter">expression</replaceable> is evaluated with
the role used for the replication connection.
</para>
</listitem>
</varlistentry>

View File

@ -163,8 +163,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<para>
Specifies whether to copy pre-existing data in the publications
that are being subscribed to when the replication starts.
The default is <literal>true</literal>. (Previously-subscribed
tables are not copied.)
The default is <literal>true</literal>.
</para>
<para>
Previously subscribed tables are not copied, even if a table's row
filter <literal>WHERE</literal> clause has since been modified.
</para>
</listitem>
</varlistentry>

View File

@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@ -78,6 +78,14 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
publication, so they are never explicitly added to the publication.
</para>
<para>
If the optional <literal>WHERE</literal> clause is specified, rows for
which the <replaceable class="parameter">expression</replaceable>
evaluates to false or null will not be published. Note that parentheses
are required around the expression. It has no effect on
<literal>TRUNCATE</literal> commands.
</para>
<para>
Only persistent base tables and partitioned tables can be part of a
publication. Temporary tables, unlogged tables, foreign tables,
@ -225,6 +233,22 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
disallowed on those tables.
</para>
<para>
A <literal>WHERE</literal> (i.e. row filter) expression must contain only
columns that are covered by the <literal>REPLICA IDENTITY</literal>, in
order for <command>UPDATE</command> and <command>DELETE</command> operations
to be published. For publication of <command>INSERT</command> operations,
any column may be used in the <literal>WHERE</literal> expression. The
<literal>WHERE</literal> clause allows simple expressions that don't have
user-defined functions, user-defined operators, user-defined types,
user-defined collations, non-immutable built-in functions, or references to
system columns.
If your publication contains a partitioned table, the publication parameter
<literal>publish_via_partition_root</literal> determines if it uses the
partition's row filter (if the parameter is false, the default) or the root
partitioned table's row filter.
</para>
<para>
For an <command>INSERT ... ON CONFLICT</command> command, the publication will
publish the operation that actually results from the command. So depending
@ -247,6 +271,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
<para>
<acronym>DDL</acronym> operations are not published.
</para>
<para>
The <literal>WHERE</literal> clause expression is executed with the role used
for the replication connection.
</para>
</refsect1>
<refsect1>
@ -259,6 +288,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
</programlisting>
</para>
<para>
Create a publication that publishes all changes from active departments:
<programlisting>
CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
</programlisting>
</para>
<para>
Create a publication that publishes all changes in all tables:
<programlisting>

View File

@ -208,6 +208,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
that are being subscribed to when the replication starts.
The default is <literal>true</literal>.
</para>
<para>
If the publications contain <literal>WHERE</literal> clauses, it
will affect what data is copied. Refer to the
<xref linkend="sql-createsubscription-notes" /> for details.
</para>
</listitem>
</varlistentry>
@ -293,7 +298,7 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</variablelist>
</refsect1>
<refsect1>
<refsect1 id="sql-createsubscription-notes" xreflabel="Notes">
<title>Notes</title>
<para>
@ -319,6 +324,26 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
the parameter <literal>create_slot = false</literal>. This is an
implementation restriction that might be lifted in a future release.
</para>
<para>
If any table in the publication has a <literal>WHERE</literal> clause, rows
for which the <replaceable class="parameter">expression</replaceable>
evaluates to false or null will not be published. If the subscription has
several publications in which the same table has been published with
different <literal>WHERE</literal> clauses, a row will be published if any
of the expressions (referring to that publish operation) are satisfied. In
the case of different <literal>WHERE</literal> clauses, if one of the
publications has no <literal>WHERE</literal> clause (referring to that
publish operation) or the publication is declared as
<literal>FOR ALL TABLES</literal> or
<literal>FOR ALL TABLES IN SCHEMA</literal>, rows are always published
regardless of the definition of the other expressions.
If the subscriber is a <productname>PostgreSQL</productname> version before
15 then any row filtering is ignored during the initial data synchronization
phase. For this case, the user might want to consider deleting any initially
copied data that would be incompatible with subsequent filtering.
</para>
</refsect1>
<refsect1>

View File

@ -275,18 +275,57 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
return result;
}
/*
* Returns the relid of the topmost ancestor that is published via this
* publication if any, otherwise returns InvalidOid.
*
* Note that the list of ancestors should be ordered such that the topmost
* ancestor is at the end of the list.
*/
Oid
GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
{
ListCell *lc;
Oid topmost_relid = InvalidOid;
/*
* Find the "topmost" ancestor that is in this publication.
*/
foreach(lc, ancestors)
{
Oid ancestor = lfirst_oid(lc);
List *apubids = GetRelationPublications(ancestor);
List *aschemaPubids = NIL;
if (list_member_oid(apubids, puboid))
topmost_relid = ancestor;
else
{
aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
if (list_member_oid(aschemaPubids, puboid))
topmost_relid = ancestor;
}
list_free(apubids);
list_free(aschemaPubids);
}
return topmost_relid;
}
/*
* Insert new publication / relation mapping.
*/
ObjectAddress
publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
publication_add_relation(Oid pubid, PublicationRelInfo *pri,
bool if_not_exists)
{
Relation rel;
HeapTuple tup;
Datum values[Natts_pg_publication_rel];
bool nulls[Natts_pg_publication_rel];
Oid relid = RelationGetRelid(targetrel->relation);
Relation targetrel = pri->relation;
Oid relid = RelationGetRelid(targetrel);
Oid pubreloid;
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
@ -311,10 +350,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("relation \"%s\" is already member of publication \"%s\"",
RelationGetRelationName(targetrel->relation), pub->name)));
RelationGetRelationName(targetrel), pub->name)));
}
check_publication_add_relation(targetrel->relation);
check_publication_add_relation(targetrel);
/* Form a tuple. */
memset(values, 0, sizeof(values));
@ -328,6 +367,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
values[Anum_pg_publication_rel_prrelid - 1] =
ObjectIdGetDatum(relid);
/* Add qualifications, if available */
if (pri->whereClause != NULL)
values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
else
nulls[Anum_pg_publication_rel_prqual - 1] = true;
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
@ -345,6 +390,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
ObjectAddressSet(referenced, RelationRelationId, relid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
/* Add dependency on the objects mentioned in the qualifications */
if (pri->whereClause)
recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
false);
/* Close the table. */
table_close(rel, RowExclusiveLock);

View File

@ -26,6 +26,7 @@
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
@ -36,6 +37,10 @@
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_clause.h"
#include "parser/parse_collate.h"
#include "parser/parse_relation.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
@ -48,6 +53,19 @@
#include "utils/syscache.h"
#include "utils/varlena.h"
/*
* Information used to validate the columns in the row filter expression. See
* contain_invalid_rfcolumn_walker for details.
*/
typedef struct rf_context
{
Bitmapset *bms_replident; /* bitset of replica identity columns */
bool pubviaroot; /* true if we are validating the parent
* relation's row filter */
Oid relid; /* relid of the relation */
Oid parentid; /* relid of the parent relation */
} rf_context;
static List *OpenRelIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
@ -234,6 +252,362 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
}
}
/*
* Returns true if any of the columns used in the row filter WHERE expression is
* not part of REPLICA IDENTITY, false otherwise.
*/
static bool
contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
{
if (node == NULL)
return false;
if (IsA(node, Var))
{
Var *var = (Var *) node;
AttrNumber attnum = var->varattno;
/*
* If pubviaroot is true, we are validating the row filter of the
* parent table, but the bitmap contains the replica identity
* information of the child table. So, get the column number of the
* child table as parent and child column order could be different.
*/
if (context->pubviaroot)
{
char *colname = get_attname(context->parentid, attnum, false);
attnum = get_attnum(context->relid, colname);
}
if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
context->bms_replident))
return true;
}
return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
(void *) context);
}
/*
* Check if all columns referenced in the filter expression are part of the
* REPLICA IDENTITY index or not.
*
* Returns true if any invalid column is found.
*/
bool
contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
bool pubviaroot)
{
HeapTuple rftuple;
Oid relid = RelationGetRelid(relation);
Oid publish_as_relid = RelationGetRelid(relation);
bool result = false;
Datum rfdatum;
bool rfisnull;
/*
* FULL means all columns are in the REPLICA IDENTITY, so all columns are
* allowed in the row filter and we can skip the validation.
*/
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
return false;
/*
* For a partition, if pubviaroot is true, find the topmost ancestor that
* is published via this publication as we need to use its row filter
* expression to filter the partition's changes.
*
* Note that even though the row filter used is for an ancestor, the
* REPLICA IDENTITY used will be for the actual child table.
*/
if (pubviaroot && relation->rd_rel->relispartition)
{
publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors);
if (!OidIsValid(publish_as_relid))
publish_as_relid = relid;
}
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(publish_as_relid),
ObjectIdGetDatum(pubid));
if (!HeapTupleIsValid(rftuple))
return false;
rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
Anum_pg_publication_rel_prqual,
&rfisnull);
if (!rfisnull)
{
rf_context context = {0};
Node *rfnode;
Bitmapset *bms = NULL;
context.pubviaroot = pubviaroot;
context.parentid = publish_as_relid;
context.relid = relid;
/* Remember columns that are part of the REPLICA IDENTITY */
bms = RelationGetIndexAttrBitmap(relation,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
context.bms_replident = bms;
rfnode = stringToNode(TextDatumGetCString(rfdatum));
result = contain_invalid_rfcolumn_walker(rfnode, &context);
bms_free(bms);
pfree(rfnode);
}
ReleaseSysCache(rftuple);
return result;
}
/* check_functions_in_node callback */
static bool
contain_mutable_or_user_functions_checker(Oid func_id, void *context)
{
return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
func_id >= FirstNormalObjectId);
}
/*
* Check if the node contains any unallowed object. See
* check_simple_rowfilter_expr_walker.
*
* Returns the error detail message in errdetail_msg for unallowed expressions.
*/
static void
expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg)
{
if (IsA(node, List))
{
/*
* OK, we don't need to perform other expr checks for List nodes
* because those are undefined for List.
*/
return;
}
if (exprType(node) >= FirstNormalObjectId)
*errdetail_msg = _("User-defined types are not allowed.");
else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
(void *) pstate))
*errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
else if (exprCollation(node) >= FirstNormalObjectId ||
exprInputCollation(node) >= FirstNormalObjectId)
*errdetail_msg = _("User-defined collations are not allowed.");
}
/*
* The row filter walker checks if the row filter expression is a "simple
* expression".
*
* It allows only simple or compound expressions such as:
* - (Var Op Const)
* - (Var Op Var)
* - (Var Op Const) AND/OR (Var Op Const)
* - etc
* (where Var is a column of the table this filter belongs to)
*
* The simple expression has the following restrictions:
* - User-defined operators are not allowed;
* - User-defined functions are not allowed;
* - User-defined types are not allowed;
* - User-defined collations are not allowed;
* - Non-immutable built-in functions are not allowed;
* - System columns are not allowed.
*
* NOTES
*
* We don't allow user-defined functions/operators/types/collations because
* (a) if a user drops a user-defined object used in a row filter expression or
* if there is any other error while using it, the logical decoding
* infrastructure won't be able to recover from such an error even if the
* object is recreated again because a historic snapshot is used to evaluate
* the row filter;
* (b) a user-defined function can be used to access tables that could have
* unpleasant results because a historic snapshot is used. That's why only
* immutable built-in functions are allowed in row filter expressions.
*
* We don't allow system columns because currently, we don't have that
* information in the tuple passed to downstream. Also, as we don't replicate
* those to subscribers, there doesn't seem to be a need for a filter on those
* columns.
*
* We can allow other node types after more analysis and testing.
*/
static bool
check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
{
char *errdetail_msg = NULL;
if (node == NULL)
return false;
switch (nodeTag(node))
{
case T_Var:
/* System columns are not allowed. */
if (((Var *) node)->varattno < InvalidAttrNumber)
errdetail_msg = _("System columns are not allowed.");
break;
case T_OpExpr:
case T_DistinctExpr:
case T_NullIfExpr:
/* OK, except user-defined operators are not allowed. */
if (((OpExpr *) node)->opno >= FirstNormalObjectId)
errdetail_msg = _("User-defined operators are not allowed.");
break;
case T_ScalarArrayOpExpr:
/* OK, except user-defined operators are not allowed. */
if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
errdetail_msg = _("User-defined operators are not allowed.");
/*
* We don't need to check the hashfuncid and negfuncid of
* ScalarArrayOpExpr as those functions are only built for a
* subquery.
*/
break;
case T_RowCompareExpr:
{
ListCell *opid;
/* OK, except user-defined operators are not allowed. */
foreach(opid, ((RowCompareExpr *) node)->opnos)
{
if (lfirst_oid(opid) >= FirstNormalObjectId)
{
errdetail_msg = _("User-defined operators are not allowed.");
break;
}
}
}
break;
case T_Const:
case T_FuncExpr:
case T_BoolExpr:
case T_RelabelType:
case T_CollateExpr:
case T_CaseExpr:
case T_CaseTestExpr:
case T_ArrayExpr:
case T_RowExpr:
case T_CoalesceExpr:
case T_MinMaxExpr:
case T_XmlExpr:
case T_NullTest:
case T_BooleanTest:
case T_List:
/* OK, supported */
break;
default:
errdetail_msg = _("Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations and immutable built-in functions.");
break;
}
/*
* For all the supported nodes, check the types, functions, and collations
* used in the nodes.
*/
if (!errdetail_msg)
expr_allowed_in_node(node, pstate, &errdetail_msg);
if (errdetail_msg)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalid publication WHERE expression"),
errdetail("%s", errdetail_msg),
parser_errposition(pstate, exprLocation(node))));
return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
(void *) pstate);
}
/*
* Check if the row filter expression is a "simple expression".
*
* See check_simple_rowfilter_expr_walker for details.
*/
static bool
check_simple_rowfilter_expr(Node *node, ParseState *pstate)
{
return check_simple_rowfilter_expr_walker(node, pstate);
}
/*
* Transform the publication WHERE expression for all the relations in the list,
* ensuring it is coerced to boolean and necessary collation information is
* added if required, and add a new nsitem/RTE for the associated relation to
* the ParseState's namespace list.
*
* Also check the publication row filter expression and throw an error if
* anything not permitted or unexpected is encountered.
*/
static void
TransformPubWhereClauses(List *tables, const char *queryString,
bool pubviaroot)
{
ListCell *lc;
foreach(lc, tables)
{
ParseNamespaceItem *nsitem;
Node *whereclause = NULL;
ParseState *pstate;
PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
if (pri->whereClause == NULL)
continue;
/*
* If the publication doesn't publish changes via the root partitioned
* table, the partition's row filter will be used. So disallow using
* WHERE clause on partitioned table in this case.
*/
if (!pubviaroot &&
pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use publication WHERE clause for relation \"%s\"",
RelationGetRelationName(pri->relation)),
errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
"publish_via_partition_root")));
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
AccessShareLock, NULL,
false, false);
addNSItemToQuery(pstate, nsitem, false, true, true);
whereclause = transformWhereClause(pstate,
copyObject(pri->whereClause),
EXPR_KIND_WHERE,
"PUBLICATION WHERE");
/* Fix up collation information */
assign_expr_collations(pstate, whereclause);
/*
* We allow only simple expressions in row filters. See
* check_simple_rowfilter_expr_walker.
*/
check_simple_rowfilter_expr(whereclause, pstate);
free_parsestate(pstate);
pri->whereClause = whereclause;
}
}
/*
* Create new publication.
*/
@ -346,6 +720,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
rels = OpenTableList(relations);
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLE);
TransformPubWhereClauses(rels, pstate->p_sourcetext,
publish_via_partition_root);
PublicationAddTables(puboid, rels, true, NULL);
CloseTableList(rels);
}
@ -392,6 +770,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
bool publish_via_partition_root;
ObjectAddress obj;
Form_pg_publication pubform;
List *root_relids = NIL;
ListCell *lc;
parse_publication_options(pstate,
stmt->options,
@ -399,6 +779,65 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
&publish_via_partition_root_given,
&publish_via_partition_root);
pubform = (Form_pg_publication) GETSTRUCT(tup);
/*
* If the publication doesn't publish changes via the root partitioned
* table, the partition's row filter will be used. So disallow using WHERE
* clause on partitioned table in this case.
*/
if (!pubform->puballtables && publish_via_partition_root_given &&
!publish_via_partition_root)
{
/*
* Lock the publication so nobody else can do anything with it. This
* prevents concurrent alter to add partitioned table(s) with WHERE
* clause(s) which we don't allow when not publishing via root.
*/
LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
AccessShareLock);
root_relids = GetPublicationRelations(pubform->oid,
PUBLICATION_PART_ROOT);
foreach(lc, root_relids)
{
HeapTuple rftuple;
Oid relid = lfirst_oid(lc);
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(pubform->oid));
if (HeapTupleIsValid(rftuple) &&
!heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL))
{
HeapTuple tuple;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (HeapTupleIsValid(tuple))
{
Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple);
if (relform->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot set %s for publication \"%s\"",
"publish_via_partition_root = false",
stmt->pubname),
errdetail("The publication contains a WHERE clause for a partitioned table \"%s\" "
"which is not allowed when %s is false.",
NameStr(relform->relname),
"publish_via_partition_root")));
ReleaseSysCache(tuple);
}
ReleaseSysCache(rftuple);
}
}
}
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
@ -450,8 +889,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
* invalidate all partitions contained in the respective partition
* trees, not just those explicitly mentioned in the publication.
*/
relids = GetPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL);
if (root_relids == NIL)
relids = GetPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL);
else
{
/*
* We already got tables explicitly mentioned in the publication.
* Now get all partitions for the partitioned table in the list.
*/
foreach(lc, root_relids)
relids = GetPubPartitionOptionRelations(relids,
PUBLICATION_PART_ALL,
lfirst_oid(lc));
}
schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL);
relids = list_concat_unique_oid(relids, schemarelids);
@ -492,7 +944,8 @@ InvalidatePublicationRels(List *relids)
*/
static void
AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
List *tables, List *schemaidlist)
List *tables, List *schemaidlist,
const char *queryString)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
@ -519,6 +972,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
CheckObjSchemaNotAlreadyInPublication(rels, schemas,
PUBLICATIONOBJ_TABLE);
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
PublicationAddTables(pubid, rels, false, stmt);
}
else if (stmt->action == AP_DropObjects)
@ -533,37 +989,76 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLE);
/* Calculate which relations to drop. */
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
/*
* To recreate the relation list for the publication, look for
* existing relations that do not need to be dropped.
*/
foreach(oldlc, oldrelids)
{
Oid oldrelid = lfirst_oid(oldlc);
ListCell *newlc;
PublicationRelInfo *oldrel;
bool found = false;
HeapTuple rftuple;
bool rfisnull = true;
Node *oldrelwhereclause = NULL;
/* look up the cache for the old relmap */
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(oldrelid),
ObjectIdGetDatum(pubid));
if (HeapTupleIsValid(rftuple))
{
Datum whereClauseDatum;
whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
Anum_pg_publication_rel_prqual,
&rfisnull);
if (!rfisnull)
oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
ReleaseSysCache(rftuple);
}
foreach(newlc, rels)
{
PublicationRelInfo *newpubrel;
newpubrel = (PublicationRelInfo *) lfirst(newlc);
/*
* Check if any of the new set of relations matches with the
* existing relations in the publication. Additionally, if the
* relation has an associated WHERE clause, check the WHERE
* expressions also match. Drop the rest.
*/
if (RelationGetRelid(newpubrel->relation) == oldrelid)
{
found = true;
break;
if (equal(oldrelwhereclause, newpubrel->whereClause))
{
found = true;
break;
}
}
}
/* Not yet in the list, open it and add to the list */
if (oldrelwhereclause)
pfree(oldrelwhereclause);
/*
* Add the non-matched relations to a list so that they can be
* dropped.
*/
if (!found)
{
Relation oldrel;
PublicationRelInfo *pubrel;
/* Wrap relation into PublicationRelInfo */
oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
pubrel = palloc(sizeof(PublicationRelInfo));
pubrel->relation = oldrel;
delrels = lappend(delrels, pubrel);
oldrel = palloc(sizeof(PublicationRelInfo));
oldrel->whereClause = NULL;
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);
delrels = lappend(delrels, oldrel);
}
}
@ -720,12 +1215,15 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
{
List *relations = NIL;
List *schemaidlist = NIL;
Oid pubid = pubform->oid;
ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
&schemaidlist);
CheckAlterPublication(stmt, tup, relations, schemaidlist);
heap_freetuple(tup);
/*
* Lock the publication so nobody else can do anything with it. This
* prevents concurrent alter to add table(s) that were already going
@ -734,22 +1232,24 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
* addition of schema(s) for which there is any corresponding table
* being added by this command.
*/
LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
LockDatabaseObject(PublicationRelationId, pubid, 0,
AccessExclusiveLock);
/*
* It is possible that by the time we acquire the lock on publication,
* concurrent DDL has removed it. We can test this by checking the
* existence of publication.
* existence of publication. We get the tuple again to avoid the risk
* of any publication option getting changed.
*/
if (!SearchSysCacheExists1(PUBLICATIONOID,
ObjectIdGetDatum(pubform->oid)))
tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
if (!HeapTupleIsValid(tup))
ereport(ERROR,
errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("publication \"%s\" does not exist",
stmt->pubname));
AlterPublicationTables(stmt, tup, relations, schemaidlist);
AlterPublicationTables(stmt, tup, relations, schemaidlist,
pstate->p_sourcetext);
AlterPublicationSchemas(stmt, tup, schemaidlist);
}
@ -901,6 +1401,7 @@ OpenTableList(List *tables)
List *relids = NIL;
List *rels = NIL;
ListCell *lc;
List *relids_with_rf = NIL;
/*
* Open, share-lock, and check all the explicitly-specified relations
@ -928,15 +1429,26 @@ OpenTableList(List *tables)
*/
if (list_member_oid(relids, myrelid))
{
/* Disallow duplicate tables if there are any with row filters. */
if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
RelationGetRelationName(rel))));
table_close(rel, ShareUpdateExclusiveLock);
continue;
}
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
pub_rel->whereClause = t->whereClause;
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, myrelid);
if (t->whereClause)
relids_with_rf = lappend_oid(relids_with_rf, myrelid);
/*
* Add children of this rel, if requested, so that they too are added
* to the publication. A partitioned table can't have any inheritance
@ -963,19 +1475,39 @@ OpenTableList(List *tables)
* tables.
*/
if (list_member_oid(relids, childrelid))
{
/*
* We don't allow to specify row filter for both parent
* and child table at the same time as it is not very
* clear which one should be given preference.
*/
if (childrelid != myrelid &&
(t->whereClause || list_member_oid(relids_with_rf, childrelid)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
RelationGetRelationName(rel))));
continue;
}
/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
/* child inherits WHERE clause from parent */
pub_rel->whereClause = t->whereClause;
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, childrelid);
if (t->whereClause)
relids_with_rf = lappend_oid(relids_with_rf, childrelid);
}
}
}
list_free(relids);
list_free(relids_with_rf);
return rels;
}
@ -995,6 +1527,8 @@ CloseTableList(List *rels)
pub_rel = (PublicationRelInfo *) lfirst(lc);
table_close(pub_rel->relation, NoLock);
}
list_free_deep(rels);
}
/*
@ -1090,6 +1624,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
RelationGetRelationName(rel))));
}
if (pubrel->whereClause)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot use a WHERE clause when removing a table from a publication")));
ObjectAddressSet(obj, PublicationRelRelationId, prid);
performDeletion(&obj, DROP_CASCADE, 0);
}

View File

@ -567,15 +567,43 @@ ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
void
CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
{
PublicationActions *pubactions;
PublicationDesc pubdesc;
/* We only need to do checks for UPDATE and DELETE. */
if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
return;
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
return;
/*
* It is only safe to execute UPDATE/DELETE when all columns, referenced
* in the row filters from publications which the relation is in, are
* valid - i.e. when all referenced columns are part of REPLICA IDENTITY
* or the table does not publish UPDATEs or DELETEs.
*
* XXX We could optimize it by first checking whether any of the
* publications have a row filter for this relation. If not and relation
* has replica identity then we can avoid building the descriptor but as
* this happens only one time it doesn't seem worth the additional
* complexity.
*/
RelationBuildPublicationDesc(rel, &pubdesc);
if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot update table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("cannot delete from table \"%s\"",
RelationGetRelationName(rel)),
errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
/* If relation has replica identity we are always good. */
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
OidIsValid(RelationGetReplicaIndex(rel)))
if (OidIsValid(RelationGetReplicaIndex(rel)))
return;
/*
@ -583,14 +611,13 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
*
* Check if the table publishes UPDATES or DELETES.
*/
pubactions = GetRelationPublicationActions(rel);
if (cmd == CMD_UPDATE && pubactions->pubupdate)
if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
RelationGetRelationName(rel)),
errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
else if (cmd == CMD_DELETE && pubactions->pubdelete)
else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",

View File

@ -4849,6 +4849,7 @@ _copyPublicationTable(const PublicationTable *from)
PublicationTable *newnode = makeNode(PublicationTable);
COPY_NODE_FIELD(relation);
COPY_NODE_FIELD(whereClause);
return newnode;
}

View File

@ -2321,6 +2321,7 @@ static bool
_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
COMPARE_NODE_FIELD(whereClause);
return true;
}

View File

@ -9751,12 +9751,13 @@ CreatePublicationStmt:
* relation_expr here.
*/
PublicationObjSpec:
TABLE relation_expr
TABLE relation_expr OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_TABLE;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $2;
$$->pubtable->whereClause = $3;
}
| ALL TABLES IN_P SCHEMA ColId
{
@ -9771,28 +9772,45 @@ PublicationObjSpec:
$$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
$$->location = @5;
}
| ColId
| ColId OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->name = $1;
if ($2)
{
/*
* The OptWhereClause must be stored here but it is
* valid only for tables. For non-table objects, an
* error will be thrown later via
* preprocess_pubobj_list().
*/
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
$$->pubtable->whereClause = $2;
}
else
{
$$->name = $1;
}
$$->location = @1;
}
| ColId indirection
| ColId indirection OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
$$->pubtable->whereClause = $3;
$$->location = @1;
}
/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
| extended_relation_expr
| extended_relation_expr OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $1;
$$->pubtable->whereClause = $2;
}
| CURRENT_SCHEMA
{
@ -17448,7 +17466,8 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid table name at or near"),
parser_errposition(pubobj->location));
else if (pubobj->name)
if (pubobj->name)
{
/* convert it to PublicationTable */
PublicationTable *pubtable = makeNode(PublicationTable);
@ -17462,6 +17481,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA ||
pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
{
/* WHERE clause is not allowed on a schema object */
if (pubobj->pubtable && pubobj->pubtable->whereClause)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("WHERE clause not allowed for schema"),
parser_errposition(pubobj->location));
/*
* We can distinguish between the different type of schema
* objects based on whether name and pubtable is set.

View File

@ -31,8 +31,8 @@
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple, bool binary);
TupleTableSlot *slot,
bool binary);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
HeapTuple newtuple, bool binary)
TupleTableSlot *newslot, bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple, binary);
logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, HeapTuple newtuple, bool binary)
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
if (oldtuple != NULL)
if (oldslot != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple, binary);
logicalrep_write_tuple(out, rel, oldslot, binary);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple, binary);
logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
*/
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, bool binary)
TupleTableSlot *oldslot, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple, binary);
logicalrep_write_tuple(out, rel, oldslot, binary);
}
/*
@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
bool binary)
{
TupleDesc desc;
Datum values[MaxTupleAttributeNumber];
bool isnull[MaxTupleAttributeNumber];
Datum *values;
bool *isnull;
int i;
uint16 nliveatts = 0;
@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
}
pq_sendint16(out, nliveatts);
/* try to allocate enough memory from the get-go */
enlargeStringInfo(out, tuple->t_len +
nliveatts * (1 + 4));
heap_deform_tuple(tuple, desc, values, isnull);
slot_getallattrs(slot);
values = slot->tts_values;
isnull = slot->tts_isnull;
/* Write the values */
for (i = 0; i < desc->natts; i++)

View File

@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
* message provides during replication.
* message provides during replication. This function also returns the relation
* qualifications to be used in the COPY command.
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
LogicalRepRelation *lrel)
LogicalRepRelation *lrel, List **qual)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
ListCell *lc;
bool first;
lrel->nspname = nspname;
lrel->relname = relname;
@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->natts = natt;
walrcv_clear_result(res);
/*
* Get relation's row filter expressions. DISTINCT avoids the same
* expression of a table in multiple publications from being included
* multiple times in the final expression.
*
* We need to copy the row even if it matches just one of the
* publications, so we later combine all the quals with OR.
*
* For initial synchronization, row filtering can be ignored in following
* cases:
*
* 1) one of the subscribed publications for the table hasn't specified
* any row filter
*
* 2) one of the subscribed publications has puballtables set to true
*
* 3) one of the subscribed publications is declared as ALL TABLES IN
* SCHEMA that includes this relation
*/
if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
{
StringInfoData pub_names;
/* Build the pubname list. */
initStringInfo(&pub_names);
first = true;
foreach(lc, MySubscription->publications)
{
char *pubname = strVal(lfirst(lc));
if (first)
first = false;
else
appendStringInfoString(&pub_names, ", ");
appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
}
/* Check for row filters. */
resetStringInfo(&cmd);
appendStringInfo(&cmd,
"SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
" FROM pg_publication p"
" LEFT OUTER JOIN pg_publication_rel pr"
" ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
" LATERAL pg_get_publication_tables(p.pubname) gpt"
" WHERE gpt.relid = %u"
" AND p.pubname IN ( %s )",
lrel->remoteid,
lrel->remoteid,
pub_names.data);
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));
/*
* Multiple row filter expressions for the same table will be combined
* by COPY using OR. If any of the filter expressions for this table
* are null, it means the whole table will be copied. In this case it
* is not necessary to construct a unified row filter expression at
* all.
*/
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
Datum rf = slot_getattr(slot, 1, &isnull);
if (!isnull)
*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
else
{
/* Ignore filters and cleanup as necessary. */
if (*qual)
{
list_free_deep(*qual);
*qual = NIL;
}
break;
}
ExecClearTuple(slot);
}
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
}
pfree(cmd.data);
}
@ -811,6 +907,7 @@ copy_table(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
List *qual = NIL;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
@ -819,7 +916,7 @@ copy_table(Relation rel)
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
RelationGetRelationName(rel), &lrel);
RelationGetRelationName(rel), &lrel, &qual);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@ -830,14 +927,18 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
if (lrel.relkind == RELKIND_RELATION)
/* Regular table with no row filter */
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
appendStringInfo(&cmd, "COPY %s TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
else
{
/*
* For non-tables, we need to do COPY (SELECT ...), but we can't just
* do SELECT * because we need to not copy generated columns.
* For non-tables and tables with row filters, we need to do COPY
* (SELECT ...), but we can't just do SELECT * because we need to not
* copy generated columns. For tables with any row filters, build a
* SELECT query with OR'ed row filters for COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)
@ -846,8 +947,33 @@ copy_table(Relation rel)
if (i < lrel.natts - 1)
appendStringInfoString(&cmd, ", ");
}
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
appendStringInfoString(&cmd, " FROM ");
/*
* For regular tables, make sure we don't copy data from a child that
* inherits the named table as those will be copied separately.
*/
if (lrel.relkind == RELKIND_RELATION)
appendStringInfoString(&cmd, "ONLY ");
appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
/* list of OR'ed filters */
if (qual != NIL)
{
ListCell *lc;
char *q = strVal(linitial(qual));
appendStringInfo(&cmd, " WHERE %s", q);
for_each_from(lc, qual, 1)
{
q = strVal(lfirst(lc));
appendStringInfo(&cmd, " OR %s", q);
}
list_free_deep(qual);
}
appendStringInfoString(&cmd, ") TO STDOUT");
}
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);

File diff suppressed because it is too large Load Diff

View File

@ -66,6 +66,7 @@
#include "catalog/schemapg.h"
#include "catalog/storage.h"
#include "commands/policy.h"
#include "commands/publicationcmds.h"
#include "commands/trigger.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
@ -2419,8 +2420,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
bms_free(relation->rd_pkattr);
bms_free(relation->rd_idattr);
bms_free(relation->rd_hotblockingattr);
if (relation->rd_pubactions)
pfree(relation->rd_pubactions);
if (relation->rd_pubdesc)
pfree(relation->rd_pubdesc);
if (relation->rd_options)
pfree(relation->rd_options);
if (relation->rd_indextuple)
@ -5523,38 +5524,57 @@ RelationGetExclusionInfo(Relation indexRelation,
}
/*
* Get publication actions for the given relation.
* Get the publication information for the given relation.
*
* Traverse all the publications which the relation is in to get the
* publication actions and validate the row filter expressions for such
* publications if any. We consider the row filter expression as invalid if it
* references any column which is not part of REPLICA IDENTITY.
*
* To avoid fetching the publication information repeatedly, we cache the
* publication actions and row filter validation information.
*/
struct PublicationActions *
GetRelationPublicationActions(Relation relation)
void
RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
{
List *puboids;
ListCell *lc;
MemoryContext oldcxt;
Oid schemaid;
PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
List *ancestors = NIL;
Oid relid = RelationGetRelid(relation);
/*
* If not publishable, it publishes no actions. (pgoutput_change() will
* ignore it.)
*/
if (!is_publishable_relation(relation))
return pubactions;
{
memset(pubdesc, 0, sizeof(PublicationDesc));
pubdesc->rf_valid_for_update = true;
pubdesc->rf_valid_for_delete = true;
return;
}
if (relation->rd_pubactions)
return memcpy(pubactions, relation->rd_pubactions,
sizeof(PublicationActions));
if (relation->rd_pubdesc)
{
memcpy(pubdesc, relation->rd_pubdesc, sizeof(PublicationDesc));
return;
}
memset(pubdesc, 0, sizeof(PublicationDesc));
pubdesc->rf_valid_for_update = true;
pubdesc->rf_valid_for_delete = true;
/* Fetch the publication membership info. */
puboids = GetRelationPublications(RelationGetRelid(relation));
puboids = GetRelationPublications(relid);
schemaid = RelationGetNamespace(relation);
puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid));
if (relation->rd_rel->relispartition)
{
/* Add publications that the ancestors are in too. */
List *ancestors = get_partition_ancestors(RelationGetRelid(relation));
ListCell *lc;
ancestors = get_partition_ancestors(relid);
foreach(lc, ancestors)
{
@ -5582,35 +5602,53 @@ GetRelationPublicationActions(Relation relation)
pubform = (Form_pg_publication) GETSTRUCT(tup);
pubactions->pubinsert |= pubform->pubinsert;
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
pubactions->pubtruncate |= pubform->pubtruncate;
pubdesc->pubactions.pubinsert |= pubform->pubinsert;
pubdesc->pubactions.pubupdate |= pubform->pubupdate;
pubdesc->pubactions.pubdelete |= pubform->pubdelete;
pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
/*
* Check if all columns referenced in the filter expression are part of
* the REPLICA IDENTITY index or not.
*
* If the publication is FOR ALL TABLES then it means the table has no
* row filters and we can skip the validation.
*/
if (!pubform->puballtables &&
(pubform->pubupdate || pubform->pubdelete) &&
contain_invalid_rfcolumn(pubid, relation, ancestors,
pubform->pubviaroot))
{
if (pubform->pubupdate)
pubdesc->rf_valid_for_update = false;
if (pubform->pubdelete)
pubdesc->rf_valid_for_delete = false;
}
ReleaseSysCache(tup);
/*
* If we know everything is replicated, there is no point to check for
* other publications.
* If we know everything is replicated and the row filter is invalid
* for update and delete, there is no point to check for other
* publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
pubactions->pubdelete && pubactions->pubtruncate)
if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
!pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete)
break;
}
if (relation->rd_pubactions)
if (relation->rd_pubdesc)
{
pfree(relation->rd_pubactions);
relation->rd_pubactions = NULL;
pfree(relation->rd_pubdesc);
relation->rd_pubdesc = NULL;
}
/* Now save copy of the actions in the relcache entry. */
/* Now save copy of the descriptor in the relcache entry. */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
relation->rd_pubactions = palloc(sizeof(PublicationActions));
memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
relation->rd_pubdesc = palloc(sizeof(PublicationDesc));
memcpy(relation->rd_pubdesc, pubdesc, sizeof(PublicationDesc));
MemoryContextSwitchTo(oldcxt);
return pubactions;
}
/*
@ -6163,7 +6201,7 @@ load_relcache_init_file(bool shared)
rel->rd_pkattr = NULL;
rel->rd_idattr = NULL;
rel->rd_hotblockingattr = NULL;
rel->rd_pubactions = NULL;
rel->rd_pubdesc = NULL;
rel->rd_statvalid = false;
rel->rd_statlist = NIL;
rel->rd_fkeyvalid = false;

View File

@ -4074,6 +4074,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
int i_oid;
int i_prpubid;
int i_prrelid;
int i_prrelqual;
int i,
j,
ntups;
@ -4084,9 +4085,16 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
query = createPQExpBuffer();
/* Collect all publication membership info. */
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid "
"FROM pg_catalog.pg_publication_rel");
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid, "
"pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual "
"FROM pg_catalog.pg_publication_rel");
else
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid, "
"NULL AS prrelqual "
"FROM pg_catalog.pg_publication_rel");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
@ -4095,6 +4103,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
i_oid = PQfnumber(res, "oid");
i_prpubid = PQfnumber(res, "prpubid");
i_prrelid = PQfnumber(res, "prrelid");
i_prrelqual = PQfnumber(res, "prrelqual");
/* this allocation may be more than we need */
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@ -4135,6 +4144,10 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
pubrinfo[j].dobj.name = tbinfo->dobj.name;
pubrinfo[j].publication = pubinfo;
pubrinfo[j].pubtable = tbinfo;
if (PQgetisnull(res, i, i_prrelqual))
pubrinfo[j].pubrelqual = NULL;
else
pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
/* Decide whether we want to dump it */
selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
@ -4212,8 +4225,17 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
fmtId(pubinfo->dobj.name));
appendPQExpBuffer(query, " %s;\n",
appendPQExpBuffer(query, " %s",
fmtQualifiedDumpable(tbinfo));
if (pubrinfo->pubrelqual)
{
/*
* It's necessary to add parentheses around the expression because
* pg_get_expr won't supply the parentheses for things like WHERE TRUE.
*/
appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
}
appendPQExpBufferStr(query, ";\n");
/*
* There is no point in creating a drop query as the drop is done by table

View File

@ -631,6 +631,7 @@ typedef struct _PublicationRelInfo
DumpableObject dobj;
PublicationInfo *publication;
TableInfo *pubtable;
char *pubrelqual;
} PublicationRelInfo;
/*

View File

@ -2879,17 +2879,21 @@ describeOneTableDetails(const char *schemaname,
{
printfPQExpBuffer(&buf,
"SELECT pubname\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
" JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n"
" JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n"
"WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n"
"UNION\n"
"SELECT pubname\n"
" , pg_get_expr(pr.prqual, c.oid)\n"
"FROM pg_catalog.pg_publication p\n"
" JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
" JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n"
"WHERE pr.prrelid = '%s'\n"
"UNION\n"
"SELECT pubname\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
"ORDER BY 1;",
@ -2899,11 +2903,13 @@ describeOneTableDetails(const char *schemaname,
{
printfPQExpBuffer(&buf,
"SELECT pubname\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
"WHERE pr.prrelid = '%s'\n"
"UNION ALL\n"
"SELECT pubname\n"
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
"ORDER BY 1;",
@ -2925,6 +2931,11 @@ describeOneTableDetails(const char *schemaname,
printfPQExpBuffer(&buf, " \"%s\"",
PQgetvalue(result, i, 0));
/* row filter (if any) */
if (!PQgetisnull(result, i, 1))
appendPQExpBuffer(&buf, " WHERE %s",
PQgetvalue(result, i, 1));
printTableAddFooter(&cont, buf.data);
}
PQclear(result);
@ -5874,8 +5885,12 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
for (i = 0; i < count; i++)
{
if (!singlecol)
{
printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1));
if (!PQgetisnull(res, i, 2))
appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2));
}
else
printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0));
@ -6004,8 +6019,15 @@ describePublications(const char *pattern)
{
/* Get the tables for the specified publication */
printfPQExpBuffer(&buf,
"SELECT n.nspname, c.relname\n"
"FROM pg_catalog.pg_class c,\n"
"SELECT n.nspname, c.relname");
if (pset.sversion >= 150000)
appendPQExpBufferStr(&buf,
", pg_get_expr(pr.prqual, c.oid)");
else
appendPQExpBufferStr(&buf,
", NULL");
appendPQExpBuffer(&buf,
"\nFROM pg_catalog.pg_class c,\n"
" pg_catalog.pg_namespace n,\n"
" pg_catalog.pg_publication_rel pr\n"
"WHERE c.relnamespace = n.oid\n"

View File

@ -1787,6 +1787,20 @@ psql_completion(const char *text, int start, int end)
(HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") &&
ends_with(prev_wd, ',')))
COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables);
/*
* "ALTER PUBLICATION <name> SET TABLE <name> WHERE (" - complete with
* table attributes
*
* "ALTER PUBLICATION <name> ADD TABLE <name> WHERE (" - complete with
* table attributes
*/
else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("WHERE"))
COMPLETE_WITH("(");
else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("WHERE", "("))
COMPLETE_WITH_ATTR(prev3_wd);
else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") &&
!TailMatches("WHERE", "(*)"))
COMPLETE_WITH(",", "WHERE (");
else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE"))
COMPLETE_WITH(",");
/* ALTER PUBLICATION <name> DROP */
@ -2919,12 +2933,23 @@ psql_completion(const char *text, int start, int end)
COMPLETE_WITH("TABLES", "TABLES IN SCHEMA");
else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES"))
COMPLETE_WITH("IN SCHEMA", "WITH (");
else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny))
COMPLETE_WITH("WITH (");
else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny) && !ends_with(prev_wd, ','))
COMPLETE_WITH("WHERE (", "WITH (");
/* Complete "CREATE PUBLICATION <name> FOR TABLE" with "<table>, ..." */
else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE"))
COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables);
/*
* "CREATE PUBLICATION <name> FOR TABLE <name> WHERE (" - complete with
* table attributes
*/
else if (HeadMatches("CREATE", "PUBLICATION", MatchAny) && TailMatches("WHERE"))
COMPLETE_WITH("(");
else if (HeadMatches("CREATE", "PUBLICATION", MatchAny) && TailMatches("WHERE", "("))
COMPLETE_WITH_ATTR(prev3_wd);
else if (HeadMatches("CREATE", "PUBLICATION", MatchAny) && TailMatches("WHERE", "(*)"))
COMPLETE_WITH(" WITH (");
/*
* Complete "CREATE PUBLICATION <name> FOR ALL TABLES IN SCHEMA <schema>,
* ..."

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202202141
#define CATALOG_VERSION_NO 202202221
#endif

View File

@ -74,6 +74,19 @@ typedef struct PublicationActions
bool pubtruncate;
} PublicationActions;
typedef struct PublicationDesc
{
PublicationActions pubactions;
/*
* true if the columns referenced in row filters which are used for UPDATE
* or DELETE are part of the replica identity or the publication actions
* do not include UPDATE or DELETE.
*/
bool rf_valid_for_update;
bool rf_valid_for_delete;
} PublicationDesc;
typedef struct Publication
{
Oid oid;
@ -86,6 +99,7 @@ typedef struct Publication
typedef struct PublicationRelInfo
{
Relation relation;
Node *whereClause;
} PublicationRelInfo;
extern Publication *GetPublication(Oid pubid);
@ -120,10 +134,11 @@ extern List *GetAllSchemaPublicationRelations(Oid puboid,
extern List *GetPubPartitionOptionRelations(List *result,
PublicationPartOpt pub_partopt,
Oid relid);
extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors);
extern bool is_publishable_relation(Relation rel);
extern bool is_schema_publication(Oid pubid);
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri,
bool if_not_exists);
extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
bool if_not_exists);
@ -131,5 +146,4 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
#endif /* PG_PUBLICATION_H */

View File

@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
Oid oid; /* oid */
Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
pg_node_tree prqual; /* qualifications */
#endif
} FormData_pg_publication_rel;
/* ----------------
@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
*/
typedef FormData_pg_publication_rel *Form_pg_publication_rel;
DECLARE_TOAST(pg_publication_rel, 8287, 8288);
DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
DECLARE_INDEX(pg_publication_rel_prpubid_index, 6116, PublicationRelPrpubidIndexId, on pg_publication_rel using btree(prpubid oid_ops));

View File

@ -31,5 +31,7 @@ extern void RemovePublicationSchemaById(Oid psoid);
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
extern void InvalidatePublicationRels(List *relids);
extern bool contain_invalid_rfcolumn(Oid pubid, Relation relation,
List *ancestors, bool pubviaroot);
#endif /* PUBLICATIONCMDS_H */

View File

@ -3651,6 +3651,7 @@ typedef struct PublicationTable
{
NodeTag type;
RangeVar *relation; /* relation to be published */
Node *whereClause; /* qualifications */
} PublicationTable;
/*

View File

@ -14,6 +14,7 @@
#define LOGICAL_PROTO_H
#include "access/xact.h"
#include "executor/tuptable.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
@ -206,17 +207,19 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel, HeapTuple newtuple,
Relation rel,
TupleTableSlot *newslot,
bool binary);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
HeapTuple newtuple, bool binary);
Relation rel,
TupleTableSlot *oldslot,
TupleTableSlot *newslot, bool binary);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
Relation rel, TupleTableSlot *oldtuple,
bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);

View File

@ -19,6 +19,7 @@ typedef struct PGOutputData
{
MemoryContext context; /* private memory context for transient
* allocations */
MemoryContext cachectx; /* private memory context for cache data */
/* client-supplied info: */
uint32 protocol_version;

View File

@ -51,7 +51,7 @@ typedef struct ReorderBufferTupleBuf
* respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of
* logical decoding don't have to care about these.
*/
enum ReorderBufferChangeType
typedef enum ReorderBufferChangeType
{
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
@ -66,7 +66,7 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
REORDER_BUFFER_CHANGE_TRUNCATE,
REORDER_BUFFER_CHANGE_SEQUENCE
};
} ReorderBufferChangeType;
/* forward declaration */
struct ReorderBufferTXN;
@ -83,7 +83,7 @@ typedef struct ReorderBufferChange
XLogRecPtr lsn;
/* The type of change. */
enum ReorderBufferChangeType action;
ReorderBufferChangeType action;
/* Transaction this change belongs to. */
struct ReorderBufferTXN *txn;

View File

@ -161,7 +161,7 @@ typedef struct RelationData
Bitmapset *rd_idattr; /* included in replica identity index */
Bitmapset *rd_hotblockingattr; /* cols blocking HOT update */
PublicationActions *rd_pubactions; /* publication actions */
PublicationDesc *rd_pubdesc; /* publication descriptor, or NULL */
/*
* rd_options is set whenever rd_rel is loaded into the relcache entry.

View File

@ -74,8 +74,9 @@ extern void RelationGetExclusionInfo(Relation indexRelation,
extern void RelationInitIndexAccessInfo(Relation relation);
/* caller must include pg_publication.h */
struct PublicationActions;
extern struct PublicationActions *GetRelationPublicationActions(Relation relation);
struct PublicationDesc;
extern void RelationBuildPublicationDesc(Relation relation,
struct PublicationDesc *pubdesc);
extern void RelationInitTableAccessMethod(Relation relation);

View File

@ -239,6 +239,358 @@ ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
UPDATE testpub_parted2 SET a = 2;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Tests for row filters
CREATE TABLE testpub_rf_tbl1 (a integer, b text);
CREATE TABLE testpub_rf_tbl2 (c text, d integer);
CREATE TABLE testpub_rf_tbl3 (e integer);
CREATE TABLE testpub_rf_tbl4 (g text);
CREATE TABLE testpub_rf_tbl5 (a xml);
CREATE SCHEMA testpub_rf_schema1;
CREATE TABLE testpub_rf_schema1.testpub_rf_tbl5 (h integer);
CREATE SCHEMA testpub_rf_schema2;
CREATE TABLE testpub_rf_schema2.testpub_rf_tbl6 (i integer);
SET client_min_messages = 'ERROR';
-- Firstly, test using the option publish='insert' because the row filter
-- validation of referenced columns is less strict than for delete/update.
CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub5
Publication testpub5
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------------------+------------+---------+---------+---------+-----------+----------
regress_publication_user | f | t | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
\dRp+ testpub5
Publication testpub5
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------------------+------------+---------+---------+---------+-----------+----------
regress_publication_user | f | t | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
"public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
\dRp+ testpub5
Publication testpub5
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------------------+------------+---------+---------+---------+-----------+----------
regress_publication_user | f | t | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
\dRp+ testpub5
Publication testpub5
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------------------+------------+---------+---------+---------+-----------+----------
regress_publication_user | f | t | f | f | f | f
Tables:
"public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
-- test \d <tablename> (now it displays filter information)
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_rf_yes FOR TABLE testpub_rf_tbl1 WHERE (a > 1) WITH (publish = 'insert');
CREATE PUBLICATION testpub_rf_no FOR TABLE testpub_rf_tbl1;
RESET client_min_messages;
\d testpub_rf_tbl1
Table "public.testpub_rf_tbl1"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
a | integer | | |
b | text | | |
Publications:
"testpub_rf_no"
"testpub_rf_yes" WHERE (a > 1)
DROP PUBLICATION testpub_rf_yes, testpub_rf_no;
-- some more syntax tests to exercise other parser pathways
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub_syntax1
Publication testpub_syntax1
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------------------+------------+---------+---------+---------+-----------+----------
regress_publication_user | f | t | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl3" WHERE (e < 999)
DROP PUBLICATION testpub_syntax1;
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub_syntax2
Publication testpub_syntax2
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------------------+------------+---------+---------+---------+-----------+----------
regress_publication_user | f | t | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
DROP PUBLICATION testpub_syntax2;
-- fail - schemas don't allow WHERE clause
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1 WHERE (a = 123);
ERROR: syntax error at or near "WHERE"
LINE 1: ...ntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1 WHERE (a =...
^
CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1, testpub_rf_schema1 WHERE (a = 123);
ERROR: WHERE clause not allowed for schema
LINE 1: ...tax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1, testpub_rf...
^
RESET client_min_messages;
-- fail - duplicate tables are not allowed if that table has any WHERE clause
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1 WHERE (a = 1), testpub_rf_tbl1 WITH (publish = 'insert');
ERROR: conflicting or redundant WHERE clauses for table "testpub_rf_tbl1"
CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1, testpub_rf_tbl1 WHERE (a = 2) WITH (publish = 'insert');
ERROR: conflicting or redundant WHERE clauses for table "testpub_rf_tbl1"
RESET client_min_messages;
-- fail - publication WHERE clause must be boolean
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (1234);
ERROR: argument of PUBLICATION WHERE must be type boolean, not type integer
LINE 1: ...PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (1234);
^
-- fail - aggregate functions not allowed in WHERE clause
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e < AVG(e));
ERROR: aggregate functions are not allowed in WHERE
LINE 1: ...ATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e < AVG(e));
^
-- fail - user-defined operators are not allowed
CREATE FUNCTION testpub_rf_func1(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL;
CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func1, LEFTARG = integer, RIGHTARG = integer);
CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
ERROR: invalid publication WHERE expression
LINE 1: ...ICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
^
DETAIL: User-defined operators are not allowed.
-- fail - user-defined functions are not allowed
CREATE FUNCTION testpub_rf_func2() RETURNS integer AS $$ BEGIN RETURN 123; END; $$ LANGUAGE plpgsql;
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a >= testpub_rf_func2());
ERROR: invalid publication WHERE expression
LINE 1: ...ON testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a >= testpub_rf...
^
DETAIL: User-defined or built-in mutable functions are not allowed.
-- fail - non-immutable functions are not allowed. random() is volatile.
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a < random());
ERROR: invalid publication WHERE expression
LINE 1: ...ION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a < random());
^
DETAIL: User-defined or built-in mutable functions are not allowed.
-- fail - user-defined collations are not allowed
CREATE COLLATION user_collation FROM "C";
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (b < '2' COLLATE user_collation);
ERROR: invalid publication WHERE expression
LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (b < '2' CO...
^
DETAIL: User-defined collations are not allowed.
-- ok - NULLIF is allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (NULLIF(1,2) = a);
-- ok - built-in operators are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IS NULL);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE ((a > 5) IS FALSE);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IS DISTINCT FROM 5);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE ((a, a + 1) < (2, 3));
-- ok - built-in type coercions between two binary compatible datatypes are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (b::varchar < '2');
-- ok - immutable built-in functions are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
-- fail - user-defined types are not allowed
CREATE TYPE rf_bug_status AS ENUM ('new', 'open', 'closed');
CREATE TABLE rf_bug (id serial, description text, status rf_bug_status);
CREATE PUBLICATION testpub6 FOR TABLE rf_bug WHERE (status = 'open') WITH (publish = 'insert');
ERROR: invalid publication WHERE expression
LINE 1: ...EATE PUBLICATION testpub6 FOR TABLE rf_bug WHERE (status = '...
^
DETAIL: User-defined types are not allowed.
DROP TABLE rf_bug;
DROP TYPE rf_bug_status;
-- fail - row filter expression is not simple
CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl1 WHERE (a IN (SELECT generate_series(1,5)));
ERROR: invalid publication WHERE expression
LINE 1: ...ICATION testpub6 FOR TABLE testpub_rf_tbl1 WHERE (a IN (SELE...
^
DETAIL: Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations and immutable built-in functions.
-- fail - system columns are not allowed
CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl1 WHERE ('(0,1)'::tid = ctid);
ERROR: invalid publication WHERE expression
LINE 1: ...tpub6 FOR TABLE testpub_rf_tbl1 WHERE ('(0,1)'::tid = ctid);
^
DETAIL: System columns are not allowed.
-- ok - conditional expressions are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (a IS DOCUMENT);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE a));
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (NULLIF(1, 2) = a);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (CASE a WHEN 5 THEN true ELSE false END);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (COALESCE(b, 'foo') = 'foo');
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (GREATEST(a, 10) > 10);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IN (2, 4, 6));
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (ARRAY[a] <@ ARRAY[2, 4, 6]);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (ROW(a, 2) IS NULL);
-- fail - WHERE not allowed in DROP
ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl1 WHERE (e < 27);
ERROR: cannot use a WHERE clause when removing a table from a publication
-- fail - cannot ALTER SET table which is a member of a pre-existing schema
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub6 FOR ALL TABLES IN SCHEMA testpub_rf_schema2;
ALTER PUBLICATION testpub6 SET ALL TABLES IN SCHEMA testpub_rf_schema2, TABLE testpub_rf_schema2.testpub_rf_tbl6 WHERE (i < 99);
ERROR: cannot add relation "testpub_rf_schema2.testpub_rf_tbl6" to publication
DETAIL: Table's schema "testpub_rf_schema2" is already part of the publication or part of the specified schema list.
RESET client_min_messages;
DROP TABLE testpub_rf_tbl1;
DROP TABLE testpub_rf_tbl2;
DROP TABLE testpub_rf_tbl3;
DROP TABLE testpub_rf_tbl4;
DROP TABLE testpub_rf_tbl5;
DROP TABLE testpub_rf_schema1.testpub_rf_tbl5;
DROP TABLE testpub_rf_schema2.testpub_rf_tbl6;
DROP SCHEMA testpub_rf_schema1;
DROP SCHEMA testpub_rf_schema2;
DROP PUBLICATION testpub5;
DROP PUBLICATION testpub6;
DROP OPERATOR =#>(integer, integer);
DROP FUNCTION testpub_rf_func1(integer, integer);
DROP FUNCTION testpub_rf_func2();
DROP COLLATION user_collation;
-- ======================================================
-- More row filter tests for validating column references
CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int);
CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b));
CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a);
CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY);
ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10);
-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing)
-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK.
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk WHERE (a > 99);
RESET client_min_messages;
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (b > 99);
-- ok - "b" is a PK col
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- fail - "c" is not part of the PK
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (d > 99);
-- fail - "d" is not part of the PK
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- fail - "a" is not part of REPLICA IDENTITY
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
-- Case 2. REPLICA IDENTITY FULL
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- ok - "c" is in REPLICA IDENTITY now even though not in PK
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- ok - "a" is in REPLICA IDENTITY now
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 3. REPLICA IDENTITY NOTHING
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (a > 99);
-- fail - "a" is in PK but it is not part of REPLICA IDENTITY NOTHING
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- fail - "c" is not in PK and not in REPLICA IDENTITY NOTHING
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- fail - "a" is not in REPLICA IDENTITY NOTHING
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
-- Case 4. REPLICA IDENTITY INDEX
ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c);
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c;
ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c);
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (a > 99);
-- fail - "a" is in PK but it is not part of REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_pk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- ok - "c" is not in PK but it is part of REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- fail - "a" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_nopk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_nopk"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (c > 99);
-- ok - "c" is part of REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Tests for partitioned table
-- set PUBLISH_VIA_PARTITION_ROOT to false and test row filter for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - cannot use row filter for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk WHERE (a > 99);
ERROR: cannot use publication WHERE clause for relation "rf_tbl_abcd_part_pk"
DETAIL: WHERE clause cannot be used for a partitioned table when publish_via_partition_root is false.
-- ok - can use row filter for partition
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 WHERE (a > 99);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- set PUBLISH_VIA_PARTITION_ROOT to true and test row filter for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use row filter for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk WHERE (a > 99);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any row filter is
-- used for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
ERROR: cannot set publish_via_partition_root = false for publication "testpub6"
DETAIL: The publication contains a WHERE clause for a partitioned table "rf_tbl_abcd_part_pk" which is not allowed when publish_via_partition_root is false.
-- Now change the root filter to use a column "b"
-- (which is not in the replica identity)
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 WHERE (b > 99);
-- ok - we don't have row filter for partitioned table.
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_part_pk_1"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
-- set PUBLISH_VIA_PARTITION_ROOT to true
-- can use row filter for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use row filter for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk WHERE (b > 99);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
ERROR: cannot update table "rf_tbl_abcd_part_pk_1"
DETAIL: Column used in the publication WHERE expression is not part of the replica identity.
DROP PUBLICATION testpub6;
DROP TABLE rf_tbl_abcd_pk;
DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
CREATE TABLE testpub_tbl4(a int);

View File

@ -134,6 +134,242 @@ UPDATE testpub_parted2 SET a = 2;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Tests for row filters
CREATE TABLE testpub_rf_tbl1 (a integer, b text);
CREATE TABLE testpub_rf_tbl2 (c text, d integer);
CREATE TABLE testpub_rf_tbl3 (e integer);
CREATE TABLE testpub_rf_tbl4 (g text);
CREATE TABLE testpub_rf_tbl5 (a xml);
CREATE SCHEMA testpub_rf_schema1;
CREATE TABLE testpub_rf_schema1.testpub_rf_tbl5 (h integer);
CREATE SCHEMA testpub_rf_schema2;
CREATE TABLE testpub_rf_schema2.testpub_rf_tbl6 (i integer);
SET client_min_messages = 'ERROR';
-- Firstly, test using the option publish='insert' because the row filter
-- validation of referenced columns is less strict than for delete/update.
CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub5
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
\dRp+ testpub5
ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
\dRp+ testpub5
-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
\dRp+ testpub5
-- test \d <tablename> (now it displays filter information)
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_rf_yes FOR TABLE testpub_rf_tbl1 WHERE (a > 1) WITH (publish = 'insert');
CREATE PUBLICATION testpub_rf_no FOR TABLE testpub_rf_tbl1;
RESET client_min_messages;
\d testpub_rf_tbl1
DROP PUBLICATION testpub_rf_yes, testpub_rf_no;
-- some more syntax tests to exercise other parser pathways
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub_syntax1
DROP PUBLICATION testpub_syntax1;
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub_syntax2
DROP PUBLICATION testpub_syntax2;
-- fail - schemas don't allow WHERE clause
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1 WHERE (a = 123);
CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1, testpub_rf_schema1 WHERE (a = 123);
RESET client_min_messages;
-- fail - duplicate tables are not allowed if that table has any WHERE clause
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1 WHERE (a = 1), testpub_rf_tbl1 WITH (publish = 'insert');
CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1, testpub_rf_tbl1 WHERE (a = 2) WITH (publish = 'insert');
RESET client_min_messages;
-- fail - publication WHERE clause must be boolean
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (1234);
-- fail - aggregate functions not allowed in WHERE clause
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e < AVG(e));
-- fail - user-defined operators are not allowed
CREATE FUNCTION testpub_rf_func1(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL;
CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func1, LEFTARG = integer, RIGHTARG = integer);
CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
-- fail - user-defined functions are not allowed
CREATE FUNCTION testpub_rf_func2() RETURNS integer AS $$ BEGIN RETURN 123; END; $$ LANGUAGE plpgsql;
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a >= testpub_rf_func2());
-- fail - non-immutable functions are not allowed. random() is volatile.
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a < random());
-- fail - user-defined collations are not allowed
CREATE COLLATION user_collation FROM "C";
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (b < '2' COLLATE user_collation);
-- ok - NULLIF is allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (NULLIF(1,2) = a);
-- ok - built-in operators are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IS NULL);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE ((a > 5) IS FALSE);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IS DISTINCT FROM 5);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE ((a, a + 1) < (2, 3));
-- ok - built-in type coercions between two binary compatible datatypes are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (b::varchar < '2');
-- ok - immutable built-in functions are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
-- fail - user-defined types are not allowed
CREATE TYPE rf_bug_status AS ENUM ('new', 'open', 'closed');
CREATE TABLE rf_bug (id serial, description text, status rf_bug_status);
CREATE PUBLICATION testpub6 FOR TABLE rf_bug WHERE (status = 'open') WITH (publish = 'insert');
DROP TABLE rf_bug;
DROP TYPE rf_bug_status;
-- fail - row filter expression is not simple
CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl1 WHERE (a IN (SELECT generate_series(1,5)));
-- fail - system columns are not allowed
CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl1 WHERE ('(0,1)'::tid = ctid);
-- ok - conditional expressions are allowed
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (a IS DOCUMENT);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE a));
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (NULLIF(1, 2) = a);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (CASE a WHEN 5 THEN true ELSE false END);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (COALESCE(b, 'foo') = 'foo');
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (GREATEST(a, 10) > 10);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IN (2, 4, 6));
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (ARRAY[a] <@ ARRAY[2, 4, 6]);
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (ROW(a, 2) IS NULL);
-- fail - WHERE not allowed in DROP
ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl1 WHERE (e < 27);
-- fail - cannot ALTER SET table which is a member of a pre-existing schema
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub6 FOR ALL TABLES IN SCHEMA testpub_rf_schema2;
ALTER PUBLICATION testpub6 SET ALL TABLES IN SCHEMA testpub_rf_schema2, TABLE testpub_rf_schema2.testpub_rf_tbl6 WHERE (i < 99);
RESET client_min_messages;
DROP TABLE testpub_rf_tbl1;
DROP TABLE testpub_rf_tbl2;
DROP TABLE testpub_rf_tbl3;
DROP TABLE testpub_rf_tbl4;
DROP TABLE testpub_rf_tbl5;
DROP TABLE testpub_rf_schema1.testpub_rf_tbl5;
DROP TABLE testpub_rf_schema2.testpub_rf_tbl6;
DROP SCHEMA testpub_rf_schema1;
DROP SCHEMA testpub_rf_schema2;
DROP PUBLICATION testpub5;
DROP PUBLICATION testpub6;
DROP OPERATOR =#>(integer, integer);
DROP FUNCTION testpub_rf_func1(integer, integer);
DROP FUNCTION testpub_rf_func2();
DROP COLLATION user_collation;
-- ======================================================
-- More row filter tests for validating column references
CREATE TABLE rf_tbl_abcd_nopk(a int, b int, c int, d int);
CREATE TABLE rf_tbl_abcd_pk(a int, b int, c int, d int, PRIMARY KEY(a,b));
CREATE TABLE rf_tbl_abcd_part_pk (a int PRIMARY KEY, b int) PARTITION by RANGE (a);
CREATE TABLE rf_tbl_abcd_part_pk_1 (b int, a int PRIMARY KEY);
ALTER TABLE rf_tbl_abcd_part_pk ATTACH PARTITION rf_tbl_abcd_part_pk_1 FOR VALUES FROM (1) TO (10);
-- Case 1. REPLICA IDENTITY DEFAULT (means use primary key or nothing)
-- 1a. REPLICA IDENTITY is DEFAULT and table has a PK.
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub6 FOR TABLE rf_tbl_abcd_pk WHERE (a > 99);
RESET client_min_messages;
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (b > 99);
-- ok - "b" is a PK col
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- fail - "c" is not part of the PK
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (d > 99);
-- fail - "d" is not part of the PK
UPDATE rf_tbl_abcd_pk SET a = 1;
-- 1b. REPLICA IDENTITY is DEFAULT and table has no PK
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- fail - "a" is not part of REPLICA IDENTITY
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 2. REPLICA IDENTITY FULL
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY FULL;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY FULL;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- ok - "c" is in REPLICA IDENTITY now even though not in PK
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- ok - "a" is in REPLICA IDENTITY now
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 3. REPLICA IDENTITY NOTHING
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY NOTHING;
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY NOTHING;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (a > 99);
-- fail - "a" is in PK but it is not part of REPLICA IDENTITY NOTHING
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- fail - "c" is not in PK and not in REPLICA IDENTITY NOTHING
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- fail - "a" is not in REPLICA IDENTITY NOTHING
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Case 4. REPLICA IDENTITY INDEX
ALTER TABLE rf_tbl_abcd_pk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_pk_c ON rf_tbl_abcd_pk(c);
ALTER TABLE rf_tbl_abcd_pk REPLICA IDENTITY USING INDEX idx_abcd_pk_c;
ALTER TABLE rf_tbl_abcd_nopk ALTER COLUMN c SET NOT NULL;
CREATE UNIQUE INDEX idx_abcd_nopk_c ON rf_tbl_abcd_nopk(c);
ALTER TABLE rf_tbl_abcd_nopk REPLICA IDENTITY USING INDEX idx_abcd_nopk_c;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (a > 99);
-- fail - "a" is in PK but it is not part of REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_pk WHERE (c > 99);
-- ok - "c" is not in PK but it is part of REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_pk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (a > 99);
-- fail - "a" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_nopk SET a = 1;
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_nopk WHERE (c > 99);
-- ok - "c" is part of REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_nopk SET a = 1;
-- Tests for partitioned table
-- set PUBLISH_VIA_PARTITION_ROOT to false and test row filter for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - cannot use row filter for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk WHERE (a > 99);
-- ok - can use row filter for partition
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 WHERE (a > 99);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- set PUBLISH_VIA_PARTITION_ROOT to true and test row filter for partitioned
-- table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use row filter for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk WHERE (a > 99);
-- ok - "a" is a PK col
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- fail - cannot set PUBLISH_VIA_PARTITION_ROOT to false if any row filter is
-- used for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- Now change the root filter to use a column "b"
-- (which is not in the replica identity)
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk_1 WHERE (b > 99);
-- ok - we don't have row filter for partitioned table.
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=0);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
-- set PUBLISH_VIA_PARTITION_ROOT to true
-- can use row filter for partitioned table
ALTER PUBLICATION testpub6 SET (PUBLISH_VIA_PARTITION_ROOT=1);
-- ok - can use row filter for partitioned table
ALTER PUBLICATION testpub6 SET TABLE rf_tbl_abcd_part_pk WHERE (b > 99);
-- fail - "b" is not in REPLICA IDENTITY INDEX
UPDATE rf_tbl_abcd_part_pk SET a = 1;
DROP PUBLICATION testpub6;
DROP TABLE rf_tbl_abcd_pk;
DROP TABLE rf_tbl_abcd_nopk;
DROP TABLE rf_tbl_abcd_part_pk;
-- ======================================================
-- Test cache invalidation FOR ALL TABLES publication
SET client_min_messages = 'ERROR';
CREATE TABLE testpub_tbl4(a int);

View File

@ -0,0 +1,695 @@
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
# Test logical replication behavior with row filtering
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
# create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $appname = 'tap_sub';
# ====================================================================
# Testcase start: FOR ALL TABLES
#
# The FOR ALL TABLES test must come first so that it is not affected by
# all the other test tables that are later created.
# create tables pub and sub
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rf_x (x int primary key)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rf_x (x int primary key)");
# insert some initial data
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rf_x (x) VALUES (0), (5), (10), (15), (20)");
# create pub/sub
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_x FOR TABLE tab_rf_x WHERE (x > 10)");
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_forall FOR ALL TABLES");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
);
$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# The subscription of the FOR ALL TABLES publication means there should be no
# filtering on the tablesync COPY, so all expect all 5 will be present.
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(x) FROM tab_rf_x");
is($result, qq(5),
'check initial data copy from table tab_rf_x should not be filtered');
# Similarly, the table filter for tab_rf_x (after the initial phase) has no
# effect when combined with the ALL TABLES.
# Expected: 5 initial rows + 2 new rows = 7 rows
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rf_x (x) VALUES (-99), (99)");
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(x) FROM tab_rf_x");
is($result, qq(7), 'check table tab_rf_x should not be filtered');
# cleanup pub
$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_forall");
$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_x");
$node_publisher->safe_psql('postgres', "DROP TABLE tab_rf_x");
# cleanup sub
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rf_x");
# Testcase end: FOR ALL TABLES
# ====================================================================
# ====================================================================
# Testcase start: ALL TABLES IN SCHEMA
#
# The ALL TABLES IN SCHEMA test is independent of all other test cases so it
# cleans up after itself.
# create tables pub and sub
$node_publisher->safe_psql('postgres', "CREATE SCHEMA schema_rf_x");
$node_publisher->safe_psql('postgres',
"CREATE TABLE schema_rf_x.tab_rf_x (x int primary key)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE schema_rf_x.tab_rf_partitioned (x int primary key) PARTITION BY RANGE(x)"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE public.tab_rf_partition (LIKE schema_rf_x.tab_rf_partitioned)"
);
$node_publisher->safe_psql('postgres',
"ALTER TABLE schema_rf_x.tab_rf_partitioned ATTACH PARTITION public.tab_rf_partition DEFAULT"
);
$node_subscriber->safe_psql('postgres', "CREATE SCHEMA schema_rf_x");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE schema_rf_x.tab_rf_x (x int primary key)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE schema_rf_x.tab_rf_partitioned (x int primary key) PARTITION BY RANGE(x)"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE public.tab_rf_partition (LIKE schema_rf_x.tab_rf_partitioned)"
);
$node_subscriber->safe_psql('postgres',
"ALTER TABLE schema_rf_x.tab_rf_partitioned ATTACH PARTITION public.tab_rf_partition DEFAULT"
);
# insert some initial data
$node_publisher->safe_psql('postgres',
"INSERT INTO schema_rf_x.tab_rf_x (x) VALUES (0), (5), (10), (15), (20)");
$node_publisher->safe_psql('postgres',
"INSERT INTO schema_rf_x.tab_rf_partitioned (x) VALUES (1), (20)");
# create pub/sub
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_x FOR TABLE schema_rf_x.tab_rf_x WHERE (x > 10)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_allinschema FOR ALL TABLES IN SCHEMA schema_rf_x"
);
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_allinschema ADD TABLE public.tab_rf_partition WHERE (x > 10)"
);
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
);
$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# The subscription of the ALL TABLES IN SCHEMA publication means there should be
# no filtering on the tablesync COPY, so expect all 5 will be present.
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(x) FROM schema_rf_x.tab_rf_x");
is($result, qq(5),
'check initial data copy from table tab_rf_x should not be filtered');
# Similarly, the table filter for tab_rf_x (after the initial phase) has no
# effect when combined with the ALL TABLES IN SCHEMA. Meanwhile, the filter for
# the tab_rf_partition does work because that partition belongs to a different
# schema (and publish_via_partition_root = false).
# Expected:
# tab_rf_x : 5 initial rows + 2 new rows = 7 rows
# tab_rf_partition : 1 initial row + 1 new row = 2 rows
$node_publisher->safe_psql('postgres',
"INSERT INTO schema_rf_x.tab_rf_x (x) VALUES (-99), (99)");
$node_publisher->safe_psql('postgres',
"INSERT INTO schema_rf_x.tab_rf_partitioned (x) VALUES (5), (25)");
$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(x) FROM schema_rf_x.tab_rf_x");
is($result, qq(7), 'check table tab_rf_x should not be filtered');
$result = $node_subscriber->safe_psql('postgres',
"SELECT * FROM public.tab_rf_partition");
is( $result, qq(20
25), 'check table tab_rf_partition should be filtered');
# cleanup pub
$node_publisher->safe_psql('postgres',
"DROP PUBLICATION tap_pub_allinschema");
$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_x");
$node_publisher->safe_psql('postgres', "DROP TABLE public.tab_rf_partition");
$node_publisher->safe_psql('postgres',
"DROP TABLE schema_rf_x.tab_rf_partitioned");
$node_publisher->safe_psql('postgres', "DROP TABLE schema_rf_x.tab_rf_x");
$node_publisher->safe_psql('postgres', "DROP SCHEMA schema_rf_x");
# cleanup sub
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
$node_subscriber->safe_psql('postgres', "DROP TABLE public.tab_rf_partition");
$node_subscriber->safe_psql('postgres',
"DROP TABLE schema_rf_x.tab_rf_partitioned");
$node_subscriber->safe_psql('postgres', "DROP TABLE schema_rf_x.tab_rf_x");
$node_subscriber->safe_psql('postgres', "DROP SCHEMA schema_rf_x");
# Testcase end: ALL TABLES IN SCHEMA
# ====================================================================
# ======================================================
# Testcase start: FOR TABLE with row filter publications
# setup structure on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_1 REPLICA IDENTITY FULL;");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_2 (c int primary key)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_4 (c int primary key)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
);
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_partitioned_2 (a int primary key, b integer) PARTITION BY RANGE(a)"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_partition (LIKE tab_rowfilter_partitioned_2)"
);
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_partitioned_2 ATTACH PARTITION tab_rowfilter_partition DEFAULT"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_toast (a text NOT NULL, b text NOT NULL)");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_toast ALTER COLUMN a SET STORAGE EXTERNAL");
$node_publisher->safe_psql('postgres',
"CREATE UNIQUE INDEX tab_rowfilter_toast_ri_index on tab_rowfilter_toast (a, b)"
);
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_toast REPLICA IDENTITY USING INDEX tab_rowfilter_toast_ri_index"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_inherited (a int)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_child (b text) INHERITS (tab_rowfilter_inherited)"
);
# setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_2 (c int primary key)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_4 (c int primary key)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
);
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_partitioned_2 (a int primary key, b integer) PARTITION BY RANGE(a)"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_partition (LIKE tab_rowfilter_partitioned_2)"
);
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_partitioned_2 ATTACH PARTITION tab_rowfilter_partition DEFAULT"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_toast (a text NOT NULL, b text NOT NULL)");
$node_subscriber->safe_psql('postgres',
"CREATE UNIQUE INDEX tab_rowfilter_toast_ri_index on tab_rowfilter_toast (a, b)"
);
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_rowfilter_toast REPLICA IDENTITY USING INDEX tab_rowfilter_toast_ri_index"
);
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_inherited (a int)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rowfilter_child (b text) INHERITS (tab_rowfilter_inherited)"
);
# setup logical replication
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"
);
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"
);
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_rowfilter_partitioned");
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_not_used FOR TABLE tab_rowfilter_1 WHERE (a < 0)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_4a FOR TABLE tab_rowfilter_4 WHERE (c % 2 = 0)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_4b FOR TABLE tab_rowfilter_4");
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_5a FOR TABLE tab_rowfilter_partitioned_2");
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_5b FOR TABLE tab_rowfilter_partition WHERE (a > 10)"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_toast FOR TABLE tab_rowfilter_toast WHERE (a = repeat('1234567890', 200) AND b < '10')"
);
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_inherits FOR TABLE tab_rowfilter_inherited WHERE (a > 15)"
);
#
# The following INSERTs are executed before the CREATE SUBSCRIPTION, so these
# SQL commands are for testing the initial data copy using logical replication.
#
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"
);
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 20)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"
);
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_4 (c) SELECT generate_series(1, 10)");
# insert data into partitioned table and directly on the partition
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(7000, 101),(15000, 102),(5500, 300)"
);
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(2, 200),(6005, 201)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 103)");
# insert data into partitioned table.
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_partitioned_2 (a, b) VALUES(1, 1),(20, 20)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_toast(a, b) VALUES(repeat('1234567890', 200), '1234567890')"
);
# insert data into parent and child table.
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_inherited(a) VALUES(10),(20)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')"
);
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits"
);
$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Check expected replicated rows for tab_rowfilter_1
# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
# - INSERT (1, 'not replicated') NO, because a is not > 1000
# - INSERT (1500, 'filtered') NO, because b == 'filtered'
# - INSERT (1980, 'not filtered') YES
# - generate_series(990,1002) YES, only for 1001,1002 because a > 1000
#
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
is( $result, qq(1001|test 1001
1002|test 1002
1980|not filtered), 'check initial data copy from table tab_rowfilter_1');
# Check expected replicated rows for tab_rowfilter_2
# tap_pub_1 filter is: (c % 2 = 0)
# tap_pub_2 filter is: (c % 3 = 0)
# When there are multiple publications for the same table, the filters
# expressions are OR'ed together. In this case, rows are replicated if
# c value is divided by 2 OR 3 (2, 3, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20)
#
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
is($result, qq(13|2|20),
'check initial data copy from table tab_rowfilter_2');
# Check expected replicated rows for tab_rowfilter_4
# (same table in two publications but only one has a filter).
# tap_pub_4a filter is: (c % 2 = 0)
# tap_pub_4b filter is: <no filter>
# Expressions are OR'ed together but when there is no filter it just means
# OR everything - e.g. same as no filter at all.
# Expect all rows: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(c), min(c), max(c) FROM tab_rowfilter_4");
is($result, qq(10|1|10),
'check initial data copy from table tab_rowfilter_4');
# Check expected replicated rows for tab_rowfilter_3
# There is no filter. 10 rows are inserted, so 10 rows are replicated.
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(a) FROM tab_rowfilter_3");
is($result, qq(10), 'check initial data copy from table tab_rowfilter_3');
# Check expected replicated rows for partitions
# publication option publish_via_partition_root is false so use the row filter
# from a partition
# tab_rowfilter_partitioned filter: (a < 5000)
# tab_rowfilter_less_10k filter: (a < 6000)
# tab_rowfilter_greater_10k filter: no filter
#
# INSERT into tab_rowfilter_partitioned:
# - INSERT (1,100) YES, because 1 < 6000
# - INSERT (7000, 101) NO, because 7000 is not < 6000
# - INSERT (15000, 102) YES, because tab_rowfilter_greater_10k has no filter
# - INSERT (5500, 300) YES, because 5500 < 6000
#
# INSERT directly into tab_rowfilter_less_10k:
# - INSERT (2, 200) YES, because 2 < 6000
# - INSERT (6005, 201) NO, because 6005 is not < 6000
#
# INSERT directly into tab_rowfilter_greater_10k:
# - INSERT (16000, 103) YES, because tab_rowfilter_greater_10k has no filter
#
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a, b FROM tab_rowfilter_less_10k ORDER BY 1, 2");
is( $result, qq(1|100
2|200
5500|300), 'check initial data copy from partition tab_rowfilter_less_10k');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a, b FROM tab_rowfilter_greater_10k ORDER BY 1, 2");
is( $result, qq(15000|102
16000|103), 'check initial data copy from partition tab_rowfilter_greater_10k'
);
# Check expected replicated rows for partitions
# publication option publish_via_partition_root is false so use the row filter
# from a partition
# tap_pub_5a filter: <no filter>
# tap_pub_5b filter: (a > 10)
# The parent table for this partition is published via tap_pub_5a, so there is
# no filter for the partition. And expressions are OR'ed together so it means
# OR everything - e.g. same as no filter at all.
# Expect all rows: (1, 1) and (20, 20)
#
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a, b FROM tab_rowfilter_partition ORDER BY 1, 2");
is( $result, qq(1|1
20|20), 'check initial data copy from partition tab_rowfilter_partition');
# Check expected replicated rows for tab_rowfilter_toast
# tab_rowfilter_toast filter: (a = repeat('1234567890', 200) AND b < '10')
# INSERT (repeat('1234567890', 200) ,'1234567890') NO
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rowfilter_toast");
is($result, qq(0), 'check initial data copy from table tab_rowfilter_toast');
# Check expected replicated rows for tab_rowfilter_inherited
# tab_rowfilter_inherited filter is: (a > 15)
# - INSERT (10) NO, 10 < 15
# - INSERT (20) YES, 20 > 15
# - INSERT (0, '0') NO, 0 < 15
# - INSERT (30, '30') YES, 30 > 15
# - INSERT (40, '40') YES, 40 > 15
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a FROM tab_rowfilter_inherited ORDER BY a");
is( $result, qq(20
30
40), 'check initial data copy from table tab_rowfilter_inherited');
# The following commands are executed after CREATE SUBSCRIPTION, so these SQL
# commands are for testing normal logical replication behavior.
#
# test row filter (INSERT, UPDATE, DELETE)
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1600, 'test 1600')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1601, 'test 1601')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1602, 'filtered')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1700, 'test 1700')");
$node_publisher->safe_psql('postgres',
"UPDATE tab_rowfilter_1 SET b = NULL WHERE a = 1600");
$node_publisher->safe_psql('postgres',
"UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601");
$node_publisher->safe_psql('postgres',
"UPDATE tab_rowfilter_1 SET b = 'test 1602 updated' WHERE a = 1602");
$node_publisher->safe_psql('postgres',
"DELETE FROM tab_rowfilter_1 WHERE a = 1700");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_2 (c) VALUES (21), (22), (23), (24), (25)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_4 (c) VALUES (0), (11), (12)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_inherited (a) VALUES (14), (16)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_child (a, b) VALUES (13, '13'), (17, '17')");
$node_publisher->wait_for_catchup($appname);
# Check expected replicated rows for tab_rowfilter_2
# tap_pub_1 filter is: (c % 2 = 0)
# tap_pub_2 filter is: (c % 3 = 0)
# When there are multiple publications for the same table, the filters
# expressions are OR'ed together. In this case, rows are replicated if
# c value is divided by 2 OR 3.
#
# Expect original rows (2, 3, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20)
# Plus (21, 22, 24)
#
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
is($result, qq(16|2|24), 'check replicated rows to tab_rowfilter_2');
# Check expected replicated rows for tab_rowfilter_4
# (same table in two publications but only one has a filter).
# tap_pub_4a filter is: (c % 2 = 0)
# tap_pub_4b filter is: <no filter>
# Expressions are OR'ed together but when there is no filter it just means
# OR everything - e.g. same as no filter at all.
# Expect all rows from initial copy: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# And also (0, 11, 12)
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(c), min(c), max(c) FROM tab_rowfilter_4");
is($result, qq(13|0|12), 'check replicated rows to tab_rowfilter_4');
# Check expected replicated rows for tab_rowfilter_1
# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
#
# - 1001, 1002, 1980 already exist from initial data copy
# - INSERT (800, 'test 800') NO, because 800 is not > 1000
# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered',
# but row deleted after the update below.
# - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered'
# - INSERT (1602, 'filtered') NO, because b == 'filtered'
# - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered'
# - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered'
# - UPDATE (1601, 'test 1601 updated') YES, because 1601 > 1000 and 'test 1601 updated' <> 'filtered'
# - UPDATE (1602, 'test 1602 updated') YES, because 1602 > 1000 and 'test 1602 updated' <> 'filtered'
# - DELETE (1700) YES, because 1700 > 1000 and 'test 1700' <> 'filtered'
#
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
is( $result, qq(1001|test 1001
1002|test 1002
1601|test 1601 updated
1602|test 1602 updated
1980|not filtered), 'check replicated rows to table tab_rowfilter_1');
# Publish using root partitioned table
# Use a different partitioned table layout (exercise publish_via_partition_root)
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_3 SET (publish_via_partition_root = true)");
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_3 SET TABLE tab_rowfilter_partitioned WHERE (a < 5000), tab_rowfilter_less_10k WHERE (a < 6000)"
);
$node_subscriber->safe_psql('postgres',
"TRUNCATE TABLE tab_rowfilter_partitioned");
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
);
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(4500, 450)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(5600, 123)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(14000, 1950)");
$node_publisher->safe_psql('postgres',
"UPDATE tab_rowfilter_less_10k SET b = 30 WHERE a = 4001");
$node_publisher->safe_psql('postgres',
"DELETE FROM tab_rowfilter_less_10k WHERE a = 4002");
$node_publisher->wait_for_catchup($appname);
# Check expected replicated rows for partitions
# publication option publish_via_partition_root is true so use the row filter
# from the root partitioned table
# tab_rowfilter_partitioned filter: (a < 5000)
# tab_rowfilter_less_10k filter: (a < 6000)
# tab_rowfilter_greater_10k filter: no filter
#
# After TRUNCATE, REFRESH PUBLICATION, the initial data copy will apply the
# partitioned table row filter.
# - INSERT (1, 100) YES, 1 < 5000
# - INSERT (7000, 101) NO, 7000 is not < 5000
# - INSERT (15000, 102) NO, 15000 is not < 5000
# - INSERT (5500, 300) NO, 5500 is not < 5000
# - INSERT (2, 200) YES, 2 < 5000
# - INSERT (6005, 201) NO, 6005 is not < 5000
# - INSERT (16000, 103) NO, 16000 is not < 5000
#
# Execute SQL commands after initial data copy for testing the logical
# replication behavior.
# - INSERT (4000, 400) YES, 4000 < 5000
# - INSERT (4001, 401) YES, 4001 < 5000
# - INSERT (4002, 402) YES, 4002 < 5000
# - INSERT (4500, 450) YES, 4500 < 5000
# - INSERT (5600, 123) NO, 5600 is not < 5000
# - INSERT (14000, 1950) NO, 16000 is not < 5000
# - UPDATE (4001) YES, 4001 < 5000
# - DELETE (4002) YES, 4002 < 5000
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a, b FROM tab_rowfilter_partitioned ORDER BY 1, 2");
is( $result, qq(1|100
2|200
4000|400
4001|30
4500|450), 'check publish_via_partition_root behavior');
# Check expected replicated rows for tab_rowfilter_inherited and
# tab_rowfilter_child.
# tab_rowfilter_inherited filter is: (a > 15)
# - INSERT (14) NO, 14 < 15
# - INSERT (16) YES, 16 > 15
#
# tab_rowfilter_child filter is: (a > 15)
# - INSERT (13, '13') NO, 13 < 15
# - INSERT (17, '17') YES, 17 > 15
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a FROM tab_rowfilter_inherited ORDER BY a");
is( $result, qq(16
17
20
30
40),
'check replicated rows to tab_rowfilter_inherited and tab_rowfilter_child'
);
# UPDATE the non-toasted column for table tab_rowfilter_toast
$node_publisher->safe_psql('postgres',
"UPDATE tab_rowfilter_toast SET b = '1'");
# Check expected replicated rows for tab_rowfilter_toast
# tab_rowfilter_toast filter: (a = repeat('1234567890', 200) AND b < '10')
# UPDATE old (repeat('1234567890', 200) ,'1234567890') NO
# new: (repeat('1234567890', 200) ,'1') YES
$result =
$node_subscriber->safe_psql('postgres',
"SELECT a = repeat('1234567890', 200), b FROM tab_rowfilter_toast");
is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
# Testcase end: FOR TABLE with row filter publications
# ======================================================
$node_subscriber->stop('fast');
$node_publisher->stop('fast');
done_testing();

View File

@ -2053,6 +2053,7 @@ PsqlScanStateData
PsqlSettings
Publication
PublicationActions
PublicationDesc
PublicationInfo
PublicationObjSpec
PublicationObjSpecType
@ -2199,6 +2200,7 @@ ReorderBufferApplyChangeCB
ReorderBufferApplyTruncateCB
ReorderBufferBeginCB
ReorderBufferChange
ReorderBufferChangeType
ReorderBufferCommitCB
ReorderBufferCommitPreparedCB
ReorderBufferDiskChange
@ -3506,6 +3508,7 @@ replace_rte_variables_context
ret_type
rewind_source
rewrite_event
rf_context
rijndael_ctx
rm_detail_t
role_auth_extra