Fix crash in WAL sender when starting physical replication

Since database connections can be used with WAL senders in 9.4, it is
possible to use physical replication.  This commit fixes a crash when
starting physical replication with a WAL sender using a database
connection, caused by the refactoring done in 850196b.

There have been discussions about forbidding the use of physical
replication in a database connection, but this is left for later,
taking care only of the crash new to 13.

While on it, add a test to check for a failure when attempting logical
replication if the WAL sender does not have a database connection.  This
part is extracted from a larger patch by Kyotaro Horiguchi.

Reported-by: Vladimir Sitnikov
Author: Michael Paquier, Kyotaro Horiguchi
Reviewed-by: Kyotaro Horiguchi, Álvaro Herrera
Discussion: https://postgr.es/m/CAB=Je-GOWMj1PTPkeUhjqQp-4W3=nW-pXe2Hjax6rJFffB5_Aw@mail.gmail.com
Backpatch-through: 13
This commit is contained in:
Michael Paquier 2020-06-08 10:12:24 +09:00
parent 5a2398b0b2
commit 879ad9f90e
4 changed files with 31 additions and 26 deletions

View File

@ -44,6 +44,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
XLogRecPtr recptr); XLogRecPtr recptr);
static void ResetDecoder(XLogReaderState *state); static void ResetDecoder(XLogReaderState *state);
static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
/* size of the buffer allocated for error message. */ /* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000 #define MAX_ERRORMSG_LEN 1000
@ -210,7 +212,7 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
/* /*
* Initialize the passed segment structs. * Initialize the passed segment structs.
*/ */
void static void
WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir) int segsize, const char *waldir)
{ {

View File

@ -130,13 +130,11 @@ bool log_replication_commands = false;
bool wake_wal_senders = false; bool wake_wal_senders = false;
/* /*
* Physical walsender does not use xlogreader to read WAL, but it does use a * xlogreader used for replication. Note that a WAL sender doing physical
* fake one to keep state. Logical walsender uses a proper xlogreader. Both * replication does not need xlogreader to read WAL, but it needs one to
* keep the 'xlogreader' pointer to the right one, for the sake of common * keep a state of its work.
* routines.
*/ */
static XLogReaderState fake_xlogreader; static XLogReaderState *xlogreader = NULL;
static XLogReaderState *xlogreader;
/* /*
* These variables keep track of the state of the timeline we're currently * These variables keep track of the state of the timeline we're currently
@ -285,20 +283,6 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */ /* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
/*
* Prepare physical walsender's fake xlogreader struct. Logical walsender
* does this later.
*/
if (!am_db_walsender)
{
xlogreader = &fake_xlogreader;
xlogreader->routine =
*XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close);
WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
wal_segment_size, NULL);
}
} }
/* /*
@ -594,6 +578,18 @@ StartReplication(StartReplicationCmd *cmd)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
/* create xlogreader for physical replication */
xlogreader =
XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close),
NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
/* /*
* We assume here that we're logging enough information in the WAL for * We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain(). * log-shipping, since this is checked in PostmasterMain().
@ -1643,6 +1639,8 @@ exec_replication_command(const char *cmd_string)
StartReplication(cmd); StartReplication(cmd);
else else
StartLogicalReplication(cmd); StartLogicalReplication(cmd);
Assert(xlogreader != NULL);
break; break;
} }

View File

@ -262,10 +262,6 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */ /* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state); extern void XLogReaderFree(XLogReaderState *state);
/* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
/* Position the XLogReader to given record */ /* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND #ifdef FRONTEND

View File

@ -7,7 +7,7 @@ use strict;
use warnings; use warnings;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 13; use Test::More tests => 14;
use Config; use Config;
# Initialize master node # Initialize master node
@ -36,6 +36,15 @@ ok( $stderr =~
m/replication slot "test_slot" was not created in this database/, m/replication slot "test_slot" was not created in this database/,
"Logical decoding correctly fails to start"); "Logical decoding correctly fails to start");
# Check case of walsender not using a database connection. Logical
# decoding should not be allowed.
($result, $stdout, $stderr) = $node_master->psql(
'template1',
qq[START_REPLICATION SLOT s1 LOGICAL 0/1],
replication => 'true');
ok($stderr =~ /ERROR: logical decoding requires a database connection/,
"Logical decoding fails on non-database connection");
$node_master->safe_psql('postgres', $node_master->safe_psql('postgres',
qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;] qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
); );