diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 454a5cbfba..eb013c4608 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -169,30 +169,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) } /* - * Make sure that we started local transaction. + * Begin one step (one INSERT, UPDATE, etc) of a replication transaction. * - * Also switches to ApplyMessageContext as necessary. + * Start a transaction, if this is the first step (else we keep using the + * existing transaction). + * Also provide a global snapshot and ensure we run in ApplyMessageContext. */ -static bool -ensure_transaction(void) +static void +begin_replication_step(void) { - if (IsTransactionState()) + SetCurrentStatementStartTimestamp(); + + if (!IsTransactionState()) { - SetCurrentStatementStartTimestamp(); - - if (CurrentMemoryContext != ApplyMessageContext) - MemoryContextSwitchTo(ApplyMessageContext); - - return false; + StartTransactionCommand(); + maybe_reread_subscription(); } - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - - maybe_reread_subscription(); + PushActiveSnapshot(GetTransactionSnapshot()); MemoryContextSwitchTo(ApplyMessageContext); - return true; +} + +/* + * Finish up one step of a replication transaction. + * Callers of begin_replication_step() must also call this. + * + * We don't close out the transaction here, but we should increment + * the command counter to make the effects of this step visible. + */ +static void +end_replication_step(void) +{ + PopActiveSnapshot(); + + CommandCounterIncrement(); } @@ -210,13 +221,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel) ResultRelInfo *resultRelInfo; RangeTblEntry *rte; - /* - * Input functions may need an active snapshot, as may AFTER triggers - * invoked during finish_edata. For safety, ensure an active snapshot - * exists throughout all our usage of the executor. - */ - PushActiveSnapshot(GetTransactionSnapshot()); - edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData)); edata->targetRel = rel; @@ -277,8 +281,6 @@ finish_edata(ApplyExecutionData *edata) ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); pfree(edata); - - PopActiveSnapshot(); } /* @@ -673,7 +675,7 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - ensure_transaction(); + begin_replication_step(); relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); @@ -684,6 +686,7 @@ apply_handle_insert(StringInfo s) * transaction so it's safe to unlock it. */ logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); return; } @@ -712,7 +715,7 @@ apply_handle_insert(StringInfo s) logicalrep_rel_close(rel, NoLock); - CommandCounterIncrement(); + end_replication_step(); } /* Workhorse for apply_handle_insert() */ @@ -781,7 +784,7 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; - ensure_transaction(); + begin_replication_step(); relid = logicalrep_read_update(s, &has_oldtup, &oldtup, &newtup); @@ -793,6 +796,7 @@ apply_handle_update(StringInfo s) * transaction so it's safe to unlock it. */ logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); return; } @@ -849,7 +853,7 @@ apply_handle_update(StringInfo s) logicalrep_rel_close(rel, NoLock); - CommandCounterIncrement(); + end_replication_step(); } /* Workhorse for apply_handle_update() */ @@ -925,7 +929,7 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - ensure_transaction(); + begin_replication_step(); relid = logicalrep_read_delete(s, &oldtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); @@ -936,6 +940,7 @@ apply_handle_delete(StringInfo s) * transaction so it's safe to unlock it. */ logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); return; } @@ -966,7 +971,7 @@ apply_handle_delete(StringInfo s) logicalrep_rel_close(rel, NoLock); - CommandCounterIncrement(); + end_replication_step(); } /* Workhorse for apply_handle_delete() */ @@ -1291,7 +1296,7 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; - ensure_transaction(); + begin_replication_step(); remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); @@ -1379,7 +1384,7 @@ apply_handle_truncate(StringInfo s) table_close(rel, NoLock); } - CommandCounterIncrement(); + end_replication_step(); }