/*------------------------------------------------------------------------- * * pg_publication.c * publication C API manipulation * * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * src/backend/catalog/pg_publication.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" /* * Check if relation can be in given publication and throws appropriate * error if not. 
*/ static void check_publication_add_relation(Relation targetrel) { /* Must be a regular or partitioned table, or a sequence */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE && RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind))); /* Can't be system table */ if (IsCatalogRelation(targetrel)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for system tables."))); /* UNLOGGED and TEMP relations cannot be part of publication. */ if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for temporary tables."))); else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); } /* * Check if schema can be in given publication and throw appropriate error if * not. 
*/ static void check_publication_add_schema(Oid schemaid) { /* Can't be system namespace */ if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add schema \"%s\" to publication", get_namespace_name(schemaid)), errdetail("This operation is not supported for system schemas."))); /* Can't be temporary namespace */ if (isAnyTempNamespace(schemaid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add schema \"%s\" to publication", get_namespace_name(schemaid)), errdetail("Temporary schemas cannot be replicated."))); } /* * Returns if relation represented by oid and Form_pg_class entry * is publishable. * * Does same checks as the above, but does not need relation to be opened * and also does not throw errors. * * XXX This also excludes all tables with relid < FirstNormalObjectId, * ie all tables created during initdb. This mainly affects the preinstalled * information_schema. IsCatalogRelationOid() only excludes tables with * relid < FirstUnpinnedObjectId, making that test rather redundant, * but really we should get rid of the FirstNormalObjectId test not * IsCatalogRelationOid. We can't do so today because we don't want * information_schema tables to be considered publishable; but this test * is really inadequate for that, since the information_schema could be * dropped and reloaded and then it'll be considered publishable. The best * long-term solution may be to add a "relispublishable" bool to pg_class, * and depend on that instead of OID checks. 
*/ static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { return (reltuple->relkind == RELKIND_RELATION || reltuple->relkind == RELKIND_PARTITIONED_TABLE || reltuple->relkind == RELKIND_SEQUENCE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; } /* * Filter out the partitions whose parent tables were also specified in * the publication. */ static List * filter_partitions(List *relids) { List *result = NIL; ListCell *lc; ListCell *lc2; foreach(lc, relids) { bool skip = false; List *ancestors = NIL; Oid relid = lfirst_oid(lc); if (get_rel_relispartition(relid)) ancestors = get_partition_ancestors(relid); foreach(lc2, ancestors) { Oid ancestor = lfirst_oid(lc2); /* Check if the parent table exists in the published table list. */ if (list_member_oid(relids, ancestor)) { skip = true; break; } } if (!skip) result = lappend_oid(result, relid); } return result; } /* * Check the character is a valid object type for schema publication. * * This recognizes either 't' for tables or 's' for sequences. Places that * need to handle 'u' for unsupported relkinds need to do that explicitlyl */ static void AssertObjectTypeValid(char objectType) { #ifdef USE_ASSERT_CHECKING Assert(objectType == PUB_OBJTYPE_SEQUENCE || objectType == PUB_OBJTYPE_TABLE); #endif } /* * Determine object type matching a given a relkind value. */ char pub_get_object_type_for_relkind(char relkind) { /* sequence maps directly to sequence relkind */ if (relkind == RELKIND_SEQUENCE) return PUB_OBJTYPE_SEQUENCE; /* for table, we match either regular or partitioned table */ if (relkind == RELKIND_RELATION || relkind == RELKIND_PARTITIONED_TABLE) return PUB_OBJTYPE_TABLE; return PUB_OBJTYPE_UNSUPPORTED; } /* * Determine if publication object type matches the relkind. * * Returns true if the relation matches object type replicated by this schema, * false otherwise. 
*/ static bool pub_object_type_matches_relkind(char objectType, char relkind) { AssertObjectTypeValid(objectType); return (pub_get_object_type_for_relkind(relkind) == objectType); } /* * Another variant of this, taking a Relation. */ bool is_publishable_relation(Relation rel) { return is_publishable_class(RelationGetRelid(rel), rel->rd_rel); } /* * Returns true if any schema is associated with the publication, false if no * schema is associated with the publication. */ bool is_schema_publication(Oid pubid) { Relation pubschsrel; ScanKeyData scankey; SysScanDesc scan; HeapTuple tup; bool result = false; pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); ScanKeyInit(&scankey, Anum_pg_publication_namespace_pnpubid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(pubid)); scan = systable_beginscan(pubschsrel, PublicationNamespacePnnspidPnpubidPntypeIndexId, true, NULL, 1, &scankey); tup = systable_getnext(scan); result = HeapTupleIsValid(tup); systable_endscan(scan); table_close(pubschsrel, AccessShareLock); return result; } /* * SQL-callable variant of the above * * This returns null when the relation does not exist. This is intended to be * used for example in psql to avoid gratuitous errors when there are * concurrent catalog changes. */ Datum pg_relation_is_publishable(PG_FUNCTION_ARGS) { Oid relid = PG_GETARG_OID(0); HeapTuple tuple; bool result; tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); if (!HeapTupleIsValid(tuple)) PG_RETURN_NULL(); result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)); ReleaseSysCache(tuple); PG_RETURN_BOOL(result); } /* * Gets the relations based on the publication partition option for a specified * relation. 
*/ List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid) { if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE && pub_partopt != PUBLICATION_PART_ROOT) { List *all_parts = find_all_inheritors(relid, NoLock, NULL); if (pub_partopt == PUBLICATION_PART_ALL) result = list_concat(result, all_parts); else if (pub_partopt == PUBLICATION_PART_LEAF) { ListCell *lc; foreach(lc, all_parts) { Oid partOid = lfirst_oid(lc); if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) result = lappend_oid(result, partOid); } } else Assert(false); } else result = lappend_oid(result, relid); return result; } /* * Returns the relid of the topmost ancestor that is published via this * publication if any and set its ancestor level to ancestor_level, * otherwise returns InvalidOid. * * The ancestor_level value allows us to compare the results for multiple * publications, and decide which value is higher up. * * Note that the list of ancestors should be ordered such that the topmost * ancestor is at the end of the list. */ Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level) { ListCell *lc; Oid topmost_relid = InvalidOid; int level = 0; /* * Find the "topmost" ancestor that is in this publication. */ foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); List *apubids = GetRelationPublications(ancestor); List *aschemaPubids = NIL; level++; if (list_member_oid(apubids, puboid)) { topmost_relid = ancestor; if (ancestor_level) *ancestor_level = level; } else { /* we only search for ancestors of tables, so PUB_OBJTYPE_TABLE */ aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor), PUB_OBJTYPE_TABLE); if (list_member_oid(aschemaPubids, puboid)) { topmost_relid = ancestor; if (ancestor_level) *ancestor_level = level; } } list_free(apubids); list_free(aschemaPubids); } return topmost_relid; } /* * Insert new publication / relation mapping. 
 *
 * 'pubid' is the publication, 'pri' carries the target relation and its
 * optional row filter (whereClause).  Returns the address of the new
 * pg_publication_rel row, or InvalidObjectAddress when the mapping already
 * exists and 'if_not_exists' is true.
 */
