Make logical decoding a part of the rmgr.

Add a new rmgr method, rm_decode, and use that rather than a switch
statement.

In preparation for rmgr extensibility.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
Discussion: https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud
This commit is contained in:
Jeff Davis 2022-01-19 14:58:04 -08:00
parent a3d6264bbc
commit 7a5f6b4748
8 changed files with 69 additions and 112 deletions

View File

@ -24,14 +24,15 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
#include "replication/decode.h"
#include "replication/message.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
/* must be kept in sync with RmgrData definition in xlog_internal.h */
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
{ name, redo, desc, identify, startup, cleanup, mask },
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
{ name, redo, desc, identify, startup, cleanup, mask, decode },
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"

View File

@ -43,21 +43,6 @@
#include "replication/snapbuild.h"
#include "storage/standby.h"
typedef struct XLogRecordBuffer
{
XLogRecPtr origptr;
XLogRecPtr endptr;
XLogReaderState *record;
} XLogRecordBuffer;
/* RMGR Handlers */
static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{
XLogRecordBuffer buf;
TransactionId txid;
RmgrId rmid;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
buf.origptr);
}
/* cast so we get a warning when new rmgrs are added */
switch ((RmgrId) XLogRecGetRmid(record))
rmid = XLogRecGetRmid(record);
if (RmgrTable[rmid].rm_decode != NULL)
RmgrTable[rmid].rm_decode(ctx, &buf);
else
{
/*
* Rmgrs we care about for logical decoding. Add new rmgrs in
* rmgrlist.h's order.
*/
case RM_XLOG_ID:
DecodeXLogOp(ctx, &buf);
break;
case RM_XACT_ID:
DecodeXactOp(ctx, &buf);
break;
case RM_STANDBY_ID:
DecodeStandbyOp(ctx, &buf);
break;
case RM_HEAP2_ID:
DecodeHeap2Op(ctx, &buf);
break;
case RM_HEAP_ID:
DecodeHeapOp(ctx, &buf);
break;
case RM_LOGICALMSG_ID:
DecodeLogicalMsgOp(ctx, &buf);
break;
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
* order.
*/
case RM_SMGR_ID:
case RM_CLOG_ID:
case RM_DBASE_ID:
case RM_TBLSPC_ID:
case RM_MULTIXACT_ID:
case RM_RELMAP_ID:
case RM_BTREE_ID:
case RM_HASH_ID:
case RM_GIN_ID:
case RM_GIST_ID:
case RM_SEQ_ID:
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
case RM_REPLORIGIN_ID:
case RM_GENERIC_ID:
/* just deal with xid, and done */
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
buf.origptr);
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
/* just deal with xid, and done */
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
buf.origptr);
}
}
/*
* Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
*/
static void
DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void
xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
*/
static void
DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void
xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
*/
static void
DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void
standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
*/
static void
DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void
heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
*/
static void
DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void
heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
static void
DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void
logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;

View File

@ -28,7 +28,7 @@
* RmgrNames is an array of resource manager names, to make error messages
* a bit nicer.
*/
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
name,
static const char *RmgrNames[RM_MAX_ID + 1] = {

View File

@ -32,7 +32,7 @@
#include "storage/standbydefs.h"
#include "utils/relmapper.h"
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
{ name, desc, identify},
const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {

View File

@ -19,7 +19,7 @@ typedef uint8 RmgrId;
* Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
* file format.
*/
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
symname,
typedef enum RmgrIds

View File

@ -25,25 +25,25 @@
*/
/* symbol name, textual name, redo, desc, identify, startup, cleanup */
PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)

View File

@ -287,6 +287,9 @@ typedef enum
RECOVERY_TARGET_ACTION_SHUTDOWN
} RecoveryTargetAction;
struct LogicalDecodingContext;
struct XLogRecordBuffer;
/*
* Method table for resource managers.
*
@ -312,6 +315,8 @@ typedef struct RmgrData
void (*rm_startup) (void);
void (*rm_cleanup) (void);
void (*rm_mask) (char *pagedata, BlockNumber blkno);
void (*rm_decode) (struct LogicalDecodingContext *ctx,
struct XLogRecordBuffer *buf);
} RmgrData;
extern const RmgrData RmgrTable[];

View File

@ -14,7 +14,21 @@
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
typedef struct XLogRecordBuffer
{
XLogRecPtr origptr;
XLogRecPtr endptr;
XLogReaderState *record;
} XLogRecordBuffer;
extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
#endif