Add subtransaction handling for table synchronization workers.

Since the old logic was completely unaware of subtransactions, a
change made in a subsequently-aborted subtransaction would still cause
workers to be stopped at toplevel transaction commit.  Fix that by
managing a stack of worker lists rather than just one.

Amit Khandekar and Robert Haas

Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
This commit is contained in:
Robert Haas 2018-07-16 17:33:22 -04:00
parent f7cb2842bf
commit 32df1c9afa
4 changed files with 112 additions and 8 deletions

View File

@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
AtEOSubXact_HashTables(true, s->nestingLevel);
AtEOSubXact_PgStat(true, s->nestingLevel);
AtSubCommit_Snapshot(s->nestingLevel);
AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
/*
* We need to restore the upper transaction's read-only state, in case the
@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
AtEOSubXact_HashTables(false, s->nestingLevel);
AtEOSubXact_PgStat(false, s->nestingLevel);
AtSubAbort_Snapshot(s->nestingLevel);
AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
}
/*

View File

@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
Oid relid;
} LogicalRepWorkerId;
static List *on_commit_stop_workers = NIL;
typedef struct StopWorkersData
{
int nestDepth; /* Sub-transaction nest level */
List *workers; /* List of LogicalRepWorkerId */
struct StopWorkersData *parent; /* This need not be an immediate
* subtransaction parent */
} StopWorkersData;
/*
* Stack of StopWorkersData elements. Each stack element contains the workers
* to be stopped for that subtransaction.
*/
static StopWorkersData *on_commit_stop_workers = NULL;
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
void
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
{
int nestDepth = GetCurrentTransactionNestLevel();
LogicalRepWorkerId *wid;
MemoryContext oldctx;
/* Make sure we store the info in context that survives until commit. */
oldctx = MemoryContextSwitchTo(TopTransactionContext);
/* Check that previous transactions were properly cleaned up. */
Assert(on_commit_stop_workers == NULL ||
nestDepth >= on_commit_stop_workers->nestDepth);
/*
* Push a new stack element if we don't already have one for the current
* nestDepth.
*/
if (on_commit_stop_workers == NULL ||
nestDepth > on_commit_stop_workers->nestDepth)
{
StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
newdata->nestDepth = nestDepth;
newdata->workers = NIL;
newdata->parent = on_commit_stop_workers;
on_commit_stop_workers = newdata;
}
/*
* Finally add a new worker into the worker list of the current
* subtransaction.
*/
wid = palloc(sizeof(LogicalRepWorkerId));
wid->subid = subid;
wid->relid = relid;
on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
on_commit_stop_workers->workers =
lappend(on_commit_stop_workers->workers, wid);
MemoryContextSwitchTo(oldctx);
}
@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
bool
XactManipulatesLogicalReplicationWorkers(void)
{
return (on_commit_stop_workers != NIL);
return (on_commit_stop_workers != NULL);
}
/*
@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
void
AtEOXact_ApplyLauncher(bool isCommit)
{
Assert(on_commit_stop_workers == NULL ||
(on_commit_stop_workers->nestDepth == 1 &&
on_commit_stop_workers->parent == NULL));
if (isCommit)
{
ListCell *lc;
foreach(lc, on_commit_stop_workers)
if (on_commit_stop_workers != NULL)
{
LogicalRepWorkerId *wid = lfirst(lc);
List *workers = on_commit_stop_workers->workers;
logicalrep_worker_stop(wid->subid, wid->relid);
foreach(lc, workers)
{
LogicalRepWorkerId *wid = lfirst(lc);
logicalrep_worker_stop(wid->subid, wid->relid);
}
}
if (on_commit_launcher_wakeup)
@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
* No need to pfree on_commit_stop_workers. It was allocated in
* transaction memory context, which is going to be cleaned soon.
*/
on_commit_stop_workers = NIL;
on_commit_stop_workers = NULL;
on_commit_launcher_wakeup = false;
}
/*
* On commit, merge the current on_commit_stop_workers list into the
* immediate parent, if present.
* On rollback, discard the current on_commit_stop_workers list.
* Pop out the stack.
*/
void
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
{
StopWorkersData *parent;
/* Exit immediately if there's no work to do at this level. */
if (on_commit_stop_workers == NULL ||
on_commit_stop_workers->nestDepth < nestDepth)
return;
Assert(on_commit_stop_workers->nestDepth == nestDepth);
parent = on_commit_stop_workers->parent;
if (isCommit)
{
/*
* If the upper stack element is not an immediate parent
* subtransaction, just decrement the notional nesting depth without
* doing any real work. Else, we need to merge the current workers
* list into the parent.
*/
if (!parent || parent->nestDepth < nestDepth - 1)
{
on_commit_stop_workers->nestDepth--;
return;
}
parent->workers =
list_concat(parent->workers, on_commit_stop_workers->workers);
}
else
{
/*
* Abandon everything that was done at this nesting level. Explicitly
* free memory to avoid a transaction-lifespan leak.
*/
list_free_deep(on_commit_stop_workers->workers);
}
/*
* We have taken care of the current subtransaction workers list for both
* abort or commit. So we are ready to pop the stack.
*/
pfree(on_commit_stop_workers);
on_commit_stop_workers = parent;
}
/*
* Request wakeup of the launcher on commit of the transaction.
*

View File

@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
extern bool IsLogicalLauncher(void);

View File

@ -2227,6 +2227,7 @@ StdAnalyzeData
StdRdOptions
Step
StopList
StopWorkersData
StrategyNumber
StreamCtl
StringInfo