diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7cc0a16d3b..72e44d5a02 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -468,39 +468,44 @@ retry: bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - if (is_parallel_apply_worker) + switch (worker->type) { - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication parallel apply worker for subscription %u", - subid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); - } - else if (is_tablesync_worker) - { - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication tablesync worker for subscription %u sync %u", - subid, - relid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); - } - else - { - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication apply worker for subscription %u", - subid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker"); + case WORKERTYPE_APPLY: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication apply worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker"); + break; + + case WORKERTYPE_PARALLEL_APPLY: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication parallel apply worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); + + memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); + break; + + case WORKERTYPE_TABLESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication tablesync worker for subscription %u sync %u", + subid, + relid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "unknown worker type"); } bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; bgw.bgw_main_arg = Int32GetDatum(slot); - if (is_parallel_apply_worker) - memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); - if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { /* Failed to start worker, so clean up the worker slot. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 67bdd14095..e2cee92cf2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -649,18 +649,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) void process_syncing_tables(XLogRecPtr current_lsn) { - /* - * Skip for parallel apply workers because they only operate on tables - * that are in a READY state. See pa_can_start() and - * should_apply_changes_for_rel(). - */ - if (am_parallel_apply_worker()) - return; + switch (MyLogicalRepWorker->type) + { + case WORKERTYPE_PARALLEL_APPLY: - if (am_tablesync_worker()) - process_syncing_tables_for_sync(current_lsn); - else - process_syncing_tables_for_apply(current_lsn); + /* + * Skip for parallel apply workers because they only operate on + * tables that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + break; + + case WORKERTYPE_TABLESYNC: + process_syncing_tables_for_sync(current_lsn); + break; + + case WORKERTYPE_APPLY: + process_syncing_tables_for_apply(current_lsn); + break; + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); + } } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a20d4c1171..597947410f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -485,25 +485,34 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) { - if (am_tablesync_worker()) - return MyLogicalRepWorker->relid == rel->localreloid; - else if (am_parallel_apply_worker()) + switch (MyLogicalRepWorker->type) { - /* We don't synchronize rel's that are in unknown state. */ - if (rel->state != SUBREL_STATE_READY && - rel->state != SUBREL_STATE_UNKNOWN) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", - MySubscription->name), - errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); + case WORKERTYPE_TABLESYNC: + return MyLogicalRepWorker->relid == rel->localreloid; - return rel->state == SUBREL_STATE_READY; + case WORKERTYPE_PARALLEL_APPLY: + /* We don't synchronize rel's that are in unknown state. */ + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_UNKNOWN) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", + MySubscription->name), + errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); + + return rel->state == SUBREL_STATE_READY; + + case WORKERTYPE_APPLY: + return (rel->state == SUBREL_STATE_READY || + (rel->state == SUBREL_STATE_SYNCDONE && + rel->statelsn <= remote_final_lsn)); + + case WORKERTYPE_UNKNOWN: + /* Should never happen. */ + elog(ERROR, "Unknown worker type"); } - else - return (rel->state == SUBREL_STATE_READY || - (rel->state == SUBREL_STATE_SYNCDONE && - rel->statelsn <= remote_final_lsn)); + + return false; /* dummy for compiler */ } /*