postgresql/src/backend/replication/pgoutput/pgoutput.c

2363 lines
69 KiB
C

/*-------------------------------------------------------------------------
*
* pgoutput.c
* Logical Replication output plugin
*
* Copyright (c) 2012-2023, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/pgoutput/pgoutput.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
/* Standard PostgreSQL loadable-module marker. */
PG_MODULE_MAGIC;
/*
 * Forward declarations of the callbacks that _PG_output_plugin_init()
 * installs into the OutputPluginCallbacks table.
 */
static void pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt, bool is_init);
static void pgoutput_shutdown(LogicalDecodingContext *ctx);
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation relation,
ReorderBufferChange *change);
static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change);
static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
/* Two-phase commit callbacks. */
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
/* Transaction streaming callbacks. */
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
/*
 * File-level state shared by the callbacks.
 *
 * publications_valid: cleared by pgoutput_startup(); presumably also cleared
 * by publication_invalidation_cb() and set once the publication list has been
 * reloaded (neither is visible in this part of the file -- confirm).
 *
 * in_streaming: cleared by pgoutput_startup(); while it is set,
 * maybe_send_schema() tags schema messages with the transaction's XID and
 * tracks "schema sent" per streamed top-level transaction.
 *
 * publish_no_origin: set by parse_output_parameters() from the "origin"
 * option (true for "none", false for "any").
 */
static bool publications_valid;
static bool in_streaming;
static bool publish_no_origin;
/* Helpers defined further down in this file. */
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
/*
 * Row filtering distinguishes exactly three publication actions: "insert",
 * "update" and "delete".  The enumerators are used as indexes into
 * RelationSyncEntry.exprstate[], so they are dense and start at zero.
 */
enum RowFilterPubAction
{
    PUBACTION_INSERT = 0,
    PUBACTION_UPDATE = 1,
    PUBACTION_DELETE = 2
};

/* Number of row-filter publication actions (one past the last enumerator). */
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE + 1)
/*
* Entry in the map used to remember which relation schemas we sent.
*
* The schema_sent flag determines if the current schema record for the
* relation (and for its ancestor if publish_as_relid is set) was already
* sent to the subscriber (in which case we don't need to send it again).
*
* The schema cache on downstream is however updated only at commit time,
* and with streamed transactions the commit order may be different from
* the order the transactions are sent in. Also, the (sub) transactions
* might get aborted so we need to send the schema for each (sub) transaction
* so that we don't lose the schema information on abort. For handling this,
* we maintain the list of xids (streamed_txns) for those we have already sent
* the schema.
*
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
bool replicate_valid; /* overall validity flag for entry */
/*
 * True once the schema has been sent outside of a streaming block; see
 * maybe_send_schema(), which consults streamed_txns instead while
 * streaming.
 */
bool schema_sent;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
/* are we publishing this rel? */
PublicationActions pubactions;
/*
* ExprState array for row filter. Different publication actions don't
* allow multiple expressions to always be combined into one, because
* updates or deletes restrict the column in expression to be part of the
* replica identity index whereas inserts do not have this restriction, so
* there is one ExprState per publication action.
*
* Indexed by enum RowFilterPubAction; filled in by
* pgoutput_row_filter_init().
*/
ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
EState *estate; /* executor state used for row filter */
TupleTableSlot *new_slot; /* slot for storing new tuple */
TupleTableSlot *old_slot; /* slot for storing old tuple */
/*
* OID of the relation to publish changes as. For a partition, this may
* be set to one of its ancestors whose schema will be used when
* replicating changes, if publish_via_partition_root is set for the
* publication.
*/
Oid publish_as_relid;
/*
* Map used when replicating using an ancestor's schema to convert tuples
* from partition's type to the ancestor's; NULL if publish_as_relid is
* same as 'relid' or if unnecessary due to partition and the ancestor
* having identical TupleDesc.
*/
AttrMap *attrmap;
/*
* Columns included in the publication, or NULL if all columns are
* included implicitly. Note that the attnums in this bitmap are not
* shifted by FirstLowInvalidHeapAttributeNumber.
*
* Set by pgoutput_column_list_init().
*/
Bitmapset *columns;
/*
* Private context to store additional data for this entry - state for the
* row filter expressions, column list, etc.
*
* Created lazily by pgoutput_ensure_entry_cxt() as a child of the
* plugin's cache context.
*/
MemoryContext entry_cxt;
} RelationSyncEntry;
/*
* Maintain a per-transaction level variable to track whether the transaction
* has sent BEGIN. BEGIN is only sent when the first change in a transaction
* is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
* messages for empty transactions which saves network bandwidth.
*
* This optimization is not used for prepared transactions because if the
* WALSender restarts after prepare of a transaction and before commit prepared
* of the same transaction then we won't be able to figure out if we have
* skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
* because we would have lost the in-memory txndata information that was
* present prior to the restart. This will result in sending a spurious
* COMMIT PREPARED without a corresponding prepared transaction at the
* downstream which would lead to an error when it tries to process it.
*
* XXX We could achieve this optimization by changing protocol to send
* additional information so that downstream can detect that the corresponding
* prepare has not been sent. However, adding such a check for every
* transaction in the downstream could be costly so we might want to do it
* optionally.
*
* We also don't have this optimization for streamed transactions because
* they can contain prepared transactions.
*/
typedef struct PGOutputTxnData
{
/*
 * Starts out false (the struct is zero-allocated in pgoutput_begin_txn())
 * and is set by pgoutput_send_begin() once BEGIN has been written;
 * pgoutput_commit_txn() skips COMMIT when it is still false.
 */
bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
} PGOutputTxnData;
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
/* Relation sync cache management. */
static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
/* Per-entry bookkeeping of which streamed transactions got the schema. */
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static void init_tuple_slot(PGOutputData *data, Relation relation,
RelationSyncEntry *entry);
/* row filter routines */
static EState *create_estate_for_relation(Relation rel);
static void pgoutput_row_filter_init(PGOutputData *data,
List *publications,
RelationSyncEntry *entry);
static bool pgoutput_row_filter_exec_expr(ExprState *state,
ExprContext *econtext);
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
TupleTableSlot **new_slot_ptr,
RelationSyncEntry *entry,
ReorderBufferChangeType *action);
/* column list routines */
static void pgoutput_column_list_init(PGOutputData *data,
List *publications,
RelationSyncEntry *entry);
/*
 * Output plugin entry point: populate the callback table.
 *
 * Every slot of 'cb' is pointed at the matching pgoutput_* routine.  The
 * change, message and truncate handlers serve both the regular and the
 * streaming flavour of their callbacks.
 */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    /* plugin lifecycle and filtering */
    cb->startup_cb = pgoutput_startup;
    cb->shutdown_cb = pgoutput_shutdown;
    cb->filter_by_origin_cb = pgoutput_origin_filter;

    /* regular transactions */
    cb->begin_cb = pgoutput_begin_txn;
    cb->commit_cb = pgoutput_commit_txn;
    cb->change_cb = pgoutput_change;
    cb->truncate_cb = pgoutput_truncate;
    cb->message_cb = pgoutput_message;

    /* two-phase commit */
    cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
    cb->prepare_cb = pgoutput_prepare_txn;
    cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
    cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;

    /* transaction streaming */
    cb->stream_start_cb = pgoutput_stream_start;
    cb->stream_stop_cb = pgoutput_stream_stop;
    cb->stream_abort_cb = pgoutput_stream_abort;
    cb->stream_commit_cb = pgoutput_stream_commit;
    cb->stream_change_cb = pgoutput_change;
    cb->stream_truncate_cb = pgoutput_truncate;
    cb->stream_message_cb = pgoutput_message;

    /* transaction streaming - two-phase commit */
    cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
}
/*
 * Parse the option list supplied by the client at replication start.
 *
 * 'options' is a list of DefElem nodes; recognised names are proto_version,
 * publication_names, binary, messages, streaming, two_phase and origin.
 * Results are stored in 'data', except for the origin setting, which is also
 * reflected in the file-level publish_no_origin flag.
 *
 * Raises ERROR for an unrecognised option, for an option given more than
 * once, for an option that needs a value but has none, and for a malformed
 * value.
 */
