injection_points: Add wait and wakeup of processes

This commit adds two features to the in-core module for injection
points:
- A new callback called "wait" that can be attached to an injection
point to make it wait.
- A new SQL function to update the shared state and broadcast the update
using a condition variable.  This function uses an input an injection
point name.

This offers the possibility to stop a process in flight and wake it up
in a controlled manner, which is useful when implementing tests that aim
to trigger scenarios for race conditions (some tests are planned for
integration).  The logic uses a set of counters with a condition
variable to monitor and broadcast the changes.  Up to 8 waits can be
registered in a single run, which should be plenty enough.  Waits can be
monitored in pg_stat_activity, based on the injection point name which
is registered in a custom wait event under the "Extension" category.

The shared memory state used by the module is registered using the DSM
registry, and is optional, so there is no need to load the module with
shared_preload_libraries to be able to use these features.

Author: Michael Paquier
Reviewed-by: Andrey Borodin, Bertrand Drouvot
Discussion: https://postgr.es/m/ZdLuxBk5hGpol91B@paquier.xyz
This commit is contained in:
Michael Paquier 2024-03-04 09:19:13 +09:00
parent 024c521117
commit 37b369dc67
3 changed files with 166 additions and 0 deletions

View File

@ -24,6 +24,16 @@ RETURNS void
AS 'MODULE_PATHNAME', 'injection_points_run'
LANGUAGE C STRICT PARALLEL UNSAFE;
--
-- injection_points_wakeup()
--
-- Wakes up a waiting injection point.
--
CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
RETURNS void
AS 'MODULE_PATHNAME', 'injection_points_wakeup'
LANGUAGE C STRICT PARALLEL UNSAFE;
--
-- injection_points_detach()
--

View File

@ -18,18 +18,75 @@
#include "postgres.h"
#include "fmgr.h"
#include "storage/condition_variable.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/dsm_registry.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/wait_event.h"
PG_MODULE_MAGIC;
/* Maximum number of waits usable in injection points at once */
#define INJ_MAX_WAIT 8
#define INJ_NAME_MAXLEN 64
/* Shared state information for injection points. */
typedef struct InjectionPointSharedState
{
/* Protects access to other fields */
slock_t lock;
/* Counters advancing when injection_points_wakeup() is called */
uint32 wait_counts[INJ_MAX_WAIT];
/* Names of injection points attached to wait counters */
char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
/* Condition variable used for waits and wakeups */
ConditionVariable wait_point;
} InjectionPointSharedState;
/* Pointer to shared-memory state. */
static InjectionPointSharedState *inj_state = NULL;
extern PGDLLEXPORT void injection_error(const char *name);
extern PGDLLEXPORT void injection_notice(const char *name);
extern PGDLLEXPORT void injection_wait(const char *name);
/*
* Callback for shared memory area initialization.
*/
static void
injection_point_init_state(void *ptr)
{
InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
SpinLockInit(&state->lock);
memset(state->wait_counts, 0, sizeof(state->wait_counts));
memset(state->name, 0, sizeof(state->name));
ConditionVariableInit(&state->wait_point);
}
/*
* Initialize shared memory area for this module.
*/
static void
injection_init_shmem(void)
{
bool found;
if (inj_state != NULL)
return;
inj_state = GetNamedDSMSegment("injection_points",
sizeof(InjectionPointSharedState),
injection_point_init_state,
&found);
}
/* Set of callbacks available to be attached to an injection point. */
void
injection_error(const char *name)
@ -43,6 +100,66 @@ injection_notice(const char *name)
elog(NOTICE, "notice triggered for injection point %s", name);
}
/* Wait on a condition variable, awaken by injection_points_wakeup() */
void
injection_wait(const char *name)
{
uint32 old_wait_counts = 0;
int index = -1;
uint32 injection_wait_event = 0;
if (inj_state == NULL)
injection_init_shmem();
/*
* Use the injection point name for this custom wait event. Note that
* this custom wait event name is not released, but we don't care much for
* testing as this should be short-lived.
*/
injection_wait_event = WaitEventExtensionNew(name);
/*
* Find a free slot to wait for, and register this injection point's name.
*/
SpinLockAcquire(&inj_state->lock);
for (int i = 0; i < INJ_MAX_WAIT; i++)
{
if (inj_state->name[i][0] == '\0')
{
index = i;
strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
old_wait_counts = inj_state->wait_counts[i];
break;
}
}
SpinLockRelease(&inj_state->lock);
if (index < 0)
elog(ERROR, "could not find free slot for wait of injection point %s ",
name);
/* And sleep.. */
ConditionVariablePrepareToSleep(&inj_state->wait_point);
for (;;)
{
uint32 new_wait_counts;
SpinLockAcquire(&inj_state->lock);
new_wait_counts = inj_state->wait_counts[index];
SpinLockRelease(&inj_state->lock);
if (old_wait_counts != new_wait_counts)
break;
ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
}
ConditionVariableCancelSleep();
/* Remove this injection point from the waiters. */
SpinLockAcquire(&inj_state->lock);
inj_state->name[index][0] = '\0';
SpinLockRelease(&inj_state->lock);
}
/*
* SQL function for creating an injection point.
*/
@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
function = "injection_error";
else if (strcmp(action, "notice") == 0)
function = "injection_notice";
else if (strcmp(action, "wait") == 0)
function = "injection_wait";
else
elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* SQL function for waking up an injection point waiting in injection_wait().
*/
PG_FUNCTION_INFO_V1(injection_points_wakeup);
Datum
injection_points_wakeup(PG_FUNCTION_ARGS)
{
char *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
int index = -1;
if (inj_state == NULL)
injection_init_shmem();
/* First bump the wait counter for the injection point to wake up */
SpinLockAcquire(&inj_state->lock);
for (int i = 0; i < INJ_MAX_WAIT; i++)
{
if (strcmp(name, inj_state->name[i]) == 0)
{
index = i;
break;
}
}
if (index < 0)
{
SpinLockRelease(&inj_state->lock);
elog(ERROR, "could not find injection point %s to wake up", name);
}
inj_state->wait_counts[index]++;
SpinLockRelease(&inj_state->lock);
/* And broadcast the change to the waiters */
ConditionVariableBroadcast(&inj_state->wait_point);
PG_RETURN_VOID();
}
/*
* SQL function for dropping an injection point.
*/

View File

@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function
InitializeWorkerForeignScan_function
InjectionPointCacheEntry
InjectionPointEntry
InjectionPointSharedState
InlineCodeBlock
InsertStmt
Instrumentation