/*-------------------------------------------------------------------------
 *
 * execReplication.c
 *	  miscellaneous executor routines for logical replication
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/execReplication.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_am_d.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"

static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
						 TypeCacheEntry **eq);

/*
 * Map an index access method to the fixed strategy number of its equality
 * operator.
 *
 * 'am' is the OID of the index access method.  The result is
 * BTEqualStrategyNumber for Btree, HTEqualStrategyNumber for Hash, and
 * InvalidStrategy for everything else.
 *
 * Other access methods have no fixed equality strategy: the support routines
 * of each operator class interpret the strategy numbers according to the
 * operator class's own definition, so nothing can be reported for them here.
 */
StrategyNumber
get_equal_strategy_number_for_am(Oid am)
{
	if (am == BTREE_AM_OID)
		return BTEqualStrategyNumber;

	if (am == HASH_AM_OID)
		return HTEqualStrategyNumber;

	/* XXX: Only Btree and Hash indexes are supported */
	return InvalidStrategy;
}

/*
 * Return the appropriate strategy number which corresponds to the equality
 * operator.
*/
static StrategyNumber
get_equal_strategy_number(Oid opclass)
{
	/* Resolve the operator class to the index access method it belongs to. */
	Oid			am = get_opclass_method(opclass);

	/* Yields InvalidStrategy for access methods other than Btree and Hash. */
	return get_equal_strategy_number_for_am(am);
}

/*
 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
 * is setup to match 'rel' (*NOT* idxrel!).
 *
 * Returns how many columns to use for the index scan.
 *
 * This is not generic routine, idxrel must be PK, RI, or an index that can be
 * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
 * for details.
 *
 * By definition, replication identity of a rel meets all limitations associated
 * with that. Note that any other index could also meet these limitations.
 *
 * Parameters:
 *	skey		- output array; entries [0 .. result - 1] are initialized here
 *	rel			- the table being searched (not referenced by the body)
 *	idxrel		- the index whose key columns determine the scan keys
 *	searchslot	- slot with the values to look for; its tts_values and
 *				  tts_isnull arrays are read at (table attribute number - 1)
 *
 * Index key columns that are expressions (no valid table attribute number)
 * are skipped, so the result can be smaller than the number of key columns.
 *
 * NOTE(review): tts_values/tts_isnull are read directly without deforming
 * the slot here; presumably the caller passes a fully deformed slot - verify
 * against callers.
 */
static int
build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
						 TupleTableSlot *searchslot)
{
	int			index_attoff;	/* 0-based position among the index keys */
	int			skey_attoff = 0;	/* next unused entry in skey[] */
	Datum		indclassDatum;
	oidvector  *opclass;
	int2vector *indkey = &idxrel->rd_index->indkey;

	/* Fetch the per-column operator class OIDs from the index's catalog row. */
	indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
										   Anum_pg_index_indclass);
	opclass = (oidvector *) DatumGetPointer(indclassDatum);

	/* Build scankey for every non-expression attribute in the index. */
	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
		 index_attoff++)
	{
		Oid			operator;
		Oid			optype;
		Oid			opfamily;
		RegProcedure regop;
		int			table_attno = indkey->values[index_attoff];
		StrategyNumber eq_strategy;

		if (!AttributeNumberIsValid(table_attno))
		{
			/*
			 * XXX: Currently, we don't support expressions in the scan key,
			 * see code below.
			 */
			continue;
		}

		/*
		 * Load the operator info.  We need this to get the equality operator
		 * function for the scan key.
		 */
		optype = get_opclass_input_type(opclass->values[index_attoff]);
		opfamily = get_opclass_family(opclass->values[index_attoff]);
		eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);

		/* The equality operator compares the opclass input type with itself. */
		operator = get_opfamily_member(opfamily, optype,
									   optype,
									   eq_strategy);

		if (!OidIsValid(operator))
			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
				 eq_strategy, optype, optype, opfamily);

		regop = get_opcode(operator);

		/*
		 * Initialize the scankey.  The key refers to the index column by its
		 * 1-based position, while the comparison value comes from the table
		 * column the index column maps to.
		 */
		ScanKeyInit(&skey[skey_attoff],
					index_attoff + 1,
					eq_strategy,
					regop,
					searchslot->tts_values[table_attno - 1]);

		/* Compare using the collation recorded for this index column. */
		skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];

		/* Check for null value. */
		if (searchslot->tts_isnull[table_attno - 1])
			skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);

		skey_attoff++;
	}

	/* There must always be at least one attribute for the index scan. */
	Assert(skey_attoff > 0);

	return skey_attoff;
}

