Lag tracking for logical replication

Lag tracking is called for each commit, but we introduce
a pacing delay to ensure we don't swamp the lag tracker.

Author: Petr Jelinek, with minor pacing delay code from me
This commit is contained in:
Simon Riggs 2017-05-12 10:50:56 +01:00
parent efa2c18f4e
commit 024711bb54
7 changed files with 79 additions and 23 deletions

View File

@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
{
ReplicationSlot *slot;
MemoryContext context,
@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
ctx->update_progress = update_progress;
ctx->output_plugin_options = output_plugin_options;
@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
*
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
* read_page, prepare_write, do_write are callbacks that have to be filled to
* perform the use-case dependent, actual, work.
* read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
need_full_snapshot, read_page, prepare_write,
do_write);
do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
* output_plugin_options
* contains options passed to the output plugin.
*
* read_page, prepare_write, do_write
* read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
read_page, prepare_write, do_write);
read_page, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@ -503,6 +509,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
ctx->prepared_write = false;
}
/*
* Update progress tracking (if supported).
*/
void
OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
{
if (!ctx->update_progress)
return;
ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
}
/*
* Load the output plugin, lookup its output plugin init function, and check
* that it provides the required callbacks.

View File

@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
options,
logical_read_local_xlog_page,
LogicalOutputPrepareWrite,
LogicalOutputWrite);
LogicalOutputWrite, NULL);
MemoryContextSwitchTo(oldcontext);

View File

@ -244,6 +244,8 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
OutputPluginUpdateProgress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);

View File

@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
*/
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
false, /* do not build snapshot */
logical_read_local_xlog_page, NULL, NULL);
logical_read_local_xlog_page, NULL, NULL,
NULL);
/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);

View File

@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData);
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
/*
* Signal that we don't need the timeout mechanism. We're just
@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.
*/
logical_decoding_ctx = CreateDecodingContext(
cmd->startpoint, cmd->options,
logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData);
WalSndPrepareWrite,
WalSndWriteData,
WalSndUpdateProgress);
/* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn;
@ -1239,6 +1243,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
SetLatch(MyLatch);
}
/*
* LogicalDecodingContext 'progress_update' callback.
*
* Write the current position to the log tracker (see XLogSendPhysical).
*/
static void
WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
/*
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
* to avoid flooding the lag tracker when we commit frequently.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
if (!TimestampDifferenceExceeds(sendTime, now,
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
return;
LagTrackerWrite(lsn, now);
sendTime = now;
}
/*
* Wait till WAL < loc is flushed to disk so it can be safely read.
*/
@ -2730,9 +2758,9 @@ XLogSendLogical(void)
if (record != NULL)
{
/*
* Note the lack of any call to LagTrackerWrite() which is the responsibility
* of the logical decoding plugin. Response messages are handled normally,
* so this responsibility does not extend to needing to call LagTrackerRead().
* Note the lack of any call to LagTrackerWrite() which is handled
* by WalSndUpdateProgress which is called by output plugin through
* logical decoding write api.
*/
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
* LagTrackerRead can compute the elapsed time (lag) when this WAL position is
* eventually reported to have been written, flushed and applied by the
* standby in a reply message.
* Exported to allow logical decoding plugins to call this when they choose.
*/
void
static void
LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
{
bool buffer_full;

View File

@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
typedef void (*LogicalOutputPluginWriterUpdateProgress) (
struct LogicalDecodingContext *lr,
XLogRecPtr Ptr,
TransactionId xid
);
typedef struct LogicalDecodingContext
{
/* memory context this is all allocated in */
@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
*/
LogicalOutputPluginWriterPrepareWrite prepare_write;
LogicalOutputPluginWriterWrite write;
LogicalOutputPluginWriterUpdateProgress update_progress;
/*
* Output buffer.
@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(
XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
#endif

View File

@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
/* Functions in replication/logical/logical.c */
extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
#endif /* OUTPUT_PLUGIN_H */