From 37b369dc67bc44a3aab5c07e2c0012475efd00f3 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Mon, 4 Mar 2024 09:19:13 +0900 Subject: [PATCH] injection_points: Add wait and wakeup of processes This commit adds two features to the in-core module for injection points: - A new callback called "wait" that can be attached to an injection point to make it wait. - A new SQL function to update the shared state and broadcast the update using a condition variable. This function uses an input an injection point name. This offers the possibility to stop a process in flight and wake it up in a controlled manner, which is useful when implementing tests that aim to trigger scenarios for race conditions (some tests are planned for integration). The logic uses a set of counters with a condition variable to monitor and broadcast the changes. Up to 8 waits can be registered in a single run, which should be plenty enough. Waits can be monitored in pg_stat_activity, based on the injection point name which is registered in a custom wait event under the "Extension" category. The shared memory state used by the module is registered using the DSM registry, and is optional, so there is no need to load the module with shared_preload_libraries to be able to use these features. Author: Michael Paquier Reviewed-by: Andrey Borodin, Bertrand Drouvot Discussion: https://postgr.es/m/ZdLuxBk5hGpol91B@paquier.xyz --- .../injection_points--1.0.sql | 10 ++ .../injection_points/injection_points.c | 155 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 166 insertions(+) diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql index 5944c41716..0a2e59aba7 100644 --- a/src/test/modules/injection_points/injection_points--1.0.sql +++ b/src/test/modules/injection_points/injection_points--1.0.sql @@ -24,6 +24,16 @@ RETURNS void AS 'MODULE_PATHNAME', 'injection_points_run' LANGUAGE C STRICT PARALLEL UNSAFE; +-- +-- injection_points_wakeup() +-- +-- Wakes up a waiting injection point. +-- +CREATE FUNCTION injection_points_wakeup(IN point_name TEXT) +RETURNS void +AS 'MODULE_PATHNAME', 'injection_points_wakeup' +LANGUAGE C STRICT PARALLEL UNSAFE; + -- -- injection_points_detach() -- diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c index e843e6594f..7f52d758c5 100644 --- a/src/test/modules/injection_points/injection_points.c +++ b/src/test/modules/injection_points/injection_points.c @@ -18,18 +18,75 @@ #include "postgres.h" #include "fmgr.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" +#include "storage/dsm_registry.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/wait_event.h" PG_MODULE_MAGIC; +/* Maximum number of waits usable in injection points at once */ +#define INJ_MAX_WAIT 8 +#define INJ_NAME_MAXLEN 64 + +/* Shared state information for injection points. */ +typedef struct InjectionPointSharedState +{ + /* Protects access to other fields */ + slock_t lock; + + /* Counters advancing when injection_points_wakeup() is called */ + uint32 wait_counts[INJ_MAX_WAIT]; + + /* Names of injection points attached to wait counters */ + char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN]; + + /* Condition variable used for waits and wakeups */ + ConditionVariable wait_point; +} InjectionPointSharedState; + +/* Pointer to shared-memory state. */ +static InjectionPointSharedState *inj_state = NULL; + extern PGDLLEXPORT void injection_error(const char *name); extern PGDLLEXPORT void injection_notice(const char *name); +extern PGDLLEXPORT void injection_wait(const char *name); +/* + * Callback for shared memory area initialization. + */ +static void +injection_point_init_state(void *ptr) +{ + InjectionPointSharedState *state = (InjectionPointSharedState *) ptr; + + SpinLockInit(&state->lock); + memset(state->wait_counts, 0, sizeof(state->wait_counts)); + memset(state->name, 0, sizeof(state->name)); + ConditionVariableInit(&state->wait_point); +} + +/* + * Initialize shared memory area for this module. + */ +static void +injection_init_shmem(void) +{ + bool found; + + if (inj_state != NULL) + return; + + inj_state = GetNamedDSMSegment("injection_points", + sizeof(InjectionPointSharedState), + injection_point_init_state, + &found); +} + /* Set of callbacks available to be attached to an injection point. */ void injection_error(const char *name) @@ -43,6 +100,66 @@ injection_notice(const char *name) elog(NOTICE, "notice triggered for injection point %s", name); } +/* Wait on a condition variable, awaken by injection_points_wakeup() */ +void +injection_wait(const char *name) +{ + uint32 old_wait_counts = 0; + int index = -1; + uint32 injection_wait_event = 0; + + if (inj_state == NULL) + injection_init_shmem(); + + /* + * Use the injection point name for this custom wait event. Note that + * this custom wait event name is not released, but we don't care much for + * testing as this should be short-lived. + */ + injection_wait_event = WaitEventExtensionNew(name); + + /* + * Find a free slot to wait for, and register this injection point's name. + */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (inj_state->name[i][0] == '\0') + { + index = i; + strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN); + old_wait_counts = inj_state->wait_counts[i]; + break; + } + } + SpinLockRelease(&inj_state->lock); + + if (index < 0) + elog(ERROR, "could not find free slot for wait of injection point %s ", + name); + + /* And sleep.. */ + ConditionVariablePrepareToSleep(&inj_state->wait_point); + for (;;) + { + uint32 new_wait_counts; + + SpinLockAcquire(&inj_state->lock); + new_wait_counts = inj_state->wait_counts[index]; + SpinLockRelease(&inj_state->lock); + + if (old_wait_counts != new_wait_counts) + break; + ConditionVariableSleep(&inj_state->wait_point, injection_wait_event); + } + ConditionVariableCancelSleep(); + + /* Remove this injection point from the waiters. */ + SpinLockAcquire(&inj_state->lock); + inj_state->name[index][0] = '\0'; + SpinLockRelease(&inj_state->lock); +} + /* * SQL function for creating an injection point. */ @@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS) function = "injection_error"; else if (strcmp(action, "notice") == 0) function = "injection_notice"; + else if (strcmp(action, "wait") == 0) + function = "injection_wait"; else elog(ERROR, "incorrect action \"%s\" for injection point creation", action); @@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * SQL function for waking up an injection point waiting in injection_wait(). + */ +PG_FUNCTION_INFO_V1(injection_points_wakeup); +Datum +injection_points_wakeup(PG_FUNCTION_ARGS) +{ + char *name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + int index = -1; + + if (inj_state == NULL) + injection_init_shmem(); + + /* First bump the wait counter for the injection point to wake up */ + SpinLockAcquire(&inj_state->lock); + for (int i = 0; i < INJ_MAX_WAIT; i++) + { + if (strcmp(name, inj_state->name[i]) == 0) + { + index = i; + break; + } + } + if (index < 0) + { + SpinLockRelease(&inj_state->lock); + elog(ERROR, "could not find injection point %s to wake up", name); + } + inj_state->wait_counts[index]++; + SpinLockRelease(&inj_state->lock); + + /* And broadcast the change to the waiters */ + ConditionVariableBroadcast(&inj_state->wait_point); + PG_RETURN_VOID(); +} + /* * SQL function for dropping an injection point. */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ee40a341d3..782b7d7b1c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function InitializeWorkerForeignScan_function InjectionPointCacheEntry InjectionPointEntry +InjectionPointSharedState InlineCodeBlock InsertStmt Instrumentation