diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 018fdf3d34..b67da2ee92 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -118,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) return NULL; } +#ifndef FRONTEND + /* Will be loaded on first read */ + state->timelineHistory = NIL; +#endif + return state; } @@ -137,6 +142,10 @@ XLogReaderFree(XLogReaderState *state) pfree(state->errormsg_buf); if (state->readRecordBuf) pfree(state->readRecordBuf); +#ifndef FRONTEND + if (state->timelineHistory) + list_free_deep(state->timelineHistory); +#endif pfree(state->readBuf); pfree(state); } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 2635d80dc0..f6ca2b95e5 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -19,6 +19,7 @@ #include +#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -659,6 +660,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) /* state maintained across calls */ static int sendFile = -1; static XLogSegNo sendSegNo = 0; + static TimeLineID sendTLI = 0; static uint32 sendOff = 0; p = buf; @@ -674,7 +676,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || + sendTLI != tli) { char path[MAXPGPATH]; @@ -701,6 +704,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; + sendTLI = tli; } /* Need to seek in the file? */ @@ -748,6 +752,147 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) } } +/* + * Determine XLogReaderState->currTLI and ->currTLIValidUntil; + * XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the + * decision. This may later be used to determine which xlog segment file to + * open, etc. + * + * We switch to an xlog segment from the new timeline eagerly when on a + * historical timeline, as soon as we reach the start of the xlog segment + * containing the timeline switch. The server copied the segment to the new + * timeline so all the data up to the switch point is the same, but there's no + * guarantee the old segment will still exist. It may have been deleted or + * renamed with a .partial suffix so we can't necessarily keep reading from + * the old TLI even though tliSwitchPoint says it's OK. + * + * Because of this, callers MAY NOT assume that currTLI is the timeline that + * will be in a page's xlp_tli; the page may begin on an older timeline or we + * might be reading from historical timeline data on a segment that's been + * copied to a new timeline. + */ +static void +XLogReadDetermineTimeline(XLogReaderState *state) +{ + /* Read the history on first time through */ + if (state->timelineHistory == NIL) + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); + + /* + * Are we reading the record immediately following the one we read last + * time? If not, then don't use the cached timeline info. + */ + if (state->currRecPtr != state->EndRecPtr) + { + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + /* + * Are we reading a timeline that used to be the latest one, but became + * historical? This can happen in a replica that gets promoted, and in a + * cascading replica whose upstream gets promoted. In either case, + * re-read the timeline history data. We cannot read past the timeline + * switch point, because either the records in the old timeline might be + * invalid, or worse, they may valid but *different* from the ones we + * should be reading. + */ + if (state->currTLIValidUntil == InvalidXLogRecPtr && + state->currTLI != ThisTimeLineID && + state->currTLI != 0) + { + /* re-read timeline history */ + list_free_deep(state->timelineHistory); + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); + + elog(DEBUG2, "timeline %u became historical during decoding", + state->currTLI); + + /* then invalidate the cached timeline info */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + /* + * Are we reading a record immediately following a timeline switch? If + * so, we must follow the switch too. + */ + if (state->currRecPtr == state->EndRecPtr && + state->currTLI != 0 && + state->currTLIValidUntil != InvalidXLogRecPtr && + state->currRecPtr >= state->currTLIValidUntil) + { + elog(DEBUG2, + "requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline", + (uint32) (state->currRecPtr >> 32), + (uint32) state->currRecPtr, + state->currTLI, + (uint32) (state->currTLIValidUntil >> 32), + (uint32) (state->currTLIValidUntil)); + + /* invalidate TLI info so we look up the next TLI */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + if (state->currTLI == 0) + { + /* + * Something changed; work out what timeline this record is on. We + * might read it from the segment on this TLI or, if the segment is + * also contained by newer timelines, the copy from a newer TLI. + */ + state->currTLI = tliOfPointInHistory(state->currRecPtr, + state->timelineHistory); + + /* + * Look for the most recent timeline that's on the same xlog segment + * as this record, since that's the only one we can assume is still + * readable. + */ + while (state->currTLI != ThisTimeLineID && + state->currTLIValidUntil == InvalidXLogRecPtr) + { + XLogRecPtr tliSwitch; + TimeLineID nextTLI; + + CHECK_FOR_INTERRUPTS(); + + tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory, + &nextTLI); + + /* round ValidUntil down to start of seg containing the switch */ + state->currTLIValidUntil = + ((tliSwitch / XLogSegSize) * XLogSegSize); + + if (state->currRecPtr >= state->currTLIValidUntil) + { + /* + * The new currTLI ends on this WAL segment so check the next + * TLI to see if it's the last one on the segment. + * + * If that's the current TLI we'll stop searching. + */ + state->currTLI = nextTLI; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + } + + /* + * We're now either reading from the first xlog segment in the current + * server's timeline or the most recent historical timeline that + * exists on the target segment. + */ + elog(DEBUG2, "XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u", + (uint32) (state->currRecPtr >> 32), + (uint32) state->currRecPtr, + state->currTLI, + (uint32) (state->currTLIValidUntil >> 32), + (uint32) (state->currTLIValidUntil), + ThisTimeLineID); + } +} + /* * read_page callback for reading local xlog files * @@ -761,48 +906,101 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) */ int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page, + TimeLineID *pageTLI) { - XLogRecPtr flushptr, + XLogRecPtr read_upto, loc; int count; loc = targetPagePtr + reqLen; + + /* Make sure enough xlog is available... */ while (1) { /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. */ - if (!RecoveryInProgress()) + XLogReadDetermineTimeline(state); + + if (state->currTLI == ThisTimeLineID) { - *pageTLI = ThisTimeLineID; - flushptr = GetFlushRecPtr(); + /* + * We're reading from the current timeline so we might have to + * wait for the desired record to be generated (or, for a standby, + * received & replayed) + */ + if (!RecoveryInProgress()) + { + *pageTLI = ThisTimeLineID; + read_upto = GetFlushRecPtr(); + } + else + read_upto = GetXLogReplayRecPtr(pageTLI); + + if (loc <= read_upto) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } else - flushptr = GetXLogReplayRecPtr(pageTLI); + { + /* + * We're on a historical timeline, so limit reading to the switch + * point where we moved to the next timeline. + * + * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know + * about the new timeline, so we must've received past the end of + * it. + */ + read_upto = state->currTLIValidUntil; - if (loc <= flushptr) + /* + * Setting pageTLI to our wanted record's TLI is slightly wrong; + * the page might begin on an older timeline if it contains a + * timeline switch, since its xlog segment will have been copied + * from the prior timeline. This is pretty harmless though, as + * nothing cares so long as the timeline doesn't go backwards. We + * should read the page header instead; FIXME someday. + */ + *pageTLI = state->currTLI; + + /* No need to wait on a historical timeline */ break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + } } - /* more than one block available */ - if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + if (targetPagePtr + XLOG_BLCKSZ <= read_upto) + { + /* + * more than one block available; read only that block, have caller + * come back if they need more. + */ count = XLOG_BLCKSZ; - /* not enough data there */ - else if (targetPagePtr + reqLen > flushptr) + } + else if (targetPagePtr + reqLen > read_upto) + { + /* not enough data there */ return -1; - /* part of the page available */ + } else - count = flushptr - targetPagePtr; + { + /* enough bytes available to satisfy the request */ + count = read_upto - targetPagePtr; + } + /* + * Even though we just determined how much of the page can be validly read + * as 'count', read the whole page anyway. It's guaranteed to be + * zero-padded up to the page boundary if it's incomplete. + */ XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ); + /* number of valid bytes in the buffer */ return count; } diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 3853ab4cf5..dd6cd62ccd 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; - /* compute the current end-of-wal */ - if (!RecoveryInProgress()) - end_of_wal = GetFlushRecPtr(); - else - end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name)); PG_TRY(); @@ -273,7 +267,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * slot's confirmed_flush. This means we might read xlog we don't * actually decode rows from, but the snapshot builder might need it * to get to a consistent point. The point we start returning data to - * *users* at is the candidate restart lsn from the decoding context. + * *users* at is the confirmed_flush lsn set up in the decoding + * context. */ startptr = MyReplicationSlot->data.restart_lsn; @@ -282,8 +277,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); + if (!RecoveryInProgress()) + end_of_wal = GetFlushRecPtr(); + else + end_of_wal = GetXLogReplayRecPtr(NULL); + + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || - (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal)) + (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { XLogRecord *record; char *errm = NULL; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index deaa7f5128..300747dbf7 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -27,6 +27,10 @@ #include "access/xlogrecord.h" +#ifndef FRONTEND +#include "nodes/pg_list.h" +#endif + typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ @@ -160,11 +164,25 @@ struct XLogReaderState /* beginning of the WAL record being read. */ XLogRecPtr currRecPtr; + /* timeline to read it from, 0 if a lookup is required */ + TimeLineID currTLI; + /* + * Safe point to read to in currTLI. If currTLI is historical, then this + * is set to the end of the last whole segment that contains that TLI; + * if currTLI is ThisTimeLineID, this is InvalidXLogRecPtr. This is *not* + * the tliSwitchPoint. + */ + XLogRecPtr currTLIValidUntil; /* Buffer for current ReadRecord result (expandable) */ char *readRecordBuf; uint32 readRecordBufSize; +#ifndef FRONTEND + /* cached timeline history, only available in backend */ + List *timelineHistory; +#endif + /* Buffer to hold error message */ char *errormsg_buf; }; diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 6167ec1344..ebdcdc8c2a 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -13,6 +13,7 @@ SUBDIRS = \ test_parser \ test_rls_hooks \ test_shm_mq \ + test_slot_timelines \ worker_spi all: submake-errcodes diff --git a/src/test/modules/test_slot_timelines/.gitignore b/src/test/modules/test_slot_timelines/.gitignore new file mode 100644 index 0000000000..543c50d1bb --- /dev/null +++ b/src/test/modules/test_slot_timelines/.gitignore @@ -0,0 +1,3 @@ +results/ +tmp_check/ +log/ diff --git a/src/test/modules/test_slot_timelines/Makefile b/src/test/modules/test_slot_timelines/Makefile new file mode 100644 index 0000000000..21757c5bcb --- /dev/null +++ b/src/test/modules/test_slot_timelines/Makefile @@ -0,0 +1,22 @@ +# src/test/modules/test_slot_timelines/Makefile + +MODULES = test_slot_timelines +PGFILEDESC = "test_slot_timelines - test utility for slot timeline following" + +EXTENSION = test_slot_timelines +DATA = test_slot_timelines--1.0.sql + +EXTRA_INSTALL=contrib/test_decoding +REGRESS=load_extension +REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_slot_timelines/test_slot_timelines.conf + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_slot_timelines +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_slot_timelines/README b/src/test/modules/test_slot_timelines/README new file mode 100644 index 0000000000..585f02f1e6 --- /dev/null +++ b/src/test/modules/test_slot_timelines/README @@ -0,0 +1,19 @@ +A test module for logical decoding failover and timeline following. + +This module provides a minimal way to maintain logical slots on replicas +that mirror the state on the master. It doesn't make decoding possible, +just tracking slot state so that a decoding client that's using the master +can follow a physical failover to the standby. The master doesn't know +about the slots on the standby, they're synced by a client that connects +to both. + +This is intentionally not part of the test_decoding module because that's meant +to serve as example code, where this module exercises internal server features +by unsafely exposing internal state to SQL. It's not the right way to do +failover, it's just a simple way to test it from the perl TAP framework to +prove the feature works. + +In a practical implementation of this approach a bgworker on the master would +monitor slot positions and relay them to a bgworker on the standby that applies +the position updates without exposing slot internals to SQL. That's too complex +for this test framework though. diff --git a/src/test/modules/test_slot_timelines/expected/load_extension.out b/src/test/modules/test_slot_timelines/expected/load_extension.out new file mode 100644 index 0000000000..14a414aa7e --- /dev/null +++ b/src/test/modules/test_slot_timelines/expected/load_extension.out @@ -0,0 +1,19 @@ +CREATE EXTENSION test_slot_timelines; +SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding'); + test_slot_timelines_create_logical_slot +----------------------------------------- + +(1 row) + +SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location()); + test_slot_timelines_advance_logical_slot +------------------------------------------ + +(1 row) + +SELECT pg_drop_replication_slot('test_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/src/test/modules/test_slot_timelines/sql/load_extension.sql b/src/test/modules/test_slot_timelines/sql/load_extension.sql new file mode 100644 index 0000000000..a71127d471 --- /dev/null +++ b/src/test/modules/test_slot_timelines/sql/load_extension.sql @@ -0,0 +1,7 @@ +CREATE EXTENSION test_slot_timelines; + +SELECT test_slot_timelines_create_logical_slot('test_slot', 'test_decoding'); + +SELECT test_slot_timelines_advance_logical_slot('test_slot', txid_current(), txid_current(), pg_current_xlog_location(), pg_current_xlog_location()); + +SELECT pg_drop_replication_slot('test_slot'); diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql new file mode 100644 index 0000000000..31d7f8ef1c --- /dev/null +++ b/src/test/modules/test_slot_timelines/test_slot_timelines--1.0.sql @@ -0,0 +1,16 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_slot_timelines" to load this file. \quit + +CREATE OR REPLACE FUNCTION test_slot_timelines_create_logical_slot(slot_name text, plugin text) +RETURNS void +LANGUAGE c AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION test_slot_timelines_create_logical_slot(text, text) +IS 'Create a logical slot at a particular lsn and xid. Do not use in production servers, it is not safe. The slot is created with an invalid xmin and lsn.'; + +CREATE OR REPLACE FUNCTION test_slot_timelines_advance_logical_slot(slot_name text, new_xmin bigint, new_catalog_xmin bigint, new_restart_lsn pg_lsn, new_confirmed_lsn pg_lsn) +RETURNS void +LANGUAGE c AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION test_slot_timelines_advance_logical_slot(text, bigint, bigint, pg_lsn, pg_lsn) +IS 'Advance a logical slot directly. Do not use this in production servers, it is not safe.'; diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.c b/src/test/modules/test_slot_timelines/test_slot_timelines.c new file mode 100644 index 0000000000..74dd1a041b --- /dev/null +++ b/src/test/modules/test_slot_timelines/test_slot_timelines.c @@ -0,0 +1,133 @@ +/*-------------------------------------------------------------------------- + * + * test_slot_timelines.c + * Test harness code for slot timeline following + * + * Copyright (c) 2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_slot_timelines/test_slot_timelines.c + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/transam.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "replication/slot.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(test_slot_timelines_create_logical_slot); +PG_FUNCTION_INFO_V1(test_slot_timelines_advance_logical_slot); + +static void clear_slot_transient_state(void); + +/* + * Create a new logical slot, with invalid LSN and xid, directly. This does not + * use the snapshot builder or logical decoding machinery. It's only intended + * for creating a slot on a replica that mirrors the state of a slot on an + * upstream master. + * + * Note that this is test harness code. You shouldn't expose slot internals + * to SQL like this for any real world usage. See the README. + */ +Datum +test_slot_timelines_create_logical_slot(PG_FUNCTION_ARGS) +{ + char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0)); + char *plugin = text_to_cstring(PG_GETARG_TEXT_P(1)); + + CheckSlotRequirements(); + + ReplicationSlotCreate(slotname, true, RS_PERSISTENT); + + /* register the plugin name with the slot */ + StrNCpy(NameStr(MyReplicationSlot->data.plugin), plugin, NAMEDATALEN); + + /* + * Initialize persistent state to placeholders to be set by + * test_slot_timelines_advance_logical_slot . + */ + MyReplicationSlot->data.xmin = InvalidTransactionId; + MyReplicationSlot->data.catalog_xmin = InvalidTransactionId; + MyReplicationSlot->data.restart_lsn = InvalidXLogRecPtr; + MyReplicationSlot->data.confirmed_flush = InvalidXLogRecPtr; + + clear_slot_transient_state(); + + ReplicationSlotRelease(); + + PG_RETURN_VOID(); +} + +/* + * Set the state of a slot. + * + * This doesn't maintain the non-persistent state at all, + * but since the slot isn't in use that's OK. + * + * There's intentionally no check to prevent slots going backwards + * because they can actually go backwards if the master crashes when + * it hasn't yet flushed slot state to disk then we copy the older + * slot state after recovery. + * + * There's no checking done for xmin or catalog xmin either, since + * we can't really do anything useful that accounts for xid wrap-around. + * + * Note that this is test harness code. You shouldn't expose slot internals + * to SQL like this for any real world usage. See the README. + */ +Datum +test_slot_timelines_advance_logical_slot(PG_FUNCTION_ARGS) +{ + char *slotname = text_to_cstring(PG_GETARG_TEXT_P(0)); + TransactionId new_xmin = (TransactionId) PG_GETARG_INT64(1); + TransactionId new_catalog_xmin = (TransactionId) PG_GETARG_INT64(2); + XLogRecPtr restart_lsn = PG_GETARG_LSN(3); + XLogRecPtr confirmed_lsn = PG_GETARG_LSN(4); + + CheckSlotRequirements(); + + ReplicationSlotAcquire(slotname); + + if (MyReplicationSlot->data.database != MyDatabaseId) + elog(ERROR, "Trying to update a slot on a different database"); + + MyReplicationSlot->data.xmin = new_xmin; + MyReplicationSlot->data.catalog_xmin = new_catalog_xmin; + MyReplicationSlot->data.restart_lsn = restart_lsn; + MyReplicationSlot->data.confirmed_flush = confirmed_lsn; + + clear_slot_transient_state(); + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); + + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + + PG_RETURN_VOID(); +} + +static void +clear_slot_transient_state(void) +{ + Assert(MyReplicationSlot != NULL); + + /* + * Make sure the slot state is the same as if it were newly loaded from + * disk on recovery. + */ + MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin; + MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; + + MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId; + MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr; + MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr; + MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr; +} diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.conf b/src/test/modules/test_slot_timelines/test_slot_timelines.conf new file mode 100644 index 0000000000..56b46d7e27 --- /dev/null +++ b/src/test/modules/test_slot_timelines/test_slot_timelines.conf @@ -0,0 +1,2 @@ +max_replication_slots=2 +wal_level=logical diff --git a/src/test/modules/test_slot_timelines/test_slot_timelines.control b/src/test/modules/test_slot_timelines/test_slot_timelines.control new file mode 100644 index 0000000000..dcee1a7257 --- /dev/null +++ b/src/test/modules/test_slot_timelines/test_slot_timelines.control @@ -0,0 +1,5 @@ +# test_slot_timelines extension +comment = 'Test utility for slot timeline following and logical decoding' +default_version = '1.0' +module_pathname = '$libdir/test_slot_timelines' +relocatable = true diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index 929071909a..78570dd156 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -9,6 +9,8 @@ # #------------------------------------------------------------------------- +EXTRA_INSTALL=contrib/test_decoding src/test/modules/test_slot_timelines + subdir = src/test/recovery top_builddir = ../../.. include $(top_builddir)/src/Makefile.global diff --git a/src/test/recovery/t/006_logical_decoding_timelines.pl b/src/test/recovery/t/006_logical_decoding_timelines.pl new file mode 100644 index 0000000000..bc20f405d7 --- /dev/null +++ b/src/test/recovery/t/006_logical_decoding_timelines.pl @@ -0,0 +1,304 @@ +# Demonstrate that logical can follow timeline switches. +# +# Logical replication slots can follow timeline switches but it's +# normally not possible to have a logical slot on a replica where +# promotion and a timeline switch can occur. The only ways +# we can create that circumstance are: +# +# * By doing a filesystem-level copy of the DB, since pg_basebackup +# excludes pg_replslot but we can copy it directly; or +# +# * by creating a slot directly at the C level on the replica and +# advancing it as we go using the low level APIs. It can't be done +# from SQL since logical decoding isn't allowed on replicas. +# +# This module uses the first approach to show that timeline following +# on a logical slot works. +# +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 20; +use RecursiveCopy; +use File::Copy; + +my ($stdout, $stderr, $ret); + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n"); +$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n"); +$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n"); +$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n"); +$node_master->dump_info; +$node_master->start; + +diag "Testing logical timeline following with a filesystem-level copy"; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');" +); +$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);"); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('beforebb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +my $backup_name = 'b1'; +$node_master->backup_fs_hot($backup_name); + +my $node_replica = get_new_node('replica'); +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_replica->start; + +$node_master->safe_psql('postgres', +"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" +); +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('afterbb');"); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +# Verify that only the before base_backup slot is on the replica +$stdout = $node_replica->safe_psql('postgres', + 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is($stdout, 'before_basebackup', + 'Expected to find only slot before_basebackup on replica'); + +# Boom, crash +$node_master->stop('immediate'); + +$node_replica->promote; +$node_replica->poll_query_until('postgres', + "SELECT NOT pg_is_in_recovery();"); + +$node_replica->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('after failover');"); + +# Shouldn't be able to read from slot created after base backup +($ret, $stdout, $stderr) = $node_replica->psql('postgres', +"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');" +); +is($ret, 3, 'replaying from after_basebackup slot fails'); +like( + $stderr, + qr/replication slot "after_basebackup" does not exist/, + 'after_basebackup slot missing'); + +# Should be able to read from slot created before base backup +($ret, $stdout, $stderr) = $node_replica->psql( + 'postgres', +"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", + timeout => 30); +is($ret, 0, 'replay from slot before_basebackup succeeds'); +is( $stdout, q(BEGIN +table public.decoding: INSERT: blah[text]:'beforebb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT), 'decoded expected data from slot before_basebackup'); +is($stderr, '', 'replay from slot before_basebackup produces no stderr'); + +# We don't need the standby anymore +$node_replica->teardown_node(); + + +# OK, time to try the same thing again, but this time we'll be using slot +# mirroring on the standby and a pg_basebackup of the master. + +diag "Testing logical timeline following with test_slot_timelines module"; + +$node_master->start(); + +# Clean up after the last test +$node_master->safe_psql('postgres', 'DELETE FROM decoding;'); +is( $node_master->psql( + 'postgres', +'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'), + 0, + 'dropping slots succeeds via pg_drop_replication_slot'); + +# Same as before, we'll make one slot before basebackup, one after. This time +# the basebackup will be with pg_basebackup so it'll omit both slots, then +# we'll use SQL functions provided by the test_slot_timelines test module to sync +# them to the replica, do some work, sync them and fail over then test again. +# This time we should have both the before- and after-basebackup slots working. + +is( $node_master->psql( + 'postgres', +"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');" + ), + 0, + 'creating slot before_basebackup succeeds'); + +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('beforebb');"); + +$backup_name = 'b2'; +$node_master->backup($backup_name); + +is( $node_master->psql( + 'postgres', +"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');" + ), + 0, + 'creating slot after_basebackup succeeds'); + +$node_master->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('afterbb');"); + +$node_replica = get_new_node('replica2'); +$node_replica->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); + +$node_replica->start; + +# Verify the slots are both absent on the replica +$stdout = $node_replica->safe_psql('postgres', + 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is($stdout, '', 'No slots exist on the replica'); + +# Now do our magic to sync the slot states across. Normally +# this would be being done continuously by a bgworker but +# we're just doing it by hand for this test. This is exposing +# postgres innards to SQL so it's unsafe except for testing. +$node_master->safe_psql('postgres', 'CREATE EXTENSION test_slot_timelines;'); +my $slotinfo = $node_master->safe_psql('postgres', +'SELECT slot_name, plugin, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots ORDER BY slot_name' +); +diag "Copying slots to replica"; +open my $fh, '<', \$slotinfo or die $!; +while (<$fh>) +{ + print $_; + chomp $_; + my ($slot_name, $plugin, $xmin, $catalog_xmin, $restart_lsn, + $confirmed_flush_lsn) + = map { + if ($_ ne '') { "'$_'" } + else { 'NULL' } + } split qr/\|/, $_; + + print +"# Copying slot $slot_name,$plugin,$xmin,$catalog_xmin,$restart_lsn,$confirmed_flush_lsn\n"; + $node_replica->safe_psql('postgres', + "SELECT test_slot_timelines_create_logical_slot($slot_name, $plugin);" + ); + $node_replica->safe_psql('postgres', +"SELECT test_slot_timelines_advance_logical_slot($slot_name, $xmin, $catalog_xmin, $restart_lsn, $confirmed_flush_lsn);" + ); +} +close $fh or die $!; + +# Now both slots are present on the replica and exactly match the master +$stdout = $node_replica->safe_psql('postgres', + 'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name'); +is( $stdout, + "after_basebackup\nbefore_basebackup", + 'both slots now exist on replica'); + +$stdout = $node_replica->safe_psql( + 'postgres', + qq{SELECT slot_name, plugin, xmin, catalog_xmin, + restart_lsn, confirmed_flush_lsn + FROM pg_replication_slots + ORDER BY slot_name}); +is($stdout, $slotinfo, + "slot data read back from replica matches slot data on master"); + +# We now have to copy some extra WAL to satisfy the requirements of the oldest +# replication slot. pg_basebackup doesn't know to copy the extra WAL for slots +# so we have to help out. We know the WAL is still retained on the master +# because we haven't advanced the slots there. +# +# Figure out what the oldest segment we need is by looking at the restart_lsn +# of the oldest slot. +# +# It only makes sense to do this once the slots are created on the replica, +# otherwise it might just delete the segments again. + +my $oldest_needed_segment = $node_master->safe_psql( + 'postgres', + qq{SELECT pg_xlogfile_name(( + SELECT restart_lsn + FROM pg_replication_slots + ORDER BY restart_lsn ASC + LIMIT 1 + ));} +); + +diag "oldest needed xlog seg is $oldest_needed_segment "; + +# WAL segment names sort lexically so we can just grab everything > than this +# segment. +opendir(my $pg_xlog, $node_master->data_dir . "/pg_xlog") or die $!; +while (my $seg = readdir $pg_xlog) +{ + next unless $seg >= $oldest_needed_segment && $seg =~ /^[0-9]{24}/; + diag "copying xlog seg $seg"; + copy( + $node_master->data_dir . "/pg_xlog/" . $seg, + $node_replica->data_dir . "/pg_xlog/" . $seg + ) or die "copy of xlog seg $seg failed: $!"; +} +closedir $pg_xlog; + +# Boom, crash the master +$node_master->stop('immediate'); + +$node_replica->promote; +$node_replica->poll_query_until('postgres', + "SELECT NOT pg_is_in_recovery();"); + +$node_replica->safe_psql('postgres', + "INSERT INTO decoding(blah) VALUES ('after failover');"); + +# This time we can read from both slots +($ret, $stdout, $stderr) = $node_replica->psql( + 'postgres', +"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", + timeout => 30); +is($ret, 0, 'replay from slot after_basebackup succeeds'); +is( $stdout, q(BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT), 'decoded expected data from slot after_basebackup'); +is($stderr, '', 'replay from slot after_basebackup produces no stderr'); + +# Should be able to read from slot created before base backup +# +# This would fail with an error about missing WAL segments if we hadn't +# copied extra WAL earlier. +($ret, $stdout, $stderr) = $node_replica->psql( + 'postgres', +"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');", + timeout => 30); +is($ret, 0, 'replay from slot before_basebackup succeeds'); +is( $stdout, q(BEGIN +table public.decoding: INSERT: blah[text]:'beforebb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'afterbb' +COMMIT +BEGIN +table public.decoding: INSERT: blah[text]:'after failover' +COMMIT), 'decoded expected data from slot before_basebackup'); +is($stderr, '', 'replay from slot before_basebackup produces no stderr'); + +($ret, $stdout, $stderr) = $node_replica->psql('postgres', + 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;'); +is($ret, 0, 'dropping slots succeeds via pg_drop_replication_slot'); +is($stderr, '', 'dropping slots produces no stderr output'); + +1;