diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4e8e2965b8..8c7fad8f74 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -224,6 +224,8 @@ static void maybe_reread_subscription(void); /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); +static void apply_handle_commit_internal(StringInfo s, + LogicalRepCommitData* commit_data); static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot); static void apply_handle_update_internal(ResultRelInfo *relinfo, @@ -709,29 +711,7 @@ apply_handle_commit(StringInfo s) Assert(commit_data.commit_lsn == remote_final_lsn); - /* The synchronization worker runs in single transaction. */ - if (IsTransactionState() && !am_tablesync_worker()) - { - /* - * Update origin state so we can restart streaming from correct - * position in case of crash. - */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; - - CommitTransactionCommand(); - pgstat_report_stat(false); - - store_flush_position(commit_data.end_lsn); - } - else - { - /* Process any invalidation messages that might have accumulated. */ - AcceptInvalidationMessages(); - maybe_reread_subscription(); - } - - in_remote_transaction = false; + apply_handle_commit_internal(s, &commit_data); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -772,8 +752,10 @@ apply_handle_stream_start(StringInfo s) /* * Start a transaction on stream start, this transaction will be committed - * on the stream stop. We need the transaction for handling the buffile, - * used for serializing the streaming data and subxact info. + * on the stream stop unless it is a tablesync worker in which case it will + * be committed after processing all the messages. We need the transaction + * for handling the buffile, used for serializing the streaming data and + * subxact info. */ ensure_transaction(); @@ -825,8 +807,12 @@ apply_handle_stream_stop(StringInfo s) /* We must be in a valid transaction state */ Assert(IsTransactionState()); - /* Commit the per-stream transaction */ - CommitTransactionCommand(); + /* The synchronization worker runs in single transaction. */ + if (!am_tablesync_worker()) + { + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + } in_streamed_transaction = false; @@ -902,7 +888,10 @@ apply_handle_stream_abort(StringInfo s) { /* Cleanup the subxact info */ cleanup_subxact_info(); - CommitTransactionCommand(); + + /* The synchronization worker runs in single transaction */ + if (!am_tablesync_worker()) + CommitTransactionCommand(); return; } @@ -928,7 +917,9 @@ apply_handle_stream_abort(StringInfo s) /* write the updated subxact list */ subxact_info_write(MyLogicalRepWorker->subid, xid); - CommitTransactionCommand(); + + if (!am_tablesync_worker()) + CommitTransactionCommand(); } } @@ -1048,35 +1039,54 @@ apply_handle_stream_commit(StringInfo s) BufFileClose(fd); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = commit_data.end_lsn; - replorigin_session_origin_timestamp = commit_data.committime; - pfree(buffer); pfree(s2.data); - CommitTransactionCommand(); - pgstat_report_stat(false); - - store_flush_position(commit_data.end_lsn); - elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - in_remote_transaction = false; - - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); } +/* + * Helper function for apply_handle_commit and apply_handle_stream_commit. + */ +static void +apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data) +{ + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; +} + /* * Handle RELATION message. *