Teach contrib/amcheck to check the unique constraint violation

Add the 'checkunique' argument to bt_index_check() and bt_index_parent_check().
When the flag is specified the procedures will check the unique constraint
violation for unique indexes.  Only one heap entry for all equal keys in
the index should be visible (including posting list entries).  Report an error
otherwise.

pg_amcheck called with the --checkunique option will do the same check for all
the indexes it checks.

Author: Anastasia Lubennikova <lubennikovaav@gmail.com>
Author: Pavel Borisov <pashkin.elfe@gmail.com>
Author: Maxim Orlov <orlovmg@gmail.com>
Reviewed-by: Mark Dilger <mark.dilger@enterprisedb.com>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Reviewed-by: Peter Geoghegan <pg@bowt.ie>
Reviewed-by: Aleksander Alekseev <aleksander@timescale.com>
Discussion: https://postgr.es/m/CALT9ZEHRn5xAM5boga0qnrCmPV52bScEK2QnQ1HmUZDD301JEg%40mail.gmail.com
This commit is contained in:
Alexander Korotkov 2023-10-28 00:21:23 +03:00
parent 8b62b441ff
commit 5ae2087202
13 changed files with 830 additions and 23 deletions

View File

@ -7,7 +7,7 @@ OBJS = \
verify_nbtree.o
EXTENSION = amcheck
DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql
DATA = amcheck--1.3--1.4.sql amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql
PGFILEDESC = "amcheck - function for verifying relation integrity"
REGRESS = check check_btree check_heap

View File

