Make logical WAL sender report streaming state appropriately

WAL senders sending logically-decoded data fail to properly report in
"streaming" state when starting up, hence as long as one extra record is
not replayed, such WAL senders would remain in a "catchup" state, which
is inconsistent with the physical cousin.

This can be easily reproduced by for example using pg_recvlogical and
restarting the upstream server.  The TAP tests have been slightly
modified to detect the failure and strengthened so as future tests also
make sure that a node is in streaming state when waiting for its
catchup.

Backpatch down to 9.4 where this code has been introduced.

Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
This commit is contained in:
Michael Paquier 2018-07-12 10:20:14 +09:00
parent 4b8860e2dd
commit d5eb1fe0dc

View File

@ -1926,8 +1926,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
ereport(DEBUG1,
(errmsg("standby \"%s\" has now caught up with primary",
application_name)));
(errmsg("\"%s\" has now caught up with upstream server",
application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
}
@ -2483,10 +2483,10 @@ XLogSendLogical(void)
char *errm;
/*
* Don't know whether we've caught up yet. We'll set it to true in
* WalSndWaitForWal, if we're actually waiting. We also set to true if
* XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
* i.e. when we're shutting down.
* Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
* true in WalSndWaitForWal, if we're actually waiting. We also set to
* true if XLogReadRecord() had to stop reading but WalSndWaitForWal
* didn't wait - i.e. when we're shutting down.
*/
WalSndCaughtUp = false;
@ -2499,9 +2499,19 @@ XLogSendLogical(void)
if (record != NULL)
{
/* XXX: Note that logical decoding cannot be used while in recovery */
XLogRecPtr flushPtr = GetFlushRecPtr();
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
/*
* If we have sent a record that is at or beyond the flushed point, we
* have caught up.
*/
if (sentPtr >= flushPtr)
WalSndCaughtUp = true;
}
else
{