Wake up a subscription's replication worker processes after DDL.

Waken related worker processes immediately at commit of a transaction
that has performed ALTER SUBSCRIPTION (including the RENAME and
OWNER variants).  This reduces the response time for such operations.
In the real world that might not be worth much, but it shaves several
seconds off the runtime for the subscription test suite.

In the case of PREPARE, we just throw away this notification state;
it doesn't seem worth the work to preserve it.  The workers will
still react after the eventual COMMIT PREPARED, but not as quickly.

Nathan Bossart

Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13
This commit is contained in:
Tom Lane 2023-01-06 16:08:20 -05:00
parent 4c032dd804
commit c6e1f62e2c
5 changed files with 72 additions and 0 deletions

View File

@ -47,6 +47,7 @@
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
@ -2360,6 +2361,7 @@ CommitTransaction(void)
AtEOXact_PgStat(true, is_parallel_worker);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
AtEOXact_LogicalRepWorkers(true);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@ -2647,6 +2649,9 @@ PrepareTransaction(void)
AtEOXact_HashTables(true);
/* don't call AtEOXact_PgStat here; we fixed pgstat state above */
AtEOXact_Snapshot(true, true);
/* we treat PREPARE as ROLLBACK so far as waking workers goes */
AtEOXact_ApplyLauncher(false);
AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@ -2860,6 +2865,7 @@ AbortTransaction(void)
AtEOXact_HashTables(false);
AtEOXact_PgStat(false, is_parallel_worker);
AtEOXact_ApplyLauncher(false);
AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
}

View File

@ -59,6 +59,7 @@
#include "commands/user.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
#include "replication/logicalworker.h"
#include "rewrite/rewriteDefine.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@ -279,6 +280,9 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
if (strncmp(new_name, "regress_", 8) != 0)
elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
#endif
/* Wake up related replication workers to handle this change quickly */
LogicalRepWorkersWakeupAtCommit(objectId);
}
else if (nameCacheId >= 0)
{

View File

@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
/* Wake up related replication workers to handle this change quickly. */
LogicalRepWorkersWakeupAtCommit(subid);
return myself;
}
@ -1732,7 +1736,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
InvokeObjectPostAlterHook(SubscriptionRelationId,
form->oid, 0);
/* Wake up related background processes to handle this change quickly. */
ApplyLauncherWakeupAtCommit();
LogicalRepWorkersWakeupAtCommit(form->oid);
}
/*

View File

@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
@ -4092,3 +4094,53 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.remote_attnum = -1;
set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
/*
* Request wakeup of the workers for the given subscription OID
* at commit of the current transaction.
*
* This is used to ensure that the workers process assorted changes
* as soon as possible.
*/
void
LogicalRepWorkersWakeupAtCommit(Oid subid)
{
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
on_commit_wakeup_workers_subids =
list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
MemoryContextSwitchTo(oldcxt);
}
/*
* Wake up the workers of any subscriptions that were changed in this xact.
*/
void
AtEOXact_LogicalRepWorkers(bool isCommit)
{
if (isCommit && on_commit_wakeup_workers_subids != NIL)
{
ListCell *lc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
foreach(lc, on_commit_wakeup_workers_subids)
{
Oid subid = lfirst_oid(lc);
List *workers;
ListCell *lc2;
workers = logicalrep_workers_find(subid, true);
foreach(lc2, workers)
{
LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
logicalrep_worker_wakeup_ptr(worker);
}
}
LWLockRelease(LogicalRepWorkerLock);
}
/* The List storage will be reclaimed automatically in xact cleanup. */
on_commit_wakeup_workers_subids = NIL;
}

View File

@ -16,4 +16,8 @@ extern void ApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
extern void AtEOXact_LogicalRepWorkers(bool isCommit);
#endif /* LOGICALWORKER_H */