diff --git a/contrib/worker_spi/Makefile b/contrib/worker_spi/Makefile index edf4105a11..fbb29b4f2f 100644 --- a/contrib/worker_spi/Makefile +++ b/contrib/worker_spi/Makefile @@ -2,6 +2,9 @@ MODULES = worker_spi +EXTENSION = worker_spi +DATA = worker_spi--1.0.sql + ifdef USE_PGXS PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/contrib/worker_spi/worker_spi--1.0.sql b/contrib/worker_spi/worker_spi--1.0.sql new file mode 100644 index 0000000000..a56b42c10e --- /dev/null +++ b/contrib/worker_spi/worker_spi--1.0.sql @@ -0,0 +1,9 @@ +/* contrib/worker_spi/worker_spi--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit + +CREATE FUNCTION worker_spi_launch(pg_catalog.int4) +RETURNS pg_catalog.bool STRICT +AS 'MODULE_PATHNAME' +LANGUAGE C; diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c index 414721a70f..ef19e4b39e 100644 --- a/contrib/worker_spi/worker_spi.c +++ b/contrib/worker_spi/worker_spi.c @@ -42,8 +42,11 @@ #include "tcop/utility.h" PG_MODULE_MAGIC; +PG_FUNCTION_INFO_V1(worker_spi_launch); void _PG_init(void); +void worker_spi_main(Datum); +Datum worker_spi_launch(PG_FUNCTION_ARGS); /* flags set by signal handlers */ static volatile sig_atomic_t got_sighup = false; @@ -153,11 +156,22 @@ initialize_worker_spi(worktable *table) pgstat_report_activity(STATE_IDLE, NULL); } -static void -worker_spi_main(void *main_arg) +void +worker_spi_main(Datum main_arg) { - worktable *table = (worktable *) main_arg; + int index = DatumGetInt32(main_arg); + worktable *table; StringInfoData buf; + char name[20]; + + table = palloc(sizeof(worktable)); + sprintf(name, "schema%d", index); + table->schema = pstrdup(name); + table->name = pstrdup("counted"); + + /* Establish signal handlers before unblocking signals. */ + pqsignal(SIGHUP, worker_spi_sighup); + pqsignal(SIGTERM, worker_spi_sigterm); /* We're now ready to receive signals */ BackgroundWorkerUnblockSignals(); @@ -279,7 +293,7 @@ worker_spi_main(void *main_arg) pgstat_report_activity(STATE_IDLE, NULL); } - proc_exit(0); + proc_exit(1); } /* @@ -292,9 +306,7 @@ void _PG_init(void) { BackgroundWorker worker; - worktable *table; unsigned int i; - char name[20]; /* get the configuration */ DefineCustomIntVariable("worker_spi.naptime", @@ -309,6 +321,10 @@ _PG_init(void) NULL, NULL, NULL); + + if (!process_shared_preload_libraries_in_progress) + return; + DefineCustomIntVariable("worker_spi.total_workers", "Number of workers.", NULL, @@ -328,23 +344,41 @@ _PG_init(void) worker.bgw_start_time = BgWorkerStart_RecoveryFinished; worker.bgw_restart_time = BGW_NEVER_RESTART; worker.bgw_main = worker_spi_main; - worker.bgw_sighup = worker_spi_sighup; - worker.bgw_sigterm = worker_spi_sigterm; + worker.bgw_sighup = NULL; + worker.bgw_sigterm = NULL; /* * Now fill in worker-specific data, and do the actual registrations. */ for (i = 1; i <= worker_spi_total_workers; i++) { - sprintf(name, "worker %d", i); - worker.bgw_name = pstrdup(name); - - table = palloc(sizeof(worktable)); - sprintf(name, "schema%d", i); - table->schema = pstrdup(name); - table->name = pstrdup("counted"); - worker.bgw_main_arg = (void *) table; + snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); + worker.bgw_main_arg = Int32GetDatum(i); RegisterBackgroundWorker(&worker); } } + +/* + * Dynamically launch an SPI worker. + */ +Datum +worker_spi_launch(PG_FUNCTION_ARGS) +{ + int32 i = PG_GETARG_INT32(0); + BackgroundWorker worker; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = NULL; /* new worker might not have library loaded */ + sprintf(worker.bgw_library_name, "worker_spi"); + sprintf(worker.bgw_function_name, "worker_spi_main"); + worker.bgw_sighup = NULL; /* new worker might not have library loaded */ + worker.bgw_sigterm = NULL; /* new worker might not have library loaded */ + snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); + worker.bgw_main_arg = Int32GetDatum(i); + + PG_RETURN_BOOL(RegisterDynamicBackgroundWorker(&worker)); +} diff --git a/contrib/worker_spi/worker_spi.control b/contrib/worker_spi/worker_spi.control new file mode 100644 index 0000000000..84d6294628 --- /dev/null +++ b/contrib/worker_spi/worker_spi.control @@ -0,0 +1,5 @@ +# worker_spi extension +comment = 'Sample background worker' +default_version = '1.0' +module_pathname = '$libdir/worker_spi' +relocatable = true diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index f7126388af..9d9b631ac1 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -30,23 +30,35 @@ - Only modules listed in shared_preload_libraries can run - background workers. A module wishing to run a background worker needs - to register it by calling + Background workers can be initialized at the time that + PostgreSQL is started including the module name in + shared_preload_libraries. A module wishing to run a background + worker can register it by calling RegisterBackgroundWorker(BackgroundWorker *worker) - from its _PG_init(). + from its _PG_init(). Background workers can also be started + after the system is up and running by calling the function + RegisterDynamicBackgroundWorker(BackgroundWorker + *worker). Unlike RegisterBackgroundWorker, which can + only be called from within the postmaster, + RegisterDynamicBackgroundWorker must be called from + a regular backend. + + + The structure BackgroundWorker is defined thus: typedef void (*bgworker_main_type)(void *main_arg); typedef void (*bgworker_sighdlr_type)(SIGNAL_ARGS); typedef struct BackgroundWorker { - char *bgw_name; + char bgw_name[BGW_MAXLEN]; int bgw_flags; BgWorkerStartTime bgw_start_time; int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */ - bgworker_main_type bgw_main; - void *bgw_main_arg; + bgworker_main_type bgw_main; + char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ + char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ + Datum bgw_main_arg; bgworker_sighdlr_type bgw_sighup; bgworker_sighdlr_type bgw_sigterm; } BackgroundWorker; @@ -101,7 +113,29 @@ typedef struct BackgroundWorker bgw_main_arg will be passed to it as its only argument. Note that the global variable MyBgworkerEntry points to a copy of the BackgroundWorker structure - passed at registration time. + passed at registration time. bgw_main may be + NULL; in that case, bgw_library_name and + bgw_function_name will be used to determine + the entrypoint. This is useful for background workers launched after + postmaster startup, where the postmaster does not have the requisite + library loaded. + + + + bgw_library_name is the name of a library in + which the initial entrypoint for the background worker should be sought. + It is ignored unless bgw_main is NULL. + But if bgw_main is NULL, then the named library + will be dynamically loaded by the worker process and + bgw_function_name will be used to identify + the function to be called. + + + + bgw_function_name is the name of a function in + a dynamically loaded library which should be used as the initial entrypoint + for a new background worker. It is ignored unless + bgw_main is NULL. @@ -109,7 +143,10 @@ typedef struct BackgroundWorker pointers to functions that will be installed as signal handlers for the new process. If bgw_sighup is NULL, then SIG_IGN is used; if bgw_sigterm is NULL, a handler is installed that - will terminate the process after logging a suitable message. + will terminate the process after logging a suitable message. These + fields should not be used if bgw_main is NULL; instead, + the worker process should set its own signal handlers before calling + BackgroundWorkerUnblockSignals(). Once running, the process can connect to a database by calling diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 3056b09f0d..71c23211b2 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/postmaster top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \ - startup.o syslogger.o walwriter.o checkpointer.o +OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ + pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c new file mode 100644 index 0000000000..3728d85486 --- /dev/null +++ b/src/backend/postmaster/bgworker.c @@ -0,0 +1,483 @@ +/*-------------------------------------------------------------------- + * bgworker.c + * POSTGRES pluggable background workers implementation + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/postmaster/bgworker.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "postmaster/bgworker_internals.h" +#include "storage/barrier.h" +#include "storage/lwlock.h" +#include "storage/pmsignal.h" +#include "storage/shmem.h" +#include "utils/ascii.h" + +/* + * The postmaster's list of registered background workers, in private memory. + */ +slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList); + +/* + * BackgroundWorkerSlots exist in shared memory and can be accessed (via + * the BackgroundWorkerArray) by both the postmaster and by regular backends. + * However, the postmaster cannot take locks, even spinlocks, because this + * might allow it to crash or become wedged if shared memory gets corrupted. + * Such an outcome is intolerable. Therefore, we need a lockless protocol + * for coordinating access to this data. + * + * The 'in_use' flag is used to hand off responsibility for the slot between + * the postmaster and the rest of the system. When 'in_use' is false, + * the postmaster will ignore the slot entirely, except for the 'in_use' flag + * itself, which it may read. In this state, regular backends may modify the + * slot. Once a backend sets 'in_use' to true, the slot becomes the + * responsibility of the postmaster. Regular backends may no longer modify it, + * but the postmaster may examine it. Thus, a backend initializing a slot + * must fully initialize the slot - and insert a write memory barrier - before + * marking it as in use. + * + * In addition to coordinating with the postmaster, backends modifying this + * data structure must coordinate with each other. Since they can take locks, + * this is straightforward: any backend wishing to manipulate a slot must + * take BackgroundWorkerLock in exclusive mode. Backends wishing to read + * data that might get concurrently modified by other backends should take + * this lock in shared mode. No matter what, backends reading this data + * structure must be able to tolerate concurrent modifications by the + * postmaster. + */ +typedef struct BackgroundWorkerSlot +{ + bool in_use; + BackgroundWorker worker; +} BackgroundWorkerSlot; + +typedef struct BackgroundWorkerArray +{ + int total_slots; + BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER]; +} BackgroundWorkerArray; + +BackgroundWorkerArray *BackgroundWorkerData; + +/* + * Calculate shared memory needed. + */ +Size +BackgroundWorkerShmemSize(void) +{ + Size size; + + /* Array of workers is variably sized. */ + size = offsetof(BackgroundWorkerArray, slot); + size = add_size(size, mul_size(max_worker_processes, + sizeof(BackgroundWorkerSlot))); + + return size; +} + +/* + * Initialize shared memory. + */ +void +BackgroundWorkerShmemInit(void) +{ + bool found; + + BackgroundWorkerData = ShmemInitStruct("Background Worker Data", + BackgroundWorkerShmemSize(), + &found); + if (!IsUnderPostmaster) + { + slist_iter siter; + int slotno = 0; + + BackgroundWorkerData->total_slots = max_worker_processes; + + /* + * Copy contents of worker list into shared memory. Record the + * shared memory slot assigned to each worker. This ensures + * a 1-to-1 correspondence betwen the postmaster's private list and + * the array in shared memory. + */ + slist_foreach(siter, &BackgroundWorkerList) + { + BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno]; + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur); + Assert(slotno < max_worker_processes); + slot->in_use = true; + rw->rw_shmem_slot = slotno; + memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker)); + ++slotno; + } + + /* + * Mark any remaining slots as not in use. + */ + while (slotno < max_worker_processes) + { + BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno]; + + slot->in_use = false; + ++slotno; + } + } + else + Assert(found); +} + +static RegisteredBgWorker * +FindRegisteredWorkerBySlotNumber(int slotno) +{ + slist_iter siter; + + /* + * Copy contents of worker list into shared memory. Record the + * shared memory slot assigned to each worker. This ensures + * a 1-to-1 correspondence betwen the postmaster's private list and + * the array in shared memory. + */ + slist_foreach(siter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur); + if (rw->rw_shmem_slot == slotno) + return rw; + } + + return NULL; +} + +/* + * Notice changes to shared_memory made by other backends. This code + * runs in the postmaster, so we must be very careful not to assume that + * shared memory contents are sane. Otherwise, a rogue backend could take + * out the postmaster. + */ +void +BackgroundWorkerStateChange(void) +{ + int slotno; + + /* + * The total number of slots stored in shared memory should match our + * notion of max_worker_processes. If it does not, something is very + * wrong. Further down, we always refer to this value as + * max_worker_processes, in case shared memory gets corrupted while + * we're looping. + */ + if (max_worker_processes != BackgroundWorkerData->total_slots) + { + elog(LOG, + "inconsistent background worker state (max_worker_processes=%d, total_slots=%d", + max_worker_processes, + BackgroundWorkerData->total_slots); + return; + } + + /* + * Iterate through slots, looking for newly-registered workers or + * workers who must die. + */ + for (slotno = 0; slotno < max_worker_processes; ++slotno) + { + BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno]; + RegisteredBgWorker *rw; + + if (!slot->in_use) + continue; + + /* + * Make sure we don't see the in_use flag before the updated slot + * contents. + */ + pg_read_barrier(); + + /* + * See whether we already know about this worker. If not, we need + * to update our backend-private BackgroundWorkerList to match shared + * memory. + */ + rw = FindRegisteredWorkerBySlotNumber(slotno); + if (rw != NULL) + continue; + + /* + * Copy the registration data into the registered workers list. + */ + rw = malloc(sizeof(RegisteredBgWorker)); + if (rw == NULL) + { + ereport(LOG, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + return; + } + + /* + * Copy strings in a paranoid way. If shared memory is corrupted, + * the source data might not even be NUL-terminated. + */ + ascii_safe_strlcpy(rw->rw_worker.bgw_name, + slot->worker.bgw_name, BGW_MAXLEN); + ascii_safe_strlcpy(rw->rw_worker.bgw_library_name, + slot->worker.bgw_library_name, BGW_MAXLEN); + ascii_safe_strlcpy(rw->rw_worker.bgw_function_name, + slot->worker.bgw_function_name, BGW_MAXLEN); + + /* + * Copy remaining fields. + * + * flags, start_time, and restart_time are examined by the + * postmaster, but nothing too bad will happen if they are + * corrupted. The remaining fields will only be examined by the + * child process. It might crash, but we won't. + */ + rw->rw_worker.bgw_flags = slot->worker.bgw_flags; + rw->rw_worker.bgw_start_time = slot->worker.bgw_start_time; + rw->rw_worker.bgw_restart_time = slot->worker.bgw_restart_time; + rw->rw_worker.bgw_main = slot->worker.bgw_main; + rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg; + rw->rw_worker.bgw_sighup = slot->worker.bgw_sighup; + rw->rw_worker.bgw_sigterm = slot->worker.bgw_sigterm; + + /* Initialize postmaster bookkeeping. */ + rw->rw_backend = NULL; + rw->rw_pid = 0; + rw->rw_child_slot = 0; + rw->rw_crashed_at = 0; + rw->rw_shmem_slot = slotno; + + /* Log it! */ + ereport(LOG, + (errmsg("registering background worker: %s", + rw->rw_worker.bgw_name))); + + slist_push_head(&BackgroundWorkerList, &rw->rw_lnode); + } +} + +/* + * Forget about a background worker that's no longer needed. + * + * At present, this only happens when a background worker marked + * BGW_NEVER_RESTART exits. This function should only be invoked in + * the postmaster. + */ +void +ForgetBackgroundWorker(RegisteredBgWorker *rw) +{ + BackgroundWorkerSlot *slot; + + Assert(rw->rw_shmem_slot < max_worker_processes); + slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot]; + slot->in_use = false; + + ereport(LOG, + (errmsg("unregistering background worker: %s", + rw->rw_worker.bgw_name))); + + slist_delete(&BackgroundWorkerList, &rw->rw_lnode); + free(rw); +} + +#ifdef EXEC_BACKEND +/* + * In EXEC_BACKEND mode, workers use this to retrieve their details from + * shared memory. + */ +BackgroundWorker * +BackgroundWorkerEntry(int slotno) +{ + BackgroundWorkerSlot *slot; + + Assert(slotno < BackgroundWorkerData->total_slots); + slot = &BackgroundWorkerData->slot[slotno]; + Assert(slot->in_use); + return &slot->worker; /* can't become free while we're still here */ +} +#endif + +/* + * Complain about the BackgroundWorker definition using error level elevel. + * Return true if it looks ok, false if not (unless elevel >= ERROR, in + * which case we won't return at all in the not-OK case). + */ +static bool +SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel) +{ + /* sanity check for flags */ + if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) + { + if (!(worker->bgw_flags & BGWORKER_SHMEM_ACCESS)) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": must attach to shared memory in order to request a database connection", + worker->bgw_name))); + return false; + } + + if (worker->bgw_start_time == BgWorkerStart_PostmasterStart) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": cannot request database access if starting at postmaster start", + worker->bgw_name))); + return false; + } + + /* XXX other checks? */ + } + + if ((worker->bgw_restart_time < 0 && + worker->bgw_restart_time != BGW_NEVER_RESTART) || + (worker->bgw_restart_time > USECS_PER_DAY / 1000)) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": invalid restart interval", + worker->bgw_name))); + return false; + } + + return true; +} + +/* + * Register a new background worker while processing shared_preload_libraries. + * + * This can only be called in the _PG_init function of a module library + * that's loaded by shared_preload_libraries; otherwise it has no effect. + */ +void +RegisterBackgroundWorker(BackgroundWorker *worker) +{ + RegisteredBgWorker *rw; + static int numworkers = 0; + + if (!IsUnderPostmaster) + ereport(LOG, + (errmsg("registering background worker: %s", worker->bgw_name))); + + if (!process_shared_preload_libraries_in_progress) + { + if (!IsUnderPostmaster) + ereport(LOG, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("background worker \"%s\": must be registered in shared_preload_libraries", + worker->bgw_name))); + return; + } + + if (!SanityCheckBackgroundWorker(worker, LOG)) + return; + + /* + * Enforce maximum number of workers. Note this is overly restrictive: we + * could allow more non-shmem-connected workers, because these don't count + * towards the MAX_BACKENDS limit elsewhere. For now, it doesn't seem + * important to relax this restriction. + */ + if (++numworkers > max_worker_processes) + { + ereport(LOG, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("too many background workers"), + errdetail_plural("Up to %d background worker can be registered with the current settings.", + "Up to %d background workers can be registered with the current settings.", + max_worker_processes, + max_worker_processes), + errhint("Consider increasing the configuration parameter \"max_worker_processes\"."))); + return; + } + + /* + * Copy the registration data into the registered workers list. + */ + rw = malloc(sizeof(RegisteredBgWorker)); + if (rw == NULL) + { + ereport(LOG, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + return; + } + + rw->rw_worker = *worker; + rw->rw_backend = NULL; + rw->rw_pid = 0; + rw->rw_child_slot = 0; + rw->rw_crashed_at = 0; + + slist_push_head(&BackgroundWorkerList, &rw->rw_lnode); +} + +/* + * Register a new background worker from a regular backend. + * + * Returns true on success and false on failure. Failure typically indicates + * that no background worker slots are currently available. + */ +bool +RegisterDynamicBackgroundWorker(BackgroundWorker *worker) +{ + int slotno; + bool success = false; + + /* + * We can't register dynamic background workers from the postmaster. + * If this is a standalone backend, we're the only process and can't + * start any more. In a multi-process environement, it might be + * theoretically possible, but we don't currently support it due to + * locking considerations; see comments on the BackgroundWorkerSlot + * data structure. + */ + if (!IsUnderPostmaster) + return false; + + if (!SanityCheckBackgroundWorker(worker, ERROR)) + return false; + + LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE); + + /* + * Look for an unused slot. If we find one, grab it. + */ + for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno) + { + BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno]; + + if (!slot->in_use) + { + memcpy(&slot->worker, worker, sizeof(BackgroundWorker)); + + /* + * Make sure postmaster doesn't see the slot as in use before + * it sees the new contents. + */ + pg_write_barrier(); + + slot->in_use = true; + success = true; + break; + } + } + + LWLockRelease(BackgroundWorkerLock); + + /* If we found a slot, tell the postmaster to notice the change. */ + if (success) + SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE); + + return success; +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 496192d57c..d9b800c4e7 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -103,7 +103,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" -#include "postmaster/bgworker.h" +#include "postmaster/bgworker_internals.h" #include "postmaster/fork_process.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" @@ -117,6 +117,7 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/datetime.h" +#include "utils/dynamic_loader.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -178,29 +179,6 @@ static dlist_head BackendList = DLIST_STATIC_INIT(BackendList); static Backend *ShmemBackendArray; #endif - -/* - * List of background workers. - * - * A worker that requests a database connection during registration will have - * rw_backend set, and will be present in BackendList. Note: do not rely on - * rw_backend being non-NULL for shmem-connected workers! - */ -typedef struct RegisteredBgWorker -{ - BackgroundWorker rw_worker; /* its registry entry */ - Backend *rw_backend; /* its BackendList entry, or NULL */ - pid_t rw_pid; /* 0 if not running */ - int rw_child_slot; - TimestampTz rw_crashed_at; /* if not 0, time it last crashed */ -#ifdef EXEC_BACKEND - int rw_cookie; -#endif - slist_node rw_lnode; /* list link */ -} RegisteredBgWorker; - -static slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList); - BackgroundWorker *MyBgworkerEntry = NULL; @@ -532,8 +510,6 @@ static bool save_backend_variables(BackendParameters *param, Port *port, static void ShmemBackendArrayAdd(Backend *bn); static void ShmemBackendArrayRemove(Backend *bn); - -static BackgroundWorker *find_bgworker_entry(int cookie); #endif /* EXEC_BACKEND */ #define StartupDataBase() StartChildProcess(StartupProcess) @@ -1456,7 +1432,7 @@ DetermineSleepTime(struct timeval * timeout) if (HaveCrashedWorker) { - slist_iter siter; + slist_mutable_iter siter; /* * When there are crashed bgworkers, we sleep just long enough that @@ -1464,7 +1440,7 @@ DetermineSleepTime(struct timeval * timeout) * determine the minimum of all wakeup times according to most recent * crash time and requested restart interval. */ - slist_foreach(siter, &BackgroundWorkerList) + slist_foreach_modify(siter, &BackgroundWorkerList) { RegisteredBgWorker *rw; TimestampTz this_wakeup; @@ -1475,7 +1451,10 @@ DetermineSleepTime(struct timeval * timeout) continue; if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART) + { + ForgetBackgroundWorker(rw); continue; + } this_wakeup = TimestampTzPlusMilliseconds(rw->rw_crashed_at, 1000L * rw->rw_worker.bgw_restart_time); @@ -4619,7 +4598,7 @@ SubPostmasterMain(int argc, char *argv[]) } if (strncmp(argv[1], "--forkbgworker=", 15) == 0) { - int cookie; + int shmem_slot; /* Close the postmaster's sockets */ ClosePostmasterPorts(false); @@ -4633,8 +4612,8 @@ SubPostmasterMain(int argc, char *argv[]) /* Attach process to shared data structures */ CreateSharedMemoryAndSemaphores(false, 0); - cookie = atoi(argv[1] + 15); - MyBgworkerEntry = find_bgworker_entry(cookie); + shmem_slot = atoi(argv[1] + 15); + MyBgworkerEntry = BackgroundWorkerEntry(shmem_slot); do_start_bgworker(); } if (strcmp(argv[1], "--forkarch") == 0) @@ -4697,9 +4676,17 @@ static void sigusr1_handler(SIGNAL_ARGS) { int save_errno = errno; + bool start_bgworker = false; PG_SETMASK(&BlockSig); + /* Process background worker state change. */ + if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE)) + { + BackgroundWorkerStateChange(); + start_bgworker = true; + } + /* * RECOVERY_STARTED and BEGIN_HOT_STANDBY signals are ignored in * unexpected states. If the startup process quickly starts up, completes @@ -4737,11 +4724,13 @@ sigusr1_handler(SIGNAL_ARGS) (errmsg("database system is ready to accept read only connections"))); pmState = PM_HOT_STANDBY; - /* Some workers may be scheduled to start now */ - StartOneBackgroundWorker(); + start_bgworker = true; } + if (start_bgworker) + StartOneBackgroundWorker(); + if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) && PgArchPID != 0) { @@ -5214,126 +5203,6 @@ MaxLivePostmasterChildren(void) max_worker_processes); } -/* - * Register a new background worker. - * - * This can only be called in the _PG_init function of a module library - * that's loaded by shared_preload_libraries; otherwise it has no effect. - */ -void -RegisterBackgroundWorker(BackgroundWorker *worker) -{ - RegisteredBgWorker *rw; - int namelen = strlen(worker->bgw_name); - static int numworkers = 0; - -#ifdef EXEC_BACKEND - - /* - * Use 1 here, not 0, to avoid confusing a possible bogus cookie read by - * atoi() in SubPostmasterMain. - */ - static int BackgroundWorkerCookie = 1; -#endif - - if (!IsUnderPostmaster) - ereport(LOG, - (errmsg("registering background worker: %s", worker->bgw_name))); - - if (!process_shared_preload_libraries_in_progress) - { - if (!IsUnderPostmaster) - ereport(LOG, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("background worker \"%s\": must be registered in shared_preload_libraries", - worker->bgw_name))); - return; - } - - /* sanity check for flags */ - if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) - { - if (!(worker->bgw_flags & BGWORKER_SHMEM_ACCESS)) - { - if (!IsUnderPostmaster) - ereport(LOG, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("background worker \"%s\": must attach to shared memory in order to request a database connection", - worker->bgw_name))); - return; - } - - if (worker->bgw_start_time == BgWorkerStart_PostmasterStart) - { - if (!IsUnderPostmaster) - ereport(LOG, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("background worker \"%s\": cannot request database access if starting at postmaster start", - worker->bgw_name))); - return; - } - - /* XXX other checks? */ - } - - if ((worker->bgw_restart_time < 0 && - worker->bgw_restart_time != BGW_NEVER_RESTART) || - (worker->bgw_restart_time > USECS_PER_DAY / 1000)) - { - if (!IsUnderPostmaster) - ereport(LOG, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("background worker \"%s\": invalid restart interval", - worker->bgw_name))); - return; - } - - /* - * Enforce maximum number of workers. Note this is overly restrictive: we - * could allow more non-shmem-connected workers, because these don't count - * towards the MAX_BACKENDS limit elsewhere. For now, it doesn't seem - * important to relax this restriction. - */ - if (++numworkers > max_worker_processes) - { - ereport(LOG, - (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("too many background workers"), - errdetail_plural("Up to %d background worker can be registered with the current settings.", - "Up to %d background workers can be registered with the current settings.", - max_worker_processes, - max_worker_processes), - errhint("Consider increasing the configuration parameter \"max_worker_processes\"."))); - return; - } - - /* - * Copy the registration data into the registered workers list. - */ - rw = malloc(sizeof(RegisteredBgWorker) + namelen + 1); - if (rw == NULL) - { - ereport(LOG, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - return; - } - - rw->rw_worker = *worker; - rw->rw_worker.bgw_name = ((char *) rw) + sizeof(RegisteredBgWorker); - strlcpy(rw->rw_worker.bgw_name, worker->bgw_name, namelen + 1); - - rw->rw_backend = NULL; - rw->rw_pid = 0; - rw->rw_child_slot = 0; - rw->rw_crashed_at = 0; -#ifdef EXEC_BACKEND - rw->rw_cookie = BackgroundWorkerCookie++; -#endif - - slist_push_head(&BackgroundWorkerList, &rw->rw_lnode); -} - /* * Connect background worker to a database. */ @@ -5372,25 +5241,6 @@ BackgroundWorkerUnblockSignals(void) PG_SETMASK(&UnBlockSig); } -#ifdef EXEC_BACKEND -static BackgroundWorker * -find_bgworker_entry(int cookie) -{ - slist_iter iter; - - slist_foreach(iter, &BackgroundWorkerList) - { - RegisteredBgWorker *rw; - - rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); - if (rw->rw_cookie == cookie) - return &rw->rw_worker; - } - - return NULL; -} -#endif - static void bgworker_quickdie(SIGNAL_ARGS) { @@ -5453,6 +5303,7 @@ do_start_bgworker(void) sigjmp_buf local_sigjmp_buf; char buf[MAXPGPATH]; BackgroundWorker *worker = MyBgworkerEntry; + bgworker_main_type entrypt; if (worker == NULL) elog(FATAL, "unable to find bgworker entry"); @@ -5568,6 +5419,23 @@ do_start_bgworker(void) InitProcess(); #endif + /* + * If bgw_main is set, we use that value as the initial entrypoint. + * However, if the library containing the entrypoint wasn't loaded at + * postmaster startup time, passing it as a direct function pointer is + * not possible. To work around that, we allow callers for whom a + * function pointer is not available to pass a library name (which will + * be loaded, if necessary) and a function name (which will be looked up + * in the named library). + */ + if (worker->bgw_main != NULL) + entrypt = worker->bgw_main; + else + entrypt = (bgworker_main_type) + load_external_function(worker->bgw_library_name, + worker->bgw_function_name, + true, NULL); + /* * Note that in normal processes, we would call InitPostgres here. For a * worker, however, we don't know what database to connect to, yet; so we @@ -5578,7 +5446,7 @@ do_start_bgworker(void) /* * Now invoke the user-defined worker code */ - worker->bgw_main(worker->bgw_main_arg); + entrypt(worker->bgw_main_arg); /* ... and if it returns, we're done */ proc_exit(0); @@ -5586,13 +5454,13 @@ do_start_bgworker(void) #ifdef EXEC_BACKEND static pid_t -bgworker_forkexec(int cookie) +bgworker_forkexec(int shmem_slot) { char *av[10]; int ac = 0; char forkav[MAXPGPATH]; - snprintf(forkav, MAXPGPATH, "--forkbgworker=%d", cookie); + snprintf(forkav, MAXPGPATH, "--forkbgworker=%d", shmem_slot); av[ac++] = "postgres"; av[ac++] = forkav; @@ -5621,7 +5489,7 @@ start_bgworker(RegisteredBgWorker *rw) rw->rw_worker.bgw_name))); #ifdef EXEC_BACKEND - switch ((worker_pid = bgworker_forkexec(rw->rw_cookie))) + switch ((worker_pid = bgworker_forkexec(rw->rw_shmem_slot))) #else switch ((worker_pid = fork_process())) #endif @@ -5749,7 +5617,7 @@ assign_backendlist_entry(RegisteredBgWorker *rw) static void StartOneBackgroundWorker(void) { - slist_iter iter; + slist_mutable_iter iter; TimestampTz now = 0; if (FatalError) @@ -5761,7 +5629,7 @@ StartOneBackgroundWorker(void) HaveCrashedWorker = false; - slist_foreach(iter, &BackgroundWorkerList) + slist_foreach_modify(iter, &BackgroundWorkerList) { RegisteredBgWorker *rw; @@ -5781,7 +5649,10 @@ StartOneBackgroundWorker(void) if (rw->rw_crashed_at != 0) { if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART) + { + ForgetBackgroundWorker(rw); continue; + } if (now == 0) now = GetCurrentTimestamp(); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index b34ba44712..a0b741b444 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -24,6 +24,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" +#include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" #include "replication/walreceiver.h" @@ -113,6 +114,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, CLOGShmemSize()); size = add_size(size, SUBTRANSShmemSize()); size = add_size(size, TwoPhaseShmemSize()); + size = add_size(size, BackgroundWorkerShmemSize()); size = add_size(size, MultiXactShmemSize()); size = add_size(size, LWLockShmemSize()); size = add_size(size, ProcArrayShmemSize()); @@ -214,6 +216,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) CreateSharedProcArray(); CreateSharedBackendStatus(); TwoPhaseShmemInit(); + BackgroundWorkerShmemInit(); /* * Set up shared-inval messaging diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 53167057e9..794eb39072 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -52,7 +52,7 @@ #define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002 -typedef void (*bgworker_main_type) (void *main_arg); +typedef void (*bgworker_main_type) (Datum main_arg); typedef void (*bgworker_sighdlr_type) (SIGNAL_ARGS); /* @@ -67,22 +67,28 @@ typedef enum #define BGW_DEFAULT_RESTART_INTERVAL 60 #define BGW_NEVER_RESTART -1 +#define BGW_MAXLEN 64 typedef struct BackgroundWorker { - char *bgw_name; + char bgw_name[BGW_MAXLEN]; int bgw_flags; BgWorkerStartTime bgw_start_time; int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */ bgworker_main_type bgw_main; - void *bgw_main_arg; + char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ + char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ + Datum bgw_main_arg; bgworker_sighdlr_type bgw_sighup; bgworker_sighdlr_type bgw_sigterm; } BackgroundWorker; -/* Register a new bgworker */ +/* Register a new bgworker during shared_preload_libraries */ extern void RegisterBackgroundWorker(BackgroundWorker *worker); +/* Register a new bgworker from a regular backend */ +extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker); + /* This is valid in a running worker */ extern BackgroundWorker *MyBgworkerEntry; diff --git a/src/include/postmaster/bgworker_internals.h b/src/include/postmaster/bgworker_internals.h new file mode 100644 index 0000000000..6484cfb7a6 --- /dev/null +++ b/src/include/postmaster/bgworker_internals.h @@ -0,0 +1,48 @@ +/*-------------------------------------------------------------------- + * bgworker_internals.h + * POSTGRES pluggable background workers internals + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/postmaster/bgworker.h + *-------------------------------------------------------------------- + */ +#ifndef BGWORKER_INTERNALS_H +#define BGWORKER_INTERNALS_H + +#include "datatype/timestamp.h" +#include "lib/ilist.h" +#include "postmaster/bgworker.h" + +/* + * List of background workers, private to postmaster. + * + * A worker that requests a database connection during registration will have + * rw_backend set, and will be present in BackendList. Note: do not rely on + * rw_backend being non-NULL for shmem-connected workers! + */ +typedef struct RegisteredBgWorker +{ + BackgroundWorker rw_worker; /* its registry entry */ + struct bkend *rw_backend; /* its BackendList entry, or NULL */ + pid_t rw_pid; /* 0 if not running */ + int rw_child_slot; + TimestampTz rw_crashed_at; /* if not 0, time it last crashed */ + int rw_shmem_slot; + slist_node rw_lnode; /* list link */ +} RegisteredBgWorker; + +extern slist_head BackgroundWorkerList; + +extern Size BackgroundWorkerShmemSize(void); +extern void BackgroundWorkerShmemInit(void); +extern void BackgroundWorkerStateChange(void); +extern void ForgetBackgroundWorker(RegisteredBgWorker *); + +#ifdef EXEC_BACKEND +extern BackgroundWorker *BackgroundWorkerEntry(int slotno); +#endif + +#endif /* BGWORKER_INTERNLS_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 85dc4ffdaa..39415a398a 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -79,6 +79,7 @@ typedef enum LWLockId SerializablePredicateLockListLock, OldSerXidLock, SyncRepLock, + BackgroundWorkerLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h index a6cb84431f..d894edf9a7 100644 --- a/src/include/storage/pmsignal.h +++ b/src/include/storage/pmsignal.h @@ -28,6 +28,7 @@ typedef enum PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */ PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */ + PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */ PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */ PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */