Introduce replication slots.

Replication slots are a crash-safe data structure which can be created
on either a master or a standby to prevent premature removal of
write-ahead log segments needed by a standby, as well as (with
hot_standby_feedback=on) pruning of tuples whose removal would cause
replication conflicts.  Slots have some advantages over existing
techniques, as explained in the documentation.

In a few places, we refer to the type of replication slots introduced
by this patch as "physical" slots, because forthcoming patches for
logical decoding will also have slots, but with somewhat different
properties.

Andres Freund and Robert Haas
Robert Haas 2014-01-31 22:45:17 -05:00
parent 5bdef38b89
commit 858ec11858
42 changed files with 2356 additions and 62 deletions

@ -233,6 +233,11 @@
<entry>query rewrite rules</entry>
</row>
<row>
<entry><link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link></entry>
<entry>replication slot information</entry>
</row>
<row>
<entry><link linkend="catalog-pg-seclabel"><structname>pg_seclabel</structname></link></entry>
<entry>security labels on database objects</entry>
@ -5157,6 +5162,100 @@
</sect1>
<sect1 id="catalog-pg-replication-slots">
<title><structname>pg_replication_slots</structname></title>
<indexterm zone="catalog-pg-replication-slots">
<primary>pg_replication_slots</primary>
</indexterm>
<para>
The <structname>pg_replication_slots</structname> view provides a listing
of all replication slots that currently exist on the database cluster,
along with their current state.
</para>
<para>
For more on replication slots,
see <xref linkend="streaming-replication-slots">.
</para>
<table>
<title><structname>pg_replication_slots</structname> Columns</title>
<tgroup cols="4">
<thead>
<row>
<entry>Name</entry>
<entry>Type</entry>
<entry>References</entry>
<entry>Description</entry>
</row>
</thead>
<tbody>
<row>
<entry><structfield>slot_name</structfield></entry>
<entry><type>text</type></entry>
<entry></entry>
<entry>A unique, cluster-wide identifier for the replication slot</entry>
</row>
<row>
<entry><structfield>slot_type</structfield></entry>
<entry><type>text</type></entry>
<entry></entry>
<entry>The slot type - <literal>physical</> or <literal>logical</></entry>
</row>
<row>
<entry><structfield>datoid</structfield></entry>
<entry><type>oid</type></entry>
<entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.oid</literal></entry>
<entry>The oid of the database this slot is associated with, or
null. Only logical slots have an associated database.</entry>
</row>
<row>
<entry><structfield>database</structfield></entry>
<entry><type>text</type></entry>
<entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.datname</literal></entry>
<entry>The name of the database this slot is associated with, or
null. Only logical slots have an associated database.</entry>
</row>
<row>
<entry><structfield>active</structfield></entry>
<entry><type>boolean</type></entry>
<entry></entry>
<entry>True if this slot is currently in use</entry>
</row>
<row>
<entry><structfield>xmin</structfield></entry>
<entry><type>xid</type></entry>
<entry></entry>
<entry>The oldest transaction that this slot needs the database to
retain. <literal>VACUUM</literal> cannot remove tuples deleted
by any later transaction.
</entry>
</row>
<row>
<entry><structfield>restart_lsn</structfield></entry>
<entry><type>text</type></entry>
<entry></entry>
<entry>The address (<literal>LSN</literal>) of the oldest WAL which still
might be required by the consumer of this slot and thus won't be
automatically removed during checkpoints.
</entry>
</row>
</tbody>
</tgroup>
</table>
</sect1>
<sect1 id="catalog-pg-seclabel">
<title><structname>pg_seclabel</structname></title>
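The restart_lsn column above is reported as text in the hi/lo hexadecimal notation that the backend code in this commit produces with "%X/%X". A client that wants to compare two such values numerically could convert them roughly as follows. This is only a minimal sketch: parse_lsn and format_lsn are hypothetical helper names, not PostgreSQL functions.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: parse "hi/lo" hexadecimal text into a 64-bit LSN.
 * Returns 1 on success, 0 if the text is not in that form.
 */
static int
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;
    char        junk;

    assert(text != NULL && lsn != NULL);

    /* the trailing %c must NOT match, so extra characters are rejected */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &junk) != 2)
        return 0;

    *lsn = ((uint64_t) hi << 32) | lo;
    return 1;
}

/* Hypothetical helper: format an LSN with the same "%X/%X" pattern. */
static void
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);
    snprintf(buf, buflen, "%X/%X",
             (unsigned int) (lsn >> 32), (unsigned int) lsn);
}
```

With both helpers, two restart_lsn values can be parsed and compared as plain integers; the larger integer is the later WAL position.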

@ -2348,6 +2348,25 @@ include 'filename'
</listitem>
</varlistentry>
<varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
<term><varname>max_replication_slots</varname> (<type>integer</type>)</term>
<indexterm>
<primary><varname>max_replication_slots</> configuration parameter</primary>
</indexterm>
<listitem>
<para>
Specifies the maximum number of replication slots
(see <xref linkend="streaming-replication-slots">) that the server
can support. The default is zero. This parameter can only be set at
server start.
<varname>wal_level</varname> must be set
to <literal>archive</literal> or higher to allow replication slots to
be used. Setting it to a lower value than the number of currently
existing replication slots will prevent the server from starting.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-wal-keep-segments" xreflabel="wal_keep_segments">
<term><varname>wal_keep_segments</varname> (<type>integer</type>)</term>
<indexterm>

@ -16290,6 +16290,76 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
</para>
</sect2>
<sect2 id="functions-replication">
<title>Replication Functions</title>
<para>
PostgreSQL exposes a number of functions for controlling and interacting
with replication features. See <xref linkend="streaming-replication">
and <xref linkend="streaming-replication-slots">.
</para>
<para>
Many of these functions have equivalent commands in the replication
protocol; see <xref linkend="protocol-replication">.
</para>
<para>
The sections <xref linkend="functions-snapshot-synchronization">, <xref
linkend="functions-recovery-control"> and <xref
linkend="functions-admin-backup"> are also relevant for replication.
</para>
<table id="functions-replication-table">
<title>Replication <acronym>SQL</acronym> Functions</title>
<tgroup cols="3">
<thead>
<row>
<entry>Function</entry>
<entry>Return Type</entry>
<entry>Description</entry>
</row>
</thead>
<tbody>
<row>
<entry>
<indexterm>
<primary>pg_create_physical_replication_slot</primary>
</indexterm>
<literal><function>pg_create_physical_replication_slot(<parameter>slotname</parameter> <type>text</type>)</function></literal>
</entry>
<entry>
(<parameter>slotname</parameter> <type>text</type>, <parameter>xlog_position</parameter> <type>text</type>)
</entry>
<entry>
Creates a new physical replication slot named
<parameter>slotname</parameter>. Streaming changes from a physical slot
is only possible with the walsender protocol - see <xref
linkend="protocol-replication">. Corresponds to the walsender protocol
command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
</entry>
</row>
<row>
<entry>
<indexterm>
<primary>pg_drop_replication_slot</primary>
</indexterm>
<literal><function>pg_drop_replication_slot(<parameter>slotname</parameter> <type>text</type>)</function></literal>
</entry>
<entry>
<type>void</type>
</entry>
<entry>
Drops the physical or logical replication slot
named <parameter>slotname</parameter>. Same as walsender protocol
command <literal>DROP_REPLICATION_SLOT</>.
</entry>
</row>
</tbody>
</tgroup>
</table>
</sect2>
<sect2 id="functions-admin-dbobject">
<title>Database Object Management Functions</title>

@ -643,7 +643,9 @@ protocol to make nodes agree on a serializable transactional order.
entries in <filename>pg_hba.conf</> with the database field set to
<literal>replication</>. Also ensure <varname>max_wal_senders</> is set
to a sufficiently large value in the configuration file of the primary
server.
server. If replication slots will be used,
ensure that <varname>max_replication_slots</varname> is set sufficiently
high as well.
</para>
<para>
@ -750,13 +752,14 @@ archive_cleanup_command = 'pg_archivecleanup /path/to/archive %r'
<para>
If you use streaming replication without file-based continuous
archiving, you have to set <varname>wal_keep_segments</> in the master
to a value high enough to ensure that old WAL segments are not recycled
too early, while the standby might still need them to catch up. If the
standby falls behind too much, it needs to be reinitialized from a new
base backup. If you set up a WAL archive that's accessible from the
standby, <varname>wal_keep_segments</> is not required as the standby can always
use the archive to catch up.
archiving, the server might recycle old WAL segments before the standby
has received them. If this occurs, the standby will need to be
reinitialized from a new base backup. You can avoid this by setting
<varname>wal_keep_segments</> to a value large enough to ensure that
WAL segments are not recycled too early, or by configuring a replication
slot for the standby. If you set up a WAL archive that's accessible from
the standby, these solutions are not required, since the standby can
always use the archive to catch up provided it retains enough segments.
</para>
<para>
@ -871,6 +874,81 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
</sect3>
</sect2>
<sect2 id="streaming-replication-slots">
<title>Replication Slots</title>
<indexterm zone="high-availability">
<primary>Replication Slots</primary>
</indexterm>
<para>
Replication slots provide an automated way to ensure that the master does
not remove WAL segments until they have been received by all standbys,
and that the master does not remove rows which could cause a
<link linkend="hot-standby-conflict">recovery conflict</> even when the
standby is disconnected.
</para>
<para>
In lieu of using replication slots, it is possible to prevent the removal
of old WAL segments using <xref linkend="guc-wal-keep-segments">, or by
storing the segments in an archive using <xref linkend="restore-command">.
However, these methods often result in retaining more WAL segments than
required, whereas replication slots retain only the number of segments
known to be needed. An advantage of these methods is that they bound
the space requirement for <literal>pg_xlog</>; there is currently no way
to do this using replication slots.
</para>
<para>
Similarly, <varname>hot_standby_feedback</varname>
and <varname>vacuum_defer_cleanup_age</varname> provide protection against
relevant rows being removed by vacuum, but the former provides no
protection during any time period when the standby is not connected,
and the latter often needs to be set to a high value to provide adequate
protection. Replication slots overcome these disadvantages.
</para>
<sect3 id="streaming-replication-slots-manipulation">
<title>Querying and manipulating replication slots</title>
<para>
Each replication slot has a name, which can contain lower-case letters,
numbers, and the underscore character.
</para>
<para>
Existing replication slots and their state can be seen in the
<link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link>
view.
</para>
<para>
Slots can be created and dropped either via the streaming replication
protocol (see <xref linkend="protocol-replication">) or via SQL
functions (see <xref linkend="functions-replication">).
</para>
</sect3>
<sect3 id="streaming-replication-slots-config">
<title>Configuration Example</title>
<para>
You can create a replication slot like this:
<programlisting>
postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
slotname | xlog_position
-------------+---------------
node_a_slot |
postgres=# SELECT * FROM pg_replication_slots;
slot_name | slot_type | datoid | database | active | xmin | restart_lsn
-------------+-----------+--------+----------+--------+------+-------------
node_a_slot | physical | 0 | | f | |
(1 row)
</programlisting>
To configure the standby to use this slot, <varname>primary_slotname</>
should be configured in the standby's <filename>recovery.conf</>.
Here is a simple example:
<programlisting>
standby_mode = 'on'
primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
primary_slotname = 'node_a_slot'
</programlisting>
</para>
</sect3>
</sect2>
<sect2 id="cascading-replication">
<title>Cascading Replication</title>
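The naming rule stated in the documentation above (lower-case letters, numbers, and the underscore character) can be illustrated with a small standalone check. This is a sketch under assumptions, not the server's code: the real check is ReplicationSlotValidateName(), whose body is in the suppressed slot.c diff. The helper name slot_name_is_valid and the 63-byte length limit (derived from the default NAMEDATALEN of 64) are assumptions of this example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Assumed limit: the default NAMEDATALEN is 64, which leaves room for
 * 63 bytes plus the terminating NUL.
 */
#define SKETCH_NAMEDATALEN 64

/* Hypothetical helper: returns 1 if name is acceptable, else 0. */
static int
slot_name_is_valid(const char *name)
{
    size_t      len;
    const char *cp;

    assert(name != NULL);

    len = strlen(name);
    if (len == 0 || len >= SKETCH_NAMEDATALEN)
        return 0;

    for (cp = name; *cp != '\0'; cp++)
    {
        /* only a-z, 0-9 and underscore are accepted */
        if (!((*cp >= 'a' && *cp <= 'z') ||
              (*cp >= '0' && *cp <= '9') ||
              *cp == '_'))
            return 0;
    }
    return 1;
}
```

Under these assumptions, "node_a_slot" from the configuration example passes, while names containing upper-case letters or hyphens are rejected.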

@ -1401,15 +1401,39 @@ The commands accepted in walsender mode are:
</varlistentry>
<varlistentry>
<term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term>
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</> <literal>PHYSICAL</literal></term>
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
<listitem>
<para>
Create a physical replication
slot. See <xref linkend="streaming-replication-slots"> for more about
replication slots.
</para>
<variablelist>
<varlistentry>
<term><replaceable class="parameter">slotname</></term>
<listitem>
<para>
The name of the slot to create. Must be a valid replication slot
name (see <xref linkend="streaming-replication-slots-manipulation">).
</para>
</listitem>
</varlistentry>
</variablelist>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>START_REPLICATION</literal> [<literal>SLOT</literal> <replaceable class="parameter">slotname</>] [<literal>PHYSICAL</literal>] <replaceable class="parameter">XXX/XXX</> <literal>TIMELINE</literal> <replaceable class="parameter">tli</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
WAL position <replaceable class="parameter">XXX/XXX</> on timeline
<replaceable class="parameter">tli</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyBothResponse message, and then starts to stream WAL to the frontend.
WAL position <replaceable class="parameter">XXX/XXX</>. If specified,
streaming starts on timeline <replaceable class="parameter">tli</>;
otherwise, the server's current timeline is selected. The server can
reply with an error, e.g. if the requested section of WAL has already
been recycled. On success, server responds with a CopyBothResponse
message, and then starts to stream WAL to the frontend.
</para>
<para>
@ -1443,6 +1467,14 @@ The commands accepted in walsender mode are:
client contains a message of one of the following formats:
</para>
<para>
If a slot's name is provided
via <replaceable class="parameter">slotname</>, it will be updated
as replication progresses so that the server knows which WAL segments,
and (if <varname>hot_standby_feedback</> is on) which transactions,
are still needed by the standby.
</para>
<para>
<variablelist>
<varlistentry>
@ -1719,6 +1751,26 @@ The commands accepted in walsender mode are:
</listitem>
</varlistentry>
<varlistentry>
<term><literal>DROP_REPLICATION_SLOT</literal> <replaceable class="parameter">slotname</></term>
<listitem>
<para>
Drops a replication slot, freeing any reserved server-side resources. If
the slot is currently in use by an active connection, this command fails.
</para>
<variablelist>
<varlistentry>
<term><replaceable class="parameter">slotname</></term>
<listitem>
<para>
The name of the slot to drop.
</para>
</listitem>
</varlistentry>
</variablelist>
</listitem>
</varlistentry>
<varlistentry>
<term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
<listitem>

@ -418,6 +418,22 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
</para>
</listitem>
</varlistentry>
<varlistentry id="primary-slotname" xreflabel="primary_slotname">
<term><varname>primary_slotname</varname> (<type>string</type>)</term>
<indexterm>
<primary><varname>primary_slotname</> recovery parameter</primary>
</indexterm>
<listitem>
<para>
Optionally specifies an existing replication slot to be used when
connecting to the primary via streaming replication to control
resource removal on the upstream node
(see <xref linkend="streaming-replication-slots">).
This setting has no effect if <varname>primary_conninfo</> is not
set.
</para>
</listitem>
</varlistentry>
<varlistentry id="trigger-file" xreflabel="trigger_file">
<term><varname>trigger_file</varname> (<type>string</type>)</term>
<indexterm>

@ -225,6 +225,24 @@ PostgreSQL documentation
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--slot</option></term>
<listitem>
<para>
Require <application>pg_receivexlog</application> to use an existing
replication slot (see <xref linkend="streaming-replication-slots">).
When this option is used, <application>pg_receivexlog</> will report
a flush position to the server, indicating when each segment has been
synchronized to disk so that the server can remove that segment if it
is not otherwise needed. When using this parameter, it is important
to make sure that <application>pg_receivexlog</> cannot become the
synchronous standby through an incautious setting of
<xref linkend="guc-synchronous-standby-names">; it does not flush
data frequently enough for this to work correctly.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>

@ -39,6 +39,7 @@
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/barrier.h"
@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime;
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false;
static char *PrimaryConnInfo = NULL;
static char *PrimarySlotName = NULL;
static char *TriggerFile = NULL;
/* are we currently in standby mode? */
@ -485,6 +487,8 @@ typedef struct XLogCtlData
uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
* segment */
@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void);
static void CreateEndOfRecoveryRecord(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
@ -2908,6 +2913,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
SetLatch(ProcGlobal->walwriterLatch);
}
/*
* Record the LSN up to which we can remove WAL because it's not required by
* any replication slot.
*/
void
XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->replicationSlotMinLSN = lsn;
SpinLockRelease(&xlogctl->info_lck);
}
/*
* Return the oldest LSN we must retain to satisfy the needs of some
* replication slot.
*/
static XLogRecPtr
XLogGetReplicationSlotMinimumLSN(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr retval;
SpinLockAcquire(&xlogctl->info_lck);
retval = xlogctl->replicationSlotMinLSN;
SpinLockRelease(&xlogctl->info_lck);
return retval;
}
/*
* Advance minRecoveryPoint in control file.
*
@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void)
(errmsg_internal("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
else if (strcmp(item->name, "primary_slotname") == 0)
{
ReplicationSlotValidateName(item->value, ERROR);
PrimarySlotName = pstrdup(item->value);
ereport(DEBUG2,
(errmsg_internal("primary_slotname = '%s'",
PrimarySlotName)));
}
else if (strcmp(item->name, "trigger_file") == 0)
{
TriggerFile = pstrdup(item->value);
@ -6505,6 +6551,12 @@ StartupXLOG(void)
XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
XLogCtl->ckptXid = checkPoint.nextXid;
/*
* Initialize replication slots, before there's a chance to remove
* required resources.
*/
StartupReplicationSlots(checkPoint.redo);
/*
* Startup MultiXact. We need to do this early for two reasons: one
* is that we might try to access multixacts when we do tuple freezing,
@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointMultiXact();
CheckPointPredicate();
CheckPointRelationMap();
CheckPointReplicationSlots();
CheckPointBuffers(flags); /* performs all required fsyncs */
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags)
/*
* Retreat *logSegNo to the last segment that we need to retain because of
* wal_keep_segments. This is calculated by subtracting wal_keep_segments
* from the given xlog location, recptr.
* either wal_keep_segments or replication slots.
*
* This is calculated by subtracting wal_keep_segments from the given xlog
* location, recptr, and by making sure that the result is below the
* requirement of replication slots.
*/
static void
KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
{
XLogSegNo segno;
if (wal_keep_segments == 0)
return;
XLogRecPtr keep;
XLByteToSeg(recptr, segno);
keep = XLogGetReplicationSlotMinimumLSN();
/* avoid underflow, don't go below 1 */
if (segno <= wal_keep_segments)
segno = 1;
else
segno = segno - wal_keep_segments;
/* compute limit for wal_keep_segments first */
if (wal_keep_segments > 0)
{
/* avoid underflow, don't go below 1 */
if (segno <= wal_keep_segments)
segno = 1;
else
segno = segno - wal_keep_segments;
}
/* then check whether slots limit removal further */
if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
{
XLogSegNo slotSegNo;
XLByteToSeg(keep, slotSegNo);
if (slotSegNo <= 0)
segno = 1;
else if (slotSegNo < segno)
segno = slotSegNo;
}
/* don't delete WAL segments newer than the calculated segment */
if (segno < *logSegNo)
@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
tli, curFileTLI);
}
curFileTLI = tli;
RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
PrimarySlotName);
receivedUpto = 0;
}

@ -613,6 +613,18 @@ CREATE VIEW pg_stat_replication AS
WHERE S.usesysid = U.oid AND
S.pid = W.pid;
CREATE VIEW pg_replication_slots AS
SELECT
L.slot_name,
L.slot_type,
L.datoid,
D.datname AS database,
L.active,
L.xmin,
L.restart_lsn
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
CREATE VIEW pg_stat_database AS
SELECT
D.oid AS datid,

@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
repl_gram.o syncrep.o
repl_gram.o slot.o slotfuncs.o syncrep.o
include $(top_srcdir)/src/backend/common.mk

@ -47,8 +47,9 @@ to fetch more WAL (if streaming replication is configured).
Walreceiver is a postmaster subprocess, so the startup process can't fork it
directly. Instead, it sends a signal to postmaster, asking postmaster to launch
it. Before that, however, startup process fills in WalRcvData->conninfo,
and initializes the starting point in WalRcvData->receiveStart.
it. Before that, however, startup process fills in WalRcvData->conninfo
and WalRcvData->slotname, and initializes the starting point in
WalRcvData->receiveStart.
As walreceiver receives WAL from the master server, and writes and flushes
it to disk (in pg_xlog), it updates WalRcvData->receivedUpto and signals

@ -847,6 +847,10 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
continue;
/* Skip pg_replslot, not useful to copy */
if (strcmp(de->d_name, "pg_replslot") == 0)
continue;
/*
* Check if the postmaster has signaled us to exit, and abort with an
* error in that case. The error handler further up will call

@ -49,7 +49,8 @@ static char *recvBuf = NULL;
static void libpqrcv_connect(char *conninfo);
static void libpqrcv_identify_system(TimeLineID *primary_tli);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
@ -171,15 +172,20 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
* throws an ERROR.
*/
static bool
libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
{
char cmd[256];
PGresult *res;
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
(uint32) (startpoint >> 32), (uint32) startpoint,
tli);
if (slotname != NULL)
snprintf(cmd, sizeof(cmd),
"START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
(uint32) (startpoint >> 32), (uint32) startpoint, tli);
else
snprintf(cmd, sizeof(cmd),
"START_REPLICATION %X/%X TIMELINE %u",
(uint32) (startpoint >> 32), (uint32) startpoint, tli);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
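The command text built in libpqrcv_startstreaming() above gets noticeably longer once a quoted slot name is included, so the size of the destination buffer matters. The following standalone sketch builds the same two command forms and reports truncation by checking the return value of snprintf. The helper name build_start_replication is hypothetical, and this is an illustration rather than the walreceiver's actual code.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: write the START_REPLICATION command into buf.
 * slotname may be NULL.  Returns 1 if the whole command fit, 0 if it
 * was truncated or formatting failed.
 */
static int
build_start_replication(char *buf, size_t buflen, const char *slotname,
                        uint64_t startpoint, unsigned int tli)
{
    int         n;

    assert(buf != NULL && buflen > 0);

    if (slotname != NULL)
        n = snprintf(buf, buflen,
                     "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
                     slotname,
                     (unsigned int) (startpoint >> 32),
                     (unsigned int) startpoint, tli);
    else
        n = snprintf(buf, buflen,
                     "START_REPLICATION %X/%X TIMELINE %u",
                     (unsigned int) (startpoint >> 32),
                     (unsigned int) startpoint, tli);

    /* snprintf returns the length it wanted; >= buflen means truncation */
    return n >= 0 && (size_t) n < buflen;
}
```

A 64-byte buffer is enough for the form without a slot, but a slot name of a few dozen characters already overflows it, which is why a caller would want either a larger buffer or an explicit truncation check like the one above.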

@ -65,7 +65,7 @@ Node *replication_parse_result;
}
/* Non-keyword tokens */
%token <str> SCONST
%token <str> SCONST IDENT
%token <uintval> UCONST
%token <recptr> RECPTR
@ -73,6 +73,8 @@ Node *replication_parse_result;
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
%token K_DROP_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_LABEL
%token K_PROGRESS
@ -80,12 +82,15 @@ Node *replication_parse_result;
%token K_NOWAIT
%token K_WAL
%token K_TIMELINE
%token K_PHYSICAL
%token K_SLOT
%type <node> command
%type <node> base_backup start_replication identify_system timeline_history
%type <node> base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
%type <list> base_backup_opt_list
%type <defelt> base_backup_opt
%type <uintval> opt_timeline
%type <str> opt_slot
%%
firstcmd: command opt_semicolon
@ -102,6 +107,8 @@ command:
identify_system
| base_backup
| start_replication
| create_replication_slot
| drop_replication_slot
| timeline_history
;
@ -158,18 +165,42 @@ base_backup_opt:
}
;
/* CREATE_REPLICATION_SLOT slot PHYSICAL */
create_replication_slot:
K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
$$ = (Node *) cmd;
}
;
/* DROP_REPLICATION_SLOT slot */
drop_replication_slot:
K_DROP_REPLICATION_SLOT IDENT
{
DropReplicationSlotCmd *cmd;
cmd = makeNode(DropReplicationSlotCmd);
cmd->slotname = $2;
$$ = (Node *) cmd;
}
;
/*
* START_REPLICATION %X/%X [TIMELINE %d]
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/
start_replication:
K_START_REPLICATION RECPTR opt_timeline
K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
{
StartReplicationCmd *cmd;
cmd = makeNode(StartReplicationCmd);
cmd->startpoint = $2;
cmd->timeline = $3;
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
cmd->startpoint = $4;
cmd->timeline = $5;
$$ = (Node *) cmd;
}
;
@ -205,6 +236,15 @@ timeline_history:
$$ = (Node *) cmd;
}
;
opt_physical : K_PHYSICAL | /* EMPTY */;
opt_slot : K_SLOT IDENT
{
$$ = $2;
}
| /* nothing */ { $$ = NULL; }
%%
#include "repl_scanner.c"

@ -16,6 +16,7 @@
#include "postgres.h"
#include "utils/builtins.h"
#include "parser/scansup.h"
/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
#undef fprintf
@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar);
%option warn
%option prefix="replication_yy"
%x xq
%x xq xd
/* Extended quote
* xqdouble implements embedded quote, ''''
@ -57,12 +58,26 @@ xqstart {quote}
xqdouble {quote}{quote}
xqinside [^']+
/* Double quote
* Allows embedded spaces and other special characters into identifiers.
*/
dquote \"
xdstart {dquote}
xdstop {dquote}
xddouble {dquote}{dquote}
xdinside [^"]+
digit [0-9]+
hexdigit [0-9A-Za-z]+
quote '
quotestop {quote}
ident_start [A-Za-z\200-\377_]
ident_cont [A-Za-z\200-\377_0-9\$]
identifier {ident_start}{ident_cont}*
%%
BASE_BACKUP { return K_BASE_BACKUP; }
@ -74,9 +89,16 @@ PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; }
SLOT { return K_SLOT; }
"," { return ','; }
";" { return ';'; }
"(" { return '('; }
")" { return ')'; }
[\n] ;
[\t] ;
@ -100,20 +122,49 @@ TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
BEGIN(xq);
startlit();
}
<xq>{quotestop} {
yyless(1);
BEGIN(INITIAL);
yylval.str = litbufdup();
return SCONST;
}
<xq>{xqdouble} {
<xq>{xqdouble} {
addlitchar('\'');
}
<xq>{xqinside} {
addlit(yytext, yyleng);
}
<xq><<EOF>> { yyerror("unterminated quoted string"); }
{xdstart} {
BEGIN(xd);
startlit();
}
<xd>{xdstop} {
int len;
yyless(1);
BEGIN(INITIAL);
yylval.str = litbufdup();
len = strlen(yylval.str);
truncate_identifier(yylval.str, len, true);
return IDENT;
}
<xd>{xdinside} {
addlit(yytext, yyleng);
}
{identifier} {
int len = strlen(yytext);
yylval.str = downcase_truncate_identifier(yytext, len, true);
return IDENT;
}
<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
<<EOF>> {

File diff suppressed because it is too large

@ -0,0 +1,193 @@
/*-------------------------------------------------------------------------
*
* slotfuncs.c
* Support functions for replication slots
*
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/slotfuncs.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "replication/slot.h"
Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
static void
check_permissions(void)
{
if (!superuser() && !has_rolreplication(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser or replication role to use replication slots"))));
}
/*
* SQL function for creating a new physical (streaming replication)
* replication slot.
*/
Datum
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
Datum values[2];
bool nulls[2];
TupleDesc tupdesc;
HeapTuple tuple;
Datum result;
check_permissions();
CheckSlotRequirements();
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false);
values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
nulls[0] = false;
nulls[1] = true;
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
ReplicationSlotRelease();
PG_RETURN_DATUM(result);
}
/*
* SQL function for dropping a replication slot.
*/
Datum
pg_drop_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
check_permissions();
CheckSlotRequirements();
ReplicationSlotDrop(NameStr(*name));
PG_RETURN_VOID();
}
/*
* pg_get_replication_slots - SQL SRF showing active replication slots.
*/
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
int slotno;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
/*
* We don't require any special permission to see this function's data
* because nothing should be sensitive. The most critical item is the slot
* name, which shouldn't contain anything particularly sensitive.
*/
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS];
TransactionId xmin;
XLogRecPtr restart_lsn;
bool active;
Oid database;
const char *slot_name;
char restart_lsn_s[MAXFNAMELEN];
int i;
SpinLockAcquire(&slot->mutex);
if (!slot->in_use)
{
SpinLockRelease(&slot->mutex);
continue;
}
else
{
xmin = slot->data.xmin;
database = slot->data.database;
restart_lsn = slot->data.restart_lsn;
slot_name = pstrdup(NameStr(slot->data.name));
active = slot->active;
}
SpinLockRelease(&slot->mutex);
memset(nulls, 0, sizeof(nulls));
snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X",
(uint32) (restart_lsn >> 32), (uint32) restart_lsn);
i = 0;
values[i++] = CStringGetTextDatum(slot_name);
if (database == InvalidOid)
values[i++] = CStringGetTextDatum("physical");
else
values[i++] = CStringGetTextDatum("logical");
values[i++] = database;
values[i++] = BoolGetDatum(active);
if (xmin != InvalidTransactionId)
values[i++] = TransactionIdGetDatum(xmin);
else
nulls[i++] = true;
if (restart_lsn != InvalidXLogRecPtr)
values[i++] = CStringGetTextDatum(restart_lsn_s);
else
nulls[i++] = true;
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
tuplestore_donestoring(tupstore);
return (Datum) 0;
}
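
The restart_lsn text produced by pg_get_replication_slots above can be illustrated with a small standalone sketch. It splits a 64-bit WAL position into the two hexadecimal halves of the `%X/%X` form and reports an invalid position separately, so that a caller could emit NULL. The names `format_lsn` and `DEMO_INVALID_LSN` are hypothetical stand-ins rather than PostgreSQL APIs, and the sketch assumes an invalid position is represented as zero.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for InvalidXLogRecPtr; assumed to be zero. */
#define DEMO_INVALID_LSN ((uint64_t) 0)

/*
 * Format a 64-bit WAL position as "high/low" in upper-case hex.
 * Returns 0 without writing to buf when lsn is invalid (the caller
 * would report NULL instead), and 1 after writing the text form.
 */
static int
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);

    if (lsn == DEMO_INVALID_LSN)
        return 0;

    snprintf(buf, buflen, "%" PRIX32 "/%" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
    return 1;
}
```

Comparing against a dedicated invalid-LSN constant, rather than an invalid transaction ID, keeps the check tied to the type actually being tested even though both happen to be zero.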

View File

@ -187,6 +187,7 @@ void
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
char slotname[NAMEDATALEN];
XLogRecPtr startpoint;
TimeLineID startpointTLI;
TimeLineID primaryTLI;
@ -241,6 +242,7 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
startpoint = walrcv->receiveStart;
startpointTLI = walrcv->receiveStartTLI;
@ -355,7 +357,8 @@ WalReceiverMain(void)
* on the new timeline.
*/
ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(startpointTLI, startpoint))
if (walrcv_startstreaming(startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
bool endofwal = false;

View File

@ -219,11 +219,13 @@ ShutdownWalRcv(void)
/*
* Request postmaster to start walreceiver.
*
* recptr indicates the position where streaming should begin, and conninfo
* is a libpq connection string to use.
* recptr indicates the position where streaming should begin, conninfo
* is a libpq connection string to use, and slotname is, optionally, the name
* of a replication slot to acquire.
*/
void
RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
const char *slotname)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
@ -250,6 +252,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
else
walrcv->conninfo[0] = '\0';
if (slotname != NULL)
strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
else
walrcv->slotname[0] = '\0';
if (walrcv->walRcvState == WALRCV_STOPPED)
{
launch = true;

View File

@ -53,6 +53,7 @@
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@ -218,12 +219,17 @@ InitWalSender(void)
void
WalSndErrorCleanup()
{
LWLockReleaseAll();
if (sendFile >= 0)
{
close(sendFile);
sendFile = -1;
}
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
replication_active = false;
if (walsender_ready_to_stop)
proc_exit(0);
@ -421,6 +427,15 @@ StartReplication(StartReplicationCmd *cmd)
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname);
if (MyReplicationSlot->data.database != InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
@ -565,6 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
Assert(streamingDoneSending && streamingDoneReceiving);
}
if (cmd->slotname)
ReplicationSlotRelease();
/*
* Copy is finished now. Send a single-row result set indicating the next
* timeline.
@ -622,6 +640,75 @@ StartReplication(StartReplicationCmd *cmd)
pq_puttextmessage('C', "START_STREAMING");
}
/*
* Create a new replication slot.
*/
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
const char *slot_name;
StringInfoData buf;
Assert(!MyReplicationSlot);
/* setup state for XLogReadPage */
sendTimeLineIsHistoric = false;
sendTimeLine = ThisTimeLineID;
ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
initStringInfo(&output_message);
slot_name = NameStr(MyReplicationSlot->data.name);
/*
* It may seem somewhat pointless to send back the same slot name the
* client just requested and nothing else, but logical replication
* will add more fields here. (We could consider removing the slot
* name from what's sent back, though, since the client has specified
* that.)
*/
pq_beginmessage(&buf, 'T');
pq_sendint(&buf, 1, 2); /* 1 field */
/* first field: slot name */
pq_sendstring(&buf, "slot_name"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2); /* typlen */
pq_sendint(&buf, 0, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
pq_endmessage(&buf);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 1, 2); /* # of columns */
/* slot_name */
pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
pq_sendbytes(&buf, slot_name, strlen(slot_name));
pq_endmessage(&buf);
/*
* release active status again, START_REPLICATION will reacquire it
*/
ReplicationSlotRelease();
}
/*
* Get rid of a replication slot that is no longer wanted.
*/
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
ReplicationSlotDrop(cmd->slotname);
EndCommand("DROP_REPLICATION_SLOT", DestRemote);
}
/*
* Execute an incoming replication command.
*/
@ -660,14 +747,28 @@ exec_replication_command(const char *cmd_string)
IdentifySystem();
break;
case T_StartReplicationCmd:
StartReplication((StartReplicationCmd *) cmd_node);
break;
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
case T_CreateReplicationSlotCmd:
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
break;
case T_DropReplicationSlotCmd:
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
elog(ERROR, "cannot handle changeset extraction yet");
break;
}
case T_TimeLineHistoryCmd:
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
break;
@ -830,6 +931,39 @@ ProcessStandbyMessage(void)
}
}
/*
* Remember that a walreceiver just confirmed receipt of lsn `lsn`.
*/
static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool changed = false;
/* use volatile pointer to prevent code rearrangement */
volatile ReplicationSlot *slot = MyReplicationSlot;
Assert(lsn != InvalidXLogRecPtr);
SpinLockAcquire(&slot->mutex);
if (slot->data.restart_lsn != lsn)
{
changed = true;
slot->data.restart_lsn = lsn;
}
SpinLockRelease(&slot->mutex);
if (changed)
{
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredLSN();
}
/*
* One could argue that the slot should be saved to disk now, but that'd be
* energy wasted - the worst lost information can do here is give us wrong
* information in a statistics view - we'll just potentially be more
* conservative in removing files.
*/
}
/*
* Regular reply from standby advising of WAL positions on standby server.
*/
@ -875,6 +1009,48 @@ ProcessStandbyReplyMessage(void)
if (!am_cascading_walsender)
SyncRepReleaseWaiters();
/*
* Advance our local xmin horizon when the client confirmed a flush.
*/
if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
{
if (MyReplicationSlot->data.database != InvalidOid)
elog(ERROR, "cannot handle changeset extraction yet");
else
PhysicalConfirmReceivedLocation(flushPtr);
}
}
/* compute new replication slot xmin horizon if needed */
static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
{
bool changed = false;
volatile ReplicationSlot *slot = MyReplicationSlot;
SpinLockAcquire(&slot->mutex);
MyPgXact->xmin = InvalidTransactionId;
/*
* For physical replication we don't need the interlock provided
* by xmin and effective_xmin since the consequences of a missed increase
* are limited to query cancellations, so set both at once.
*/
if (!TransactionIdIsNormal(slot->data.xmin) ||
!TransactionIdIsNormal(feedbackXmin) ||
TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
{
changed = true;
slot->data.xmin = feedbackXmin;
slot->effective_xmin = feedbackXmin;
}
SpinLockRelease(&slot->mutex);
if (changed)
{
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredXmin();
}
}
/*
@ -904,6 +1080,8 @@ ProcessStandbyHSFeedbackMessage(void)
if (!TransactionIdIsNormal(feedbackXmin))
{
MyPgXact->xmin = InvalidTransactionId;
if (MyReplicationSlot != NULL)
PhysicalReplicationSlotNewXmin(feedbackXmin);
return;
}
@ -951,8 +1129,17 @@ ProcessStandbyHSFeedbackMessage(void)
* GetOldestXmin. (If we're moving our xmin forward, this is obviously
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
*
* If we're using a replication slot we reserve the xmin via that,
* otherwise via the walsender's PGXACT entry.
* XXX: It might make sense to introduce ephemeral slots and always use
* the slot mechanism.
*/
MyPgXact->xmin = feedbackXmin;
if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
PhysicalReplicationSlotNewXmin(feedbackXmin);
else
MyPgXact->xmin = feedbackXmin;
}
/* Main loop of walsender process that streams the WAL over Copy messages. */

View File

@ -27,6 +27,7 @@
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
ProcSignalShmemInit();
CheckpointerShmemInit();
AutoVacuumShmemInit();
ReplicationSlotsShmemInit();
WalSndShmemInit();
WalRcvShmemInit();

View File

@ -82,6 +82,9 @@ typedef struct ProcArrayStruct
*/
TransactionId lastOverflowedXid;
/* oldest xmin of any replication slot */
TransactionId replication_slot_xmin;
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
* but actually it is maxProcs entries long.
@ -228,6 +231,7 @@ CreateSharedProcArray(void)
*/
procArray->numProcs = 0;
procArray->maxProcs = PROCARRAY_MAXPROCS;
procArray->replication_slot_xmin = InvalidTransactionId;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
}
}
/* fetch into volatile var while ProcArrayLock is held */
replication_slot_xmin = procArray->replication_slot_xmin;
if (RecoveryInProgress())
{
/*
@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
result = FirstNormalTransactionId;
}
/*
* Check whether there are replication slots requiring an older xmin.
*/
if (TransactionIdIsValid(replication_slot_xmin) &&
NormalTransactionIdPrecedes(replication_slot_xmin, result))
result = replication_slot_xmin;
return result;
}
@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot)
suboverflowed = true;
}
/* fetch into volatile var while ProcArrayLock is held */
replication_slot_xmin = procArray->replication_slot_xmin;
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
LWLockRelease(ProcArrayLock);
/*
@ -1506,6 +1527,12 @@ GetSnapshotData(Snapshot snapshot)
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
/* Check whether there's a replication slot requiring an older xmin. */
if (TransactionIdIsValid(replication_slot_xmin) &&
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
RecentGlobalXmin = replication_slot_xmin;
RecentXmin = xmin;
snapshot->xmin = xmin;
@ -2491,6 +2518,21 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
return true; /* timed out, still conflicts */
}
/*
* ProcArraySetReplicationSlotXmin
*
* Install limits to future computations of the xmin horizon to prevent vacuum
* and HOT pruning from removing affected rows still needed by clients with
* replication slots.
*/
void
ProcArraySetReplicationSlotXmin(TransactionId xmin)
{
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
procArray->replication_slot_xmin = xmin;
LWLockRelease(ProcArrayLock);
}
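
The way a slot's xmin pulls back the computed horizon in GetOldestXmin and GetSnapshotData can be sketched in isolation. The sketch below is a simplified model, not the backend code: `DemoXid`, `demo_xid_precedes`, and `demo_clamp_horizon` are hypothetical names, and it assumes 32-bit transaction IDs compared modulo 2^32, with 0 meaning "no slot xmin set" and 3 as the first normal ID.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t DemoXid;                 /* stand-in for TransactionId */

#define DEMO_INVALID_XID      ((DemoXid) 0)
#define DEMO_FIRST_NORMAL_XID ((DemoXid) 3)

/* Wraparound-aware "a is older than b" for normal XIDs (modulo-2^32 compare). */
static int
demo_xid_precedes(DemoXid a, DemoXid b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Return the horizon to use for tuple removal: the computed horizon,
 * unless a replication slot still needs an older xmin.
 */
static DemoXid
demo_clamp_horizon(DemoXid horizon, DemoXid slot_xmin)
{
    assert(horizon >= DEMO_FIRST_NORMAL_XID);

    if (slot_xmin != DEMO_INVALID_XID &&
        demo_xid_precedes(slot_xmin, horizon))
        return slot_xmin;

    return horizon;
}
```

The slot value can only move the horizon backwards. A slot xmin that is newer than the computed horizon is ignored, so a slot never makes cleanup more aggressive.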
#define XidCacheRemove(i) \
do { \

View File

@ -27,6 +27,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
@ -238,6 +239,9 @@ NumLWLocks(void)
/* predicate.c needs one per old serializable xid buffer */
numLocks += NUM_OLDSERXID_BUFFERS;
/* slot.c needs one for each slot */
numLocks += max_replication_slots;
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if

View File

@ -40,6 +40,7 @@
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@ -780,6 +781,10 @@ ProcKill(int code, Datum arg)
/* Make sure we're out of the sync rep lists */
SyncRepCleanupAtProcExit();
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
#ifdef USE_ASSERT_CHECKING
if (assert_enabled)
{

View File

@ -57,6 +57,7 @@
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@ -2123,6 +2124,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
{
/* see max_connections */
{"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING,
gettext_noop("Sets the maximum number of simultaneously defined replication slots."),
NULL
},
&max_replication_slots,
0, 0, MAX_BACKENDS /* XXX?*/,
NULL, NULL, NULL
},
{
{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for WAL replication."),

View File

@ -226,6 +226,9 @@
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
#max_replication_slots = 0 # max number of replication slots.
# (change requires restart)
# - Master Server -
# These settings are ignored on a standby server.

View File

@ -195,6 +195,7 @@ const char *subdirs[] = {
"pg_multixact/offsets",
"base",
"base/1",
"pg_replslot",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp"

View File

@ -67,6 +67,7 @@ usage(void)
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
printf(_(" --slot=NAME replication slot to use\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
@ -343,6 +344,7 @@ main(int argc, char **argv)
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
{"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
@ -409,6 +411,9 @@ main(int argc, char **argv)
exit(1);
}
break;
case 'S':
replication_slot = pg_strdup(optarg);
break;
case 'n':
noloop = 1;
break;

View File

@ -31,6 +31,8 @@
/* fd and filename for currently open WAL file */
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
@ -133,7 +135,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
close_walfile(char *basedir, char *partial_suffix)
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
{
off_t currpos;
@ -187,6 +189,7 @@ close_walfile(char *basedir, char *partial_suffix)
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos;
return true;
}
@ -421,7 +424,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
len += 1;
sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
if (reportFlushPosition)
sendint64(lastFlushPosition, &replybuf[len]); /* flush */
else
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
@ -511,6 +517,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int standby_message_timeout, char *partial_suffix)
{
char query[128];
char slotcmd[128];
PGresult *res;
XLogRecPtr stoppos;
@ -521,6 +528,29 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (!CheckServerVersionForStreaming(conn))
return false;
if (replication_slot != NULL)
{
/*
* Report the flush position, so the primary can know what WAL we'll
* possibly re-request, and remove older WAL safely.
*
* We only report it when a slot has explicitly been used, because
* reporting the flush position makes one eligible as a synchronous
* replica. People shouldn't include generic names in
* synchronous_standby_names, but we've protected them against it so
* far, so let's continue to do so in the situations when possible.
* If they've got a slot, though, we need to report the flush position,
* so that the master can remove WAL.
*/
reportFlushPosition = true;
snprintf(slotcmd, sizeof(slotcmd), "SLOT \"%s\" ", replication_slot);
}
else
{
reportFlushPosition = false;
slotcmd[0] = 0;
}
if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
@ -560,6 +590,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
}
/*
* initialize flush position to starting point, it's the caller's
* responsibility that that's sane.
*/
lastFlushPosition = startpos;
while (1)
{
/*
@ -606,7 +642,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd,
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
@ -810,7 +847,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
if (!close_walfile(basedir, partial_suffix))
if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
goto error;
@ -909,7 +946,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending)
{
if (!close_walfile(basedir, partial_suffix))
if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
goto error;
@ -1074,7 +1111,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
if (!close_walfile(basedir, partial_suffix))
if (!close_walfile(basedir, partial_suffix, blockpos))
/* Error message written in close_walfile() */
goto error;
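
The command assembly in ReceiveXlogStream above (an optional `SLOT "name"` clause spliced in front of the start position) can be sketched as a standalone helper. This is a minimal illustration under stated assumptions: `build_start_replication` is a hypothetical function, not part of pg_receivexlog, and it adds truncation checks that the patch itself does not perform.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Build a START_REPLICATION command into query. slot may be NULL, in
 * which case no SLOT clause is emitted. Returns the command length, or
 * -1 if either buffer would have been truncated.
 */
static int
build_start_replication(char *query, size_t querylen, const char *slot,
                        uint64_t startpos, unsigned int timeline)
{
    char        slotcmd[128];
    int         n;

    assert(query != NULL && querylen > 0);

    if (slot != NULL)
    {
        n = snprintf(slotcmd, sizeof(slotcmd), "SLOT \"%s\" ", slot);
        if (n < 0 || (size_t) n >= sizeof(slotcmd))
            return -1;
    }
    else
        slotcmd[0] = '\0';

    n = snprintf(query, querylen, "START_REPLICATION %s%X/%X TIMELINE %u",
                 slotcmd,
                 (unsigned int) (uint32_t) (startpos >> 32),
                 (unsigned int) (uint32_t) startpos,
                 timeline);
    if (n < 0 || (size_t) n >= querylen)
        return -1;

    return n;
}
```

Using bounded formatting for the slot clause matters because the slot name comes from the command line and is not length-checked before it is copied.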

View File

@ -22,6 +22,7 @@ char *connection_string = NULL;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
char *replication_slot = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
static char *dbpassword = NULL;
PGconn *conn = NULL;

View File

@ -6,6 +6,7 @@ extern char *dbhost;
extern char *dbuser;
extern char *dbport;
extern int dbgetpassword;
extern char *replication_slot;
/* Connection kept global so we can disconnect easily */
extern PGconn *conn;

View File

@ -289,6 +289,7 @@ extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
int block_index,

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201401301
#define CATALOG_VERSION_NO 201401311
#endif

View File

@ -4782,6 +4782,14 @@ DESCR("SP-GiST support for quad tree over range");
DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_ spg_range_quad_leaf_consistent _null_ _null_ _null_ ));
DESCR("SP-GiST support for quad tree over range");
/* replication slots */
DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,25,25}" "{i,o,o}" "{slotname,slotname,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
DESCR("list objects dropped by the current command");

View File

@ -412,6 +412,8 @@ typedef enum NodeTag
*/
T_IdentifySystemCmd,
T_BaseBackupCmd,
T_CreateReplicationSlotCmd,
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,

View File

@ -17,6 +17,11 @@
#include "access/xlogdefs.h"
#include "nodes/pg_list.h"
typedef enum ReplicationKind {
REPLICATION_KIND_PHYSICAL,
REPLICATION_KIND_LOGICAL
} ReplicationKind;
/* ----------------------
* IDENTIFY_SYSTEM command
@ -39,6 +44,30 @@ typedef struct BaseBackupCmd
} BaseBackupCmd;
/* ----------------------
* CREATE_REPLICATION_SLOT command
* ----------------------
*/
typedef struct CreateReplicationSlotCmd
{
NodeTag type;
char *slotname;
ReplicationKind kind;
char *plugin;
} CreateReplicationSlotCmd;
/* ----------------------
* DROP_REPLICATION_SLOT command
* ----------------------
*/
typedef struct DropReplicationSlotCmd
{
NodeTag type;
char *slotname;
} DropReplicationSlotCmd;
/* ----------------------
* START_REPLICATION command
* ----------------------
@ -46,8 +75,11 @@ typedef struct BaseBackupCmd
typedef struct StartReplicationCmd
{
NodeTag type;
ReplicationKind kind;
char *slotname;
TimeLineID timeline;
XLogRecPtr startpoint;
List *options;
} StartReplicationCmd;

View File

@ -0,0 +1,120 @@
/*-------------------------------------------------------------------------
* slot.h
* Replication slot management.
*
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef SLOT_H
#define SLOT_H
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
typedef struct ReplicationSlotPersistentData
{
/* The slot's identifier */
NameData name;
/* database the slot is active on */
Oid database;
/*
* xmin horizon for data
*
* NB: This may represent a value that hasn't been written to disk yet;
* see notes for effective_xmin, below.
*/
TransactionId xmin;
/* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn;
} ReplicationSlotPersistentData;
/*
* Shared memory state of a single replication slot.
*/
typedef struct ReplicationSlot
{
/* lock, on same cacheline as effective_xmin */
slock_t mutex;
/* is this slot defined */
bool in_use;
/* is somebody streaming out changes for this slot */
bool active;
/* any outstanding modifications? */
bool just_dirtied;
bool dirty;
/*
* For logical decoding, it's extremely important that we never remove any
* data that's still needed for decoding purposes, even after a crash;
* otherwise, decoding will produce wrong answers. Ordinary streaming
* replication also needs to prevent old row versions from being removed
* too soon, but the worst consequence we might encounter there is unwanted
* query cancellations on the standby. Thus, for logical decoding,
* this value represents the latest xmin that has actually been
* written to disk, whereas for streaming replication, it's just the
* same as the persistent value (data.xmin).
*/
TransactionId effective_xmin;
/* data surviving shutdowns and crashes */
ReplicationSlotPersistentData data;
/* is somebody performing io on this slot? */
LWLock *io_in_progress_lock;
} ReplicationSlot;
/*
* Shared memory control area for all replication slots.
*/
typedef struct ReplicationSlotCtlData
{
ReplicationSlot replication_slots[1];
} ReplicationSlotCtlData;
/*
* Pointers to shared memory
*/
extern ReplicationSlotCtlData *ReplicationSlotCtl;
extern ReplicationSlot *MyReplicationSlot;
/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
/* shmem initialization functions */
extern Size ReplicationSlotsShmemSize(void);
extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific);
extern void ReplicationSlotDrop(const char *name);
extern void ReplicationSlotAcquire(const char *name);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);
/* misc stuff */
extern bool ReplicationSlotValidateName(const char *name, int elevel);
extern void ReplicationSlotsComputeRequiredXmin(void);
extern void ReplicationSlotsComputeRequiredLSN(void);
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
extern void ReplicationSlotAtProcExit(void);
/* SQL callable functions */
extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
#endif /* SLOT_H */
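
ReplicationSlotCtlData declares a one-element array that is meant to hold max_replication_slots entries, so the shared memory request has to be computed from the array offset plus the per-slot size. The sketch below shows that sizing idiom with simplified stand-in types. `DemoSlot`, `DemoSlotCtl`, and `demo_slots_shmem_size` are hypothetical, and whether ReplicationSlotsShmemSize uses exactly this formula is an assumption, since its body is not shown in this part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in for ReplicationSlot. */
typedef struct DemoSlot
{
    int         in_use;
    int         active;
    unsigned int xmin;
} DemoSlot;

/* Control area with a variable-length trailing array declared as [1]. */
typedef struct DemoSlotCtl
{
    DemoSlot    slots[1];
} DemoSlotCtl;

/*
 * Bytes needed for a control area holding max_slots entries. With no
 * slots configured, nothing is requested at all.
 */
static size_t
demo_slots_shmem_size(int max_slots)
{
    assert(max_slots >= 0);

    if (max_slots == 0)
        return 0;

    return offsetof(DemoSlotCtl, slots) + (size_t) max_slots * sizeof(DemoSlot);
}
```

Starting from the array's offset rather than `sizeof(DemoSlotCtl)` avoids counting the placeholder element twice.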

View File

@ -103,6 +103,12 @@ typedef struct
*/
char conninfo[MAXCONNINFO];
/*
* replication slot name; is also used for walreceiver to connect with
* the primary
*/
char slotname[NAMEDATALEN];
slock_t mutex; /* locks shared variables shown above */
/*
@ -125,7 +131,7 @@ extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
@ -149,7 +155,8 @@ extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
const char *conninfo, const char *slotname);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);

View File

@ -125,7 +125,9 @@ extern LWLockPadded *MainLWLockArray;
#define BackgroundWorkerLock (&MainLWLockArray[33].lock)
#define DynamicSharedMemoryControlLock (&MainLWLockArray[34].lock)
#define AutoFileLock (&MainLWLockArray[35].lock)
#define NUM_INDIVIDUAL_LWLOCKS 36
#define ReplicationSlotAllocationLock (&MainLWLockArray[36].lock)
#define ReplicationSlotControlLock (&MainLWLockArray[37].lock)
#define NUM_INDIVIDUAL_LWLOCKS 38
/*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS

View File

@ -77,4 +77,6 @@ extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, const TransactionId *xids,
TransactionId latestXid);
extern void ProcArraySetReplicationSlotXmin(TransactionId xmin);
#endif /* PROCARRAY_H */

View File

@ -1367,6 +1367,15 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
pg_replication_slots| SELECT l.slot_name,
l.slot_type,
l.datoid,
d.datname AS database,
l.active,
l.xmin,
l.restart_lsn
FROM (pg_get_replication_slots() l(slot_name, slot_type, datoid, active, xmin, restart_lsn)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
pg_authid.rolinherit,

View File

@ -343,6 +343,7 @@ CreateOpClassItem
CreateOpClassStmt
CreateOpFamilyStmt
CreatePLangStmt
CreateReplicationSlotCmd
CreateRangeStmt
CreateRoleStmt
CreateSchemaStmt
@ -416,6 +417,7 @@ DomainConstraintType
DomainIOData
DropBehavior
DropOwnedStmt
DropReplicationSlotCmd
DropRoleStmt
DropStmt
DropTableSpaceStmt