diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ad4a732fd2..e5f0aaa8f7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -113,6 +113,16 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +static void apply_handle_insert_internal(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *remoteslot); +static void apply_handle_update_internal(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry); +static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, + TupleTableSlot *remoteslot, + LogicalRepRelation *remoterel); + /* * Should this worker apply changes for given relation. * @@ -582,6 +592,7 @@ GetRelationIdentityOrPK(Relation rel) /* * Handle INSERT message. */ + static void apply_handle_insert(StringInfo s) { @@ -621,13 +632,10 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - ExecOpenIndices(estate->es_result_relation_info, false); + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_insert_internal(estate->es_result_relation_info, estate, + remoteslot); - /* Do the insert. */ - ExecSimpleRelationInsert(estate, remoteslot); - - /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ @@ -641,6 +649,20 @@ apply_handle_insert(StringInfo s) CommandCounterIncrement(); } +/* Workhorse for apply_handle_insert() */ +static void +apply_handle_insert_internal(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *remoteslot) +{ + ExecOpenIndices(relinfo, false); + + /* Do the insert. */ + ExecSimpleRelationInsert(estate, remoteslot); + + /* Cleanup. */ + ExecCloseIndices(relinfo); +} + /* * Check if the logical replication relation is updatable and throw * appropriate error if it isn't. @@ -684,16 +706,12 @@ apply_handle_update(StringInfo s) { LogicalRepRelMapEntry *rel; LogicalRepRelId relid; - Oid idxoid; EState *estate; - EPQState epqstate; LogicalRepTupleData oldtup; LogicalRepTupleData newtup; bool has_oldtup; - TupleTableSlot *localslot; TupleTableSlot *remoteslot; RangeTblEntry *target_rte; - bool found; MemoryContext oldctx; ensure_transaction(); @@ -719,9 +737,6 @@ apply_handle_update(StringInfo s) remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); - localslot = table_slot_create(rel->localrel, - &estate->es_tupleTable); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); /* * Populate updatedCols so that per-column triggers can fire. This could @@ -741,7 +756,6 @@ apply_handle_update(StringInfo s) fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel)); PushActiveSnapshot(GetTransactionSnapshot()); - ExecOpenIndices(estate->es_result_relation_info, false); /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); @@ -749,20 +763,57 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_update_internal(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel); + + PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + + logicalrep_rel_close(rel, NoLock); + + CommandCounterIncrement(); +} + +/* Workhorse for apply_handle_update() */ +static void +apply_handle_update_internal(ResultRelInfo *relinfo, + EState *estate, TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry) +{ + Relation localrel = relinfo->ri_RelationDesc; + LogicalRepRelation *remoterel = &relmapentry->remoterel; + Oid idxoid; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + MemoryContext oldctx; + + localslot = table_slot_create(localrel, &estate->es_tupleTable); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + + ExecOpenIndices(relinfo, false); + /* * Try to find tuple using either replica identity index, primary key or * if needed, sequential scan. */ - idxoid = GetRelationIdentityOrPK(rel->localrel); + idxoid = GetRelationIdentityOrPK(localrel); Assert(OidIsValid(idxoid) || - (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup)); + (remoterel->replident == REPLICA_IDENTITY_FULL)); if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(rel->localrel, idxoid, + found = RelationFindReplTupleByIndex(localrel, idxoid, LockTupleExclusive, remoteslot, localslot); else - found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, + found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, remoteslot, localslot); ExecClearTuple(remoteslot); @@ -776,8 +827,8 @@ apply_handle_update(StringInfo s) { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, rel, - newtup.values, newtup.changed); + slot_modify_cstrings(remoteslot, localslot, relmapentry, + newtup->values, newtup->changed); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -795,23 +846,12 @@ apply_handle_update(StringInfo s) elog(DEBUG1, "logical replication did not find row for update " "in replication target relation \"%s\"", - RelationGetRelationName(rel->localrel)); + RelationGetRelationName(localrel)); } /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - + ExecCloseIndices(relinfo); EvalPlanQualEnd(&epqstate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); - - logicalrep_rel_close(rel, NoLock); - - CommandCounterIncrement(); } /* @@ -825,12 +865,8 @@ apply_handle_delete(StringInfo s) LogicalRepRelMapEntry *rel; LogicalRepTupleData oldtup; LogicalRepRelId relid; - Oid idxoid; EState *estate; - EPQState epqstate; TupleTableSlot *remoteslot; - TupleTableSlot *localslot; - bool found; MemoryContext oldctx; ensure_transaction(); @@ -855,33 +891,64 @@ apply_handle_delete(StringInfo s) remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); - localslot = table_slot_create(rel->localrel, - &estate->es_tupleTable); - EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); PushActiveSnapshot(GetTransactionSnapshot()); - ExecOpenIndices(estate->es_result_relation_info, false); - /* Find the tuple using the replica identity index. */ + /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, oldtup.values); MemoryContextSwitchTo(oldctx); + Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); + apply_handle_delete_internal(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); + + PopActiveSnapshot(); + + /* Handle queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + + logicalrep_rel_close(rel, NoLock); + + CommandCounterIncrement(); +} + +/* Workhorse for apply_handle_delete() */ +static void +apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, + TupleTableSlot *remoteslot, + LogicalRepRelation *remoterel) +{ + Relation localrel = relinfo->ri_RelationDesc; + Oid idxoid; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + + localslot = table_slot_create(localrel, &estate->es_tupleTable); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + + ExecOpenIndices(relinfo, false); + /* * Try to find tuple using either replica identity index, primary key or * if needed, sequential scan. */ - idxoid = GetRelationIdentityOrPK(rel->localrel); + idxoid = GetRelationIdentityOrPK(localrel); Assert(OidIsValid(idxoid) || - (rel->remoterel.replident == REPLICA_IDENTITY_FULL)); + (remoterel->replident == REPLICA_IDENTITY_FULL)); if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(rel->localrel, idxoid, + found = RelationFindReplTupleByIndex(localrel, idxoid, LockTupleExclusive, remoteslot, localslot); else - found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive, + found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, remoteslot, localslot); + /* If found delete it. */ if (found) { @@ -896,23 +963,12 @@ apply_handle_delete(StringInfo s) elog(DEBUG1, "logical replication could not find row for delete " "in replication target relation \"%s\"", - RelationGetRelationName(rel->localrel)); + RelationGetRelationName(localrel)); } /* Cleanup. */ - ExecCloseIndices(estate->es_result_relation_info); - PopActiveSnapshot(); - - /* Handle queued AFTER triggers. */ - AfterTriggerEndQuery(estate); - + ExecCloseIndices(relinfo); EvalPlanQualEnd(&epqstate); - ExecResetTupleTable(estate->es_tupleTable, false); - FreeExecutorState(estate); - - logicalrep_rel_close(rel, NoLock); - - CommandCounterIncrement(); } /*