static void
parse_output_parameters(List *options, PGOutputData *data)
{
    ListCell   *lc;
    bool        protocol_version_given = false;
    bool        publication_names_given = false;
    bool        binary_option_given = false;
    bool        messages_option_given = false;
    bool        streaming_given = false;
    bool        two_phase_option_given = false;
    bool        origin_option_given = false;

    /* Defaults for options the client is allowed to omit. */
    data->binary = false;
    data->streaming = LOGICALREP_STREAM_OFF;
    data->messages = false;
    data->two_phase = false;

    /*
     * publish_no_origin is file-level state and therefore survives from one
     * decoding session to the next within the same backend.  Reset it so
     * that a session which omits "origin" does not silently inherit
     * "origin = none" from an earlier one.
     */
    publish_no_origin = false;

    foreach(lc, options)
    {
        DefElem    *defel = (DefElem *) lfirst(lc);

        Assert(defel->arg == NULL || IsA(defel->arg, String));

        /* Check each param, whether or not we recognize it */
        if (strcmp(defel->defname, "proto_version") == 0)
        {
            unsigned long parsed;
            char       *endptr;
            char       *value;

            if (protocol_version_given)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            protocol_version_given = true;

            /*
             * defGetString() reports a proper error when the option was
             * given without a value, instead of dereferencing a NULL arg.
             */
            value = defGetString(defel);

            errno = 0;
            parsed = strtoul(value, &endptr, 10);
            /* endptr == value catches an empty string, which parses as 0. */
            if (errno != 0 || endptr == value || *endptr != '\0')
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid proto_version")));

            if (parsed > PG_UINT32_MAX)
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("proto_version \"%s\" out of range",
                                value)));

            data->protocol_version = (uint32) parsed;
        }
        else if (strcmp(defel->defname, "publication_names") == 0)
        {
            if (publication_names_given)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            publication_names_given = true;

            /* As above, defGetString() rejects a missing value cleanly. */
            if (!SplitIdentifierString(defGetString(defel), ',',
                                       &data->publication_names))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_NAME),
                         errmsg("invalid publication_names syntax")));
        }
        else if (strcmp(defel->defname, "binary") == 0)
        {
            if (binary_option_given)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            binary_option_given = true;

            data->binary = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "messages") == 0)
        {
            if (messages_option_given)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            messages_option_given = true;

            data->messages = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "streaming") == 0)
        {
            if (streaming_given)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            streaming_given = true;

            data->streaming = defGetStreamingMode(defel);
        }
        else if (strcmp(defel->defname, "two_phase") == 0)
        {
            if (two_phase_option_given)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            two_phase_option_given = true;

            data->two_phase = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "origin") == 0)
        {
            if (origin_option_given)
                ereport(ERROR,
                        errcode(ERRCODE_SYNTAX_ERROR),
                        errmsg("conflicting or redundant options"));
            origin_option_given = true;

            /* Only "none" and "any" (case-insensitive) are accepted. */
            data->origin = defGetString(defel);
            if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
                publish_no_origin = true;
            else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
                publish_no_origin = false;
            else
                ereport(ERROR,
                        errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                        errmsg("unrecognized origin value: \"%s\"", data->origin));
        }
        else
            elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
    }
}
/*
 * Startup callback: set up the plugin's private state.
 *
 * Allocates the PGOutputData, creates its two memory contexts under
 * ctx->context and selects binary output.  When called for slot
 * initialization (is_init), streaming and two-phase are simply switched off.
 * Otherwise the client options are parsed and validated against the
 * supported protocol versions, the publication state is reset, the
 * pg_publication invalidation callback is registered (once per process) and
 * the relation schema cache is initialized.
 */
static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                 bool is_init)
{
    static bool publication_callback_registered = false;
    PGOutputData *data = palloc0(sizeof(PGOutputData));

    /* Private memory contexts, both children of the decoding context. */
    data->context = AllocSetContextCreate(ctx->context,
                                          "logical replication output context",
                                          ALLOCSET_DEFAULT_SIZES);
    data->cachectx = AllocSetContextCreate(ctx->context,
                                           "logical replication cache context",
                                           ALLOCSET_DEFAULT_SIZES);

    ctx->output_plugin_private = data;

    /* pgoutput always speaks the binary protocol. */
    opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

    if (is_init)
    {
        /*
         * Slot initialization: no options to look at, and neither streaming
         * nor prepared transactions are used in this mode.
         */
        ctx->streaming = false;
        ctx->twophase = false;
        return;
    }

    /*
     * From here on this is a replication start.  Parse the client's options,
     * raising ERROR for anything we do not recognize.
     */
    parse_output_parameters(ctx->output_plugin_options, data);

    /* The requested protocol version must lie within the supported range. */
    if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
                        data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

    if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
                        data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

    if (data->publication_names == NIL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("publication_names parameter missing")));

    /*
     * Streaming is off unless asked for.  When it is asked for, the protocol
     * version must be recent enough for the requested mode and the decoding
     * context must have streaming enabled.
     */
    if (data->streaming == LOGICALREP_STREAM_OFF)
        ctx->streaming = false;
    else
    {
        if (data->streaming == LOGICALREP_STREAM_ON &&
            data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("requested proto_version=%d does not support streaming, need %d or higher",
                            data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));

        if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
            data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
                            data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));

        if (!ctx->streaming)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("streaming requested, but not supported by output plugin")));
    }

    /* No transaction is being streamed at this point. */
    in_streaming = false;

    /*
     * Two-phase: only record whether the option was passed; whether it gets
     * enabled is decided later.  As with streaming, passing it requires a
     * sufficient protocol version and two-phase support in the context.
     */
    if (!data->two_phase)
        ctx->twophase_opt_given = false;
    else
    {
        if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
                            data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));

        if (!ctx->twophase)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("two-phase commit requested, but not supported by output plugin")));

        ctx->twophase_opt_given = true;
    }

    /* Start with an empty, not-yet-valid publication list. */
    data->publications = NIL;
    publications_valid = false;

    /*
     * The pg_publication syscache callback only needs registering once per
     * process, however many times the plugin is started.
     */
    if (!publication_callback_registered)
    {
        CacheRegisterSyscacheCallback(PUBLICATIONOID,
                                      publication_invalidation_cb,
                                      (Datum) 0);
        publication_callback_registered = true;
    }

    /* Set up the relation schema cache. */
    init_rel_sync_cache(CacheMemoryContext);
}
/*
 * BEGIN callback.
 *
 * Nothing is written here; the BEGIN message is deferred until the first
 * change of the transaction is processed.  Transactions that touch only
 * unpublished tables would otherwise cost a BEGIN/COMMIT pair on the wire
 * for no benefit.  All this callback does is attach a zeroed PGOutputTxnData
 * (allocated in ctx->context) to the transaction so that later callbacks can
 * tell whether BEGIN has gone out.
 */
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
    txn->output_plugin_private =
        MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
}
/*
 * Write the deferred BEGIN message.
 *
 * Invoked when the first change of a transaction is about to be sent.  The
 * transaction must carry a PGOutputTxnData whose sent_begin_txn flag is
 * still clear; the flag is set here.  The replication origin is passed along
 * to send_repl_origin() together with a flag saying whether the transaction
 * has a valid origin id.
 */
static void
pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    bool        has_origin = (txn->origin_id != InvalidRepOriginId);

    Assert(txndata);
    Assert(!txndata->sent_begin_txn);

    /* The begin message is the last write only if no origin follows it. */
    OutputPluginPrepareWrite(ctx, !has_origin);
    logicalrep_write_begin(ctx->out, txn);
    txndata->sent_begin_txn = true;

    send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, has_origin);

    OutputPluginWrite(ctx, true);
}
/*
 * COMMIT callback.
 *
 * Reports progress, releases the per-transaction PGOutputTxnData and writes
 * the COMMIT message -- but only if BEGIN was actually sent, i.e. at least
 * one relevant change of this transaction reached the downstream.
 */
