diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2fdfeb5b4c..09b3e8b32a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + bool should_exit = false; Assert(!IsTransactionState()); @@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = NULL; } - /* - * Even when the two_phase mode is requested by the user, it remains as - * 'pending' until all tablesyncs have reached READY state. - * - * When this happens, we restart the apply worker and (if the conditions - * are still ok) then the two_phase tri-state will become 'enabled' at - * that time. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. - */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", - MySubscription->name))); - - proc_exit(0); - } - /* * Process all tables that are being synchronized. */ @@ -619,9 +598,36 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (started_tx) { + /* + * Even when the two_phase mode is requested by the user, it remains + * as 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the + * conditions are still ok) then the two_phase tri-state will become + * 'enabled' at that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + { + CommandCounterIncrement(); /* make updates visible */ + if (AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + should_exit = true; + } + } + CommitTransactionCommand(); pgstat_report_stat(true); } + + if (should_exit) + proc_exit(0); } /* @@ -802,6 +808,7 @@ fetch_remote_table_info(char *nspname, char *relname, TupleTableSlot *tslot; Oid attrsRow[] = {INT2VECTOROID}; StringInfoData pub_names; + initStringInfo(&pub_names); foreach(lc, MySubscription->publications) {