From 879ad9f90e83b94db14b8be11f1cc1fb38187cc0 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Mon, 8 Jun 2020 10:12:24 +0900 Subject: [PATCH] Fix crash in WAL sender when starting physical replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since database connections can be used with WAL senders in 9.4, it is possible to use physical replication. This commit fixes a crash when starting physical replication with a WAL sender using a database connection, caused by the refactoring done in 850196b. There have been discussions about forbidding the use of physical replication in a database connection, but this is left for later, taking care only of the crash new to 13. While on it, add a test to check for a failure when attempting logical replication if the WAL sender does not have a database connection. This part is extracted from a larger patch by Kyotaro Horiguchi. Reported-by: Vladimir Sitnikov Author: Michael Paquier, Kyotaro Horiguchi Reviewed-by: Kyotaro Horiguchi, Álvaro Herrera Discussion: https://postgr.es/m/CAB=Je-GOWMj1PTPkeUhjqQp-4W3=nW-pXe2Hjax6rJFffB5_Aw@mail.gmail.com Backpatch-through: 13 --- src/backend/access/transam/xlogreader.c | 4 ++- src/backend/replication/walsender.c | 38 ++++++++++----------- src/include/access/xlogreader.h | 4 --- src/test/recovery/t/006_logical_decoding.pl | 11 +++++- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 5995798b58..cb76be4f46 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -44,6 +44,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); +static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, + int segsize, const char *waldir); /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 @@ -210,7 +212,7 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) /* * Initialize the passed segment structs. */ -void +static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2364cbfc61..e2477c47e0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -130,13 +130,11 @@ bool log_replication_commands = false; bool wake_wal_senders = false; /* - * Physical walsender does not use xlogreader to read WAL, but it does use a - * fake one to keep state. Logical walsender uses a proper xlogreader. Both - * keep the 'xlogreader' pointer to the right one, for the sake of common - * routines. + * xlogreader used for replication. Note that a WAL sender doing physical + * replication does not need xlogreader to read WAL, but it needs one to + * keep a state of its work. */ -static XLogReaderState fake_xlogreader; -static XLogReaderState *xlogreader; +static XLogReaderState *xlogreader = NULL; /* * These variables keep track of the state of the timeline we're currently @@ -285,20 +283,6 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); - - /* - * Prepare physical walsender's fake xlogreader struct. Logical walsender - * does this later. - */ - if (!am_db_walsender) - { - xlogreader = &fake_xlogreader; - xlogreader->routine = - *XL_ROUTINE(.segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close); - WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt, - wal_segment_size, NULL); - } } /* @@ -594,6 +578,18 @@ StartReplication(StartReplicationCmd *cmd) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); + /* create xlogreader for physical replication */ + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + NULL); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + /* * We assume here that we're logging enough information in the WAL for * log-shipping, since this is checked in PostmasterMain(). @@ -1643,6 +1639,8 @@ exec_replication_command(const char *cmd_string) StartReplication(cmd); else StartLogicalReplication(cmd); + + Assert(xlogreader != NULL); break; } diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d930fe957d..b0f2a6ed43 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -262,10 +262,6 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); -/* Initialize supporting structures */ -extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, - int segsize, const char *waldir); - /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index ee05535b1c..78229a7b92 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 13; +use Test::More tests => 14; use Config; # Initialize master node @@ -36,6 +36,15 @@ ok( $stderr =~ m/replication slot "test_slot" was not created in this database/, "Logical decoding correctly fails to start"); +# Check case of walsender not using a database connection. Logical +# decoding should not be allowed. +($result, $stdout, $stderr) = $node_master->psql( + 'template1', + qq[START_REPLICATION SLOT s1 LOGICAL 0/1], + replication => 'true'); +ok($stderr =~ /ERROR: logical decoding requires a database connection/, + "Logical decoding fails on non-database connection"); + $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;] );