@ -0,0 +1,29 @@
/* contrib/amcheck/amcheck--1.3--1.4.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.4'" to load this file. \quit
-- In order to avoid issues with dependencies when updating amcheck to 1.4,
-- create new, overloaded versions of the 1.2 bt_index_parent_check signature,
-- and 1.1 bt_index_check signature.
--
-- bt_index_parent_check()
--
CREATE FUNCTION bt_index_parent_check(index regclass,
heapallindexed boolean, rootdescend boolean, checkunique boolean)
RETURNS VOID
AS 'MODULE_PATHNAME', 'bt_index_parent_check'
LANGUAGE C STRICT PARALLEL RESTRICTED;
--
-- bt_index_check()
--
CREATE FUNCTION bt_index_check(index regclass,
heapallindexed boolean, checkunique boolean)
RETURNS VOID
AS 'MODULE_PATHNAME', 'bt_index_check'
LANGUAGE C STRICT PARALLEL RESTRICTED;
-- We don't want this to be available to public
REVOKE ALL ON FUNCTION bt_index_parent_check(regclass, boolean, boolean, boolean) FROM PUBLIC;
REVOKE ALL ON FUNCTION bt_index_check(regclass, boolean, boolean) FROM PUBLIC;

View File

@ -1,5 +1,5 @@
# amcheck extension
comment = 'functions for verifying relation integrity'
default_version = '1.3'
default_version = '1.4'
module_pathname = '$libdir/amcheck'
relocatable = true

View File

@ -199,6 +199,47 @@ SELECT bt_index_check('bttest_a_expr_idx', true);
(1 row)
-- UNIQUE constraint check
SELECT bt_index_check('bttest_a_idx', heapallindexed => true, checkunique => true);
bt_index_check
----------------
(1 row)
SELECT bt_index_check('bttest_b_idx', heapallindexed => false, checkunique => true);
bt_index_check
----------------
(1 row)
SELECT bt_index_parent_check('bttest_a_idx', heapallindexed => true, rootdescend => true, checkunique => true);
bt_index_parent_check
-----------------------
(1 row)
SELECT bt_index_parent_check('bttest_b_idx', heapallindexed => true, rootdescend => false, checkunique => true);
bt_index_parent_check
-----------------------
(1 row)
-- Check that null values in an unique index are not treated as equal
CREATE TABLE bttest_unique_nulls (a serial, b int, c int UNIQUE);
INSERT INTO bttest_unique_nulls VALUES (generate_series(1, 10000), 2, default);
SELECT bt_index_check('bttest_unique_nulls_c_key', heapallindexed => true, checkunique => true);
bt_index_check
----------------
(1 row)
CREATE INDEX on bttest_unique_nulls (b,c);
SELECT bt_index_check('bttest_unique_nulls_b_c_idx', heapallindexed => true, checkunique => true);
bt_index_check
----------------
(1 row)
-- cleanup
DROP TABLE bttest_a;
DROP TABLE bttest_b;
@ -206,5 +247,6 @@ DROP TABLE bttest_multi;
DROP TABLE delete_test_table;
DROP TABLE toast_bug;
DROP FUNCTION ifun(int8);
DROP TABLE bttest_unique_nulls;
DROP OWNED BY regress_bttest_role; -- permissions
DROP ROLE regress_bttest_role;

View File

@ -23,6 +23,7 @@ install_data(
'amcheck--1.0--1.1.sql',
'amcheck--1.1--1.2.sql',
'amcheck--1.2--1.3.sql',
'amcheck--1.3--1.4.sql',
kwargs: contrib_data_args,
)
@ -42,6 +43,7 @@ tests += {
't/001_verify_heapam.pl',
't/002_cic.pl',
't/003_cic_2pc.pl',
't/004_verify_nbtree_unique.pl',
],
},
}

View File

@ -135,6 +135,19 @@ CREATE INDEX bttest_a_expr_idx ON bttest_a ((ifun(id) + ifun(0)))
SELECT bt_index_check('bttest_a_expr_idx', true);
-- UNIQUE constraint check
SELECT bt_index_check('bttest_a_idx', heapallindexed => true, checkunique => true);
SELECT bt_index_check('bttest_b_idx', heapallindexed => false, checkunique => true);
SELECT bt_index_parent_check('bttest_a_idx', heapallindexed => true, rootdescend => true, checkunique => true);
SELECT bt_index_parent_check('bttest_b_idx', heapallindexed => true, rootdescend => false, checkunique => true);
-- Check that null values in an unique index are not treated as equal
CREATE TABLE bttest_unique_nulls (a serial, b int, c int UNIQUE);
INSERT INTO bttest_unique_nulls VALUES (generate_series(1, 10000), 2, default);
SELECT bt_index_check('bttest_unique_nulls_c_key', heapallindexed => true, checkunique => true);
CREATE INDEX on bttest_unique_nulls (b,c);
SELECT bt_index_check('bttest_unique_nulls_b_c_idx', heapallindexed => true, checkunique => true);
-- cleanup
DROP TABLE bttest_a;
DROP TABLE bttest_b;
@ -142,5 +155,6 @@ DROP TABLE bttest_multi;
DROP TABLE delete_test_table;
DROP TABLE toast_bug;
DROP FUNCTION ifun(int8);
DROP TABLE bttest_unique_nulls;
DROP OWNED BY regress_bttest_role; -- permissions
DROP ROLE regress_bttest_role;

View File

@ -0,0 +1,244 @@
# Copyright (c) 2023, PostgreSQL Global Development Group
# This regression test checks the behavior of the btree validation in the
# presence of breaking sort order changes.
#
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
my $node = PostgreSQL::Test::Cluster->new('test');
$node->init;
$node->append_conf('postgresql.conf', 'autovacuum = off');
$node->start;
# Create a custom operator class and an index which uses it.
$node->safe_psql(
'postgres', q(
CREATE EXTENSION amcheck;
CREATE FUNCTION ok_cmp (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT
CASE WHEN $1 < $2 THEN -1
WHEN $1 > $2 THEN 1
ELSE 0
END;
$$;
---
--- Check 1: uniqueness violation.
---
CREATE FUNCTION ok_cmp1 (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT ok_cmp($1, $2);
$$;
---
--- Make values 768 and 769 look equal.
---
CREATE FUNCTION bad_cmp1 (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT
CASE WHEN ($1 = 768 AND $2 = 769) OR
($1 = 769 AND $2 = 768) THEN 0
ELSE ok_cmp($1, $2)
END;
$$;
---
--- Check 2: uniqueness violation without deduplication.
---
CREATE FUNCTION ok_cmp2 (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT ok_cmp($1, $2);
$$;
CREATE FUNCTION bad_cmp2 (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT
CASE WHEN $1 = $2 AND $1 = 400 THEN -1
ELSE ok_cmp($1, $2)
END;
$$;
---
--- Check 3: uniqueness violation with deduplication.
---
CREATE FUNCTION ok_cmp3 (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT ok_cmp($1, $2);
$$;
CREATE FUNCTION bad_cmp3 (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT bad_cmp2($1, $2);
$$;
---
--- Create data.
---
CREATE TABLE bttest_unique1 (i int4);
INSERT INTO bttest_unique1
(SELECT * FROM generate_series(1, 1024) gs);
CREATE TABLE bttest_unique2 (i int4);
INSERT INTO bttest_unique2(i)
(SELECT * FROM generate_series(1, 400) gs);
INSERT INTO bttest_unique2
(SELECT * FROM generate_series(400, 1024) gs);
CREATE TABLE bttest_unique3 (i int4);
INSERT INTO bttest_unique3
SELECT * FROM bttest_unique2;
CREATE OPERATOR CLASS int4_custom_ops1 FOR TYPE int4 USING btree AS
OPERATOR 1 < (int4, int4), OPERATOR 2 <= (int4, int4),
OPERATOR 3 = (int4, int4), OPERATOR 4 >= (int4, int4),
OPERATOR 5 > (int4, int4), FUNCTION 1 ok_cmp1(int4, int4);
CREATE OPERATOR CLASS int4_custom_ops2 FOR TYPE int4 USING btree AS
OPERATOR 1 < (int4, int4), OPERATOR 2 <= (int4, int4),
OPERATOR 3 = (int4, int4), OPERATOR 4 >= (int4, int4),
OPERATOR 5 > (int4, int4), FUNCTION 1 bad_cmp2(int4, int4);
CREATE OPERATOR CLASS int4_custom_ops3 FOR TYPE int4 USING btree AS
OPERATOR 1 < (int4, int4), OPERATOR 2 <= (int4, int4),
OPERATOR 3 = (int4, int4), OPERATOR 4 >= (int4, int4),
OPERATOR 5 > (int4, int4), FUNCTION 1 bad_cmp3(int4, int4);
CREATE UNIQUE INDEX bttest_unique_idx1
ON bttest_unique1
USING btree (i int4_custom_ops1)
WITH (deduplicate_items = off);
CREATE UNIQUE INDEX bttest_unique_idx2
ON bttest_unique2
USING btree (i int4_custom_ops2)
WITH (deduplicate_items = off);
CREATE UNIQUE INDEX bttest_unique_idx3
ON bttest_unique3
USING btree (i int4_custom_ops3)
WITH (deduplicate_items = on);
));
my ($result, $stdout, $stderr);
#
# Test 1.
# - insert seq values
# - create unique index
# - break cmp function
# - amcheck finds the uniqueness violation
#
# We have not yet broken the index, so we should get no corruption
$result = $node->safe_psql(
'postgres', q(
SELECT bt_index_check('bttest_unique_idx1', true, true);
));
is($result, '', 'run amcheck on non-broken bttest_unique_idx1');
# Change the operator class to use a function which considers certain different
# values to be equal.
$node->safe_psql(
'postgres', q(
UPDATE pg_catalog.pg_amproc SET
amproc = 'bad_cmp1'::regproc
WHERE amproc = 'ok_cmp1'::regproc;
));
($result, $stdout, $stderr) = $node->psql(
'postgres', q(
SELECT bt_index_check('bttest_unique_idx1', true, true);
));
ok( $stderr =~ /index uniqueness is violated for index "bttest_unique_idx1"/,
'detected uniqueness violation for index "bttest_unique_idx1"');
#
# Test 2.
# - break cmp function
# - insert seq values with duplicates
# - create unique index
# - make cmp function correct
# - amcheck finds the uniqueness violation
#
# Due to bad cmp function we expect amcheck to detect item order violation,
# but no uniqueness violation.
($result, $stdout, $stderr) = $node->psql(
'postgres', q(
SELECT bt_index_check('bttest_unique_idx2', true, true);
));
ok( $stderr =~ /item order invariant violated for index "bttest_unique_idx2"/,
'detected item order invariant violation for index "bttest_unique_idx2"');
$node->safe_psql(
'postgres', q(
UPDATE pg_catalog.pg_amproc SET
amproc = 'ok_cmp2'::regproc
WHERE amproc = 'bad_cmp2'::regproc;
));
($result, $stdout, $stderr) = $node->psql(
'postgres', q(
SELECT bt_index_check('bttest_unique_idx2', true, true);
));
ok( $stderr =~ /index uniqueness is violated for index "bttest_unique_idx2"/,
'detected uniqueness violation for index "bttest_unique_idx2"');
#
# Test 3.
# - same as Test 2, but with index deduplication
#
# Then uniqueness violation is detected between different posting list
# entries inside one index entry.
#
# Due to bad cmp function we expect amcheck to detect item order violation,
# but no uniqueness violation.
($result, $stdout, $stderr) = $node->psql(
'postgres', q(
SELECT bt_index_check('bttest_unique_idx3', true, true);
));
ok( $stderr =~ /item order invariant violated for index "bttest_unique_idx3"/,
'detected item order invariant violation for index "bttest_unique_idx3"');
# For unique index deduplication is possible only for same values, but
# with different visibility.
$node->safe_psql(
'postgres', q(
DELETE FROM bttest_unique3 WHERE 380 <= i AND i <= 420;
INSERT INTO bttest_unique3 (SELECT * FROM generate_series(380, 420));
INSERT INTO bttest_unique3 VALUES (400);
DELETE FROM bttest_unique3 WHERE 380 <= i AND i <= 420;
INSERT INTO bttest_unique3 (SELECT * FROM generate_series(380, 420));
INSERT INTO bttest_unique3 VALUES (400);
DELETE FROM bttest_unique3 WHERE 380 <= i AND i <= 420;
INSERT INTO bttest_unique3 (SELECT * FROM generate_series(380, 420));
INSERT INTO bttest_unique3 VALUES (400);
));
$node->safe_psql(
'postgres', q(
UPDATE pg_catalog.pg_amproc SET
amproc = 'ok_cmp3'::regproc
WHERE amproc = 'bad_cmp3'::regproc;
));
($result, $stdout, $stderr) = $node->psql(
'postgres', q(
SELECT bt_index_check('bttest_unique_idx3', true, true);
));
ok( $stderr =~ /index uniqueness is violated for index "bttest_unique_idx3"/,
'detected uniqueness violation for index "bttest_unique_idx3"');
$node->stop;
done_testing();

View File

@ -81,11 +81,19 @@ typedef struct BtreeCheckState
bool heapallindexed;
/* Also making sure non-pivot tuples can be found by new search? */
bool rootdescend;
/* Also check uniqueness constraint if index is unique */
bool checkunique;
/* Per-page context */
MemoryContext targetcontext;
/* Buffer access strategy */
BufferAccessStrategy checkstrategy;
/*
* Info for uniqueness checking. Fill these fields once per index check.
*/
IndexInfo *indexinfo;
Snapshot snapshot;
/*
* Mutable state, for verification of particular page:
*/
@ -140,19 +148,33 @@ PG_FUNCTION_INFO_V1(bt_index_check);
PG_FUNCTION_INFO_V1(bt_index_parent_check);
static void bt_index_check_internal(Oid indrelid, bool parentcheck,
bool heapallindexed, bool rootdescend);
bool heapallindexed, bool rootdescend,
bool checkunique);
static inline void btree_index_checkable(Relation rel);
static inline bool btree_index_mainfork_expected(Relation rel);
static void bt_check_every_level(Relation rel, Relation heaprel,
bool heapkeyspace, bool readonly, bool heapallindexed,
bool rootdescend);
bool rootdescend, bool checkunique);
static BtreeLevel bt_check_level_from_leftmost(BtreeCheckState *state,
BtreeLevel level);
static void bt_recheck_sibling_links(BtreeCheckState *state,
BlockNumber btpo_prev_from_target,
BlockNumber leftcurrent);
static bool heap_entry_is_visible(BtreeCheckState *state, ItemPointer tid);
static void bt_report_duplicate(BtreeCheckState *state, ItemPointer tid,
BlockNumber block, OffsetNumber offset,
int posting, ItemPointer nexttid,
BlockNumber nblock, OffsetNumber noffset,
int nposting);
static void bt_entry_unique_check(BtreeCheckState *state, IndexTuple itup,
BlockNumber targetblock,
OffsetNumber offset, int *lVis_i,
ItemPointer *lVis_tid,
OffsetNumber *lVis_offset,
BlockNumber *lVis_block);
static void bt_target_page_check(BtreeCheckState *state);
static BTScanInsert bt_right_page_check_scankey(BtreeCheckState *state);
static BTScanInsert bt_right_page_check_scankey(BtreeCheckState *state,
OffsetNumber *rightfirstoffset);
static void bt_child_check(BtreeCheckState *state, BTScanInsert targetkey,
OffsetNumber downlinkoffnum);
static void bt_child_highkey_check(BtreeCheckState *state,
@ -192,7 +214,7 @@ static inline ItemPointer BTreeTupleGetHeapTIDCareful(BtreeCheckState *state,
static inline ItemPointer BTreeTupleGetPointsToTID(IndexTuple itup);
/*
* bt_index_check(index regclass, heapallindexed boolean)
* bt_index_check(index regclass, heapallindexed boolean, checkunique boolean)
*
* Verify integrity of B-Tree index.
*
@ -205,17 +227,20 @@ bt_index_check(PG_FUNCTION_ARGS)
{
Oid indrelid = PG_GETARG_OID(0);
bool heapallindexed = false;
bool checkunique = false;
if (PG_NARGS() == 2)
if (PG_NARGS() >= 2)
heapallindexed = PG_GETARG_BOOL(1);
if (PG_NARGS() == 3)
checkunique = PG_GETARG_BOOL(2);
bt_index_check_internal(indrelid, false, heapallindexed, false);
bt_index_check_internal(indrelid, false, heapallindexed, false, checkunique);
PG_RETURN_VOID();
}
/*
* bt_index_parent_check(index regclass, heapallindexed boolean)
* bt_index_parent_check(index regclass, heapallindexed boolean, rootdescend boolean, checkunique boolean)
*
* Verify integrity of B-Tree index.
*
@ -229,13 +254,16 @@ bt_index_parent_check(PG_FUNCTION_ARGS)
Oid indrelid = PG_GETARG_OID(0);
bool heapallindexed = false;
bool rootdescend = false;
bool checkunique = false;
if (PG_NARGS() >= 2)
heapallindexed = PG_GETARG_BOOL(1);
if (PG_NARGS() == 3)
if (PG_NARGS() >= 3)
rootdescend = PG_GETARG_BOOL(2);
if (PG_NARGS() == 4)
checkunique = PG_GETARG_BOOL(3);
bt_index_check_internal(indrelid, true, heapallindexed, rootdescend);
bt_index_check_internal(indrelid, true, heapallindexed, rootdescend, checkunique);
PG_RETURN_VOID();
}
@ -245,7 +273,7 @@ bt_index_parent_check(PG_FUNCTION_ARGS)
*/
static void
bt_index_check_internal(Oid indrelid, bool parentcheck, bool heapallindexed,
bool rootdescend)
bool rootdescend, bool checkunique)
{
Oid heapid;
Relation indrel;
@ -356,7 +384,7 @@ bt_index_check_internal(Oid indrelid, bool parentcheck, bool heapallindexed,
/* Check index, possibly against table it is an index on */
bt_check_every_level(indrel, heaprel, heapkeyspace, parentcheck,
heapallindexed, rootdescend);
heapallindexed, rootdescend, checkunique);
}
/* Roll back any GUC changes executed by index functions */
@ -457,7 +485,8 @@ btree_index_mainfork_expected(Relation rel)
*/
static void
bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
bool readonly, bool heapallindexed, bool rootdescend)
bool readonly, bool heapallindexed, bool rootdescend,
bool checkunique)
{
BtreeCheckState *state;
Page metapage;
@ -489,6 +518,8 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
state->readonly = readonly;
state->heapallindexed = heapallindexed;
state->rootdescend = rootdescend;
state->checkunique = checkunique;
state->snapshot = InvalidSnapshot;
if (state->heapallindexed)
{
@ -546,6 +577,23 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
}
}
/*
* We need a snapshot to check the uniqueness of the index. For better
* performance take it once per index check. If snapshot already taken
* reuse it.
*/
if (state->checkunique)
{
state->indexinfo = BuildIndexInfo(state->rel);
if (state->indexinfo->ii_Unique)
{
if (snapshot != SnapshotAny)
state->snapshot = snapshot;
else
state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
}
}
Assert(!state->rootdescend || state->readonly);
if (state->rootdescend && !state->heapkeyspace)
ereport(ERROR,
@ -672,6 +720,8 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
}
/* Be tidy: */
if (snapshot == SnapshotAny && state->snapshot != InvalidSnapshot)
UnregisterSnapshot(state->snapshot);
MemoryContextDelete(state->targetcontext);
}
@ -912,6 +962,161 @@ nextpage:
return nextleveldown;
}
/* Check visibility of the table entry referenced by nbtree index */
static bool
heap_entry_is_visible(BtreeCheckState *state, ItemPointer tid)
{
bool tid_visible;
TupleTableSlot *slot = table_slot_create(state->heaprel, NULL);
tid_visible = table_tuple_fetch_row_version(state->heaprel,
tid, state->snapshot, slot);
if (slot != NULL)
ExecDropSingleTupleTableSlot(slot);
return tid_visible;
}
/*
* Prepare an error message for unique constrain violation in
* a btree index and report ERROR.
*/
static void
bt_report_duplicate(BtreeCheckState *state,
ItemPointer tid, BlockNumber block, OffsetNumber offset,
int posting,
ItemPointer nexttid, BlockNumber nblock, OffsetNumber noffset,
int nposting)
{
char *htid,
*nhtid,
*itid,
*nitid = "",
*pposting = "",
*pnposting = "";
htid = psprintf("tid=(%u,%u)",
ItemPointerGetBlockNumberNoCheck(tid),
ItemPointerGetOffsetNumberNoCheck(tid));
nhtid = psprintf("tid=(%u,%u)",
ItemPointerGetBlockNumberNoCheck(nexttid),
ItemPointerGetOffsetNumberNoCheck(nexttid));
itid = psprintf("tid=(%u,%u)", block, offset);
if (nblock != block || noffset != offset)
nitid = psprintf(" tid=(%u,%u)", nblock, noffset);
if (posting >= 0)
pposting = psprintf(" posting %u", posting);
if (nposting >= 0)
pnposting = psprintf(" posting %u", nposting);
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("index uniqueness is violated for index \"%s\"",
RelationGetRelationName(state->rel)),
errdetail("Index %s%s and%s%s (point to heap %s and %s) page lsn=%X/%X.",
itid, pposting, nitid, pnposting, htid, nhtid,
LSN_FORMAT_ARGS(state->targetlsn))));
}
/* Check if current nbtree leaf entry complies with UNIQUE constraint */
static void
bt_entry_unique_check(BtreeCheckState *state, IndexTuple itup,
BlockNumber targetblock, OffsetNumber offset, int *lVis_i,
ItemPointer *lVis_tid, OffsetNumber *lVis_offset,
BlockNumber *lVis_block)
{
ItemPointer tid;
bool has_visible_entry = false;
Assert(targetblock != P_NONE);
/*
* Current tuple has posting list. Report duplicate if TID of any posting
* list entry is visible and lVis_tid is valid.
*/
if (BTreeTupleIsPosting(itup))
{
for (int i = 0; i < BTreeTupleGetNPosting(itup); i++)
{
tid = BTreeTupleGetPostingN(itup, i);
if (heap_entry_is_visible(state, tid))
{
has_visible_entry = true;
if (ItemPointerIsValid(*lVis_tid))
{
bt_report_duplicate(state,
*lVis_tid, *lVis_block,
*lVis_offset, *lVis_i,
tid, targetblock,
offset, i);
}
/*
* Prevent double reporting unique constraint violation between
* the posting list entries of the first tuple on the page after
* cross-page check.
*/
if (*lVis_block != targetblock && ItemPointerIsValid(*lVis_tid))
return;
*lVis_i = i;
*lVis_tid = tid;
*lVis_offset = offset;
*lVis_block = targetblock;
}
}
}
/*
* Current tuple has no posting list. If TID is visible save info about
* it for the next comparisons in the loop in bt_page_check(). Report
* duplicate if lVis_tid is already valid.
*/
else
{
tid = BTreeTupleGetHeapTID(itup);
if (heap_entry_is_visible(state, tid))
{
has_visible_entry = true;
if (ItemPointerIsValid(*lVis_tid))
{
bt_report_duplicate(state,
*lVis_tid, *lVis_block,
*lVis_offset, *lVis_i,
tid, targetblock,
offset, -1);
}
*lVis_i = -1;
*lVis_tid = tid;
*lVis_offset = offset;
*lVis_block = targetblock;
}
}
if (!has_visible_entry && *lVis_block != InvalidBlockNumber &&
*lVis_block != targetblock)
{
char *posting = "";
if (*lVis_i >= 0)
posting = psprintf(" posting %u", *lVis_i);
ereport(DEBUG1,
(errcode(ERRCODE_NO_DATA),
errmsg("index uniqueness can not be checked for index tid=(%u,%u) in index \"%s\"",
targetblock, offset,
RelationGetRelationName(state->rel)),
errdetail("It doesn't have visible heap tids and key is equal to the tid=(%u,%u)%s (points to heap tid=(%u,%u)).",
*lVis_block, *lVis_offset, posting,
ItemPointerGetBlockNumberNoCheck(*lVis_tid),
ItemPointerGetOffsetNumberNoCheck(*lVis_tid)),
errhint("VACUUM the table and repeat the check.")));
}
}
/*
* Raise an error when target page's left link does not point back to the
* previous target page, called leftcurrent here. The leftcurrent page's
@ -1066,6 +1271,9 @@ bt_recheck_sibling_links(BtreeCheckState *state,
* - Various checks on the structure of tuples themselves. For example, check
* that non-pivot tuples have no truncated attributes.
*
* - For index with unique constraint make sure that only one of table entries
* for equal keys is visible.
*
* Furthermore, when state passed shows ShareLock held, function also checks:
*
* - That all child pages respect strict lower bound from parent's pivot
@ -1088,6 +1296,13 @@ bt_target_page_check(BtreeCheckState *state)
OffsetNumber max;
BTPageOpaque topaque;
/* last visible entry info for checking indexes with unique constraint */
int lVis_i = -1; /* the position of last visible item for
* posting tuple. for non-posting tuple (-1) */
ItemPointer lVis_tid = NULL;
BlockNumber lVis_block = InvalidBlockNumber;
OffsetNumber lVis_offset = InvalidOffsetNumber;
topaque = BTPageGetOpaque(state->target);
max = PageGetMaxOffsetNumber(state->target);
@ -1478,6 +1693,45 @@ bt_target_page_check(BtreeCheckState *state)
LSN_FORMAT_ARGS(state->targetlsn))));
}
/*
* If the index is unique verify entries uniqueness by checking the heap
* tuples visibility.
*/
if (state->checkunique && state->indexinfo->ii_Unique &&
P_ISLEAF(topaque) && !skey->anynullkeys)
bt_entry_unique_check(state, itup, state->targetblock, offset,
&lVis_i, &lVis_tid, &lVis_offset,
&lVis_block);
if (state->checkunique && state->indexinfo->ii_Unique &&
P_ISLEAF(topaque) && OffsetNumberNext(offset) <= max)
{
/* Save current scankey tid */
scantid = skey->scantid;
/*
* Invalidate scankey tid to make _bt_compare compare only keys in
* the item to report equality even if heap TIDs are different
*/
skey->scantid = NULL;
/*
* If next key tuple is different, invalidate last visible entry
* data (whole index tuple or last posting in index tuple). Key
* containing null value does not violate unique constraint and
* treated as different to any other key.
*/
if (_bt_compare(state->rel, skey, state->target,
OffsetNumberNext(offset)) != 0 || skey->anynullkeys)
{
lVis_i = -1;
lVis_tid = NULL;
lVis_block = InvalidBlockNumber;
lVis_offset = InvalidOffsetNumber;
}
skey->scantid = scantid; /* Restore saved scan key state */
}
/*
* * Last item check *
*
@ -1495,12 +1749,16 @@ bt_target_page_check(BtreeCheckState *state)
* available from sibling for various reasons, though (e.g., target is
* the rightmost page on level).
*/
else if (offset == max)
if (offset == max)
{
BTScanInsert rightkey;
BlockNumber rightblock_number;
/* first offset on a right index page (log only) */
OffsetNumber rightfirstoffset = InvalidOffsetNumber;
/* Get item in next/right page */
rightkey = bt_right_page_check_scankey(state);
rightkey = bt_right_page_check_scankey(state, &rightfirstoffset);
if (rightkey &&
!invariant_g_offset(state, rightkey, max))
@ -1534,6 +1792,45 @@ bt_target_page_check(BtreeCheckState *state)
state->targetblock, offset,
LSN_FORMAT_ARGS(state->targetlsn))));
}
/*
* If index has unique constraint make sure that no more than one
* found equal items is visible.
*/
rightblock_number = topaque->btpo_next;
if (state->checkunique && state->indexinfo->ii_Unique &&
rightkey && P_ISLEAF(topaque) && rightblock_number != P_NONE)
{
elog(DEBUG2, "check cross page unique condition");
/*
* Make _bt_compare compare only index keys without heap TIDs.
* rightkey->scantid is modified destructively but it is ok
* for it is not used later.
*/
rightkey->scantid = NULL;
/* The first key on the next page is the same */
if (_bt_compare(state->rel, rightkey, state->target, max) == 0 && !rightkey->anynullkeys)
{
elog(DEBUG2, "cross page equal keys");
state->target = palloc_btree_page(state,
rightblock_number);
topaque = BTPageGetOpaque(state->target);
if (P_IGNORE(topaque) || !P_ISLEAF(topaque))
break;
itemid = PageGetItemIdCareful(state, rightblock_number,
state->target,
rightfirstoffset);
itup = (IndexTuple) PageGetItem(state->target, itemid);
bt_entry_unique_check(state, itup, rightblock_number, rightfirstoffset,
&lVis_i, &lVis_tid, &lVis_offset,
&lVis_block);
}
}
}
/*
@ -1579,9 +1876,11 @@ bt_target_page_check(BtreeCheckState *state)
*
* Note that !readonly callers must reverify that target page has not
* been concurrently deleted.
*
* Save rightfirstdataoffset for detailed error message.
*/
static BTScanInsert
bt_right_page_check_scankey(BtreeCheckState *state)
bt_right_page_check_scankey(BtreeCheckState *state, OffsetNumber *rightfirstoffset)
{
BTPageOpaque opaque;
ItemId rightitem;
@ -1748,6 +2047,7 @@ bt_right_page_check_scankey(BtreeCheckState *state)
/* Return first data item (if any) */
rightitem = PageGetItemIdCareful(state, targetnext, rightpage,
P_FIRSTDATAKEY(opaque));
*rightfirstoffset = P_FIRSTDATAKEY(opaque);
}
else if (!P_ISLEAF(opaque) &&
nline >= OffsetNumberNext(P_FIRSTDATAKEY(opaque)))

