Rearrange logrep worker's snapshot handling some more.

It turns out that worker.c's code path for TRUNCATE was also
careless about establishing a snapshot while executing user-defined
code, allowing the checks added by commit 84f5c2908 to fail when
a trigger is fired in that context.

We could just wrap Push/PopActiveSnapshot around the truncate call,
but it seems better to establish a policy of holding a snapshot
throughout execution of a replication step.  To help with that and
possible future requirements, replace the previous ensure_transaction
calls with pairs of begin/end_replication_step calls.

Per report from Mark Dilger.  Back-patch to v11, like the previous
changes.

Discussion: https://postgr.es/m/B4A3AF82-79ED-4F4C-A4E5-CD2622098972@enterprisedb.com
This commit is contained in:
Tom Lane 2021-06-10 12:27:27 -04:00
parent 534b9be805
commit eea081ad01

View File

@ -154,30 +154,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
} }
/* /*
* Make sure that we started local transaction. * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
* *
* Also switches to ApplyMessageContext as necessary. * Start a transaction, if this is the first step (else we keep using the
* existing transaction).
* Also provide a global snapshot and ensure we run in ApplyMessageContext.
*/ */
static bool static void
ensure_transaction(void) begin_replication_step(void)
{ {
if (IsTransactionState()) SetCurrentStatementStartTimestamp();
if (!IsTransactionState())
{ {
SetCurrentStatementStartTimestamp(); StartTransactionCommand();
maybe_reread_subscription();
if (CurrentMemoryContext != ApplyMessageContext)
MemoryContextSwitchTo(ApplyMessageContext);
return false;
} }
SetCurrentStatementStartTimestamp(); PushActiveSnapshot(GetTransactionSnapshot());
StartTransactionCommand();
maybe_reread_subscription();
MemoryContextSwitchTo(ApplyMessageContext); MemoryContextSwitchTo(ApplyMessageContext);
return true; }
/*
* Finish up one step of a replication transaction.
* Callers of begin_replication_step() must also call this.
*
* We don't close out the transaction here, but we should increment
* the command counter to make the effects of this step visible.
*/
static void
end_replication_step(void)
{
PopActiveSnapshot();
CommandCounterIncrement();
} }
@ -194,13 +205,6 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
ResultRelInfo *resultRelInfo; ResultRelInfo *resultRelInfo;
RangeTblEntry *rte; RangeTblEntry *rte;
/*
* Input functions may need an active snapshot, as may AFTER triggers
* invoked during finish_estate. For safety, ensure an active snapshot
* exists throughout all our usage of the executor.
*/
PushActiveSnapshot(GetTransactionSnapshot());
estate = CreateExecutorState(); estate = CreateExecutorState();
rte = makeNode(RangeTblEntry); rte = makeNode(RangeTblEntry);
@ -241,7 +245,6 @@ finish_estate(EState *estate)
/* Cleanup. */ /* Cleanup. */
ExecResetTupleTable(estate->es_tupleTable, false); ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate); FreeExecutorState(estate);
PopActiveSnapshot();
} }
/* /*
@ -631,7 +634,7 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot; TupleTableSlot *remoteslot;
MemoryContext oldctx; MemoryContext oldctx;
ensure_transaction(); begin_replication_step();
relid = logicalrep_read_insert(s, &newtup); relid = logicalrep_read_insert(s, &newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock); rel = logicalrep_rel_open(relid, RowExclusiveLock);
@ -642,6 +645,7 @@ apply_handle_insert(StringInfo s)
* transaction so it's safe to unlock it. * transaction so it's safe to unlock it.
*/ */
logicalrep_rel_close(rel, RowExclusiveLock); logicalrep_rel_close(rel, RowExclusiveLock);
end_replication_step();
return; return;
} }
@ -668,7 +672,7 @@ apply_handle_insert(StringInfo s)
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement(); end_replication_step();
} }
/* /*
@ -727,7 +731,7 @@ apply_handle_update(StringInfo s)
bool found; bool found;
MemoryContext oldctx; MemoryContext oldctx;
ensure_transaction(); begin_replication_step();
relid = logicalrep_read_update(s, &has_oldtup, &oldtup, relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
&newtup); &newtup);
@ -739,6 +743,7 @@ apply_handle_update(StringInfo s)
* transaction so it's safe to unlock it. * transaction so it's safe to unlock it.
*/ */
logicalrep_rel_close(rel, RowExclusiveLock); logicalrep_rel_close(rel, RowExclusiveLock);
end_replication_step();
return; return;
} }
@ -840,7 +845,7 @@ apply_handle_update(StringInfo s)
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement(); end_replication_step();
} }
/* /*
@ -862,7 +867,7 @@ apply_handle_delete(StringInfo s)
bool found; bool found;
MemoryContext oldctx; MemoryContext oldctx;
ensure_transaction(); begin_replication_step();
relid = logicalrep_read_delete(s, &oldtup); relid = logicalrep_read_delete(s, &oldtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock); rel = logicalrep_rel_open(relid, RowExclusiveLock);
@ -873,6 +878,7 @@ apply_handle_delete(StringInfo s)
* transaction so it's safe to unlock it. * transaction so it's safe to unlock it.
*/ */
logicalrep_rel_close(rel, RowExclusiveLock); logicalrep_rel_close(rel, RowExclusiveLock);
end_replication_step();
return; return;
} }
@ -934,7 +940,7 @@ apply_handle_delete(StringInfo s)
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement(); end_replication_step();
} }
/* /*
@ -955,7 +961,7 @@ apply_handle_truncate(StringInfo s)
ListCell *lc; ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock; LOCKMODE lockmode = AccessExclusiveLock;
ensure_transaction(); begin_replication_step();
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@ -996,7 +1002,7 @@ apply_handle_truncate(StringInfo s)
logicalrep_rel_close(rel, NoLock); logicalrep_rel_close(rel, NoLock);
} }
CommandCounterIncrement(); end_replication_step();
} }