From 3f811c2d6f51b13b71adff99e82894dd48cee055 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 10 Aug 2015 13:28:18 +0200 Subject: [PATCH] Add confirmed_flush column to pg_replication_slots. There's no reason not to expose both restart_lsn and confirmed_flush since they have rather distinct meanings. The former is the oldest WAL still required and valid for both physical and logical slots, whereas the latter is the location up to which a logical slot's consumer has confirmed receiving data. Most of the time a slot will require older WAL (i.e. restart_lsn) than the confirmed position (i.e. confirmed_flush_lsn). Author: Marko Tiikkaja, editorialized by me Discussion: 559D110B.1020109@joh.to --- contrib/test_decoding/expected/ddl.out | 4 ++-- doc/src/sgml/catalogs.sgml | 11 +++++++++++ doc/src/sgml/high-availability.sgml | 6 +++--- doc/src/sgml/logicaldecoding.sgml | 8 ++++---- src/backend/catalog/system_views.sql | 3 ++- src/backend/replication/slotfuncs.c | 9 ++++++++- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.h | 2 +- src/test/regress/expected/rules.out | 5 +++-- 9 files changed, 35 insertions(+), 15 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 728798b3b7..11b2d9c07f 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -673,7 +673,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn ------------+--------+-----------+--------+----------+--------+------------+------+--------------+------------- + slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn +-----------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+--------------------- (0 rows) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 12471d1c6e..1e895e4999 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5577,6 +5577,17 @@ automatically removed during checkpoints. + + + confirmed_flush + pg_lsn + + The address (LSN) up to which the logical + slot's consumer has confirmed receiving data. Data older than this is + not available anymore. NULL for physical slots. + + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index d2f7fec523..37aa0470ab 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -934,9 +934,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot'); node_a_slot | postgres=# SELECT * FROM pg_replication_slots; - slot_name | slot_type | datoid | database | active | xmin | restart_lsn --------------+-----------+--------+----------+--------+------+------------- - node_a_slot | physical | | | f | | + slot_name | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn +-------------+-----------+--------+----------+--------+------+-------------+--------------------- + node_a_slot | physical | | | f | | | (1 row) To configure the standby to use this slot, primary_slot_name diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5fa2f77ea8..4f57765e91 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', ' regression_slot | 0/16B1970 (1 row) -postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots; - slot_name | plugin | slot_type | database | active | restart_lsn ------------------+---------------+-----------+----------+--------+------------- - regression_slot | test_decoding | logical | postgres | f | 0/16A4408 +postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; + slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn +-----------------+---------------+-----------+----------+--------+-------------+----------------- + regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440 (1 row) postgres=# -- There are no changes to see yet diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c0bd6fa96b..3190c7f7e0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -676,7 +676,8 @@ CREATE VIEW pg_replication_slots AS L.active_pid, L.xmin, L.catalog_xmin, - L.restart_lsn + L.restart_lsn, + L.confirmed_flush_lsn FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 348c7fe9fc..ecfcb0754b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 9 +#define PG_GET_REPLICATION_SLOTS_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -206,6 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; + XLogRecPtr confirmed_flush_lsn; pid_t active_pid; Oid database; NameData slot_name; @@ -224,6 +225,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) catalog_xmin = slot->data.catalog_xmin; database = slot->data.database; restart_lsn = slot->data.restart_lsn; + confirmed_flush_lsn = slot->data.confirmed_flush; namecpy(&slot_name, &slot->data.name); namecpy(&plugin, &slot->data.plugin); @@ -273,6 +275,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + if (confirmed_flush_lsn != InvalidXLogRecPtr) + values[i++] = LSNGetDatum(confirmed_flush_lsn); + else + nulls[i++] = true; + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 19c40e4978..8cd6772987 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201508062 +#define CATALOG_VERSION_NO 201508101 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index c9fe0f8778..51639624a9 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5197,7 +5197,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6206c819cd..44c6740582 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1416,8 +1416,9 @@ pg_replication_slots| SELECT l.slot_name, l.active_pid, l.xmin, l.catalog_xmin, - l.restart_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn) + l.restart_lsn, + l.confirmed_flush_lsn + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,