ObjectAddress
publication_add_relation(Oid pubid, PublicationRelInfo *pri,
						 bool if_not_exists)
{
	Relation	rel;
	HeapTuple	tup;
	Datum		values[Natts_pg_publication_rel];
	bool		nulls[Natts_pg_publication_rel];
	Relation	targetrel = pri->relation;
	Oid			relid = RelationGetRelid(targetrel);
	Oid			pubreloid;
	Publication *pub = GetPublication(pubid);
	ObjectAddress myself,
				referenced;
	List	   *relids = NIL;

	rel = table_open(PublicationRelRelationId, RowExclusiveLock);

	/*
	 * Check for duplicates. Note that this does not really prevent
	 * duplicates, it's here just to provide nicer error message in common
	 * case. The real protection is the unique key on the catalog.
	 */
	if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
							  ObjectIdGetDatum(pubid)))
	{
		table_close(rel, RowExclusiveLock);

		if (if_not_exists)
			return InvalidObjectAddress;

		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("relation \"%s\" is already member of publication \"%s\"",
						RelationGetRelationName(targetrel), pub->name)));
	}

	/* Errors out if the relation's kind or persistence is not publishable. */
	check_publication_add_relation(targetrel);

	/* Form a tuple. */
	memset(values, 0, sizeof(values));
	memset(nulls, false, sizeof(nulls));

	pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
								   Anum_pg_publication_rel_oid);
	values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
	values[Anum_pg_publication_rel_prpubid - 1] =
		ObjectIdGetDatum(pubid);
	values[Anum_pg_publication_rel_prrelid - 1] =
		ObjectIdGetDatum(relid);

	/* Add qualifications, if available; stored as the node tree's text form */
	if (pri->whereClause != NULL)
		values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
	else
		nulls[Anum_pg_publication_rel_prqual - 1] = true;

	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);

	/* Insert tuple into catalog. */
	CatalogTupleInsert(rel, tup);
	heap_freetuple(tup);

	/* Register dependencies as needed */
	ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);

	/* Add dependency on the publication */
	ObjectAddressSet(referenced, PublicationRelationId, pubid);
	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);

	/* Add dependency on the relation */
	ObjectAddressSet(referenced, RelationRelationId, relid);
	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);

	/* Add dependency on the objects mentioned in the qualifications */
	if (pri->whereClause)
		recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
										DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
										false);

	/* Close the table. */
	table_close(rel, RowExclusiveLock);

	/*
	 * Invalidate relcache so that publication info is rebuilt.
	 *
	 * For the partitioned tables, we must invalidate all partitions contained
	 * in the respective partition hierarchies, not just the one explicitly
	 * mentioned in the publication. This is required because we implicitly
	 * publish the child tables when the parent table is published.
	 */
	relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
											relid);

	InvalidatePublicationRels(relids);

	return myself;
}

