diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4e02cb289d..1a1e5b5eae 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -546,6 +546,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) switch (event) { + case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: /* Commit all remote transactions during pre-commit */ do_sql_command(entry->conn, "COMMIT TRANSACTION"); @@ -588,11 +589,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot prepare a transaction that modified remote tables"))); break; + case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_COMMIT: case XACT_EVENT_PREPARE: /* Pre-commit should have closed the open transaction */ elog(ERROR, "missed cleaning up connection during pre-commit"); break; + case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ entry->have_error = true; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index b504ccd05c..e84c1743f4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -42,6 +42,7 @@ #include "access/heapam_xlog.h" #include "access/hio.h" #include "access/multixact.h" +#include "access/parallel.h" #include "access/relscan.h" #include "access/sysattr.h" #include "access/transam.h" @@ -1051,7 +1052,13 @@ relation_open(Oid relationId, LOCKMODE lockmode) /* Make note that we've accessed a temporary relation */ if (RelationUsesLocalBuffers(r)) + { + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot access temporary tables during a parallel operation"))); MyXactAccessedTempRel = true; + } pgstat_initstats(r); @@ -1097,7 +1104,13 @@ try_relation_open(Oid relationId, LOCKMODE lockmode) /* Make note that we've accessed a temporary relation */ if (RelationUsesLocalBuffers(r)) + { + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot access temporary tables during a parallel operation"))); MyXactAccessedTempRel = true; + } pgstat_initstats(r); @@ -2237,6 +2250,17 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { + /* + * For now, parallel operations are required to be strictly read-only. + * Unlike heap_update() and heap_delete(), an insert should never create + * a combo CID, so it might be possible to relax this restriction, but + * not without more thought and testing. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot insert tuples during a parallel operation"))); + if (relation->rd_rel->relhasoids) { #ifdef NOT_USED @@ -2648,6 +2672,16 @@ heap_delete(Relation relation, ItemPointer tid, Assert(ItemPointerIsValid(tid)); + /* + * Forbid this during a parallel operation, lets it allocate a combocid. + * Other workers might need that combocid for visibility checks, and we + * have no provision for broadcasting it to them. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot delete tuples during a parallel operation"))); + block = ItemPointerGetBlockNumber(tid); buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); @@ -3099,6 +3133,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, Assert(ItemPointerIsValid(otid)); + /* + * Forbid this during a parallel operation, lets it allocate a combocid. + * Other workers might need that combocid for visibility checks, and we + * have no provision for broadcasting it to them. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot update tuples during a parallel operation"))); + /* * Fetch the list of attributes to be checked for HOT update. This is * wasted effort if we fail to update or have to put the new tuple on a @@ -5400,6 +5444,17 @@ heap_inplace_update(Relation relation, HeapTuple tuple) uint32 oldlen; uint32 newlen; + /* + * For now, parallel operations are required to be strictly read-only. + * Unlike a regular update, this should never create a combo CID, so it + * might be possible to relax this restriction, but not without more + * thought and testing. It's not clear that it would be useful, anyway. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot update tuples during a parallel operation"))); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 9d4d5dbc97..94455b23f7 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/access/transam top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \ +OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \ timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \ xact.o xlog.o xlogarchive.o xlogfuncs.o \ xloginsert.o xlogreader.o xlogutils.o diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel new file mode 100644 index 0000000000..10051863fe --- /dev/null +++ b/src/backend/access/transam/README.parallel @@ -0,0 +1,223 @@ +Overview +======== + +PostgreSQL provides some simple facilities to make writing parallel algorithms +easier. Using a data structure called a ParallelContext, you can arrange to +launch background worker processes, initialize their state to match that of +the backend which initiated parallelism, communicate with them via dynamic +shared memory, and write reasonably complex code that can run either in the +user backend or in one of the parallel workers without needing to be aware of +where it's running. + +The backend which starts a parallel operation (hereafter, the initiating +backend) starts by creating a dynamic shared memory segment which will last +for the lifetime of the parallel operation. This dynamic shared memory segment +will contain (1) a shm_mq that can be used to transport errors (and other +messages reported via elog/ereport) from the worker back to the initiating +backend; (2) serialized representations of the initiating backend's private +state, so that the worker can synchronize its state with of the initiating +backend; and (3) any other data structures which a particular user of the +ParallelContext data structure may wish to add for its own purposes. Once +the initiating backend has initialized the dynamic shared memory segment, it +asks the postmaster to launch the appropriate number of parallel workers. +These workers then connect to the dynamic shared memory segment, initiate +their state, and then invoke the appropriate entrypoint, as further detailed +below. + +Error Reporting +=============== + +When started, each parallel worker begins by attaching the dynamic shared +memory segment and locating the shm_mq to be used for error reporting; it +redirects all of its protocol messages to this shm_mq. Prior to this point, +any failure of the background worker will not be reported to the initiating +backend; from the point of view of the initiating backend, the worker simply +failed to start. The initiating backend must anyway be prepared to cope +with fewer parallel workers than it originally requested, so catering to +this case imposes no additional burden. + +Whenever a new message (or partial message; very large messages may wrap) is +sent to the error-reporting queue, PROCSIG_PARALLEL_MESSAGE is sent to the +initiating backend. This causes the next CHECK_FOR_INTERRUPTS() in the +initiating backend to read and rethrow the message. For the most part, this +makes error reporting in parallel mode "just work". Of course, to work +properly, it is important that the code the initiating backend is executing +CHECK_FOR_INTERRUPTS() regularly and avoid blocking interrupt processing for +long periods of time, but those are good things to do anyway. + +(A currently-unsolved problem is that some messages may get written to the +system log twice, once in the backend where the report was originally +generated, and again when the initiating backend rethrows the message. If +we decide to suppress one of these reports, it should probably be second one; +otherwise, if the worker is for some reason unable to propagate the message +back to the initiating backend, the message will be lost altogether.) + +State Sharing +============= + +It's possible to write C code which works correctly without parallelism, but +which fails when parallelism is used. No parallel infrastructure can +completely eliminate this problem, because any global variable is a risk. +There's no general mechanism for ensuring that every global variable in the +worker will have the same value that it does in the initiating backend; even +if we could ensure that, some function we're calling could update the variable +after each call, and only the backend where that update is performed will see +the new value. Similar problems can arise with any more-complex data +structure we might choose to use. For example, a pseudo-random number +generator should, given a particular seed value, produce the same predictable +series of values every time. But it does this by relying on some private +state which won't automatically be shared between cooperating backends. A +parallel-safe PRNG would need to store its state in dynamic shared memory, and +would require locking. The parallelism infrastructure has no way of knowing +whether the user intends to call code that has this sort of problem, and can't +do anything about it anyway. + +Instead, we take a more pragmatic approach. First, we try to make as many of +the operations that are safe outside of parallel mode work correctly in +parallel mode as well. Second, we try to prohibit common unsafe operations +via suitable error checks. These checks are intended to catch 100% of +unsafe things that a user might do from the SQL interface, but code written +in C can do unsafe things that won't trigger these checks. The error checks +are engaged via EnterParallelMode(), which should be called before creating +a parallel context, and disarmed via ExitParallelMode(), which should be +called after all parallel contexts have been destroyed. The most +significant restriction imposed by parallel mode is that all operations must +be strictly read-only; we allow no writes to the database and no DDL. We +might try to relax these restrictions in the future. + +To make as many operations as possible safe in parallel mode, we try to copy +the most important pieces of state from the initiating backend to each parallel +worker. This includes: + + - The set of libraries dynamically loaded by dfmgr.c. + + - The authenticated user ID and current database. Each parallel worker + will connect to the same database as the initiating backend, using the + same user ID. + + - The values of all GUCs. Accordingly, permanent changes to the value of + any GUC are forbidden while in parallel mode; but temporary changes, + such as entering a function with non-NULL proconfig, are OK. + + - The current subtransaction's XID, the top-level transaction's XID, and + the list of XIDs considered current (that is, they are in-progress or + subcommitted). This information is needed to ensure that tuple visibility + checks return the same results in the worker as they do in the + initiating backend. See also the section Transaction Integration, below. + + - The combo CID mappings. This is needed to ensure consistent answers to + tuple visibility checks. The need to synchronize this data structure is + a major reason why we can't support writes in parallel mode: such writes + might create new combo CIDs, and we have no way to let other workers + (or the initiating backend) know about them. + + - The transaction snapshot. + + - The active snapshot, which might be different from the transaction + snapshot. + + - The currently active user ID and security context. Note that this is + the fourth user ID we restore: the initial step of binding to the correct + database also involves restoring the authenticated user ID. When GUC + values are restored, this incidentally sets SessionUserId and OuterUserId + to the correct values. This final step restores CurrentUserId. + +To prevent undetected or unprincipled deadlocks when running in parallel mode, +this could should eventually handle heavyweight locks in some way. This is +not implemented yet. + +Transaction Integration +======================= + +Regardless of what the TransactionState stack looks like in the parallel +leader, each parallel worker ends up with a stack of depth 1. This stack +entry is marked with the special transaction block state +TBLOCK_PARALLEL_INPROGRESS so that it's not confused with an ordinary +toplevel transaction. The XID of this TransactionState is set to the XID of +the innermost currently-active subtransaction in the initiating backend. The +initiating backend's toplevel XID, and the XIDs of all current (in-progress +or subcommitted) XIDs are stored separately from the TransactionState stack, +but in such a way that GetTopTransactionId(), GetTopTransactionIdIfAny(), and +TransactionIdIsCurrentTransactionId() return the same values that they would +in the initiating backend. We could copy the entire transaction state stack, +but most of it would be useless: for example, you can't roll back to a +savepoint from within a parallel worker, and there are no resources to +associated with the memory contexts or resource owners of intermediate +subtransactions. + +No meaningful change to the transaction state can be made while in parallel +mode. No XIDs can be assigned, and no subtransactions can start or end, +because we have no way of communicating these state changes to cooperating +backends, or of synchronizing them. It's clearly unworkable for the initiating +backend to exit any transaction or subtransaction that was in progress when +parallelism was started before all parallel workers have exited; and it's even +more clearly crazy for a parallel worker to try to subcommit or subabort the +current subtransaction and execute in some other transaction context than was +present in the initiating backend. It might be practical to allow internal +sub-transactions (e.g. to implement a PL/pgsql EXCEPTION block) to be used in +parallel mode, provided that they are XID-less, because other backends +wouldn't really need to know about those transactions or do anything +differently because of them. Right now, we don't even allow that. + +At the end of a parallel operation, which can happen either because it +completed successfully or because it was interrupted by an error, parallel +workers associated with that operation exit. In the error case, transaction +abort processing in the parallel leader kills of any remaining workers, and +the parallel leader then waits for them to die. In the case of a successful +parallel operation, the parallel leader does not send any signals, but must +wait for workers to complete and exit of their own volition. In either +case, it is very important that all workers actually exit before the +parallel leader cleans up the (sub)transaction in which they were created; +otherwise, chaos can ensue. For example, if the leader is rolling back the +transaction that created the relation being scanned by a worker, the +relation could disappear while the worker is still busy scanning it. That's +not safe. + +Generally, the cleanup performed by each worker at this point is similar to +top-level commit or abort. Each backend has its own resource owners: buffer +pins, catcache or relcache reference counts, tuple descriptors, and so on +are managed separately by each backend, and must free them before exiting. +There are, however, some important differences between parallel worker +commit or abort and a real top-level transaction commit or abort. Most +importantly: + + - No commit or abort record is written; the initiating backend is + responsible for this. + + - Cleanup of pg_temp namespaces is not done. Parallel workers cannot + safely access the initiating backend's pg_temp namespace, and should + not create one of their own. + +Coding Conventions +=================== + +Before beginning any parallel operation, call EnterParallelMode(); after all +parallel operations are completed, call ExitParallelMode(). To actually +parallelize a particular operation, use a ParallelContext. The basic coding +pattern looks like this: + + EnterParallelMode(); /* prohibit unsafe state changes */ + + pcxt = CreateParallelContext(entrypoint, nworkers); + + /* Allow space for application-specific data here. */ + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, keys); + + InitializeParallelDSM(pcxt); /* create DSM and copy state to it */ + + /* Store the data for which we reserved space. */ + space = shm_toc_allocate(pcxt->toc, size); + shm_toc_insert(pcxt->toc, key, space); + + LaunchParallelWorkers(pcxt); + + /* do parallel stuff */ + + WaitForParallelWorkersToFinish(pcxt); + + /* read any final results from dynamic shared memory */ + + DestroyParallelContext(pcxt); + + ExitParallelMode(); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c new file mode 100644 index 0000000000..8ed7314b58 --- /dev/null +++ b/src/backend/access/transam/parallel.c @@ -0,0 +1,1007 @@ +/*------------------------------------------------------------------------- + * + * parallel.c + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/parallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/parallel.h" +#include "commands/async.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/sinval.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/combocid.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/snapmgr.h" + +/* + * We don't want to waste a lot of memory on an error queue which, most of + * the time, will process only a handful of small messages. However, it is + * desirable to make it large enough that a typical ErrorResponse can be sent + * without blocking. That way, a worker that errors out can write the whole + * message into the queue and terminate without waiting for the user backend. + */ +#define PARALLEL_ERROR_QUEUE_SIZE 16384 + +/* Magic number for parallel context TOC. */ +#define PARALLEL_MAGIC 0x50477c7c + +/* + * Magic numbers for parallel state sharing. Higher-level code should use + * smaller values, leaving these very large ones for use by this module. + */ +#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) +#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) +#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003) +#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004) +#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005) +#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) +#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) +#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) +#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009) + +/* Fixed-size parallel state. */ +typedef struct FixedParallelState +{ + /* Fixed-size state that workers must restore. */ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; + PGPROC *parallel_master_pgproc; + pid_t parallel_master_pid; + BackendId parallel_master_backend_id; + + /* Entrypoint for parallel workers. */ + parallel_worker_main_type entrypoint; + + /* Mutex protects remaining fields. */ + slock_t mutex; + + /* Track whether workers have attached. */ + int workers_expected; + int workers_attached; + + /* Maximum XactLastRecEnd of any worker. */ + XLogRecPtr last_xlog_end; +} FixedParallelState; + +/* + * Our parallel worker number. We initialize this to -1, meaning that we are + * not a parallel worker. In parallel workers, it will be set to a value >= 0 + * and < the number of workers before any user code is invoked; each parallel + * worker will get a different parallel worker number. + */ +int ParallelWorkerNumber = -1; + +/* Is there a parallel message pending which we need to receive? */ +bool ParallelMessagePending = false; + +/* Pointer to our fixed parallel state. */ +static FixedParallelState *MyFixedParallelState; + +/* List of active parallel contexts. */ +static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); + +/* Private functions. */ +static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void ParallelErrorContext(void *arg); +static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); +static void ParallelWorkerMain(Datum main_arg); + +/* + * Establish a new parallel context. This should be done after entering + * parallel mode, and (unless there is an error) the context should be + * destroyed before exiting the current subtransaction. + */ +ParallelContext * +CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* It is unsafe to create a parallel context if not in parallel mode. */ + Assert(IsInParallelMode()); + + /* Number of workers should be non-negative. */ + Assert(nworkers >= 0); + + /* + * If dynamic shared memory is not available, we won't be able to use + * background workers. + */ + if (dynamic_shared_memory_type == DSM_IMPL_NONE) + nworkers = 0; + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize a new ParallelContext. */ + pcxt = palloc0(sizeof(ParallelContext)); + pcxt->subid = GetCurrentSubTransactionId(); + pcxt->nworkers = nworkers; + pcxt->entrypoint = entrypoint; + pcxt->error_context_stack = error_context_stack; + shm_toc_initialize_estimator(&pcxt->estimator); + dlist_push_head(&pcxt_list, &pcxt->node); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish a new parallel context that calls a function provided by an + * extension. This works around the fact that the library might get mapped + * at a different address in each backend. + */ +ParallelContext * +CreateParallelContextForExternalFunction(char *library_name, + char *function_name, + int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Create the context. */ + pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); + pcxt->library_name = pstrdup(library_name); + pcxt->function_name = pstrdup(function_name); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish the dynamic shared memory segment for a parallel context and + * copied state and other bookkeeping information that will need by parallel + * workers into it. + */ +void +InitializeParallelDSM(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + Size library_len = 0; + Size guc_len = 0; + Size combocidlen = 0; + Size tsnaplen = 0; + Size asnaplen = 0; + Size tstatelen = 0; + Size segsize = 0; + int i; + FixedParallelState *fps; + Snapshot transaction_snapshot = GetTransactionSnapshot(); + Snapshot active_snapshot = GetActiveSnapshot(); + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Allow space to store the fixed-size parallel state. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Normally, the user will have requested at least one worker process, + * but if by chance they have not, we can skip a bunch of things here. + */ + if (pcxt->nworkers > 0) + { + /* Estimate space for various kinds of state sharing. */ + library_len = EstimateLibraryStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, library_len); + guc_len = EstimateGUCStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, guc_len); + combocidlen = EstimateComboCIDStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, combocidlen); + tsnaplen = EstimateSnapshotSpace(transaction_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen); + asnaplen = EstimateSnapshotSpace(active_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); + tstatelen = EstimateTransactionStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + /* If you add more chunks here, you probably need to add keys. */ + shm_toc_estimate_keys(&pcxt->estimator, 6); + + /* Estimate space need for error queues. */ + StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == + PARALLEL_ERROR_QUEUE_SIZE, + "parallel error queue size not buffer-aligned"); + shm_toc_estimate_chunk(&pcxt->estimator, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate how much we'll need for extension entrypoint info. */ + if (pcxt->library_name != NULL) + { + Assert(pcxt->entrypoint == ParallelExtensionTrampoline); + Assert(pcxt->function_name != NULL); + shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + + strlen(pcxt->function_name) + 2); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + } + + /* + * Create DSM and initialize with new table of contents. But if the user + * didn't request any workers, then don't bother creating a dynamic shared + * memory segment; instead, just use backend-private memory. + * + * Also, if we can't create a dynamic shared memory segment because the + * maximum number of segments have already been created, then fall back + * to backend-private memory, and plan not to use any workers. We hope + * this won't happen very often, but it's better to abandon the use of + * parallelism than to fail outright. + */ + segsize = shm_toc_estimate(&pcxt->estimator); + if (pcxt->nworkers != 0) + pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (pcxt->seg != NULL) + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, + dsm_segment_address(pcxt->seg), + segsize); + else + { + pcxt->nworkers = 0; + pcxt->private = MemoryContextAlloc(TopMemoryContext, segsize); + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private, segsize); + } + + /* Initialize fixed-size state in shared memory. */ + fps = (FixedParallelState *) + shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState)); + fps->database_id = MyDatabaseId; + fps->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context); + fps->parallel_master_pgproc = MyProc; + fps->parallel_master_pid = MyProcPid; + fps->parallel_master_backend_id = MyBackendId; + fps->entrypoint = pcxt->entrypoint; + SpinLockInit(&fps->mutex); + fps->workers_expected = pcxt->nworkers; + fps->workers_attached = 0; + fps->last_xlog_end = 0; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); + + /* We can skip the rest of this if we're not budgeting for any workers. */ + if (pcxt->nworkers > 0) + { + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + char *error_queue_space; + + /* Serialize shared libraries we have loaded. */ + libraryspace = shm_toc_allocate(pcxt->toc, library_len); + SerializeLibraryState(library_len, libraryspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace); + + /* Serialize GUC settings. */ + gucspace = shm_toc_allocate(pcxt->toc, guc_len); + SerializeGUCState(guc_len, gucspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace); + + /* Serialize combo CID state. */ + combocidspace = shm_toc_allocate(pcxt->toc, combocidlen); + SerializeComboCIDState(combocidlen, combocidspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace); + + /* Serialize transaction snapshot and active snapshot. */ + tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen); + SerializeSnapshot(transaction_snapshot, tsnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, + tsnapspace); + asnapspace = shm_toc_allocate(pcxt->toc, asnaplen); + SerializeSnapshot(active_snapshot, asnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + + /* Serialize transaction state. */ + tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); + SerializeTransactionState(tstatelen, tstatespace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); + + /* Allocate space for worker information. */ + pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); + + /* + * Establish error queues in dynamic shared memory. + * + * These queues should be used only for transmitting ErrorResponse, + * NoticeResponse, and NotifyResponse protocol messages. Tuple data + * should be transmitted via separate (possibly larger?) queues. + */ + error_queue_space = + shm_toc_allocate(pcxt->toc, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); + + /* Serialize extension entrypoint information. */ + if (pcxt->library_name != NULL) + { + Size lnamelen = strlen(pcxt->library_name); + char *extensionstate; + + extensionstate = shm_toc_allocate(pcxt->toc, lnamelen + + strlen(pcxt->function_name) + 2); + strcpy(extensionstate, pcxt->library_name); + strcpy(extensionstate + lnamelen + 1, pcxt->function_name); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, + extensionstate); + } + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Launch parallel workers. + */ +void +LaunchParallelWorkers(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + int i; + bool any_registrations_failed = false; + + /* Skip this if we have no workers. */ + if (pcxt->nworkers == 0) + return; + + /* If we do have workers, we'd better have a DSM segment. */ + Assert(pcxt->seg != NULL); + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Configure a worker. */ + snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", + MyProcPid); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = ParallelWorkerMain; + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg)); + worker.bgw_notify_pid = MyProcPid; + + /* + * Start workers. + * + * The caller must be able to tolerate ending up with fewer workers than + * expected, so there is no need to throw an error here if registration + * fails. It wouldn't help much anyway, because registering the worker + * in no way guarantees that it will start up and initialize successfully. + */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (!any_registrations_failed && + RegisterDynamicBackgroundWorker(&worker, + &pcxt->worker[i].bgwhandle)) + shm_mq_set_handle(pcxt->worker[i].error_mqh, + pcxt->worker[i].bgwhandle); + else + { + /* + * If we weren't able to register the worker, then we've bumped + * up against the max_worker_processes limit, and future + * registrations will probably fail too, so arrange to skip them. + * But we still have to execute this code for the remaining slots + * to make sure that we forget about the error queues we budgeted + * for those workers. Otherwise, we'll wait for them to start, + * but they never will. + */ + any_registrations_failed = true; + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + } + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Wait for all workers to exit. + * + * Even if the parallel operation seems to have completed successfully, it's + * important to call this function afterwards. We must not miss any errors + * the workers may have thrown during the parallel operation, or any that they + * may yet throw while shutting down. + * + * Also, we want to update our notion of XactLastRecEnd based on worker + * feedback. + */ +void +WaitForParallelWorkersToFinish(ParallelContext *pcxt) +{ + for (;;) + { + bool anyone_alive = false; + int i; + + /* + * This will process any parallel messages that are pending, which + * may change the outcome of the loop that follows. It may also + * throw an error propagated from a worker. + */ + CHECK_FOR_INTERRUPTS(); + + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + { + anyone_alive = true; + break; + } + } + + if (!anyone_alive) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1); + ResetLatch(&MyProc->procLatch); + } + + if (pcxt->toc != NULL) + { + FixedParallelState *fps; + + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + if (fps->last_xlog_end > XactLastRecEnd) + XactLastRecEnd = fps->last_xlog_end; + } +} + +/* + * Destroy a parallel context. + * + * If expecting a clean exit, you should use WaitForParallelWorkersToFinish() + * first, before calling this function. When this function is invoked, any + * remaining workers are forcibly killed; the dynamic shared memory segment + * is unmapped; and we then wait (uninterruptibly) for the workers to exit. + */ +void +DestroyParallelContext(ParallelContext *pcxt) +{ + int i; + + /* + * Be careful about order of operations here! We remove the parallel + * context from the list before we do anything else; otherwise, if an + * error occurs during a subsequent step, we might try to nuke it again + * from AtEOXact_Parallel or AtEOSubXact_Parallel. + */ + dlist_delete(&pcxt->node); + + /* Kill each worker in turn, and forget their error queues. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].bgwhandle != NULL) + TerminateBackgroundWorker(pcxt->worker[i].bgwhandle); + if (pcxt->worker[i].error_mqh != NULL) + { + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].error_mqh = NULL; + } + } + + /* + * If we have allocated a shared memory segment, detach it. This will + * implicitly detach the error queues, and any other shared memory queues, + * stored there. + */ + if (pcxt->seg != NULL) + { + dsm_detach(pcxt->seg); + pcxt->seg = NULL; + } + + /* + * If this parallel context is actually in backend-private memory rather + * than shared memory, free that memory instead. + */ + if (pcxt->private != NULL) + { + pfree(pcxt->private); + pcxt->private = NULL; + } + + /* Wait until the workers actually die. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + BgwHandleStatus status; + + if (pcxt->worker[i].bgwhandle == NULL) + continue; + + /* + * We can't finish transaction commit or abort until all of the + * workers are dead. This means, in particular, that we can't respond + * to interrupts at this stage. + */ + HOLD_INTERRUPTS(); + status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle); + RESUME_INTERRUPTS(); + + /* + * If the postmaster kicked the bucket, we have no chance of cleaning + * up safely -- we won't be able to tell when our workers are actually + * dead. This doesn't necessitate a PANIC since they will all abort + * eventually, but we can't safely continue this session. + */ + if (status == BGWH_POSTMASTER_DIED) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("postmaster exited during a parallel transaction"))); + + /* Release memory. */ + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + + /* Free the worker array itself. */ + if (pcxt->worker != NULL) + { + pfree(pcxt->worker); + pcxt->worker = NULL; + } + + /* Free memory. */ + pfree(pcxt); +} + +/* + * Are there any parallel contexts currently active? + */ +bool +ParallelContextActive(void) +{ + return !dlist_is_empty(&pcxt_list); +} + +/* + * Handle receipt of an interrupt indicating a parallel worker message. + */ +void +HandleParallelMessageInterrupt(void) +{ + int save_errno = errno; + + InterruptPending = true; + ParallelMessagePending = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Handle any queued protocol messages received from parallel workers. + */ +void +HandleParallelMessages(void) +{ + dlist_iter iter; + + ParallelMessagePending = false; + + dlist_foreach(iter, &pcxt_list) + { + ParallelContext *pcxt; + int i; + Size nbytes; + void *data; + + pcxt = dlist_container(ParallelContext, node, iter.cur); + if (pcxt->worker == NULL) + continue; + + for (i = 0; i < pcxt->nworkers; ++i) + { + /* + * Read as many messages as we can from each worker, but stop + * when either (1) the error queue goes away, which can happen if + * we receive a Terminate message from the worker; or (2) no more + * messages can be read from the worker without blocking. + */ + while (pcxt->worker[i].error_mqh != NULL) + { + shm_mq_result res; + + res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, + &data, true); + if (res == SHM_MQ_WOULD_BLOCK) + break; + else if (res == SHM_MQ_SUCCESS) + { + StringInfoData msg; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + HandleParallelMessage(pcxt, i, &msg); + pfree(msg.data); + } + else + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ + errmsg("lost connection to parallel worker"))); + + /* This might make the error queue go away. */ + CHECK_FOR_INTERRUPTS(); + } + } + } +} + +/* + * Handle a single protocol message received from a single parallel worker. + */ +static void +HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) +{ + char msgtype; + + msgtype = pq_getmsgbyte(msg); + + switch (msgtype) + { + case 'K': /* BackendKeyData */ + { + int32 pid = pq_getmsgint(msg, 4); + (void) pq_getmsgint(msg, 4); /* discard cancel key */ + (void) pq_getmsgend(msg); + pcxt->worker[i].pid = pid; + break; + } + + case 'E': /* ErrorResponse */ + case 'N': /* NoticeResponse */ + { + ErrorData edata; + ErrorContextCallback errctx; + ErrorContextCallback *save_error_context_stack; + + /* + * Rethrow the error using the error context callbacks that + * were in effect when the context was created, not the + * current ones. + */ + save_error_context_stack = error_context_stack; + errctx.callback = ParallelErrorContext; + errctx.arg = &pcxt->worker[i].pid; + errctx.previous = pcxt->error_context_stack; + error_context_stack = &errctx; + + /* Parse ErrorReponse or NoticeResponse. */ + pq_parse_errornotice(msg, &edata); + + /* Death of a worker isn't enough justification for suicide. */ + edata.elevel = Min(edata.elevel, ERROR); + + /* Rethrow error or notice. */ + ThrowErrorData(&edata); + + /* Restore previous context. */ + error_context_stack = save_error_context_stack; + + break; + } + + case 'A': /* NotifyResponse */ + { + /* Propagate NotifyResponse. */ + pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + break; + } + + case 'X': /* Terminate, indicating clean exit */ + { + pfree(pcxt->worker[i].bgwhandle); + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + break; + } + + default: + { + elog(ERROR, "unknown message type: %c (%d bytes)", + msgtype, msg->len); + } + } +} + +/* + * End-of-subtransaction cleanup for parallel contexts. + * + * Currently, it's forbidden to enter or leave a subtransaction while + * parallel mode is in effect, so we could just blow away everything. But + * we may want to relax that restriction in the future, so this code + * contemplates that there may be multiple subtransaction IDs in pcxt_list. + */ +void +AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId) +{ + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (pcxt->subid != mySubId) + break; + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * End-of-transaction cleanup for parallel contexts. + */ +void +AtEOXact_Parallel(bool isCommit) +{ + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * Main entrypoint for parallel workers. + */ +static void +ParallelWorkerMain(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + FixedParallelState *fps; + char *error_queue_space; + shm_mq *mq; + shm_mq_handle *mqh; + char *libraryspace; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + StringInfoData msgbuf; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "parallel worker", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * Now that we have a resource owner, we can attach to the dynamic + * shared memory segment and read the table of contents. + */ + seg = dsm_attach(DatumGetUInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Determine and set our worker number. */ + fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); + Assert(fps != NULL); + Assert(ParallelWorkerNumber == -1); + SpinLockAcquire(&fps->mutex); + if (fps->workers_attached < fps->workers_expected) + ParallelWorkerNumber = fps->workers_attached++; + SpinLockRelease(&fps->mutex); + if (ParallelWorkerNumber < 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many parallel workers already attached"))); + MyFixedParallelState = fps; + + /* + * Now that we have a worker number, we can find and attach to the error + * queue provided for us. That's good, because until we do that, any + * errors that happen here will not be reported back to the process that + * requested that this worker be launched. + */ + error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); + mq = (shm_mq *) (error_queue_space + + ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_sender(mq, MyProc); + mqh = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(mq, mqh); + pq_set_parallel_master(fps->parallel_master_pid, + fps->parallel_master_backend_id); + + /* + * Send a BackendKeyData message to the process that initiated parallelism + * so that it has access to our PID before it receives any other messages + * from us. Our cancel key is sent, too, since that's the way the protocol + * message is defined, but it won't actually be used for anything in this + * case. + */ + pq_beginmessage(&msgbuf, 'K'); + pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32)); + pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32)); + pq_endmessage(&msgbuf); + + /* + * Hooray! Primary initialization is complete. Now, we need to set up + * our backend-local state to match the original backend. + */ + + /* + * Load libraries that were loaded by original backend. We want to do this + * before restoring GUCs, because the libraries might define custom + * variables. + */ + libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY); + Assert(libraryspace != NULL); + RestoreLibraryState(libraryspace); + + /* Restore database connection. */ + BackgroundWorkerInitializeConnectionByOid(fps->database_id, + fps->authenticated_user_id); + + /* Restore GUC values from launching backend. */ + gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); + Assert(gucspace != NULL); + StartTransactionCommand(); + RestoreGUCState(gucspace); + CommitTransactionCommand(); + + /* Crank up a transaction state appropriate to a parallel worker. */ + tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE); + StartParallelWorkerTransaction(tstatespace); + + /* Restore combo CID state. */ + combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID); + Assert(combocidspace != NULL); + RestoreComboCIDState(combocidspace); + + /* Restore transaction snapshot. */ + tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); + Assert(tsnapspace != NULL); + RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), + fps->parallel_master_pgproc); + + /* Restore active snapshot. */ + asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT); + Assert(asnapspace != NULL); + PushActiveSnapshot(RestoreSnapshot(asnapspace)); + + /* Restore user ID and security context. */ + SetUserIdAndSecContext(fps->current_user_id, fps->sec_context); + + /* + * We've initialized all of our state now; nothing should change hereafter. + */ + EnterParallelMode(); + + /* + * Time to do the real work: invoke the caller-supplied code. + * + * If you get a crash at this line, see the comments for + * ParallelExtensionTrampoline. + */ + fps->entrypoint(seg, toc); + + /* Must exit parallel mode to pop active snapshot. */ + ExitParallelMode(); + + /* Must pop active snapshot so resowner.c doesn't complain. */ + PopActiveSnapshot(); + + /* Shut down the parallel-worker transaction. */ + EndParallelWorkerTransaction(); + + /* Report success. */ + pq_putmessage('X', NULL, 0); +} + +/* + * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a + * function living in a dynamically loaded module, because the module might + * not be loaded in every process, or might be loaded but not at the same + * address. To work around that problem, CreateParallelContextForExtension() + * arranges to call this function rather than calling the extension-provided + * function directly; and this function then looks up the real entrypoint and + * calls it. + */ +static void +ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) +{ + char *extensionstate; + char *library_name; + char *function_name; + parallel_worker_main_type entrypt; + + extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); + Assert(extensionstate != NULL); + library_name = extensionstate; + function_name = extensionstate + strlen(library_name) + 1; + + entrypt = (parallel_worker_main_type) + load_external_function(library_name, function_name, true, NULL); + entrypt(seg, toc); +} + +/* + * Give the user a hint that this is a message propagated from a parallel + * worker. Otherwise, it can sometimes be confusing to understand what + * actually happened. + */ +static void +ParallelErrorContext(void *arg) +{ + errcontext("parallel worker, pid %d", * (int32 *) arg); +} + +/* + * Update shared memory with the ending location of the last WAL record we + * wrote, if it's greater than the value already stored there. + */ +void +ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) +{ + FixedParallelState *fps = MyFixedParallelState; + + Assert(fps != NULL); + SpinLockAcquire(&fps->mutex); + if (fps->last_xlog_end < last_xlog_end) + fps->last_xlog_end = last_xlog_end; + SpinLockRelease(&fps->mutex); +} diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 42ee57fe8d..cf3e964fc6 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -49,6 +49,13 @@ GetNewTransactionId(bool isSubXact) { TransactionId xid; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new XIDs after that point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign TransactionIds during a parallel operation"); + /* * During bootstrap initialization, we return the special bootstrap * transaction id. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 511bcbbc51..a8f78d6376 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -22,6 +22,7 @@ #include "access/commit_ts.h" #include "access/multixact.h" +#include "access/parallel.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -51,6 +52,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" +#include "utils/builtins.h" #include "utils/catcache.h" #include "utils/combocid.h" #include "utils/guc.h" @@ -77,6 +79,33 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; +/* + * When running as a parallel worker, we place only a single + * TransactionStateData on the parallel worker's state stack, and the XID + * reflected there will be that of the *innermost* currently-active + * subtransaction in the backend that initiated paralllelism. However, + * GetTopTransactionId() and TransactionIdIsCurrentTransactionId() + * need to return the same answers in the parallel worker as they would have + * in the user backend, so we need some additional bookkeeping. + * + * XactTopTransactionId stores the XID of our toplevel transaction, which + * will be the same as TopTransactionState.transactionId in an ordinary + * backend; but in a parallel backend, which does not have the entire + * transaction state, it will instead be copied from the backend that started + * the parallel operation. + * + * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary + * backend, but in a parallel backend, nParallelCurrentXids will contain the + * number of XIDs that need to be considered current, and ParallelCurrentXids + * will contain the XIDs themselves. This includes all XIDs that were current + * or sub-committed in the parent at the time the parallel operation began. + * The XIDs are stored sorted in numerical order (not logical order) to make + * lookups as fast as possible. + */ +TransactionId XactTopTransactionId = InvalidTransactionId; +int nParallelCurrentXids = 0; +TransactionId *ParallelCurrentXids; + /* * MyXactAccessedTempRel is set when a temporary relation is accessed. * We don't allow PREPARE TRANSACTION in that case. (This is global @@ -113,6 +142,7 @@ typedef enum TBlockState /* transaction block states */ TBLOCK_BEGIN, /* starting transaction block */ TBLOCK_INPROGRESS, /* live transaction */ + TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */ TBLOCK_END, /* COMMIT received */ TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ @@ -154,6 +184,7 @@ typedef struct TransactionStateData bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ bool didLogXid; /* has xid been included in WAL record? */ + int parallelModeLevel; /* Enter/ExitParallelMode counter */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -184,6 +215,7 @@ static TransactionStateData TopTransactionStateData = { false, /* entry-time xact r/o state */ false, /* startedInRecovery */ false, /* didLogXid */ + 0, /* parallelMode */ NULL /* link to parent state block */ }; @@ -353,9 +385,9 @@ IsAbortedTransactionBlockState(void) TransactionId GetTopTransactionId(void) { - if (!TransactionIdIsValid(TopTransactionStateData.transactionId)) + if (!TransactionIdIsValid(XactTopTransactionId)) AssignTransactionId(&TopTransactionStateData); - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -368,7 +400,7 @@ GetTopTransactionId(void) TransactionId GetTopTransactionIdIfAny(void) { - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -461,6 +493,13 @@ AssignTransactionId(TransactionState s) Assert(!TransactionIdIsValid(s->transactionId)); Assert(s->state == TRANS_INPROGRESS); + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new XIDs at this point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign XIDs during a parallel operation"); + /* * Ensure parent(s) have XIDs, so that a child always has an XID later * than its parent. Musn't recurse here, or we might get a stack overflow @@ -513,6 +552,8 @@ AssignTransactionId(TransactionState s) * the Xid as "running". See GetNewTransactionId. */ s->transactionId = GetNewTransactionId(isSubXact); + if (!isSubXact) + XactTopTransactionId = s->transactionId; if (isSubXact) SubTransSetParent(s->transactionId, s->parent->transactionId, false); @@ -644,7 +685,16 @@ GetCurrentCommandId(bool used) { /* this is global to a transaction, not subtransaction-local */ if (used) + { + /* + * Forbid setting currentCommandIdUsed in parallel mode, because we + * have no provision for communicating this back to the master. We + * could relax this restriction when currentCommandIdUsed was already + * true at the start of the parallel operation. + */ + Assert(CurrentTransactionState->parallelModeLevel == 0); currentCommandIdUsed = true; + } return currentCommandId; } @@ -737,6 +787,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) if (!TransactionIdIsNormal(xid)) return false; + /* + * In parallel workers, the XIDs we must consider as current are stored + * in ParallelCurrentXids rather than the transaction-state stack. Note + * that the XIDs in this array are sorted numerically rather than + * according to transactionIdPrecedes order. + */ + if (nParallelCurrentXids > 0) + { + int low, + high; + + low = 0; + high = nParallelCurrentXids - 1; + while (low <= high) + { + int middle; + TransactionId probe; + + middle = low + (high - low) / 2; + probe = ParallelCurrentXids[middle]; + if (probe == xid) + return true; + else if (probe < xid) + low = middle + 1; + else + high = middle - 1; + } + return false; + } + /* * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their @@ -790,6 +870,48 @@ TransactionStartedDuringRecovery(void) return CurrentTransactionState->startedInRecovery; } +/* + * EnterParallelMode + */ +void +EnterParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelModeLevel >= 0); + + ++s->parallelModeLevel; +} + +/* + * ExitParallelMode + */ +void +ExitParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelModeLevel > 0); + Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); + + --s->parallelModeLevel; +} + +/* + * IsInParallelMode + * + * Are we in a parallel operation, as either the master or a worker? Check + * this to prohibit operations that change backend-local state expected to + * match across all workers. Mere caches usually don't require such a + * restriction. State modified in a strict push/pop fashion, such as the + * active snapshot stack, is often fine. + */ +bool +IsInParallelMode(void) +{ + return CurrentTransactionState->parallelModeLevel != 0; +} + /* * CommandCounterIncrement */ @@ -804,6 +926,14 @@ CommandCounterIncrement(void) */ if (currentCommandIdUsed) { + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new commands after that + * point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot start commands during a parallel operation"); + currentCommandId += 1; if (currentCommandId == InvalidCommandId) { @@ -1650,6 +1780,8 @@ StartTransaction(void) s = &TopTransactionStateData; CurrentTransactionState = s; + Assert(XactTopTransactionId == InvalidTransactionId); + /* * check the current transaction state */ @@ -1779,6 +1911,9 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool is_parallel_worker; + + is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); ShowTransactionState("CommitTransaction"); @@ -1812,7 +1947,8 @@ CommitTransaction(void) break; } - CallXactCallbacks(XACT_EVENT_PRE_COMMIT); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT + : XACT_EVENT_PRE_COMMIT); /* * The remaining actions cannot call any user-defined code, so it's safe @@ -1821,6 +1957,13 @@ CommitTransaction(void) * the transaction-abort path. */ + /* If we might have parallel workers, clean them up now. */ + if (IsInParallelMode()) + { + AtEOXact_Parallel(true); + s->parallelModeLevel = 0; + } + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -1859,10 +2002,28 @@ CommitTransaction(void) */ s->state = TRANS_COMMIT; - /* - * Here is where we really truly commit. - */ - latestXid = RecordTransactionCommit(); + if (!is_parallel_worker) + { + /* + * We need to mark our XIDs as commited in pg_clog. This is where we + * durably commit. + */ + latestXid = RecordTransactionCommit(); + } + else + { + /* + * We must not mark our XID committed; the parallel master is + * responsible for that. + */ + latestXid = InvalidTransactionId; + + /* + * Make sure the master will know about any WAL we wrote before it + * commits. + */ + ParallelWorkerReportLastRecEnd(XactLastRecEnd); + } TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); @@ -1889,7 +2050,8 @@ CommitTransaction(void) * state. */ - CallXactCallbacks(XACT_EVENT_COMMIT); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT + : XACT_EVENT_COMMIT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -1937,7 +2099,7 @@ CommitTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + AtEOXact_Namespace(true, is_parallel_worker); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -1962,6 +2124,9 @@ CommitTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with commit processing, set current transaction state back to * default @@ -1985,6 +2150,8 @@ PrepareTransaction(void) GlobalTransaction gxact; TimestampTz prepared_at; + Assert(!IsInParallelMode()); + ShowTransactionState("PrepareTransaction"); /* @@ -2204,7 +2371,7 @@ PrepareTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + AtEOXact_Namespace(true, false); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2229,6 +2396,9 @@ PrepareTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with 1st phase commit processing, set current transaction state * back to default @@ -2247,6 +2417,7 @@ AbortTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool is_parallel_worker; /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -2295,6 +2466,7 @@ AbortTransaction(void) /* * check the current transaction state */ + is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); @@ -2318,6 +2490,13 @@ AbortTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* If in parallel mode, clean up workers and exit parallel mode. */ + if (IsInParallelMode()) + { + AtEOXact_Parallel(false); + s->parallelModeLevel = 0; + } + /* * do abort processing */ @@ -2330,9 +2509,23 @@ AbortTransaction(void) /* * Advertise the fact that we aborted in pg_clog (assuming that we got as - * far as assigning an XID to advertise). + * far as assigning an XID to advertise). But if we're inside a parallel + * worker, skip this; the user backend must be the one to write the abort + * record. */ - latestXid = RecordTransactionAbort(false); + if (!is_parallel_worker) + latestXid = RecordTransactionAbort(false); + else + { + latestXid = InvalidTransactionId; + + /* + * Since the parallel master won't get our value of XactLastRecEnd in this + * case, we nudge WAL-writer ourselves in this case. See related comments in + * RecordTransactionAbort for why this matters. + */ + XLogSetAsyncXactLSN(XactLastRecEnd); + } TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2350,7 +2543,10 @@ AbortTransaction(void) */ if (TopTransactionResourceOwner != NULL) { - CallXactCallbacks(XACT_EVENT_ABORT); + if (is_parallel_worker) + CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT); + else + CallXactCallbacks(XACT_EVENT_ABORT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -2371,7 +2567,7 @@ AbortTransaction(void) AtEOXact_GUC(false, 1); AtEOXact_SPI(false); AtEOXact_on_commit_actions(false); - AtEOXact_Namespace(false); + AtEOXact_Namespace(false, is_parallel_worker); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2423,6 +2619,10 @@ CleanupTransaction(void) s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0; + s->parallelModeLevel = 0; + + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; /* * done with abort processing, set current transaction state back to @@ -2476,6 +2676,7 @@ StartTransactionCommand(void) /* These cases are invalid. */ case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -2511,11 +2712,13 @@ CommitTransactionCommand(void) switch (s->blockState) { /* - * This shouldn't happen, because it means the previous + * These shouldn't happen. TBLOCK_DEFAULT means the previous * StartTransactionCommand didn't set the STARTED state - * appropriately. + * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended + * by EndParallelWorkerTranaction(), not this function. */ case TBLOCK_DEFAULT: + case TBLOCK_PARALLEL_INPROGRESS: elog(FATAL, "CommitTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2797,6 +3000,7 @@ AbortCurrentTransaction(void) * ABORT state. We will stay in ABORT until we get a ROLLBACK. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: AbortTransaction(); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ @@ -3186,6 +3390,7 @@ BeginTransactionBlock(void) * Already a transaction block in progress. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_ABORT: case TBLOCK_SUBABORT: @@ -3363,6 +3568,16 @@ EndTransactionBlock(void) result = true; break; + /* + * The user issued a COMMIT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3456,6 +3671,16 @@ UserAbortTransactionBlock(void) s->blockState = TBLOCK_ABORT_PENDING; break; + /* + * The user issued an ABORT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot abort during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3485,6 +3710,18 @@ DefineSavepoint(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot define savepoints during a parallel operation"))); + switch (s->blockState) { case TBLOCK_INPROGRESS: @@ -3505,6 +3742,7 @@ DefineSavepoint(char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3539,6 +3777,18 @@ ReleaseSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot release savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3562,6 +3812,7 @@ ReleaseSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3639,6 +3890,18 @@ RollbackToSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot rollback to savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3663,6 +3926,7 @@ RollbackToSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3751,6 +4015,20 @@ BeginInternalSubTransaction(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that point. + * We might be able to make an exception for the type of subtransaction + * established by this function, which is typically used in contexts where + * we're going to release or roll back the subtransaction before proceeding + * further, so that no enduring change to the transaction state occurs. + * For now, however, we prohibit this case along with all the others. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start subtransactions during a parallel operation"))); + switch (s->blockState) { case TBLOCK_STARTED: @@ -3773,6 +4051,7 @@ BeginInternalSubTransaction(char *name) /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -3805,6 +4084,18 @@ ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for commit of subtransactions after that + * point. This should not happen anyway. Code calling this would + * typically have called BeginInternalSubTransaction() first, failing + * there. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit subtransactions during a parallel operation"))); + if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); @@ -3827,6 +4118,14 @@ RollbackAndReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted + * during parallel operations. That's because we may be in the master, + * recovering from an error thrown while we were in parallel mode. We + * won't reach here in a worker, because BeginInternalSubTransaction() + * will have failed. + */ + switch (s->blockState) { /* Must be in a subtransaction */ @@ -3838,6 +4137,7 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: case TBLOCK_END: @@ -3913,6 +4213,7 @@ AbortOutOfAnyTransaction(void) case TBLOCK_STARTED: case TBLOCK_BEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: case TBLOCK_PREPARE: @@ -4004,6 +4305,7 @@ TransactionBlockStatusCode(void) case TBLOCK_BEGIN: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4107,6 +4409,13 @@ CommitSubTransaction(void) CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, s->parent->subTransactionId); + /* If in parallel mode, clean up workers and exit parallel mode. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(true, s->subTransactionId); + s->parallelModeLevel = 0; + } + /* Do the actual "commit", such as it is */ s->state = TRANS_COMMIT; @@ -4260,6 +4569,13 @@ AbortSubTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* Exit from parallel mode, if necessary. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(false, s->subTransactionId); + s->parallelModeLevel = 0; + } + /* * We can skip all this stuff if the subxact failed before creating a * ResourceOwner... @@ -4400,6 +4716,7 @@ PushTransaction(void) s->blockState = TBLOCK_SUBBEGIN; GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; + s->parallelModeLevel = 0; CurrentTransactionState = s; @@ -4446,6 +4763,139 @@ PopTransaction(void) pfree(s); } +/* + * EstimateTransactionStateSpace + * Estimate the amount of space that will be needed by + * SerializeTransactionState. It would be OK to overestimate slightly, + * but it's simple for us to work out the precise value, so we do. + */ +Size +EstimateTransactionStateSpace(void) +{ + TransactionState s; + Size nxids = 5; /* iso level, deferrable, top & current XID, XID count */ + + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + + nxids = add_size(nxids, nParallelCurrentXids); + return mul_size(nxids, sizeof(TransactionId)); +} + +/* + * SerializeTransactionState + * Write out relevant details of our transaction state that will be + * needed by a parallel worker. + * + * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs + * associated with this transaction. The first eight bytes of the result + * contain XactDeferrable and XactIsoLevel; the next eight bytes contain the + * XID of the top-level transaction and the XID of the current transaction + * (or, in each case, InvalidTransactionId if none). After that, the next 4 + * bytes contain a count of how many additional XIDs follow; this is followed + * by all of those XIDs one after another. We emit the XIDs in sorted order + * for the convenience of the receiving process. + */ +void +SerializeTransactionState(Size maxsize, char *start_address) +{ + TransactionState s; + Size nxids = 0; + Size i = 0; + TransactionId *workspace; + TransactionId *result = (TransactionId *) start_address; + + Assert(maxsize >= 5 * sizeof(TransactionId)); + result[0] = (TransactionId) XactIsoLevel; + result[1] = (TransactionId) XactDeferrable; + result[2] = XactTopTransactionId; + result[3] = CurrentTransactionState->transactionId; + + /* + * If we're running in a parallel worker and launching a parallel worker + * of our own, we can just pass along the information that was passed to + * us. + */ + if (nParallelCurrentXids > 0) + { + Assert(maxsize > (nParallelCurrentXids + 4) * sizeof(TransactionId)); + result[4] = nParallelCurrentXids; + memcpy(&result[5], ParallelCurrentXids, + nParallelCurrentXids * sizeof(TransactionId)); + return; + } + + /* + * OK, we need to generate a sorted list of XIDs that our workers + * should view as current. First, figure out how many there are. + */ + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + Assert(nxids * sizeof(TransactionId) < maxsize); + + /* Copy them to our scratch space. */ + workspace = palloc(nxids * sizeof(TransactionId)); + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + workspace[i++] = s->transactionId; + memcpy(&workspace[i], s->childXids, + s->nChildXids * sizeof(TransactionId)); + i += s->nChildXids; + } + Assert(i == nxids); + + /* Sort them. */ + qsort(workspace, nxids, sizeof(TransactionId), xidComparator); + + /* Copy data into output area. */ + result[4] = (TransactionId) nxids; + memcpy(&result[5], workspace, nxids * sizeof(TransactionId)); +} + +/* + * StartParallelWorkerTransaction + * Start a parallel worker transaction, restoring the relevant + * transaction state serialized by SerializeTransactionState. + */ +void +StartParallelWorkerTransaction(char *tstatespace) +{ + TransactionId *tstate = (TransactionId *) tstatespace; + + Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT); + StartTransaction(); + + XactIsoLevel = (int) tstate[0]; + XactDeferrable = (bool) tstate[1]; + XactTopTransactionId = tstate[2]; + CurrentTransactionState->transactionId = tstate[3]; + nParallelCurrentXids = (int) tstate[4]; + ParallelCurrentXids = &tstate[5]; + + CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; +} + +/* + * EndParallelWorkerTransaction + * End a parallel worker transaction. + */ +void +EndParallelWorkerTransaction(void) +{ + Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS); + CommitTransaction(); + CurrentTransactionState->blockState = TBLOCK_DEFAULT; +} + /* * ShowTransactionState * Debug support @@ -4516,6 +4966,8 @@ BlockStateAsString(TBlockState blockState) return "BEGIN"; case TBLOCK_INPROGRESS: return "INPROGRESS"; + case TBLOCK_PARALLEL_INPROGRESS: + return "PARALLEL_INPROGRESS"; case TBLOCK_END: return "END"; case TBLOCK_ABORT: diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index da7b6c2fad..6cf441534c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -292,6 +292,14 @@ static TimeLineID curFileTLI; * end+1 of the last record, and is reset when we end a top-level transaction, * or start a new one; so it can be used to tell if the current transaction has * created any XLOG records. + * + * While in parallel mode, this may not be fully up to date. When committing, + * a transaction can assume this covers all xlog records written either by the + * user backend or by any parallel worker which was present at any point during + * the transaction. But when aborting, or when still in parallel mode, other + * parallel backends may have written WAL records at later LSNs than the value + * stored here. The parallel leader advances its own copy, when necessary, + * in WaitForParallelWorkersToFinish. */ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 1af977cb1d..2f6d697d82 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -20,6 +20,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/dependency.h" @@ -3646,6 +3647,12 @@ InitTempTableNamespace(void) (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), errmsg("cannot create temporary tables during recovery"))); + /* Parallel workers can't create temporary tables, either. */ + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), + errmsg("cannot create temporary tables in parallel mode"))); + snprintf(namespaceName, sizeof(namespaceName), "pg_temp_%d", MyBackendId); namespaceId = get_namespace_oid(namespaceName, true); @@ -3709,7 +3716,7 @@ InitTempTableNamespace(void) * End-of-transaction cleanup for namespaces. */ void -AtEOXact_Namespace(bool isCommit) +AtEOXact_Namespace(bool isCommit, bool parallel) { /* * If we abort the transaction in which a temp namespace was selected, @@ -3719,7 +3726,7 @@ AtEOXact_Namespace(bool isCommit) * at backend shutdown. (We only want to register the callback once per * session, so this is a good place to do it.) */ - if (myTempNamespaceSubID != InvalidSubTransactionId) + if (myTempNamespaceSubID != InvalidSubTransactionId && !parallel) { if (isCommit) before_shmem_exit(RemoveTempRelationsCallback, 0); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 92ff632e12..0d3721a96d 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -924,9 +924,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed) { Assert(rel); - /* check read-only transaction */ + /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); + PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 6d316d62b6..80f5553d3a 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -565,6 +565,13 @@ nextval_internal(Oid relid) if (!seqrel->rd_islocaltemp) PreventCommandIfReadOnly("nextval()"); + /* + * Forbid this during parallel operation because, to make it work, + * the cooperating backends would need to share the backend-local cached + * sequence information. Currently, we don't support that. + */ + PreventCommandIfParallelMode("nextval()"); + if (elm->last != elm->cached) /* some numbers were cached */ { Assert(elm->last_valid); @@ -862,6 +869,13 @@ do_setval(Oid relid, int64 next, bool iscalled) if (!seqrel->rd_islocaltemp) PreventCommandIfReadOnly("setval()"); + /* + * Forbid this during parallel operation because, to make it work, + * the cooperating backends would need to share the backend-local cached + * sequence information. Currently, we don't support that. + */ + PreventCommandIfParallelMode("setval()"); + /* lock page' buffer and read tuple */ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index df4da3faa9..26793eecf4 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -147,8 +147,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) /* * If the transaction is read-only, we need to check if any writes are * planned to non-temporary tables. EXPLAIN is considered read-only. + * + * Don't allow writes in parallel mode. Supporting UPDATE and DELETE would + * require (a) storing the combocid hash in shared memory, rather than + * synchronizing it just once at the start of parallelism, and (b) an + * alternative to heap_update()'s reliance on xmax for mutual exclusion. + * INSERT may have no such troubles, but we forbid it to simplify the + * checks. + * + * We have lower-level defenses in CommandCounterIncrement and elsewhere + * against performing unsafe operations in parallel mode, but this gives + * a more user-friendly error message. */ - if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + if ((XactReadOnly || IsInParallelMode()) && + !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); /* @@ -691,18 +703,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte) } /* - * Check that the query does not imply any writes to non-temp tables. + * Check that the query does not imply any writes to non-temp tables; + * unless we're in parallel mode, in which case don't even allow writes + * to temp tables. * * Note: in a Hot Standby slave this would need to reject writes to temp - * tables as well; but an HS slave can't have created any temp tables - * in the first place, so no need to check that. + * tables just as we do in parallel mode; but an HS slave can't have created + * any temp tables in the first place, so no need to check that. */ static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt) { ListCell *l; - /* Fail if write permissions are requested on any non-temp table */ + /* + * Fail if write permissions are requested in parallel mode for + * table (temp or non-temp), otherwise fail for any non-temp table. + */ foreach(l, plannedstmt->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(l); @@ -718,6 +735,9 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) PreventCommandIfReadOnly(CreateCommandTag((Node *) plannedstmt)); } + + if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt)); } diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 6c3eff7c58..ce49c471d4 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list, errmsg("%s is not allowed in a non-volatile function", CreateCommandTag(stmt)))); + if (IsInParallelMode() && !CommandIsReadOnly(stmt)) + PreventCommandIfParallelMode(CreateCommandTag(stmt)); + /* OK, build the execution_state for this query */ newes = (execution_state *) palloc(sizeof(execution_state)); if (preves) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index b3c05025bc..557d153f2a 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -23,6 +23,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/spi_priv.h" +#include "miscadmin.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, } /* - * If told to be read-only, we'd better check for read-only queries. This - * can't be done earlier because we need to look at the finished, planned - * queries. (In particular, we don't want to do it between GetCachedPlan - * and PortalDefineQuery, because throwing an error between those steps - * would result in leaking our plancache refcount.) + * If told to be read-only, or in parallel mode, verify that this query + * is in fact read-only. This can't be done earlier because we need to + * look at the finished, planned queries. (In particular, we don't want + * to do it between GetCachedPlan and PortalDefineQuery, because throwing + * an error between those steps would result in leaking our plancache + * refcount.) */ - if (read_only) + if (read_only || IsInParallelMode()) { ListCell *lc; @@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, Node *pstmt = (Node *) lfirst(lc); if (!CommandIsReadOnly(pstmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a non-volatile function", - CreateCommandTag(pstmt)))); + { + if (read_only) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a non-volatile function", + CreateCommandTag(pstmt)))); + else + PreventCommandIfParallelMode(CreateCommandTag(pstmt)); + } } } @@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, errmsg("%s is not allowed in a non-volatile function", CreateCommandTag(stmt)))); + if (IsInParallelMode() && !CommandIsReadOnly(stmt)) + PreventCommandIfParallelMode(CreateCommandTag(stmt)); + /* * If not read-only mode, advance the command counter before each * command and update the snapshot. diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 307fb60665..f12f2d582e 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -16,12 +16,15 @@ #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" +#include "miscadmin.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" static shm_mq *pq_mq; static shm_mq_handle *pq_mq_handle; static bool pq_mq_busy = false; +static pid_t pq_mq_parallel_master_pid = 0; +static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId; static void mq_comm_reset(void); static int mq_flush(void); @@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh) FrontendProtocol = PG_PROTOCOL_LATEST; } +/* + * Arrange to SendProcSignal() to the parallel master each time we transmit + * message data via the shm_mq. + */ +void +pq_set_parallel_master(pid_t pid, BackendId backend_id) +{ + Assert(PqCommMethods == &PqCommMqMethods); + pq_mq_parallel_master_pid = pid; + pq_mq_parallel_master_backend_id = backend_id; +} + static void mq_comm_reset(void) { @@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len) iov[1].len = len; Assert(pq_mq_handle != NULL); - result = shm_mq_sendv(pq_mq_handle, iov, 2, false); + + for (;;) + { + result = shm_mq_sendv(pq_mq_handle, iov, 2, true); + + if (pq_mq_parallel_master_pid != 0) + SendProcSignal(pq_mq_parallel_master_pid, + PROCSIG_PARALLEL_MESSAGE, + pq_mq_parallel_master_backend_id); + + if (result != SHM_MQ_WOULD_BLOCK) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } pq_mq_busy = false; diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index d4939415f0..377733377b 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -995,6 +995,56 @@ WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp) return status; } +/* + * Wait for a background worker to stop. + * + * If the worker hasn't yet started, or is running, we wait for it to stop + * and then return BGWH_STOPPED. However, if the postmaster has died, we give + * up and return BGWH_POSTMASTER_DIED, because it's the postmaster that + * notifies us when a worker's state changes. + */ +BgwHandleStatus +WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + bool save_set_latch_on_sigusr1; + + save_set_latch_on_sigusr1 = set_latch_on_sigusr1; + set_latch_on_sigusr1 = true; + + PG_TRY(); + { + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + status = GetBackgroundWorkerPid(handle, &pid); + if (status == BGWH_STOPPED) + return status; + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); + + if (rc & WL_POSTMASTER_DEATH) + return BGWH_POSTMASTER_DIED; + + ResetLatch(&MyProc->procLatch); + } + } + PG_CATCH(); + { + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + PG_RE_THROW(); + } + PG_END_TRY(); + + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + return status; +} + /* * Instruct the postmaster to terminate a background worker. * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 8eaec0ca6e..68cc6edd09 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1682,6 +1682,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) return result; } +/* + * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin + * + * This is like ProcArrayInstallImportedXmin, but we have a pointer to the + * PGPROC of the transaction from which we imported the snapshot, rather than + * an XID. + * + * Returns TRUE if successful, FALSE if source xact is no longer running. + */ +bool +ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc) +{ + bool result = false; + TransactionId xid; + volatile PGXACT *pgxact; + + Assert(TransactionIdIsNormal(xmin)); + Assert(proc != NULL); + + /* Get lock so source xact can't end while we're doing this */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + pgxact = &allPgXact[proc->pgprocno]; + + /* + * Be certain that the referenced PGPROC has an advertised xmin which + * is no later than the one we're installing, so that the system-wide + * xmin can't go backwards. Also, make sure it's running in the same + * database, so that the per-database xmin cannot go backwards. + */ + xid = pgxact->xmin; /* fetch just once */ + if (proc->databaseId == MyDatabaseId && + TransactionIdIsNormal(xid) && + TransactionIdPrecedesOrEquals(xid, xmin)) + { + MyPgXact->xmin = TransactionXmin = xmin; + result = true; + } + + LWLockRelease(ProcArrayLock); + + return result; +} + /* * GetRunningTransactionData -- returns information about running transactions. * diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 48573bef60..0abde43565 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -17,6 +17,7 @@ #include #include +#include "access/parallel.h" #include "commands/async.h" #include "miscadmin.h" #include "storage/latch.h" @@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT)) HandleNotifyInterrupt(); + if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE)) + HandleParallelMessageInterrupt(); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index b81ebeb260..01e03f0e84 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -1653,6 +1653,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, Assert(!RecoveryInProgress()); + /* + * Since all parts of a serializable transaction must use the same + * snapshot, it is too late to establish one after a parallel operation + * has begun. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot establish serializable snapshot during a parallel operation"); + proc = MyProc; Assert(proc != NULL); GET_VXID_FROM_PGPROC(vxid, *proc); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 33720e8f8b..ea2a43209d 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -36,6 +36,7 @@ #include "rusagestub.h" #endif +#include "access/parallel.h" #include "access/printtup.h" #include "access/xact.h" #include "catalog/pg_type.h" @@ -2988,7 +2989,8 @@ ProcessInterrupts(void) } } - /* If we get here, do nothing (probably, QueryCancelPending was reset) */ + if (ParallelMessagePending) + HandleParallelMessages(); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 31e9d4cf8b..59f09dc93a 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree) static void check_xact_readonly(Node *parsetree) { - if (!XactReadOnly) + /* Only perform the check if we have a reason to do so. */ + if (!XactReadOnly && !IsInParallelMode()) return; /* * Note: Commands that need to do more complicated checking are handled * elsewhere, in particular COPY and plannable statements do their own - * checking. However they should all call PreventCommandIfReadOnly to - * actually throw the error. + * checking. However they should all call PreventCommandIfReadOnly + * or PreventCommandIfParallelMode to actually throw the error. */ switch (nodeTag(parsetree)) @@ -208,6 +209,7 @@ check_xact_readonly(Node *parsetree) case T_ImportForeignSchemaStmt: case T_SecLabelStmt: PreventCommandIfReadOnly(CreateCommandTag(parsetree)); + PreventCommandIfParallelMode(CreateCommandTag(parsetree)); break; default: /* do nothing */ @@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname) cmdname))); } +/* + * PreventCommandIfParallelMode: throw error if current (sub)transaction is + * in parallel mode. + * + * This is useful mainly to ensure consistency of the error message wording; + * most callers have checked IsInParallelMode() for themselves. + */ +void +PreventCommandIfParallelMode(const char *cmdname) +{ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + /* translator: %s is name of a SQL command, eg CREATE */ + errmsg("cannot execute %s during a parallel operation", + cmdname))); +} + /* * PreventCommandDuringRecovery: throw error if RecoveryInProgress * @@ -618,6 +638,7 @@ standard_ProcessUtility(Node *parsetree, case T_ClusterStmt: /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("CLUSTER"); + /* forbidden in parallel mode due to CommandIsReadOnly */ cluster((ClusterStmt *) parsetree, isTopLevel); break; @@ -628,6 +649,7 @@ standard_ProcessUtility(Node *parsetree, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"); + /* forbidden in parallel mode due to CommandIsReadOnly */ ExecVacuum(stmt, isTopLevel); } break; @@ -704,6 +726,7 @@ standard_ProcessUtility(Node *parsetree, * outside a transaction block is presumed to be user error. */ RequireTransactionChain(isTopLevel, "LOCK TABLE"); + /* forbidden in parallel mode due to CommandIsReadOnly */ LockTableCommand((LockStmt *) parsetree); break; @@ -735,6 +758,7 @@ standard_ProcessUtility(Node *parsetree, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("REINDEX"); + /* forbidden in parallel mode due to CommandIsReadOnly */ switch (stmt->kind) { case REINDEX_OBJECT_INDEX: diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c index a1967b6963..491824dd6b 100644 --- a/src/backend/utils/adt/lockfuncs.c +++ b/src/backend/utils/adt/lockfuncs.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/xact.h" #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" @@ -411,6 +412,15 @@ pg_lock_status(PG_FUNCTION_ARGS) #define SET_LOCKTAG_INT32(tag, key1, key2) \ SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2) +static void +PreventAdvisoryLocksInParallelMode(void) +{ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot use advisory locks during a parallel operation"))); +} + /* * pg_advisory_lock(int8) - acquire exclusive lock on an int8 key */ @@ -420,6 +430,7 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ExclusiveLock, true, false); @@ -437,6 +448,7 @@ pg_advisory_xact_lock_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ExclusiveLock, false, false); @@ -453,6 +465,7 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ShareLock, true, false); @@ -470,6 +483,7 @@ pg_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ShareLock, false, false); @@ -489,6 +503,7 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ExclusiveLock, true, true); @@ -509,6 +524,7 @@ pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ExclusiveLock, false, true); @@ -528,6 +544,7 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ShareLock, true, true); @@ -548,6 +565,7 @@ pg_try_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ShareLock, false, true); @@ -567,6 +585,7 @@ pg_advisory_unlock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockRelease(&tag, ExclusiveLock, true); @@ -586,6 +605,7 @@ pg_advisory_unlock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT64(tag, key); res = LockRelease(&tag, ShareLock, true); @@ -603,6 +623,7 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ExclusiveLock, true, false); @@ -621,6 +642,7 @@ pg_advisory_xact_lock_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ExclusiveLock, false, false); @@ -638,6 +660,7 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ShareLock, true, false); @@ -656,6 +679,7 @@ pg_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ShareLock, false, false); @@ -676,6 +700,7 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ExclusiveLock, true, true); @@ -697,6 +722,7 @@ pg_try_advisory_xact_lock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ExclusiveLock, false, true); @@ -717,6 +743,7 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ShareLock, true, true); @@ -738,6 +765,7 @@ pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ShareLock, false, true); @@ -758,6 +786,7 @@ pg_advisory_unlock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockRelease(&tag, ExclusiveLock, true); @@ -778,6 +807,7 @@ pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; bool res; + PreventAdvisoryLocksInParallelMode(); SET_LOCKTAG_INT32(tag, key1, key2); res = LockRelease(&tag, ShareLock, true); diff --git a/src/backend/utils/fmgr/dfmgr.c b/src/backend/utils/fmgr/dfmgr.c index 7476a26b79..46bc1f238f 100644 --- a/src/backend/utils/fmgr/dfmgr.c +++ b/src/backend/utils/fmgr/dfmgr.c @@ -23,6 +23,7 @@ #endif #include "lib/stringinfo.h" #include "miscadmin.h" +#include "storage/shmem.h" #include "utils/dynamic_loader.h" #include "utils/hsearch.h" @@ -692,3 +693,56 @@ find_rendezvous_variable(const char *varName) return &hentry->varValue; } + +/* + * Estimate the amount of space needed to serialize the list of libraries + * we have loaded. + */ +Size +EstimateLibraryStateSpace(void) +{ + DynamicFileList *file_scanner; + Size size = 1; + + for (file_scanner = file_list; + file_scanner != NULL; + file_scanner = file_scanner->next) + size = add_size(size, strlen(file_scanner->filename) + 1); + + return size; +} + +/* + * Serialize the list of libraries we have loaded to a chunk of memory. + */ +void +SerializeLibraryState(Size maxsize, char *start_address) +{ + DynamicFileList *file_scanner; + + for (file_scanner = file_list; + file_scanner != NULL; + file_scanner = file_scanner->next) + { + Size len; + + len = strlcpy(start_address, file_scanner->filename, maxsize) + 1; + Assert(len < maxsize); + maxsize -= len; + start_address += len; + } + start_address[0] = '\0'; +} + +/* + * Load every library the serializing backend had loaded. + */ +void +RestoreLibraryState(char *start_address) +{ + while (*start_address != '\0') + { + internal_load_library(start_address); + start_address += strlen(start_address) + 1; + } +} diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f43aff2d2c..8727ee3b74 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -5665,6 +5665,20 @@ set_config_option(const char *name, const char *value, elevel = ERROR; } + /* + * GUC_ACTION_SAVE changes are acceptable during a parallel operation, + * because the current worker will also pop the change. We're probably + * dealing with a function having a proconfig entry. Only the function's + * body should observe the change, and peer workers do not share in the + * execution of a function call started by this worker. + * + * Other changes might need to affect other workers, so forbid them. + */ + if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE) + ereport(elevel, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot set parameters during a parallel operation"))); + record = find_option(name, true, elevel); if (record == NULL) { @@ -6969,6 +6983,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel) { GucAction action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET; + /* + * Workers synchronize these parameters at the start of the parallel + * operation; then, we block SET during the operation. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot set parameters during a parallel operation"))); + switch (stmt->kind) { case VAR_SET_VALUE: diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c index bfd7d0ad42..cc5409b880 100644 --- a/src/backend/utils/time/combocid.c +++ b/src/backend/utils/time/combocid.c @@ -44,6 +44,7 @@ #include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" +#include "storage/shmem.h" #include "utils/combocid.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -286,3 +287,76 @@ GetRealCmax(CommandId combocid) Assert(combocid < usedComboCids); return comboCids[combocid].cmax; } + +/* + * Estimate the amount of space required to serialize the current ComboCID + * state. + */ +Size +EstimateComboCIDStateSpace(void) +{ + Size size; + + /* Add space required for saving usedComboCids */ + size = sizeof(int); + + /* Add space required for saving the combocids key */ + size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids)); + + return size; +} + +/* + * Serialize the ComboCID state into the memory, beginning at start_address. + * maxsize should be at least as large as the value returned by + * EstimateComboCIDStateSpace. + */ +void +SerializeComboCIDState(Size maxsize, char *start_address) +{ + char *endptr; + + /* First, we store the number of currently-existing ComboCIDs. */ + * (int *) start_address = usedComboCids; + + /* If maxsize is too small, throw an error. */ + endptr = start_address + sizeof(int) + + (sizeof(ComboCidKeyData) * usedComboCids); + if (endptr < start_address || endptr > start_address + maxsize) + elog(ERROR, "not enough space to serialize ComboCID state"); + + /* Now, copy the actual cmin/cmax pairs. */ + memcpy(start_address + sizeof(int), comboCids, + (sizeof(ComboCidKeyData) * usedComboCids)); +} + +/* + * Read the ComboCID state at the specified address and initialize this + * backend with the same ComboCIDs. This is only valid in a backend that + * currently has no ComboCIDs (and only makes sense if the transaction state + * is serialized and restored as well). + */ +void +RestoreComboCIDState(char *comboCIDstate) +{ + int num_elements; + ComboCidKeyData *keydata; + int i; + CommandId cid; + + Assert(!comboCids && !comboHash); + + /* First, we retrieve the number of ComboCIDs that were serialized. */ + num_elements = * (int *) comboCIDstate; + keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int)); + + /* Use GetComboCommandId to restore each ComboCID. */ + for (i = 0; i < num_elements; i++) + { + cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax); + + /* Verify that we got the expected answer. */ + if (cid != i) + elog(ERROR, "unexpected command ID while restoring combo CIDs"); + } +} diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 7cfa0cf848..a2cb4a037f 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -157,6 +157,22 @@ static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); +/* + * Snapshot fields to be serialized. + * + * Only these fields need to be sent to the cooperating backend; the + * remaining ones can (and must) set by the receiver upon restore. + */ +typedef struct SerializedSnapshotData +{ + TransactionId xmin; + TransactionId xmax; + uint32 xcnt; + int32 subxcnt; + bool suboverflowed; + bool takenDuringRecovery; + CommandId curcid; +} SerializedSnapshotData; /* * GetTransactionSnapshot @@ -188,6 +204,10 @@ GetTransactionSnapshot(void) Assert(pairingheap_is_empty(&RegisteredSnapshots)); Assert(FirstXactSnapshot == NULL); + if (IsInParallelMode()) + elog(ERROR, + "cannot take query snapshot during a parallel operation"); + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must @@ -238,6 +258,14 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { + /* + * We might be able to relax this, but nothing that could otherwise work + * needs it. + */ + if (IsInParallelMode()) + elog(ERROR, + "cannot update SecondarySnapshot during a parallel operation"); + /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. @@ -347,7 +375,8 @@ SnapshotSetCommandId(CommandId curcid) * in GetTransactionSnapshot. */ static void -SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) +SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, + PGPROC *sourceproc) { /* Caller should have checked this already */ Assert(!FirstSnapshotSet); @@ -394,7 +423,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) * doesn't seem worth contorting the logic here to avoid two calls, * especially since it's not clear that predicate.c *must* do this. */ - if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + if (sourceproc != NULL) + { + if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import the requested snapshot"), + errdetail("The source transaction is not running anymore."))); + } + else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), @@ -550,11 +587,24 @@ PushCopiedSnapshot(Snapshot snapshot) void UpdateActiveSnapshotCommandId(void) { + CommandId save_curcid, curcid; Assert(ActiveSnapshot != NULL); Assert(ActiveSnapshot->as_snap->active_count == 1); Assert(ActiveSnapshot->as_snap->regd_count == 0); - ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false); + /* + * Don't allow modification of the active snapshot during parallel + * operation. We share the snapshot to worker backends at beginning of + * parallel operation, so any change to snapshot can lead to + * inconsistencies. We have other defenses against + * CommandCounterIncrement, but there are a few places that call this + * directly, so we put an additional guard here. + */ + save_curcid = ActiveSnapshot->as_snap->curcid; + curcid = GetCurrentCommandId(false); + if (IsInParallelMode() && save_curcid != curcid) + elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation"); + ActiveSnapshot->as_snap->curcid = curcid; } /* @@ -1289,7 +1339,7 @@ ImportSnapshot(const char *idstr) errmsg("cannot import a snapshot from a different database"))); /* OK, install the snapshot */ - SetTransactionSnapshot(&snapshot, src_xid); + SetTransactionSnapshot(&snapshot, src_xid, NULL); } /* @@ -1393,3 +1443,155 @@ HistoricSnapshotGetTupleCids(void) Assert(HistoricSnapshotActive()); return tuplecid_data; } + +/* + * EstimateSnapshotSpace + * Returns the size need to store the given snapshot. + * + * We are exporting only required fields from the Snapshot, stored in + * SerializedSnapshotData. + */ +Size +EstimateSnapshotSpace(Snapshot snap) +{ + Size size; + + Assert(snap != InvalidSnapshot); + Assert(snap->satisfies == HeapTupleSatisfiesMVCC); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = add_size(sizeof(SerializedSnapshotData), + mul_size(snap->xcnt, sizeof(TransactionId))); + if (snap->subxcnt > 0 && + (!snap->suboverflowed || snap->takenDuringRecovery)) + size = add_size(size, + mul_size(snap->subxcnt, sizeof(TransactionId))); + + return size; +} + +/* + * SerializeSnapshot + * Dumps the serialized snapshot (extracted from given snapshot) onto the + * memory location at start_address. + */ +void +SerializeSnapshot(Snapshot snapshot, char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + + Assert(snapshot->xcnt >= 0); + Assert(snapshot->subxcnt >= 0); + + serialized_snapshot = (SerializedSnapshotData *) start_address; + + /* Copy all required fields */ + serialized_snapshot->xmin = snapshot->xmin; + serialized_snapshot->xmax = snapshot->xmax; + serialized_snapshot->xcnt = snapshot->xcnt; + serialized_snapshot->subxcnt = snapshot->subxcnt; + serialized_snapshot->suboverflowed = snapshot->suboverflowed; + serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery; + serialized_snapshot->curcid = snapshot->curcid; + + /* + * Ignore the SubXID array if it has overflowed, unless the snapshot + * was taken during recovey - in that case, top-level XIDs are in subxip + * as well, and we mustn't lose them. + */ + if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery) + serialized_snapshot->subxcnt = 0; + + /* Copy XID array */ + if (snapshot->xcnt > 0) + memcpy((TransactionId *) (serialized_snapshot + 1), + snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); + + /* + * Copy SubXID array. Don't bother to copy it if it had overflowed, + * though, because it's not used anywhere in that case. Except if it's a + * snapshot taken during recovery; all the top-level XIDs are in subxip as + * well in that case, so we mustn't lose them. + */ + if (snapshot->subxcnt > 0) + { + Size subxipoff = sizeof(SerializedSnapshotData) + + snapshot->xcnt * sizeof(TransactionId); + + memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff), + snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); + } +} + +/* + * RestoreSnapshot + * Restore a serialized snapshot from the specified address. + * + * The copy is palloc'd in TopTransactionContext and has initial refcounts set + * to 0. The returned snapshot has the copied flag set. + */ +Snapshot +RestoreSnapshot(char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + Size size; + Snapshot snapshot; + TransactionId *serialized_xids; + + serialized_snapshot = (SerializedSnapshotData *) start_address; + serialized_xids = (TransactionId *) + (start_address + sizeof(SerializedSnapshotData)); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = sizeof(SnapshotData) + + serialized_snapshot->xcnt * sizeof(TransactionId) + + serialized_snapshot->subxcnt * sizeof(TransactionId); + + /* Copy all required fields */ + snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); + snapshot->satisfies = HeapTupleSatisfiesMVCC; + snapshot->xmin = serialized_snapshot->xmin; + snapshot->xmax = serialized_snapshot->xmax; + snapshot->xip = NULL; + snapshot->xcnt = serialized_snapshot->xcnt; + snapshot->subxip = NULL; + snapshot->subxcnt = serialized_snapshot->subxcnt; + snapshot->suboverflowed = serialized_snapshot->suboverflowed; + snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery; + snapshot->curcid = serialized_snapshot->curcid; + + /* Copy XIDs, if present. */ + if (serialized_snapshot->xcnt > 0) + { + snapshot->xip = (TransactionId *) (snapshot + 1); + memcpy(snapshot->xip, serialized_xids, + serialized_snapshot->xcnt * sizeof(TransactionId)); + } + + /* Copy SubXIDs, if present. */ + if (serialized_snapshot->subxcnt > 0) + { + snapshot->subxip = snapshot->xip + serialized_snapshot->xcnt; + memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt, + serialized_snapshot->subxcnt * sizeof(TransactionId)); + } + + /* Set the copied flag so that the caller will set refcounts correctly. */ + snapshot->regd_count = 0; + snapshot->active_count = 0; + snapshot->copied = true; + + return snapshot; +} + +/* + * Install a restored snapshot as the transaction snapshot. + * + * The second argument is of type void * so that snapmgr.h need not include + * the declaration for PGPROC. + */ +void +RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) +{ + SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); +} diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h new file mode 100644 index 0000000000..8274f841b6 --- /dev/null +++ b/src/include/access/parallel.h @@ -0,0 +1,68 @@ +/*------------------------------------------------------------------------- + * + * parallel.h + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel.h + * + *------------------------------------------------------------------------- + */ + +#ifndef PARALLEL_H +#define PARALLEL_H + +#include "access/xlogdefs.h" +#include "lib/ilist.h" +#include "postmaster/bgworker.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "utils/elog.h" + +typedef void (*parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc); + +typedef struct ParallelWorkerInfo +{ + BackgroundWorkerHandle *bgwhandle; + shm_mq_handle *error_mqh; + int32 pid; +} ParallelWorkerInfo; + +typedef struct ParallelContext +{ + dlist_node node; + SubTransactionId subid; + int nworkers; + parallel_worker_main_type entrypoint; + char *library_name; + char *function_name; + ErrorContextCallback *error_context_stack; + shm_toc_estimator estimator; + dsm_segment *seg; + void *private; + shm_toc *toc; + ParallelWorkerInfo *worker; +} ParallelContext; + +extern bool ParallelMessagePending; +extern int ParallelWorkerNumber; + +#define IsParallelWorker() (ParallelWorkerNumber >= 0) + +extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers); +extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers); +extern void InitializeParallelDSM(ParallelContext *); +extern void LaunchParallelWorkers(ParallelContext *); +extern void WaitForParallelWorkersToFinish(ParallelContext *); +extern void DestroyParallelContext(ParallelContext *); +extern bool ParallelContextActive(void); + +extern void HandleParallelMessageInterrupt(void); +extern void HandleParallelMessages(void); +extern void AtEOXact_Parallel(bool isCommit); +extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId); +extern void ParallelWorkerReportLastRecEnd(XLogRecPtr); + +#endif /* PARALLEL_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cad1bb1d31..ed808fc150 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -78,9 +78,12 @@ extern bool MyXactAccessedTempRel; typedef enum { XACT_EVENT_COMMIT, + XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_ABORT, + XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PREPARE, XACT_EVENT_PRE_COMMIT, + XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_PREPARE } XactEvent; @@ -332,6 +335,10 @@ extern void BeginInternalSubTransaction(char *name); extern void ReleaseCurrentSubTransaction(void); extern void RollbackAndReleaseCurrentSubTransaction(void); extern bool IsSubTransaction(void); +extern Size EstimateTransactionStateSpace(void); +extern void SerializeTransactionState(Size maxsize, char *start_address); +extern void StartParallelWorkerTransaction(char *tstatespace); +extern void EndParallelWorkerTransaction(void); extern bool IsTransactionBlock(void); extern bool IsTransactionOrTransactionBlock(void); extern char TransactionBlockStatusCode(void); @@ -368,4 +375,8 @@ extern const char *xact_identify(uint8 info); extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed); extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed); +extern void EnterParallelMode(void); +extern void ExitParallelMode(void); +extern bool IsInParallelMode(void); + #endif /* XACT_H */ diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index cf5f7d0a78..f3b005fa9d 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -140,7 +140,7 @@ extern Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding); /* initialization & transaction cleanup code */ extern void InitializeSearchPath(void); -extern void AtEOXact_Namespace(bool isCommit); +extern void AtEOXact_Namespace(bool isCommit, bool parallel); extern void AtEOSubXact_Namespace(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); diff --git a/src/include/fmgr.h b/src/include/fmgr.h index 418f6aadaa..b9a5c40f59 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -642,6 +642,9 @@ extern PGFunction load_external_function(char *filename, char *funcname, extern PGFunction lookup_external_function(void *filehandle, char *funcname); extern void load_file(const char *filename, bool restricted); extern void **find_rendezvous_variable(const char *varName); +extern Size EstimateLibraryStateSpace(void); +extern void SerializeLibraryState(Size maxsize, char *start_address); +extern void RestoreLibraryState(char *start_address); /* * Support for aggregate functions diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index 5f2815ca90..ad7589d4ed 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -17,6 +17,7 @@ #include "storage/shm_mq.h" extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *); +extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index eacfccbcba..c389939738 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -271,6 +271,7 @@ extern void check_stack_depth(void); /* in tcop/utility.c */ extern void PreventCommandIfReadOnly(const char *cmdname); +extern void PreventCommandIfParallelMode(const char *cmdname); extern void PreventCommandDuringRecovery(const char *cmdname); /* in utils/misc/guc.c */ diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index a81b90badc..de9180df91 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -112,6 +112,8 @@ extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle * handle, pid_t *pid); +extern BgwHandleStatus +WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *); /* Terminate a bgworker */ extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 97c6e9344e..a9b40ed944 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid); +extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc); extern RunningTransactions GetRunningTransactionData(void); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index ac9d236dec..af1a0cd71f 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -31,6 +31,7 @@ typedef enum { PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ + PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ /* Recovery conflict reasons */ PROCSIG_RECOVERY_CONFLICT_DATABASE, diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h index ce7b47c24e..f2faa12623 100644 --- a/src/include/utils/combocid.h +++ b/src/include/utils/combocid.h @@ -21,5 +21,8 @@ */ extern void AtEOXact_ComboCid(void); +extern void RestoreComboCIDState(char *comboCIDstate); +extern void SerializeComboCIDState(Size maxsize, char *start_address); +extern Size EstimateComboCIDStateSpace(void); #endif /* COMBOCID_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 64d2ec1e5e..f8524eb687 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -64,4 +64,9 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids) extern void TeardownHistoricSnapshot(bool is_error); extern bool HistoricSnapshotActive(void); +extern Size EstimateSnapshotSpace(Snapshot snapshot); +extern void SerializeSnapshot(Snapshot snapshot, char *start_address); +extern Snapshot RestoreSnapshot(char *start_address); +extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc); + #endif /* SNAPMGR_H */