diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 728798b3b7..11b2d9c07f 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -673,7 +673,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn ------------+--------+-----------+--------+----------+--------+------------+------+--------------+------------- + slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn +-----------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+--------------------- (0 rows) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 12471d1c6e..1e895e4999 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5577,6 +5577,17 @@ automatically removed during checkpoints. + + + confirmed_flush + pg_lsn + + The address (LSN) up to which the logical + slot's consumer has confirmed receiving data. Data older than this is + not available anymore. NULL for physical slots. + + + diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index d2f7fec523..37aa0470ab 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -934,9 +934,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot'); node_a_slot | postgres=# SELECT * FROM pg_replication_slots; - slot_name | slot_type | datoid | database | active | xmin | restart_lsn --------------+-----------+--------+----------+--------+------+------------- - node_a_slot | physical | | | f | | + slot_name | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn +-------------+-----------+--------+----------+--------+------+-------------+--------------------- + node_a_slot | physical | | | f | | | (1 row) To configure the standby to use this slot, primary_slot_name diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5fa2f77ea8..4f57765e91 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', ' regression_slot | 0/16B1970 (1 row) -postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots; - slot_name | plugin | slot_type | database | active | restart_lsn ------------------+---------------+-----------+----------+--------+------------- - regression_slot | test_decoding | logical | postgres | f | 0/16A4408 +postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; + slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn +-----------------+---------------+-----------+----------+--------+-------------+----------------- + regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440 (1 row) postgres=# -- There are no changes to see yet diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c0bd6fa96b..3190c7f7e0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -676,7 +676,8 @@ CREATE VIEW pg_replication_slots AS L.active_pid, L.xmin, L.catalog_xmin, - L.restart_lsn + L.restart_lsn, + L.confirmed_flush_lsn FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 348c7fe9fc..ecfcb0754b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 9 +#define PG_GET_REPLICATION_SLOTS_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -206,6 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; + XLogRecPtr confirmed_flush_lsn; pid_t active_pid; Oid database; NameData slot_name; @@ -224,6 +225,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) catalog_xmin = slot->data.catalog_xmin; database = slot->data.database; restart_lsn = slot->data.restart_lsn; + confirmed_flush_lsn = slot->data.confirmed_flush; namecpy(&slot_name, &slot->data.name); namecpy(&plugin, &slot->data.plugin); @@ -273,6 +275,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + if (confirmed_flush_lsn != InvalidXLogRecPtr) + values[i++] = LSNGetDatum(confirmed_flush_lsn); + else + nulls[i++] = true; + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 19c40e4978..8cd6772987 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201508062 +#define CATALOG_VERSION_NO 201508101 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index c9fe0f8778..51639624a9 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5197,7 +5197,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6206c819cd..44c6740582 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1416,8 +1416,9 @@ pg_replication_slots| SELECT l.slot_name, l.active_pid, l.xmin, l.catalog_xmin, - l.restart_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn) + l.restart_lsn, + l.confirmed_flush_lsn + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,