diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8684414751..822ef4b60c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2018,6 +2018,92 @@ SET ENABLE_SEQSCAN TO OFF;
+
+ Synchronous Replication
+
+
+ These settings control the behavior of the built-in
+ synchronous replication> feature.
+ These parameters are set on the primary server that is
+ to send replication data to one or more standby servers.
+
+
+
+
+ synchronous_replication (boolean)
+
+ synchronous_replication> configuration parameter
+
+
+
+ Specifies whether transaction commit will wait for WAL records
+ to be replicated before the command returns a success>
+ indication to the client. The default setting is off>.
+ When on>, there will be a delay while the client waits
+ for confirmation of successful replication. That delay will
+ increase depending upon the physical distance and network activity
+ between primary and standby. The commit wait will last until a
+ reply from the current synchronous standby indicates it has received
+ the commit record of the transaction. Synchronous standbys must
+ already have been defined (see ).
+
+
+ This parameter can be changed at any time; the
+ behavior for any one transaction is determined by the setting in
+ effect when it commits. It is therefore possible, and useful, to have
+ some transactions replicate synchronously and others asynchronously.
+ For example, to make a single multistatement transaction commit
+ asynchronously when the default is synchronous replication, issue
+ SET LOCAL synchronous_replication TO OFF> within the
+ transaction.
+
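+ For example (a sketch; the table and column names are hypothetical), a
+ transaction can opt out of the synchronous-replication wait while it
+ remains the default elsewhere:

```sql
-- Assumes synchronous_replication = on is the server-wide default.
BEGIN;
SET LOCAL synchronous_replication TO off;          -- this transaction only
UPDATE chat_messages SET read = true WHERE user_id = 42;
COMMIT;  -- returns without waiting for the standby
```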
+
+
+
+
+ synchronous_standby_names (string)
+
+ synchronous_standby_names> configuration parameter
+
+
+
+ Specifies a priority ordered list of standby names that can offer
+ synchronous replication. At any one time there will be at most one
+ synchronous standby, whose replies wake users waiting for commit
+ confirmation. The synchronous standby will be the first standby
+ named in the list that is both currently connected and streaming
+ data in real-time (as shown by a state of "STREAMING").
+ Standby servers listed later will become potential synchronous standbys.
+ If the current synchronous standby disconnects for whatever reason
+ it will be replaced immediately with the next-highest-priority standby.
+ Specifying more than one standby name can allow very high availability.
+
+
+ The standby name is currently taken as the application_name of the
+ standby, as set in the primary_conninfo on the standby. Names are
+ not enforced for uniqueness. In case of duplicates one of the standbys
+ will be chosen to be the synchronous standby, though exactly which
+ one is indeterminate.
+
+
+ No value is set by default.
+ The special entry *> matches any application_name, including
+ the default application name of walreceiver>.
+
+
+ If a standby is removed from the list of servers then it will stop
+ being the synchronous standby, allowing another to take its place.
+ If the list is empty, synchronous replication will not be
+ possible, whatever the setting of synchronous_replication>;
+ however, commits already waiting will continue to wait.
+ Standbys may also be added to the list without restarting the server.
+
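+ A sketch of a possible setting (the standby names here are hypothetical
+ application_name values):

```
synchronous_standby_names = 'boston1, london1, tokyo1'
```

+ With this setting, boston1 is the synchronous standby whenever it is
+ connected and streaming; london1 and tokyo1 are potential synchronous
+ standbys that take over in that order if it fails.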
+
+
+
+
+
+
Standby Servers
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 37ba43b5fd..e30552f09f 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -875,6 +875,209 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
+
+ Synchronous Replication
+
+
+ Synchronous Replication
+
+
+
+ PostgreSQL> streaming replication is asynchronous by
+ default. If the primary server
+ crashes then some transactions that were committed may not have been
+ replicated to the standby server, causing data loss. The amount
+ of data loss is proportional to the replication delay at the time of
+ failover.
+
+
+
+ Synchronous replication offers the ability to confirm that all changes
+ made by a transaction have been transferred to one synchronous standby
+ server. This extends the standard level of durability
+ offered by a transaction commit. This level of protection is referred
+ to as 2-safe replication in computer science theory.
+
+
+
+ When requesting synchronous replication, each commit of a
+ write transaction will wait until confirmation is
+ received that the commit has been written to the transaction log on disk
+ of both the primary and standby server. The only way data can
+ be lost is if both the primary and the standby suffer crashes at the
+ same time. This can provide a much higher level of durability, though only
+ if the sysadmin is cautious about the placement and management of the two
+ servers. Waiting for confirmation increases the user's confidence that the
+ changes will not be lost in the event of server crashes but it also
+ necessarily increases the response time for the requesting transaction.
+ The minimum wait time is the roundtrip time between primary and standby.
+
+
+
+ Read only transactions and transaction rollbacks need not wait for
+ replies from standby servers. Subtransaction commits do not wait for
+ responses from standby servers, only top-level commits. Long
+ running actions such as data loading or index building do not wait
+ until the very final commit message. All two-phase commit actions
+ require commit waits, including both prepare and commit.
+
+
+
+ Basic Configuration
+
+
+ All parameters have useful default values, so we can enable
+ synchronous replication easily just by setting this on the primary:
+
+
+synchronous_replication = on
+
+
+ When synchronous_replication> is set, a commit will wait
+ for confirmation that the standby has received the commit record,
+ even if that takes a very long time.
+ synchronous_replication> can be set by individual
+ users, so it can be configured in the configuration file, for particular
+ users or databases, or dynamically by application programs.
+
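+ For instance (a sketch; the role and database names are hypothetical),
+ the parameter can be attached to a role or database so that sessions
+ pick it up automatically:

```sql
-- All transactions run by this role replicate synchronously by default:
ALTER ROLE billing SET synchronous_replication = on;
-- All sessions connected to this database do too:
ALTER DATABASE accounts SET synchronous_replication = on;
```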
+
+
+ After a commit record has been written to disk on the primary the
+ WAL record is then sent to the standby. The standby sends reply
+ messages each time a new batch of WAL data is received, unless
+ wal_receiver_status_interval> is set to zero on the standby.
+ If the standby is the first matching standby, as specified in
+ synchronous_standby_names> on the primary, the reply
+ messages from that standby will be used to wake users waiting for
+ confirmation that the commit record has been received. That parameter
+ allows the administrator to specify which standby servers should be
+ synchronous standbys. Note that the configuration of synchronous
+ replication is mainly on the primary.
+
+
+
+ Users will stop waiting if a fast shutdown is requested, though the
+ server does not fully shut down until all outstanding WAL records are
+ transferred to standby servers.
+
+
+
+ Note also that synchronous_commit> behavior is forced when the user
+ specifies synchronous_replication>, overriding even an
+ explicit setting of synchronous_commit> to off>.
+ This is because WAL must be written to disk on the primary before it is
+ replicated, to ensure the standby never gets ahead of the primary.
+
+
+
+
+
+ Planning for Performance
+
+
+ Synchronous replication usually requires carefully planned and placed
+ standby servers to ensure applications perform acceptably. Waiting
+ doesn't utilise system resources, but transaction locks continue to be
+ held until the transfer is confirmed. As a result, incautious use of
+ synchronous replication will reduce performance for database
+ applications because of increased response times and higher contention.
+
+
+
+ PostgreSQL> allows the application developer
+ to specify the durability level required via replication. This can be
+ specified for the system overall, though it can also be specified for
+ specific users or connections, or even individual transactions.
+
+
+
+ For example, an application workload might be such that
+ 10% of changes are important customer details, while
+ 90% of changes are less important data that the business can more
+ easily survive losing, such as chat messages between users.
+
+
+
+ With synchronous replication options specified at the application level
+ (on the primary) we can offer sync rep for the most important changes,
+ without slowing down the bulk of the total workload. Application level
+ options are an important and practical tool for allowing the benefits of
+ synchronous replication for high performance applications.
+
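+ As a sketch of that division (table and value are hypothetical), the
+ important path requests the wait explicitly while the default stays off:

```sql
-- Default in postgresql.conf: synchronous_replication = off
BEGIN;
SET LOCAL synchronous_replication TO on;  -- this commit must reach the standby
UPDATE customers SET balance = balance - 10.00 WHERE id = 7;
COMMIT;  -- waits for the synchronous standby's acknowledgement
```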
+
+
+ You should consider that the network bandwidth must be higher than
+ the rate of generation of WAL data.
+
+
+
+
+
+ Planning for High Availability
+
+
+ Commits made when synchronous_replication is set will wait until
+ the sync standby responds. The response may never occur if the last,
+ or only, standby should crash.
+
+
+
+ The best solution for avoiding data loss is to ensure you don't lose
+ your last remaining sync standby. This can be achieved by naming multiple
+ potential synchronous standbys using synchronous_standby_names>.
+ The first named standby will be used as the synchronous standby. Standbys
+ listed after this will take over the role of synchronous standby if the
+ first one should fail.
+
+
+
+ When a standby first attaches to the primary, it will not yet be properly
+ synchronized. This is described as CATCHUP> mode. Once
+ the lag between standby and primary reaches zero for the first time
+ we move to real-time STREAMING> state.
+ The catch-up duration may be long immediately after the standby has
+ been created. If the standby is shut down, then the catch-up period
+ will increase according to the length of time the standby has been down.
+ The standby is only able to become a synchronous standby
+ once it has reached STREAMING> state.
+
+
+
+ If the primary restarts while commits are waiting for acknowledgement,
+ those waiting transactions will be marked fully committed once the
+ primary database recovers.
+ There is no way to be certain that all standbys have received all
+ outstanding WAL data at the time of the crash of the primary. Some
+ transactions may not show as committed on the standby, even though
+ they show as committed on the primary. The guarantee we offer is that
+ the application will not receive explicit acknowledgement of the
+ successful commit of a transaction until the WAL data is known to be
+ safely received by the standby.
+
+
+
+ If you really do lose your last standby server then you should disable
+ synchronous_standby_names> and restart the primary server.
+
+
+
+ If the primary is isolated from the remaining standby servers you should
+ fail over to the best candidate of those other remaining standby servers.
+
+
+
+ If you need to re-create a standby server while transactions are
+ waiting, make sure that pg_start_backup() and
+ pg_stop_backup() are run in a session with
+ synchronous_replication = off, otherwise those requests will wait
+ forever for the standby to appear.
+
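+ A sketch of such a backup session (the backup label is arbitrary):

```sql
SET synchronous_replication = off;          -- don't wait on an absent standby
SELECT pg_start_backup('rebuild_standby');
-- ... take the filesystem copy of the data directory here ...
SELECT pg_stop_backup();
```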
+
+
+
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index aaa613e988..319a57c6e2 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -306,8 +306,11 @@ postgres: user> database> host> synchronous_standby_names> then the sync_priority
+ is also shown here; that is the order in which standbys will become
+ the synchronous standby. The columns detailing what exactly the connection
+ is doing are only visible if the user examining the view is a superuser.
The client's hostname will be available only if
is set or if the user's hostname
needed to be looked up during pg_hba.conf
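To see which standby is currently synchronous, a query along these lines
against the view should work (sync_priority and sync_state are the columns
added by this patch; the availability of application_name in
pg_stat_replication is assumed, and actual output depends on your standbys):

```sql
SELECT application_name, state, sync_priority, sync_state
  FROM pg_stat_replication
 ORDER BY sync_priority;
```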
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 287ad26698..729c7b72e0 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -56,6 +56,7 @@
#include "pg_trace.h"
#include "pgstat.h"
#include "replication/walsender.h"
+#include "replication/syncrep.h"
#include "storage/fd.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
@@ -1071,6 +1072,14 @@ EndPrepare(GlobalTransaction gxact)
END_CRIT_SECTION();
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked the prepare, but still show as
+ * running in the procarray (twice!) and continue to hold locks.
+ */
+ SyncRepWaitForLSN(gxact->prepare_lsn);
+
records.tail = records.head = NULL;
}
@@ -2030,6 +2039,14 @@ RecordTransactionCommitPrepared(TransactionId xid,
MyProc->inCommit = false;
END_CRIT_SECTION();
+
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(recptr);
}
/*
@@ -2109,4 +2126,12 @@ RecordTransactionAbortPrepared(TransactionId xid,
TransactionIdAbortTree(xid, nchildren, children);
END_CRIT_SECTION();
+
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(recptr);
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4b407015df..c8b582cce8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -37,6 +37,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walsender.h"
+#include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
@@ -1055,7 +1056,7 @@ RecordTransactionCommit(void)
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
*/
- if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0)
+ if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested())
{
/*
* Synchronous commit case:
@@ -1125,6 +1126,14 @@ RecordTransactionCommit(void)
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(XactLastRecEnd);
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd.xrecoff = 0;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c7f43afd81..3f7d7d913a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -520,7 +520,9 @@ CREATE VIEW pg_stat_replication AS
W.sent_location,
W.write_location,
W.flush_location,
- W.replay_location
+ W.replay_location,
+ W.sync_priority,
+ W.sync_state
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
WHERE S.usesysid = U.oid AND
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 7307c4177c..efc8e7cc82 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -1526,6 +1526,13 @@ AutoVacWorkerMain(int argc, char *argv[])
*/
SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
+ /*
+ * Force synchronous replication off to allow regular maintenance even
+ * if we are waiting for standbys to connect. This is important to
+ * ensure we aren't blocked from performing anti-wraparound tasks.
+ */
+ SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE);
+
/*
* Get the info about the database we're going to work on.
*/
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 997af5bf07..372fec7560 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -735,6 +735,9 @@ PostmasterMain(int argc, char *argv[])
if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
ereport(ERROR,
(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
+ if (strlen(SyncRepStandbyNames) > 0 && max_wal_senders == 0)
+ ereport(ERROR,
+ (errmsg("synchronous replication requires WAL streaming (max_wal_senders > 0)")));
/*
* Other one-time internal sanity checks can go here, if they are fast.
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 42c6eaf26c..3fe490e580 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
- repl_gram.o
+ repl_gram.o syncrep.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 32a1575ab0..47a980db20 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -317,13 +317,9 @@ WalReceiverMain(void)
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
- /* Let the master know that we received some data. */
- XLogWalRcvSendReply();
- XLogWalRcvSendHSFeedback();
-
/*
* If we've written some records, flush them to disk and let the
- * startup process know about them.
+ * startup process and primary server know about them.
*/
XLogWalRcvFlush(false);
}
@@ -581,7 +577,10 @@ XLogWalRcvFlush(bool dying)
/* Also let the master know that we made some progress */
if (!dying)
+ {
XLogWalRcvSendReply();
+ XLogWalRcvSendHSFeedback();
+ }
}
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 49b49d2a18..94547245fe 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,7 +66,7 @@
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
-static WalSnd *MyWalSnd = NULL;
+WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
@@ -174,6 +174,8 @@ WalSenderMain(void)
SpinLockRelease(&walsnd->mutex);
}
+ SyncRepInitConfig();
+
/* Main loop of walsender */
return WalSndLoop();
}
@@ -584,6 +586,8 @@ ProcessStandbyReplyMessage(void)
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+
+ SyncRepReleaseWaiters();
}
/*
@@ -700,6 +704,7 @@ WalSndLoop(void)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
}
/*
@@ -771,7 +776,12 @@ WalSndLoop(void)
* that point might wait for some time.
*/
if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
+ {
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" has now caught up with primary",
+ application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
+ }
ProcessRepliesIfAny();
}
@@ -1238,6 +1248,8 @@ WalSndShmemInit(void)
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
+ SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+
for (i = 0; i < max_wal_senders; i++)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
@@ -1304,12 +1316,15 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 6
+#define PG_STAT_GET_WAL_SENDERS_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
+ int sync_priority[max_wal_senders];
+ int priority = 0;
+ int sync_standby = -1;
int i;
/* check to see if caller supports us returning a tuplestore */
@@ -1337,6 +1352,33 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
+ /*
+ * Get the priorities of sync standbys all in one go, to minimise
+ * lock acquisitions and to allow us to evaluate who is the current
+ * sync standby. This code must match the code in SyncRepReleaseWaiters().
+ */
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid != 0)
+ {
+ sync_priority[i] = walsnd->sync_standby_priority;
+
+ if (walsnd->state == WALSNDSTATE_STREAMING &&
+ walsnd->sync_standby_priority > 0 &&
+ (priority == 0 ||
+ priority > walsnd->sync_standby_priority))
+ {
+ priority = walsnd->sync_standby_priority;
+ sync_standby = i;
+ }
+ }
+ }
+ LWLockRelease(SyncRepLock);
+
for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
@@ -1370,11 +1412,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* Only superusers can see details. Other users only get
* the pid value to know it's a walsender, but no details.
*/
- nulls[1] = true;
- nulls[2] = true;
- nulls[3] = true;
- nulls[4] = true;
- nulls[5] = true;
+ MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
}
else
{
@@ -1401,6 +1439,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
snprintf(location, sizeof(location), "%X/%X",
apply.xlogid, apply.xrecoff);
values[5] = CStringGetTextDatum(location);
+
+ values[6] = Int32GetDatum(sync_priority[i]);
+
+ /*
+ * More easily understood version of standby state.
+ * This is purely informational, not different from priority.
+ */
+ if (sync_priority[i] == 0)
+ values[7] = CStringGetTextDatum("ASYNC");
+ else if (i == sync_standby)
+ values[7] = CStringGetTextDatum("SYNC");
+ else
+ values[7] = CStringGetTextDatum("POTENTIAL");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c
index 1cf69a09c8..5d684b2b85 100644
--- a/src/backend/storage/ipc/shmqueue.c
+++ b/src/backend/storage/ipc/shmqueue.c
@@ -104,7 +104,6 @@ SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem)
* element. Inserting "after" the queue head puts the elem
* at the head of the queue.
*/
-#ifdef NOT_USED
void
SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
{
@@ -118,7 +117,6 @@ SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
queue->next = elem;
nextPtr->prev = elem;
}
-#endif /* NOT_USED */
/*--------------------
* SHMQueueNext -- Get the next element from a queue
@@ -156,6 +154,25 @@ SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
return (Pointer) (((char *) elemPtr) - linkOffset);
}
+/*--------------------
+ * SHMQueuePrev -- Get the previous element from a queue
+ *
+ * Same as SHMQueueNext, just starting at the tail and moving towards
+ * the head. All other comments and usage apply.
+ */
+Pointer
+SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
+{
+ SHM_QUEUE *elemPtr = curElem->prev;
+
+ Assert(ShmemAddrIsValid(curElem));
+
+ if (elemPtr == queue) /* back to the queue head? */
+ return NULL;
+
+ return (Pointer) (((char *) elemPtr) - linkOffset);
+}
+
/*
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
*/
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index afaf5995f0..ee03316050 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -39,6 +39,7 @@
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+#include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/pmsignal.h"
@@ -196,6 +197,7 @@ InitProcGlobal(void)
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
ProcGlobal->freeProcs = &procs[i];
+ InitSharedLatch(&procs[i].waitLatch);
}
/*
@@ -214,6 +216,7 @@ InitProcGlobal(void)
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = &procs[i];
+ InitSharedLatch(&procs[i].waitLatch);
}
/*
@@ -224,6 +227,7 @@ InitProcGlobal(void)
{
AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */
PGSemaphoreCreate(&(AuxiliaryProcs[i].sem));
+ InitSharedLatch(&(AuxiliaryProcs[i].waitLatch));
}
/* Create ProcStructLock spinlock, too */
@@ -326,6 +330,13 @@ InitProcess(void)
SHMQueueInit(&(MyProc->myProcLocks[i]));
MyProc->recoveryConflictPending = false;
+ /* Initialise for sync rep */
+ MyProc->waitLSN.xlogid = 0;
+ MyProc->waitLSN.xrecoff = 0;
+ MyProc->syncRepState = SYNC_REP_NOT_WAITING;
+ SHMQueueElemInit(&(MyProc->syncRepLinks));
+ OwnLatch((Latch *) &MyProc->waitLatch);
+
/*
* We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly
@@ -365,6 +376,7 @@ InitProcessPhase2(void)
/*
* Arrange to clean that up at backend exit.
*/
+ on_shmem_exit(SyncRepCleanupAtProcExit, 0);
on_shmem_exit(RemoveProcFromArray, 0);
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 529148a040..0bf1845599 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -55,6 +55,7 @@
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
+#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
@@ -753,6 +754,14 @@ static struct config_bool ConfigureNamesBool[] =
&XactSyncCommit,
true, NULL, NULL
},
+ {
+ {"synchronous_replication", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Requests synchronous replication."),
+ NULL
+ },
+ &sync_rep_mode,
+ false, NULL, NULL
+ },
{
{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing past damaged page headers."),
@@ -2716,6 +2725,16 @@ static struct config_string ConfigureNamesString[] =
"pg_stat_tmp", assign_pgstat_temp_directory, NULL
},
+ {
+ {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION,
+ gettext_noop("List of potential standby names to synchronise with."),
+ NULL,
+ GUC_LIST_INPUT
+ },
+ &SyncRepStandbyNames,
+ "", assign_synchronous_standby_names, NULL
+ },
+
{
{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
gettext_noop("Sets default text search configuration."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6bfd0fd87c..ed70223f13 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -184,7 +184,16 @@
#archive_timeout = 0 # force a logfile segment switch after this
# number of seconds; 0 disables
-# - Streaming Replication -
+# - Replication - User Settings
+
+#synchronous_replication = off # does commit wait for reply from standby
+
+# - Streaming Replication - Server Settings
+
+#synchronous_standby_names = '' # standby servers that provide sync rep
+ # comma-separated list of application_name from standby(s);
+ # '*' = all
+
#max_wal_senders = 0 # max number of walsender processes
# (change requires restart)
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 96a463398c..0533e5a686 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2542,7 +2542,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23,25}" "{o,o,o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 5843307c9d..8a8c9398d1 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -15,6 +15,7 @@
#include "access/xlog.h"
#include "nodes/nodes.h"
#include "storage/latch.h"
+#include "replication/syncrep.h"
#include "storage/spin.h"
@@ -52,11 +53,32 @@ typedef struct WalSnd
* to do.
*/
Latch latch;
+
+ /*
+ * The priority order of the standby managed by this WALSender, as
+ * listed in synchronous_standby_names, or 0 if not listed.
+ * Protected by SyncRepLock.
+ */
+ int sync_standby_priority;
} WalSnd;
+extern WalSnd *MyWalSnd;
+
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
+ /*
+ * Synchronous replication queue. Protected by SyncRepLock.
+ */
+ SHM_QUEUE SyncRepQueue;
+
+ /*
+ * Current location of the head of the queue. All waiters should have
+ * a waitLSN that follows this value, or they are currently being woken
+ * to remove themselves from the queue. Protected by SyncRepLock.
+ */
+ XLogRecPtr lsn;
+
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index ad0bcd775b..438a48d8dc 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -78,6 +78,7 @@ typedef enum LWLockId
SerializableFinishedListLock,
SerializablePredicateLockListLock,
OldSerXidLock,
+ SyncRepLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 78dbadef4c..1d6642c6c7 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -14,6 +14,9 @@
#ifndef _PROC_H_
#define _PROC_H_
+#include "access/xlog.h"
+#include "replication/syncrep.h"
+#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
#include "utils/timestamp.h"
@@ -115,6 +118,17 @@ struct PGPROC
LOCKMASK heldLocks; /* bitmask for lock types already held on this
* lock object by this backend */
+ /*
+ * Info to allow us to wait for synchronous replication, if needed.
+ * waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend.
+ * syncRepState must not be touched except by owning process or WALSender.
+ * syncRep_links used only while holding SyncRepLock.
+ */
+ Latch waitLatch; /* allow us to wait for sync rep */
+ XLogRecPtr waitLSN; /* waiting for this LSN or higher */
+ int syncRepState; /* wait state for sync rep */
+ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */
+
/*
* All PROCLOCK objects for locks held or awaited by this backend are
* linked into one of these lists, according to the partition number of
diff --git a/src/include/storage/shmem.h b/src/include/storage/shmem.h
index f23740c9e3..0b7da77ccd 100644
--- a/src/include/storage/shmem.h
+++ b/src/include/storage/shmem.h
@@ -67,8 +67,11 @@ extern void SHMQueueInit(SHM_QUEUE *queue);
extern void SHMQueueElemInit(SHM_QUEUE *queue);
extern void SHMQueueDelete(SHM_QUEUE *queue);
extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
+extern void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem);
extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem,
Size linkOffset);
+extern Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem,
+ Size linkOffset);
extern bool SHMQueueEmpty(const SHM_QUEUE *queue);
extern bool SHMQueueIsDetached(const SHM_QUEUE *queue);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 02043ab42c..20cdc39752 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1298,7 +1298,7 @@ SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
- pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
+ pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority, w.sync_state FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));