diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 588b720f57..f4fc5d814f 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1117,11 +1117,14 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i - Extensions can add LWLock types to the list shown in - . In some cases, the name + Extensions can add Extension and + LWLock types + to the list shown in and + . In some cases, the name assigned by an extension will not be available in all server processes; - so an LWLock wait event might be reported as - just extension rather than the + so an Extension or LWLock wait + event might be reported as just + extension rather than the extension-assigned name. diff --git a/doc/src/sgml/xfunc.sgml b/doc/src/sgml/xfunc.sgml index 9620ea9ae3..d6345a775b 100644 --- a/doc/src/sgml/xfunc.sgml +++ b/doc/src/sgml/xfunc.sgml @@ -3453,6 +3453,51 @@ if (!ptr) + + Shared Memory and Custom Wait Events + + + Add-ins can define custom wait events under the wait event type + Extension. The add-in's shared library must be + preloaded by specifying it in shared_preload_libraries, + and register a shmem_request_hook and a + shmem_startup_hook in its + _PG_init function. + shmem_request_hook can request a shared memory size + to be later used at startup by calling: + +void RequestAddinShmemSpace(int size) + + + + shmem_startup_hook can allocate in shared memory + custom wait events by calling while holding the LWLock + AddinShmemInitLock to avoid any race conditions: + +uint32 WaitEventExtensionNew(void) + + Next, each process needs to associate the wait event allocated previously + to a user-facing custom string, which is something done by calling: + +void WaitEventExtensionRegisterName(uint32 wait_event_info, const char *wait_event_name) + + An example can be found in src/test/modules/worker_spi + in the PostgreSQL source tree. 
+ + + Custom wait events can be viewed in + pg_stat_activity: + +=# SELECT wait_event_type, wait_event FROM pg_stat_activity + WHERE backend_type ~ 'worker_spi'; + wait_event_type | wait_event +-----------------+----------------- + Extension | worker_spi_main +(1 row) + + + + Using C++ for Extensibility diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index cc387c00a1..5551afffc0 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -49,6 +49,7 @@ #include "storage/spin.h" #include "utils/guc.h" #include "utils/snapmgr.h" +#include "utils/wait_event.h" /* GUCs */ int shared_memory_type = DEFAULT_SHARED_MEMORY_TYPE; @@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, StatsShmemSize()); + size = add_size(size, WaitEventExtensionShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -301,6 +303,7 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); StatsShmemInit(); + WaitEventExtensionShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/utils/activity/generate-wait_event_types.pl b/src/backend/utils/activity/generate-wait_event_types.pl index f63c991051..56335e8730 100644 --- a/src/backend/utils/activity/generate-wait_event_types.pl +++ b/src/backend/utils/activity/generate-wait_event_types.pl @@ -133,10 +133,11 @@ if ($gen_code) foreach my $waitclass (sort { uc($a) cmp uc($b) } keys %hashwe) { - # Don't generate .c and .h files for LWLock and Lock, these are - # handled independently. + # Don't generate .c and .h files for Extension, LWLock and + # Lock, these are handled independently. 
next - if ( $waitclass eq 'WaitEventLWLock' + if ( $waitclass eq 'WaitEventExtension' + || $waitclass eq 'WaitEventLWLock' || $waitclass eq 'WaitEventLock'); my $last = $waitclass; diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 59177de7a0..b3596ece80 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -22,15 +22,18 @@ */ #include "postgres.h" +#include "miscadmin.h" +#include "port/pg_bitutils.h" #include "storage/lmgr.h" /* for GetLockNameFromTagType */ #include "storage/lwlock.h" /* for GetLWLockIdentifier */ +#include "storage/spin.h" +#include "utils/memutils.h" #include "utils/wait_event.h" static const char *pgstat_get_wait_activity(WaitEventActivity w); static const char *pgstat_get_wait_bufferpin(WaitEventBufferPin w); static const char *pgstat_get_wait_client(WaitEventClient w); -static const char *pgstat_get_wait_extension(WaitEventExtension w); static const char *pgstat_get_wait_ipc(WaitEventIPC w); static const char *pgstat_get_wait_timeout(WaitEventTimeout w); static const char *pgstat_get_wait_io(WaitEventIO w); @@ -42,6 +45,169 @@ uint32 *my_wait_event_info = &local_my_wait_event_info; #define WAIT_EVENT_CLASS_MASK 0xFF000000 #define WAIT_EVENT_ID_MASK 0x0000FFFF +/* dynamic allocation counter for custom wait events in extensions */ +typedef struct WaitEventExtensionCounterData +{ + int nextId; /* next ID to assign */ + slock_t mutex; /* protects the counter */ +} WaitEventExtensionCounterData; + +/* pointer to the shared memory */ +static WaitEventExtensionCounterData *WaitEventExtensionCounter; + +/* first event ID of custom wait events for extensions */ +#define NUM_BUILTIN_WAIT_EVENT_EXTENSION \ + (WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED - WAIT_EVENT_EXTENSION) + +/* + * This is indexed by event ID minus NUM_BUILTIN_WAIT_EVENT_EXTENSION, and + * stores the names of all dynamically-created event IDs known to the current + * process. 
Any unused entries in the array will contain NULL. + */ +static const char **WaitEventExtensionNames = NULL; +static int WaitEventExtensionNamesAllocated = 0; + +static const char *GetWaitEventExtensionIdentifier(uint16 eventId); + +/* + * Return the space for dynamic allocation counter. + */ +Size +WaitEventExtensionShmemSize(void) +{ + return sizeof(WaitEventExtensionCounterData); +} + +/* + * Allocate shmem space for dynamic allocation counter. + */ +void +WaitEventExtensionShmemInit(void) +{ + bool found; + + WaitEventExtensionCounter = (WaitEventExtensionCounterData *) + ShmemInitStruct("WaitEventExtensionCounterData", + WaitEventExtensionShmemSize(), &found); + + if (!found) + { + /* initialize the allocation counter and its spinlock. */ + WaitEventExtensionCounter->nextId = NUM_BUILTIN_WAIT_EVENT_EXTENSION; + SpinLockInit(&WaitEventExtensionCounter->mutex); + } +} + +/* + * Allocate a new event ID and return the wait event. + */ +uint32 +WaitEventExtensionNew(void) +{ + uint16 eventId; + + Assert(LWLockHeldByMeInMode(AddinShmemInitLock, LW_EXCLUSIVE)); + + SpinLockAcquire(&WaitEventExtensionCounter->mutex); + + if (WaitEventExtensionCounter->nextId > PG_UINT16_MAX) + { + SpinLockRelease(&WaitEventExtensionCounter->mutex); + ereport(ERROR, + errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("too many wait events for extensions")); + } + + eventId = WaitEventExtensionCounter->nextId++; + + SpinLockRelease(&WaitEventExtensionCounter->mutex); + + return PG_WAIT_EXTENSION | eventId; +} + +/* + * Register a dynamic wait event name for extension in the lookup table + * of the current process. + * + * This routine will save a pointer to the wait event name passed as an argument, + * so the name should be allocated in a backend-lifetime context + * (shared memory, TopMemoryContext, static constant, or similar). + * + * The "wait_event_name" will be user-visible as a wait event name, so try to + * use a name that fits the style for those. 
+ */ +void +WaitEventExtensionRegisterName(uint32 wait_event_info, + const char *wait_event_name) +{ + uint32 classId; + uint16 eventId; + + classId = wait_event_info & WAIT_EVENT_CLASS_MASK; + eventId = wait_event_info & WAIT_EVENT_ID_MASK; + + /* Check the wait event class. */ + if (classId != PG_WAIT_EXTENSION) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid wait event class %u", classId)); + + /* This should only be called for user-defined wait events. */ + if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid wait event ID %u", eventId)); + + /* Convert to array index. */ + eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION; + + /* If necessary, create or enlarge array. */ + if (eventId >= WaitEventExtensionNamesAllocated) + { + uint32 newalloc; + + newalloc = pg_nextpower2_32(Max(8, eventId + 1)); + + if (WaitEventExtensionNames == NULL) + WaitEventExtensionNames = (const char **) + MemoryContextAllocZero(TopMemoryContext, + newalloc * sizeof(char *)); + else + WaitEventExtensionNames = + repalloc0_array(WaitEventExtensionNames, const char *, + WaitEventExtensionNamesAllocated, newalloc); + WaitEventExtensionNamesAllocated = newalloc; + } + + WaitEventExtensionNames[eventId] = wait_event_name; +} + +/* + * Return the name of a wait event ID for an extension. + */ +static const char * +GetWaitEventExtensionIdentifier(uint16 eventId) +{ + /* Built-in event? */ + if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION) + return "Extension"; + + /* + * It is a user-defined wait event, so look at WaitEventExtensionNames[]. + * However, it is possible that the name has never been registered by + * calling WaitEventExtensionRegisterName() in the current process, in + * which case give up and return "extension". 
+ */ + eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION; + + if (eventId >= WaitEventExtensionNamesAllocated || + WaitEventExtensionNames[eventId] == NULL) + return "extension"; + + return WaitEventExtensionNames[eventId]; +} + + /* * Configure wait event reporting to report wait events to *wait_event_info. * *wait_event_info needs to be valid until pgstat_reset_wait_event_storage() @@ -151,6 +317,9 @@ pgstat_get_wait_event(uint32 wait_event_info) case PG_WAIT_LOCK: event_name = GetLockNameFromTagType(eventId); break; + case PG_WAIT_EXTENSION: + event_name = GetWaitEventExtensionIdentifier(eventId); + break; case PG_WAIT_BUFFERPIN: { WaitEventBufferPin w = (WaitEventBufferPin) wait_event_info; @@ -172,13 +341,6 @@ pgstat_get_wait_event(uint32 wait_event_info) event_name = pgstat_get_wait_client(w); break; } - case PG_WAIT_EXTENSION: - { - WaitEventExtension w = (WaitEventExtension) wait_event_info; - - event_name = pgstat_get_wait_extension(w); - break; - } case PG_WAIT_IPC: { WaitEventIPC w = (WaitEventIPC) wait_event_info; diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 3fabad96d9..2ea4789b00 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -261,7 +261,7 @@ WAIT_EVENT_BUFFER_PIN BufferPin "Waiting to acquire an exclusive pin on a buffer Section: ClassName - WaitEventExtension -WAIT_EVENT_EXTENSION Extension "Waiting in an extension." +WAIT_EVENT_DOCONLY Extension "Waiting in an extension." 
# # Wait events - LWLock diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 4517425f84..aad8bc08fa 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -38,6 +38,32 @@ extern void pgstat_reset_wait_event_storage(void); extern PGDLLIMPORT uint32 *my_wait_event_info; +/* ---------- + * Wait Events - Extension + * + * Use this category when the server process is waiting for some condition + * defined by an extension module. + * + * Extensions can define their own wait events in this category. First, + * they should call WaitEventExtensionNew() to get one or more wait event + * IDs that are allocated from a shared counter. These can be used directly + * with pgstat_report_wait_start() or equivalent. Next, each individual + * process should call WaitEventExtensionRegisterName() to associate a wait + * event string to the number allocated previously. + */ +typedef enum +{ + WAIT_EVENT_EXTENSION = PG_WAIT_EXTENSION, + WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED +} WaitEventExtension; + +extern void WaitEventExtensionShmemInit(void); +extern Size WaitEventExtensionShmemSize(void); + +extern uint32 WaitEventExtensionNew(void); +extern void WaitEventExtensionRegisterName(uint32 wait_event_info, + const char *wait_event_name); + /* ---------- * pgstat_report_wait_start() - * diff --git a/src/test/modules/worker_spi/t/001_worker_spi.pl b/src/test/modules/worker_spi/t/001_worker_spi.pl index 74e109f9a1..c3e7f5fbe6 100644 --- a/src/test/modules/worker_spi/t/001_worker_spi.pl +++ b/src/test/modules/worker_spi/t/001_worker_spi.pl @@ -39,6 +39,28 @@ $node->poll_query_until('postgres', $result = $node->safe_psql('postgres', 'SELECT * FROM schema4.counted;'); is($result, qq(total|1), 'dynamic bgworker correctly consumed tuple data'); +# Check the wait event used by the dynamic bgworker. For a session without +# the state in shared memory known, the default of "extension" is the value +# waited on. 
+$result = $node->poll_query_until( + 'postgres', + qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';], + 'extension'); +is($result, 1, 'dynamic bgworker has reported "extension" as wait event'); + +# If the shared memory state is loaded (here with worker_spi_init within +# the same connection as the one querying pg_stat_activity), the wait +# event is the custom one. +# The expected result is a special pattern here with a newline coming from the +# first query where the shared memory state is set. +$result = $node->poll_query_until( + 'postgres', + qq[SELECT worker_spi_init(); SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';], + qq[ +worker_spi_main]); +is($result, 1, + 'dynamic bgworker has reported "worker_spi_main" as wait event'); + note "testing bgworkers loaded with shared_preload_libraries"; # Create the database first so as the workers can connect to it when @@ -58,9 +80,9 @@ $node->restart; # Check that bgworkers have been registered and launched. 
ok( $node->poll_query_until( 'mydb', - qq[SELECT datname, count(datname) FROM pg_stat_activity - WHERE backend_type = 'worker_spi' GROUP BY datname;], - 'mydb|3'), + qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity + WHERE backend_type = 'worker_spi' GROUP BY datname, wait_event;], + 'mydb|3|worker_spi_main'), 'bgworkers all launched' ) or die "Timed out while waiting for bgworkers to be launched"; @@ -72,10 +94,10 @@ my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);'); ok( $node->poll_query_until( 'mydb', - qq[SELECT datname, count(datname) FROM pg_stat_activity + qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity WHERE backend_type = 'worker_spi dynamic' AND - pid IN ($worker1_pid, $worker2_pid) GROUP BY datname;], - 'mydb|2'), + pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;], + 'mydb|2|worker_spi_main'), 'dynamic bgworkers all launched' ) or die "Timed out while waiting for dynamic bgworkers to be launched"; diff --git a/src/test/modules/worker_spi/worker_spi--1.0.sql b/src/test/modules/worker_spi/worker_spi--1.0.sql index e9d5b07373..f13f7e0f98 100644 --- a/src/test/modules/worker_spi/worker_spi--1.0.sql +++ b/src/test/modules/worker_spi/worker_spi--1.0.sql @@ -7,3 +7,8 @@ CREATE FUNCTION worker_spi_launch(pg_catalog.int4) RETURNS pg_catalog.int4 STRICT AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION worker_spi_init() +RETURNS VOID STRICT +AS 'MODULE_PATHNAME' +LANGUAGE C; diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c index 903dcddef9..c4317351ce 100644 --- a/src/test/modules/worker_spi/worker_spi.c +++ b/src/test/modules/worker_spi/worker_spi.c @@ -44,10 +44,28 @@ PG_MODULE_MAGIC; +PG_FUNCTION_INFO_V1(worker_spi_init); PG_FUNCTION_INFO_V1(worker_spi_launch); PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn(); +/* Shared memory state */ +typedef struct worker_spi_state +{ + /* the wait event defined during 
initialization phase */ + uint32 wait_event; +} worker_spi_state; + +static worker_spi_state *wsstate = NULL; /* pointer to shared memory */ + +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static void worker_spi_shmem_request(void); +static void worker_spi_shmem_startup(void); +static void worker_spi_shmem_init(void); +static Size worker_spi_memsize(void); + /* GUC variables */ static int worker_spi_naptime = 10; static int worker_spi_total_workers = 2; @@ -60,6 +78,63 @@ typedef struct worktable const char *name; } worktable; +static void +worker_spi_shmem_request(void) +{ + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + + RequestAddinShmemSpace(worker_spi_memsize()); +} + +static void +worker_spi_shmem_startup(void) +{ + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + worker_spi_shmem_init(); +} + +static Size +worker_spi_memsize(void) +{ + return MAXALIGN(sizeof(worker_spi_state)); +} + +/* + * Initialize the shared memory state of worker_spi. + * + * This routine allocates a new wait event when called the first time. + * On follow-up calls, the name of the wait event associated with the + * existing shared memory state is registered. + */ +static void +worker_spi_shmem_init(void) +{ + bool found; + + wsstate = NULL; + + /* Create or attach to the shared memory state */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + wsstate = ShmemInitStruct("worker_spi State", + sizeof(worker_spi_state), + &found); + + /* Define a new wait event */ + if (!found) + wsstate->wait_event = WaitEventExtensionNew(); + + LWLockRelease(AddinShmemInitLock); + + /* + * Register the wait event in the lookup table of the current process. + */ + WaitEventExtensionRegisterName(wsstate->wait_event, "worker_spi_main"); + return; +} + /* * Initialize workspace for a worker process: create the schema if it doesn't * already exist. 
@@ -149,6 +224,9 @@ worker_spi_main(Datum main_arg) /* We're now ready to receive signals */ BackgroundWorkerUnblockSignals(); + /* Create (if necessary) and attach to our shared memory area. */ + worker_spi_shmem_init(); + /* Connect to our database */ BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0); @@ -199,7 +277,7 @@ worker_spi_main(Datum main_arg) (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, worker_spi_naptime * 1000L, - WAIT_EVENT_EXTENSION); + wsstate->wait_event); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -328,6 +406,11 @@ _PG_init(void) MarkGUCPrefixReserved("worker_spi"); + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = worker_spi_shmem_request; + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = worker_spi_shmem_startup; + /* set up common data for all our workers */ memset(&worker, 0, sizeof(worker)); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | @@ -351,6 +434,21 @@ _PG_init(void) } } +/* + * Wrapper to initialize a session with the shared memory state + * used by this module. This is a convenience routine to be able to + * see the custom wait event stored in shared memory without loading + * through shared_preload_libraries. + */ +Datum +worker_spi_init(PG_FUNCTION_ARGS) +{ + /* Create (if necessary) and attach to our shared memory area. */ + worker_spi_shmem_init(); + + PG_RETURN_VOID(); +} + /* * Dynamically launch an SPI worker. */ @@ -363,6 +461,9 @@ worker_spi_launch(PG_FUNCTION_ARGS) BgwHandleStatus status; pid_t pid; + /* Create (if necessary) and attach to our shared memory area. 
*/ + worker_spi_shmem_init(); + memset(&worker, 0, sizeof(worker)); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 11d47294cf..ab97f1794a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2992,6 +2992,7 @@ WaitEventActivity WaitEventBufferPin WaitEventClient WaitEventExtension +WaitEventExtensionCounterData WaitEventIO WaitEventIPC WaitEventSet @@ -3862,6 +3863,7 @@ wchar2mb_with_len_converter wchar_t win32_deadchild_waitinfo wint_t +worker_spi_state worker_state worktable wrap