Perform apply of large transactions by parallel workers.

Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber side, the apply
worker writes the changes into temporary files and once it receives the
commit, it reads from those files and applies the entire transaction. To
improve the performance of such transactions, we can instead allow them to
be applied via parallel workers.

In this approach, we assign a new parallel apply worker (if available) as
soon as the xact's first stream is received and the leader apply worker
will send changes to this new worker via shared memory. The parallel apply
worker will directly apply the change instead of writing it to temporary
files. However, if the leader apply worker times out while attempting to
send a message to the parallel apply worker, it will switch to
"partial serialize" mode: in this mode, the leader serializes all
remaining changes to a file and notifies the parallel apply worker to
read and apply them at the end of the transaction. We use a non-blocking
way to send the messages from the leader apply worker to the parallel
apply worker to avoid deadlocks. We keep this parallel apply worker
assigned until the transaction commit is received and also wait for the
worker to finish at commit. This preserves commit ordering and avoids
writing to and reading from files in most cases. We still need to spill
if there is no worker available.
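
For illustration, once a parallel apply worker is running it should show up
in pg_stat_activity, since the patch registers it with bgw_type
"logical replication parallel worker". A sketch, assuming an active
subscription on the subscriber:

    -- List leader and parallel apply workers for logical replication.
    SELECT pid, backend_type, wait_event_type, wait_event
    FROM pg_stat_activity
    WHERE backend_type IN ('logical replication worker',
                           'logical replication parallel worker');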

This patch also extends the SUBSCRIPTION 'streaming' parameter so that the
user can control whether to apply the streaming transaction in a parallel
apply worker or spill the changes to disk. The user can set the streaming
parameter to 'on', 'off', or 'parallel'. The parameter value 'parallel'
means the streaming will be applied via a parallel apply worker, if
available. The parameter value 'on' means the streaming transaction will
be spilled to disk. The default value is 'off' (same as the current
behaviour).
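
For example (hypothetical connection string, publication, and subscription
names), a subscription that applies streamed transactions in parallel could
be created as:

    CREATE SUBSCRIPTION mysub
        CONNECTION 'host=publisher dbname=postgres'
        PUBLICATION mypub
        WITH (streaming = parallel);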

In addition, the patch extends the logical replication STREAM_ABORT
message so that abort_lsn and abort_time can also be sent, which can be
used to update the replication origin in the parallel apply worker when
the streaming transaction is aborted. Because this message extension is
needed to support parallel streaming, parallel streaming is not supported
for publications on servers < PG16.

Author: Hou Zhijie, Wang wei, Amit Kapila with design inputs from Sawada Masahiko
Reviewed-by: Sawada Masahiko, Peter Smith, Dilip Kumar, Shi yu, Kuroda Hayato, Shveta Malik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
Committed by Amit Kapila on 2023-01-09 07:00:39 +05:30
parent 5687e7810f
commit 216a784829
58 changed files with 4497 additions and 745 deletions


@ -7913,11 +7913,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>substream</structfield> <type>bool</type>
<structfield>substream</structfield> <type>char</type>
</para>
<para>
If true, the subscription will allow streaming of in-progress
transactions
Controls how to handle the streaming of in-progress transactions:
<literal>f</literal> = disallow streaming of in-progress transactions,
<literal>t</literal> = spill the changes of in-progress transactions to
disk and apply at once after the transaction is committed on the
publisher and received by the subscriber,
<literal>p</literal> = apply changes directly using a parallel apply
worker if available (same as 't' if no worker is available)
</para></entry>
</row>
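
A quick sketch of reading the new column, decoding it the same way the psql
change later in this commit does (see the describe.c hunk below):

    SELECT subname,
           CASE substream
                WHEN 'f' THEN 'off'
                WHEN 't' THEN 'on'
                WHEN 'p' THEN 'parallel'
           END AS streaming
    FROM pg_subscription;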


@ -4968,7 +4968,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<listitem>
<para>
Specifies maximum number of logical replication workers. This includes
both apply workers and table synchronization workers.
leader apply workers, parallel apply workers, and table synchronization
workers.
</para>
<para>
Logical replication workers are taken from the pool defined by
@ -5008,6 +5009,31 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
<varlistentry id="guc-max-parallel-apply-workers-per-subscription" xreflabel="max_parallel_apply_workers_per_subscription">
<term><varname>max_parallel_apply_workers_per_subscription</varname> (<type>integer</type>)
<indexterm>
<primary><varname>max_parallel_apply_workers_per_subscription</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Maximum number of parallel apply workers per subscription. This
parameter controls the amount of parallelism for streaming of
in-progress transactions with subscription parameter
<literal>streaming = parallel</literal>.
</para>
<para>
The parallel apply workers are taken from the pool defined by
<varname>max_logical_replication_workers</varname>.
</para>
<para>
The default value is 2. This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
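
Because the new parameter is reloadable (PGC_SIGHUP, per the guc_tables.c
hunk below), raising it does not require a restart; a sketch:

    ALTER SYSTEM SET max_parallel_apply_workers_per_subscription = 4;
    SELECT pg_reload_conf();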


@ -1501,6 +1501,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
might not violate any constraint. This can easily make the subscriber
inconsistent.
</para>
<para>
When the streaming mode is <literal>parallel</literal>, the finish LSN of
failed transactions may not be logged. In that case, it may be necessary to
change the streaming mode to <literal>on</literal> or <literal>off</literal> and
cause the same conflicts again so the finish LSN of the failed transaction will
be written to the server log. For the usage of finish LSN, please refer to <link
linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ...
SKIP</command></link>.
</para>
</sect1>
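
A sketch of that workflow, with a hypothetical subscription name and a
hypothetical finish LSN taken from the server log:

    ALTER SUBSCRIPTION mysub SET (streaming = on);
    -- After the conflict is reported again with its finish LSN:
    ALTER SUBSCRIPTION mysub SKIP (lsn = '0/14C0378');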
<sect1 id="logical-replication-restrictions">
@ -1809,8 +1819,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
<para>
<link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link>
must be set to at least the number of subscriptions (for apply workers), plus
some reserve for the table synchronization workers.
must be set to at least the number of subscriptions (for leader apply
workers), plus some reserve for the table synchronization workers and
parallel apply workers.
</para>
<para>
@ -1827,6 +1838,13 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
subscription initialization or when new tables are added.
</para>
<para>
<link linkend="guc-max-parallel-apply-workers-per-subscription"><varname>max_parallel_apply_workers_per_subscription</varname></link>
controls the amount of parallelism for streaming of in-progress
transactions with subscription parameter
<literal>streaming = parallel</literal>.
</para>
<para>
Logical replication workers are also affected by
<link linkend="guc-wal-receiver-timeout"><varname>wal_receiver_timeout</varname></link>,


@ -1858,6 +1858,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>advisory</literal></entry>
<entry>Waiting to acquire an advisory user lock.</entry>
</row>
<row>
<entry><literal>applytransaction</literal></entry>
<entry>Waiting to acquire a lock on a remote transaction being applied
by a logical replication subscriber.</entry>
</row>
<row>
<entry><literal>extend</literal></entry>
<entry>Waiting to extend a relation.</entry>


@ -3103,7 +3103,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<listitem>
<para>
Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
and <literal>3</literal> are supported.
<literal>3</literal>, and <literal>4</literal> are supported.
</para>
<para>
Version <literal>2</literal> is supported only for server version 14
@ -3113,6 +3113,11 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
Version <literal>3</literal> is supported only for server version 15
and above, and it allows streaming of two-phase commits.
</para>
<para>
Version <literal>4</literal> is supported only for server version 16
and above, and it allows streams of large in-progress transactions to
be applied in parallel.
</para>
</listitem>
</varlistentry>
@ -6883,6 +6888,28 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>Int64 (XLogRecPtr)</term>
<listitem>
<para>
The LSN of the abort. This field is available since protocol version
4.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>Int64 (TimestampTz)</term>
<listitem>
<para>
Abort timestamp of the transaction. The value is in number
of microseconds since PostgreSQL epoch (2000-01-01). This field is
available since protocol version 4.
</para>
</listitem>
</varlistentry>
</variablelist>
</listitem>
</varlistentry>
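
For illustration, a client requesting the new protocol version and parallel
streaming might start pgoutput like this (hypothetical slot and publication
names):

    START_REPLICATION SLOT "mysub" LOGICAL 0/0
        (proto_version '4', streaming 'parallel', publication_names '"mypub"');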


@ -228,13 +228,29 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</varlistentry>
<varlistentry>
<term><literal>streaming</literal> (<type>boolean</type>)</term>
<term><literal>streaming</literal> (<type>enum</type>)</term>
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
for this subscription. By default, all transactions
are fully decoded on the publisher and only then sent to the
subscriber as a whole.
for this subscription. The default value is <literal>off</literal>,
meaning all transactions are fully decoded on the publisher and only
then sent to the subscriber as a whole.
</para>
<para>
If set to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is
committed on the publisher and received by the subscriber.
</para>
<para>
If set to <literal>parallel</literal>, incoming changes are directly
applied via one of the parallel apply workers, if available. If no
parallel apply worker is free to handle streaming transactions then
the changes are written to temporary files and applied after the
transaction is committed. Note that if an error happens in a
parallel apply worker, the finish LSN of the remote transaction
might not be reported in the server log.
</para>
</listitem>
</varlistentry>
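
A sketch of switching an existing subscription between the three modes
(hypothetical subscription name):

    ALTER SUBSCRIPTION mysub SET (streaming = off);      -- decode fully on the publisher first
    ALTER SUBSCRIPTION mysub SET (streaming = on);       -- stream and spill to files on the subscriber
    ALTER SUBSCRIPTION mysub SET (streaming = parallel); -- stream and apply via a parallel apply worker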


@ -1379,8 +1379,9 @@
<literal>virtualxid</literal>,
<literal>spectoken</literal>,
<literal>object</literal>,
<literal>userlock</literal>, or
<literal>advisory</literal>.
<literal>userlock</literal>,
<literal>advisory</literal>, or
<literal>applytransaction</literal>.
(See also <xref linkend="wait-event-lock-table"/>.)
</para></entry>
</row>
@ -1594,6 +1595,15 @@
so the <structfield>database</structfield> column is meaningful for an advisory lock.
</para>
<para>
Apply transaction locks are used in parallel mode to apply the transaction
in logical replication. The remote transaction id is displayed in the
<structfield>transactionid</structfield> column. The <structfield>objsubid</structfield>
displays the lock subtype which is 0 for the lock used to synchronize the
set of changes, and 1 for the lock used to wait for the transaction to
finish to ensure commit order.
</para>
<para>
<structname>pg_locks</structname> provides a global view of all locks
in the database cluster, not only those relevant to the current database.
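
A sketch of inspecting these locks while a streamed transaction is being
applied in parallel (column mapping per the lockfuncs.c hunk below):

    SELECT locktype, database, objid AS suboid,
           transactionid AS remote_xid, objsubid, pid, granted
    FROM pg_locks
    WHERE locktype = 'applytransaction';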


@ -1713,6 +1713,7 @@ RecordTransactionAbort(bool isSubXact)
int nchildren;
TransactionId *children;
TimestampTz xact_time;
bool replorigin;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
@ -1743,6 +1744,13 @@ RecordTransactionAbort(bool isSubXact)
elog(PANIC, "cannot abort transaction %u, it was already committed",
xid);
/*
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
replorigin_session_origin != DoNotReplicateId);
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
@ -1766,6 +1774,11 @@ RecordTransactionAbort(bool isSubXact)
MyXactFlags, InvalidTransactionId,
NULL);
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
XactLastRecEnd);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
* flush this abort. There's nothing to be gained by delaying this, since
@ -5873,11 +5886,10 @@ XactLogAbortRecord(TimestampTz abort_time,
}
/*
* Dump transaction origin information only for abort prepared. We need
* this during recovery to update the replication origin progress.
* Dump transaction origin information. We need this during recovery to
* update the replication origin progress.
*/
if ((replorigin_session_origin != InvalidRepOriginId) &&
TransactionIdIsValid(twophase_xid))
if (replorigin_session_origin != InvalidRepOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
@ -5934,8 +5946,8 @@ XactLogAbortRecord(TimestampTz abort_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
if (TransactionIdIsValid(twophase_xid))
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
/* Include the replication origin */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_XACT_ID, info);
}


@ -85,7 +85,7 @@ typedef struct SubOpts
bool copy_data;
bool refresh;
bool binary;
bool streaming;
char streaming;
bool twophase;
bool disableonerr;
char *origin;
@ -139,7 +139,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = false;
opts->streaming = LOGICALREP_STREAM_OFF;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
@ -242,7 +242,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetBoolean(defel);
opts->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@ -630,7 +630,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
@ -1099,7 +1099,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(opts.streaming);
CharGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
@ -2128,3 +2128,60 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
return oldpublist;
}
/*
* Extract the streaming mode value from a DefElem. This is like
* defGetBoolean() but also accepts the special value of "parallel".
*/
char
defGetStreamingMode(DefElem *def)
{
/*
* If no parameter value given, assume "true" is meant.
*/
if (!def->arg)
return LOGICALREP_STREAM_ON;
/*
* Allow 0, 1, "false", "true", "off", "on" or "parallel".
*/
switch (nodeTag(def->arg))
{
case T_Integer:
switch (intVal(def->arg))
{
case 0:
return LOGICALREP_STREAM_OFF;
case 1:
return LOGICALREP_STREAM_ON;
default:
/* otherwise, error out below */
break;
}
break;
default:
{
char *sval = defGetString(def);
/*
* The set of strings accepted here should match up with the
* grammar's opt_boolean_or_string production.
*/
if (pg_strcasecmp(sval, "false") == 0 ||
pg_strcasecmp(sval, "off") == 0)
return LOGICALREP_STREAM_OFF;
if (pg_strcasecmp(sval, "true") == 0 ||
pg_strcasecmp(sval, "on") == 0)
return LOGICALREP_STREAM_ON;
if (pg_strcasecmp(sval, "parallel") == 0)
return LOGICALREP_STREAM_PARALLEL;
}
break;
}
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s requires a Boolean value or \"parallel\"",
def->defname)));
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
}
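
Per defGetStreamingMode() above, several spellings map onto the same mode;
a sketch with a hypothetical subscription name:

    ALTER SUBSCRIPTION mysub SET (streaming = 1);         -- integer 1 means 'on'
    ALTER SUBSCRIPTION mysub SET (streaming);             -- no value also means 'on'
    ALTER SUBSCRIPTION mysub SET (streaming = parallel);  -- parallel apply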


@ -13,11 +13,13 @@
#include "postgres.h"
#include "access/parallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logicalworker.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
@ -162,9 +164,19 @@ mq_putmessage(char msgtype, const char *s, size_t len)
result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
if (pq_mq_parallel_leader_pid != 0)
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_MESSAGE,
pq_mq_parallel_leader_backend_id);
{
if (IsLogicalParallelApplyWorker())
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_APPLY_MESSAGE,
pq_mq_parallel_leader_backend_id);
else
{
Assert(IsParallelWorker());
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_MESSAGE,
pq_mq_parallel_leader_backend_id);
}
}
if (result != SHM_MQ_WOULD_BLOCK)
break;


@ -128,6 +128,9 @@ static const struct
},
{
"ApplyWorkerMain", ApplyWorkerMain
},
{
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
}
};


@ -98,8 +98,9 @@ SignalHandlerForCrashExit(SIGNAL_ARGS)
* shut down and exit.
*
* Typically, this handler would be used for SIGTERM, but some processes use
* other signals. In particular, the checkpointer exits on SIGUSR2, and the
* WAL writer exits on either SIGINT or SIGTERM.
* other signals. In particular, the checkpointer exits on SIGUSR2, and the WAL
writer and the logical replication parallel apply worker exit on either
* SIGINT or SIGTERM.
*
* ShutdownRequestPending should be checked at a convenient place within the
* main loop, or else the main loop should call HandleMainLoopInterrupts.


@ -443,9 +443,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
if (options->proto.logical.streaming &&
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfoString(&cmd, ", streaming 'on'");
if (options->proto.logical.streaming_str)
appendStringInfo(&cmd, ", streaming '%s'",
options->proto.logical.streaming_str);
if (options->proto.logical.twophase &&
PQserverVersion(conn->streamConn) >= 150000)


@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = \
applyparallelworker.o \
decode.o \
launcher.o \
logical.o \

File diff suppressed because it is too large.


@ -822,10 +822,11 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
buf->record->EndRecPtr);
buf->record->EndRecPtr, abort_time);
}
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
abort_time);
}
/* update the decoding stats */


@ -55,6 +55,7 @@
/* GUC variables */
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@ -74,6 +75,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int logicalrep_pa_worker_count(Oid subid);
static bool on_commit_launcher_wakeup = false;
@ -152,8 +154,10 @@ get_subscription_list(void)
*
* This is only needed for cleaning up the shared memory in case the worker
* fails to attach.
*
* Returns whether the attach was successful.
*/
static void
static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
@ -169,11 +173,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
/* Worker either died or has started; no need to do anything. */
/* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
return;
return worker->in_use;
}
LWLockRelease(LogicalRepWorkerLock);
@ -188,7 +192,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
return;
return false;
}
/*
@ -210,6 +214,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
* subscription id and relid.
*
* We are only interested in the leader apply worker or table sync worker.
*/
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
@ -224,6 +230,10 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
/* Skip parallel apply workers. */
if (isParallelApplyWorker(w))
continue;
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@ -260,11 +270,13 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
/*
* Start new apply background worker, if possible.
* Start new logical replication background worker, if possible.
*
* Returns true on success, false on failure.
*/
void
bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid)
Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@ -273,7 +285,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int slot = 0;
LogicalRepWorker *worker = NULL;
int nsyncworkers;
int nparallelapplyworkers;
TimestampTz now;
bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
/* Sanity check - tablesync worker cannot be a subworker */
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@ -351,7 +368,20 @@ retry:
if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return;
return false;
}
nparallelapplyworkers = logicalrep_pa_worker_count(subid);
/*
* Return false if the number of parallel apply workers reached the limit
* per subscription.
*/
if (is_parallel_apply_worker &&
nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
/*
@ -365,7 +395,7 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
errhint("You might need to increase max_logical_replication_workers.")));
return;
return false;
}
/* Prepare the worker slot. */
@ -380,6 +410,8 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@ -397,19 +429,34 @@ retry:
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
if (is_parallel_apply_worker)
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
else
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u sync %u", subid, relid);
else if (is_parallel_apply_worker)
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication parallel apply worker for subscription %u", subid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
"logical replication apply worker for subscription %u", subid);
if (is_parallel_apply_worker)
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
else
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
if (is_parallel_apply_worker)
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
/* Failed to start worker, so clean up the worker slot. */
@ -422,33 +469,23 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase max_worker_processes.")));
return;
return false;
}
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
/*
* Stop the logical replication worker for subid/relid, if any, and wait until
* it detaches from the slot.
* Internal function to stop the worker and wait until it detaches from the
* slot.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
static void
logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
LogicalRepWorker *worker;
uint16 generation;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, false);
/* No worker, nothing to do. */
if (!worker)
{
LWLockRelease(LogicalRepWorkerLock);
return;
}
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
/*
* Remember which generation was our worker so we can check if what we see
@ -486,10 +523,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
* different, meaning that a different worker has taken the slot.
*/
if (!worker->in_use || worker->generation != generation)
{
LWLockRelease(LogicalRepWorkerLock);
return;
}
/* Worker has assigned proc, so it has started. */
if (worker->proc)
@ -497,7 +531,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
kill(worker->proc->pid, signo);
/* ... and wait for it to die. */
for (;;)
@ -523,6 +557,53 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
}
}
/*
* Stop the logical replication worker for subid/relid, if any.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, false);
if (worker)
{
Assert(!isParallelApplyWorker(worker));
logicalrep_worker_stop_internal(worker, SIGTERM);
}
LWLockRelease(LogicalRepWorkerLock);
}
/*
* Stop the logical replication parallel apply worker corresponding to the
* input slot number.
*
Note that the function sends SIGINT instead of SIGTERM to the parallel apply
* worker so that the worker exits cleanly.
*/
void
logicalrep_pa_worker_stop(int slot_no, uint16 generation)
{
LogicalRepWorker *worker;
Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = &LogicalRepCtx->workers[slot_no];
Assert(isParallelApplyWorker(worker));
/*
* Only stop the worker if the generation matches and the worker is alive.
*/
if (worker->generation == generation && worker->proc)
logicalrep_worker_stop_internal(worker, SIGINT);
LWLockRelease(LogicalRepWorkerLock);
}
@ -595,11 +676,40 @@ logicalrep_worker_attach(int slot)
}
/*
* Detach the worker (cleans up the worker info).
* Stop the parallel apply workers if any, and detach the leader apply worker
* (cleans up the worker info).
*/
static void
logicalrep_worker_detach(void)
{
/* Stop the parallel apply workers. */
if (am_leader_apply_worker())
{
List *workers;
ListCell *lc;
/*
* Detach from the error_mq_handle for all parallel apply workers
* before terminating them. This prevents the leader apply worker from
* receiving the worker termination message and sending it to logs
* when the same is already done by the parallel worker.
*/
pa_detach_all_error_mq();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
foreach(lc, workers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
if (isParallelApplyWorker(w))
logicalrep_worker_stop_internal(w, SIGTERM);
}
LWLockRelease(LogicalRepWorkerLock);
}
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
@ -622,6 +732,8 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->relid = InvalidOid;
worker->apply_leader_pid = InvalidPid;
worker->parallel_apply = false;
}
/*
@ -653,6 +765,13 @@ logicalrep_worker_onexit(int code, Datum arg)
if (MyLogicalRepWorker->stream_fileset != NULL)
FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
/*
* Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker
* terminates, so manually release all locks before the worker exits.
*/
LockReleaseAll(DEFAULT_LOCKMETHOD, true);
ApplyLauncherWakeup();
}
@ -680,6 +799,33 @@ logicalrep_sync_worker_count(Oid subid)
return res;
}
/*
* Count the number of registered (but not necessarily running) parallel apply
* workers for a subscription.
*/
static int
logicalrep_pa_worker_count(Oid subid)
{
int i;
int res = 0;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/*
* Scan all attached parallel apply workers, only counting those which
* have the given subscription id.
*/
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->subid == subid && isParallelApplyWorker(w))
res++;
}
return res;
}
/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
@ -869,7 +1015,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid);
sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}
}
@ -952,6 +1098,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if (OidIsValid(subid) && worker.subid != subid)
continue;
/* Skip if this is a parallel apply worker */
if (isParallelApplyWorker(&worker))
continue;
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);


@ -1,6 +1,7 @@
# Copyright (c) 2022-2023, PostgreSQL Global Development Group
backend_sources += files(
'applyparallelworker.c',
'decode.c',
'launcher.c',
'logical.c',


@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
* array doesn't have to be searched when calling
* replorigin_session_advance().
*
* Obviously only one such cached origin can exist per process and the current
* cached value can only be set again after the previous value is torn down
* with replorigin_session_reset().
* Normally only one such cached origin can exist per process so the cached
* value can only be set again after the previous value is torn down with
* replorigin_session_reset(). For this normal case pass acquired_by = 0
* (meaning the slot is not allowed to be already acquired by another process).
*
* However, sometimes multiple processes can safely re-use the same origin slot
* (for example, multiple parallel apply processes can safely use the same
* origin, provided they maintain commit order by allowing only one process to
* commit at a time). For this case the first process must pass acquired_by =
* 0, and then the other processes sharing that same origin can pass
* acquired_by = PID of the first process.
*/
void
replorigin_session_setup(RepOriginId node)
replorigin_session_setup(RepOriginId node, int acquired_by)
{
static bool registered_cleanup;
int i;
@ -1122,7 +1130,7 @@ replorigin_session_setup(RepOriginId node)
if (curstate->roident != node)
continue;
else if (curstate->acquired_by != 0)
else if (curstate->acquired_by != 0 && acquired_by == 0)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
@ -1153,7 +1161,11 @@ replorigin_session_setup(RepOriginId node)
Assert(session_replication_state->roident != InvalidRepOriginId);
session_replication_state->acquired_by = MyProcPid;
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
else if (session_replication_state->acquired_by != acquired_by)
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by);
LWLockRelease(ReplicationOriginLock);
@ -1337,7 +1349,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
replorigin_session_setup(origin);
replorigin_session_setup(origin, 0);
replorigin_session_origin = origin;


@ -1164,10 +1164,14 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
/*
* Write STREAM ABORT to the output stream. Note that xid and subxid will be
* same for the top-level transaction abort.
*
* If write_abort_info is true, send the abort_lsn and abort_time fields,
* otherwise don't.
*/
void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid)
TransactionId subxid, XLogRecPtr abort_lsn,
TimestampTz abort_time, bool write_abort_info)
{
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
@ -1176,19 +1180,40 @@ logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
/* transaction ID */
pq_sendint32(out, xid);
pq_sendint32(out, subxid);
if (write_abort_info)
{
pq_sendint64(out, abort_lsn);
pq_sendint64(out, abort_time);
}
}
/*
* Read STREAM ABORT from the output stream.
*
* If read_abort_info is true, read the abort_lsn and abort_time fields,
* otherwise don't.
*/
void
logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid)
logicalrep_read_stream_abort(StringInfo in,
LogicalRepStreamAbortData *abort_data,
bool read_abort_info)
{
Assert(xid && subxid);
Assert(abort_data);
*xid = pq_getmsgint(in, 4);
*subxid = pq_getmsgint(in, 4);
abort_data->xid = pq_getmsgint(in, 4);
abort_data->subxid = pq_getmsgint(in, 4);
if (read_abort_info)
{
abort_data->abort_lsn = pq_getmsgint64(in);
abort_data->abort_time = pq_getmsgint64(in);
}
else
{
abort_data->abort_lsn = InvalidXLogRecPtr;
abort_data->abort_time = 0;
}
}
/*


@ -2873,7 +2873,8 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* disk.
*/
void
ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
TimestampTz abort_time)
{
ReorderBufferTXN *txn;
@ -2884,6 +2885,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
if (txn == NULL)
return;
txn->xact_time.abort_time = abort_time;
/* For streamed transactions notify the remote node about the abort. */
if (rbtxn_is_streamed(txn))
{


@ -14,7 +14,7 @@
* The initial data synchronization is done separately for each table,
* in a separate apply worker that only fetches the initial snapshot data
* from the publisher and then synchronizes the position in the stream with
* the main apply worker.
* the leader apply worker.
*
* There are several reasons for doing the synchronization this way:
* - It allows us to parallelize the initial data synchronization
@ -153,7 +153,7 @@ finish_sync_worker(void)
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
/* Find the main apply worker and signal it. */
/* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
@ -588,7 +588,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
rstate->relid);
rstate->relid,
DSM_HANDLE_INVALID);
hentry->last_start_time = now;
}
}
@ -636,6 +637,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
/*
* Skip for parallel apply workers because they only operate on tables
* that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
if (am_parallel_apply_worker())
return;
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
@ -1254,7 +1263,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Here we use the slot name instead of the subscription name as the
* application_name, so that it is different from the main apply worker,
* application_name, so that it is different from the leader apply worker,
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
@ -1302,7 +1311,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* time this tablesync was launched.
*/
originid = replorigin_by_name(originname, false);
replorigin_session_setup(originid);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
@ -1413,7 +1422,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
true /* go backward */ , true /* WAL log */ );
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_session_setup(originid);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
}
else
@ -1468,8 +1477,8 @@ copy_table_done:
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
* Finally, wait until the main apply worker tells us to catch up and then
* return to let LogicalRepApplyLoop do it.
* Finally, wait until the leader apply worker tells us to catch up and
* then return to let LogicalRepApplyLoop do it.
*/
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;

src/backend/replication/logical/worker.c: 1366 lines changed; file mode changed from Normal file to Executable file.
File diff suppressed because it is too large.


@ -18,6 +18,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
@ -290,7 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool origin_option_given = false;
data->binary = false;
data->streaming = false;
data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
@ -369,7 +370,7 @@ parse_output_parameters(List *options, PGOutputData *data)
errmsg("conflicting or redundant options")));
streaming_given = true;
data->streaming = defGetBoolean(defel);
data->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@ -461,13 +462,20 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
* we only allow it with sufficient version of the protocol, and when
* the output plugin supports it.
*/
if (!data->streaming)
if (data->streaming == LOGICALREP_STREAM_OFF)
ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
else if (data->streaming == LOGICALREP_STREAM_ON &&
data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
else if (!ctx->streaming)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -1841,6 +1849,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
/*
* The abort should happen outside streaming block, even for streamed
@ -1854,7 +1864,9 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
txn->xact_time.abort_time, write_abort_info);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);


@ -22,6 +22,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logicalworker.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h"
@ -657,6 +658,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_LOG_MEMORY_CONTEXT))
HandleLogMemoryContextInterrupt();
if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
HandleParallelApplyMessageInterrupt();
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);


@ -1117,6 +1117,45 @@ UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
LockRelease(&tag, lockmode, true);
}
/*
* LockApplyTransactionForSession
*
* Obtain a session-level lock on a transaction being applied on a logical
* replication subscriber. See LockRelationIdForSession for notes about
* session-level locks.
*/
void
LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_APPLY_TRANSACTION(tag,
MyDatabaseId,
suboid,
xid,
objid);
(void) LockAcquire(&tag, lockmode, true, false);
}
/*
* UnlockApplyTransactionForSession
*/
void
UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_APPLY_TRANSACTION(tag,
MyDatabaseId,
suboid,
xid,
objid);
LockRelease(&tag, lockmode, true);
}
/*
* Append a description of a lockable object to buf.
@ -1202,6 +1241,13 @@ DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
tag->locktag_field3,
tag->locktag_field4);
break;
case LOCKTAG_APPLY_TRANSACTION:
appendStringInfo(buf,
_("remote transaction %u of subscription %u of database %u"),
tag->locktag_field3,
tag->locktag_field2,
tag->locktag_field1);
break;
default:
appendStringInfo(buf,
_("unrecognized locktag type %d"),


@ -3391,6 +3391,9 @@ ProcessInterrupts(void)
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
if (ParallelApplyMessagePending)
HandleParallelApplyMessages();
}
/*


@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
event_name = "LogicalLauncherMain";
break;
case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN:
event_name = "LogicalParallelApplyMain";
break;
case WAIT_EVENT_RECOVERY_WAL_STREAM:
event_name = "RecoveryWalStream";
break;
@ -388,6 +391,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
event_name = "HashGrowBucketsReinsert";
break;
case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE:
event_name = "LogicalParallelApplyStateChange";
break;
case WAIT_EVENT_LOGICAL_SYNC_DATA:
event_name = "LogicalSyncData";
break;


@ -37,10 +37,11 @@ const char *const LockTagTypeNames[] = {
"spectoken",
"object",
"userlock",
"advisory"
"advisory",
"applytransaction"
};
StaticAssertDecl(lengthof(LockTagTypeNames) == (LOCKTAG_ADVISORY + 1),
StaticAssertDecl(lengthof(LockTagTypeNames) == (LOCKTAG_LAST_TYPE + 1),
"array length mismatch");
/* This must match enum PredicateLockTargetType (predicate_internals.h) */
@ -312,6 +313,17 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[8] = true;
nulls[9] = true;
break;
case LOCKTAG_APPLY_TRANSACTION:
values[1] = ObjectIdGetDatum(instance->locktag.locktag_field1);
values[8] = ObjectIdGetDatum(instance->locktag.locktag_field2);
values[6] = ObjectIdGetDatum(instance->locktag.locktag_field3);
values[9] = Int16GetDatum(instance->locktag.locktag_field4);
nulls[2] = true;
nulls[3] = true;
nulls[4] = true;
nulls[5] = true;
nulls[7] = true;
break;
case LOCKTAG_OBJECT:
case LOCKTAG_USERLOCK:
case LOCKTAG_ADVISORY:


@ -3002,6 +3002,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
{
{"max_parallel_apply_workers_per_subscription",
PGC_SIGHUP,
REPLICATION_SUBSCRIBERS,
gettext_noop("Maximum number of parallel apply workers per subscription."),
NULL,
},
&max_parallel_apply_workers_per_subscription,
2, 0, MAX_PARALLEL_WORKER_LIMIT,
NULL, NULL, NULL
},
{
{"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE,
gettext_noop("Sets the amount of time to wait before forcing "


@ -359,6 +359,7 @@
#max_logical_replication_workers = 4 # taken from max_worker_processes
# (change requires restart)
#max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers
#max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers
#------------------------------------------------------------------------------


@ -4533,7 +4533,7 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 140000)
appendPQExpBufferStr(query, " s.substream,\n");
else
appendPQExpBufferStr(query, " false AS substream,\n");
appendPQExpBufferStr(query, " 'f' AS substream,\n");
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
@ -4670,8 +4670,10 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
if (strcmp(subinfo->subbinary, "t") == 0)
appendPQExpBufferStr(query, ", binary = true");
if (strcmp(subinfo->substream, "f") != 0)
if (strcmp(subinfo->substream, "t") == 0)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");


@ -6500,11 +6500,24 @@ describeSubscriptions(const char *pattern, bool verbose)
{
/* Binary mode and streaming are only supported in v14 and higher */
if (pset.sversion >= 140000)
{
appendPQExpBuffer(&buf,
", subbinary AS \"%s\"\n"
", substream AS \"%s\"\n",
gettext_noop("Binary"),
gettext_noop("Streaming"));
", subbinary AS \"%s\"\n",
gettext_noop("Binary"));
if (pset.sversion >= 160000)
appendPQExpBuffer(&buf,
", (CASE substream\n"
" WHEN 'f' THEN 'off'\n"
" WHEN 't' THEN 'on'\n"
" WHEN 'p' THEN 'parallel'\n"
" END) AS \"%s\"\n",
gettext_noop("Streaming"));
else
appendPQExpBuffer(&buf,
", substream AS \"%s\"\n",
gettext_noop("Streaming"));
}
/* Two_phase and disable_on_error are only supported in v15 and higher */
if (pset.sversion >= 150000)


@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202212241
#define CATALOG_VERSION_NO 202301091
#endif


@ -80,7 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
bool substream; /* Stream in-progress transactions. */
char substream; /* Stream in-progress transactions. See
* LOGICALREP_STREAM_xxx constants. */
char subtwophasestate; /* Stream two-phase transactions */
@ -124,7 +125,8 @@ typedef struct Subscription
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
char stream; /* Allow streaming in-progress transactions.
* See LOGICALREP_STREAM_xxx constants. */
char twophasestate; /* Allow streaming two-phase transactions */
bool disableonerr; /* Indicates if the subscription should be
* automatically disabled if a worker error
@ -137,6 +139,21 @@ typedef struct Subscription
* specified origin */
} Subscription;
/* Disallow streaming in-progress transactions. */
#define LOGICALREP_STREAM_OFF 'f'
/*
* Streaming in-progress transactions are written to a temporary file and
* applied only after the transaction is committed on upstream.
*/
#define LOGICALREP_STREAM_ON 't'
/*
* Streaming in-progress transactions are applied immediately via a parallel
* apply worker.
*/
#define LOGICALREP_STREAM_PARALLEL 'p'
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);


@ -26,4 +26,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
extern char defGetStreamingMode(DefElem *def);
#endif /* SUBSCRIPTIONCMDS_H */


@ -14,6 +14,7 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);


@ -32,12 +32,17 @@
*
* LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
* support for two-phase commit decoding (at prepare time). Introduced in PG15.
*
* LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
* where we support applying large streaming transactions in parallel.
* Introduced in PG16.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
/*
* Logical message types
@ -175,6 +180,17 @@ typedef struct LogicalRepRollbackPreparedTxnData
char gid[GIDSIZE];
} LogicalRepRollbackPreparedTxnData;
/*
* Transaction protocol information for stream abort.
*/
typedef struct LogicalRepStreamAbortData
{
TransactionId xid;
TransactionId subxid;
XLogRecPtr abort_lsn;
TimestampTz abort_time;
} LogicalRepStreamAbortData;
extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
extern void logicalrep_read_begin(StringInfo in,
LogicalRepBeginData *begin_data);
@ -246,9 +262,13 @@ extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn
extern TransactionId logicalrep_read_stream_commit(StringInfo in,
LogicalRepCommitData *commit_data);
extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid);
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid);
TransactionId subxid,
XLogRecPtr abort_lsn,
TimestampTz abort_time,
bool write_abort_info);
extern void logicalrep_read_stream_abort(StringInfo in,
LogicalRepStreamAbortData *abort_data,
bool read_abort_info);
extern char *logicalrep_message_type(LogicalRepMsgType action);
#endif /* LOGICAL_PROTO_H */


@ -12,9 +12,18 @@
#ifndef LOGICALWORKER_H
#define LOGICALWORKER_H
#include <signal.h>
extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);
extern void HandleParallelApplyMessageInterrupt(void);
extern void HandleParallelApplyMessages(void);
extern void LogicalRepWorkersWakeupAtCommit(Oid subid);


@ -53,7 +53,7 @@ extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
extern void replorigin_session_advance(XLogRecPtr remote_commit,
XLogRecPtr local_commit);
extern void replorigin_session_setup(RepOriginId node);
extern void replorigin_session_setup(RepOriginId node, int acquired_by);
extern void replorigin_session_reset(void);
extern XLogRecPtr replorigin_session_get_progress(bool flush);


@ -26,7 +26,7 @@ typedef struct PGOutputData
List *publication_names;
List *publications;
bool binary;
bool streaming;
char streaming;
bool messages;
bool two_phase;
char *origin;


@ -316,6 +316,7 @@ typedef struct ReorderBufferTXN
{
TimestampTz commit_time;
TimestampTz prepare_time;
TimestampTz abort_time;
} xact_time;
/*
@ -678,7 +679,8 @@ extern void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
extern void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr commit_lsn,
XLogRecPtr end_lsn);
extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
TimestampTz abort_time);
extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);


@ -182,7 +182,7 @@ typedef struct
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
bool streaming; /* Streaming of large transactions */
char *streaming_str; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
char *origin; /* Only publish data originating from the


@ -17,8 +17,13 @@
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "miscadmin.h"
#include "replication/logicalrelation.h"
#include "storage/buffile.h"
#include "storage/fileset.h"
#include "storage/lock.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
@ -53,13 +58,24 @@ typedef struct LogicalRepWorker
/*
* Used to create the changes and subxact files for the streaming
* transactions. Upon the arrival of the first streaming transaction, the
* fileset will be initialized, and it will be deleted when the worker
* exits. Under this, separate buffiles would be created for each
* transaction which will be deleted after the transaction is finished.
transactions. Upon the arrival of the first streaming transaction, or
the first time the leader apply worker times out while sending changes
to the parallel apply worker, the fileset will be initialized, and it
* will be deleted when the worker exits. Under this, separate buffiles
* would be created for each transaction which will be deleted after the
* transaction is finished.
*/
FileSet *stream_fileset;
/*
* PID of leader apply worker if this slot is used for a parallel apply
* worker, InvalidPid otherwise.
*/
pid_t apply_leader_pid;
/* Indicates whether apply can be performed in parallel. */
bool parallel_apply;
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@ -68,9 +84,138 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
/*
* State of the transaction in parallel apply worker.
*
* The enum values must have the same order as the transaction state
* transitions.
*/
typedef enum ParallelTransState
{
PARALLEL_TRANS_UNKNOWN,
PARALLEL_TRANS_STARTED,
PARALLEL_TRANS_FINISHED
} ParallelTransState;
/*
* State of fileset used to communicate changes from leader to parallel
* apply worker.
*
* FS_EMPTY indicates an initial state where the leader doesn't need to use
* the file to communicate with the parallel apply worker.
*
* FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
* to the file.
*
* FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
* the file.
*
* FS_READY indicates that it is now ok for a parallel apply worker to
* read the file.
*/
typedef enum PartialFileSetState
{
FS_EMPTY,
FS_SERIALIZE_IN_PROGRESS,
FS_SERIALIZE_DONE,
FS_READY
} PartialFileSetState;
/*
* Struct for sharing information between leader apply worker and parallel
* apply workers.
*/
typedef struct ParallelApplyWorkerShared
{
slock_t mutex;
TransactionId xid;
/*
* State used to ensure commit ordering.
*
* The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
* handling the transaction finish commands while the apply leader will
* wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
* transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
* STREAM_ABORT).
*/
ParallelTransState xact_state;
/* Information from the corresponding LogicalRepWorker slot. */
uint16 logicalrep_worker_generation;
int logicalrep_worker_slot_no;
/*
* Indicates whether there are pending streaming blocks in the queue. The
* parallel apply worker will check it before starting to wait.
*/
pg_atomic_uint32 pending_stream_count;
/*
* XactLastCommitEnd from the parallel apply worker. This is required by
* the leader worker so it can update the lsn_mappings.
*/
XLogRecPtr last_commit_end;
/*
* After entering PARTIAL_SERIALIZE mode, the leader apply worker will
* serialize changes to the file, and share the fileset with the parallel
* apply worker when processing the transaction finish command. Then the
* parallel apply worker will apply all the spooled messages.
*
* FileSet is used here instead of SharedFileSet because we need it to
* survive after releasing the shared memory so that the leader apply
* worker can re-use the same fileset for the next streaming transaction.
*/
PartialFileSetState fileset_state;
FileSet fileset;
} ParallelApplyWorkerShared;
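/*
 * Illustrative sketch, a hypothetical wait loop and not part of the patch:
 * to preserve commit ordering the leader can poll xact_state under the
 * mutex until the parallel apply worker reports completion, roughly:
 *
 *	for (;;)
 *	{
 *		ParallelTransState state;
 *
 *		SpinLockAcquire(&wshared->mutex);
 *		state = wshared->xact_state;
 *		SpinLockRelease(&wshared->mutex);
 *
 *		if (state == PARALLEL_TRANS_FINISHED)
 *			break;
 *
 *		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1L,
 *						 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
 *		ResetLatch(MyLatch);
 *		CHECK_FOR_INTERRUPTS();
 *	}
 */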
/*
* Information which is used to manage the parallel apply worker.
*/
typedef struct ParallelApplyWorkerInfo
{
/*
* This queue is used to send changes from the leader apply worker to the
* parallel apply worker.
*/
shm_mq_handle *mq_handle;
/*
* This queue is used to transfer error messages from the parallel apply
* worker to the leader apply worker.
*/
shm_mq_handle *error_mq_handle;
dsm_segment *dsm_seg;
/*
* Indicates whether the leader apply worker needs to serialize the
* remaining changes to a file due to timeout when attempting to send data
* to the parallel apply worker via shared memory.
*/
bool serialize_changes;
/*
* True if the worker is being used to process a parallel apply
* transaction. False indicates this worker is available for re-use.
*/
bool in_use;
ParallelApplyWorkerShared *shared;
} ParallelApplyWorkerInfo;
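/*
 * Illustrative sketch, an assumed call sequence rather than the patch's
 * exact code: the leader first attempts a non-blocking send to the queue
 * and, if pa_send_data() times out, switches the transaction to partial
 * serialize mode and spools the change to the fileset instead
 * (serialize_change_to_file below is a hypothetical name):
 *
 *	if (!pa_send_data(winfo, s->len, s->data))
 *	{
 *		pa_switch_to_partial_serialize(winfo, stream_locked);
 *		serialize_change_to_file(s);
 *	}
 */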
/* Main memory context for apply worker. Permanent during worker lifetime. */
extern PGDLLIMPORT MemoryContext ApplyContext;
extern PGDLLIMPORT MemoryContext ApplyMessageContext;
extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
/* libpqreceiver connection */
extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
@ -84,9 +229,11 @@ extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running);
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid);
extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
dsm_handle subworker_dsm);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
@ -103,10 +250,78 @@ extern void process_syncing_tables(XLogRecPtr current_lsn);
extern void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
/* Common streaming function to apply all the spooled messages */
extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
XLogRecPtr lsn);
extern void apply_dispatch(StringInfo s);
extern void maybe_reread_subscription(void);
extern void stream_cleanup_files(Oid subid, TransactionId xid);
extern void InitializeApplyWorker(void);
extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
/* Function for apply error callback */
extern void apply_error_callback(void *arg);
extern void set_apply_error_context_origin(char *originname);
/* Parallel apply worker setup and interactions */
extern void pa_allocate_worker(TransactionId xid);
extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
extern void pa_detach_all_error_mq(void);
extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
const void *data);
extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
bool stream_locked);
extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
ParallelTransState in_xact);
extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
extern void pa_start_subtrans(TransactionId current_xid,
TransactionId top_xid);
extern void pa_reset_subtrans(void);
extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
PartialFileSetState fileset_state);
extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
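/*
 * Illustrative sketch, not part of the patch: an assumed leader-side flow
 * over the lifetime of one streamed transaction, using the functions
 * declared above:
 *
 *	pa_allocate_worker(xid);				at the first STREAM_START
 *	winfo = pa_find_worker(xid);			on each later chunk
 *	pa_send_data(winfo, s->len, s->data);	forward the changes
 *	pa_xact_finish(winfo, commit_lsn);		at STREAM_COMMIT: wait for the
 *											worker so commit order holds
 */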
#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
static inline bool
am_tablesync_worker(void)
{
return OidIsValid(MyLogicalRepWorker->relid);
}
static inline bool
am_leader_apply_worker(void)
{
return (!am_tablesync_worker() &&
!isParallelApplyWorker(MyLogicalRepWorker));
}
static inline bool
am_parallel_apply_worker(void)
{
return isParallelApplyWorker(MyLogicalRepWorker);
}
#endif /* WORKER_INTERNAL_H */


@ -107,6 +107,11 @@ extern void LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
extern void UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
LOCKMODE lockmode);
extern void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode);
extern void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode);
/* Describe a locktag for error messages */
extern void DescribeLockTag(StringInfo buf, const LOCKTAG *tag);


@ -149,10 +149,12 @@ typedef enum LockTagType
LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */
LOCKTAG_OBJECT, /* non-relation database object */
LOCKTAG_USERLOCK, /* reserved for old contrib/userlock code */
LOCKTAG_ADVISORY /* advisory user locks */
LOCKTAG_ADVISORY, /* advisory user locks */
LOCKTAG_APPLY_TRANSACTION /* transaction being applied on a logical
* replication subscriber */
} LockTagType;
#define LOCKTAG_LAST_TYPE LOCKTAG_ADVISORY
#define LOCKTAG_LAST_TYPE LOCKTAG_APPLY_TRANSACTION
extern PGDLLIMPORT const char *const LockTagTypeNames[];
@ -278,6 +280,17 @@ typedef struct LOCKTAG
(locktag).locktag_type = LOCKTAG_ADVISORY, \
(locktag).locktag_lockmethodid = USER_LOCKMETHOD)
/*
* ID info for a remote transaction on a logical replication subscriber is: DB
* OID + SUBSCRIPTION OID + TRANSACTION ID + OBJID
*/
#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
((locktag).locktag_field1 = (dboid), \
(locktag).locktag_field2 = (suboid), \
(locktag).locktag_field3 = (xid), \
(locktag).locktag_field4 = (objid), \
(locktag).locktag_type = LOCKTAG_APPLY_TRANSACTION, \
(locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
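/*
 * Illustrative sketch, a hypothetical wrapper and not part of the patch:
 * a session-level lock on a remote transaction could be taken roughly as
 *
 *	LOCKTAG		tag;
 *
 *	SET_LOCKTAG_APPLY_TRANSACTION(tag, MyDatabaseId, suboid, xid, objid);
 *	(void) LockAcquire(&tag, lockmode, true, false);
 *
 * which is presumably how LockApplyTransactionForSession() is built.
 */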
/*
* Per-locked-object lock information:


@ -35,6 +35,7 @@ typedef enum
PROCSIG_WALSND_INIT_STOPPING, /* ask walsenders to prepare for shutdown */
PROCSIG_BARRIER, /* global barrier interrupt */
PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_DATABASE,


@ -42,6 +42,7 @@ typedef enum
WAIT_EVENT_CHECKPOINTER_MAIN,
WAIT_EVENT_LOGICAL_APPLY_MAIN,
WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN,
WAIT_EVENT_RECOVERY_WAL_STREAM,
WAIT_EVENT_SYSLOGGER_MAIN,
WAIT_EVENT_WAL_RECEIVER_MAIN,
@ -105,6 +106,7 @@ typedef enum
WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT,
WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE,
WAIT_EVENT_LOGICAL_SYNC_DATA,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
WAIT_EVENT_MQ_INTERNAL,


@ -117,7 +117,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | none | off | dbname=regress_doesnotexist | 0/0
regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@ -125,7 +125,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@ -138,7 +138,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -158,7 +158,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@ -170,7 +170,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@ -205,7 +205,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@ -242,7 +242,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@ -251,13 +251,13 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
ERROR: streaming requires a Boolean value
ERROR: streaming requires a Boolean value or "parallel"
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: subscription was created, but is not connected
@ -266,7 +266,15 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
@ -275,7 +283,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@ -293,7 +301,7 @@ ERROR: publication "testpub1" is already in subscription "regress_testsub"
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more than once
@ -311,7 +319,7 @@ ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (ref
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@ -350,7 +358,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@ -362,7 +370,7 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -375,7 +383,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -391,7 +399,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@ -399,7 +407,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);


@ -167,7 +167,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
-- now it works
@ -175,6 +175,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);


@ -8,6 +8,128 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
# Interleave a pair of transactions, each exceeding the 64kB limit.
my $in = '';
my $out = '';
my $offset = 0;
my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
on_error_stop => 0);
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$in .= q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
};
$h->pump_nb;
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i);
DELETE FROM test_tab WHERE a > 5000;
COMMIT;
});
$in .= q{
COMMIT;
\q
};
$h->finish; # errors make the next test fail, so ignore them here
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3334|3334|3334),
'check extra columns contain local defaults');
# Test the streaming in binary mode
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET (binary = on)");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667),
'check extra columns contain local defaults');
# Change the local values of the extra columns on the subscriber,
# update publisher, and check that subscriber retains the expected
# values. This is to ensure that non-streaming transactions behave
# properly after a streaming transaction.
$node_subscriber->safe_psql('postgres',
"UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"
);
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$node_publisher->safe_psql('postgres',
"UPDATE test_tab SET b = md5(a::text)");
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"
);
is($result, qq(6667|6667|6667),
'check extra columns contain locally changed data');
# Cleanup the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
}
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
@ -26,17 +148,27 @@ $node_publisher->safe_psql('postgres',
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
$node_subscriber->safe_psql('postgres',
"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
@ -49,6 +181,43 @@ my $result =
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
test_streaming($node_publisher, $node_subscriber, $appname, 0);
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
# Test that the deadlock is detected among the leader and parallel apply
# workers.
$node_subscriber->append_conf('postgresql.conf', "deadlock_timeout = 10ms");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
# Interleave a pair of transactions, each exceeding the 64kB limit.
my $in = '';
my $out = '';
@ -58,73 +227,90 @@ my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
on_error_stop => 0);
# Confirm if a deadlock between the leader apply worker and the parallel apply
# worker can be detected.
my $offset = -s $node_subscriber->logfile;
$in .= q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
};
$h->pump_nb;
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i);
DELETE FROM test_tab WHERE a > 5000;
COMMIT;
});
# Ensure that the parallel apply worker executes the insert command before the
# leader worker.
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/,
$offset);
$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
$in .= q{
COMMIT;
\q
};
$h->finish; # errors make the next test fail, so ignore them here
$h->finish;
$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
$offset);
# In order for the two transactions to be completed normally without causing
# conflicts due to the unique index, we temporarily drop it.
$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");
# Wait for this streaming transaction to be applied in the apply worker.
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(5001), 'data replicated to subscriber after dropping index');
# Test the streaming in binary mode
# Clean up test data from the environment.
$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
$node_publisher->wait_for_catchup($appname);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET (binary = on)");
"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql(
'postgres', q{
# Confirm if a deadlock between two parallel apply workers can be detected.
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$in .= q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
};
$h->pump_nb;
# Ensure that the first parallel apply worker executes the insert command
# before the second one.
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/,
$offset);
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
$in .= q{
COMMIT;
\q
};
$h->finish;
$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
$offset);
# In order for the two transactions to be completed normally without causing
# conflicts due to the unique index, we temporarily drop it.
$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");
# Wait for this streaming transaction to be applied in the apply worker.
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
# Change the local values of the extra columns on the subscriber,
# update publisher, and check that subscriber retains the expected
# values. This is to ensure that non-streaming transactions behave
# properly after a streaming transaction.
$node_subscriber->safe_psql('postgres',
"UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"
);
$node_publisher->safe_psql('postgres',
"UPDATE test_tab SET b = md5(a::text)");
$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"
);
is($result, qq(6667|6667|6667),
'check extra columns contain locally changed data');
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(10000), 'data replicated to subscriber after dropping index');
$node_subscriber->stop;
$node_publisher->stop;


@ -8,6 +8,73 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
my $offset = 0;
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s4;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(12|12|12),
'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
);
# Cleanup the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
}
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
@ -37,6 +104,10 @@ $node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
@ -49,41 +120,34 @@ my $result =
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s4;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
test_streaming($node_publisher, $node_subscriber, $appname, 0);
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(12|12|12),
'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
$node_subscriber->stop;
$node_publisher->stop;


@ -2,6 +2,9 @@
# Copyright (c) 2021-2023, PostgreSQL Global Development Group
# Test streaming of large transaction with DDL and subtransactions
#
# This file is mainly to test the DDL/DML interaction of the publisher side,
# so we didn't add a parallel apply version for the tests in this file.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;


@ -8,6 +8,124 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
my $offset = 0;
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# streamed transaction with DDL, DML and ROLLBACKs
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (3, md5(3::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (4, md5(4::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (5, md5(5::text));
SAVEPOINT s3;
INSERT INTO test_tab VALUES (6, md5(6::text));
ROLLBACK TO s2;
INSERT INTO test_tab VALUES (7, md5(7::text));
ROLLBACK TO s1;
INSERT INTO test_tab VALUES (8, md5(8::text));
SAVEPOINT s4;
INSERT INTO test_tab VALUES (9, md5(9::text));
SAVEPOINT s5;
INSERT INTO test_tab VALUES (10, md5(10::text));
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(6|0),
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
);
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# streamed transaction with subscriber receiving out of order
# subtransaction ROLLBACKs
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (11, md5(11::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (12, md5(12::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (13, md5(13::text));
SAVEPOINT s3;
INSERT INTO test_tab VALUES (14, md5(14::text));
RELEASE s2;
INSERT INTO test_tab VALUES (15, md5(15::text));
ROLLBACK TO s1;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(7|0),
'check rollback to savepoint was reflected on subscriber');
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# streamed transaction with subscriber receiving rollback
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (16, md5(16::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (17, md5(17::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (18, md5(18::text));
ROLLBACK;
});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(7|0), 'check rollback was reflected on subscriber');
# Cleanup the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
}
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
@ -36,6 +154,10 @@ $node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
@ -48,81 +170,33 @@ my $result =
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(2|0), 'check initial data was copied to subscriber');
# streamed transaction with DDL, DML and ROLLBACKs
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (3, md5(3::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (4, md5(4::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (5, md5(5::text));
SAVEPOINT s3;
INSERT INTO test_tab VALUES (6, md5(6::text));
ROLLBACK TO s2;
INSERT INTO test_tab VALUES (7, md5(7::text));
ROLLBACK TO s1;
INSERT INTO test_tab VALUES (8, md5(8::text));
SAVEPOINT s4;
INSERT INTO test_tab VALUES (9, md5(9::text));
SAVEPOINT s5;
INSERT INTO test_tab VALUES (10, md5(10::text));
COMMIT;
});
test_streaming($node_publisher, $node_subscriber, $appname, 0);
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(6|0),
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
# streamed transaction with subscriber receiving out of order subtransaction
# ROLLBACKs
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (11, md5(11::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (12, md5(12::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (13, md5(13::text));
SAVEPOINT s3;
INSERT INTO test_tab VALUES (14, md5(14::text));
RELEASE s2;
INSERT INTO test_tab VALUES (15, md5(15::text));
ROLLBACK TO s1;
COMMIT;
});
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
$node_publisher->wait_for_catchup($appname);
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(7|0),
'check rollback to savepoint was reflected on subscriber');
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# streamed transaction with subscriber receiving rollback
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab VALUES (16, md5(16::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (17, md5(17::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (18, md5(18::text));
ROLLBACK;
});
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(7|0), 'check rollback was reflected on subscriber');
test_streaming($node_publisher, $node_subscriber, $appname, 1);
$node_subscriber->stop;
$node_publisher->stop;


@ -3,6 +3,9 @@
# Test streaming of transaction with subtransactions, DDLs, DMLs, and
# rollbacks
#
# This file is mainly to test the DDL/DML interaction of the publisher side,
# so we didn't add a parallel apply version for the tests in this file.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;


@ -5,6 +5,8 @@
#
# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
#
# Two-phase and parallel apply will be tested in 023_twophase_stream, so we
# didn't add a parallel apply version for the tests in this file.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;


@ -8,6 +8,289 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
my $offset = 0;
###############################
# Test 2PC PREPARE / COMMIT PREPARED.
# 1. Data is streamed as a 2PC transaction.
# 2. Then do commit prepared.
#
# Expect all data is replicated on subscriber side after the commit.
###############################
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# check that 2PC gets replicated to subscriber
# Insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
my $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# 2PC transaction gets committed
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is committed on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(4|4|4),
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is committed on subscriber');
###############################
# Test 2PC PREPARE / ROLLBACK PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 3. Do rollback prepared.
#
# Expect data rolls back leaving only the original 2 rows.
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# 2PC transaction gets aborted
$node_publisher->safe_psql('postgres',
"ROLLBACK PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is aborted on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2),
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'
);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is aborted on subscriber');
###############################
# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
# 1. insert, update and delete some rows.
# 2. Then server crashes before the 2PC transaction is committed.
# 3. After servers are restarted the pending transaction is committed.
#
# Expect all data is replicated on subscriber side after the commit.
# Note: both publisher and subscriber do crash/restart.
###############################
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_subscriber->stop('immediate');
$node_publisher->stop('immediate');
$node_publisher->start;
$node_subscriber->start;
# We don't try to check the log for the parallel option here as the subscriber
# may have stopped after finishing the prepare and before logging the
# appropriate message.
# commit post the restart
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check inserts are visible
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(4|4|4),
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
);
###############################
# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 3. A single row INSERT is done which is after the PREPARE.
# 4. Then do a ROLLBACK PREPARED.
#
# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
# (the original 2 + inserted 1).
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# Insert a different record (now we are outside of the 2PC transaction)
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (99999, 'foobar')");
# 2PC transaction gets aborted
$node_publisher->safe_psql('postgres',
"ROLLBACK PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is aborted on subscriber,
# but the extra INSERT outside of the 2PC still was replicated
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3|3|3),
'check the outside insert was copied to subscriber');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is aborted on subscriber');
###############################
# Do INSERT after the PREPARE but before COMMIT PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 3. A single row INSERT is done which is after the PREPARE.
# 4. Then do a COMMIT PREPARED.
#
# Expect 2PC data + the extra row are on the subscriber
# (the 3334 + inserted 1 = 3335).
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# Insert a different record (now we are outside of the 2PC transaction)
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (99999, 'foobar')");
# 2PC transaction gets committed
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is committed on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(5|5|5),
'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is committed on subscriber');
# Cleanup the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
$node_publisher->wait_for_catchup($appname);
}
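# The check_parallel_log() calls above are to a helper defined near the top
# of this file, outside the hunks shown here. A minimal sketch of such a
# helper, using the wait_for_log() method of PostgreSQL::Test::Cluster; the
# exact DEBUG message text is an assumption, not taken from this diff:
sub check_parallel_log
{
	my ($node_subscriber, $offset, $is_parallel, $type) = @_;

	# Only streaming = parallel routes the transaction through a parallel
	# apply worker, so only then do we expect the DEBUG line in the log.
	if ($is_parallel)
	{
		$node_subscriber->wait_for_log(
			qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM \Q$type\E command/,
			$offset);
	}
}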
###############################
# Setup
###############################
@@ -48,6 +331,10 @@ $node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub
@@ -64,236 +351,38 @@ my $twophase_query =
$node_subscriber->poll_query_until('postgres', $twophase_query)
or die "Timed out while waiting for subscriber to enable twophase";
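# The CREATE SUBSCRIPTION statement above is cut off at the hunk boundary.
# For illustration only -- the connection string and option spelling below
# are assumptions, not taken from this diff -- a subscription that enables
# both streamed apply and two-phase commit could look like:
#
# $node_subscriber->safe_psql(
# 	'postgres', "
# 	CREATE SUBSCRIPTION tap_sub
# 	CONNECTION '$publisher_connstr application_name=$appname'
# 	PUBLICATION tap_pub
# 	WITH (streaming = on, two_phase = on)");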
###############################
# Check initial data was copied to subscriber
###############################
my $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
###############################
# Test 2PC PREPARE / COMMIT PREPARED.
# 1. Data is streamed as a 2PC transaction.
# 2. Then do commit prepared.
#
# Expect all data is replicated on subscriber side after the commit.
###############################
test_streaming($node_publisher, $node_subscriber, $appname, 0);
# check that 2PC gets replicated to subscriber
# Insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# 2PC transaction gets committed
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is committed on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(4|4|4),
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is committed on subscriber');
###############################
# Test 2PC PREPARE / ROLLBACK PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 3. Do rollback prepared.
#
# Expect data rolls back leaving only the original 2 rows.
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# 2PC transaction gets aborted
$node_publisher->safe_psql('postgres',
"ROLLBACK PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is aborted on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2),
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is aborted on subscriber');
###############################
# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
# 1. insert, update and delete some rows.
# 2. Then server crashes before the 2PC transaction is committed.
# 3. After servers are restarted the pending transaction is committed.
#
# Expect all data is replicated on subscriber side after the commit.
# Note: both publisher and subscriber do crash/restart.
###############################
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_subscriber->stop('immediate');
$node_publisher->stop('immediate');
$node_publisher->start;
$node_subscriber->start;
# commit after the restart
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check inserts are visible
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(4|4|4),
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
);
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
###############################
# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 3. A single row INSERT is done which is after the PREPARE.
# 4. Then do a ROLLBACK PREPARED.
#
# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
# (the original 2 + inserted 1).
###############################
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
$node_publisher->wait_for_catchup($appname);
# Run a query to make sure that the reload has taken effect; the new backend
# started for this query will see the updated log_min_messages setting.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# Insert a different record (now we are outside of the 2PC transaction)
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (99999, 'foobar')");
# 2PC transaction gets aborted
$node_publisher->safe_psql('postgres',
"ROLLBACK PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is aborted on subscriber,
# but the extra INSERT outside of the 2PC still was replicated
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is aborted on subscriber');
###############################
# Do INSERT after the PREPARE but before COMMIT PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 3. A single row INSERT is done which is after the PREPARE.
# 4. Then do a COMMIT PREPARED.
#
# Expect 2PC data + the extra row are on the subscriber
# (the 4 + inserted 1 = 5).
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
PREPARE TRANSACTION 'test_prepared_tab';});
$node_publisher->wait_for_catchup($appname);
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
# Insert a different record (now we are outside of the 2PC transaction)
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (99999, 'foobar')");
# 2PC transaction gets committed
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
$node_publisher->wait_for_catchup($appname);
# check that transaction is committed on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(5|5|5),
'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is committed on subscriber');
test_streaming($node_publisher, $node_subscriber, $appname, 1);
###############################
# check all the cleanup

src/tools/pgindent/typedefs.list

@@ -1473,6 +1473,7 @@ LogicalRepRelId
LogicalRepRelMapEntry
LogicalRepRelation
LogicalRepRollbackPreparedTxnData
LogicalRepStreamAbortData
LogicalRepTupleData
LogicalRepTyp
LogicalRepWorker
@@ -1876,6 +1877,9 @@ PageXLogRecPtr
PagetableEntry
Pairs
ParallelAppendState
ParallelApplyWorkerEntry
ParallelApplyWorkerInfo
ParallelApplyWorkerShared
ParallelBitmapHeapState
ParallelBlockTableScanDesc
ParallelBlockTableScanWorker
@@ -1895,6 +1899,7 @@ ParallelSlotResultHandler
ParallelState
ParallelTableScanDesc
ParallelTableScanDescData
ParallelTransState
ParallelVacuumState
ParallelWorkerContext
ParallelWorkerInfo
@@ -1924,6 +1929,7 @@ ParserState
PartClauseInfo
PartClauseMatchStatus
PartClauseTarget
PartialFileSetState
PartitionBoundInfo
PartitionBoundInfoData
PartitionBoundSpec
@@ -2774,6 +2780,7 @@ TocEntry
TokenAuxData
TokenizedAuthLine
TrackItem
TransApplyAction
TransInvalidationInfo
TransState
TransactionId