static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                    XLogRecPtr commit_lsn)
{
    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    bool        begin_was_sent;

    Assert(txndata);

    /* Read the flag before the struct is freed below. */
    begin_was_sent = txndata->sent_begin_txn;

    OutputPluginUpdateProgress(ctx, !begin_was_sent);

    pfree(txndata);
    txn->output_plugin_private = NULL;

    if (begin_was_sent)
    {
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_commit(ctx->out, txn, commit_lsn);
        OutputPluginWrite(ctx, true);
        return;
    }

    /* Empty transaction: nothing was sent, so nothing to commit downstream. */
    elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
}
/*
 * BEGIN PREPARE callback.
 *
 * Writes the begin-prepare message right away, followed by the replication
 * origin handling done in send_repl_origin().
 */
static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
    bool        has_origin = (txn->origin_id != InvalidRepOriginId);

    /* Begin-prepare is the last write only when no origin follows it. */
    OutputPluginPrepareWrite(ctx, !has_origin);
    logicalrep_write_begin_prepare(ctx->out, txn);

    send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, has_origin);

    OutputPluginWrite(ctx, true);
}
/*
 * PREPARE callback.
 *
 * Reports progress, then writes the PREPARE message for 'txn' at
 * 'prepare_lsn' as a single, final write.
 */
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr prepare_lsn)
{
    const bool  skipped_xact = false;
    const bool  last_write = true;

    OutputPluginUpdateProgress(ctx, skipped_xact);

    OutputPluginPrepareWrite(ctx, last_write);
    logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
    OutputPluginWrite(ctx, last_write);
}
/*
 * COMMIT PREPARED callback.
 *
 * Reports progress, then writes the COMMIT PREPARED message for 'txn' at
 * 'commit_lsn' as a single, final write.
 */
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                             XLogRecPtr commit_lsn)
{
    const bool  skipped_xact = false;
    const bool  last_write = true;

    OutputPluginUpdateProgress(ctx, skipped_xact);

    OutputPluginPrepareWrite(ctx, last_write);
    logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
    OutputPluginWrite(ctx, last_write);
}
/*
 * ROLLBACK PREPARED callback.
 *
 * Reports progress, then writes the ROLLBACK PREPARED message, passing the
 * end LSN and timestamp of the original prepare through to the protocol
 * writer.
 */
static void
pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
                               ReorderBufferTXN *txn,
                               XLogRecPtr prepare_end_lsn,
                               TimestampTz prepare_time)
{
    const bool  skipped_xact = false;
    const bool  last_write = true;

    OutputPluginUpdateProgress(ctx, skipped_xact);

    OutputPluginPrepareWrite(ctx, last_write);
    logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
                                       prepare_time);
    OutputPluginWrite(ctx, last_write);
}
/*
 * Send the schema of 'relation' (and of the ancestor it is published as, if
 * different) unless that has already been done.
 *
 * Outside of a streaming block the decision is driven by
 * relentry->schema_sent.  Inside one, it is tracked per streamed top-level
 * transaction, because streamed transactions may be applied later and in an
 * order not known here.
 *
 * XXX While streaming, the schema is always sent the first time a
 * transaction needs it, even if relentry->schema_sent is already set.  That
 * could probably be avoided, but the interaction between streaming and
 * non-streaming transactions would need to be studied first.
 */
static void
maybe_send_schema(LogicalDecodingContext *ctx,
                  ReorderBufferChange *change,
                  Relation relation, RelationSyncEntry *relentry)
{
    /*
     * XID of the change's (sub)transaction.  It only matters inside a
     * streaming block; otherwise it stays invalid and the write routines
     * leave it out.  Whether it is a top-level XID is irrelevant: that one
     * was already sent at the start of the current streaming block.
     */
    TransactionId xid = in_streaming ? change->txn->xid : InvalidTransactionId;

    /* XID of the top-level transaction, used as the key while streaming. */
    TransactionId topxid = rbtxn_is_subtxn(change->txn)
        ? rbtxn_get_toptxn(change->txn)->xid
        : xid;

    bool        already_sent = in_streaming
        ? get_schema_sent_in_streamed_txn(relentry, topxid)
        : relentry->schema_sent;

    if (already_sent)
        return;

    /*
     * When publishing via an ancestor, that ancestor's schema goes out first
     * and the relation's own afterwards (XXX - maybe sending only the former
     * suffices?).
     */
    if (relentry->publish_as_relid != RelationGetRelid(relation))
    {
        Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);

        send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
        RelationClose(ancestor);
    }

    send_relation_and_attrs(relation, xid, ctx, relentry->columns);

    /* Remember that the schema went out, in the place matching the mode. */
    if (in_streaming)
        set_schema_sent_in_streamed_txn(relentry, topxid);
    else
        relentry->schema_sent = true;
}
/*
 * Send a relation message, preceded by type messages where needed.
 *
 * A type message is written for every attribute that is live (neither
 * dropped nor generated), is part of 'columns' (NULL meaning all columns),
 * and whose type OID is at or above FirstGenbkiObjectId.  That cutoff treats
 * only objects with hand-assigned OIDs as "built in" -- not, for instance,
 * types defined in the information_schema -- since only hand-assigned OIDs
 * can be expected to stay stable across major versions.
 */
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
                        LogicalDecodingContext *ctx,
                        Bitmapset *columns)
{
    TupleDesc   tupdesc = RelationGetDescr(relation);
    int         attidx;

    for (attidx = 0; attidx < tupdesc->natts; attidx++)
    {
        Form_pg_attribute att = TupleDescAttr(tupdesc, attidx);

        /* Dropped, generated, built-in-typed or unpublished: no type info. */
        if (att->attisdropped ||
            att->attgenerated ||
            att->atttypid < FirstGenbkiObjectId ||
            (columns != NULL && !bms_is_member(att->attnum, columns)))
            continue;

        OutputPluginPrepareWrite(ctx, false);
        logicalrep_write_typ(ctx->out, xid, att->atttypid);
        OutputPluginWrite(ctx, false);
    }

    /* The relation message itself follows the type messages. */
    OutputPluginPrepareWrite(ctx, false);
    logicalrep_write_rel(ctx->out, xid, relation, columns);
    OutputPluginWrite(ctx, false);
}
/*
 * Build an executor state suitable for evaluating row filter expressions
 * against 'rel'.
 *
 * The returned EState has a one-entry range table describing 'rel' (with
 * AccessShareLock as its lock mode) and its output command id set from the
 * current command id.
 */
static EState *
create_estate_for_relation(Relation rel)
{
    EState     *result = CreateExecutorState();
    RangeTblEntry *rel_rte = makeNode(RangeTblEntry);
    List       *perminfos = NIL;

    /* Describe the relation as a plain range table entry. */
    rel_rte->rtekind = RTE_RELATION;
    rel_rte->relid = RelationGetRelid(rel);
    rel_rte->relkind = rel->rd_rel->relkind;
    rel_rte->rellockmode = AccessShareLock;

    addRTEPermissionInfo(&perminfos, rel_rte);

    ExecInitRangeTable(result, list_make1(rel_rte), perminfos);

    result->es_output_cid = GetCurrentCommandId(false);

    return result;
}
/*
 * Evaluate a row filter expression.
 *
 * Returns the boolean result of the expression.  A NULL result counts as
 * false, meaning the change is not replicated.
 */
static bool
pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
{
    bool        isnull;
    bool        passes;
    Datum       result;

    Assert(state != NULL);

    result = ExecEvalExprSwitchContext(state, econtext, &isnull);

    /* NULL is folded into false; otherwise take the datum's boolean value. */
    passes = !isnull && DatumGetBool(result);

    elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
         passes ? "true" : "false",
         isnull ? "true" : "false");

    return passes;
}
/*
 * Make sure the per-entry memory context exists.
 *
 * If entry->entry_cxt is not set yet, create it as a child of
 * data->cachectx and label it with the name of the relation the entry is
 * published as.  Does nothing when the context already exists.
 */
