Introduce replication progress tracking infrastructure.

When implementing a replication solution ontop of logical decoding, two
related problems exist:
* How to safely keep track of replication progress
* How to change replication behavior, based on the origin of a row;
  e.g. to avoid loops in bi-directional replication setups

The solution to these problems, as implemented here, consist out of
three parts:

1) 'replication origins', which identify nodes in a replication setup.
2) 'replication progress tracking', which remembers, for each
   replication origin, how far replay has progressed in a efficient and
   crash safe manner.
3) The ability to filter out changes performed on the behest of a
   replication origin during logical decoding; this allows complex
   replication topologies. E.g. by filtering all replayed changes out.

Most of this could also be implemented in "userspace", e.g. by inserting
additional rows contain origin information, but that ends up being much
less efficient and more complicated.  We don't want to require various
replication solutions to reimplement logic for this independently. The
infrastructure is intended to be generic enough to be reusable.

This infrastructure also replaces the 'nodeid' infrastructure of commit
timestamps. It is intended to provide all the former capabilities,
except that there's only 2^16 different origins; but now they integrate
with logical decoding. Additionally more functionality is accessible via
SQL.  Since the commit timestamp infrastructure has also been introduced
in 9.5 (commit 73c986add) changing the API is not a problem.

For now the number of origins for which the replication progress can be
tracked simultaneously is determined by the max_replication_slots
GUC. That GUC is not a perfect match to configure this, but there
doesn't seem to be sufficient reason to introduce a separate new one.

Bumps both catversion and wal page magic.

Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer
Discussion: 20150216002155.GI15326@awork2.anarazel.de,
    20140923182422.GA15776@alap3.anarazel.de,
    20131114172632.GE7522@alap2.anarazel.de
This commit is contained in:
Andres Freund 2015-04-29 19:30:53 +02:00
parent c6e96a2f98
commit 5aa2350426
52 changed files with 2766 additions and 89 deletions

View File

@ -37,7 +37,8 @@ submake-isolation:
submake-test_decoding: submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding $(MAKE) -C $(top_builddir)/contrib/test_decoding
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
binary prepared replorigin
regresscheck: all | submake-regress submake-test_decoding temp-install regresscheck: all | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output $(MKDIR_P) regression_output

View File

@ -0,0 +1,141 @@
-- predictability
SET synchronous_commit = on;
CREATE TABLE origin_tbl(id serial primary key, data text);
CREATE TABLE target_tbl(id serial primary key, data text);
SELECT pg_replication_origin_create('test_decoding: regression_slot');
pg_replication_origin_create
------------------------------
1
(1 row)
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
--ensure deletions work (once)
SELECT pg_replication_origin_create('test_decoding: temp');
pg_replication_origin_create
------------------------------
2
(1 row)
SELECT pg_replication_origin_drop('test_decoding: temp');
pg_replication_origin_drop
----------------------------
(1 row)
SELECT pg_replication_origin_drop('test_decoding: temp');
ERROR: cache lookup failed for replication origin 'test_decoding: temp'
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
-- origin tx
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
INSERT INTO target_tbl(data)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- as is normal, the insert into target_tbl shows up
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
COMMIT
(5 rows)
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
-- mark session as replaying
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
pg_replication_origin_session_setup
-------------------------------------
(1 row)
-- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
ERROR: cannot setup replication origin when one is already setup
BEGIN;
-- setup transaction origin
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
pg_replication_origin_xact_setup
----------------------------------
(1 row)
INSERT INTO target_tbl(data)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
COMMIT;
-- check replication progress for the session is correct
SELECT pg_replication_origin_session_progress(false);
pg_replication_origin_session_progress
----------------------------------------
0/AABBCCDD
(1 row)
SELECT pg_replication_origin_session_progress(true);
pg_replication_origin_session_progress
----------------------------------------
0/AABBCCDD
(1 row)
SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-------------------------------------
(1 row)
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
local_id | external_id | remote_lsn | ?column?
----------+--------------------------------+------------+----------
1 | test_decoding: regression_slot | 0/AABBCCDD | t
(1 row)
-- check replication progress identified by name is correct
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
pg_replication_origin_progress
--------------------------------
0/AABBCCDD
(1 row)
SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
pg_replication_origin_progress
--------------------------------
0/AABBCCDD
(1 row)
-- ensure reset requires previously setup state
SELECT pg_replication_origin_session_reset();
ERROR: no replication origin is configured
-- and magically the replayed xact will be filtered!
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
data
------
(0 rows)
--but new original changes still show up
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
data
--------------------------------------------------------------------------------
BEGIN
table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
COMMIT
(3 rows)
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
SELECT pg_replication_origin_drop('test_decoding: regression_slot');
pg_replication_origin_drop
----------------------------
(1 row)

View File