/*
 * Insert new publication / schema mapping.
 *
 * 'objectType' must be PUB_OBJTYPE_TABLE or PUB_OBJTYPE_SEQUENCE (asserted).
 * Returns the address of the new pg_publication_namespace row, or
 * InvalidObjectAddress when the mapping already exists and 'if_not_exists'
 * is true.
 */
ObjectAddress
publication_add_schema(Oid pubid, Oid schemaid, char objectType,
					   bool if_not_exists)
{
	Relation	rel;
	HeapTuple	tup;
	Datum		values[Natts_pg_publication_namespace];
	bool		nulls[Natts_pg_publication_namespace];
	Oid			psschid;
	Publication *pub = GetPublication(pubid);
	List	   *schemaRels = NIL;
	ObjectAddress myself,
				referenced;

	AssertObjectTypeValid(objectType);

	rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);

	/*
	 * Check for duplicates. Note that this does not really prevent
	 * duplicates, it's here just to provide nicer error message in common
	 * case. The real protection is the unique key on the catalog.
	 */
	if (SearchSysCacheExists3(PUBLICATIONNAMESPACEMAP,
							  ObjectIdGetDatum(schemaid),
							  ObjectIdGetDatum(pubid),
							  CharGetDatum(objectType)))
	{
		table_close(rel, RowExclusiveLock);

		if (if_not_exists)
			return InvalidObjectAddress;

		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("schema \"%s\" is already member of publication \"%s\"",
						get_namespace_name(schemaid), pub->name)));
	}

	/* Errors out for system and temporary schemas. */
	check_publication_add_schema(schemaid);

	/* Form a tuple */
	memset(values, 0, sizeof(values));
	memset(nulls, false, sizeof(nulls));

	psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
								 Anum_pg_publication_namespace_oid);
	values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
	values[Anum_pg_publication_namespace_pnpubid - 1] =
		ObjectIdGetDatum(pubid);
	values[Anum_pg_publication_namespace_pnnspid - 1] =
		ObjectIdGetDatum(schemaid);
	values[Anum_pg_publication_namespace_pntype - 1] =
		CharGetDatum(objectType);

	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);

	/* Insert tuple into catalog */
	CatalogTupleInsert(rel, tup);
	heap_freetuple(tup);

	ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);

	/* Add dependency on the publication */
	ObjectAddressSet(referenced, PublicationRelationId, pubid);
	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);

	/* Add dependency on the schema */
	ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);

	/* Close the table */
	table_close(rel, RowExclusiveLock);

	/*
	 * Invalidate relcache so that publication info is rebuilt. See
	 * publication_add_relation for why we need to consider all the
	 * partitions.
	 */
	schemaRels = GetSchemaPublicationRelations(schemaid, objectType,
											   PUBLICATION_PART_ALL);
	InvalidatePublicationRels(schemaRels);

	return myself;
}

