1265 lines
30 KiB
C
1265 lines
30 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* proto.c
|
|
* logical replication protocol functions
|
|
*
|
|
* Copyright (c) 2015-2023, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/replication/logical/proto.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "access/sysattr.h"
|
|
#include "catalog/pg_namespace.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "libpq/pqformat.h"
|
|
#include "replication/logicalproto.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/syscache.h"
|
|
|
|
/*
|
|
* Protocol message flags.
|
|
*/
|
|
#define LOGICALREP_IS_REPLICA_IDENTITY 1
|
|
|
|
#define MESSAGE_TRANSACTIONAL (1<<0)
|
|
#define TRUNCATE_CASCADE (1<<0)
|
|
#define TRUNCATE_RESTART_SEQS (1<<1)
|
|
|
|
static void logicalrep_write_attrs(StringInfo out, Relation rel,
|
|
Bitmapset *columns);
|
|
static void logicalrep_write_tuple(StringInfo out, Relation rel,
|
|
TupleTableSlot *slot,
|
|
bool binary, Bitmapset *columns);
|
|
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
|
|
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
|
|
|
|
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
|
|
static const char *logicalrep_read_namespace(StringInfo in);
|
|
|
|
/*
|
|
* Check if a column is covered by a column list.
|
|
*
|
|
* Need to be careful about NULL, which is treated as a column list covering
|
|
* all columns.
|
|
*/
|
|
static bool
|
|
column_in_column_list(int attnum, Bitmapset *columns)
|
|
{
|
|
return (columns == NULL || bms_is_member(attnum, columns));
|
|
}
|
|
|
|
|
|
/*
|
|
* Write BEGIN to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
|
|
|
|
/* fixed fields */
|
|
pq_sendint64(out, txn->final_lsn);
|
|
pq_sendint64(out, txn->xact_time.commit_time);
|
|
pq_sendint32(out, txn->xid);
|
|
}
|
|
|
|
/*
|
|
* Read transaction BEGIN from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
|
|
{
|
|
/* read fields */
|
|
begin_data->final_lsn = pq_getmsgint64(in);
|
|
if (begin_data->final_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "final_lsn not set in begin message");
|
|
begin_data->committime = pq_getmsgint64(in);
|
|
begin_data->xid = pq_getmsgint(in, 4);
|
|
}
|
|
|
|
|
|
/*
|
|
* Write COMMIT to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn)
|
|
{
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
|
|
|
|
/* send the flags field (unused for now) */
|
|
pq_sendbyte(out, flags);
|
|
|
|
/* send fields */
|
|
pq_sendint64(out, commit_lsn);
|
|
pq_sendint64(out, txn->end_lsn);
|
|
pq_sendint64(out, txn->xact_time.commit_time);
|
|
}
|
|
|
|
/*
|
|
* Read transaction COMMIT from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
|
|
{
|
|
/* read flags (unused for now) */
|
|
uint8 flags = pq_getmsgbyte(in);
|
|
|
|
if (flags != 0)
|
|
elog(ERROR, "unrecognized flags %u in commit message", flags);
|
|
|
|
/* read fields */
|
|
commit_data->commit_lsn = pq_getmsgint64(in);
|
|
commit_data->end_lsn = pq_getmsgint64(in);
|
|
commit_data->committime = pq_getmsgint64(in);
|
|
}
|
|
|
|
/*
|
|
* Write BEGIN PREPARE to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
|
|
|
|
/* fixed fields */
|
|
pq_sendint64(out, txn->final_lsn);
|
|
pq_sendint64(out, txn->end_lsn);
|
|
pq_sendint64(out, txn->xact_time.prepare_time);
|
|
pq_sendint32(out, txn->xid);
|
|
|
|
/* send gid */
|
|
pq_sendstring(out, txn->gid);
|
|
}
|
|
|
|
/*
|
|
* Read transaction BEGIN PREPARE from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
|
|
{
|
|
/* read fields */
|
|
begin_data->prepare_lsn = pq_getmsgint64(in);
|
|
if (begin_data->prepare_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "prepare_lsn not set in begin prepare message");
|
|
begin_data->end_lsn = pq_getmsgint64(in);
|
|
if (begin_data->end_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "end_lsn not set in begin prepare message");
|
|
begin_data->prepare_time = pq_getmsgint64(in);
|
|
begin_data->xid = pq_getmsgint(in, 4);
|
|
|
|
/* read gid (copy it into a pre-allocated buffer) */
|
|
strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
|
|
}
|
|
|
|
/*
|
|
* The core functionality for logicalrep_write_prepare and
|
|
* logicalrep_write_stream_prepare.
|
|
*/
|
|
static void
|
|
logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
|
|
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
|
|
{
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, type);
|
|
|
|
/*
|
|
* This should only ever happen for two-phase commit transactions, in
|
|
* which case we expect to have a valid GID.
|
|
*/
|
|
Assert(txn->gid != NULL);
|
|
Assert(rbtxn_prepared(txn));
|
|
Assert(TransactionIdIsValid(txn->xid));
|
|
|
|
/* send the flags field */
|
|
pq_sendbyte(out, flags);
|
|
|
|
/* send fields */
|
|
pq_sendint64(out, prepare_lsn);
|
|
pq_sendint64(out, txn->end_lsn);
|
|
pq_sendint64(out, txn->xact_time.prepare_time);
|
|
pq_sendint32(out, txn->xid);
|
|
|
|
/* send gid */
|
|
pq_sendstring(out, txn->gid);
|
|
}
|
|
|
|
/*
|
|
* Write PREPARE to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
|
|
XLogRecPtr prepare_lsn)
|
|
{
|
|
logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
|
|
txn, prepare_lsn);
|
|
}
|
|
|
|
/*
|
|
* The core functionality for logicalrep_read_prepare and
|
|
* logicalrep_read_stream_prepare.
|
|
*/
|
|
static void
|
|
logicalrep_read_prepare_common(StringInfo in, char *msgtype,
|
|
LogicalRepPreparedTxnData *prepare_data)
|
|
{
|
|
/* read flags */
|
|
uint8 flags = pq_getmsgbyte(in);
|
|
|
|
if (flags != 0)
|
|
elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
|
|
|
|
/* read fields */
|
|
prepare_data->prepare_lsn = pq_getmsgint64(in);
|
|
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
|
|
prepare_data->end_lsn = pq_getmsgint64(in);
|
|
if (prepare_data->end_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "end_lsn is not set in %s message", msgtype);
|
|
prepare_data->prepare_time = pq_getmsgint64(in);
|
|
prepare_data->xid = pq_getmsgint(in, 4);
|
|
if (prepare_data->xid == InvalidTransactionId)
|
|
elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
|
|
|
|
/* read gid (copy it into a pre-allocated buffer) */
|
|
strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
|
|
}
|
|
|
|
/*
|
|
* Read transaction PREPARE from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
|
|
{
|
|
logicalrep_read_prepare_common(in, "prepare", prepare_data);
|
|
}
|
|
|
|
/*
|
|
* Write COMMIT PREPARED to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn)
|
|
{
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
|
|
|
|
/*
|
|
* This should only ever happen for two-phase commit transactions, in
|
|
* which case we expect to have a valid GID.
|
|
*/
|
|
Assert(txn->gid != NULL);
|
|
|
|
/* send the flags field */
|
|
pq_sendbyte(out, flags);
|
|
|
|
/* send fields */
|
|
pq_sendint64(out, commit_lsn);
|
|
pq_sendint64(out, txn->end_lsn);
|
|
pq_sendint64(out, txn->xact_time.commit_time);
|
|
pq_sendint32(out, txn->xid);
|
|
|
|
/* send gid */
|
|
pq_sendstring(out, txn->gid);
|
|
}
|
|
|
|
/*
|
|
* Read transaction COMMIT PREPARED from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
|
|
{
|
|
/* read flags */
|
|
uint8 flags = pq_getmsgbyte(in);
|
|
|
|
if (flags != 0)
|
|
elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
|
|
|
|
/* read fields */
|
|
prepare_data->commit_lsn = pq_getmsgint64(in);
|
|
if (prepare_data->commit_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "commit_lsn is not set in commit prepared message");
|
|
prepare_data->end_lsn = pq_getmsgint64(in);
|
|
if (prepare_data->end_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "end_lsn is not set in commit prepared message");
|
|
prepare_data->commit_time = pq_getmsgint64(in);
|
|
prepare_data->xid = pq_getmsgint(in, 4);
|
|
|
|
/* read gid (copy it into a pre-allocated buffer) */
|
|
strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
|
|
}
|
|
|
|
/*
|
|
* Write ROLLBACK PREPARED to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
|
|
XLogRecPtr prepare_end_lsn,
|
|
TimestampTz prepare_time)
|
|
{
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
|
|
|
|
/*
|
|
* This should only ever happen for two-phase commit transactions, in
|
|
* which case we expect to have a valid GID.
|
|
*/
|
|
Assert(txn->gid != NULL);
|
|
|
|
/* send the flags field */
|
|
pq_sendbyte(out, flags);
|
|
|
|
/* send fields */
|
|
pq_sendint64(out, prepare_end_lsn);
|
|
pq_sendint64(out, txn->end_lsn);
|
|
pq_sendint64(out, prepare_time);
|
|
pq_sendint64(out, txn->xact_time.commit_time);
|
|
pq_sendint32(out, txn->xid);
|
|
|
|
/* send gid */
|
|
pq_sendstring(out, txn->gid);
|
|
}
|
|
|
|
/*
|
|
* Read transaction ROLLBACK PREPARED from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_rollback_prepared(StringInfo in,
|
|
LogicalRepRollbackPreparedTxnData *rollback_data)
|
|
{
|
|
/* read flags */
|
|
uint8 flags = pq_getmsgbyte(in);
|
|
|
|
if (flags != 0)
|
|
elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
|
|
|
|
/* read fields */
|
|
rollback_data->prepare_end_lsn = pq_getmsgint64(in);
|
|
if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
|
|
rollback_data->rollback_end_lsn = pq_getmsgint64(in);
|
|
if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
|
|
elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
|
|
rollback_data->prepare_time = pq_getmsgint64(in);
|
|
rollback_data->rollback_time = pq_getmsgint64(in);
|
|
rollback_data->xid = pq_getmsgint(in, 4);
|
|
|
|
/* read gid (copy it into a pre-allocated buffer) */
|
|
strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
|
|
}
|
|
|
|
/*
|
|
* Write STREAM PREPARE to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_stream_prepare(StringInfo out,
|
|
ReorderBufferTXN *txn,
|
|
XLogRecPtr prepare_lsn)
|
|
{
|
|
logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
|
|
txn, prepare_lsn);
|
|
}
|
|
|
|
/*
|
|
* Read STREAM PREPARE from the stream.
|
|
*/
|
|
void
|
|
logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
|
|
{
|
|
logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
|
|
}
|
|
|
|
/*
|
|
* Write ORIGIN to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_origin(StringInfo out, const char *origin,
|
|
XLogRecPtr origin_lsn)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
|
|
|
|
/* fixed fields */
|
|
pq_sendint64(out, origin_lsn);
|
|
|
|
/* origin string */
|
|
pq_sendstring(out, origin);
|
|
}
|
|
|
|
/*
|
|
* Read ORIGIN from the output stream.
|
|
*/
|
|
char *
|
|
logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
|
|
{
|
|
/* fixed fields */
|
|
*origin_lsn = pq_getmsgint64(in);
|
|
|
|
/* return origin */
|
|
return pstrdup(pq_getmsgstring(in));
|
|
}
|
|
|
|
/*
|
|
* Write INSERT to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
|
|
TupleTableSlot *newslot, bool binary, Bitmapset *columns)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
/* use Oid as relation identifier */
|
|
pq_sendint32(out, RelationGetRelid(rel));
|
|
|
|
pq_sendbyte(out, 'N'); /* new tuple follows */
|
|
logicalrep_write_tuple(out, rel, newslot, binary, columns);
|
|
}
|
|
|
|
/*
|
|
* Read INSERT from stream.
|
|
*
|
|
* Fills the new tuple.
|
|
*/
|
|
LogicalRepRelId
|
|
logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
|
|
{
|
|
char action;
|
|
LogicalRepRelId relid;
|
|
|
|
/* read the relation id */
|
|
relid = pq_getmsgint(in, 4);
|
|
|
|
action = pq_getmsgbyte(in);
|
|
if (action != 'N')
|
|
elog(ERROR, "expected new tuple but got %d",
|
|
action);
|
|
|
|
logicalrep_read_tuple(in, newtup);
|
|
|
|
return relid;
|
|
}
|
|
|
|
/*
|
|
* Write UPDATE to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
|
|
TupleTableSlot *oldslot, TupleTableSlot *newslot,
|
|
bool binary, Bitmapset *columns)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
|
|
|
|
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
|
|
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
|
|
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
/* use Oid as relation identifier */
|
|
pq_sendint32(out, RelationGetRelid(rel));
|
|
|
|
if (oldslot != NULL)
|
|
{
|
|
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
|
|
pq_sendbyte(out, 'O'); /* old tuple follows */
|
|
else
|
|
pq_sendbyte(out, 'K'); /* old key follows */
|
|
logicalrep_write_tuple(out, rel, oldslot, binary, columns);
|
|
}
|
|
|
|
pq_sendbyte(out, 'N'); /* new tuple follows */
|
|
logicalrep_write_tuple(out, rel, newslot, binary, columns);
|
|
}
|
|
|
|
/*
|
|
* Read UPDATE from stream.
|
|
*/
|
|
LogicalRepRelId
|
|
logicalrep_read_update(StringInfo in, bool *has_oldtuple,
|
|
LogicalRepTupleData *oldtup,
|
|
LogicalRepTupleData *newtup)
|
|
{
|
|
char action;
|
|
LogicalRepRelId relid;
|
|
|
|
/* read the relation id */
|
|
relid = pq_getmsgint(in, 4);
|
|
|
|
/* read and verify action */
|
|
action = pq_getmsgbyte(in);
|
|
if (action != 'K' && action != 'O' && action != 'N')
|
|
elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
|
|
action);
|
|
|
|
/* check for old tuple */
|
|
if (action == 'K' || action == 'O')
|
|
{
|
|
logicalrep_read_tuple(in, oldtup);
|
|
*has_oldtuple = true;
|
|
|
|
action = pq_getmsgbyte(in);
|
|
}
|
|
else
|
|
*has_oldtuple = false;
|
|
|
|
/* check for new tuple */
|
|
if (action != 'N')
|
|
elog(ERROR, "expected action 'N', got %c",
|
|
action);
|
|
|
|
logicalrep_read_tuple(in, newtup);
|
|
|
|
return relid;
|
|
}
|
|
|
|
/*
|
|
* Write DELETE to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
|
|
TupleTableSlot *oldslot, bool binary,
|
|
Bitmapset *columns)
|
|
{
|
|
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
|
|
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
|
|
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
/* use Oid as relation identifier */
|
|
pq_sendint32(out, RelationGetRelid(rel));
|
|
|
|
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
|
|
pq_sendbyte(out, 'O'); /* old tuple follows */
|
|
else
|
|
pq_sendbyte(out, 'K'); /* old key follows */
|
|
|
|
logicalrep_write_tuple(out, rel, oldslot, binary, columns);
|
|
}
|
|
|
|
/*
|
|
* Read DELETE from stream.
|
|
*
|
|
* Fills the old tuple.
|
|
*/
|
|
LogicalRepRelId
|
|
logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
|
|
{
|
|
char action;
|
|
LogicalRepRelId relid;
|
|
|
|
/* read the relation id */
|
|
relid = pq_getmsgint(in, 4);
|
|
|
|
/* read and verify action */
|
|
action = pq_getmsgbyte(in);
|
|
if (action != 'K' && action != 'O')
|
|
elog(ERROR, "expected action 'O' or 'K', got %c", action);
|
|
|
|
logicalrep_read_tuple(in, oldtup);
|
|
|
|
return relid;
|
|
}
|
|
|
|
/*
|
|
* Write TRUNCATE to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_truncate(StringInfo out,
|
|
TransactionId xid,
|
|
int nrelids,
|
|
Oid relids[],
|
|
bool cascade, bool restart_seqs)
|
|
{
|
|
int i;
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
pq_sendint32(out, nrelids);
|
|
|
|
/* encode and send truncate flags */
|
|
if (cascade)
|
|
flags |= TRUNCATE_CASCADE;
|
|
if (restart_seqs)
|
|
flags |= TRUNCATE_RESTART_SEQS;
|
|
pq_sendint8(out, flags);
|
|
|
|
for (i = 0; i < nrelids; i++)
|
|
pq_sendint32(out, relids[i]);
|
|
}
|
|
|
|
/*
|
|
* Read TRUNCATE from stream.
|
|
*/
|
|
List *
|
|
logicalrep_read_truncate(StringInfo in,
|
|
bool *cascade, bool *restart_seqs)
|
|
{
|
|
int i;
|
|
int nrelids;
|
|
List *relids = NIL;
|
|
uint8 flags;
|
|
|
|
nrelids = pq_getmsgint(in, 4);
|
|
|
|
/* read and decode truncate flags */
|
|
flags = pq_getmsgint(in, 1);
|
|
*cascade = (flags & TRUNCATE_CASCADE) > 0;
|
|
*restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
|
|
|
|
for (i = 0; i < nrelids; i++)
|
|
relids = lappend_oid(relids, pq_getmsgint(in, 4));
|
|
|
|
return relids;
|
|
}
|
|
|
|
/*
|
|
* Write MESSAGE to stream
|
|
*/
|
|
void
|
|
logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
|
|
bool transactional, const char *prefix, Size sz,
|
|
const char *message)
|
|
{
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
|
|
|
|
/* encode and send message flags */
|
|
if (transactional)
|
|
flags |= MESSAGE_TRANSACTIONAL;
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
pq_sendint8(out, flags);
|
|
pq_sendint64(out, lsn);
|
|
pq_sendstring(out, prefix);
|
|
pq_sendint32(out, sz);
|
|
pq_sendbytes(out, message, sz);
|
|
}
|
|
|
|
/*
|
|
* Write relation description to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
|
|
Bitmapset *columns)
|
|
{
|
|
char *relname;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
/* use Oid as relation identifier */
|
|
pq_sendint32(out, RelationGetRelid(rel));
|
|
|
|
/* send qualified relation name */
|
|
logicalrep_write_namespace(out, RelationGetNamespace(rel));
|
|
relname = RelationGetRelationName(rel);
|
|
pq_sendstring(out, relname);
|
|
|
|
/* send replica identity */
|
|
pq_sendbyte(out, rel->rd_rel->relreplident);
|
|
|
|
/* send the attribute info */
|
|
logicalrep_write_attrs(out, rel, columns);
|
|
}
|
|
|
|
/*
|
|
* Read the relation info from stream and return as LogicalRepRelation.
|
|
*/
|
|
LogicalRepRelation *
|
|
logicalrep_read_rel(StringInfo in)
|
|
{
|
|
LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
|
|
|
|
rel->remoteid = pq_getmsgint(in, 4);
|
|
|
|
/* Read relation name from stream */
|
|
rel->nspname = pstrdup(logicalrep_read_namespace(in));
|
|
rel->relname = pstrdup(pq_getmsgstring(in));
|
|
|
|
/* Read the replica identity. */
|
|
rel->replident = pq_getmsgbyte(in);
|
|
|
|
/* Get attribute description */
|
|
logicalrep_read_attrs(in, rel);
|
|
|
|
return rel;
|
|
}
|
|
|
|
/*
|
|
* Write type info to the output stream.
|
|
*
|
|
* This function will always write base type info.
|
|
*/
|
|
void
|
|
logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
|
|
{
|
|
Oid basetypoid = getBaseType(typoid);
|
|
HeapTuple tup;
|
|
Form_pg_type typtup;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
|
|
|
|
/* transaction ID (if not valid, we're not streaming) */
|
|
if (TransactionIdIsValid(xid))
|
|
pq_sendint32(out, xid);
|
|
|
|
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "cache lookup failed for type %u", basetypoid);
|
|
typtup = (Form_pg_type) GETSTRUCT(tup);
|
|
|
|
/* use Oid as relation identifier */
|
|
pq_sendint32(out, typoid);
|
|
|
|
/* send qualified type name */
|
|
logicalrep_write_namespace(out, typtup->typnamespace);
|
|
pq_sendstring(out, NameStr(typtup->typname));
|
|
|
|
ReleaseSysCache(tup);
|
|
}
|
|
|
|
/*
|
|
* Read type info from the output stream.
|
|
*/
|
|
void
|
|
logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
|
|
{
|
|
ltyp->remoteid = pq_getmsgint(in, 4);
|
|
|
|
/* Read type name from stream */
|
|
ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
|
|
ltyp->typname = pstrdup(pq_getmsgstring(in));
|
|
}
|
|
|
|
/*
|
|
* Write a tuple to the outputstream, in the most efficient format possible.
|
|
*/
|
|
static void
|
|
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
|
|
bool binary, Bitmapset *columns)
|
|
{
|
|
TupleDesc desc;
|
|
Datum *values;
|
|
bool *isnull;
|
|
int i;
|
|
uint16 nliveatts = 0;
|
|
|
|
desc = RelationGetDescr(rel);
|
|
|
|
for (i = 0; i < desc->natts; i++)
|
|
{
|
|
Form_pg_attribute att = TupleDescAttr(desc, i);
|
|
|
|
if (att->attisdropped || att->attgenerated)
|
|
continue;
|
|
|
|
if (!column_in_column_list(att->attnum, columns))
|
|
continue;
|
|
|
|
nliveatts++;
|
|
}
|
|
pq_sendint16(out, nliveatts);
|
|
|
|
slot_getallattrs(slot);
|
|
values = slot->tts_values;
|
|
isnull = slot->tts_isnull;
|
|
|
|
/* Write the values */
|
|
for (i = 0; i < desc->natts; i++)
|
|
{
|
|
HeapTuple typtup;
|
|
Form_pg_type typclass;
|
|
Form_pg_attribute att = TupleDescAttr(desc, i);
|
|
|
|
if (att->attisdropped || att->attgenerated)
|
|
continue;
|
|
|
|
if (!column_in_column_list(att->attnum, columns))
|
|
continue;
|
|
|
|
if (isnull[i])
|
|
{
|
|
pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
|
|
continue;
|
|
}
|
|
|
|
if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
|
|
{
|
|
/*
|
|
* Unchanged toasted datum. (Note that we don't promise to detect
|
|
* unchanged data in general; this is just a cheap check to avoid
|
|
* sending large values unnecessarily.)
|
|
*/
|
|
pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
|
|
continue;
|
|
}
|
|
|
|
typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
|
|
if (!HeapTupleIsValid(typtup))
|
|
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
|
|
typclass = (Form_pg_type) GETSTRUCT(typtup);
|
|
|
|
/*
|
|
* Send in binary if requested and type has suitable send function.
|
|
*/
|
|
if (binary && OidIsValid(typclass->typsend))
|
|
{
|
|
bytea *outputbytes;
|
|
int len;
|
|
|
|
pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
|
|
outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
|
|
len = VARSIZE(outputbytes) - VARHDRSZ;
|
|
pq_sendint(out, len, 4); /* length */
|
|
pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
|
|
pfree(outputbytes);
|
|
}
|
|
else
|
|
{
|
|
char *outputstr;
|
|
|
|
pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
|
|
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
|
|
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
|
|
pfree(outputstr);
|
|
}
|
|
|
|
ReleaseSysCache(typtup);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Read tuple in logical replication format from stream.
|
|
*/
|
|
static void
|
|
logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
|
|
{
|
|
int i;
|
|
int natts;
|
|
|
|
/* Get number of attributes */
|
|
natts = pq_getmsgint(in, 2);
|
|
|
|
/* Allocate space for per-column values; zero out unused StringInfoDatas */
|
|
tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
|
|
tuple->colstatus = (char *) palloc(natts * sizeof(char));
|
|
tuple->ncols = natts;
|
|
|
|
/* Read the data */
|
|
for (i = 0; i < natts; i++)
|
|
{
|
|
char kind;
|
|
int len;
|
|
StringInfo value = &tuple->colvalues[i];
|
|
|
|
kind = pq_getmsgbyte(in);
|
|
tuple->colstatus[i] = kind;
|
|
|
|
switch (kind)
|
|
{
|
|
case LOGICALREP_COLUMN_NULL:
|
|
/* nothing more to do */
|
|
break;
|
|
case LOGICALREP_COLUMN_UNCHANGED:
|
|
/* we don't receive the value of an unchanged column */
|
|
break;
|
|
case LOGICALREP_COLUMN_TEXT:
|
|
case LOGICALREP_COLUMN_BINARY:
|
|
len = pq_getmsgint(in, 4); /* read length */
|
|
|
|
/* and data */
|
|
value->data = palloc(len + 1);
|
|
pq_copymsgbytes(in, value->data, len);
|
|
|
|
/*
|
|
* Not strictly necessary for LOGICALREP_COLUMN_BINARY, but
|
|
* per StringInfo practice.
|
|
*/
|
|
value->data[len] = '\0';
|
|
|
|
/* make StringInfo fully valid */
|
|
value->len = len;
|
|
value->cursor = 0;
|
|
value->maxlen = len;
|
|
break;
|
|
default:
|
|
elog(ERROR, "unrecognized data representation type '%c'", kind);
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Write relation attribute metadata to the stream.
|
|
*/
|
|
static void
|
|
logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
|
|
{
|
|
TupleDesc desc;
|
|
int i;
|
|
uint16 nliveatts = 0;
|
|
Bitmapset *idattrs = NULL;
|
|
bool replidentfull;
|
|
|
|
desc = RelationGetDescr(rel);
|
|
|
|
/* send number of live attributes */
|
|
for (i = 0; i < desc->natts; i++)
|
|
{
|
|
Form_pg_attribute att = TupleDescAttr(desc, i);
|
|
|
|
if (att->attisdropped || att->attgenerated)
|
|
continue;
|
|
|
|
if (!column_in_column_list(att->attnum, columns))
|
|
continue;
|
|
|
|
nliveatts++;
|
|
}
|
|
pq_sendint16(out, nliveatts);
|
|
|
|
/* fetch bitmap of REPLICATION IDENTITY attributes */
|
|
replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
|
|
if (!replidentfull)
|
|
idattrs = RelationGetIdentityKeyBitmap(rel);
|
|
|
|
/* send the attributes */
|
|
for (i = 0; i < desc->natts; i++)
|
|
{
|
|
Form_pg_attribute att = TupleDescAttr(desc, i);
|
|
uint8 flags = 0;
|
|
|
|
if (att->attisdropped || att->attgenerated)
|
|
continue;
|
|
|
|
if (!column_in_column_list(att->attnum, columns))
|
|
continue;
|
|
|
|
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
|
|
if (replidentfull ||
|
|
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
|
|
idattrs))
|
|
flags |= LOGICALREP_IS_REPLICA_IDENTITY;
|
|
|
|
pq_sendbyte(out, flags);
|
|
|
|
/* attribute name */
|
|
pq_sendstring(out, NameStr(att->attname));
|
|
|
|
/* attribute type id */
|
|
pq_sendint32(out, (int) att->atttypid);
|
|
|
|
/* attribute mode */
|
|
pq_sendint32(out, att->atttypmod);
|
|
}
|
|
|
|
bms_free(idattrs);
|
|
}
|
|
|
|
/*
|
|
* Read relation attribute metadata from the stream.
|
|
*/
|
|
static void
|
|
logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
|
|
{
|
|
int i;
|
|
int natts;
|
|
char **attnames;
|
|
Oid *atttyps;
|
|
Bitmapset *attkeys = NULL;
|
|
|
|
natts = pq_getmsgint(in, 2);
|
|
attnames = palloc(natts * sizeof(char *));
|
|
atttyps = palloc(natts * sizeof(Oid));
|
|
|
|
/* read the attributes */
|
|
for (i = 0; i < natts; i++)
|
|
{
|
|
uint8 flags;
|
|
|
|
/* Check for replica identity column */
|
|
flags = pq_getmsgbyte(in);
|
|
if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
|
|
attkeys = bms_add_member(attkeys, i);
|
|
|
|
/* attribute name */
|
|
attnames[i] = pstrdup(pq_getmsgstring(in));
|
|
|
|
/* attribute type id */
|
|
atttyps[i] = (Oid) pq_getmsgint(in, 4);
|
|
|
|
/* we ignore attribute mode for now */
|
|
(void) pq_getmsgint(in, 4);
|
|
}
|
|
|
|
rel->attnames = attnames;
|
|
rel->atttyps = atttyps;
|
|
rel->attkeys = attkeys;
|
|
rel->natts = natts;
|
|
}
|
|
|
|
/*
|
|
* Write the namespace name or empty string for pg_catalog (to save space).
|
|
*/
|
|
static void
|
|
logicalrep_write_namespace(StringInfo out, Oid nspid)
|
|
{
|
|
if (nspid == PG_CATALOG_NAMESPACE)
|
|
pq_sendbyte(out, '\0');
|
|
else
|
|
{
|
|
char *nspname = get_namespace_name(nspid);
|
|
|
|
if (nspname == NULL)
|
|
elog(ERROR, "cache lookup failed for namespace %u",
|
|
nspid);
|
|
|
|
pq_sendstring(out, nspname);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Read the namespace name while treating empty string as pg_catalog.
|
|
*/
|
|
static const char *
|
|
logicalrep_read_namespace(StringInfo in)
|
|
{
|
|
const char *nspname = pq_getmsgstring(in);
|
|
|
|
if (nspname[0] == '\0')
|
|
nspname = "pg_catalog";
|
|
|
|
return nspname;
|
|
}
|
|
|
|
/*
|
|
* Write the information for the start stream message to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_stream_start(StringInfo out,
|
|
TransactionId xid, bool first_segment)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
|
|
|
|
Assert(TransactionIdIsValid(xid));
|
|
|
|
/* transaction ID (we're starting to stream, so must be valid) */
|
|
pq_sendint32(out, xid);
|
|
|
|
/* 1 if this is the first streaming segment for this xid */
|
|
pq_sendbyte(out, first_segment ? 1 : 0);
|
|
}
|
|
|
|
/*
|
|
* Read the information about the start stream message from output stream.
|
|
*/
|
|
TransactionId
|
|
logicalrep_read_stream_start(StringInfo in, bool *first_segment)
|
|
{
|
|
TransactionId xid;
|
|
|
|
Assert(first_segment);
|
|
|
|
xid = pq_getmsgint(in, 4);
|
|
*first_segment = (pq_getmsgbyte(in) == 1);
|
|
|
|
return xid;
|
|
}
|
|
|
|
/*
|
|
* Write the stop stream message to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_stream_stop(StringInfo out)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
|
|
}
|
|
|
|
/*
|
|
* Write STREAM COMMIT to the output stream.
|
|
*/
|
|
void
|
|
logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn)
|
|
{
|
|
uint8 flags = 0;
|
|
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
|
|
|
|
Assert(TransactionIdIsValid(txn->xid));
|
|
|
|
/* transaction ID */
|
|
pq_sendint32(out, txn->xid);
|
|
|
|
/* send the flags field (unused for now) */
|
|
pq_sendbyte(out, flags);
|
|
|
|
/* send fields */
|
|
pq_sendint64(out, commit_lsn);
|
|
pq_sendint64(out, txn->end_lsn);
|
|
pq_sendint64(out, txn->xact_time.commit_time);
|
|
}
|
|
|
|
/*
|
|
* Read STREAM COMMIT from the output stream.
|
|
*/
|
|
TransactionId
|
|
logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
|
|
{
|
|
TransactionId xid;
|
|
uint8 flags;
|
|
|
|
xid = pq_getmsgint(in, 4);
|
|
|
|
/* read flags (unused for now) */
|
|
flags = pq_getmsgbyte(in);
|
|
|
|
if (flags != 0)
|
|
elog(ERROR, "unrecognized flags %u in commit message", flags);
|
|
|
|
/* read fields */
|
|
commit_data->commit_lsn = pq_getmsgint64(in);
|
|
commit_data->end_lsn = pq_getmsgint64(in);
|
|
commit_data->committime = pq_getmsgint64(in);
|
|
|
|
return xid;
|
|
}
|
|
|
|
/*
|
|
* Write STREAM ABORT to the output stream. Note that xid and subxid will be
|
|
* same for the top-level transaction abort.
|
|
*
|
|
* If write_abort_info is true, send the abort_lsn and abort_time fields,
|
|
* otherwise don't.
|
|
*/
|
|
void
|
|
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
|
|
TransactionId subxid, XLogRecPtr abort_lsn,
|
|
TimestampTz abort_time, bool write_abort_info)
|
|
{
|
|
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
|
|
|
|
Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
|
|
|
|
/* transaction ID */
|
|
pq_sendint32(out, xid);
|
|
pq_sendint32(out, subxid);
|
|
|
|
if (write_abort_info)
|
|
{
|
|
pq_sendint64(out, abort_lsn);
|
|
pq_sendint64(out, abort_time);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Read STREAM ABORT from the output stream.
|
|
*
|
|
* If read_abort_info is true, read the abort_lsn and abort_time fields,
|
|
* otherwise don't.
|
|
*/
|
|
void
|
|
logicalrep_read_stream_abort(StringInfo in,
|
|
LogicalRepStreamAbortData *abort_data,
|
|
bool read_abort_info)
|
|
{
|
|
Assert(abort_data);
|
|
|
|
abort_data->xid = pq_getmsgint(in, 4);
|
|
abort_data->subxid = pq_getmsgint(in, 4);
|
|
|
|
if (read_abort_info)
|
|
{
|
|
abort_data->abort_lsn = pq_getmsgint64(in);
|
|
abort_data->abort_time = pq_getmsgint64(in);
|
|
}
|
|
else
|
|
{
|
|
abort_data->abort_lsn = InvalidXLogRecPtr;
|
|
abort_data->abort_time = 0;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Get string representing LogicalRepMsgType.
|
|
*/
|
|
char *
|
|
logicalrep_message_type(LogicalRepMsgType action)
|
|
{
|
|
switch (action)
|
|
{
|
|
case LOGICAL_REP_MSG_BEGIN:
|
|
return "BEGIN";
|
|
case LOGICAL_REP_MSG_COMMIT:
|
|
return "COMMIT";
|
|
case LOGICAL_REP_MSG_ORIGIN:
|
|
return "ORIGIN";
|
|
case LOGICAL_REP_MSG_INSERT:
|
|
return "INSERT";
|
|
case LOGICAL_REP_MSG_UPDATE:
|
|
return "UPDATE";
|
|
case LOGICAL_REP_MSG_DELETE:
|
|
return "DELETE";
|
|
case LOGICAL_REP_MSG_TRUNCATE:
|
|
return "TRUNCATE";
|
|
case LOGICAL_REP_MSG_RELATION:
|
|
return "RELATION";
|
|
case LOGICAL_REP_MSG_TYPE:
|
|
return "TYPE";
|
|
case LOGICAL_REP_MSG_MESSAGE:
|
|
return "MESSAGE";
|
|
case LOGICAL_REP_MSG_BEGIN_PREPARE:
|
|
return "BEGIN PREPARE";
|
|
case LOGICAL_REP_MSG_PREPARE:
|
|
return "PREPARE";
|
|
case LOGICAL_REP_MSG_COMMIT_PREPARED:
|
|
return "COMMIT PREPARED";
|
|
case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
|
|
return "ROLLBACK PREPARED";
|
|
case LOGICAL_REP_MSG_STREAM_START:
|
|
return "STREAM START";
|
|
case LOGICAL_REP_MSG_STREAM_STOP:
|
|
return "STREAM STOP";
|
|
case LOGICAL_REP_MSG_STREAM_COMMIT:
|
|
return "STREAM COMMIT";
|
|
case LOGICAL_REP_MSG_STREAM_ABORT:
|
|
return "STREAM ABORT";
|
|
case LOGICAL_REP_MSG_STREAM_PREPARE:
|
|
return "STREAM PREPARE";
|
|
}
|
|
|
|
elog(ERROR, "invalid logical replication message type \"%c\"", action);
|
|
|
|
return NULL; /* keep compiler quiet */
|
|
}
|