@ -0,0 +1,64 @@
-- predictability
SET synchronous_commit = on;
CREATE TABLE origin_tbl(id serial primary key, data text);
CREATE TABLE target_tbl(id serial primary key, data text);
SELECT pg_replication_origin_create('test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('test_decoding: regression_slot');
--ensure deletions work (once)
SELECT pg_replication_origin_create('test_decoding: temp');
SELECT pg_replication_origin_drop('test_decoding: temp');
SELECT pg_replication_origin_drop('test_decoding: temp');
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-- origin tx
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
INSERT INTO target_tbl(data)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- as is normal, the insert into target_tbl shows up
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
-- mark session as replaying
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
-- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
BEGIN;
-- setup transaction origin
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
INSERT INTO target_tbl(data)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
COMMIT;
-- check replication progress for the session is correct
SELECT pg_replication_origin_session_progress(false);
SELECT pg_replication_origin_session_progress(true);
SELECT pg_replication_origin_session_reset();
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
-- check replication progress identified by name is correct
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
-- ensure reset requires previously setup state
SELECT pg_replication_origin_session_reset();
-- and magically the replayed xact will be filtered!
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
--but new original changes still show up
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
SELECT pg_drop_replication_slot('regression_slot');
SELECT pg_replication_origin_drop('test_decoding: regression_slot');

View File

@ -21,6 +21,7 @@
#include "replication/output_plugin.h" #include "replication/output_plugin.h"
#include "replication/logical.h" #include "replication/logical.h"
#include "replication/origin.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -43,6 +44,7 @@ typedef struct
bool include_timestamp; bool include_timestamp;
bool skip_empty_xacts; bool skip_empty_xacts;
bool xact_wrote_changes; bool xact_wrote_changes;
bool only_local;
} TestDecodingData; } TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
static void pg_decode_change(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel, ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change); ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
void void
_PG_init(void) _PG_init(void)
@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pg_decode_begin_txn; cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change; cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn; cb->commit_cb = pg_decode_commit_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown; cb->shutdown_cb = pg_decode_shutdown;
} }
@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_xids = true; data->include_xids = true;
data->include_timestamp = false; data->include_timestamp = false;
data->skip_empty_xacts = false; data->skip_empty_xacts = false;
data->only_local = false;
ctx->output_plugin_private = data; ctx->output_plugin_private = data;
@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"", errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname))); strVal(elem->arg), elem->defname)));
} }
else if (strcmp(elem->defname, "only-local") == 0)
{
if (elem->arg == NULL)
data->only_local = true;
else if (!parse_bool(strVal(elem->arg), &data->only_local))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else else
{ {
ereport(ERROR, ereport(ERROR,
@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true); OutputPluginWrite(ctx, true);
} }
static bool
pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->only_local && origin_id != InvalidRepOriginId)
return true;
return false;
}
/* /*
* Print literal `outputstr' already represented as string of type `typid' * Print literal `outputstr' already represented as string of type `typid'
* into stringbuf `s'. * into stringbuf `s'.

View File

@ -238,6 +238,16 @@
<entry>query rewrite rules</entry> <entry>query rewrite rules</entry>
</row> </row>
<row>
<entry><link linkend="catalog-pg-replication-origin"><structname>pg_replication_origin</structname></link></entry>
<entry>registered replication origins</entry>
</row>
<row>
<entry><link linkend="catalog-pg-replication-origin-status"><structname>pg_replication_origin_status</structname></link></entry>
<entry>information about replication origins, including replication progress</entry>
</row>
<row> <row>
<entry><link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link></entry> <entry><link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link></entry>
<entry>replication slot information</entry> <entry>replication slot information</entry>
@ -5337,6 +5347,119 @@
</sect1> </sect1>
<sect1 id="catalog-pg-replication-origin">
<title><structname>pg_replication_origin</structname></title>
<indexterm zone="catalog-pg-replication-origin">
<primary>pg_replication_origin</primary>
</indexterm>
<para>
The <structname>pg_replication_origin</structname> catalog contains
all replication origins created. For more on replication origins
see <xref linkend="replication-origins">.
</para>
<table>
<title><structname>pg_replication_origin</structname> Columns</title>
<tgroup cols="4">
<thead>
<row>
<entry>Name</entry>
<entry>Type</entry>
<entry>References</entry>
<entry>Description</entry>
</row>
</thead>
<tbody>
<row>
<entry><structfield>roident</structfield></entry>
<entry><type>Oid</type></entry>
<entry></entry>
<entry>A unique, cluster-wide identifier for the replication
origin. Should never leave the system.</entry>
</row>
<row>
<entry><structfield>roname</structfield></entry>
<entry><type>text</type></entry>
<entry></entry>
<entry>The external, user defined, name of a replication
origin.</entry>
</row>
</tbody>
</tgroup>
</table>
</sect1>
<sect1 id="catalog-pg-replication-origin-status">
<title><structname>pg_replication_origin_status</structname></title>
<indexterm zone="catalog-pg-replication-origin-status">
<primary>pg_replication_origin_status</primary>
</indexterm>
<para>
The <structname>pg_replication_origin_status</structname> view
contains information about how far replay for a certain origin has
progressed. For more on replication origins
see <xref linkend="replication-origins">.
</para>
<table>
<title><structname>pg_replication_origin_status</structname> Columns</title>
<tgroup cols="4">
<thead>
<row>
<entry>Name</entry>
<entry>Type</entry>
<entry>References</entry>
<entry>Description</entry>
</row>
</thead>
<tbody>
<row>
<entry><structfield>local_id</structfield></entry>
<entry><type>Oid</type></entry>
<entry><literal><link linkend="catalog-pg-replication-origin"><structname>pg_replication_origin</structname></link>.roident</literal></entry>
<entry>internal node identifier</entry>
</row>
<row>
<entry><structfield>external_id</structfield></entry>
<entry><type>text</type></entry>
<entry><literal><link linkend="catalog-pg-replication-origin"><structname>pg_replication_origin</structname></link>.roname</literal></entry>
<entry>external node identifier</entry>
</row>
<row>
<entry><structfield>remote_lsn</structfield></entry>
<entry><type>pg_lsn</type></entry>
<entry></entry>
<entry>The origin node's LSN up to which data has been replicated.</entry>
</row>
<row>
<entry><structfield>local_lsn</structfield></entry>
<entry><type>pg_lsn</type></entry>
<entry></entry>
<entry>This node's LSN that at
which <literal>remote_lsn</literal> has been replicated. Used to
flush commit records before persisting data to disk when using
asynchronous commits.</entry>
</row>
</tbody>
</tgroup>
</table>
</sect1>
<sect1 id="catalog-pg-replication-slots"> <sect1 id="catalog-pg-replication-slots">
<title><structname>pg_replication_slots</structname></title> <title><structname>pg_replication_slots</structname></title>

View File

@ -95,6 +95,7 @@
<!ENTITY fdwhandler SYSTEM "fdwhandler.sgml"> <!ENTITY fdwhandler SYSTEM "fdwhandler.sgml">
<!ENTITY custom-scan SYSTEM "custom-scan.sgml"> <!ENTITY custom-scan SYSTEM "custom-scan.sgml">
<!ENTITY logicaldecoding SYSTEM "logicaldecoding.sgml"> <!ENTITY logicaldecoding SYSTEM "logicaldecoding.sgml">
<!ENTITY replication-origins SYSTEM "replication-origins.sgml">
<!ENTITY protocol SYSTEM "protocol.sgml"> <!ENTITY protocol SYSTEM "protocol.sgml">
<!ENTITY sources SYSTEM "sources.sgml"> <!ENTITY sources SYSTEM "sources.sgml">
<!ENTITY storage SYSTEM "storage.sgml"> <!ENTITY storage SYSTEM "storage.sgml">

View File

@ -16879,11 +16879,13 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
<title>Replication Functions</title> <title>Replication Functions</title>
<para> <para>
The functions shown in <xref linkend="functions-replication-table"> are The functions shown
for controlling and interacting with replication features. in <xref linkend="functions-replication-table"> are for
See <xref linkend="streaming-replication"> controlling and interacting with replication features.
and <xref linkend="streaming-replication-slots"> for information about the See <xref linkend="streaming-replication">,
underlying features. Use of these functions is restricted to superusers. <xref linkend="streaming-replication-slots">, <xref linkend="replication-origins">
for information about the underlying features. Use of these
functions is restricted to superusers.
</para> </para>
<para> <para>
@ -17040,6 +17042,195 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
on future calls. on future calls.
</entry> </entry>
</row> </row>
<row id="pg-replication-origin-create">
<entry>
<indexterm>
<primary>pg_replication_origin_create</primary>
</indexterm>
<literal><function>pg_replication_origin_create(<parameter>node_name</parameter> <type>text</type>)</function></literal>
</entry>
<entry>
<parameter>internal_id</parameter> <type>oid</type>
</entry>
<entry>
Create a replication origin with the the passed in external
name, and create an internal id for it.
</entry>
</row>
<row id="pg-replication-origin-drop">
<entry>
<indexterm>
<primary>pg_replication_origin_drop</primary>
</indexterm>
<literal><function>pg_replication_origin_drop(<parameter>node_name</parameter> <type>text</type>)</function></literal>
</entry>
<entry>
void
</entry>
<entry>
Delete a previously created replication origin, including the
associated replay progress.
</entry>
</row>
<row>
<entry>
<indexterm>
<primary>pg_replication_origin_oid</primary>
</indexterm>
<literal><function>pg_replication_origin_oid(<parameter>node_name</parameter> <type>text</type>)</function></literal>
</entry>
<entry>
<parameter>internal_id</parameter> <type>oid</type>
</entry>
<entry>
Lookup replication origin by name and return the internal
oid. If no corresponding replication origin is found a error
is thrown.
</entry>
</row>
<row id="pg-replication-origin-session-setup">
<entry>
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
<literal><function>pg_replication_origin_setup_session(<parameter>node_name</parameter> <type>text</type>)</function></literal>
</entry>
<entry>
void
</entry>
<entry>
Configure the current session to be replaying from the passed in
origin, allowing replay progress to be tracked. Use
<function>pg_replication_origin_session_reset</function> to revert.
Can only be used if no previous origin is configured.
</entry>
</row>
<row>
<entry>
<indexterm>
<primary>pg_replication_origin_session_reset</primary>
</indexterm>
<literal><function>pg_replication_origin_session_reset()</function></literal>
</entry>
<entry>
void
</entry>
<entry>
Cancel the effects
of <function>pg_replication_origin_session_setup()</function>.
</entry>
</row>
<row>
<entry>
<indexterm>
<primary>pg_replication_session_is_setup</primary>
</indexterm>
<literal><function>pg_replication_session_is_setup()</function></literal>
</entry>
<entry>
bool
</entry>
<entry>
Has a replication origin been configured in the current session?
</entry>
</row>
<row id="pg-replication-origin-session-progress">
<entry>
<indexterm>
<primary>pg_replication_origin_session_progress</primary>
</indexterm>
<literal><function>pg_replication_origin_progress(<parameter>flush</parameter> <type>bool</type>)</function></literal>
</entry>
<entry>
pg_lsn
</entry>
<entry>
Return the replay position for the replication origin configured in
the current session. The parameter <parameter>flush</parameter>
determines whether the corresponding local transaction will be
guaranteed to have been flushed to disk or not.
</entry>
</row>
<row id="pg-replication-origin-xact-setup">
<entry>
<indexterm>
<primary>pg_replication_origin_xact_setup</primary>
</indexterm>
<literal><function>pg_replication_origin_xact_setup(<parameter>origin_lsn</parameter> <type>pg_lsn</type>, <parameter>origin_timestamp</parameter> <type>timestamptz</type>)</function></literal>
</entry>
<entry>
void
</entry>
<entry>
Mark the current transaction to be replaying a transaction that has
committed at the passed in <acronym>LSN</acronym> and timestamp. Can
only be called when a replication origin has previously been
configured using
<function>pg_replication_origin_session_setup()</function>.
</entry>
</row>
<row id="pg-replication-origin-xact-reset">
<entry>
<indexterm>
<primary>pg_replication_origin_xact_reset</primary>
</indexterm>
<literal><function>pg_replication_origin_xact_reset()</function></literal>
</entry>
<entry>
void
</entry>
<entry>
Cancel the effects of
<function>pg_replication_origin_xact_setup()</function>.
</entry>
</row>
<row>
<entry>
<indexterm>
<primary>pg_replication_origin_advance</primary>
</indexterm>
<literal>pg_replication_origin_advance<function>(<parameter>node_name</parameter> <type>text</type>, <parameter>pos</parameter> <type>pg_lsn</type>)</function></literal>
</entry>
<entry>
void
</entry>
<entry>
Set replication progress for the passed in node to the passed in
position. This primarily is useful for setting up the initial position
or a new position after configuration changes and similar. Be aware
that careless use of this function can lead to inconsistently
replicated data.
</entry>
</row>
<row id="pg-replication-origin-progress">
<entry>
<indexterm>
<primary>pg_replication_origin_progress</primary>
</indexterm>
<literal><function>pg_replication_origin_progress(<parameter>node_name</parameter> <type>text</type>, <parameter>flush</parameter> <type>bool</type>)</function></literal>
</entry>
<entry>
pg_lsn
</entry>
<entry>
Return the replay position for the passed in replication origin. The
parameter <parameter>flush</parameter> determines whether the
corresponding local transaction will be guaranteed to have been
flushed to disk or not.
</entry>
</row>
</tbody> </tbody>
</tgroup> </tgroup>
</table> </table>

View File

@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb; LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb; LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb; LogicalDecodeCommitCB commit_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks; } OutputPluginCallbacks;
@ -370,7 +371,8 @@ typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
</programlisting> </programlisting>
The <function>begin_cb</function>, <function>change_cb</function> The <function>begin_cb</function>, <function>change_cb</function>
and <function>commit_cb</function> callbacks are required, and <function>commit_cb</function> callbacks are required,
while <function>startup_cb</function> while <function>startup_cb</function>,
<function>filter_by_origin_cb</function>
and <function>shutdown_cb</function> are optional. and <function>shutdown_cb</function> are optional.
</para> </para>
</sect2> </sect2>
@ -569,6 +571,37 @@ typedef void (*LogicalDecodeChangeCB) (
</para> </para>
</note> </note>
</sect3> </sect3>
<sect3 id="logicaldecoding-output-plugin-filter-by-origin">
<title>Origin Filter Callback</title>
<para>
The optional <function>filter_by_origin_cb</function> callback
is called to determine wheter data that has been replayed
from <parameter>origin_id</parameter> is of interest to the
output plugin.
<programlisting>
typedef bool (*LogicalDecodeChangeCB) (
struct LogicalDecodingContext *ctx,
RepNodeId origin_id
);
</programlisting>
The <parameter>ctx</parameter> parameter has the same contents
as for the other callbacks. No information but the origin is
available. To signal that changes originating on the passed in
node are irrelevant, return true, causing them to be filtered
away; false otherwise. The other callbacks will not be called
for transactions and changes that have been filtered away.
</para>
<para>
This is useful when implementing cascading or multi directional
replication solutions. Filtering by the origin allows to
prevent replicating the same changes back and forth in such
setups. While transactions and changes also carry information
about the origin, filtering via this callback is noticeably
more efficient.
</para>
</sect3>
</sect2> </sect2>
<sect2 id="logicaldecoding-output-plugin-output"> <sect2 id="logicaldecoding-output-plugin-output">

View File

@ -220,6 +220,7 @@
&spi; &spi;
&bgworker; &bgworker;
&logicaldecoding; &logicaldecoding;
&replication-origins;
</part> </part>

View File

@ -0,0 +1,93 @@
<!-- doc/src/sgml/replication-origins.sgml -->
<chapter id="replication-origins">
<title>Replication Progress Tracking</title>
<indexterm zone="replication-origins">
<primary>Replication Progress Tracking</primary>
</indexterm>
<indexterm zone="replication-origins">
<primary>Replication Origins</primary>
</indexterm>
<para>
Replication origins are intended to make it easier to implement
logical replication solutions on top
of <xref linkend="logicaldecoding">. They provide a solution to two
common problems:
<itemizedlist>
<listitem><para>How to safely keep track of replication progress</para></listitem>
<listitem><para>How to change replication behavior, based on the
origin of a row; e.g. to avoid loops in bi-directional replication
setups</para></listitem>
</itemizedlist>
</para>
<para>
Replication origins consist out of a name and a oid. The name, which
is what should be used to refer to the origin across systems, is
free-form text. It should be used in a way that makes conflicts
between replication origins created by different replication
solutions unlikely; e.g. by prefixing the replication solution's
name to it. The oid is used only to avoid having to store the long
version in situations where space efficiency is important. It should
never be shared between systems.
</para>
<para>
Replication origins can be created using the
<link linkend="pg-replication-origin-create"><function>pg_replication_origin_create()</function></link>;
dropped using
<link linkend="pg-replication-origin-drop"><function>pg_replication_origin_drop()</function></link>;
and seen in the
<link linkend="catalog-pg-replication-origin"><structname>pg_replication_origin</structname></link>
catalog.
</para>
<para>
When replicating from one system to another (independent of the fact that
those two might be in the same cluster, or even same database) one
nontrivial part of building a replication solution is to keep track of
replay progress in a safe manner. When the applying process, or the whole
cluster, dies, it needs to be possible to find out up to where data has
successfully been replicated. Naive solutions to this like updating a row in
a table for every replayed transaction have problems like runtime overhead
bloat.
</para>
<para>
Using the replication origin infrastructure a session can be
marked as replaying from a remote node (using the
<link linkend="pg-replication-origin-session-setup"><function>pg_replication_origin_session_setup()</function></link>
function. Additionally the <acronym>LSN</acronym> and commit
timestamp of every source transaction can be configured on a per
transaction basis using
<link linkend="pg-replication-origin-xact-setup"><function>pg_replication_origin_xact-setup()</function></link>.
If that's done replication progress will be persist in a crash safe
manner. Replay progress for all replication origins can be seen in the
<link linkend="catalog-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname>
</link> view. A individual origin's progress, e.g. when resuming
replication, can be acquired using
<link linkend="pg-replication-origin-progress"><function>pg_replication_origin_progress()</function></link>
for any origin or
<link linkend="pg-replication-origin-session-progress"><function>pg_replication_origin_session_progress()</function></link>
for the origin configured in the current session.
</para>
<para>
In more complex replication topologies than replication from exactly one
system to one other, another problem can be that, that it is hard to avoid
replicating replayed rows again. That can lead both to cycles in the
replication and inefficiencies. Replication origins provide a optional
mechanism to recognize and prevent that. When configured using the functions
referenced in the previous paragraph, every change and transaction passed to
output plugin callbacks (see <xref linkend="logicaldecoding-output-plugin">)
generated by the session is tagged with the replication origin of the
generating session. This allows to treat them differently in the output
plugin, e.g. ignoring all but locally originating rows. Additionally
the <link linkend="logicaldecoding-output-plugin-filter-by-origin">
<function>filter_by_origin_cb</function></link> callback can be used
to filter the logical decoding change stream based on the
source. While less flexible, filtering via that callback is
considerably more efficient.
</para>
</chapter>

View File

@ -2189,6 +2189,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
(char *) heaptup->t_data + SizeofHeapTupleHeader, (char *) heaptup->t_data + SizeofHeapTupleHeader,
heaptup->t_len - SizeofHeapTupleHeader); heaptup->t_len - SizeofHeapTupleHeader);
/* filtering by origin on a row level is much more efficient */
XLogIncludeOrigin();
recptr = XLogInsert(RM_HEAP_ID, info); recptr = XLogInsert(RM_HEAP_ID, info);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
@ -2499,6 +2502,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
XLogRegisterBufData(0, tupledata, totaldatalen); XLogRegisterBufData(0, tupledata, totaldatalen);
/* filtering by origin on a row level is much more efficient */
XLogIncludeOrigin();
recptr = XLogInsert(RM_HEAP2_ID, info); recptr = XLogInsert(RM_HEAP2_ID, info);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
@ -2920,6 +2927,9 @@ l1:
- SizeofHeapTupleHeader); - SizeofHeapTupleHeader);
} }
/* filtering by origin on a row level is much more efficient */
XLogIncludeOrigin();
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
@ -4650,6 +4660,8 @@ failed:
tuple->t_data->t_infomask2); tuple->t_data->t_infomask2);
XLogRegisterData((char *) &xlrec, SizeOfHeapLock); XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
/* we don't decode row locks atm, so no need to log the origin */
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
@ -5429,6 +5441,8 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen); XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
/* inplace updates aren't decoded atm, don't log the origin */
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
@ -6787,6 +6801,9 @@ log_heap_update(Relation reln, Buffer oldbuf,
old_key_tuple->t_len - SizeofHeapTupleHeader); old_key_tuple->t_len - SizeofHeapTupleHeader);
} }
/* filtering by origin on a row level is much more efficient */
XLogIncludeOrigin();
recptr = XLogInsert(RM_HEAP_ID, info); recptr = XLogInsert(RM_HEAP_ID, info);
return recptr; return recptr;
@ -6860,6 +6877,8 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid); XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
/* will be looked at irrespective of origin */
recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID); recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
return recptr; return recptr;