/*
 * Gets list of publication oids for a relation.
 *
 * Only explicit pg_publication_rel memberships are returned; the result is a
 * newly built OID list (NIL when there are none).
 */
List *
GetRelationPublications(Oid relid)
{
	List	   *result = NIL;
	CatCList   *pubrellist;
	int			i;

	/* Find all publications associated with the relation. */
	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
									 ObjectIdGetDatum(relid));
	for (i = 0; i < pubrellist->n_members; i++)
	{
		HeapTuple	tup = &pubrellist->members[i]->tuple;
		Oid			pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;

		result = lappend_oid(result, pubid);
	}

	ReleaseSysCacheList(pubrellist);

	return result;
}

/*
 * Gets list of relation oids for a publication.
 *
 * This should only be used FOR TABLE / FOR SEQUENCE publications, the FOR
 * ALL TABLES / SEQUENCES should use GetAllTablesPublicationRelations()
 * and GetAllSequencesPublicationRelations().
 *
 * 'objectType' selects which members are returned (tables or sequences);
 * members of the other kind are skipped.  The result is sorted by OID and
 * free of duplicates.
 *
 * XXX pub_partopt only matters for tables, not sequences.
 */
List *
GetPublicationRelations(Oid pubid, char objectType,
						PublicationPartOpt pub_partopt)
{
	List	   *result;
	Relation	pubrelsrel;
	ScanKeyData scankey;
	SysScanDesc scan;
	HeapTuple	tup;

	AssertObjectTypeValid(objectType);

	/* Find all relations associated with the publication. */
	pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);

	ScanKeyInit(&scankey,
				Anum_pg_publication_rel_prpubid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(pubid));

	scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
							  true, NULL, 1, &scankey);

	result = NIL;
	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		char		relkind;
		Form_pg_publication_rel pubrel;

		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
		relkind = get_rel_relkind(pubrel->prrelid);

		/*
		 * If the relkind does not match the requested object type, ignore the
		 * relation. For example we might be interested only in sequences, so
		 * we ignore tables.
		 */
		if (!pub_object_type_matches_relkind(objectType, relkind))
			continue;

		/*
		 * We don't have partitioned sequences, so just add them to the list.
		 * Otherwise consider adding all child relations, if requested.
		 */
		if (relkind == RELKIND_SEQUENCE)
			result = lappend_oid(result, pubrel->prrelid);
		else
			result = GetPubPartitionOptionRelations(result, pub_partopt,
													pubrel->prrelid);
	}

	systable_endscan(scan);
	table_close(pubrelsrel, AccessShareLock);

	/*
	 * Now sort and de-duplicate the result list.  The same relation can be
	 * appended more than once above, e.g. when both a partitioned table and
	 * one of its partitions are members and partitions get expanded.
	 */
	list_sort(result, list_oid_cmp);
	list_deduplicate_oid(result);

	return result;
}

