diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6c894421a3..44bdcab3b9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -168,14 +168,11 @@ get_subscription_list(void) */ static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, + uint16 generation, BackgroundWorkerHandle *handle) { BgwHandleStatus status; int rc; - uint16 generation; - - /* Remember generation for future identification. */ - generation = worker->generation; for (;;) { @@ -282,7 +279,7 @@ logicalrep_workers_find(Oid subid, bool only_running) } /* - * Start new apply background worker. + * Start new apply background worker, if possible. */ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, @@ -290,6 +287,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; + uint16 generation; int i; int slot = 0; LogicalRepWorker *worker = NULL; @@ -406,6 +404,9 @@ retry: worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); + /* Before releasing lock, remember generation for future identification. */ + generation = worker->generation; + LWLockRelease(LogicalRepWorkerLock); /* Register the new dynamic worker. */ @@ -428,6 +429,12 @@ retry: if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->generation); + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), @@ -436,7 +443,7 @@ retry: } /* Now wait until it attaches. */ - WaitForReplicationWorkerAttach(worker, bgw_handle); + WaitForReplicationWorkerAttach(worker, generation, bgw_handle); } /*