dshash: Add sequential scan support.

Add ability to scan all entries sequentially to dshash. The interface is
similar but a bit different both from that of dynahash and simple dshash
search functions. The most significant differences is that dshash's interfac
always needs a call to dshash_seq_term when scan ends. Another is
locking. Dshash holds partition lock when returning an entry,
dshash_seq_next() also holds lock when returning an entry but callers
shouldn't release it, since the lock is essential to continue a scan. The
seqscan interface allows entry deletion while a scan is in progress using
dshash_delete_current().

Reviewed-By: Andres Freund <andres@anarazel.de>
Author: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
This commit is contained in:
Andres Freund 2022-03-10 12:54:54 -08:00
parent adb5c28adc
commit 352d297dc7
3 changed files with 186 additions and 1 deletions

View File

@ -127,6 +127,10 @@ struct dshash_table
#define NUM_SPLITS(size_log2) \
(size_log2 - DSHASH_NUM_PARTITIONS_LOG2)
/* How many buckets are there in a given size? */
#define NUM_BUCKETS(size_log2) \
(((size_t) 1) << (size_log2))
/* How many buckets are there in each partition at a given size? */
#define BUCKETS_PER_PARTITION(size_log2) \
(((size_t) 1) << NUM_SPLITS(size_log2))
@ -153,6 +157,10 @@ struct dshash_table
#define BUCKET_INDEX_FOR_PARTITION(partition, size_log2) \
((partition) << NUM_SPLITS(size_log2))
/* Choose partition based on bucket index. */
#define PARTITION_FOR_BUCKET_INDEX(bucket_idx, size_log2) \
((bucket_idx) >> NUM_SPLITS(size_log2))
/* The head of the active bucket for a given hash value (lvalue). */
#define BUCKET_FOR_HASH(hash_table, hash) \
(hash_table->buckets[ \
@ -324,7 +332,7 @@ dshash_destroy(dshash_table *hash_table)
ensure_valid_bucket_pointers(hash_table);
/* Free all the entries. */
size = ((size_t) 1) << hash_table->size_log2;
size = NUM_BUCKETS(hash_table->size_log2);
for (i = 0; i < size; ++i)
{
dsa_pointer item_pointer = hash_table->buckets[i];
@ -592,6 +600,159 @@ dshash_memhash(const void *v, size_t size, void *arg)
return tag_hash(v, size);
}
/*
* dshash_seq_init/_next/_term
* Sequentially scan through dshash table and return all the
* elements one by one, return NULL when no more.
*
* dshash_seq_term should always be called when a scan finished.
* The caller may delete returned elements midst of a scan by using
* dshash_delete_current(). exclusive must be true to delete elements.
*/
void
dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
bool exclusive)
{
status->hash_table = hash_table;
status->curbucket = 0;
status->nbuckets = 0;
status->curitem = NULL;
status->pnextitem = InvalidDsaPointer;
status->curpartition = -1;
status->exclusive = exclusive;
}
/*
* Returns the next element.
*
* Returned elements are locked and the caller must not explicitly release
* it. It is released at the next call to dshash_next().
*/
void *
dshash_seq_next(dshash_seq_status *status)
{
dsa_pointer next_item_pointer;
if (status->curitem == NULL)
{
int partition;
Assert(status->curbucket == 0);
Assert(!status->hash_table->find_locked);
/* first shot. grab the first item. */
partition =
PARTITION_FOR_BUCKET_INDEX(status->curbucket,
status->hash_table->size_log2);
LWLockAcquire(PARTITION_LOCK(status->hash_table, partition),
status->exclusive ? LW_EXCLUSIVE : LW_SHARED);
status->curpartition = partition;
/* resize doesn't happen from now until seq scan ends */
status->nbuckets =
NUM_BUCKETS(status->hash_table->control->size_log2);
ensure_valid_bucket_pointers(status->hash_table);
next_item_pointer = status->hash_table->buckets[status->curbucket];
}
else
next_item_pointer = status->pnextitem;
Assert(LWLockHeldByMeInMode(PARTITION_LOCK(status->hash_table,
status->curpartition),
status->exclusive ? LW_EXCLUSIVE : LW_SHARED));
/* Move to the next bucket if we finished the current bucket */
while (!DsaPointerIsValid(next_item_pointer))
{
int next_partition;
if (++status->curbucket >= status->nbuckets)
{
/* all buckets have been scanned. finish. */
return NULL;
}
/* Check if move to the next partition */
next_partition =
PARTITION_FOR_BUCKET_INDEX(status->curbucket,
status->hash_table->size_log2);
if (status->curpartition != next_partition)
{
/*
* Move to the next partition. Lock the next partition then
* release the current, not in the reverse order to avoid
* concurrent resizing. Avoid dead lock by taking lock in the
* same order with resize().
*/
LWLockAcquire(PARTITION_LOCK(status->hash_table,
next_partition),
status->exclusive ? LW_EXCLUSIVE : LW_SHARED);
LWLockRelease(PARTITION_LOCK(status->hash_table,
status->curpartition));
status->curpartition = next_partition;
}
next_item_pointer = status->hash_table->buckets[status->curbucket];
}
status->curitem =
dsa_get_address(status->hash_table->area, next_item_pointer);
status->hash_table->find_locked = true;
status->hash_table->find_exclusively_locked = status->exclusive;
/*
* The caller may delete the item. Store the next item in case of
* deletion.
*/
status->pnextitem = status->curitem->next;
return ENTRY_FROM_ITEM(status->curitem);
}
/*
* Terminates the seqscan and release all locks.
*
* Should be always called when finishing or exiting a seqscan.
*/
void
dshash_seq_term(dshash_seq_status *status)
{
status->hash_table->find_locked = false;
status->hash_table->find_exclusively_locked = false;
if (status->curpartition >= 0)
LWLockRelease(PARTITION_LOCK(status->hash_table, status->curpartition));
}
/* Remove the current entry while a seq scan. */
void
dshash_delete_current(dshash_seq_status *status)
{
dshash_table *hash_table = status->hash_table;
dshash_table_item *item = status->curitem;
size_t partition PG_USED_FOR_ASSERTS_ONLY;
partition = PARTITION_FOR_HASH(item->hash);
Assert(status->exclusive);
Assert(hash_table->control->magic == DSHASH_MAGIC);
Assert(hash_table->find_locked);
Assert(hash_table->find_exclusively_locked);
Assert(LWLockHeldByMeInMode(PARTITION_LOCK(hash_table, partition),
LW_EXCLUSIVE));
delete_item(hash_table, item);
}
/* Get the current entry while a seq scan. */
void *
dshash_get_current(dshash_seq_status *status)
{
return ENTRY_FROM_ITEM(status->curitem);
}
/*
* Print debugging information about the internal state of the hash table to
* stderr. The caller must hold no partition locks.

View File

@ -59,6 +59,21 @@ typedef struct dshash_parameters
struct dshash_table_item;
typedef struct dshash_table_item dshash_table_item;
/*
* Sequential scan state. The detail is exposed to let users know the storage
* size but it should be considered as an opaque type by callers.
*/
typedef struct dshash_seq_status
{
dshash_table *hash_table; /* dshash table working on */
int curbucket; /* bucket number we are at */
int nbuckets; /* total number of buckets in the dshash */
dshash_table_item *curitem; /* item we are currently at */
dsa_pointer pnextitem; /* dsa-pointer to the next item */
int curpartition; /* partition number we are at */
bool exclusive; /* locking mode */
} dshash_seq_status;
/* Creating, sharing and destroying from hash tables. */
extern dshash_table *dshash_create(dsa_area *area,
const dshash_parameters *params,
@ -80,6 +95,14 @@ extern bool dshash_delete_key(dshash_table *hash_table, const void *key);
extern void dshash_delete_entry(dshash_table *hash_table, void *entry);
extern void dshash_release_lock(dshash_table *hash_table, void *entry);
/* seq scan support */
extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
bool exclusive);
extern void *dshash_seq_next(dshash_seq_status *status);
extern void dshash_seq_term(dshash_seq_status *status);
extern void dshash_delete_current(dshash_seq_status *status);
extern void *dshash_get_current(dshash_seq_status *status);
/* Convenience hash and compare functions wrapping memcmp and tag_hash. */
extern int dshash_memcmp(const void *a, const void *b, size_t size, void *arg);
extern dshash_hash dshash_memhash(const void *v, size_t size, void *arg);

View File

@ -3103,6 +3103,7 @@ dshash_hash
dshash_hash_function
dshash_parameters
dshash_partition
dshash_seq_status
dshash_table
dshash_table_control
dshash_table_handle