From 7a5f6b47488d824b1ea1326be4337e2c32325ff2 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Wed, 19 Jan 2022 14:58:04 -0800 Subject: [PATCH] Make logical decoding a part of the rmgr. Add a new rmgr method, rm_decode, and use that rather than a switch statement. In preparation for rmgr extensibility. Reviewed-by: Julien Rouhaud Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com Discussion: https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud --- src/backend/access/transam/rmgr.c | 5 +- src/backend/replication/logical/decode.c | 105 +++++------------------ src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/rmgrdesc.c | 2 +- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 44 +++++----- src/include/access/xlog_internal.h | 5 ++ src/include/replication/decode.h | 16 +++- 8 files changed, 69 insertions(+), 112 deletions(-) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b52..f8847d5aeb 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,14 +24,15 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ + { name, redo, desc, identify, startup, cleanup, mask, decode }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1d22208c1a..9b450c9f90 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -43,21 +43,6 @@ #include "replication/snapbuild.h" #include "storage/standby.h" -typedef struct XLogRecordBuffer -{ - XLogRecPtr origptr; - XLogRecPtr endptr; - XLogReaderState *record; -} XLogRecordBuffer; - -/* RMGR Handlers */ -static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); - /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { XLogRecordBuffer buf; TransactionId txid; + RmgrId rmid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; @@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } - /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrId) XLogRecGetRmid(record)) + rmid = XLogRecGetRmid(record); + + if (RmgrTable[rmid].rm_decode != NULL) + RmgrTable[rmid].rm_decode(ctx, &buf); + else { - /* - * Rmgrs we care about for logical decoding. Add new rmgrs in - * rmgrlist.h's order. - */ - case RM_XLOG_ID: - DecodeXLogOp(ctx, &buf); - break; - - case RM_XACT_ID: - DecodeXactOp(ctx, &buf); - break; - - case RM_STANDBY_ID: - DecodeStandbyOp(ctx, &buf); - break; - - case RM_HEAP2_ID: - DecodeHeap2Op(ctx, &buf); - break; - - case RM_HEAP_ID: - DecodeHeapOp(ctx, &buf); - break; - - case RM_LOGICALMSG_ID: - DecodeLogicalMsgOp(ctx, &buf); - break; - - /* - * Rmgrs irrelevant for logical decoding; they describe stuff not - * represented in logical decoding. Add new rmgrs in rmgrlist.h's - * order. - */ - case RM_SMGR_ID: - case RM_CLOG_ID: - case RM_DBASE_ID: - case RM_TBLSPC_ID: - case RM_MULTIXACT_ID: - case RM_RELMAP_ID: - case RM_BTREE_ID: - case RM_HASH_ID: - case RM_GIN_ID: - case RM_GIST_ID: - case RM_SEQ_ID: - case RM_SPGIST_ID: - case RM_BRIN_ID: - case RM_COMMIT_TS_ID: - case RM_REPLORIGIN_ID: - case RM_GENERIC_ID: - /* just deal with xid, and done */ - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), - buf.origptr); - break; - case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); } } /* * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; @@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; @@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; @@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 9143797458..f6cfee4ce8 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1..6a4ebd1310 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56a4c..d9b512630c 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index ed751aaf03..9a74721c97 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,25 +25,25 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index e27fca0cc0..849954a8e5 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -287,6 +287,9 @@ typedef enum RECOVERY_TARGET_ACTION_SHUTDOWN } RecoveryTargetAction; +struct LogicalDecodingContext; +struct XLogRecordBuffer; + /* * Method table for resource managers. * @@ -312,6 +315,8 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + void (*rm_decode) (struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); } RmgrData; extern const RmgrData RmgrTable[]; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 1db73f3554..a33c2a718a 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -14,7 +14,21 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" -void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, +typedef struct XLogRecordBuffer +{ + XLogRecPtr origptr; + XLogRecPtr endptr; + XLogReaderState *record; +} XLogRecordBuffer; + +extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + +extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); #endif