diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index 5acc9537d6..dae9dd7f2f 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -545,10 +545,10 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; - Functions and aggregates must be marked PARALLEL UNSAFE if - they write to the database, access sequences, change the transaction state - even temporarily (e.g., a PL/pgSQL function that establishes an - EXCEPTION block to catch errors), or make persistent changes to + Functions and aggregates must be marked PARALLEL UNSAFE + if they write to the database, change the transaction state (other than by + using a subtransaction for error recovery), access sequences, or make + persistent changes to settings. Similarly, functions must be marked PARALLEL RESTRICTED if they access temporary tables, client connection state, cursors, prepared statements, or miscellaneous backend-local state that diff --git a/doc/src/sgml/ref/create_function.sgml b/doc/src/sgml/ref/create_function.sgml index 863d99d1fc..0d240484cd 100644 --- a/doc/src/sgml/ref/create_function.sgml +++ b/doc/src/sgml/ref/create_function.sgml @@ -428,22 +428,24 @@ CREATE [ OR REPLACE ] FUNCTION PARALLEL - PARALLEL UNSAFE indicates that the function - can't be executed in parallel mode and the presence of such a + + PARALLEL UNSAFE indicates that the function + can't be executed in parallel mode; the presence of such a function in an SQL statement forces a serial execution plan. This is the default. PARALLEL RESTRICTED indicates that - the function can be executed in parallel mode, but the execution is - restricted to parallel group leader. PARALLEL SAFE + the function can be executed in parallel mode, but only in the parallel + group leader process. PARALLEL SAFE indicates that the function is safe to run in parallel mode without - restriction. + restriction, including in parallel worker processes. Functions should be labeled parallel unsafe if they modify any database - state, or if they make changes to the transaction such as using - sub-transactions, or if they access sequences or attempt to make - persistent changes to settings (e.g., setval). They should - be labeled as parallel restricted if they access temporary tables, + state, change the transaction state (other than by using a + subtransaction for error recovery), access sequences (e.g., by + calling currval) or make persistent changes to + settings. They should + be labeled parallel restricted if they access temporary tables, client connection state, cursors, prepared statements, or miscellaneous backend-local state which the system cannot synchronize in parallel mode (e.g., setseed cannot be executed other than by the group diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index e486bffc47..9df3da91b0 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -137,7 +137,7 @@ Transaction Integration ======================= Regardless of what the TransactionState stack looks like in the parallel -leader, each parallel worker ends up with a stack of depth 1. This stack +leader, each parallel worker begins with a stack of depth 1. This stack entry is marked with the special transaction block state TBLOCK_PARALLEL_INPROGRESS so that it's not confused with an ordinary toplevel transaction. The XID of this TransactionState is set to the XID of @@ -153,18 +153,18 @@ associated with the memory contexts or resource owners of intermediate subtransactions. No meaningful change to the transaction state can be made while in parallel -mode. No XIDs can be assigned, and no subtransactions can start or end, +mode. No XIDs can be assigned, and no command counter increments can happen, because we have no way of communicating these state changes to cooperating backends, or of synchronizing them. It's clearly unworkable for the initiating backend to exit any transaction or subtransaction that was in progress when parallelism was started before all parallel workers have exited; and it's even more clearly crazy for a parallel worker to try to subcommit or subabort the current subtransaction and execute in some other transaction context than was -present in the initiating backend. It might be practical to allow internal -sub-transactions (e.g. to implement a PL/pgSQL EXCEPTION block) to be used in -parallel mode, provided that they are XID-less, because other backends -wouldn't really need to know about those transactions or do anything -differently because of them. Right now, we don't even allow that. +present in the initiating backend. However, we allow internal subtransactions +(e.g. those used to implement a PL/pgSQL EXCEPTION block) to be used in +parallel mode, provided that they remain XID-less, because other backends +don't really need to know about those transactions or do anything differently +because of them. At the end of a parallel operation, which can happen either because it completed successfully or because it was interrupted by an error, parallel diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 751c251cf5..8613fc6fb5 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -1226,10 +1226,8 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) /* * End-of-subtransaction cleanup for parallel contexts. * - * Currently, it's forbidden to enter or leave a subtransaction while - * parallel mode is in effect, so we could just blow away everything. But - * we may want to relax that restriction in the future, so this code - * contemplates that there may be multiple subtransaction IDs in pcxt_list. + * Here we remove only parallel contexts initiated within the current + * subtransaction. */ void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId) @@ -1249,6 +1247,8 @@ AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId) /* * End-of-transaction cleanup for parallel contexts. + * + * We nuke all remaining parallel contexts. */ void AtEOXact_Parallel(bool isCommit) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1d930752c5..df5a67e4c3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -183,6 +183,10 @@ typedef enum TBlockState /* * transaction state structure + * + * Note: parallelModeLevel counts the number of unmatched EnterParallelMode + * calls done at this transaction level. parallelChildXact is true if any + * upper transaction level has nonzero parallelModeLevel. */ typedef struct TransactionStateData { @@ -205,6 +209,7 @@ typedef struct TransactionStateData bool startedInRecovery; /* did we start in recovery? */ bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ + bool parallelChildXact; /* is any parent transaction parallel? */ bool chain; /* start a new block after this one */ bool topXidLogged; /* for a subxact: is top-level XID logged? */ struct TransactionStateData *parent; /* back link to parent */ @@ -639,7 +644,9 @@ AssignTransactionId(TransactionState s) * operation, so we can't account for new XIDs at this point. */ if (IsInParallelMode() || IsParallelWorker()) - elog(ERROR, "cannot assign XIDs during a parallel operation"); + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot assign XIDs during a parallel operation"))); /* * Ensure parent(s) have XIDs, so that a child always has an XID later @@ -827,7 +834,11 @@ GetCurrentCommandId(bool used) * could relax this restriction when currentCommandIdUsed was already * true at the start of the parallel operation. */ - Assert(!IsParallelWorker()); + if (IsParallelWorker()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot modify data in a parallel worker"))); + currentCommandIdUsed = true; } return currentCommandId; @@ -1052,7 +1063,8 @@ ExitParallelMode(void) TransactionState s = CurrentTransactionState; Assert(s->parallelModeLevel > 0); - Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); + Assert(s->parallelModeLevel > 1 || s->parallelChildXact || + !ParallelContextActive()); --s->parallelModeLevel; } @@ -1065,11 +1077,17 @@ ExitParallelMode(void) * match across all workers. Mere caches usually don't require such a * restriction. State modified in a strict push/pop fashion, such as the * active snapshot stack, is often fine. + * + * We say we are in parallel mode if we are in a subxact of a transaction + * that's initiated a parallel operation; for most purposes that context + * has all the same restrictions. */ bool IsInParallelMode(void) { - return CurrentTransactionState->parallelModeLevel != 0; + TransactionState s = CurrentTransactionState; + + return s->parallelModeLevel != 0 || s->parallelChildXact; } /* @@ -1092,7 +1110,9 @@ CommandCounterIncrement(void) * point. */ if (IsInParallelMode() || IsParallelWorker()) - elog(ERROR, "cannot start commands during a parallel operation"); + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start commands during a parallel operation"))); currentCommandId += 1; if (currentCommandId == InvalidCommandId) @@ -2210,9 +2230,26 @@ CommitTransaction(void) CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); - /* If we might have parallel workers, clean them up now. */ - if (IsInParallelMode()) - AtEOXact_Parallel(true); + /* + * If this xact has started any unfinished parallel operation, clean up + * its workers, warning about leaked resources. (But we don't actually + * reset parallelModeLevel till entering TRANS_COMMIT, a bit below. This + * keeps parallel mode restrictions active as long as possible in a + * parallel worker.) + */ + AtEOXact_Parallel(true); + if (is_parallel_worker) + { + if (s->parallelModeLevel != 1) + elog(WARNING, "parallelModeLevel is %d not 1 at end of parallel worker transaction", + s->parallelModeLevel); + } + else + { + if (s->parallelModeLevel != 0) + elog(WARNING, "parallelModeLevel is %d not 0 at end of transaction", + s->parallelModeLevel); + } /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2263,6 +2300,7 @@ CommitTransaction(void) */ s->state = TRANS_COMMIT; s->parallelModeLevel = 0; + s->parallelChildXact = false; /* should be false already */ /* Disable transaction timeout */ if (TransactionTimeout > 0) @@ -2804,12 +2842,13 @@ AbortTransaction(void) /* Reset snapshot export state. */ SnapBuildResetExportedSnapshotState(); - /* If in parallel mode, clean up workers and exit parallel mode. */ - if (IsInParallelMode()) - { - AtEOXact_Parallel(false); - s->parallelModeLevel = 0; - } + /* + * If this xact has started any unfinished parallel operation, clean up + * its workers and exit parallel mode. Don't warn about leaked resources. + */ + AtEOXact_Parallel(false); + s->parallelModeLevel = 0; + s->parallelChildXact = false; /* should be false already */ /* * do abort processing @@ -2937,6 +2976,7 @@ CleanupTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; s->parallelModeLevel = 0; + s->parallelChildXact = false; XactTopFullTransactionId = InvalidFullTransactionId; nParallelCurrentXids = 0; @@ -4303,7 +4343,7 @@ DefineSavepoint(const char *name) * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case * below.) */ - if (IsInParallelMode()) + if (IsInParallelMode() || IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot define savepoints during a parallel operation"))); @@ -4390,7 +4430,7 @@ ReleaseSavepoint(const char *name) * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case * below.) */ - if (IsInParallelMode()) + if (IsInParallelMode() || IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot release savepoints during a parallel operation"))); @@ -4499,7 +4539,7 @@ RollbackToSavepoint(const char *name) * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case * below.) */ - if (IsInParallelMode()) + if (IsInParallelMode() || IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot rollback to savepoints during a parallel operation"))); @@ -4605,38 +4645,40 @@ RollbackToSavepoint(const char *name) /* * BeginInternalSubTransaction * This is the same as DefineSavepoint except it allows TBLOCK_STARTED, - * TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_END, and TBLOCK_PREPARE states, - * and therefore it can safely be used in functions that might be called - * when not inside a BEGIN block or when running deferred triggers at - * COMMIT/PREPARE time. Also, it automatically does - * CommitTransactionCommand/StartTransactionCommand instead of expecting - * the caller to do it. + * TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_PARALLEL_INPROGRESS, TBLOCK_END, + * and TBLOCK_PREPARE states, and therefore it can safely be used in + * functions that might be called when not inside a BEGIN block or when + * running deferred triggers at COMMIT/PREPARE time. Also, it + * automatically does CommitTransactionCommand/StartTransactionCommand + * instead of expecting the caller to do it. */ void BeginInternalSubTransaction(const char *name) { TransactionState s = CurrentTransactionState; + bool save_ExitOnAnyError = ExitOnAnyError; /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for new subtransactions after that - * point. We might be able to make an exception for the type of - * subtransaction established by this function, which is typically used in - * contexts where we're going to release or roll back the subtransaction - * before proceeding further, so that no enduring change to the - * transaction state occurs. For now, however, we prohibit this case along - * with all the others. + * Errors within this function are improbable, but if one does happen we + * force a FATAL exit. Callers generally aren't prepared to handle losing + * control, and moreover our transaction state is probably corrupted if we + * fail partway through; so an ordinary ERROR longjmp isn't okay. + */ + ExitOnAnyError = true; + + /* + * We do not check for parallel mode here. It's permissible to start and + * end "internal" subtransactions while in parallel mode, so long as no + * new XIDs or command IDs are assigned. Enforcement of that occurs in + * AssignTransactionId() and CommandCounterIncrement(). */ - if (IsInParallelMode()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot start subtransactions during a parallel operation"))); switch (s->blockState) { case TBLOCK_STARTED: case TBLOCK_INPROGRESS: case TBLOCK_IMPLICIT_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_PREPARE: case TBLOCK_SUBINPROGRESS: @@ -4655,7 +4697,6 @@ BeginInternalSubTransaction(const char *name) /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: - case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -4674,6 +4715,8 @@ BeginInternalSubTransaction(const char *name) CommitTransactionCommand(); StartTransactionCommand(); + + ExitOnAnyError = save_ExitOnAnyError; } /* @@ -4689,16 +4732,10 @@ ReleaseCurrentSubTransaction(void) TransactionState s = CurrentTransactionState; /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for commit of subtransactions after that - * point. This should not happen anyway. Code calling this would - * typically have called BeginInternalSubTransaction() first, failing - * there. + * We do not check for parallel mode here. It's permissible to start and + * end "internal" subtransactions while in parallel mode, so long as no + * new XIDs or command IDs are assigned. */ - if (IsInParallelMode()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot commit subtransactions during a parallel operation"))); if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", @@ -4723,11 +4760,9 @@ RollbackAndReleaseCurrentSubTransaction(void) TransactionState s = CurrentTransactionState; /* - * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted - * during parallel operations. That's because we may be in the leader, - * recovering from an error thrown while we were in parallel mode. We - * won't reach here in a worker, because BeginInternalSubTransaction() - * will have failed. + * We do not check for parallel mode here. It's permissible to start and + * end "internal" subtransactions while in parallel mode, so long as no + * new XIDs or command IDs are assigned. */ switch (s->blockState) @@ -4774,6 +4809,7 @@ RollbackAndReleaseCurrentSubTransaction(void) Assert(s->blockState == TBLOCK_SUBINPROGRESS || s->blockState == TBLOCK_INPROGRESS || s->blockState == TBLOCK_IMPLICIT_INPROGRESS || + s->blockState == TBLOCK_PARALLEL_INPROGRESS || s->blockState == TBLOCK_STARTED); } @@ -5037,10 +5073,15 @@ CommitSubTransaction(void) CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, s->parent->subTransactionId); - /* If in parallel mode, clean up workers and exit parallel mode. */ - if (IsInParallelMode()) + /* + * If this subxact has started any unfinished parallel operation, clean up + * its workers and exit parallel mode. Warn about leaked resources. + */ + AtEOSubXact_Parallel(true, s->subTransactionId); + if (s->parallelModeLevel != 0) { - AtEOSubXact_Parallel(true, s->subTransactionId); + elog(WARNING, "parallelModeLevel is %d not 0 at end of subtransaction", + s->parallelModeLevel); s->parallelModeLevel = 0; } @@ -5213,12 +5254,12 @@ AbortSubTransaction(void) * exports are not supported in subtransactions. */ - /* Exit from parallel mode, if necessary. */ - if (IsInParallelMode()) - { - AtEOSubXact_Parallel(false, s->subTransactionId); - s->parallelModeLevel = 0; - } + /* + * If this subxact has started any unfinished parallel operation, clean up + * its workers and exit parallel mode. Don't warn about leaked resources. + */ + AtEOSubXact_Parallel(false, s->subTransactionId); + s->parallelModeLevel = 0; /* * We can skip all this stuff if the subxact failed before creating a @@ -5377,6 +5418,7 @@ PushTransaction(void) s->prevXactReadOnly = XactReadOnly; s->startedInRecovery = p->startedInRecovery; s->parallelModeLevel = 0; + s->parallelChildXact = (p->parallelModeLevel != 0 || p->parallelChildXact); s->topXidLogged = false; CurrentTransactionState = s; diff --git a/src/test/regress/expected/plpgsql.out b/src/test/regress/expected/plpgsql.out index 57c1ae092d..0637256676 100644 --- a/src/test/regress/expected/plpgsql.out +++ b/src/test/regress/expected/plpgsql.out @@ -4032,7 +4032,7 @@ declare v int := 0; begin return 10 / v; end; -$$ language plpgsql; +$$ language plpgsql parallel safe; create or replace function raise_test() returns void as $$ begin raise exception 'custom exception' @@ -4099,8 +4099,37 @@ $$ language plpgsql; select stacked_diagnostics_test(); ERROR: GET STACKED DIAGNOSTICS cannot be used outside an exception handler CONTEXT: PL/pgSQL function stacked_diagnostics_test() line 6 at GET STACKED DIAGNOSTICS -drop function zero_divide(); drop function stacked_diagnostics_test(); +-- Test that an error recovery subtransaction is parallel safe +create function error_trap_test() returns text as $$ +begin + perform zero_divide(); + return 'no error detected!'; +exception when division_by_zero then + return 'division_by_zero detected'; +end; +$$ language plpgsql parallel safe; +set debug_parallel_query to on; +explain (verbose, costs off) select error_trap_test(); + QUERY PLAN +----------------------------------- + Gather + Output: (error_trap_test()) + Workers Planned: 1 + Single Copy: true + -> Result + Output: error_trap_test() +(6 rows) + +select error_trap_test(); + error_trap_test +--------------------------- + division_by_zero detected +(1 row) + +reset debug_parallel_query; +drop function error_trap_test(); +drop function zero_divide(); -- check cases where implicit SQLSTATE variable could be confused with -- SQLSTATE as a keyword, cf bug #5524 create or replace function raise_test() returns void as $$ diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql index 5a9b2f0040..9ca9449a50 100644 --- a/src/test/regress/sql/plpgsql.sql +++ b/src/test/regress/sql/plpgsql.sql @@ -3356,7 +3356,7 @@ declare v int := 0; begin return 10 / v; end; -$$ language plpgsql; +$$ language plpgsql parallel safe; create or replace function raise_test() returns void as $$ begin @@ -3417,9 +3417,29 @@ $$ language plpgsql; select stacked_diagnostics_test(); -drop function zero_divide(); drop function stacked_diagnostics_test(); +-- Test that an error recovery subtransaction is parallel safe + +create function error_trap_test() returns text as $$ +begin + perform zero_divide(); + return 'no error detected!'; +exception when division_by_zero then + return 'division_by_zero detected'; +end; +$$ language plpgsql parallel safe; + +set debug_parallel_query to on; + +explain (verbose, costs off) select error_trap_test(); +select error_trap_test(); + +reset debug_parallel_query; + +drop function error_trap_test(); +drop function zero_divide(); + -- check cases where implicit SQLSTATE variable could be confused with -- SQLSTATE as a keyword, cf bug #5524 create or replace function raise_test() returns void as $$