static void
pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
{
    Relation    relation;

    /* The context may already exist, in which case bail out. */
    if (entry->entry_cxt)
        return;

    relation = RelationIdGetRelation(entry->publish_as_relid);

    entry->entry_cxt = AllocSetContextCreate(data->cachectx,
                                             "entry private context",
                                             ALLOCSET_SMALL_SIZES);

    MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
                                      RelationGetRelationName(relation));

    /*
     * The relation name has been copied into the context, so the relcache
     * reference taken above is no longer needed.  Release it; without this
     * the reference count taken by RelationIdGetRelation() is leaked.
     */
    RelationClose(relation);
}
/*
 * Initialize the row filter.
 *
 * Walks 'publications' and, for each of the three row-filter publication
 * actions, collects the row filter text stored in
 * pg_publication_rel.prqual for entry->publish_as_relid.  A publication that
 * is FOR ALL TABLES, that covers the relation's schema, or that has no
 * (or a NULL) prqual for the relation disables filtering for every action
 * that publication publishes.
 *
 * Unless filtering ended up disabled for all three actions, an EState is
 * created in entry->entry_cxt and stored in entry->estate, and for each
 * action that still has filters they are ORed together and cached as an
 * ExprState in entry->exprstate[].
 */
static void
pgoutput_row_filter_init(PGOutputData *data, List *publications,
RelationSyncEntry *entry)
{
ListCell *lc;
List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
bool no_filter[] = {false, false, false}; /* One per pubaction */
MemoryContext oldctx;
int idx;
bool has_filter = true;
Oid schemaid = get_rel_namespace(entry->publish_as_relid);
/*
* Find if there are any row filters for this relation. If there are, then
* prepare the necessary ExprState and cache it in entry->exprstate. To
* build an expression state, we need to ensure the following:
*
* All the given publication-table mappings must be checked.
*
* Multiple publications might have multiple row filters for this
* relation. Since row filter usage depends on the DML operation, there
* are multiple lists (one for each operation) to which row filters will
* be appended.
*
* FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
* expression" so it takes precedence.
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
HeapTuple rftuple = NULL;
Datum rfdatum = 0;
/* Stays true unless a non-NULL prqual is found below. */
bool pub_no_filter = true;
/*
* If the publication is FOR ALL TABLES, or the publication includes a
* FOR TABLES IN SCHEMA where the table belongs to the referred
* schema, then it is treated the same as if there are no row filters
* (even if other publications have a row filter).
*/
if (!pub->alltables &&
!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
ObjectIdGetDatum(schemaid),
ObjectIdGetDatum(pub->oid)))
{
/*
* Check for the presence of a row filter in this publication.
*/
rftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(entry->publish_as_relid),
ObjectIdGetDatum(pub->oid));
if (HeapTupleIsValid(rftuple))
{
/* Null indicates no filter. */
rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
Anum_pg_publication_rel_prqual,
&pub_no_filter);
}
}
if (pub_no_filter)
{
/* A tuple may have been fetched even though its prqual was NULL. */
if (rftuple)
ReleaseSysCache(rftuple);
no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
/*
* Quick exit if all the DML actions are publicized via this
* publication.
*/
if (no_filter[PUBACTION_INSERT] &&
no_filter[PUBACTION_UPDATE] &&
no_filter[PUBACTION_DELETE])
{
has_filter = false;
break;
}
/* No additional work for this publication. Next one. */
continue;
}
/* Form the per pubaction row filter lists. */
/* The filter text is copied out of the tuple before it is released. */
if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
TextDatumGetCString(rfdatum));
if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
TextDatumGetCString(rfdatum));
if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
TextDatumGetCString(rfdatum));
ReleaseSysCache(rftuple);
} /* loop all subscribed publications */
/* Clean the row filter */
/*
 * Filters gathered for an action before a later publication disabled
 * filtering for that action are discarded here.
 */
for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
{
if (no_filter[idx])
{
list_free_deep(rfnodes[idx]);
rfnodes[idx] = NIL;
}
}
if (has_filter)
{
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
pgoutput_ensure_entry_cxt(data, entry);
/*
* Now all the filters for all pubactions are known. Combine them when
* their pubactions are the same.
*/
/* Everything built below is allocated in the per-entry context. */
oldctx = MemoryContextSwitchTo(entry->entry_cxt);
entry->estate = create_estate_for_relation(relation);
for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
{
List *filters = NIL;
Expr *rfnode;
if (rfnodes[idx] == NIL)
continue;
/* Turn each stored filter string back into an expression tree. */
foreach(lc, rfnodes[idx])
filters = lappend(filters, stringToNode((char *) lfirst(lc)));
/* combine the row filter and cache the ExprState */
rfnode = make_orclause(filters);
entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
} /* for each pubaction */
MemoryContextSwitchTo(oldctx);
RelationClose(relation);
}
}
/*
 * Initialize the column list.
 *
 * Walks 'publications' and derives, for each one, the set of columns of
 * entry->publish_as_relid it publishes, taken from
 * pg_publication_rel.prattrs.  No list, a NULL list, or a list that names
 * every live (not dropped, not generated) column all count as "all columns"
 * and are represented as NULL.  The first publication's result is stored in
 * entry->columns; every further publication must yield an equal set,
 * otherwise an ERROR is raised.
 */
static void
pgoutput_column_list_init(PGOutputData *data, List *publications,
RelationSyncEntry *entry)
{
ListCell *lc;
bool first = true;
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
/*
* Find if there are any column lists for this relation. If there are,
* build a bitmap using the column lists.
*
* Multiple publications might have multiple column lists for this
* relation.
*
* Note that we don't support the case where the column list is different
* for the same table when combining publications. See comments atop
* fetch_table_list. But one can later change the publication so we still
* need to check all the given publication-table mappings and report an
* error if any publications have a different column list.
*
* FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
HeapTuple cftuple = NULL;
Datum cfdatum = 0;
/* Stays NULL ("all columns") unless a real list is found below. */
Bitmapset *cols = NULL;
/*
* If the publication is FOR ALL TABLES then it is treated the same as
* if there are no column lists (even if other publications have a
* list).
*/
if (!pub->alltables)
{
bool pub_no_list = true;
/*
* Check for the presence of a column list in this publication.
*
* Note: If we find no pg_publication_rel row, it's a publication
* defined for a whole schema, so it can't have a column list,
* just like a FOR ALL TABLES publication.
*/
cftuple = SearchSysCache2(PUBLICATIONRELMAP,
ObjectIdGetDatum(entry->publish_as_relid),
ObjectIdGetDatum(pub->oid));
if (HeapTupleIsValid(cftuple))
{
/* Lookup the column list attribute. */
cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
Anum_pg_publication_rel_prattrs,
&pub_no_list);
/* Build the column list bitmap in the per-entry context. */
if (!pub_no_list) /* when not null */
{
int i;
int nliveatts = 0;
TupleDesc desc = RelationGetDescr(relation);
pgoutput_ensure_entry_cxt(data, entry);
cols = pub_collist_to_bitmapset(cols, cfdatum,
entry->entry_cxt);
/* Get the number of live attributes. */
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (att->attisdropped || att->attgenerated)
continue;
nliveatts++;
}
/*
* If column list includes all the columns of the table,
* set it to NULL.
*/
if (bms_num_members(cols) == nliveatts)
{
bms_free(cols);
cols = NULL;
}
}
ReleaseSysCache(cftuple);
}
}
/* The first publication sets the reference; later ones must match it. */
if (first)
{
entry->columns = cols;
first = false;
}
else if (!bms_equal(entry->columns, cols))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
get_namespace_name(RelationGetNamespace(relation)),
RelationGetRelationName(relation)));
} /* loop all subscribed publications */
RelationClose(relation);
}
/*
 * Set up the cached old/new tuple slots of a relation sync entry and, when
 * the relation is published under a different relid, the attribute map used
 * to convert its tuples into that ancestor's row format.
 */
