From c9af054653077699189884c336a65e23a7c8aebb Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Mon, 31 Jul 2023 17:09:24 +0900 Subject: [PATCH] Support custom wait events for wait event type "Extension" Two backend routines are added to allow extension to allocate and define custom wait events, all of these being allocated in the type "Extension": * WaitEventExtensionNew(), that allocates a wait event ID computed from a counter in shared memory. * WaitEventExtensionRegisterName(), to associate a custom string to the wait event ID allocated. Note that this includes an example of how to use this new facility in worker_spi with tests in TAP for various scenarios, and some documentation about how to use them. Any code in the tree that currently uses WAIT_EVENT_EXTENSION could switch to this new facility to define custom wait events. This is left as work for future patches. Author: Masahiro Ikeda Reviewed-by: Andres Freund, Michael Paquier, Tristan Partin, Bharath Rupireddy Discussion: https://postgr.es/m/b9f5411acda0cf15c8fbb767702ff43e@oss.nttdata.com --- doc/src/sgml/monitoring.sgml | 11 +- doc/src/sgml/xfunc.sgml | 45 +++++ src/backend/storage/ipc/ipci.c | 3 + .../activity/generate-wait_event_types.pl | 7 +- src/backend/utils/activity/wait_event.c | 178 +++++++++++++++++- .../utils/activity/wait_event_names.txt | 2 +- src/include/utils/wait_event.h | 26 +++ .../modules/worker_spi/t/001_worker_spi.pl | 34 +++- .../modules/worker_spi/worker_spi--1.0.sql | 5 + src/test/modules/worker_spi/worker_spi.c | 103 +++++++++- src/tools/pgindent/typedefs.list | 2 + 11 files changed, 393 insertions(+), 23 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 588b720f57..f4fc5d814f 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1117,11 +1117,14 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - Extensions can add LWLock types to the list shown in - . 
In some cases, the name + Extensions can add Extension and + LWLock types + to the list shown in and + . In some cases, the name assigned by an extension will not be available in all server processes; - so an LWLock wait event might be reported as - just extension rather than the + so an Extension or LWLock wait + event might be reported as just + extension rather than the extension-assigned name. diff --git a/doc/src/sgml/xfunc.sgml b/doc/src/sgml/xfunc.sgml index 9620ea9ae3..d6345a775b 100644 --- a/doc/src/sgml/xfunc.sgml +++ b/doc/src/sgml/xfunc.sgml @@ -3453,6 +3453,51 @@ if (!ptr) + + Shared Memory and Custom Wait Events + + + Add-ins can define custom wait events under the wait event type + Extension. The add-in's shared library must be + preloaded by specifying it in shared_preload_libraries, + and must register a shmem_request_hook and a + shmem_startup_hook in its + _PG_init function. + shmem_request_hook can request a shared memory size + to be later used at startup by calling: + +void RequestAddinShmemSpace(int size) + + + + shmem_startup_hook can allocate custom wait events in + shared memory by calling the following function while holding the LWLock + AddinShmemInitLock, to avoid any race conditions: + +uint32 WaitEventExtensionNew(void) + + Next, each process needs to associate the wait event allocated previously + to a user-facing custom string, which is something done by calling: + +void WaitEventExtensionRegisterName(uint32 wait_event_info, const char *wait_event_name) + + An example can be found in src/test/modules/worker_spi + in the PostgreSQL source tree.
+ + + Custom wait events can be viewed in + pg_stat_activity: + +=# SELECT wait_event_type, wait_event FROM pg_stat_activity + WHERE backend_type ~ 'worker_spi'; + wait_event_type | wait_event +-----------------+----------------- + Extension | worker_spi_main +(1 row) + + + + Using C++ for Extensibility diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index cc387c00a1..5551afffc0 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -49,6 +49,7 @@ #include "storage/spin.h" #include "utils/guc.h" #include "utils/snapmgr.h" +#include "utils/wait_event.h" /* GUCs */ int shared_memory_type = DEFAULT_SHARED_MEMORY_TYPE; @@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, StatsShmemSize()); + size = add_size(size, WaitEventExtensionShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -301,6 +303,7 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); StatsShmemInit(); + WaitEventExtensionShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/utils/activity/generate-wait_event_types.pl b/src/backend/utils/activity/generate-wait_event_types.pl index f63c991051..56335e8730 100644 --- a/src/backend/utils/activity/generate-wait_event_types.pl +++ b/src/backend/utils/activity/generate-wait_event_types.pl @@ -133,10 +133,11 @@ if ($gen_code) foreach my $waitclass (sort { uc($a) cmp uc($b) } keys %hashwe) { - # Don't generate .c and .h files for LWLock and Lock, these are - # handled independently. + # Don't generate .c and .h files for Extension, LWLock and + # Lock, these are handled independently. 
next - if ( $waitclass eq 'WaitEventLWLock' + if ( $waitclass eq 'WaitEventExtension' + || $waitclass eq 'WaitEventLWLock' || $waitclass eq 'WaitEventLock'); my $last = $waitclass; diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 59177de7a0..b3596ece80 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -22,15 +22,18 @@ */ #include "postgres.h" +#include "miscadmin.h" +#include "port/pg_bitutils.h" #include "storage/lmgr.h" /* for GetLockNameFromTagType */ #include "storage/lwlock.h" /* for GetLWLockIdentifier */ +#include "storage/spin.h" +#include "utils/memutils.h" #include "utils/wait_event.h" static const char *pgstat_get_wait_activity(WaitEventActivity w); static const char *pgstat_get_wait_bufferpin(WaitEventBufferPin w); static const char *pgstat_get_wait_client(WaitEventClient w); -static const char *pgstat_get_wait_extension(WaitEventExtension w); static const char *pgstat_get_wait_ipc(WaitEventIPC w); static const char *pgstat_get_wait_timeout(WaitEventTimeout w); static const char *pgstat_get_wait_io(WaitEventIO w); @@ -42,6 +45,169 @@ uint32 *my_wait_event_info = &local_my_wait_event_info; #define WAIT_EVENT_CLASS_MASK 0xFF000000 #define WAIT_EVENT_ID_MASK 0x0000FFFF +/* dynamic allocation counter for custom wait events in extensions */ +typedef struct WaitEventExtensionCounterData +{ + int nextId; /* next ID to assign */ + slock_t mutex; /* protects the counter */ +} WaitEventExtensionCounterData; + +/* pointer to the shared memory */ +static WaitEventExtensionCounterData *WaitEventExtensionCounter; + +/* first event ID of custom wait events for extensions */ +#define NUM_BUILTIN_WAIT_EVENT_EXTENSION \ + (WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED - WAIT_EVENT_EXTENSION) + +/* + * This is indexed by event ID minus NUM_BUILTIN_WAIT_EVENT_EXTENSION, and + * stores the names of all dynamically-created event IDs known to the current + * process. 
Any unused entries in the array will contain NULL. + */ +static const char **WaitEventExtensionNames = NULL; +static int WaitEventExtensionNamesAllocated = 0; + +static const char *GetWaitEventExtensionIdentifier(uint16 eventId); + +/* + * Return the space for dynamic allocation counter. + */ +Size +WaitEventExtensionShmemSize(void) +{ + return sizeof(WaitEventExtensionCounterData); +} + +/* + * Allocate shmem space for dynamic allocation counter. + */ +void +WaitEventExtensionShmemInit(void) +{ + bool found; + + WaitEventExtensionCounter = (WaitEventExtensionCounterData *) + ShmemInitStruct("WaitEventExtensionCounterData", + WaitEventExtensionShmemSize(), &found); + + if (!found) + { + /* initialize the allocation counter and its spinlock. */ + WaitEventExtensionCounter->nextId = NUM_BUILTIN_WAIT_EVENT_EXTENSION; + SpinLockInit(&WaitEventExtensionCounter->mutex); + } +} + +/* + * Allocate a new event ID and return the wait event. + */ +uint32 +WaitEventExtensionNew(void) +{ + uint16 eventId; + + Assert(LWLockHeldByMeInMode(AddinShmemInitLock, LW_EXCLUSIVE)); + + SpinLockAcquire(&WaitEventExtensionCounter->mutex); + + if (WaitEventExtensionCounter->nextId > PG_UINT16_MAX) + { + SpinLockRelease(&WaitEventExtensionCounter->mutex); + ereport(ERROR, + errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("too many wait events for extensions")); + } + + eventId = WaitEventExtensionCounter->nextId++; + + SpinLockRelease(&WaitEventExtensionCounter->mutex); + + return PG_WAIT_EXTENSION | eventId; +} + +/* + * Register a dynamic wait event name for extension in the lookup table + * of the current process. + * + * This routine will save a pointer to the wait event name passed as an argument, + * so the name should be allocated in a backend-lifetime context + * (shared memory, TopMemoryContext, static constant, or similar). + * + * The "wait_event_name" will be user-visible as a wait event name, so try to + * use a name that fits the style for those. 
+ */ +void +WaitEventExtensionRegisterName(uint32 wait_event_info, + const char *wait_event_name) +{ + uint32 classId; + uint16 eventId; + + classId = wait_event_info & WAIT_EVENT_CLASS_MASK; + eventId = wait_event_info & WAIT_EVENT_ID_MASK; + + /* Check the wait event class. */ + if (classId != PG_WAIT_EXTENSION) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid wait event class %u", classId)); + + /* This should only be called for user-defined wait events. */ + if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid wait event ID %u", eventId)); + + /* Convert to array index. */ + eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION; + + /* If necessary, create or enlarge array. */ + if (eventId >= WaitEventExtensionNamesAllocated) + { + uint32 newalloc; + + newalloc = pg_nextpower2_32(Max(8, eventId + 1)); + + if (WaitEventExtensionNames == NULL) + WaitEventExtensionNames = (const char **) + MemoryContextAllocZero(TopMemoryContext, + newalloc * sizeof(char *)); + else + WaitEventExtensionNames = + repalloc0_array(WaitEventExtensionNames, const char *, + WaitEventExtensionNamesAllocated, newalloc); + WaitEventExtensionNamesAllocated = newalloc; + } + + WaitEventExtensionNames[eventId] = wait_event_name; +} + +/* + * Return the name of a wait event ID for extension. + */ +static const char * +GetWaitEventExtensionIdentifier(uint16 eventId) +{ + /* Built-in event? */ + if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION) + return "Extension"; + + /* + * It is a user-defined wait event, so look at WaitEventExtensionNames[]. + * However, it is possible that the name has never been registered by + * calling WaitEventExtensionRegisterName() in the current process, in + * which case give up and return "extension".
+ */ + eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION; + + if (eventId >= WaitEventExtensionNamesAllocated || + WaitEventExtensionNames[eventId] == NULL) + return "extension"; + + return WaitEventExtensionNames[eventId]; +} + + /* * Configure wait event reporting to report wait events to *wait_event_info. * *wait_event_info needs to be valid until pgstat_reset_wait_event_storage() @@ -151,6 +317,9 @@ pgstat_get_wait_event(uint32 wait_event_info) case PG_WAIT_LOCK: event_name = GetLockNameFromTagType(eventId); break; + case PG_WAIT_EXTENSION: + event_name = GetWaitEventExtensionIdentifier(eventId); + break; case PG_WAIT_BUFFERPIN: { WaitEventBufferPin w = (WaitEventBufferPin) wait_event_info; @@ -172,13 +341,6 @@ pgstat_get_wait_event(uint32 wait_event_info) event_name = pgstat_get_wait_client(w); break; } - case PG_WAIT_EXTENSION: - { - WaitEventExtension w = (WaitEventExtension) wait_event_info; - - event_name = pgstat_get_wait_extension(w); - break; - } case PG_WAIT_IPC: { WaitEventIPC w = (WaitEventIPC) wait_event_info; diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 3fabad96d9..2ea4789b00 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -261,7 +261,7 @@ WAIT_EVENT_BUFFER_PIN BufferPin "Waiting to acquire an exclusive pin on a buffer Section: ClassName - WaitEventExtension -WAIT_EVENT_EXTENSION Extension "Waiting in an extension." +WAIT_EVENT_DOCONLY Extension "Waiting in an extension." 
# # Wait events - LWLock diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 4517425f84..aad8bc08fa 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -38,6 +38,32 @@ extern void pgstat_reset_wait_event_storage(void); extern PGDLLIMPORT uint32 *my_wait_event_info; +/* ---------- + * Wait Events - Extension + * + * Use this category when the server process is waiting for some condition + * defined by an extension module. + * + * Extensions can define their own wait events in this category. First, + * they should call WaitEventExtensionNew() to get one or more wait event + * IDs that are allocated from a shared counter. These can be used directly + * with pgstat_report_wait_start() or equivalent. Next, each individual + * process should call WaitEventExtensionRegisterName() to associate a wait + * event string to the number allocated previously. + */ +typedef enum +{ + WAIT_EVENT_EXTENSION = PG_WAIT_EXTENSION, + WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED +} WaitEventExtension; + +extern void WaitEventExtensionShmemInit(void); +extern Size WaitEventExtensionShmemSize(void); + +extern uint32 WaitEventExtensionNew(void); +extern void WaitEventExtensionRegisterName(uint32 wait_event_info, + const char *wait_event_name); + /* ---------- * pgstat_report_wait_start() - * diff --git a/src/test/modules/worker_spi/t/001_worker_spi.pl b/src/test/modules/worker_spi/t/001_worker_spi.pl index 74e109f9a1..c3e7f5fbe6 100644 --- a/src/test/modules/worker_spi/t/001_worker_spi.pl +++ b/src/test/modules/worker_spi/t/001_worker_spi.pl @@ -39,6 +39,28 @@ $node->poll_query_until('postgres', $result = $node->safe_psql('postgres', 'SELECT * FROM schema4.counted;'); is($result, qq(total|1), 'dynamic bgworker correctly consumed tuple data'); +# Check the wait event used by the dynamic bgworker. For a session without +# the state in shared memory known, the default of "extension" is the value +# waited on. 
+$result = $node->poll_query_until( + 'postgres', + qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';], + 'extension'); +is($result, 1, 'dynamic bgworker has reported "extension" as wait event'); + +# If the shared memory state is loaded (here with worker_spi_init within +# the same connection as the one querying pg_stat_activity), the wait +# event is the custom one. +# The expected result is a special pattern here with a newline coming from the +# first query where the shared memory state is set. +$result = $node->poll_query_until( + 'postgres', + qq[SELECT worker_spi_init(); SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';], + qq[ +worker_spi_main]); +is($result, 1, + 'dynamic bgworker has reported "worker_spi_main" as wait event'); + note "testing bgworkers loaded with shared_preload_libraries"; # Create the database first so as the workers can connect to it when @@ -58,9 +80,9 @@ $node->restart; # Check that bgworkers have been registered and launched. 
ok( $node->poll_query_until( 'mydb', - qq[SELECT datname, count(datname) FROM pg_stat_activity - WHERE backend_type = 'worker_spi' GROUP BY datname;], - 'mydb|3'), + qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity + WHERE backend_type = 'worker_spi' GROUP BY datname, wait_event;], + 'mydb|3|worker_spi_main'), 'bgworkers all launched' ) or die "Timed out while waiting for bgworkers to be launched"; @@ -72,10 +94,10 @@ my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);'); ok( $node->poll_query_until( 'mydb', - qq[SELECT datname, count(datname) FROM pg_stat_activity + qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity WHERE backend_type = 'worker_spi dynamic' AND - pid IN ($worker1_pid, $worker2_pid) GROUP BY datname;], - 'mydb|2'), + pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;], + 'mydb|2|worker_spi_main'), 'dynamic bgworkers all launched' ) or die "Timed out while waiting for dynamic bgworkers to be launched"; diff --git a/src/test/modules/worker_spi/worker_spi--1.0.sql b/src/test/modules/worker_spi/worker_spi--1.0.sql index e9d5b07373..f13f7e0f98 100644 --- a/src/test/modules/worker_spi/worker_spi--1.0.sql +++ b/src/test/modules/worker_spi/worker_spi--1.0.sql @@ -7,3 +7,8 @@ CREATE FUNCTION worker_spi_launch(pg_catalog.int4) RETURNS pg_catalog.int4 STRICT AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION worker_spi_init() +RETURNS VOID STRICT +AS 'MODULE_PATHNAME' +LANGUAGE C; diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c index 903dcddef9..c4317351ce 100644 --- a/src/test/modules/worker_spi/worker_spi.c +++ b/src/test/modules/worker_spi/worker_spi.c @@ -44,10 +44,28 @@ PG_MODULE_MAGIC; +PG_FUNCTION_INFO_V1(worker_spi_init); PG_FUNCTION_INFO_V1(worker_spi_launch); PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn(); +/* Shared memory state */ +typedef struct worker_spi_state +{ + /* the wait event defined during 
initialization phase */ + uint32 wait_event; +} worker_spi_state; + +static worker_spi_state *wsstate = NULL; /* pointer to shared memory */ + +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static void worker_spi_shmem_request(void); +static void worker_spi_shmem_startup(void); +static void worker_spi_shmem_init(void); +static Size worker_spi_memsize(void); + /* GUC variables */ static int worker_spi_naptime = 10; static int worker_spi_total_workers = 2; @@ -60,6 +78,63 @@ typedef struct worktable const char *name; } worktable; +static void +worker_spi_shmem_request(void) +{ + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + + RequestAddinShmemSpace(worker_spi_memsize()); +} + +static void +worker_spi_shmem_startup(void) +{ + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + worker_spi_shmem_init(); +} + +static Size +worker_spi_memsize(void) +{ + return MAXALIGN(sizeof(worker_spi_state)); +} + +/* + * Initialize the shared memory state of worker_spi. + * + * This routine allocates a new wait event when called the first time. + * On follow-up calls, the name of the wait event associated with the + * existing shared memory state is registered. + */ +static void +worker_spi_shmem_init(void) +{ + bool found; + + wsstate = NULL; + + /* Create or attach to the shared memory state */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + wsstate = ShmemInitStruct("worker_spi State", + sizeof(worker_spi_state), + &found); + + /* Define a new wait event */ + if (!found) + wsstate->wait_event = WaitEventExtensionNew(); + + LWLockRelease(AddinShmemInitLock); + + /* + * Register the wait event in the lookup table of the current process. + */ + WaitEventExtensionRegisterName(wsstate->wait_event, "worker_spi_main"); + return; +} + /* * Initialize workspace for a worker process: create the schema if it doesn't * already exist.
@@ -149,6 +224,9 @@ worker_spi_main(Datum main_arg) /* We're now ready to receive signals */ BackgroundWorkerUnblockSignals(); + /* Create (if necessary) and attach to our shared memory area. */ + worker_spi_shmem_init(); + /* Connect to our database */ BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0); @@ -199,7 +277,7 @@ worker_spi_main(Datum main_arg) (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, worker_spi_naptime * 1000L, - WAIT_EVENT_EXTENSION); + wsstate->wait_event); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -328,6 +406,11 @@ _PG_init(void) MarkGUCPrefixReserved("worker_spi"); + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = worker_spi_shmem_request; + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = worker_spi_shmem_startup; + /* set up common data for all our workers */ memset(&worker, 0, sizeof(worker)); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | @@ -351,6 +434,21 @@ _PG_init(void) } } +/* + * Wrapper to initialize a session with the shared memory state + * used by this module. This is a convenience routine to be able to + * see the custom wait event stored in shared memory without loading + * through shared_preload_libraries. + */ +Datum +worker_spi_init(PG_FUNCTION_ARGS) +{ + /* Create (if necessary) and attach to our shared memory area. */ + worker_spi_shmem_init(); + + PG_RETURN_VOID(); +} + /* * Dynamically launch an SPI worker. */ @@ -363,6 +461,9 @@ worker_spi_launch(PG_FUNCTION_ARGS) BgwHandleStatus status; pid_t pid; + /* Create (if necessary) and attach to our shared memory area. 
*/ + worker_spi_shmem_init(); + memset(&worker, 0, sizeof(worker)); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 11d47294cf..ab97f1794a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2992,6 +2992,7 @@ WaitEventActivity WaitEventBufferPin WaitEventClient WaitEventExtension +WaitEventExtensionCounterData WaitEventIO WaitEventIPC WaitEventSet @@ -3862,6 +3863,7 @@ wchar2mb_with_len_converter wchar_t win32_deadchild_waitinfo wint_t +worker_spi_state worker_state worktable wrap