View File

@ -58,7 +58,7 @@
<variablelist>
<varlistentry>
<term>
<function>bt_index_check(index regclass, heapallindexed boolean) returns void</function>
<function>bt_index_check(index regclass, heapallindexed boolean, checkunique boolean) returns void</function>
<indexterm>
<primary>bt_index_check</primary>
</indexterm>
@ -115,7 +115,10 @@ ORDER BY c.relpages DESC LIMIT 10;
that span child/parent relationships, but will verify the
presence of all heap tuples as index tuples within the index
when <parameter>heapallindexed</parameter> is
<literal>true</literal>. When a routine, lightweight test for
<literal>true</literal>. When <parameter>checkunique</parameter>
is <literal>true</literal> <function>bt_index_check</function> will
check that no more than one among duplicate entries in unique
index is visible. When a routine, lightweight test for
corruption is required in a live production environment, using
<function>bt_index_check</function> often provides the best
trade-off between thoroughness of verification and limiting the
@ -126,7 +129,7 @@ ORDER BY c.relpages DESC LIMIT 10;
<varlistentry>
<term>
<function>bt_index_parent_check(index regclass, heapallindexed boolean, rootdescend boolean) returns void</function>
<function>bt_index_parent_check(index regclass, heapallindexed boolean, rootdescend boolean, checkunique boolean) returns void</function>
<indexterm>
<primary>bt_index_parent_check</primary>
</indexterm>
@ -139,7 +142,10 @@ ORDER BY c.relpages DESC LIMIT 10;
Optionally, when the <parameter>heapallindexed</parameter>
argument is <literal>true</literal>, the function verifies the
presence of all heap tuples that should be found within the
index. When the optional <parameter>rootdescend</parameter>
index. When <parameter>checkunique</parameter>
is <literal>true</literal> <function>bt_index_parent_check</function> will
check that no more than one among duplicate entries in unique
index is visible. When the optional <parameter>rootdescend</parameter>
argument is <literal>true</literal>, verification re-finds
tuples on the leaf level by performing a new search from the
root page for each tuple. The checks that can be performed by