/*
 * Search the relation 'rel' for tuple using the index.
 *
 * If a matching tuple is found, lock it with lockmode, fill the slot with its
 * contents, and return true.  Return false otherwise.
 *
 * Parameters:
 *	rel			- the table to search
 *	idxoid		- OID of the index to scan; opened here with RowExclusiveLock
 *	lockmode	- tuple lock mode passed to table_tuple_lock()
 *	searchslot	- slot holding the values to search for
 *	outslot		- slot that receives each candidate tuple and, on success,
 *				  the locked tuple
 *
 * The scan restarts from the beginning (label 'retry') whenever a candidate
 * turns out to be touched by another transaction or table_tuple_lock()
 * reports a concurrent update or delete.
 */
bool
RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
							 LockTupleMode lockmode,
							 TupleTableSlot *searchslot,
							 TupleTableSlot *outslot)
{
	ScanKeyData skey[INDEX_MAX_KEYS];
	int			skey_attoff;
	IndexScanDesc scan;
	SnapshotData snap;
	TransactionId xwait;
	Relation	idxrel;
	bool		found;
	TypeCacheEntry **eq = NULL; /* allocated lazily, only if rechecks happen */
	bool		isIdxSafeToSkipDuplicates;

	/* Open the index. */
	idxrel = index_open(idxoid, RowExclusiveLock);

	/*
	 * When the index is the relation's replica identity or primary key, an
	 * index match is taken as the tuple we want and the full-row comparison
	 * below is skipped.
	 */
	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);

	InitDirtySnapshot(snap);

	/* Build scan key. */
	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);

	/* Start an index scan. */
	scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);

retry:
	found = false;

	index_rescan(scan, skey, skey_attoff, NULL, 0);

	/* Try to find the tuple */
	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
	{
		/*
		 * Avoid expensive equality check if the index is primary key or
		 * replica identity index.
		 */
		if (!isIdxSafeToSkipDuplicates)
		{
			/* One cache entry per attribute; reused across retries. */
			if (eq == NULL)
				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);

			if (!tuples_equal(outslot, searchslot, eq))
				continue;
		}

		ExecMaterializeSlot(outslot);

		/*
		 * NOTE(review): snap.xmin / snap.xmax are presumably filled in by the
		 * dirty-snapshot visibility check with the in-progress transaction
		 * that inserted or is deleting/locking the tuple - confirm.
		 */
		xwait = TransactionIdIsValid(snap.xmin) ?
			snap.xmin : snap.xmax;

		/*
		 * If the tuple is locked, wait for locking transaction to finish and
		 * retry.
		 */
		if (TransactionIdIsValid(xwait))
		{
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
			goto retry;
		}

		/* Found our tuple and it's not locked */
		found = true;
		break;
	}

	/* Found tuple, try to lock it in the lockmode. */
	if (found)
	{
		TM_FailureData tmfd;
		TM_Result	res;

		PushActiveSnapshot(GetLatestSnapshot());

		res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
							   outslot,
							   GetCurrentCommandId(false),
							   lockmode,
							   LockWaitBlock,
							   0 /* don't follow updates */ ,
							   &tmfd);

		PopActiveSnapshot();

		switch (res)
		{
			case TM_Ok:
				break;
			case TM_Updated:
				/* XXX: Improve handling here */
				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
				else
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("concurrent update, retrying")));
				/* Logged only; start the search over. */
				goto retry;
			case TM_Deleted:
				/* XXX: Improve handling here */
				ereport(LOG,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("concurrent delete, retrying")));
				goto retry;
			case TM_Invisible:
				elog(ERROR, "attempted to lock invisible tuple");
				break;
			default:
				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
				break;
		}
	}

	index_endscan(scan);

	/* Don't release lock until commit. */
	index_close(idxrel, NoLock);

	return found;
}