static void
init_tuple_slot(PGOutputData *data, Relation relation,
				RelationSyncEntry *entry)
{
	MemoryContext saved_cxt;
	Relation	ancestor;

	/*
	 * The slots get their own copies of the relation's tuple descriptor,
	 * allocated in the cache context so they live as long as the cache does.
	 */
	saved_cxt = MemoryContextSwitchTo(data->cachectx);
	entry->old_slot =
		MakeSingleTupleTableSlot(CreateTupleDescCopy(RelationGetDescr(relation)),
								 &TTSOpsHeapTuple);
	entry->new_slot =
		MakeSingleTupleTableSlot(CreateTupleDescCopy(RelationGetDescr(relation)),
								 &TTSOpsHeapTuple);
	MemoryContextSwitchTo(saved_cxt);

	/* No conversion map is needed when publishing under the relation's own id. */
	if (entry->publish_as_relid == RelationGetRelid(relation))
		return;

	ancestor = RelationIdGetRelation(entry->publish_as_relid);

	/* The map is allocated in CacheMemoryContext. */
	saved_cxt = MemoryContextSwitchTo(CacheMemoryContext);
	entry->attrmap = build_attrmap_by_name_if_req(RelationGetDescr(relation),
												  RelationGetDescr(ancestor),
												  false);
	MemoryContextSwitchTo(saved_cxt);

	RelationClose(ancestor);
}
/*
* Change is checked against the row filter if any.
*
* Returns true if the change is to be replicated, else false.
*
* For inserts, evaluate the row filter for new tuple.
* For deletes, evaluate the row filter for old tuple.
* For updates, evaluate the row filter for old and new tuple.
*
* For updates, if both evaluations are true, we allow sending the UPDATE and
* if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
* only one of the tuples matches the row filter expression, we transform
* UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
* following rules:
*
* Case 1: old-row (no match) new-row (no match) -> (drop change)
* Case 2: old-row (no match) new row (match) -> INSERT
* Case 3: old-row (match) new-row (no match) -> DELETE
* Case 4: old-row (match) new row (match) -> UPDATE
*
* The new action is updated in the action parameter.
*
* The new slot could be updated when transforming the UPDATE into INSERT,
* because the original new tuple might not have column values from the replica
* identity.
*
* Examples:
* Let's say the old tuple satisfies the row filter but the new tuple doesn't.
* Since the old tuple satisfies, the initial table synchronization copied this
* row (or another method was used to guarantee that there is data
* consistency). However, after the UPDATE the new tuple doesn't satisfy the
* row filter, so from a data consistency perspective, that row should be
* removed on the subscriber. The UPDATE should be transformed into a DELETE
* statement and be sent to the subscriber. Keeping this row on the subscriber
* is undesirable because it doesn't reflect what was defined in the row filter
* expression on the publisher. This row on the subscriber would likely not be
* modified by replication again. If someone inserted a new row with the same
* old identifier, replication could stop due to a constraint violation.
*
* Let's say the old tuple doesn't match the row filter but the new tuple does.
* Since the old tuple doesn't satisfy, the initial table synchronization
* probably didn't copy this row. However, after the UPDATE the new tuple does
* satisfy the row filter, so from a data consistency perspective, that row
* should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
* statements have no effect (it matches no row -- see
* apply_handle_update_internal()). So, the UPDATE should be transformed into an
* INSERT statement and be sent to the subscriber. However, this might surprise
* someone who expects the data set to satisfy the row filter expression on the
* provider.
*/
static bool
pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
					ReorderBufferChangeType *action)
{
	/*
	 * Explicit change-type -> pubaction translation, so that nothing here
	 * depends on the numeric values of the ReorderBufferChangeType enum.
	 */
	static const int pubaction_of_change[] = {
		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
	};
	TupleTableSlot *new_slot = *new_slot_ptr;
	TupleTableSlot *patched_slot = NULL;
	TupleDesc	tupdesc;
	ExprState  *rowfilter;
	ExprContext *econtext;
	bool		old_passes;
	bool		new_passes;
	int			attno;

	Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
		   *action == REORDER_BUFFER_CHANGE_UPDATE ||
		   *action == REORDER_BUFFER_CHANGE_DELETE);

	Assert(new_slot || old_slot);

	/* Without a row filter for this action every change is replicated. */
	rowfilter = entry->exprstate[pubaction_of_change[*action]];
	if (rowfilter == NULL)
		return true;

	elog(DEBUG3, "table \"%s.%s\" has row filter",
		 get_namespace_name(RelationGetNamespace(relation)),
		 RelationGetRelationName(relation));

	ResetPerTupleExprContext(entry->estate);
	econtext = GetPerTupleExprContext(entry->estate);

	/*
	 * With just one tuple available, the verdict is simply the filter result
	 * for that tuple:
	 *
	 * - inserts only carry the new tuple;
	 *
	 * - updates carry only the new tuple when no replica identity column
	 * changed and none of them has external data; the filter still has to be
	 * evaluated because the existing column values might not match it, and
	 * because the filter may be a constant expression;
	 *
	 * - deletes only carry the old tuple.
	 */
	if (new_slot == NULL || old_slot == NULL)
	{
		econtext->ecxt_scantuple = (new_slot != NULL) ? new_slot : old_slot;
		return pgoutput_row_filter_exec_expr(rowfilter, econtext);
	}

	/* Having both tuples is only possible for an UPDATE. */
	Assert(pubaction_of_change[*action] == PUBACTION_UPDATE);

	slot_getallattrs(new_slot);
	slot_getallattrs(old_slot);

	tupdesc = RelationGetDescr(relation);

	/*
	 * The new tuple may lack some replica identity column values; those are
	 * taken from the old tuple and placed into a patched copy of the new one.
	 */
	for (attno = 0; attno < tupdesc->natts; attno++)
	{
		Form_pg_attribute attr = TupleDescAttr(tupdesc, attno);

		/* A null on either side leaves nothing to copy. */
		if (new_slot->tts_isnull[attno] || old_slot->tts_isnull[attno])
			continue;

		/*
		 * Unchanged toasted replica identity columns are only logged in the
		 * old tuple, so only a varlena column that is still an on-disk
		 * pointer in the new tuple but not in the old one needs copying.  The
		 * changed (or WAL-logged) toast values are always assembled in
		 * memory and set as VARTAG_INDIRECT.  See ReorderBufferToastReplace.
		 */
		if (attr->attlen != -1 ||
			!VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[attno]) ||
			VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[attno]))
			continue;

		/* Create the patched copy lazily, seeded from the new tuple. */
		if (patched_slot == NULL)
		{
			patched_slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
			ExecClearTuple(patched_slot);

			memcpy(patched_slot->tts_values, new_slot->tts_values,
				   tupdesc->natts * sizeof(Datum));
			memcpy(patched_slot->tts_isnull, new_slot->tts_isnull,
				   tupdesc->natts * sizeof(bool));
		}

		patched_slot->tts_values[attno] = old_slot->tts_values[attno];
		patched_slot->tts_isnull[attno] = old_slot->tts_isnull[attno];
	}

	/* Evaluate the filter on the old tuple first ... */
	econtext->ecxt_scantuple = old_slot;
	old_passes = pgoutput_row_filter_exec_expr(rowfilter, econtext);

	/* ... then on the new tuple, using the patched copy if one was built. */
	if (patched_slot != NULL)
		ExecStoreVirtualTuple(patched_slot);
	econtext->ecxt_scantuple = (patched_slot != NULL) ? patched_slot : new_slot;
	new_passes = pgoutput_row_filter_exec_expr(rowfilter, econtext);

	/* Case 1: neither tuple matches the row filter; send nothing. */
	if (!old_passes && !new_passes)
		return false;

	if (!old_passes)
	{
		/*
		 * Case 2: only the new tuple matches, so the UPDATE becomes an
		 * INSERT.  Hand back the patched tuple when there is one, to ensure
		 * that the tuple inserted on the downstream node has the values of
		 * all the replica identity columns.
		 */
		*action = REORDER_BUFFER_CHANGE_INSERT;

		if (patched_slot != NULL)
			*new_slot_ptr = patched_slot;
	}
	else if (!new_passes)
	{
		/*
		 * Case 3: only the old tuple matches, so the UPDATE becomes a DELETE.
		 * No other tuple is needed; the old tuple is used for the DELETE.
		 */
		*action = REORDER_BUFFER_CHANGE_DELETE;
	}

	/*
	 * Case 4 (both tuples match) needs no transformation: *action remains
	 * UPDATE.
	 */
	return true;
}
/*
 * Sends the decoded DML over wire.
 *
 * This is called both in streaming and non-streaming modes.
 *
 * Nothing is sent when the relation is not publishable, when the pubaction
 * matching the change is not enabled for the relation, when a DELETE carries
 * no old tuple, or when the row filter rejects the change.  The row filter
 * may also turn an UPDATE into an INSERT or a DELETE before it is written.
 *
 * All per-change allocations are made in data->context, which is reset before
 * returning.  Slots created here for tuple conversion are dropped explicitly
 * first, because resetting the context frees their memory but does not
 * release the tuple descriptor reference a slot holds.
 */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				Relation relation, ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	TransactionId xid = InvalidTransactionId;
	Relation	ancestor = NULL;
	Relation	targetrel = relation;
	ReorderBufferChangeType action = change->action;
	TupleTableSlot *old_slot = NULL;
	TupleTableSlot *new_slot = NULL;

	if (!is_publishable_relation(relation))
		return;

	/*
	 * Remember the xid for the change in streaming mode. We need to send xid
	 * with each change in the streaming mode so that subscriber can make
	 * their association and on aborts, it can discard the corresponding
	 * changes.
	 */
	if (in_streaming)
		xid = change->txn->xid;

	relentry = get_rel_sync_entry(data, relation);

	/* First check the table filter */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			if (!relentry->pubactions.pubinsert)
				return;
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			if (!relentry->pubactions.pubupdate)
				return;
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			if (!relentry->pubactions.pubdelete)
				return;

			/*
			 * This is only possible if deletes are allowed even when replica
			 * identity is not defined for a table. Since the DELETE action
			 * can't be published, we simply return.
			 */
			if (!change->data.tp.oldtuple)
			{
				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
				return;
			}
			break;
		default:
			Assert(false);
	}

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	/* Switch relation if publishing via root. */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Assert(relation->rd_rel->relispartition);
		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
		targetrel = ancestor;
	}

	if (change->data.tp.oldtuple)
	{
		old_slot = relentry->old_slot;
		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);

		/*
		 * Convert tuple if needed.  From here on old_slot refers to the
		 * freshly made conversion slot, which is dropped during cleanup.
		 */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
		}
	}

	if (change->data.tp.newtuple)
	{
		new_slot = relentry->new_slot;
		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);

		/*
		 * Convert tuple if needed.  From here on new_slot refers to the
		 * freshly made conversion slot, which is dropped during cleanup.
		 */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
		}
	}

	/*
	 * Check row filter.
	 *
	 * Updates could be transformed to inserts or deletes based on the results
	 * of the row filter for old and new tuple.
	 */
	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
		goto cleanup;

	/*
	 * Send BEGIN if we haven't yet.
	 *
	 * We send the BEGIN message after ensuring that we will actually send the
	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
	 * transactions.
	 */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);

	/*
	 * Schema should be sent using the original relation because it also sends
	 * the ancestor's relation.
	 */
	maybe_send_schema(ctx, change, relation, relentry);

	OutputPluginPrepareWrite(ctx, true);

	/* Send the data; 'action' may differ from change->action by now. */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
									data->binary, relentry->columns);
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
									new_slot, data->binary, relentry->columns);
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
									data->binary, relentry->columns);
			break;
		default:
			Assert(false);
	}

	OutputPluginWrite(ctx, true);