/*
 * Gets list of publication oids for publications marked as FOR ALL TABLES.
 */
List *
GetAllTablesPublications(void)
{
	List	   *result;
	Relation	rel;
	ScanKeyData scankey;
	SysScanDesc scan;
	HeapTuple	tup;

	/* Find all publications that are marked as for all tables. */
	rel = table_open(PublicationRelationId, AccessShareLock);

	ScanKeyInit(&scankey,
				Anum_pg_publication_puballtables,
				BTEqualStrategyNumber, F_BOOLEQ,
				BoolGetDatum(true));

	/* No index is supplied here, so this is a heap scan of pg_publication. */
	scan = systable_beginscan(rel, InvalidOid, false,
							  NULL, 1, &scankey);

	result = NIL;
	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Oid			oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;

		result = lappend_oid(result, oid);
	}

	systable_endscan(scan);
	table_close(rel, AccessShareLock);

	return result;
}

/*
 * Gets list of publication oids for publications marked as FOR ALL SEQUENCES.
 */
List *
GetAllSequencesPublications(void)
{
	List	   *result;
	Relation	rel;
	ScanKeyData scankey;
	SysScanDesc scan;
	HeapTuple	tup;

	/* Find all publications that are marked as for all sequences. */
	rel = table_open(PublicationRelationId, AccessShareLock);

	ScanKeyInit(&scankey,
				Anum_pg_publication_puballsequences,
				BTEqualStrategyNumber, F_BOOLEQ,
				BoolGetDatum(true));

	/* No index is supplied here, so this is a heap scan of pg_publication. */
	scan = systable_beginscan(rel, InvalidOid, false,
							  NULL, 1, &scankey);

	result = NIL;
	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Oid			oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;

		result = lappend_oid(result, oid);
	}

	systable_endscan(scan);
	table_close(rel, AccessShareLock);

	return result;
}

