diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 85e480db4b..6e268f3521 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -146,7 +146,12 @@ finish_sync_worker(void) /* * Wait until the table synchronization change. * - * Returns false if the relation subscription state disappeared. + * If called from apply worker, it will wait for the synchronization worker to + * change table state in shmem. If called from synchronization worker, it + * will wait for apply worker to change table state in shmem. + * + * Returns false if the opposite worker has disappeared or the table state has + * been reset. */ static bool wait_for_sync_status_change(Oid relid, char origstate) @@ -161,14 +166,27 @@ wait_for_sync_status_change(Oid relid, char origstate) CHECK_FOR_INTERRUPTS(); LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if the opposite worker is still running and bail if not. */ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - relid, false); + am_tablesync_worker() ? InvalidOid : relid, + false); if (!worker) { LWLockRelease(LogicalRepWorkerLock); return false; } + + /* + * If I'm the synchronization worker, look at my own state. Otherwise + * look at the state of the synchronization worker we found above. + */ + if (am_tablesync_worker()) + worker = MyLogicalRepWorker; + + Assert(worker->relid == relid); state = worker->relstate; + LWLockRelease(LogicalRepWorkerLock); if (state == SUBREL_STATE_UNKNOWN) @@ -179,7 +197,7 @@ wait_for_sync_status_change(Oid relid, char origstate) rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH)