diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9eca63cbbf..3f92482b42 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6270,7 +6270,7 @@ StartupXLOG(void) * Initialize replication slots, before there's a chance to remove * required resources. */ - StartupReplicationSlots(checkPoint.redo); + StartupReplicationSlots(); /* * Startup logical state, needs to be setup now so we have proper data diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b82580fbcd..9eb5cd5ee4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -125,7 +125,7 @@ StartupDecodingContext(List *output_plugin_options, slot = MyReplicationSlot; context = AllocSetContextCreate(CurrentMemoryContext, - "Changeset Extraction Context", + "Logical Decoding Context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index ee0c7c07a9..5671ac1d14 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -829,7 +829,7 @@ CheckPointReplicationSlots(void) * needs to be run before we start crash recovery. */ void -StartupReplicationSlots(XLogRecPtr checkPointRedo) +StartupReplicationSlots(void) { DIR *replication_dir; struct dirent *replication_de; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index dc94f504ee..bd4701f97d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -46,13 +46,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) HeapTuple tuple; Datum result; - check_permissions(); - - CheckSlotRequirements(); + Assert(!MyReplicationSlot); if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); + check_permissions(); + + CheckSlotRequirements(); + /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); @@ -87,6 +89,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Datum values[2]; bool nulls[2]; + Assert(!MyReplicationSlot); + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); @@ -94,10 +98,11 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) CheckLogicalDecodingRequirements(); - Assert(!MyReplicationSlot); - /* - * Acquire a logical decoding slot, this will check for conflicting names. + * Acquire a logical decoding slot, this will check for conflicting + * names. Initially create it as ephemeral - that allows us to nicely + * handle errors during initialization because it'll get dropped if this + * transaction fails. We'll make it persistent at the end. */ ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 088ee2c097..318979342e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -781,6 +781,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) else { CheckLogicalDecodingRequirements(); + /* + * Initially create the slot as ephemeral - that allows us to nicely + * handle errors during initialization because it'll get dropped if + * this transaction fails. We'll make it persistent at the end. + */ ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL); } @@ -1682,8 +1687,8 @@ ProcessStandbyHSFeedbackMessage(void) * If we're using a replication slot we reserve the xmin via that, * otherwise via the walsender's PGXACT entry. * - * XXX: It might make sense to introduce ephemeral slots and always use - * the slot mechanism. + * XXX: It might make sense to generalize the ephemeral slot concept and + * always use the slot mechanism to handle the feedback xmin. */ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ PhysicalReplicationSlotNewXmin(feedbackXmin); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 341e829bbb..c129a4a771 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -164,7 +164,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); -extern void StartupReplicationSlots(XLogRecPtr checkPointRedo); +extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void);