postgresql/contrib/bloom/blscan.c
Tom Lane ee4af347ba Measure Bloom index signature-length reloption in bits, not words.
Per discussion, this is a more understandable and future-proof way of
exposing the setting to users.  On-disk, we can still store it in words,
so as to not break on-disk compatibility with beta1.

Along the way, clean up the code associated with Bloom reloptions.
Provide explicit macros for default and maximum lengths rather than
having magic numbers buried in multiple places in the code.  Drop
the adjustBloomOptions() code altogether: it was useless in view of
the fact that reloptions.c already performed default-substitution and
range checking for the options.  Rename a couple of macros and types
for more clarity.

Discussion: <23767.1464926580@sss.pgh.pa.us>
2016-06-03 10:52:45 -04:00

174 lines
3.5 KiB
C

/*-------------------------------------------------------------------------
*
* blscan.c
* Bloom index scan functions.
*
* Copyright (c) 2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/bloom/blscan.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/relscan.h"
#include "pgstat.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "bloom.h"
/*
 * blbeginscan
 *		Begin a scan of a bloom index.
 *
 * Builds the generic scan descriptor for index relation "r" (with room for
 * "nkeys" scan keys and "norderbys" ordering keys) and attaches the
 * bloom-private scan state to it.  The private state gets its BloomState
 * initialized from the index relation; the search signature is left NULL,
 * which blgetbitmap() treats as "not computed yet".
 *
 * Returns the new scan descriptor.
 */
IndexScanDesc
blbeginscan(Relation r, int nkeys, int norderbys)
{
	IndexScanDesc scan = RelationGetIndexScan(r, nkeys, norderbys);
	BloomScanOpaque so;

	/* Private per-scan workspace, hung off scan->opaque below. */
	so = (BloomScanOpaque) palloc(sizeof(BloomScanOpaqueData));
	initBloomState(&so->state, scan->indexRelation);

	/* No search signature until the scan keys are actually evaluated. */
	so->sign = NULL;

	scan->opaque = so;

	return scan;
}
/*
 * blrescan
 *		Restart a bloom index scan, optionally with new scan keys.
 *
 * Any search signature cached by a previous run is released so that it is
 * recomputed from the current keys.  If "scankey" is supplied and the scan
 * has at least one key, the new keys are copied into scan->keyData.
 *
 * "nscankeys", "orderbys" and "norderbys" are accepted for interface
 * compatibility but are not used here; the number of keys copied is taken
 * from scan->numberOfKeys.
 */
void
blrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		 ScanKey orderbys, int norderbys)
{
	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;

	/* Discard the stale signature, if there is one. */
	if (so->sign != NULL)
	{
		pfree(so->sign);
		so->sign = NULL;
	}

	/* Install the caller's keys in place of the old ones. */
	if (scankey != NULL && scan->numberOfKeys > 0)
		memmove(scan->keyData, scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
}
/*
 * blendscan
 *		Finish a bloom index scan.
 *
 * Releases the cached search signature, if one was built, and resets the
 * pointer to NULL.  Nothing else is freed here.
 */
void
blendscan(IndexScanDesc scan)
{
	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;

	if (so->sign != NULL)
	{
		pfree(so->sign);
		so->sign = NULL;
	}
}
/*
 * blgetbitmap
 *		Insert all matching tuples into a bitmap.
 *
 * On the first call after a (re)scan the search signature is built by
 * folding every scan key's argument into a zeroed signature of
 * opts.bloomLength words; it is then cached in the scan's private state
 * for later calls.  If any key is NULL the scan can match nothing, so the
 * signature is discarded and 0 is returned without touching the index.
 *
 * The function then walks every block from BLOOM_HEAD_BLKNO to the end of
 * the index.  An index tuple matches when each bit set in the search
 * signature is also set in the tuple's signature; the heap pointer of each
 * matching tuple is added to "tbm".
 *
 * Returns the number of heap pointers added to the bitmap.
 */
int64
blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	int64		ntids = 0;
	BlockNumber blkno,
				npages;
	int			i;
	BufferAccessStrategy bas;
	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;

	if (so->sign == NULL)
	{
		/* New search: have to calculate search signature */
		ScanKey		skey = scan->keyData;

		so->sign = palloc0(sizeof(BloomSignatureWord) * so->state.opts.bloomLength);

		for (i = 0; i < scan->numberOfKeys; i++)
		{
			/*
			 * Assume bloom-indexable operators to be strict, so nothing could
			 * be found for NULL key.
			 */
			if (skey->sk_flags & SK_ISNULL)
			{
				pfree(so->sign);
				so->sign = NULL;
				return 0;
			}

			/* Add next value to the signature (sk_attno is 1-based) */
			signValue(&so->state, so->sign, skey->sk_argument,
					  skey->sk_attno - 1);

			skey++;
		}
	}

	/*
	 * We're going to read the whole index. This is why we use appropriate
	 * buffer access strategy.
	 */
	bas = GetAccessStrategy(BAS_BULKREAD);
	npages = RelationGetNumberOfBlocks(scan->indexRelation);

	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
	{
		Buffer		buffer;
		Page		page;

		buffer = ReadBufferExtended(scan->indexRelation, MAIN_FORKNUM,
									blkno, RBM_NORMAL, bas);

		LockBuffer(buffer, BUFFER_LOCK_SHARE);
		page = BufferGetPage(buffer);
		TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);

		/*
		 * Skip pages that were never initialized as well as deleted ones.
		 * The PageIsNew() test must come first: an uninitialized page has
		 * no valid special space, so the Bloom page macros (which look at
		 * the page's opaque data) must not be applied to it.
		 */
		if (!PageIsNew(page) && !BloomPageIsDeleted(page))
		{
			OffsetNumber offset,
						maxOffset = BloomPageGetMaxOffset(page);

			for (offset = 1; offset <= maxOffset; offset++)
			{
				BloomTuple *itup = BloomPageGetTuple(&so->state, page, offset);
				bool		res = true;

				/* Check index signature with scan signature */
				for (i = 0; i < so->state.opts.bloomLength; i++)
				{
					if ((itup->sign[i] & so->sign[i]) != so->sign[i])
					{
						res = false;
						break;
					}
				}

				/* Add matching tuples to bitmap */
				if (res)
				{
					tbm_add_tuples(tbm, &itup->heapPtr, 1, true);
					ntids++;
				}
			}
		}

		UnlockReleaseBuffer(buffer);

		/* Allow query cancel between pages, while no buffer lock is held */
		CHECK_FOR_INTERRUPTS();
	}
	FreeAccessStrategy(bas);

	return ntids;
}