cleanup:
	if (RelationIsValid(ancestor))
	{
		RelationClose(ancestor);
		ancestor = NULL;
	}

	/*
	 * Drop the slots that were used to store the converted tuples.  When an
	 * attribute map is in use, old_slot/new_slot no longer point at the
	 * cached relentry slots but at slots made above (or, for new_slot, at the
	 * replacement slot handed back by the row filter), so they must be
	 * released here.
	 */
	if (relentry->attrmap)
	{
		if (old_slot)
			ExecDropSingleTupleTableSlot(old_slot);

		if (new_slot)
			ExecDropSingleTupleTableSlot(new_slot);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
/*
 * Send a TRUNCATE for those of the given relations that are published.
 *
 * A relation is included when it is publishable, its sync entry has the
 * truncate pubaction enabled, and it is not a partition whose changes are
 * published under a different relid.  BEGIN and the relation's schema are
 * sent as needed for each included relation; a single truncate message
 * listing all of them is written afterwards, if there is at least one.
 */
static void
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  int nrelations, Relation relations[], ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
	TransactionId xid = in_streaming ? change->txn->xid : InvalidTransactionId;
	MemoryContext caller_cxt;
	Oid		   *published;
	int			npublished = 0;
	int			idx;

	/* Work in the plugin's scratch context, which is reset on the way out. */
	caller_cxt = MemoryContextSwitchTo(data->context);

	published = palloc0(nrelations * sizeof(Oid));

	for (idx = 0; idx < nrelations; idx++)
	{
		Relation	rel = relations[idx];
		Oid			relid = RelationGetRelid(rel);
		RelationSyncEntry *relentry;

		if (!is_publishable_relation(rel))
			continue;

		relentry = get_rel_sync_entry(data, rel);

		/*
		 * Skip relations whose truncates are not published, as well as
		 * partitions when the publication wants to send only the root tables
		 * through it.
		 */
		if (!relentry->pubactions.pubtruncate ||
			(rel->rd_rel->relispartition &&
			 relentry->publish_as_relid != relid))
			continue;

		published[npublished++] = relid;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);

		maybe_send_schema(ctx, change, rel, relentry);
	}

	if (npublished > 0)
	{
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_truncate(ctx->out,
								  xid,
								  npublished,
								  published,
								  change->data.truncate.cascade,
								  change->data.truncate.restart_seqs);
		OutputPluginWrite(ctx, true);
	}

	MemoryContextSwitchTo(caller_cxt);
	MemoryContextReset(data->context);
}
/*
 * Send a logical decoding message, provided the 'messages' option is enabled.
 *
 * For transactional messages a BEGIN is sent first if the transaction has
 * not produced one yet; non-transactional messages never trigger a BEGIN.
 */
static void
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
				 const char *message)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	TransactionId xid;

	if (!data->messages)
		return;

	/*
	 * Remember the xid for the message in streaming mode. See
	 * pgoutput_change.
	 */
	xid = in_streaming ? txn->xid : InvalidTransactionId;

	/* Output BEGIN if we haven't yet, but only for transactional messages. */
	if (transactional)
	{
		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

		if (txndata != NULL && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);
	}

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_message(ctx->out,
							 xid,
							 message_lsn,
							 transactional,
							 prefix,
							 sz,
							 message);
	OutputPluginWrite(ctx, true);
}
/*
 * Origin filter callback.
 *
 * Returns true exactly when the data carries a valid replication origin and
 * publish_no_origin is set, i.e. the user asked only for changes that don't
 * have an origin; returns false in every other case.
 */
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
					   RepOriginId origin_id)
{
	return publish_no_origin && origin_id != InvalidRepOriginId;
}
/*
 * Shutdown the output plugin.
 *
 * Only the relation sync hash table is torn down here.  data->context and
 * data->cachectx are child contexts of ctx->context, so the logical decoding
 * machinery cleans them up and they need no handling in this function.
 */
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
{
	/* Nothing to do when the cache was never created or is already gone. */
	if (RelationSyncCache == NULL)
		return;

	hash_destroy(RelationSyncCache);
	RelationSyncCache = NULL;
}
/*
 * Build a list of Publication objects from a list of publication names.
 *
 * The result has one entry per name, in the same order as 'pubnames'.  Each
 * lookup passes false as the second argument of GetPublicationByName()
 * (presumably "missing_ok", so an unknown name would raise an error there --
 * confirm against that function).
 */
static List *
LoadPublications(List *pubnames)
{
	List	   *loaded = NIL;
	ListCell   *cell;

	foreach(cell, pubnames)
		loaded = lappend(loaded,
						 GetPublicationByName((char *) lfirst(cell), false));

	return loaded;
}
/*
 * Publication syscache invalidation callback.
 *
 * Called for invalidations on pg_publication.  Marks both the per-relation
 * cache entries and the loaded publication list as stale.
 */
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	/*
	 * Invalidate the per-relation cache so that the filtering info is
	 * refreshed with the new publication settings the next time it is
	 * checked.
	 */
	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);

	/* Force the publication list itself to be reloaded before its next use. */
	publications_valid = false;
}
/*
 * START STREAM callback
 *
 * Writes the stream-start message for a chunk of 'txn' and marks the plugin
 * as being inside a streamed chunk.  The replication origin is sent along
 * only with the first chunk of a transaction that has a valid origin.
 */
