postgresql/contrib/worker_spi/worker_spi.c
Robert Haas 7f7485a0cd Allow background workers to be started dynamically.
There is a new API, RegisterDynamicBackgroundWorker, which allows
an ordinary user backend to register a new background writer during
normal running.  This means that it's no longer necessary for all
background workers to be registered during processing of
shared_preload_libraries, although the option of registering workers
at that time remains available.

When a background worker exits and will not be restarted, the
slot previously used by that background worker is automatically
released and becomes available for reuse.  Slots used by background
workers that are configured for automatic restart can't (yet) be
released without shutting down the system.

This commit adds a new source file, bgworker.c, and moves some
of the existing control logic for background workers there.
Previously, there was little enough logic that it made sense to
keep everything in postmaster.c, but not any more.

This commit also makes the worker_spi contrib module into an
extension and adds a new function, worker_spi_launch, which can
be used to demonstrate the new facility.
2013-07-16 13:02:15 -04:00

385 lines
10 KiB
C

/* -------------------------------------------------------------------------
*
* worker_spi.c
* Sample background worker code that demonstrates various coding
* patterns: establishing a database connection; starting and committing
* transactions; using GUC variables, and heeding SIGHUP to reread
* the configuration file; reporting to pg_stat_activity; using the
* process latch to sleep and exit in case of postmaster death.
*
* This code connects to a database, creates a schema and table, and summarizes
* the numbers contained therein. To see it working, insert an initial value
* with "total" type and some initial value; then insert some other rows with
* "delta" type. Delta rows will be deleted by this worker and their values
* aggregated into the total.
*
* Copyright (C) 2013, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/worker_spi/worker_spi.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_launch);
void _PG_init(void);
void worker_spi_main(Datum);
Datum worker_spi_launch(PG_FUNCTION_ARGS);
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
static volatile sig_atomic_t got_sigterm = false;
/* GUC variables */
static int worker_spi_naptime = 10;
static int worker_spi_total_workers = 2;
typedef struct worktable
{
const char *schema;
const char *name;
} worktable;
/*
* Signal handler for SIGTERM
* Set a flag to let the main loop to terminate, and set our latch to wake
* it up.
*/
static void
worker_spi_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
/*
* Signal handler for SIGHUP
* Set a flag to let the main loop to reread the config file, and set
* our latch to wake it up.
*/
static void
worker_spi_sighup(SIGNAL_ARGS)
{
got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
}
/*
* Initialize workspace for a worker process: create the schema if it doesn't
* already exist.
*/
static void
initialize_worker_spi(worktable *table)
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull)
elog(FATAL, "null result");
if (ntup == 0)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"CREATE SCHEMA \"%s\" "
"CREATE TABLE \"%s\" ("
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
/* set statement start time */
SetCurrentStatementStartTimestamp();
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
elog(FATAL, "failed to create my schema");
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
void
worker_spi_main(Datum main_arg)
{
int index = DatumGetInt32(main_arg);
worktable *table;
StringInfoData buf;
char name[20];
table = palloc(sizeof(worktable));
sprintf(name, "schema%d", index);
table->schema = pstrdup(name);
table->name = pstrdup("counted");
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, worker_spi_sighup);
pqsignal(SIGTERM, worker_spi_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection("postgres", NULL);
elog(LOG, "%s initialized with %s.%s",
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_spi, because that routine assumes the names are not
* quoted.
*
* Note some memory might be leaked here.
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
initStringInfo(&buf);
appendStringInfo(&buf,
"WITH deleted AS (DELETE "
"FROM %s.%s "
"WHERE type = 'delta' RETURNING value), "
"total AS (SELECT coalesce(sum(value), 0) as sum "
"FROM deleted) "
"UPDATE %s.%s "
"SET value = %s.value + total.sum "
"FROM total WHERE type = 'total' "
"RETURNING %s.value",
table->schema, table->name,
table->schema, table->name,
table->name,
table->name);
/*
* Main loop: do this until the SIGTERM handler tells us to terminate
*/
while (!got_sigterm)
{
int ret;
int rc;
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
worker_spi_naptime * 1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
/*
* In case of a SIGHUP, just reload the configuration.
*/
if (got_sighup)
{
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Start a transaction on which we can run queries. Note that each
* StartTransactionCommand() call should be preceded by a
* SetCurrentStatementStartTimestamp() call, which sets both the time
* for the statement we're about the run, and also the transaction
* start time. Also, each other query sent to SPI should probably be
* preceded by SetCurrentStatementStartTimestamp(), so that statement
* start time is always up to date.
*
* The SPI_connect() call lets us run queries through the SPI manager,
* and the PushActiveSnapshot() call creates an "active" snapshot
* which is necessary for queries to have MVCC data to work on.
*
* The pgstat_report_activity() call makes our activity visible
* through the pgstat views.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, buf.data);
/* We can now execute queries via SPI */
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
elog(FATAL, "cannot select from table %s.%s: error code %d",
table->schema, table->name, ret);
if (SPI_processed > 0)
{
bool isnull;
int32 val;
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (!isnull)
elog(LOG, "%s: count in %s.%s is now %d",
MyBgworkerEntry->bgw_name,
table->schema, table->name, val);
}
/*
* And finish our transaction.
*/
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
proc_exit(1);
}
/*
* Entrypoint of this module.
*
* We register more than one worker process here, to demonstrate how that can
* be done.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
unsigned int i;
/* get the configuration */
DefineCustomIntVariable("worker_spi.naptime",
"Duration between each check (in seconds).",
NULL,
&worker_spi_naptime,
10,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (!process_shared_preload_libraries_in_progress)
return;
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
&worker_spi_total_workers,
2,
1,
100,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
/* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_spi_main;
worker.bgw_sighup = NULL;
worker.bgw_sigterm = NULL;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
for (i = 1; i <= worker_spi_total_workers; i++)
{
snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
worker.bgw_main_arg = Int32GetDatum(i);
RegisterBackgroundWorker(&worker);
}
}
/*
* Dynamically launch an SPI worker.
*/
Datum
worker_spi_launch(PG_FUNCTION_ARGS)
{
int32 i = PG_GETARG_INT32(0);
BackgroundWorker worker;
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = NULL; /* new worker might not have library loaded */
sprintf(worker.bgw_library_name, "worker_spi");
sprintf(worker.bgw_function_name, "worker_spi_main");
worker.bgw_sighup = NULL; /* new worker might not have library loaded */
worker.bgw_sigterm = NULL; /* new worker might not have library loaded */
snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
worker.bgw_main_arg = Int32GetDatum(i);
PG_RETURN_BOOL(RegisterDynamicBackgroundWorker(&worker));
}