mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-10-04 04:46:52 +02:00
7854e07f25
This follows multiple complaints from Peter Geoghegan, Andres Freund and Alvaro Herrera that this issue ought to be investigated further before actually happening, if it happens. Discussion: https://postgr.es/m/20191226144606.GA5659@alvherre.pgsql
505 lines
13 KiB
C
505 lines
13 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* pg_publication.c
|
|
* publication C API manipulation
|
|
*
|
|
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* pg_publication.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/genam.h"
|
|
#include "access/heapam.h"
|
|
#include "access/htup_details.h"
|
|
#include "access/tableam.h"
|
|
#include "access/xact.h"
|
|
#include "catalog/catalog.h"
|
|
#include "catalog/dependency.h"
|
|
#include "catalog/index.h"
|
|
#include "catalog/indexing.h"
|
|
#include "catalog/namespace.h"
|
|
#include "catalog/objectaccess.h"
|
|
#include "catalog/objectaddress.h"
|
|
#include "catalog/pg_publication.h"
|
|
#include "catalog/pg_publication_rel.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "funcapi.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/array.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/catcache.h"
|
|
#include "utils/fmgroids.h"
|
|
#include "utils/inval.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/syscache.h"
|
|
|
|
/*
|
|
* Check if relation can be in given publication and throws appropriate
|
|
* error if not.
|
|
*/
|
|
static void
|
|
check_publication_add_relation(Relation targetrel)
|
|
{
|
|
/* Give more specific error for partitioned tables */
|
|
if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("\"%s\" is a partitioned table",
|
|
RelationGetRelationName(targetrel)),
|
|
errdetail("Adding partitioned tables to publications is not supported."),
|
|
errhint("You can add the table partitions individually.")));
|
|
|
|
/* Must be table */
|
|
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("\"%s\" is not a table",
|
|
RelationGetRelationName(targetrel)),
|
|
errdetail("Only tables can be added to publications.")));
|
|
|
|
/* Can't be system table */
|
|
if (IsCatalogRelation(targetrel))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("\"%s\" is a system table",
|
|
RelationGetRelationName(targetrel)),
|
|
errdetail("System tables cannot be added to publications.")));
|
|
|
|
/* UNLOGGED and TEMP relations cannot be part of publication. */
|
|
if (!RelationNeedsWAL(targetrel))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("table \"%s\" cannot be replicated",
|
|
RelationGetRelationName(targetrel)),
|
|
errdetail("Temporary and unlogged relations cannot be replicated.")));
|
|
}
|
|
|
|
/*
|
|
* Returns if relation represented by oid and Form_pg_class entry
|
|
* is publishable.
|
|
*
|
|
* Does same checks as the above, but does not need relation to be opened
|
|
* and also does not throw errors.
|
|
*
|
|
* XXX This also excludes all tables with relid < FirstNormalObjectId,
|
|
* ie all tables created during initdb. This mainly affects the preinstalled
|
|
* information_schema. IsCatalogRelationOid() only excludes tables with
|
|
* relid < FirstBootstrapObjectId, making that test rather redundant,
|
|
* but really we should get rid of the FirstNormalObjectId test not
|
|
* IsCatalogRelationOid. We can't do so today because we don't want
|
|
* information_schema tables to be considered publishable; but this test
|
|
* is really inadequate for that, since the information_schema could be
|
|
* dropped and reloaded and then it'll be considered publishable. The best
|
|
* long-term solution may be to add a "relispublishable" bool to pg_class,
|
|
* and depend on that instead of OID checks.
|
|
*/
|
|
static bool
|
|
is_publishable_class(Oid relid, Form_pg_class reltuple)
|
|
{
|
|
return reltuple->relkind == RELKIND_RELATION &&
|
|
!IsCatalogRelationOid(relid) &&
|
|
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
|
|
relid >= FirstNormalObjectId;
|
|
}
|
|
|
|
/*
|
|
* Another variant of this, taking a Relation.
|
|
*/
|
|
bool
|
|
is_publishable_relation(Relation rel)
|
|
{
|
|
return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
|
|
}
|
|
|
|
|
|
/*
|
|
* SQL-callable variant of the above
|
|
*
|
|
* This returns null when the relation does not exist. This is intended to be
|
|
* used for example in psql to avoid gratuitous errors when there are
|
|
* concurrent catalog changes.
|
|
*/
|
|
Datum
|
|
pg_relation_is_publishable(PG_FUNCTION_ARGS)
|
|
{
|
|
Oid relid = PG_GETARG_OID(0);
|
|
HeapTuple tuple;
|
|
bool result;
|
|
|
|
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
|
|
if (!HeapTupleIsValid(tuple))
|
|
PG_RETURN_NULL();
|
|
result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
|
|
ReleaseSysCache(tuple);
|
|
PG_RETURN_BOOL(result);
|
|
}
|
|
|
|
|
|
/*
|
|
* Insert new publication / relation mapping.
|
|
*/
|
|
ObjectAddress
|
|
publication_add_relation(Oid pubid, Relation targetrel,
|
|
bool if_not_exists)
|
|
{
|
|
Relation rel;
|
|
HeapTuple tup;
|
|
Datum values[Natts_pg_publication_rel];
|
|
bool nulls[Natts_pg_publication_rel];
|
|
Oid relid = RelationGetRelid(targetrel);
|
|
Oid prrelid;
|
|
Publication *pub = GetPublication(pubid);
|
|
ObjectAddress myself,
|
|
referenced;
|
|
|
|
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
|
|
|
|
/*
|
|
* Check for duplicates. Note that this does not really prevent
|
|
* duplicates, it's here just to provide nicer error message in common
|
|
* case. The real protection is the unique key on the catalog.
|
|
*/
|
|
if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
|
|
ObjectIdGetDatum(pubid)))
|
|
{
|
|
table_close(rel, RowExclusiveLock);
|
|
|
|
if (if_not_exists)
|
|
return InvalidObjectAddress;
|
|
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_DUPLICATE_OBJECT),
|
|
errmsg("relation \"%s\" is already member of publication \"%s\"",
|
|
RelationGetRelationName(targetrel), pub->name)));
|
|
}
|
|
|
|
check_publication_add_relation(targetrel);
|
|
|
|
/* Form a tuple. */
|
|
memset(values, 0, sizeof(values));
|
|
memset(nulls, false, sizeof(nulls));
|
|
|
|
prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
|
|
Anum_pg_publication_rel_oid);
|
|
values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
|
|
values[Anum_pg_publication_rel_prpubid - 1] =
|
|
ObjectIdGetDatum(pubid);
|
|
values[Anum_pg_publication_rel_prrelid - 1] =
|
|
ObjectIdGetDatum(relid);
|
|
|
|
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
|
|
|
|
/* Insert tuple into catalog. */
|
|
CatalogTupleInsert(rel, tup);
|
|
heap_freetuple(tup);
|
|
|
|
ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
|
|
|
|
/* Add dependency on the publication */
|
|
ObjectAddressSet(referenced, PublicationRelationId, pubid);
|
|
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
|
|
|
|
/* Add dependency on the relation */
|
|
ObjectAddressSet(referenced, RelationRelationId, relid);
|
|
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
|
|
|
|
/* Close the table. */
|
|
table_close(rel, RowExclusiveLock);
|
|
|
|
/* Invalidate relcache so that publication info is rebuilt. */
|
|
CacheInvalidateRelcache(targetrel);
|
|
|
|
return myself;
|
|
}
|
|
|
|
|
|
/*
|
|
* Gets list of publication oids for a relation oid.
|
|
*/
|
|
List *
|
|
GetRelationPublications(Oid relid)
|
|
{
|
|
List *result = NIL;
|
|
CatCList *pubrellist;
|
|
int i;
|
|
|
|
/* Find all publications associated with the relation. */
|
|
pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
|
|
ObjectIdGetDatum(relid));
|
|
for (i = 0; i < pubrellist->n_members; i++)
|
|
{
|
|
HeapTuple tup = &pubrellist->members[i]->tuple;
|
|
Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
|
|
|
|
result = lappend_oid(result, pubid);
|
|
}
|
|
|
|
ReleaseSysCacheList(pubrellist);
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Gets list of relation oids for a publication.
|
|
*
|
|
* This should only be used for normal publications, the FOR ALL TABLES
|
|
* should use GetAllTablesPublicationRelations().
|
|
*/
|
|
List *
|
|
GetPublicationRelations(Oid pubid)
|
|
{
|
|
List *result;
|
|
Relation pubrelsrel;
|
|
ScanKeyData scankey;
|
|
SysScanDesc scan;
|
|
HeapTuple tup;
|
|
|
|
/* Find all publications associated with the relation. */
|
|
pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&scankey,
|
|
Anum_pg_publication_rel_prpubid,
|
|
BTEqualStrategyNumber, F_OIDEQ,
|
|
ObjectIdGetDatum(pubid));
|
|
|
|
scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
|
|
true, NULL, 1, &scankey);
|
|
|
|
result = NIL;
|
|
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
|
{
|
|
Form_pg_publication_rel pubrel;
|
|
|
|
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
|
|
|
|
result = lappend_oid(result, pubrel->prrelid);
|
|
}
|
|
|
|
systable_endscan(scan);
|
|
table_close(pubrelsrel, AccessShareLock);
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Gets list of publication oids for publications marked as FOR ALL TABLES.
|
|
*/
|
|
List *
|
|
GetAllTablesPublications(void)
|
|
{
|
|
List *result;
|
|
Relation rel;
|
|
ScanKeyData scankey;
|
|
SysScanDesc scan;
|
|
HeapTuple tup;
|
|
|
|
/* Find all publications that are marked as for all tables. */
|
|
rel = table_open(PublicationRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&scankey,
|
|
Anum_pg_publication_puballtables,
|
|
BTEqualStrategyNumber, F_BOOLEQ,
|
|
BoolGetDatum(true));
|
|
|
|
scan = systable_beginscan(rel, InvalidOid, false,
|
|
NULL, 1, &scankey);
|
|
|
|
result = NIL;
|
|
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
|
{
|
|
Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
|
|
|
|
result = lappend_oid(result, oid);
|
|
}
|
|
|
|
systable_endscan(scan);
|
|
table_close(rel, AccessShareLock);
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Gets list of all relation published by FOR ALL TABLES publication(s).
|
|
*/
|
|
List *
|
|
GetAllTablesPublicationRelations(void)
|
|
{
|
|
Relation classRel;
|
|
ScanKeyData key[1];
|
|
TableScanDesc scan;
|
|
HeapTuple tuple;
|
|
List *result = NIL;
|
|
|
|
classRel = table_open(RelationRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&key[0],
|
|
Anum_pg_class_relkind,
|
|
BTEqualStrategyNumber, F_CHAREQ,
|
|
CharGetDatum(RELKIND_RELATION));
|
|
|
|
scan = table_beginscan_catalog(classRel, 1, key);
|
|
|
|
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
|
|
{
|
|
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
|
|
Oid relid = relForm->oid;
|
|
|
|
if (is_publishable_class(relid, relForm))
|
|
result = lappend_oid(result, relid);
|
|
}
|
|
|
|
table_endscan(scan);
|
|
table_close(classRel, AccessShareLock);
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Get publication using oid
|
|
*
|
|
* The Publication struct and its data are palloc'ed here.
|
|
*/
|
|
Publication *
|
|
GetPublication(Oid pubid)
|
|
{
|
|
HeapTuple tup;
|
|
Publication *pub;
|
|
Form_pg_publication pubform;
|
|
|
|
tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "cache lookup failed for publication %u", pubid);
|
|
|
|
pubform = (Form_pg_publication) GETSTRUCT(tup);
|
|
|
|
pub = (Publication *) palloc(sizeof(Publication));
|
|
pub->oid = pubid;
|
|
pub->name = pstrdup(NameStr(pubform->pubname));
|
|
pub->alltables = pubform->puballtables;
|
|
pub->pubactions.pubinsert = pubform->pubinsert;
|
|
pub->pubactions.pubupdate = pubform->pubupdate;
|
|
pub->pubactions.pubdelete = pubform->pubdelete;
|
|
pub->pubactions.pubtruncate = pubform->pubtruncate;
|
|
|
|
ReleaseSysCache(tup);
|
|
|
|
return pub;
|
|
}
|
|
|
|
|
|
/*
|
|
* Get Publication using name.
|
|
*/
|
|
Publication *
|
|
GetPublicationByName(const char *pubname, bool missing_ok)
|
|
{
|
|
Oid oid;
|
|
|
|
oid = get_publication_oid(pubname, missing_ok);
|
|
|
|
return OidIsValid(oid) ? GetPublication(oid) : NULL;
|
|
}
|
|
|
|
/*
|
|
* get_publication_oid - given a publication name, look up the OID
|
|
*
|
|
* If missing_ok is false, throw an error if name not found. If true, just
|
|
* return InvalidOid.
|
|
*/
|
|
Oid
|
|
get_publication_oid(const char *pubname, bool missing_ok)
|
|
{
|
|
Oid oid;
|
|
|
|
oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
|
|
CStringGetDatum(pubname));
|
|
if (!OidIsValid(oid) && !missing_ok)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
errmsg("publication \"%s\" does not exist", pubname)));
|
|
return oid;
|
|
}
|
|
|
|
/*
|
|
* get_publication_name - given a publication Oid, look up the name
|
|
*
|
|
* If missing_ok is false, throw an error if name not found. If true, just
|
|
* return NULL.
|
|
*/
|
|
char *
|
|
get_publication_name(Oid pubid, bool missing_ok)
|
|
{
|
|
HeapTuple tup;
|
|
char *pubname;
|
|
Form_pg_publication pubform;
|
|
|
|
tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
{
|
|
if (!missing_ok)
|
|
elog(ERROR, "cache lookup failed for publication %u", pubid);
|
|
return NULL;
|
|
}
|
|
|
|
pubform = (Form_pg_publication) GETSTRUCT(tup);
|
|
pubname = pstrdup(NameStr(pubform->pubname));
|
|
|
|
ReleaseSysCache(tup);
|
|
|
|
return pubname;
|
|
}
|
|
|
|
/*
|
|
* Returns Oids of tables in a publication.
|
|
*/
|
|
Datum
|
|
pg_get_publication_tables(PG_FUNCTION_ARGS)
|
|
{
|
|
FuncCallContext *funcctx;
|
|
char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
|
|
Publication *publication;
|
|
List *tables;
|
|
|
|
/* stuff done only on the first call of the function */
|
|
if (SRF_IS_FIRSTCALL())
|
|
{
|
|
MemoryContext oldcontext;
|
|
|
|
/* create a function context for cross-call persistence */
|
|
funcctx = SRF_FIRSTCALL_INIT();
|
|
|
|
/* switch to memory context appropriate for multiple function calls */
|
|
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
|
|
|
publication = GetPublicationByName(pubname, false);
|
|
if (publication->alltables)
|
|
tables = GetAllTablesPublicationRelations();
|
|
else
|
|
tables = GetPublicationRelations(publication->oid);
|
|
funcctx->user_fctx = (void *) tables;
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
}
|
|
|
|
/* stuff done on every call of the function */
|
|
funcctx = SRF_PERCALL_SETUP();
|
|
tables = (List *) funcctx->user_fctx;
|
|
|
|
if (funcctx->call_cntr < list_length(tables))
|
|
{
|
|
Oid relid = list_nth_oid(tables, funcctx->call_cntr);
|
|
|
|
SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
|
|
}
|
|
|
|
SRF_RETURN_DONE(funcctx);
|
|
}
|