From 5005469cb2f2bf8a57344b43f97969292ebebb76 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 12 Nov 2014 18:52:49 +0100 Subject: [PATCH] Fix several weaknesses in slot and logical replication on-disk serialization. Heikki noticed in 544E23C0.8090605@vmware.com that slot.c and snapbuild.c were missing the FIN_CRC32 call when computing/checking checksums of on disk files. That doesn't lower the the error detection capabilities of the checksum, but is inconsistent with other usages. In a followup mail Heikki also noticed that, contrary to a comment, the 'version' and 'length' struct fields of replication slot's on disk data where not covered by the checksum. That's not likely to lead to actually missed corruption as those fields are cross checked with the expected version and the actual file length. But it's wrong nonetheless. As fixing these issues makes existing on disk files unreadable, bump the expected versions of on disk files for both slots and logical decoding historic catalog snapshots. This means that loading old files will fail with ERROR: "replication slot file ... has unsupported version 1" and ERROR: "snapbuild state file ... has unsupported version 1 instead of 2" respectively. Given the low likelihood of anybody already using these new features in a production setup that seems acceptable. Fixing these issues made me notice that there's no regression test covering the loading of historic snapshot from disk - so add one. Backpatch to 9.4 where these features were introduced. --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/ondisk_startup.out | 43 +++++++++++++++++++ .../test_decoding/specs/ondisk_startup.spec | 43 +++++++++++++++++++ src/backend/replication/logical/snapbuild.c | 6 ++- src/backend/replication/slot.c | 37 ++++++++++------ 5 files changed, 117 insertions(+), 14 deletions(-) create mode 100644 contrib/test_decoding/expected/ondisk_startup.out create mode 100644 contrib/test_decoding/specs/ondisk_startup.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 58e0f384cb..69e99fa0ac 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -53,7 +53,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding --extra-install=contrib/test_decoding \ $(REGRESSCHECKS) -ISOLATIONCHECKS=mxact delayed_startup concurrent_ddl_dml +ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml isolationcheck: all | submake-isolation submake-test_decoding $(MKDIR_P) isolation_output diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out new file mode 100644 index 0000000000..65115c830a --- /dev/null +++ b/contrib/test_decoding/expected/ondisk_startup.out @@ -0,0 +1,43 @@ +Parsed test spec with 3 sessions + +starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start +step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +?column? + +f +step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +?column? + +f +step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int; +step s2c: COMMIT; +step s1init: <... completed> +?column? + +init +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1checkpoint: CHECKPOINT; +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:1 addedbys2[integer]:null +COMMIT +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1alter: ALTER TABLE do_write ADD COLUMN addedbys1 int; +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:2 addedbys2[integer]:null +COMMIT +BEGIN +COMMIT +BEGIN +table public.do_write: INSERT: id[integer]:3 addedbys2[integer]:null addedbys1[integer]:null +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec new file mode 100644 index 0000000000..39c4a223ae --- /dev/null +++ b/contrib/test_decoding/specs/ondisk_startup.spec @@ -0,0 +1,43 @@ +# Force usage of ondisk decoding snapshots to test that code path. +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} +step "s1insert" { INSERT INTO do_write DEFAULT VALUES; } +step "s1checkpoint" { CHECKPOINT; } +step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; } +step "s2c" { COMMIT; } + + +session "s3" +setup { SET synchronous_commit=on; } + +step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s3c" { COMMIT; } + +# Force usage of ondisk snapshot by starting and not finishing a +# transaction with a assigned xid after consistency has been +# reached. In combination with a checkpoint forcing a snapshot to be +# written and a new restart point computed that'll lead to the usage +# of the snapshot. +permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7e7a0dffc0..84cef7247c 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1406,7 +1406,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 1 +#define SNAPBUILD_VERSION 2 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1552,6 +1552,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) COMP_CRC32(ondisk->checksum, ondisk_c, sz); ondisk_c += sz; + FIN_CRC32(ondisk->checksum); + /* we have valid data now, open tempfile and write it there */ fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, @@ -1724,6 +1726,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) CloseTransientFile(fd); + FIN_CRC32(checksum); + /* verify checksum of what we've read */ if (!EQ_CRC32(checksum, ondisk.checksum)) ereport(ERROR, diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 09be0ba48c..29191ffb95 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -61,18 +61,29 @@ typedef struct ReplicationSlotOnDisk uint32 version; uint32 length; + /* + * The actual data in the slot that follows can differ based on the above + * 'version'. + */ + ReplicationSlotPersistentData slotdata; } ReplicationSlotOnDisk; -/* size of the part of the slot that is version independent */ +/* size of version independent data */ #define ReplicationSlotOnDiskConstantSize \ offsetof(ReplicationSlotOnDisk, slotdata) -/* size of the slots that is not version indepenent */ -#define ReplicationSlotOnDiskDynamicSize \ +/* size of the part of the slot not covered by the checksum */ +#define SnapBuildOnDiskNotChecksummedSize \ + offsetof(ReplicationSlotOnDisk, version) +/* size of the part covered by the checksum */ +#define SnapBuildOnDiskChecksummedSize \ + sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize +/* size of the slot data that is version dependant */ +#define ReplicationSlotOnDiskV2Size \ sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 1 /* version for new files */ +#define SLOT_VERSION 2 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -992,8 +1003,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) cp.magic = SLOT_MAGIC; INIT_CRC32(cp.checksum); - cp.version = 1; - cp.length = ReplicationSlotOnDiskDynamicSize; + cp.version = SLOT_VERSION; + cp.length = ReplicationSlotOnDiskV2Size; SpinLockAcquire(&slot->mutex); @@ -1002,8 +1013,9 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) SpinLockRelease(&slot->mutex); COMP_CRC32(cp.checksum, - (char *) (&cp) + ReplicationSlotOnDiskConstantSize, - ReplicationSlotOnDiskDynamicSize); + (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskChecksummedSize); + FIN_CRC32(cp.checksum); if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) { @@ -1155,7 +1167,7 @@ RestoreSlotFromDisk(const char *name) path, cp.version))); /* boundary check on length */ - if (cp.length != ReplicationSlotOnDiskDynamicSize) + if (cp.length != ReplicationSlotOnDiskV2Size) ereport(PANIC, (errcode_for_file_access(), errmsg("replication slot file \"%s\" has corrupted length %u", @@ -1179,11 +1191,12 @@ RestoreSlotFromDisk(const char *name) CloseTransientFile(fd); - /* now verify the CRC32 */ + /* now verify the CRC */ INIT_CRC32(checksum); COMP_CRC32(checksum, - (char *) &cp + ReplicationSlotOnDiskConstantSize, - ReplicationSlotOnDiskDynamicSize); + (char *) &cp + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskChecksummedSize); + FIN_CRC32(checksum); if (!EQ_CRC32(checksum, cp.checksum)) ereport(PANIC,