Fix the incorrect assertion introduced in commit 7f13ac8123.

It has been incorrectly assumed in commit 7f13ac8123 that we can either
purge all or none in the catalog modifying xids list retrieved from a
serialized snapshot. It is quite possible that some of the xids in that
array are old enough to be pruned but not others.

As per buildfarm

Author: Amit Kapila and Masahiko Sawada
Reviwed-by: Masahiko Sawada
Discussion: https://postgr.es/m/CAA4eK1LBtv6ayE+TvCcPmC-xse=DVg=SmbyQD1nv_AaqcpUJEg@mail.gmail.com
This commit is contained in:
Amit Kapila 2022-08-29 08:10:10 +05:30
parent 8c7fc86ca9
commit d2169c9985
3 changed files with 106 additions and 20 deletions

View File

@ -1,4 +1,4 @@
Parsed test spec with 2 sessions
Parsed test spec with 3 sessions
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
@ -42,3 +42,57 @@ COMMIT
stop
(1 row)
starting permutation: s0_init s0_begin s0_truncate s2_begin s2_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s2_commit s1_checkpoint s1_get_changes s0_commit s1_get_changes
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
?column?
--------
init
(1 row)
step s0_begin: BEGIN;
step s0_truncate: TRUNCATE tbl1;
step s2_begin: BEGIN;
step s2_truncate: TRUNCATE tbl2;
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
----
(0 rows)
step s0_commit: COMMIT;
step s0_begin: BEGIN;
step s0_insert: INSERT INTO tbl1 VALUES (1);
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
---------------------------------------
BEGIN
table public.tbl1: TRUNCATE: (no-flags)
COMMIT
(3 rows)
step s2_commit: COMMIT;
step s1_checkpoint: CHECKPOINT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
---------------------------------------
BEGIN
table public.tbl2: TRUNCATE: (no-flags)
COMMIT
(3 rows)
step s0_commit: COMMIT;
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
data
-------------------------------------------------------------
BEGIN
table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
COMMIT
(3 rows)
?column?
--------
stop
(1 row)

View File

@ -3,12 +3,15 @@
setup
{
DROP TABLE IF EXISTS tbl1;
DROP TABLE IF EXISTS tbl2;
CREATE TABLE tbl1 (val1 integer, val2 integer);
CREATE TABLE tbl2 (val1 integer, val2 integer);
}
teardown
{
DROP TABLE tbl1;
DROP TABLE tbl2;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
@ -26,6 +29,12 @@ setup { SET synchronous_commit=on; }
step "s1_checkpoint" { CHECKPOINT; }
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
session "s2"
setup { SET synchronous_commit=on; }
step "s2_begin" { BEGIN; }
step "s2_truncate" { TRUNCATE tbl2; }
step "s2_commit" { COMMIT; }
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
# during the first checkpoint execution. This transaction must be marked as
@ -37,3 +46,14 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_
# record written by bgwriter. One might think we can either stop the bgwriter or
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
# Test that we can purge the old catalog modifying transactions after restoring
# them from the serialized snapshot. The first checkpoint will serialize the list
# of two catalog modifying xacts. The purpose of the second checkpoint is to allow
# partial pruning of the list of catalog modifying xact. The third checkpoint
# followed by get_changes establishes a restart_point at the first checkpoint LSN.
# The last get_changes will start decoding from the first checkpoint which
# restores the list of catalog modifying xacts and then while decoding the second
# checkpoint record it prunes one of the xacts in that list and when decoding the
# next checkpoint, it will completely prune that list.
permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"

View File

@ -969,28 +969,40 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder)
pfree(workspace);
/*
* Either all the xacts got purged or none. It is only possible to
* partially remove the xids from this array if one or more of the xids
* are still running but not all. That can happen if we start decoding
* from a point (LSN where the snapshot state became consistent) where all
* the xacts in this were running and then at least one of those got
* committed and a few are still running. We will never start from such a
* point because we won't move the slot's restart_lsn past the point where
* the oldest running transaction's restart_decoding_lsn is.
* Purge xids in ->catchange as well. The purged array must also be sorted
* in xidComparator order.
*/
if (builder->catchange.xcnt == 0 ||
TransactionIdFollowsOrEquals(builder->catchange.xip[0],
builder->xmin))
return;
if (builder->catchange.xcnt > 0)
{
/*
* Since catchange.xip is sorted, we find the lower bound of xids that
* are still interesting.
*/
for (off = 0; off < builder->catchange.xcnt; off++)
{
if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
builder->xmin))
break;
}
Assert(TransactionIdFollows(builder->xmin,
builder->catchange.xip[builder->catchange.xcnt - 1]));
pfree(builder->catchange.xip);
builder->catchange.xip = NULL;
builder->catchange.xcnt = 0;
surviving_xids = builder->catchange.xcnt - off;
elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
builder->xmin);
if (surviving_xids > 0)
{
memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
surviving_xids * sizeof(TransactionId));
}
else
{
pfree(builder->catchange.xip);
builder->catchange.xip = NULL;
}
elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
(uint32) builder->catchange.xcnt, (uint32) surviving_xids,
builder->xmin, builder->xmax);
builder->catchange.xcnt = surviving_xids;
}
}
/*