static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn)
{
	/* Is this the first chunk streamed for this transaction? */
	bool		first_chunk = !rbtxn_is_streamed(txn);

	/*
	 * Once the first stream of a transaction has been sent, the origin id is
	 * not repeated in the subsequent streams.
	 */
	bool		with_origin = first_chunk &&
		txn->origin_id != InvalidRepOriginId;

	/* we can't nest streaming of transactions */
	Assert(!in_streaming);

	OutputPluginPrepareWrite(ctx, !with_origin);
	logicalrep_write_stream_start(ctx->out, txn->xid, first_chunk);

	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
					 with_origin);

	OutputPluginWrite(ctx, true);

	/* we're streaming a chunk of transaction now */
	in_streaming = true;
}
/*
* STOP STREAM callback
*
* Writes a stream-stop message for the chunk currently being streamed and
* clears the in_streaming flag. The 'txn' argument is not used here.
*/
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
/* we should be streaming a transaction */
Assert(in_streaming);
/* Emit the stream-stop message as one prepare/write pair. */
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
in_streaming = false;
}
/*
 * Notify downstream to discard the streamed transaction (along with all
 * its subtransactions, if it's a toplevel transaction).
 *
 * The abort message carries both the toplevel xid and the xid of 'txn'.
 * Afterwards the toplevel xid is removed from the relation sync cache.
 */
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn,
					  XLogRecPtr abort_lsn)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

	/* determine the toplevel transaction */
	ReorderBufferTXN *toplevel = rbtxn_get_toptxn(txn);
	bool		with_abort_info;

	/*
	 * The abort should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!in_streaming);
	Assert(rbtxn_is_streamed(toplevel));

	/* The abort-info flag is set only when streaming is in parallel mode. */
	with_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_abort(ctx->out, toplevel->xid, txn->xid, abort_lsn,
								  txn->xact_time.abort_time, with_abort_info);
	OutputPluginWrite(ctx, true);

	cleanup_rel_sync_cache(toplevel->xid, false);
}
/*
* Notify downstream to apply the streamed transaction (along with all
* its subtransactions).
*
* After the stream-commit message is written, the transaction's xid is
* removed from the relation sync cache with is_commit = true.
*/
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
/*
* The commit should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
/* NOTE(review): meaning of the 'false' argument is not visible here -- confirm. */
OutputPluginUpdateProgress(ctx, false);
/* Emit the stream-commit message as one prepare/write pair. */
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
/* Forget this xid in every cache entry that recorded it. */
cleanup_rel_sync_cache(txn->xid, true);
}
/*
* PREPARE callback (for streaming two-phase commit).
*
* Notify the downstream to prepare the transaction. The transaction must
* already be marked as streamed.
*/
static void
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
Assert(rbtxn_is_streamed(txn));
/* NOTE(review): meaning of the 'false' argument is not visible here -- confirm. */
OutputPluginUpdateProgress(ctx, false);
/* Emit the stream-prepare message as one prepare/write pair. */
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
}
/*
 * Initialize the relation schema sync cache for a decoding session.
 *
 * The hash table is created in 'cachectx' unless it already exists.  It is
 * destroyed at the end of a decoding session, whereas the invalidation
 * callbacks are registered only once and stay registered; when they fire
 * without a hash table they see the null global and take no action.
 */
static void
init_rel_sync_cache(MemoryContext cachectx)
{
	static bool relation_callbacks_registered = false;
	HASHCTL		ctl;

	/* Nothing to do if hash table already exists */
	if (RelationSyncCache != NULL)
		return;

	/* Make a new hash table for the cache, keyed by relation OID */
	ctl.hcxt = cachectx;
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelationSyncEntry);

	RelationSyncCache = hash_create("logical replication output relation cache",
									128, &ctl,
									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	Assert(RelationSyncCache != NULL);

	if (!relation_callbacks_registered)
	{
		/* We must update the cache entry for a relation after a relcache flush */
		CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);

		/*
		 * Flush all cache entries after a pg_namespace change, in case it
		 * was a schema rename affecting a relation being replicated.
		 */
		CacheRegisterSyscacheCallback(NAMESPACEOID,
									  rel_sync_cache_publication_cb,
									  (Datum) 0);

		/*
		 * Flush all cache entries after any publication changes.  (We need
		 * no callback entry for pg_publication, because
		 * publication_invalidation_cb will take care of it.)
		 */
		CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
									  rel_sync_cache_publication_cb,
									  (Datum) 0);
		CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
									  rel_sync_cache_publication_cb,
									  (Datum) 0);

		relation_callbacks_registered = true;
	}
}
/*
 * Report whether 'xid' is recorded in the entry's list of streamed
 * transactions.
 *
 * A plain list search is used, as we expect a relatively small number of
 * streamed transactions.
 */
static bool
get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	bool		already_sent = list_member_xid(entry->streamed_txns, xid);

	return already_sent;
}
/*
 * Record 'xid' in the rel sync entry as a streamed transaction for which the
 * relation's schema has already been sent.
 *
 * The list cell is allocated in CacheMemoryContext.
 */
