From 858ec11858a914d4c380971985709b6d6b7dd6fc Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 31 Jan 2014 22:45:17 -0500 Subject: [PATCH] Introduce replication slots. Replication slots are a crash-safe data structure which can be created on either a master or a standby to prevent premature removal of write-ahead log segments needed by a standby, as well as (with hot_standby_feedback=on) pruning of tuples whose removal would cause replication conflicts. Slots have some advantages over existing techniques, as explained in the documentation. In a few places, we refer to the type of replication slots introduced by this patch as "physical" slots, because forthcoming patches for logical decoding will also have slots, but with somewhat different properties. Andres Freund and Robert Haas --- doc/src/sgml/catalogs.sgml | 99 ++ doc/src/sgml/config.sgml | 19 + doc/src/sgml/func.sgml | 70 ++ doc/src/sgml/high-availability.sgml | 94 +- doc/src/sgml/protocol.sgml | 64 +- doc/src/sgml/recovery-config.sgml | 16 + doc/src/sgml/ref/pg_receivexlog.sgml | 18 + src/backend/access/transam/xlog.c | 95 +- src/backend/catalog/system_views.sql | 12 + src/backend/replication/Makefile | 2 +- src/backend/replication/README | 5 +- src/backend/replication/basebackup.c | 4 + .../libpqwalreceiver/libpqwalreceiver.c | 16 +- src/backend/replication/repl_gram.y | 54 +- src/backend/replication/repl_scanner.l | 57 +- src/backend/replication/slot.c | 1066 +++++++++++++++++ src/backend/replication/slotfuncs.c | 193 +++ src/backend/replication/walreceiver.c | 5 +- src/backend/replication/walreceiverfuncs.c | 13 +- src/backend/replication/walsender.c | 197 ++- src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 42 + src/backend/storage/lmgr/lwlock.c | 4 + src/backend/storage/lmgr/proc.c | 5 + src/backend/utils/misc/guc.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 3 + src/bin/initdb/initdb.c | 1 + src/bin/pg_basebackup/pg_receivexlog.c | 5 + src/bin/pg_basebackup/receivelog.c | 49 +- src/bin/pg_basebackup/streamutil.c | 1 + src/bin/pg_basebackup/streamutil.h | 1 + src/include/access/xlog.h | 1 + src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.h | 8 + src/include/nodes/nodes.h | 2 + src/include/nodes/replnodes.h | 32 + src/include/replication/slot.h | 120 ++ src/include/replication/walreceiver.h | 11 +- src/include/storage/lwlock.h | 4 +- src/include/storage/procarray.h | 2 + src/test/regress/expected/rules.out | 9 + src/tools/pgindent/typedefs.list | 2 + 42 files changed, 2356 insertions(+), 62 deletions(-) create mode 100644 src/backend/replication/slot.c create mode 100644 src/backend/replication/slotfuncs.c create mode 100644 src/include/replication/slot.h diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 3f8d9bfafb..dca24fc070 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -233,6 +233,11 @@ query rewrite rules + + pg_replication_slots + replication slot information + + pg_seclabel security labels on database objects @@ -5157,6 +5162,100 @@ + + <structname>pg_replication_slots</structname> + + + pg_replication_slots + + + + The pg_replication_slots view provides a listing + of all replication slots that currently exist on the database cluster, + along with their current state. + + + + For more on replication slots, + see . 
+ + + + + <structname>pg_replication_slots</structname> Columns + + + + + Name + Type + References + Description + + + + + + slot_name + text + + A unique, cluster-wide identifier for the replication slot + + + + slot_type + text + + The slot type - physical or logical + + + + datoid + oid + pg_database.oid + The oid of the database this slot is associated with, or + null. Only logical slots have an associated database. + + + + database + text + pg_database.datname + The name of the database this slot is associated with, or + null. Only logical slots have an associated database. + + + + active + boolean + + True if this slot is currently actively being used + + + + xmin + xid + + The oldest transaction that this slot needs the database to + retain. VACUUM cannot remove tuples deleted + by any later transaction. + + + + + restart_lsn + text + + The address (LSN) of oldest WAL which still + might be required by the consumer of this slot and thus won't be + automatically removed during checkpoints. + + + + +
+
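As an illustrative sketch of how the columns above are typically read together, a monitoring session might first list every slot with its current state and then single out slots that are not streaming but still pin WAL through their restart_lsn; the queries below only use the columns documented in this view:

    -- list all slots together with their current state
    SELECT slot_name, slot_type, database, active, xmin, restart_lsn
      FROM pg_replication_slots
     ORDER BY slot_name;

    -- inactive slots with a non-null restart_lsn still prevent WAL removal
    SELECT slot_name, restart_lsn
      FROM pg_replication_slots
     WHERE NOT active AND restart_lsn IS NOT NULL;

Such a check matters because a slot keeps holding back checkpoint-time WAL recycling even while its consumer is disconnected.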
+ <structname>pg_seclabel</structname> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1b5f831d65..000a46fabb 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2348,6 +2348,25 @@ include 'filename' + + max_replication_slots (integer) + + max_replication_slots configuration parameter + + + + Specifies the maximum number of replication slots + (see that the server + can support. The default is zero. This parameter can only be set at + server start. + wal_level must be set + to archive or higher to allow replication slots to + be used. Setting it to a lower value than the number of currently + existing replication slots will prevent the server from starting. + + + + wal_keep_segments (integer) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 252539f93b..8cc65b94d1 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -16290,6 +16290,76 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); + + Replication Functions + + + PostgreSQL exposes a number of functions for controlling and interacting + with replication features. See + and . + + + + Many of these functions have equivalent commands in the replication + protocol; see . + + + + The sections , and are also relevant for replication. + + + + Replication <acronym>SQL</acronym> Functions + + + + Function + Return Type + Description + + + + + + + pg_create_physical_replication_slot + + pg_create_physical_replication_slot(slotname text, plugin text) + + + (slotname text, xlog_position text) + + + Creates a new physical replication slot named + slotname. Streaming changes from a physical slot + is only possible with the walsender protocol - see . Corresponds to the walsender protocol + command CREATE_REPLICATION_SLOT ... PHYSICAL. + + + + + + pg_drop_replication_slot + + pg_drop_replication_slot(slotname text) + + + (slotname text) + + + Drops the physical or logical replication slot + named slotname. Same as walsender protocol + command DROP_REPLICATION_SLOT. + + + + +
+
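A minimal usage sketch for the two functions documented above, reusing the slot name from the configuration example elsewhere in this patch; note that, as in the underlying C implementation, pg_create_physical_replication_slot is invoked with just the slot name:

    -- reserve server-side resources for a standby before it first connects
    SELECT * FROM pg_create_physical_replication_slot('node_a_slot');

    -- when the standby is retired, release the reserved resources
    SELECT pg_drop_replication_slot('node_a_slot');

Dropping the slot fails if it is currently in use by an active connection, matching the behaviour of the DROP_REPLICATION_SLOT walsender command.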
+ Database Object Management Functions diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index e2e5ac93ab..9d43586fe2 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -643,7 +643,9 @@ protocol to make nodes agree on a serializable transactional order. entries in pg_hba.conf with the database field set to replication. Also ensure max_wal_senders is set to a sufficiently large value in the configuration file of the primary - server. + server. If replication slots will be used, + ensure that max_replication_slots is set sufficiently + high as well. @@ -750,13 +752,14 @@ archive_cleanup_command = 'pg_archivecleanup /path/to/archive %r' If you use streaming replication without file-based continuous - archiving, you have to set wal_keep_segments in the master - to a value high enough to ensure that old WAL segments are not recycled - too early, while the standby might still need them to catch up. If the - standby falls behind too much, it needs to be reinitialized from a new - base backup. If you set up a WAL archive that's accessible from the - standby, wal_keep_segments is not required as the standby can always - use the archive to catch up. + archiving, the server might recycle old WAL segments before the standby + has received them. If this occurs, the standby will need to be + reinitialized from a new base backup. You can avoid this by setting + wal_keep_segments to a value large enough to ensure that + WAL segments are not recycled too early, or by configuration a replication + slot for the standby. If you set up a WAL archive that's accessible from + the standby, these solutions are not required, since the standby can + always use the archive to catch up provided it retains enough segments. @@ -871,6 +874,81 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' + + Replication Slots + + Replication Slots + + + Replication slots provide an automated way to ensure that the master does + not remove WAL segments until they have been received by all standbys, + and that the master does not remove rows which could cause a + recovery conflict even when the + standby is disconnected. + + + In lieu of using replication slots, it is possible to prevent the removal + of old WAL segments using , or by + storing the segments in an archive using . + However, these methods often result in retaining more WAL segments than + required, whereas replication slots retain only the number of segments + known to be needed. An advantage of these methods is that they bound + the space requirement for pg_xlog; there is currently no way + to do this using replication slots. + + + Similarly, hot_standby_feedback + and vacuum_defer_cleanup_age provide protection against + relevant rows being removed by vacuum, but the former provides no + protection during any time period when the standby is not connected, + and the latter often needs to be set to a high value to provide adequate + protection. Replication slots overcome these disadvantages. + + + Querying and manipulating replication slots + + Each replication slot has a name, which can contain lower-case letters, + numbers, and the underscore character. + + + Existing replication slots and their state can be seen in the + pg_replication_slots + view. + + + Slots can be created and dropped either via the streaming replication + protocol (see ) or via SQL + functions (see ). 
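Because a slot retains WAL for as long as it exists, it can be worth watching how far each slot's restart_lsn trails the current write position. A possible monitoring query, assuming the pre-existing pg_current_xlog_location() and pg_xlog_location_diff() functions, is:

    -- bytes of WAL each slot is currently forcing the server to keep
    SELECT slot_name, active,
           pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn)
               AS retained_bytes
      FROM pg_replication_slots;

A steadily growing value for a disconnected standby's slot is the usual signal to either reconnect that standby or drop its slot, since, as noted above, slots place no upper bound on pg_xlog space usage.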
+ + + + Configuration Example + + You can create a replication slot like this: + +postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot'); + slotname | xlog_position +-------------+--------------- + node_a_slot | + +postgres=# SELECT * FROM pg_replication_slots; + slot_name | slot_type | datoid | database | active | xmin | restart_lsn +-------------+-----------+--------+----------+--------+------+------------- + node_a_slot | physical | 0 | | f | | +(1 row) + + To configure the standby to use this slot, primary_slotname + should be configured in the standby's recovery.conf. + Here is a simple example: + +standby_mode = 'on' +primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' +primary_slotname = 'node_a_slot' + + + + + Cascading Replication diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 7d99976a49..832524e95e 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1401,15 +1401,39 @@ The commands accepted in walsender mode are:
- START_REPLICATION XXX/XXX TIMELINE tli + CREATE_REPLICATION_SLOT slotname PHYSICAL + CREATE_REPLICATION_SLOT + + + Create a physical replication + slot. See for more about + replication slots. + + + + slotname + + + The name of the slot to create. Must be a valid replication slot + name (see ). + + + + + + + + + START_REPLICATION [SLOT slotname] [PHYSICAL] XXX/XXX TIMELINE tli Instructs server to start streaming WAL, starting at - WAL position XXX/XXX on timeline - tli. - The server can reply with an error, e.g. if the requested section of WAL - has already been recycled. On success, server responds with a - CopyBothResponse message, and then starts to stream WAL to the frontend. + WAL position XXX/XXX. If specified, + streaming starts on timeline tli; + otherwise, the server's current timeline is selected. The server can + reply with an error, e.g. if the requested section of WAL has already + been recycled. On success, server responds with a CopyBothResponse + message, and then starts to stream WAL to the frontend. @@ -1443,6 +1467,14 @@ The commands accepted in walsender mode are: client contains a message of one of the following formats: + + If a slot's name is provided + via slotname, it will be updated + as replication progresses so that the server knows which WAL segments - + and if hot_standby_feedback is on which transactions - + are still needed by the standby. + + @@ -1719,6 +1751,26 @@ The commands accepted in walsender mode are: + + DROP_REPLICATION_SLOT slotname + + + Drops a replication slot, freeing any reserved server-side resources. If + the slot is currently in use by an active connection, this command fails. + + + + slotname + + + The name of the slot to drop. + + + + + + + BASE_BACKUP [LABEL 'label'] [PROGRESS] [FAST] [WAL] [NOWAIT] diff --git a/doc/src/sgml/recovery-config.sgml b/doc/src/sgml/recovery-config.sgml index 4a97bb7a9c..b69ce287c8 100644 --- a/doc/src/sgml/recovery-config.sgml +++ b/doc/src/sgml/recovery-config.sgml @@ -418,6 +418,22 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + primary_slotname (string) + + primary_slotname recovery parameter + + + + Optionally specifies an existing replication slot to be used when + connecting to the primary via streaming replication to control + resource removal on the upstream node + (see ). + This setting has no effect if primary_conninfo is not + set. + + + trigger_file (string) diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml index 19bebb62f7..2a44af46c5 100644 --- a/doc/src/sgml/ref/pg_receivexlog.sgml +++ b/doc/src/sgml/ref/pg_receivexlog.sgml @@ -225,6 +225,24 @@ PostgreSQL documentation + + + + + + Require pg_receivexlog to use an existing + replication slot (see ). + When this option is used, pg_receivexlog will report + a flush position to the server, indicating when each segment has been + synchronized to disk so that the server can remove that segment if it + is not otherwise needed. When using this paramter, it is important + to make sure that pg_receivexlog cannot become the + synchronous standby through an incautious setting of + ; it does not flush + data frequently enough for this to work correctly. 
+ + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b333d820c7..7f63185b1c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,6 +39,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/barrier.h" @@ -225,6 +226,7 @@ static TimestampTz recoveryDelayUntilTime; /* options taken from recovery.conf for XLOG streaming */ static bool StandbyModeRequested = false; static char *PrimaryConnInfo = NULL; +static char *PrimarySlotName = NULL; static char *TriggerFile = NULL; /* are we currently in standby mode? */ @@ -485,6 +487,8 @@ typedef struct XLogCtlData uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ + XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ + XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG * segment */ @@ -748,6 +752,7 @@ static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); +static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); @@ -2908,6 +2913,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) SetLatch(ProcGlobal->walwriterLatch); } +/* + * Record the LSN up to which we can remove WAL because it's not required by + * any replication slot. + */ +void +XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->replicationSlotMinLSN = lsn; + SpinLockRelease(&xlogctl->info_lck); +} + + +/* + * Return the oldest LSN we must retain to satisfy the needs of some + * replication slot. + */ +static XLogRecPtr +XLogGetReplicationSlotMinimumLSN(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr retval; + SpinLockAcquire(&xlogctl->info_lck); + retval = xlogctl->replicationSlotMinLSN; + SpinLockRelease(&xlogctl->info_lck); + + return retval; +} + /* * Advance minRecoveryPoint in control file. * @@ -5478,6 +5516,14 @@ readRecoveryCommandFile(void) (errmsg_internal("primary_conninfo = '%s'", PrimaryConnInfo))); } + else if (strcmp(item->name, "primary_slotname") == 0) + { + ReplicationSlotValidateName(item->value, ERROR); + PrimarySlotName = pstrdup(item->value); + ereport(DEBUG2, + (errmsg_internal("primary_slotname = '%s'", + PrimarySlotName))); + } else if (strcmp(item->name, "trigger_file") == 0) { TriggerFile = pstrdup(item->value); @@ -6505,6 +6551,12 @@ StartupXLOG(void) XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXid = checkPoint.nextXid; + /* + * Initialize replication slots, before there's a chance to remove + * required resources. + */ + StartupReplicationSlots(checkPoint.redo); + /* * Startup MultiXact. 
We need to do this early for two reasons: one * is that we might try to access multixacts when we do tuple freezing, @@ -8620,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointMultiXact(); CheckPointPredicate(); CheckPointRelationMap(); + CheckPointReplicationSlots(); CheckPointBuffers(flags); /* performs all required fsyncs */ /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); @@ -8938,24 +8991,43 @@ CreateRestartPoint(int flags) /* * Retreat *logSegNo to the last segment that we need to retain because of - * wal_keep_segments. This is calculated by subtracting wal_keep_segments - * from the given xlog location, recptr. + * either wal_keep_segments or replication slots. + * + * This is calculated by subtracting wal_keep_segments from the given xlog + * location, recptr and by making sure that that result is below the + * requirement of replication slots. */ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) { XLogSegNo segno; - - if (wal_keep_segments == 0) - return; + XLogRecPtr keep; XLByteToSeg(recptr, segno); + keep = XLogGetReplicationSlotMinimumLSN(); - /* avoid underflow, don't go below 1 */ - if (segno <= wal_keep_segments) - segno = 1; - else - segno = segno - wal_keep_segments; + /* compute limit for wal_keep_segments first */ + if (wal_keep_segments > 0) + { + /* avoid underflow, don't go below 1 */ + if (segno <= wal_keep_segments) + segno = 1; + else + segno = segno - wal_keep_segments; + } + + /* then check whether slots limit removal further */ + if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) + { + XLogRecPtr slotSegNo; + + XLByteToSeg(keep, slotSegNo); + + if (slotSegNo <= 0) + segno = 1; + else if (slotSegNo < segno) + segno = slotSegNo; + } /* don't delete WAL segments newer than the calculated segment */ if (segno < *logSegNo) @@ -11026,7 +11098,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo, + PrimarySlotName); receivedUpto = 0; } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 277af61f9d..f02efeca97 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -613,6 +613,18 @@ CREATE VIEW pg_stat_replication AS WHERE S.usesysid = U.oid AND S.pid = W.pid; +CREATE VIEW pg_replication_slots AS + SELECT + L.slot_name, + L.slot_type, + L.datoid, + D.datname AS database, + L.active, + L.xmin, + L.restart_lsn + FROM pg_get_replication_slots() AS L + LEFT JOIN pg_database D ON (L.datoid = D.oid); + CREATE VIEW pg_stat_database AS SELECT D.oid AS datid, diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 2dde0118a4..7941cb8d5e 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o syncrep.o + repl_gram.o slot.o slotfuncs.o syncrep.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README index 60120ede29..2f5df49de6 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -47,8 +47,9 @@ to fetch more WAL (if streaming replication is configured). 
Walreceiver is a postmaster subprocess, so the startup process can't fork it directly. Instead, it sends a signal to postmaster, asking postmaster to launch -it. Before that, however, startup process fills in WalRcvData->conninfo, -and initializes the starting point in WalRcvData->receiveStart. +it. Before that, however, startup process fills in WalRcvData->conninfo +and WalRcvData->slotname, and initializes the starting point in +WalRcvData->receiveStart. As walreceiver receives WAL from the master server, and writes and flushes it to disk (in pg_xlog), it updates WalRcvData->receivedUpto and signals diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 7d0ed9ce4c..781f678097 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -847,6 +847,10 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces) if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0) continue; + /* Skip pg_replslot, not useful to copy */ + if (strcmp(de->d_name, "pg_replslot") == 0) + continue; + /* * Check if the postmaster has signaled us to exit, and abort with an * error in that case. The error handler further up will call diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2e057b8969..ecec8b3456 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -49,7 +49,8 @@ static char *recvBuf = NULL; static void libpqrcv_connect(char *conninfo); static void libpqrcv_identify_system(TimeLineID *primary_tli); static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); -static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint); +static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, + char *slotname); static void libpqrcv_endstreaming(TimeLineID *next_tli); static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); @@ -171,15 +172,20 @@ libpqrcv_identify_system(TimeLineID *primary_tli) * throws an ERROR. 
*/ static bool -libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint) +libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname) { char cmd[64]; PGresult *res; /* Start streaming from the point requested by startup process */ - snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u", - (uint32) (startpoint >> 32), (uint32) startpoint, - tli); + if (slotname != NULL) + snprintf(cmd, sizeof(cmd), + "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname, + (uint32) (startpoint >> 32), (uint32) startpoint, tli); + else + snprintf(cmd, sizeof(cmd), + "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpoint >> 32), (uint32) startpoint, tli); res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) == PGRES_COMMAND_OK) diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 015aa44d89..d4bd59bab2 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -65,7 +65,7 @@ Node *replication_parse_result; } /* Non-keyword tokens */ -%token SCONST +%token SCONST IDENT %token UCONST %token RECPTR @@ -73,6 +73,8 @@ Node *replication_parse_result; %token K_BASE_BACKUP %token K_IDENTIFY_SYSTEM %token K_START_REPLICATION +%token K_CREATE_REPLICATION_SLOT +%token K_DROP_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_LABEL %token K_PROGRESS @@ -80,12 +82,15 @@ Node *replication_parse_result; %token K_NOWAIT %token K_WAL %token K_TIMELINE +%token K_PHYSICAL +%token K_SLOT %type command -%type base_backup start_replication identify_system timeline_history +%type base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history %type base_backup_opt_list %type base_backup_opt %type opt_timeline +%type opt_slot %% firstcmd: command opt_semicolon @@ -102,6 +107,8 @@ command: identify_system | base_backup | start_replication + | create_replication_slot + | drop_replication_slot | timeline_history ; @@ -158,18 +165,42 @@ base_backup_opt: } ; +/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */ +create_replication_slot: + K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL + { + CreateReplicationSlotCmd *cmd; + cmd = makeNode(CreateReplicationSlotCmd); + cmd->kind = REPLICATION_KIND_PHYSICAL; + cmd->slotname = $2; + $$ = (Node *) cmd; + } + ; + +/* DROP_REPLICATION_SLOT SLOT slot */ +drop_replication_slot: + K_DROP_REPLICATION_SLOT IDENT + { + DropReplicationSlotCmd *cmd; + cmd = makeNode(DropReplicationSlotCmd); + cmd->slotname = $2; + $$ = (Node *) cmd; + } + ; + /* - * START_REPLICATION %X/%X [TIMELINE %d] + * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ start_replication: - K_START_REPLICATION RECPTR opt_timeline + K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline { StartReplicationCmd *cmd; cmd = makeNode(StartReplicationCmd); - cmd->startpoint = $2; - cmd->timeline = $3; - + cmd->kind = REPLICATION_KIND_PHYSICAL; + cmd->slotname = $2; + cmd->startpoint = $4; + cmd->timeline = $5; $$ = (Node *) cmd; } ; @@ -205,6 +236,15 @@ timeline_history: $$ = (Node *) cmd; } ; + +opt_physical : K_PHYSICAL | /* EMPTY */; + + +opt_slot : K_SLOT IDENT + { + $$ = $2; + } + | /* nothing */ { $$ = NULL; } %% #include "repl_scanner.c" diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 01e5ac6efb..24195a5971 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -16,6 +16,7 @@ #include "postgres.h" #include "utils/builtins.h" +#include "parser/scansup.h" /* 
Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */ #undef fprintf @@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar); %option warn %option prefix="replication_yy" -%x xq +%x xq xd /* Extended quote * xqdouble implements embedded quote, '''' @@ -57,12 +58,26 @@ xqstart {quote} xqdouble {quote}{quote} xqinside [^']+ +/* Double quote + * Allows embedded spaces and other special characters into identifiers. + */ +dquote \" +xdstart {dquote} +xdstop {dquote} +xddouble {dquote}{dquote} +xdinside [^"]+ + digit [0-9]+ hexdigit [0-9A-Za-z]+ quote ' quotestop {quote} +ident_start [A-Za-z\200-\377_] +ident_cont [A-Za-z\200-\377_0-9\$] + +identifier {ident_start}{ident_cont}* + %% BASE_BACKUP { return K_BASE_BACKUP; } @@ -74,9 +89,16 @@ PROGRESS { return K_PROGRESS; } WAL { return K_WAL; } TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } +CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } +DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } +PHYSICAL { return K_PHYSICAL; } +SLOT { return K_SLOT; } + "," { return ','; } ";" { return ';'; } +"(" { return '('; } +")" { return ')'; } [\n] ; [\t] ; @@ -100,20 +122,49 @@ TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } BEGIN(xq); startlit(); } + {quotestop} { yyless(1); BEGIN(INITIAL); yylval.str = litbufdup(); return SCONST; } -{xqdouble} { + +{xqdouble} { addlitchar('\''); } + {xqinside} { addlit(yytext, yyleng); } -<> { yyerror("unterminated quoted string"); } +{xdstart} { + BEGIN(xd); + startlit(); + } + +{xdstop} { + int len; + yyless(1); + BEGIN(INITIAL); + yylval.str = litbufdup(); + len = strlen(yylval.str); + truncate_identifier(yylval.str, len, true); + return IDENT; + } + +{xdinside} { + addlit(yytext, yyleng); + } + +{identifier} { + int len = strlen(yytext); + + yylval.str = downcase_truncate_identifier(yytext, len, true); + return IDENT; + } + +<> { yyerror("unterminated quoted string"); } <> { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c new file mode 100644 index 0000000000..30aff5f5e3 --- /dev/null +++ b/src/backend/replication/slot.c @@ -0,0 +1,1066 @@ +/*------------------------------------------------------------------------- + * + * slot.c + * Replication slot management. + * + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/replication/slot.c + * + * NOTES + * + * Replication slots are used to keep state about replication streams + * originating from this cluster. Their primary purpose is to prevent the + * premature removal of WAL or of old tuple versions in a manner that would + * interfere with replication; they also useful for monitoring purposes. + * Slots need to be permanent (to allow restarts), crash-safe, and allocatable + * on standbys (to support cascading setups). The requirement that slots be + * usable on standbys precludes storing them in the system catalogs. + * + * Each replication slot gets its own directory inside the $PGDATA/pg_replslot + * directory. Inside that directory the state file will contain the slot's + * own data. Additional data can be stored alongside that file if required. + * While the server is running, the state data is also cached in memory for + * efficiency. + * + * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate + * or free a slot. 
ReplicationSlotControlLock must be taken in shared mode + * to iterate over the slots, and in exclusive mode to change the in_use flag + * of a slot. The remaining data in each slot is protected by its mutex. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include +#include + +#include "access/transam.h" +#include "miscadmin.h" +#include "replication/slot.h" +#include "storage/fd.h" +#include "storage/procarray.h" + +/* + * Replication slot on-disk data structure. + */ +typedef struct ReplicationSlotOnDisk +{ + /* first part of this struct needs to be version independent */ + + /* data not covered by checksum */ + uint32 magic; + pg_crc32 checksum; + + /* data covered by checksum */ + uint32 version; + uint32 length; + + ReplicationSlotPersistentData slotdata; +} ReplicationSlotOnDisk; + +/* size of the part of the slot that is version independent */ +#define ReplicationSlotOnDiskConstantSize \ + offsetof(ReplicationSlotOnDisk, slotdata) +/* size of the slots that is not version indepenent */ +#define ReplicationSlotOnDiskDynamicSize \ + sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize + +#define SLOT_MAGIC 0x1051CA1 /* format identifier */ +#define SLOT_VERSION 1 /* version for new files */ + +/* Control array for replication slot management */ +ReplicationSlotCtlData *ReplicationSlotCtl = NULL; + +/* My backend's replication slot in the shared memory array */ +ReplicationSlot *MyReplicationSlot = NULL; + +/* GUCs */ +int max_replication_slots = 0; /* the maximum number of replication slots */ + +/* internal persistency functions */ +static void RestoreSlotFromDisk(const char *name); +static void CreateSlotOnDisk(ReplicationSlot *slot); +static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel); + +/* + * Report shared-memory space needed by ReplicationSlotShmemInit. + */ +Size +ReplicationSlotsShmemSize(void) +{ + Size size = 0; + + if (max_replication_slots == 0) + return size; + + size = offsetof(ReplicationSlotCtlData, replication_slots); + size = add_size(size, + mul_size(max_replication_slots, sizeof(ReplicationSlot))); + + return size; +} + +/* + * Allocate and initialize walsender-related shared memory. + */ +void +ReplicationSlotsShmemInit(void) +{ + bool found; + + if (max_replication_slots == 0) + return; + + ReplicationSlotCtl = (ReplicationSlotCtlData *) + ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(), + &found); + + if (!found) + { + int i; + + /* First time through, so initialize */ + MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize()); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; + + /* everything else is zeroed by the memset above */ + SpinLockInit(&slot->mutex); + slot->io_in_progress_lock = LWLockAssign(); + } + } +} + +/* + * Check whether the passed slot name is valid and report errors at elevel. + * + * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow + * the name to be uses as a directory name on every supported OS. + * + * Returns whether the directory name is valid or not if elevel < ERROR. 
+ */ +bool +ReplicationSlotValidateName(const char *name, int elevel) +{ + const char *cp; + + if (strlen(name) == 0) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_NAME), + errmsg("replication slot name \"%s\" is too short", + name))); + return false; + } + + if (strlen(name) >= NAMEDATALEN) + { + ereport(elevel, + (errcode(ERRCODE_NAME_TOO_LONG), + errmsg("replication slot name \"%s\" is too long", + name))); + return false; + } + + for (cp = name; *cp; cp++) + { + if (!((*cp >= 'a' && *cp <= 'z') + || (*cp >= '0' && *cp <= '9') + || (*cp == '_'))) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_NAME), + errmsg("replication slot name \"%s\" contains invalid character", + name), + errhint("Replication slot names may only contain letters, numbers and the underscore character."))); + return false; + } + } + return true; +} + +/* + * Create a new replication slot and mark it as used by this backend. + * + * name: Name of the slot + * db_specific: changeset extraction is db specific, if the slot is going to + * be used for that pass true, otherwise false. + */ +void +ReplicationSlotCreate(const char *name, bool db_specific) +{ + ReplicationSlot *slot = NULL; + int i; + + Assert(MyReplicationSlot == NULL); + + ReplicationSlotValidateName(name, ERROR); + + /* + * If some other backend ran this code currently with us, we'd likely + * both allocate the same slot, and that would be bad. We'd also be + * at risk of missing a name collision. Also, we don't want to try to + * create a new slot while somebody's busy cleaning up an old one, because + * we might both be monkeying with the same directory. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); + + /* + * Check for name collision, and identify an allocatable slot. We need + * to hold ReplicationSlotControlLock in shared mode for this, so that + * nobody else can change the in_use flags while we're looking at them. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication slot \"%s\" already exists", name))); + if (!s->in_use && slot == NULL) + slot = s; + } + LWLockRelease(ReplicationSlotControlLock); + + /* If all slots are in use, we're out of luck. */ + if (slot == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("all replication slots are in use"), + errhint("Free one or increase max_replication_slots."))); + + /* + * Since this slot is not in use, nobody should be looking at any + * part of it other than the in_use field unless they're trying to allocate + * it. And since we hold ReplicationSlotAllocationLock, nobody except us + * can be doing that. So it's safe to initialize the slot. + */ + Assert(!slot->in_use); + Assert(!slot->active); + slot->data.xmin = InvalidTransactionId; + slot->effective_xmin = InvalidTransactionId; + strncpy(NameStr(slot->data.name), name, NAMEDATALEN); + NameStr(slot->data.name)[NAMEDATALEN - 1] = '\0'; + slot->data.database = db_specific ? MyDatabaseId : InvalidOid; + slot->data.restart_lsn = InvalidXLogRecPtr; + + /* + * Create the slot on disk. We haven't actually marked the slot allocated + * yet, so no special cleanup is required if this errors out. + */ + CreateSlotOnDisk(slot); + + /* + * We need to briefly prevent any other backend from iterating over the + * slots while we flip the in_use flag. 
We also need to set the active + * flag while holding the ControlLock as otherwise a concurrent + * SlotAcquire() could acquire the slot as well. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + + slot->in_use = true; + + /* We can now mark the slot active, and that makes it our slot. */ + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&slot->mutex); + Assert(!vslot->active); + vslot->active = true; + SpinLockRelease(&slot->mutex); + MyReplicationSlot = slot; + } + + LWLockRelease(ReplicationSlotControlLock); + + /* + * Now that the slot has been marked as in_use and in_active, it's safe to + * let somebody else try to allocate a slot. + */ + LWLockRelease(ReplicationSlotAllocationLock); +} + +/* + * Find an previously created slot and mark it as used by this backend. + */ +void +ReplicationSlotAcquire(const char *name) +{ + ReplicationSlot *slot = NULL; + int i; + bool active = false; + + Assert(MyReplicationSlot == NULL); + + ReplicationSlotValidateName(name, ERROR); + + /* Search for the named slot and mark it active if we find it. */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + active = vslot->active; + vslot->active = true; + SpinLockRelease(&s->mutex); + slot = s; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + /* If we did not find the slot or it was already active, error out. */ + if (slot == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", name))); + if (active) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is already active", name))); + + /* We made this slot active, so it's ours now. */ + MyReplicationSlot = slot; +} + +/* + * Release a replication slot, this or another backend can ReAcquire it + * later. Resources this slot requires will be preserved. + */ +void +ReplicationSlotRelease(void) +{ + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL && slot->active); + + /* Mark slot inactive. We're not freeing it, just disconnecting. */ + { + volatile ReplicationSlot *vslot = slot; + SpinLockAcquire(&slot->mutex); + vslot->active = false; + SpinLockRelease(&slot->mutex); + MyReplicationSlot = NULL; + } +} + +/* + * Permanently drop replication slot identified by the passed in name. + */ +void +ReplicationSlotDrop(const char *name) +{ + ReplicationSlot *slot = NULL; + int i; + bool active; + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + + ReplicationSlotValidateName(name, ERROR); + + /* + * If some other backend ran this code currently with us, we might both + * try to free the same slot at the same time. Or we might try to delete + * a slot with a certain name while someone else was trying to create a + * slot with the same name. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); + + /* Search for the named slot and mark it active if we find it. 
*/ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + active = vslot->active; + vslot->active = true; + SpinLockRelease(&s->mutex); + slot = s; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + /* If we did not find the slot or it was already active, error out. */ + if (slot == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", name))); + if (active) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is already active", name))); + + /* Generate pathnames. */ + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); + + /* + * Rename the slot directory on disk, so that we'll no longer recognize + * this as a valid slot. Note that if this fails, we've got to mark the + * slot inactive again before bailing out. + */ + if (rename(path, tmppath) != 0) + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&slot->mutex); + vslot->active = false; + SpinLockRelease(&slot->mutex); + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename \"%s\" to \"%s\": %m", + path, tmppath))); + } + + /* + * We need to fsync() the directory we just renamed and its parent to make + * sure that our changes are on disk in a crash-safe fashion. If fsync() + * fails, we can't be sure whether the changes are on disk or not. For + * now, we handle that by panicking; StartupReplicationSlots() will + * try to straighten it out after restart. + */ + START_CRIT_SECTION(); + fsync_fname(tmppath, true); + fsync_fname("pg_replslot", true); + END_CRIT_SECTION(); + + /* + * The slot is definitely gone. Lock out concurrent scans of the array + * long enough to kill it. It's OK to clear the active flag here without + * grabbing the mutex because nobody else can be scanning the array here, + * and nobody can be attached to this slot and thus access it without + * scanning the array. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + slot->active = false; + slot->in_use = false; + LWLockRelease(ReplicationSlotControlLock); + + /* + * Slot is dead and doesn't prevent resource removal anymore, recompute + * limits. + */ + ReplicationSlotsComputeRequiredXmin(); + ReplicationSlotsComputeRequiredLSN(); + + /* + * If removing the directory fails, the worst thing that will happen is + * that the user won't be able to create a new slot with the same name + * until the next server restart. We warn about it, but that's all. + */ + if (!rmtree(tmppath, true)) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove directory \"%s\"", tmppath))); + + /* + * We release this at the very end, so that nobody starts trying to create + * a slot while we're still cleaning up the detritus of the old one. + */ + LWLockRelease(ReplicationSlotAllocationLock); +} + +/* + * Serialize the currently acquired slot's state from memory to disk, thereby + * guaranteeing the current state will survive a crash. 
+ */ +void +ReplicationSlotSave(void) +{ + char path[MAXPGPATH]; + + Assert(MyReplicationSlot != NULL); + + sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name)); + SaveSlotToPath(MyReplicationSlot, path, ERROR); +} + +/* + * Signal that it would be useful if the currently acquired slot would be + * flushed out to disk. + * + * Note that the actual flush to disk can be delayed for a long time, if + * required for correctness explicitly do a ReplicationSlotSave(). + */ +void +ReplicationSlotMarkDirty(void) +{ + Assert(MyReplicationSlot != NULL); + + { + volatile ReplicationSlot *vslot = MyReplicationSlot; + + SpinLockAcquire(&vslot->mutex); + MyReplicationSlot->just_dirtied = true; + MyReplicationSlot->dirty = true; + SpinLockRelease(&vslot->mutex); + } +} + +/* + * Compute the oldest xmin across all slots and store it in the ProcArray. + */ +void +ReplicationSlotsComputeRequiredXmin(void) +{ + int i; + TransactionId agg_xmin = InvalidTransactionId; + + Assert(ReplicationSlotCtl != NULL); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + TransactionId effective_xmin; + + if (!s->in_use) + continue; + + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + effective_xmin = vslot->effective_xmin; + SpinLockRelease(&s->mutex); + } + + /* check the data xmin */ + if (TransactionIdIsValid(effective_xmin) && + (!TransactionIdIsValid(agg_xmin) || + TransactionIdPrecedes(effective_xmin, agg_xmin))) + agg_xmin = effective_xmin; + } + LWLockRelease(ReplicationSlotControlLock); + + ProcArraySetReplicationSlotXmin(agg_xmin); +} + +/* + * Compute the oldest restart LSN across all slots and inform xlog module. + */ +void +ReplicationSlotsComputeRequiredLSN(void) +{ + int i; + XLogRecPtr min_required = InvalidXLogRecPtr; + + Assert(ReplicationSlotCtl != NULL); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + XLogRecPtr restart_lsn; + + if (!s->in_use) + continue; + + { + volatile ReplicationSlot *vslot = s; + + SpinLockAcquire(&s->mutex); + restart_lsn = vslot->data.restart_lsn; + SpinLockRelease(&s->mutex); + } + + if (restart_lsn != InvalidXLogRecPtr && + (min_required == InvalidXLogRecPtr || + restart_lsn < min_required)) + min_required = restart_lsn; + } + LWLockRelease(ReplicationSlotControlLock); + + XLogSetReplicationSlotMinimumLSN(min_required); +} + +/* + * Check whether the server's configuration supports using replication + * slots. + */ +void +CheckSlotRequirements(void) +{ + if (max_replication_slots == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("replication slots can only be used if max_replication_slots > 0")))); + + if (wal_level < WAL_LEVEL_ARCHIVE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slots can only be used if wal_level >= archive"))); +} + +/* + * Returns whether the string `str' has the postfix `end'. + */ +static bool +string_endswith(const char *str, const char *end) +{ + size_t slen = strlen(str); + size_t elen = strlen(end); + + /* can't be a postfix if longer */ + if (elen > slen) + return false; + + /* compare the end of the strings */ + str += slen - elen; + return strcmp(str, end) == 0; +} + +/* + * Flush all replication slots to disk. 
+ * + * This needn't actually be part of a checkpoint, but it's a convenient + * location. + */ +void +CheckPointReplicationSlots(void) +{ + int i; + + ereport(DEBUG1, + (errmsg("performing replication slot checkpoint"))); + + /* + * Prevent any slot from being created/dropped while we're active. As we + * explicitly do *not* want to block iterating over replication_slots or + * acquiring a slot we cannot take the control lock - but that's OK, + * because holding ReplicationSlotAllocationLock is strictly stronger, + * and enough to guarantee that nobody can change the in_use bits on us. + */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + char path[MAXPGPATH]; + + if (!s->in_use) + continue; + + /* save the slot to disk, locking is handled in SaveSlotToPath() */ + sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); + SaveSlotToPath(s, path, LOG); + } + LWLockRelease(ReplicationSlotAllocationLock); +} + +/* + * Load all replication slots from disk into memory at server startup. This + * needs to be run before we start crash recovery. + */ +void +StartupReplicationSlots(XLogRecPtr checkPointRedo) +{ + DIR *replication_dir; + struct dirent *replication_de; + + ereport(DEBUG1, + (errmsg("starting up replication slots"))); + + /* restore all slots by iterating over all on-disk entries */ + replication_dir = AllocateDir("pg_replslot"); + while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL) + { + struct stat statbuf; + char path[MAXPGPATH]; + + if (strcmp(replication_de->d_name, ".") == 0 || + strcmp(replication_de->d_name, "..") == 0) + continue; + + snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name); + + /* we're only creating directories here, skip if it's not our's */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + continue; + + /* we crashed while a slot was being setup or deleted, clean up */ + if (string_endswith(replication_de->d_name, ".tmp")) + { + if (!rmtree(path, true)) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove directory \"%s\"", path))); + continue; + } + fsync_fname("pg_replslot", true); + continue; + } + + /* looks like a slot in a normal state, restore */ + RestoreSlotFromDisk(replication_de->d_name); + } + FreeDir(replication_dir); + + /* currently no slots exist, we're done. */ + if (max_replication_slots <= 0) + return; + + /* Now that we have recovered all the data, compute replication xmin */ + ReplicationSlotsComputeRequiredXmin(); + ReplicationSlotsComputeRequiredLSN(); +} + +/* ---- + * Manipulation of ondisk state of replication slots + * + * NB: none of the routines below should take any notice whether a slot is the + * current one or not, that's all handled a layer above. + * ---- + */ +static void +CreateSlotOnDisk(ReplicationSlot *slot) +{ + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + struct stat st; + + /* + * No need to take out the io_in_progress_lock, nobody else can see this + * slot yet, so nobody else wil write. We're reusing SaveSlotToPath which + * takes out the lock, if we'd take the lock here, we'd deadlock. + */ + + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); + + /* + * It's just barely possible that some previous effort to create or + * drop a slot with this name left a temp directory lying around. 
+ * If that seems to be the case, try to remove it. If the rmtree() + * fails, we'll error out at the mkdir() below, so we don't bother + * checking success. + */ + if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode)) + rmtree(tmppath, true); + + /* Create and fsync the temporary slot directory. */ + if (mkdir(tmppath, S_IRWXU) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + tmppath))); + fsync_fname(tmppath, true); + + /* Write the actual state file. */ + slot->dirty = true; /* signal that we really need to write */ + SaveSlotToPath(slot, tmppath, ERROR); + + /* Rename the directory into place. */ + if (rename(tmppath, path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + tmppath, path))); + + /* + * If we'd now fail - really unlikely - we wouldn't know wether this slot + * would persist after an OS crash or not - so, force a restart. The + * restart would try to fysnc this again till it works. + */ + START_CRIT_SECTION(); + + fsync_fname(path, true); + fsync_fname("pg_replslot", true); + + END_CRIT_SECTION(); +} + +/* + * Shared functionality between saving and creating a replication slot. + */ +static void +SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) +{ + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + int fd; + ReplicationSlotOnDisk cp; + bool was_dirty; + + /* first check whether there's something to write out */ + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&vslot->mutex); + was_dirty = vslot->dirty; + vslot->just_dirtied = false; + SpinLockRelease(&vslot->mutex); + } + + /* and don't do anything if there's nothing to write */ + if (!was_dirty) + return; + + LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE); + + /* silence valgrind :( */ + memset(&cp, 0, sizeof(ReplicationSlotOnDisk)); + + sprintf(tmppath, "%s/state.tmp", dir); + sprintf(path, "%s/state", dir); + + fd = OpenTransientFile(tmppath, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + { + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + tmppath))); + return; + } + + cp.magic = SLOT_MAGIC; + INIT_CRC32(cp.checksum); + cp.version = 1; + cp.length = ReplicationSlotOnDiskDynamicSize; + + SpinLockAcquire(&slot->mutex); + + memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData)); + + SpinLockRelease(&slot->mutex); + + COMP_CRC32(cp.checksum, + (char *)(&cp) + ReplicationSlotOnDiskConstantSize, + ReplicationSlotOnDiskDynamicSize); + + if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) + { + int save_errno = errno; + CloseTransientFile(fd); + errno = save_errno; + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + tmppath))); + return; + } + + /* fsync the temporary file */ + if (pg_fsync(fd) != 0) + { + int save_errno = errno; + CloseTransientFile(fd); + errno = save_errno; + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + tmppath))); + return; + } + + CloseTransientFile(fd); + + /* rename to permanent file, fsync file and directory */ + if (rename(tmppath, path) != 0) + { + ereport(elevel, + (errcode_for_file_access(), + errmsg("could not rename \"%s\" to \"%s\": %m", + tmppath, path))); + return; + } + + /* Check CreateSlot() for the reasoning of using a crit. section. 
*/ + START_CRIT_SECTION(); + + fsync_fname(path, false); + fsync_fname((char *) dir, true); + fsync_fname("pg_replslot", true); + + END_CRIT_SECTION(); + + /* + * Successfully wrote, unset dirty bit, unless somebody dirtied again + * already. + */ + { + volatile ReplicationSlot *vslot = slot; + + SpinLockAcquire(&vslot->mutex); + if (!vslot->just_dirtied) + vslot->dirty = false; + SpinLockRelease(&vslot->mutex); + } + + LWLockRelease(slot->io_in_progress_lock); +} + +/* + * Load a single slot from disk into memory. + */ +static void +RestoreSlotFromDisk(const char *name) +{ + ReplicationSlotOnDisk cp; + int i; + char path[MAXPGPATH]; + int fd; + bool restored = false; + int readBytes; + pg_crc32 checksum; + + /* no need to lock here, no concurrent access allowed yet */ + + /* delete temp file if it exists */ + sprintf(path, "pg_replslot/%s/state.tmp", name); + if (unlink(path) < 0 && errno != ENOENT) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not unlink file \"%s\": %m", path))); + + sprintf(path, "pg_replslot/%s/state", name); + + elog(DEBUG1, "restoring replication slot from \"%s\"", path); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + + /* + * We do not need to handle this as we are rename()ing the directory into + * place only after we fsync()ed the state file. + */ + if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* + * Sync state file before we're reading from it. We might have crashed + * while it wasn't synced yet and we shouldn't continue on that basis. + */ + if (pg_fsync(fd) != 0) + { + CloseTransientFile(fd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + path))); + } + + /* Also sync the parent directory */ + START_CRIT_SECTION(); + fsync_fname(path, true); + END_CRIT_SECTION(); + + /* read part of statefile that's guaranteed to be version independent */ + readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize); + if (readBytes != ReplicationSlotOnDiskConstantSize) + { + int saved_errno = errno; + + CloseTransientFile(fd); + errno = saved_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\", read %d of %u: %m", + path, readBytes, + (uint32) ReplicationSlotOnDiskConstantSize))); + } + + /* verify magic */ + if (cp.magic != SLOT_MAGIC) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("replication slot file \"%s\" has wrong magic %u instead of %u", + path, cp.magic, SLOT_MAGIC))); + + /* verify version */ + if (cp.version != SLOT_VERSION) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("replication slot file \"%s\" has unsupported version %u", + path, cp.version))); + + /* boundary check on length */ + if (cp.length != ReplicationSlotOnDiskDynamicSize) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("replication slot file \"%s\" has corrupted length %u", + path, cp.length))); + + /* Now that we know the size, read the entire file */ + readBytes = read(fd, + (char *)&cp + ReplicationSlotOnDiskConstantSize, + cp.length); + if (readBytes != cp.length) + { + int saved_errno = errno; + + CloseTransientFile(fd); + errno = saved_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\", read %d of %u: %m", + path, readBytes, cp.length))); + } + + CloseTransientFile(fd); + + /* now verify the CRC32 */ + INIT_CRC32(checksum); + COMP_CRC32(checksum, + (char *)&cp + ReplicationSlotOnDiskConstantSize, + ReplicationSlotOnDiskDynamicSize); + + if 
(!EQ_CRC32(checksum, cp.checksum)) + ereport(PANIC, + (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u", + path, checksum, cp.checksum))); + + /* nothing can be active yet, don't lock anything */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot; + + slot = &ReplicationSlotCtl->replication_slots[i]; + + if (slot->in_use) + continue; + + /* restore the entire set of persistent data */ + memcpy(&slot->data, &cp.slotdata, + sizeof(ReplicationSlotPersistentData)); + + /* initialize in memory state */ + slot->effective_xmin = cp.slotdata.xmin; + slot->in_use = true; + slot->active = false; + + restored = true; + break; + } + + if (!restored) + ereport(PANIC, + (errmsg("too many replication slots active before shutdown"), + errhint("Increase max_replication_slots and try again."))); +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c new file mode 100644 index 0000000000..98a860e528 --- /dev/null +++ b/src/backend/replication/slotfuncs.c @@ -0,0 +1,193 @@ +/*------------------------------------------------------------------------- + * + * slotfuncs.c + * Support functions for replication slots + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/slotfuncs.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "access/htup_details.h" +#include "utils/builtins.h" +#include "replication/slot.h" + +Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS); +Datum pg_drop_replication_slot(PG_FUNCTION_ARGS); + +static void +check_permissions(void) +{ + if (!superuser() && !has_rolreplication(GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser or replication role to use replication slots")))); +} + +/* + * SQL function for creating a new physical (streaming replication) + * replication slot. + */ +Datum +pg_create_physical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Datum values[2]; + bool nulls[2]; + TupleDesc tupdesc; + HeapTuple tuple; + Datum result; + + check_permissions(); + + CheckSlotRequirements(); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* acquire replication slot, this will check for conflicting names*/ + ReplicationSlotCreate(NameStr(*name), false); + + values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); + + nulls[0] = false; + nulls[1] = true; + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + ReplicationSlotRelease(); + + PG_RETURN_DATUM(result); +} + +/* + * SQL function for dropping a replication slot. + */ +Datum +pg_drop_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + + check_permissions(); + + CheckSlotRequirements(); + + ReplicationSlotDrop(NameStr(*name)); + + PG_RETURN_VOID(); +} + +/* + * pg_get_replication_slots - SQL SRF showing active replication slots. 
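For reference, the two SQL-callable entry points above can be exercised from any client. A rough libpq sketch follows; the connection string and the slot name 'myslot' are placeholders, and error handling is abbreviated. Per check_permissions(), the session must belong to a superuser or a role with the REPLICATION attribute:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* placeholder connection string */
	PGconn	   *conn = PQconnectdb("dbname=postgres");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	res = PQexec(conn,
				 "SELECT * FROM pg_create_physical_replication_slot('myslot')");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
		printf("created slot: %s\n", PQgetvalue(res, 0, 0));
	else
		fprintf(stderr, "create failed: %s", PQerrorMessage(conn));
	PQclear(res);

	res = PQexec(conn, "SELECT pg_drop_replication_slot('myslot')");
	PQclear(res);

	PQfinish(conn);
	return 0;
}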
+ */ +Datum +pg_get_replication_slots(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int slotno; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* + * We don't require any special permission to see this function's data + * because nothing should be sensitive. The most critical being the slot + * name, which shouldn't contain anything particularly sensitive. + */ + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + for (slotno = 0; slotno < max_replication_slots; slotno++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS]; + bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS]; + + TransactionId xmin; + XLogRecPtr restart_lsn; + bool active; + Oid database; + const char *slot_name; + + char restart_lsn_s[MAXFNAMELEN]; + int i; + + SpinLockAcquire(&slot->mutex); + if (!slot->in_use) + { + SpinLockRelease(&slot->mutex); + continue; + } + else + { + xmin = slot->data.xmin; + database = slot->data.database; + restart_lsn = slot->data.restart_lsn; + slot_name = pstrdup(NameStr(slot->data.name)); + + active = slot->active; + } + SpinLockRelease(&slot->mutex); + + memset(nulls, 0, sizeof(nulls)); + + snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X", + (uint32) (restart_lsn >> 32), (uint32) restart_lsn); + + i = 0; + values[i++] = CStringGetTextDatum(slot_name); + if (database == InvalidOid) + values[i++] = CStringGetTextDatum("physical"); + else + values[i++] = CStringGetTextDatum("logical"); + values[i++] = database; + values[i++] = BoolGetDatum(active); + if (xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(xmin); + else + nulls[i++] = true; + if (restart_lsn != InvalidTransactionId) + values[i++] = CStringGetTextDatum(restart_lsn_s); + else + nulls[i++] = true; + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 1fbd33ef61..cc3d775307 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -187,6 +187,7 @@ void WalReceiverMain(void) { char conninfo[MAXCONNINFO]; + char slotname[NAMEDATALEN]; XLogRecPtr startpoint; TimeLineID startpointTLI; TimeLineID primaryTLI; @@ -241,6 +242,7 @@ WalReceiverMain(void) /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + strlcpy(slotname, (char *) 
walrcv->slotname, NAMEDATALEN); startpoint = walrcv->receiveStart; startpointTLI = walrcv->receiveStartTLI; @@ -355,7 +357,8 @@ WalReceiverMain(void) * on the new timeline. */ ThisTimeLineID = startpointTLI; - if (walrcv_startstreaming(startpointTLI, startpoint)) + if (walrcv_startstreaming(startpointTLI, startpoint, + slotname[0] != '\0' ? slotname : NULL)) { bool endofwal = false; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index cc96d7c2f8..acadec57f5 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -219,11 +219,13 @@ ShutdownWalRcv(void) /* * Request postmaster to start walreceiver. * - * recptr indicates the position where streaming should begin, and conninfo - * is a libpq connection string to use. + * recptr indicates the position where streaming should begin, conninfo + * is a libpq connection string to use, and slotname is, optionally, the name + * of a replication slot to acquire. */ void -RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) +RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, + const char *slotname) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -250,6 +252,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) else walrcv->conninfo[0] = '\0'; + if (slotname != NULL) + strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN); + else + walrcv->slotname[0] = '\0'; + if (walrcv->walRcvState == WALRCV_STOPPED) { launch = true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 652487e3de..119a920af2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -53,6 +53,7 @@ #include "miscadmin.h" #include "nodes/replnodes.h" #include "replication/basebackup.h" +#include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -218,12 +219,17 @@ InitWalSender(void) void WalSndErrorCleanup() { + LWLockReleaseAll(); + if (sendFile >= 0) { close(sendFile); sendFile = -1; } + if (MyReplicationSlot != NULL) + ReplicationSlotRelease(); + replication_active = false; if (walsender_ready_to_stop) proc_exit(0); @@ -421,6 +427,15 @@ StartReplication(StartReplicationCmd *cmd) * written at wal_level='minimal'. */ + if (cmd->slotname) + { + ReplicationSlotAcquire(cmd->slotname); + if (MyReplicationSlot->data.database != InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("cannot use a replication slot created for changeset extraction for streaming replication")))); + } + /* * Select the timeline. If it was given explicitly by the client, use * that. Otherwise use the timeline of the last replayed record, which is @@ -565,6 +580,9 @@ StartReplication(StartReplicationCmd *cmd) Assert(streamingDoneSending && streamingDoneReceiving); } + if (cmd->slotname) + ReplicationSlotRelease(); + /* * Copy is finished now. Send a single-row result set indicating the next * timeline. @@ -622,6 +640,75 @@ StartReplication(StartReplicationCmd *cmd) pq_puttextmessage('C', "START_STREAMING"); } +/* + * Create a new replication slot. 
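The walreceiver now hands an optional slot name down through walrcv_startstreaming(); the libpqwalreceiver implementation (changed elsewhere in this patch) splices it into the START_REPLICATION command the same way pg_basebackup's receivelog.c does further below. A standalone sketch of that string building, with plain stdint types standing in for TimeLineID and XLogRecPtr and a hypothetical helper name:

#include <stdint.h>
#include <stdio.h>

/*
 * Illustrative sketch; an XLogRecPtr is a 64-bit WAL position conventionally
 * printed as two 32-bit halves, e.g. 0/3000000.
 */
static void
build_startstreaming_cmd(char *cmd, size_t cmdlen,
						 uint32_t tli, uint64_t startpoint,
						 const char *slotname)
{
	if (slotname != NULL)
		snprintf(cmd, cmdlen,
				 "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
				 slotname,
				 (unsigned int) (startpoint >> 32),
				 (unsigned int) startpoint,
				 (unsigned int) tli);
	else
		snprintf(cmd, cmdlen,
				 "START_REPLICATION %X/%X TIMELINE %u",
				 (unsigned int) (startpoint >> 32),
				 (unsigned int) startpoint,
				 (unsigned int) tli);
}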
+ */ +static void +CreateReplicationSlot(CreateReplicationSlotCmd *cmd) +{ + const char *slot_name; + StringInfoData buf; + + Assert(!MyReplicationSlot); + + /* setup state for XLogReadPage */ + sendTimeLineIsHistoric = false; + sendTimeLine = ThisTimeLineID; + + ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL); + + initStringInfo(&output_message); + + slot_name = NameStr(MyReplicationSlot->data.name); + + /* + * It may seem somewhat pointless to send back the same slot name the + * client just requested and nothing else, but logical replication + * will add more fields here. (We could consider removing the slot + * name from what's sent back, though, since the client has specified + * that.) + */ + + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 1, 2); /* 1 field */ + + /* first field: slot name */ + pq_sendstring(&buf, "slot_name"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 1, 2); /* # of columns */ + + /* slot_name */ + pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */ + pq_sendbytes(&buf, slot_name, strlen(slot_name)); + + pq_endmessage(&buf); + + /* + * release active status again, START_REPLICATION will reacquire it + */ + ReplicationSlotRelease(); +} + +/* + * Get rid of a replication slot that is no longer wanted. + */ +static void +DropReplicationSlot(DropReplicationSlotCmd *cmd) +{ + ReplicationSlotDrop(cmd->slotname); + EndCommand("DROP_REPLICATION_SLOT", DestRemote); +} + /* * Execute an incoming replication command. */ @@ -660,14 +747,28 @@ exec_replication_command(const char *cmd_string) IdentifySystem(); break; - case T_StartReplicationCmd: - StartReplication((StartReplicationCmd *) cmd_node); - break; - case T_BaseBackupCmd: SendBaseBackup((BaseBackupCmd *) cmd_node); break; + case T_CreateReplicationSlotCmd: + CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + break; + + case T_DropReplicationSlotCmd: + DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + break; + + case T_StartReplicationCmd: + { + StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; + if (cmd->kind == REPLICATION_KIND_PHYSICAL) + StartReplication(cmd); + else + elog(ERROR, "cannot handle changeset extraction yet"); + break; + } + case T_TimeLineHistoryCmd: SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); break; @@ -830,6 +931,39 @@ ProcessStandbyMessage(void) } } +/* + * Remember that a walreceiver just confirmed receipt of lsn `lsn`. + */ +static void +PhysicalConfirmReceivedLocation(XLogRecPtr lsn) +{ + bool changed = false; + /* use volatile pointer to prevent code rearrangement */ + volatile ReplicationSlot *slot = MyReplicationSlot; + + Assert(lsn != InvalidXLogRecPtr); + SpinLockAcquire(&slot->mutex); + if (slot->data.restart_lsn != lsn) + { + changed = true; + slot->data.restart_lsn = lsn; + } + SpinLockRelease(&slot->mutex); + + if (changed) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredLSN(); + } + + /* + * One could argue that the slot should saved to disk now, but that'd be + * energy wasted - the worst lost information can do here is give us wrong + * information in a statistics view - we'll just potentially be more + * conservative in removing files. 
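From the client side these walsender commands are issued over a replication connection; CREATE_REPLICATION_SLOT is answered with an ordinary single-row result set whose only column is slot_name, and DROP_REPLICATION_SLOT with a plain command tag. A rough libpq sketch; the connection string is a placeholder, and the exact CREATE_REPLICATION_SLOT syntax (including the PHYSICAL keyword assumed here) is defined by the repl_gram.y changes elsewhere in this patch:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* replication=true requests the walsender command set */
	PGconn	   *conn = PQconnectdb("host=primary user=rep replication=true");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		return 1;
	}

	/* command syntax assumed from this patch's grammar changes */
	res = PQexec(conn, "CREATE_REPLICATION_SLOT \"myslot\" PHYSICAL");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
		printf("slot_name = %s\n", PQgetvalue(res, 0, 0));
	PQclear(res);

	res = PQexec(conn, "DROP_REPLICATION_SLOT \"myslot\"");
	PQclear(res);

	PQfinish(conn);
	return 0;
}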
+	 */
+}
+
 /*
  * Regular reply from standby advising of WAL positions on standby server.
  */
@@ -875,6 +1009,48 @@ ProcessStandbyReplyMessage(void)
 
 	if (!am_cascading_walsender)
 		SyncRepReleaseWaiters();
+
+	/*
+	 * Advance our local xmin horizon when the client confirmed a flush.
+	 */
+	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+	{
+		if (MyReplicationSlot->data.database != InvalidOid)
+			elog(ERROR, "cannot handle changeset extraction yet");
+		else
+			PhysicalConfirmReceivedLocation(flushPtr);
+	}
+}
+
+/* compute new replication slot xmin horizon if needed */
+static void
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+{
+	bool		changed = false;
+	volatile ReplicationSlot *slot = MyReplicationSlot;
+
+	SpinLockAcquire(&slot->mutex);
+	MyPgXact->xmin = InvalidTransactionId;
+	/*
+	 * For physical replication we don't need the interlock provided
+	 * by xmin and effective_xmin since the consequences of a missed increase
+	 * are limited to query cancellations, so set both at once.
+	 */
+	if (!TransactionIdIsNormal(slot->data.xmin) ||
+		!TransactionIdIsNormal(feedbackXmin) ||
+		TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
+	{
+		changed = true;
+		slot->data.xmin = feedbackXmin;
+		slot->effective_xmin = feedbackXmin;
+	}
+	SpinLockRelease(&slot->mutex);
+
+	if (changed)
+	{
+		ReplicationSlotMarkDirty();
+		ReplicationSlotsComputeRequiredXmin();
+	}
 }
 
 /*
@@ -904,6 +1080,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	if (!TransactionIdIsNormal(feedbackXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
+		if (MyReplicationSlot != NULL)
+			PhysicalReplicationSlotNewXmin(feedbackXmin);
 		return;
 	}
 
@@ -951,8 +1129,17 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * GetOldestXmin. (If we're moving our xmin forward, this is obviously
 	 * safe, and if we're moving it backwards, well, the data is at risk
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
+	 * If we're using a replication slot we reserve the xmin via that,
+	 * otherwise via the walsender's PGXACT entry.
+	 *
+	 * XXX: It might make sense to introduce ephemeral slots and always use
+	 * the slot mechanism.
 	 */
-	MyPgXact->xmin = feedbackXmin;
+	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
+		PhysicalReplicationSlotNewXmin(feedbackXmin);
+	else
+		MyPgXact->xmin = feedbackXmin;
 }
 
 /* Main loop of walsender process that streams the WAL over Copy messages.
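The update rule above only ever widens or releases the reservation: the slot's xmin is replaced when nothing is reserved yet, when the standby reports an invalid xmin (withdrawing its reservation), or when the reported value is strictly newer under PostgreSQL's modulo-2^32 transaction-id ordering. A standalone sketch of just that rule, with illustrative names and TransactionIdPrecedes() approximated by the usual signed-difference comparison:

#include <stdbool.h>
#include <stdint.h>

typedef uint32_t xid_t;

#define FIRST_NORMAL_XID	3	/* xids 0-2 are reserved in PostgreSQL */

static bool
xid_is_normal(xid_t xid)
{
	return xid >= FIRST_NORMAL_XID;
}

/* is a logically older than b, modulo 2^32? */
static bool
xid_precedes(xid_t a, xid_t b)
{
	return (int32_t) (a - b) < 0;
}

/*
 * Illustrative sketch of the feedback rule: adopt the reported xmin when the
 * slot has no reservation, when the standby withdraws its reservation, or
 * when the horizon moves strictly forward; otherwise keep the old value.
 */
static xid_t
advance_slot_xmin(xid_t current, xid_t feedback)
{
	if (!xid_is_normal(current) ||
		!xid_is_normal(feedback) ||
		xid_precedes(current, feedback))
		return feedback;
	return current;
}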
*/ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2e717457b1..c392d4fa22 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -27,6 +27,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, ProcSignalShmemSize()); size = add_size(size, CheckpointerShmemSize()); size = add_size(size, AutoVacuumShmemSize()); + size = add_size(size, ReplicationSlotsShmemSize()); size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); @@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) ProcSignalShmemInit(); CheckpointerShmemInit(); AutoVacuumShmemInit(); + ReplicationSlotsShmemInit(); WalSndShmemInit(); WalRcvShmemInit(); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index b68c95612c..082115b4ff 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -82,6 +82,9 @@ typedef struct ProcArrayStruct */ TransactionId lastOverflowedXid; + /* oldest xmin of any replication slot */ + TransactionId replication_slot_xmin; + /* * We declare pgprocnos[] as 1 entry because C wants a fixed-size array, * but actually it is maxProcs entries long. @@ -228,6 +231,7 @@ CreateSharedProcArray(void) */ procArray->numProcs = 0; procArray->maxProcs = PROCARRAY_MAXPROCS; + procArray->replication_slot_xmin = InvalidTransactionId; procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS; procArray->numKnownAssignedXids = 0; procArray->tailKnownAssignedXids = 0; @@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) ProcArrayStruct *arrayP = procArray; TransactionId result; int index; + volatile TransactionId replication_slot_xmin = InvalidTransactionId; /* Cannot look for individual databases during recovery */ Assert(allDbs || !RecoveryInProgress()); @@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) } } + /* fetch into volatile var while ProcArrayLock is held */ + replication_slot_xmin = procArray->replication_slot_xmin; + if (RecoveryInProgress()) { /* @@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) result = FirstNormalTransactionId; } + /* + * Check whether there are replication slots requiring an older xmin. 
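The ipci.c hooks above follow the backend's usual two-step convention for a new shared-memory subsystem: a *ShmemSize() function whose result is folded into the total with add_size(), and a *ShmemInit() function that attaches to (or zero-initializes) the structure at startup. The actual slot.c implementation appears earlier in this patch and is not shown here; the sketch below only restates that conventional shape, assumes the backend headers, and the "ReplicationSlot Ctl" label is an assumption:

/* Conventional shape only; see slot.c earlier in this patch for the real code. */
Size
ReplicationSlotsShmemSize(void)
{
	Size		size = 0;

	if (max_replication_slots == 0)
		return size;

	size = offsetof(ReplicationSlotCtlData, replication_slots);
	size = add_size(size,
					mul_size(max_replication_slots, sizeof(ReplicationSlot)));

	return size;
}

void
ReplicationSlotsShmemInit(void)
{
	bool		found;

	if (max_replication_slots == 0)
		return;

	ReplicationSlotCtl = (ReplicationSlotCtlData *)
		ShmemInitStruct("ReplicationSlot Ctl",	/* label is illustrative */
						ReplicationSlotsShmemSize(), &found);

	if (!found)
		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
}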
+ */ + if (TransactionIdIsValid(replication_slot_xmin) && + NormalTransactionIdPrecedes(replication_slot_xmin, result)) + result = replication_slot_xmin; + return result; } @@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot) int count = 0; int subcount = 0; bool suboverflowed = false; + volatile TransactionId replication_slot_xmin = InvalidTransactionId; Assert(snapshot != NULL); @@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot) suboverflowed = true; } + + /* fetch into volatile var while ProcArrayLock is held */ + replication_slot_xmin = procArray->replication_slot_xmin; + if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); /* @@ -1506,6 +1527,12 @@ GetSnapshotData(Snapshot snapshot) RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age; if (!TransactionIdIsNormal(RecentGlobalXmin)) RecentGlobalXmin = FirstNormalTransactionId; + + /* Check whether there's a replication slot requiring an older xmin. */ + if (TransactionIdIsValid(replication_slot_xmin) && + NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) + RecentGlobalXmin = replication_slot_xmin; + RecentXmin = xmin; snapshot->xmin = xmin; @@ -2491,6 +2518,21 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) return true; /* timed out, still conflicts */ } +/* + * ProcArraySetReplicationSlotXmin + * + * Install limits to future computations of the xmin horizon to prevent vacuum + * and HOT pruning from removing affected rows still needed by clients with + * replicaton slots. + */ +void +ProcArraySetReplicationSlotXmin(TransactionId xmin) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + procArray->replication_slot_xmin = xmin; + LWLockRelease(ProcArrayLock); +} + #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 55d9d7837c..82ef440949 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -27,6 +27,7 @@ #include "commands/async.h" #include "miscadmin.h" #include "pg_trace.h" +#include "replication/slot.h" #include "storage/ipc.h" #include "storage/predicate.h" #include "storage/proc.h" @@ -238,6 +239,9 @@ NumLWLocks(void) /* predicate.c needs one per old serializable xid buffer */ numLocks += NUM_OLDSERXID_BUFFERS; + /* slot.c needs one for each slot */ + numLocks += max_replication_slots; + /* * Add any requested by loadable modules; for backwards-compatibility * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 9d32f9405d..fb449a8820 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -40,6 +40,7 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#include "replication/slot.h" #include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -780,6 +781,10 @@ ProcKill(int code, Datum arg) /* Make sure we're out of the sync rep lists */ SyncRepCleanupAtProcExit(); + /* Make sure active replication slots are released */ + if (MyReplicationSlot != NULL) + ReplicationSlotRelease(); + #ifdef USE_ASSERT_CHECKING if (assert_enabled) { diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a9b9794965..70d73d9898 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -57,6 +57,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include 
"postmaster/walwriter.h" +#include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -2123,6 +2124,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + /* see max_connections */ + {"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING, + gettext_noop("Sets the maximum number of simultaneously defined replication slots."), + NULL + }, + &max_replication_slots, + 0, 0, MAX_BACKENDS /* XXX?*/, + NULL, NULL, NULL + }, + { {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the maximum time to wait for WAL replication."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c8673b382d..d10e8a5783 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -226,6 +226,9 @@ #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables +#max_replication_slots = 0 # max number of replication slots. + # (change requires restart) + # - Master Server - # These settings are ignored on a standby server. diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 6b5302f6fd..a71320d945 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -195,6 +195,7 @@ const char *subdirs[] = { "pg_multixact/offsets", "base", "base/1", + "pg_replslot", "pg_tblspc", "pg_stat", "pg_stat_tmp" diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 3c6ab9a902..8a702e3388 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -67,6 +67,7 @@ usage(void) printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_(" --slot replication slot to use\n")); printf(_("\nReport bugs to .\n")); } @@ -343,6 +344,7 @@ main(int argc, char **argv) {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, + {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} }; @@ -409,6 +411,9 @@ main(int argc, char **argv) exit(1); } break; + case 'S': + replication_slot = pg_strdup(optarg); + break; case 'n': noloop = 1; break; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 2555904cd0..7d3c76c994 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -31,6 +31,8 @@ /* fd and filename for currently open WAL file */ static int walfile = -1; static char current_walfile_name[MAXPGPATH] = ""; +static bool reportFlushPosition = false; +static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, @@ -133,7 +135,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. 
*/ static bool -close_walfile(char *basedir, char *partial_suffix) +close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) { off_t currpos; @@ -187,6 +189,7 @@ close_walfile(char *basedir, char *partial_suffix) _("%s: not renaming \"%s%s\", segment is not complete\n"), progname, current_walfile_name, partial_suffix); + lastFlushPosition = pos; return true; } @@ -421,7 +424,10 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) len += 1; sendint64(blockpos, &replybuf[len]); /* write */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + if (reportFlushPosition) + sendint64(lastFlushPosition, &replybuf[len]); /* flush */ + else + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ len += 8; sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ len += 8; @@ -511,6 +517,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int standby_message_timeout, char *partial_suffix) { char query[128]; + char slotcmd[128]; PGresult *res; XLogRecPtr stoppos; @@ -521,6 +528,29 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (!CheckServerVersionForStreaming(conn)) return false; + if (replication_slot != NULL) + { + /* + * Report the flush position, so the primary can know what WAL we'll + * possibly re-request, and remove older WAL safely. + * + * We only report it when a slot has explicitly been used, because + * reporting the flush position makes one elegible as a synchronous + * replica. People shouldn't include generic names in + * synchronous_standby_names, but we've protected them against it so + * far, so let's continue to do so in the situations when possible. + * If they've got a slot, though, we need to report the flush position, + * so that the master can remove WAL. + */ + reportFlushPosition = true; + sprintf(slotcmd, "SLOT \"%s\" ", replication_slot); + } + else + { + reportFlushPosition = false; + slotcmd[0] = 0; + } + if (sysidentifier != NULL) { /* Validate system identifier hasn't changed */ @@ -560,6 +590,12 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); } + /* + * initialize flush position to starting point, it's the caller's + * responsibility that that's sane. + */ + lastFlushPosition = startpos; + while (1) { /* @@ -606,7 +642,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, return true; /* Initiate the replication stream at specified location */ - snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u", + snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", + slotcmd, (uint32) (startpos >> 32), (uint32) startpos, timeline); res = PQexec(conn, query); @@ -810,7 +847,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, */ if (still_sending && stream_stop(blockpos, timeline, false)) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, blockpos)) { /* Potential error message is written by close_walfile */ goto error; @@ -909,7 +946,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, */ if (still_sending) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, blockpos)) { /* Error message written in close_walfile() */ goto error; @@ -1074,7 +1111,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Did we reach the end of a WAL segment? 
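sendFeedback() above now reports lastFlushPosition in the flush field whenever a slot is in use, so the primary can recycle WAL the client has durably written. For reference, the standby status update it assembles is the 'r' message of the streaming-replication protocol: three 64-bit LSNs (write, flush, apply), a 64-bit client timestamp, and a reply-requested flag, all big-endian. A standalone sketch with an illustrative encoder standing in for receivelog.c's sendint64():

#include <stdbool.h>
#include <stdint.h>

/* big-endian 64-bit encoder, standing in for sendint64() */
static void
put_int64_be(uint64_t v, char *buf)
{
	int			i;

	for (i = 0; i < 8; i++)
		buf[i] = (char) (v >> (56 - 8 * i));
}

/* Illustrative sketch of the 'r' (standby status update) message layout. */
static int
build_status_update(char *buf, uint64_t write_lsn, uint64_t flush_lsn,
					uint64_t apply_lsn, int64_t now_us, bool request_reply)
{
	int			len = 0;

	buf[len++] = 'r';
	put_int64_be(write_lsn, &buf[len]);	/* last WAL byte + 1 received */
	len += 8;
	put_int64_be(flush_lsn, &buf[len]);	/* last WAL byte + 1 flushed; 0
										 * (InvalidXLogRecPtr) when no slot
										 * is in use */
	len += 8;
	put_int64_be(apply_lsn, &buf[len]);	/* last WAL byte + 1 applied */
	len += 8;
	put_int64_be((uint64_t) now_us, &buf[len]);	/* client timestamp */
	len += 8;
	buf[len++] = request_reply ? 1 : 0;
	return len;							/* 34 bytes total */
}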
*/ if (blockpos % XLOG_SEG_SIZE == 0) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, blockpos)) /* Error message written in close_walfile() */ goto error; diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 96fbed898f..041076ff1d 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -22,6 +22,7 @@ char *connection_string = NULL; char *dbhost = NULL; char *dbuser = NULL; char *dbport = NULL; +char *replication_slot = NULL; int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ static char *dbpassword = NULL; PGconn *conn = NULL; diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 77d6b86ced..bb3c34db07 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -6,6 +6,7 @@ extern char *dbhost; extern char *dbuser; extern char *dbport; extern int dbgetpassword; +extern char *replication_slot; /* Connection kept global so we can disconnect easily */ extern PGconn *conn; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 47e302276b..11ab277199 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -289,6 +289,7 @@ extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); extern void XLogSetAsyncXactLSN(XLogRecPtr record); +extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index c4661a8a66..ad4def37b9 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201401301 +#define CATALOG_VERSION_NO 201401311 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 1682fa9f99..d7bb21eccc 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -4782,6 +4782,14 @@ DESCR("SP-GiST support for quad tree over range"); DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_ spg_range_quad_leaf_consistent _null_ _null_ _null_ )); DESCR("SP-GiST support for quad tree over range"); +/* replication slots */ +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,25,25}" "{i,o,o}" "{slotname,slotname,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DESCR("create a physical replication slot"); +DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); +DESCR("drop a replication slot"); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DESCR("information about replication slots currently in use"); + /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ 
pg_event_trigger_dropped_objects _null_ _null_ _null_ )); DESCR("list objects dropped by the current command"); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index dfcc01344e..5b8df59bc6 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -412,6 +412,8 @@ typedef enum NodeTag */ T_IdentifySystemCmd, T_BaseBackupCmd, + T_CreateReplicationSlotCmd, + T_DropReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 40971153b0..aac75fd102 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -17,6 +17,11 @@ #include "access/xlogdefs.h" #include "nodes/pg_list.h" +typedef enum ReplicationKind { + REPLICATION_KIND_PHYSICAL, + REPLICATION_KIND_LOGICAL +} ReplicationKind; + /* ---------------------- * IDENTIFY_SYSTEM command @@ -39,6 +44,30 @@ typedef struct BaseBackupCmd } BaseBackupCmd; +/* ---------------------- + * CREATE_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct CreateReplicationSlotCmd +{ + NodeTag type; + char *slotname; + ReplicationKind kind; + char *plugin; +} CreateReplicationSlotCmd; + + +/* ---------------------- + * DROP_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct DropReplicationSlotCmd +{ + NodeTag type; + char *slotname; +} DropReplicationSlotCmd; + + /* ---------------------- * START_REPLICATION command * ---------------------- @@ -46,8 +75,11 @@ typedef struct BaseBackupCmd typedef struct StartReplicationCmd { NodeTag type; + ReplicationKind kind; + char *slotname; TimeLineID timeline; XLogRecPtr startpoint; + List *options; } StartReplicationCmd; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h new file mode 100644 index 0000000000..089b0f4b70 --- /dev/null +++ b/src/include/replication/slot.h @@ -0,0 +1,120 @@ +/*------------------------------------------------------------------------- + * slot.h + * Replication slot management. + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef SLOT_H +#define SLOT_H + +#include "fmgr.h" +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/spin.h" + +typedef struct ReplicationSlotPersistentData +{ + /* The slot's identifier */ + NameData name; + + /* database the slot is active on */ + Oid database; + + /* + * xmin horizon for data + * + * NB: This may represent a value that hasn't been written to disk yet; + * see notes for effective_xmin, below. + */ + TransactionId xmin; + + /* oldest LSN that might be required by this replication slot */ + XLogRecPtr restart_lsn; + +} ReplicationSlotPersistentData; + +/* + * Shared memory state of a single replication slot. + */ +typedef struct ReplicationSlot +{ + /* lock, on same cacheline as effective_xmin */ + slock_t mutex; + + /* is this slot defined */ + bool in_use; + + /* is somebody streaming out changes for this slot */ + bool active; + + /* any outstanding modifications? */ + bool just_dirtied; + bool dirty; + + /* + * For logical decoding, it's extremely important that we never remove any + * data that's still needed for decoding purposes, even after a crash; + * otherwise, decoding will produce wrong answers. 
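SaveSlotToPath() and RestoreSlotFromDisk() earlier in the patch read and write a small versioned container around ReplicationSlotPersistentData: a constant-size header (magic, CRC, version, length) followed by the version-dependent payload. The real definition lives in slot.c and is not shown in this excerpt; the sketch below only spells out the shape those functions imply, so field order and the two size macros are inferred rather than authoritative:

/* Inferred shape of the on-disk state file; see slot.c for the real struct. */
typedef struct ReplicationSlotOnDisk
{
	/* constant-size header, readable regardless of version */
	uint32		magic;			/* SLOT_MAGIC */
	pg_crc32	checksum;		/* CRC of the part below the header */
	uint32		version;		/* currently 1 */
	uint32		length;			/* length of the version-dependent part */

	/* version-dependent payload */
	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;

#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
#define ReplicationSlotOnDiskDynamicSize \
	sizeof(ReplicationSlotPersistentData)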
Ordinary streaming + * replication also needs to prevent old row versions from being removed + * too soon, but the worst consequence we might encounter there is unwanted + * query cancellations on the standby. Thus, for logical decoding, + * this value represents the latest xmin that has actually been + * written to disk, whereas for streaming replication, it's just the + * same as the persistent value (data.xmin). + */ + TransactionId effective_xmin; + + /* data surviving shutdowns and crashes */ + ReplicationSlotPersistentData data; + + /* is somebody performing io on this slot? */ + LWLock *io_in_progress_lock; +} ReplicationSlot; + +/* + * Shared memory control area for all of replication slots. + */ +typedef struct ReplicationSlotCtlData +{ + ReplicationSlot replication_slots[1]; +} ReplicationSlotCtlData; + +/* + * Pointers to shared memory + */ +extern ReplicationSlotCtlData *ReplicationSlotCtl; +extern ReplicationSlot *MyReplicationSlot; + +/* GUCs */ +extern PGDLLIMPORT int max_replication_slots; + +/* shmem initialization functions */ +extern Size ReplicationSlotsShmemSize(void); +extern void ReplicationSlotsShmemInit(void); + +/* management of individual slots */ +extern void ReplicationSlotCreate(const char *name, bool db_specific); +extern void ReplicationSlotDrop(const char *name); +extern void ReplicationSlotAcquire(const char *name); +extern void ReplicationSlotRelease(void); +extern void ReplicationSlotSave(void); +extern void ReplicationSlotMarkDirty(void); + +/* misc stuff */ +extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern void ReplicationSlotsComputeRequiredXmin(void); +extern void ReplicationSlotsComputeRequiredLSN(void); +extern void StartupReplicationSlots(XLogRecPtr checkPointRedo); +extern void CheckPointReplicationSlots(void); + +extern void CheckSlotRequirements(void); +extern void ReplicationSlotAtProcExit(void); + +/* SQL callable functions */ +extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS); + +#endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 3c656197cc..3d9401059b 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -103,6 +103,12 @@ typedef struct */ char conninfo[MAXCONNINFO]; + /* + * replication slot name; is also used for walreceiver to connect with + * the primary + */ + char slotname[NAMEDATALEN]; + slock_t mutex; /* locks shared variables shown above */ /* @@ -125,7 +131,7 @@ extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system; typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size); extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile; -typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint); +typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname); extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming; typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli); @@ -149,7 +155,8 @@ extern void WalRcvShmemInit(void); extern void ShutdownWalRcv(void); extern bool WalRcvStreaming(void); extern bool WalRcvRunning(void); -extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo); +extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, + const char *conninfo, const char *slotname); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID 
*receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 4507926274..c8ff4ebfb8 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -125,7 +125,9 @@ extern LWLockPadded *MainLWLockArray; #define BackgroundWorkerLock (&MainLWLockArray[33].lock) #define DynamicSharedMemoryControlLock (&MainLWLockArray[34].lock) #define AutoFileLock (&MainLWLockArray[35].lock) -#define NUM_INDIVIDUAL_LWLOCKS 36 +#define ReplicationSlotAllocationLock (&MainLWLockArray[36].lock) +#define ReplicationSlotControlLock (&MainLWLockArray[37].lock) +#define NUM_INDIVIDUAL_LWLOCKS 38 /* * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 2947cc4af9..d1a58a3661 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -77,4 +77,6 @@ extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); +extern void ProcArraySetReplicationSlotXmin(TransactionId xmin); + #endif /* PROCARRAY_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 540373dc48..220e18b0bb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1367,6 +1367,15 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_replication_slots| SELECT l.slot_name, + l.slot_type, + l.datoid, + d.datname AS database, + l.active, + l.xmin, + l.restart_lsn + FROM (pg_get_replication_slots() l(slot_name, slot_type, datoid, active, xmin, restart_lsn) + LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, pg_authid.rolinherit, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ad40735333..3b7f61ef20 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -343,6 +343,7 @@ CreateOpClassItem CreateOpClassStmt CreateOpFamilyStmt CreatePLangStmt +CreateReplicationSlotCmd CreateRangeStmt CreateRoleStmt CreateSchemaStmt @@ -416,6 +417,7 @@ DomainConstraintType DomainIOData DropBehavior DropOwnedStmt +DropReplicationSlotCmd DropRoleStmt DropStmt DropTableSpaceStmt
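Putting the pieces together: after creating a physical slot on the primary and pointing a standby at it via the new recovery.conf slot parameter, the pg_replication_slots view shows whether the slot is in use and which WAL position and xmin it is holding back. A rough libpq sketch of such a monitoring query; the connection string is a placeholder and error handling is abbreviated:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");	/* placeholder */
	PGresult   *res;
	int			i;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		return 1;
	}

	res = PQexec(conn,
				 "SELECT slot_name, slot_type, active, xmin, restart_lsn "
				 "FROM pg_replication_slots");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
		for (i = 0; i < PQntuples(res); i++)
			printf("%s (%s) active=%s xmin=%s restart_lsn=%s\n",
				   PQgetvalue(res, i, 0), PQgetvalue(res, i, 1),
				   PQgetvalue(res, i, 2), PQgetvalue(res, i, 3),
				   PQgetvalue(res, i, 4));
	PQclear(res);
	PQfinish(conn);
	return 0;
}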