Allow "internal" subtransactions in parallel mode.

Allow use of BeginInternalSubTransaction() in parallel mode, so long
as the subtransaction doesn't attempt to acquire an XID or increment
the command counter.  Given those restrictions, the other parallel
processes don't need to know about the subtransaction at all, so
this should be safe.  The benefit is that it allows subtransactions
intended for error recovery, such as pl/pgsql exception blocks,
to be used in PARALLEL SAFE functions.

Another reason for doing this is that the API of
BeginInternalSubTransaction() doesn't allow reporting failure.
pl/python for one, and perhaps other PLs, copes very poorly with an
error longjmp out of BeginInternalSubTransaction().  The headline
feature of this patch removes the only easily-triggerable failure
case within that function.  There remain some resource-exhaustion
and similar cases, which we now deal with by promoting them to FATAL
errors, so that callers need not try to clean up.  (It is likely
that such errors would leave us with corrupted transaction state
inside xact.c, making recovery difficult if not impossible anyway.)

Although this work started because of a report of a pl/python crash,
we're not going to do anything about that in the back branches.
Back-patching this particular fix is obviously not very wise.
While we could contemplate some narrower band-aid, pl/python is
already an untrusted language, so it seems okay to classify this
as a "so don't do that" case.

Patch by me, per report from Hao Zhang.  Thanks to Robert Haas for
review.

Discussion: https://postgr.es/m/CALY6Dr-2yLVeVPhNMhuBnRgOZo1UjoTETgtKBx1B2gUi8yy+3g@mail.gmail.com
This commit is contained in:
Tom Lane 2024-03-28 12:43:10 -04:00
parent e2395cdbe8
commit 0075d78947
7 changed files with 180 additions and 87 deletions

View File

@ -545,10 +545,10 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
</para>
<para>
Functions and aggregates must be marked <literal>PARALLEL UNSAFE</literal> if
they write to the database, access sequences, change the transaction state
even temporarily (e.g., a PL/pgSQL function that establishes an
<literal>EXCEPTION</literal> block to catch errors), or make persistent changes to
Functions and aggregates must be marked <literal>PARALLEL UNSAFE</literal>
if they write to the database, change the transaction state (other than by
using a subtransaction for error recovery), access sequences, or make
persistent changes to
settings. Similarly, functions must be marked <literal>PARALLEL
RESTRICTED</literal> if they access temporary tables, client connection state,
cursors, prepared statements, or miscellaneous backend-local state that

View File