/*
 * Gets list of all relation published by FOR ALL TABLES publication(s).
* * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the * root partitioned tables. */ List * GetAllTablesPublicationRelations(bool pubviaroot) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; classRel = table_open(RelationRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_class_relkind, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(RELKIND_RELATION)); scan = table_beginscan_catalog(classRel, 1, key); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); } table_endscan(scan); if (pubviaroot) { ScanKeyInit(&key[0], Anum_pg_class_relkind, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(RELKIND_PARTITIONED_TABLE)); scan = table_beginscan_catalog(classRel, 1, key); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && !relForm->relispartition) result = lappend_oid(result, relid); } table_endscan(scan); } table_close(classRel, AccessShareLock); return result; } /* * Gets the list of schema oids for a publication. * * This should only be used FOR ALL TABLES IN SCHEMA and FOR ALL SEQUENCES * publications. 
* * 'objectType' determines whether to get FOR TABLE or FOR SEQUENCES schemas */ List * GetPublicationSchemas(Oid pubid, char objectType) { List *result = NIL; Relation pubschsrel; ScanKeyData scankey[2]; SysScanDesc scan; HeapTuple tup; AssertObjectTypeValid(objectType); /* Find all schemas associated with the publication */ pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); ScanKeyInit(&scankey[0], Anum_pg_publication_namespace_pnpubid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(pubid)); ScanKeyInit(&scankey[1], Anum_pg_publication_namespace_pntype, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(objectType)); scan = systable_beginscan(pubschsrel, PublicationNamespacePnnspidPnpubidPntypeIndexId, true, NULL, 2, scankey); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_publication_namespace pubsch; pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); result = lappend_oid(result, pubsch->pnnspid); } systable_endscan(scan); table_close(pubschsrel, AccessShareLock); return result; } /* * Gets the list of publication oids associated with a specified schema. * * objectType specifies whether we're looking for schemas including tables or * sequences. * * Note: relcache calls this for all object types, not just tables and sequences. * Which is why we handle the PUB_OBJTYPE_UNSUPPORTED object type too. 
*/ List * GetSchemaPublications(Oid schemaid, char objectType) { List *result = NIL; CatCList *pubschlist; int i; /* unsupported object type */ if (objectType == PUB_OBJTYPE_UNSUPPORTED) return result; AssertObjectTypeValid(objectType); /* Find all publications associated with the schema */ pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, ObjectIdGetDatum(schemaid)); for (i = 0; i < pubschlist->n_members; i++) { HeapTuple tup = &pubschlist->members[i]->tuple; Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; char pntype = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pntype; /* Skip schemas publishing a different object type. */ if (pntype != objectType) continue; result = lappend_oid(result, pubid); } ReleaseSysCacheList(pubschlist); return result; } /* * Get the list of publishable relation oids for a specified schema. * * objectType specifies whether this is FOR ALL TABLES IN SCHEMA or FOR ALL * SEQUENCES IN SCHEMA */ List * GetSchemaPublicationRelations(Oid schemaid, char objectType, PublicationPartOpt pub_partopt) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; Assert(OidIsValid(schemaid)); AssertObjectTypeValid(objectType); classRel = table_open(RelationRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_class_relnamespace, BTEqualStrategyNumber, F_OIDEQ, schemaid); /* get all the relations present in the specified schema */ scan = table_beginscan_catalog(classRel, 1, key); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; char relkind; if (!is_publishable_class(relid, relForm)) continue; relkind = get_rel_relkind(relid); /* Skip if the relkind does not match FOR ALL TABLES / SEQUENCES. */ if (!pub_object_type_matches_relkind(objectType, relkind)) continue; /* * If the object is a partitioned table, lookup all the child relations * (if requested). 
Otherwise just add the object to the list. */ if (relkind == RELKIND_PARTITIONED_TABLE) { List *partitionrels = NIL; /* * It is quite possible that some of the partitions are in a * different schema than the parent table, so we need to get such * partitions separately. */ partitionrels = GetPubPartitionOptionRelations(partitionrels, pub_partopt, relForm->oid); result = list_concat_unique_oid(result, partitionrels); continue; } /* non-partitioned tables and sequences */ result = lappend_oid(result, relid); } table_endscan(scan); table_close(classRel, AccessShareLock); return result; } /* * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA * or FOR ALL SEQUENCES IN SCHEMA publication. */ List * GetAllSchemaPublicationRelations(Oid pubid, char objectType, PublicationPartOpt pub_partopt) { List *result = NIL; List *pubschemalist = GetPublicationSchemas(pubid, objectType); ListCell *cell; AssertObjectTypeValid(objectType); foreach(cell, pubschemalist) { Oid schemaid = lfirst_oid(cell); List *schemaRels = NIL; schemaRels = GetSchemaPublicationRelations(schemaid, objectType, pub_partopt); result = list_concat(result, schemaRels); } return result; } /* * Gets list of all relation published by FOR ALL SEQUENCES publication(s). 
*/ List * GetAllSequencesPublicationRelations(void) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; classRel = table_open(RelationRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_class_relkind, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(RELKIND_SEQUENCE)); scan = table_beginscan_catalog(classRel, 1, key); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; if (is_publishable_class(relid, relForm)) result = lappend_oid(result, relid); } table_endscan(scan); table_close(classRel, AccessShareLock); return result; } /* * Get publication using oid * * The Publication struct and its data are palloc'ed here. */ Publication * GetPublication(Oid pubid) { HeapTuple tup; Publication *pub; Form_pg_publication pubform; tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for publication %u", pubid); pubform = (Form_pg_publication) GETSTRUCT(tup); pub = (Publication *) palloc(sizeof(Publication)); pub->oid = pubid; pub->name = pstrdup(NameStr(pubform->pubname)); pub->alltables = pubform->puballtables; pub->allsequences = pubform->puballsequences; pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubactions.pubsequence = pubform->pubsequence; pub->pubviaroot = pubform->pubviaroot; ReleaseSysCache(tup); return pub; } /* * Get Publication using name. */ Publication * GetPublicationByName(const char *pubname, bool missing_ok) { Oid oid; oid = get_publication_oid(pubname, missing_ok); return OidIsValid(oid) ? GetPublication(oid) : NULL; } /* * get_publication_oid - given a publication name, look up the OID * * If missing_ok is false, throw an error if name not found. If true, just * return InvalidOid. 
*/ Oid get_publication_oid(const char *pubname, bool missing_ok) { Oid oid; oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid, CStringGetDatum(pubname)); if (!OidIsValid(oid) && !missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("publication \"%s\" does not exist", pubname))); return oid; } /* * get_publication_name - given a publication Oid, look up the name * * If missing_ok is false, throw an error if name not found. If true, just * return NULL. */ char * get_publication_name(Oid pubid, bool missing_ok) { HeapTuple tup; char *pubname; Form_pg_publication pubform; tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); if (!HeapTupleIsValid(tup)) { if (!missing_ok) elog(ERROR, "cache lookup failed for publication %u", pubid); return NULL; } pubform = (Form_pg_publication) GETSTRUCT(tup); pubname = pstrdup(NameStr(pubform->pubname)); ReleaseSysCache(tup); return pubname; } /* * Returns Oids of tables in a publication. */ Datum pg_get_publication_tables(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); Publication *publication; List *tables; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { MemoryContext oldcontext; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); /* switch to memory context appropriate for multiple function calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); publication = GetPublicationByName(pubname, false); /* * Publications support partitioned tables, although all changes are * replicated using leaf partition identity and schema, so we only * need those. */ if (publication->alltables) { tables = GetAllTablesPublicationRelations(publication->pubviaroot); } else { List *relids, *schemarelids; relids = GetPublicationRelations(publication->oid, PUB_OBJTYPE_TABLE, publication->pubviaroot ? 
PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); schemarelids = GetAllSchemaPublicationRelations(publication->oid, PUB_OBJTYPE_TABLE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); tables = list_concat_unique_oid(relids, schemarelids); /* * If the publication publishes partition changes via their * respective root partitioned tables, we must exclude partitions * in favor of including the root partitioned tables. Otherwise, * the function could return both the child and parent tables * which could cause data of the child table to be * double-published on the subscriber side. */ if (publication->pubviaroot) tables = filter_partitions(tables); } funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); tables = (List *) funcctx->user_fctx; if (funcctx->call_cntr < list_length(tables)) { Oid relid = list_nth_oid(tables, funcctx->call_cntr); SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); } SRF_RETURN_DONE(funcctx); } /* * Returns Oids of sequences in a publication. */ Datum pg_get_publication_sequences(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); Publication *publication; List *sequences; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { MemoryContext oldcontext; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); /* switch to memory context appropriate for multiple function calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); publication = GetPublicationByName(pubname, false); /* * Publications support partitioned tables, although all changes are * replicated using leaf partition identity and schema, so we only * need those. 
*/ if (publication->allsequences) sequences = GetAllSequencesPublicationRelations(); else { List *relids, *schemarelids; relids = GetPublicationRelations(publication->oid, PUB_OBJTYPE_SEQUENCE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); schemarelids = GetAllSchemaPublicationRelations(publication->oid, PUB_OBJTYPE_SEQUENCE, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); sequences = list_concat_unique_oid(relids, schemarelids); } funcctx->user_fctx = (void *) sequences; MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); sequences = (List *) funcctx->user_fctx; if (funcctx->call_cntr < list_length(sequences)) { Oid relid = list_nth_oid(sequences, funcctx->call_cntr); SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); } SRF_RETURN_DONE(funcctx); }