Split the SetSubscriptionRelState function into two

We don't actually need the insert-or-update logic, so it's clearer to
have separate functions for the inserting and updating.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
This commit is contained in:
Peter Eisentraut 2018-04-06 10:00:26 -04:00
parent c25304a945
commit bcf79b5bb6
4 changed files with 93 additions and 78 deletions

View File

@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
} }
/* /*
* Set the state of a subscription table. * Add new state record for a subscription table.
*
* If update_only is true and the record for given table doesn't exist, do
* nothing. This can be used to avoid inserting a new record that was deleted
* by someone else. Generally, subscription DDL commands should use false,
* workers should use true.
*
* The insert-or-update logic in this function is not concurrency safe so it
* might raise an error in rare circumstances. But if we took a stronger lock
* such as ShareRowExclusiveLock, we would risk more deadlocks.
*/ */
Oid Oid
SetSubscriptionRelState(Oid subid, Oid relid, char state, AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool update_only) XLogRecPtr sublsn)
{ {
Relation rel; Relation rel;
HeapTuple tup; HeapTuple tup;
Oid subrelid = InvalidOid; Oid subrelid;
bool nulls[Natts_pg_subscription_rel]; bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel];
@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid), ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid)); ObjectIdGetDatum(subid));
if (HeapTupleIsValid(tup))
elog(ERROR, "subscription table %u in subscription %u already exists",
relid, subid);
/* /* Form the tuple. */
* If the record for given table does not exist yet create new record, memset(values, 0, sizeof(values));
* otherwise update the existing one. memset(nulls, false, sizeof(nulls));
*/ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
if (!HeapTupleIsValid(tup) && !update_only) values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
{ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
/* Form the tuple. */ if (sublsn != InvalidXLogRecPtr)
memset(values, 0, sizeof(values)); values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
memset(nulls, false, sizeof(nulls)); else
values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
if (sublsn != InvalidXLogRecPtr)
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
else
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */ /* Insert tuple into catalog. */
subrelid = CatalogTupleInsert(rel, tup); subrelid = CatalogTupleInsert(rel, tup);
heap_freetuple(tup); heap_freetuple(tup);
}
else if (HeapTupleIsValid(tup))
{
bool replaces[Natts_pg_subscription_rel];
/* Update the tuple. */ /* Cleanup. */
memset(values, 0, sizeof(values)); heap_close(rel, NoLock);
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; return subrelid;
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); }
replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; /*
if (sublsn != InvalidXLogRecPtr) * Update the state of a subscription table.
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); */
else Oid
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
bool replaces[Natts_pg_subscription_rel];
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
replaces);
/* Update the catalog. */ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
CatalogTupleUpdate(rel, &tup->t_self, tup);
subrelid = HeapTupleGetOid(tup); /* Try finding existing mapping. */
} tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "subscription table %u in subscription %u does not exist",
relid, subid);
/* Update the tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
if (sublsn != InvalidXLogRecPtr)
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
else
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
/* Update the catalog. */
CatalogTupleUpdate(rel, &tup->t_self, tup);
subrelid = HeapTupleGetOid(tup);
/* Cleanup. */ /* Cleanup. */
heap_close(rel, NoLock); heap_close(rel, NoLock);

View File

@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CheckSubscriptionRelkind(get_rel_relkind(relid), CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname); rv->schemaname, rv->relname);
SetSubscriptionRelState(subid, relid, table_state, AddSubscriptionRelState(subid, relid, table_state,
InvalidXLogRecPtr, false); InvalidXLogRecPtr);
} }
/* /*
@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
if (!bsearch(&relid, subrel_local_oids, if (!bsearch(&relid, subrel_local_oids,
list_length(subrel_states), sizeof(Oid), oid_cmp)) list_length(subrel_states), sizeof(Oid), oid_cmp))
{ {
SetSubscriptionRelState(sub->oid, relid, AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr, false); InvalidXLogRecPtr);
ereport(DEBUG1, ereport(DEBUG1,
(errmsg("table \"%s.%s\" added to subscription \"%s\"", (errmsg("table \"%s.%s\" added to subscription \"%s\"",
rv->schemaname, rv->relname, sub->name))); rv->schemaname, rv->relname, sub->name)));

View File

@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex); SpinLockRelease(&MyLogicalRepWorker->relmutex);
SetSubscriptionRelState(MyLogicalRepWorker->subid, UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn, MyLogicalRepWorker->relstate_lsn);
true);
walrcv_endstreaming(wrconn, &tli); walrcv_endstreaming(wrconn, &tli);
finish_sync_worker(); finish_sync_worker();
@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand(); StartTransactionCommand();
started_tx = true; started_tx = true;
} }
SetSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state, UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->lsn, true); rstate->relid, rstate->state,
rstate->lsn);
} }
} }
else else
@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */ /* Update the state and make it visible to others. */
StartTransactionCommand(); StartTransactionCommand();
SetSubscriptionRelState(MyLogicalRepWorker->subid, UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn, MyLogicalRepWorker->relstate_lsn);
true);
CommitTransactionCommand(); CommitTransactionCommand();
pgstat_report_stat(false); pgstat_report_stat(false);
@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother * Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good. * with the shmem state as we are exiting for good.
*/ */
SetSubscriptionRelState(MyLogicalRepWorker->subid, UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid, MyLogicalRepWorker->relid,
SUBREL_STATE_SYNCDONE, SUBREL_STATE_SYNCDONE,
*origin_startpos, *origin_startpos);
true);
finish_sync_worker(); finish_sync_worker();
} }
break; break;

View File

@ -67,8 +67,10 @@ typedef struct SubscriptionRelState
char state; char state;
} SubscriptionRelState; } SubscriptionRelState;
extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool update_only); XLogRecPtr sublsn);
extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid, extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok); XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern void RemoveSubscriptionRel(Oid subid, Oid relid);