View File

@ -432,6 +432,17 @@ PostgreSQL documentation
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--checkunique</option></term>
<listitem>
<para>
For each index with unique constraint checked, verify that no more than
one among duplicate entries is visible in the index using <xref linkend="amcheck"/>'s
<option>checkunique</option> option.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>

View File

@ -102,6 +102,7 @@ typedef struct AmcheckOptions
bool parent_check;
bool rootdescend;
bool heapallindexed;
bool checkunique;
/* heap and btree hybrid option */
bool no_btree_expansion;
@ -132,6 +133,7 @@ static AmcheckOptions opts = {
.parent_check = false,
.rootdescend = false,
.heapallindexed = false,
.checkunique = false,
.no_btree_expansion = false
};
@ -148,6 +150,7 @@ typedef struct DatabaseInfo
{
char *datname;
char *amcheck_schema; /* escaped, quoted literal */
bool is_checkunique;
} DatabaseInfo;
typedef struct RelationInfo
@ -267,6 +270,7 @@ main(int argc, char *argv[])
{"heapallindexed", no_argument, NULL, 11},
{"parent-check", no_argument, NULL, 12},
{"install-missing", optional_argument, NULL, 13},
{"checkunique", no_argument, NULL, 14},
{NULL, 0, NULL, 0}
};
@ -434,6 +438,9 @@ main(int argc, char *argv[])
if (optarg)
opts.install_schema = pg_strdup(optarg);
break;
case 14:
opts.checkunique = true;
break;
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@ -589,6 +596,38 @@ main(int argc, char *argv[])
PQdb(conn), PQgetvalue(result, 0, 1), amcheck_schema);
dat->amcheck_schema = PQescapeIdentifier(conn, amcheck_schema,
strlen(amcheck_schema));
/*
* Check the version of amcheck extension. Skip requested unique
* constraint check with warning if it is not yet supported by amcheck.
*/
if (opts.checkunique == true)
{
/*
* Now amcheck has only major and minor versions in the string but
* we also support revision just in case. Now it is expected to be
* zero.
*/
int vmaj = 0,
vmin = 0,
vrev = 0;
const char *amcheck_version = PQgetvalue(result, 0, 1);
sscanf(amcheck_version, "%d.%d.%d", &vmaj, &vmin, &vrev);
/*
* checkunique option is supported in amcheck since version 1.4
*/
if ((vmaj == 1 && vmin < 4) || vmaj == 0)
{
pg_log_warning("--checkunique option is not supported by amcheck "
"version \"%s\"", amcheck_version);
dat->is_checkunique = false;
}
else
dat->is_checkunique = true;
}
PQclear(result);
compile_relation_list_one_db(conn, &relations, dat, &pagestotal);
@ -845,7 +884,8 @@ prepare_btree_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
if (opts.parent_check)
appendPQExpBuffer(sql,
"SELECT %s.bt_index_parent_check("
"index := c.oid, heapallindexed := %s, rootdescend := %s)"
"index := c.oid, heapallindexed := %s, rootdescend := %s "
"%s)"
"\nFROM pg_catalog.pg_class c, pg_catalog.pg_index i "
"WHERE c.oid = %u "
"AND c.oid = i.indexrelid "
@ -854,11 +894,13 @@ prepare_btree_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
rel->datinfo->amcheck_schema,
(opts.heapallindexed ? "true" : "false"),
(opts.rootdescend ? "true" : "false"),
(rel->datinfo->is_checkunique ? ", checkunique := true" : ""),
rel->reloid);
else
appendPQExpBuffer(sql,
"SELECT %s.bt_index_check("
"index := c.oid, heapallindexed := %s)"
"index := c.oid, heapallindexed := %s "
"%s)"
"\nFROM pg_catalog.pg_class c, pg_catalog.pg_index i "
"WHERE c.oid = %u "
"AND c.oid = i.indexrelid "
@ -866,6 +908,7 @@ prepare_btree_command(PQExpBuffer sql, RelationInfo *rel, PGconn *conn)
"AND i.indisready AND i.indisvalid AND i.indislive",
rel->datinfo->amcheck_schema,
(opts.heapallindexed ? "true" : "false"),
(rel->datinfo->is_checkunique ? ", checkunique := true" : ""),
rel->reloid);
}
@ -1163,6 +1206,7 @@ help(const char *progname)
printf(_(" --heapallindexed check that all heap tuples are found within indexes\n"));
printf(_(" --parent-check check index parent/child relationships\n"));
printf(_(" --rootdescend search from root page to refind tuples\n"));
printf(_(" --checkunique check unique constraint if index is unique\n"));
printf(_("\nConnection options:\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port\n"));

