Make the tablesync worker's replication origin drop logic robust.

In commit f6c5edb8ab, we started to drop the replication origin slots
before the tablesync worker exits to avoid consuming more slots than
required. We were dropping the replication origin in the same transaction
in which we were marking the tablesync state as SYNCDONE. Now, if there is
any error after we have dropped the origin but before we commit the
containing transaction, the in-memory state of replication progress won't
be rolled back. Due to this, after the restart, the tablesync worker can
start streaming from the wrong location and can re-apply transactions that
have already been processed.

To fix this, we need to opportunistically drop the origin after marking
the tablesync state as SYNCDONE. Even if the tablesync worker fails to
remove the replication origin before exiting, the apply worker ensures
that it is cleaned up afterward.

Reported by Tom Lane as per buildfarm.
Diagnosed-by: Masahiko Sawada
Author: Hou Zhijie
Reviewed-By: Masahiko Sawada, Amit Kapila
Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com
Discussion: https://postgr.es/m/CAD21AoAw0Oofi4kiDpJBOwpYyBBBkJj=sLUOn4Gd2GjUAKG-fw@mail.gmail.com
This commit is contained in:
Amit Kapila 2022-09-12 12:40:57 +05:30
parent 5015e1e1b5
commit 88f488319b
2 changed files with 75 additions and 41 deletions

View File

@ -931,10 +931,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
logicalrep_worker_stop(sub->oid, relid);
/*
* For READY state and SYNCDONE state, we would have already
* dropped the tablesync origin.
* For READY state, we would have already dropped the
* tablesync origin.
*/
if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
if (state != SUBREL_STATE_READY)
{
char originname[NAMEDATALEN];
@ -942,8 +942,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Drop the tablesync's origin tracking if exists.
*
* It is possible that the origin is not yet created for
* tablesync worker so passing missing_ok = true. This can
* happen for the states before SUBREL_STATE_FINISHEDCOPY.
* tablesync worker, this can happen for the states before
* SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
* apply worker can also concurrently try to drop the
* origin and by this time the origin might be already
* removed. For these reasons, passing missing_ok = true.
*/
ReplicationOriginNameForTablesync(sub->oid, relid, originname,
sizeof(originname));
@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/*
* Drop the tablesync's origin tracking if exists.
*
* For SYNCDONE/READY states, the tablesync origin tracking is known
* to have already been dropped by the tablesync worker.
*
* It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true. This can happen for the states
* before SUBREL_STATE_FINISHEDCOPY.
*/
if (rstate->state != SUBREL_STATE_SYNCDONE)
{
ReplicationOriginNameForTablesync(subid, relid, originname,
sizeof(originname));
replorigin_drop_by_name(originname, true, false);
}
ReplicationOriginNameForTablesync(subid, relid, originname,
sizeof(originname));
replorigin_drop_by_name(originname, true, false);
}
/* Clean up dependencies */

View File

@ -300,7 +300,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/*
* UpdateSubscriptionRelState must be called within a transaction.
* That transaction will be ended within the finish_sync_worker().
*/
if (!IsTransactionState())
StartTransactionCommand();
@ -310,30 +309,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
/*
* Cleanup the tablesync origin tracking.
*
* Resetting the origin session removes the ownership of the slot.
* This is needed to allow the origin to be dropped.
*/
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
originname,
sizeof(originname));
replorigin_session_reset();
replorigin_session_origin = InvalidRepOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
/*
* We expect that origin must be present. The concurrent operations
* that remove origin like a refresh for the subscription take an
* access exclusive lock on pg_subscription which prevent the previous
* operation to update the rel state to SUBREL_STATE_SYNCDONE to
* succeed.
*/
replorigin_drop_by_name(originname, false, false);
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
* the slot.
@ -343,7 +318,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/*
* Cleanup the tablesync slot.
*
* This has to be done after the data changes because otherwise if
* This has to be done after updating the state because otherwise if
* there is an error while doing the database operations we won't be
* able to rollback dropped slot.
*/
@ -359,6 +334,49 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*/
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
CommitTransactionCommand();
pgstat_report_stat(false);
/*
* Start a new transaction to clean up the tablesync origin tracking.
* This transaction will be ended within the finish_sync_worker().
* Even if we fail to remove the origin here, the apply worker will
* ensure that it is cleaned up afterward.
*
* We need to do this after the table state is set to SYNCDONE.
* Otherwise, if an error occurs while performing the database
* operation, the worker will be restarted and the in-memory state of
* replication progress (remote_lsn) won't be rolled-back which would
* have been cleared before restart. So, the restarted worker will use
* invalid replication progress state resulting in replay of
* transactions that have already been applied.
*/
StartTransactionCommand();
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
originname,
sizeof(originname));
/*
* Resetting the origin session removes the ownership of the slot.
* This is needed to allow the origin to be dropped.
*/
replorigin_session_reset();
replorigin_session_origin = InvalidRepOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
/*
* Drop the tablesync's origin tracking if exists.
*
* There is a chance that the user is concurrently performing refresh
* for the subscription where we remove the table state and its origin
* or the apply worker would have removed this origin. So passing
* missing_ok = true.
*/
replorigin_drop_by_name(originname, true, false);
finish_sync_worker();
}
else
@ -466,6 +484,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
if (current_lsn >= rstate->lsn)
{
char originname[NAMEDATALEN];
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
@ -475,7 +495,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
/*
* Update the state to READY.
* Remove the tablesync origin tracking if exists.
*
* There is a chance that the user is concurrently performing
* refresh for the subscription where we remove the table
* state and its origin or the tablesync worker would have
* already removed this origin. We can't rely on tablesync
* worker to remove the origin tracking as if there is any
* error while dropping we won't restart it to drop the
* origin. So passing missing_ok = true.
*/
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
rstate->relid,
originname,
sizeof(originname));
replorigin_drop_by_name(originname, true, false);
/*
* Update the state to READY only after the origin cleanup.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,