diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1da1f13ef3..9aa63c8792 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -4637,6 +4637,7 @@ CommitSubTransaction(void) AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel); AtSubCommit_Snapshot(s->nestingLevel); + AtEOSubXact_ApplyLauncher(true, s->nestingLevel); /* * We need to restore the upper transaction's read-only state, in case the @@ -4790,6 +4791,7 @@ AbortSubTransaction(void) AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel); + AtEOSubXact_ApplyLauncher(false, s->nestingLevel); } /* diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6ef333b725..ada16adb67 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId Oid relid; } LogicalRepWorkerId; -static List *on_commit_stop_workers = NIL; +typedef struct StopWorkersData +{ + int nestDepth; /* Sub-transaction nest level */ + List *workers; /* List of LogicalRepWorkerId */ + struct StopWorkersData *parent; /* This need not be an immediate + * subtransaction parent */ +} StopWorkersData; + +/* + * Stack of StopWorkersData elements. Each stack element contains the workers + * to be stopped for that subtransaction. + */ +static StopWorkersData *on_commit_stop_workers = NULL; static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); @@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid) void logicalrep_worker_stop_at_commit(Oid subid, Oid relid) { + int nestDepth = GetCurrentTransactionNestLevel(); LogicalRepWorkerId *wid; MemoryContext oldctx; /* Make sure we store the info in context that survives until commit. */ oldctx = MemoryContextSwitchTo(TopTransactionContext); + /* Check that previous transactions were properly cleaned up. */ + Assert(on_commit_stop_workers == NULL || + nestDepth >= on_commit_stop_workers->nestDepth); + + /* + * Push a new stack element if we don't already have one for the current + * nestDepth. + */ + if (on_commit_stop_workers == NULL || + nestDepth > on_commit_stop_workers->nestDepth) + { + StopWorkersData *newdata = palloc(sizeof(StopWorkersData)); + + newdata->nestDepth = nestDepth; + newdata->workers = NIL; + newdata->parent = on_commit_stop_workers; + on_commit_stop_workers = newdata; + } + + /* + * Finally add a new worker into the worker list of the current + * subtransaction. + */ wid = palloc(sizeof(LogicalRepWorkerId)); wid->subid = subid; wid->relid = relid; - - on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + on_commit_stop_workers->workers = + lappend(on_commit_stop_workers->workers, wid); MemoryContextSwitchTo(oldctx); } @@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void) bool XactManipulatesLogicalReplicationWorkers(void) { - return (on_commit_stop_workers != NIL); + return (on_commit_stop_workers != NULL); } /* @@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void) void AtEOXact_ApplyLauncher(bool isCommit) { + + Assert(on_commit_stop_workers == NULL || + (on_commit_stop_workers->nestDepth == 1 && + on_commit_stop_workers->parent == NULL)); + if (isCommit) { ListCell *lc; - foreach(lc, on_commit_stop_workers) + if (on_commit_stop_workers != NULL) { - LogicalRepWorkerId *wid = lfirst(lc); + List *workers = on_commit_stop_workers->workers; - logicalrep_worker_stop(wid->subid, wid->relid); + foreach(lc, workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + + logicalrep_worker_stop(wid->subid, wid->relid); + } } if (on_commit_launcher_wakeup) @@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit) * No need to pfree on_commit_stop_workers. It was allocated in * transaction memory context, which is going to be cleaned soon. */ - on_commit_stop_workers = NIL; + on_commit_stop_workers = NULL; on_commit_launcher_wakeup = false; } +/* + * On commit, merge the current on_commit_stop_workers list into the + * immediate parent, if present. + * On rollback, discard the current on_commit_stop_workers list. + * Pop out the stack. + */ +void +AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth) +{ + StopWorkersData *parent; + + /* Exit immediately if there's no work to do at this level. */ + if (on_commit_stop_workers == NULL || + on_commit_stop_workers->nestDepth < nestDepth) + return; + + Assert(on_commit_stop_workers->nestDepth == nestDepth); + + parent = on_commit_stop_workers->parent; + + if (isCommit) + { + /* + * If the upper stack element is not an immediate parent + * subtransaction, just decrement the notional nesting depth without + * doing any real work. Else, we need to merge the current workers + * list into the parent. + */ + if (!parent || parent->nestDepth < nestDepth - 1) + { + on_commit_stop_workers->nestDepth--; + return; + } + + parent->workers = + list_concat(parent->workers, on_commit_stop_workers->workers); + } + else + { + /* + * Abandon everything that was done at this nesting level. Explicitly + * free memory to avoid a transaction-lifespan leak. + */ + list_free_deep(on_commit_stop_workers->workers); + } + + /* + * We have taken care of the current subtransaction workers list for both + * abort or commit. So we are ready to pop the stack. + */ + pfree(on_commit_stop_workers); + on_commit_stop_workers = parent; +} + /* * Request wakeup of the launcher on commit of the transaction. * diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index ef02512412..9f840b7bc1 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherWakeupAtCommit(void); extern bool XactManipulatesLogicalReplicationWorkers(void); extern void AtEOXact_ApplyLauncher(bool isCommit); +extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth); extern bool IsLogicalLauncher(void); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 03867cbce5..ed68cc4085 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2227,6 +2227,7 @@ StdAnalyzeData StdRdOptions Step StopList +StopWorkersData StrategyNumber StreamCtl StringInfo