diff --git a/src/backend/storage/lmgr/spin.c b/src/backend/storage/lmgr/spin.c
index f44e5e2b37..016fab2b03 100644
--- a/src/backend/storage/lmgr/spin.c
+++ b/src/backend/storage/lmgr/spin.c
@@ -28,8 +28,24 @@
 
 
 #ifndef HAVE_SPINLOCKS
+
+/*
+ * No TAS, so spinlocks are implemented as PGSemaphores.
+ */
+
+#ifndef HAVE_ATOMICS
+#define NUM_EMULATION_SEMAPHORES (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES)
+#else
+#define NUM_EMULATION_SEMAPHORES (NUM_SPINLOCK_SEMAPHORES)
+#endif							/* HAVE_ATOMICS */
+
 PGSemaphore *SpinlockSemaArray;
-#endif
+
+#else							/* !HAVE_SPINLOCKS */
+
+#define NUM_EMULATION_SEMAPHORES 0
+
+#endif							/* HAVE_SPINLOCKS */
 
 /*
  * Report the amount of shared memory needed to store semaphores for spinlock
@@ -38,34 +54,19 @@ PGSemaphore *SpinlockSemaArray;
 Size
 SpinlockSemaSize(void)
 {
-	return SpinlockSemas() * sizeof(PGSemaphore);
+	return NUM_EMULATION_SEMAPHORES * sizeof(PGSemaphore);
 }
 
-#ifdef HAVE_SPINLOCKS
-
 /*
  * Report number of semaphores needed to support spinlocks.
  */
 int
 SpinlockSemas(void)
 {
-	return 0;
+	return NUM_EMULATION_SEMAPHORES;
 }
 
-#else							/* !HAVE_SPINLOCKS */
-
-/*
- * No TAS, so spinlocks are implemented as PGSemaphores.
- */
-
-
-/*
- * Report number of semaphores needed to support spinlocks.
- */
-int
-SpinlockSemas(void)
-{
-	return NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES;
-}
+#ifndef HAVE_SPINLOCKS
 
 /*
  * Initialize spinlock emulation.
@@ -92,23 +93,59 @@ SpinlockSemaInit(void)
 /*
  * s_lock.h hardware-spinlock emulation using semaphores
  *
- * We map all spinlocks onto a set of NUM_SPINLOCK_SEMAPHORES semaphores.
- * It's okay to map multiple spinlocks onto one semaphore because no process
- * should ever hold more than one at a time.  We just need enough semaphores
- * so that we aren't adding too much extra contention from that.
+ * We map all spinlocks onto NUM_EMULATION_SEMAPHORES semaphores.  It's okay to
+ * map multiple spinlocks onto one semaphore because no process should ever
+ * hold more than one at a time.  We just need enough semaphores so that we
+ * aren't adding too much extra contention from that.
+ *
+ * There is one exception to the restriction of only holding one spinlock at a
+ * time, which is that it's ok if emulated atomic operations are nested inside
+ * spinlocks.  To avoid the danger of spinlocks and atomics using the same sema,
+ * we make sure "normal" spinlocks and atomics backed by spinlocks use
+ * distinct semaphores (see the nested argument to s_init_lock_sema).
  *
  * slock_t is just an int for this implementation; it holds the spinlock
- * number from 1..NUM_SPINLOCK_SEMAPHORES.  We intentionally ensure that 0
+ * number from 1..NUM_EMULATION_SEMAPHORES.  We intentionally ensure that 0
  * is not a valid value, so that testing with this code can help find
  * failures to initialize spinlocks.
  */
 
+static inline void
+s_check_valid(int lockndx)
+{
+	if (unlikely(lockndx <= 0 || lockndx > NUM_EMULATION_SEMAPHORES))
+		elog(ERROR, "invalid spinlock number: %d", lockndx);
+}
+
 void
 s_init_lock_sema(volatile slock_t *lock, bool nested)
 {
 	static uint32 counter = 0;
+	uint32		offset;
+	uint32		sema_total;
+	uint32		idx;
 
-	*lock = ((++counter) % NUM_SPINLOCK_SEMAPHORES) + 1;
+	if (nested)
+	{
+		/*
+		 * To allow nesting atomics inside spinlocked sections, use a
+		 * different set of semaphores.  See comment above.
+		 */
+		offset = 1 + NUM_SPINLOCK_SEMAPHORES;
+		sema_total = NUM_ATOMICS_SEMAPHORES;
+	}
+	else
+	{
+		offset = 1;
+		sema_total = NUM_SPINLOCK_SEMAPHORES;
+	}
+
+	idx = (counter++ % sema_total) + offset;
+
+	/* double check we did things correctly */
+	s_check_valid(idx);
+
+	*lock = idx;
 }
 
 void
@@ -116,8 +153,8 @@ s_unlock_sema(volatile slock_t *lock)
 {
 	int			lockndx = *lock;
 
-	if (lockndx <= 0 || lockndx > NUM_SPINLOCK_SEMAPHORES)
-		elog(ERROR, "invalid spinlock number: %d", lockndx);
+	s_check_valid(lockndx);
+
 	PGSemaphoreUnlock(SpinlockSemaArray[lockndx - 1]);
 }
 
@@ -134,8 +171,8 @@ tas_sema(volatile slock_t *lock)
 {
 	int			lockndx = *lock;
 
-	if (lockndx <= 0 || lockndx > NUM_SPINLOCK_SEMAPHORES)
-		elog(ERROR, "invalid spinlock number: %d", lockndx);
+	s_check_valid(lockndx);
+
 	/* Note that TAS macros return 0 if *success* */
 	return !PGSemaphoreTryLock(SpinlockSemaArray[lockndx - 1]);
 }
diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c
index 24c27db013..968f8340d9 100644
--- a/src/test/regress/regress.c
+++ b/src/test/regress/regress.c
@@ -899,6 +899,56 @@ test_spinlock(void)
 #endif
 }
 
+/*
+ * Verify that performing atomic ops inside a spinlock isn't a
+ * problem.  Realistically that's only going to be a problem when both
+ * --disable-spinlocks and --disable-atomics are used, but it's cheap enough
+ * to just always test.
+ *
+ * The test works by holding a spinlock while manipulating more than
+ * NUM_SPINLOCK_SEMAPHORES atomics, enough that we'd conflict if a spinlock
+ * and an atomic were ever mapped onto the same semaphore.
+ *
+ * NUM_TEST_ATOMICS doesn't really need to be more than
+ * NUM_SPINLOCK_SEMAPHORES, but it seems better to test a bit more
+ * extensively.
+ */
+static void
+test_atomic_spin_nest(void)
+{
+	slock_t		lock;
+#define NUM_TEST_ATOMICS (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES + 27)
+	pg_atomic_uint32 atomics32[NUM_TEST_ATOMICS];
+	pg_atomic_uint64 atomics64[NUM_TEST_ATOMICS];
+
+	SpinLockInit(&lock);
+
+	for (int i = 0; i < NUM_TEST_ATOMICS; i++)
+	{
+		pg_atomic_init_u32(&atomics32[i], 0);
+		pg_atomic_init_u64(&atomics64[i], 0);
+	}
+
+	/* just so it's not all zeroes */
+	for (int i = 0; i < NUM_TEST_ATOMICS; i++)
+	{
+		EXPECT_EQ_U32(pg_atomic_fetch_add_u32(&atomics32[i], i), 0);
+		EXPECT_EQ_U64(pg_atomic_fetch_add_u64(&atomics64[i], i), 0);
+	}
+
+	/* test whether we can do atomic op with lock held */
+	SpinLockAcquire(&lock);
+	for (int i = 0; i < NUM_TEST_ATOMICS; i++)
+	{
+		EXPECT_EQ_U32(pg_atomic_fetch_sub_u32(&atomics32[i], i), i);
+		EXPECT_EQ_U32(pg_atomic_read_u32(&atomics32[i]), 0);
+		EXPECT_EQ_U64(pg_atomic_fetch_sub_u64(&atomics64[i], i), i);
+		EXPECT_EQ_U64(pg_atomic_read_u64(&atomics64[i]), 0);
+	}
+	SpinLockRelease(&lock);
+}
+#undef NUM_TEST_ATOMICS
+
 PG_FUNCTION_INFO_V1(test_atomic_ops);
 Datum
 test_atomic_ops(PG_FUNCTION_ARGS)
@@ -915,6 +965,8 @@ test_atomic_ops(PG_FUNCTION_ARGS)
 	 */
 	test_spinlock();
 
+	test_atomic_spin_nest();
+
 	PG_RETURN_BOOL(true);
 }
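
Note for reviewers: the invariant the patch relies on is that the nested path of
s_init_lock_sema() hands out semaphore indexes from a range disjoint from the one
used by ordinary spinlocks, so an emulated atomic op performed while a spinlock is
held can never block on the semaphore that spinlock already owns.  The standalone
sketch below (not part of the patch) replays just that index-assignment logic and
asserts the two ranges never overlap.  The constants here mirror what I believe are
the pg_config_manual.h defaults (128 spinlock / 64 atomics semaphores), and
assign_sema() is a hypothetical stand-in for illustration only.

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_SPINLOCK_SEMAPHORES 128
#define NUM_ATOMICS_SEMAPHORES	64

/* replay of the mapping in s_init_lock_sema(), shared counter and all */
static int
assign_sema(bool nested)
{
	static uint32_t counter = 0;
	uint32_t	offset = nested ? 1 + NUM_SPINLOCK_SEMAPHORES : 1;
	uint32_t	sema_total = nested ? NUM_ATOMICS_SEMAPHORES : NUM_SPINLOCK_SEMAPHORES;

	return (counter++ % sema_total) + offset;
}

int
main(void)
{
	for (int i = 0; i < 100000; i++)
	{
		int			spin_idx = assign_sema(false);	/* a "normal" spinlock */
		int			atomic_idx = assign_sema(true); /* a lock backing an atomic */

		/* normal spinlocks stay in 1..NUM_SPINLOCK_SEMAPHORES */
		assert(spin_idx >= 1 && spin_idx <= NUM_SPINLOCK_SEMAPHORES);
		/* nested locks stay strictly above that range */
		assert(atomic_idx > NUM_SPINLOCK_SEMAPHORES &&
			   atomic_idx <= NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES);
	}
	puts("spinlock and atomics semaphore ranges are disjoint");
	return 0;
}

With the pre-patch code both kinds of locks were drawn from the same
NUM_SPINLOCK_SEMAPHORES pool, so a spinlock and an atomic could land on the same
semaphore and the nested acquire would self-deadlock; that is exactly the scenario
test_atomic_spin_nest() exercises.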