|
|
|
@ -55,6 +55,7 @@
|
|
|
|
|
#include "storage/ipc.h"
|
|
|
|
|
#include "storage/pmsignal.h"
|
|
|
|
|
#include "storage/proc.h"
|
|
|
|
|
#include "tcop/tcopprot.h"
|
|
|
|
|
#include "utils/builtins.h"
|
|
|
|
|
#include "utils/guc.h"
|
|
|
|
|
#include "utils/guc_tables.h"
|
|
|
|
@ -65,10 +66,13 @@
|
|
|
|
|
bool synchronous_replication = false; /* Only set in user backends */
|
|
|
|
|
char *SyncRepStandbyNames;
|
|
|
|
|
|
|
|
|
|
static bool sync_standbys_defined = false; /* Is there at least one name? */
|
|
|
|
|
#define SyncStandbysDefined() \
|
|
|
|
|
(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
|
|
|
|
|
|
|
|
|
|
static bool announce_next_takeover = true;
|
|
|
|
|
|
|
|
|
|
static void SyncRepQueueInsert(void);
|
|
|
|
|
static void SyncRepCancelWait(void);
|
|
|
|
|
|
|
|
|
|
static int SyncRepGetStandbyPriority(void);
|
|
|
|
|
#ifdef USE_ASSERT_CHECKING
|
|
|
|
@ -83,6 +87,12 @@ static bool SyncRepQueueIsOrderedByLSN(void);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Wait for synchronous replication, if requested by user.
|
|
|
|
|
*
|
|
|
|
|
* Initially backends start in state SYNC_REP_NOT_WAITING and then
|
|
|
|
|
* change that state to SYNC_REP_WAITING before adding ourselves
|
|
|
|
|
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
|
|
|
|
|
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
|
|
|
|
|
* This backend then resets its state to SYNC_REP_NOT_WAITING.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
|
|
|
|
@ -95,10 +105,49 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
|
|
|
|
|
* there are no sync replication standby names defined.
|
|
|
|
|
* Note that those standbys don't need to be connected.
|
|
|
|
|
*/
|
|
|
|
|
if (!SyncRepRequested() || !sync_standbys_defined)
|
|
|
|
|
if (!SyncRepRequested() || !SyncStandbysDefined())
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
|
|
|
|
|
Assert(WalSndCtl != NULL);
|
|
|
|
|
|
|
|
|
|
/* Reset the latch before adding ourselves to the queue. */
|
|
|
|
|
ResetLatch(&MyProc->waitLatch);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set our waitLSN so WALSender will know when to wake us, and add
|
|
|
|
|
* ourselves to the queue.
|
|
|
|
|
*/
|
|
|
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
|
Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
|
|
|
|
|
if (!WalSndCtl->sync_standbys_defined)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* We don't wait for sync rep if WalSndCtl->sync_standbys_defined is
|
|
|
|
|
* not set. See SyncRepUpdateSyncStandbysDefined.
|
|
|
|
|
*/
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
MyProc->waitLSN = XactCommitLSN;
|
|
|
|
|
MyProc->syncRepState = SYNC_REP_WAITING;
|
|
|
|
|
SyncRepQueueInsert();
|
|
|
|
|
Assert(SyncRepQueueIsOrderedByLSN());
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
|
|
|
|
|
/* Alter ps display to show waiting for sync rep. */
|
|
|
|
|
if (update_process_title)
|
|
|
|
|
{
|
|
|
|
|
int len;
|
|
|
|
|
|
|
|
|
|
old_status = get_ps_display(&len);
|
|
|
|
|
new_status = (char *) palloc(len + 32 + 1);
|
|
|
|
|
memcpy(new_status, old_status, len);
|
|
|
|
|
sprintf(new_status + len, " waiting for %X/%X",
|
|
|
|
|
XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
|
|
|
|
|
set_ps_display(new_status, false);
|
|
|
|
|
new_status[len] = '\0'; /* truncate off " waiting ..." */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Wait for specified LSN to be confirmed.
|
|
|
|
@ -108,110 +157,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
|
|
|
|
|
*/
|
|
|
|
|
for (;;)
|
|
|
|
|
{
|
|
|
|
|
ResetLatch(&MyProc->waitLatch);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Synchronous Replication state machine within user backend
|
|
|
|
|
*
|
|
|
|
|
* Initially backends start in state SYNC_REP_NOT_WAITING and then
|
|
|
|
|
* change that state to SYNC_REP_WAITING before adding ourselves
|
|
|
|
|
* to the wait queue. During SyncRepWakeQueue() a WALSender changes
|
|
|
|
|
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
|
|
|
|
|
* This backend then resets its state to SYNC_REP_NOT_WAITING when
|
|
|
|
|
* we exit normally, or SYNC_REP_MUST_DISCONNECT in abnormal cases.
|
|
|
|
|
*
|
|
|
|
|
* We read MyProc->syncRepState without SyncRepLock, which
|
|
|
|
|
* assumes that read access is atomic.
|
|
|
|
|
*/
|
|
|
|
|
switch (MyProc->syncRepState)
|
|
|
|
|
{
|
|
|
|
|
case SYNC_REP_NOT_WAITING:
|
|
|
|
|
/*
|
|
|
|
|
* Set our waitLSN so WALSender will know when to wake us.
|
|
|
|
|
* We set this before we add ourselves to queue, so that
|
|
|
|
|
* any proc on the queue can be examined freely without
|
|
|
|
|
* taking a lock on each process in the queue, as long as
|
|
|
|
|
* they hold SyncRepLock.
|
|
|
|
|
*/
|
|
|
|
|
MyProc->waitLSN = XactCommitLSN;
|
|
|
|
|
MyProc->syncRepState = SYNC_REP_WAITING;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Add to queue while holding lock.
|
|
|
|
|
*/
|
|
|
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
|
SyncRepQueueInsert();
|
|
|
|
|
Assert(SyncRepQueueIsOrderedByLSN());
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Alter ps display to show waiting for sync rep.
|
|
|
|
|
*/
|
|
|
|
|
if (update_process_title)
|
|
|
|
|
{
|
|
|
|
|
int len;
|
|
|
|
|
|
|
|
|
|
old_status = get_ps_display(&len);
|
|
|
|
|
new_status = (char *) palloc(len + 32 + 1);
|
|
|
|
|
memcpy(new_status, old_status, len);
|
|
|
|
|
sprintf(new_status + len, " waiting for %X/%X",
|
|
|
|
|
XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
|
|
|
|
|
set_ps_display(new_status, false);
|
|
|
|
|
new_status[len] = '\0'; /* truncate off " waiting ..." */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case SYNC_REP_WAITING:
|
|
|
|
|
/*
|
|
|
|
|
* Check for conditions that would cause us to leave the
|
|
|
|
|
* wait state before the LSN has been reached.
|
|
|
|
|
*/
|
|
|
|
|
if (!PostmasterIsAlive(true))
|
|
|
|
|
{
|
|
|
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
|
SHMQueueDelete(&(MyProc->syncRepLinks));
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
|
|
|
|
|
MyProc->syncRepState = SYNC_REP_MUST_DISCONNECT;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We don't receive SIGHUPs at this point, so resetting
|
|
|
|
|
* synchronous_standby_names has no effect on waiters.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/* Continue waiting */
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case SYNC_REP_WAIT_COMPLETE:
|
|
|
|
|
/*
|
|
|
|
|
* WalSender has checked our LSN and has removed us from
|
|
|
|
|
* queue. Cleanup local state and leave.
|
|
|
|
|
*/
|
|
|
|
|
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
|
|
|
|
|
|
|
|
|
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
|
|
|
|
MyProc->waitLSN.xlogid = 0;
|
|
|
|
|
MyProc->waitLSN.xrecoff = 0;
|
|
|
|
|
|
|
|
|
|
if (new_status)
|
|
|
|
|
{
|
|
|
|
|
/* Reset ps display */
|
|
|
|
|
set_ps_display(new_status, false);
|
|
|
|
|
pfree(new_status);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
case SYNC_REP_MUST_DISCONNECT:
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
elog(FATAL, "invalid syncRepState");
|
|
|
|
|
}
|
|
|
|
|
int syncRepState;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Wait on latch for up to 60 seconds. This allows us to
|
|
|
|
@ -219,6 +165,97 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
|
|
|
|
|
* Note that timeout here does not necessarily release from loop.
|
|
|
|
|
*/
|
|
|
|
|
WaitLatch(&MyProc->waitLatch, 60000000L);
|
|
|
|
|
|
|
|
|
|
/* Must reset the latch before testing state. */
|
|
|
|
|
ResetLatch(&MyProc->waitLatch);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Try checking the state without the lock first. There's no guarantee
|
|
|
|
|
* that we'll read the most up-to-date value, so if it looks like we're
|
|
|
|
|
* still waiting, recheck while holding the lock. But if it looks like
|
|
|
|
|
* we're done, we must really be done, because once walsender changes
|
|
|
|
|
* the state to SYNC_REP_WAIT_COMPLETE, it will never update it again,
|
|
|
|
|
* so we can't be seeing a stale value in that case.
|
|
|
|
|
*/
|
|
|
|
|
syncRepState = MyProc->syncRepState;
|
|
|
|
|
if (syncRepState == SYNC_REP_WAITING)
|
|
|
|
|
{
|
|
|
|
|
LWLockAcquire(SyncRepLock, LW_SHARED);
|
|
|
|
|
syncRepState = MyProc->syncRepState;
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
}
|
|
|
|
|
if (syncRepState == SYNC_REP_WAIT_COMPLETE)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If a wait for synchronous replication is pending, we can neither
|
|
|
|
|
* acknowledge the commit nor raise ERROR or FATAL. The latter
|
|
|
|
|
* would lead the client to believe that that the transaction
|
|
|
|
|
* aborted, which is not true: it's already committed locally.
|
|
|
|
|
* The former is no good either: the client has requested
|
|
|
|
|
* synchronous replication, and is entitled to assume that an
|
|
|
|
|
* acknowledged commit is also replicated, which may not be true.
|
|
|
|
|
* So in this case we issue a WARNING (which some clients may
|
|
|
|
|
* be able to interpret) and shut off further output. We do NOT
|
|
|
|
|
* reset ProcDiePending, so that the process will die after the
|
|
|
|
|
* commit is cleaned up.
|
|
|
|
|
*/
|
|
|
|
|
if (ProcDiePending)
|
|
|
|
|
{
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errcode(ERRCODE_ADMIN_SHUTDOWN),
|
|
|
|
|
errmsg("canceling the wait for replication and terminating connection due to administrator command"),
|
|
|
|
|
errdetail("The transaction has already been committed locally but might have not been replicated to the standby.")));
|
|
|
|
|
whereToSendOutput = DestNone;
|
|
|
|
|
SyncRepCancelWait();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* It's unclear what to do if a query cancel interrupt arrives. We
|
|
|
|
|
* can't actually abort at this point, but ignoring the interrupt
|
|
|
|
|
* altogether is not helpful, so we just terminate the wait with
|
|
|
|
|
* a suitable warning.
|
|
|
|
|
*/
|
|
|
|
|
if (QueryCancelPending)
|
|
|
|
|
{
|
|
|
|
|
QueryCancelPending = false;
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errmsg("canceling wait for synchronous replication due to user request"),
|
|
|
|
|
errdetail("The transaction has committed locally, but may not have replicated to the standby.")));
|
|
|
|
|
SyncRepCancelWait();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If the postmaster dies, we'll probably never get an acknowledgement,
|
|
|
|
|
* because all the wal sender processes will exit. So just bail out.
|
|
|
|
|
*/
|
|
|
|
|
if (!PostmasterIsAlive(true))
|
|
|
|
|
{
|
|
|
|
|
ProcDiePending = true;
|
|
|
|
|
whereToSendOutput = DestNone;
|
|
|
|
|
SyncRepCancelWait();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* WalSender has checked our LSN and has removed us from queue. Clean up
|
|
|
|
|
* state and leave. It's OK to reset these shared memory fields without
|
|
|
|
|
* holding SyncRepLock, because any walsenders will ignore us anyway when
|
|
|
|
|
* we're not on the queue.
|
|
|
|
|
*/
|
|
|
|
|
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
|
|
|
|
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
|
|
|
|
MyProc->waitLSN.xlogid = 0;
|
|
|
|
|
MyProc->waitLSN.xrecoff = 0;
|
|
|
|
|
|
|
|
|
|
if (new_status)
|
|
|
|
|
{
|
|
|
|
|
/* Reset ps display */
|
|
|
|
|
set_ps_display(new_status, false);
|
|
|
|
|
pfree(new_status);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -257,6 +294,19 @@ SyncRepQueueInsert(void)
|
|
|
|
|
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Acquire SyncRepLock and cancel any wait currently in progress.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
SyncRepCancelWait(void)
|
|
|
|
|
{
|
|
|
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
|
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
|
|
|
|
|
SHMQueueDelete(&(MyProc->syncRepLinks));
|
|
|
|
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
SyncRepCleanupAtProcExit(int code, Datum arg)
|
|
|
|
|
{
|
|
|
|
@ -506,6 +556,43 @@ SyncRepWakeQueue(bool all)
|
|
|
|
|
return numprocs;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* WAL writer calls this as needed to update the shared sync_standbys_defined
|
|
|
|
|
* flag, so that backends don't remain permanently wedged if
|
|
|
|
|
* synchronous_standby_names is unset. It's safe to check the current value
|
|
|
|
|
* without the lock, because it's only ever updated by one process. But we
|
|
|
|
|
* must take the lock to change it.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
SyncRepUpdateSyncStandbysDefined(void)
|
|
|
|
|
{
|
|
|
|
|
bool sync_standbys_defined = SyncStandbysDefined();
|
|
|
|
|
|
|
|
|
|
if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
|
|
|
|
|
{
|
|
|
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If synchronous_standby_names has been reset to empty, it's futile
|
|
|
|
|
* for backends to continue to waiting. Since the user no longer
|
|
|
|
|
* wants synchronous replication, we'd better wake them up.
|
|
|
|
|
*/
|
|
|
|
|
if (!sync_standbys_defined)
|
|
|
|
|
SyncRepWakeQueue(true);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Only allow people to join the queue when there are synchronous
|
|
|
|
|
* standbys defined. Without this interlock, there's a race
|
|
|
|
|
* condition: we might wake up all the current waiters; then, some
|
|
|
|
|
* backend that hasn't yet reloaded its config might go to sleep on
|
|
|
|
|
* the queue (and never wake up). This prevents that.
|
|
|
|
|
*/
|
|
|
|
|
WalSndCtl->sync_standbys_defined = sync_standbys_defined;
|
|
|
|
|
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef USE_ASSERT_CHECKING
|
|
|
|
|
static bool
|
|
|
|
|
SyncRepQueueIsOrderedByLSN(void)
|
|
|
|
@ -567,13 +654,6 @@ assign_synchronous_standby_names(const char *newval, bool doit, GucSource source
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Is there at least one sync standby? If so cache this knowledge to
|
|
|
|
|
* improve performance of SyncRepWaitForLSN() for all-async configs.
|
|
|
|
|
*/
|
|
|
|
|
if (doit && list_length(elemlist) > 0)
|
|
|
|
|
sync_standbys_defined = true;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Any additional validation of standby names should go here.
|
|
|
|
|
*
|
|
|
|
|