View File

@ -257,6 +257,9 @@ for my $dbname (qw(db1 db2 db3))
CREATE INDEX t1_spgist ON $schema.t1 USING SPGIST (ir);
CREATE INDEX t2_spgist ON $schema.t2 USING SPGIST (ir);
CREATE UNIQUE INDEX t1_btree_unique ON $schema.t1 USING BTREE (i);
CREATE UNIQUE INDEX t2_btree_unique ON $schema.t2 USING BTREE (i);
));
}
}
@ -517,4 +520,51 @@ $node->command_checks_all(
0, [$no_output_re], [$no_output_re],
'pg_amcheck excluding all corrupt schemas');
$node->command_checks_all(
[
@cmd, '-s', 's1', '-i', 't1_btree', '--parent-check',
'--checkunique', 'db1'
],
2,
[$index_missing_relation_fork_re],
[$no_output_re],
'pg_amcheck smoke test --parent-check --checkunique');
$node->command_checks_all(
[
@cmd, '-s', 's1', '-i', 't1_btree', '--heapallindexed',
'--rootdescend', '--checkunique', 'db1'
],
2,
[$index_missing_relation_fork_re],
[$no_output_re],
'pg_amcheck smoke test --heapallindexed --rootdescend --checkunique');
$node->command_checks_all(
[
@cmd, '--checkunique', '-d', 'db1', '-d', 'db2',
'-d', 'db3', '-S', 's*'
],
0,
[$no_output_re],
[$no_output_re],
'pg_amcheck excluding all corrupt schemas with --checkunique option');
#
# Smoke test for checkunique option for not supported versions.
#
$node->safe_psql(
'db3', q(
DROP EXTENSION amcheck;
CREATE EXTENSION amcheck WITH SCHEMA amcheck_schema VERSION '1.3' ;
));
$node->command_checks_all(
[ @cmd, '--checkunique', 'db3' ],
0,
[$no_output_re],
[
qr/pg_amcheck: warning: --checkunique option is not supported by amcheck version "1.3"/
],
'pg_amcheck smoke test --checkunique');
done_testing();