View File

@ -9,8 +9,8 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \ OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
hashdesc.o heapdesc.o \ hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \ replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk include $(top_srcdir)/src/backend/common.mk

View File

@ -0,0 +1,61 @@
/*-------------------------------------------------------------------------
*
* replorigindesc.c
* rmgr descriptor routines for replication/logical/replication_origin.c
*
* Portions Copyright (c) 2015, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/backend/access/rmgrdesc/replorigindesc.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "replication/origin.h"
void
replorigin_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
switch (info)
{
case XLOG_REPLORIGIN_SET:
{
xl_replorigin_set *xlrec;
xlrec = (xl_replorigin_set *) rec;
appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
xlrec->node_id,
(uint32) (xlrec->remote_lsn >> 32),
(uint32) xlrec->remote_lsn,
xlrec->force);
break;
}
case XLOG_REPLORIGIN_DROP:
{
xl_replorigin_drop *xlrec;
xlrec = (xl_replorigin_drop *) rec;
appendStringInfo(buf, "drop %u", xlrec->node_id);
break;
}
}
}
const char *
replorigin_identify(uint8 info)
{
switch (info)
{
case XLOG_REPLORIGIN_SET:
return "SET";
case XLOG_REPLORIGIN_DROP:
return "DROP";
default:
return NULL;
}
}

View File

@ -101,6 +101,16 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
data += sizeof(xl_xact_twophase); data += sizeof(xl_xact_twophase);
} }
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
xl_xact_origin *xl_origin = (xl_xact_origin *) data;
parsed->origin_lsn = xl_origin->origin_lsn;
parsed->origin_timestamp = xl_origin->origin_timestamp;
data += sizeof(xl_xact_origin);
}
} }
void void
@ -156,7 +166,7 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
} }
static void static void
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec) xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
{ {
xl_xact_parsed_commit parsed; xl_xact_parsed_commit parsed;
int i; int i;
@ -218,6 +228,15 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
if (XactCompletionForceSyncCommit(parsed.xinfo)) if (XactCompletionForceSyncCommit(parsed.xinfo))
appendStringInfo(buf, "; sync"); appendStringInfo(buf, "; sync");
if (parsed.xinfo & XACT_XINFO_HAS_ORIGIN)
{
appendStringInfo(buf, "; origin: node %u, lsn %X/%X, at %s",
origin_id,
(uint32)(parsed.origin_lsn >> 32),
(uint32)parsed.origin_lsn,
timestamptz_to_str(parsed.origin_timestamp));
}
} }
static void static void
@ -274,7 +293,8 @@ xact_desc(StringInfo buf, XLogReaderState *record)
{ {
xl_xact_commit *xlrec = (xl_xact_commit *) rec; xl_xact_commit *xlrec = (xl_xact_commit *) rec;
xact_desc_commit(buf, XLogRecGetInfo(record), xlrec); xact_desc_commit(buf, XLogRecGetInfo(record), xlrec,
XLogRecGetOrigin(record));
} }
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{ {

View File

@ -49,18 +49,18 @@
*/ */
/* /*
* We need 8+4 bytes per xact. Note that enlarging this struct might mean * We need 8+2 bytes per xact. Note that enlarging this struct might mean
* the largest possible file name is more than 5 chars long; see * the largest possible file name is more than 5 chars long; see
* SlruScanDirectory. * SlruScanDirectory.
*/ */
typedef struct CommitTimestampEntry typedef struct CommitTimestampEntry
{ {
TimestampTz time; TimestampTz time;
CommitTsNodeId nodeid; RepOriginId nodeid;
} CommitTimestampEntry; } CommitTimestampEntry;
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \ #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
sizeof(CommitTsNodeId)) sizeof(RepOriginId))
#define COMMIT_TS_XACTS_PER_PAGE \ #define COMMIT_TS_XACTS_PER_PAGE \
(BLCKSZ / SizeOfCommitTimestampEntry) (BLCKSZ / SizeOfCommitTimestampEntry)
@ -93,43 +93,18 @@ CommitTimestampShared *commitTsShared;
/* GUC variable */ /* GUC variable */
bool track_commit_timestamp; bool track_commit_timestamp;
static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts, TransactionId *subxids, TimestampTz ts,
CommitTsNodeId nodeid, int pageno); RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
CommitTsNodeId nodeid, int slotno); RepOriginId nodeid, int slotno);
static int ZeroCommitTsPage(int pageno, bool writeXlog); static int ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2); static bool CommitTsPagePrecedes(int page1, int page2);
static void WriteZeroPageXlogRec(int pageno); static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno); static void WriteTruncateXlogRec(int pageno);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp, TransactionId *subxids, TimestampTz timestamp,
CommitTsNodeId nodeid); RepOriginId nodeid);
/*
* CommitTsSetDefaultNodeId
*
* Set default nodeid for current backend.
*/
void
CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
{
default_node_id = nodeid;
}
/*
* CommitTsGetDefaultNodeId
*
* Set default nodeid for current backend.
*/
CommitTsNodeId
CommitTsGetDefaultNodeId(void)
{
return default_node_id;
}
/* /*
* TransactionTreeSetCommitTsData * TransactionTreeSetCommitTsData
@ -156,7 +131,7 @@ CommitTsGetDefaultNodeId(void)
void void
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp, TransactionId *subxids, TimestampTz timestamp,
CommitTsNodeId nodeid, bool do_xlog) RepOriginId nodeid, bool do_xlog)
{ {
int i; int i;
TransactionId headxid; TransactionId headxid;
@ -234,7 +209,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
static void static void
SetXidCommitTsInPage(TransactionId xid, int nsubxids, SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts, TransactionId *subxids, TimestampTz ts,
CommitTsNodeId nodeid, int pageno) RepOriginId nodeid, int pageno)
{ {
int slotno; int slotno;
int i; int i;
@ -259,7 +234,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
*/ */
static void static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
CommitTsNodeId nodeid, int slotno) RepOriginId nodeid, int slotno)
{ {
int entryno = TransactionIdToCTsEntry(xid); int entryno = TransactionIdToCTsEntry(xid);
CommitTimestampEntry entry; CommitTimestampEntry entry;
@ -282,7 +257,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
*/ */
bool bool
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
CommitTsNodeId *nodeid) RepOriginId *nodeid)
{ {
int pageno = TransactionIdToCTsPage(xid); int pageno = TransactionIdToCTsPage(xid);
int entryno = TransactionIdToCTsEntry(xid); int entryno = TransactionIdToCTsEntry(xid);
@ -322,7 +297,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
if (ts) if (ts)
*ts = 0; *ts = 0;
if (nodeid) if (nodeid)
*nodeid = InvalidCommitTsNodeId; *nodeid = InvalidRepOriginId;
return false; return false;
} }
@ -373,7 +348,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
* as NULL if not wanted. * as NULL if not wanted.
*/ */
TransactionId TransactionId
GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid) GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
{ {
TransactionId xid; TransactionId xid;
@ -503,7 +478,7 @@ CommitTsShmemInit(void)
commitTsShared->xidLastCommit = InvalidTransactionId; commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId; commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
} }
else else
Assert(found); Assert(found);
@ -857,7 +832,7 @@ WriteTruncateXlogRec(int pageno)
static void static void
WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp, TransactionId *subxids, TimestampTz timestamp,
CommitTsNodeId nodeid) RepOriginId nodeid)
{ {
xl_commit_ts_set record; xl_commit_ts_set record;

View File

@ -23,6 +23,7 @@
#include "commands/dbcommands_xlog.h" #include "commands/dbcommands_xlog.h"
#include "commands/sequence.h" #include "commands/sequence.h"
#include "commands/tablespace.h" #include "commands/tablespace.h"
#include "replication/origin.h"
#include "storage/standby.h" #include "storage/standby.h"
#include "utils/relmapper.h" #include "utils/relmapper.h"

View File

@ -40,8 +40,10 @@
#include "libpq/pqsignal.h" #include "libpq/pqsignal.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "replication/logical.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "replication/syncrep.h" #include "replication/syncrep.h"
#include "replication/origin.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/predicate.h" #include "storage/predicate.h"
@ -1073,21 +1075,27 @@ RecordTransactionCommit(void)
nmsgs, invalMessages, nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit, RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */); InvalidTransactionId /* plain commit */);
}
/* /*
* We only need to log the commit timestamp separately if the node * Record plain commit ts if not replaying remote actions, or if no
* identifier is a valid value; the commit record above already contains * timestamp is configured.
* the timestamp info otherwise, and will be used to load it.
*/ */
if (markXidCommitted) if (replorigin_sesssion_origin == InvalidRepOriginId ||
{ replorigin_sesssion_origin == DoNotReplicateId ||
CommitTsNodeId node_id; replorigin_sesssion_origin_timestamp == 0)
replorigin_sesssion_origin_timestamp = xactStopTimestamp;
else
replorigin_session_advance(replorigin_sesssion_origin_lsn,
XactLastRecEnd);
node_id = CommitTsGetDefaultNodeId(); /*
* We don't need to WAL log origin or timestamp here, the commit
* record contains all the necessary information and will redo the SET
* action during replay.
*/
TransactionTreeSetCommitTsData(xid, nchildren, children, TransactionTreeSetCommitTsData(xid, nchildren, children,
xactStopTimestamp, replorigin_sesssion_origin_timestamp,
node_id, node_id != InvalidCommitTsNodeId); replorigin_sesssion_origin, false);
} }
/* /*
@ -1176,9 +1184,11 @@ RecordTransactionCommit(void)
if (wrote_xlog && markXidCommitted) if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd); SyncRepWaitForLSN(XactLastRecEnd);
/* remember end of last commit record */
XactLastCommitEnd = XactLastRecEnd;
/* Reset XactLastRecEnd until the next transaction writes something */ /* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0; XactLastRecEnd = 0;
cleanup: cleanup:
/* Clean up local data */ /* Clean up local data */
if (rels) if (rels)
@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_relfilenodes xl_relfilenodes; xl_xact_relfilenodes xl_relfilenodes;
xl_xact_invals xl_invals; xl_xact_invals xl_invals;
xl_xact_twophase xl_twophase; xl_xact_twophase xl_twophase;
xl_xact_origin xl_origin;
uint8 info; uint8 info;
@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_twophase.xid = twophase_xid; xl_twophase.xid = twophase_xid;
} }
/* dump transaction origin information */
if (replorigin_sesssion_origin != InvalidRepOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
xl_origin.origin_lsn = replorigin_sesssion_origin_lsn;
xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp;
}
if (xl_xinfo.xinfo != 0) if (xl_xinfo.xinfo != 0)
info |= XLOG_XACT_HAS_INFO; info |= XLOG_XACT_HAS_INFO;
@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
/* we allow filtering by xacts */
XLogIncludeOrigin();
return XLogInsert(RM_XACT_ID, info); return XLogInsert(RM_XACT_ID, info);
} }
@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time,
static void static void
xact_redo_commit(xl_xact_parsed_commit *parsed, xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid, TransactionId xid,
XLogRecPtr lsn) XLogRecPtr lsn,
RepOriginId origin_id)
{ {
TransactionId max_xid; TransactionId max_xid;
int i; int i;
TimestampTz commit_time;
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
LWLockRelease(XidGenLock); LWLockRelease(XidGenLock);
} }
Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId));
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
commit_time = parsed->origin_timestamp;
else
commit_time = parsed->xact_time;
/* Set the transaction commit timestamp and metadata */ /* Set the transaction commit timestamp and metadata */
TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
parsed->xact_time, InvalidCommitTsNodeId, commit_time, origin_id,
false); false);
if (standbyState == STANDBY_DISABLED) if (standbyState == STANDBY_DISABLED)
@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
StandbyReleaseLockTree(xid, 0, NULL); StandbyReleaseLockTree(xid, 0, NULL);
} }
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
/* recover apply progress */
replorigin_advance(origin_id, parsed->origin_lsn, lsn,
false /* backward */, false /* WAL */);
}
/* Make sure files supposed to be dropped are dropped */ /* Make sure files supposed to be dropped are dropped */
if (parsed->nrels > 0) if (parsed->nrels > 0)
{ {
@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record)
{ {
Assert(!TransactionIdIsValid(parsed.twophase_xid)); Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, XLogRecGetXid(record), xact_redo_commit(&parsed, XLogRecGetXid(record),
record->EndRecPtr); record->EndRecPtr, XLogRecGetOrigin(record));
} }
else else
{ {
Assert(TransactionIdIsValid(parsed.twophase_xid)); Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, parsed.twophase_xid, xact_redo_commit(&parsed, parsed.twophase_xid,
record->EndRecPtr); record->EndRecPtr, XLogRecGetOrigin(record));
RemoveTwoPhaseFile(parsed.twophase_xid, false); RemoveTwoPhaseFile(parsed.twophase_xid, false);
} }
} }

