Allow users to skip logical replication of data having origin.

This patch adds a new SUBSCRIPTION parameter "origin". It specifies
whether the subscription will request the publisher to only send changes
that don't have an origin or send changes regardless of origin. Setting it
to "none" means that the subscription will request the publisher to only
send changes that have no origin associated. Setting it to "any" means
that the publisher sends changes regardless of their origin. The default
is "any".
Usage:
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
PUBLICATION pub1 WITH (origin = none);

This can be used to avoid loops (infinite replication of the same data)
among replication nodes.

This feature allows filtering only the replication data originating from
WAL but for initial sync (initial copy of table data) we don't have such a
facility as we can only distinguish the data based on origin from WAL. As
a follow-up patch, we are planning to forbid the initial sync if the
origin is specified as none and we notice that the publication tables were
also replicated from other publishers to avoid duplicate data or loops.

We forbid to allow creating origin with names 'none' and 'any' to avoid
confusion with the same name options.

Author: Vignesh C, Amit Kapila
Reviewed-By: Peter Smith, Amit Kapila, Dilip Kumar, Shi yu, Ashutosh Bapat, Hayato Kuroda
Discussion: https://postgr.es/m/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com
This commit is contained in:
Amit Kapila 2022-07-21 08:47:38 +05:30
parent f2d0c7f18b
commit 366283961a
24 changed files with 463 additions and 78 deletions

View File

@ -56,6 +56,16 @@ SELECT pg_replication_origin_drop('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
ERROR: replication origin "regress_test_decoding: temp" does not exist
-- specifying reserved origin names is not supported
SELECT pg_replication_origin_create('any');
ERROR: replication origin name "any" is reserved
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
SELECT pg_replication_origin_create('none');
ERROR: replication origin name "none" is reserved
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
SELECT pg_replication_origin_create('pg_replication_origin');
ERROR: replication origin name "pg_replication_origin" is reserved
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
-- various failure checks for undefined slots
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
ERROR: replication origin "regress_test_decoding: temp" does not exist

View File

@ -31,6 +31,11 @@ SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
-- specifying reserved origin names is not supported
SELECT pg_replication_origin_create('any');
SELECT pg_replication_origin_create('none');
SELECT pg_replication_origin_create('pg_replication_origin');
-- various failure checks for undefined slots
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
select pg_replication_origin_session_setup('regress_test_decoding: temp');

View File

@ -7943,6 +7943,20 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
see <xref linkend="logical-replication-publication"/>.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>suborigin</structfield> <type>text</type>
</para>
<para>
The origin value must be either <literal>none</literal> or
<literal>any</literal>. The default is <literal>any</literal>.
If <literal>none</literal>, the subscription will request the publisher
to only send changes that don't have an origin. If
<literal>any</literal>, the publisher sends changes regardless of their
origin.
</para></entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>,
<literal>binary</literal>, <literal>streaming</literal>, and
<literal>disable_on_error</literal>.
<literal>binary</literal>, <literal>streaming</literal>,
<literal>disable_on_error</literal>, and
<literal>origin</literal>.
</para>
</listitem>
</varlistentry>

View File

@ -302,6 +302,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>origin</literal> (<type>string</type>)</term>
<listitem>
<para>
Specifies whether the subscription will request the publisher to only
send changes that don't have an origin or send changes regardless of
origin. Setting <literal>origin</literal> to <literal>none</literal>
means that the subscription will request the publisher to only send
changes that don't have an origin. Setting <literal>origin</literal>
to <literal>any</literal> means that the publisher sends changes
regardless of their origin. The default is <literal>any</literal>.
</para>
</listitem>
</varlistentry>
</variablelist></para>
</listitem>

View File

@ -106,6 +106,14 @@ GetSubscription(Oid subid, bool missing_ok)
Assert(!isnull);
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
/* Get origin */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
tup,
Anum_pg_subscription_suborigin,
&isnull);
Assert(!isnull);
sub->origin = TextDatumGetCString(datum);
ReleaseSysCache(tup);
return sub;

View File

