2017-01-19 18:00:00 +01:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* subscriptioncmds.c
|
|
|
|
* subscription catalog manipulation functions
|
|
|
|
*
|
2018-01-03 05:30:12 +01:00
|
|
|
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
|
2017-01-25 18:32:05 +01:00
|
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
2017-01-19 18:00:00 +01:00
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* subscriptioncmds.c
|
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "postgres.h"
|
|
|
|
|
|
|
|
#include "miscadmin.h"
|
|
|
|
|
|
|
|
#include "access/heapam.h"
|
|
|
|
#include "access/htup_details.h"
|
2017-03-04 05:25:34 +01:00
|
|
|
#include "access/xact.h"
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "catalog/dependency.h"
|
2017-01-19 18:00:00 +01:00
|
|
|
#include "catalog/indexing.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "catalog/namespace.h"
|
2017-01-19 18:00:00 +01:00
|
|
|
#include "catalog/objectaccess.h"
|
|
|
|
#include "catalog/objectaddress.h"
|
|
|
|
#include "catalog/pg_type.h"
|
|
|
|
#include "catalog/pg_subscription.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "catalog/pg_subscription_rel.h"
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
#include "commands/defrem.h"
|
|
|
|
#include "commands/event_trigger.h"
|
|
|
|
#include "commands/subscriptioncmds.h"
|
|
|
|
|
2017-05-17 04:57:16 +02:00
|
|
|
#include "executor/executor.h"
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "nodes/makefuncs.h"
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
#include "replication/logicallauncher.h"
|
|
|
|
#include "replication/origin.h"
|
|
|
|
#include "replication/walreceiver.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "replication/walsender.h"
|
2017-01-19 18:00:00 +01:00
|
|
|
#include "replication/worker_internal.h"
|
|
|
|
|
|
|
|
#include "storage/lmgr.h"
|
|
|
|
|
|
|
|
#include "utils/builtins.h"
|
2017-04-14 19:58:46 +02:00
|
|
|
#include "utils/guc.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "utils/lsyscache.h"
|
2017-01-19 18:00:00 +01:00
|
|
|
#include "utils/memutils.h"
|
|
|
|
#include "utils/syscache.h"
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
/*
|
|
|
|
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
|
|
|
|
*
|
|
|
|
* Since not all options can be specified in both commands, this function
|
|
|
|
* will report an error on options if the target output pointer is NULL to
|
2017-04-26 17:03:07 +02:00
|
|
|
* accommodate that.
|
2017-01-19 18:00:00 +01:00
|
|
|
*/
|
|
|
|
static void
|
2017-03-23 13:36:36 +01:00
|
|
|
parse_subscription_options(List *options, bool *connect, bool *enabled_given,
|
2017-05-09 16:20:42 +02:00
|
|
|
bool *enabled, bool *create_slot,
|
|
|
|
bool *slot_name_given, char **slot_name,
|
2017-06-06 03:37:00 +02:00
|
|
|
bool *copy_data, char **synchronous_commit,
|
|
|
|
bool *refresh)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
|
|
|
ListCell *lc;
|
2017-03-23 13:36:36 +01:00
|
|
|
bool connect_given = false;
|
2017-01-19 18:00:00 +01:00
|
|
|
bool create_slot_given = false;
|
2017-03-23 13:36:36 +01:00
|
|
|
bool copy_data_given = false;
|
2017-06-06 03:37:00 +02:00
|
|
|
bool refresh_given = false;
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-05-15 19:59:58 +02:00
|
|
|
/* If connect is specified, the others also need to be. */
|
|
|
|
Assert(!connect || (enabled && create_slot && copy_data));
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
if (connect)
|
|
|
|
*connect = true;
|
2017-01-19 18:00:00 +01:00
|
|
|
if (enabled)
|
|
|
|
{
|
|
|
|
*enabled_given = false;
|
|
|
|
*enabled = true;
|
|
|
|
}
|
|
|
|
if (create_slot)
|
|
|
|
*create_slot = true;
|
|
|
|
if (slot_name)
|
2017-05-09 16:20:42 +02:00
|
|
|
{
|
|
|
|
*slot_name_given = false;
|
2017-01-19 18:00:00 +01:00
|
|
|
*slot_name = NULL;
|
2017-05-09 16:20:42 +02:00
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
if (copy_data)
|
|
|
|
*copy_data = true;
|
2017-04-14 19:58:46 +02:00
|
|
|
if (synchronous_commit)
|
|
|
|
*synchronous_commit = NULL;
|
2017-06-06 03:37:00 +02:00
|
|
|
if (refresh)
|
|
|
|
*refresh = true;
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
/* Parse options */
|
2017-05-17 22:31:56 +02:00
|
|
|
foreach(lc, options)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
|
|
|
DefElem *defel = (DefElem *) lfirst(lc);
|
|
|
|
|
2017-05-12 14:57:01 +02:00
|
|
|
if (strcmp(defel->defname, "connect") == 0 && connect)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
2017-03-23 13:36:36 +01:00
|
|
|
if (connect_given)
|
2017-01-19 18:00:00 +01:00
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
connect_given = true;
|
2017-05-12 14:57:01 +02:00
|
|
|
*connect = defGetBoolean(defel);
|
2017-01-19 18:00:00 +01:00
|
|
|
}
|
|
|
|
else if (strcmp(defel->defname, "enabled") == 0 && enabled)
|
|
|
|
{
|
|
|
|
if (*enabled_given)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
|
|
|
*enabled_given = true;
|
|
|
|
*enabled = defGetBoolean(defel);
|
|
|
|
}
|
2017-05-12 14:57:01 +02:00
|
|
|
else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
|
|
|
if (create_slot_given)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
|
|
|
create_slot_given = true;
|
|
|
|
*create_slot = defGetBoolean(defel);
|
|
|
|
}
|
2017-05-12 14:57:01 +02:00
|
|
|
else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
2017-05-09 16:20:42 +02:00
|
|
|
if (*slot_name_given)
|
2017-01-19 18:00:00 +01:00
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
2017-05-09 16:20:42 +02:00
|
|
|
*slot_name_given = true;
|
2017-01-19 18:00:00 +01:00
|
|
|
*slot_name = defGetString(defel);
|
2017-05-09 16:20:42 +02:00
|
|
|
|
|
|
|
/* Setting slot_name = NONE is treated as no slot name. */
|
|
|
|
if (strcmp(*slot_name, "none") == 0)
|
|
|
|
*slot_name = NULL;
|
2017-01-19 18:00:00 +01:00
|
|
|
}
|
2017-05-12 14:57:01 +02:00
|
|
|
else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
|
|
|
if (copy_data_given)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
|
|
|
copy_data_given = true;
|
|
|
|
*copy_data = defGetBoolean(defel);
|
|
|
|
}
|
2017-04-14 19:58:46 +02:00
|
|
|
else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
|
|
|
|
synchronous_commit)
|
|
|
|
{
|
|
|
|
if (*synchronous_commit)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
|
|
|
*synchronous_commit = defGetString(defel);
|
|
|
|
|
|
|
|
/* Test if the given value is valid for synchronous_commit GUC. */
|
|
|
|
(void) set_config_option("synchronous_commit", *synchronous_commit,
|
|
|
|
PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
|
|
|
|
false, 0, false);
|
|
|
|
}
|
2017-06-06 03:37:00 +02:00
|
|
|
else if (strcmp(defel->defname, "refresh") == 0 && refresh)
|
|
|
|
{
|
|
|
|
if (refresh_given)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("conflicting or redundant options")));
|
|
|
|
|
|
|
|
refresh_given = true;
|
|
|
|
*refresh = defGetBoolean(defel);
|
|
|
|
}
|
2017-01-19 18:00:00 +01:00
|
|
|
else
|
2017-05-18 02:47:37 +02:00
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("unrecognized subscription parameter: %s", defel->defname)));
|
2017-01-19 18:00:00 +01:00
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/*
|
|
|
|
* We've been explicitly asked to not connect, that requires some
|
|
|
|
* additional processing.
|
|
|
|
*/
|
|
|
|
if (connect && !*connect)
|
|
|
|
{
|
|
|
|
/* Check for incompatible options from the user. */
|
2017-05-09 16:20:42 +02:00
|
|
|
if (enabled && *enabled_given && *enabled)
|
2017-03-23 13:36:36 +01:00
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
2017-05-17 18:22:56 +02:00
|
|
|
errmsg("connect = false and enabled = true are mutually exclusive options")));
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-05-09 16:20:42 +02:00
|
|
|
if (create_slot && create_slot_given && *create_slot)
|
2017-03-23 13:36:36 +01:00
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
2017-05-17 18:22:56 +02:00
|
|
|
errmsg("connect = false and create_slot = true are mutually exclusive options")));
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-05-09 16:20:42 +02:00
|
|
|
if (copy_data && copy_data_given && *copy_data)
|
2017-03-23 13:36:36 +01:00
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
2017-05-17 18:22:56 +02:00
|
|
|
errmsg("connect = false and copy_data = true are mutually exclusive options")));
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/* Change the defaults of other options. */
|
|
|
|
*enabled = false;
|
|
|
|
*create_slot = false;
|
|
|
|
*copy_data = false;
|
|
|
|
}
|
2017-05-09 16:20:42 +02:00
|
|
|
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Do additional checking for disallowed combination when slot_name = NONE
|
|
|
|
* was used.
|
2017-05-09 16:20:42 +02:00
|
|
|
*/
|
|
|
|
if (slot_name && *slot_name_given && !*slot_name)
|
|
|
|
{
|
|
|
|
if (enabled && *enabled_given && *enabled)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
2017-05-17 18:22:56 +02:00
|
|
|
errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
|
2017-05-09 16:20:42 +02:00
|
|
|
|
|
|
|
if (create_slot && create_slot_given && *create_slot)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
2017-05-17 18:22:56 +02:00
|
|
|
errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
|
2017-05-18 02:47:37 +02:00
|
|
|
|
|
|
|
if (enabled && !*enabled_given && *enabled)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("subscription with slot_name = NONE must also set enabled = false")));
|
|
|
|
|
|
|
|
if (create_slot && !create_slot_given && *create_slot)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("subscription with slot_name = NONE must also set create_slot = false")));
|
2017-05-09 16:20:42 +02:00
|
|
|
}
|
2017-01-19 18:00:00 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-09-23 21:16:48 +02:00
|
|
|
* Auxiliary function to build a text array out of a list of String nodes.
|
2017-01-19 18:00:00 +01:00
|
|
|
*/
|
|
|
|
static Datum
|
|
|
|
publicationListToArray(List *publist)
|
|
|
|
{
|
|
|
|
ArrayType *arr;
|
|
|
|
Datum *datums;
|
|
|
|
int j = 0;
|
|
|
|
ListCell *cell;
|
|
|
|
MemoryContext memcxt;
|
|
|
|
MemoryContext oldcxt;
|
|
|
|
|
|
|
|
/* Create memory context for temporary allocations. */
|
|
|
|
memcxt = AllocSetContextCreate(CurrentMemoryContext,
|
|
|
|
"publicationListToArray to array",
|
Rethink MemoryContext creation to improve performance.
This patch makes a number of interrelated changes to reduce the overhead
involved in creating/deleting memory contexts. The key ideas are:
* Include the AllocSetContext header of an aset.c context in its first
malloc request, rather than allocating it separately in TopMemoryContext.
This means that we now always create an initial or "keeper" block in an
aset, even if it never receives any allocation requests.
* Create freelists in which we can save and recycle recently-destroyed
asets (this idea is due to Robert Haas).
* In the common case where the name of a context is a constant string,
just store a pointer to it in the context header, rather than copying
the string.
The first change eliminates a palloc/pfree cycle per context, and
also avoids bloat in TopMemoryContext, at the price that creating
a context now involves a malloc/free cycle even if the context never
receives any allocations. That would be a loser for some common
usage patterns, but recycling short-lived contexts via the freelist
eliminates that pain.
Avoiding copying constant strings not only saves strlen() and strcpy()
overhead, but is an essential part of the freelist optimization because
it makes the context header size constant. Currently we make no
attempt to use the freelist for contexts with non-constant names.
(Perhaps someday we'll need to think harder about that, but in current
usage, most contexts with custom names are long-lived anyway.)
The freelist management in this initial commit is pretty simplistic,
and we might want to refine it later --- but in common workloads that
will never matter because the freelists will never get full anyway.
To create a context with a non-constant name, one is now required to
call AllocSetContextCreateExtended and specify the MEMCONTEXT_COPY_NAME
option. AllocSetContextCreate becomes a wrapper macro, and it includes
a test that will complain about non-string-literal context name
parameters on gcc and similar compilers.
An unfortunate side effect of making AllocSetContextCreate a macro is
that one is now *required* to use the size parameter abstraction macros
(ALLOCSET_DEFAULT_SIZES and friends) with it; the pre-9.6 habit of
writing out individual size parameters no longer works unless you
switch to AllocSetContextCreateExtended.
Internally to the memory-context-related modules, the context creation
APIs are simplified, removing the rather baroque original design whereby
a context-type module called mcxt.c which then called back into the
context-type module. That saved a bit of code duplication, but not much,
and it prevented context-type modules from exercising control over the
allocation of context headers.
In passing, I converted the test-and-elog validation of aset size
parameters into Asserts to save a few more cycles. The original thought
was that callers might compute size parameters on the fly, but in practice
nobody does that, so it's useless to expend cycles on checking those
numbers in production builds.
Also, mark the memory context method-pointer structs "const",
just for cleanliness.
Discussion: https://postgr.es/m/2264.1512870796@sss.pgh.pa.us
2017-12-13 19:55:12 +01:00
|
|
|
ALLOCSET_DEFAULT_SIZES);
|
2017-01-19 18:00:00 +01:00
|
|
|
oldcxt = MemoryContextSwitchTo(memcxt);
|
|
|
|
|
2017-09-23 21:16:48 +02:00
|
|
|
datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
foreach(cell, publist)
|
|
|
|
{
|
|
|
|
char *name = strVal(lfirst(cell));
|
|
|
|
ListCell *pcell;
|
|
|
|
|
|
|
|
/* Check for duplicates. */
|
|
|
|
foreach(pcell, publist)
|
|
|
|
{
|
2017-03-25 02:48:05 +01:00
|
|
|
char *pname = strVal(lfirst(pcell));
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-09-23 21:16:48 +02:00
|
|
|
if (pcell == cell)
|
2017-01-19 18:00:00 +01:00
|
|
|
break;
|
|
|
|
|
|
|
|
if (strcmp(name, pname) == 0)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("publication name \"%s\" used more than once",
|
|
|
|
pname)));
|
|
|
|
}
|
|
|
|
|
|
|
|
datums[j++] = CStringGetTextDatum(name);
|
|
|
|
}
|
|
|
|
|
|
|
|
MemoryContextSwitchTo(oldcxt);
|
|
|
|
|
|
|
|
arr = construct_array(datums, list_length(publist),
|
|
|
|
TEXTOID, -1, false, 'i');
|
2017-09-23 21:16:48 +02:00
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
MemoryContextDelete(memcxt);
|
|
|
|
|
|
|
|
return PointerGetDatum(arr);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Create new subscription.
|
|
|
|
*/
|
|
|
|
ObjectAddress
|
2017-03-04 05:25:34 +01:00
|
|
|
CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
|
|
|
Relation rel;
|
|
|
|
ObjectAddress myself;
|
|
|
|
Oid subid;
|
|
|
|
bool nulls[Natts_pg_subscription];
|
|
|
|
Datum values[Natts_pg_subscription];
|
2017-01-20 20:45:02 +01:00
|
|
|
Oid owner = GetUserId();
|
2017-01-19 18:00:00 +01:00
|
|
|
HeapTuple tup;
|
2017-03-23 13:36:36 +01:00
|
|
|
bool connect;
|
2017-01-19 18:00:00 +01:00
|
|
|
bool enabled_given;
|
|
|
|
bool enabled;
|
2017-03-23 13:36:36 +01:00
|
|
|
bool copy_data;
|
2017-04-14 19:58:46 +02:00
|
|
|
char *synchronous_commit;
|
2017-01-19 18:00:00 +01:00
|
|
|
char *conninfo;
|
|
|
|
char *slotname;
|
2017-05-09 16:20:42 +02:00
|
|
|
bool slotname_given;
|
2017-01-19 18:00:00 +01:00
|
|
|
char originname[NAMEDATALEN];
|
|
|
|
bool create_slot;
|
|
|
|
List *publications;
|
|
|
|
|
2017-03-04 05:25:34 +01:00
|
|
|
/*
|
|
|
|
* Parse and check options.
|
2017-05-17 02:36:35 +02:00
|
|
|
*
|
2017-03-04 05:25:34 +01:00
|
|
|
* Connection and publication should not be specified here.
|
|
|
|
*/
|
2017-03-23 13:36:36 +01:00
|
|
|
parse_subscription_options(stmt->options, &connect, &enabled_given,
|
2017-05-09 16:20:42 +02:00
|
|
|
&enabled, &create_slot, &slotname_given,
|
2017-06-06 03:37:00 +02:00
|
|
|
&slotname, ©_data, &synchronous_commit,
|
|
|
|
NULL);
|
2017-03-04 05:25:34 +01:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Since creating a replication slot is not transactional, rolling back
|
|
|
|
* the transaction leaves the created replication slot. So we cannot run
|
|
|
|
* CREATE SUBSCRIPTION inside a transaction block if creating a
|
|
|
|
* replication slot.
|
|
|
|
*/
|
|
|
|
if (create_slot)
|
2017-05-12 14:57:01 +02:00
|
|
|
PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
|
2017-03-04 05:25:34 +01:00
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
if (!superuser())
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
|
|
|
(errmsg("must be superuser to create subscriptions"))));
|
|
|
|
|
|
|
|
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
|
|
|
|
|
|
|
|
/* Check if name is used */
|
|
|
|
subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
|
|
|
|
CStringGetDatum(stmt->subname));
|
|
|
|
if (OidIsValid(subid))
|
|
|
|
{
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_DUPLICATE_OBJECT),
|
|
|
|
errmsg("subscription \"%s\" already exists",
|
|
|
|
stmt->subname)));
|
|
|
|
}
|
|
|
|
|
2017-05-09 16:20:42 +02:00
|
|
|
if (!slotname_given && slotname == NULL)
|
2017-01-19 18:00:00 +01:00
|
|
|
slotname = stmt->subname;
|
2017-05-09 16:20:42 +02:00
|
|
|
|
2017-04-14 19:58:46 +02:00
|
|
|
/* The default for synchronous_commit of subscriptions is off. */
|
|
|
|
if (synchronous_commit == NULL)
|
|
|
|
synchronous_commit = "off";
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
conninfo = stmt->conninfo;
|
|
|
|
publications = stmt->publication;
|
|
|
|
|
|
|
|
/* Load the library providing us libpq calls. */
|
|
|
|
load_file("libpqwalreceiver", false);
|
|
|
|
|
|
|
|
/* Check the connection info string. */
|
|
|
|
walrcv_check_conninfo(conninfo);
|
|
|
|
|
|
|
|
/* Everything ok, form a new tuple. */
|
|
|
|
memset(values, 0, sizeof(values));
|
|
|
|
memset(nulls, false, sizeof(nulls));
|
|
|
|
|
|
|
|
values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
|
|
|
|
values[Anum_pg_subscription_subname - 1] =
|
|
|
|
DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
|
2017-01-20 20:45:02 +01:00
|
|
|
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
|
2017-01-19 18:00:00 +01:00
|
|
|
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
|
|
|
|
values[Anum_pg_subscription_subconninfo - 1] =
|
|
|
|
CStringGetTextDatum(conninfo);
|
2017-05-09 16:20:42 +02:00
|
|
|
if (slotname)
|
|
|
|
values[Anum_pg_subscription_subslotname - 1] =
|
|
|
|
DirectFunctionCall1(namein, CStringGetDatum(slotname));
|
|
|
|
else
|
|
|
|
nulls[Anum_pg_subscription_subslotname - 1] = true;
|
2017-04-14 19:58:46 +02:00
|
|
|
values[Anum_pg_subscription_subsynccommit - 1] =
|
|
|
|
CStringGetTextDatum(synchronous_commit);
|
2017-01-19 18:00:00 +01:00
|
|
|
values[Anum_pg_subscription_subpublications - 1] =
|
2017-05-17 22:31:56 +02:00
|
|
|
publicationListToArray(publications);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
|
|
|
|
|
|
|
|
/* Insert tuple into catalog. */
|
2017-01-31 22:42:24 +01:00
|
|
|
subid = CatalogTupleInsert(rel, tup);
|
2017-01-19 18:00:00 +01:00
|
|
|
heap_freetuple(tup);
|
|
|
|
|
2017-01-20 20:45:02 +01:00
|
|
|
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
snprintf(originname, sizeof(originname), "pg_%u", subid);
|
|
|
|
replorigin_create(originname);
|
|
|
|
|
|
|
|
/*
|
2017-03-23 13:36:36 +01:00
|
|
|
* Connect to remote side to execute requested commands and fetch table
|
|
|
|
* info.
|
2017-01-19 18:00:00 +01:00
|
|
|
*/
|
2017-03-23 13:36:36 +01:00
|
|
|
if (connect)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
XLogRecPtr lsn;
|
|
|
|
char *err;
|
|
|
|
WalReceiverConn *wrconn;
|
|
|
|
List *tables;
|
|
|
|
ListCell *lc;
|
|
|
|
char table_state;
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
/* Try to connect to the publisher. */
|
|
|
|
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
|
|
|
|
if (!wrconn)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not connect to the publisher: %s", err)));
|
|
|
|
|
2017-01-25 16:47:53 +01:00
|
|
|
PG_TRY();
|
|
|
|
{
|
2017-03-23 13:36:36 +01:00
|
|
|
/*
|
|
|
|
* Set sync state based on if we were asked to do data copy or
|
|
|
|
* not.
|
2017-03-14 22:13:56 +01:00
|
|
|
*/
|
2017-03-23 13:36:36 +01:00
|
|
|
table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Get the table list from publisher and build local table status
|
|
|
|
* info.
|
|
|
|
*/
|
|
|
|
tables = fetch_table_list(wrconn, publications);
|
2017-05-17 22:31:56 +02:00
|
|
|
foreach(lc, tables)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
|
|
|
RangeVar *rv = (RangeVar *) lfirst(lc);
|
|
|
|
Oid relid;
|
|
|
|
|
2017-03-24 19:44:11 +01:00
|
|
|
relid = RangeVarGetRelid(rv, AccessShareLock, false);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-05-17 04:57:16 +02:00
|
|
|
/* Check for supported relkind. */
|
|
|
|
CheckSubscriptionRelkind(get_rel_relkind(relid),
|
|
|
|
rv->schemaname, rv->relname);
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
SetSubscriptionRelState(subid, relid, table_state,
|
2017-06-07 19:49:14 +02:00
|
|
|
InvalidXLogRecPtr, false);
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
|
2017-04-21 14:35:24 +02:00
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* If requested, create permanent slot for the subscription. We
|
|
|
|
* won't use the initial snapshot for anything, so no need to
|
|
|
|
* export it.
|
2017-04-21 14:35:24 +02:00
|
|
|
*/
|
|
|
|
if (create_slot)
|
|
|
|
{
|
2017-05-09 16:20:42 +02:00
|
|
|
Assert(slotname);
|
|
|
|
|
2017-04-21 14:35:24 +02:00
|
|
|
walrcv_create_slot(wrconn, slotname, false,
|
|
|
|
CRS_NOEXPORT_SNAPSHOT, &lsn);
|
|
|
|
ereport(NOTICE,
|
Phase 3 of pgindent updates.
Don't move parenthesized lines to the left, even if that means they
flow past the right margin.
By default, BSD indent lines up statement continuation lines that are
within parentheses so that they start just to the right of the preceding
left parenthesis. However, traditionally, if that resulted in the
continuation line extending to the right of the desired right margin,
then indent would push it left just far enough to not overrun the margin,
if it could do so without making the continuation line start to the left of
the current statement indent. That makes for a weird mix of indentations
unless one has been completely rigid about never violating the 80-column
limit.
This behavior has been pretty universally panned by Postgres developers.
Hence, disable it with indent's new -lpl switch, so that parenthesized
lines are always lined up with the preceding left paren.
This patch is much less interesting than the first round of indent
changes, but also bulkier, so I thought it best to separate the effects.
Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org
Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:35:54 +02:00
|
|
|
(errmsg("created replication slot \"%s\" on publisher",
|
|
|
|
slotname)));
|
2017-04-21 14:35:24 +02:00
|
|
|
}
|
2017-01-25 16:47:53 +01:00
|
|
|
}
|
|
|
|
PG_CATCH();
|
|
|
|
{
|
|
|
|
/* Close the connection in case of failure. */
|
|
|
|
walrcv_disconnect(wrconn);
|
|
|
|
PG_RE_THROW();
|
|
|
|
}
|
|
|
|
PG_END_TRY();
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
/* And we are done with the remote side. */
|
|
|
|
walrcv_disconnect(wrconn);
|
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
else
|
|
|
|
ereport(WARNING,
|
|
|
|
(errmsg("tables were not subscribed, you will have to run "
|
|
|
|
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
|
|
|
|
"subscribe the tables")));
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
heap_close(rel, RowExclusiveLock);
|
|
|
|
|
2017-05-02 04:50:32 +02:00
|
|
|
if (enabled)
|
|
|
|
ApplyLauncherWakeupAtCommit();
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
|
|
|
|
|
|
|
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
|
|
|
|
|
|
|
|
return myself;
|
|
|
|
}
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
static void
|
|
|
|
AlterSubscription_refresh(Subscription *sub, bool copy_data)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
char *err;
|
2017-03-23 13:36:36 +01:00
|
|
|
List *pubrel_names;
|
|
|
|
List *subrel_states;
|
|
|
|
Oid *subrel_local_oids;
|
|
|
|
Oid *pubrel_local_oids;
|
|
|
|
ListCell *lc;
|
|
|
|
int off;
|
|
|
|
|
|
|
|
/* Load the library providing us libpq calls. */
|
|
|
|
load_file("libpqwalreceiver", false);
|
|
|
|
|
|
|
|
/* Try to connect to the publisher. */
|
|
|
|
wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
|
|
|
|
if (!wrconn)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not connect to the publisher: %s", err)));
|
|
|
|
|
|
|
|
/* Get the table list from publisher. */
|
|
|
|
pubrel_names = fetch_table_list(wrconn, sub->publications);
|
|
|
|
|
|
|
|
/* We are done with the remote side, close connection. */
|
|
|
|
walrcv_disconnect(wrconn);
|
|
|
|
|
|
|
|
/* Get local table list. */
|
|
|
|
subrel_states = GetSubscriptionRelations(sub->oid);
|
|
|
|
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Build qsorted array of local table oids for faster lookup. This can
|
|
|
|
* potentially contain all tables in the database so speed of lookup is
|
|
|
|
* important.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
|
|
|
|
off = 0;
|
|
|
|
foreach(lc, subrel_states)
|
|
|
|
{
|
|
|
|
SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
|
2017-05-17 22:31:56 +02:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
subrel_local_oids[off++] = relstate->relid;
|
|
|
|
}
|
|
|
|
qsort(subrel_local_oids, list_length(subrel_states),
|
|
|
|
sizeof(Oid), oid_cmp);
|
|
|
|
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Walk over the remote tables and try to match them to locally known
|
|
|
|
* tables. If the table is not known locally create a new state for it.
|
2017-03-23 13:36:36 +01:00
|
|
|
*
|
|
|
|
* Also builds array of local oids of remote tables for the next step.
|
|
|
|
*/
|
|
|
|
off = 0;
|
|
|
|
pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
|
|
|
|
|
2017-05-17 22:31:56 +02:00
|
|
|
foreach(lc, pubrel_names)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
|
|
|
RangeVar *rv = (RangeVar *) lfirst(lc);
|
|
|
|
Oid relid;
|
|
|
|
|
|
|
|
relid = RangeVarGetRelid(rv, AccessShareLock, false);
|
2017-05-17 04:57:16 +02:00
|
|
|
|
|
|
|
/* Check for supported relkind. */
|
|
|
|
CheckSubscriptionRelkind(get_rel_relkind(relid),
|
|
|
|
rv->schemaname, rv->relname);
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
pubrel_local_oids[off++] = relid;
|
|
|
|
|
|
|
|
if (!bsearch(&relid, subrel_local_oids,
|
|
|
|
list_length(subrel_states), sizeof(Oid), oid_cmp))
|
|
|
|
{
|
|
|
|
SetSubscriptionRelState(sub->oid, relid,
|
Phase 3 of pgindent updates.
Don't move parenthesized lines to the left, even if that means they
flow past the right margin.
By default, BSD indent lines up statement continuation lines that are
within parentheses so that they start just to the right of the preceding
left parenthesis. However, traditionally, if that resulted in the
continuation line extending to the right of the desired right margin,
then indent would push it left just far enough to not overrun the margin,
if it could do so without making the continuation line start to the left of
the current statement indent. That makes for a weird mix of indentations
unless one has been completely rigid about never violating the 80-column
limit.
This behavior has been pretty universally panned by Postgres developers.
Hence, disable it with indent's new -lpl switch, so that parenthesized
lines are always lined up with the preceding left paren.
This patch is much less interesting than the first round of indent
changes, but also bulkier, so I thought it best to separate the effects.
Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org
Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:35:54 +02:00
|
|
|
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
|
2017-06-07 19:49:14 +02:00
|
|
|
InvalidXLogRecPtr, false);
|
2017-08-07 15:16:03 +02:00
|
|
|
ereport(DEBUG1,
|
2017-08-07 15:40:12 +02:00
|
|
|
(errmsg("table \"%s.%s\" added to subscription \"%s\"",
|
|
|
|
rv->schemaname, rv->relname, sub->name)));
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Next remove state for tables we should not care about anymore using the
|
|
|
|
* data we collected above
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
qsort(pubrel_local_oids, list_length(pubrel_names),
|
|
|
|
sizeof(Oid), oid_cmp);
|
|
|
|
|
|
|
|
for (off = 0; off < list_length(subrel_states); off++)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
Oid relid = subrel_local_oids[off];
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
if (!bsearch(&relid, pubrel_local_oids,
|
|
|
|
list_length(pubrel_names), sizeof(Oid), oid_cmp))
|
|
|
|
{
|
|
|
|
RemoveSubscriptionRel(sub->oid, relid);
|
|
|
|
|
2017-08-05 03:14:35 +02:00
|
|
|
logicalrep_worker_stop_at_commit(sub->oid, relid);
|
2017-06-09 15:47:52 +02:00
|
|
|
|
2017-08-07 15:16:03 +02:00
|
|
|
ereport(DEBUG1,
|
2017-08-07 15:40:12 +02:00
|
|
|
(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
|
|
|
|
get_namespace_name(get_rel_namespace(relid)),
|
|
|
|
get_rel_name(relid),
|
|
|
|
sub->name)));
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
/*
|
|
|
|
* Alter the existing subscription.
|
|
|
|
*/
|
|
|
|
ObjectAddress
|
|
|
|
AlterSubscription(AlterSubscriptionStmt *stmt)
|
|
|
|
{
|
|
|
|
Relation rel;
|
|
|
|
ObjectAddress myself;
|
|
|
|
bool nulls[Natts_pg_subscription];
|
|
|
|
bool replaces[Natts_pg_subscription];
|
|
|
|
Datum values[Natts_pg_subscription];
|
|
|
|
HeapTuple tup;
|
|
|
|
Oid subid;
|
2017-03-23 13:36:36 +01:00
|
|
|
bool update_tuple = false;
|
2017-05-17 22:31:56 +02:00
|
|
|
Subscription *sub;
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
|
|
|
|
|
|
|
|
/* Fetch the existing tuple. */
|
|
|
|
tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
|
|
|
|
CStringGetDatum(stmt->subname));
|
|
|
|
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
|
|
errmsg("subscription \"%s\" does not exist",
|
|
|
|
stmt->subname)));
|
|
|
|
|
|
|
|
/* must be owner */
|
|
|
|
if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
|
2017-12-02 15:26:34 +01:00
|
|
|
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
|
2017-01-19 18:00:00 +01:00
|
|
|
stmt->subname);
|
|
|
|
|
|
|
|
subid = HeapTupleGetOid(tup);
|
2017-05-09 16:20:42 +02:00
|
|
|
sub = GetSubscription(subid, false);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-07-04 04:47:06 +02:00
|
|
|
/* Lock the subscription so nobody else can do anything with it. */
|
|
|
|
LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
/* Form a new tuple. */
|
|
|
|
memset(values, 0, sizeof(values));
|
|
|
|
memset(nulls, false, sizeof(nulls));
|
|
|
|
memset(replaces, false, sizeof(replaces));
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
switch (stmt->kind)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
2017-03-23 13:36:36 +01:00
|
|
|
case ALTER_SUBSCRIPTION_OPTIONS:
|
|
|
|
{
|
2017-05-09 16:20:42 +02:00
|
|
|
char *slotname;
|
|
|
|
bool slotname_given;
|
2017-04-14 19:58:46 +02:00
|
|
|
char *synchronous_commit;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
parse_subscription_options(stmt->options, NULL, NULL, NULL,
|
2017-05-09 16:20:42 +02:00
|
|
|
NULL, &slotname_given, &slotname,
|
2017-06-06 03:37:00 +02:00
|
|
|
NULL, &synchronous_commit, NULL);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-05-09 16:20:42 +02:00
|
|
|
if (slotname_given)
|
2017-04-14 19:58:46 +02:00
|
|
|
{
|
2017-05-09 16:20:42 +02:00
|
|
|
if (sub->enabled && !slotname)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("cannot set slot_name = NONE for enabled subscription")));
|
|
|
|
|
|
|
|
if (slotname)
|
|
|
|
values[Anum_pg_subscription_subslotname - 1] =
|
2017-05-17 22:31:56 +02:00
|
|
|
DirectFunctionCall1(namein, CStringGetDatum(slotname));
|
2017-05-09 16:20:42 +02:00
|
|
|
else
|
|
|
|
nulls[Anum_pg_subscription_subslotname - 1] = true;
|
2017-04-14 19:58:46 +02:00
|
|
|
replaces[Anum_pg_subscription_subslotname - 1] = true;
|
|
|
|
}
|
2017-05-09 16:20:42 +02:00
|
|
|
|
2017-04-14 19:58:46 +02:00
|
|
|
if (synchronous_commit)
|
|
|
|
{
|
|
|
|
values[Anum_pg_subscription_subsynccommit - 1] =
|
|
|
|
CStringGetTextDatum(synchronous_commit);
|
|
|
|
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
|
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
update_tuple = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
case ALTER_SUBSCRIPTION_ENABLED:
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
bool enabled,
|
|
|
|
enabled_given;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
parse_subscription_options(stmt->options, NULL,
|
|
|
|
&enabled_given, &enabled, NULL,
|
2017-06-06 03:37:00 +02:00
|
|
|
NULL, NULL, NULL, NULL, NULL);
|
2017-03-23 13:36:36 +01:00
|
|
|
Assert(enabled_given);
|
|
|
|
|
2017-05-09 16:20:42 +02:00
|
|
|
if (!sub->slotname && enabled)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("cannot enable subscription that does not have a slot name")));
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
values[Anum_pg_subscription_subenabled - 1] =
|
|
|
|
BoolGetDatum(enabled);
|
|
|
|
replaces[Anum_pg_subscription_subenabled - 1] = true;
|
|
|
|
|
2017-04-25 20:40:33 +02:00
|
|
|
if (enabled)
|
|
|
|
ApplyLauncherWakeupAtCommit();
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
update_tuple = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
case ALTER_SUBSCRIPTION_CONNECTION:
|
2017-05-08 20:01:00 +02:00
|
|
|
/* Load the library providing us libpq calls. */
|
|
|
|
load_file("libpqwalreceiver", false);
|
|
|
|
/* Check the connection info string. */
|
|
|
|
walrcv_check_conninfo(stmt->conninfo);
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
values[Anum_pg_subscription_subconninfo - 1] =
|
|
|
|
CStringGetTextDatum(stmt->conninfo);
|
|
|
|
replaces[Anum_pg_subscription_subconninfo - 1] = true;
|
|
|
|
update_tuple = true;
|
|
|
|
break;
|
|
|
|
|
|
|
|
case ALTER_SUBSCRIPTION_PUBLICATION:
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
bool copy_data;
|
2017-06-06 03:37:00 +02:00
|
|
|
bool refresh;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
parse_subscription_options(stmt->options, NULL, NULL, NULL,
|
2017-05-09 16:20:42 +02:00
|
|
|
NULL, NULL, NULL, ©_data,
|
2017-06-06 03:37:00 +02:00
|
|
|
NULL, &refresh);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
values[Anum_pg_subscription_subpublications - 1] =
|
2017-05-17 22:31:56 +02:00
|
|
|
publicationListToArray(stmt->publication);
|
2017-03-23 13:36:36 +01:00
|
|
|
replaces[Anum_pg_subscription_subpublications - 1] = true;
|
|
|
|
|
|
|
|
update_tuple = true;
|
|
|
|
|
|
|
|
/* Refresh if user asked us to. */
|
2017-06-06 03:37:00 +02:00
|
|
|
if (refresh)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
2017-05-09 16:20:42 +02:00
|
|
|
if (!sub->enabled)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
2017-06-06 03:37:00 +02:00
|
|
|
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
|
|
|
|
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
|
2017-05-09 16:20:42 +02:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
/* Make sure refresh sees the new list of publications. */
|
|
|
|
sub->publications = stmt->publication;
|
|
|
|
|
|
|
|
AlterSubscription_refresh(sub, copy_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
case ALTER_SUBSCRIPTION_REFRESH:
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
bool copy_data;
|
2017-05-09 16:20:42 +02:00
|
|
|
|
|
|
|
if (!sub->enabled)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
|
|
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
parse_subscription_options(stmt->options, NULL, NULL, NULL,
|
2017-05-09 16:20:42 +02:00
|
|
|
NULL, NULL, NULL, ©_data,
|
2017-06-06 03:37:00 +02:00
|
|
|
NULL, NULL);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
AlterSubscription_refresh(sub, copy_data);
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
default:
|
|
|
|
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
|
|
|
|
stmt->kind);
|
2017-01-19 18:00:00 +01:00
|
|
|
}
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
/* Update the catalog if needed. */
|
|
|
|
if (update_tuple)
|
|
|
|
{
|
|
|
|
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
|
|
|
|
replaces);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
CatalogTupleUpdate(rel, &tup->t_self, tup);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
heap_freetuple(tup);
|
|
|
|
}
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
heap_close(rel, RowExclusiveLock);
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
|
|
|
|
|
|
|
|
return myself;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Drop a subscription
|
|
|
|
*/
|
|
|
|
void
|
2017-03-04 05:25:34 +01:00
|
|
|
DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
|
|
|
Relation rel;
|
|
|
|
ObjectAddress myself;
|
|
|
|
HeapTuple tup;
|
|
|
|
Oid subid;
|
|
|
|
Datum datum;
|
|
|
|
bool isnull;
|
|
|
|
char *subname;
|
|
|
|
char *conninfo;
|
|
|
|
char *slotname;
|
2017-08-05 03:14:35 +02:00
|
|
|
List *subworkers;
|
|
|
|
ListCell *lc;
|
2017-01-19 18:00:00 +01:00
|
|
|
char originname[NAMEDATALEN];
|
|
|
|
char *err = NULL;
|
2017-05-17 22:31:56 +02:00
|
|
|
RepOriginId originid;
|
|
|
|
WalReceiverConn *wrconn = NULL;
|
|
|
|
StringInfoData cmd;
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-03-08 17:44:23 +01:00
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Lock pg_subscription with AccessExclusiveLock to ensure that the
|
|
|
|
* launcher doesn't restart new worker during dropping the subscription
|
2017-03-08 17:44:23 +01:00
|
|
|
*/
|
|
|
|
rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
|
|
|
|
CStringGetDatum(stmt->subname));
|
|
|
|
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
|
|
{
|
|
|
|
heap_close(rel, NoLock);
|
|
|
|
|
|
|
|
if (!stmt->missing_ok)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
|
|
errmsg("subscription \"%s\" does not exist",
|
|
|
|
stmt->subname)));
|
|
|
|
else
|
|
|
|
ereport(NOTICE,
|
|
|
|
(errmsg("subscription \"%s\" does not exist, skipping",
|
|
|
|
stmt->subname)));
|
|
|
|
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
subid = HeapTupleGetOid(tup);
|
|
|
|
|
|
|
|
/* must be owner */
|
|
|
|
if (!pg_subscription_ownercheck(subid, GetUserId()))
|
2017-12-02 15:26:34 +01:00
|
|
|
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
|
2017-01-19 18:00:00 +01:00
|
|
|
stmt->subname);
|
|
|
|
|
|
|
|
/* DROP hook for the subscription being removed */
|
|
|
|
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
|
|
|
|
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Lock the subscription so nobody else can do anything with it (including
|
|
|
|
* the replication workers).
|
2017-01-19 18:00:00 +01:00
|
|
|
*/
|
|
|
|
LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
|
|
|
|
|
|
|
|
/* Get subname */
|
|
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
|
|
|
|
Anum_pg_subscription_subname, &isnull);
|
|
|
|
Assert(!isnull);
|
|
|
|
subname = pstrdup(NameStr(*DatumGetName(datum)));
|
|
|
|
|
|
|
|
/* Get conninfo */
|
|
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
|
|
|
|
Anum_pg_subscription_subconninfo, &isnull);
|
|
|
|
Assert(!isnull);
|
2017-04-14 18:54:09 +02:00
|
|
|
conninfo = TextDatumGetCString(datum);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
/* Get slotname */
|
|
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
|
|
|
|
Anum_pg_subscription_subslotname, &isnull);
|
2017-05-09 16:20:42 +02:00
|
|
|
if (!isnull)
|
|
|
|
slotname = pstrdup(NameStr(*DatumGetName(datum)));
|
|
|
|
else
|
|
|
|
slotname = NULL;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Since dropping a replication slot is not transactional, the replication
|
|
|
|
* slot stays dropped even if the transaction rolls back. So we cannot
|
|
|
|
* run DROP SUBSCRIPTION inside a transaction block if dropping the
|
|
|
|
* replication slot.
|
|
|
|
*
|
|
|
|
* XXX The command name should really be something like "DROP SUBSCRIPTION
|
|
|
|
* of a subscription that is associated with a replication slot", but we
|
|
|
|
* don't have the proper facilities for that.
|
|
|
|
*/
|
|
|
|
if (slotname)
|
|
|
|
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
|
|
|
EventTriggerSQLDropAddObject(&myself, true, true);
|
|
|
|
|
|
|
|
/* Remove the tuple from catalog. */
|
2017-02-01 22:13:30 +01:00
|
|
|
CatalogTupleDelete(rel, &tup->t_self);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
ReleaseSysCache(tup);
|
|
|
|
|
2017-08-05 03:14:35 +02:00
|
|
|
/*
|
Fix DROP SUBSCRIPTION hang
When ALTER SUBSCRIPTION DISABLE is run in the same transaction before
DROP SUBSCRIPTION, the latter will hang because workers will still be
running, not having seen the DISABLE committed, and DROP SUBSCRIPTION
will wait until the workers have vacated the replication origin slots.
Previously, DROP SUBSCRIPTION killed the logical replication workers
immediately only if it was going to drop the replication slot, otherwise
it scheduled the worker killing for the end of the transaction, as a
result of 7e174fa793a2df89fe03d002a5087ef67abcdde8. This, however,
causes the present problem. To fix, kill the workers immediately in all
cases. This covers all cases: A subscription that doesn't have a
replication slot must be disabled. It was either disabled in the same
transaction, or it was already disabled before the current transaction,
but then there shouldn't be any workers left and this won't make a
difference.
Reported-by: Arseny Sher <a.sher@postgrespro.ru>
Discussion: https://www.postgresql.org/message-id/flat/87mv6av84w.fsf%40ars-thinkpad
2017-09-18 03:37:02 +02:00
|
|
|
* Stop all the subscription workers immediately.
|
|
|
|
*
|
|
|
|
* This is necessary if we are dropping the replication slot, so that the
|
|
|
|
* slot becomes accessible.
|
|
|
|
*
|
|
|
|
* It is also necessary if the subscription is disabled and was disabled
|
|
|
|
* in the same transaction. Then the workers haven't seen the disabling
|
|
|
|
* yet and will still be running, leading to hangs later when we want to
|
|
|
|
* drop the replication origin. If the subscription was disabled before
|
|
|
|
* this transaction, then there shouldn't be any workers left, so this
|
|
|
|
* won't make a difference.
|
2017-08-05 03:14:35 +02:00
|
|
|
*
|
|
|
|
* New workers won't be started because we hold an exclusive lock on the
|
|
|
|
* subscription till the end of the transaction.
|
|
|
|
*/
|
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
|
|
|
subworkers = logicalrep_workers_find(subid, false);
|
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
2017-08-14 23:29:33 +02:00
|
|
|
foreach(lc, subworkers)
|
2017-08-05 03:14:35 +02:00
|
|
|
{
|
|
|
|
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
|
2017-08-14 23:29:33 +02:00
|
|
|
|
Fix DROP SUBSCRIPTION hang
When ALTER SUBSCRIPTION DISABLE is run in the same transaction before
DROP SUBSCRIPTION, the latter will hang because workers will still be
running, not having seen the DISABLE committed, and DROP SUBSCRIPTION
will wait until the workers have vacated the replication origin slots.
Previously, DROP SUBSCRIPTION killed the logical replication workers
immediately only if it was going to drop the replication slot, otherwise
it scheduled the worker killing for the end of the transaction, as a
result of 7e174fa793a2df89fe03d002a5087ef67abcdde8. This, however,
causes the present problem. To fix, kill the workers immediately in all
cases. This covers all cases: A subscription that doesn't have a
replication slot must be disabled. It was either disabled in the same
transaction, or it was already disabled before the current transaction,
but then there shouldn't be any workers left and this won't make a
difference.
Reported-by: Arseny Sher <a.sher@postgrespro.ru>
Discussion: https://www.postgresql.org/message-id/flat/87mv6av84w.fsf%40ars-thinkpad
2017-09-18 03:37:02 +02:00
|
|
|
logicalrep_worker_stop(w->subid, w->relid);
|
2017-08-05 03:14:35 +02:00
|
|
|
}
|
|
|
|
list_free(subworkers);
|
|
|
|
|
2017-01-20 20:45:02 +01:00
|
|
|
/* Clean up dependencies */
|
|
|
|
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
/* Remove any associated relation synchronization states. */
|
|
|
|
RemoveSubscriptionRel(subid, InvalidOid);
|
|
|
|
|
2017-01-19 18:00:00 +01:00
|
|
|
/* Remove the origin tracking if exists. */
|
|
|
|
snprintf(originname, sizeof(originname), "pg_%u", subid);
|
|
|
|
originid = replorigin_by_name(originname, true);
|
|
|
|
if (originid != InvalidRepOriginId)
|
2017-08-08 22:07:46 +02:00
|
|
|
replorigin_drop(originid, false);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-05-17 22:31:56 +02:00
|
|
|
/*
|
|
|
|
* If there is no slot associated with the subscription, we can finish
|
|
|
|
* here.
|
|
|
|
*/
|
2017-05-09 16:20:42 +02:00
|
|
|
if (!slotname)
|
2017-01-19 18:00:00 +01:00
|
|
|
{
|
|
|
|
heap_close(rel, NoLock);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Otherwise drop the replication slot at the publisher node using the
|
|
|
|
* replication connection.
|
2017-01-19 18:00:00 +01:00
|
|
|
*/
|
|
|
|
load_file("libpqwalreceiver", false);
|
|
|
|
|
|
|
|
initStringInfo(&cmd);
|
2017-09-01 13:44:14 +02:00
|
|
|
appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
wrconn = walrcv_connect(conninfo, true, subname, &err);
|
|
|
|
if (wrconn == NULL)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not connect to publisher when attempting to "
|
|
|
|
"drop the replication slot \"%s\"", slotname),
|
2017-05-09 16:20:42 +02:00
|
|
|
errdetail("The error was: %s", err),
|
2017-05-19 22:30:02 +02:00
|
|
|
errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
|
2017-05-09 16:20:42 +02:00
|
|
|
"to disassociate the subscription from the slot.")));
|
2017-01-19 18:00:00 +01:00
|
|
|
|
2017-03-08 15:43:38 +01:00
|
|
|
PG_TRY();
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
WalRcvExecResult *res;
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
|
|
|
|
|
|
|
|
if (res->status != WALRCV_OK_COMMAND)
|
2017-03-08 15:43:38 +01:00
|
|
|
ereport(ERROR,
|
Phase 3 of pgindent updates.
Don't move parenthesized lines to the left, even if that means they
flow past the right margin.
By default, BSD indent lines up statement continuation lines that are
within parentheses so that they start just to the right of the preceding
left parenthesis. However, traditionally, if that resulted in the
continuation line extending to the right of the desired right margin,
then indent would push it left just far enough to not overrun the margin,
if it could do so without making the continuation line start to the left of
the current statement indent. That makes for a weird mix of indentations
unless one has been completely rigid about never violating the 80-column
limit.
This behavior has been pretty universally panned by Postgres developers.
Hence, disable it with indent's new -lpl switch, so that parenthesized
lines are always lined up with the preceding left paren.
This patch is much less interesting than the first round of indent
changes, but also bulkier, so I thought it best to separate the effects.
Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org
Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:35:54 +02:00
|
|
|
(errmsg("could not drop the replication slot \"%s\" on publisher",
|
|
|
|
slotname),
|
|
|
|
errdetail("The error was: %s", res->err)));
|
2017-03-08 15:43:38 +01:00
|
|
|
else
|
|
|
|
ereport(NOTICE,
|
|
|
|
(errmsg("dropped replication slot \"%s\" on publisher",
|
|
|
|
slotname)));
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
walrcv_clear_result(res);
|
2017-03-08 15:43:38 +01:00
|
|
|
}
|
|
|
|
PG_CATCH();
|
2017-02-21 19:36:02 +01:00
|
|
|
{
|
|
|
|
/* Close the connection in case of failure */
|
2017-02-21 21:44:07 +01:00
|
|
|
walrcv_disconnect(wrconn);
|
2017-03-08 15:43:38 +01:00
|
|
|
PG_RE_THROW();
|
2017-02-21 19:36:02 +01:00
|
|
|
}
|
2017-03-08 15:43:38 +01:00
|
|
|
PG_END_TRY();
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
walrcv_disconnect(wrconn);
|
|
|
|
|
|
|
|
pfree(cmd.data);
|
|
|
|
|
|
|
|
heap_close(rel, NoLock);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Internal workhorse for changing a subscription owner
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
|
|
|
|
{
|
|
|
|
Form_pg_subscription form;
|
|
|
|
|
|
|
|
form = (Form_pg_subscription) GETSTRUCT(tup);
|
|
|
|
|
|
|
|
if (form->subowner == newOwnerId)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
|
2017-12-02 15:26:34 +01:00
|
|
|
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
|
2017-01-19 18:00:00 +01:00
|
|
|
NameStr(form->subname));
|
|
|
|
|
|
|
|
/* New owner must be a superuser */
|
|
|
|
if (!superuser_arg(newOwnerId))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
Phase 3 of pgindent updates.
Don't move parenthesized lines to the left, even if that means they
flow past the right margin.
By default, BSD indent lines up statement continuation lines that are
within parentheses so that they start just to the right of the preceding
left parenthesis. However, traditionally, if that resulted in the
continuation line extending to the right of the desired right margin,
then indent would push it left just far enough to not overrun the margin,
if it could do so without making the continuation line start to the left of
the current statement indent. That makes for a weird mix of indentations
unless one has been completely rigid about never violating the 80-column
limit.
This behavior has been pretty universally panned by Postgres developers.
Hence, disable it with indent's new -lpl switch, so that parenthesized
lines are always lined up with the preceding left paren.
This patch is much less interesting than the first round of indent
changes, but also bulkier, so I thought it best to separate the effects.
Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org
Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:35:54 +02:00
|
|
|
errmsg("permission denied to change owner of subscription \"%s\"",
|
|
|
|
NameStr(form->subname)),
|
|
|
|
errhint("The owner of a subscription must be a superuser.")));
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
form->subowner = newOwnerId;
|
2017-01-31 22:42:24 +01:00
|
|
|
CatalogTupleUpdate(rel, &tup->t_self, tup);
|
2017-01-19 18:00:00 +01:00
|
|
|
|
|
|
|
/* Update owner dependency reference */
|
|
|
|
changeDependencyOnOwner(SubscriptionRelationId,
|
|
|
|
HeapTupleGetOid(tup),
|
|
|
|
newOwnerId);
|
|
|
|
|
|
|
|
InvokeObjectPostAlterHook(SubscriptionRelationId,
|
|
|
|
HeapTupleGetOid(tup), 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Change subscription owner -- by name
|
|
|
|
*/
|
|
|
|
ObjectAddress
|
|
|
|
AlterSubscriptionOwner(const char *name, Oid newOwnerId)
|
|
|
|
{
|
|
|
|
Oid subid;
|
|
|
|
HeapTuple tup;
|
|
|
|
Relation rel;
|
|
|
|
ObjectAddress address;
|
|
|
|
|
|
|
|
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
|
|
|
|
|
|
|
|
tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
|
|
|
|
CStringGetDatum(name));
|
|
|
|
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
|
|
errmsg("subscription \"%s\" does not exist", name)));
|
|
|
|
|
|
|
|
subid = HeapTupleGetOid(tup);
|
|
|
|
|
|
|
|
AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
|
|
|
|
|
|
|
|
ObjectAddressSet(address, SubscriptionRelationId, subid);
|
|
|
|
|
|
|
|
heap_freetuple(tup);
|
|
|
|
|
|
|
|
heap_close(rel, RowExclusiveLock);
|
|
|
|
|
|
|
|
return address;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Change subscription owner -- by OID
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
|
|
|
|
{
|
|
|
|
HeapTuple tup;
|
|
|
|
Relation rel;
|
|
|
|
|
|
|
|
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
|
|
|
|
|
|
|
|
tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
|
|
|
|
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
|
|
errmsg("subscription with OID %u does not exist", subid)));
|
|
|
|
|
|
|
|
AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
|
|
|
|
|
|
|
|
heap_freetuple(tup);
|
|
|
|
|
|
|
|
heap_close(rel, RowExclusiveLock);
|
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Get the list of tables which belong to specified publications on the
|
|
|
|
* publisher connection.
|
|
|
|
*/
|
|
|
|
static List *
|
|
|
|
fetch_table_list(WalReceiverConn *wrconn, List *publications)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
WalRcvExecResult *res;
|
|
|
|
StringInfoData cmd;
|
|
|
|
TupleTableSlot *slot;
|
|
|
|
Oid tableRow[2] = {TEXTOID, TEXTOID};
|
|
|
|
ListCell *lc;
|
|
|
|
bool first;
|
|
|
|
List *tablelist = NIL;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
Assert(list_length(publications) > 0);
|
|
|
|
|
|
|
|
initStringInfo(&cmd);
|
2017-08-16 05:34:39 +02:00
|
|
|
appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
|
|
|
|
" FROM pg_catalog.pg_publication_tables t\n"
|
|
|
|
" WHERE t.pubname IN (");
|
2017-03-23 13:36:36 +01:00
|
|
|
first = true;
|
2017-05-17 22:31:56 +02:00
|
|
|
foreach(lc, publications)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
char *pubname = strVal(lfirst(lc));
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
if (first)
|
|
|
|
first = false;
|
|
|
|
else
|
|
|
|
appendStringInfoString(&cmd, ", ");
|
|
|
|
|
2017-08-16 05:34:39 +02:00
|
|
|
appendStringInfoString(&cmd, quote_literal_cstr(pubname));
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
2017-08-16 05:34:39 +02:00
|
|
|
appendStringInfoChar(&cmd, ')');
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
|
|
|
|
pfree(cmd.data);
|
|
|
|
|
|
|
|
if (res->status != WALRCV_OK_TUPLES)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not receive list of replicated tables from the publisher: %s",
|
|
|
|
res->err)));
|
|
|
|
|
|
|
|
/* Process tables. */
|
|
|
|
slot = MakeSingleTupleTableSlot(res->tupledesc);
|
|
|
|
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
|
|
|
{
|
|
|
|
char *nspname;
|
|
|
|
char *relname;
|
|
|
|
bool isnull;
|
|
|
|
RangeVar *rv;
|
|
|
|
|
|
|
|
nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
|
|
|
|
Assert(!isnull);
|
|
|
|
relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
|
|
|
|
Assert(!isnull);
|
|
|
|
|
|
|
|
rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
|
|
|
|
tablelist = lappend(tablelist, rv);
|
|
|
|
|
|
|
|
ExecClearTuple(slot);
|
|
|
|
}
|
|
|
|
ExecDropSingleTupleTableSlot(slot);
|
|
|
|
|
|
|
|
walrcv_clear_result(res);
|
|
|
|
|
|
|
|
return tablelist;
|
|
|
|
}
|