View File

@ -44,6 +44,7 @@
#include "postmaster/startup.h" #include "postmaster/startup.h"
#include "replication/logical.h" #include "replication/logical.h"
#include "replication/slot.h" #include "replication/slot.h"
#include "replication/origin.h"
#include "replication/snapbuild.h" #include "replication/snapbuild.h"
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "replication/walsender.h" #include "replication/walsender.h"
@ -295,6 +296,7 @@ static TimeLineID curFileTLI;
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
/* /*
* RedoRecPtr is this backend's local copy of the REDO record pointer * RedoRecPtr is this backend's local copy of the REDO record pointer
@ -6211,6 +6213,11 @@ StartupXLOG(void)
*/ */
StartupMultiXact(); StartupMultiXact();
/*
* Recover knowledge about replay progress of known replication partners.
*/
StartupReplicationOrigin();
/* /*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the * Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so * control file. On recovery, all unlogged relations are blown away, so
@ -8394,6 +8401,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointSnapBuild(); CheckPointSnapBuild();
CheckPointLogicalRewriteHeap(); CheckPointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */ CheckPointBuffers(flags); /* performs all required fsyncs */
CheckPointReplicationOrigin();
/* We deliberately delay 2PC checkpointing as long as possible */ /* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo); CheckPointTwoPhase(checkPointRedo);
} }

View File

@ -26,6 +26,7 @@
#include "catalog/pg_control.h" #include "catalog/pg_control.h"
#include "common/pg_lzcompress.h" #include "common/pg_lzcompress.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "replication/origin.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -72,6 +73,9 @@ static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len; /* total # of bytes in chain */ static uint32 mainrdata_len; /* total # of bytes in chain */
/* Should te in-progress insertion log the origin */
static bool include_origin = false;
/* /*
* These are used to hold the record header while constructing a record. * These are used to hold the record header while constructing a record.
* 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
@ -83,10 +87,12 @@ static uint32 mainrdata_len; /* total # of bytes in chain */
static XLogRecData hdr_rdt; static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL; static char *hdr_scratch = NULL;
#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
#define HEADER_SCRATCH_SIZE \ #define HEADER_SCRATCH_SIZE \
(SizeOfXLogRecord + \ (SizeOfXLogRecord + \
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
SizeOfXLogRecordDataHeaderLong) SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
/* /*
* An array of XLogRecData structs, to hold registered data. * An array of XLogRecData structs, to hold registered data.
@ -193,6 +199,7 @@ XLogResetInsertion(void)
max_registered_block_id = 0; max_registered_block_id = 0;
mainrdata_len = 0; mainrdata_len = 0;
mainrdata_last = (XLogRecData *) &mainrdata_head; mainrdata_last = (XLogRecData *) &mainrdata_head;
include_origin = false;
begininsert_called = false; begininsert_called = false;
} }
@ -374,6 +381,16 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
regbuf->rdata_len += len; regbuf->rdata_len += len;
} }
/*
* Should this record include the replication origin if one is set up?
*/
void
XLogIncludeOrigin(void)
{
Assert(begininsert_called);
include_origin = true;
}
/* /*
* Insert an XLOG record having the specified RMID and info bytes, with the * Insert an XLOG record having the specified RMID and info bytes, with the
* body of the record being the data and buffer references registered earlier * body of the record being the data and buffer references registered earlier
@ -678,6 +695,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
scratch += sizeof(BlockNumber); scratch += sizeof(BlockNumber);
} }
/* followed by the record's origin, if any */
if (include_origin && replorigin_sesssion_origin != InvalidRepOriginId)
{
*(scratch++) = XLR_BLOCK_ID_ORIGIN;
memcpy(scratch, &replorigin_sesssion_origin, sizeof(replorigin_sesssion_origin));
scratch += sizeof(replorigin_sesssion_origin);
}
/* followed by main data, if any */ /* followed by main data, if any */
if (mainrdata_len > 0) if (mainrdata_len > 0)
{ {

View File

@ -21,6 +21,7 @@
#include "access/xlogreader.h" #include "access/xlogreader.h"
#include "catalog/pg_control.h" #include "catalog/pg_control.h"
#include "common/pg_lzcompress.h" #include "common/pg_lzcompress.h"
#include "replication/origin.h"
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
@ -975,6 +976,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
ResetDecoder(state); ResetDecoder(state);
state->decoded_record = record; state->decoded_record = record;
state->record_origin = InvalidRepOriginId;
ptr = (char *) record; ptr = (char *) record;
ptr += SizeOfXLogRecord; ptr += SizeOfXLogRecord;
@ -1009,6 +1011,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
break; /* by convention, the main data fragment is break; /* by convention, the main data fragment is
* always last */ * always last */
} }
else if (block_id == XLR_BLOCK_ID_ORIGIN)
{
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
}
else if (block_id <= XLR_MAX_BLOCK_ID) else if (block_id <= XLR_MAX_BLOCK_ID)
{ {
/* XLogRecordBlockHeader */ /* XLogRecordBlockHeader */

View File

@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \ pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
pg_ts_parser.h pg_ts_template.h pg_extension.h \ pg_ts_parser.h pg_ts_template.h pg_extension.h \
pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \ pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
pg_foreign_table.h pg_policy.h \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \
pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \ pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
pg_transform.h \ pg_transform.h \
toasting.h indexing.h \ toasting.h indexing.h \

View File

@ -32,6 +32,7 @@
#include "catalog/pg_namespace.h" #include "catalog/pg_namespace.h"
#include "catalog/pg_pltemplate.h" #include "catalog/pg_pltemplate.h"
#include "catalog/pg_db_role_setting.h" #include "catalog/pg_db_role_setting.h"
#include "catalog/pg_replication_origin.h"
#include "catalog/pg_shdepend.h" #include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h" #include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h" #include "catalog/pg_shseclabel.h"
@ -224,7 +225,8 @@ IsSharedRelation(Oid relationId)
relationId == SharedDependRelationId || relationId == SharedDependRelationId ||
relationId == SharedSecLabelRelationId || relationId == SharedSecLabelRelationId ||
relationId == TableSpaceRelationId || relationId == TableSpaceRelationId ||
relationId == DbRoleSettingRelationId) relationId == DbRoleSettingRelationId ||
relationId == ReplicationOriginRelationId)
return true; return true;
/* These are their indexes (see indexing.h) */ /* These are their indexes (see indexing.h) */
if (relationId == AuthIdRolnameIndexId || if (relationId == AuthIdRolnameIndexId ||
@ -240,7 +242,9 @@ IsSharedRelation(Oid relationId)
relationId == SharedSecLabelObjectIndexId || relationId == SharedSecLabelObjectIndexId ||
relationId == TablespaceOidIndexId || relationId == TablespaceOidIndexId ||
relationId == TablespaceNameIndexId || relationId == TablespaceNameIndexId ||
relationId == DbRoleSettingDatidRolidIndexId) relationId == DbRoleSettingDatidRolidIndexId ||
relationId == ReplicationOriginIdentIndex ||
relationId == ReplicationOriginNameIndex)
return true; return true;
/* These are their toast tables and toast indexes (see toasting.h) */ /* These are their toast tables and toast indexes (see toasting.h) */
if (relationId == PgShdescriptionToastTable || if (relationId == PgShdescriptionToastTable ||

View File

@ -778,6 +778,13 @@ CREATE VIEW pg_user_mappings AS
REVOKE ALL on pg_user_mapping FROM public; REVOKE ALL on pg_user_mapping FROM public;
CREATE VIEW pg_replication_origin_status AS
SELECT *
FROM pg_show_replication_origin_status();
REVOKE ALL ON pg_replication_origin_status FROM public;
-- --
-- We have a few function definitions in here, too. -- We have a few function definitions in here, too.
-- At some point there might be enough to justify breaking them out into -- At some point there might be enough to justify breaking them out into

View File

@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
snapbuild.o
include $(top_srcdir)/src/backend/common.mk include $(top_srcdir)/src/backend/common.mk

View File

@ -40,6 +40,7 @@
#include "replication/decode.h" #include "replication/decode.h"
#include "replication/logical.h" #include "replication/logical.h"
#include "replication/reorderbuffer.h" #include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h" #include "replication/snapbuild.h"
#include "storage/standby.h" #include "storage/standby.h"
@ -131,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_SPGIST_ID: case RM_SPGIST_ID:
case RM_BRIN_ID: case RM_BRIN_ID:
case RM_COMMIT_TS_ID: case RM_COMMIT_TS_ID:
case RM_REPLORIGIN_ID:
break; break;
case RM_NEXT_ID: case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
@ -422,6 +424,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
} }
} }
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
if (ctx->callbacks.filter_by_origin_cb == NULL)
return false;
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
/* /*
* Consolidated commit record handling between the different form of commit * Consolidated commit record handling between the different form of commit
* records. * records.
@ -430,8 +441,17 @@ static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid) xl_xact_parsed_commit *parsed, TransactionId xid)
{ {
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
XLogRecPtr commit_time = InvalidXLogRecPtr;
XLogRecPtr origin_id = InvalidRepOriginId;
int i; int i;
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
origin_lsn = parsed->origin_lsn;
commit_time = parsed->origin_timestamp;
}
/* /*
* Process invalidation messages, even if we're not interested in the * Process invalidation messages, even if we're not interested in the
* transaction's contents, since the various caches need to always be * transaction's contents, since the various caches need to always be
@ -452,12 +472,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions * the reorderbuffer to forget the content of the (sub-)transactions
* if not. * if not.
* *
* There basically two reasons we might not be interested in this * There can be several reasons we might not be interested in this
* transaction: * transaction:
* 1) We might not be interested in decoding transactions up to this * 1) We might not be interested in decoding transactions up to this
* LSN. This can happen because we previously decoded it and now just * LSN. This can happen because we previously decoded it and now just
* are restarting or if we haven't assembled a consistent snapshot yet. * are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database. * 2) The transaction happened in another database.
* 3) The output plugin is not interested in the origin.
* *
* We can't just use ReorderBufferAbort() here, because we need to execute * We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if * the transaction's invalidations. This currently won't be needed if
@ -472,7 +493,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* --- * ---
*/ */
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database)) (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
FilterByOrigin(ctx, origin_id))
{ {
for (i = 0; i < parsed->nsubxacts; i++) for (i = 0; i < parsed->nsubxacts; i++)
{ {
@ -492,7 +514,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* replay actions of all transaction + subtransactions in order */ /* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
parsed->xact_time); commit_time, origin_id, origin_lsn);
} }
/* /*
@ -537,8 +559,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database) if (target_node.dbNode != ctx->slot->data.database)
return; return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder); change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT; change->action = REORDER_BUFFER_CHANGE_INSERT;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@ -579,8 +606,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database) if (target_node.dbNode != ctx->slot->data.database)
return; return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder); change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE; change->action = REORDER_BUFFER_CHANGE_UPDATE;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@ -628,8 +660,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database) if (target_node.dbNode != ctx->slot->data.database)
return; return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder); change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE; change->action = REORDER_BUFFER_CHANGE_DELETE;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
@ -673,6 +710,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (rnode.dbNode != ctx->slot->data.database) if (rnode.dbNode != ctx->slot->data.database)
return; return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
tupledata = XLogRecGetBlockData(r, 0, &tuplelen); tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
data = tupledata; data = tupledata;
@ -685,6 +726,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder); change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT; change->action = REORDER_BUFFER_CHANGE_INSERT;
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
/* /*

View File

@ -39,6 +39,7 @@
#include "replication/decode.h" #include "replication/decode.h"
#include "replication/logical.h" #include "replication/logical.h"
#include "replication/reorderbuffer.h" #include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h" #include "replication/snapbuild.h"
#include "storage/proc.h" #include "storage/proc.h"
@ -720,6 +721,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous; error_context_stack = errcallback.previous;
} }
bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
bool ret;
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "shutdown";
state.report_location = InvalidXLogRecPtr;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
return ret;
}
/* /*
* Set the required catalog xmin horizon for historic snapshots in the current * Set the required catalog xmin horizon for historic snapshots in the current
* replication slot. * replication slot.

File diff suppressed because it is too large Load Diff

View File

@ -1255,7 +1255,8 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time) TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{ {
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
volatile Snapshot snapshot_now; volatile Snapshot snapshot_now;
@ -1273,6 +1274,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->final_lsn = commit_lsn; txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn; txn->end_lsn = end_lsn;
txn->commit_time = commit_time; txn->commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
/* serialize the last bunch of changes if we need start earlier anyway */ /* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries) if (txn->nentries_mem != txn->nentries)

View File

@ -31,6 +31,7 @@
#include "replication/slot.h" #include "replication/slot.h"
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "replication/origin.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/dsm.h" #include "storage/dsm.h"
#include "storage/ipc.h" #include "storage/ipc.h"
@ -132,6 +133,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, CheckpointerShmemSize()); size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize()); size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, ReplicationSlotsShmemSize()); size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize()); size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize()); size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize()); size = add_size(size, BTreeShmemSize());
@ -238,6 +240,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
CheckpointerShmemInit(); CheckpointerShmemInit();
AutoVacuumShmemInit(); AutoVacuumShmemInit();
ReplicationSlotsShmemInit(); ReplicationSlotsShmemInit();
ReplicationOriginShmemInit();
WalSndShmemInit(); WalSndShmemInit();
WalRcvShmemInit(); WalRcvShmemInit();

View File

@ -54,6 +54,7 @@
#include "catalog/pg_shdepend.h" #include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h" #include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h" #include "catalog/pg_shseclabel.h"
#include "catalog/pg_replication_origin.h"
#include "catalog/pg_statistic.h" #include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h" #include "catalog/pg_tablespace.h"
#include "catalog/pg_transform.h" #include "catalog/pg_transform.h"
@ -621,6 +622,28 @@ static const struct cachedesc cacheinfo[] = {
}, },
128 128
}, },
{ReplicationOriginRelationId, /* REPLORIGIDENT */
ReplicationOriginIdentIndex,
1,
{
Anum_pg_replication_origin_roident,
0,
0,
0
},
16
},
{ReplicationOriginRelationId, /* REPLORIGNAME */
ReplicationOriginNameIndex,
1,
{
Anum_pg_replication_origin_roname,
0,
0,
0
},
16
},
{RewriteRelationId, /* RULERELNAME */ {RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId, RewriteRelRulenameIndexId,
2, 2,

View File

@ -56,6 +56,8 @@
#include "common/restricted_token.h" #include "common/restricted_token.h"
#include "storage/large_object.h" #include "storage/large_object.h"
#include "pg_getopt.h" #include "pg_getopt.h"
#include "replication/logical.h"
#include "replication/origin.h"
static ControlFileData ControlFile; /* pg_control values */ static ControlFileData ControlFile; /* pg_control values */
@ -1091,6 +1093,7 @@ WriteEmptyXLOG(void)
record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint); record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID; record->xl_rmid = RM_XLOG_ID;
recptr += SizeOfXLogRecord; recptr += SizeOfXLogRecord;
*(recptr++) = XLR_BLOCK_ID_DATA_SHORT; *(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
*(recptr++) = sizeof(CheckPoint); *(recptr++) = sizeof(CheckPoint);

View File

@ -13,6 +13,7 @@
#include "access/xlog.h" #include "access/xlog.h"
#include "datatype/timestamp.h" #include "datatype/timestamp.h"
#include "replication/origin.h"
#include "utils/guc.h" #include "utils/guc.h"
@ -21,18 +22,13 @@ extern PGDLLIMPORT bool track_commit_timestamp;
extern bool check_track_commit_timestamp(bool *newval, void **extra, extern bool check_track_commit_timestamp(bool *newval, void **extra,
GucSource source); GucSource source);
typedef uint32 CommitTsNodeId;
#define InvalidCommitTsNodeId 0
extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid);
extern CommitTsNodeId CommitTsGetDefaultNodeId(void);
extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp, TransactionId *subxids, TimestampTz timestamp,
CommitTsNodeId nodeid, bool do_xlog); RepOriginId nodeid, bool do_xlog);
extern bool TransactionIdGetCommitTsData(TransactionId xid, extern bool TransactionIdGetCommitTsData(TransactionId xid,
TimestampTz *ts, CommitTsNodeId *nodeid); TimestampTz *ts, RepOriginId *nodeid);
extern TransactionId GetLatestCommitTsData(TimestampTz *ts, extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
CommitTsNodeId *nodeid); RepOriginId *nodeid);
extern Size CommitTsShmemBuffers(void); extern Size CommitTsShmemBuffers(void);
extern Size CommitTsShmemSize(void); extern Size CommitTsShmemSize(void);
@ -58,7 +54,7 @@ extern void AdvanceOldestCommitTs(TransactionId oldestXact);
typedef struct xl_commit_ts_set typedef struct xl_commit_ts_set
{ {
TimestampTz timestamp; TimestampTz timestamp;
CommitTsNodeId nodeid; RepOriginId nodeid;
TransactionId mainxid; TransactionId mainxid;
/* subxact Xids follow */ /* subxact Xids follow */
} xl_commit_ts_set; } xl_commit_ts_set;

View File

@ -44,3 +44,4 @@ PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)

View File

@ -131,6 +131,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XACT_XINFO_HAS_RELFILENODES (1U << 2) #define XACT_XINFO_HAS_RELFILENODES (1U << 2)
#define XACT_XINFO_HAS_INVALS (1U << 3) #define XACT_XINFO_HAS_INVALS (1U << 3)
#define XACT_XINFO_HAS_TWOPHASE (1U << 4) #define XACT_XINFO_HAS_TWOPHASE (1U << 4)
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
/* /*
* Also stored in xinfo, these indicating a variety of additional actions that * Also stored in xinfo, these indicating a variety of additional actions that
@ -217,6 +218,12 @@ typedef struct xl_xact_twophase
} xl_xact_twophase; } xl_xact_twophase;
#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs) #define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
typedef struct xl_xact_origin
{
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_origin;
typedef struct xl_xact_commit typedef struct xl_xact_commit
{ {
TimestampTz xact_time; /* time of commit */ TimestampTz xact_time; /* time of commit */
@ -227,6 +234,7 @@ typedef struct xl_xact_commit
/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */ /* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
/* xl_xact_invals follows if XINFO_HAS_INVALS */ /* xl_xact_invals follows if XINFO_HAS_INVALS */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
/* xl_xact_origin follows if XINFO_HAS_ORIGIN */
} xl_xact_commit; } xl_xact_commit;
#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
@ -267,6 +275,9 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs; SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */ TransactionId twophase_xid; /* only for 2PC */
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_commit; } xl_xact_parsed_commit;
typedef struct xl_xact_parsed_abort typedef struct xl_xact_parsed_abort

