postgresql/contrib/worker_spi/worker_spi.c
Alvaro Herrera da07a1e856 Background worker processes
Background workers are postmaster subprocesses that run arbitrary
user-specified code.  They can request shared memory access as well as
backend database connections; or they can just use plain libpq frontend
database connections.

Modules listed in shared_preload_libraries can register background
workers in their _PG_init() function; this is early enough that it's not
necessary to provide an extra GUC option, because the necessary extra
resources can be allocated early on.  Modules can install more than one
bgworker, if necessary.

Care is taken that these extra processes do not interfere with other
postmaster tasks: only one such process is started on each ServerLoop
iteration.  This means a large number of them could be waiting to be
started up and postmaster is still able to quickly service external
connection requests.  Also, shutdown sequence should not be impacted by
a worker process that's reasonably well behaved (i.e. promptly responds
to termination signals.)

The current implementation lets worker processes specify their start
time, i.e. at what point in the server startup process they are to be
started: right after postmaster start (in which case they mustn't ask
for shared memory access), when consistent state has been reached
(useful during recovery in a HOT standby server), or when recovery has
terminated (i.e. when normal backends are allowed).

In case of a bgworker crash, actions to take depend on registration
data: if shared memory was requested, then all other connections are
taken down (as well as other bgworkers), just like it were a regular
backend crashing.  The bgworker itself is restarted, too, within a
configurable timeframe (which can be configured to be never).

More features to add to this framework can be imagined without much
effort, and have been discussed, but this seems good enough as a useful
unit already.

An elementary sample module is supplied.

Author: Álvaro Herrera

This patch is loosely based on prior patches submitted by KaiGai Kohei,
and unsubmitted code by Simon Riggs.

Reviewed by: KaiGai Kohei, Markus Wanner, Andres Freund,
Heikki Linnakangas, Simon Riggs, Amit Kapila
2012-12-06 17:47:30 -03:00

264 lines
6.7 KiB
C

/* -------------------------------------------------------------------------
*
* worker_spi.c
* Sample background worker code that demonstrates usage of a database
* connection.
*
* This code connects to a database, create a schema and table, and summarizes
* the numbers contained therein. To see it working, insert an initial value
* with "total" type and some initial value; then insert some other rows with
* "delta" type. Delta rows will be deleted by this worker and their values
* aggregated into the total.
*
* Copyright (C) 2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/worker_spi/worker_spi.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
PG_MODULE_MAGIC;
void _PG_init(void);
static bool got_sigterm = false;
typedef struct worktable
{
const char *schema;
const char *name;
} worktable;
static void
worker_spi_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
static void
worker_spi_sighup(SIGNAL_ARGS)
{
elog(LOG, "got sighup!");
if (MyProc)
SetLatch(&MyProc->procLatch);
}
static void
initialize_worker_spi(worktable *table)
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull)
elog(FATAL, "null result");
if (ntup == 0)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"CREATE SCHEMA \"%s\" "
"CREATE TABLE \"%s\" ("
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
elog(FATAL, "failed to create my schema");
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
}
static void
worker_spi_main(void *main_arg)
{
worktable *table = (worktable *) main_arg;
StringInfoData buf;
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection("postgres", NULL);
elog(LOG, "%s initialized with %s.%s",
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_spi, because that routine assumes the names are not
* quoted.
*
* Note some memory might be leaked here.
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
initStringInfo(&buf);
appendStringInfo(&buf,
"WITH deleted AS (DELETE "
"FROM %s.%s "
"WHERE type = 'delta' RETURNING value), "
"total AS (SELECT coalesce(sum(value), 0) as sum "
"FROM deleted) "
"UPDATE %s.%s "
"SET value = %s.value + total.sum "
"FROM total WHERE type = 'total' "
"RETURNING %s.value",
table->schema, table->name,
table->schema, table->name,
table->name,
table->name);
while (!got_sigterm)
{
int ret;
int rc;
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
elog(FATAL, "cannot select from table %s.%s: error code %d",
table->schema, table->name, ret);
if (SPI_processed > 0)
{
bool isnull;
int32 val;
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (!isnull)
elog(LOG, "%s: count in %s.%s is now %d",
MyBgworkerEntry->bgw_name,
table->schema, table->name, val);
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
}
proc_exit(0);
}
/*
* Entrypoint of this module.
*
* We register two worker processes here, to demonstrate how that can be done.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
worktable *table;
/* register the worker processes. These values are common for both */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_main = worker_spi_main;
worker.bgw_sighup = worker_spi_sighup;
worker.bgw_sigterm = worker_spi_sigterm;
/*
* These values are used for the first worker.
*
* Note these are palloc'd. The reason this works after starting a new
* worker process is that if we only fork, they point to valid allocated
* memory in the child process; and if we fork and then exec, the exec'd
* process will run this code again, and so the memory is also valid there.
*/
table = palloc(sizeof(worktable));
table->schema = pstrdup("schema1");
table->name = pstrdup("counted");
worker.bgw_name = "SPI worker 1";
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main_arg = (void *) table;
RegisterBackgroundWorker(&worker);
/* Values for the second worker */
table = palloc(sizeof(worktable));
table->schema = pstrdup("our schema2");
table->name = pstrdup("counted rows");
worker.bgw_name = "SPI worker 2";
worker.bgw_restart_time = 2;
worker.bgw_main_arg = (void *) table;
RegisterBackgroundWorker(&worker);
}