diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 4fcc4fe5aa..b1df55d7f6 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); pg_create_physical_replication_slot - pg_create_physical_replication_slot(slot_name name) + pg_create_physical_replication_slot(slot_name name, immediately_reserve boolean ) (slot_name name, xlog_position pg_lsn) @@ -17221,7 +17221,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); slot_name. Streaming changes from a physical slot is only possible with the streaming-replication protocol - see . Corresponds to the replication protocol - command CREATE_REPLICATION_SLOT ... PHYSICAL. + command CREATE_REPLICATION_SLOT ... PHYSICAL. The optional + second parameter, when true, specifies that the LSN + for this replication slot be reserved immediately; the LSN + is otherwise reserved on first connection from a streaming replication + client. 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3190c7f7e0..ccc030fd7f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -917,6 +917,13 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( + IN slot_name name, IN immediately_reserve boolean DEFAULT false, + OUT slot_name name, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +AS 'pg_create_physical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5411e599eb..5a07e1d9a6 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -250,52 +250,7 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - /* - * The replication slot mechanism is used to prevent removal of required - * WAL. As there is no interlock between this and checkpoints required WAL - * could be removed before ReplicationSlotsComputeRequiredLSN() has been - * called to prevent that. In the very unlikely case that this happens - * we'll just retry. - */ - while (true) - { - XLogSegNo segno; - - /* - * Let's start with enough information if we can, so log a standby - * snapshot and start decoding at exactly that position. 
- */ - if (!RecoveryInProgress()) - { - XLogRecPtr flushptr; - - /* start at current insert position */ - slot->data.restart_lsn = GetXLogInsertRecPtr(); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - slot->data.restart_lsn = GetRedoRecPtr(); - - /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); - - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - XLByteToSeg(slot->data.restart_lsn, segno); - if (XLogGetLastRemovedSegno() < segno) - break; - } - + ReplicationSlotReserveWal(); /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1f013af887..c66619cda2 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -40,6 +40,7 @@ #include #include "access/transam.h" +#include "access/xlog_internal.h" #include "common/string.h" #include "miscadmin.h" #include "replication/slot.h" @@ -781,6 +782,76 @@ CheckSlotRequirements(void) errmsg("replication slots can only be used if wal_level >= archive"))); } +/* + * Reserve WAL for the currently active slot. + * + * Compute and set restart_lsn in a manner that's appropriate for the type of + * the slot and concurrency safe. + */ +void +ReplicationSlotReserveWal(void) +{ + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + + /* + * The replication slot mechanism is used to prevent removal of required + * WAL. 
As there is no interlock between this routine and checkpoints, WAL + * segments could concurrently be removed when a now stale return value of + * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that + * this happens we'll just retry. + */ + while (true) + { + XLogSegNo segno; + + /* + * For logical slots log a standby snapshot and start logical decoding + * at exactly that position. That allows the slot to start up more + * quickly. + * + * That's not needed (or indeed helpful) for physical slots as they'll + * start replay at the last logged checkpoint anyway. Instead return + * the location of the last redo LSN. While that slightly increases + * the chance that we have to retry, it's where a base backup has to + * start replay at. + */ + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* start at current insert position */ + slot->data.restart_lsn = GetXLogInsertRecPtr(); + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } + else + { + slot->data.restart_lsn = GetRedoRecPtr(); + } + + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + XLByteToSeg(slot->data.restart_lsn, segno); + if (XLogGetLastRemovedSegno() < segno) + break; + } +} + /* * Flush all replication slots to disk. 
* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ecfcb0754b..2dc6827990 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -40,6 +40,7 @@ Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); + bool immediately_reserve = PG_GETARG_BOOL(1); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; - nulls[1] = true; + + if (immediately_reserve) + { + /* Reserve WAL as the user asked for it */ + ReplicationSlotReserveWal(); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + + values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); + nulls[1] = false; + } + else + { + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[1] = true; + } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 8cd6772987..b58fe46447 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201508101 +#define CATALOG_VERSION_NO 201508111 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 51639624a9..ddf7c67371 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 DESCR("SP-GiST support for quad tree over range"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ 
pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 367ef0a38d..20dd7a283c 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);