/*
 * Compare the tuples in the slots by checking if they have equal values.
 *
 * Parameters:
 *	slot1, slot2 - the slots to compare; both are fully deformed here and
 *				   must have the same number of attributes.  Attribute
 *				   metadata (dropped/generated flags, type, collation) is
 *				   taken from slot1's tuple descriptor.
 *	eq			 - caller-provided array with one entry per attribute, used
 *				   as a cache of type cache entries; NULL entries are filled
 *				   in on first use.
 *
 * Returns true when every compared attribute is either NULL in both slots or
 * equal according to the type's equality operator under the attribute's
 * collation.  Raises an error if a compared type has no equality operator.
 */
static bool
tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
			 TypeCacheEntry **eq)
{
	int			attrnum;

	Assert(slot1->tts_tupleDescriptor->natts ==
		   slot2->tts_tupleDescriptor->natts);

	/* Make sure tts_values / tts_isnull are populated for all attributes. */
	slot_getallattrs(slot1);
	slot_getallattrs(slot2);

	/* Check equality of the attributes. */
	for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
	{
		Form_pg_attribute att;
		TypeCacheEntry *typentry;

		att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);

		/*
		 * Ignore dropped and generated columns as the publisher doesn't send
		 * those
		 */
		if (att->attisdropped || att->attgenerated)
			continue;

		/*
		 * If one value is NULL and other is not, then they are certainly not
		 * equal
		 */
		if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
			return false;

		/*
		 * If both are NULL, they can be considered equal.
		 */
		if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
			continue;

		/* Look up the equality function once per attribute, then reuse it. */
		typentry = eq[attrnum];
		if (typentry == NULL)
		{
			typentry = lookup_type_cache(att->atttypid,
										 TYPECACHE_EQ_OPR_FINFO);
			if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_FUNCTION),
						 errmsg("could not identify an equality operator for type %s",
								format_type_be(att->atttypid))));
			eq[attrnum] = typentry;
		}

		if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
											att->attcollation,
											slot1->tts_values[attrnum],
											slot2->tts_values[attrnum])))
			return false;
	}

	return true;
}

/*
 * Search the relation 'rel' for tuple using the sequential scan.
 *
 * If a matching tuple is found, lock it with lockmode, fill the slot with its
 * contents, and return true.  Return false otherwise.
 *
 * Note that this stops on the first matching tuple.
 *
 * This can obviously be quite slow on tables that have more than few rows.
*/
bool
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
{
	TupleTableSlot *scanslot;	/* scratch slot the scan fetches into */
	TableScanDesc scan;
	SnapshotData snap;
	TypeCacheEntry **eq;		/* per-attribute cache for tuples_equal() */
	TransactionId xwait;
	bool		found;
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);

	/* outslot must have been created with the relation's own descriptor. */
	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));

	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);

	/* Start a heap scan. */
	InitDirtySnapshot(snap);
	scan = table_beginscan(rel, &snap, 0, NULL);
	scanslot = table_slot_create(rel, NULL);

	/*
	 * The scan restarts from the beginning whenever the candidate is touched
	 * by another transaction or the lock attempt reports a concurrent
	 * update/delete.
	 */
retry:
	found = false;

	table_rescan(scan, NULL);

	/* Try to find the tuple */
	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
	{
		if (!tuples_equal(scanslot, searchslot, eq))
			continue;

		/* 'found' is reset at 'retry' if we end up waiting below. */
		found = true;
		ExecCopySlot(outslot, scanslot);

		/*
		 * NOTE(review): snap.xmin / snap.xmax are presumably set by the
		 * dirty-snapshot visibility check to the in-progress transaction
		 * affecting this tuple - confirm.
		 */
		xwait = TransactionIdIsValid(snap.xmin) ?
			snap.xmin : snap.xmax;

		/*
		 * If the tuple is locked, wait for locking transaction to finish and
		 * retry.
		 */
		if (TransactionIdIsValid(xwait))
		{
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
			goto retry;
		}

		/* Found our tuple and it's not locked */
		break;
	}

	/* Found tuple, try to lock it in the lockmode. */
	if (found)
	{
		TM_FailureData tmfd;
		TM_Result	res;

		PushActiveSnapshot(GetLatestSnapshot());

		res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
							   outslot,
							   GetCurrentCommandId(false),
							   lockmode,
							   LockWaitBlock,
							   0 /* don't follow updates */ ,
							   &tmfd);

		PopActiveSnapshot();

		switch (res)
		{
			case TM_Ok:
				break;
			case TM_Updated:
				/* XXX: Improve handling here */
				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
				else
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("concurrent update, retrying")));
				/* Logged only; start the search over. */
				goto retry;
			case TM_Deleted:
				/* XXX: Improve handling here */
				ereport(LOG,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("concurrent delete, retrying")));
				goto retry;
			case TM_Invisible:
				elog(ERROR, "attempted to lock invisible tuple");
				break;
			default:
				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
				break;
		}
	}

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(scanslot);

	return found;
}

