diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 83987a9904..7777d60514 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7769,6 +7769,16 @@ SCRAM-SHA-256$<iteration count>:&l
+
+
+ subdisableonerr bool
+
+
+ If true, the subscription will be disabled if one of its workers
+ detects an error
+
+
+
subconninfo text
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 82326c3901..6431d4796d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -364,8 +364,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
the replication origin name can be found from the server log (LSN 0/14C0378 and
replication origin pg_16395 in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
- ALTER SUBSCRIPTION ... DISABLE first. Then, the transaction
- can be skipped by calling the
+ ALTER SUBSCRIPTION ... DISABLE first or alternatively, the
+ subscription can be used with the disable_on_error option.
+ Then, the transaction can be skipped by calling the
pg_replication_origin_advance() function with
the node_name (i.e., pg_16395) and the
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0d6f064f58..58b78a94ea 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -204,8 +204,8 @@ ALTER SUBSCRIPTION name RENAME TO <
information. The parameters that can be altered
are slot_name,
synchronous_commit,
- binary, and
- streaming.
+ binary, streaming, and
+ disable_on_error.
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e80a2617a3..b701752fc9 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -290,6 +290,18 @@ CREATE SUBSCRIPTION subscription_name
+
+
+ disable_on_error (boolean)
+
+
+ Specifies whether the subscription should be automatically disabled
+ if any errors are detected by subscription workers during data
+ replication from the publisher. The default is
+ false.
+
+
+
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..a6304f5f81 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->binary = subform->subbinary;
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
+ sub->disableonerr = subform->subdisableonerr;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -156,6 +157,45 @@ FreeSubscription(Subscription *sub)
pfree(sub);
}
+/*
+ * Disable the given subscription.
+ */
+void
+DisableSubscription(Oid subid)
+{
+ Relation rel;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+ HeapTuple tup;
+
+ /* Look up the subscription in the catalog */
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /* Form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* Set the subscription to disabled. */
+ values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+ replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+ /* Update the catalog */
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+ heap_freetuple(tup);
+
+ table_close(rel, NoLock);
+}
+
/*
* get_subscription_oid - given a subscription name, look up the OID
*
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca5a9..bb1ac30cd1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
- substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+ substream, subtwophasestate, subdisableonerr, subslotname,
+ subsynccommit, subpublications)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..3922658bbc 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
+#define SUBOPT_DISABLE_ON_ERR 0x00000400
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
bool binary;
bool streaming;
bool twophase;
+ bool disableonerr;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->streaming = false;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
+ if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+ opts->disableonerr = false;
/* Parse options */
foreach(lc, stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
opts->twophase = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+ strcmp(defel->defname, "disable_on_error") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+ opts->disableonerr = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
+ values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING);
+ SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_substream - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+ {
+ values[Anum_pg_subscription_subdisableonerr - 1]
+ = BoolGetDatum(opts.disableonerr);
+ replaces[Anum_pg_subscription_subdisableonerr - 1]
+ = true;
+ }
+
update_tuple = true;
break;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8653e1d840..a1fe81b34f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -305,6 +305,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void);
+static void DisableSubscriptionAndExit(void);
+
/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
@@ -3374,6 +3376,84 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+ char *syncslotname;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+ pfree(syncslotname);
+}
+
+/*
+ * Run the apply loop with error handling. Disable the subscription,
+ * if necessary.
+ *
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
+ */
+static void
+start_apply(XLogRecPtr origin_startpos)
+{
+ PG_TRY();
+ {
+ LogicalRepApplyLoop(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed while applying changes. Abort the
+ * current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -3381,8 +3461,8 @@ ApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *myslotname = NULL;
WalRcvStreamOptions options;
int server_version;
@@ -3477,32 +3557,7 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
- char *syncslotname;
-
- PG_TRY();
- {
- /* This is table synchronization worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message
- * in an idle state.
- */
- AbortOutOfAnyTransaction();
-
- /* Report the worker failed during table synchronization */
- pgstat_report_subscription_error(MySubscription->oid, false);
-
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- /* allocate slot name in long-lived context */
- myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-
- pfree(syncslotname);
+ start_table_sync(&origin_startpos, &myslotname);
/*
* Allocate the origin name in long-lived context for error context
@@ -3633,24 +3688,43 @@ ApplyWorkerMain(Datum main_arg)
}
/* Run the main loop. */
- PG_TRY();
- {
- LogicalRepApplyLoop(origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message in
- * an idle state.
- */
- AbortOutOfAnyTransaction();
+ start_apply(origin_startpos);
- /* Report the worker failed while applying changes */
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ proc_exit(0);
+}
- PG_RE_THROW();
- }
- PG_END_TRY();
+/*
+ * After error recovery, disable the subscription in a new transaction
+ * and exit cleanly.
+ */
+static void
+DisableSubscriptionAndExit(void)
+{
+ /*
+ * Emit the error message, and recover from the error state to an idle
+ * state
+ */
+ HOLD_INTERRUPTS();
+
+ EmitErrorReport();
+ AbortOutOfAnyTransaction();
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ /* Report the worker failed during either table synchronization or apply */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ !am_tablesync_worker());
+
+ /* Disable the subscription */
+ StartTransactionCommand();
+ DisableSubscription(MySubscription->oid);
+ CommitTransactionCommand();
+
+ /* Notify the subscription has been disabled and exit */
+ ereport(LOG,
+ errmsg("logical replication subscription \"%s\" has been disabled due to an error",
+ MySubscription->name));
proc_exit(0);
}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e69dcf8a48..4dd24b8c89 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4293,6 +4293,7 @@ getSubscriptions(Archive *fout)
int i_subowner;
int i_substream;
int i_subtwophasestate;
+ int i_subdisableonerr;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@@ -4340,10 +4341,13 @@ getSubscriptions(Archive *fout)
appendPQExpBufferStr(query, " false AS substream,\n");
if (fout->remoteVersion >= 150000)
- appendPQExpBufferStr(query, " s.subtwophasestate\n");
+ appendPQExpBufferStr(query,
+ " s.subtwophasestate,\n"
+ " s.subdisableonerr\n");
else
appendPQExpBuffer(query,
- " '%c' AS subtwophasestate\n",
+ " '%c' AS subtwophasestate,\n"
+ " false AS subdisableonerr\n",
LOGICALREP_TWOPHASE_STATE_DISABLED);
appendPQExpBufferStr(query,
@@ -4366,6 +4370,7 @@ getSubscriptions(Archive *fout)
i_subbinary = PQfnumber(res, "subbinary");
i_substream = PQfnumber(res, "substream");
i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+ i_subdisableonerr = PQfnumber(res, "subdisableonerr");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -4393,6 +4398,8 @@ getSubscriptions(Archive *fout)
pg_strdup(PQgetvalue(res, i, i_substream));
subinfo[i].subtwophasestate =
pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+ subinfo[i].subdisableonerr =
+ pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4463,6 +4470,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
+ if (strcmp(subinfo->subdisableonerr, "t") == 0)
+ appendPQExpBufferStr(query, ", disable_on_error = true");
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 997a3b6071..772dc0cf7a 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -657,6 +657,7 @@ typedef struct _SubscriptionInfo
char *subbinary;
char *substream;
char *subtwophasestate;
+ char *subdisableonerr;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index e3382933d9..9229eacb6d 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false, false, false};
+ false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6118,11 +6118,13 @@ describeSubscriptions(const char *pattern, bool verbose)
gettext_noop("Binary"),
gettext_noop("Streaming"));
- /* Two_phase is only supported in v15 and higher */
+ /* Two_phase and disable_on_error are only supported in v15 and higher */
if (pset.sversion >= 150000)
appendPQExpBuffer(&buf,
- ", subtwophasestate AS \"%s\"\n",
- gettext_noop("Two phase commit"));
+ ", subtwophasestate AS \"%s\"\n"
+ ", subdisableonerr AS \"%s\"\n",
+ gettext_noop("Two phase commit"),
+ gettext_noop("Disable on error"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6d5c928c10..17172827a9 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1834,7 +1834,7 @@ psql_completion(const char *text, int start, int end)
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION SET ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
- COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+ COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
/* ALTER SUBSCRIPTION SET PUBLICATION */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
{
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
"enabled", "slot_name", "streaming",
- "synchronous_commit", "two_phase");
+ "synchronous_commit", "two_phase", "disable_on_error");
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 695990959e..1aa1d41e79 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202203031
+#define CATALOG_VERSION_NO 202203141
#endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..e2befaf351 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
char subtwophasestate; /* Stream two-phase transactions */
+ bool subdisableonerr; /* True if a worker error should cause the
+ * subscription to be disabled */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,9 @@ typedef struct Subscription
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
char twophasestate; /* Allow streaming two-phase transactions */
+ bool disableonerr; /* Indicates if the subscription should be
+ * automatically disabled if a worker error
+ * occurs */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
@@ -111,6 +117,7 @@ typedef struct Subscription
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern void FreeSubscription(Subscription *sub);
+extern void DisableSubscription(Oid subid);
extern Oid get_subscription_oid(const char *subname, bool missing_ok);
extern char *get_subscription_name(Oid subid, bool missing_ok);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562..ad8003fae1 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR: streaming requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR: two_phase requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist
(1 row)
--fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,33 @@ DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+ERROR: disable_on_error requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af1e4..a7c15b1daf 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -228,6 +228,21 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
new file mode 100644
index 0000000000..5eca804446
--- /dev/null
+++ b/src/test/subscription/t/029_disable_on_error.pl
@@ -0,0 +1,94 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical table on both nodes.
+$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+
+# Insert duplicate values on the publisher.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl (i) VALUES (1), (1), (1)");
+
+# Create an additional unique index on the subscriber.
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Create a pub/sub to set up logical replication. This tests that the
+# uniqueness violation will cause the subscription to fail during initial
+# synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Drop the unique index on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the data to replicate.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+ "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+);
+
+# Confirm that we have finished the table sync.
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
+is($result, qq(1|3), "subscription sub replicated data");
+
+# Delete the data from the subscriber and recreate the unique index.
+$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Add more non-unique data to the publisher.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl (i) VALUES (3), (3), (3)");
+
+# Apply failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscription sub to be disabled";
+
+# Drop the unique index on the subscriber and re-enabled the subscription. Then
+# confirm that the previously failing insert was applied OK.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT COUNT(*) FROM tbl WHERE i = 3");
+is($result, qq(3), 'check the result of apply');
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();