Add PublicationTable and PublicationRelInfo structs

These encapsulate a relation when referred from replication DDL.
Currently they don't do anything useful (they're just wrappers around
RangeVar and Relation respectively) but in the future they'll be used to
carry column lists.

Extracted from a larger patch by Rahila Syed.

Author: Rahila Syed <rahilasyed90@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2021-09-06 14:24:50 -03:00
parent 89dba59590
commit 0c6828fa98
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
9 changed files with 95 additions and 31 deletions

View File

@ -141,14 +141,14 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
* Insert new publication / relation mapping.
*/
ObjectAddress
publication_add_relation(Oid pubid, Relation targetrel,
publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
bool if_not_exists)
{
Relation rel;
HeapTuple tup;
Datum values[Natts_pg_publication_rel];
bool nulls[Natts_pg_publication_rel];
Oid relid = RelationGetRelid(targetrel);
Oid relid = RelationGetRelid(targetrel->relation);
Oid prrelid;
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
@ -172,10 +172,10 @@ publication_add_relation(Oid pubid, Relation targetrel,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("relation \"%s\" is already member of publication \"%s\"",
RelationGetRelationName(targetrel), pub->name)));
RelationGetRelationName(targetrel->relation), pub->name)));
}
check_publication_add_relation(targetrel);
check_publication_add_relation(targetrel->relation);
/* Form a tuple. */
memset(values, 0, sizeof(values));
@ -209,7 +209,7 @@ publication_add_relation(Oid pubid, Relation targetrel,
table_close(rel, RowExclusiveLock);
/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcache(targetrel);
CacheInvalidateRelcache(targetrel->relation);
return myself;
}

View File