/*
 * Insert tuple represented in the slot to the relation, update the indexes,
 * and execute any constraints and per-row triggers.
 *
 * Caller is responsible for opening the indexes.
 *
 * Parameters:
 *	resultRelInfo - describes the target relation, its triggers and indexes
 *	estate		  - executor state passed through to triggers, constraint
 *					checks and index maintenance
 *	slot		  - the tuple to insert
 *
 * If a BEFORE ROW INSERT trigger returns false, nothing is inserted and the
 * function returns quietly.
 */
void
ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
						 EState *estate, TupleTableSlot *slot)
{
	bool		skip_tuple = false;
	Relation	rel = resultRelInfo->ri_RelationDesc;

	/* For now we support only tables. */
	Assert(rel->rd_rel->relkind == RELKIND_RELATION);

	/* Returns immediately for CMD_INSERT; kept for symmetry with UPDATE/DELETE. */
	CheckCmdReplicaIdentity(rel, CMD_INSERT);

	/* BEFORE ROW INSERT Triggers */
	if (resultRelInfo->ri_TrigDesc &&
		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
	{
		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
			skip_tuple = true;	/* "do nothing" */
	}

	if (!skip_tuple)
	{
		List	   *recheckIndexes = NIL;

		/* Compute stored generated columns */
		if (rel->rd_att->constr &&
			rel->rd_att->constr->has_generated_stored)
			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
									   CMD_INSERT);

		/* Check the constraints of the tuple */
		if (rel->rd_att->constr)
			ExecConstraints(resultRelInfo, slot, estate);
		if (rel->rd_rel->relispartition)
			ExecPartitionCheck(resultRelInfo, slot, estate, true);

		/* OK, store the tuple and create index entries for it */
		simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);

		/*
		 * NOTE(review): the boolean/NULL/NIL arguments presumably mean "not
		 * an update, raise duplicate errors, no speculative-insert handling,
		 * not summarizing-only" - confirm against ExecInsertIndexTuples().
		 */
		if (resultRelInfo->ri_NumIndices > 0)
			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
												   slot, estate, false, false,
												   NULL, NIL, false);

		/* AFTER ROW INSERT Triggers */
		ExecARInsertTriggers(estate, resultRelInfo, slot,
							 recheckIndexes, NULL);

		/*
		 * XXX we should in theory pass a TransitionCaptureState object to the
		 * above to capture transition tuples, but after statement triggers
		 * don't actually get fired by replication yet anyway
		 */

		list_free(recheckIndexes);
	}
}

/*
 * Find the searchslot tuple and update it with data in the slot,
 * update the indexes, and execute any constraints and per-row triggers.
 *
 * Caller is responsible for opening the indexes.
 *
 * Parameters:
 *	resultRelInfo - describes the target relation, its triggers and indexes
 *	estate		  - executor state; its es_snapshot is used for the update
 *	epqstate	  - passed through to the BEFORE ROW UPDATE triggers
 *	searchslot	  - identifies the existing tuple: only its tts_tid is used
 *	slot		  - the new tuple contents
 *
 * If a BEFORE ROW UPDATE trigger returns false, nothing is updated and the
 * function returns quietly.
 */
