1631 lines
49 KiB
C
1631 lines
49 KiB
C
/*-------------------------------------------------------------------------
|
|
* applyparallelworker.c
|
|
* Support routines for applying xact by parallel apply worker
|
|
*
|
|
* Copyright (c) 2023, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/replication/logical/applyparallelworker.c
|
|
*
|
|
* This file contains the code to launch, set up, and teardown a parallel apply
|
|
* worker which receives the changes from the leader worker and invokes routines
|
|
* to apply those on the subscriber database. Additionally, this file contains
|
|
* routines that are intended to support setting up, using, and tearing down a
|
|
* ParallelApplyWorkerInfo which is required so the leader worker and parallel
|
|
* apply workers can communicate with each other.
|
|
*
|
|
* The parallel apply workers are assigned (if available) as soon as xact's
|
|
* first stream is received for subscriptions that have set their 'streaming'
|
|
* option as parallel. The leader apply worker will send changes to this new
|
|
* worker via shared memory. We keep this worker assigned till the transaction
|
|
* commit is received and also wait for the worker to finish at commit. This
|
|
* preserves commit ordering and avoid file I/O in most cases, although we
|
|
* still need to spill to a file if there is no worker available. See comments
|
|
* atop logical/worker to know more about streamed xacts whose changes are
|
|
* spilled to disk. It is important to maintain commit order to avoid failures
|
|
* due to: (a) transaction dependencies - say if we insert a row in the first
|
|
* transaction and update it in the second transaction on publisher then
|
|
* allowing the subscriber to apply both in parallel can lead to failure in the
|
|
* update; (b) deadlocks - allowing transactions that update the same set of
|
|
* rows/tables in the opposite order to be applied in parallel can lead to
|
|
* deadlocks.
|
|
*
|
|
* A worker pool is used to avoid restarting workers for each streaming
|
|
* transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
|
|
* in the ParallelApplyWorkerPool. After successfully launching a new worker,
|
|
* its information is added to the ParallelApplyWorkerPool. Once the worker
|
|
* finishes applying the transaction, it is marked as available for re-use.
|
|
* Now, before starting a new worker to apply the streaming transaction, we
|
|
* check the list for any available worker. Note that we retain a maximum of
|
|
* half the max_parallel_apply_workers_per_subscription workers in the pool and
|
|
* after that, we simply exit the worker after applying the transaction.
|
|
*
|
|
* XXX This worker pool threshold is arbitrary and we can provide a GUC
|
|
* variable for this in the future if required.
|
|
*
|
|
* The leader apply worker will create a separate dynamic shared memory segment
|
|
* when each parallel apply worker starts. The reason for this design is that
|
|
* we cannot predict how many workers will be needed. It may be possible to
|
|
* allocate enough shared memory in one segment based on the maximum number of
|
|
* parallel apply workers (max_parallel_apply_workers_per_subscription), but
|
|
* this would waste memory if no process is actually started.
|
|
*
|
|
* The dynamic shared memory segment contains: (a) a shm_mq that is used to
|
|
* send changes in the transaction from leader apply worker to parallel apply
|
|
* worker; (b) another shm_mq that is used to send errors (and other messages
|
|
* reported via elog/ereport) from the parallel apply worker to leader apply
|
|
* worker; (c) necessary information to be shared among parallel apply workers
|
|
* and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
|
|
*
|
|
* Locking Considerations
|
|
* ----------------------
|
|
* We have a risk of deadlock due to concurrently applying the transactions in
|
|
* parallel mode that were independent on the publisher side but became
|
|
* dependent on the subscriber side due to the different database structures
|
|
* (like schema of subscription tables, constraints, etc.) on each side. This
|
|
* can happen even without parallel mode when there are concurrent operations
|
|
* on the subscriber. In order to detect the deadlocks among leader (LA) and
|
|
* parallel apply (PA) workers, we used lmgr locks when the PA waits for the
|
|
* next stream (set of changes) and LA waits for PA to finish the transaction.
|
|
* An alternative approach could be to not allow parallelism when the schema of
|
|
* tables is different between the publisher and subscriber but that would be
|
|
* too restrictive and would require the publisher to send much more
|
|
* information than it is currently sending.
|
|
*
|
|
* Consider a case where the subscribed table does not have a unique key on the
|
|
* publisher and has a unique key on the subscriber. The deadlock can happen in
|
|
* the following ways:
|
|
*
|
|
* 1) Deadlock between the leader apply worker and a parallel apply worker
|
|
*
|
|
* Consider that the parallel apply worker (PA) is executing TX-1 and the
|
|
* leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
|
|
* Now, LA is waiting for PA because of the unique key constraint of the
|
|
* subscribed table while PA is waiting for LA to send the next stream of
|
|
* changes or transaction finish command message.
|
|
*
|
|
* In order for lmgr to detect this, we have LA acquire a session lock on the
|
|
* remote transaction (by pa_lock_stream()) and have PA wait on the lock before
|
|
* trying to receive the next stream of changes. Specifically, LA will acquire
|
|
* the lock in AccessExclusive mode before sending the STREAM_STOP and will
|
|
* release it if already acquired after sending the STREAM_START, STREAM_ABORT
|
|
* (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
|
|
* acquire the lock in AccessShare mode after processing STREAM_STOP and
|
|
* STREAM_ABORT (for subtransaction) and then release the lock immediately
|
|
* after acquiring it.
|
|
*
|
|
* The lock graph for the above example will look as follows:
|
|
* LA (waiting to acquire the lock on the unique index) -> PA (waiting to
|
|
* acquire the stream lock) -> LA
|
|
*
|
|
* This way, when PA is waiting for LA for the next stream of changes, we can
|
|
* have a wait-edge from PA to LA in lmgr, which will make us detect the
|
|
* deadlock between LA and PA.
|
|
*
|
|
* 2) Deadlock between the leader apply worker and parallel apply workers
|
|
*
|
|
* This scenario is similar to the first case but TX-1 and TX-2 are executed by
|
|
* two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
|
|
* PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
|
|
* for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
|
|
* transaction in order to preserve the commit order. There is a deadlock among
|
|
* the three processes.
|
|
*
|
|
* In order for lmgr to detect this, we have PA acquire a session lock (this is
|
|
* a different lock than referred in the previous case, see
|
|
* pa_lock_transaction()) on the transaction being applied and have LA wait on
|
|
* the lock before proceeding in the transaction finish commands. Specifically,
|
|
* PA will acquire this lock in AccessExclusive mode before executing the first
|
|
* message of the transaction and release it at the xact end. LA will acquire
|
|
* this lock in AccessShare mode at transaction finish commands and release it
|
|
* immediately.
|
|
*
|
|
* The lock graph for the above example will look as follows:
|
|
* LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
|
|
* lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
|
|
* lock) -> LA
|
|
*
|
|
* This way when LA is waiting to finish the transaction end command to preserve
|
|
* the commit order, we will be able to detect deadlock, if any.
|
|
*
|
|
* One might think we can use XactLockTableWait(), but XactLockTableWait()
|
|
* considers PREPARED TRANSACTION as still in progress which means the lock
|
|
* won't be released even after the parallel apply worker has prepared the
|
|
* transaction.
|
|
*
|
|
* 3) Deadlock when the shm_mq buffer is full
|
|
*
|
|
* In the previous scenario (ie. PA-1 and PA-2 are executing transactions
|
|
* concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
|
|
* wait to send messages, and this wait doesn't appear in lmgr.
|
|
*
|
|
* To avoid this wait, we use a non-blocking write and wait with a timeout. If
|
|
* the timeout is exceeded, the LA will serialize all the pending messages to
|
|
* a file and indicate PA-2 that it needs to read that file for the remaining
|
|
* messages. Then LA will start waiting for commit as in the previous case
|
|
* which will detect deadlock if any. See pa_send_data() and
|
|
* enum TransApplyAction.
|
|
*
|
|
* Lock types
|
|
* ----------
|
|
* Both the stream lock and the transaction lock mentioned above are
|
|
* session-level locks because both locks could be acquired outside the
|
|
* transaction, and the stream lock in the leader needs to persist across
|
|
* transaction boundaries i.e. until the end of the streaming transaction.
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "libpq/pqformat.h"
|
|
#include "libpq/pqmq.h"
|
|
#include "pgstat.h"
|
|
#include "postmaster/interrupt.h"
|
|
#include "replication/logicallauncher.h"
|
|
#include "replication/logicalworker.h"
|
|
#include "replication/origin.h"
|
|
#include "replication/worker_internal.h"
|
|
#include "storage/ipc.h"
|
|
#include "storage/lmgr.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "utils/inval.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/syscache.h"
|
|
|
|
#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
|
|
|
|
/*
|
|
* DSM keys for parallel apply worker. Unlike other parallel execution code,
|
|
* since we don't need to worry about DSM keys conflicting with plan_node_id we
|
|
* can use small integers.
|
|
*/
|
|
#define PARALLEL_APPLY_KEY_SHARED 1
|
|
#define PARALLEL_APPLY_KEY_MQ 2
|
|
#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
|
|
|
|
/* Queue size of DSM, 16 MB for now. */
|
|
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
|
|
|
|
/*
|
|
* Error queue size of DSM. It is desirable to make it large enough that a
|
|
* typical ErrorResponse can be sent without blocking. That way, a worker that
|
|
* errors out can write the whole message into the queue and terminate without
|
|
* waiting for the user backend.
|
|
*/
|
|
#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
|
|
|
|
/*
|
|
* There are three fields in each message received by the parallel apply
|
|
* worker: start_lsn, end_lsn and send_time. Because we have updated these
|
|
* statistics in the leader apply worker, we can ignore these fields in the
|
|
* parallel apply worker (see function LogicalRepApplyLoop).
|
|
*/
|
|
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
|
|
|
|
/*
|
|
* The type of session-level lock on a transaction being applied on a logical
|
|
* replication subscriber.
|
|
*/
|
|
#define PARALLEL_APPLY_LOCK_STREAM 0
|
|
#define PARALLEL_APPLY_LOCK_XACT 1
|
|
|
|
/*
|
|
* Hash table entry to map xid to the parallel apply worker state.
|
|
*/
|
|
typedef struct ParallelApplyWorkerEntry
|
|
{
|
|
TransactionId xid; /* Hash key -- must be first */
|
|
ParallelApplyWorkerInfo *winfo;
|
|
} ParallelApplyWorkerEntry;
|
|
|
|
/*
|
|
* A hash table used to cache the state of streaming transactions being applied
|
|
* by the parallel apply workers.
|
|
*/
|
|
static HTAB *ParallelApplyTxnHash = NULL;
|
|
|
|
/*
|
|
* A list (pool) of active parallel apply workers. The information for
|
|
* the new worker is added to the list after successfully launching it. The
|
|
* list entry is removed if there are already enough workers in the worker
|
|
* pool at the end of the transaction. For more information about the worker
|
|
* pool, see comments atop this file.
|
|
*/
|
|
static List *ParallelApplyWorkerPool = NIL;
|
|
|
|
/*
|
|
* Information shared between leader apply worker and parallel apply worker.
|
|
*/
|
|
ParallelApplyWorkerShared *MyParallelShared = NULL;
|
|
|
|
/*
|
|
* Is there a message sent by a parallel apply worker that the leader apply
|
|
* worker needs to receive?
|
|
*/
|
|
volatile sig_atomic_t ParallelApplyMessagePending = false;
|
|
|
|
/*
|
|
* Cache the parallel apply worker information required for applying the
|
|
* current streaming transaction. It is used to save the cost of searching the
|
|
* hash table when applying the changes between STREAM_START and STREAM_STOP.
|
|
*/
|
|
static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
|
|
|
|
/* A list to maintain subtransactions, if any. */
|
|
static List *subxactlist = NIL;
|
|
|
|
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
|
|
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
|
|
static PartialFileSetState pa_get_fileset_state(void);
|
|
|
|
/*
|
|
* Returns true if it is OK to start a parallel apply worker, false otherwise.
|
|
*/
|
|
static bool
|
|
pa_can_start(void)
|
|
{
|
|
/* Only leader apply workers can start parallel apply workers. */
|
|
if (!am_leader_apply_worker())
|
|
return false;
|
|
|
|
/*
|
|
* It is good to check for any change in the subscription parameter to
|
|
* avoid the case where for a very long time the change doesn't get
|
|
* reflected. This can happen when there is a constant flow of streaming
|
|
* transactions that are handled by parallel apply workers.
|
|
*
|
|
* It is better to do it before the below checks so that the latest values
|
|
* of subscription can be used for the checks.
|
|
*/
|
|
maybe_reread_subscription();
|
|
|
|
/*
|
|
* Don't start a new parallel apply worker if the subscription is not
|
|
* using parallel streaming mode, or if the publisher does not support
|
|
* parallel apply.
|
|
*/
|
|
if (!MyLogicalRepWorker->parallel_apply)
|
|
return false;
|
|
|
|
/*
|
|
* Don't start a new parallel worker if user has set skiplsn as it's
|
|
* possible that they want to skip the streaming transaction. For
|
|
* streaming transactions, we need to serialize the transaction to a file
|
|
* so that we can get the last LSN of the transaction to judge whether to
|
|
* skip before starting to apply the change.
|
|
*
|
|
* One might think that we could allow parallelism if the first lsn of the
|
|
* transaction is greater than skiplsn, but we don't send it with the
|
|
* STREAM START message, and it doesn't seem worth sending the extra eight
|
|
* bytes with the STREAM START to enable parallelism for this case.
|
|
*/
|
|
if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
|
|
return false;
|
|
|
|
/*
|
|
* For streaming transactions that are being applied using a parallel
|
|
* apply worker, we cannot decide whether to apply the change for a
|
|
* relation that is not in the READY state (see
|
|
* should_apply_changes_for_rel) as we won't know remote_final_lsn by that
|
|
* time. So, we don't start the new parallel apply worker in this case.
|
|
*/
|
|
if (!AllTablesyncsReady())
|
|
return false;
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Set up a dynamic shared memory segment.
|
|
*
|
|
* We set up a control region that contains a fixed-size worker info
|
|
* (ParallelApplyWorkerShared), a message queue, and an error queue.
|
|
*
|
|
* Returns true on success, false on failure.
|
|
*/
|
|
static bool
|
|
pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
|
|
{
|
|
shm_toc_estimator e;
|
|
Size segsize;
|
|
dsm_segment *seg;
|
|
shm_toc *toc;
|
|
ParallelApplyWorkerShared *shared;
|
|
shm_mq *mq;
|
|
Size queue_size = DSM_QUEUE_SIZE;
|
|
Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
|
|
|
|
/*
|
|
* Estimate how much shared memory we need.
|
|
*
|
|
* Because the TOC machinery may choose to insert padding of oddly-sized
|
|
* requests, we must estimate each chunk separately.
|
|
*
|
|
* We need one key to register the location of the header, and two other
|
|
* keys to track the locations of the message queue and the error message
|
|
* queue.
|
|
*/
|
|
shm_toc_initialize_estimator(&e);
|
|
shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
|
|
shm_toc_estimate_chunk(&e, queue_size);
|
|
shm_toc_estimate_chunk(&e, error_queue_size);
|
|
|
|
shm_toc_estimate_keys(&e, 3);
|
|
segsize = shm_toc_estimate(&e);
|
|
|
|
/* Create the shared memory segment and establish a table of contents. */
|
|
seg = dsm_create(shm_toc_estimate(&e), 0);
|
|
if (!seg)
|
|
return false;
|
|
|
|
toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
|
|
segsize);
|
|
|
|
/* Set up the header region. */
|
|
shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
|
|
SpinLockInit(&shared->mutex);
|
|
|
|
shared->xact_state = PARALLEL_TRANS_UNKNOWN;
|
|
pg_atomic_init_u32(&(shared->pending_stream_count), 0);
|
|
shared->last_commit_end = InvalidXLogRecPtr;
|
|
shared->fileset_state = FS_EMPTY;
|
|
|
|
shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
|
|
|
|
/* Set up message queue for the worker. */
|
|
mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
|
|
shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
|
|
shm_mq_set_sender(mq, MyProc);
|
|
|
|
/* Attach the queue. */
|
|
winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
|
|
|
|
/* Set up error queue for the worker. */
|
|
mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
|
|
error_queue_size);
|
|
shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
|
|
shm_mq_set_receiver(mq, MyProc);
|
|
|
|
/* Attach the queue. */
|
|
winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
|
|
|
|
/* Return results to caller. */
|
|
winfo->dsm_seg = seg;
|
|
winfo->shared = shared;
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Try to get a parallel apply worker from the pool. If none is available then
|
|
* start a new one.
|
|
*/
|
|
static ParallelApplyWorkerInfo *
|
|
pa_launch_parallel_worker(void)
|
|
{
|
|
MemoryContext oldcontext;
|
|
bool launched;
|
|
ParallelApplyWorkerInfo *winfo;
|
|
ListCell *lc;
|
|
|
|
/* Try to get an available parallel apply worker from the worker pool. */
|
|
foreach(lc, ParallelApplyWorkerPool)
|
|
{
|
|
winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
|
|
|
|
if (!winfo->in_use)
|
|
return winfo;
|
|
}
|
|
|
|
/*
|
|
* Start a new parallel apply worker.
|
|
*
|
|
* The worker info can be used for the lifetime of the worker process, so
|
|
* create it in a permanent context.
|
|
*/
|
|
oldcontext = MemoryContextSwitchTo(ApplyContext);
|
|
|
|
winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
|
|
|
|
/* Setup shared memory. */
|
|
if (!pa_setup_dsm(winfo))
|
|
{
|
|
MemoryContextSwitchTo(oldcontext);
|
|
pfree(winfo);
|
|
return NULL;
|
|
}
|
|
|
|
launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
|
|
MySubscription->oid,
|
|
MySubscription->name,
|
|
MyLogicalRepWorker->userid,
|
|
InvalidOid,
|
|
dsm_segment_handle(winfo->dsm_seg));
|
|
|
|
if (launched)
|
|
{
|
|
ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
|
|
}
|
|
else
|
|
{
|
|
pa_free_worker_info(winfo);
|
|
winfo = NULL;
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
return winfo;
|
|
}
|
|
|
|
/*
|
|
* Allocate a parallel apply worker that will be used for the specified xid.
|
|
*
|
|
* We first try to get an available worker from the pool, if any and then try
|
|
* to launch a new worker. On successful allocation, remember the worker
|
|
* information in the hash table so that we can get it later for processing the
|
|
* streaming changes.
|
|
*/
|
|
void
|
|
pa_allocate_worker(TransactionId xid)
|
|
{
|
|
bool found;
|
|
ParallelApplyWorkerInfo *winfo = NULL;
|
|
ParallelApplyWorkerEntry *entry;
|
|
|
|
if (!pa_can_start())
|
|
return;
|
|
|
|
winfo = pa_launch_parallel_worker();
|
|
if (!winfo)
|
|
return;
|
|
|
|
/* First time through, initialize parallel apply worker state hashtable. */
|
|
if (!ParallelApplyTxnHash)
|
|
{
|
|
HASHCTL ctl;
|
|
|
|
MemSet(&ctl, 0, sizeof(ctl));
|
|
ctl.keysize = sizeof(TransactionId);
|
|
ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
|
|
ctl.hcxt = ApplyContext;
|
|
|
|
ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
|
|
16, &ctl,
|
|
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
|
}
|
|
|
|
/* Create an entry for the requested transaction. */
|
|
entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
|
|
if (found)
|
|
elog(ERROR, "hash table corrupted");
|
|
|
|
/* Update the transaction information in shared memory. */
|
|
SpinLockAcquire(&winfo->shared->mutex);
|
|
winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
|
|
winfo->shared->xid = xid;
|
|
SpinLockRelease(&winfo->shared->mutex);
|
|
|
|
winfo->in_use = true;
|
|
winfo->serialize_changes = false;
|
|
entry->winfo = winfo;
|
|
entry->xid = xid;
|
|
}
|
|
|
|
/*
|
|
* Find the assigned worker for the given transaction, if any.
|
|
*/
|
|
ParallelApplyWorkerInfo *
|
|
pa_find_worker(TransactionId xid)
|
|
{
|
|
bool found;
|
|
ParallelApplyWorkerEntry *entry;
|
|
|
|
if (!TransactionIdIsValid(xid))
|
|
return NULL;
|
|
|
|
if (!ParallelApplyTxnHash)
|
|
return NULL;
|
|
|
|
/* Return the cached parallel apply worker if valid. */
|
|
if (stream_apply_worker)
|
|
return stream_apply_worker;
|
|
|
|
/* Find an entry for the requested transaction. */
|
|
entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
|
|
if (found)
|
|
{
|
|
/* The worker must not have exited. */
|
|
Assert(entry->winfo->in_use);
|
|
return entry->winfo;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* Makes the worker available for reuse.
|
|
*
|
|
* This removes the parallel apply worker entry from the hash table so that it
|
|
* can't be used. If there are enough workers in the pool, it stops the worker
|
|
* and frees the corresponding info. Otherwise it just marks the worker as
|
|
* available for reuse.
|
|
*
|
|
* For more information about the worker pool, see comments atop this file.
|
|
*/
|
|
static void
|
|
pa_free_worker(ParallelApplyWorkerInfo *winfo)
|
|
{
|
|
Assert(!am_parallel_apply_worker());
|
|
Assert(winfo->in_use);
|
|
Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
|
|
|
|
if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
|
|
elog(ERROR, "hash table corrupted");
|
|
|
|
/*
|
|
* Stop the worker if there are enough workers in the pool.
|
|
*
|
|
* XXX Additionally, we also stop the worker if the leader apply worker
|
|
* serialize part of the transaction data due to a send timeout. This is
|
|
* because the message could be partially written to the queue and there
|
|
* is no way to clean the queue other than resending the message until it
|
|
* succeeds. Instead of trying to send the data which anyway would have
|
|
* been serialized and then letting the parallel apply worker deal with
|
|
* the spurious message, we stop the worker.
|
|
*/
|
|
if (winfo->serialize_changes ||
|
|
list_length(ParallelApplyWorkerPool) >
|
|
(max_parallel_apply_workers_per_subscription / 2))
|
|
{
|
|
int slot_no;
|
|
uint16 generation;
|
|
|
|
SpinLockAcquire(&winfo->shared->mutex);
|
|
generation = winfo->shared->logicalrep_worker_generation;
|
|
slot_no = winfo->shared->logicalrep_worker_slot_no;
|
|
SpinLockRelease(&winfo->shared->mutex);
|
|
|
|
logicalrep_pa_worker_stop(slot_no, generation);
|
|
|
|
pa_free_worker_info(winfo);
|
|
|
|
return;
|
|
}
|
|
|
|
winfo->in_use = false;
|
|
winfo->serialize_changes = false;
|
|
}
|
|
|
|
/*
|
|
* Free the parallel apply worker information and unlink the files with
|
|
* serialized changes if any.
|
|
*/
|
|
static void
|
|
pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
|
|
{
|
|
Assert(winfo);
|
|
|
|
if (winfo->mq_handle)
|
|
shm_mq_detach(winfo->mq_handle);
|
|
|
|
if (winfo->error_mq_handle)
|
|
shm_mq_detach(winfo->error_mq_handle);
|
|
|
|
/* Unlink the files with serialized changes. */
|
|
if (winfo->serialize_changes)
|
|
stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
|
|
|
|
if (winfo->dsm_seg)
|
|
dsm_detach(winfo->dsm_seg);
|
|
|
|
/* Remove from the worker pool. */
|
|
ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
|
|
|
|
pfree(winfo);
|
|
}
|
|
|
|
/*
|
|
* Detach the error queue for all parallel apply workers.
|
|
*/
|
|
void
|
|
pa_detach_all_error_mq(void)
|
|
{
|
|
ListCell *lc;
|
|
|
|
foreach(lc, ParallelApplyWorkerPool)
|
|
{
|
|
ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
|
|
|
|
shm_mq_detach(winfo->error_mq_handle);
|
|
winfo->error_mq_handle = NULL;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Check if there are any pending spooled messages.
|
|
*/
|
|
static bool
|
|
pa_has_spooled_message_pending()
|
|
{
|
|
PartialFileSetState fileset_state;
|
|
|
|
fileset_state = pa_get_fileset_state();
|
|
|
|
return (fileset_state != FS_EMPTY);
|
|
}
|
|
|
|
/*
|
|
* Replay the spooled messages once the leader apply worker has finished
|
|
* serializing changes to the file.
|
|
*
|
|
* Returns false if there aren't any pending spooled messages, true otherwise.
|
|
*/
|
|
static bool
|
|
pa_process_spooled_messages_if_required(void)
|
|
{
|
|
PartialFileSetState fileset_state;
|
|
|
|
fileset_state = pa_get_fileset_state();
|
|
|
|
if (fileset_state == FS_EMPTY)
|
|
return false;
|
|
|
|
/*
|
|
* If the leader apply worker is busy serializing the partial changes then
|
|
* acquire the stream lock now and wait for the leader worker to finish
|
|
* serializing the changes. Otherwise, the parallel apply worker won't get
|
|
* a chance to receive a STREAM_STOP (and acquire the stream lock) until
|
|
* the leader had serialized all changes which can lead to undetected
|
|
* deadlock.
|
|
*
|
|
* Note that the fileset state can be FS_SERIALIZE_DONE once the leader
|
|
* worker has finished serializing the changes.
|
|
*/
|
|
if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
|
|
{
|
|
pa_lock_stream(MyParallelShared->xid, AccessShareLock);
|
|
pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
|
|
|
|
fileset_state = pa_get_fileset_state();
|
|
}
|
|
|
|
/*
|
|
* We cannot read the file immediately after the leader has serialized all
|
|
* changes to the file because there may still be messages in the memory
|
|
* queue. We will apply all spooled messages the next time we call this
|
|
* function and that will ensure there are no messages left in the memory
|
|
* queue.
|
|
*/
|
|
if (fileset_state == FS_SERIALIZE_DONE)
|
|
{
|
|
pa_set_fileset_state(MyParallelShared, FS_READY);
|
|
}
|
|
else if (fileset_state == FS_READY)
|
|
{
|
|
apply_spooled_messages(&MyParallelShared->fileset,
|
|
MyParallelShared->xid,
|
|
InvalidXLogRecPtr);
|
|
pa_set_fileset_state(MyParallelShared, FS_EMPTY);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Interrupt handler for main loop of parallel apply worker.
|
|
*/
|
|
static void
|
|
ProcessParallelApplyInterrupts(void)
|
|
{
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
if (ShutdownRequestPending)
|
|
{
|
|
ereport(LOG,
|
|
(errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
|
|
MySubscription->name)));
|
|
|
|
proc_exit(0);
|
|
}
|
|
|
|
if (ConfigReloadPending)
|
|
{
|
|
ConfigReloadPending = false;
|
|
ProcessConfigFile(PGC_SIGHUP);
|
|
}
|
|
}
|
|
|
|
/* Parallel apply worker main loop. */
|
|
static void
|
|
LogicalParallelApplyLoop(shm_mq_handle *mqh)
|
|
{
|
|
shm_mq_result shmq_res;
|
|
ErrorContextCallback errcallback;
|
|
MemoryContext oldcxt = CurrentMemoryContext;
|
|
|
|
/*
|
|
* Init the ApplyMessageContext which we clean up after each replication
|
|
* protocol message.
|
|
*/
|
|
ApplyMessageContext = AllocSetContextCreate(ApplyContext,
|
|
"ApplyMessageContext",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
|
|
/*
|
|
* Push apply error context callback. Fields will be filled while applying
|
|
* a change.
|
|
*/
|
|
errcallback.callback = apply_error_callback;
|
|
errcallback.previous = error_context_stack;
|
|
error_context_stack = &errcallback;
|
|
|
|
for (;;)
|
|
{
|
|
void *data;
|
|
Size len;
|
|
|
|
ProcessParallelApplyInterrupts();
|
|
|
|
/* Ensure we are reading the data into our memory context. */
|
|
MemoryContextSwitchTo(ApplyMessageContext);
|
|
|
|
shmq_res = shm_mq_receive(mqh, &len, &data, true);
|
|
|
|
if (shmq_res == SHM_MQ_SUCCESS)
|
|
{
|
|
StringInfoData s;
|
|
int c;
|
|
|
|
if (len == 0)
|
|
elog(ERROR, "invalid message length");
|
|
|
|
s.cursor = 0;
|
|
s.maxlen = -1;
|
|
s.data = (char *) data;
|
|
s.len = len;
|
|
|
|
/*
|
|
* The first byte of messages sent from leader apply worker to
|
|
* parallel apply workers can only be 'w'.
|
|
*/
|
|
c = pq_getmsgbyte(&s);
|
|
if (c != 'w')
|
|
elog(ERROR, "unexpected message \"%c\"", c);
|
|
|
|
/*
|
|
* Ignore statistics fields that have been updated by the leader
|
|
* apply worker.
|
|
*
|
|
* XXX We can avoid sending the statistics fields from the leader
|
|
* apply worker but for that, it needs to rebuild the entire
|
|
* message by removing these fields which could be more work than
|
|
* simply ignoring these fields in the parallel apply worker.
|
|
*/
|
|
s.cursor += SIZE_STATS_MESSAGE;
|
|
|
|
apply_dispatch(&s);
|
|
}
|
|
else if (shmq_res == SHM_MQ_WOULD_BLOCK)
|
|
{
|
|
/* Replay the changes from the file, if any. */
|
|
if (!pa_process_spooled_messages_if_required())
|
|
{
|
|
int rc;
|
|
|
|
/* Wait for more work. */
|
|
rc = WaitLatch(MyLatch,
|
|
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
|
1000L,
|
|
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
|
|
|
|
if (rc & WL_LATCH_SET)
|
|
ResetLatch(MyLatch);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Assert(shmq_res == SHM_MQ_DETACHED);
|
|
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("lost connection to the logical replication apply worker")));
|
|
}
|
|
|
|
MemoryContextReset(ApplyMessageContext);
|
|
MemoryContextSwitchTo(oldcxt);
|
|
}
|
|
|
|
/* Pop the error context stack. */
|
|
error_context_stack = errcallback.previous;
|
|
|
|
MemoryContextSwitchTo(oldcxt);
|
|
}
|
|
|
|
/*
|
|
* Make sure the leader apply worker tries to read from our error queue one more
|
|
* time. This guards against the case where we exit uncleanly without sending
|
|
* an ErrorResponse, for example because some code calls proc_exit directly.
|
|
*/
|
|
static void
|
|
pa_shutdown(int code, Datum arg)
|
|
{
|
|
SendProcSignal(MyLogicalRepWorker->leader_pid,
|
|
PROCSIG_PARALLEL_APPLY_MESSAGE,
|
|
InvalidBackendId);
|
|
|
|
dsm_detach((dsm_segment *) DatumGetPointer(arg));
|
|
}
|
|
|
|
/*
|
|
* Parallel apply worker entry point.
|
|
*/
|
|
void
|
|
ParallelApplyWorkerMain(Datum main_arg)
|
|
{
|
|
ParallelApplyWorkerShared *shared;
|
|
dsm_handle handle;
|
|
dsm_segment *seg;
|
|
shm_toc *toc;
|
|
shm_mq *mq;
|
|
shm_mq_handle *mqh;
|
|
shm_mq_handle *error_mqh;
|
|
RepOriginId originid;
|
|
int worker_slot = DatumGetInt32(main_arg);
|
|
char originname[NAMEDATALEN];
|
|
|
|
/* Setup signal handling. */
|
|
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
|
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
|
|
pqsignal(SIGTERM, die);
|
|
BackgroundWorkerUnblockSignals();
|
|
|
|
/*
|
|
* Attach to the dynamic shared memory segment for the parallel apply, and
|
|
* find its table of contents.
|
|
*
|
|
* Like parallel query, we don't need resource owner by this time. See
|
|
* ParallelWorkerMain.
|
|
*/
|
|
memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
|
|
seg = dsm_attach(handle);
|
|
if (!seg)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("unable to map dynamic shared memory segment")));
|
|
|
|
toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
|
|
if (!toc)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("bad magic number in dynamic shared memory segment")));
|
|
|
|
before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
|
|
|
|
/* Look up the shared information. */
|
|
shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
|
|
MyParallelShared = shared;
|
|
|
|
/*
|
|
* Attach to the message queue.
|
|
*/
|
|
mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
|
|
shm_mq_set_receiver(mq, MyProc);
|
|
mqh = shm_mq_attach(mq, seg, NULL);
|
|
|
|
/*
|
|
* Primary initialization is complete. Now, we can attach to our slot.
|
|
* This is to ensure that the leader apply worker does not write data to
|
|
* the uninitialized memory queue.
|
|
*/
|
|
logicalrep_worker_attach(worker_slot);
|
|
|
|
SpinLockAcquire(&MyParallelShared->mutex);
|
|
MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
|
|
MyParallelShared->logicalrep_worker_slot_no = worker_slot;
|
|
SpinLockRelease(&MyParallelShared->mutex);
|
|
|
|
/*
|
|
* Attach to the error queue.
|
|
*/
|
|
mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
|
|
shm_mq_set_sender(mq, MyProc);
|
|
error_mqh = shm_mq_attach(mq, seg, NULL);
|
|
|
|
pq_redirect_to_shm_mq(seg, error_mqh);
|
|
pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
|
|
InvalidBackendId);
|
|
|
|
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
|
|
MyLogicalRepWorker->reply_time = 0;
|
|
|
|
InitializeApplyWorker();
|
|
|
|
/* Setup replication origin tracking. */
|
|
StartTransactionCommand();
|
|
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
|
|
originname, sizeof(originname));
|
|
originid = replorigin_by_name(originname, false);
|
|
|
|
/*
|
|
* The parallel apply worker doesn't need to monopolize this replication
|
|
* origin which was already acquired by its leader process.
|
|
*/
|
|
replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
|
|
replorigin_session_origin = originid;
|
|
CommitTransactionCommand();
|
|
|
|
/*
|
|
* Setup callback for syscache so that we know when something changes in
|
|
* the subscription relation state.
|
|
*/
|
|
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
|
|
invalidate_syncing_table_states,
|
|
(Datum) 0);
|
|
|
|
set_apply_error_context_origin(originname);
|
|
|
|
LogicalParallelApplyLoop(mqh);
|
|
|
|
/*
|
|
* The parallel apply worker must not get here because the parallel apply
|
|
* worker will only stop when it receives a SIGTERM or SIGINT from the
|
|
* leader, or when there is an error. None of these cases will allow the
|
|
* code to reach here.
|
|
*/
|
|
Assert(false);
|
|
}
|
|
|
|
/*
|
|
* Handle receipt of an interrupt indicating a parallel apply worker message.
|
|
*
|
|
* Note: this is called within a signal handler! All we can do is set a flag
|
|
* that will cause the next CHECK_FOR_INTERRUPTS() to invoke
|
|
* HandleParallelApplyMessages().
|
|
*/
|
|
void
|
|
HandleParallelApplyMessageInterrupt(void)
|
|
{
|
|
InterruptPending = true;
|
|
ParallelApplyMessagePending = true;
|
|
SetLatch(MyLatch);
|
|
}
|
|
|
|
/*
|
|
* Handle a single protocol message received from a single parallel apply
|
|
* worker.
|
|
*/
|
|
static void
|
|
HandleParallelApplyMessage(StringInfo msg)
|
|
{
|
|
char msgtype;
|
|
|
|
msgtype = pq_getmsgbyte(msg);
|
|
|
|
switch (msgtype)
|
|
{
|
|
case 'E': /* ErrorResponse */
|
|
{
|
|
ErrorData edata;
|
|
|
|
/* Parse ErrorResponse. */
|
|
pq_parse_errornotice(msg, &edata);
|
|
|
|
/*
|
|
* If desired, add a context line to show that this is a
|
|
* message propagated from a parallel apply worker. Otherwise,
|
|
* it can sometimes be confusing to understand what actually
|
|
* happened.
|
|
*/
|
|
if (edata.context)
|
|
edata.context = psprintf("%s\n%s", edata.context,
|
|
_("logical replication parallel apply worker"));
|
|
else
|
|
edata.context = pstrdup(_("logical replication parallel apply worker"));
|
|
|
|
/*
|
|
* Context beyond that should use the error context callbacks
|
|
* that were in effect in LogicalRepApplyLoop().
|
|
*/
|
|
error_context_stack = apply_error_context_stack;
|
|
|
|
/*
|
|
* The actual error must have been reported by the parallel
|
|
* apply worker.
|
|
*/
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("logical replication parallel apply worker exited due to error"),
|
|
errcontext("%s", edata.context)));
|
|
}
|
|
|
|
/*
|
|
* Don't need to do anything about NoticeResponse and
|
|
* NotifyResponse as the logical replication worker doesn't need
|
|
* to send messages to the client.
|
|
*/
|
|
case 'N':
|
|
case 'A':
|
|
break;
|
|
|
|
default:
|
|
elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
|
|
msgtype, msg->len);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Handle any queued protocol messages received from parallel apply workers.
|
|
*/
|
|
void
|
|
HandleParallelApplyMessages(void)
|
|
{
|
|
ListCell *lc;
|
|
MemoryContext oldcontext;
|
|
|
|
static MemoryContext hpam_context = NULL;
|
|
|
|
/*
|
|
* This is invoked from ProcessInterrupts(), and since some of the
|
|
* functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
|
|
* for recursive calls if more signals are received while this runs. It's
|
|
* unclear that recursive entry would be safe, and it doesn't seem useful
|
|
* even if it is safe, so let's block interrupts until done.
|
|
*/
|
|
HOLD_INTERRUPTS();
|
|
|
|
/*
|
|
* Moreover, CurrentMemoryContext might be pointing almost anywhere. We
|
|
* don't want to risk leaking data into long-lived contexts, so let's do
|
|
* our work here in a private context that we can reset on each use.
|
|
*/
|
|
if (!hpam_context) /* first time through? */
|
|
hpam_context = AllocSetContextCreate(TopMemoryContext,
|
|
"HandleParallelApplyMessages",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
else
|
|
MemoryContextReset(hpam_context);
|
|
|
|
oldcontext = MemoryContextSwitchTo(hpam_context);
|
|
|
|
ParallelApplyMessagePending = false;
|
|
|
|
foreach(lc, ParallelApplyWorkerPool)
|
|
{
|
|
shm_mq_result res;
|
|
Size nbytes;
|
|
void *data;
|
|
ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
|
|
|
|
/*
|
|
* The leader will detach from the error queue and set it to NULL
|
|
* before preparing to stop all parallel apply workers, so we don't
|
|
* need to handle error messages anymore. See
|
|
* logicalrep_worker_detach.
|
|
*/
|
|
if (!winfo->error_mq_handle)
|
|
continue;
|
|
|
|
res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
|
|
|
|
if (res == SHM_MQ_WOULD_BLOCK)
|
|
continue;
|
|
else if (res == SHM_MQ_SUCCESS)
|
|
{
|
|
StringInfoData msg;
|
|
|
|
initStringInfo(&msg);
|
|
appendBinaryStringInfo(&msg, data, nbytes);
|
|
HandleParallelApplyMessage(&msg);
|
|
pfree(msg.data);
|
|
}
|
|
else
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("lost connection to the logical replication parallel apply worker")));
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
/* Might as well clear the context on our way out */
|
|
MemoryContextReset(hpam_context);
|
|
|
|
RESUME_INTERRUPTS();
|
|
}
|
|
|
|
/*
|
|
* Send the data to the specified parallel apply worker via shared-memory
|
|
* queue.
|
|
*
|
|
* Returns false if the attempt to send data via shared memory times out, true
|
|
* otherwise.
|
|
*/
|
|
bool
|
|
pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
|
|
{
|
|
int rc;
|
|
shm_mq_result result;
|
|
TimestampTz startTime = 0;
|
|
|
|
Assert(!IsTransactionState());
|
|
Assert(!winfo->serialize_changes);
|
|
|
|
/*
|
|
* This timeout is a bit arbitrary but testing revealed that it is sufficient
|
|
* to send the message unless the parallel apply worker is waiting on some
|
|
* lock or there is a serious resource crunch. See the comments atop this file
|
|
* to know why we are using a non-blocking way to send the message.
|
|
*/
|
|
#define SHM_SEND_RETRY_INTERVAL_MS 1000
|
|
#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
|
|
|
|
for (;;)
|
|
{
|
|
result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
|
|
|
|
if (result == SHM_MQ_SUCCESS)
|
|
return true;
|
|
else if (result == SHM_MQ_DETACHED)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("could not send data to shared-memory queue")));
|
|
|
|
Assert(result == SHM_MQ_WOULD_BLOCK);
|
|
|
|
/* Wait before retrying. */
|
|
rc = WaitLatch(MyLatch,
|
|
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
|
SHM_SEND_RETRY_INTERVAL_MS,
|
|
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
|
|
|
|
if (rc & WL_LATCH_SET)
|
|
{
|
|
ResetLatch(MyLatch);
|
|
CHECK_FOR_INTERRUPTS();
|
|
}
|
|
|
|
if (startTime == 0)
|
|
startTime = GetCurrentTimestamp();
|
|
else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
|
|
SHM_SEND_TIMEOUT_MS))
|
|
{
|
|
ereport(LOG,
|
|
(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
|
|
winfo->shared->xid)));
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
|
|
* that the current data and any subsequent data for this transaction will be
|
|
* serialized to a file. This is done to prevent possible deadlocks with
|
|
* another parallel apply worker (refer to the comments atop this file).
|
|
*/
|
|
void
|
|
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
|
|
bool stream_locked)
|
|
{
|
|
/*
|
|
* The parallel apply worker could be stuck for some reason (say waiting
|
|
* on some lock by other backend), so stop trying to send data directly to
|
|
* it and start serializing data to the file instead.
|
|
*/
|
|
winfo->serialize_changes = true;
|
|
|
|
/* Initialize the stream fileset. */
|
|
stream_start_internal(winfo->shared->xid, true);
|
|
|
|
/*
|
|
* Acquires the stream lock if not already to make sure that the parallel
|
|
* apply worker will wait for the leader to release the stream lock until
|
|
* the end of the transaction.
|
|
*/
|
|
if (!stream_locked)
|
|
pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
|
|
|
|
pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
|
|
}
|
|
|
|
/*
|
|
* Wait until the parallel apply worker's transaction state has reached or
|
|
* exceeded the given xact_state.
|
|
*/
|
|
static void
|
|
pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
|
|
ParallelTransState xact_state)
|
|
{
|
|
for (;;)
|
|
{
|
|
/*
|
|
* Stop if the transaction state has reached or exceeded the given
|
|
* xact_state.
|
|
*/
|
|
if (pa_get_xact_state(winfo->shared) >= xact_state)
|
|
break;
|
|
|
|
/* Wait to be signalled. */
|
|
(void) WaitLatch(MyLatch,
|
|
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
|
10L,
|
|
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
|
|
|
|
/* Reset the latch so we don't spin. */
|
|
ResetLatch(MyLatch);
|
|
|
|
/* An interrupt may have occurred while we were waiting. */
|
|
CHECK_FOR_INTERRUPTS();
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Wait until the parallel apply worker's transaction finishes.
|
|
*/
|
|
static void
|
|
pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
|
|
{
|
|
/*
|
|
* Wait until the parallel apply worker set the state to
|
|
* PARALLEL_TRANS_STARTED which means it has acquired the transaction
|
|
* lock. This is to prevent leader apply worker from acquiring the
|
|
* transaction lock earlier than the parallel apply worker.
|
|
*/
|
|
pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
|
|
|
|
/*
|
|
* Wait for the transaction lock to be released. This is required to
|
|
* detect deadlock among leader and parallel apply workers. Refer to the
|
|
* comments atop this file.
|
|
*/
|
|
pa_lock_transaction(winfo->shared->xid, AccessShareLock);
|
|
pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
|
|
|
|
/*
|
|
* Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
|
|
* apply worker failed while applying changes causing the lock to be
|
|
* released.
|
|
*/
|
|
if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("lost connection to the logical replication parallel apply worker")));
|
|
}
|
|
|
|
/*
|
|
* Set the transaction state for a given parallel apply worker.
|
|
*/
|
|
void
|
|
pa_set_xact_state(ParallelApplyWorkerShared *wshared,
|
|
ParallelTransState xact_state)
|
|
{
|
|
SpinLockAcquire(&wshared->mutex);
|
|
wshared->xact_state = xact_state;
|
|
SpinLockRelease(&wshared->mutex);
|
|
}
|
|
|
|
/*
|
|
* Get the transaction state for a given parallel apply worker.
|
|
*/
|
|
static ParallelTransState
|
|
pa_get_xact_state(ParallelApplyWorkerShared *wshared)
|
|
{
|
|
ParallelTransState xact_state;
|
|
|
|
SpinLockAcquire(&wshared->mutex);
|
|
xact_state = wshared->xact_state;
|
|
SpinLockRelease(&wshared->mutex);
|
|
|
|
return xact_state;
|
|
}
|
|
|
|
/*
|
|
* Cache the parallel apply worker information.
|
|
*/
|
|
void
|
|
pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
|
|
{
|
|
stream_apply_worker = winfo;
|
|
}
|
|
|
|
/*
|
|
* Form a unique savepoint name for the streaming transaction.
|
|
*
|
|
* Note that different subscriptions for publications on different nodes can
|
|
* receive same remote xid, so we need to use subscription id along with it.
|
|
*
|
|
* Returns the name in the supplied buffer.
|
|
*/
|
|
static void
|
|
pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
|
|
{
|
|
snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
|
|
}
|
|
|
|
/*
|
|
* Define a savepoint for a subxact in parallel apply worker if needed.
|
|
*
|
|
* The parallel apply worker can figure out if a new subtransaction was
|
|
* started by checking if the new change arrived with a different xid. In that
|
|
* case define a named savepoint, so that we are able to rollback to it
|
|
* if required.
|
|
*/
|
|
void
|
|
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
|
|
{
|
|
if (current_xid != top_xid &&
|
|
!list_member_xid(subxactlist, current_xid))
|
|
{
|
|
MemoryContext oldctx;
|
|
char spname[NAMEDATALEN];
|
|
|
|
pa_savepoint_name(MySubscription->oid, current_xid,
|
|
spname, sizeof(spname));
|
|
|
|
elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
|
|
|
|
/* We must be in transaction block to define the SAVEPOINT. */
|
|
if (!IsTransactionBlock())
|
|
{
|
|
if (!IsTransactionState())
|
|
StartTransactionCommand();
|
|
|
|
BeginTransactionBlock();
|
|
CommitTransactionCommand();
|
|
}
|
|
|
|
DefineSavepoint(spname);
|
|
|
|
/*
|
|
* CommitTransactionCommand is needed to start a subtransaction after
|
|
* issuing a SAVEPOINT inside a transaction block (see
|
|
* StartSubTransaction()).
|
|
*/
|
|
CommitTransactionCommand();
|
|
|
|
oldctx = MemoryContextSwitchTo(TopTransactionContext);
|
|
subxactlist = lappend_xid(subxactlist, current_xid);
|
|
MemoryContextSwitchTo(oldctx);
|
|
}
|
|
}
|
|
|
|
/* Reset the list that maintains subtransactions. */
|
|
void
|
|
pa_reset_subtrans(void)
|
|
{
|
|
/*
|
|
* We don't need to free this explicitly as the allocated memory will be
|
|
* freed at the transaction end.
|
|
*/
|
|
subxactlist = NIL;
|
|
}
|
|
|
|
/*
|
|
* Handle STREAM ABORT message when the transaction was applied in a parallel
|
|
* apply worker.
|
|
*/
|
|
void
|
|
pa_stream_abort(LogicalRepStreamAbortData *abort_data)
|
|
{
|
|
TransactionId xid = abort_data->xid;
|
|
TransactionId subxid = abort_data->subxid;
|
|
|
|
/*
|
|
* Update origin state so we can restart streaming from correct position
|
|
* in case of crash.
|
|
*/
|
|
replorigin_session_origin_lsn = abort_data->abort_lsn;
|
|
replorigin_session_origin_timestamp = abort_data->abort_time;
|
|
|
|
/*
|
|
* If the two XIDs are the same, it's in fact abort of toplevel xact, so
|
|
* just free the subxactlist.
|
|
*/
|
|
if (subxid == xid)
|
|
{
|
|
pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
|
|
|
|
/*
|
|
* Release the lock as we might be processing an empty streaming
|
|
* transaction in which case the lock won't be released during
|
|
* transaction rollback.
|
|
*
|
|
* Note that it's ok to release the transaction lock before aborting
|
|
* the transaction because even if the parallel apply worker dies due
|
|
* to crash or some other reason, such a transaction would still be
|
|
* considered aborted.
|
|
*/
|
|
pa_unlock_transaction(xid, AccessExclusiveLock);
|
|
|
|
AbortCurrentTransaction();
|
|
|
|
if (IsTransactionBlock())
|
|
{
|
|
EndTransactionBlock(false);
|
|
CommitTransactionCommand();
|
|
}
|
|
|
|
pa_reset_subtrans();
|
|
|
|
pgstat_report_activity(STATE_IDLE, NULL);
|
|
}
|
|
else
|
|
{
|
|
/* OK, so it's a subxact. Rollback to the savepoint. */
|
|
int i;
|
|
char spname[NAMEDATALEN];
|
|
|
|
pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
|
|
|
|
elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
|
|
|
|
/*
|
|
* Search the subxactlist, determine the offset tracked for the
|
|
* subxact, and truncate the list.
|
|
*
|
|
* Note that for an empty sub-transaction we won't find the subxid
|
|
* here.
|
|
*/
|
|
for (i = list_length(subxactlist) - 1; i >= 0; i--)
|
|
{
|
|
TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
|
|
|
|
if (xid_tmp == subxid)
|
|
{
|
|
RollbackToSavepoint(spname);
|
|
CommitTransactionCommand();
|
|
subxactlist = list_truncate(subxactlist, i);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Set the fileset state for a particular parallel apply worker. The fileset
|
|
* will be set once the leader worker serialized all changes to the file
|
|
* so that it can be used by parallel apply worker.
|
|
*/
|
|
void
|
|
pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
|
|
PartialFileSetState fileset_state)
|
|
{
|
|
SpinLockAcquire(&wshared->mutex);
|
|
wshared->fileset_state = fileset_state;
|
|
|
|
if (fileset_state == FS_SERIALIZE_DONE)
|
|
{
|
|
Assert(am_leader_apply_worker());
|
|
Assert(MyLogicalRepWorker->stream_fileset);
|
|
wshared->fileset = *MyLogicalRepWorker->stream_fileset;
|
|
}
|
|
|
|
SpinLockRelease(&wshared->mutex);
|
|
}
|
|
|
|
/*
|
|
* Get the fileset state for the current parallel apply worker.
|
|
*/
|
|
static PartialFileSetState
|
|
pa_get_fileset_state(void)
|
|
{
|
|
PartialFileSetState fileset_state;
|
|
|
|
Assert(am_parallel_apply_worker());
|
|
|
|
SpinLockAcquire(&MyParallelShared->mutex);
|
|
fileset_state = MyParallelShared->fileset_state;
|
|
SpinLockRelease(&MyParallelShared->mutex);
|
|
|
|
return fileset_state;
|
|
}
|
|
|
|
/*
|
|
* Helper functions to acquire and release a lock for each stream block.
|
|
*
|
|
* Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
|
|
* stream lock.
|
|
*
|
|
* Refer to the comments atop this file to see how the stream lock is used.
|
|
*/
|
|
void
|
|
pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
|
|
{
|
|
LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
|
|
PARALLEL_APPLY_LOCK_STREAM, lockmode);
|
|
}
|
|
|
|
void
|
|
pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
|
|
{
|
|
UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
|
|
PARALLEL_APPLY_LOCK_STREAM, lockmode);
|
|
}
|
|
|
|
/*
|
|
* Helper functions to acquire and release a lock for each local transaction
|
|
* apply.
|
|
*
|
|
* Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
|
|
* transaction lock.
|
|
*
|
|
* Note that all the callers must pass a remote transaction ID instead of a
|
|
* local transaction ID as xid. This is because the local transaction ID will
|
|
* only be assigned while applying the first change in the parallel apply but
|
|
* it's possible that the first change in the parallel apply worker is blocked
|
|
* by a concurrently executing transaction in another parallel apply worker. We
|
|
* can only communicate the local transaction id to the leader after applying
|
|
* the first change so it won't be able to wait after sending the xact finish
|
|
* command using this lock.
|
|
*
|
|
* Refer to the comments atop this file to see how the transaction lock is
|
|
* used.
|
|
*/
|
|
void
|
|
pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
|
|
{
|
|
LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
|
|
PARALLEL_APPLY_LOCK_XACT, lockmode);
|
|
}
|
|
|
|
void
|
|
pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
|
|
{
|
|
UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
|
|
PARALLEL_APPLY_LOCK_XACT, lockmode);
|
|
}
|
|
|
|
/*
|
|
* Decrement the number of pending streaming blocks and wait on the stream lock
|
|
* if there is no pending block available.
|
|
*/
|
|
void
|
|
pa_decr_and_wait_stream_block(void)
|
|
{
|
|
Assert(am_parallel_apply_worker());
|
|
|
|
/*
|
|
* It is only possible to not have any pending stream chunks when we are
|
|
* applying spooled messages.
|
|
*/
|
|
if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
|
|
{
|
|
if (pa_has_spooled_message_pending())
|
|
return;
|
|
|
|
elog(ERROR, "invalid pending streaming chunk 0");
|
|
}
|
|
|
|
if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
|
|
{
|
|
pa_lock_stream(MyParallelShared->xid, AccessShareLock);
|
|
pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Finish processing the streaming transaction in the leader apply worker.
|
|
*/
|
|
void
|
|
pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
|
|
{
|
|
Assert(am_leader_apply_worker());
|
|
|
|
/*
|
|
* Unlock the shared object lock so that parallel apply worker can
|
|
* continue to receive and apply changes.
|
|
*/
|
|
pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
|
|
|
|
/*
|
|
* Wait for that worker to finish. This is necessary to maintain commit
|
|
* order which avoids failures due to transaction dependencies and
|
|
* deadlocks.
|
|
*/
|
|
pa_wait_for_xact_finish(winfo);
|
|
|
|
if (!XLogRecPtrIsInvalid(remote_lsn))
|
|
store_flush_position(remote_lsn, winfo->shared->last_commit_end);
|
|
|
|
pa_free_worker(winfo);
|
|
}
|