diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 3f8d9bfafb..dca24fc070 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -233,6 +233,11 @@
query rewrite rules
+
+ pg_replication_slots
+ replication slot information
+
+
pg_seclabelsecurity labels on database objects
@@ -5157,6 +5162,100 @@
+
+ pg_replication_slots
+
+
+ pg_replication_slots
+
+
+
+ The pg_replication_slots view provides a listing
+ of all replication slots that currently exist on the database cluster,
+ along with their current state.
+
+
+
+ For more on replication slots,
+ see .
+
+
+
+
+ pg_replication_slots Columns
+
+
+
+
+ Name
+ Type
+ References
+ Description
+
+
+
+
+
+ slot_name
+ text
+
+ A unique, cluster-wide identifier for the replication slot
+
+
+
+ slot_type
+ text
+
+ The slot type - physical> or logical>
+
+
+
+ datoid
+ oid
+ pg_database.oid
+ The oid of the database this slot is associated with, or
+ null. Only logical slots have an associated database.
+
+
+
+ database
+ text
+ pg_database.datname
+ The name of the database this slot is associated with, or
+ null. Only logical slots have an associated database.
+
+
+
+ active
+ boolean
+
+ True if this slot is currently actively being used
+
+
+
+ xmin
+ xid
+
+ The oldest transaction that this slot needs the database to
+ retain. VACUUM cannot remove tuples deleted
+ by any later transaction.
+
+
+
+
+ restart_lsn
+ text
+
+ The address (LSN) of oldest WAL which still
+ might be required by the consumer of this slot and thus won't be
+ automatically removed during checkpoints.
+
+
+
+
+
+
+
pg_seclabel
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 1b5f831d65..000a46fabb 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2348,6 +2348,25 @@ include 'filename'
+
+ max_replication_slots (integer)
+
+ max_replication_slots> configuration parameter
+
+
+
+ Specifies the maximum number of replication slots
+ (see ) that the server
+ can support. The default is zero. This parameter can only be set at
+ server start.
+ wal_level must be set
+ to archive or higher to allow replication slots to
+ be used. Setting it to a lower value than the number of currently
+ existing replication slots will prevent the server from starting.
+
+
+
+
wal_keep_segments (integer)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 252539f93b..8cc65b94d1 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -16290,6 +16290,76 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
+
+ Replication Functions
+
+
+ PostgreSQL exposes a number of functions for controlling and interacting
+ with replication features. See
+ and .
+
+
+
+ Many of these functions have equivalent commands in the replication
+ protocol; see .
+
+
+
+ The sections , and are also relevant for replication.
+
+
+
+ Replication SQL Functions
+
+
+
+ Function
+ Return Type
+ Description
+
+
+
+
+
+
+ pg_create_physical_replication_slot
+
+ pg_create_physical_replication_slot(slotnametext, plugintext)
+
+
+ (slotnametext, xlog_positiontext)
+
+
+ Creates a new physical replication slot named
+ slotname. Streaming changes from a physical slot
+ is only possible with the walsender protocol - see . Corresponds to the walsender protocol
+ command CREATE_REPLICATION_SLOT ... PHYSICAL.
+
+
+
+
+
+ pg_drop_replication_slot
+
+ pg_drop_replication_slot(slotnametext)
+
+
+ (slotnametext)
+
+
+ Drops the physical or logical replication slot
+ named slotname. Same as walsender protocol
+ command DROP_REPLICATION_SLOT>.
+
+
+
+
+
+
+
Database Object Management Functions
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index e2e5ac93ab..9d43586fe2 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -643,7 +643,9 @@ protocol to make nodes agree on a serializable transactional order.
entries in pg_hba.conf> with the database field set to
replication>. Also ensure max_wal_senders> is set
to a sufficiently large value in the configuration file of the primary
- server.
+ server. If replication slots will be used,
+ ensure that max_replication_slots is set sufficiently
+ high as well.
@@ -750,13 +752,14 @@ archive_cleanup_command = 'pg_archivecleanup /path/to/archive %r'
If you use streaming replication without file-based continuous
- archiving, you have to set wal_keep_segments> in the master
- to a value high enough to ensure that old WAL segments are not recycled
- too early, while the standby might still need them to catch up. If the
- standby falls behind too much, it needs to be reinitialized from a new
- base backup. If you set up a WAL archive that's accessible from the
- standby, wal_keep_segments> is not required as the standby can always
- use the archive to catch up.
+ archiving, the server might recycle old WAL segments before the standby
+ has received them. If this occurs, the standby will need to be
+ reinitialized from a new base backup. You can avoid this by setting
+ wal_keep_segments> to a value large enough to ensure that
+ WAL segments are not recycled too early, or by configuring a replication
+ slot for the standby. If you set up a WAL archive that's accessible from
+ the standby, these solutions are not required, since the standby can
+ always use the archive to catch up provided it retains enough segments.
@@ -871,6 +874,81 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
+
+ Replication Slots
+
+ Replication Slots
+
+
+ Replication slots provide an automated way to ensure that the master does
+ not remove WAL segments until they have been received by all standbys,
+ and that the master does not remove rows which could cause a
+ recovery conflict> even when the
+ standby is disconnected.
+
+
+ In lieu of using replication slots, it is possible to prevent the removal
+ of old WAL segments using , or by
+ storing the segments in an archive using .
+ However, these methods often result in retaining more WAL segments than
+ required, whereas replication slots retain only the number of segments
+ known to be needed. An advantage of these methods is that they bound
+ the space requirement for pg_xlog>; there is currently no way
+ to do this using replication slots.
+
+
+ Similarly, hot_standby_feedback
+ and vacuum_defer_cleanup_age provide protection against
+ relevant rows being removed by vacuum, but the former provides no
+ protection during any time period when the standby is not connected,
+ and the latter often needs to be set to a high value to provide adequate
+ protection. Replication slots overcome these disadvantages.
+
+
+ Querying and manipulating replication slots
+
+ Each replication slot has a name, which can contain lower-case letters,
+ numbers, and the underscore character.
+
+
+ Existing replication slots and their state can be seen in the
+ pg_replication_slots
+ view.
+
+
+ Slots can be created and dropped either via the streaming replication
+ protocol (see ) or via SQL
+ functions (see ).
+
+
+
+ Configuration Example
+
+ You can create a replication slot like this:
+
+postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
+ slotname | xlog_position
+-------------+---------------
+ node_a_slot |
+
+postgres=# SELECT * FROM pg_replication_slots;
+ slot_name | slot_type | datoid | database | active | xmin | restart_lsn
+-------------+-----------+--------+----------+--------+------+-------------
+ node_a_slot | physical |        |          | f    |      |
+(1 row)
+
+ To configure the standby to use this slot, primary_slotname>
+ should be configured in the standby's recovery.conf>.
+ Here is a simple example:
+
+standby_mode = 'on'
+primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
+primary_slotname = 'node_a_slot'
+
+
+
+
+
Cascading Replication
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 7d99976a49..832524e95e 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1401,15 +1401,39 @@ The commands accepted in walsender mode are:
- START_REPLICATION XXX/XXX> TIMELINE tli>
+ CREATE_REPLICATION_SLOTslotname> PHYSICAL
+ CREATE_REPLICATION_SLOT
+
+
+ Create a physical replication
+ slot. See for more about
+ replication slots.
+
+
+
+ slotname>
+
+
+ The name of the slot to create. Must be a valid replication slot
+ name (see ).
+
+
+
+
+
+
+
+
+ START_REPLICATION [SLOTslotname>] [PHYSICAL] XXX/XXX> TIMELINEtli>
Instructs server to start streaming WAL, starting at
- WAL position XXX/XXX> on timeline
- tli>.
- The server can reply with an error, e.g. if the requested section of WAL
- has already been recycled. On success, server responds with a
- CopyBothResponse message, and then starts to stream WAL to the frontend.
+ WAL position XXX/XXX>. If specified,
+ streaming starts on timeline tli>;
+ otherwise, the server's current timeline is selected. The server can
+ reply with an error, e.g. if the requested section of WAL has already
+ been recycled. On success, server responds with a CopyBothResponse
+ message, and then starts to stream WAL to the frontend.
@@ -1443,6 +1467,14 @@ The commands accepted in walsender mode are:
client contains a message of one of the following formats:
+
+ If a slot's name is provided
+ via slotname>, it will be updated
+ as replication progresses so that the server knows which WAL segments -
+ and if hot_standby_feedback> is on which transactions -
+ are still needed by the standby.
+
+
@@ -1719,6 +1751,26 @@ The commands accepted in walsender mode are:
+
+ DROP_REPLICATION_SLOTslotname>
+
+
+ Drops a replication slot, freeing any reserved server-side resources. If
+ the slot is currently in use by an active connection, this command fails.
+
+
+
+ slotname>
+
+
+ The name of the slot to drop.
+
+
+
+
+
+
+
BASE_BACKUP [LABEL'label'] [PROGRESS] [FAST] [WAL] [NOWAIT]
diff --git a/doc/src/sgml/recovery-config.sgml b/doc/src/sgml/recovery-config.sgml
index 4a97bb7a9c..b69ce287c8 100644
--- a/doc/src/sgml/recovery-config.sgml
+++ b/doc/src/sgml/recovery-config.sgml
@@ -418,6 +418,22 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
+
+ primary_slotname (string)
+
+ primary_slotname> recovery parameter
+
+
+
+ Optionally specifies an existing replication slot to be used when
+ connecting to the primary via streaming replication to control
+ resource removal on the upstream node
+ (see ).
+ This setting has no effect if primary_conninfo> is not
+ set.
+
+
+ trigger_file (string)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index 19bebb62f7..2a44af46c5 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -225,6 +225,24 @@ PostgreSQL documentation
+
+
+
+
+
+ Require pg_receivexlog to use an existing
+ replication slot (see ).
+ When this option is used, pg_receivexlog> will report
+ a flush position to the server, indicating when each segment has been
+ synchronized to disk so that the server can remove that segment if it
+ is not otherwise needed. When using this parameter, it is important
+ to make sure that pg_receivexlog> cannot become the
+ synchronous standby through an incautious setting of
+ ; it does not flush
+ data frequently enough for this to work correctly.
+
+
+
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b333d820c7..7f63185b1c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -39,6 +39,7 @@
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/barrier.h"
@@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime;
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false;
static char *PrimaryConnInfo = NULL;
+static char *PrimarySlotName = NULL;
static char *TriggerFile = NULL;
/* are we currently in standby mode? */
@@ -485,6 +487,8 @@ typedef struct XLogCtlData
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
+ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
+
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
* segment */
@@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void);
static void CreateEndOfRecoveryRecord(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2908,6 +2913,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
SetLatch(ProcGlobal->walwriterLatch);
}
+/*
+ * Record the LSN up to which we can remove WAL because it's not required by
+ * any replication slot.
+ */
+void
+XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->replicationSlotMinLSN = lsn;
+ SpinLockRelease(&xlogctl->info_lck);
+}
+
+
+/*
+ * Return the oldest LSN we must retain to satisfy the needs of some
+ * replication slot.
+ */
+static XLogRecPtr
+XLogGetReplicationSlotMinimumLSN(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr retval;
+ SpinLockAcquire(&xlogctl->info_lck);
+ retval = xlogctl->replicationSlotMinLSN;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return retval;
+}
+
/*
* Advance minRecoveryPoint in control file.
*
@@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void)
(errmsg_internal("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
+ else if (strcmp(item->name, "primary_slotname") == 0)
+ {
+ ReplicationSlotValidateName(item->value, ERROR);
+ PrimarySlotName = pstrdup(item->value);
+ ereport(DEBUG2,
+ (errmsg_internal("primary_slotname = '%s'",
+ PrimarySlotName)));
+ }
else if (strcmp(item->name, "trigger_file") == 0)
{
TriggerFile = pstrdup(item->value);
@@ -6505,6 +6551,12 @@ StartupXLOG(void)
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
+ /*
+ * Initialize replication slots, before there's a chance to remove
+ * required resources.
+ */
+ StartupReplicationSlots(checkPoint.redo);
+
/*
* Startup MultiXact. We need to do this early for two reasons: one
* is that we might try to access multixacts when we do tuple freezing,
@@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointMultiXact();
CheckPointPredicate();
CheckPointRelationMap();
+ CheckPointReplicationSlots();
CheckPointBuffers(flags); /* performs all required fsyncs */
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
@@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags)
/*
* Retreat *logSegNo to the last segment that we need to retain because of
- * wal_keep_segments. This is calculated by subtracting wal_keep_segments
- * from the given xlog location, recptr.
+ * either wal_keep_segments or replication slots.
+ *
+ * This is calculated by subtracting wal_keep_segments from the given xlog
+ * location, recptr and by making sure that that result is below the
+ * requirement of replication slots.
*/
static void
KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
{
XLogSegNo segno;
-
- if (wal_keep_segments == 0)
- return;
+ XLogRecPtr keep;
XLByteToSeg(recptr, segno);
+ keep = XLogGetReplicationSlotMinimumLSN();
- /* avoid underflow, don't go below 1 */
- if (segno <= wal_keep_segments)
- segno = 1;
- else
- segno = segno - wal_keep_segments;
+ /* compute limit for wal_keep_segments first */
+ if (wal_keep_segments > 0)
+ {
+ /* avoid underflow, don't go below 1 */
+ if (segno <= wal_keep_segments)
+ segno = 1;
+ else
+ segno = segno - wal_keep_segments;
+ }
+
+ /* then check whether slots limit removal further */
+ if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+ {
+ XLogRecPtr slotSegNo;
+
+ XLByteToSeg(keep, slotSegNo);
+
+ if (slotSegNo <= 0)
+ segno = 1;
+ else if (slotSegNo < segno)
+ segno = slotSegNo;
+ }
/* don't delete WAL segments newer than the calculated segment */
if (segno < *logSegNo)
@@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
tli, curFileTLI);
}
curFileTLI = tli;
- RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+ RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
+ PrimarySlotName);
receivedUpto = 0;
}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 277af61f9d..f02efeca97 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -613,6 +613,18 @@ CREATE VIEW pg_stat_replication AS
WHERE S.usesysid = U.oid AND
S.pid = W.pid;
+CREATE VIEW pg_replication_slots AS
+ SELECT
+ L.slot_name,
+ L.slot_type,
+ L.datoid,
+ D.datname AS database,
+ L.active,
+ L.xmin,
+ L.restart_lsn
+ FROM pg_get_replication_slots() AS L
+ LEFT JOIN pg_database D ON (L.datoid = D.oid);
+
CREATE VIEW pg_stat_database AS
SELECT
D.oid AS datid,
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 2dde0118a4..7941cb8d5e 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
- repl_gram.o syncrep.o
+ repl_gram.o slot.o slotfuncs.o syncrep.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 60120ede29..2f5df49de6 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -47,8 +47,9 @@ to fetch more WAL (if streaming replication is configured).
Walreceiver is a postmaster subprocess, so the startup process can't fork it
directly. Instead, it sends a signal to postmaster, asking postmaster to launch
-it. Before that, however, startup process fills in WalRcvData->conninfo,
-and initializes the starting point in WalRcvData->receiveStart.
+it. Before that, however, startup process fills in WalRcvData->conninfo
+and WalRcvData->slotname, and initializes the starting point in
+WalRcvData->receiveStart.
As walreceiver receives WAL from the master server, and writes and flushes
it to disk (in pg_xlog), it updates WalRcvData->receivedUpto and signals
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 7d0ed9ce4c..781f678097 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -847,6 +847,10 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
continue;
+ /* Skip pg_replslot, not useful to copy */
+ if (strcmp(de->d_name, "pg_replslot") == 0)
+ continue;
+
/*
* Check if the postmaster has signaled us to exit, and abort with an
* error in that case. The error handler further up will call
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 2e057b8969..ecec8b3456 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -49,7 +49,8 @@ static char *recvBuf = NULL;
static void libpqrcv_connect(char *conninfo);
static void libpqrcv_identify_system(TimeLineID *primary_tli);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
-static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
+ char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
@@ -171,15 +172,20 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
* throws an ERROR.
*/
static bool
-libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
{
char cmd[64];
PGresult *res;
/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
- (uint32) (startpoint >> 32), (uint32) startpoint,
- tli);
+ if (slotname != NULL)
+ snprintf(cmd, sizeof(cmd),
+ "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
+ (uint32) (startpoint >> 32), (uint32) startpoint, tli);
+ else
+ snprintf(cmd, sizeof(cmd),
+ "START_REPLICATION %X/%X TIMELINE %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint, tli);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 015aa44d89..d4bd59bab2 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -65,7 +65,7 @@ Node *replication_parse_result;
}
/* Non-keyword tokens */
-%token SCONST
+%token SCONST IDENT
%token UCONST
%token RECPTR
@@ -73,6 +73,8 @@ Node *replication_parse_result;
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
%token K_START_REPLICATION
+%token K_CREATE_REPLICATION_SLOT
+%token K_DROP_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_LABEL
%token K_PROGRESS
@@ -80,12 +82,15 @@ Node *replication_parse_result;
%token K_NOWAIT
%token K_WAL
%token K_TIMELINE
+%token K_PHYSICAL
+%token K_SLOT
%type command
-%type base_backup start_replication identify_system timeline_history
+%type base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
%type base_backup_opt_list
%type base_backup_opt
%type opt_timeline
+%type opt_slot
%%
firstcmd: command opt_semicolon
@@ -102,6 +107,8 @@ command:
identify_system
| base_backup
| start_replication
+ | create_replication_slot
+ | drop_replication_slot
| timeline_history
;
@@ -158,18 +165,42 @@ base_backup_opt:
}
;
+/* CREATE_REPLICATION_SLOT slot PHYSICAL */
+create_replication_slot:
+ K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
+ {
+ CreateReplicationSlotCmd *cmd;
+ cmd = makeNode(CreateReplicationSlotCmd);
+ cmd->kind = REPLICATION_KIND_PHYSICAL;
+ cmd->slotname = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
+/* DROP_REPLICATION_SLOT slot */
+drop_replication_slot:
+ K_DROP_REPLICATION_SLOT IDENT
+ {
+ DropReplicationSlotCmd *cmd;
+ cmd = makeNode(DropReplicationSlotCmd);
+ cmd->slotname = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
/*
- * START_REPLICATION %X/%X [TIMELINE %d]
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/
start_replication:
- K_START_REPLICATION RECPTR opt_timeline
+ K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
{
StartReplicationCmd *cmd;
cmd = makeNode(StartReplicationCmd);
- cmd->startpoint = $2;
- cmd->timeline = $3;
-
+ cmd->kind = REPLICATION_KIND_PHYSICAL;
+ cmd->slotname = $2;
+ cmd->startpoint = $4;
+ cmd->timeline = $5;
$$ = (Node *) cmd;
}
;
@@ -205,6 +236,15 @@ timeline_history:
$$ = (Node *) cmd;
}
;
+
+opt_physical : K_PHYSICAL | /* EMPTY */;
+
+
+opt_slot : K_SLOT IDENT
+ {
+ $$ = $2;
+ }
+ | /* nothing */ { $$ = NULL; }
%%
#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 01e5ac6efb..24195a5971 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -16,6 +16,7 @@
#include "postgres.h"
#include "utils/builtins.h"
+#include "parser/scansup.h"
/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
#undef fprintf
@@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar);
%option warn
%option prefix="replication_yy"
-%x xq
+%x xq xd
/* Extended quote
* xqdouble implements embedded quote, ''''
@@ -57,12 +58,26 @@ xqstart {quote}
xqdouble {quote}{quote}
xqinside [^']+
+/* Double quote
+ * Allows embedded spaces and other special characters into identifiers.
+ */
+dquote \"
+xdstart {dquote}
+xdstop {dquote}
+xddouble {dquote}{dquote}
+xdinside [^"]+
+
digit [0-9]+
hexdigit [0-9A-Fa-f]+
quote '
quotestop {quote}
+ident_start [A-Za-z\200-\377_]
+ident_cont [A-Za-z\200-\377_0-9\$]
+
+identifier {ident_start}{ident_cont}*
+
%%
BASE_BACKUP { return K_BASE_BACKUP; }
@@ -74,9 +89,16 @@ PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
+CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
+DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
+PHYSICAL { return K_PHYSICAL; }
+SLOT { return K_SLOT; }
+
"," { return ','; }
";" { return ';'; }
+"(" { return '('; }
+")" { return ')'; }
[\n] ;
[\t] ;
@@ -100,20 +122,49 @@ TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
BEGIN(xq);
startlit();
}
+
{quotestop} {
yyless(1);
BEGIN(INITIAL);
yylval.str = litbufdup();
return SCONST;
}
-{xqdouble} {
+
+{xqdouble} {
addlitchar('\'');
}
+
{xqinside} {
addlit(yytext, yyleng);
}
-<> { yyerror("unterminated quoted string"); }
+{xdstart} {
+ BEGIN(xd);
+ startlit();
+ }
+
+{xdstop} {
+ int len;
+ yyless(1);
+ BEGIN(INITIAL);
+ yylval.str = litbufdup();
+ len = strlen(yylval.str);
+ truncate_identifier(yylval.str, len, true);
+ return IDENT;
+ }
+
+{xdinside} {
+ addlit(yytext, yyleng);
+ }
+
+{identifier} {
+ int len = strlen(yytext);
+
+ yylval.str = downcase_truncate_identifier(yytext, len, true);
+ return IDENT;
+ }
+
+<> { yyerror("unterminated quoted string"); }
<> {
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
new file mode 100644
index 0000000000..30aff5f5e3
--- /dev/null
+++ b/src/backend/replication/slot.c
@@ -0,0 +1,1066 @@
+/*-------------------------------------------------------------------------
+ *
+ * slot.c
+ * Replication slot management.
+ *
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/slot.c
+ *
+ * NOTES
+ *
+ * Replication slots are used to keep state about replication streams
+ * originating from this cluster. Their primary purpose is to prevent the
+ * premature removal of WAL or of old tuple versions in a manner that would
+ * interfere with replication; they are also useful for monitoring purposes.
+ * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
+ * on standbys (to support cascading setups). The requirement that slots be
+ * usable on standbys precludes storing them in the system catalogs.
+ *
+ * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
+ * directory. Inside that directory the state file will contain the slot's
+ * own data. Additional data can be stored alongside that file if required.
+ * While the server is running, the state data is also cached in memory for
+ * efficiency.
+ *
+ * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
+ * or free a slot. ReplicationSlotControlLock must be taken in shared mode
+ * to iterate over the slots, and in exclusive mode to change the in_use flag
+ * of a slot. The remaining data in each slot is protected by its mutex.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include
+#include
+
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "storage/fd.h"
+#include "storage/procarray.h"
+
+/*
+ * Replication slot on-disk data structure.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+ /* first part of this struct needs to be version independent */
+
+ /* data not covered by checksum */
+ uint32 magic;
+ pg_crc32 checksum;
+
+ /* data covered by checksum */
+ uint32 version;
+ uint32 length;
+
+ ReplicationSlotPersistentData slotdata;
+} ReplicationSlotOnDisk;
+
+/* size of the part of the slot that is version independent */
+#define ReplicationSlotOnDiskConstantSize \
+ offsetof(ReplicationSlotOnDisk, slotdata)
+/* size of the part of the slot that is not version independent */
+#define ReplicationSlotOnDiskDynamicSize \
+ sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
+
+#define SLOT_MAGIC 0x1051CA1 /* format identifier */
+#define SLOT_VERSION 1 /* version for new files */
+
+/* Control array for replication slot management */
+ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
+
+/* My backend's replication slot in the shared memory array */
+ReplicationSlot *MyReplicationSlot = NULL;
+
+/* GUCs */
+int max_replication_slots = 0; /* the maximum number of replication slots */
+
+/* internal persistency functions */
+static void RestoreSlotFromDisk(const char *name);
+static void CreateSlotOnDisk(ReplicationSlot *slot);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
+
+/*
+ * Report shared-memory space needed by ReplicationSlotShmemInit.
+ */
+Size
+ReplicationSlotsShmemSize(void)
+{
+ Size size = 0;
+
+ if (max_replication_slots == 0)
+ return size;
+
+ size = offsetof(ReplicationSlotCtlData, replication_slots);
+ size = add_size(size,
+ mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+
+ return size;
+}
+
+/*
+ * Allocate and initialize walsender-related shared memory.
+ */
+void
+ReplicationSlotsShmemInit(void)
+{
+ bool found;
+
+ if (max_replication_slots == 0)
+ return;
+
+ ReplicationSlotCtl = (ReplicationSlotCtlData *)
+ ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
+ &found);
+
+ if (!found)
+ {
+ int i;
+
+ /* First time through, so initialize */
+ MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
+
+ /* everything else is zeroed by the memset above */
+ SpinLockInit(&slot->mutex);
+ slot->io_in_progress_lock = LWLockAssign();
+ }
+ }
+}
+
+/*
+ * Check whether the passed slot name is valid and report errors at elevel.
+ *
+ * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
+ * the name to be used as a directory name on every supported OS.
+ *
+ * Returns whether the directory name is valid or not if elevel < ERROR.
+ */
+bool
+ReplicationSlotValidateName(const char *name, int elevel)
+{
+ const char *cp;
+
+ if (strlen(name) == 0)
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("replication slot name \"%s\" is too short",
+ name)));
+ return false;
+ }
+
+ if (strlen(name) >= NAMEDATALEN)
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("replication slot name \"%s\" is too long",
+ name)));
+ return false;
+ }
+
+ for (cp = name; *cp; cp++)
+ {
+ if (!((*cp >= 'a' && *cp <= 'z')
+ || (*cp >= '0' && *cp <= '9')
+ || (*cp == '_')))
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("replication slot name \"%s\" contains invalid character",
+ name),
+ errhint("Replication slot names may only contain letters, numbers and the underscore character.")));
+ return false;
+ }
+ }
+ return true;
+}
+
+/*
+ * Create a new replication slot and mark it as used by this backend.
+ *
+ * name: Name of the slot
+ * db_specific: changeset extraction is db specific, if the slot is going to
+ * be used for that pass true, otherwise false.
+ */
+void
+ReplicationSlotCreate(const char *name, bool db_specific)
+{
+ ReplicationSlot *slot = NULL;
+ int i;
+
+ Assert(MyReplicationSlot == NULL);
+
+ ReplicationSlotValidateName(name, ERROR);
+
+ /*
+ * If some other backend ran this code concurrently with us, we'd likely
+ * both allocate the same slot, and that would be bad. We'd also be
+ * at risk of missing a name collision. Also, we don't want to try to
+ * create a new slot while somebody's busy cleaning up an old one, because
+ * we might both be monkeying with the same directory.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+ /*
+ * Check for name collision, and identify an allocatable slot. We need
+ * to hold ReplicationSlotControlLock in shared mode for this, so that
+ * nobody else can change the in_use flags while we're looking at them.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("replication slot \"%s\" already exists", name)));
+ if (!s->in_use && slot == NULL)
+ slot = s;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* If all slots are in use, we're out of luck. */
+ if (slot == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("all replication slots are in use"),
+ errhint("Free one or increase max_replication_slots.")));
+
+ /*
+ * Since this slot is not in use, nobody should be looking at any
+ * part of it other than the in_use field unless they're trying to allocate
+ * it. And since we hold ReplicationSlotAllocationLock, nobody except us
+ * can be doing that. So it's safe to initialize the slot.
+ */
+ Assert(!slot->in_use);
+ Assert(!slot->active);
+ slot->data.xmin = InvalidTransactionId;
+ slot->effective_xmin = InvalidTransactionId;
+ strncpy(NameStr(slot->data.name), name, NAMEDATALEN);
+ NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0';
+ slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
+ slot->data.restart_lsn = InvalidXLogRecPtr;
+
+ /*
+ * Create the slot on disk. We haven't actually marked the slot allocated
+ * yet, so no special cleanup is required if this errors out.
+ */
+ CreateSlotOnDisk(slot);
+
+ /*
+ * We need to briefly prevent any other backend from iterating over the
+ * slots while we flip the in_use flag. We also need to set the active
+ * flag while holding the ControlLock as otherwise a concurrent
+ * SlotAcquire() could acquire the slot as well.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+
+ slot->in_use = true;
+
+ /* We can now mark the slot active, and that makes it our slot. */
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&slot->mutex);
+ Assert(!vslot->active);
+ vslot->active = true;
+ SpinLockRelease(&slot->mutex);
+ MyReplicationSlot = slot;
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Now that the slot has been marked as in_use and active, it's safe to
+ * let somebody else try to allocate a slot.
+ */
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Find a previously created slot and mark it as used by this backend.
+ */
+void
+ReplicationSlotAcquire(const char *name)
+{
+ ReplicationSlot *slot = NULL;
+ int i;
+ bool active = false;
+
+ Assert(MyReplicationSlot == NULL);
+
+ ReplicationSlotValidateName(name, ERROR);
+
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ active = vslot->active;
+ vslot->active = true;
+ SpinLockRelease(&s->mutex);
+ slot = s;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* If we did not find the slot or it was already active, error out. */
+ if (slot == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", name)));
+ if (active)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication slot \"%s\" is already active", name)));
+
+ /* We made this slot active, so it's ours now. */
+ MyReplicationSlot = slot;
+}
+
+/*
+ * Release a replication slot, this or another backend can ReAcquire it
+ * later. Resources this slot requires will be preserved.
+ */
+void
+ReplicationSlotRelease(void)
+{
+ ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(slot != NULL && slot->active);
+
+ /* Mark slot inactive. We're not freeing it, just disconnecting. */
+ {
+ volatile ReplicationSlot *vslot = slot;
+ SpinLockAcquire(&slot->mutex);
+ vslot->active = false;
+ SpinLockRelease(&slot->mutex);
+ MyReplicationSlot = NULL;
+ }
+}
+
+/*
+ * Permanently drop replication slot identified by the passed in name.
+ */
+void
+ReplicationSlotDrop(const char *name)
+{
+ ReplicationSlot *slot = NULL;
+ int i;
+ bool active;
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
+
+ ReplicationSlotValidateName(name, ERROR);
+
+ /*
+ * If some other backend ran this code concurrently with us, we might both
+ * try to free the same slot at the same time. Or we might try to delete
+ * a slot with a certain name while someone else was trying to create a
+ * slot with the same name.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ active = vslot->active;
+ vslot->active = true;
+ SpinLockRelease(&s->mutex);
+ slot = s;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* If we did not find the slot or it was already active, error out. */
+ if (slot == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", name)));
+ if (active)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication slot \"%s\" is already active", name)));
+
+ /* Generate pathnames. */
+ sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
+
+ /*
+ * Rename the slot directory on disk, so that we'll no longer recognize
+ * this as a valid slot. Note that if this fails, we've got to mark the
+ * slot inactive again before bailing out.
+ */
+ if (rename(path, tmppath) != 0)
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&slot->mutex);
+ vslot->active = false;
+ SpinLockRelease(&slot->mutex);
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename \"%s\" to \"%s\": %m",
+ path, tmppath)));
+ }
+
+ /*
+ * We need to fsync() the directory we just renamed and its parent to make
+ * sure that our changes are on disk in a crash-safe fashion. If fsync()
+ * fails, we can't be sure whether the changes are on disk or not. For
+ * now, we handle that by panicking; StartupReplicationSlots() will
+ * try to straighten it out after restart.
+ */
+ START_CRIT_SECTION();
+ fsync_fname(tmppath, true);
+ fsync_fname("pg_replslot", true);
+ END_CRIT_SECTION();
+
+ /*
+ * The slot is definitely gone. Lock out concurrent scans of the array
+ * long enough to kill it. It's OK to clear the active flag here without
+ * grabbing the mutex because nobody else can be scanning the array here,
+ * and nobody can be attached to this slot and thus access it without
+ * scanning the array.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+ slot->active = false;
+ slot->in_use = false;
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Slot is dead and doesn't prevent resource removal anymore, recompute
+ * limits.
+ */
+ ReplicationSlotsComputeRequiredXmin();
+ ReplicationSlotsComputeRequiredLSN();
+
+ /*
+ * If removing the directory fails, the worst thing that will happen is
+ * that the user won't be able to create a new slot with the same name
+ * until the next server restart. We warn about it, but that's all.
+ */
+ if (!rmtree(tmppath, true))
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove directory \"%s\"", tmppath)));
+
+ /*
+ * We release this at the very end, so that nobody starts trying to create
+ * a slot while we're still cleaning up the detritus of the old one.
+ */
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Serialize the currently acquired slot's state from memory to disk, thereby
+ * guaranteeing the current state will survive a crash.
+ */
+void
+ReplicationSlotSave(void)
+{
+ char path[MAXPGPATH];
+
+ Assert(MyReplicationSlot != NULL);
+
+ sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
+ SaveSlotToPath(MyReplicationSlot, path, ERROR);
+}
+
+/*
+ * Signal that it would be useful if the currently acquired slot would be
+ * flushed out to disk.
+ *
+ * Note that the actual flush to disk can be delayed for a long time. If
+ * required for correctness, explicitly do a ReplicationSlotSave().
+ */
+void
+ReplicationSlotMarkDirty(void)
+{
+ Assert(MyReplicationSlot != NULL);
+
+ {
+ volatile ReplicationSlot *vslot = MyReplicationSlot;
+
+ SpinLockAcquire(&vslot->mutex);
+ MyReplicationSlot->just_dirtied = true;
+ MyReplicationSlot->dirty = true;
+ SpinLockRelease(&vslot->mutex);
+ }
+}
+
+/*
+ * Compute the oldest xmin across all slots and store it in the ProcArray.
+ */
+void
+ReplicationSlotsComputeRequiredXmin(void)
+{
+ int i;
+ TransactionId agg_xmin = InvalidTransactionId;
+
+ Assert(ReplicationSlotCtl != NULL);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ TransactionId effective_xmin;
+
+ if (!s->in_use)
+ continue;
+
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ effective_xmin = vslot->effective_xmin;
+ SpinLockRelease(&s->mutex);
+ }
+
+ /* check the data xmin */
+ if (TransactionIdIsValid(effective_xmin) &&
+ (!TransactionIdIsValid(agg_xmin) ||
+ TransactionIdPrecedes(effective_xmin, agg_xmin)))
+ agg_xmin = effective_xmin;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ ProcArraySetReplicationSlotXmin(agg_xmin);
+}
+
+/*
+ * Compute the oldest restart LSN across all slots and inform xlog module.
+ */
+void
+ReplicationSlotsComputeRequiredLSN(void)
+{
+ int i;
+ XLogRecPtr min_required = InvalidXLogRecPtr;
+
+ Assert(ReplicationSlotCtl != NULL);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ XLogRecPtr restart_lsn;
+
+ if (!s->in_use)
+ continue;
+
+ {
+ volatile ReplicationSlot *vslot = s;
+
+ SpinLockAcquire(&s->mutex);
+ restart_lsn = vslot->data.restart_lsn;
+ SpinLockRelease(&s->mutex);
+ }
+
+ if (restart_lsn != InvalidXLogRecPtr &&
+ (min_required == InvalidXLogRecPtr ||
+ restart_lsn < min_required))
+ min_required = restart_lsn;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ XLogSetReplicationSlotMinimumLSN(min_required);
+}
+
+/*
+ * Check whether the server's configuration supports using replication
+ * slots.
+ */
+void
+CheckSlotRequirements(void)
+{
+ if (max_replication_slots == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ (errmsg("replication slots can only be used if max_replication_slots > 0"))));
+
+ if (wal_level < WAL_LEVEL_ARCHIVE)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slots can only be used if wal_level >= archive")));
+}
+
+/*
+ * Returns whether the string `str' has the postfix `end'.
+ */
+static bool
+string_endswith(const char *str, const char *end)
+{
+ size_t slen = strlen(str);
+ size_t elen = strlen(end);
+
+ /* can't be a postfix if longer */
+ if (elen > slen)
+ return false;
+
+ /* compare the end of the strings */
+ str += slen - elen;
+ return strcmp(str, end) == 0;
+}
+
+/*
+ * Flush all replication slots to disk.
+ *
+ * This needn't actually be part of a checkpoint, but it's a convenient
+ * location.
+ */
+void
+CheckPointReplicationSlots(void)
+{
+ int i;
+
+ ereport(DEBUG1,
+ (errmsg("performing replication slot checkpoint")));
+
+ /*
+ * Prevent any slot from being created/dropped while we're active. As we
+ * explicitly do *not* want to block iterating over replication_slots or
+ * acquiring a slot we cannot take the control lock - but that's OK,
+ * because holding ReplicationSlotAllocationLock is strictly stronger,
+ * and enough to guarantee that nobody can change the in_use bits on us.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ char path[MAXPGPATH];
+
+ if (!s->in_use)
+ continue;
+
+ /* save the slot to disk, locking is handled in SaveSlotToPath() */
+ sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+ SaveSlotToPath(s, path, LOG);
+ }
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Load all replication slots from disk into memory at server startup. This
+ * needs to be run before we start crash recovery.
+ */
+void
+StartupReplicationSlots(XLogRecPtr checkPointRedo)
+{
+ DIR *replication_dir;
+ struct dirent *replication_de;
+
+ ereport(DEBUG1,
+ (errmsg("starting up replication slots")));
+
+ /* restore all slots by iterating over all on-disk entries */
+ replication_dir = AllocateDir("pg_replslot");
+ while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
+ {
+ struct stat statbuf;
+ char path[MAXPGPATH];
+
+ if (strcmp(replication_de->d_name, ".") == 0 ||
+ strcmp(replication_de->d_name, "..") == 0)
+ continue;
+
+ snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
+
+ /* we're only creating directories here, skip if it's not ours */
+ if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+ continue;
+
+ /* we crashed while a slot was being setup or deleted, clean up */
+ if (string_endswith(replication_de->d_name, ".tmp"))
+ {
+ if (!rmtree(path, true))
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove directory \"%s\"", path)));
+ continue;
+ }
+ fsync_fname("pg_replslot", true);
+ continue;
+ }
+
+ /* looks like a slot in a normal state, restore */
+ RestoreSlotFromDisk(replication_de->d_name);
+ }
+ FreeDir(replication_dir);
+
+ /* currently no slots exist, we're done. */
+ if (max_replication_slots <= 0)
+ return;
+
+ /* Now that we have recovered all the data, compute replication xmin */
+ ReplicationSlotsComputeRequiredXmin();
+ ReplicationSlotsComputeRequiredLSN();
+}
+
+/* ----
+ * Manipulation of ondisk state of replication slots
+ *
+ * NB: none of the routines below should take any notice whether a slot is the
+ * current one or not, that's all handled a layer above.
+ * ----
+ */
+static void
+CreateSlotOnDisk(ReplicationSlot *slot)
+{
+ char tmppath[MAXPGPATH];
+ char path[MAXPGPATH];
+ struct stat st;
+
+ /*
+ * No need to take out the io_in_progress_lock, nobody else can see this
+ * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
+ * takes out the lock; if we'd take the lock here, we'd deadlock.
+ */
+
+ sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+ sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
+
+ /*
+ * It's just barely possible that some previous effort to create or
+ * drop a slot with this name left a temp directory lying around.
+ * If that seems to be the case, try to remove it. If the rmtree()
+ * fails, we'll error out at the mkdir() below, so we don't bother
+ * checking success.
+ */
+ if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
+ rmtree(tmppath, true);
+
+ /* Create and fsync the temporary slot directory. */
+ if (mkdir(tmppath, S_IRWXU) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create directory \"%s\": %m",
+ tmppath)));
+ fsync_fname(tmppath, true);
+
+ /* Write the actual state file. */
+ slot->dirty = true; /* signal that we really need to write */
+ SaveSlotToPath(slot, tmppath, ERROR);
+
+ /* Rename the directory into place. */
+ if (rename(tmppath, path) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+
+ /*
+ * If we'd now fail - really unlikely - we wouldn't know whether this slot
+ * would persist after an OS crash or not - so, force a restart. The
+ * restart would try to fsync this again till it works.
+ */
+ START_CRIT_SECTION();
+
+ fsync_fname(path, true);
+ fsync_fname("pg_replslot", true);
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Shared functionality between saving and creating a replication slot.
+ */
+static void
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+{
+ char tmppath[MAXPGPATH];
+ char path[MAXPGPATH];
+ int fd;
+ ReplicationSlotOnDisk cp;
+ bool was_dirty;
+
+ /* first check whether there's something to write out */
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&vslot->mutex);
+ was_dirty = vslot->dirty;
+ vslot->just_dirtied = false;
+ SpinLockRelease(&vslot->mutex);
+ }
+
+ /* and don't do anything if there's nothing to write */
+ if (!was_dirty)
+ return;
+
+ LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
+
+ /* silence valgrind :( */
+ memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
+
+ sprintf(tmppath, "%s/state.tmp", dir);
+ sprintf(path, "%s/state", dir);
+
+ fd = OpenTransientFile(tmppath,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ tmppath)));
+ return;
+ }
+
+ cp.magic = SLOT_MAGIC;
+ INIT_CRC32(cp.checksum);
+ cp.version = 1;
+ cp.length = ReplicationSlotOnDiskDynamicSize;
+
+ SpinLockAcquire(&slot->mutex);
+
+ memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
+
+ SpinLockRelease(&slot->mutex);
+
+ COMP_CRC32(cp.checksum,
+ (char *)(&cp) + ReplicationSlotOnDiskConstantSize,
+ ReplicationSlotOnDiskDynamicSize);
+
+ if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
+ {
+ int save_errno = errno;
+ CloseTransientFile(fd);
+ errno = save_errno;
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ return;
+ }
+
+ /* fsync the temporary file */
+ if (pg_fsync(fd) != 0)
+ {
+ int save_errno = errno;
+ CloseTransientFile(fd);
+ errno = save_errno;
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ tmppath)));
+ return;
+ }
+
+ CloseTransientFile(fd);
+
+ /* rename to permanent file, fsync file and directory */
+ if (rename(tmppath, path) != 0)
+ {
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not rename \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ return;
+ }
+
+ /* Check CreateSlotOnDisk() for the reasoning of using a crit. section. */
+ START_CRIT_SECTION();
+
+ fsync_fname(path, false);
+ fsync_fname((char *) dir, true);
+ fsync_fname("pg_replslot", true);
+
+ END_CRIT_SECTION();
+
+ /*
+ * Successfully wrote, unset dirty bit, unless somebody dirtied again
+ * already.
+ */
+ {
+ volatile ReplicationSlot *vslot = slot;
+
+ SpinLockAcquire(&vslot->mutex);
+ if (!vslot->just_dirtied)
+ vslot->dirty = false;
+ SpinLockRelease(&vslot->mutex);
+ }
+
+ LWLockRelease(slot->io_in_progress_lock);
+}
+
+/*
+ * Load a single slot from disk into memory.
+ */
+static void
+RestoreSlotFromDisk(const char *name)
+{
+ ReplicationSlotOnDisk cp;
+ int i;
+ char path[MAXPGPATH];
+ int fd;
+ bool restored = false;
+ int readBytes;
+ pg_crc32 checksum;
+
+ /* no need to lock here, no concurrent access allowed yet */
+
+ /* delete temp file if it exists */
+ sprintf(path, "pg_replslot/%s/state.tmp", name);
+ if (unlink(path) < 0 && errno != ENOENT)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not unlink file \"%s\": %m", path)));
+
+ sprintf(path, "pg_replslot/%s/state", name);
+
+ elog(DEBUG1, "restoring replication slot from \"%s\"", path);
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+ * We do not need to handle this as we are rename()ing the directory into
+ * place only after we fsync()ed the state file.
+ */
+ if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", path)));
+
+ /*
+ * Sync state file before we're reading from it. We might have crashed
+ * while it wasn't synced yet and we shouldn't continue on that basis.
+ */
+ if (pg_fsync(fd) != 0)
+ {
+ CloseTransientFile(fd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ path)));
+ }
+
+ /* Also sync the parent directory */
+ START_CRIT_SECTION();
+ fsync_fname(path, true);
+ END_CRIT_SECTION();
+
+ /* read part of statefile that's guaranteed to be version independent */
+ readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+ if (readBytes != ReplicationSlotOnDiskConstantSize)
+ {
+ int saved_errno = errno;
+
+ CloseTransientFile(fd);
+ errno = saved_errno;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\", read %d of %u: %m",
+ path, readBytes,
+ (uint32) ReplicationSlotOnDiskConstantSize)));
+ }
+
+ /* verify magic */
+ if (cp.magic != SLOT_MAGIC)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
+ path, cp.magic, SLOT_MAGIC)));
+
+ /* verify version */
+ if (cp.version != SLOT_VERSION)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("replication slot file \"%s\" has unsupported version %u",
+ path, cp.version)));
+
+ /* boundary check on length */
+ if (cp.length != ReplicationSlotOnDiskDynamicSize)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("replication slot file \"%s\" has corrupted length %u",
+ path, cp.length)));
+
+ /* Now that we know the size, read the entire file */
+ readBytes = read(fd,
+ (char *)&cp + ReplicationSlotOnDiskConstantSize,
+ cp.length);
+ if (readBytes != cp.length)
+ {
+ int saved_errno = errno;
+
+ CloseTransientFile(fd);
+ errno = saved_errno;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\", read %d of %u: %m",
+ path, readBytes, cp.length)));
+ }
+
+ CloseTransientFile(fd);
+
+ /* now verify the CRC32 */
+ INIT_CRC32(checksum);
+ COMP_CRC32(checksum,
+ (char *)&cp + ReplicationSlotOnDiskConstantSize,
+ ReplicationSlotOnDiskDynamicSize);
+
+ if (!EQ_CRC32(checksum, cp.checksum))
+ ereport(PANIC,
+ (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
+ path, checksum, cp.checksum)));
+
+ /* nothing can be active yet, don't lock anything */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *slot;
+
+ slot = &ReplicationSlotCtl->replication_slots[i];
+
+ if (slot->in_use)
+ continue;
+
+ /* restore the entire set of persistent data */
+ memcpy(&slot->data, &cp.slotdata,
+ sizeof(ReplicationSlotPersistentData));
+
+ /* initialize in memory state */
+ slot->effective_xmin = cp.slotdata.xmin;
+ slot->in_use = true;
+ slot->active = false;
+
+ restored = true;
+ break;
+ }
+
+ if (!restored)
+ ereport(PANIC,
+ (errmsg("too many replication slots active before shutdown"),
+ errhint("Increase max_replication_slots and try again.")));
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
new file mode 100644
index 0000000000..98a860e528
--- /dev/null
+++ b/src/backend/replication/slotfuncs.c
@@ -0,0 +1,193 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotfuncs.c
+ * Support functions for replication slots
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/slotfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "access/htup_details.h"
+#include "utils/builtins.h"
+#include "replication/slot.h"
+
+Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
+Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
+
+static void
+check_permissions(void)
+{
+ if (!superuser() && !has_rolreplication(GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ (errmsg("must be superuser or replication role to use replication slots"))));
+}
+
+/*
+ * SQL function for creating a new physical (streaming replication)
+ * replication slot.
+ */
+Datum
+pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ Datum values[2];
+ bool nulls[2];
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ Datum result;
+
+ check_permissions();
+
+ CheckSlotRequirements();
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /* acquire replication slot, this will check for conflicting names */
+ ReplicationSlotCreate(NameStr(*name), false);
+
+ values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
+
+ nulls[0] = false;
+ nulls[1] = true;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_DATUM(result);
+}
+
+/*
+ * SQL function for dropping a replication slot.
+ */
+Datum
+pg_drop_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+
+ check_permissions();
+
+ CheckSlotRequirements();
+
+ ReplicationSlotDrop(NameStr(*name));
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * pg_get_replication_slots - SQL SRF showing active replication slots.
+ */
+Datum
+pg_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int slotno;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not " \
+ "allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /*
+ * We don't require any special permission to see this function's data
+ * because nothing should be sensitive. The most critical being the slot
+ * name, which shouldn't contain anything particularly sensitive.
+ */
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ for (slotno = 0; slotno < max_replication_slots; slotno++)
+ {
+ ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+ Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+ bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
+
+ TransactionId xmin;
+ XLogRecPtr restart_lsn;
+ bool active;
+ Oid database;
+ NameData slot_name;
+ char restart_lsn_s[MAXFNAMELEN];
+ int i;
+
+ SpinLockAcquire(&slot->mutex);
+ if (!slot->in_use)
+ {
+ SpinLockRelease(&slot->mutex);
+ continue;
+ }
+ else
+ {
+ xmin = slot->data.xmin;
+ database = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ /* don't palloc while holding a spinlock, copy onto the stack */
+ namecpy(&slot_name, &slot->data.name);
+ active = slot->active;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ memset(nulls, 0, sizeof(nulls));
+ snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
+ (uint32) (restart_lsn >> 32), (uint32) restart_lsn);
+
+ i = 0;
+ values[i++] = CStringGetTextDatum(NameStr(slot_name));
+ if (database == InvalidOid)
+ values[i++] = CStringGetTextDatum("physical");
+ else
+ values[i++] = CStringGetTextDatum("logical");
+ /* only logical slots have an associated database, else NULL */
+ if (database == InvalidOid)
+ nulls[i++] = true;
+ else
+ values[i++] = ObjectIdGetDatum(database);
+ values[i++] = BoolGetDatum(active);
+ if (TransactionIdIsValid(xmin))
+ values[i++] = TransactionIdGetDatum(xmin);
+ else
+ nulls[i++] = true;
+ if (restart_lsn != InvalidXLogRecPtr)
+ values[i++] = CStringGetTextDatum(restart_lsn_s);
+ else
+ nulls[i++] = true;
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 1fbd33ef61..cc3d775307 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -187,6 +187,7 @@ void
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
+ char slotname[NAMEDATALEN];
XLogRecPtr startpoint;
TimeLineID startpointTLI;
TimeLineID primaryTLI;
@@ -241,6 +242,7 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
startpoint = walrcv->receiveStart;
startpointTLI = walrcv->receiveStartTLI;
@@ -355,7 +357,8 @@ WalReceiverMain(void)
* on the new timeline.
*/
ThisTimeLineID = startpointTLI;
- if (walrcv_startstreaming(startpointTLI, startpoint))
+ if (walrcv_startstreaming(startpointTLI, startpoint,
+ slotname[0] != '\0' ? slotname : NULL))
{
bool endofwal = false;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index cc96d7c2f8..acadec57f5 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -219,11 +219,13 @@ ShutdownWalRcv(void)
/*
* Request postmaster to start walreceiver.
*
- * recptr indicates the position where streaming should begin, and conninfo
- * is a libpq connection string to use.
+ * recptr indicates the position where streaming should begin, conninfo
+ * is a libpq connection string to use, and slotname is, optionally, the name
+ * of a replication slot to acquire.
*/
void
-RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
+ const char *slotname)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
@@ -250,6 +252,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
else
walrcv->conninfo[0] = '\0';
+ if (slotname != NULL)
+ strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
+ else
+ walrcv->slotname[0] = '\0';
+
if (walrcv->walRcvState == WALRCV_STOPPED)
{
launch = true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 652487e3de..119a920af2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -53,6 +53,7 @@
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@@ -218,12 +219,17 @@ InitWalSender(void)
void
WalSndErrorCleanup()
{
+ LWLockReleaseAll();
+
if (sendFile >= 0)
{
close(sendFile);
sendFile = -1;
}
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
replication_active = false;
if (walsender_ready_to_stop)
proc_exit(0);
@@ -421,6 +427,15 @@ StartReplication(StartReplicationCmd *cmd)
* written at wal_level='minimal'.
*/
+ if (cmd->slotname)
+ {
+ ReplicationSlotAcquire(cmd->slotname);
+ if (MyReplicationSlot->data.database != InvalidOid)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot use a replication slot created for changeset extraction for streaming replication")));
+ }
+
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
@@ -565,6 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
Assert(streamingDoneSending && streamingDoneReceiving);
}
+ if (cmd->slotname)
+ ReplicationSlotRelease();
+
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
@@ -622,6 +640,75 @@ StartReplication(StartReplicationCmd *cmd)
pq_puttextmessage('C', "START_STREAMING");
}
+/*
+ * Create a new replication slot.
+ */
+static void
+CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
+{
+ const char *slot_name;
+ StringInfoData buf;
+
+ Assert(!MyReplicationSlot);
+
+ /* setup state for XLogReadPage */
+ sendTimeLineIsHistoric = false;
+ sendTimeLine = ThisTimeLineID;
+
+ ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
+
+ initStringInfo(&output_message);
+
+ slot_name = NameStr(MyReplicationSlot->data.name);
+
+ /*
+ * It may seem somewhat pointless to send back the same slot name the
+ * client just requested and nothing else, but logical replication
+ * will add more fields here. (We could consider removing the slot
+ * name from what's sent back, though, since the client has specified
+ * that.)
+ */
+
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 1, 2); /* 1 field */
+
+ /* first field: slot name */
+ pq_sendstring(&buf, "slot_name"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 1, 2); /* # of columns */
+
+ /* slot_name */
+ pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+ pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+ pq_endmessage(&buf);
+
+ /*
+ * release active status again, START_REPLICATION will reacquire it
+ */
+ ReplicationSlotRelease();
+}
+
+/*
+ * Get rid of a replication slot that is no longer wanted.
+ */
+static void
+DropReplicationSlot(DropReplicationSlotCmd *cmd)
+{
+ ReplicationSlotDrop(cmd->slotname);
+ EndCommand("DROP_REPLICATION_SLOT", DestRemote);
+}
+
/*
* Execute an incoming replication command.
*/
@@ -660,14 +747,28 @@ exec_replication_command(const char *cmd_string)
IdentifySystem();
break;
- case T_StartReplicationCmd:
- StartReplication((StartReplicationCmd *) cmd_node);
- break;
-
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
+ case T_CreateReplicationSlotCmd:
+ CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ break;
+
+ case T_DropReplicationSlotCmd:
+ DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ break;
+
+ case T_StartReplicationCmd:
+ {
+ StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+ if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+ StartReplication(cmd);
+ else
+ elog(ERROR, "cannot handle changeset extraction yet");
+ break;
+ }
+
case T_TimeLineHistoryCmd:
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
break;
@@ -830,6 +931,39 @@ ProcessStandbyMessage(void)
}
}
+/*
+ * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
+ */
+static void
+PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
+{
+ bool changed = false;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(lsn != InvalidXLogRecPtr);
+ SpinLockAcquire(&slot->mutex);
+ if (slot->data.restart_lsn != lsn)
+ {
+ changed = true;
+ slot->data.restart_lsn = lsn;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (changed)
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredLSN();
+ }
+
+ /*
+ * One could argue that the slot should be saved to disk now, but that'd be
+ * energy wasted - the worst lost information can do here is give us wrong
+ * information in a statistics view - we'll just potentially be more
+ * conservative in removing files.
+ */
+}
+
/*
* Regular reply from standby advising of WAL positions on standby server.
*/
@@ -875,6 +1009,48 @@ ProcessStandbyReplyMessage(void)
if (!am_cascading_walsender)
SyncRepReleaseWaiters();
+
+ /*
+ * Advance our local xmin horizon when the client confirmed a flush.
+ */
+ if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+ {
+ if (MyReplicationSlot->data.database != InvalidOid)
+ elog(ERROR, "cannot handle changeset extraction yet");
+ else
+ PhysicalConfirmReceivedLocation(flushPtr);
+ }
+}
+
+/* compute new replication slot xmin horizon if needed */
+static void
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+{
+ bool changed = false;
+ volatile ReplicationSlot *slot = MyReplicationSlot;
+
+ SpinLockAcquire(&slot->mutex);
+ MyPgXact->xmin = InvalidTransactionId;
+ /*
+ * For physical replication we don't need the interlock provided
+ * by xmin and effective_xmin since the consequences of a missed increase
+ * are limited to query cancellations, so set both at once.
+ */
+ if (!TransactionIdIsNormal(slot->data.xmin) ||
+ !TransactionIdIsNormal(feedbackXmin) ||
+ TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
+ {
+ changed = true;
+ slot->data.xmin = feedbackXmin;
+ slot->effective_xmin = feedbackXmin;
+ }
+ SpinLockRelease(&slot->mutex);
+
+ if (changed)
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin();
+ }
}
/*
@@ -904,6 +1080,8 @@ ProcessStandbyHSFeedbackMessage(void)
if (!TransactionIdIsNormal(feedbackXmin))
{
MyPgXact->xmin = InvalidTransactionId;
+ if (MyReplicationSlot != NULL)
+ PhysicalReplicationSlotNewXmin(feedbackXmin);
return;
}
@@ -951,8 +1129,17 @@ ProcessStandbyHSFeedbackMessage(void)
* GetOldestXmin. (If we're moving our xmin forward, this is obviously
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
+ *
+ * If we're using a replication slot we reserve the xmin via that,
+ * otherwise via the walsender's PGXACT entry.
+ *
+ * XXX: It might make sense to introduce ephemeral slots and always use
+ * the slot mechanism.
*/
- MyPgXact->xmin = feedbackXmin;
+ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
+ PhysicalReplicationSlotNewXmin(feedbackXmin);
+ else
+ MyPgXact->xmin = feedbackXmin;
}
/* Main loop of walsender process that streams the WAL over Copy messages. */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2e717457b1..c392d4fa22 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -27,6 +27,7 @@
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
+ size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
ProcSignalShmemInit();
CheckpointerShmemInit();
AutoVacuumShmemInit();
+ ReplicationSlotsShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index b68c95612c..082115b4ff 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -82,6 +82,9 @@ typedef struct ProcArrayStruct
*/
TransactionId lastOverflowedXid;
+ /* oldest xmin of any replication slot */
+ TransactionId replication_slot_xmin;
+
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
* but actually it is maxProcs entries long.
@@ -228,6 +231,7 @@ CreateSharedProcArray(void)
*/
procArray->numProcs = 0;
procArray->maxProcs = PROCARRAY_MAXPROCS;
+ procArray->replication_slot_xmin = InvalidTransactionId;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
@@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
@@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
}
}
+ /* fetch into volatile var while ProcArrayLock is held */
+ replication_slot_xmin = procArray->replication_slot_xmin;
+
if (RecoveryInProgress())
{
/*
@@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
result = FirstNormalTransactionId;
}
+ /*
+ * Check whether there are replication slots requiring an older xmin.
+ */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, result))
+ result = replication_slot_xmin;
+
return result;
}
@@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
@@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot)
suboverflowed = true;
}
+
+ /* fetch into volatile var while ProcArrayLock is held */
+ replication_slot_xmin = procArray->replication_slot_xmin;
+
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
+
LWLockRelease(ProcArrayLock);
/*
@@ -1506,6 +1527,12 @@ GetSnapshotData(Snapshot snapshot)
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
+
+ /* Check whether there's a replication slot requiring an older xmin. */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = replication_slot_xmin;
+
RecentXmin = xmin;
snapshot->xmin = xmin;
@@ -2491,6 +2518,21 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
return true; /* timed out, still conflicts */
}
+/*
+ * ProcArraySetReplicationSlotXmin
+ *
+ * Install limits to future computations of the xmin horizon to prevent vacuum
+ * and HOT pruning from removing affected rows still needed by clients with
+ * replication slots.
+ */
+void
+ProcArraySetReplicationSlotXmin(TransactionId xmin)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ procArray->replication_slot_xmin = xmin;
+ LWLockRelease(ProcArrayLock);
+}
+
#define XidCacheRemove(i) \
do { \
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 55d9d7837c..82ef440949 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -27,6 +27,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
@@ -238,6 +239,9 @@ NumLWLocks(void)
/* predicate.c needs one per old serializable xid buffer */
numLocks += NUM_OLDSERXID_BUFFERS;
+ /* slot.c needs one for each slot */
+ numLocks += max_replication_slots;
+
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9d32f9405d..fb449a8820 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -40,6 +40,7 @@
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -780,6 +781,10 @@ ProcKill(int code, Datum arg)
/* Make sure we're out of the sync rep lists */
SyncRepCleanupAtProcExit();
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
#ifdef USE_ASSERT_CHECKING
if (assert_enabled)
{
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a9b9794965..70d73d9898 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -57,6 +57,7 @@
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@@ -2123,6 +2124,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ /* see max_connections */
+ {"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING,
+ gettext_noop("Sets the maximum number of simultaneously defined replication slots."),
+ NULL
+ },
+ &max_replication_slots,
+ 0, 0, MAX_BACKENDS /* XXX?*/,
+ NULL, NULL, NULL
+ },
+
{
{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c8673b382d..d10e8a5783 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -226,6 +226,9 @@
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
+#max_replication_slots = 0 # max number of replication slots.
+ # (change requires restart)
+
# - Master Server -
# These settings are ignored on a standby server.
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 6b5302f6fd..a71320d945 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -195,6 +195,7 @@ const char *subdirs[] = {
"pg_multixact/offsets",
"base",
"base/1",
+ "pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp"
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 3c6ab9a902..8a702e3388 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -67,6 +67,7 @@ usage(void)
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_("      --slot=SLOTNAME    replication slot to use\n"));
printf(_("\nReport bugs to .\n"));
}
@@ -343,6 +344,7 @@ main(int argc, char **argv)
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
+ {"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
@@ -409,6 +411,9 @@ main(int argc, char **argv)
exit(1);
}
break;
+ case 'S':
+ replication_slot = pg_strdup(optarg);
+ break;
case 'n':
noloop = 1;
break;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 2555904cd0..7d3c76c994 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,6 +31,8 @@
/* fd and filename for currently open WAL file */
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
+static bool reportFlushPosition = false;
+static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
@@ -133,7 +135,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
{
off_t currpos;
@@ -187,6 +189,7 @@ close_walfile(char *basedir, char *partial_suffix)
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
+ lastFlushPosition = pos;
return true;
}
@@ -421,7 +424,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
len += 1;
sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
- sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
+ if (reportFlushPosition)
+ sendint64(lastFlushPosition, &replybuf[len]); /* flush */
+ else
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
@@ -511,6 +517,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int standby_message_timeout, char *partial_suffix)
{
char query[128];
+ char slotcmd[128];
PGresult *res;
XLogRecPtr stoppos;
@@ -521,6 +528,29 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (!CheckServerVersionForStreaming(conn))
return false;
+ if (replication_slot != NULL)
+ {
+ /*
+ * Report the flush position, so the primary can know what WAL we'll
+ * possibly re-request, and remove older WAL safely.
+ *
+ * We only report it when a slot has explicitly been used, because
+ * reporting the flush position makes one elegible as a synchronous
+ * replica. People shouldn't include generic names in
+ * synchronous_standby_names, but we've protected them against it so
+ * far, so let's continue to do so in the situations when possible.
+ * If they've got a slot, though, we need to report the flush position,
+ * so that the master can remove WAL.
+ */
+ reportFlushPosition = true;
+ snprintf(slotcmd, sizeof(slotcmd), "SLOT \"%s\" ", replication_slot);
+ }
+ else
+ {
+ reportFlushPosition = false;
+ slotcmd[0] = 0;
+ }
+
if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
@@ -560,6 +590,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
}
+ /*
+ * initialize flush position to starting point, it's the caller's
+ * responsibility that that's sane.
+ */
+ lastFlushPosition = startpos;
+
while (1)
{
/*
@@ -606,7 +642,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
return true;
/* Initiate the replication stream at specified location */
- snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+ slotcmd,
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
@@ -810,7 +847,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
goto error;
@@ -909,7 +946,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
goto error;
@@ -1074,7 +1111,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, blockpos))
/* Error message written in close_walfile() */
goto error;
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 96fbed898f..041076ff1d 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -22,6 +22,7 @@ char *connection_string = NULL;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
+char *replication_slot = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
static char *dbpassword = NULL;
PGconn *conn = NULL;
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 77d6b86ced..bb3c34db07 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -6,6 +6,7 @@ extern char *dbhost;
extern char *dbuser;
extern char *dbport;
extern int dbgetpassword;
+extern char *replication_slot;
/* Connection kept global so we can disconnect easily */
extern PGconn *conn;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 47e302276b..11ab277199 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -289,6 +289,7 @@ extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
+extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
int block_index,
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index c4661a8a66..ad4def37b9 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201401301
+#define CATALOG_VERSION_NO 201401311
#endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 1682fa9f99..d7bb21eccc 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4782,6 +4782,14 @@ DESCR("SP-GiST support for quad tree over range");
DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_ spg_range_quad_leaf_consistent _null_ _null_ _null_ ));
DESCR("SP-GiST support for quad tree over range");
+/* replication slots */
+DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,25,25}" "{i,o,o}" "{slotname,slotname,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DESCR("create a physical replication slot");
+DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
+DESCR("drop a replication slot");
+DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DESCR("information about replication slots currently in use");
+
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
DESCR("list objects dropped by the current command");
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index dfcc01344e..5b8df59bc6 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -412,6 +412,8 @@ typedef enum NodeTag
*/
T_IdentifySystemCmd,
T_BaseBackupCmd,
+ T_CreateReplicationSlotCmd,
+ T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 40971153b0..aac75fd102 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -17,6 +17,11 @@
#include "access/xlogdefs.h"
#include "nodes/pg_list.h"
+typedef enum ReplicationKind {
+ REPLICATION_KIND_PHYSICAL,
+ REPLICATION_KIND_LOGICAL
+} ReplicationKind;
+
/* ----------------------
* IDENTIFY_SYSTEM command
@@ -39,6 +44,30 @@ typedef struct BaseBackupCmd
} BaseBackupCmd;
+/* ----------------------
+ * CREATE_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct CreateReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+ ReplicationKind kind;
+ char *plugin;
+} CreateReplicationSlotCmd;
+
+
+/* ----------------------
+ * DROP_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct DropReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+} DropReplicationSlotCmd;
+
+
/* ----------------------
* START_REPLICATION command
* ----------------------
@@ -46,8 +75,11 @@ typedef struct BaseBackupCmd
typedef struct StartReplicationCmd
{
NodeTag type;
+ ReplicationKind kind;
+ char *slotname;
TimeLineID timeline;
XLogRecPtr startpoint;
+ List *options;
} StartReplicationCmd;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
new file mode 100644
index 0000000000..089b0f4b70
--- /dev/null
+++ b/src/include/replication/slot.h
@@ -0,0 +1,120 @@
+/*-------------------------------------------------------------------------
+ * slot.h
+ * Replication slot management.
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_H
+#define SLOT_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+typedef struct ReplicationSlotPersistentData
+{
+ /* The slot's identifier */
+ NameData name;
+
+ /* database the slot is active on */
+ Oid database;
+
+ /*
+ * xmin horizon for data
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId xmin;
+
+ /* oldest LSN that might be required by this replication slot */
+ XLogRecPtr restart_lsn;
+
+} ReplicationSlotPersistentData;
+
+/*
+ * Shared memory state of a single replication slot.
+ */
+typedef struct ReplicationSlot
+{
+ /* lock, on same cacheline as effective_xmin */
+ slock_t mutex;
+
+ /* is this slot defined */
+ bool in_use;
+
+ /* is somebody streaming out changes for this slot */
+ bool active;
+
+ /* any outstanding modifications? */
+ bool just_dirtied;
+ bool dirty;
+
+ /*
+ * For logical decoding, it's extremely important that we never remove any
+ * data that's still needed for decoding purposes, even after a crash;
+ * otherwise, decoding will produce wrong answers. Ordinary streaming
+ * replication also needs to prevent old row versions from being removed
+ * too soon, but the worst consequence we might encounter there is unwanted
+ * query cancellations on the standby. Thus, for logical decoding,
+ * this value represents the latest xmin that has actually been
+ * written to disk, whereas for streaming replication, it's just the
+ * same as the persistent value (data.xmin).
+ */
+ TransactionId effective_xmin;
+
+ /* data surviving shutdowns and crashes */
+ ReplicationSlotPersistentData data;
+
+ /* is somebody performing io on this slot? */
+ LWLock *io_in_progress_lock;
+} ReplicationSlot;
+
+/*
+ * Shared memory control area for all of replication slots.
+ */
+typedef struct ReplicationSlotCtlData
+{
+ ReplicationSlot replication_slots[1];
+} ReplicationSlotCtlData;
+
+/*
+ * Pointers to shared memory
+ */
+extern ReplicationSlotCtlData *ReplicationSlotCtl;
+extern ReplicationSlot *MyReplicationSlot;
+
+/* GUCs */
+extern PGDLLIMPORT int max_replication_slots;
+
+/* shmem initialization functions */
+extern Size ReplicationSlotsShmemSize(void);
+extern void ReplicationSlotsShmemInit(void);
+
+/* management of individual slots */
+extern void ReplicationSlotCreate(const char *name, bool db_specific);
+extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotSave(void);
+extern void ReplicationSlotMarkDirty(void);
+
+/* misc stuff */
+extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotsComputeRequiredXmin(void);
+extern void ReplicationSlotsComputeRequiredLSN(void);
+extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void CheckPointReplicationSlots(void);
+
+extern void CheckSlotRequirements(void);
+extern void ReplicationSlotAtProcExit(void);
+
+/* SQL callable functions */
+extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
+
+#endif /* SLOT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 3c656197cc..3d9401059b 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -103,6 +103,12 @@ typedef struct
*/
char conninfo[MAXCONNINFO];
+ /*
+ * replication slot name; is also used for walreceiver to connect with
+ * the primary
+ */
+ char slotname[NAMEDATALEN];
+
slock_t mutex; /* locks shared variables shown above */
/*
@@ -125,7 +131,7 @@ extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
-typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
+typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
@@ -149,7 +155,8 @@ extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
-extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
+ const char *conninfo, const char *slotname);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 4507926274..c8ff4ebfb8 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -125,7 +125,9 @@ extern LWLockPadded *MainLWLockArray;
#define BackgroundWorkerLock (&MainLWLockArray[33].lock)
#define DynamicSharedMemoryControlLock (&MainLWLockArray[34].lock)
#define AutoFileLock (&MainLWLockArray[35].lock)
-#define NUM_INDIVIDUAL_LWLOCKS 36
+#define ReplicationSlotAllocationLock (&MainLWLockArray[36].lock)
+#define ReplicationSlotControlLock (&MainLWLockArray[37].lock)
+#define NUM_INDIVIDUAL_LWLOCKS 38
/*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 2947cc4af9..d1a58a3661 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -77,4 +77,6 @@ extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, const TransactionId *xids,
TransactionId latestXid);
+extern void ProcArraySetReplicationSlotXmin(TransactionId xmin);
+
#endif /* PROCARRAY_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 540373dc48..220e18b0bb 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1367,6 +1367,15 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_slots| SELECT l.slot_name,
+ l.slot_type,
+ l.datoid,
+ d.datname AS database,
+ l.active,
+ l.xmin,
+ l.restart_lsn
+ FROM (pg_get_replication_slots() l(slot_name, slot_type, datoid, active, xmin, restart_lsn)
+ LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
pg_authid.rolinherit,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ad40735333..3b7f61ef20 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -343,6 +343,7 @@ CreateOpClassItem
CreateOpClassStmt
CreateOpFamilyStmt
CreatePLangStmt
+CreateReplicationSlotCmd
CreateRangeStmt
CreateRoleStmt
CreateSchemaStmt
@@ -416,6 +417,7 @@ DomainConstraintType
DomainIOData
DropBehavior
DropOwnedStmt
+DropReplicationSlotCmd
DropRoleStmt
DropStmt
DropTableSpaceStmt