View File

@ -85,6 +85,7 @@ typedef enum
} RecoveryTargetType; } RecoveryTargetType;
extern XLogRecPtr XactLastRecEnd; extern XLogRecPtr XactLastRecEnd;
extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
extern bool reachedConsistency; extern bool reachedConsistency;

View File

@ -31,7 +31,7 @@
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD085 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {

View File

@ -44,6 +44,12 @@ typedef uint64 XLogSegNo;
*/ */
typedef uint32 TimeLineID; typedef uint32 TimeLineID;
/*
* Replication origin id - this is located in this file to avoid having to
* include origin.h in a bunch of xlog related places.
*/
typedef uint16 RepOriginId;
/* /*
* Because O_DIRECT bypasses the kernel buffers, and because we never * Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal, * read those buffers except during crash recovery or if wal_level != minimal,

View File

@ -39,6 +39,7 @@
/* prototypes for public functions in xloginsert.c: */ /* prototypes for public functions in xloginsert.c: */
extern void XLogBeginInsert(void); extern void XLogBeginInsert(void);
extern void XLogIncludeOrigin(void);
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info); extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
extern void XLogEnsureRecordSpace(int nbuffers, int ndatas); extern void XLogEnsureRecordSpace(int nbuffers, int ndatas);
extern void XLogRegisterData(char *data, int len); extern void XLogRegisterData(char *data, int len);

