/*------------------------------------------------------------------------- * relation.c * PostgreSQL logical replication * * Copyright (c) 2016-2018, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/logical/relation.c * * NOTES * This file contains helper functions for logical replication relation * mapping cache. * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/heapam.h" #include "access/sysattr.h" #include "catalog/namespace.h" #include "catalog/pg_subscription_rel.h" #include "executor/executor.h" #include "nodes/makefuncs.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" static MemoryContext LogicalRepRelMapContext = NULL; static HTAB *LogicalRepRelMap = NULL; static HTAB *LogicalRepTypMap = NULL; /* * Relcache invalidation callback for our relation map cache. */ static void logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) { LogicalRepRelMapEntry *entry; /* Just to be sure. */ if (LogicalRepRelMap == NULL) return; if (reloid != InvalidOid) { HASH_SEQ_STATUS status; hash_seq_init(&status, LogicalRepRelMap); /* TODO, use inverse lookup hashtable? */ while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) { if (entry->localreloid == reloid) { entry->localreloid = InvalidOid; hash_seq_term(&status); break; } } } else { /* invalidate all cache entries */ HASH_SEQ_STATUS status; hash_seq_init(&status, LogicalRepRelMap); while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) entry->localreloid = InvalidOid; } } /* * Initialize the relation map cache. */ static void logicalrep_relmap_init(void) { HASHCTL ctl; if (!LogicalRepRelMapContext) LogicalRepRelMapContext = AllocSetContextCreate(CacheMemoryContext, "LogicalRepRelMapContext", ALLOCSET_DEFAULT_SIZES); /* Initialize the relation hash table. */ MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(LogicalRepRelId); ctl.entrysize = sizeof(LogicalRepRelMapEntry); ctl.hcxt = LogicalRepRelMapContext; LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); /* Initialize the type hash table. */ MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(Oid); ctl.entrysize = sizeof(LogicalRepTyp); ctl.hcxt = LogicalRepRelMapContext; /* This will usually be small. */ LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); /* Watch for invalidation events. */ CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, (Datum) 0); } /* * Free the entry of a relation map cache. */ static void logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) { LogicalRepRelation *remoterel; remoterel = &entry->remoterel; pfree(remoterel->nspname); pfree(remoterel->relname); if (remoterel->natts > 0) { int i; for (i = 0; i < remoterel->natts; i++) pfree(remoterel->attnames[i]); pfree(remoterel->attnames); pfree(remoterel->atttyps); } bms_free(remoterel->attkeys); if (entry->attrmap) pfree(entry->attrmap); } /* * Add new entry or update existing entry in the relation map cache. * * Called when new relation mapping is sent by the publisher to update * our expected view of incoming data from said publisher. */ void logicalrep_relmap_update(LogicalRepRelation *remoterel) { MemoryContext oldctx; LogicalRepRelMapEntry *entry; bool found; int i; if (LogicalRepRelMap == NULL) logicalrep_relmap_init(); /* * HASH_ENTER returns the existing entry if present or creates a new one. */ entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid, HASH_ENTER, &found); if (found) logicalrep_relmap_free_entry(entry); memset(entry, 0, sizeof(LogicalRepRelMapEntry)); /* Make cached copy of the data */ oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); entry->remoterel.remoteid = remoterel->remoteid; entry->remoterel.nspname = pstrdup(remoterel->nspname); entry->remoterel.relname = pstrdup(remoterel->relname); entry->remoterel.natts = remoterel->natts; entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); for (i = 0; i < remoterel->natts; i++) { entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); entry->remoterel.atttyps[i] = remoterel->atttyps[i]; } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); MemoryContextSwitchTo(oldctx); } /* * Find attribute index in TupleDesc struct by attribute name. * * Returns -1 if not found. */ static int logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) { int i; for (i = 0; i < remoterel->natts; i++) { if (strcmp(remoterel->attnames[i], attname) == 0) return i; } return -1; } /* * Open the local relation associated with the remote one. * * Optionally rebuilds the Relcache mapping if it was invalidated * by local DDL. */ LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) { LogicalRepRelMapEntry *entry; bool found; if (LogicalRepRelMap == NULL) logicalrep_relmap_init(); /* Search for existing entry. */ entry = hash_search(LogicalRepRelMap, (void *) &remoteid, HASH_FIND, &found); if (!found) elog(ERROR, "no relation map entry for remote relation ID %u", remoteid); /* Need to update the local cache? */ if (!OidIsValid(entry->localreloid)) { Oid relid; int i; int found; Bitmapset *idkey; TupleDesc desc; LogicalRepRelation *remoterel; MemoryContext oldctx; remoterel = &entry->remoterel; /* Try to find and lock the relation by name. */ relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, remoterel->relname, -1), lockmode, true); if (!OidIsValid(relid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication target relation \"%s.%s\" does not exist", remoterel->nspname, remoterel->relname))); entry->localrel = heap_open(relid, NoLock); /* Check for supported relkind. */ CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, remoterel->nspname, remoterel->relname); /* * Build the mapping of local attribute numbers to remote attribute * numbers and validate that we don't miss any replicated columns as * that would result in potentially unwanted data loss. */ desc = RelationGetDescr(entry->localrel); oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); entry->attrmap = palloc(desc->natts * sizeof(int)); MemoryContextSwitchTo(oldctx); found = 0; for (i = 0; i < desc->natts; i++) { int attnum; Form_pg_attribute attr = TupleDescAttr(desc, i); if (attr->attisdropped) { entry->attrmap[i] = -1; continue; } attnum = logicalrep_rel_att_by_name(remoterel, NameStr(attr->attname)); entry->attrmap[i] = attnum; if (attnum >= 0) found++; } /* TODO, detail message with names of missing columns */ if (found < remoterel->natts) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication target relation \"%s.%s\" is missing " "some replicated columns", remoterel->nspname, remoterel->relname))); /* * Check that replica identity matches. We allow for stricter replica * identity (fewer columns) on subscriber as that will not stop us * from finding unique tuple. IE, if publisher has identity * (id,timestamp) and subscriber just (id) this will not be a problem, * but in the opposite scenario it will. * * Don't throw any error here just mark the relation entry as not * updatable, as replica identity is only for updates and deletes but * inserts can be replicated even without it. */ entry->updatable = true; idkey = RelationGetIndexAttrBitmap(entry->localrel, INDEX_ATTR_BITMAP_IDENTITY_KEY); /* fallback to PK if no replica identity */ if (idkey == NULL) { idkey = RelationGetIndexAttrBitmap(entry->localrel, INDEX_ATTR_BITMAP_PRIMARY_KEY); /* * If no replica identity index and no PK, the published table * must have replica identity FULL. */ if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) entry->updatable = false; } i = -1; while ((i = bms_next_member(idkey, i)) >= 0) { int attnum = i + FirstLowInvalidHeapAttributeNumber; if (!AttrNumberIsForUserDefinedAttr(attnum)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication target relation \"%s.%s\" uses " "system columns in REPLICA IDENTITY index", remoterel->nspname, remoterel->relname))); attnum = AttrNumberGetAttrOffset(attnum); if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys)) { entry->updatable = false; break; } } entry->localreloid = relid; } else entry->localrel = heap_open(entry->localreloid, lockmode); if (entry->state != SUBREL_STATE_READY) entry->state = GetSubscriptionRelState(MySubscription->oid, entry->localreloid, &entry->statelsn, true); return entry; } /* * Close the previously opened logical relation. */ void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) { heap_close(rel->localrel, lockmode); rel->localrel = NULL; } /* * Free the type map cache entry data. */ static void logicalrep_typmap_free_entry(LogicalRepTyp *entry) { pfree(entry->nspname); pfree(entry->typname); } /* * Add new entry or update existing entry in the type map cache. */ void logicalrep_typmap_update(LogicalRepTyp *remotetyp) { MemoryContext oldctx; LogicalRepTyp *entry; bool found; if (LogicalRepTypMap == NULL) logicalrep_relmap_init(); /* * HASH_ENTER returns the existing entry if present or creates a new one. */ entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid, HASH_ENTER, &found); if (found) logicalrep_typmap_free_entry(entry); /* Make cached copy of the data */ entry->remoteid = remotetyp->remoteid; oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); entry->nspname = pstrdup(remotetyp->nspname); entry->typname = pstrdup(remotetyp->typname); MemoryContextSwitchTo(oldctx); } /* * Fetch type name from the cache by remote type OID. * * Return a substitute value if we cannot find the data type; no message is * sent to the log in that case, because this is used by error callback * already. */ char * logicalrep_typmap_gettypname(Oid remoteid) { LogicalRepTyp *entry; bool found; /* Internal types are mapped directly. */ if (remoteid < FirstNormalObjectId) { if (!get_typisdefined(remoteid)) { /* * This can be caused by having a publisher with a higher * PostgreSQL major version than the subscriber. */ return psprintf("unrecognized %u", remoteid); } return format_type_be(remoteid); } if (LogicalRepTypMap == NULL) { /* * If the typemap is not initialized yet, we cannot possibly attempt * to search the hash table; but there's no way we know the type * locally yet, since we haven't received a message about this type, * so this is the best we can do. */ return psprintf("unrecognized %u", remoteid); } /* search the mapping */ entry = hash_search(LogicalRepTypMap, (void *) &remoteid, HASH_FIND, &found); if (!found) return psprintf("unrecognized %u", remoteid); Assert(OidIsValid(entry->remoteid)); return psprintf("%s.%s", entry->nspname, entry->typname); }