Rename GUC logical_decoding_mode to logical_replication_mode.

Rename the developer option 'logical_decoding_mode' to the more flexible
name 'logical_replication_mode' because doing so will make it easier to
extend this option in the future to help test other areas of logical
replication.

Currently, it is used on the publisher side to allow streaming or
serializing each change in logical decoding. In the upcoming patch, we are
planning to use it on the subscriber. On the subscriber, it will allow
serializing the changes to file and notifies the parallel apply workers to
read and apply them at the end of the transaction.

We discussed exposing this parameter as a subscription option but
it did not seem advisable since it is primarily used for testing/debugging
and there is no other such parameter. We also discussed having separate
GUCs for publisher and subscriber but for current testing/debugging
requirements, one GUC is sufficient.

Author: Hou Zhijie
Reviewed-by: Peter Smith, Kuroda Hayato, Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoAy2c=Mx=FTCs+EwUsf2kQL5MmU3N18X84k0EmCXntK4g@mail.gmail.com
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
This commit is contained in:
Amit Kapila 2023-01-30 08:02:08 +05:30
parent 8d2c1913ed
commit 1e8b61735c
9 changed files with 30 additions and 29 deletions

View File

@ -11693,16 +11693,16 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-logical-decoding-mode" xreflabel="logical_decoding_mode"> <varlistentry id="guc-logical-replication-mode" xreflabel="logical_replication_mode">
<term><varname>logical_decoding_mode</varname> (<type>enum</type>) <term><varname>logical_replication_mode</varname> (<type>enum</type>)
<indexterm> <indexterm>
<primary><varname>logical_decoding_mode</varname> configuration parameter</primary> <primary><varname>logical_replication_mode</varname> configuration parameter</primary>
</indexterm> </indexterm>
</term> </term>
<listitem> <listitem>
<para> <para>
Allows streaming or serializing changes immediately in logical decoding. Allows streaming or serializing changes immediately in logical decoding.
The allowed values of <varname>logical_decoding_mode</varname> are The allowed values of <varname>logical_replication_mode</varname> are
<literal>buffered</literal> and <literal>immediate</literal>. When set <literal>buffered</literal> and <literal>immediate</literal>. When set
to <literal>immediate</literal>, stream each change if to <literal>immediate</literal>, stream each change if
<literal>streaming</literal> option (see optional parameters set by <literal>streaming</literal> option (see optional parameters set by

View File

@ -210,7 +210,7 @@ int logical_decoding_work_mem;
static const Size max_changes_in_memory = 4096; /* XXX for restore only */ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
/* GUC variable */ /* GUC variable */
int logical_decoding_mode = LOGICAL_DECODING_MODE_BUFFERED; int logical_replication_mode = LOGICAL_REP_MODE_BUFFERED;
/* --------------------------------------- /* ---------------------------------------
* primary reorderbuffer support routines * primary reorderbuffer support routines
@ -3552,8 +3552,8 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
* pick the largest (sub)transaction at-a-time to evict and spill its changes to * pick the largest (sub)transaction at-a-time to evict and spill its changes to
* disk or send to the output plugin until we reach under the memory limit. * disk or send to the output plugin until we reach under the memory limit.
* *
* If logical_decoding_mode is set to "immediate", stream or serialize the changes * If logical_replication_mode is set to "immediate", stream or serialize the
* immediately. * changes immediately.
* *
* XXX At this point we select the transactions until we reach under the memory * XXX At this point we select the transactions until we reach under the memory
* limit, but we might also adapt a more elaborate eviction strategy - for example * limit, but we might also adapt a more elaborate eviction strategy - for example
@ -3566,15 +3566,15 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
ReorderBufferTXN *txn; ReorderBufferTXN *txn;
/* /*
* Bail out if logical_decoding_mode is buffered and we haven't exceeded * Bail out if logical_replication_mode is buffered and we haven't exceeded
* the memory limit. * the memory limit.
*/ */
if (logical_decoding_mode == LOGICAL_DECODING_MODE_BUFFERED && if (logical_replication_mode == LOGICAL_REP_MODE_BUFFERED &&
rb->size < logical_decoding_work_mem * 1024L) rb->size < logical_decoding_work_mem * 1024L)
return; return;
/* /*
* If logical_decoding_mode is immediate, loop until there's no change. * If logical_replication_mode is immediate, loop until there's no change.
* Otherwise, loop until we reach under the memory limit. One might think * Otherwise, loop until we reach under the memory limit. One might think
* that just by evicting the largest (sub)transaction we will come under * that just by evicting the largest (sub)transaction we will come under
* the memory limit based on assumption that the selected transaction is * the memory limit based on assumption that the selected transaction is
@ -3584,7 +3584,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
* change. * change.
*/ */
while (rb->size >= logical_decoding_work_mem * 1024L || while (rb->size >= logical_decoding_work_mem * 1024L ||
(logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE && (logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE &&
rb->size > 0)) rb->size > 0))
{ {
/* /*

View File

@ -395,9 +395,9 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
{NULL, 0, false} {NULL, 0, false}
}; };
static const struct config_enum_entry logical_decoding_mode_options[] = { static const struct config_enum_entry logical_replication_mode_options[] = {
{"buffered", LOGICAL_DECODING_MODE_BUFFERED, false}, {"buffered", LOGICAL_REP_MODE_BUFFERED, false},
{"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false}, {"immediate", LOGICAL_REP_MODE_IMMEDIATE, false},
{NULL, 0, false} {NULL, 0, false}
}; };
@ -4919,13 +4919,13 @@ struct config_enum ConfigureNamesEnum[] =
}, },
{ {
{"logical_decoding_mode", PGC_USERSET, DEVELOPER_OPTIONS, {"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Allows streaming or serializing each change in logical decoding."), gettext_noop("Controls when to replicate each change."),
NULL, gettext_noop("On the publisher, it allows streaming or serializing each change in logical decoding."),
GUC_NOT_IN_SAMPLE GUC_NOT_IN_SAMPLE
}, },
&logical_decoding_mode, &logical_replication_mode,
LOGICAL_DECODING_MODE_BUFFERED, logical_decoding_mode_options, LOGICAL_REP_MODE_BUFFERED, logical_replication_mode_options,
NULL, NULL, NULL NULL, NULL, NULL
}, },

View File

@ -17,15 +17,16 @@
#include "utils/snapshot.h" #include "utils/snapshot.h"
#include "utils/timestamp.h" #include "utils/timestamp.h"
/* GUC variables */
extern PGDLLIMPORT int logical_decoding_work_mem; extern PGDLLIMPORT int logical_decoding_work_mem;
extern PGDLLIMPORT int logical_decoding_mode; extern PGDLLIMPORT int logical_replication_mode;
/* possible values for logical_decoding_mode */ /* possible values for logical_replication_mode */
typedef enum typedef enum
{ {
LOGICAL_DECODING_MODE_BUFFERED, LOGICAL_REP_MODE_BUFFERED,
LOGICAL_DECODING_MODE_IMMEDIATE LOGICAL_REP_MODE_IMMEDIATE
} LogicalDecodingMode; } LogicalRepMode;
/* an individual tuple, stored in one chunk of memory */ /* an individual tuple, stored in one chunk of memory */
typedef struct ReorderBufferTupleBuf typedef struct ReorderBufferTupleBuf

View File

@ -79,7 +79,7 @@ sub test_streaming
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical'); $node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', $node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate'); 'logical_replication_mode = immediate');
$node_publisher->start; $node_publisher->start;
# Create subscriber node # Create subscriber node

View File

@ -130,7 +130,7 @@ sub test_streaming
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical'); $node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', $node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate'); 'logical_replication_mode = immediate');
$node_publisher->start; $node_publisher->start;
# Create subscriber node # Create subscriber node

View File

@ -16,7 +16,7 @@ use Test::More;
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical'); $node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', $node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate'); 'logical_replication_mode = immediate');
$node_publisher->start; $node_publisher->start;
# Create subscriber node # Create subscriber node

View File

@ -301,7 +301,7 @@ $node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf( $node_publisher->append_conf(
'postgresql.conf', qq( 'postgresql.conf', qq(
max_prepared_transactions = 10 max_prepared_transactions = 10
logical_decoding_mode = immediate logical_replication_mode = immediate
)); ));
$node_publisher->start; $node_publisher->start;

View File

@ -1458,7 +1458,6 @@ LogicalDecodeStreamStopCB
LogicalDecodeStreamTruncateCB LogicalDecodeStreamTruncateCB
LogicalDecodeTruncateCB LogicalDecodeTruncateCB
LogicalDecodingContext LogicalDecodingContext
LogicalDecodingMode
LogicalErrorCallbackState LogicalErrorCallbackState
LogicalOutputPluginInit LogicalOutputPluginInit
LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterPrepareWrite
@ -1468,6 +1467,7 @@ LogicalRepBeginData
LogicalRepCommitData LogicalRepCommitData
LogicalRepCommitPreparedTxnData LogicalRepCommitPreparedTxnData
LogicalRepCtxStruct LogicalRepCtxStruct
LogicalRepMode
LogicalRepMsgType LogicalRepMsgType
LogicalRepPartMapEntry LogicalRepPartMapEntry
LogicalRepPreparedTxnData LogicalRepPreparedTxnData