View File

@ -127,6 +127,8 @@ struct XLogReaderState
uint32 main_data_len; /* main data portion's length */ uint32 main_data_len; /* main data portion's length */
uint32 main_data_bufsz; /* allocated size of the buffer */ uint32 main_data_bufsz; /* allocated size of the buffer */
RepOriginId record_origin;
/* information about blocks referenced by the record. */ /* information about blocks referenced by the record. */
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
@ -186,6 +188,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) #define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
#define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetData(decoder) ((decoder)->main_data)
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)

View File

@ -212,5 +212,6 @@ typedef struct XLogRecordDataHeaderLong
#define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_DATA_LONG 254
#define XLR_BLOCK_ID_ORIGIN 253
#endif /* XLOGRECORD_H */ #endif /* XLOGRECORD_H */

View File

@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201504261 #define CATALOG_VERSION_NO 201504291
#endif #endif

View File

@ -310,6 +310,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid
DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops)); DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops));
#define PolicyPolrelidPolnameIndexId 3258 #define PolicyPolrelidPolnameIndexId 3258
DECLARE_UNIQUE_INDEX(pg_replication_origin_roiident_index, 6001, on pg_replication_origin using btree(roident oid_ops));
#define ReplicationOriginIdentIndex 6001
DECLARE_UNIQUE_INDEX(pg_replication_origin_roname_index, 6002, on pg_replication_origin using btree(roname varchar_pattern_ops));
#define ReplicationOriginNameIndex 6002
/* last step of initialization script: build the indexes declared above */ /* last step of initialization script: build the indexes declared above */
BUILD_INDICES BUILD_INDICES

View File

