/*-------------------------------------------------------------------------
 * tablesync.c
 *	  PostgreSQL logical replication
 *
 * Copyright (c) 2012-2020, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/tablesync.c
 *
 * NOTES
 *	  This file contains code for initial table data synchronization for
 *	  logical replication.
 *
 *	  The initial data synchronization is done separately for each table,
 *	  in a separate apply worker that only fetches the initial snapshot data
 *	  from the publisher and then synchronizes the position in the stream with
 *	  the main apply worker.
 *
 *	  There are several reasons for doing the synchronization this way:
 *	   - It allows us to parallelize the initial data synchronization
 *	     which lowers the time needed for it to happen.
 *	   - The initial synchronization does not have to hold the xid and LSN
 *	     for the time it takes to copy data of all tables, causing less
 *	     bloat and lower disk consumption compared to doing the
 *	     synchronization in a single process for the whole database.
 *	   - It allows us to synchronize any tables added after the initial
 *	     synchronization has finished.
 *
 *	  The stream position synchronization works in multiple steps.
 *	   - Sync finishes copy and sets worker state as SYNCWAIT and waits for
 *	     state to change in a loop.
 *	   - Apply periodically checks tables that are synchronizing for SYNCWAIT.
 *	     When the desired state appears, it will set the worker state to
 *	     CATCHUP and starts loop-waiting until either the table state is set
 *	     to SYNCDONE or the sync worker exits.
 *	   - After the sync worker has seen the state change to CATCHUP, it will
 *	     read the stream and apply changes (acting like an apply worker) until
 *	     it catches up to the specified stream position.  Then it sets the
 *	     state to SYNCDONE.  There might be zero changes applied between
 *	     CATCHUP and SYNCDONE, because the sync worker might be ahead of the
 *	     apply worker.
 *	   - Once the state was set to SYNCDONE, the apply will continue tracking
 *	     the table until it reaches the SYNCDONE stream position, at which
 *	     point it sets state to READY and stops tracking.  Again, there might
 *	     be zero changes in between.
 *
 *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
 *	  CATCHUP -> SYNCDONE -> READY.
 *
 *	  The catalog pg_subscription_rel is used to keep information about
 *	  subscribed tables and their state.  Some transient state during data
 *	  synchronization is kept in shared memory.  The states SYNCWAIT and
 *	  CATCHUP only appear in memory.
 *
 *	  Example flows look like this:
 *	   - Apply is in front:
 *		  sync:8
 *			-> set in memory SYNCWAIT
 *		  apply:10
 *			-> set in memory CATCHUP
 *			-> enter wait-loop
 *		  sync:10
 *			-> set in catalog SYNCDONE
 *			-> exit
 *		  apply:10
 *			-> exit wait-loop
 *			-> continue rep
 *		  apply:11
 *			-> set in catalog READY
 *
 *	   - Sync in front:
 *		  sync:10
 *			-> set in memory SYNCWAIT
 *		  apply:8
 *			-> set in memory CATCHUP
 *			-> continue per-table filtering
 *		  sync:10
 *			-> set in catalog SYNCDONE
 *			-> exit
 *		  apply:10
 *			-> set in catalog READY
 *			-> stop per-table filtering
 *			-> continue rep
 *-------------------------------------------------------------------------
 */
