diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c06b2fa285..5ccfd3105f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -305,10 +305,18 @@ CreateInitDecodingContext(char *plugin, * Create a new decoding context, for a logical slot that has previously been * used already. * - * start_lsn contains the LSN of the last received data or InvalidXLogRecPtr - * output_plugin_options contains options passed to the output plugin - * read_page, prepare_write, do_write are callbacks that have to be filled to - * perform the use-case dependent, actual, work. + * start_lsn + * The LSN at which to start decoding. If InvalidXLogRecPtr, restart + * from the slot's confirmed_flush; otherwise, start from the specified + * location (but move it forwards to confirmed_flush if it's older than + * that, see below). + * + * output_plugin_options + * contains options passed to the output plugin. + * + * read_page, prepare_write, do_write + * callbacks that have to be filled to perform the use-case dependent, + * actual work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -745,7 +753,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, * replication slot. * * Note that in the most cases, we won't be able to immediately use the xmin - * to increase the xmin horizon, we need to wait till the client has confirmed + * to increase the xmin horizon: we need to wait till the client has confirmed * receiving current_lsn with LogicalConfirmReceivedLocation(). 
*/ void @@ -890,7 +898,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) MyReplicationSlot->data.confirmed_flush = lsn; - /* if were past the location required for bumping xmin, do so */ + /* if we're past the location required for bumping xmin, do so */ if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr && MyReplicationSlot->candidate_xmin_lsn <= lsn) { @@ -926,7 +934,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); - /* first write new xmin to disk, so we know whats up after a crash */ + /* first write new xmin to disk, so we know what's up after a crash */ if (updated_xmin || updated_restart) { ReplicationSlotMarkDirty(); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 84b4d573e7..99112ac1b4 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -238,10 +238,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin PG_TRY(); { - /* - * Passing InvalidXLogRecPtr here causes replay to start at the slot's - * confirmed_flush. - */ + /* restart at slot's confirmed_flush */ ctx = CreateDecodingContext(InvalidXLogRecPtr, options, logical_read_local_xlog_page, @@ -265,13 +262,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx->output_writer_private = p; /* - * We start reading xlog from the restart lsn, even though in - * CreateDecodingContext we set the snapshot builder up using the - * slot's confirmed_flush. This means we might read xlog we don't - * actually decode rows from, but the snapshot builder might need it - * to get to a consistent point. The point we start returning data to - * *users* at is the confirmed_flush lsn set up in the decoding - * context. 
+ * Decoding of WAL must start at restart_lsn so that the entirety of + * xacts that committed after the slot's confirmed_flush can be + * accumulated into reorder buffers. */ startptr = MyReplicationSlot->data.restart_lsn; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index efcce5fdd5..4a56df7103 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -66,7 +66,12 @@ typedef struct ReplicationSlotPersistentData /* oldest LSN that might be required by this replication slot */ XLogRecPtr restart_lsn; - /* oldest LSN that the client has acked receipt for */ + /* + * Oldest LSN that the client has acked receipt for. This is used as the + * start_lsn point in case the client doesn't specify one, and also as a + * safety measure to jump forwards in case the client specifies a + * start_lsn that's further in the past than this value. + */ XLogRecPtr confirmed_flush; /* plugin name */ @@ -113,11 +118,10 @@ typedef struct ReplicationSlot /* all the remaining data is only used for logical slots */ - /* ---- + /* * When the client has confirmed flushes >= candidate_xmin_lsn we can - * advance the catalog xmin, when restart_valid has been passed, + * advance the catalog xmin. When restart_valid has been passed, * restart_lsn can be increased. - * ---- */ TransactionId candidate_catalog_xmin; XLogRecPtr candidate_xmin_lsn;