static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	MemoryContext caller_cxt = MemoryContextSwitchTo(CacheMemoryContext);

	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);

	MemoryContextSwitchTo(caller_cxt);
}
/*
* Find or create entry in the relation schema cache.
*
* This looks up publications that the given relation is directly or
* indirectly part of (the latter if it's really the relation's ancestor that
* is part of a publication) and fills up the found entry with the information
* about which operations to publish and whether to use an ancestor's schema
* when publishing.
*
* A newly created entry, or one whose replicate_valid flag is false, is
* rebuilt from scratch here: its previous substructure (slots, attribute map,
* column list, row filter state) is released and recomputed from the current
* set of publications. An entry that is still valid is returned unchanged.
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
Oid relid = RelationGetRelid(relation);
Assert(RelationSyncCache != NULL);
/* Find cached relation info, creating if not found */
entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
&relid,
HASH_ENTER, &found);
Assert(entry != NULL);
/*
* initialize entry, if it's new
*
* replicate_valid starts out false, so the validation branch below always
* runs for a new entry.
*/
if (!found)
{
entry->replicate_valid = false;
entry->schema_sent = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
entry->entry_cxt = NULL;
entry->publish_as_relid = InvalidOid;
entry->columns = NULL;
entry->attrmap = NULL;
}
/* Validate the entry */
if (!entry->replicate_valid)
{
Oid schemaId = get_rel_namespace(relid);
List *pubids = GetRelationPublications(relid);
/*
* We don't acquire a lock on the namespace system table as we build
* the cache entry using a historic snapshot and all the later changes
* are absorbed while decoding WAL.
*/
List *schemaPubids = GetSchemaPublications(schemaId);
ListCell *lc;
/* Relid to publish under; replaced below if an ancestor must be used. */
Oid publish_as_relid = relid;
/* Ancestor level of publish_as_relid; 0 means the relation itself. */
int publish_ancestor_level = 0;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
/* Publications that publish this relation at the chosen ancestor level. */
List *rel_publications = NIL;
/* Reload publications if needed before use. */
if (!publications_valid)
{
/* The reloaded list is built in CacheMemoryContext. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
if (data->publications)
{
list_free_deep(data->publications);
data->publications = NIL;
}
data->publications = LoadPublications(data->publication_names);
MemoryContextSwitchTo(oldctx);
publications_valid = true;
}
/*
* Reset schema_sent status as the relation definition may have
* changed. Also reset pubactions to empty in case rel was dropped
* from a publication. Also free any objects that depended on the
* earlier definition.
*/
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
bms_free(entry->columns);
entry->columns = NULL;
entry->pubactions.pubinsert = false;
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
/*
* Tuple slots cleanups. (Will be rebuilt later if needed).
*/
if (entry->old_slot)
ExecDropSingleTupleTableSlot(entry->old_slot);
if (entry->new_slot)
ExecDropSingleTupleTableSlot(entry->new_slot);
entry->old_slot = NULL;
entry->new_slot = NULL;
if (entry->attrmap)
free_attrmap(entry->attrmap);
entry->attrmap = NULL;
/*
* Row filter cache cleanups.
*
* Deleting the per-entry context is followed by clearing the estate and
* exprstate pointers, so no stale pointers into it remain in the entry.
*/
if (entry->entry_cxt)
MemoryContextDelete(entry->entry_cxt);
entry->entry_cxt = NULL;
entry->estate = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
/*
* Build publication cache. We can't use one provided by relcache as
* relcache considers all publications that the given relation is in,
* but here we only need to consider ones that the subscriber
* requested.
*/
foreach(lc, data->publications)
{
Publication *pub = lfirst(lc);
bool publish = false;
/*
* Under what relid should we publish changes in this publication?
* We'll use the top-most relid across all publications. Also
* track the ancestor level for this publication.
*/
Oid pub_relid = relid;
int ancestor_level = 0;
/*
* If this is a FOR ALL TABLES publication, pick the partition
* root and set the ancestor level accordingly.
*/
if (pub->alltables)
{
publish = true;
if (pub->pubviaroot && am_partition)
{
List *ancestors = get_partition_ancestors(relid);
/* The last list element is used as the partition root. */
pub_relid = llast_oid(ancestors);
ancestor_level = list_length(ancestors);
}
}
if (!publish)
{
bool ancestor_published = false;
/*
* For a partition, check if any of the ancestors are
* published. If so, note down the topmost ancestor that is
* published via this publication, which will be used as the
* relation via which to publish the partition's changes.
*/
if (am_partition)
{
Oid ancestor;
int level;
List *ancestors = get_partition_ancestors(relid);
ancestor = GetTopMostAncestorInPublication(pub->oid,
ancestors,
&level);
if (ancestor != InvalidOid)
{
ancestor_published = true;
/* Only publish as the ancestor when pubviaroot is set. */
if (pub->pubviaroot)
{
pub_relid = ancestor;
ancestor_level = level;
}
}
}
/*
* Published if the relation itself, its schema, or one of its
* ancestors is a member of this publication.
*/
if (list_member_oid(pubids, pub->oid) ||
list_member_oid(schemaPubids, pub->oid) ||
ancestor_published)
publish = true;
}
/*
* If the relation is to be published, determine actions to
* publish, and list of columns, if appropriate.
*
* Don't publish changes for partitioned tables, because
* publishing those of its partitions suffices, unless partition
* changes won't be published due to pubviaroot being set.
*/
if (publish &&
(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
{
/* The published actions are the union over all publications. */
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
/*
* We want to publish the changes as the top-most ancestor
* across all publications. So we need to check if the already
* calculated level is higher than the new one. If yes, we can
* ignore the new value (as it's a child). Otherwise the new
* value is an ancestor, so we keep it.
*/
if (publish_ancestor_level > ancestor_level)
continue;
/*
* If we found an ancestor higher up in the tree, discard the
* list of publications through which we replicate it, and use
* the new ancestor.
*/
if (publish_ancestor_level < ancestor_level)
{
publish_as_relid = pub_relid;
publish_ancestor_level = ancestor_level;
/* reset the publication list for this relation */
rel_publications = NIL;
}
else
{
/* Same ancestor level, has to be the same OID. */
Assert(publish_as_relid == pub_relid);
}
/* Track publications for this ancestor. */
rel_publications = lappend(rel_publications, pub);
}
}
entry->publish_as_relid = publish_as_relid;
/*
* Initialize the tuple slot, map, and row filter. These are only used
* when publishing inserts, updates, or deletes.
*/
if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
entry->pubactions.pubdelete)
{
/* Initialize the tuple slot and map */
init_tuple_slot(data, relation, entry);
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
/* Initialize the column list */
pgoutput_column_list_init(data, rel_publications, entry);
}
/* Release the temporary lists built for this validation pass. */
list_free(pubids);
list_free(schemaPubids);
list_free(rel_publications);
/* Mark the entry valid only after it has been rebuilt completely. */
entry->replicate_valid = true;
}
return entry;
}
/*
 * Cleanup list of streamed transactions and update the schema_sent flag.
 *
 * When a streamed transaction commits or aborts, its toplevel XID has to be
 * removed from every entry of the schema cache.  After an abort the
 * subscriber simply throws away the schema records we streamed, so removal
 * is all that is needed.  After a commit the subscriber will have updated its
 * relation cache, so schema_sent is additionally set on each entry that had
 * recorded the xid.
 */
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
	HASH_SEQ_STATUS scan;
	RelationSyncEntry *entry;

	Assert(RelationSyncCache != NULL);

	hash_seq_init(&scan, RelationSyncCache);
	while ((entry = hash_seq_search(&scan)) != NULL)
	{
		ListCell   *cell;

		foreach(cell, entry->streamed_txns)
		{
			if (lfirst_xid(cell) != xid)
				continue;

			/*
			 * A committed xid in the list guarantees that the subscriber has
			 * the corresponding schema, so it need not be sent again unless
			 * the relation gets invalidated.
			 */
			if (is_commit)
				entry->schema_sent = true;

			entry->streamed_txns =
				foreach_delete_current(entry->streamed_txns, cell);

			/* Stop scanning this entry once the xid has been removed. */
			break;
		}
	}
}
/*
 * Relcache invalidation callback
 *
 * Marks the cache entry of 'relid' as invalid, or every entry when 'relid'
 * is not a valid OID.
 *
 * Nobody keeps pointers to entries in this hash table around outside logical
 * decoding callback calls - but invalidation events can come in *during* a
 * callback if we do any syscache access in the callback.  Because of that
 * the entry is only flagged as invalid here and none of its substructure is
 * touched; the next get_rel_sync_entry() call will rebuild it all.
 */
static void
rel_sync_cache_relation_cb(Datum arg, Oid relid)
{
	RelationSyncEntry *entry;

	/*
	 * We can get here if the plugin was used in SQL interface as the
	 * RelationSyncCache is destroyed when the decoding finishes, but there
	 * is no way to unregister the relcache invalidation callback.
	 */
	if (RelationSyncCache == NULL)
		return;

	if (!OidIsValid(relid))
	{
		/* Whole cache must be flushed. */
		HASH_SEQ_STATUS scan;

		hash_seq_init(&scan, RelationSyncCache);
		while ((entry = (RelationSyncEntry *) hash_seq_search(&scan)) != NULL)
			entry->replicate_valid = false;

		return;
	}

	/*
	 * Getting invalidations for relations that aren't in the table is
	 * entirely normal, so a failed lookup is simply ignored.
	 */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
											  HASH_FIND, NULL);
	if (entry != NULL)
		entry->replicate_valid = false;
}
/*
 * Publication relation/schema map syscache invalidation callback
 *
 * Called for invalidations on pg_publication, pg_publication_rel,
 * pg_publication_namespace, and pg_namespace.  Every cache entry is marked
 * invalid, since there is no easy way to tell which entries the event might
 * have affected.
 */
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	HASH_SEQ_STATUS scan;
	RelationSyncEntry *entry;

	/*
	 * We can get here if the plugin was used in SQL interface as the
	 * RelationSyncCache is destroyed when the decoding finishes, but there
	 * is no way to unregister the invalidation callbacks.
	 */
	if (RelationSyncCache == NULL)
		return;

	hash_seq_init(&scan, RelationSyncCache);
	for (entry = (RelationSyncEntry *) hash_seq_search(&scan);
		 entry != NULL;
		 entry = (RelationSyncEntry *) hash_seq_search(&scan))
		entry->replicate_valid = false;
}
/*
 * Send Replication origin
 *
 * Does nothing unless 'send_origin' is set and a name can be found for
 * 'origin_id'.  Otherwise the message in progress is ended and the origin is
 * written into a newly prepared one; the caller is left to issue the final
 * OutputPluginWrite().
 */
static void
send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
				 XLogRecPtr origin_lsn, bool send_origin)
{
	char	   *origin;

	if (!send_origin)
		return;

	/*----------
	 * XXX: which behaviour do we want here?
	 *
	 * Alternatives:
	 *	- don't send origin message if origin name not found
	 *	  (that's what we do now)
	 *	- throw error - that will break replication, not good
	 *	- send some special "unknown" origin
	 *----------
	 */
	if (!replorigin_by_oid(origin_id, true, &origin))
		return;

	/* Message boundary */
	OutputPluginWrite(ctx, false);
	OutputPluginPrepareWrite(ctx, true);

	logicalrep_write_origin(ctx->out, origin, origin_lsn);
}