Simplify the logical worker type checks by using the switch on worker type.

The current code uses if/else statements at various places to take worker
specific actions. Change those to use the switch on worker type added by
commit 2a8b40e368. This makes code easier to read and understand.

Author: Peter Smith
Reviewed-by: Amit Kapila, Hou Zhijie
Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
This commit is contained in:
Amit Kapila 2023-08-22 08:44:09 +05:30
parent 6fde2d9a00
commit 1cdc6d86bf
3 changed files with 78 additions and 53 deletions

View File

@ -468,39 +468,44 @@ retry:
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
if (is_parallel_apply_worker)
switch (worker->type)
{
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication parallel apply worker for subscription %u",
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
}
else if (is_tablesync_worker)
{
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication tablesync worker for subscription %u sync %u",
subid,
relid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
}
else
{
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication apply worker for subscription %u",
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
case WORKERTYPE_APPLY:
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication apply worker for subscription %u",
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
break;
case WORKERTYPE_PARALLEL_APPLY:
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication parallel apply worker for subscription %u",
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
break;
case WORKERTYPE_TABLESYNC:
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication tablesync worker for subscription %u sync %u",
subid,
relid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
break;
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "unknown worker type");
}
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
if (is_parallel_apply_worker)
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
/* Failed to start worker, so clean up the worker slot. */

View File

@ -649,18 +649,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
/*
* Skip for parallel apply workers because they only operate on tables
* that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
if (am_parallel_apply_worker())
return;
switch (MyLogicalRepWorker->type)
{
case WORKERTYPE_PARALLEL_APPLY:
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
process_syncing_tables_for_apply(current_lsn);
/*
* Skip for parallel apply workers because they only operate on
* tables that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
break;
case WORKERTYPE_TABLESYNC:
process_syncing_tables_for_sync(current_lsn);
break;
case WORKERTYPE_APPLY:
process_syncing_tables_for_apply(current_lsn);
break;
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
}
}
/*

View File

@ -485,25 +485,34 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
if (am_tablesync_worker())
return MyLogicalRepWorker->relid == rel->localreloid;
else if (am_parallel_apply_worker())
switch (MyLogicalRepWorker->type)
{
/* We don't synchronize rel's that are in unknown state. */
if (rel->state != SUBREL_STATE_READY &&
rel->state != SUBREL_STATE_UNKNOWN)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
MySubscription->name),
errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
case WORKERTYPE_TABLESYNC:
return MyLogicalRepWorker->relid == rel->localreloid;
return rel->state == SUBREL_STATE_READY;
case WORKERTYPE_PARALLEL_APPLY:
/* We don't synchronize rel's that are in unknown state. */
if (rel->state != SUBREL_STATE_READY &&
rel->state != SUBREL_STATE_UNKNOWN)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
MySubscription->name),
errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
return rel->state == SUBREL_STATE_READY;
case WORKERTYPE_APPLY:
return (rel->state == SUBREL_STATE_READY ||
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
}
else
return (rel->state == SUBREL_STATE_READY ||
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
return false; /* dummy for compiler */
}
/*