For cascading replication, wake physical and logical walsenders separately

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.

Per idea from Jeff Davis and Amit Kapila.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-By: Jeff Davis <pgsql@j-davis.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com
This commit is contained in:
Andres Freund 2023-04-08 00:24:24 -07:00
parent 26669757b6
commit e101dfac3a
7 changed files with 84 additions and 29 deletions

View File

@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* If we still haven't flushed to the request point then we have a
@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* Great, done. To take some work off the critical path, try to initialize
@ -5765,7 +5765,7 @@ StartupXLOG(void)
* If there were cascading standby servers connected to us, nudge any wal
* sender processes to notice that we've been promoted.
*/
WalSndWakeup();
WalSndWakeup(true, true);
/*
* If this was a promotion, request an (online) checkpoint now. This isn't

View File

@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
* if we restored something other than a WAL segment, but it does no harm
* either.
*/
WalSndWakeup();
WalSndWakeup(true, false);
}
/*

View File

@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
/* ------
* Wakeup walsenders:
*
* On the standby, the WAL is flushed first (which will only wake up
* physical walsenders) and then applied, which will only wake up logical
* walsenders.
*
* Indeed, logical walsenders on standby can't decode and send data until
* it's been applied.
*
* Physical walsenders don't need to be woken up during replay unless
* cascading replication is allowed and time line change occurred (so that
* they can notice that they are on a new time line).
*
* That's why the wake up conditions are for:
*
* - physical walsenders in case of new time line and cascade
* replication is allowed
* - logical walsenders in case cascade replication is allowed (could not
* be created otherwise)
* ------
*/
if (AllowCascadeReplication())
WalSndWakeup(switchedTLI, true);
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
*/
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
/*
* Wake up any walsenders to notice that we are on a new timeline.
*/
if (AllowCascadeReplication())
WalSndWakeup();
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}
@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
{
/*
* When we find that WAL ends in an incomplete record, keep track
* of that record. After recovery is done, we'll write a record to
* indicate to downstream WAL readers that that portion is to be
* ignored.
* of that record. After recovery is done, we'll write a record
* to indicate to downstream WAL readers that that portion is to
* be ignored.
*
* However, when ArchiveRecoveryRequested = true, we're going to
* switch to a new timeline at the end of recovery. We will only

View File

@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
if (AllowCascadeReplication())
WalSndWakeup();
WalSndWakeup(true, false);
/* Report XLOG streaming progress in PS display */
if (update_process_title)

View File

@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
/*
* The kind assignment is done here and not in StartReplication()
* and StartLogicalReplication(). Indeed, the logical walsender
* needs to read WAL records (like snapshot of running
* transactions) during the slot creation. So it needs to be woken
* up based on its kind.
*
* The kind assignment could also be done in StartReplication(),
* StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
* seems better to set it on one place.
*/
if (MyDatabaseId == InvalidOid)
walsnd->kind = REPLICATION_KIND_PHYSICAL;
else
walsnd->kind = REPLICATION_KIND_LOGICAL;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
}
/*
* Wake up all walsenders
* Wake up physical, logical or both kinds of walsenders
*
* The distinction between physical and logical walsenders is done, because:
* - physical walsenders can't send data until it's been flushed
* - logical walsenders on standby can't decode and send data until it's been
* applied
*
* For cascading replication we need to wake up physical walsenders separately
* from logical walsenders (see the comment before calling WalSndWakeup() in
* ApplyWalRecord() for more details).
*
* This will be called inside critical sections, so throwing an error is not
* advisable.
*/
void
WalSndWakeup(void)
WalSndWakeup(bool physical, bool logical)
{
int i;
for (i = 0; i < max_wal_senders; i++)
{
Latch *latch;
ReplicationKind kind;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
/*
* Get latch pointer with spinlock held, for the unlikely case that
* pointer reads aren't atomic (as they're 8 bytes).
* pointer reads aren't atomic (as they're 8 bytes). While at it, also
* get kind.
*/
SpinLockAcquire(&walsnd->mutex);
latch = walsnd->latch;
kind = walsnd->kind;
SpinLockRelease(&walsnd->mutex);
if (latch != NULL)
if (latch == NULL)
continue;
if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
(logical && kind == REPLICATION_KIND_LOGICAL))
SetLatch(latch);
}
}

View File

@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndWakeup(bool physical, bool logical);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
/*
* wakeup walsenders if there is work to be done
*/
#define WalSndWakeupProcessRequests() \
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
static inline void
WalSndWakeupProcessRequests(bool physical, bool logical)
{
if (wake_wal_senders)
{
wake_wal_senders = false;
if (max_wal_senders > 0)
WalSndWakeup(physical, logical);
}
}
#endif /* _WALSENDER_H */

View File

@ -15,6 +15,7 @@
#include "access/xlog.h"
#include "lib/ilist.h"
#include "nodes/nodes.h"
#include "nodes/replnodes.h"
#include "replication/syncrep.h"
#include "storage/latch.h"
#include "storage/shmem.h"
@ -79,6 +80,8 @@ typedef struct WalSnd
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
ReplicationKind kind;
} WalSnd;
extern PGDLLIMPORT WalSnd *MyWalSnd;