worker_spi: Expand set of options to start workers
A couple of new options are added to this module to provide more control on the ways bgworkers are started: - A new GUC called worker_spi.role to control which role to use by default when starting a worker. - worker_spi_launch() gains three arguments: a role OID, a database OID and flags (currently only BGWORKER_BYPASS_ALLOWCONN). By default, the role OID and the database OID are InvalidOid, in which case the worker would use the related GUCs. Workers loaded by shared_preload_libraries use the default values provided by the GUCs, with flags at 0. The options are given to the main bgworker routine through bgw_extra. A test case is tweaked to start two dynamic workers with databases and roles defined by the caller of worker_spi_launch(). These additions will have the advantage of expanding the tests for bgworkers, for at least two cases: - BGWORKER_BYPASS_ALLOWCONN has no coverage in the core tree. - A new bgworker flag is under discussion, and this eases the integration of new tests. Reviewed-by: Bertrand Drouvot Discussion: https://postgr.es/m/bcc36259-7850-4882-97ef-d6b905d2fc51@gmail.com
This commit is contained in:
parent
c789f0f6cc
commit
4f2994647f
|
@ -20,7 +20,9 @@ $node->safe_psql('postgres', 'CREATE EXTENSION worker_spi;');
|
|||
# This consists in making sure that a table name "counted" is created
|
||||
# on a new schema whose name includes the index defined in input argument
|
||||
# of worker_spi_launch().
|
||||
# By default, dynamic bgworkers connect to the "postgres" database.
|
||||
# By default, dynamic bgworkers connect to the "postgres" database with
|
||||
# an undefined role, falling back to the GUC defaults (or InvalidOid for
|
||||
# worker_spi_launch).
|
||||
my $result =
|
||||
$node->safe_psql('postgres', 'SELECT worker_spi_launch(4) IS NOT NULL;');
|
||||
is($result, 't', "dynamic bgworker launched");
|
||||
|
@ -44,8 +46,7 @@ $result = $node->poll_query_until(
|
|||
'postgres',
|
||||
qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';],
|
||||
qq[WorkerSpiMain]);
|
||||
is($result, 1,
|
||||
'dynamic bgworker has reported "WorkerSpiMain" as wait event');
|
||||
is($result, 1, 'dynamic bgworker has reported "WorkerSpiMain" as wait event');
|
||||
|
||||
# Check the wait event used by the dynamic bgworker appears in pg_wait_events
|
||||
$result = $node->safe_psql('postgres',
|
||||
|
@ -58,6 +59,7 @@ note "testing bgworkers loaded with shared_preload_libraries";
|
|||
# Create the database first so as the workers can connect to it when
|
||||
# the library is loaded.
|
||||
$node->safe_psql('postgres', q(CREATE DATABASE mydb;));
|
||||
$node->safe_psql('postgres', q(CREATE ROLE myrole SUPERUSER LOGIN;));
|
||||
$node->safe_psql('mydb', 'CREATE EXTENSION worker_spi;');
|
||||
|
||||
# Now load the module as a shared library.
|
||||
|
@ -80,16 +82,25 @@ ok( $node->poll_query_until(
|
|||
|
||||
# Ask worker_spi to launch dynamic bgworkers with the library loaded, then
|
||||
# check their existence. Use IDs that do not overlap with the schemas created
|
||||
# by the previous workers.
|
||||
my $worker1_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(10);');
|
||||
my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);');
|
||||
# by the previous workers. These ones use a new role, on different databases.
|
||||
my $myrole_id = $node->safe_psql('mydb',
|
||||
"SELECT oid FROM pg_roles where rolname = 'myrole';");
|
||||
my $mydb_id = $node->safe_psql('mydb',
|
||||
"SELECT oid FROM pg_database where datname = 'mydb';");
|
||||
my $postgresdb_id = $node->safe_psql('mydb',
|
||||
"SELECT oid FROM pg_database where datname = 'postgres';");
|
||||
my $worker1_pid = $node->safe_psql('mydb',
|
||||
"SELECT worker_spi_launch(10, $mydb_id, $myrole_id);");
|
||||
my $worker2_pid = $node->safe_psql('mydb',
|
||||
"SELECT worker_spi_launch(11, $postgresdb_id, $myrole_id);");
|
||||
|
||||
ok( $node->poll_query_until(
|
||||
'mydb',
|
||||
qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity
|
||||
qq[SELECT datname, usename, wait_event FROM pg_stat_activity
|
||||
WHERE backend_type = 'worker_spi dynamic' AND
|
||||
pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;],
|
||||
'mydb|2|WorkerSpiMain'),
|
||||
pid IN ($worker1_pid, $worker2_pid) ORDER BY datname;],
|
||||
qq[mydb|myrole|WorkerSpiMain
|
||||
postgres|myrole|WorkerSpiMain]),
|
||||
'dynamic bgworkers all launched'
|
||||
) or die "Timed out while waiting for dynamic bgworkers to be launched";
|
||||
|
||||
|
|
|
@ -3,7 +3,11 @@
|
|||
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
|
||||
\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
|
||||
-- In the default case, dboid and roleoid fall back to their respective GUCs.
|
||||
CREATE FUNCTION worker_spi_launch(index int4,
|
||||
dboid oid DEFAULT 0,
|
||||
roleoid oid DEFAULT 0,
|
||||
flags text[] DEFAULT '{}')
|
||||
RETURNS pg_catalog.int4 STRICT
|
||||
AS 'MODULE_PATHNAME'
|
||||
LANGUAGE C;
|
||||
|
|
|
@ -34,10 +34,12 @@
|
|||
|
||||
/* these headers are used by this particular worker's code */
|
||||
#include "access/xact.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "executor/spi.h"
|
||||
#include "fmgr.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "pgstat.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/snapmgr.h"
|
||||
#include "tcop/utility.h"
|
||||
|
@ -52,6 +54,7 @@ PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
|
|||
static int worker_spi_naptime = 10;
|
||||
static int worker_spi_total_workers = 2;
|
||||
static char *worker_spi_database = NULL;
|
||||
static char *worker_spi_role = NULL;
|
||||
|
||||
/* value cached, fetched from shared memory */
|
||||
static uint32 worker_spi_wait_event_main = 0;
|
||||
|
@ -138,12 +141,24 @@ worker_spi_main(Datum main_arg)
|
|||
worktable *table;
|
||||
StringInfoData buf;
|
||||
char name[20];
|
||||
Oid dboid;
|
||||
Oid roleoid;
|
||||
char *p;
|
||||
bits32 flags = 0;
|
||||
|
||||
table = palloc(sizeof(worktable));
|
||||
sprintf(name, "schema%d", index);
|
||||
table->schema = pstrdup(name);
|
||||
table->name = pstrdup("counted");
|
||||
|
||||
/* fetch database and role OIDs, these are set for a dynamic worker */
|
||||
p = MyBgworkerEntry->bgw_extra;
|
||||
memcpy(&dboid, p, sizeof(Oid));
|
||||
p += sizeof(Oid);
|
||||
memcpy(&roleoid, p, sizeof(Oid));
|
||||
p += sizeof(Oid);
|
||||
memcpy(&flags, p, sizeof(bits32));
|
||||
|
||||
/* Establish signal handlers before unblocking signals. */
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
|
@ -152,7 +167,11 @@ worker_spi_main(Datum main_arg)
|
|||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
/* Connect to our database */
|
||||
BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
|
||||
if (OidIsValid(dboid))
|
||||
BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
|
||||
else
|
||||
BackgroundWorkerInitializeConnection(worker_spi_database,
|
||||
worker_spi_role, flags);
|
||||
|
||||
elog(LOG, "%s initialized with %s.%s",
|
||||
MyBgworkerEntry->bgw_name, table->schema, table->name);
|
||||
|
@ -316,6 +335,15 @@ _PG_init(void)
|
|||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomStringVariable("worker_spi.role",
|
||||
"Role to connect with.",
|
||||
NULL,
|
||||
&worker_spi_role,
|
||||
NULL,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
if (!process_shared_preload_libraries_in_progress)
|
||||
return;
|
||||
|
||||
|
@ -346,6 +374,10 @@ _PG_init(void)
|
|||
|
||||
/*
|
||||
* Now fill in worker-specific data, and do the actual registrations.
|
||||
*
|
||||
* bgw_extra can optionally include a dabatase OID, a role OID and a set
|
||||
* of flags. This is left empty here to fallback to the related GUCs at
|
||||
* startup (0 for the bgworker flags).
|
||||
*/
|
||||
for (int i = 1; i <= worker_spi_total_workers; i++)
|
||||
{
|
||||
|
@ -364,10 +396,18 @@ Datum
|
|||
worker_spi_launch(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 i = PG_GETARG_INT32(0);
|
||||
Oid dboid = PG_GETARG_OID(1);
|
||||
Oid roleoid = PG_GETARG_OID(2);
|
||||
BackgroundWorker worker;
|
||||
BackgroundWorkerHandle *handle;
|
||||
BgwHandleStatus status;
|
||||
pid_t pid;
|
||||
char *p;
|
||||
bits32 flags = 0;
|
||||
ArrayType *arr = PG_GETARG_ARRAYTYPE_P(3);
|
||||
Size ndim;
|
||||
int nelems;
|
||||
Datum *datum_flags;
|
||||
|
||||
memset(&worker, 0, sizeof(worker));
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
|
||||
|
@ -382,6 +422,54 @@ worker_spi_launch(PG_FUNCTION_ARGS)
|
|||
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
|
||||
worker.bgw_notify_pid = MyProcPid;
|
||||
|
||||
/* extract flags, if any */
|
||||
ndim = ARR_NDIM(arr);
|
||||
if (ndim > 1)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("flags array must be one-dimensional")));
|
||||
|
||||
if (array_contains_nulls(arr))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("flags array must not contain nulls")));
|
||||
|
||||
Assert(ARR_ELEMTYPE(arr) == TEXTOID);
|
||||
deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
|
||||
|
||||
for (i = 0; i < nelems; i++)
|
||||
{
|
||||
char *optname = TextDatumGetCString(datum_flags[i]);
|
||||
|
||||
if (strcmp(optname, "ALLOWCONN") == 0)
|
||||
flags |= BGWORKER_BYPASS_ALLOWCONN;
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("incorrect flag value found in array")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Register database and role to use for the worker started in bgw_extra.
|
||||
* If none have been provided, this will fall back to the GUCs at startup.
|
||||
*/
|
||||
if (!OidIsValid(dboid))
|
||||
dboid = get_database_oid(worker_spi_database, false);
|
||||
|
||||
/*
|
||||
* worker_spi_role is NULL by default, so this gives to worker_spi_main()
|
||||
* an invalid OID in this case.
|
||||
*/
|
||||
if (!OidIsValid(roleoid) && worker_spi_role)
|
||||
roleoid = get_role_oid(worker_spi_role, false);
|
||||
|
||||
p = worker.bgw_extra;
|
||||
memcpy(p, &dboid, sizeof(Oid));
|
||||
p += sizeof(Oid);
|
||||
memcpy(p, &roleoid, sizeof(Oid));
|
||||
p += sizeof(Oid);
|
||||
memcpy(p, &flags, sizeof(bits32));
|
||||
|
||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
||||
PG_RETURN_NULL();
|
||||
|
||||
|
|
Loading…
Reference in New Issue