@ -5203,6 +5203,42 @@ DESCR("for use by pg_upgrade");
DATA(insert OID = 3591 ( binary_upgrade_create_empty_extension PGNSP PGUID 12 1 0 0 0 f f f f f f v 7 0 2278 "25 25 16 25 1028 1009 1009" _null_ _null_ _null_ _null_ _null_ binary_upgrade_create_empty_extension _null_ _null_ _null_ )); DATA(insert OID = 3591 ( binary_upgrade_create_empty_extension PGNSP PGUID 12 1 0 0 0 f f f f f f v 7 0 2278 "25 25 16 25 1028 1009 1009" _null_ _null_ _null_ _null_ _null_ binary_upgrade_create_empty_extension _null_ _null_ _null_ ));
DESCR("for use by pg_upgrade"); DESCR("for use by pg_upgrade");
/* replication/origin.h */
DATA(insert OID = 6003 ( pg_replication_origin_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_create _null_ _null_ _null_ ));
DESCR("create a replication origin");
DATA(insert OID = 6004 ( pg_replication_origin_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_drop _null_ _null_ _null_ ));
DESCR("drop replication origin identified by its name");
DATA(insert OID = 6005 ( pg_replication_origin_oid PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_oid _null_ _null_ _null_ ));
DESCR("translate the replication origin's name to its id");
DATA(insert OID = 6006 ( pg_replication_origin_session_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_setup _null_ _null_ _null_ ));
DESCR("configure session to maintain replication progress tracking for the passed in origin");
DATA(insert OID = 6007 ( pg_replication_origin_session_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_reset _null_ _null_ _null_ ));
DESCR("teardown configured replication progress tracking");
DATA(insert OID = 6008 ( pg_replication_origin_session_is_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_is_setup _null_ _null_ _null_ ));
DESCR("is a replication origin configured in this session");
DATA(insert OID = 6009 ( pg_replication_origin_session_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 3220 "16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_progress _null_ _null_ _null_ ));
DESCR("get the replication progress of the current session");
DATA(insert OID = 6010 ( pg_replication_origin_xact_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_setup _null_ _null_ _null_ ));
DESCR("setup the transaction's origin lsn and timestamp");
DATA(insert OID = 6011 ( pg_replication_origin_xact_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_reset _null_ _null_ _null_ ));
DESCR("reset the transaction's origin lsn and timestamp");
DATA(insert OID = 6012 ( pg_replication_origin_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 3220" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_advance _null_ _null_ _null_ ));
DESCR("advance replication itentifier to specific location");
DATA(insert OID = 6013 ( pg_replication_origin_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_progress _null_ _null_ _null_ ));
DESCR("get an individual replication origin's replication progress");
DATA(insert OID = 6014 ( pg_show_replication_origin_status PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ _null_ pg_show_replication_origin_status _null_ _null_ _null_ ));
DESCR("get progress for all replication origins");
/* /*
* Symbolic values for provolatile column: these indicate whether the result * Symbolic values for provolatile column: these indicate whether the result

View File

@ -0,0 +1,70 @@
/*-------------------------------------------------------------------------
*
* pg_replication_origin.h
* Persistent replication origin registry
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/catalog/pg_replication_origin.h
*
* NOTES
* the genbki.pl script reads this file and generates .bki
* information from the DATA() statements.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_REPLICATION_ORIGIN_H
#define PG_REPLICATION_ORIGIN_H
#include "catalog/genbki.h"
#include "access/xlogdefs.h"
/* ----------------
* pg_replication_origin. cpp turns this into
* typedef struct FormData_pg_replication_origin
* ----------------
*/
#define ReplicationOriginRelationId 6000
CATALOG(pg_replication_origin,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
{
/*
* Locally known id that get included into WAL.
*
* This should never leave the system.
*
* Needs to fit into a uint16, so we don't waste too much space in WAL
* records. For this reason we don't use a normal Oid column here, since
* we need to handle allocation of new values manually.
*/
Oid roident;
/*
* Variable-length fields start here, but we allow direct access to
* roname.
*/
/* external, free-format, name */
text roname BKI_FORCE_NOT_NULL;
#ifdef CATALOG_VARLEN /* further variable-length fields */
#endif
} FormData_pg_replication_origin;
typedef FormData_pg_replication_origin *Form_pg_replication_origin;
/* ----------------
* compiler constants for pg_replication_origin
* ----------------
*/
#define Natts_pg_replication_origin 2
#define Anum_pg_replication_origin_roident 1
#define Anum_pg_replication_origin_roname 2
/* ----------------
* pg_replication_origin has no initial contents
* ----------------
*/
#endif /* PG_REPLICATION_ORIGIN_H */

View File

@ -97,4 +97,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn); XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
#endif #endif

View File

@ -0,0 +1,86 @@
/*-------------------------------------------------------------------------
* origin.h
* Exports from replication/logical/origin.c
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
* src/include/replication/origin.h
*-------------------------------------------------------------------------
*/
#ifndef PG_ORIGIN_H
#define PG_ORIGIN_H
#include "access/xlogdefs.h"
#include "catalog/pg_replication_origin.h"
#include "replication/logical.h"
typedef struct xl_replorigin_set
{
XLogRecPtr remote_lsn;
RepOriginId node_id;
bool force;
} xl_replorigin_set;
typedef struct xl_replorigin_drop
{
RepOriginId node_id;
} xl_replorigin_drop;
#define XLOG_REPLORIGIN_SET 0x00
#define XLOG_REPLORIGIN_DROP 0x10
#define InvalidRepOriginId 0
#define DoNotReplicateId UINT16_MAX
extern PGDLLIMPORT RepOriginId replorigin_sesssion_origin;
extern PGDLLIMPORT XLogRecPtr replorigin_sesssion_origin_lsn;
extern PGDLLIMPORT TimestampTz replorigin_sesssion_origin_timestamp;
/* API for querying & manipulating replication origins */
extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
extern RepOriginId replorigin_create(char *name);
extern void replorigin_drop(RepOriginId roident);
extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
char **roname);
/* API for querying & manipulating replication progress tracking */
extern void replorigin_advance(RepOriginId node,
XLogRecPtr remote_commit,
XLogRecPtr local_commit,
bool go_backward, bool wal_log);
extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
extern void replorigin_session_advance(XLogRecPtr remote_commit,
XLogRecPtr local_commit);
extern void replorigin_session_setup(RepOriginId node);
extern void replorigin_session_reset(void);
extern XLogRecPtr replorigin_session_get_progress(bool flush);
/* Checkpoint/Startup integration */
extern void CheckPointReplicationOrigin(void);
extern void StartupReplicationOrigin(void);
/* WAL logging */
void replorigin_redo(XLogReaderState *record);
void replorigin_desc(StringInfo buf, XLogReaderState *record);
const char * replorigin_identify(uint8 info);
/* shared memory allocation */
extern Size ReplicationOriginShmemSize(void);
extern void ReplicationOriginShmemInit(void);
/* SQL callable functions */
extern Datum pg_replication_origin_create(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_drop(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_oid(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_advance(PG_FUNCTION_ARGS);
extern Datum pg_replication_origin_progress(PG_FUNCTION_ARGS);
extern Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS);
#endif /* PG_ORIGIN_H */

View File

@ -73,6 +73,13 @@ typedef void (*LogicalDecodeCommitCB) (
ReorderBufferTXN *txn, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn); XLogRecPtr commit_lsn);
/*
* Filter changes by origin.
*/
typedef bool (*LogicalDecodeFilterByOriginCB) (
struct LogicalDecodingContext *,
RepOriginId origin_id);
/* /*
* Called to shutdown an output plugin. * Called to shutdown an output plugin.
*/ */
@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb; LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb; LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb; LogicalDecodeCommitCB commit_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks; } OutputPluginCallbacks;

View File

@ -68,6 +68,8 @@ typedef struct ReorderBufferChange
/* The type of change. */ /* The type of change. */
enum ReorderBufferChangeType action; enum ReorderBufferChangeType action;
RepOriginId origin_id;
/* /*
* Context data for the change, which part of the union is valid depends * Context data for the change, which part of the union is valid depends
* on action/action_internal. * on action/action_internal.
@ -166,6 +168,10 @@ typedef struct ReorderBufferTXN
*/ */
XLogRecPtr restart_decoding_lsn; XLogRecPtr restart_decoding_lsn;
/* origin of the change that caused this transaction */
RepOriginId origin_id;
XLogRecPtr origin_lsn;
/* /*
* Commit time, only known when we read the actual commit record. * Commit time, only known when we read the actual commit record.
*/ */
@ -339,7 +345,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
void ReorderBufferCommit(ReorderBuffer *, TransactionId, void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time); TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn); XLogRecPtr commit_lsn, XLogRecPtr end_lsn);

View File

@ -134,8 +134,9 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
#define ReplicationSlotControlLock (&MainLWLockArray[37].lock) #define ReplicationSlotControlLock (&MainLWLockArray[37].lock)
#define CommitTsControlLock (&MainLWLockArray[38].lock) #define CommitTsControlLock (&MainLWLockArray[38].lock)
#define CommitTsLock (&MainLWLockArray[39].lock) #define CommitTsLock (&MainLWLockArray[39].lock)
#define ReplicationOriginLock (&MainLWLockArray[40].lock)
#define NUM_INDIVIDUAL_LWLOCKS 40 #define NUM_INDIVIDUAL_LWLOCKS 41
/* /*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS

View File

@ -77,6 +77,8 @@ enum SysCacheIdentifier
RANGETYPE, RANGETYPE,
RELNAMENSP, RELNAMENSP,
RELOID, RELOID,
REPLORIGIDENT,
REPLORIGNAME,
RULERELNAME, RULERELNAME,
STATRELATTINH, STATRELATTINH,
TABLESPACEOID, TABLESPACEOID,

View File

@ -1390,6 +1390,11 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid))); LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
pg_replication_origin_status| SELECT pg_show_replication_origin_status.local_id,
pg_show_replication_origin_status.external_id,
pg_show_replication_origin_status.remote_lsn,
pg_show_replication_origin_status.local_lsn
FROM pg_show_replication_origin_status() pg_show_replication_origin_status(local_id, external_id, remote_lsn, local_lsn);
pg_replication_slots| SELECT l.slot_name, pg_replication_slots| SELECT l.slot_name,
l.plugin, l.plugin,
l.slot_type, l.slot_type,

View File

@ -121,6 +121,7 @@ pg_pltemplate|t
pg_policy|t pg_policy|t
pg_proc|t pg_proc|t
pg_range|t pg_range|t
pg_replication_origin|t
pg_rewrite|t pg_rewrite|t
pg_seclabel|t pg_seclabel|t
pg_shdepend|t pg_shdepend|t