View File

@ -22,14 +22,33 @@ $node->safe_psql(
CREATE FUNCTION int4_asc_cmp (a int4, b int4) RETURNS int LANGUAGE sql AS $$
SELECT CASE WHEN $1 = $2 THEN 0 WHEN $1 > $2 THEN 1 ELSE -1 END; $$;
CREATE FUNCTION ok_cmp (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT
CASE WHEN $1 < $2 THEN -1
WHEN $1 > $2 THEN 1
ELSE 0
END;
$$;
CREATE OPERATOR CLASS int4_fickle_ops FOR TYPE int4 USING btree AS
OPERATOR 1 < (int4, int4), OPERATOR 2 <= (int4, int4),
OPERATOR 3 = (int4, int4), OPERATOR 4 >= (int4, int4),
OPERATOR 5 > (int4, int4), FUNCTION 1 int4_asc_cmp(int4, int4);
CREATE OPERATOR CLASS int4_unique_ops FOR TYPE int4 USING btree AS
OPERATOR 1 < (int4, int4), OPERATOR 2 <= (int4, int4),
OPERATOR 3 = (int4, int4), OPERATOR 4 >= (int4, int4),
OPERATOR 5 > (int4, int4), FUNCTION 1 ok_cmp(int4, int4);
CREATE TABLE int4tbl (i int4);
INSERT INTO int4tbl (SELECT * FROM generate_series(1,1000) gs);
CREATE INDEX fickleidx ON int4tbl USING btree (i int4_fickle_ops);
CREATE UNIQUE INDEX bttest_unique_idx
ON int4tbl
USING btree (i int4_unique_ops)
WITH (deduplicate_items = off);
));
# We have not yet broken the index, so we should get no corruption
@ -57,4 +76,50 @@ $node->command_checks_all(
'pg_amcheck all schemas, tables and indexes reports fickleidx corruption'
);
#
# Check unique constraints
#
# Repair broken opclass for check unique tests.
$node->safe_psql(
'postgres', q(
UPDATE pg_catalog.pg_amproc
SET amproc = 'int4_asc_cmp'::regproc
WHERE amproc = 'int4_desc_cmp'::regproc
));
# We should get no corruptions
$node->command_like(
[ 'pg_amcheck', '--checkunique', '-p', $node->port, 'postgres' ],
qr/^$/,
'pg_amcheck all schemas, tables and indexes reports no corruption');
# Break opclass for check unique tests.
$node->safe_psql(
'postgres', q(
CREATE FUNCTION bad_cmp (int4, int4)
RETURNS int LANGUAGE sql AS
$$
SELECT
CASE WHEN ($1 = 768 AND $2 = 769) OR
($1 = 769 AND $2 = 768) THEN 0
WHEN $1 < $2 THEN -1
WHEN $1 > $2 THEN 1
ELSE 0
END;
$$;
UPDATE pg_catalog.pg_amproc
SET amproc = 'bad_cmp'::regproc
WHERE amproc = 'ok_cmp'::regproc
));
# Unique index corruption should now be reported
$node->command_checks_all(
[ 'pg_amcheck', '--checkunique', '-p', $node->port, 'postgres' ],
2,
[qr/index uniqueness is violated for index "bttest_unique_idx"/],
[],
'pg_amcheck all schemas, tables and indexes reports bttest_unique_idx corruption'
);
done_testing();