Fix assertion failure in apply worker.

During exit, the logical replication apply worker tries to release
session-level locks, if any. However, if the apply worker exits due to an
error before its connection is initialized, trying to release locks can
lead to an assertion failure. Such locks are only acquired after the worker
is initialized, so there is nothing to release until worker initialization
is complete.

Reported-by: Alexander Lakhin
Author: Hou Zhijie based on inputs from Sawada Masahiko and Amit Kapila
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/2185d65f-5aae-3efa-c48f-fb42b173ef5c@gmail.com
This commit is contained in:
Amit Kapila 2023-05-03 10:13:13 +05:30
parent 6489875ce6
commit de63f8dade
4 changed files with 17 additions and 1 deletions

View File

@ -873,6 +873,8 @@ ParallelApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg); int worker_slot = DatumGetInt32(main_arg);
char originname[NAMEDATALEN]; char originname[NAMEDATALEN];
InitializingApplyWorker = true;
/* Setup signal handling. */ /* Setup signal handling. */
pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest); pqsignal(SIGINT, SignalHandlerForShutdownRequest);
@ -940,6 +942,8 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializeApplyWorker(); InitializeApplyWorker();
InitializingApplyWorker = false;
/* Setup replication origin tracking. */ /* Setup replication origin tracking. */
StartTransactionCommand(); StartTransactionCommand();
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,

View File

@ -797,7 +797,10 @@ logicalrep_worker_onexit(int code, Datum arg)
* Session level locks may be acquired outside of a transaction in * Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker * parallel apply mode and will not be released when the worker
* terminates, so manually release all locks before the worker exits. * terminates, so manually release all locks before the worker exits.
*
* The locks will be acquired once the worker is initialized.
*/ */
if (!InitializingApplyWorker)
LockReleaseAll(DEFAULT_LOCKMETHOD, true); LockReleaseAll(DEFAULT_LOCKMETHOD, true);
ApplyLauncherWakeup(); ApplyLauncherWakeup();

View File

@ -331,6 +331,9 @@ static TransactionId stream_xid = InvalidTransactionId;
*/ */
static uint32 parallel_stream_nchanges = 0; static uint32 parallel_stream_nchanges = 0;
/*
 * Are we initializing an apply worker?
 *
 * Set to true at the start of ApplyWorkerMain() and
 * ParallelApplyWorkerMain(), and reset to false once
 * InitializeApplyWorker() has returned.  While it is true,
 * logicalrep_worker_onexit() skips LockReleaseAll(), because session level
 * locks are only acquired after the worker has been initialized.
 */
bool InitializingApplyWorker = false;
/* /*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn. * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@ -4526,6 +4529,8 @@ ApplyWorkerMain(Datum main_arg)
WalRcvStreamOptions options; WalRcvStreamOptions options;
int server_version; int server_version;
InitializingApplyWorker = true;
/* Attach to slot */ /* Attach to slot */
logicalrep_worker_attach(worker_slot); logicalrep_worker_attach(worker_slot);
@ -4548,6 +4553,8 @@ ApplyWorkerMain(Datum main_arg)
InitializeApplyWorker(); InitializeApplyWorker();
InitializingApplyWorker = false;
/* Connect to the origin and start the replication. */ /* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"", elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo); MySubscription->conninfo);

View File

@ -225,6 +225,8 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
extern void logicalrep_worker_attach(int slot); extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running); bool only_running);