Raise a WARNING for missing publications.
When we create or alter a subscription to add publications raise a warning for non-existent publications. We don't want to give an error here because it is possible that users can later create the missing publications. Author: Vignesh C Reviewed-by: Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma, Amit Kapila Discussion: https://postgr.es/m/CALDaNm0f4YujGW+q-Di0CbZpnQKFFrXntikaQQKuEmGG0=Zw=Q@mail.gmail.com
This commit is contained in:
parent
8ac4c25a05
commit
8f2e2bbf14
|
@ -114,7 +114,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
|
||||||
replaces the entire list of publications with a new list,
|
replaces the entire list of publications with a new list,
|
||||||
<literal>ADD</literal> adds additional publications to the list of
|
<literal>ADD</literal> adds additional publications to the list of
|
||||||
publications, and <literal>DROP</literal> removes the publications from
|
publications, and <literal>DROP</literal> removes the publications from
|
||||||
the list of publications. See <xref linkend="sql-createsubscription"/>
|
the list of publications. We allow non-existent publications to be
|
||||||
|
specified in <literal>ADD</literal> and <literal>SET</literal> variants
|
||||||
|
so that users can add those later. See <xref linkend="sql-createsubscription"/>
|
||||||
for more information. By default, this command will also act like
|
for more information. By default, this command will also act like
|
||||||
<literal>REFRESH PUBLICATION</literal>.
|
<literal>REFRESH PUBLICATION</literal>.
|
||||||
</para>
|
</para>
|
||||||
|
|
|
@ -356,6 +356,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
|
||||||
copied data that would be incompatible with subsequent filtering.
|
copied data that would be incompatible with subsequent filtering.
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
We allow non-existent publications to be specified so that users can add
|
||||||
|
those later. This means
|
||||||
|
<link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
|
||||||
|
can have non-existent publications.
|
||||||
|
</para>
|
||||||
|
|
||||||
</refsect1>
|
</refsect1>
|
||||||
|
|
||||||
<refsect1>
|
<refsect1>
|
||||||
|
|
|
@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Add publication names from the list to a string.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
get_publications_str(List *publications, StringInfo dest, bool quote_literal)
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
bool first = true;
|
||||||
|
|
||||||
|
Assert(list_length(publications) > 0);
|
||||||
|
|
||||||
|
foreach(lc, publications)
|
||||||
|
{
|
||||||
|
char *pubname = strVal(lfirst(lc));
|
||||||
|
|
||||||
|
if (first)
|
||||||
|
first = false;
|
||||||
|
else
|
||||||
|
appendStringInfoString(dest, ", ");
|
||||||
|
|
||||||
|
if (quote_literal)
|
||||||
|
appendStringInfoString(dest, quote_literal_cstr(pubname));
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfoChar(dest, '"');
|
||||||
|
appendStringInfoString(dest, pubname);
|
||||||
|
appendStringInfoChar(dest, '"');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check the specified publication(s) is(are) present in the publisher.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
check_publications(WalReceiverConn *wrconn, List *publications)
|
||||||
|
{
|
||||||
|
WalRcvExecResult *res;
|
||||||
|
StringInfo cmd;
|
||||||
|
TupleTableSlot *slot;
|
||||||
|
List *publicationsCopy = NIL;
|
||||||
|
Oid tableRow[1] = {TEXTOID};
|
||||||
|
|
||||||
|
cmd = makeStringInfo();
|
||||||
|
appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
|
||||||
|
" pg_catalog.pg_publication t WHERE\n"
|
||||||
|
" t.pubname IN (");
|
||||||
|
get_publications_str(publications, cmd, true);
|
||||||
|
appendStringInfoChar(cmd, ')');
|
||||||
|
|
||||||
|
res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
|
||||||
|
pfree(cmd->data);
|
||||||
|
pfree(cmd);
|
||||||
|
|
||||||
|
if (res->status != WALRCV_OK_TUPLES)
|
||||||
|
ereport(ERROR,
|
||||||
|
errmsg_plural("could not receive publication from the publisher: %s",
|
||||||
|
"could not receive list of publications from the publisher: %s",
|
||||||
|
list_length(publications),
|
||||||
|
res->err));
|
||||||
|
|
||||||
|
publicationsCopy = list_copy(publications);
|
||||||
|
|
||||||
|
/* Process publication(s). */
|
||||||
|
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
|
||||||
|
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
||||||
|
{
|
||||||
|
char *pubname;
|
||||||
|
bool isnull;
|
||||||
|
|
||||||
|
pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
|
||||||
|
Assert(!isnull);
|
||||||
|
|
||||||
|
/* Delete the publication present in publisher from the list. */
|
||||||
|
publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
|
||||||
|
ExecClearTuple(slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecDropSingleTupleTableSlot(slot);
|
||||||
|
|
||||||
|
walrcv_clear_result(res);
|
||||||
|
|
||||||
|
if (list_length(publicationsCopy))
|
||||||
|
{
|
||||||
|
/* Prepare the list of non-existent publication(s) for error message. */
|
||||||
|
StringInfo pubnames = makeStringInfo();
|
||||||
|
|
||||||
|
get_publications_str(publicationsCopy, pubnames, false);
|
||||||
|
ereport(WARNING,
|
||||||
|
errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||||
|
errmsg_plural("publication %s does not exist in the publisher",
|
||||||
|
"publications %s do not exist in the publisher",
|
||||||
|
list_length(publicationsCopy),
|
||||||
|
pubnames->data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Auxiliary function to build a text array out of a list of String nodes.
|
* Auxiliary function to build a text array out of a list of String nodes.
|
||||||
*/
|
*/
|
||||||
|
@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
|
check_publications(wrconn, publications);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set sync state based on if we were asked to do data copy or
|
* Set sync state based on if we were asked to do data copy or
|
||||||
* not.
|
* not.
|
||||||
|
@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
AlterSubscription_refresh(Subscription *sub, bool copy_data)
|
AlterSubscription_refresh(Subscription *sub, bool copy_data,
|
||||||
|
List *validate_publications)
|
||||||
{
|
{
|
||||||
char *err;
|
char *err;
|
||||||
List *pubrel_names;
|
List *pubrel_names;
|
||||||
|
@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
|
if (validate_publications)
|
||||||
|
check_publications(wrconn, validate_publications);
|
||||||
|
|
||||||
/* Get the list of relations from publisher. */
|
/* Get the list of relations from publisher. */
|
||||||
pubrel_names = fetch_table_list(wrconn, sub->publications);
|
pubrel_names = fetch_table_list(wrconn, sub->publications);
|
||||||
pubrel_names = list_concat(pubrel_names,
|
pubrel_names = list_concat(pubrel_names,
|
||||||
|
@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
||||||
/* Make sure refresh sees the new list of publications. */
|
/* Make sure refresh sees the new list of publications. */
|
||||||
sub->publications = stmt->publication;
|
sub->publications = stmt->publication;
|
||||||
|
|
||||||
AlterSubscription_refresh(sub, opts.copy_data);
|
AlterSubscription_refresh(sub, opts.copy_data,
|
||||||
|
stmt->publication);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
||||||
/* Refresh if user asked us to. */
|
/* Refresh if user asked us to. */
|
||||||
if (opts.refresh)
|
if (opts.refresh)
|
||||||
{
|
{
|
||||||
|
/* We only need to validate user specified publications. */
|
||||||
|
List *validate_publications = (isadd) ? stmt->publication : NULL;
|
||||||
|
|
||||||
if (!sub->enabled)
|
if (!sub->enabled)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
||||||
/* Refresh the new list of publications. */
|
/* Refresh the new list of publications. */
|
||||||
sub->publications = publist;
|
sub->publications = publist;
|
||||||
|
|
||||||
AlterSubscription_refresh(sub, opts.copy_data);
|
AlterSubscription_refresh(sub, opts.copy_data,
|
||||||
|
validate_publications);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
||||||
|
|
||||||
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
|
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
|
||||||
|
|
||||||
AlterSubscription_refresh(sub, opts.copy_data);
|
AlterSubscription_refresh(sub, opts.copy_data, NULL);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
|
||||||
StringInfoData cmd;
|
StringInfoData cmd;
|
||||||
TupleTableSlot *slot;
|
TupleTableSlot *slot;
|
||||||
Oid tableRow[2] = {TEXTOID, TEXTOID};
|
Oid tableRow[2] = {TEXTOID, TEXTOID};
|
||||||
ListCell *lc;
|
|
||||||
bool first;
|
|
||||||
List *tablelist = NIL;
|
List *tablelist = NIL;
|
||||||
|
|
||||||
Assert(list_length(publications) > 0);
|
|
||||||
|
|
||||||
initStringInfo(&cmd);
|
initStringInfo(&cmd);
|
||||||
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
|
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
|
||||||
" FROM pg_catalog.pg_publication_tables t\n"
|
" FROM pg_catalog.pg_publication_tables t\n"
|
||||||
" WHERE t.pubname IN (");
|
" WHERE t.pubname IN (");
|
||||||
first = true;
|
get_publications_str(publications, &cmd, true);
|
||||||
foreach(lc, publications)
|
|
||||||
{
|
|
||||||
char *pubname = strVal(lfirst(lc));
|
|
||||||
|
|
||||||
if (first)
|
|
||||||
first = false;
|
|
||||||
else
|
|
||||||
appendStringInfoString(&cmd, ", ");
|
|
||||||
|
|
||||||
appendStringInfoString(&cmd, quote_literal_cstr(pubname));
|
|
||||||
}
|
|
||||||
appendStringInfoChar(&cmd, ')');
|
appendStringInfoChar(&cmd, ')');
|
||||||
|
|
||||||
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
|
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
|
||||||
|
|
|
@ -41,6 +41,43 @@ COMMIT;
|
||||||
|
|
||||||
pass "subscription disable and drop in same transaction did not hang";
|
pass "subscription disable and drop in same transaction did not hang";
|
||||||
|
|
||||||
|
# One of the specified publications exists.
|
||||||
|
my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
|
||||||
|
"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
|
||||||
|
);
|
||||||
|
ok( $stderr =~
|
||||||
|
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
|
||||||
|
"Create subscription throws warning for non-existent publication");
|
||||||
|
|
||||||
|
$node_publisher->wait_for_catchup('mysub1');
|
||||||
|
|
||||||
|
# Also wait for initial table sync to finish.
|
||||||
|
my $synced_query =
|
||||||
|
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
|
||||||
|
$node_subscriber->poll_query_until('postgres', $synced_query)
|
||||||
|
or die "Timed out while waiting for subscriber to synchronize data";
|
||||||
|
|
||||||
|
# Also wait for initial table sync to finish.
|
||||||
|
$node_subscriber->poll_query_until('postgres', $synced_query)
|
||||||
|
or die "Timed out while waiting for subscriber to synchronize data";
|
||||||
|
|
||||||
|
# Specifying non-existent publication along with add publication.
|
||||||
|
($ret, $stdout, $stderr) = $node_subscriber->psql(
|
||||||
|
'postgres',
|
||||||
|
"ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2"
|
||||||
|
);
|
||||||
|
ok( $stderr =~
|
||||||
|
m/WARNING: publications "non_existent_pub1", "non_existent_pub2" do not exist in the publisher/,
|
||||||
|
"Alter subscription add publication throws warning for non-existent publications");
|
||||||
|
|
||||||
|
# Specifying non-existent publication along with set publication.
|
||||||
|
($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
|
||||||
|
"ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
|
||||||
|
);
|
||||||
|
ok( $stderr =~
|
||||||
|
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
|
||||||
|
"Alter subscription set publication throws warning for non-existent publication");
|
||||||
|
|
||||||
$node_subscriber->stop;
|
$node_subscriber->stop;
|
||||||
$node_publisher->stop;
|
$node_publisher->stop;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue