From 924bcf4f16d54c55310b28f77686608684734f42 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 30 Apr 2015 15:02:14 -0400 Subject: [PATCH] Create an infrastructure for parallel computation in PostgreSQL. This does four basic things. First, it provides convenience routines to coordinate the startup and shutdown of parallel workers. Second, it synchronizes various pieces of state (e.g. GUCs, combo CID mappings, transaction snapshot) from the parallel group leader to the worker processes. Third, it prohibits various operations that would result in unsafe changes to that state while parallelism is active. Finally, it propagates events that would result in an ErrorResponse, NoticeResponse, or NotifyResponse message being sent to the client from the parallel workers back to the master, from which they can then be sent on to the client. Robert Haas, Amit Kapila, Noah Misch, Rushabh Lathia, Jeevan Chalke. Suggestions and review from Andres Freund, Heikki Linnakangas, Noah Misch, Simon Riggs, Euler Taveira, and Jim Nasby. --- contrib/postgres_fdw/connection.c | 3 + src/backend/access/heap/heapam.c | 55 ++ src/backend/access/transam/Makefile | 2 +- src/backend/access/transam/README.parallel | 223 +++++ src/backend/access/transam/parallel.c | 1007 ++++++++++++++++++++ src/backend/access/transam/varsup.c | 7 + src/backend/access/transam/xact.c | 486 +++++++++- src/backend/access/transam/xlog.c | 8 + src/backend/catalog/namespace.c | 11 +- src/backend/commands/copy.c | 3 +- src/backend/commands/sequence.c | 14 + src/backend/executor/execMain.c | 30 +- src/backend/executor/functions.c | 3 + src/backend/executor/spi.c | 32 +- src/backend/libpq/pqmq.c | 33 +- src/backend/postmaster/bgworker.c | 50 + src/backend/storage/ipc/procarray.c | 44 + src/backend/storage/ipc/procsignal.c | 4 + src/backend/storage/lmgr/predicate.c | 8 + src/backend/tcop/postgres.c | 4 +- src/backend/tcop/utility.c | 30 +- src/backend/utils/adt/lockfuncs.c | 30 + src/backend/utils/fmgr/dfmgr.c | 54 ++ src/backend/utils/misc/guc.c | 23 + src/backend/utils/time/combocid.c | 74 ++ src/backend/utils/time/snapmgr.c | 210 +++- src/include/access/parallel.h | 68 ++ src/include/access/xact.h | 11 + src/include/catalog/namespace.h | 2 +- src/include/fmgr.h | 3 + src/include/libpq/pqmq.h | 1 + src/include/miscadmin.h | 1 + src/include/postmaster/bgworker.h | 2 + src/include/storage/procarray.h | 1 + src/include/storage/procsignal.h | 1 + src/include/utils/combocid.h | 3 + src/include/utils/snapmgr.h | 5 + 37 files changed, 2499 insertions(+), 47 deletions(-) create mode 100644 src/backend/access/transam/README.parallel create mode 100644 src/backend/access/transam/parallel.c create mode 100644 src/include/access/parallel.h diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4e02cb289d..1a1e5b5eae 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -546,6 +546,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) switch (event) { + case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: /* Commit all remote transactions during pre-commit */ do_sql_command(entry->conn, "COMMIT TRANSACTION"); @@ -588,11 +589,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot prepare a transaction that modified remote tables"))); break; + case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_COMMIT: case XACT_EVENT_PREPARE: /* Pre-commit should have closed the open transaction */ elog(ERROR, "missed cleaning up connection during pre-commit"); break; + case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ entry->have_error = true; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index b504ccd05c..e84c1743f4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -42,6 +42,7 @@ #include "access/heapam_xlog.h" #include "access/hio.h" #include "access/multixact.h" +#include "access/parallel.h" #include "access/relscan.h" #include "access/sysattr.h" #include "access/transam.h" @@ -1051,7 +1052,13 @@ relation_open(Oid relationId, LOCKMODE lockmode) /* Make note that we've accessed a temporary relation */ if (RelationUsesLocalBuffers(r)) + { + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot access temporary tables during a parallel operation"))); MyXactAccessedTempRel = true; + } pgstat_initstats(r); @@ -1097,7 +1104,13 @@ try_relation_open(Oid relationId, LOCKMODE lockmode) /* Make note that we've accessed a temporary relation */ if (RelationUsesLocalBuffers(r)) + { + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot access temporary tables during a parallel operation"))); MyXactAccessedTempRel = true; + } pgstat_initstats(r); @@ -2237,6 +2250,17 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { + /* + * For now, parallel operations are required to be strictly read-only. + * Unlike heap_update() and heap_delete(), an insert should never create + * a combo CID, so it might be possible to relax this restriction, but + * not without more thought and testing. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot insert tuples during a parallel operation"))); + if (relation->rd_rel->relhasoids) { #ifdef NOT_USED @@ -2648,6 +2672,16 @@ heap_delete(Relation relation, ItemPointer tid, Assert(ItemPointerIsValid(tid)); + /* + * Forbid this during a parallel operation, lets it allocate a combocid. + * Other workers might need that combocid for visibility checks, and we + * have no provision for broadcasting it to them. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot delete tuples during a parallel operation"))); + block = ItemPointerGetBlockNumber(tid); buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); @@ -3099,6 +3133,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, Assert(ItemPointerIsValid(otid)); + /* + * Forbid this during a parallel operation, lets it allocate a combocid. + * Other workers might need that combocid for visibility checks, and we + * have no provision for broadcasting it to them. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot update tuples during a parallel operation"))); + /* * Fetch the list of attributes to be checked for HOT update. This is * wasted effort if we fail to update or have to put the new tuple on a @@ -5400,6 +5444,17 @@ heap_inplace_update(Relation relation, HeapTuple tuple) uint32 oldlen; uint32 newlen; + /* + * For now, parallel operations are required to be strictly read-only. + * Unlike a regular update, this should never create a combo CID, so it + * might be possible to relax this restriction, but not without more + * thought and testing. It's not clear that it would be useful, anyway. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot update tuples during a parallel operation"))); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 9d4d5dbc97..94455b23f7 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/access/transam top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \ +OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \ timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \ xact.o xlog.o xlogarchive.o xlogfuncs.o \ xloginsert.o xlogreader.o xlogutils.o diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel new file mode 100644 index 0000000000..10051863fe --- /dev/null +++ b/src/backend/access/transam/README.parallel @@ -0,0 +1,223 @@ +Overview +======== + +PostgreSQL provides some simple facilities to make writing parallel algorithms +easier. Using a data structure called a ParallelContext, you can arrange to +launch background worker processes, initialize their state to match that of +the backend which initiated parallelism, communicate with them via dynamic +shared memory, and write reasonably complex code that can run either in the +user backend or in one of the parallel workers without needing to be aware of +where it's running. + +The backend which starts a parallel operation (hereafter, the initiating +backend) starts by creating a dynamic shared memory segment which will last +for the lifetime of the parallel operation. This dynamic shared memory segment +will contain (1) a shm_mq that can be used to transport errors (and other +messages reported via elog/ereport) from the worker back to the initiating +backend; (2) serialized representations of the initiating backend's private +state, so that the worker can synchronize its state with of the initiating +backend; and (3) any other data structures which a particular user of the +ParallelContext data structure may wish to add for its own purposes. Once +the initiating backend has initialized the dynamic shared memory segment, it +asks the postmaster to launch the appropriate number of parallel workers. +These workers then connect to the dynamic shared memory segment, initiate +their state, and then invoke the appropriate entrypoint, as further detailed +below. + +Error Reporting +=============== + +When started, each parallel worker begins by attaching the dynamic shared +memory segment and locating the shm_mq to be used for error reporting; it +redirects all of its protocol messages to this shm_mq. Prior to this point, +any failure of the background worker will not be reported to the initiating +backend; from the point of view of the initiating backend, the worker simply +failed to start. The initiating backend must anyway be prepared to cope +with fewer parallel workers than it originally requested, so catering to +this case imposes no additional burden. + +Whenever a new message (or partial message; very large messages may wrap) is +sent to the error-reporting queue, PROCSIG_PARALLEL_MESSAGE is sent to the +initiating backend. This causes the next CHECK_FOR_INTERRUPTS() in the +initiating backend to read and rethrow the message. For the most part, this +makes error reporting in parallel mode "just work". Of course, to work +properly, it is important that the code the initiating backend is executing +CHECK_FOR_INTERRUPTS() regularly and avoid blocking interrupt processing for +long periods of time, but those are good things to do anyway. + +(A currently-unsolved problem is that some messages may get written to the +system log twice, once in the backend where the report was originally +generated, and again when the initiating backend rethrows the message. If +we decide to suppress one of these reports, it should probably be second one; +otherwise, if the worker is for some reason unable to propagate the message +back to the initiating backend, the message will be lost altogether.) + +State Sharing +============= + +It's possible to write C code which works correctly without parallelism, but +which fails when parallelism is used. No parallel infrastructure can +completely eliminate this problem, because any global variable is a risk. +There's no general mechanism for ensuring that every global variable in the +worker will have the same value that it does in the initiating backend; even +if we could ensure that, some function we're calling could update the variable +after each call, and only the backend where that update is performed will see +the new value. Similar problems can arise with any more-complex data +structure we might choose to use. For example, a pseudo-random number +generator should, given a particular seed value, produce the same predictable +series of values every time. But it does this by relying on some private +state which won't automatically be shared between cooperating backends. A +parallel-safe PRNG would need to store its state in dynamic shared memory, and +would require locking. The parallelism infrastructure has no way of knowing +whether the user intends to call code that has this sort of problem, and can't +do anything about it anyway. + +Instead, we take a more pragmatic approach. First, we try to make as many of +the operations that are safe outside of parallel mode work correctly in +parallel mode as well. Second, we try to prohibit common unsafe operations +via suitable error checks. These checks are intended to catch 100% of +unsafe things that a user might do from the SQL interface, but code written +in C can do unsafe things that won't trigger these checks. The error checks +are engaged via EnterParallelMode(), which should be called before creating +a parallel context, and disarmed via ExitParallelMode(), which should be +called after all parallel contexts have been destroyed. The most +significant restriction imposed by parallel mode is that all operations must +be strictly read-only; we allow no writes to the database and no DDL. We +might try to relax these restrictions in the future. + +To make as many operations as possible safe in parallel mode, we try to copy +the most important pieces of state from the initiating backend to each parallel +worker. This includes: + + - The set of libraries dynamically loaded by dfmgr.c. + + - The authenticated user ID and current database. Each parallel worker + will connect to the same database as the initiating backend, using the + same user ID. + + - The values of all GUCs. Accordingly, permanent changes to the value of + any GUC are forbidden while in parallel mode; but temporary changes, + such as entering a function with non-NULL proconfig, are OK. + + - The current subtransaction's XID, the top-level transaction's XID, and + the list of XIDs considered current (that is, they are in-progress or + subcommitted). This information is needed to ensure that tuple visibility + checks return the same results in the worker as they do in the + initiating backend. See also the section Transaction Integration, below. + + - The combo CID mappings. This is needed to ensure consistent answers to + tuple visibility checks. The need to synchronize this data structure is + a major reason why we can't support writes in parallel mode: such writes + might create new combo CIDs, and we have no way to let other workers + (or the initiating backend) know about them. + + - The transaction snapshot. + + - The active snapshot, which might be different from the transaction + snapshot. + + - The currently active user ID and security context. Note that this is + the fourth user ID we restore: the initial step of binding to the correct + database also involves restoring the authenticated user ID. When GUC + values are restored, this incidentally sets SessionUserId and OuterUserId + to the correct values. This final step restores CurrentUserId. + +To prevent undetected or unprincipled deadlocks when running in parallel mode, +this could should eventually handle heavyweight locks in some way. This is +not implemented yet. + +Transaction Integration +======================= + +Regardless of what the TransactionState stack looks like in the parallel +leader, each parallel worker ends up with a stack of depth 1. This stack +entry is marked with the special transaction block state +TBLOCK_PARALLEL_INPROGRESS so that it's not confused with an ordinary +toplevel transaction. The XID of this TransactionState is set to the XID of +the innermost currently-active subtransaction in the initiating backend. The +initiating backend's toplevel XID, and the XIDs of all current (in-progress +or subcommitted) XIDs are stored separately from the TransactionState stack, +but in such a way that GetTopTransactionId(), GetTopTransactionIdIfAny(), and +TransactionIdIsCurrentTransactionId() return the same values that they would +in the initiating backend. We could copy the entire transaction state stack, +but most of it would be useless: for example, you can't roll back to a +savepoint from within a parallel worker, and there are no resources to +associated with the memory contexts or resource owners of intermediate +subtransactions. + +No meaningful change to the transaction state can be made while in parallel +mode. No XIDs can be assigned, and no subtransactions can start or end, +because we have no way of communicating these state changes to cooperating +backends, or of synchronizing them. It's clearly unworkable for the initiating +backend to exit any transaction or subtransaction that was in progress when +parallelism was started before all parallel workers have exited; and it's even +more clearly crazy for a parallel worker to try to subcommit or subabort the +current subtransaction and execute in some other transaction context than was +present in the initiating backend. It might be practical to allow internal +sub-transactions (e.g. to implement a PL/pgsql EXCEPTION block) to be used in +parallel mode, provided that they are XID-less, because other backends +wouldn't really need to know about those transactions or do anything +differently because of them. Right now, we don't even allow that. + +At the end of a parallel operation, which can happen either because it +completed successfully or because it was interrupted by an error, parallel +workers associated with that operation exit. In the error case, transaction +abort processing in the parallel leader kills of any remaining workers, and +the parallel leader then waits for them to die. In the case of a successful +parallel operation, the parallel leader does not send any signals, but must +wait for workers to complete and exit of their own volition. In either +case, it is very important that all workers actually exit before the +parallel leader cleans up the (sub)transaction in which they were created; +otherwise, chaos can ensue. For example, if the leader is rolling back the +transaction that created the relation being scanned by a worker, the +relation could disappear while the worker is still busy scanning it. That's +not safe. + +Generally, the cleanup performed by each worker at this point is similar to +top-level commit or abort. Each backend has its own resource owners: buffer +pins, catcache or relcache reference counts, tuple descriptors, and so on +are managed separately by each backend, and must free them before exiting. +There are, however, some important differences between parallel worker +commit or abort and a real top-level transaction commit or abort. Most +importantly: + + - No commit or abort record is written; the initiating backend is + responsible for this. + + - Cleanup of pg_temp namespaces is not done. Parallel workers cannot + safely access the initiating backend's pg_temp namespace, and should + not create one of their own. + +Coding Conventions +=================== + +Before beginning any parallel operation, call EnterParallelMode(); after all +parallel operations are completed, call ExitParallelMode(). To actually +parallelize a particular operation, use a ParallelContext. The basic coding +pattern looks like this: + + EnterParallelMode(); /* prohibit unsafe state changes */ + + pcxt = CreateParallelContext(entrypoint, nworkers); + + /* Allow space for application-specific data here. */ + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, keys); + + InitializeParallelDSM(pcxt); /* create DSM and copy state to it */ + + /* Store the data for which we reserved space. */ + space = shm_toc_allocate(pcxt->toc, size); + shm_toc_insert(pcxt->toc, key, space); + + LaunchParallelWorkers(pcxt); + + /* do parallel stuff */ + + WaitForParallelWorkersToFinish(pcxt); + + /* read any final results from dynamic shared memory */ + + DestroyParallelContext(pcxt); + + ExitParallelMode(); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c new file mode 100644 index 0000000000..8ed7314b58 --- /dev/null +++ b/src/backend/access/transam/parallel.c @@ -0,0 +1,1007 @@ +/*------------------------------------------------------------------------- + * + * parallel.c + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/parallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/parallel.h" +#include "commands/async.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/sinval.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/combocid.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/snapmgr.h" + +/* + * We don't want to waste a lot of memory on an error queue which, most of + * the time, will process only a handful of small messages. However, it is + * desirable to make it large enough that a typical ErrorResponse can be sent + * without blocking. That way, a worker that errors out can write the whole + * message into the queue and terminate without waiting for the user backend. + */ +#define PARALLEL_ERROR_QUEUE_SIZE 16384 + +/* Magic number for parallel context TOC. */ +#define PARALLEL_MAGIC 0x50477c7c + +/* + * Magic numbers for parallel state sharing. Higher-level code should use + * smaller values, leaving these very large ones for use by this module. + */ +#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) +#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) +#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003) +#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004) +#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005) +#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) +#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) +#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) +#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009) + +/* Fixed-size parallel state. */ +typedef struct FixedParallelState +{ + /* Fixed-size state that workers must restore. */ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; + PGPROC *parallel_master_pgproc; + pid_t parallel_master_pid; + BackendId parallel_master_backend_id; + + /* Entrypoint for parallel workers. */ + parallel_worker_main_type entrypoint; + + /* Mutex protects remaining fields. */ + slock_t mutex; + + /* Track whether workers have attached. */ + int workers_expected; + int workers_attached; + + /* Maximum XactLastRecEnd of any worker. */ + XLogRecPtr last_xlog_end; +} FixedParallelState; + +/* + * Our parallel worker number. We initialize this to -1, meaning that we are + * not a parallel worker. In parallel workers, it will be set to a value >= 0 + * and < the number of workers before any user code is invoked; each parallel + * worker will get a different parallel worker number. + */ +int ParallelWorkerNumber = -1; + +/* Is there a parallel message pending which we need to receive? */ +bool ParallelMessagePending = false; + +/* Pointer to our fixed parallel state. */ +static FixedParallelState *MyFixedParallelState; + +/* List of active parallel contexts. */ +static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); + +/* Private functions. */ +static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void ParallelErrorContext(void *arg); +static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); +static void ParallelWorkerMain(Datum main_arg); + +/* + * Establish a new parallel context. This should be done after entering + * parallel mode, and (unless there is an error) the context should be + * destroyed before exiting the current subtransaction. + */ +ParallelContext * +CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* It is unsafe to create a parallel context if not in parallel mode. */ + Assert(IsInParallelMode()); + + /* Number of workers should be non-negative. */ + Assert(nworkers >= 0); + + /* + * If dynamic shared memory is not available, we won't be able to use + * background workers. + */ + if (dynamic_shared_memory_type == DSM_IMPL_NONE) + nworkers = 0; + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize a new ParallelContext. */ + pcxt = palloc0(sizeof(ParallelContext)); + pcxt->subid = GetCurrentSubTransactionId(); + pcxt->nworkers = nworkers; + pcxt->entrypoint = entrypoint; + pcxt->error_context_stack = error_context_stack; + shm_toc_initialize_estimator(&pcxt->estimator); + dlist_push_head(&pcxt_list, &pcxt->node); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish a new parallel context that calls a function provided by an + * extension. This works around the fact that the library might get mapped + * at a different address in each backend. + */ +ParallelContext * +CreateParallelContextForExternalFunction(char *library_name, + char *function_name, + int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Create the context. */ + pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); + pcxt->library_name = pstrdup(library_name); + pcxt->function_name = pstrdup(function_name); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish the dynamic shared memory segment for a parallel context and + * copied state and other bookkeeping information that will need by parallel + * workers into it. + */ +void +InitializeParallelDSM(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + Size library_len = 0; + Size guc_len = 0; + Size combocidlen = 0; + Size tsnaplen = 0; + Size asnaplen = 0; + Size tstatelen = 0; + Size segsize = 0; + int i; + FixedParallelState *fps; + Snapshot transaction_snapshot = GetTransactionSnapshot(); + Snapshot active_snapshot = GetActiveSnapshot(); + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Allow space to store the fixed-size parallel state. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Normally, the user will have requested at least one worker process, + * but if by chance they have not, we can skip a bunch of things here. + */ + if (pcxt->nworkers > 0) + { + /* Estimate space for various kinds of state sharing. */ + library_len = EstimateLibraryStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, library_len); + guc_len = EstimateGUCStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, guc_len); + combocidlen = EstimateComboCIDStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, combocidlen); + tsnaplen = EstimateSnapshotSpace(transaction_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen); + asnaplen = EstimateSnapshotSpace(active_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); + tstatelen = EstimateTransactionStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + /* If you add more chunks here, you probably need to add keys. */ + shm_toc_estimate_keys(&pcxt->estimator, 6); + + /* Estimate space need for error queues. */ + StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == + PARALLEL_ERROR_QUEUE_SIZE, + "parallel error queue size not buffer-aligned"); + shm_toc_estimate_chunk(&pcxt->estimator, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate how much we'll need for extension entrypoint info. */ + if (pcxt->library_name != NULL) + { + Assert(pcxt->entrypoint == ParallelExtensionTrampoline); + Assert(pcxt->function_name != NULL); + shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + + strlen(pcxt->function_name) + 2); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + } + + /* + * Create DSM and initialize with new table of contents. But if the user + * didn't request any workers, then don't bother creating a dynamic shared + * memory segment; instead, just use backend-private memory. + * + * Also, if we can't create a dynamic shared memory segment because the + * maximum number of segments have already been created, then fall back + * to backend-private memory, and plan not to use any workers. We hope + * this won't happen very often, but it's better to abandon the use of + * parallelism than to fail outright. + */ + segsize = shm_toc_estimate(&pcxt->estimator); + if (pcxt->nworkers != 0) + pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (pcxt->seg != NULL) + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, + dsm_segment_address(pcxt->seg), + segsize); + else + { + pcxt->nworkers = 0; + pcxt->private = MemoryContextAlloc(TopMemoryContext, segsize); + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private, segsize); + } + + /* Initialize fixed-size state in shared memory. */ + fps = (FixedParallelState *) + shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState)); + fps->database_id = MyDatabaseId; + fps->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context); + fps->parallel_master_pgproc = MyProc; + fps->parallel_master_pid = MyProcPid; + fps->parallel_master_backend_id = MyBackendId; + fps->entrypoint = pcxt->entrypoint; + SpinLockInit(&fps->mutex); + fps->workers_expected = pcxt->nworkers; + fps->workers_attached = 0; + fps->last_xlog_end = 0; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); + + /* We can skip the rest of this if we're not budgeting for any workers. */ + if (pcxt->nworkers > 0) + { + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + char *error_queue_space; + + /* Serialize shared libraries we have loaded. */ + libraryspace = shm_toc_allocate(pcxt->toc, library_len); + SerializeLibraryState(library_len, libraryspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace); + + /* Serialize GUC settings. */ + gucspace = shm_toc_allocate(pcxt->toc, guc_len); + SerializeGUCState(guc_len, gucspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace); + + /* Serialize combo CID state. */ + combocidspace = shm_toc_allocate(pcxt->toc, combocidlen); + SerializeComboCIDState(combocidlen, combocidspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace); + + /* Serialize transaction snapshot and active snapshot. */ + tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen); + SerializeSnapshot(transaction_snapshot, tsnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, + tsnapspace); + asnapspace = shm_toc_allocate(pcxt->toc, asnaplen); + SerializeSnapshot(active_snapshot, asnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + + /* Serialize transaction state. */ + tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); + SerializeTransactionState(tstatelen, tstatespace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); + + /* Allocate space for worker information. */ + pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); + + /* + * Establish error queues in dynamic shared memory. + * + * These queues should be used only for transmitting ErrorResponse, + * NoticeResponse, and NotifyResponse protocol messages. Tuple data + * should be transmitted via separate (possibly larger?) queues. + */ + error_queue_space = + shm_toc_allocate(pcxt->toc, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); + + /* Serialize extension entrypoint information. */ + if (pcxt->library_name != NULL) + { + Size lnamelen = strlen(pcxt->library_name); + char *extensionstate; + + extensionstate = shm_toc_allocate(pcxt->toc, lnamelen + + strlen(pcxt->function_name) + 2); + strcpy(extensionstate, pcxt->library_name); + strcpy(extensionstate + lnamelen + 1, pcxt->function_name); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, + extensionstate); + } + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Launch parallel workers. + */ +void +LaunchParallelWorkers(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + int i; + bool any_registrations_failed = false; + + /* Skip this if we have no workers. */ + if (pcxt->nworkers == 0) + return; + + /* If we do have workers, we'd better have a DSM segment. */ + Assert(pcxt->seg != NULL); + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Configure a worker. */ + snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", + MyProcPid); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = ParallelWorkerMain; + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg)); + worker.bgw_notify_pid = MyProcPid; + + /* + * Start workers. + * + * The caller must be able to tolerate ending up with fewer workers than + * expected, so there is no need to throw an error here if registration + * fails. It wouldn't help much anyway, because registering the worker + * in no way guarantees that it will start up and initialize successfully. + */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (!any_registrations_failed && + RegisterDynamicBackgroundWorker(&worker, + &pcxt->worker[i].bgwhandle)) + shm_mq_set_handle(pcxt->worker[i].error_mqh, + pcxt->worker[i].bgwhandle); + else + { + /* + * If we weren't able to register the worker, then we've bumped + * up against the max_worker_processes limit, and future + * registrations will probably fail too, so arrange to skip them. + * But we still have to execute this code for the remaining slots + * to make sure that we forget about the error queues we budgeted + * for those workers. Otherwise, we'll wait for them to start, + * but they never will. + */ + any_registrations_failed = true; + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + } + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Wait for all workers to exit. + * + * Even if the parallel operation seems to have completed successfully, it's + * important to call this function afterwards. We must not miss any errors + * the workers may have thrown during the parallel operation, or any that they + * may yet throw while shutting down. + * + * Also, we want to update our notion of XactLastRecEnd based on worker + * feedback. + */ +void +WaitForParallelWorkersToFinish(ParallelContext *pcxt) +{ + for (;;) + { + bool anyone_alive = false; + int i; + + /* + * This will process any parallel messages that are pending, which + * may change the outcome of the loop that follows. It may also + * throw an error propagated from a worker. + */ + CHECK_FOR_INTERRUPTS(); + + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + { + anyone_alive = true; + break; + } + } + + if (!anyone_alive) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1); + ResetLatch(&MyProc->procLatch); + } + + if (pcxt->toc != NULL) + { + FixedParallelState *fps; + + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + if (fps->last_xlog_end > XactLastRecEnd) + XactLastRecEnd = fps->last_xlog_end; + } +} + +/* + * Destroy a parallel context. + * + * If expecting a clean exit, you should use WaitForParallelWorkersToFinish() + * first, before calling this function. When this function is invoked, any + * remaining workers are forcibly killed; the dynamic shared memory segment + * is unmapped; and we then wait (uninterruptibly) for the workers to exit. + */ +void +DestroyParallelContext(ParallelContext *pcxt) +{ + int i; + + /* + * Be careful about order of operations here! We remove the parallel + * context from the list before we do anything else; otherwise, if an + * error occurs during a subsequent step, we might try to nuke it again + * from AtEOXact_Parallel or AtEOSubXact_Parallel. + */ + dlist_delete(&pcxt->node); + + /* Kill each worker in turn, and forget their error queues. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].bgwhandle != NULL) + TerminateBackgroundWorker(pcxt->worker[i].bgwhandle); + if (pcxt->worker[i].error_mqh != NULL) + { + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].error_mqh = NULL; + } + } + + /* + * If we have allocated a shared memory segment, detach it. This will + * implicitly detach the error queues, and any other shared memory queues, + * stored there. + */ + if (pcxt->seg != NULL) + { + dsm_detach(pcxt->seg); + pcxt->seg = NULL; + } + + /* + * If this parallel context is actually in backend-private memory rather + * than shared memory, free that memory instead. + */ + if (pcxt->private != NULL) + { + pfree(pcxt->private); + pcxt->private = NULL; + } + + /* Wait until the workers actually die. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + BgwHandleStatus status; + + if (pcxt->worker[i].bgwhandle == NULL) + continue; + + /* + * We can't finish transaction commit or abort until all of the + * workers are dead. This means, in particular, that we can't respond + * to interrupts at this stage. + */ + HOLD_INTERRUPTS(); + status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle); + RESUME_INTERRUPTS(); + + /* + * If the postmaster kicked the bucket, we have no chance of cleaning + * up safely -- we won't be able to tell when our workers are actually + * dead. This doesn't necessitate a PANIC since they will all abort + * eventually, but we can't safely continue this session. + */ + if (status == BGWH_POSTMASTER_DIED) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("postmaster exited during a parallel transaction"))); + + /* Release memory. */ + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + + /* Free the worker array itself. */ + if (pcxt->worker != NULL) + { + pfree(pcxt->worker); + pcxt->worker = NULL; + } + + /* Free memory. */ + pfree(pcxt); +} + +/* + * Are there any parallel contexts currently active? + */ +bool +ParallelContextActive(void) +{ + return !dlist_is_empty(&pcxt_list); +} + +/* + * Handle receipt of an interrupt indicating a parallel worker message. + */ +void +HandleParallelMessageInterrupt(void) +{ + int save_errno = errno; + + InterruptPending = true; + ParallelMessagePending = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Handle any queued protocol messages received from parallel workers. + */ +void +HandleParallelMessages(void) +{ + dlist_iter iter; + + ParallelMessagePending = false; + + dlist_foreach(iter, &pcxt_list) + { + ParallelContext *pcxt; + int i; + Size nbytes; + void *data; + + pcxt = dlist_container(ParallelContext, node, iter.cur); + if (pcxt->worker == NULL) + continue; + + for (i = 0; i < pcxt->nworkers; ++i) + { + /* + * Read as many messages as we can from each worker, but stop + * when either (1) the error queue goes away, which can happen if + * we receive a Terminate message from the worker; or (2) no more + * messages can be read from the worker without blocking. + */ + while (pcxt->worker[i].error_mqh != NULL) + { + shm_mq_result res; + + res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, + &data, true); + if (res == SHM_MQ_WOULD_BLOCK) + break; + else if (res == SHM_MQ_SUCCESS) + { + StringInfoData msg; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + HandleParallelMessage(pcxt, i, &msg); + pfree(msg.data); + } + else + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ + errmsg("lost connection to parallel worker"))); + + /* This might make the error queue go away. */ + CHECK_FOR_INTERRUPTS(); + } + } + } +} + +/* + * Handle a single protocol message received from a single parallel worker. + */ +static void +HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) +{ + char msgtype; + + msgtype = pq_getmsgbyte(msg); + + switch (msgtype) + { + case 'K': /* BackendKeyData */ + { + int32 pid = pq_getmsgint(msg, 4); + (void) pq_getmsgint(msg, 4); /* discard cancel key */ + (void) pq_getmsgend(msg); + pcxt->worker[i].pid = pid; + break; + } + + case 'E': /* ErrorResponse */ + case 'N': /* NoticeResponse */ + { + ErrorData edata; + ErrorContextCallback errctx; + ErrorContextCallback *save_error_context_stack; + + /* + * Rethrow the error using the error context callbacks that + * were in effect when the context was created, not the + * current ones. + */ + save_error_context_stack = error_context_stack; + errctx.callback = ParallelErrorContext; + errctx.arg = &pcxt->worker[i].pid; + errctx.previous = pcxt->error_context_stack; + error_context_stack = &errctx; + + /* Parse ErrorReponse or NoticeResponse. */ + pq_parse_errornotice(msg, &edata); + + /* Death of a worker isn't enough justification for suicide. */ + edata.elevel = Min(edata.elevel, ERROR); + + /* Rethrow error or notice. */ + ThrowErrorData(&edata); + + /* Restore previous context. */ + error_context_stack = save_error_context_stack; + + break; + } + + case 'A': /* NotifyResponse */ + { + /* Propagate NotifyResponse. */ + pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + break; + } + + case 'X': /* Terminate, indicating clean exit */ + { + pfree(pcxt->worker[i].bgwhandle); + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + break; + } + + default: + { + elog(ERROR, "unknown message type: %c (%d bytes)", + msgtype, msg->len); + } + } +} + +/* + * End-of-subtransaction cleanup for parallel contexts. + * + * Currently, it's forbidden to enter or leave a subtransaction while + * parallel mode is in effect, so we could just blow away everything. But + * we may want to relax that restriction in the future, so this code + * contemplates that there may be multiple subtransaction IDs in pcxt_list. + */ +void +AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId) +{ + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (pcxt->subid != mySubId) + break; + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * End-of-transaction cleanup for parallel contexts. + */ +void +AtEOXact_Parallel(bool isCommit) +{ + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * Main entrypoint for parallel workers. + */ +static void +ParallelWorkerMain(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + FixedParallelState *fps; + char *error_queue_space; + shm_mq *mq; + shm_mq_handle *mqh; + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + StringInfoData msgbuf; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "parallel worker", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * Now that we have a resource owner, we can attach to the dynamic + * shared memory segment and read the table of contents. + */ + seg = dsm_attach(DatumGetUInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Determine and set our worker number. */ + fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); + Assert(fps != NULL); + Assert(ParallelWorkerNumber == -1); + SpinLockAcquire(&fps->mutex); + if (fps->workers_attached < fps->workers_expected) + ParallelWorkerNumber = fps->workers_attached++; + SpinLockRelease(&fps->mutex); + if (ParallelWorkerNumber < 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many parallel workers already attached"))); + MyFixedParallelState = fps; + + /* + * Now that we have a worker number, we can find and attach to the error + * queue provided for us. That's good, because until we do that, any + * errors that happen here will not be reported back to the process that + * requested that this worker be launched. + */ + error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); + mq = (shm_mq *) (error_queue_space + + ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_sender(mq, MyProc); + mqh = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(mq, mqh); + pq_set_parallel_master(fps->parallel_master_pid, + fps->parallel_master_backend_id); + + /* + * Send a BackendKeyData message to the process that initiated parallelism + * so that it has access to our PID before it receives any other messages + * from us. Our cancel key is sent, too, since that's the way the protocol + * message is defined, but it won't actually be used for anything in this + * case. + */ + pq_beginmessage(&msgbuf, 'K'); + pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32)); + pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32)); + pq_endmessage(&msgbuf); + + /* + * Hooray! Primary initialization is complete. Now, we need to set up + * our backend-local state to match the original backend. + */ + + /* + * Load libraries that were loaded by original backend. We want to do this + * before restoring GUCs, because the libraries might define custom + * variables. + */ + libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY); + Assert(libraryspace != NULL); + RestoreLibraryState(libraryspace); + + /* Restore database connection. */ + BackgroundWorkerInitializeConnectionByOid(fps->database_id, + fps->authenticated_user_id); + + /* Restore GUC values from launching backend. */ + gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); + Assert(gucspace != NULL); + StartTransactionCommand(); + RestoreGUCState(gucspace); + CommitTransactionCommand(); + + /* Crank up a transaction state appropriate to a parallel worker. */ + tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE); + StartParallelWorkerTransaction(tstatespace); + + /* Restore combo CID state. */ + combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID); + Assert(combocidspace != NULL); + RestoreComboCIDState(combocidspace); + + /* Restore transaction snapshot. */ + tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); + Assert(tsnapspace != NULL); + RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), + fps->parallel_master_pgproc); + + /* Restore active snapshot. */ + asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT); + Assert(asnapspace != NULL); + PushActiveSnapshot(RestoreSnapshot(asnapspace)); + + /* Restore user ID and security context. */ + SetUserIdAndSecContext(fps->current_user_id, fps->sec_context); + + /* + * We've initialized all of our state now; nothing should change hereafter. + */ + EnterParallelMode(); + + /* + * Time to do the real work: invoke the caller-supplied code. + * + * If you get a crash at this line, see the comments for + * ParallelExtensionTrampoline. + */ + fps->entrypoint(seg, toc); + + /* Must exit parallel mode to pop active snapshot. */ + ExitParallelMode(); + + /* Must pop active snapshot so resowner.c doesn't complain. */ + PopActiveSnapshot(); + + /* Shut down the parallel-worker transaction. */ + EndParallelWorkerTransaction(); + + /* Report success. */ + pq_putmessage('X', NULL, 0); +} + +/* + * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a + * function living in a dynamically loaded module, because the module might + * not be loaded in every process, or might be loaded but not at the same + * address. To work around that problem, CreateParallelContextForExtension() + * arranges to call this function rather than calling the extension-provided + * function directly; and this function then looks up the real entrypoint and + * calls it. + */ +static void +ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) +{ + char *extensionstate; + char *library_name; + char *function_name; + parallel_worker_main_type entrypt; + + extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); + Assert(extensionstate != NULL); + library_name = extensionstate; + function_name = extensionstate + strlen(library_name) + 1; + + entrypt = (parallel_worker_main_type) + load_external_function(library_name, function_name, true, NULL); + entrypt(seg, toc); +} + +/* + * Give the user a hint that this is a message propagated from a parallel + * worker. Otherwise, it can sometimes be confusing to understand what + * actually happened. + */ +static void +ParallelErrorContext(void *arg) +{ + errcontext("parallel worker, pid %d", * (int32 *) arg); +} + +/* + * Update shared memory with the ending location of the last WAL record we + * wrote, if it's greater than the value already stored there. + */ +void +ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) +{ + FixedParallelState *fps = MyFixedParallelState; + + Assert(fps != NULL); + SpinLockAcquire(&fps->mutex); + if (fps->last_xlog_end < last_xlog_end) + fps->last_xlog_end = last_xlog_end; + SpinLockRelease(&fps->mutex); +} diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 42ee57fe8d..cf3e964fc6 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -49,6 +49,13 @@ GetNewTransactionId(bool isSubXact) { TransactionId xid; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new XIDs after that point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign TransactionIds during a parallel operation"); + /* * During bootstrap initialization, we return the special bootstrap * transaction id. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 511bcbbc51..a8f78d6376 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -22,6 +22,7 @@ #include "access/commit_ts.h" #include "access/multixact.h" +#include "access/parallel.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -51,6 +52,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" +#include "utils/builtins.h" #include "utils/catcache.h" #include "utils/combocid.h" #include "utils/guc.h" @@ -77,6 +79,33 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; +/* + * When running as a parallel worker, we place only a single + * TransactionStateData on the parallel worker's state stack, and the XID + * reflected there will be that of the *innermost* currently-active + * subtransaction in the backend that initiated paralllelism. However, + * GetTopTransactionId() and TransactionIdIsCurrentTransactionId() + * need to return the same answers in the parallel worker as they would have + * in the user backend, so we need some additional bookkeeping. + * + * XactTopTransactionId stores the XID of our toplevel transaction, which + * will be the same as TopTransactionState.transactionId in an ordinary + * backend; but in a parallel backend, which does not have the entire + * transaction state, it will instead be copied from the backend that started + * the parallel operation. + * + * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary + * backend, but in a parallel backend, nParallelCurrentXids will contain the + * number of XIDs that need to be considered current, and ParallelCurrentXids + * will contain the XIDs themselves. This includes all XIDs that were current + * or sub-committed in the parent at the time the parallel operation began. + * The XIDs are stored sorted in numerical order (not logical order) to make + * lookups as fast as possible. + */ +TransactionId XactTopTransactionId = InvalidTransactionId; +int nParallelCurrentXids = 0; +TransactionId *ParallelCurrentXids; + /* * MyXactAccessedTempRel is set when a temporary relation is accessed. * We don't allow PREPARE TRANSACTION in that case. (This is global @@ -113,6 +142,7 @@ typedef enum TBlockState /* transaction block states */ TBLOCK_BEGIN, /* starting transaction block */ TBLOCK_INPROGRESS, /* live transaction */ + TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */ TBLOCK_END, /* COMMIT received */ TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ @@ -154,6 +184,7 @@ typedef struct TransactionStateData bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ bool didLogXid; /* has xid been included in WAL record? */ + int parallelModeLevel; /* Enter/ExitParallelMode counter */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -184,6 +215,7 @@ static TransactionStateData TopTransactionStateData = { false, /* entry-time xact r/o state */ false, /* startedInRecovery */ false, /* didLogXid */ + 0, /* parallelMode */ NULL /* link to parent state block */ }; @@ -353,9 +385,9 @@ IsAbortedTransactionBlockState(void) TransactionId GetTopTransactionId(void) { - if (!TransactionIdIsValid(TopTransactionStateData.transactionId)) + if (!TransactionIdIsValid(XactTopTransactionId)) AssignTransactionId(&TopTransactionStateData); - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -368,7 +400,7 @@ GetTopTransactionId(void) TransactionId GetTopTransactionIdIfAny(void) { - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -461,6 +493,13 @@ AssignTransactionId(TransactionState s) Assert(!TransactionIdIsValid(s->transactionId)); Assert(s->state == TRANS_INPROGRESS); + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new XIDs at this point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign XIDs during a parallel operation"); + /* * Ensure parent(s) have XIDs, so that a child always has an XID later * than its parent. Musn't recurse here, or we might get a stack overflow @@ -513,6 +552,8 @@ AssignTransactionId(TransactionState s) * the Xid as "running". See GetNewTransactionId. */ s->transactionId = GetNewTransactionId(isSubXact); + if (!isSubXact) + XactTopTransactionId = s->transactionId; if (isSubXact) SubTransSetParent(s->transactionId, s->parent->transactionId, false); @@ -644,7 +685,16 @@ GetCurrentCommandId(bool used) { /* this is global to a transaction, not subtransaction-local */ if (used) + { + /* + * Forbid setting currentCommandIdUsed in parallel mode, because we + * have no provision for communicating this back to the master. We + * could relax this restriction when currentCommandIdUsed was already + * true at the start of the parallel operation. + */ + Assert(CurrentTransactionState->parallelModeLevel == 0); currentCommandIdUsed = true; + } return currentCommandId; } @@ -737,6 +787,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) if (!TransactionIdIsNormal(xid)) return false; + /* + * In parallel workers, the XIDs we must consider as current are stored + * in ParallelCurrentXids rather than the transaction-state stack. Note + * that the XIDs in this array are sorted numerically rather than + * according to transactionIdPrecedes order. + */ + if (nParallelCurrentXids > 0) + { + int low, + high; + + low = 0; + high = nParallelCurrentXids - 1; + while (low <= high) + { + int middle; + TransactionId probe; + + middle = low + (high - low) / 2; + probe = ParallelCurrentXids[middle]; + if (probe == xid) + return true; + else if (probe < xid) + low = middle + 1; + else + high = middle - 1; + } + return false; + } + /* * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their @@ -790,6 +870,48 @@ TransactionStartedDuringRecovery(void) return CurrentTransactionState->startedInRecovery; } +/* + * EnterParallelMode + */ +void +EnterParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelModeLevel >= 0); + + ++s->parallelModeLevel; +} + +/* + * ExitParallelMode + */ +void +ExitParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelModeLevel > 0); + Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); + + --s->parallelModeLevel; +} + +/* + * IsInParallelMode + * + * Are we in a parallel operation, as either the master or a worker? Check + * this to prohibit operations that change backend-local state expected to + * match across all workers. Mere caches usually don't require such a + * restriction. State modified in a strict push/pop fashion, such as the + * active snapshot stack, is often fine. + */ +bool +IsInParallelMode(void) +{ + return CurrentTransactionState->parallelModeLevel != 0; +} + /* * CommandCounterIncrement */ @@ -804,6 +926,14 @@ CommandCounterIncrement(void) */ if (currentCommandIdUsed) { + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new commands after that + * point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot start commands during a parallel operation"); + currentCommandId += 1; if (currentCommandId == InvalidCommandId) { @@ -1650,6 +1780,8 @@ StartTransaction(void) s = &TopTransactionStateData; CurrentTransactionState = s; + Assert(XactTopTransactionId == InvalidTransactionId); + /* * check the current transaction state */ @@ -1779,6 +1911,9 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool is_parallel_worker; + + is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); ShowTransactionState("CommitTransaction"); @@ -1812,7 +1947,8 @@ CommitTransaction(void) break; } - CallXactCallbacks(XACT_EVENT_PRE_COMMIT); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT + : XACT_EVENT_PRE_COMMIT); /* * The remaining actions cannot call any user-defined code, so it's safe @@ -1821,6 +1957,13 @@ CommitTransaction(void) * the transaction-abort path. */ + /* If we might have parallel workers, clean them up now. */ + if (IsInParallelMode()) + { + AtEOXact_Parallel(true); + s->parallelModeLevel = 0; + } + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -1859,10 +2002,28 @@ CommitTransaction(void) */ s->state = TRANS_COMMIT; - /* - * Here is where we really truly commit. - */ - latestXid = RecordTransactionCommit(); + if (!is_parallel_worker) + { + /* + * We need to mark our XIDs as commited in pg_clog. This is where we + * durably commit. + */ + latestXid = RecordTransactionCommit(); + } + else + { + /* + * We must not mark our XID committed; the parallel master is + * responsible for that. + */ + latestXid = InvalidTransactionId; + + /* + * Make sure the master will know about any WAL we wrote before it + * commits. + */ + ParallelWorkerReportLastRecEnd(XactLastRecEnd); + } TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); @@ -1889,7 +2050,8 @@ CommitTransaction(void) * state. */ - CallXactCallbacks(XACT_EVENT_COMMIT); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT + : XACT_EVENT_COMMIT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -1937,7 +2099,7 @@ CommitTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + AtEOXact_Namespace(true, is_parallel_worker); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -1962,6 +2124,9 @@ CommitTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with commit processing, set current transaction state back to * default @@ -1985,6 +2150,8 @@ PrepareTransaction(void) GlobalTransaction gxact; TimestampTz prepared_at; + Assert(!IsInParallelMode()); + ShowTransactionState("PrepareTransaction"); /* @@ -2204,7 +2371,7 @@ PrepareTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + AtEOXact_Namespace(true, false); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2229,6 +2396,9 @@ PrepareTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with 1st phase commit processing, set current transaction state * back to default @@ -2247,6 +2417,7 @@ AbortTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool is_parallel_worker; /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -2295,6 +2466,7 @@ AbortTransaction(void) /* * check the current transaction state */ + is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); @@ -2318,6 +2490,13 @@ AbortTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* If in parallel mode, clean up workers and exit parallel mode. */ + if (IsInParallelMode()) + { + AtEOXact_Parallel(false); + s->parallelModeLevel = 0; + } + /* * do abort processing */ @@ -2330,9 +2509,23 @@ AbortTransaction(void) /* * Advertise the fact that we aborted in pg_clog (assuming that we got as - * far as assigning an XID to advertise). + * far as assigning an XID to advertise). But if we're inside a parallel + * worker, skip this; the user backend must be the one to write the abort + * record. */ - latestXid = RecordTransactionAbort(false); + if (!is_parallel_worker) + latestXid = RecordTransactionAbort(false); + else + { + latestXid = InvalidTransactionId; + + /* + * Since the parallel master won't get our value of XactLastRecEnd in this + * case, we nudge WAL-writer ourselves in this case. See related comments in + * RecordTransactionAbort for why this matters. + */ + XLogSetAsyncXactLSN(XactLastRecEnd); + } TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2350,7 +2543,10 @@ AbortTransaction(void) */ if (TopTransactionResourceOwner != NULL) { - CallXactCallbacks(XACT_EVENT_ABORT); + if (is_parallel_worker) + CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT); + else + CallXactCallbacks(XACT_EVENT_ABORT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -2371,7 +2567,7 @@ AbortTransaction(void) AtEOXact_GUC(false, 1); AtEOXact_SPI(false); AtEOXact_on_commit_actions(false); - AtEOXact_Namespace(false); + AtEOXact_Namespace(false, is_parallel_worker); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2423,6 +2619,10 @@ CleanupTransaction(void) s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0; + s->parallelModeLevel = 0; + + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; /* * done with abort processing, set current transaction state back to @@ -2476,6 +2676,7 @@ StartTransactionCommand(void) /* These cases are invalid. */ case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -2511,11 +2712,13 @@ CommitTransactionCommand(void) switch (s->blockState) { /* - * This shouldn't happen, because it means the previous + * These shouldn't happen. TBLOCK_DEFAULT means the previous * StartTransactionCommand didn't set the STARTED state - * appropriately. + * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended + * by EndParallelWorkerTranaction(), not this function. */ case TBLOCK_DEFAULT: + case TBLOCK_PARALLEL_INPROGRESS: elog(FATAL, "CommitTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2797,6 +3000,7 @@ AbortCurrentTransaction(void) * ABORT state. We will stay in ABORT until we get a ROLLBACK. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: AbortTransaction(); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ @@ -3186,6 +3390,7 @@ BeginTransactionBlock(void) * Already a transaction block in progress. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_ABORT: case TBLOCK_SUBABORT: @@ -3363,6 +3568,16 @@ EndTransactionBlock(void) result = true; break; + /* + * The user issued a COMMIT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3456,6 +3671,16 @@ UserAbortTransactionBlock(void) s->blockState = TBLOCK_ABORT_PENDING; break; + /* + * The user issued an ABORT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot abort during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3485,6 +3710,18 @@ DefineSavepoint(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot define savepoints during a parallel operation"))); + switch (s->blockState) { case TBLOCK_INPROGRESS: @@ -3505,6 +3742,7 @@ DefineSavepoint(char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3539,6 +3777,18 @@ ReleaseSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot release savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3562,6 +3812,7 @@ ReleaseSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3639,6 +3890,18 @@ RollbackToSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot rollback to savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3663,6 +3926,7 @@ RollbackToSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3751,6 +4015,20 @@ BeginInternalSubTransaction(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that point. + * We might be able to make an exception for the type of subtransaction + * established by this function, which is typically used in contexts where + * we're going to release or roll back the subtransaction before proceeding + * further, so that no enduring change to the transaction state occurs. + * For now, however, we prohibit this case along with all the others. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start subtransactions during a parallel operation"))); + switch (s->blockState) { case TBLOCK_STARTED: @@ -3773,6 +4051,7 @@ BeginInternalSubTransaction(char *name) /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -3805,6 +4084,18 @@ ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for commit of subtransactions after that + * point. This should not happen anyway. Code calling this would + * typically have called BeginInternalSubTransaction() first, failing + * there. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit subtransactions during a parallel operation"))); + if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); @@ -3827,6 +4118,14 @@ RollbackAndReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted + * during parallel operations. That's because we may be in the master, + * recovering from an error thrown while we were in parallel mode. We + * won't reach here in a worker, because BeginInternalSubTransaction() + * will have failed. + */ + switch (s->blockState) { /* Must be in a subtransaction */ @@ -3838,6 +4137,7 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: case TBLOCK_END: @@ -3913,6 +4213,7 @@ AbortOutOfAnyTransaction(void) case TBLOCK_STARTED: case TBLOCK_BEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: case TBLOCK_PREPARE: @@ -4004,6 +4305,7 @@ TransactionBlockStatusCode(void) case TBLOCK_BEGIN: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4107,6 +4409,13 @@ CommitSubTransaction(void) CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, s->parent->subTransactionId); + /* If in parallel mode, clean up workers and exit parallel mode. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(true, s->subTransactionId); + s->parallelModeLevel = 0; + } + /* Do the actual "commit", such as it is */ s->state = TRANS_COMMIT; @@ -4260,6 +4569,13 @@ AbortSubTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* Exit from parallel mode, if necessary. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(false, s->subTransactionId); + s->parallelModeLevel = 0; + } + /* * We can skip all this stuff if the subxact failed before creating a * ResourceOwner... @@ -4400,6 +4716,7 @@ PushTransaction(void) s->blockState = TBLOCK_SUBBEGIN; GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; + s->parallelModeLevel = 0; CurrentTransactionState = s; @@ -4446,6 +4763,139 @@ PopTransaction(void) pfree(s); } +/* + * EstimateTransactionStateSpace + * Estimate the amount of space that will be needed by + * SerializeTransactionState. It would be OK to overestimate slightly, + * but it's simple for us to work out the precise value, so we do. + */ +Size +EstimateTransactionStateSpace(void) +{ + TransactionState s; + Size nxids = 5; /* iso level, deferrable, top & current XID, XID count */ + + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + + nxids = add_size(nxids, nParallelCurrentXids); + return mul_size(nxids, sizeof(TransactionId)); +} + +/* + * SerializeTransactionState + * Write out relevant details of our transaction state that will be + * needed by a parallel worker. + * + * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs + * associated with this transaction. The first eight bytes of the result + * contain XactDeferrable and XactIsoLevel; the next eight bytes contain the + * XID of the top-level transaction and the XID of the current transaction + * (or, in each case, InvalidTransactionId if none). After that, the next 4 + * bytes contain a count of how many additional XIDs follow; this is followed + * by all of those XIDs one after another. We emit the XIDs in sorted order + * for the convenience of the receiving process. + */ +void +SerializeTransactionState(Size maxsize, char *start_address) +{ + TransactionState s; + Size nxids = 0; + Size i = 0; + TransactionId *workspace; + TransactionId *result = (TransactionId *) start_address; + + Assert(maxsize >= 5 * sizeof(TransactionId)); + result[0] = (TransactionId) XactIsoLevel; + result[1] = (TransactionId) XactDeferrable; + result[2] = XactTopTransactionId; + result[3] = CurrentTransactionState->transactionId; + + /* + * If we're running in a parallel worker and launching a parallel worker + * of our own, we can just pass along the information that was passed to + * us. + */ + if (nParallelCurrentXids > 0) + { + Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId)); + result[4] = nParallelCurrentXids; + memcpy(&result[5], ParallelCurrentXids, + nParallelCurrentXids * sizeof(TransactionId)); + return; + } + + /* + * OK, we need to generate a sorted list of XIDs that our workers + * should view as current. First, figure out how many there are. + */ + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + Assert(nxids * sizeof(TransactionId) < maxsize); + + /* Copy them to our scratch space. */ + workspace = palloc(nxids * sizeof(TransactionId)); + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + workspace[i++] = s->transactionId; + memcpy(&workspace[i], s->childXids, + s->nChildXids * sizeof(TransactionId)); + i += s->nChildXids; + } + Assert(i == nxids); + + /* Sort them. */ + qsort(workspace, nxids, sizeof(TransactionId), xidComparator); + + /* Copy data into output area. */ + result[4] = (TransactionId) nxids; + memcpy(&result[5], workspace, nxids * sizeof(TransactionId)); +} + +/* + * StartParallelWorkerTransaction + * Start a parallel worker transaction, restoring the relevant + * transaction state serialized by SerializeTransactionState. + */ +void +StartParallelWorkerTransaction(char *tstatespace) +{ + TransactionId *tstate = (TransactionId *) tstatespace; + + Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT); + StartTransaction(); + + XactIsoLevel = (int) tstate[0]; + XactDeferrable = (bool) tstate[1]; + XactTopTransactionId = tstate[2]; + CurrentTransactionState->transactionId = tstate[3]; + nParallelCurrentXids = (int) tstate[4]; + ParallelCurrentXids = &tstate[5]; + + CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; +} + +/* + * EndParallelWorkerTransaction + * End a parallel worker transaction. + */ +void +EndParallelWorkerTransaction(void) +{ + Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS); + CommitTransaction(); + CurrentTransactionState->blockState = TBLOCK_DEFAULT; +} + /* * ShowTransactionState * Debug support @@ -4516,6 +4966,8 @@ BlockStateAsString(TBlockState blockState) return "BEGIN"; case TBLOCK_INPROGRESS: return "INPROGRESS"; + case TBLOCK_PARALLEL_INPROGRESS: + return "PARALLEL_INPROGRESS"; case TBLOCK_END: return "END"; case TBLOCK_ABORT: diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index da7b6c2fad..6cf441534c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -292,6 +292,14 @@ static TimeLineID curFileTLI; * end+1 of the last record, and is reset when we end a top-level transaction, * or start a new one; so it can be used to tell if the current transaction has * created any XLOG records. + * + * While in parallel mode, this may not be fully up to date. When committing, + * a transaction can assume this covers all xlog records written either by the + * user backend or by any parallel worker which was present at any point during + * the transaction. But when aborting, or when still in parallel mode, other + * parallel backends may have written WAL records at later LSNs than the value + * stored here. The parallel leader advances its own copy, when necessary, + * in WaitForParallelWorkersToFinish. */ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 1af977cb1d..2f6d697d82 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -20,6 +20,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/dependency.h" @@ -3646,6 +3647,12 @@ InitTempTableNamespace(void) (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), errmsg("cannot create temporary tables during recovery"))); + /* Parallel workers can't create temporary tables, either. */ + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), + errmsg("cannot create temporary tables in parallel mode"))); + snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId); namespaceId = get_namespace_oid(namespaceName, true); @@ -3709,7 +3716,7 @@ InitTempTableNamespace(void) * End-of-transaction cleanup for namespaces. */ void -AtEOXact_Namespace(bool isCommit) +AtEOXact_Namespace(bool isCommit, bool parallel) { /* * If we abort the transaction in which a temp namespace was selected, @@ -3719,7 +3726,7 @@ AtEOXact_Namespace(bool isCommit) * at backend shutdown. (We only want to register the callback once per * session, so this is a good place to do it.) */ - if (myTempNamespaceSubID != InvalidSubTransactionId) + if (myTempNamespaceSubID != InvalidSubTransactionId && !parallel) { if (isCommit) before_shmem_exit(RemoveTempRelationsCallback, 0); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 92ff632e12..0d3721a96d 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -924,9 +924,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed) { Assert(rel); - /* check read-only transaction */ + /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); + PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 6d316d62b6..80f5553d3a 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -565,6 +565,13 @@ nextval_internal(Oid relid) if (!seqrel->rd_islocaltemp) PreventCommandIfReadOnly("nextval()"); + /* + * Forbid this during parallel operation because, to make it work, + * the cooperating backends would need to share the backend-local cached + * sequence information. Currently, we don't support that. + */ + PreventCommandIfParallelMode("nextval()"); + if (elm->last != elm->cached) /* some numbers were cached */ { Assert(elm->last_valid); @@ -862,6 +869,13 @@ do_setval(Oid relid, int64 next, bool iscalled) if (!seqrel->rd_islocaltemp) PreventCommandIfReadOnly("setval()"); + /* + * Forbid this during parallel operation because, to make it work, + * the cooperating backends would need to share the backend-local cached + * sequence information. Currently, we don't support that. + */ + PreventCommandIfParallelMode("setval()"); + /* lock page' buffer and read tuple */ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index df4da3faa9..26793eecf4 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -147,8 +147,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) /* * If the transaction is read-only, we need to check if any writes are * planned to non-temporary tables. EXPLAIN is considered read-only. + * + * Don't allow writes in parallel mode. Supporting UPDATE and DELETE would + * require (a) storing the combocid hash in shared memory, rather than + * synchronizing it just once at the start of parallelism, and (b) an + * alternative to heap_update()'s reliance on xmax for mutual exclusion. + * INSERT may have no such troubles, but we forbid it to simplify the + * checks. + * + * We have lower-level defenses in CommandCounterIncrement and elsewhere + * against performing unsafe operations in parallel mode, but this gives + * a more user-friendly error message. */ - if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + if ((XactReadOnly || IsInParallelMode()) && + !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); /* @@ -691,18 +703,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte) } /* - * Check that the query does not imply any writes to non-temp tables. + * Check that the query does not imply any writes to non-temp tables; + * unless we're in parallel mode, in which case don't even allow writes + * to temp tables. * * Note: in a Hot Standby slave this would need to reject writes to temp - * tables as well; but an HS slave can't have created any temp tables - * in the first place, so no need to check that. + * tables just as we do in parallel mode; but an HS slave can't have created + * any temp tables in the first place, so no need to check that. */ static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt) { ListCell *l; - /* Fail if write permissions are requested on any non-temp table */ + /* + * Fail if write permissions are requested in parallel mode for + * table (temp or non-temp), otherwise fail for any non-temp table. + */ foreach(l, plannedstmt->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(l); @@ -718,6 +735,9 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) PreventCommandIfReadOnly(CreateCommandTag((Node *) plannedstmt)); } + + if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt)); } diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 6c3eff7c58..ce49c471d4 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list, errmsg("%s is not allowed in a non-volatile function", CreateCommandTag(stmt)))); + if (IsInParallelMode() && !CommandIsReadOnly(stmt)) + PreventCommandIfParallelMode(CreateCommandTag(stmt)); + /* OK, build the execution_state for this query */ newes = (execution_state *) palloc(sizeof(execution_state)); if (preves) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index b3c05025bc..557d153f2a 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -23,6 +23,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/spi_priv.h" +#include "miscadmin.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, } /* - * If told to be read-only, we'd better check for read-only queries. This - * can't be done earlier because we need to look at the finished, planned - * queries. (In particular, we don't want to do it between GetCachedPlan - * and PortalDefineQuery, because throwing an error between those steps - * would result in leaking our plancache refcount.) + * If told to be read-only, or in parallel mode, verify that this query + * is in fact read-only. This can't be done earlier because we need to + * look at the finished, planned queries. (In particular, we don't want + * to do it between GetCachedPlan and PortalDefineQuery, because throwing + * an error between those steps would result in leaking our plancache + * refcount.) */ - if (read_only) + if (read_only || IsInParallelMode()) { ListCell *lc; @@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, Node *pstmt = (Node *) lfirst(lc); if (!CommandIsReadOnly(pstmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a non-volatile function", - CreateCommandTag(pstmt)))); + { + if (read_only) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a non-volatile function", + CreateCommandTag(pstmt)))); + else + PreventCommandIfParallelMode(CreateCommandTag(pstmt)); + } } } @@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, errmsg("%s is not allowed in a non-volatile function", CreateCommandTag(stmt)))); + if (IsInParallelMode() && !CommandIsReadOnly(stmt)) + PreventCommandIfParallelMode(CreateCommandTag(stmt)); + /* * If not read-only mode, advance the command counter before each * command and update the snapshot. diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 307fb60665..f12f2d582e 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -16,12 +16,15 @@ #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" +#include "miscadmin.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" static shm_mq *pq_mq; static shm_mq_handle *pq_mq_handle; static bool pq_mq_busy = false; +static pid_t pq_mq_parallel_master_pid = 0; +static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId; static void mq_comm_reset(void); static int mq_flush(void); @@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh) FrontendProtocol = PG_PROTOCOL_LATEST; } +/* + * Arrange to SendProcSignal() to the parallel master each time we transmit + * message data via the shm_mq. + */ +void +pq_set_parallel_master(pid_t pid, BackendId backend_id) +{ + Assert(PqCommMethods == &PqCommMqMethods); + pq_mq_parallel_master_pid = pid; + pq_mq_parallel_master_backend_id = backend_id; +} + static void mq_comm_reset(void) { @@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len) iov[1].len = len; Assert(pq_mq_handle != NULL); - result = shm_mq_sendv(pq_mq_handle, iov, 2, false); + + for (;;) + { + result = shm_mq_sendv(pq_mq_handle, iov, 2, true); + + if (pq_mq_parallel_master_pid != 0) + SendProcSignal(pq_mq_parallel_master_pid, + PROCSIG_PARALLEL_MESSAGE, + pq_mq_parallel_master_backend_id); + + if (result != SHM_MQ_WOULD_BLOCK) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } pq_mq_busy = false; diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index d4939415f0..377733377b 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -995,6 +995,56 @@ WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp) return status; } +/* + * Wait for a background worker to stop. + * + * If the worker hasn't yet started, or is running, we wait for it to stop + * and then return BGWH_STOPPED. However, if the postmaster has died, we give + * up and return BGWH_POSTMASTER_DIED, because it's the postmaster that + * notifies us when a worker's state changes. + */ +BgwHandleStatus +WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + bool save_set_latch_on_sigusr1; + + save_set_latch_on_sigusr1 = set_latch_on_sigusr1; + set_latch_on_sigusr1 = true; + + PG_TRY(); + { + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + status = GetBackgroundWorkerPid(handle, &pid); + if (status == BGWH_STOPPED) + return status; + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); + + if (rc & WL_POSTMASTER_DEATH) + return BGWH_POSTMASTER_DIED; + + ResetLatch(&MyProc->procLatch); + } + } + PG_CATCH(); + { + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + PG_RE_THROW(); + } + PG_END_TRY(); + + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + return status; +} + /* * Instruct the postmaster to terminate a background worker. * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 8eaec0ca6e..68cc6edd09 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1682,6 +1682,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) return result; } +/* + * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin + * + * This is like ProcArrayInstallImportedXmin, but we have a pointer to the + * PGPROC of the transaction from which we imported the snapshot, rather than + * an XID. + * + * Returns TRUE if successful, FALSE if source xact is no longer running. + */ +bool +ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc) +{ + bool result = false; + TransactionId xid; + volatile PGXACT *pgxact; + + Assert(TransactionIdIsNormal(xmin)); + Assert(proc != NULL); + + /* Get lock so source xact can't end while we're doing this */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + pgxact = &allPgXact[proc->pgprocno]; + + /* + * Be certain that the referenced PGPROC has an advertised xmin which + * is no later than the one we're installing, so that the system-wide + * xmin can't go backwards. Also, make sure it's running in the same + * database, so that the per-database xmin cannot go backwards. + */ + xid = pgxact->xmin; /* fetch just once */ + if (proc->databaseId == MyDatabaseId && + TransactionIdIsNormal(xid) && + TransactionIdPrecedesOrEquals(xid, xmin)) + { + MyPgXact->xmin = TransactionXmin = xmin; + result = true; + } + + LWLockRelease(ProcArrayLock); + + return result; +} + /* * GetRunningTransactionData -- returns information about running transactions. * diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 48573bef60..0abde43565 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -17,6 +17,7 @@ #include #include +#include "access/parallel.h" #include "commands/async.h" #include "miscadmin.h" #include "storage/latch.h" @@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT)) HandleNotifyInterrupt(); + if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE)) + HandleParallelMessageInterrupt(); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index b81ebeb260..01e03f0e84 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -1653,6 +1653,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, Assert(!RecoveryInProgress()); + /* + * Since all parts of a serializable transaction must use the same + * snapshot, it is too late to establish one after a parallel operation + * has begun. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot establish serializable snapshot during a parallel operation"); + proc = MyProc; Assert(proc != NULL); GET_VXID_FROM_PGPROC(vxid, *proc); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 33720e8f8b..ea2a43209d 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -36,6 +36,7 @@ #include "rusagestub.h" #endif +#include "access/parallel.h" #include "access/printtup.h" #include "access/xact.h" #include "catalog/pg_type.h" @@ -2988,7 +2989,8 @@ ProcessInterrupts(void) } } - /* If we get here, do nothing (probably, QueryCancelPending was reset) */ + if (ParallelMessagePending) + HandleParallelMessages(); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 31e9d4cf8b..59f09dc93a 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree) static void check_xact_readonly(Node *parsetree) { - if (!XactReadOnly) + /* Only perform the check if we have a reason to do so. */ + if (!XactReadOnly && !IsInParallelMode()) return; /* * Note: Commands that need to do more complicated checking are handled * elsewhere, in particular COPY and plannable statements do their own - * checking. However they should all call PreventCommandIfReadOnly to - * actually throw the error. + * checking. However they should all call PreventCommandIfReadOnly + * or PreventCommandIfParallelMode to actually throw the error. */ switch (nodeTag(parsetree)) @@ -208,6 +209,7 @@ check_xact_readonly(Node *parsetree) case T_ImportForeignSchemaStmt: case T_SecLabelStmt: PreventCommandIfReadOnly(CreateCommandTag(parsetree)); + PreventCommandIfParallelMode(CreateCommandTag(parsetree)); break; default: /* do nothing */ @@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname) cmdname))); } +/* + * PreventCommandIfParallelMode: throw error if current (sub)transaction is + * in parallel mode. + * + * This is useful mainly to ensure consistency of the error message wording; + * most callers have checked IsInParallelMode() for themselves. + */ +void +PreventCommandIfParallelMode(const char *cmdname) +{ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + /* translator: %s is name of a SQL command, eg CREATE */ + errmsg("cannot execute %s during a parallel operation", + cmdname))); +} + /* * PreventCommandDuringRecovery: throw error if RecoveryInProgress * @@ -618,6 +638,7 @@ standard_ProcessUtility(Node *parsetree, case T_ClusterStmt: /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("CLUSTER"); + /* forbidden in parallel mode due to CommandIsReadOnly */ cluster((ClusterStmt *) parsetree, isTopLevel); break; @@ -628,6 +649,7 @@ standard_ProcessUtility(Node *parsetree, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"); + /* forbidden in parallel mode due to CommandIsReadOnly */ ExecVacuum(stmt, isTopLevel); } break; @@ -704,6 +726,7 @@ standard_ProcessUtility(Node *parsetree, * outside a transaction block is presumed to be user error. */ RequireTransactionChain(isTopLevel, "LOCK TABLE"); + /* forbidden in parallel mode due to CommandIsReadOnly */ LockTableCommand((LockStmt *) parsetree); break; @@ -735,6 +758,7 @@ standard_ProcessUtility(Node *parsetree, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("REINDEX"); + /* forbidden in parallel mode due to CommandIsReadOnly */ switch (stmt->kind) { case REINDEX_OBJECT_INDEX: diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c index a1967b6963..491824dd6b 100644 --- a/src/backend/utils/adt/lockfuncs.c +++ b/src/backend/utils/adt/lockfuncs.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/xact.h" #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" @@ -411,6 +412,15 @@ pg_lock_status(PG_FUNCTION_ARGS) #define SET_LOCKTAG_INT32(tag, key1, key2) \ SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2) +static void +PreventAdvisoryLocksInParallelMode(void) +{ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot use advisory locks during a parallel operation"))); +} + /* * pg_advisory_lock(int8) - acquire exclusive lock on an int8 key */ @@ -420,6 +430,7 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ExclusiveLock, true, false); @@ -437,6 +448,7 @@ pg_advisory_xact_lock_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ExclusiveLock, false, false); @@ -453,6 +465,7 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ShareLock, true, false); @@ -470,6 +483,7 @@ pg_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ShareLock, false, false); @@ -489,6 +503,7 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ExclusiveLock, true, true); @@ -509,6 +524,7 @@ pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ExclusiveLock, false, true); @@ -528,6 +544,7 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ShareLock, true, true); @@ -548,6 +565,7 @@ pg_try_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ShareLock, false, true); @@ -567,6 +585,7 @@ pg_advisory_unlock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockRelease(&tag, ExclusiveLock, true); @@ -586,6 +605,7 @@ pg_advisory_unlock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockRelease(&tag, ShareLock, true); @@ -603,6 +623,7 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ExclusiveLock, true, false); @@ -621,6 +642,7 @@ pg_advisory_xact_lock_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ExclusiveLock, false, false); @@ -638,6 +660,7 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ShareLock, true, false); @@ -656,6 +679,7 @@ pg_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ShareLock, false, false); @@ -676,6 +700,7 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ExclusiveLock, true, true); @@ -697,6 +722,7 @@ pg_try_advisory_xact_lock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ExclusiveLock, false, true); @@ -717,6 +743,7 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ShareLock, true, true); @@ -738,6 +765,7 @@ pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ShareLock, false, true); @@ -758,6 +786,7 @@ pg_advisory_unlock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockRelease(&tag, ExclusiveLock, true); @@ -778,6 +807,7 @@ pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockRelease(&tag, ShareLock, true); diff --git a/src/backend/utils/fmgr/dfmgr.c b/src/backend/utils/fmgr/dfmgr.c index 7476a26b79..46bc1f238f 100644 --- a/src/backend/utils/fmgr/dfmgr.c +++ b/src/backend/utils/fmgr/dfmgr.c @@ -23,6 +23,7 @@ #endif #include "lib/stringinfo.h" #include "miscadmin.h" +#include "storage/shmem.h" #include "utils/dynamic_loader.h" #include "utils/hsearch.h" @@ -692,3 +693,56 @@ find_rendezvous_variable(const char *varName) return &hentry->varValue; } + +/* + * Estimate the amount of space needed to serialize the list of libraries + * we have loaded. + */ +Size +EstimateLibraryStateSpace(void) +{ + DynamicFileList *file_scanner; + Size size = 1; + + for (file_scanner = file_list; + file_scanner != NULL; + file_scanner = file_scanner->next) + size = add_size(size, strlen(file_scanner->filename) + 1); + + return size; +} + +/* + * Serialize the list of libraries we have loaded to a chunk of memory. + */ +void +SerializeLibraryState(Size maxsize, char *start_address) +{ + DynamicFileList *file_scanner; + + for (file_scanner = file_list; + file_scanner != NULL; + file_scanner = file_scanner->next) + { + Size len; + + len = strlcpy(start_address, file_scanner->filename, maxsize) + 1; + Assert(len < maxsize); + maxsize -= len; + start_address += len; + } + start_address[0] = '\0'; +} + +/* + * Load every library the serializing backend had loaded. + */ +void +RestoreLibraryState(char *start_address) +{ + while (*start_address != '\0') + { + internal_load_library(start_address); + start_address += strlen(start_address) + 1; + } +} diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f43aff2d2c..8727ee3b74 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -5665,6 +5665,20 @@ set_config_option(const char *name, const char *value, elevel = ERROR; } + /* + * GUC_ACTION_SAVE changes are acceptable during a parallel operation, + * because the current worker will also pop the change. We're probably + * dealing with a function having a proconfig entry. Only the function's + * body should observe the change, and peer workers do not share in the + * execution of a function call started by this worker. + * + * Other changes might need to affect other workers, so forbid them. + */ + if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE) + ereport(elevel, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot set parameters during a parallel operation"))); + record = find_option(name, true, elevel); if (record == NULL) { @@ -6969,6 +6983,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel) { GucAction action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET; + /* + * Workers synchronize these parameters at the start of the parallel + * operation; then, we block SET during the operation. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot set parameters during a parallel operation"))); + switch (stmt->kind) { case VAR_SET_VALUE: diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c index bfd7d0ad42..cc5409b880 100644 --- a/src/backend/utils/time/combocid.c +++ b/src/backend/utils/time/combocid.c @@ -44,6 +44,7 @@ #include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" +#include "storage/shmem.h" #include "utils/combocid.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -286,3 +287,76 @@ GetRealCmax(CommandId combocid) Assert(combocid < usedComboCids); return comboCids[combocid].cmax; } + +/* + * Estimate the amount of space required to serialize the current ComboCID + * state. + */ +Size +EstimateComboCIDStateSpace(void) +{ + Size size; + + /* Add space required for saving usedComboCids */ + size = sizeof(int); + + /* Add space required for saving the combocids key */ + size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids)); + + return size; +} + +/* + * Serialize the ComboCID state into the memory, beginning at start_address. + * maxsize should be at least as large as the value returned by + * EstimateComboCIDStateSpace. + */ +void +SerializeComboCIDState(Size maxsize, char *start_address) +{ + char *endptr; + + /* First, we store the number of currently-existing ComboCIDs. */ + * (int *) start_address = usedComboCids; + + /* If maxsize is too small, throw an error. */ + endptr = start_address + sizeof(int) + + (sizeof(ComboCidKeyData) * usedComboCids); + if (endptr < start_address || endptr > start_address + maxsize) + elog(ERROR, "not enough space to serialize ComboCID state"); + + /* Now, copy the actual cmin/cmax pairs. */ + memcpy(start_address + sizeof(int), comboCids, + (sizeof(ComboCidKeyData) * usedComboCids)); +} + +/* + * Read the ComboCID state at the specified address and initialize this + * backend with the same ComboCIDs. This is only valid in a backend that + * currently has no ComboCIDs (and only makes sense if the transaction state + * is serialized and restored as well). + */ +void +RestoreComboCIDState(char *comboCIDstate) +{ + int num_elements; + ComboCidKeyData *keydata; + int i; + CommandId cid; + + Assert(!comboCids && !comboHash); + + /* First, we retrieve the number of ComboCIDs that were serialized. */ + num_elements = * (int *) comboCIDstate; + keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int)); + + /* Use GetComboCommandId to restore each ComboCID. */ + for (i = 0; i < num_elements; i++) + { + cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax); + + /* Verify that we got the expected answer. */ + if (cid != i) + elog(ERROR, "unexpected command ID while restoring combo CIDs"); + } +} diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 7cfa0cf848..a2cb4a037f 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -157,6 +157,22 @@ static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); +/* + * Snapshot fields to be serialized. + * + * Only these fields need to be sent to the cooperating backend; the + * remaining ones can (and must) set by the receiver upon restore. + */ +typedef struct SerializedSnapshotData +{ + TransactionId xmin; + TransactionId xmax; + uint32 xcnt; + int32 subxcnt; + bool suboverflowed; + bool takenDuringRecovery; + CommandId curcid; +} SerializedSnapshotData; /* * GetTransactionSnapshot @@ -188,6 +204,10 @@ GetTransactionSnapshot(void) Assert(pairingheap_is_empty(&RegisteredSnapshots)); Assert(FirstXactSnapshot == NULL); + if (IsInParallelMode()) + elog(ERROR, + "cannot take query snapshot during a parallel operation"); + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must @@ -238,6 +258,14 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { + /* + * We might be able to relax this, but nothing that could otherwise work + * needs it. + */ + if (IsInParallelMode()) + elog(ERROR, + "cannot update SecondarySnapshot during a parallel operation"); + /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. @@ -347,7 +375,8 @@ SnapshotSetCommandId(CommandId curcid) * in GetTransactionSnapshot. */ static void -SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) +SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, + PGPROC *sourceproc) { /* Caller should have checked this already */ Assert(!FirstSnapshotSet); @@ -394,7 +423,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) * doesn't seem worth contorting the logic here to avoid two calls, * especially since it's not clear that predicate.c *must* do this. */ - if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + if (sourceproc != NULL) + { + if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import the requested snapshot"), + errdetail("The source transaction is not running anymore."))); + } + else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), @@ -550,11 +587,24 @@ PushCopiedSnapshot(Snapshot snapshot) void UpdateActiveSnapshotCommandId(void) { + CommandId save_curcid, curcid; Assert(ActiveSnapshot != NULL); Assert(ActiveSnapshot->as_snap->active_count == 1); Assert(ActiveSnapshot->as_snap->regd_count == 0); - ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false); + /* + * Don't allow modification of the active snapshot during parallel + * operation. We share the snapshot to worker backends at beginning of + * parallel operation, so any change to snapshot can lead to + * inconsistencies. We have other defenses against + * CommandCounterIncrement, but there are a few places that call this + * directly, so we put an additional guard here. + */ + save_curcid = ActiveSnapshot->as_snap->curcid; + curcid = GetCurrentCommandId(false); + if (IsInParallelMode() && save_curcid != curcid) + elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation"); + ActiveSnapshot->as_snap->curcid = curcid; } /* @@ -1289,7 +1339,7 @@ ImportSnapshot(const char *idstr) errmsg("cannot import a snapshot from a different database"))); /* OK, install the snapshot */ - SetTransactionSnapshot(&snapshot, src_xid); + SetTransactionSnapshot(&snapshot, src_xid, NULL); } /* @@ -1393,3 +1443,155 @@ HistoricSnapshotGetTupleCids(void) Assert(HistoricSnapshotActive()); return tuplecid_data; } + +/* + * EstimateSnapshotSpace + * Returns the size need to store the given snapshot. + * + * We are exporting only required fields from the Snapshot, stored in + * SerializedSnapshotData. + */ +Size +EstimateSnapshotSpace(Snapshot snap) +{ + Size size; + + Assert(snap != InvalidSnapshot); + Assert(snap->satisfies == HeapTupleSatisfiesMVCC); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = add_size(sizeof(SerializedSnapshotData), + mul_size(snap->xcnt, sizeof(TransactionId))); + if (snap->subxcnt > 0 && + (!snap->suboverflowed || snap->takenDuringRecovery)) + size = add_size(size, + mul_size(snap->subxcnt, sizeof(TransactionId))); + + return size; +} + +/* + * SerializeSnapshot + * Dumps the serialized snapshot (extracted from given snapshot) onto the + * memory location at start_address. + */ +void +SerializeSnapshot(Snapshot snapshot, char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + + Assert(snapshot->xcnt >= 0); + Assert(snapshot->subxcnt >= 0); + + serialized_snapshot = (SerializedSnapshotData *) start_address; + + /* Copy all required fields */ + serialized_snapshot->xmin = snapshot->xmin; + serialized_snapshot->xmax = snapshot->xmax; + serialized_snapshot->xcnt = snapshot->xcnt; + serialized_snapshot->subxcnt = snapshot->subxcnt; + serialized_snapshot->suboverflowed = snapshot->suboverflowed; + serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery; + serialized_snapshot->curcid = snapshot->curcid; + + /* + * Ignore the SubXID array if it has overflowed, unless the snapshot + * was taken during recovey - in that case, top-level XIDs are in subxip + * as well, and we mustn't lose them. + */ + if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery) + serialized_snapshot->subxcnt = 0; + + /* Copy XID array */ + if (snapshot->xcnt > 0) + memcpy((TransactionId *) (serialized_snapshot + 1), + snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); + + /* + * Copy SubXID array. Don't bother to copy it if it had overflowed, + * though, because it's not used anywhere in that case. Except if it's a + * snapshot taken during recovery; all the top-level XIDs are in subxip as + * well in that case, so we mustn't lose them. + */ + if (snapshot->subxcnt > 0) + { + Size subxipoff = sizeof(SerializedSnapshotData) + + snapshot->xcnt * sizeof(TransactionId); + + memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff), + snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); + } +} + +/* + * RestoreSnapshot + * Restore a serialized snapshot from the specified address. + * + * The copy is palloc'd in TopTransactionContext and has initial refcounts set + * to 0. The returned snapshot has the copied flag set. + */ +Snapshot +RestoreSnapshot(char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + Size size; + Snapshot snapshot; + TransactionId *serialized_xids; + + serialized_snapshot = (SerializedSnapshotData *) start_address; + serialized_xids = (TransactionId *) + (start_address + sizeof(SerializedSnapshotData)); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = sizeof(SnapshotData) + + serialized_snapshot->xcnt * sizeof(TransactionId) + + serialized_snapshot->subxcnt * sizeof(TransactionId); + + /* Copy all required fields */ + snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); + snapshot->satisfies = HeapTupleSatisfiesMVCC; + snapshot->xmin = serialized_snapshot->xmin; + snapshot->xmax = serialized_snapshot->xmax; + snapshot->xip = NULL; + snapshot->xcnt = serialized_snapshot->xcnt; + snapshot->subxip = NULL; + snapshot->subxcnt = serialized_snapshot->subxcnt; + snapshot->suboverflowed = serialized_snapshot->suboverflowed; + snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery; + snapshot->curcid = serialized_snapshot->curcid; + + /* Copy XIDs, if present. */ + if (serialized_snapshot->xcnt > 0) + { + snapshot->xip = (TransactionId *) (snapshot + 1); + memcpy(snapshot->xip, serialized_xids, + serialized_snapshot->xcnt * sizeof(TransactionId)); + } + + /* Copy SubXIDs, if present. */ + if (serialized_snapshot->subxcnt > 0) + { + snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt; + memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt, + serialized_snapshot->subxcnt * sizeof(TransactionId)); + } + + /* Set the copied flag so that the caller will set refcounts correctly. */ + snapshot->regd_count = 0; + snapshot->active_count = 0; + snapshot->copied = true; + + return snapshot; +} + +/* + * Install a restored snapshot as the transaction snapshot. + * + * The second argument is of type void * so that snapmgr.h need not include + * the declaration for PGPROC. + */ +void +RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) +{ + SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); +} diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h new file mode 100644 index 0000000000..8274f841b6 --- /dev/null +++ b/src/include/access/parallel.h @@ -0,0 +1,68 @@ +/*------------------------------------------------------------------------- + * + * parallel.h + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel.h + * + *------------------------------------------------------------------------- + */ + +#ifndef PARALLEL_H +#define PARALLEL_H + +#include "access/xlogdefs.h" +#include "lib/ilist.h" +#include "postmaster/bgworker.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "utils/elog.h" + +typedef void (*parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc); + +typedef struct ParallelWorkerInfo +{ + BackgroundWorkerHandle *bgwhandle; + shm_mq_handle *error_mqh; + int32 pid; +} ParallelWorkerInfo; + +typedef struct ParallelContext +{ + dlist_node node; + SubTransactionId subid; + int nworkers; + parallel_worker_main_type entrypoint; + char *library_name; + char *function_name; + ErrorContextCallback *error_context_stack; + shm_toc_estimator estimator; + dsm_segment *seg; + void *private; + shm_toc *toc; + ParallelWorkerInfo *worker; +} ParallelContext; + +extern bool ParallelMessagePending; +extern int ParallelWorkerNumber; + +#define IsParallelWorker() (ParallelWorkerNumber >= 0) + +extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers); +extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers); +extern void InitializeParallelDSM(ParallelContext *); +extern void LaunchParallelWorkers(ParallelContext *); +extern void WaitForParallelWorkersToFinish(ParallelContext *); +extern void DestroyParallelContext(ParallelContext *); +extern bool ParallelContextActive(void); + +extern void HandleParallelMessageInterrupt(void); +extern void HandleParallelMessages(void); +extern void AtEOXact_Parallel(bool isCommit); +extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId); +extern void ParallelWorkerReportLastRecEnd(XLogRecPtr); + +#endif /* PARALLEL_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cad1bb1d31..ed808fc150 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -78,9 +78,12 @@ extern bool MyXactAccessedTempRel; typedef enum { XACT_EVENT_COMMIT, + XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_ABORT, + XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PREPARE, XACT_EVENT_PRE_COMMIT, + XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_PREPARE } XactEvent; @@ -332,6 +335,10 @@ extern void BeginInternalSubTransaction(char *name); extern void ReleaseCurrentSubTransaction(void); extern void RollbackAndReleaseCurrentSubTransaction(void); extern bool IsSubTransaction(void); +extern Size EstimateTransactionStateSpace(void); +extern void SerializeTransactionState(Size maxsize, char *start_address); +extern void StartParallelWorkerTransaction(char *tstatespace); +extern void EndParallelWorkerTransaction(void); extern bool IsTransactionBlock(void); extern bool IsTransactionOrTransactionBlock(void); extern char TransactionBlockStatusCode(void); @@ -368,4 +375,8 @@ extern const char *xact_identify(uint8 info); extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed); extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed); +extern void EnterParallelMode(void); +extern void ExitParallelMode(void); +extern bool IsInParallelMode(void); + #endif /* XACT_H */ diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index cf5f7d0a78..f3b005fa9d 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -140,7 +140,7 @@ extern Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding); /* initialization & transaction cleanup code */ extern void InitializeSearchPath(void); -extern void AtEOXact_Namespace(bool isCommit); +extern void AtEOXact_Namespace(bool isCommit, bool parallel); extern void AtEOSubXact_Namespace(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); diff --git a/src/include/fmgr.h b/src/include/fmgr.h index 418f6aadaa..b9a5c40f59 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -642,6 +642,9 @@ extern PGFunction load_external_function(char *filename, char *funcname, extern PGFunction lookup_external_function(void *filehandle, char *funcname); extern void load_file(const char *filename, bool restricted); extern void **find_rendezvous_variable(const char *varName); +extern Size EstimateLibraryStateSpace(void); +extern void SerializeLibraryState(Size maxsize, char *start_address); +extern void RestoreLibraryState(char *start_address); /* * Support for aggregate functions diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index 5f2815ca90..ad7589d4ed 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -17,6 +17,7 @@ #include "storage/shm_mq.h" extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *); +extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index eacfccbcba..c389939738 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -271,6 +271,7 @@ extern void check_stack_depth(void); /* in tcop/utility.c */ extern void PreventCommandIfReadOnly(const char *cmdname); +extern void PreventCommandIfParallelMode(const char *cmdname); extern void PreventCommandDuringRecovery(const char *cmdname); /* in utils/misc/guc.c */ diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index a81b90badc..de9180df91 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -112,6 +112,8 @@ extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle * handle, pid_t *pid); +extern BgwHandleStatus +WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *); /* Terminate a bgworker */ extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 97c6e9344e..a9b40ed944 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid); +extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc); extern RunningTransactions GetRunningTransactionData(void); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index ac9d236dec..af1a0cd71f 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -31,6 +31,7 @@ typedef enum { PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ + PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ /* Recovery conflict reasons */ PROCSIG_RECOVERY_CONFLICT_DATABASE, diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h index ce7b47c24e..f2faa12623 100644 --- a/src/include/utils/combocid.h +++ b/src/include/utils/combocid.h @@ -21,5 +21,8 @@ */ extern void AtEOXact_ComboCid(void); +extern void RestoreComboCIDState(char *comboCIDstate); +extern void SerializeComboCIDState(Size maxsize, char *start_address); +extern Size EstimateComboCIDStateSpace(void); #endif /* COMBOCID_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 64d2ec1e5e..f8524eb687 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -64,4 +64,9 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids) extern void TeardownHistoricSnapshot(bool is_error); extern bool HistoricSnapshotActive(void); +extern Size EstimateSnapshotSpace(Snapshot snapshot); +extern void SerializeSnapshot(Snapshot snapshot, char *start_address); +extern Snapshot RestoreSnapshot(char *start_address); +extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc); + #endif /* SNAPMGR_H */