void
ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
						 EState *estate, EPQState *epqstate,
						 TupleTableSlot *searchslot, TupleTableSlot *slot)
{
	bool		skip_tuple = false;
	Relation	rel = resultRelInfo->ri_RelationDesc;
	ItemPointer tid = &(searchslot->tts_tid);	/* location of the old tuple */

	/* For now we support only tables. */
	Assert(rel->rd_rel->relkind == RELKIND_RELATION);

	/* Errors out if the replica identity / publication setup forbids UPDATE. */
	CheckCmdReplicaIdentity(rel, CMD_UPDATE);

	/* BEFORE ROW UPDATE Triggers */
	if (resultRelInfo->ri_TrigDesc &&
		resultRelInfo->ri_TrigDesc->trig_update_before_row)
	{
		if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
								  tid, NULL, slot, NULL, NULL))
			skip_tuple = true;	/* "do nothing" */
	}

	if (!skip_tuple)
	{
		List	   *recheckIndexes = NIL;
		TU_UpdateIndexes update_indexes;
		TupleTableSlot *oldSlot = NULL;

		/* Compute stored generated columns */
		if (rel->rd_att->constr &&
			rel->rd_att->constr->has_generated_stored)
			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
									   CMD_UPDATE);

		/* Check the constraints of the tuple */
		if (rel->rd_att->constr)
			ExecConstraints(resultRelInfo, slot, estate);
		if (rel->rd_rel->relispartition)
			ExecPartitionCheck(resultRelInfo, slot, estate, true);

		/*
		 * The old tuple is only captured when AFTER ROW UPDATE triggers
		 * exist; otherwise oldSlot stays NULL.
		 */
		if (resultRelInfo->ri_TrigDesc &&
			resultRelInfo->ri_TrigDesc->trig_update_after_row)
			oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);

		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
								  &update_indexes, oldSlot);

		/*
		 * Index entries are only created when the table AM reported that some
		 * indexes need updating; the last argument restricts the work to
		 * summarizing indexes when that is all that was requested.
		 */
		if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
												   slot, estate, true, false,
												   NULL, NIL,
												   (update_indexes == TU_Summarizing));

		/* AFTER ROW UPDATE Triggers */
		ExecARUpdateTriggers(estate, resultRelInfo,
							 NULL, NULL,
							 NULL, oldSlot, slot,
							 recheckIndexes, NULL, false);

		list_free(recheckIndexes);
	}
}

/*
 * Find the searchslot tuple and delete it, and execute any constraints
 * and per-row triggers.
 *
 * Caller is responsible for opening the indexes.
*/ void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot) { bool skip_tuple = false; Relation rel = resultRelInfo->ri_RelationDesc; ItemPointer tid = &searchslot->tts_tid; CheckCmdReplicaIdentity(rel, CMD_DELETE); /* BEFORE ROW DELETE Triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_before_row) { skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo, tid, NULL, NULL, NULL, NULL); } if (!skip_tuple) { TupleTableSlot *oldSlot = NULL; if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_after_row) oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo); /* OK, delete the tuple */ simple_table_tuple_delete(rel, tid, estate->es_snapshot, oldSlot); /* AFTER ROW DELETE Triggers */ ExecARDeleteTriggers(estate, resultRelInfo, NULL, oldSlot, NULL, false); } } /* * Check if command can be executed with current replica identity. */ void CheckCmdReplicaIdentity(Relation rel, CmdType cmd) { PublicationDesc pubdesc; /* * Skip checking the replica identity for partitioned tables, because the * operations are actually performed on the leaf partitions. */ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) return; /* We only need to do checks for UPDATE and DELETE. */ if (cmd != CMD_UPDATE && cmd != CMD_DELETE) return; /* * It is only safe to execute UPDATE/DELETE when all columns, referenced * in the row filters from publications which the relation is in, are * valid - i.e. when all referenced columns are part of REPLICA IDENTITY * or the table does not publish UPDATEs or DELETEs. * * XXX We could optimize it by first checking whether any of the * publications have a row filter for this relation. If not and relation * has replica identity then we can avoid building the descriptor but as * this happens only one time it doesn't seem worth the additional * complexity. 
*/ RelationBuildPublicationDesc(rel, &pubdesc); if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot update table \"%s\"", RelationGetRelationName(rel)), errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot update table \"%s\"", RelationGetRelationName(rel)), errdetail("Column list used by the publication does not cover the replica identity."))); else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot delete from table \"%s\"", RelationGetRelationName(rel)), errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot delete from table \"%s\"", RelationGetRelationName(rel)), errdetail("Column list used by the publication does not cover the replica identity."))); /* If relation has replica identity we are always good. */ if (OidIsValid(RelationGetReplicaIndex(rel))) return; /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */ if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) return; /* * This is UPDATE/DELETE and there is no replica identity. * * Check if the table publishes UPDATES or DELETES. 
*/ if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates", RelationGetRelationName(rel)), errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE."))); else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes", RelationGetRelationName(rel)), errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE."))); } /* * Check if we support writing into specific relkind. * * The nspname and relname are only needed for error reporting. */ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", nspname, relname), errdetail_relkind_not_supported(relkind))); }