|
|
|
|
|
|
|
|
#include "postgres.h"
|
|
|
|
|
2019-01-21 19:18:20 +01:00
|
|
|
#include "access/table.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "access/xact.h"
|
|
|
|
#include "catalog/pg_subscription_rel.h"
|
|
|
|
#include "catalog/pg_type.h"
|
|
|
|
#include "commands/copy.h"
|
2019-11-12 04:00:16 +01:00
|
|
|
#include "miscadmin.h"
|
2017-04-18 05:22:04 +02:00
|
|
|
#include "parser/parse_relation.h"
|
2019-11-12 04:00:16 +01:00
|
|
|
#include "pgstat.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
#include "replication/logicallauncher.h"
|
|
|
|
#include "replication/logicalrelation.h"
|
|
|
|
#include "replication/walreceiver.h"
|
|
|
|
#include "replication/worker_internal.h"
|
|
|
|
#include "storage/ipc.h"
|
|
|
|
#include "utils/builtins.h"
|
|
|
|
#include "utils/lsyscache.h"
|
|
|
|
#include "utils/memutils.h"
|
2019-11-12 04:00:16 +01:00
|
|
|
#include "utils/snapmgr.h"
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
static bool table_states_valid = false;
|
|
|
|
|
|
|
|
StringInfo copybuf = NULL;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Exit routine for synchronization worker.
|
|
|
|
*/
|
2017-05-17 22:31:56 +02:00
|
|
|
static void
|
|
|
|
pg_attribute_noreturn()
|
2017-03-23 13:36:36 +01:00
|
|
|
finish_sync_worker(void)
|
|
|
|
{
|
2017-04-14 20:35:05 +02:00
|
|
|
/*
|
|
|
|
* Commit any outstanding transaction. This is the usual case, unless
|
|
|
|
* there was nothing to do for the table.
|
|
|
|
*/
|
2017-03-23 13:36:36 +01:00
|
|
|
if (IsTransactionState())
|
2017-04-14 20:35:05 +02:00
|
|
|
{
|
2017-03-23 13:36:36 +01:00
|
|
|
CommitTransactionCommand();
|
2017-04-14 20:35:05 +02:00
|
|
|
pgstat_report_stat(false);
|
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/* And flush all writes. */
|
|
|
|
XLogFlush(GetXLogWriteRecPtr());
|
|
|
|
|
2017-05-25 00:56:21 +02:00
|
|
|
StartTransactionCommand();
|
2017-03-23 13:36:36 +01:00
|
|
|
ereport(LOG,
|
2017-05-25 00:56:21 +02:00
|
|
|
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
|
2017-06-13 19:05:59 +02:00
|
|
|
MySubscription->name,
|
|
|
|
get_rel_name(MyLogicalRepWorker->relid))));
|
2017-05-25 00:56:21 +02:00
|
|
|
CommitTransactionCommand();
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-06 20:38:44 +02:00
|
|
|
/* Find the main apply worker and signal it. */
|
|
|
|
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
|
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
/* Stop gracefully */
|
|
|
|
proc_exit(0);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-06-13 16:43:36 +02:00
|
|
|
* Wait until the relation synchronization state is set in the catalog to the
|
2017-06-06 20:38:44 +02:00
|
|
|
* expected one.
|
2017-03-23 13:36:36 +01:00
|
|
|
*
|
2017-06-06 20:38:44 +02:00
|
|
|
* Used when transitioning from CATCHUP state to SYNCDONE.
|
2017-06-03 15:18:52 +02:00
|
|
|
*
|
2017-06-06 20:38:44 +02:00
|
|
|
* Returns false if the synchronization worker has disappeared or the table state
|
|
|
|
* has been reset.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
static bool
|
2017-06-06 20:38:44 +02:00
|
|
|
wait_for_relation_state_change(Oid relid, char expected_state)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
2017-06-06 20:38:44 +02:00
|
|
|
char state;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-02 20:46:00 +02:00
|
|
|
for (;;)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
LogicalRepWorker *worker;
|
2017-06-13 19:05:59 +02:00
|
|
|
XLogRecPtr statelsn;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-02 20:46:00 +02:00
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
|
2017-06-06 20:38:44 +02:00
|
|
|
/* XXX use cache invalidation here to improve performance? */
|
|
|
|
PushActiveSnapshot(GetLatestSnapshot());
|
|
|
|
state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
|
|
|
|
relid, &statelsn, true);
|
|
|
|
PopActiveSnapshot();
|
|
|
|
|
|
|
|
if (state == SUBREL_STATE_UNKNOWN)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
if (state == expected_state)
|
|
|
|
return true;
|
|
|
|
|
|
|
|
/* Check if the sync worker is still running and bail if not. */
|
2017-03-23 13:36:36 +01:00
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
2017-06-03 15:18:52 +02:00
|
|
|
|
|
|
|
/* Check if the opposite worker is still running and bail if not. */
|
2017-03-23 13:36:36 +01:00
|
|
|
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
|
Phase 3 of pgindent updates.
Don't move parenthesized lines to the left, even if that means they
flow past the right margin.
By default, BSD indent lines up statement continuation lines that are
within parentheses so that they start just to the right of the preceding
left parenthesis. However, traditionally, if that resulted in the
continuation line extending to the right of the desired right margin,
then indent would push it left just far enough to not overrun the margin,
if it could do so without making the continuation line start to the left of
the current statement indent. That makes for a weird mix of indentations
unless one has been completely rigid about never violating the 80-column
limit.
This behavior has been pretty universally panned by Postgres developers.
Hence, disable it with indent's new -lpl switch, so that parenthesized
lines are always lined up with the preceding left paren.
This patch is much less interesting than the first round of indent
changes, but also bulkier, so I thought it best to separate the effects.
Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org
Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:35:54 +02:00
|
|
|
am_tablesync_worker() ? InvalidOid : relid,
|
2017-06-03 15:18:52 +02:00
|
|
|
false);
|
2017-06-06 20:38:44 +02:00
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
2017-03-23 13:36:36 +01:00
|
|
|
if (!worker)
|
|
|
|
return false;
|
2017-06-03 15:18:52 +02:00
|
|
|
|
Add WL_EXIT_ON_PM_DEATH pseudo-event.
Users of the WaitEventSet and WaitLatch() APIs can now choose between
asking for WL_POSTMASTER_DEATH and then handling it explicitly, or asking
for WL_EXIT_ON_PM_DEATH to trigger immediate exit on postmaster death.
This reduces code duplication, since almost all callers want the latter.
Repair all code that was previously ignoring postmaster death completely,
or requesting the event but ignoring it, or requesting the event but then
doing an unconditional PostmasterIsAlive() call every time through its
event loop (which is an expensive syscall on platforms for which we don't
have USE_POSTMASTER_DEATH_SIGNAL support).
Assert that callers of WaitLatchXXX() under the postmaster remember to
ask for either WL_POSTMASTER_DEATH or WL_EXIT_ON_PM_DEATH, to prevent
future bugs.
The only process that doesn't handle postmaster death is syslogger. It
waits until all backends holding the write end of the syslog pipe
(including the postmaster) have closed it by exiting, to be sure to
capture any parting messages. By using the WaitEventSet API directly
it avoids the new assertion, and as a by-product it may be slightly
more efficient on platforms that have epoll().
Author: Thomas Munro
Reviewed-by: Kyotaro Horiguchi, Heikki Linnakangas, Tom Lane
Discussion: https://postgr.es/m/CAEepm%3D1TCviRykkUb69ppWLr_V697rzd1j3eZsRMmbXvETfqbQ%40mail.gmail.com,
https://postgr.es/m/CAEepm=2LqHzizbe7muD7-2yHUbTOoF7Q+qkSD5Q41kuhttRTwA@mail.gmail.com
2018-11-23 08:16:41 +01:00
|
|
|
(void) WaitLatch(MyLatch,
|
|
|
|
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
|
|
|
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
|
2017-06-03 15:18:52 +02:00
|
|
|
|
2017-06-07 01:13:00 +02:00
|
|
|
ResetLatch(MyLatch);
|
2017-06-06 20:38:44 +02:00
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-06 20:38:44 +02:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-06-13 16:43:36 +02:00
|
|
|
* Wait until the apply worker changes the state of our synchronization
|
2017-06-06 20:38:44 +02:00
|
|
|
* worker to the expected one.
|
|
|
|
*
|
|
|
|
* Used when transitioning from SYNCWAIT state to CATCHUP.
|
|
|
|
*
|
2017-06-30 20:57:06 +02:00
|
|
|
* Returns false if the apply worker has disappeared.
|
2017-06-06 20:38:44 +02:00
|
|
|
*/
|
|
|
|
static bool
|
|
|
|
wait_for_worker_state_change(char expected_state)
|
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
LogicalRepWorker *worker;
|
|
|
|
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
|
2017-06-30 20:57:06 +02:00
|
|
|
/*
|
|
|
|
* Done if already in correct state. (We assume this fetch is atomic
|
|
|
|
* enough to not give a misleading answer if we do it with no lock.)
|
|
|
|
*/
|
|
|
|
if (MyLogicalRepWorker->relstate == expected_state)
|
|
|
|
return true;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Bail out if the apply worker has died, else signal it we're
|
|
|
|
* waiting.
|
|
|
|
*/
|
2017-06-06 20:38:44 +02:00
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
|
|
|
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
|
|
|
|
InvalidOid, false);
|
2017-06-30 20:57:06 +02:00
|
|
|
if (worker && worker->proc)
|
|
|
|
logicalrep_worker_wakeup_ptr(worker);
|
2017-06-06 20:38:44 +02:00
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
|
|
|
if (!worker)
|
2017-06-30 20:57:06 +02:00
|
|
|
break;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-30 20:57:06 +02:00
|
|
|
/*
|
|
|
|
* Wait. We expect to get a latch signal back from the apply worker,
|
|
|
|
* but use a timeout in case it dies without sending one.
|
|
|
|
*/
|
2017-06-07 01:13:00 +02:00
|
|
|
rc = WaitLatch(MyLatch,
|
Add WL_EXIT_ON_PM_DEATH pseudo-event.
Users of the WaitEventSet and WaitLatch() APIs can now choose between
asking for WL_POSTMASTER_DEATH and then handling it explicitly, or asking
for WL_EXIT_ON_PM_DEATH to trigger immediate exit on postmaster death.
This reduces code duplication, since almost all callers want the latter.
Repair all code that was previously ignoring postmaster death completely,
or requesting the event but ignoring it, or requesting the event but then
doing an unconditional PostmasterIsAlive() call every time through its
event loop (which is an expensive syscall on platforms for which we don't
have USE_POSTMASTER_DEATH_SIGNAL support).
Assert that callers of WaitLatchXXX() under the postmaster remember to
ask for either WL_POSTMASTER_DEATH or WL_EXIT_ON_PM_DEATH, to prevent
future bugs.
The only process that doesn't handle postmaster death is syslogger. It
waits until all backends holding the write end of the syslog pipe
(including the postmaster) have closed it by exiting, to be sure to
capture any parting messages. By using the WaitEventSet API directly
it avoids the new assertion, and as a by-product it may be slightly
more efficient on platforms that have epoll().
Author: Thomas Munro
Reviewed-by: Kyotaro Horiguchi, Heikki Linnakangas, Tom Lane
Discussion: https://postgr.es/m/CAEepm%3D1TCviRykkUb69ppWLr_V697rzd1j3eZsRMmbXvETfqbQ%40mail.gmail.com,
https://postgr.es/m/CAEepm=2LqHzizbe7muD7-2yHUbTOoF7Q+qkSD5Q41kuhttRTwA@mail.gmail.com
2018-11-23 08:16:41 +01:00
|
|
|
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
2017-06-03 15:18:52 +02:00
|
|
|
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-30 20:57:06 +02:00
|
|
|
if (rc & WL_LATCH_SET)
|
|
|
|
ResetLatch(MyLatch);
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Callback from syscache invalidation.
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
|
|
|
|
{
|
|
|
|
table_states_valid = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Handle table synchronization cooperation from the synchronization
|
|
|
|
* worker.
|
|
|
|
*
|
2017-06-06 20:38:44 +02:00
|
|
|
* If the sync worker is in CATCHUP state and reached (or passed) the
|
|
|
|
* predetermined synchronization point in the WAL stream, mark the table as
|
|
|
|
* SYNCDONE and finish.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
static void
|
|
|
|
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
|
|
|
|
{
|
|
|
|
Assert(IsTransactionState());
|
|
|
|
|
|
|
|
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
|
|
|
|
|
|
|
if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
|
|
|
|
current_lsn >= MyLogicalRepWorker->relstate_lsn)
|
|
|
|
{
|
|
|
|
TimeLineID tli;
|
|
|
|
|
2017-06-06 20:38:44 +02:00
|
|
|
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
|
2017-03-23 13:36:36 +01:00
|
|
|
MyLogicalRepWorker->relstate_lsn = current_lsn;
|
|
|
|
|
|
|
|
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
|
|
|
|
2018-04-06 16:00:26 +02:00
|
|
|
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
|
|
|
|
MyLogicalRepWorker->relid,
|
|
|
|
MyLogicalRepWorker->relstate,
|
|
|
|
MyLogicalRepWorker->relstate_lsn);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
walrcv_endstreaming(wrconn, &tli);
|
|
|
|
finish_sync_worker();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Handle table synchronization cooperation from the apply worker.
|
|
|
|
*
|
|
|
|
* Walk over all subscription tables that are individually tracked by the
|
|
|
|
* apply process (currently, all that have state other than
|
|
|
|
* SUBREL_STATE_READY) and manage synchronization for them.
|
|
|
|
*
|
|
|
|
* If there are tables that need synchronizing and are not being synchronized
|
|
|
|
* yet, start sync workers for them (if there are free slots for sync
|
2017-04-27 20:57:26 +02:00
|
|
|
* workers). To prevent starting the sync worker for the same relation at a
|
|
|
|
* high frequency after a failure, we store its last start time with each sync
|
|
|
|
* state info. We start the sync worker for the same relation after waiting
|
|
|
|
* at least wal_retrieve_retry_interval.
|
2017-03-23 13:36:36 +01:00
|
|
|
*
|
|
|
|
* For tables that are being synchronized already, check if sync workers
|
2017-06-06 20:38:44 +02:00
|
|
|
* either need action from the apply worker or have finished. This is the
|
|
|
|
* SYNCWAIT to CATCHUP transition.
|
2017-03-23 13:36:36 +01:00
|
|
|
*
|
2017-06-06 20:38:44 +02:00
|
|
|
* If the synchronization position is reached (SYNCDONE), then the table can
|
|
|
|
* be marked as READY and is no longer tracked.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
static void
|
|
|
|
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
|
|
|
|
{
|
2017-04-27 20:57:26 +02:00
|
|
|
struct tablesync_start_time_mapping
|
|
|
|
{
|
|
|
|
Oid relid;
|
2017-05-17 22:31:56 +02:00
|
|
|
TimestampTz last_start_time;
|
2017-04-27 20:57:26 +02:00
|
|
|
};
|
2017-03-23 13:36:36 +01:00
|
|
|
static List *table_states = NIL;
|
2017-04-27 20:57:26 +02:00
|
|
|
static HTAB *last_start_times = NULL;
|
2017-03-23 13:36:36 +01:00
|
|
|
ListCell *lc;
|
2017-05-08 18:07:59 +02:00
|
|
|
bool started_tx = false;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
Assert(!IsTransactionState());
|
|
|
|
|
2017-06-13 16:43:36 +02:00
|
|
|
/* We need up-to-date sync state info for subscription tables here. */
|
2017-03-23 13:36:36 +01:00
|
|
|
if (!table_states_valid)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
MemoryContext oldctx;
|
|
|
|
List *rstates;
|
|
|
|
ListCell *lc;
|
2017-03-23 13:36:36 +01:00
|
|
|
SubscriptionRelState *rstate;
|
|
|
|
|
|
|
|
/* Clean the old list. */
|
|
|
|
list_free_deep(table_states);
|
|
|
|
table_states = NIL;
|
|
|
|
|
|
|
|
StartTransactionCommand();
|
2017-05-08 18:07:59 +02:00
|
|
|
started_tx = true;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/* Fetch all non-ready tables. */
|
2017-05-17 22:31:56 +02:00
|
|
|
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/* Allocate the tracking info in a permanent memory context. */
|
|
|
|
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
|
|
|
|
foreach(lc, rstates)
|
|
|
|
{
|
|
|
|
rstate = palloc(sizeof(SubscriptionRelState));
|
|
|
|
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
|
|
|
|
table_states = lappend(table_states, rstate);
|
|
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldctx);
|
|
|
|
|
|
|
|
table_states_valid = true;
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:57:26 +02:00
|
|
|
/*
|
2017-06-13 16:43:36 +02:00
|
|
|
* Prepare a hash table for tracking last start times of workers, to avoid
|
2017-04-27 20:57:26 +02:00
|
|
|
* immediate restarts. We don't need it if there are no tables that need
|
|
|
|
* syncing.
|
|
|
|
*/
|
|
|
|
if (table_states && !last_start_times)
|
|
|
|
{
|
|
|
|
HASHCTL ctl;
|
|
|
|
|
|
|
|
memset(&ctl, 0, sizeof(ctl));
|
|
|
|
ctl.keysize = sizeof(Oid);
|
|
|
|
ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
|
|
|
|
last_start_times = hash_create("Logical replication table sync worker start times",
|
|
|
|
256, &ctl, HASH_ELEM | HASH_BLOBS);
|
|
|
|
}
|
2017-05-17 22:31:56 +02:00
|
|
|
|
2017-04-27 20:57:26 +02:00
|
|
|
/*
|
|
|
|
* Clean up the hash table when we're done with all tables (just to
|
|
|
|
* release the bit of memory).
|
|
|
|
*/
|
|
|
|
else if (!table_states && last_start_times)
|
|
|
|
{
|
|
|
|
hash_destroy(last_start_times);
|
|
|
|
last_start_times = NULL;
|
|
|
|
}
|
|
|
|
|
2017-06-06 20:38:44 +02:00
|
|
|
/*
|
|
|
|
* Process all tables that are being synchronized.
|
|
|
|
*/
|
2017-03-23 13:36:36 +01:00
|
|
|
foreach(lc, table_states)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
if (rstate->state == SUBREL_STATE_SYNCDONE)
|
|
|
|
{
|
|
|
|
/*
|
2017-05-17 22:31:56 +02:00
|
|
|
* Apply has caught up to the position where the table sync has
|
2017-06-13 19:05:59 +02:00
|
|
|
* finished. Mark the table as ready so that the apply will just
|
|
|
|
* continue to replicate it normally.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
if (current_lsn >= rstate->lsn)
|
|
|
|
{
|
|
|
|
rstate->state = SUBREL_STATE_READY;
|
|
|
|
rstate->lsn = current_lsn;
|
2017-05-08 18:07:59 +02:00
|
|
|
if (!started_tx)
|
|
|
|
{
|
|
|
|
StartTransactionCommand();
|
|
|
|
started_tx = true;
|
|
|
|
}
|
2018-04-06 16:00:26 +02:00
|
|
|
|
|
|
|
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
|
|
|
|
rstate->relid, rstate->state,
|
|
|
|
rstate->lsn);
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
LogicalRepWorker *syncworker;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-30 20:57:06 +02:00
|
|
|
/*
|
|
|
|
* Look for a sync worker for this relation.
|
|
|
|
*/
|
2017-03-23 13:36:36 +01:00
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
2017-06-30 20:57:06 +02:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
|
|
|
|
rstate->relid, false);
|
2017-06-30 20:57:06 +02:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
if (syncworker)
|
|
|
|
{
|
2017-06-30 20:57:06 +02:00
|
|
|
/* Found one, update our copy of its state */
|
2017-03-23 13:36:36 +01:00
|
|
|
SpinLockAcquire(&syncworker->relmutex);
|
|
|
|
rstate->state = syncworker->relstate;
|
|
|
|
rstate->lsn = syncworker->relstate_lsn;
|
2017-06-30 20:57:06 +02:00
|
|
|
if (rstate->state == SUBREL_STATE_SYNCWAIT)
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* Sync worker is waiting for apply. Tell sync worker it
|
|
|
|
* can catchup now.
|
|
|
|
*/
|
|
|
|
syncworker->relstate = SUBREL_STATE_CATCHUP;
|
|
|
|
syncworker->relstate_lsn =
|
|
|
|
Max(syncworker->relstate_lsn, current_lsn);
|
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
SpinLockRelease(&syncworker->relmutex);
|
2017-06-30 20:57:06 +02:00
|
|
|
|
|
|
|
/* If we told worker to catch up, wait for it. */
|
|
|
|
if (rstate->state == SUBREL_STATE_SYNCWAIT)
|
|
|
|
{
|
|
|
|
/* Signal the sync worker, as it may be waiting for us. */
|
|
|
|
if (syncworker->proc)
|
|
|
|
logicalrep_worker_wakeup_ptr(syncworker);
|
|
|
|
|
|
|
|
/* Now safe to release the LWLock */
|
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Enter busy loop and wait for synchronization worker to
|
|
|
|
* reach expected state (or die trying).
|
|
|
|
*/
|
|
|
|
if (!started_tx)
|
|
|
|
{
|
|
|
|
StartTransactionCommand();
|
|
|
|
started_tx = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
wait_for_relation_state_change(rstate->relid,
|
|
|
|
SUBREL_STATE_SYNCDONE);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
else
|
2017-06-30 20:57:06 +02:00
|
|
|
{
|
2017-03-23 13:36:36 +01:00
|
|
|
/*
|
2017-06-13 19:05:59 +02:00
|
|
|
* If there is no sync worker for this table yet, count
|
|
|
|
* running sync workers for this subscription, while we have
|
2017-06-30 20:57:06 +02:00
|
|
|
* the lock.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
2017-06-30 20:57:06 +02:00
|
|
|
int nsyncworkers =
|
|
|
|
logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-30 20:57:06 +02:00
|
|
|
/* Now safe to release the LWLock */
|
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
/*
|
2017-06-30 20:57:06 +02:00
|
|
|
* If there are free sync worker slot(s), start a new sync
|
|
|
|
* worker for the table.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
2017-06-30 20:57:06 +02:00
|
|
|
if (nsyncworkers < max_sync_workers_per_subscription)
|
2017-04-27 20:57:26 +02:00
|
|
|
{
|
2017-06-30 20:57:06 +02:00
|
|
|
TimestampTz now = GetCurrentTimestamp();
|
|
|
|
struct tablesync_start_time_mapping *hentry;
|
|
|
|
bool found;
|
|
|
|
|
|
|
|
hentry = hash_search(last_start_times, &rstate->relid,
|
|
|
|
HASH_ENTER, &found);
|
|
|
|
|
|
|
|
if (!found ||
|
|
|
|
TimestampDifferenceExceeds(hentry->last_start_time, now,
|
|
|
|
wal_retrieve_retry_interval))
|
|
|
|
{
|
|
|
|
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
|
|
|
|
MySubscription->oid,
|
|
|
|
MySubscription->name,
|
|
|
|
MyLogicalRepWorker->userid,
|
|
|
|
rstate->relid);
|
|
|
|
hentry->last_start_time = now;
|
|
|
|
}
|
2017-04-27 20:57:26 +02:00
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-05-08 18:07:59 +02:00
|
|
|
|
|
|
|
if (started_tx)
|
|
|
|
{
|
|
|
|
CommitTransactionCommand();
|
|
|
|
pgstat_report_stat(false);
|
|
|
|
}
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-06-30 20:57:06 +02:00
|
|
|
* Process possible state change(s) of tables that are being synchronized.
|
2017-03-23 13:36:36 +01:00
|
|
|
*/
|
|
|
|
void
|
|
|
|
process_syncing_tables(XLogRecPtr current_lsn)
|
|
|
|
{
|
|
|
|
if (am_tablesync_worker())
|
|
|
|
process_syncing_tables_for_sync(current_lsn);
|
|
|
|
else
|
|
|
|
process_syncing_tables_for_apply(current_lsn);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Create list of columns for COPY based on logical relation mapping.
|
|
|
|
*/
|
|
|
|
static List *
|
|
|
|
make_copy_attnamelist(LogicalRepRelMapEntry *rel)
|
|
|
|
{
|
|
|
|
List *attnamelist = NIL;
|
|
|
|
int i;
|
|
|
|
|
2017-05-18 20:16:16 +02:00
|
|
|
for (i = 0; i < rel->remoterel.natts; i++)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
|
|
|
attnamelist = lappend(attnamelist,
|
2017-05-18 20:16:16 +02:00
|
|
|
makeString(rel->remoterel.attnames[i]));
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
|
2017-05-18 20:16:16 +02:00
|
|
|
|
2017-03-23 13:36:36 +01:00
|
|
|
return attnamelist;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Data source callback for the COPY FROM, which reads from the remote
|
|
|
|
* connection and passes the data back to our local COPY.
|
|
|
|
*/
|
|
|
|
static int
|
|
|
|
copy_read_data(void *outbuf, int minread, int maxread)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
int bytesread = 0;
|
|
|
|
int avail;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-13 16:43:36 +02:00
|
|
|
/* If there are some leftover data from previous read, use it. */
|
2017-03-23 13:36:36 +01:00
|
|
|
avail = copybuf->len - copybuf->cursor;
|
|
|
|
if (avail)
|
|
|
|
{
|
|
|
|
if (avail > maxread)
|
|
|
|
avail = maxread;
|
|
|
|
memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
|
|
|
|
copybuf->cursor += avail;
|
|
|
|
maxread -= avail;
|
|
|
|
bytesread += avail;
|
|
|
|
}
|
|
|
|
|
2017-06-02 20:46:00 +02:00
|
|
|
while (maxread > 0 && bytesread < minread)
|
2017-03-23 13:36:36 +01:00
|
|
|
{
|
|
|
|
pgsocket fd = PGINVALID_SOCKET;
|
|
|
|
int len;
|
|
|
|
char *buf = NULL;
|
|
|
|
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
/* Try read the data. */
|
|
|
|
len = walrcv_receive(wrconn, &buf, &fd);
|
|
|
|
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
|
|
|
|
if (len == 0)
|
|
|
|
break;
|
|
|
|
else if (len < 0)
|
|
|
|
return bytesread;
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/* Process the data */
|
|
|
|
copybuf->data = buf;
|
|
|
|
copybuf->len = len;
|
|
|
|
copybuf->cursor = 0;
|
|
|
|
|
|
|
|
avail = copybuf->len - copybuf->cursor;
|
|
|
|
if (avail > maxread)
|
|
|
|
avail = maxread;
|
|
|
|
memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
|
|
|
|
outbuf = (void *) ((char *) outbuf + avail);
|
|
|
|
copybuf->cursor += avail;
|
|
|
|
maxread -= avail;
|
|
|
|
bytesread += avail;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (maxread <= 0 || bytesread >= minread)
|
|
|
|
return bytesread;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Wait for more data or latch.
|
|
|
|
*/
|
Add WL_EXIT_ON_PM_DEATH pseudo-event.
Users of the WaitEventSet and WaitLatch() APIs can now choose between
asking for WL_POSTMASTER_DEATH and then handling it explicitly, or asking
for WL_EXIT_ON_PM_DEATH to trigger immediate exit on postmaster death.
This reduces code duplication, since almost all callers want the latter.
Repair all code that was previously ignoring postmaster death completely,
or requesting the event but ignoring it, or requesting the event but then
doing an unconditional PostmasterIsAlive() call every time through its
event loop (which is an expensive syscall on platforms for which we don't
have USE_POSTMASTER_DEATH_SIGNAL support).
Assert that callers of WaitLatchXXX() under the postmaster remember to
ask for either WL_POSTMASTER_DEATH or WL_EXIT_ON_PM_DEATH, to prevent
future bugs.
The only process that doesn't handle postmaster death is syslogger. It
waits until all backends holding the write end of the syslog pipe
(including the postmaster) have closed it by exiting, to be sure to
capture any parting messages. By using the WaitEventSet API directly
it avoids the new assertion, and as a by-product it may be slightly
more efficient on platforms that have epoll().
Author: Thomas Munro
Reviewed-by: Kyotaro Horiguchi, Heikki Linnakangas, Tom Lane
Discussion: https://postgr.es/m/CAEepm%3D1TCviRykkUb69ppWLr_V697rzd1j3eZsRMmbXvETfqbQ%40mail.gmail.com,
https://postgr.es/m/CAEepm=2LqHzizbe7muD7-2yHUbTOoF7Q+qkSD5Q41kuhttRTwA@mail.gmail.com
2018-11-23 08:16:41 +01:00
|
|
|
(void) WaitLatchOrSocket(MyLatch,
|
|
|
|
WL_SOCKET_READABLE | WL_LATCH_SET |
|
|
|
|
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
|
|
|
fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
2017-06-07 01:13:00 +02:00
|
|
|
ResetLatch(MyLatch);
|
2017-03-23 13:36:36 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return bytesread;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Get information about remote relation in similar fashion the RELATION
|
|
|
|
* message provides during replication.
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
fetch_remote_table_info(char *nspname, char *relname,
|
|
|
|
LogicalRepRelation *lrel)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
WalRcvExecResult *res;
|
|
|
|
StringInfoData cmd;
|
|
|
|
TupleTableSlot *slot;
|
2020-03-19 08:17:50 +01:00
|
|
|
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
|
|
|
|
Oid attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
|
2017-05-17 22:31:56 +02:00
|
|
|
bool isnull;
|
|
|
|
int natt;
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
lrel->nspname = nspname;
|
|
|
|
lrel->relname = relname;
|
|
|
|
|
|
|
|
/* First fetch Oid and replica identity. */
|
|
|
|
initStringInfo(&cmd);
|
2020-03-19 08:17:50 +01:00
|
|
|
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
|
2017-05-17 22:31:56 +02:00
|
|
|
" FROM pg_catalog.pg_class c"
|
|
|
|
" INNER JOIN pg_catalog.pg_namespace n"
|
|
|
|
" ON (c.relnamespace = n.oid)"
|
|
|
|
" WHERE n.nspname = %s"
|
2020-03-19 08:17:50 +01:00
|
|
|
" AND c.relname = %s",
|
2017-05-17 22:31:56 +02:00
|
|
|
quote_literal_cstr(nspname),
|
|
|
|
quote_literal_cstr(relname));
|
2020-03-19 08:17:50 +01:00
|
|
|
res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
if (res->status != WALRCV_OK_TUPLES)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
|
|
|
|
nspname, relname, res->err)));
|
|
|
|
|
Introduce notion of different types of slots (without implementing them).
Upcoming work intends to allow pluggable ways to introduce new ways of
storing table data. Accessing those table access methods from the
executor requires TupleTableSlots to be carry tuples in the native
format of such storage methods; otherwise there'll be a significant
conversion overhead.
Different access methods will require different data to store tuples
efficiently (just like virtual, minimal, heap already require fields
in TupleTableSlot). To allow that without requiring additional pointer
indirections, we want to have different structs (embedding
TupleTableSlot) for different types of slots. Thus different types of
slots are needed, which requires adapting creators of slots.
The slot that most efficiently can represent a type of tuple in an
executor node will often depend on the type of slot a child node
uses. Therefore we need to track the type of slot is returned by
nodes, so parent slots can create slots based on that.
Relatedly, JIT compilation of tuple deforming needs to know which type
of slot a certain expression refers to, so it can create an
appropriate deforming function for the type of tuple in the slot.
But not all nodes will only return one type of slot, e.g. an append
node will potentially return different types of slots for each of its
subplans.
Therefore add function that allows to query the type of a node's
result slot, and whether it'll always be the same type (whether it's
fixed). This can be queried using ExecGetResultSlotOps().
The scan, result, inner, outer type of slots are automatically
inferred from ExecInitScanTupleSlot(), ExecInitResultSlot(),
left/right subtrees respectively. If that's not correct for a node,
that can be overwritten using new fields in PlanState.
This commit does not introduce the actually abstracted implementation
of different kind of TupleTableSlots, that will be left for a followup
commit. The different types of slots introduced will, for now, still
use the same backing implementation.
While this already partially invalidates the big comment in
tuptable.h, it seems to make more sense to update it later, when the
different TupleTableSlot implementations actually exist.
Author: Ashutosh Bapat and Andres Freund, with changes by Amit Khandekar
Discussion: https://postgr.es/m/20181105210039.hh4vvi4vwoq5ba2q@alap3.anarazel.de
2018-11-16 07:00:30 +01:00
|
|
|
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
|
2017-03-23 13:36:36 +01:00
|
|
|
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("table \"%s.%s\" not found on publisher",
|
|
|
|
nspname, relname)));
|
|
|
|
|
|
|
|
lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
|
|
|
|
Assert(!isnull);
|
|
|
|
lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
|
|
|
|
Assert(!isnull);
|
2020-03-19 08:17:50 +01:00
|
|
|
lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
|
|
|
|
Assert(!isnull);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
ExecDropSingleTupleTableSlot(slot);
|
|
|
|
walrcv_clear_result(res);
|
|
|
|
|
|
|
|
/* Now fetch columns. */
|
|
|
|
resetStringInfo(&cmd);
|
|
|
|
appendStringInfo(&cmd,
|
|
|
|
"SELECT a.attname,"
|
|
|
|
" a.atttypid,"
|
|
|
|
" a.atttypmod,"
|
|
|
|
" a.attnum = ANY(i.indkey)"
|
|
|
|
" FROM pg_catalog.pg_attribute a"
|
|
|
|
" LEFT JOIN pg_catalog.pg_index i"
|
Phase 3 of pgindent updates.
Don't move parenthesized lines to the left, even if that means they
flow past the right margin.
By default, BSD indent lines up statement continuation lines that are
within parentheses so that they start just to the right of the preceding
left parenthesis. However, traditionally, if that resulted in the
continuation line extending to the right of the desired right margin,
then indent would push it left just far enough to not overrun the margin,
if it could do so without making the continuation line start to the left of
the current statement indent. That makes for a weird mix of indentations
unless one has been completely rigid about never violating the 80-column
limit.
This behavior has been pretty universally panned by Postgres developers.
Hence, disable it with indent's new -lpl switch, so that parenthesized
lines are always lined up with the preceding left paren.
This patch is much less interesting than the first round of indent
changes, but also bulkier, so I thought it best to separate the effects.
Discussion: https://postgr.es/m/E1dAmxK-0006EE-1r@gemulon.postgresql.org
Discussion: https://postgr.es/m/30527.1495162840@sss.pgh.pa.us
2017-06-21 21:35:54 +02:00
|
|
|
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
|
2017-03-23 13:36:36 +01:00
|
|
|
" WHERE a.attnum > 0::pg_catalog.int2"
|
2019-03-30 08:13:09 +01:00
|
|
|
" AND NOT a.attisdropped %s"
|
2017-03-23 13:36:36 +01:00
|
|
|
" AND a.attrelid = %u"
|
|
|
|
" ORDER BY a.attnum",
|
2019-03-30 08:13:09 +01:00
|
|
|
lrel->remoteid,
|
|
|
|
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
|
|
|
|
lrel->remoteid);
|
2020-03-19 08:17:50 +01:00
|
|
|
res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
if (res->status != WALRCV_OK_TUPLES)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errmsg("could not fetch table info for table \"%s.%s\": %s",
|
|
|
|
nspname, relname, res->err)));
|
|
|
|
|
2017-06-13 16:43:36 +02:00
|
|
|
/* We don't know the number of rows coming, so allocate enough space. */
|
2017-03-23 13:36:36 +01:00
|
|
|
lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
|
|
|
|
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
|
|
|
|
lrel->attkeys = NULL;
|
|
|
|
|
|
|
|
natt = 0;
|
Introduce notion of different types of slots (without implementing them).
Upcoming work intends to allow pluggable ways to introduce new ways of
storing table data. Accessing those table access methods from the
executor requires TupleTableSlots to be carry tuples in the native
format of such storage methods; otherwise there'll be a significant
conversion overhead.
Different access methods will require different data to store tuples
efficiently (just like virtual, minimal, heap already require fields
in TupleTableSlot). To allow that without requiring additional pointer
indirections, we want to have different structs (embedding
TupleTableSlot) for different types of slots. Thus different types of
slots are needed, which requires adapting creators of slots.
The slot that most efficiently can represent a type of tuple in an
executor node will often depend on the type of slot a child node
uses. Therefore we need to track the type of slot is returned by
nodes, so parent slots can create slots based on that.
Relatedly, JIT compilation of tuple deforming needs to know which type
of slot a certain expression refers to, so it can create an
appropriate deforming function for the type of tuple in the slot.
But not all nodes will only return one type of slot, e.g. an append
node will potentially return different types of slots for each of its
subplans.
Therefore add function that allows to query the type of a node's
result slot, and whether it'll always be the same type (whether it's
fixed). This can be queried using ExecGetResultSlotOps().
The scan, result, inner, outer type of slots are automatically
inferred from ExecInitScanTupleSlot(), ExecInitResultSlot(),
left/right subtrees respectively. If that's not correct for a node,
that can be overwritten using new fields in PlanState.
This commit does not introduce the actually abstracted implementation
of different kind of TupleTableSlots, that will be left for a followup
commit. The different types of slots introduced will, for now, still
use the same backing implementation.
While this already partially invalidates the big comment in
tuptable.h, it seems to make more sense to update it later, when the
different TupleTableSlot implementations actually exist.
Author: Ashutosh Bapat and Andres Freund, with changes by Amit Khandekar
Discussion: https://postgr.es/m/20181105210039.hh4vvi4vwoq5ba2q@alap3.anarazel.de
2018-11-16 07:00:30 +01:00
|
|
|
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
|
2017-03-23 13:36:36 +01:00
|
|
|
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
|
|
|
{
|
|
|
|
lrel->attnames[natt] =
|
2017-04-14 18:54:09 +02:00
|
|
|
TextDatumGetCString(slot_getattr(slot, 1, &isnull));
|
2017-03-23 13:36:36 +01:00
|
|
|
Assert(!isnull);
|
|
|
|
lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
|
|
|
|
Assert(!isnull);
|
|
|
|
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
|
|
|
|
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
|
|
|
|
|
|
|
|
/* Should never happen. */
|
|
|
|
if (++natt >= MaxTupleAttributeNumber)
|
|
|
|
elog(ERROR, "too many columns in remote table \"%s.%s\"",
|
2017-05-17 22:31:56 +02:00
|
|
|
nspname, relname);
|
2017-03-23 13:36:36 +01:00
|
|
|
|
|
|
|
ExecClearTuple(slot);
|
|
|
|
}
|
|
|
|
ExecDropSingleTupleTableSlot(slot);
|
|
|
|
|
|
|
|
lrel->natts = natt;
|
|
|
|
|
|
|
|
walrcv_clear_result(res);
|
|
|
|
pfree(cmd.data);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Copy existing data of a table from publisher.
 *
 * Caller is responsible for locking the local relation.
 *
 * Uses the walreceiver connection (file-level wrconn) to run a remote COPY
 * TO STDOUT and feeds the stream into local COPY FROM via copy_read_data.
 */
static void
copy_table(Relation rel)
{
	LogicalRepRelMapEntry *relmapentry;
	LogicalRepRelation lrel;
	WalRcvExecResult *res;
	StringInfoData cmd;
	CopyState	cstate;
	List	   *attnamelist;
	ParseState *pstate;

	/* Get the publisher relation info. */
	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
							RelationGetRelationName(rel), &lrel);

	/* Put the relation into relmap. */
	logicalrep_relmap_update(&lrel);

	/* Map the publisher relation to local one. */
	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
	Assert(rel == relmapentry->localrel);

	/* Start copy on the publisher. */
	initStringInfo(&cmd);
	if (lrel.relkind == RELKIND_RELATION)
		appendStringInfo(&cmd, "COPY %s TO STDOUT",
						 quote_qualified_identifier(lrel.nspname, lrel.relname));
	else
	{
		/*
		 * For non-tables, we need to do COPY (SELECT ...), but we can't just
		 * do SELECT * because we need to not copy generated columns.
		 */
		appendStringInfoString(&cmd, "COPY (SELECT ");
		for (int i = 0; i < lrel.natts; i++)
		{
			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
			if (i < lrel.natts - 1)
				appendStringInfoString(&cmd, ", ");
		}
		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
						 quote_qualified_identifier(lrel.nspname, lrel.relname));
	}
	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
	pfree(cmd.data);
	if (res->status != WALRCV_OK_COPY_OUT)
		ereport(ERROR,
				(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
						lrel.nspname, lrel.relname, res->err)));
	walrcv_clear_result(res);

	/* Buffer consumed incrementally by copy_read_data (file-level global). */
	copybuf = makeStringInfo();

	/* Build a dummy parse state so COPY FROM can report errors sanely. */
	pstate = make_parsestate(NULL);
	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
										 NULL, false, false);

	/*
	 * Feed the remote COPY stream into local COPY FROM; copy_read_data is
	 * the data-source callback.
	 */
	attnamelist = make_copy_attnamelist(relmapentry);
	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);

	/* Do the copy */
	(void) CopyFrom(cstate);

	logicalrep_rel_close(relmapentry, NoLock);
}
|
|
|
|
|
|
|
|
/*
 * Start syncing the table in the sync worker.
 *
 * The returned slot name is palloc'ed in current memory context.
 *
 * Side effects: connects the file-level wrconn to the publisher, updates
 * MyLogicalRepWorker->relstate (under relmutex) and the catalog state in
 * pg_subscription_rel.  May not return at all: finish_sync_worker() exits
 * the process when the sync is already complete or finishes here.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn, true);
	CommitTransactionCommand();

	/* Publish the catalog state into shared worker state. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
	 * NAMEDATALEN on the remote that matters, but this scheme will also work
	 * reasonably if that is different.)
	 */
	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
	slotname = psprintf("%.*s_%u_sync_%u",
						NAMEDATALEN - 28,
						MySubscription->slotname,
						MySubscription->oid,
						MyLogicalRepWorker->relid);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the main apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
	if (wrconn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to the publisher: %s", err)));

	switch (MyLogicalRepWorker->relstate)
	{
		case SUBREL_STATE_INIT:
		case SUBREL_STATE_DATASYNC:
			{
				Relation	rel;
				WalRcvExecResult *res;

				/* Record that the data copy is (re)starting. */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Update the state and make it visible to others. */
				StartTransactionCommand();
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   MyLogicalRepWorker->relstate,
										   MyLogicalRepWorker->relstate_lsn);
				CommitTransactionCommand();
				pgstat_report_stat(false);

				/*
				 * We want to do the table data sync in a single transaction.
				 */
				StartTransactionCommand();

				/*
				 * Use a standard write lock here. It might be better to
				 * disallow access to the table while it's being synchronized.
				 * But we don't want to block the main apply process from
				 * working and it has to open the relation in RowExclusiveLock
				 * when remapping remote relation id to local one.
				 */
				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

				/*
				 * Create a temporary slot for the sync process. We do this
				 * inside the transaction so that we can use the snapshot made
				 * by the slot to get existing data.
				 */
				res = walrcv_exec(wrconn,
								  "BEGIN READ ONLY ISOLATION LEVEL "
								  "REPEATABLE READ", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not start transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				/*
				 * Create new temporary logical decoding slot.
				 *
				 * We'll use slot for data copy so make sure the snapshot is
				 * used for the transaction; that way the COPY will get data
				 * that is consistent with the lsn used by the slot to start
				 * decoding.
				 */
				walrcv_create_slot(wrconn, slotname, true,
								   CRS_USE_SNAPSHOT, origin_startpos);

				/* Run the copy under the slot's exported snapshot. */
				PushActiveSnapshot(GetTransactionSnapshot());
				copy_table(rel);
				PopActiveSnapshot();

				res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not finish transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				table_close(rel, NoLock);

				/* Make the copy visible. */
				CommandCounterIncrement();

				/*
				 * We are done with the initial data synchronization, update
				 * the state.
				 */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Wait for main apply worker to tell us to catchup. */
				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

				/*----------
				 * There are now two possible states here:
				 * a) Sync is behind the apply.  If that's the case we need to
				 *	  catch up with it by consuming the logical replication
				 *	  stream up to the relstate_lsn.  For that, we exit this
				 *	  function and continue in ApplyWorkerMain().
				 * b) Sync is caught up with the apply.  So it can just set
				 *	  the state to SYNCDONE and finish.
				 *----------
				 */
				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
				{
					/*
					 * Update the new state in catalog.  No need to bother
					 * with the shmem state as we are exiting for good.
					 */
					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
											   MyLogicalRepWorker->relid,
											   SUBREL_STATE_SYNCDONE,
											   *origin_startpos);
					/* Does not return: exits the sync worker process. */
					finish_sync_worker();
				}
				break;
			}
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:

			/*
			 * Nothing to do here but finish.  (UNKNOWN means the relation was
			 * removed from pg_subscription_rel before the sync worker could
			 * start.)
			 */
			finish_sync_worker();
			break;
		default:
			elog(ERROR, "unknown relation state \"%c\"",
				 MyLogicalRepWorker->relstate);
	}

	return slotname;
}
|