From 352d297dc74feb0bf0dcb255cc0dfaaed2b96c1e Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 10 Mar 2022 12:54:54 -0800 Subject: [PATCH] dshash: Add sequential scan support. Add the ability to sequentially scan all entries of a dshash table. The interface is similar to, but a bit different from, both that of dynahash and the simple dshash search functions. The most significant difference is that dshash's interface always needs a call to dshash_seq_term() when a scan ends. Another is locking. dshash holds a partition lock when returning an entry; dshash_seq_next() also holds a lock when returning an entry, but callers shouldn't release it, since the lock is essential to continue the scan. The seqscan interface allows entry deletion while a scan is in progress, using dshash_delete_current(). Reviewed-By: Andres Freund Author: Kyotaro Horiguchi --- src/backend/lib/dshash.c | 163 ++++++++++++++++++++++++++++++- src/include/lib/dshash.h | 23 +++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 186 insertions(+), 1 deletion(-) diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c index decedb2605..84a9db47c7 100644 --- a/src/backend/lib/dshash.c +++ b/src/backend/lib/dshash.c @@ -127,6 +127,10 @@ struct dshash_table #define NUM_SPLITS(size_log2) \ (size_log2 - DSHASH_NUM_PARTITIONS_LOG2) +/* How many buckets are there in a given size? */ +#define NUM_BUCKETS(size_log2) \ + (((size_t) 1) << (size_log2)) + /* How many buckets are there in each partition at a given size? */ #define BUCKETS_PER_PARTITION(size_log2) \ (((size_t) 1) << NUM_SPLITS(size_log2)) @@ -153,6 +157,10 @@ struct dshash_table #define BUCKET_INDEX_FOR_PARTITION(partition, size_log2) \ ((partition) << NUM_SPLITS(size_log2)) +/* Choose partition based on bucket index. */ +#define PARTITION_FOR_BUCKET_INDEX(bucket_idx, size_log2) \ + ((bucket_idx) >> NUM_SPLITS(size_log2)) + /* The head of the active bucket for a given hash value (lvalue). 
*/ #define BUCKET_FOR_HASH(hash_table, hash) \ (hash_table->buckets[ \ @@ -324,7 +332,7 @@ dshash_destroy(dshash_table *hash_table) ensure_valid_bucket_pointers(hash_table); /* Free all the entries. */ - size = ((size_t) 1) << hash_table->size_log2; + size = NUM_BUCKETS(hash_table->size_log2); for (i = 0; i < size; ++i) { dsa_pointer item_pointer = hash_table->buckets[i]; @@ -592,6 +600,159 @@ dshash_memhash(const void *v, size_t size, void *arg) return tag_hash(v, size); } +/* + * dshash_seq_init/_next/_term + * Sequentially scan through a dshash table and return all the + * elements one by one; return NULL when there are no more. + * + * dshash_seq_term() should always be called when a scan finishes. + * The caller may delete returned elements in the midst of a scan by using + * dshash_delete_current(); exclusive must be true to delete elements. + */ +void +dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, + bool exclusive) +{ + status->hash_table = hash_table; + status->curbucket = 0; + status->nbuckets = 0; + status->curitem = NULL; + status->pnextitem = InvalidDsaPointer; + status->curpartition = -1; + status->exclusive = exclusive; +} + +/* + * Returns the next element, or NULL once every bucket has been scanned. + * + * Returned elements are locked; the caller must not release the lock itself. + * It is released by a later dshash_seq_next() or by dshash_seq_term(). + */ +void * +dshash_seq_next(dshash_seq_status *status) +{ + dsa_pointer next_item_pointer; + + if (status->curitem == NULL) + { + int partition; + + Assert(status->curbucket == 0); + Assert(!status->hash_table->find_locked); + + /* First call: lock the first bucket's partition and fetch its head item. */ + partition = + PARTITION_FOR_BUCKET_INDEX(status->curbucket, + status->hash_table->size_log2); + LWLockAcquire(PARTITION_LOCK(status->hash_table, partition), + status->exclusive ? 
LW_EXCLUSIVE : LW_SHARED); + status->curpartition = partition; + + /* no resize can happen from now until the seq scan ends */ + status->nbuckets = + NUM_BUCKETS(status->hash_table->control->size_log2); + ensure_valid_bucket_pointers(status->hash_table); + + next_item_pointer = status->hash_table->buckets[status->curbucket]; + } + else + next_item_pointer = status->pnextitem; + + Assert(LWLockHeldByMeInMode(PARTITION_LOCK(status->hash_table, + status->curpartition), + status->exclusive ? LW_EXCLUSIVE : LW_SHARED)); + + /* Advance to the next non-empty bucket once the current one is exhausted */ + while (!DsaPointerIsValid(next_item_pointer)) + { + int next_partition; + + if (++status->curbucket >= status->nbuckets) + { + /* All buckets have been scanned; the scan is finished. */ + return NULL; + } + + /* Check whether the new bucket belongs to a different partition */ + next_partition = + PARTITION_FOR_BUCKET_INDEX(status->curbucket, + status->hash_table->size_log2); + + if (status->curpartition != next_partition) + { + /* + * Move to the next partition. Lock the next partition, then + * release the current one, rather than the reverse, to avoid + * concurrent resizing. Avoid deadlock by taking locks in the + * same order as resize(). + */ + LWLockAcquire(PARTITION_LOCK(status->hash_table, + next_partition), + status->exclusive ? LW_EXCLUSIVE : LW_SHARED); + LWLockRelease(PARTITION_LOCK(status->hash_table, + status->curpartition)); + status->curpartition = next_partition; + } + + next_item_pointer = status->hash_table->buckets[status->curbucket]; + } + + status->curitem = + dsa_get_address(status->hash_table->area, next_item_pointer); + status->hash_table->find_locked = true; + status->hash_table->find_exclusively_locked = status->exclusive; + + /* + * The caller may delete the item. Remember the next item now in case + * of deletion. + */ + status->pnextitem = status->curitem->next; + + return ENTRY_FROM_ITEM(status->curitem); +} + +/* + * Terminates the seqscan and releases the partition lock still held, if any. 
+ * + * Should always be called when finishing or exiting a seqscan. + */ +void +dshash_seq_term(dshash_seq_status *status) +{ + status->hash_table->find_locked = false; + status->hash_table->find_exclusively_locked = false; + + if (status->curpartition >= 0) + LWLockRelease(PARTITION_LOCK(status->hash_table, status->curpartition)); +} + +/* Remove the current entry during a seq scan; the scan must be exclusive. */ +void +dshash_delete_current(dshash_seq_status *status) +{ + dshash_table *hash_table = status->hash_table; + dshash_table_item *item = status->curitem; + size_t partition PG_USED_FOR_ASSERTS_ONLY; + + partition = PARTITION_FOR_HASH(item->hash); + + Assert(status->exclusive); + Assert(hash_table->control->magic == DSHASH_MAGIC); + Assert(hash_table->find_locked); + Assert(hash_table->find_exclusively_locked); + Assert(LWLockHeldByMeInMode(PARTITION_LOCK(hash_table, partition), + LW_EXCLUSIVE)); + + delete_item(hash_table, item); +} + +/* Get the current entry during a seq scan. */ +void * +dshash_get_current(dshash_seq_status *status) +{ + return ENTRY_FROM_ITEM(status->curitem); +} + /* * Print debugging information about the internal state of the hash table to * stderr. The caller must hold no partition locks. diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h index f3c57e76bf..caeb60ad72 100644 --- a/src/include/lib/dshash.h +++ b/src/include/lib/dshash.h @@ -59,6 +59,21 @@ typedef struct dshash_parameters struct dshash_table_item; typedef struct dshash_table_item dshash_table_item; +/* + * Sequential scan state. The detail is exposed to let users know the storage + * size but it should be considered as an opaque type by callers. 
+ */ +typedef struct dshash_seq_status +{ + dshash_table *hash_table; /* dshash table being scanned */ + int curbucket; /* index of the bucket being scanned */ + int nbuckets; /* bucket count captured on the first next() */ + dshash_table_item *curitem; /* item most recently returned, or NULL */ + dsa_pointer pnextitem; /* dsa_pointer to the item after curitem */ + int curpartition; /* partition whose lock is held, or -1 */ + bool exclusive; /* take partition locks in exclusive mode? */ +} dshash_seq_status; + /* Creating, sharing and destroying from hash tables. */ extern dshash_table *dshash_create(dsa_area *area, const dshash_parameters *params, @@ -80,6 +95,14 @@ extern bool dshash_delete_key(dshash_table *hash_table, const void *key); extern void dshash_delete_entry(dshash_table *hash_table, void *entry); extern void dshash_release_lock(dshash_table *hash_table, void *entry); +/* Sequential scan support. */ +extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, + bool exclusive); +extern void *dshash_seq_next(dshash_seq_status *status); +extern void dshash_seq_term(dshash_seq_status *status); +extern void dshash_delete_current(dshash_seq_status *status); +extern void *dshash_get_current(dshash_seq_status *status); + /* Convenience hash and compare functions wrapping memcmp and tag_hash. */ extern int dshash_memcmp(const void *a, const void *b, size_t size, void *arg); extern dshash_hash dshash_memhash(const void *v, size_t size, void *arg); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d9b83f744f..eaf3e7a8d4 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3103,6 +3103,7 @@ dshash_hash dshash_hash_function dshash_parameters dshash_partition +dshash_seq_status dshash_table dshash_table_control dshash_table_handle