postgresql/src/backend/replication/syncrep.c

678 lines
19 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* syncrep.c
*
* Synchronous replication is new as of PostgreSQL 9.1.
*
* If requested, transaction commits wait until their commit LSN is
* acknowledged by the sync standby.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
* replication transport remains within WALreceiver/WALsender modules.
*
* The essence of this design is that it isolates all logic about
* waiting/releasing onto the primary. The primary defines which standbys
* it wishes to wait for. The standby is completely unaware of the
* durability requirements of transactions on the primary, reducing the
* complexity of the code and streamlining both standby operations and
* network bandwidth because there is no requirement to ship
* per-transaction state information.
*
* Replication is either synchronous or not synchronous (async). If it is
* async, we just fastpath out of here. If it is sync, then in 9.1 we wait
* for the flush location on the standby before releasing the waiting backend.
* Further complexity in that interaction is expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
* single ordered queue of waiting backends, so that we can avoid
* searching through all waiters each time we receive a reply.
*
* In 9.1 we support only a single synchronous standby, chosen from a
* priority list of synchronous_standby_names. Before it can become the
* synchronous standby it must have caught up with the primary; that may
* take some time. Once caught up, the current highest priority standby
* will release waiters from the queue.
*
* Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/syncrep.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include "access/xact.h"
#include "miscadmin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
2011-03-17 18:10:42 +01:00
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
/* User-settable parameters for sync rep */
2011-04-10 17:42:00 +02:00
char *SyncRepStandbyNames;
2011-03-17 18:10:42 +01:00
#define SyncStandbysDefined() \
(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
#define SyncRepRequested() \
(max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
static bool announce_next_takeover = true;
static void SyncRepQueueInsert(void);
2011-03-17 18:10:42 +01:00
static void SyncRepCancelWait(void);
2011-04-10 17:42:00 +02:00
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(void);
#endif
/*
 * ===========================================================
 * Synchronous Replication functions for normal user backends
 * ===========================================================
 */

/*
 * Block the calling backend until XactCommitLSN has been confirmed by the
 * synchronous standby, if the user asked for synchronous replication.
 *
 * State protocol: a backend normally sits in SYNC_REP_NOT_WAITING.  Before
 * linking itself into the wait queue it switches to SYNC_REP_WAITING.  A
 * walsender, in SyncRepWakeQueue(), flips the state to
 * SYNC_REP_WAIT_COMPLETE once replication is confirmed.  On the way out of
 * this function the backend puts itself back to SYNC_REP_NOT_WAITING.
 */
void
SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
{
	char	   *ps_saved = NULL;
	bool		wait_needed;

	/*
	 * Nothing to do unless sync rep was requested and some standby names are
	 * configured.  The named standbys need not actually be connected.
	 */
	if (!SyncRepRequested())
		return;
	if (!SyncStandbysDefined())
		return;

	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	Assert(WalSndCtl != NULL);

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

	/*
	 * Skip the wait when the shared sync_standbys_defined flag is off (see
	 * SyncRepUpdateSyncStandbysDefined), or when the standby has already
	 * acknowledged an LSN at or beyond ours.  The latter is an unlikely
	 * race, but the check is cheap since we touch that data anyway.
	 */
	wait_needed = WalSndCtl->sync_standbys_defined &&
		!XLByteLE(XactCommitLSN, WalSndCtl->lsn);
	if (!wait_needed)
	{
		LWLockRelease(SyncRepLock);
		return;
	}

	/*
	 * Publish the LSN we are waiting for so a walsender knows when to wake
	 * us, then join the queue.
	 */
	MyProc->waitLSN = XactCommitLSN;
	MyProc->syncRepState = SYNC_REP_WAITING;
	SyncRepQueueInsert();
	Assert(SyncRepQueueIsOrderedByLSN());
	LWLockRelease(SyncRepLock);

	/* Advertise the wait in the ps display, remembering the old title. */
	if (update_process_title)
	{
		int			ps_len;
		const char *ps_current = get_ps_display(&ps_len);

		/* 32 extra bytes leave room for the " waiting for %X/%X" suffix */
		ps_saved = (char *) palloc(ps_len + 32 + 1);
		memcpy(ps_saved, ps_current, ps_len);
		sprintf(ps_saved + ps_len, " waiting for %X/%X",
				XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
		set_ps_display(ps_saved, false);
		ps_saved[ps_len] = '\0';	/* cut the suffix off again */
	}

	/*
	 * Standard latch check/wait loop on this proc's own latch, run until
	 * the LSN is confirmed or the wait is abandoned.
	 */
	while (true)
	{
		int			state;

		/* The latch must be reset before the state is examined. */
		ResetLatch(&MyProc->procLatch);

		/*
		 * Peek at the state without the lock first.  That read may be
		 * stale, so a result of "still waiting" is rechecked under the
		 * lock.  A result of "complete" can be trusted as is: once a
		 * walsender sets SYNC_REP_WAIT_COMPLETE it never changes the state
		 * again.
		 *
		 * On machines with weak memory ordering, taking the lock is what
		 * guarantees the walsender's update is visible to us.  This dance
		 * could go away if SetLatch/ResetLatch contained memory barriers.
		 */
		state = MyProc->syncRepState;
		if (state == SYNC_REP_WAITING)
		{
			LWLockAcquire(SyncRepLock, LW_SHARED);
			state = MyProc->syncRepState;
			LWLockRelease(SyncRepLock);
		}
		if (state == SYNC_REP_WAIT_COMPLETE)
			break;

		/*
		 * Termination request while the wait is pending.  We cannot
		 * acknowledge the commit, since the client asked for synchronous
		 * replication and would assume the commit is replicated.  We cannot
		 * raise ERROR or FATAL either, since the client would think the
		 * transaction aborted although it is already committed locally.
		 * So emit a WARNING (which some clients may be able to interpret)
		 * and shut off further output.  ProcDiePending is deliberately
		 * left set so the process dies once commit cleanup is finished.
		 */
		if (ProcDiePending)
		{
			ereport(WARNING,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * Query cancel while the wait is pending.  Aborting is no longer
		 * possible, but ignoring the request entirely would be unhelpful,
		 * so stop waiting and say so with a warning.
		 */
		if (QueryCancelPending)
		{
			QueryCancelPending = false;
			ereport(WARNING,
					(errmsg("canceling wait for synchronous replication due to user request"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			SyncRepCancelWait();
			break;
		}

		/*
		 * With the postmaster gone all walsenders will exit, so an
		 * acknowledgement will probably never arrive.  Give up.
		 */
		if (!PostmasterIsAlive())
		{
			ProcDiePending = true;
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * Sleep on the latch without a timeout: every condition that
		 * should wake us sets the latch.
		 */
		WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
	}

	/*
	 * We are off the queue now, either released by a walsender or removed
	 * by SyncRepCancelWait().  Resetting these shared fields without
	 * SyncRepLock is fine, because walsenders ignore procs that are not on
	 * the queue.
	 */
	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
	MyProc->waitLSN.xlogid = 0;
	MyProc->waitLSN.xrecoff = 0;

	/* Put the original ps display back, if we changed it. */
	if (ps_saved != NULL)
	{
		set_ps_display(ps_saved, false);
		pfree(ps_saved);
	}
}
/*
* Insert MyProc into SyncRepQueue, maintaining sorted invariant.
*
* Usually we will go at tail of queue, though it's possible that we arrive
* here out of order, so start at tail and work back to insertion point.
*/
static void
SyncRepQueueInsert(void)
{
2011-04-10 17:42:00 +02:00
PGPROC *proc;
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
&(WalSndCtl->SyncRepQueue),
offsetof(PGPROC, syncRepLinks));
while (proc)
{
/*
2011-04-10 17:42:00 +02:00
* Stop at the queue element that we should after to ensure the queue
* is ordered by LSN.
*/
if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
break;
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
if (proc)
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
else
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
}
2011-03-17 18:10:42 +01:00
/*
* Acquire SyncRepLock and cancel any wait currently in progress.
*/
static void
SyncRepCancelWait(void)
{
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
SHMQueueDelete(&(MyProc->syncRepLinks));
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
LWLockRelease(SyncRepLock);
}
/*
 * Make sure MyProc is no longer linked into the sync rep wait queue.
 *
 * The first test is made without the lock so that the common case (not on
 * the queue) does not have to take SyncRepLock.  That unlocked result can
 * be out of date by the time we hold the lock: SyncRepWakeQueue(), which
 * runs with SyncRepLock held, may have unlinked us in between.  So the
 * test is repeated under the lock and the element is only deleted if it
 * is still linked, the same way SyncRepCancelWait() guards its delete.
 */
void
SyncRepCleanupAtProcExit(void)
{
	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
	{
		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

		/* we might just have been removed by a walsender, so recheck */
		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
			SHMQueueDelete(&(MyProc->syncRepLinks));

		LWLockRelease(SyncRepLock);
	}
}
/*
* ===========================================================
* Synchronous Replication functions for wal sender processes
* ===========================================================
*/
/*
* Take any action required to initialise sync rep state from config
* data. Called at WALSender startup and after each SIGHUP.
*/
void
SyncRepInitConfig(void)
{
2011-04-10 17:42:00 +02:00
int priority;
/*
* Determine if we are a potential sync standby and remember the result
* for handling replies from standby.
*/
priority = SyncRepGetStandbyPriority();
if (MyWalSnd->sync_standby_priority != priority)
{
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
MyWalSnd->sync_standby_priority = priority;
LWLockRelease(SyncRepLock);
ereport(DEBUG1,
2011-04-10 17:42:00 +02:00
(errmsg("standby \"%s\" now has synchronous standby priority %u",
application_name, priority)));
}
}
/*
* Update the LSNs on each queue based upon our latest state. This
* implements a simple policy of first-valid-standby-releases-waiter.
*
* Other policies are possible, which would change what we do here and what
* perhaps also which information we store as well.
*/
void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL;
2011-04-10 17:42:00 +02:00
int numprocs = 0;
int priority = 0;
int i;
/*
* If this WALSender is serving a standby that is not on the list of
2011-04-10 17:42:00 +02:00
* potential standbys then we have nothing to do. If we are still starting
* up or still running base backup, then leave quickly also.
*/
if (MyWalSnd->sync_standby_priority == 0 ||
MyWalSnd->state < WALSNDSTATE_STREAMING)
return;
/*
2011-04-10 17:42:00 +02:00
* We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities
* then we use the first mentioned standby. If you change this, also
* change pg_stat_get_wal_senders().
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &walsndctl->walsnds[i];
if (walsnd->pid != 0 &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
priority > walsnd->sync_standby_priority))
{
2011-04-10 17:42:00 +02:00
priority = walsnd->sync_standby_priority;
syncWalSnd = walsnd;
}
}
/*
* We should have found ourselves at least.
*/
Assert(syncWalSnd);
/*
* If we aren't managing the highest priority standby then just leave.
*/
if (syncWalSnd != MyWalSnd)
{
LWLockRelease(SyncRepLock);
announce_next_takeover = true;
return;
}
if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
{
/*
2011-04-10 17:42:00 +02:00
* Set the lsn first so that when we wake backends they will release
* up to this location.
*/
walsndctl->lsn = MyWalSnd->flush;
numprocs = SyncRepWakeQueue(false);
}
LWLockRelease(SyncRepLock);
elog(DEBUG3, "released %d procs up to %X/%X",
2011-04-10 17:42:00 +02:00
numprocs,
MyWalSnd->flush.xlogid,
MyWalSnd->flush.xrecoff);
/*
* If we are managing the highest priority standby, though we weren't
* prior to this, then announce we are now the sync standby.
*/
if (announce_next_takeover)
{
announce_next_takeover = false;
ereport(LOG,
(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
application_name, MyWalSnd->sync_standby_priority)));
}
}
/*
 * Work out this walsender's priority as a potential sync standby.
 *
 * SyncRepStandbyNames is parsed as a comma-separated identifier list and
 * each entry is compared, case-insensitively, with this walsender's
 * application_name; the entry "*" matches any name.  The result is the
 * 1-based position of the first matching entry, or zero if nothing matches
 * (meaning we are not a potential sync standby).
 */
static int
SyncRepGetStandbyPriority(void)
{
	char	   *names_copy;
	List	   *names;
	ListCell   *cell;
	int			position = 0;
	int			result = 0;

	/*
	 * Synchronous cascade replication is not allowed, so a cascading
	 * walsender always gets priority zero.
	 */
	if (am_cascading_walsender)
		return 0;

	/* The parse below works on a modifiable copy of the setting. */
	names_copy = pstrdup(SyncRepStandbyNames);

	if (!SplitIdentifierString(names_copy, ',', &names))
	{
		/*
		 * Syntax error in the list.  The GUC machinery will have already
		 * complained, so there is no need to do so again.
		 */
		pfree(names_copy);
		list_free(names);
		return 0;
	}

	foreach(cell, names)
	{
		char	   *entry = (char *) lfirst(cell);

		position++;

		if (pg_strcasecmp(entry, application_name) == 0 ||
			pg_strcasecmp(entry, "*") == 0)
		{
			/* first match decides the priority */
			result = position;
			break;
		}
	}

	pfree(names_copy);
	list_free(names);

	return result;
}
/*
 * Release waiters from the head of the queue and return how many were
 * woken.  With all = true the whole queue is released; otherwise only
 * backends whose waitLSN does not exceed the shared walsender LSN.
 *
 * For each released backend the state is set, then it is unlinked from the
 * queue, and only then is its latch set.
 *
 * Must hold SyncRepLock.
 */
int
SyncRepWakeQueue(bool all)
{
	volatile WalSndCtlData *ctl = WalSndCtl;
	PGPROC	   *cur;
	int			woken = 0;

	Assert(SyncRepQueueIsOrderedByLSN());

	cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
								  &(WalSndCtl->SyncRepQueue),
								  offsetof(PGPROC, syncRepLinks));

	while (cur != NULL)
	{
		PGPROC	   *next;

		/*
		 * The queue is assumed to be ordered by LSN, so the first entry
		 * beyond the confirmed LSN ends the walk.
		 */
		if (!all && XLByteLT(ctl->lsn, cur->waitLSN))
			break;

		/*
		 * Fetch the successor before cur is unlinked; it may be NULL when
		 * cur is the last entry.
		 */
		next = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks));

		/*
		 * Mark the wait as complete; SyncRepWaitForLSN() discusses the
		 * various states.
		 */
		cur->syncRepState = SYNC_REP_WAIT_COMPLETE;

		/* Take the backend off the queue. */
		SHMQueueDelete(&(cur->syncRepLinks));

		/* Wake it only after the state is set and it is off the queue. */
		SetLatch(&(cur->procLatch));

		woken++;
		cur = next;
	}

	return woken;
}
2011-03-17 18:10:42 +01:00
/*
 * Bring the shared sync_standbys_defined flag in line with the current
 * synchronous_standby_names setting.  The background writer calls this as
 * needed, so that backends don't remain permanently wedged when the
 * setting is emptied.
 *
 * Reading the flag without the lock is safe because only one process ever
 * updates it; changing it requires the lock.
 */
void
SyncRepUpdateSyncStandbysDefined(void)
{
	bool		defined_now = SyncStandbysDefined();

	/* Shared flag already matches the configuration. */
	if (defined_now == WalSndCtl->sync_standbys_defined)
		return;

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

	/*
	 * If synchronous_standby_names has been reset to empty, there is no
	 * point in backends continuing to wait: the user no longer wants
	 * synchronous replication, so wake them all.
	 */
	if (!defined_now)
		SyncRepWakeQueue(true);

	/*
	 * Backends may only join the queue while synchronous standbys are
	 * defined.  Without this interlock there is a race: after all current
	 * waiters have been woken, a backend that has not yet reloaded its
	 * config could go to sleep on the queue and never wake up.
	 */
	WalSndCtl->sync_standbys_defined = defined_now;

	LWLockRelease(SyncRepLock);
}
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: walk SyncRepQueue from the head and return true only
 * if every entry's waitLSN is strictly greater than the one before it,
 * i.e. the queue is ordered by LSN and no two procs share an LSN.  The
 * comparison starts from LSN 0/0.
 */
static bool
SyncRepQueueIsOrderedByLSN(void)
{
	PGPROC	   *cur;
	XLogRecPtr	prevLSN;

	prevLSN.xlogid = 0;
	prevLSN.xrecoff = 0;

	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
									   &(WalSndCtl->SyncRepQueue),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/* an entry at or below its predecessor breaks the ordering */
		if (XLByteLE(cur->waitLSN, prevLSN))
			return false;

		prevLSN = cur->waitLSN;
	}

	return true;
}
#endif
/*
 * ===========================================================
 * Synchronous Replication functions executed by any process
 * ===========================================================
 */

/*
 * GUC check hook for synchronous_standby_names: accept the new value only
 * if it parses as a comma-separated list of identifiers.  On a syntax
 * error the detail message is set and false is returned.
 */
bool
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
{
	char	   *names_copy;
	List	   *names;
	bool		parsed;

	/* SplitIdentifierString needs a modifiable copy of the string. */
	names_copy = pstrdup(*newval);

	parsed = SplitIdentifierString(names_copy, ',', &names);
	if (!parsed)
		GUC_check_errdetail("List syntax is invalid.");

	/*
	 * Any additional validation of standby names should go here.
	 *
	 * Don't attempt to set WALSender priority because this is executed by
	 * postmaster at startup, not WALSender, so the application_name is not
	 * yet correctly set.
	 */

	pfree(names_copy);
	list_free(names);

	return parsed;
}