@ -393,21 +393,28 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
foreach(newlc, rels)
{
Relation newrel = (Relation) lfirst(newlc);
PublicationRelInfo *newpubrel;
if (RelationGetRelid(newrel) == oldrelid)
newpubrel = (PublicationRelInfo *) lfirst(newlc);
if (RelationGetRelid(newpubrel->relation) == oldrelid)
{
found = true;
break;
}
}
/* Not yet in the list, open it and add to the list */
if (!found)
{
Relation oldrel = table_open(oldrelid,
ShareUpdateExclusiveLock);
Relation oldrel;
PublicationRelInfo *pubrel;
delrels = lappend(delrels, oldrel);
/* Wrap relation into PublicationRelInfo */
oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
pubrel = palloc(sizeof(PublicationRelInfo));
pubrel->relation = oldrel;
delrels = lappend(delrels, pubrel);
}
}
@ -498,9 +505,9 @@ RemovePublicationRelById(Oid proid)
}
/*
* Open relations specified by a RangeVar list.
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
* add them to a publication.
* Open relations specified by a PublicationTable list.
* In the returned list of PublicationRelInfo, tables are locked
* in ShareUpdateExclusiveLock mode in order to add them to a publication.
*/
static List *
OpenTableList(List *tables)
@ -514,15 +521,16 @@ OpenTableList(List *tables)
*/
foreach(lc, tables)
{
RangeVar *rv = lfirst_node(RangeVar, lc);
bool recurse = rv->inh;
PublicationTable *t = lfirst_node(PublicationTable, lc);
bool recurse = t->relation->inh;
Relation rel;
Oid myrelid;
PublicationRelInfo *pub_rel;
/* Allow query cancel in case this takes a long time */
CHECK_FOR_INTERRUPTS();
rel = table_openrv(rv, ShareUpdateExclusiveLock);
rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
myrelid = RelationGetRelid(rel);
/*
@ -538,7 +546,9 @@ OpenTableList(List *tables)
continue;
}
rels = lappend(rels, rel);
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, myrelid);
/*
@ -571,7 +581,9 @@ OpenTableList(List *tables)
/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
rels = lappend(rels, rel);
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, childrelid);
}
}
@ -592,9 +604,10 @@ CloseTableList(List *rels)
foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelInfo *pub_rel;
table_close(rel, NoLock);
pub_rel = (PublicationRelInfo *) lfirst(lc);
table_close(pub_rel->relation, NoLock);
}
}
@ -611,7 +624,8 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
Relation rel = pub_rel->relation;
ObjectAddress obj;
/* Must be owner of the table or superuser. */
@ -619,7 +633,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
RelationGetRelationName(rel));
obj = publication_add_relation(pubid, rel, if_not_exists);
obj = publication_add_relation(pubid, pub_rel, if_not_exists);
if (stmt)
{
EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
@ -643,7 +657,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
Relation rel = pubrel->relation;
Oid relid = RelationGetRelid(rel);
prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,

View File

@ -4939,6 +4939,15 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from)
return newnode;
}
static PublicationTable *
_copyPublicationTable(const PublicationTable *from)
{
PublicationTable *newnode = makeNode(PublicationTable);
COPY_NODE_FIELD(relation);
return newnode;
}
/*
* copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h
@ -5854,6 +5863,9 @@ copyObjectImpl(const void *from)
case T_PartitionCmd:
retval = _copyPartitionCmd(from);
break;
case T_PublicationTable:
retval = _copyPublicationTable(from);
break;
/*
* MISCELLANEOUS NODES

View File

@ -3114,6 +3114,14 @@ _equalValue(const Value *a, const Value *b)
return true;
}
static bool
_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
return true;
}
/*
* equal
* returns whether two nodes are equal
@ -3862,6 +3870,9 @@ equal(const void *a, const void *b)
case T_PartitionCmd:
retval = _equalPartitionCmd(a, b);
break;
case T_PublicationTable:
retval = _equalPublicationTable(a, b);
break;
default:
elog(ERROR, "unrecognized node type: %d",

View File

@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
transform_element_list transform_type_list
TriggerTransitions TriggerReferencing
vacuum_relation_list opt_vacuum_relation_list
drop_option_list
drop_option_list publication_table_list
%type <node> opt_routine_body
%type <groupclause> group_clause
%type <list> group_by_list
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
%type <node> opt_publication_for_tables publication_for_tables
%type <node> opt_publication_for_tables publication_for_tables publication_table
%type <list> opt_fdw_options fdw_options
%type <defelt> fdw_option
@ -9620,7 +9620,7 @@ opt_publication_for_tables:
;
publication_for_tables:
FOR TABLE relation_expr_list
FOR TABLE publication_table_list
{
$$ = (Node *) $3;
}
@ -9630,6 +9630,20 @@ publication_for_tables:
}
;
publication_table_list:
publication_table
{ $$ = list_make1($1); }
| publication_table_list ',' publication_table
{ $$ = lappend($1, $3); }
;
publication_table: relation_expr
{
PublicationTable *n = makeNode(PublicationTable);
n->relation = $1;
$$ = (Node *) n;
}
;
/*****************************************************************************
*
@ -9651,7 +9665,7 @@ AlterPublicationStmt:
n->options = $5;
$$ = (Node *)n;
}
| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
| ALTER PUBLICATION name ADD_P TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
@ -9659,7 +9673,7 @@ AlterPublicationStmt:
n->tableAction = DEFELEM_ADD;
$$ = (Node *)n;
}
| ALTER PUBLICATION name SET TABLE relation_expr_list
| ALTER PUBLICATION name SET TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
@ -9667,7 +9681,7 @@ AlterPublicationStmt:
n->tableAction = DEFELEM_SET;
$$ = (Node *)n;
}
| ALTER PUBLICATION name DROP TABLE relation_expr_list
| ALTER PUBLICATION name DROP TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;

View File

@ -83,6 +83,11 @@ typedef struct Publication
PublicationActions pubactions;
} Publication;
typedef struct PublicationRelInfo
{
Relation relation;
} PublicationRelInfo;
extern Publication *GetPublication(Oid pubid);
extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
extern List *GetRelationPublications(Oid relid);
@ -108,7 +113,7 @@ extern List *GetAllTablesPublications(void);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
extern bool is_publishable_relation(Relation rel);
extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
bool if_not_exists);
extern Oid get_publication_oid(const char *pubname, bool missing_ok);

View File

@ -490,6 +490,7 @@ typedef enum NodeTag
T_PartitionRangeDatum,
T_PartitionCmd,
T_VacuumRelation,
T_PublicationTable,
/*
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)

View File

@ -3624,6 +3624,11 @@ typedef struct AlterTSConfigurationStmt
bool missing_ok; /* for DROP - skip error if missing? */
} AlterTSConfigurationStmt;
typedef struct PublicationTable
{
NodeTag type;
RangeVar *relation; /* relation to be published */
} PublicationTable;
typedef struct CreatePublicationStmt
{

View File

@ -2047,6 +2047,7 @@ PublicationActions
PublicationInfo
PublicationPartOpt
PublicationRelInfo
PublicationTable
PullFilter
PullFilterOps
PushFilter