From d6da71fa8f28faa68823e163f318ffb38a7a9a54 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 11 May 2022 10:51:04 +0530 Subject: [PATCH] Fix the logical replication timeout during large transactions. The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com --- src/backend/replication/logical/logical.c | 27 +++++++++++++ src/backend/replication/pgoutput/pgoutput.c | 44 ++++++++++++++++++++- src/backend/replication/walsender.c | 38 +++++++++++++++--- src/include/replication/logical.h | 3 ++ 4 files changed, 105 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index d536a5f3ba..f7d1491907 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -724,6 +724,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.startup_cb(ctx, opt, is_init); @@ -751,6 +752,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.shutdown_cb(ctx); @@ -786,6 +788,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.begin_cb(ctx, txn); @@ -817,6 +820,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* do the actual work: call callback */ ctx->callbacks.commit_cb(ctx, txn, commit_lsn); @@ -857,6 +861,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* * If the plugin supports two-phase commits then begin prepare callback is @@ -901,6 +906,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin supports two-phase commits then prepare callback is @@ -945,6 +951,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then commit prepared callback @@ -990,6 +997,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then rollback prepared callback @@ -1040,6 +1048,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.change_cb(ctx, txn, relation, change); /* Pop the error context stack */ @@ -1080,6 +1090,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ @@ -1107,6 +1119,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid); @@ -1137,6 +1150,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); @@ -1174,6 +1188,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1217,6 +1232,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = first_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_start_cb is required */ if (ctx->callbacks.stream_start_cb == NULL) ereport(ERROR, @@ -1264,6 +1281,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = last_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_stop_cb is required */ if (ctx->callbacks.stream_stop_cb == NULL) ereport(ERROR, @@ -1303,6 +1322,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = abort_lsn; + ctx->end_xact = true; /* in streaming mode, stream_abort_cb is required */ if (ctx->callbacks.stream_abort_cb == NULL) @@ -1347,6 +1367,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode with two-phase commits, stream_prepare_cb is required */ if (ctx->callbacks.stream_prepare_cb == NULL) @@ -1387,6 +1408,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode, stream_commit_cb is required */ if (ctx->callbacks.stream_commit_cb == NULL) @@ -1435,6 +1457,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + /* in streaming mode, stream_change_cb is required */ if (ctx->callbacks.stream_change_cb == NULL) ereport(ERROR, @@ -1479,6 +1503,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1527,6 +1552,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 2ea540ce4d..ff9cf5d406 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -70,6 +70,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static void update_replication_progress(LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -381,7 +382,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + update_replication_progress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -535,6 +536,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + update_replication_progress(ctx); + if (!is_publishable_relation(relation)) return; @@ -677,6 +680,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; + update_replication_progress(ctx); + /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -735,6 +740,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; + update_replication_progress(ctx); + if (!data->messages) return; @@ -921,7 +928,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + update_replication_progress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1304,3 +1311,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubtruncate = false; } } + +/* + * Try to update progress and send a keepalive message if too many changes were + * processed. + * + * For a large transaction, if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. + * This can happen when all or most of the changes are not published. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx); + changes_count = 0; + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3b245c619f..28f0a29473 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -240,6 +240,7 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); +static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); @@ -1288,6 +1289,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } /* If we have pending write here, go to slow path */ + ProcessPendingWrites(); +} + +/* + * Wait until there is no pending write. Also process replies from the other + * side and check timeouts during that. + */ +static void +ProcessPendingWrites(void) +{ for (;;) { long sleeptime; @@ -1342,18 +1353,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); + bool end_xact = ctx->end_xact; /* * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to * avoid flooding the lag tracker when we commit frequently. + * + * We don't have a mechanism to get the ack for any LSN other than end + * xact LSN from the downstream. So, we track lag only for end of + * transaction LSN. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (!TimestampDifferenceExceeds(sendTime, now, - WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) - return; + if (end_xact && TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + { + LagTrackerWrite(lsn, now); + sendTime = now; + } - LagTrackerWrite(lsn, now); - sendTime = now; + /* + * Try to send a keepalive if required. We don't need to try sending keep + * alive messages at the transaction end as that will be done at a later + * point in time. This is required only for large transactions where we + * don't send any changes to the downstream and the receiver can timeout + * due to that. + */ + if (!end_xact && + now >= TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2)) + ProcessPendingWrites(); } /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index af551d6f4e..25490f221f 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -49,6 +49,9 @@ typedef struct LogicalDecodingContext */ bool fast_forward; + /* Are we processing the end LSN of a transaction? */ + bool end_xact; + OutputPluginCallbacks callbacks; OutputPluginOptions options;