Add confirmed_flush column to pg_replication_slots.

There's no reason not to expose both restart_lsn and confirmed_flush
since they have rather distinct meanings. The former is the oldest WAL
still required and valid for both physical and logical slots, whereas
the latter is the location up to which a logical slot's consumer has
confirmed receiving data. Most of the time a slot will require older
WAL (i.e. restart_lsn) than the confirmed
position (i.e. confirmed_flush_lsn).

Author: Marko Tiikkaja, editorialized by me
Discussion: 559D110B.1020109@joh.to
This commit is contained in:
Andres Freund 2015-08-10 13:28:18 +02:00
parent 5c4b25acce
commit 3f811c2d6f
9 changed files with 35 additions and 15 deletions

View File

@ -673,7 +673,7 @@ SELECT pg_drop_replication_slot('regression_slot');
/* check that the slot is gone */
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn
-----------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------
slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------+-----------+--------+----------+--------+------------+------+--------------+-------------+---------------------
(0 rows)

View File

@ -5577,6 +5577,17 @@
automatically removed during checkpoints.
</entry>
</row>
<row>
<entry><structfield>confirmed_flush</structfield></entry>
<entry><type>pg_lsn</type></entry>
<entry></entry>
<entry>The address (<literal>LSN</literal>) up to which the logical
slot's consumer has confirmed receiving data. Data older than this is
not available anymore. <literal>NULL</> for physical slots.
</entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -934,9 +934,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
node_a_slot |
postgres=# SELECT * FROM pg_replication_slots;
slot_name | slot_type | datoid | database | active | xmin | restart_lsn
-------------+-----------+--------+----------+--------+------+-------------
node_a_slot | physical | | | f | |
slot_name | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn
-------------+-----------+--------+----------+--------+------+-------------+---------------------
node_a_slot | physical | | | f | | |
(1 row)
</programlisting>
To configure the standby to use this slot, <varname>primary_slot_name</>

View File

@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
regression_slot | 0/16B1970
(1 row)
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn
-----------------+---------------+-----------+----------+--------+-------------
regression_slot | test_decoding | logical | postgres | f | 0/16A4408
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+-----------------
regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440
(1 row)
postgres=# -- There are no changes to see yet

View File

@ -676,7 +676,8 @@ CREATE VIEW pg_replication_slots AS
L.active_pid,
L.xmin,
L.catalog_xmin,
L.restart_lsn
L.restart_lsn,
L.confirmed_flush_lsn
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

View File

@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 9
#define PG_GET_REPLICATION_SLOTS_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@ -206,6 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
TransactionId xmin;
TransactionId catalog_xmin;
XLogRecPtr restart_lsn;
XLogRecPtr confirmed_flush_lsn;
pid_t active_pid;
Oid database;
NameData slot_name;
@ -224,6 +225,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
catalog_xmin = slot->data.catalog_xmin;
database = slot->data.database;
restart_lsn = slot->data.restart_lsn;
confirmed_flush_lsn = slot->data.confirmed_flush;
namecpy(&slot_name, &slot->data.name);
namecpy(&plugin, &slot->data.plugin);
@ -273,6 +275,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
nulls[i++] = true;
if (confirmed_flush_lsn != InvalidXLogRecPtr)
values[i++] = LSNGetDatum(confirmed_flush_lsn);
else
nulls[i++] = true;
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201508062
#define CATALOG_VERSION_NO 201508101
#endif

View File

@ -5197,7 +5197,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
DESCR("set up a logical replication slot");

View File

@ -1416,8 +1416,9 @@ pg_replication_slots| SELECT l.slot_name,
l.active_pid,
l.xmin,
l.catalog_xmin,
l.restart_lsn
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn)
l.restart_lsn,
l.confirmed_flush_lsn
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,