@ -428,22 +428,24 @@ CREATE [ OR REPLACE ] FUNCTION
<term><literal>PARALLEL</literal></term>
<listitem>
<para><literal>PARALLEL UNSAFE</literal> indicates that the function
can't be executed in parallel mode and the presence of such a
<para>
<literal>PARALLEL UNSAFE</literal> indicates that the function
can't be executed in parallel mode; the presence of such a
function in an SQL statement forces a serial execution plan. This is
the default. <literal>PARALLEL RESTRICTED</literal> indicates that
the function can be executed in parallel mode, but the execution is
restricted to parallel group leader. <literal>PARALLEL SAFE</literal>
the function can be executed in parallel mode, but only in the parallel
group leader process. <literal>PARALLEL SAFE</literal>
indicates that the function is safe to run in parallel mode without
restriction.
restriction, including in parallel worker processes.
</para>
<para>
Functions should be labeled parallel unsafe if they modify any database
state, or if they make changes to the transaction such as using
sub-transactions, or if they access sequences or attempt to make
persistent changes to settings (e.g., <literal>setval</literal>). They should
be labeled as parallel restricted if they access temporary tables,
state, change the transaction state (other than by using a
subtransaction for error recovery), access sequences (e.g., by
calling <literal>currval</literal>) or make persistent changes to
settings. They should
be labeled parallel restricted if they access temporary tables,
client connection state, cursors, prepared statements, or miscellaneous
backend-local state which the system cannot synchronize in parallel mode
(e.g., <literal>setseed</literal> cannot be executed other than by the group

View File

@ -137,7 +137,7 @@ Transaction Integration
=======================
Regardless of what the TransactionState stack looks like in the parallel
leader, each parallel worker ends up with a stack of depth 1. This stack
leader, each parallel worker begins with a stack of depth 1. This stack
entry is marked with the special transaction block state
TBLOCK_PARALLEL_INPROGRESS so that it's not confused with an ordinary
toplevel transaction. The XID of this TransactionState is set to the XID of
@ -153,18 +153,18 @@ associated with the memory contexts or resource owners of intermediate
subtransactions.
No meaningful change to the transaction state can be made while in parallel
mode. No XIDs can be assigned, and no subtransactions can start or end,
mode. No XIDs can be assigned, and no command counter increments can happen,
because we have no way of communicating these state changes to cooperating
backends, or of synchronizing them. It's clearly unworkable for the initiating
backend to exit any transaction or subtransaction that was in progress when
parallelism was started before all parallel workers have exited; and it's even
more clearly crazy for a parallel worker to try to subcommit or subabort the
current subtransaction and execute in some other transaction context than was
present in the initiating backend. It might be practical to allow internal
sub-transactions (e.g. to implement a PL/pgSQL EXCEPTION block) to be used in
parallel mode, provided that they are XID-less, because other backends
wouldn't really need to know about those transactions or do anything
differently because of them. Right now, we don't even allow that.
present in the initiating backend. However, we allow internal subtransactions
(e.g. those used to implement a PL/pgSQL EXCEPTION block) to be used in
parallel mode, provided that they remain XID-less, because other backends
don't really need to know about those transactions or do anything differently
because of them.
At the end of a parallel operation, which can happen either because it
completed successfully or because it was interrupted by an error, parallel

View File

@ -1226,10 +1226,8 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
/*
* End-of-subtransaction cleanup for parallel contexts.
*
* Currently, it's forbidden to enter or leave a subtransaction while
* parallel mode is in effect, so we could just blow away everything. But
* we may want to relax that restriction in the future, so this code
* contemplates that there may be multiple subtransaction IDs in pcxt_list.
* Here we remove only parallel contexts initiated within the current
* subtransaction.
*/
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
@ -1249,6 +1247,8 @@ AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
/*
* End-of-transaction cleanup for parallel contexts.
*
* We nuke all remaining parallel contexts.
*/
void
AtEOXact_Parallel(bool isCommit)

View File

@ -183,6 +183,10 @@ typedef enum TBlockState
/*
* transaction state structure
*
* Note: parallelModeLevel counts the number of unmatched EnterParallelMode
* calls done at this transaction level. parallelChildXact is true if any
* upper transaction level has nonzero parallelModeLevel.
*/
typedef struct TransactionStateData
{
@ -205,6 +209,7 @@ typedef struct TransactionStateData
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
int parallelModeLevel; /* Enter/ExitParallelMode counter */
bool parallelChildXact; /* is any parent transaction parallel? */
bool chain; /* start a new block after this one */
bool topXidLogged; /* for a subxact: is top-level XID logged? */
struct TransactionStateData *parent; /* back link to parent */
@ -639,7 +644,9 @@ AssignTransactionId(TransactionState s)
* operation, so we can't account for new XIDs at this point.
*/
if (IsInParallelMode() || IsParallelWorker())
elog(ERROR, "cannot assign XIDs during a parallel operation");
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot assign XIDs during a parallel operation")));
/*
* Ensure parent(s) have XIDs, so that a child always has an XID later
@ -827,7 +834,11 @@ GetCurrentCommandId(bool used)
* could relax this restriction when currentCommandIdUsed was already
* true at the start of the parallel operation.
*/
Assert(!IsParallelWorker());
if (IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot modify data in a parallel worker")));
currentCommandIdUsed = true;
}
return currentCommandId;
@ -1052,7 +1063,8 @@ ExitParallelMode(void)
TransactionState s = CurrentTransactionState;
Assert(s->parallelModeLevel > 0);
Assert(s->parallelModeLevel > 1 || !ParallelContextActive());
Assert(s->parallelModeLevel > 1 || s->parallelChildXact ||
!ParallelContextActive());
--s->parallelModeLevel;
}
@ -1065,11 +1077,17 @@ ExitParallelMode(void)
* match across all workers. Mere caches usually don't require such a
* restriction. State modified in a strict push/pop fashion, such as the
* active snapshot stack, is often fine.
*
* We say we are in parallel mode if we are in a subxact of a transaction
* that's initiated a parallel operation; for most purposes that context
* has all the same restrictions.
*/
bool
IsInParallelMode(void)
{
return CurrentTransactionState->parallelModeLevel != 0;
TransactionState s = CurrentTransactionState;
return s->parallelModeLevel != 0 || s->parallelChildXact;
}
/*
@ -1092,7 +1110,9 @@ CommandCounterIncrement(void)
* point.
*/
if (IsInParallelMode() || IsParallelWorker())
elog(ERROR, "cannot start commands during a parallel operation");
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot start commands during a parallel operation")));
currentCommandId += 1;
if (currentCommandId == InvalidCommandId)
@ -2210,9 +2230,26 @@ CommitTransaction(void)
CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
: XACT_EVENT_PRE_COMMIT);
/* If we might have parallel workers, clean them up now. */
if (IsInParallelMode())
AtEOXact_Parallel(true);
/*
* If this xact has started any unfinished parallel operation, clean up
* its workers, warning about leaked resources. (But we don't actually
* reset parallelModeLevel till entering TRANS_COMMIT, a bit below. This
* keeps parallel mode restrictions active as long as possible in a
* parallel worker.)
*/
AtEOXact_Parallel(true);
if (is_parallel_worker)
{
if (s->parallelModeLevel != 1)
elog(WARNING, "parallelModeLevel is %d not 1 at end of parallel worker transaction",
s->parallelModeLevel);
}
else
{
if (s->parallelModeLevel != 0)
elog(WARNING, "parallelModeLevel is %d not 0 at end of transaction",
s->parallelModeLevel);
}
/* Shut down the deferred-trigger manager */
AfterTriggerEndXact(true);
@ -2263,6 +2300,7 @@ CommitTransaction(void)
*/
s->state = TRANS_COMMIT;
s->parallelModeLevel = 0;
s->parallelChildXact = false; /* should be false already */
/* Disable transaction timeout */
if (TransactionTimeout > 0)
@ -2804,12 +2842,13 @@ AbortTransaction(void)
/* Reset snapshot export state. */
SnapBuildResetExportedSnapshotState();
/* If in parallel mode, clean up workers and exit parallel mode. */
if (IsInParallelMode())
{
AtEOXact_Parallel(false);
s->parallelModeLevel = 0;
}
/*
* If this xact has started any unfinished parallel operation, clean up
* its workers and exit parallel mode. Don't warn about leaked resources.
*/
AtEOXact_Parallel(false);
s->parallelModeLevel = 0;
s->parallelChildXact = false; /* should be false already */
/*
* do abort processing
@ -2937,6 +2976,7 @@ CleanupTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
s->parallelModeLevel = 0;
s->parallelChildXact = false;
XactTopFullTransactionId = InvalidFullTransactionId;
nParallelCurrentXids = 0;
@ -4303,7 +4343,7 @@ DefineSavepoint(const char *name)
* is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
* below.)
*/
if (IsInParallelMode())
if (IsInParallelMode() || IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot define savepoints during a parallel operation")));
@ -4390,7 +4430,7 @@ ReleaseSavepoint(const char *name)
* is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
* below.)
*/
if (IsInParallelMode())
if (IsInParallelMode() || IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot release savepoints during a parallel operation")));
@ -4499,7 +4539,7 @@ RollbackToSavepoint(const char *name)
* is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
* below.)
*/
if (IsInParallelMode())
if (IsInParallelMode() || IsParallelWorker())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot rollback to savepoints during a parallel operation")));
@ -4605,38 +4645,40 @@ RollbackToSavepoint(const char *name)
/*
* BeginInternalSubTransaction
* This is the same as DefineSavepoint except it allows TBLOCK_STARTED,
* TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_END, and TBLOCK_PREPARE states,
* and therefore it can safely be used in functions that might be called
* when not inside a BEGIN block or when running deferred triggers at
* COMMIT/PREPARE time. Also, it automatically does
* CommitTransactionCommand/StartTransactionCommand instead of expecting
* the caller to do it.
* TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_PARALLEL_INPROGRESS, TBLOCK_END,
* and TBLOCK_PREPARE states, and therefore it can safely be used in
* functions that might be called when not inside a BEGIN block or when
* running deferred triggers at COMMIT/PREPARE time. Also, it
* automatically does CommitTransactionCommand/StartTransactionCommand
* instead of expecting the caller to do it.
*/
void
BeginInternalSubTransaction(const char *name)
{
TransactionState s = CurrentTransactionState;
bool save_ExitOnAnyError = ExitOnAnyError;
/*
* Workers synchronize transaction state at the beginning of each parallel
* operation, so we can't account for new subtransactions after that
* point. We might be able to make an exception for the type of
* subtransaction established by this function, which is typically used in
* contexts where we're going to release or roll back the subtransaction
* before proceeding further, so that no enduring change to the
* transaction state occurs. For now, however, we prohibit this case along
* with all the others.
* Errors within this function are improbable, but if one does happen we
* force a FATAL exit. Callers generally aren't prepared to handle losing
* control, and moreover our transaction state is probably corrupted if we
* fail partway through; so an ordinary ERROR longjmp isn't okay.
*/
ExitOnAnyError = true;
/*
* We do not check for parallel mode here. It's permissible to start and
* end "internal" subtransactions while in parallel mode, so long as no
* new XIDs or command IDs are assigned. Enforcement of that occurs in
* AssignTransactionId() and CommandCounterIncrement().
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot start subtransactions during a parallel operation")));
switch (s->blockState)
{
case TBLOCK_STARTED:
case TBLOCK_INPROGRESS:
case TBLOCK_IMPLICIT_INPROGRESS:
case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_END:
case TBLOCK_PREPARE:
case TBLOCK_SUBINPROGRESS:
@ -4655,7 +4697,6 @@ BeginInternalSubTransaction(const char *name)
/* These cases are invalid. */
case TBLOCK_DEFAULT:
case TBLOCK_BEGIN:
case TBLOCK_PARALLEL_INPROGRESS:
case TBLOCK_SUBBEGIN:
case TBLOCK_SUBRELEASE:
case TBLOCK_SUBCOMMIT:
@ -4674,6 +4715,8 @@ BeginInternalSubTransaction(const char *name)
CommitTransactionCommand();
StartTransactionCommand();
ExitOnAnyError = save_ExitOnAnyError;
}
/*
@ -4689,16 +4732,10 @@ ReleaseCurrentSubTransaction(void)
TransactionState s = CurrentTransactionState;
/*
* Workers synchronize transaction state at the beginning of each parallel
* operation, so we can't account for commit of subtransactions after that
* point. This should not happen anyway. Code calling this would
* typically have called BeginInternalSubTransaction() first, failing
* there.
* We do not check for parallel mode here. It's permissible to start and
* end "internal" subtransactions while in parallel mode, so long as no
* new XIDs or command IDs are assigned.
*/
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot commit subtransactions during a parallel operation")));
if (s->blockState != TBLOCK_SUBINPROGRESS)
elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
@ -4723,11 +4760,9 @@ RollbackAndReleaseCurrentSubTransaction(void)
TransactionState s = CurrentTransactionState;
/*
* Unlike ReleaseCurrentSubTransaction(), this is nominally permitted
* during parallel operations. That's because we may be in the leader,
* recovering from an error thrown while we were in parallel mode. We
* won't reach here in a worker, because BeginInternalSubTransaction()
* will have failed.
* We do not check for parallel mode here. It's permissible to start and
* end "internal" subtransactions while in parallel mode, so long as no
* new XIDs or command IDs are assigned.
*/
switch (s->blockState)
@ -4774,6 +4809,7 @@ RollbackAndReleaseCurrentSubTransaction(void)
Assert(s->blockState == TBLOCK_SUBINPROGRESS ||
s->blockState == TBLOCK_INPROGRESS ||
s->blockState == TBLOCK_IMPLICIT_INPROGRESS ||
s->blockState == TBLOCK_PARALLEL_INPROGRESS ||
s->blockState == TBLOCK_STARTED);
}
@ -5037,10 +5073,15 @@ CommitSubTransaction(void)
CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId,
s->parent->subTransactionId);
/* If in parallel mode, clean up workers and exit parallel mode. */
if (IsInParallelMode())
/*
* If this subxact has started any unfinished parallel operation, clean up
* its workers and exit parallel mode. Warn about leaked resources.
*/
AtEOSubXact_Parallel(true, s->subTransactionId);
if (s->parallelModeLevel != 0)
{
AtEOSubXact_Parallel(true, s->subTransactionId);
elog(WARNING, "parallelModeLevel is %d not 0 at end of subtransaction",
s->parallelModeLevel);
s->parallelModeLevel = 0;
}
@ -5213,12 +5254,12 @@ AbortSubTransaction(void)
* exports are not supported in subtransactions.
*/
/* Exit from parallel mode, if necessary. */
if (IsInParallelMode())
{
AtEOSubXact_Parallel(false, s->subTransactionId);
s->parallelModeLevel = 0;
}
/*
* If this subxact has started any unfinished parallel operation, clean up
* its workers and exit parallel mode. Don't warn about leaked resources.
*/
AtEOSubXact_Parallel(false, s->subTransactionId);
s->parallelModeLevel = 0;
/*
* We can skip all this stuff if the subxact failed before creating a
@ -5377,6 +5418,7 @@ PushTransaction(void)
s->prevXactReadOnly = XactReadOnly;
s->startedInRecovery = p->startedInRecovery;
s->parallelModeLevel = 0;
s->parallelChildXact = (p->parallelModeLevel != 0 || p->parallelChildXact);
s->topXidLogged = false;
CurrentTransactionState = s;

View File

@ -4032,7 +4032,7 @@ declare v int := 0;
begin
return 10 / v;
end;
$$ language plpgsql;
$$ language plpgsql parallel safe;
create or replace function raise_test() returns void as $$
begin
raise exception 'custom exception'
@ -4099,8 +4099,37 @@ $$ language plpgsql;
select stacked_diagnostics_test();
ERROR: GET STACKED DIAGNOSTICS cannot be used outside an exception handler
CONTEXT: PL/pgSQL function stacked_diagnostics_test() line 6 at GET STACKED DIAGNOSTICS
drop function zero_divide();
drop function stacked_diagnostics_test();
-- Test that an error recovery subtransaction is parallel safe
create function error_trap_test() returns text as $$
begin
perform zero_divide();
return 'no error detected!';
exception when division_by_zero then
return 'division_by_zero detected';
end;
$$ language plpgsql parallel safe;
set debug_parallel_query to on;
explain (verbose, costs off) select error_trap_test();
QUERY PLAN
-----------------------------------
Gather
Output: (error_trap_test())
Workers Planned: 1
Single Copy: true
-> Result
Output: error_trap_test()
(6 rows)
select error_trap_test();
error_trap_test
---------------------------
division_by_zero detected
(1 row)
reset debug_parallel_query;
drop function error_trap_test();
drop function zero_divide();
-- check cases where implicit SQLSTATE variable could be confused with
-- SQLSTATE as a keyword, cf bug #5524
create or replace function raise_test() returns void as $$

View File

@ -3356,7 +3356,7 @@ declare v int := 0;
begin
return 10 / v;
end;
$$ language plpgsql;
$$ language plpgsql parallel safe;
create or replace function raise_test() returns void as $$
begin
@ -3417,9 +3417,29 @@ $$ language plpgsql;
select stacked_diagnostics_test();
drop function zero_divide();
drop function stacked_diagnostics_test();
-- Test that an error recovery subtransaction is parallel safe
create function error_trap_test() returns text as $$
begin
perform zero_divide();
return 'no error detected!';
exception when division_by_zero then
return 'division_by_zero detected';
end;
$$ language plpgsql parallel safe;
set debug_parallel_query to on;
explain (verbose, costs off) select error_trap_test();
select error_trap_test();
reset debug_parallel_query;
drop function error_trap_test();
drop function zero_divide();
-- check cases where implicit SQLSTATE variable could be confused with
-- SQLSTATE as a keyword, cf bug #5524
create or replace function raise_test() returns void as $$