Optimize the origin drop functionality.

To interlock against concurrent drops, we use to hold ExclusiveLock on
pg_replication_origin till xact commit. This blocks even concurrent drops
of different origins by tablesync workers. So, instead, lock the specific
origin to interlock against concurrent drops.

This reduces the test time variability in src/test/subscription where
multiple tables are being synced.

Author: Vignesh C
Reviewed-by: Hou Zhijie, Amit Kapila
Discussion: https://postgr.es/m/1412708.1674417574@sss.pgh.pa.us
This commit is contained in:
Amit Kapila 2023-02-03 08:29:08 +05:30
parent 2e9f120b65
commit 3e577ff602
1 changed files with 33 additions and 29 deletions

View File

@ -338,16 +338,14 @@ replorigin_create(const char *roname)
* Helper function to drop a replication origin.
*/
static void
replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
replorigin_state_clear(RepOriginId roident, bool nowait)
{
HeapTuple tuple;
int i;
/*
* First, clean up the slot state info, if there is any matching slot.
* Clean up the slot state info, if there is any matching slot.
*/
restart:
tuple = NULL;
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
for (i = 0; i < max_replication_slots; i++)
@ -402,19 +400,6 @@ restart:
}
LWLockRelease(ReplicationOriginLock);
ConditionVariableCancelSleep();
/*
* Now, we can delete the catalog entry.
*/
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for replication origin with ID %d",
roident);
CatalogTupleDelete(rel, &tuple->t_self);
ReleaseSysCache(tuple);
CommandCounterIncrement();
}
/*
@ -427,24 +412,43 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
RepOriginId roident;
Relation rel;
HeapTuple tuple;
Assert(IsTransactionState());
/*
* To interlock against concurrent drops, we hold ExclusiveLock on
* pg_replication_origin till xact commit.
*
* XXX We can optimize this by acquiring the lock on a specific origin by
* using LockSharedObject if required. However, for that, we first to
* acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
* the specific origin and then re-check if the origin still exists.
*/
rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
roident = replorigin_by_name(name, missing_ok);
if (OidIsValid(roident))
replorigin_drop_guts(rel, roident, nowait);
/* Lock the origin to prevent concurrent drops. */
LockSharedObject(ReplicationOriginRelationId, roident, 0,
AccessExclusiveLock);
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
if (!HeapTupleIsValid(tuple))
{
if (!missing_ok)
elog(ERROR, "cache lookup failed for replication origin with ID %d",
roident);
/*
* We don't need to retain the locks if the origin is already dropped.
*/
UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
AccessExclusiveLock);
table_close(rel, RowExclusiveLock);
return;
}
replorigin_state_clear(roident, nowait);
/*
* Now, we can delete the catalog entry.
*/
CatalogTupleDelete(rel, &tuple->t_self);
ReleaseSysCache(tuple);
CommandCounterIncrement();
/* We keep the lock on pg_replication_origin until commit */
table_close(rel, NoLock);