@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
subsynccommit, subpublications)
subbinary, substream, subtwophasestate, subdisableonerr,
subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS

View File

@ -64,6 +64,7 @@
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_LSN 0x00000800
#define SUBOPT_ORIGIN 0x00001000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@ -86,6 +87,7 @@ typedef struct SubOpts
bool streaming;
bool twophase;
bool disableonerr;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_COPY_DATA));
/* Set default values for the boolean supported options. */
/* Set default values for the supported options. */
if (IsSet(supported_opts, SUBOPT_CONNECT))
opts->connect = true;
if (IsSet(supported_opts, SUBOPT_ENABLED))
@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
opts->disableonerr = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
/* Parse options */
foreach(lc, stmt_options)
@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
opts->disableonerr = defGetBoolean(defel);
}
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_ORIGIN;
pfree(opts->origin);
/*
* Even though the "origin" parameter allows only "none" and "any"
* values, it is implemented as a string type so that the
* parameter can be extended in future versions to support
* filtering using origin names specified by the user.
*/
opts->origin = defGetString(defel);
if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
(pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized origin value: \"%s\"", opts->origin));
}
else if (IsSet(supported_opts, SUBOPT_LSN) &&
strcmp(defel->defname, "lsn") == 0)
{
@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR);
SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CStringGetTextDatum(opts.synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
= true;
}
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
replaces[Anum_pg_subscription_suborigin - 1] = true;
}
update_tuple = true;
break;
}

View File

@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
PQserverVersion(conn->streamConn) >= 150000)
appendStringInfoString(&cmd, ", two_phase 'on'");
if (options->proto.logical.origin &&
PQserverVersion(conn->streamConn) >= 160000)
appendStringInfo(&cmd, ", origin '%s'",
options->proto.logical.origin);
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)

View File

@ -77,6 +77,7 @@
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
}
/*
* IsReservedOriginName
* True iff name is either "none" or "any".
*/
static bool
IsReservedOriginName(const char *name)
{
return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
(pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
}
/* ---------------------------------------------------------------------------
* Functions for working with replication origins themselves.
* ---------------------------------------------------------------------------
@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
/* Replication origins "pg_xxx" are reserved for internal use */
if (IsReservedName(name))
/*
* Replication origins "any and "none" are reserved for system options.
* The origins "pg_xxx" are reserved for internal use.
*/
if (IsReservedName(name) || IsReservedOriginName(name))
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("replication origin name \"%s\" is reserved",
name),
errdetail("Origin names starting with \"pg_\" are reserved.")));
errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
/*
* If built with appropriate switch, whine when regression-testing

View File

@ -3077,6 +3077,7 @@ maybe_reread_subscription(void)
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
strcmp(newsub->origin, MySubscription->origin) != 0 ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg)
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
options.proto.logical.twophase = false;
options.proto.logical.origin = pstrdup(MySubscription->origin);
if (!am_tablesync_worker())
{

View File

@ -16,6 +16,7 @@
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
#include "executor/executor.h"
#include "fmgr.h"
@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool messages_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
bool origin_option_given = false;
data->binary = false;
data->streaming = false;
@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data)
data->two_phase = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "origin") == 0)
{
if (origin_option_given)
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"));
origin_option_given = true;
data->origin = defGetString(defel);
if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
publish_no_origin = true;
else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
publish_no_origin = false;
else
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized origin value: \"%s\"", data->origin));
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
* Currently we always forward.
* Return true if the data is associated with an origin and the user has
* requested the changes that don't have an origin, false otherwise.
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
if (publish_no_origin && origin_id != InvalidRepOriginId)
return true;
return false;
}

View File

@ -4412,6 +4412,7 @@ getSubscriptions(Archive *fout)
int i_substream;
int i_subtwophasestate;
int i_subdisableonerr;
int i_suborigin;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@ -4461,13 +4462,18 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
" s.subtwophasestate,\n"
" s.subdisableonerr\n");
" s.subdisableonerr,\n");
else
appendPQExpBuffer(query,
" '%c' AS subtwophasestate,\n"
" false AS subdisableonerr\n",
" false AS subdisableonerr,\n",
LOGICALREP_TWOPHASE_STATE_DISABLED);
if (fout->remoteVersion >= 160000)
appendPQExpBufferStr(query, " s.suborigin\n");
else
appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
appendPQExpBufferStr(query,
"FROM pg_subscription s\n"
"WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@ -4493,6 +4499,7 @@ getSubscriptions(Archive *fout)
i_substream = PQfnumber(res, "substream");
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
i_subdisableonerr = PQfnumber(res, "subdisableonerr");
i_suborigin = PQfnumber(res, "suborigin");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@ -4522,6 +4529,7 @@ getSubscriptions(Archive *fout)
pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
subinfo[i].subdisableonerr =
pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@ -4595,6 +4603,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
if (strcmp(subinfo->subdisableonerr, "t") == 0)
appendPQExpBufferStr(query, ", disable_on_error = true");
if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));

View File

@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo
char *substream;
char *subtwophasestate;
char *subdisableonerr;
char *suborigin;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;

View File

@ -2465,6 +2465,28 @@ my %tests = (
like => { %full_runs, section_post_data => 1, },
},
'CREATE SUBSCRIPTION sub2' => {
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub2
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false, origin = none);',
regexp => qr/^
\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
'CREATE SUBSCRIPTION sub3' => {
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false, origin = any);',
regexp => qr/^
\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
'ALTER PUBLICATION pub1 ADD TABLE test_table' => {
create_order => 51,
create_sql =>

View File

@ -6469,7 +6469,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false};
false, false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@ -6511,6 +6511,11 @@ describeSubscriptions(const char *pattern, bool verbose)
gettext_noop("Two-phase commit"),
gettext_noop("Disable on error"));
if (pset.sversion >= 160000)
appendPQExpBuffer(&buf,
", suborigin AS \"%s\"\n",
gettext_noop("Origin"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
", subconninfo AS \"%s\"\n",

View File

@ -1873,7 +1873,8 @@ psql_completion(const char *text, int start, int end)
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
COMPLETE_WITH("binary", "disable_on_error", "slot_name", "streaming", "synchronous_commit");
COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
"streaming", "synchronous_commit");
/* ALTER SUBSCRIPTION <name> SKIP ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
COMPLETE_WITH("lsn");
@ -3152,8 +3153,8 @@ psql_completion(const char *text, int start, int end)
/* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
"disable_on_error", "enabled", "slot_name", "streaming",
"synchronous_commit", "two_phase");
"disable_on_error", "enabled", "origin", "slot_name",
"streaming", "synchronous_commit", "two_phase");
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */

View File

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202207201
#define CATALOG_VERSION_NO 202207211
#endif

View File

@ -31,6 +31,18 @@
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
/*
* The subscription will request the publisher to only send changes that do not
* have any origin.
*/
#define LOGICALREP_ORIGIN_NONE "none"
/*
* The subscription will request the publisher to send changes regardless
* of their origin.
*/
#define LOGICALREP_ORIGIN_ANY "any"
/* ----------------
* pg_subscription definition. cpp turns this into
* typedef struct FormData_pg_subscription
@ -87,6 +99,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
/* List of publications subscribed to */
text subpublications[1] BKI_FORCE_NOT_NULL;
/* Only publish data originating from the specified origin */
text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
#endif
} FormData_pg_subscription;
@ -118,6 +133,8 @@ typedef struct Subscription
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
char *origin; /* Only publish data originating from the
* specified origin */
} Subscription;
extern Subscription *GetSubscription(Oid subid, bool missing_ok);

View File

@ -29,6 +29,7 @@ typedef struct PGOutputData
bool streaming;
bool messages;
bool two_phase;
char *origin;
} PGOutputData;
#endif /* PGOUTPUT_H */

View File

@ -183,6 +183,8 @@ typedef struct
bool streaming; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
char *origin; /* Only publish data originating from the
* specified origin */
} logical;
} proto;
} WalRcvStreamOptions;

View File

