diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 5995798b58..cb76be4f46 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -44,6 +44,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); +static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, + int segsize, const char *waldir); /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 @@ -210,7 +212,7 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) /* * Initialize the passed segment structs. */ -void +static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2364cbfc61..e2477c47e0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -130,13 +130,11 @@ bool log_replication_commands = false; bool wake_wal_senders = false; /* - * Physical walsender does not use xlogreader to read WAL, but it does use a - * fake one to keep state. Logical walsender uses a proper xlogreader. Both - * keep the 'xlogreader' pointer to the right one, for the sake of common - * routines. + * xlogreader used for replication. Note that a WAL sender doing physical + * replication does not need xlogreader to read WAL, but it needs one to + * keep a state of its work. */ -static XLogReaderState fake_xlogreader; -static XLogReaderState *xlogreader; +static XLogReaderState *xlogreader = NULL; /* * These variables keep track of the state of the timeline we're currently @@ -285,20 +283,6 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); - - /* - * Prepare physical walsender's fake xlogreader struct. Logical walsender - * does this later. - */ - if (!am_db_walsender) - { - xlogreader = &fake_xlogreader; - xlogreader->routine = - *XL_ROUTINE(.segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close); - WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt, - wal_segment_size, NULL); - } } /* @@ -594,6 +578,18 @@ StartReplication(StartReplicationCmd *cmd) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); + /* create xlogreader for physical replication */ + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + NULL); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + /* * We assume here that we're logging enough information in the WAL for * log-shipping, since this is checked in PostmasterMain(). @@ -1643,6 +1639,8 @@ exec_replication_command(const char *cmd_string) StartReplication(cmd); else StartLogicalReplication(cmd); + + Assert(xlogreader != NULL); break; } diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d930fe957d..b0f2a6ed43 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -262,10 +262,6 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); -/* Initialize supporting structures */ -extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, - int segsize, const char *waldir); - /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index ee05535b1c..78229a7b92 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 13; +use Test::More tests => 14; use Config; # Initialize master node @@ -36,6 +36,15 @@ ok( $stderr =~ m/replication slot "test_slot" was not created in this database/, "Logical decoding correctly fails to start"); +# Check case of walsender not using a database connection. Logical +# decoding should not be allowed. +($result, $stdout, $stderr) = $node_master->psql( + 'template1', + qq[START_REPLICATION SLOT s1 LOGICAL 0/1], + replication => 'true'); +ok($stderr =~ /ERROR: logical decoding requires a database connection/, + "Logical decoding fails on non-database connection"); + $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;] );