diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 58e0f384cb..69e99fa0ac 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -53,7 +53,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding --extra-install=contrib/test_decoding \ $(REGRESSCHECKS) -ISOLATIONCHECKS=mxact delayed_startup concurrent_ddl_dml +ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml isolationcheck: all | submake-isolation submake-test_decoding $(MKDIR_P) isolation_output diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out new file mode 100644 index 0000000000..65115c830a --- /dev/null +++ b/contrib/test_decoding/expected/ondisk_startup.out @@ -0,0 +1,43 @@ +Parsed test spec with 3 sessions + +starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start +step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +?column? + +f +step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +?column? + +f +step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int; +step s2c: COMMIT; +step s1init: <... completed> +?column? + +init +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1checkpoint: CHECKPOINT; +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:1 addedbys2[integer]:null +COMMIT +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1alter: ALTER TABLE do_write ADD COLUMN addedbys1 int; +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:2 addedbys2[integer]:null +COMMIT +BEGIN +COMMIT +BEGIN +table public.do_write: INSERT: id[integer]:3 addedbys2[integer]:null addedbys1[integer]:null +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec new file mode 100644 index 0000000000..39c4a223ae --- /dev/null +++ b/contrib/test_decoding/specs/ondisk_startup.spec @@ -0,0 +1,43 @@ +# Force usage of ondisk decoding snapshots to test that code path. +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} +step "s1insert" { INSERT INTO do_write DEFAULT VALUES; } +step "s1checkpoint" { CHECKPOINT; } +step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; } +step "s2c" { COMMIT; } + + +session "s3" +setup { SET synchronous_commit=on; } + +step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s3c" { COMMIT; } + +# Force usage of ondisk snapshot by starting and not finishing a +# transaction with a assigned xid after consistency has been +# reached. In combination with a checkpoint forcing a snapshot to be +# written and a new restart point computed that'll lead to the usage +# of the snapshot. +permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7e7a0dffc0..84cef7247c 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1406,7 +1406,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 1 +#define SNAPBUILD_VERSION 2 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. @@ -1552,6 +1552,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) COMP_CRC32(ondisk->checksum, ondisk_c, sz); ondisk_c += sz; + FIN_CRC32(ondisk->checksum); + /* we have valid data now, open tempfile and write it there */ fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, @@ -1724,6 +1726,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) CloseTransientFile(fd); + FIN_CRC32(checksum); + /* verify checksum of what we've read */ if (!EQ_CRC32(checksum, ondisk.checksum)) ereport(ERROR, diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 09be0ba48c..29191ffb95 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -61,18 +61,29 @@ typedef struct ReplicationSlotOnDisk uint32 version; uint32 length; + /* + * The actual data in the slot that follows can differ based on the above + * 'version'. + */ + ReplicationSlotPersistentData slotdata; } ReplicationSlotOnDisk; -/* size of the part of the slot that is version independent */ +/* size of version independent data */ #define ReplicationSlotOnDiskConstantSize \ offsetof(ReplicationSlotOnDisk, slotdata) -/* size of the slots that is not version indepenent */ -#define ReplicationSlotOnDiskDynamicSize \ +/* size of the part of the slot not covered by the checksum */ +#define SnapBuildOnDiskNotChecksummedSize \ + offsetof(ReplicationSlotOnDisk, version) +/* size of the part covered by the checksum */ +#define SnapBuildOnDiskChecksummedSize \ + sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize +/* size of the slot data that is version dependant */ +#define ReplicationSlotOnDiskV2Size \ sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 1 /* version for new files */ +#define SLOT_VERSION 2 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -992,8 +1003,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) cp.magic = SLOT_MAGIC; INIT_CRC32(cp.checksum); - cp.version = 1; - cp.length = ReplicationSlotOnDiskDynamicSize; + cp.version = SLOT_VERSION; + cp.length = ReplicationSlotOnDiskV2Size; SpinLockAcquire(&slot->mutex); @@ -1002,8 +1013,9 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) SpinLockRelease(&slot->mutex); COMP_CRC32(cp.checksum, - (char *) (&cp) + ReplicationSlotOnDiskConstantSize, - ReplicationSlotOnDiskDynamicSize); + (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskChecksummedSize); + FIN_CRC32(cp.checksum); if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) { @@ -1155,7 +1167,7 @@ RestoreSlotFromDisk(const char *name) path, cp.version))); /* boundary check on length */ - if (cp.length != ReplicationSlotOnDiskDynamicSize) + if (cp.length != ReplicationSlotOnDiskV2Size) ereport(PANIC, (errcode_for_file_access(), errmsg("replication slot file \"%s\" has corrupted length %u", @@ -1179,11 +1191,12 @@ RestoreSlotFromDisk(const char *name) CloseTransientFile(fd); - /* now verify the CRC32 */ + /* now verify the CRC */ INIT_CRC32(checksum); COMP_CRC32(checksum, - (char *) &cp + ReplicationSlotOnDiskConstantSize, - ReplicationSlotOnDiskDynamicSize); + (char *) &cp + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskChecksummedSize); + FIN_CRC32(checksum); if (!EQ_CRC32(checksum, cp.checksum)) ereport(PANIC,