@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
ERROR: cannot enable subscription that does not have a slot name
ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
-- fail - origin must be either none or any
CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
ERROR: unrecognized origin value: "foo"
-- now it works
CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+ regress_testsub4
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | none | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
DROP SUBSCRIPTION regress_testsub4;
-- fail - invalid connection string
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -96,10 +118,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@ -179,19 +201,19 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@ -202,19 +224,19 @@ ERROR: streaming requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more then once
@ -247,10 +269,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@ -284,10 +306,10 @@ ERROR: two_phase requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@ -296,10 +318,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -323,18 +345,18 @@ ERROR: disable_on_error requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);

View File

@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU
ALTER SUBSCRIPTION regress_testsub3 ENABLE;
ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
-- fail - origin must be either none or any
CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo);
-- now it works
CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = none);
\dRs+ regress_testsub4
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
DROP SUBSCRIPTION regress_testsub3;
DROP SUBSCRIPTION regress_testsub4;
-- fail - invalid connection string
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';

View File

@ -0,0 +1,155 @@
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
# Test the CREATE SUBSCRIPTION 'origin' parameter.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
###############################################################################
# Setup a bidirectional logical replication between node_A & node_B
###############################################################################
# Initialize nodes
# node_A
my $node_A = PostgreSQL::Test::Cluster->new('node_A');
$node_A->init(allows_streaming => 'logical');
$node_A->start;
# node_B
my $node_B = PostgreSQL::Test::Cluster->new('node_B');
$node_B->init(allows_streaming => 'logical');
$node_B->start;
# Create table on node_A
$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
# Create the same table on node_B
$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
# Setup logical replication
# node_A (pub) -> node_B (sub)
my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
my $appname_B1 = 'tap_sub_B1';
$node_B->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub_B1
CONNECTION '$node_A_connstr application_name=$appname_B1'
PUBLICATION tap_pub_A
WITH (origin = none)");
# node_B (pub) -> node_A (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
my $appname_A = 'tap_sub_A';
$node_A->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub_A
CONNECTION '$node_B_connstr application_name=$appname_A'
PUBLICATION tap_pub_B
WITH (origin = none, copy_data = off)");
# Wait for subscribers to finish initialization
$node_A->wait_for_catchup($appname_B1);
$node_B->wait_for_catchup($appname_A);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_A->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
$node_B->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
is(1, 1, 'Bidirectional replication setup is complete');
my $result;
###############################################################################
# Check that bidirectional logical replication setup does not cause infinite
# recursive insertion.
###############################################################################
# insert a record
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
$node_A->wait_for_catchup($appname_B1);
$node_B->wait_for_catchup($appname_A);
# check that transaction was committed on subscriber(s)
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is( $result, qq(11
21),
'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
);
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is( $result, qq(11
21),
'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
);
$node_A->safe_psql('postgres', "DELETE FROM tab;");
$node_A->wait_for_catchup($appname_B1);
$node_B->wait_for_catchup($appname_A);
###############################################################################
# Check that remote data of node_B (that originated from node_C) is not
# published to node_A.
###############################################################################
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(), 'Check existing data');
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(), 'Check existing data');
# Initialize node node_C
my $node_C = PostgreSQL::Test::Cluster->new('node_C');
$node_C->init(allows_streaming => 'logical');
$node_C->start;
$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
# Setup logical replication
# node_C (pub) -> node_B (sub)
my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
my $appname_B2 = 'tap_sub_B2';
$node_B->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub_B2
CONNECTION '$node_C_connstr application_name=$appname_B2'
PUBLICATION tap_pub_C
WITH (origin = none)");
$node_C->wait_for_catchup($appname_B2);
$node_B->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# insert a record
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
$node_C->wait_for_catchup($appname_B2);
$node_B->wait_for_catchup($appname_A);
$node_A->wait_for_catchup($appname_B1);
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(32), 'The node_C data replicated to node_B');
# check that the data published from node_C to node_B is not sent to node_A
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(),
'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
);
# shutdown
$node_B->stop('fast');
$node_A->stop('fast');
$node_C->stop('fast');
done_testing();