PL/pgSQL: Nested CALL with transactions

So far, a nested CALL or DO in PL/pgSQL would not establish a context
where transaction control statements were allowed.  This fixes that by
handling CALL and DO specially in PL/pgSQL, passing the atomic/nonatomic
execution context through and doing the required management around
transaction boundaries.

Reviewed-by: Tomas Vondra <tomas.vondra@2ndquadrant.com>
This commit is contained in:
Peter Eisentraut 2018-03-24 10:05:06 -04:00
parent c2d4eb1b1f
commit d92bc83c48
13 changed files with 235 additions and 59 deletions

View File

@ -3463,9 +3463,9 @@ END LOOP <optional> <replaceable>label</replaceable> </optional>;
<title>Transaction Management</title> <title>Transaction Management</title>
<para> <para>
In procedures invoked by the <command>CALL</command> command from the top In procedures invoked by the <command>CALL</command> command
level as well as in anonymous code blocks (<command>DO</command> command) as well as in anonymous code blocks (<command>DO</command> command),
called from the top level, it is possible to end transactions using the it is possible to end transactions using the
commands <command>COMMIT</command> and <command>ROLLBACK</command>. A new commands <command>COMMIT</command> and <command>ROLLBACK</command>. A new
transaction is started automatically after a transaction is ended using transaction is started automatically after a transaction is ended using
these commands, so there is no separate <command>START these commands, so there is no separate <command>START
@ -3495,6 +3495,20 @@ CALL transaction_test1();
</programlisting> </programlisting>
</para> </para>
<para>
Transaction control is only possible in <command>CALL</command> or
<command>DO</command> invocations from the top level or nested
<command>CALL</command> or <command>DO</command> invocations without any
other intervening command. For example, if the call stack is
<command>CALL proc1()</command> &rarr; <command>CALL proc2()</command>
&rarr; <command>CALL proc3()</command>, then the second and third
procedures can perform transaction control actions. But if the call stack
is <command>CALL proc1()</command> &rarr; <command>SELECT
func2()</command> &rarr; <command>CALL proc3()</command>, then the last
procedure cannot do transaction control, because of the
<command>SELECT</command> in between.
</para>
<para> <para>
A transaction cannot be ended inside a loop over a query result, nor A transaction cannot be ended inside a loop over a query result, nor
inside a block with exception handlers. inside a block with exception handlers.

View File

@ -2041,8 +2041,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
* *
* In the first two cases, we can just push the snap onto the stack once * In the first two cases, we can just push the snap onto the stack once
* for the whole plan list. * for the whole plan list.
*
* But if the plan has no_snapshots set to true, then don't manage
* snapshots at all. The caller should then take care of that.
*/ */
if (snapshot != InvalidSnapshot) if (snapshot != InvalidSnapshot && !plan->no_snapshots)
{ {
if (read_only) if (read_only)
{ {
@ -2121,7 +2124,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
* In the default non-read-only case, get a new snapshot, replacing * In the default non-read-only case, get a new snapshot, replacing
* any that we pushed in a previous cycle. * any that we pushed in a previous cycle.
*/ */
if (snapshot == InvalidSnapshot && !read_only) if (snapshot == InvalidSnapshot && !read_only && !plan->no_snapshots)
{ {
if (pushed_active_snap) if (pushed_active_snap)
PopActiveSnapshot(); PopActiveSnapshot();
@ -2172,7 +2175,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
* If not read-only mode, advance the command counter before each * If not read-only mode, advance the command counter before each
* command and update the snapshot. * command and update the snapshot.
*/ */
if (!read_only) if (!read_only && !plan->no_snapshots)
{ {
CommandCounterIncrement(); CommandCounterIncrement();
UpdateActiveSnapshotCommandId(); UpdateActiveSnapshotCommandId();
@ -2203,10 +2206,23 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
else else
{ {
char completionTag[COMPLETION_TAG_BUFSIZE]; char completionTag[COMPLETION_TAG_BUFSIZE];
ProcessUtilityContext context;
/*
* If the SPI context is atomic, or we are asked to manage
* snapshots, then we are in an atomic execution context.
* Conversely, to propagate a nonatomic execution context, the
* caller must be in a nonatomic SPI context and manage
* snapshots itself.
*/
if (_SPI_current->atomic || !plan->no_snapshots)
context = PROCESS_UTILITY_QUERY;
else
context = PROCESS_UTILITY_QUERY_NONATOMIC;
ProcessUtility(stmt, ProcessUtility(stmt,
plansource->query_string, plansource->query_string,
PROCESS_UTILITY_QUERY, context,
paramLI, paramLI,
_SPI_current->queryEnv, _SPI_current->queryEnv,
dest, dest,
@ -2638,11 +2654,8 @@ _SPI_make_plan_non_temp(SPIPlanPtr plan)
oldcxt = MemoryContextSwitchTo(plancxt); oldcxt = MemoryContextSwitchTo(plancxt);
/* Copy the SPI_plan struct and subsidiary data into the new context */ /* Copy the SPI_plan struct and subsidiary data into the new context */
newplan = (SPIPlanPtr) palloc(sizeof(_SPI_plan)); newplan = (SPIPlanPtr) palloc0(sizeof(_SPI_plan));
newplan->magic = _SPI_PLAN_MAGIC; newplan->magic = _SPI_PLAN_MAGIC;
newplan->saved = false;
newplan->oneshot = false;
newplan->plancache_list = NIL;
newplan->plancxt = plancxt; newplan->plancxt = plancxt;
newplan->cursor_options = plan->cursor_options; newplan->cursor_options = plan->cursor_options;
newplan->nargs = plan->nargs; newplan->nargs = plan->nargs;
@ -2705,11 +2718,8 @@ _SPI_save_plan(SPIPlanPtr plan)
oldcxt = MemoryContextSwitchTo(plancxt); oldcxt = MemoryContextSwitchTo(plancxt);
/* Copy the SPI plan into its own context */ /* Copy the SPI plan into its own context */
newplan = (SPIPlanPtr) palloc(sizeof(_SPI_plan)); newplan = (SPIPlanPtr) palloc0(sizeof(_SPI_plan));
newplan->magic = _SPI_PLAN_MAGIC; newplan->magic = _SPI_PLAN_MAGIC;
newplan->saved = false;
newplan->oneshot = false;
newplan->plancache_list = NIL;
newplan->plancxt = plancxt; newplan->plancxt = plancxt;
newplan->cursor_options = plan->cursor_options; newplan->cursor_options = plan->cursor_options;
newplan->nargs = plan->nargs; newplan->nargs = plan->nargs;

View File

@ -382,7 +382,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
{ {
Node *parsetree = pstmt->utilityStmt; Node *parsetree = pstmt->utilityStmt;
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
bool isAtomicContext = (context != PROCESS_UTILITY_TOPLEVEL || IsTransactionBlock()); bool isAtomicContext = (!(context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY_NONATOMIC) || IsTransactionBlock());
ParseState *pstate; ParseState *pstate;
check_xact_readonly(parsetree); check_xact_readonly(parsetree);

View File

@ -86,6 +86,7 @@ typedef struct _SPI_plan
int magic; /* should equal _SPI_PLAN_MAGIC */ int magic; /* should equal _SPI_PLAN_MAGIC */
bool saved; /* saved or unsaved plan? */ bool saved; /* saved or unsaved plan? */
bool oneshot; /* one-shot plan? */ bool oneshot; /* one-shot plan? */
bool no_snapshots; /* let the caller handle the snapshots */
List *plancache_list; /* one CachedPlanSource per parsetree */ List *plancache_list; /* one CachedPlanSource per parsetree */
MemoryContext plancxt; /* Context containing _SPI_plan and data */ MemoryContext plancxt; /* Context containing _SPI_plan and data */
int cursor_options; /* Cursor options used for planning */ int cursor_options; /* Cursor options used for planning */

View File

@ -20,6 +20,7 @@ typedef enum
{ {
PROCESS_UTILITY_TOPLEVEL, /* toplevel interactive command */ PROCESS_UTILITY_TOPLEVEL, /* toplevel interactive command */
PROCESS_UTILITY_QUERY, /* a complete query, but not toplevel */ PROCESS_UTILITY_QUERY, /* a complete query, but not toplevel */
PROCESS_UTILITY_QUERY_NONATOMIC, /* a complete query, nonatomic execution context */
PROCESS_UTILITY_SUBCOMMAND /* a portion of a query */ PROCESS_UTILITY_SUBCOMMAND /* a portion of a query */
} ProcessUtilityContext; } ProcessUtilityContext;

View File

@ -1,10 +1,10 @@
CREATE TABLE test1 (a int, b text); CREATE TABLE test1 (a int, b text);
CREATE PROCEDURE transaction_test1() CREATE PROCEDURE transaction_test1(x int, y text)
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
FOR i IN 0..9 LOOP FOR i IN 0..x LOOP
INSERT INTO test1 (a) VALUES (i); INSERT INTO test1 (a, b) VALUES (i, y);
IF i % 2 = 0 THEN IF i % 2 = 0 THEN
COMMIT; COMMIT;
ELSE ELSE
@ -13,15 +13,15 @@ BEGIN
END LOOP; END LOOP;
END END
$$; $$;
CALL transaction_test1(); CALL transaction_test1(9, 'foo');
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
---+--- ---+-----
0 | 0 | foo
2 | 2 | foo
4 | 4 | foo
6 | 6 | foo
8 | 8 | foo
(5 rows) (5 rows)
TRUNCATE test1; TRUNCATE test1;
@ -51,9 +51,9 @@ SELECT * FROM test1;
-- transaction commands not allowed when called in transaction block -- transaction commands not allowed when called in transaction block
START TRANSACTION; START TRANSACTION;
CALL transaction_test1(); CALL transaction_test1(9, 'error');
ERROR: invalid transaction termination ERROR: invalid transaction termination
CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT CONTEXT: PL/pgSQL function transaction_test1(integer,text) line 6 at COMMIT
COMMIT; COMMIT;
START TRANSACTION; START TRANSACTION;
DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$;
@ -90,14 +90,14 @@ CREATE FUNCTION transaction_test3() RETURNS int
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
CALL transaction_test1(); CALL transaction_test1(9, 'error');
RETURN 1; RETURN 1;
END; END;
$$; $$;
SELECT transaction_test3(); SELECT transaction_test3();
ERROR: invalid transaction termination ERROR: invalid transaction termination
CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT CONTEXT: PL/pgSQL function transaction_test1(integer,text) line 6 at COMMIT
SQL statement "CALL transaction_test1()" SQL statement "CALL transaction_test1(9, 'error')"
PL/pgSQL function transaction_test3() line 3 at CALL PL/pgSQL function transaction_test3() line 3 at CALL
SELECT * FROM test1; SELECT * FROM test1;
a | b a | b
@ -130,6 +130,57 @@ $$;
CALL transaction_test5(); CALL transaction_test5();
ERROR: invalid transaction termination ERROR: invalid transaction termination
CONTEXT: PL/pgSQL function transaction_test5() line 3 at COMMIT CONTEXT: PL/pgSQL function transaction_test5() line 3 at COMMIT
TRUNCATE test1;
-- nested procedure calls
CREATE PROCEDURE transaction_test6(c text)
LANGUAGE plpgsql
AS $$
BEGIN
CALL transaction_test1(9, c);
END;
$$;
CALL transaction_test6('bar');
SELECT * FROM test1;
a | b
---+-----
0 | bar
2 | bar
4 | bar
6 | bar
8 | bar
(5 rows)
TRUNCATE test1;
CREATE PROCEDURE transaction_test7()
LANGUAGE plpgsql
AS $$
BEGIN
DO 'BEGIN CALL transaction_test1(9, $x$baz$x$); END;';
END;
$$;
CALL transaction_test7();
SELECT * FROM test1;
a | b
---+-----
0 | baz
2 | baz
4 | baz
6 | baz
8 | baz
(5 rows)
CREATE PROCEDURE transaction_test8()
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE 'CALL transaction_test1(10, $x$baz$x$)';
END;
$$;
CALL transaction_test8();
ERROR: invalid transaction termination
CONTEXT: PL/pgSQL function transaction_test1(integer,text) line 6 at COMMIT
SQL statement "CALL transaction_test1(10, $x$baz$x$)"
PL/pgSQL function transaction_test8() line 3 at EXECUTE
-- commit inside cursor loop -- commit inside cursor loop
CREATE TABLE test2 (x int); CREATE TABLE test2 (x int);
INSERT INTO test2 VALUES (0), (1), (2), (3), (4); INSERT INTO test2 VALUES (0), (1), (2), (3), (4);

View File

@ -22,6 +22,7 @@
#include "access/tupconvert.h" #include "access/tupconvert.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "executor/execExpr.h" #include "executor/execExpr.h"
#include "executor/spi.h" #include "executor/spi.h"
#include "executor/spi_priv.h" #include "executor/spi_priv.h"
@ -33,6 +34,7 @@
#include "parser/scansup.h" #include "parser/scansup.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "utils/array.h" #include "utils/array.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/datum.h" #include "utils/datum.h"
@ -311,7 +313,8 @@ static void plpgsql_estate_setup(PLpgSQL_execstate *estate,
static void exec_eval_cleanup(PLpgSQL_execstate *estate); static void exec_eval_cleanup(PLpgSQL_execstate *estate);
static void exec_prepare_plan(PLpgSQL_execstate *estate, static void exec_prepare_plan(PLpgSQL_execstate *estate,
PLpgSQL_expr *expr, int cursorOptions); PLpgSQL_expr *expr, int cursorOptions,
bool keepplan);
static void exec_simple_check_plan(PLpgSQL_execstate *estate, PLpgSQL_expr *expr); static void exec_simple_check_plan(PLpgSQL_execstate *estate, PLpgSQL_expr *expr);
static void exec_save_simple_expr(PLpgSQL_expr *expr, CachedPlan *cplan); static void exec_save_simple_expr(PLpgSQL_expr *expr, CachedPlan *cplan);
static void exec_check_rw_parameter(PLpgSQL_expr *expr, int target_dno); static void exec_check_rw_parameter(PLpgSQL_expr *expr, int target_dno);
@ -440,7 +443,7 @@ static char *format_preparedparamsdata(PLpgSQL_execstate *estate,
*/ */
Datum Datum
plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo, plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
EState *simple_eval_estate) EState *simple_eval_estate, bool atomic)
{ {
PLpgSQL_execstate estate; PLpgSQL_execstate estate;
ErrorContextCallback plerrcontext; ErrorContextCallback plerrcontext;
@ -452,6 +455,7 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
*/ */
plpgsql_estate_setup(&estate, func, (ReturnSetInfo *) fcinfo->resultinfo, plpgsql_estate_setup(&estate, func, (ReturnSetInfo *) fcinfo->resultinfo,
simple_eval_estate); simple_eval_estate);
estate.atomic = atomic;
/* /*
* Setup error traceback support for ereport() * Setup error traceback support for ereport()
@ -2057,20 +2061,48 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
{ {
PLpgSQL_expr *expr = stmt->expr; PLpgSQL_expr *expr = stmt->expr;
ParamListInfo paramLI; ParamListInfo paramLI;
LocalTransactionId before_lxid;
LocalTransactionId after_lxid;
int rc; int rc;
if (expr->plan == NULL) if (expr->plan == NULL)
exec_prepare_plan(estate, expr, 0); {
/*
* Don't save the plan if not in atomic context. Otherwise,
* transaction ends would cause warnings about plan leaks.
*/
exec_prepare_plan(estate, expr, 0, estate->atomic);
/*
* The procedure call could end transactions, which would upset the
* snapshot management in SPI_execute*, so don't let it do it.
*/
expr->plan->no_snapshots = true;
}
paramLI = setup_param_list(estate, expr); paramLI = setup_param_list(estate, expr);
before_lxid = MyProc->lxid;
rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI, rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI,
estate->readonly_func, 0); estate->readonly_func, 0);
after_lxid = MyProc->lxid;
if (rc < 0) if (rc < 0)
elog(ERROR, "SPI_execute_plan_with_paramlist failed executing query \"%s\": %s", elog(ERROR, "SPI_execute_plan_with_paramlist failed executing query \"%s\": %s",
expr->query, SPI_result_code_string(rc)); expr->query, SPI_result_code_string(rc));
/*
* If we are in a new transaction after the call, we need to reset some
* internal state.
*/
if (before_lxid != after_lxid)
{
estate->simple_eval_estate = NULL;
plpgsql_create_econtext(estate);
}
if (SPI_processed == 1) if (SPI_processed == 1)
{ {
SPITupleTable *tuptab = SPI_tuptable; SPITupleTable *tuptab = SPI_tuptable;
@ -2705,7 +2737,7 @@ exec_stmt_forc(PLpgSQL_execstate *estate, PLpgSQL_stmt_forc *stmt)
Assert(query); Assert(query);
if (query->plan == NULL) if (query->plan == NULL)
exec_prepare_plan(estate, query, curvar->cursor_options); exec_prepare_plan(estate, query, curvar->cursor_options, true);
/* /*
* Set up ParamListInfo for this query * Set up ParamListInfo for this query
@ -3719,6 +3751,7 @@ plpgsql_estate_setup(PLpgSQL_execstate *estate,
estate->retisset = func->fn_retset; estate->retisset = func->fn_retset;
estate->readonly_func = func->fn_readonly; estate->readonly_func = func->fn_readonly;
estate->atomic = true;
estate->exitlabel = NULL; estate->exitlabel = NULL;
estate->cur_error = NULL; estate->cur_error = NULL;
@ -3863,7 +3896,8 @@ exec_eval_cleanup(PLpgSQL_execstate *estate)
*/ */
static void static void
exec_prepare_plan(PLpgSQL_execstate *estate, exec_prepare_plan(PLpgSQL_execstate *estate,
PLpgSQL_expr *expr, int cursorOptions) PLpgSQL_expr *expr, int cursorOptions,
bool keepplan)
{ {
SPIPlanPtr plan; SPIPlanPtr plan;
@ -3899,7 +3933,8 @@ exec_prepare_plan(PLpgSQL_execstate *estate,
expr->query, SPI_result_code_string(SPI_result)); expr->query, SPI_result_code_string(SPI_result));
} }
} }
SPI_keepplan(plan); if (keepplan)
SPI_keepplan(plan);
expr->plan = plan; expr->plan = plan;
/* Check to see if it's a simple expression */ /* Check to see if it's a simple expression */
@ -3938,7 +3973,7 @@ exec_stmt_execsql(PLpgSQL_execstate *estate,
{ {
ListCell *l; ListCell *l;
exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK); exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
stmt->mod_stmt = false; stmt->mod_stmt = false;
foreach(l, SPI_plan_get_plan_sources(expr->plan)) foreach(l, SPI_plan_get_plan_sources(expr->plan))
{ {
@ -4396,7 +4431,7 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt)
*/ */
query = stmt->query; query = stmt->query;
if (query->plan == NULL) if (query->plan == NULL)
exec_prepare_plan(estate, query, stmt->cursor_options); exec_prepare_plan(estate, query, stmt->cursor_options, true);
} }
else if (stmt->dynquery != NULL) else if (stmt->dynquery != NULL)
{ {
@ -4467,7 +4502,7 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt)
query = curvar->cursor_explicit_expr; query = curvar->cursor_explicit_expr;
if (query->plan == NULL) if (query->plan == NULL)
exec_prepare_plan(estate, query, curvar->cursor_options); exec_prepare_plan(estate, query, curvar->cursor_options, true);
} }
/* /*
@ -4707,7 +4742,7 @@ exec_assign_expr(PLpgSQL_execstate *estate, PLpgSQL_datum *target,
*/ */
if (expr->plan == NULL) if (expr->plan == NULL)
{ {
exec_prepare_plan(estate, expr, 0); exec_prepare_plan(estate, expr, 0, true);
if (target->dtype == PLPGSQL_DTYPE_VAR) if (target->dtype == PLPGSQL_DTYPE_VAR)
exec_check_rw_parameter(expr, target->dno); exec_check_rw_parameter(expr, target->dno);
} }
@ -5566,7 +5601,7 @@ exec_eval_expr(PLpgSQL_execstate *estate,
* If first time through, create a plan for this expression. * If first time through, create a plan for this expression.
*/ */
if (expr->plan == NULL) if (expr->plan == NULL)
exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK); exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
/* /*
* If this is a simple expression, bypass SPI and use the executor * If this is a simple expression, bypass SPI and use the executor
@ -5652,7 +5687,7 @@ exec_run_select(PLpgSQL_execstate *estate,
*/ */
if (expr->plan == NULL) if (expr->plan == NULL)
exec_prepare_plan(estate, expr, exec_prepare_plan(estate, expr,
portalP == NULL ? CURSOR_OPT_PARALLEL_OK : 0); portalP == NULL ? CURSOR_OPT_PARALLEL_OK : 0, true);
/* /*
* Set up ParamListInfo to pass to executor * Set up ParamListInfo to pass to executor
@ -7834,11 +7869,13 @@ plpgsql_create_econtext(PLpgSQL_execstate *estate)
{ {
MemoryContext oldcontext; MemoryContext oldcontext;
Assert(shared_simple_eval_estate == NULL); if (shared_simple_eval_estate == NULL)
oldcontext = MemoryContextSwitchTo(TopTransactionContext); {
shared_simple_eval_estate = CreateExecutorState(); oldcontext = MemoryContextSwitchTo(TopTransactionContext);
shared_simple_eval_estate = CreateExecutorState();
MemoryContextSwitchTo(oldcontext);
}
estate->simple_eval_estate = shared_simple_eval_estate; estate->simple_eval_estate = shared_simple_eval_estate;
MemoryContextSwitchTo(oldcontext);
} }
/* /*

View File

@ -285,7 +285,7 @@ plpgsql_stmt_typename(PLpgSQL_stmt *stmt)
case PLPGSQL_STMT_PERFORM: case PLPGSQL_STMT_PERFORM:
return "PERFORM"; return "PERFORM";
case PLPGSQL_STMT_CALL: case PLPGSQL_STMT_CALL:
return "CALL"; return ((PLpgSQL_stmt_call *) stmt)->is_call ? "CALL" : "DO";
case PLPGSQL_STMT_COMMIT: case PLPGSQL_STMT_COMMIT:
return "COMMIT"; return "COMMIT";
case PLPGSQL_STMT_ROLLBACK: case PLPGSQL_STMT_ROLLBACK:
@ -1295,7 +1295,7 @@ static void
dump_call(PLpgSQL_stmt_call *stmt) dump_call(PLpgSQL_stmt_call *stmt)
{ {
dump_ind(); dump_ind();
printf("CALL expr = "); printf("%s expr = ", stmt->is_call ? "CALL" : "DO");
dump_expr(stmt->expr); dump_expr(stmt->expr);
printf("\n"); printf("\n");
} }

View File

@ -276,6 +276,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt);
%token <keyword> K_DEFAULT %token <keyword> K_DEFAULT
%token <keyword> K_DETAIL %token <keyword> K_DETAIL
%token <keyword> K_DIAGNOSTICS %token <keyword> K_DIAGNOSTICS
%token <keyword> K_DO
%token <keyword> K_DUMP %token <keyword> K_DUMP
%token <keyword> K_ELSE %token <keyword> K_ELSE
%token <keyword> K_ELSIF %token <keyword> K_ELSIF
@ -914,8 +915,24 @@ stmt_call : K_CALL
new->cmd_type = PLPGSQL_STMT_CALL; new->cmd_type = PLPGSQL_STMT_CALL;
new->lineno = plpgsql_location_to_lineno(@1); new->lineno = plpgsql_location_to_lineno(@1);
new->expr = read_sql_stmt("CALL "); new->expr = read_sql_stmt("CALL ");
new->is_call = true;
$$ = (PLpgSQL_stmt *)new; $$ = (PLpgSQL_stmt *)new;
}
| K_DO
{
/* use the same structures as for CALL, for simplicity */
PLpgSQL_stmt_call *new;
new = palloc0(sizeof(PLpgSQL_stmt_call));
new->cmd_type = PLPGSQL_STMT_CALL;
new->lineno = plpgsql_location_to_lineno(@1);
new->expr = read_sql_stmt("DO ");
new->is_call = false;
$$ = (PLpgSQL_stmt *)new;
} }
; ;
@ -2434,6 +2451,7 @@ unreserved_keyword :
| K_DEFAULT | K_DEFAULT
| K_DETAIL | K_DETAIL
| K_DIAGNOSTICS | K_DIAGNOSTICS
| K_DO
| K_DUMP | K_DUMP
| K_ELSIF | K_ELSIF
| K_ERRCODE | K_ERRCODE

View File

@ -260,7 +260,7 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
retval = (Datum) 0; retval = (Datum) 0;
} }
else else
retval = plpgsql_exec_function(func, fcinfo, NULL); retval = plpgsql_exec_function(func, fcinfo, NULL, !nonatomic);
} }
PG_CATCH(); PG_CATCH();
{ {
@ -332,7 +332,7 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
/* And run the function */ /* And run the function */
PG_TRY(); PG_TRY();
{ {
retval = plpgsql_exec_function(func, &fake_fcinfo, simple_eval_estate); retval = plpgsql_exec_function(func, &fake_fcinfo, simple_eval_estate, codeblock->atomic);
} }
PG_CATCH(); PG_CATCH();
{ {

View File

@ -119,6 +119,7 @@ static const ScanKeyword unreserved_keywords[] = {
PG_KEYWORD("default", K_DEFAULT, UNRESERVED_KEYWORD) PG_KEYWORD("default", K_DEFAULT, UNRESERVED_KEYWORD)
PG_KEYWORD("detail", K_DETAIL, UNRESERVED_KEYWORD) PG_KEYWORD("detail", K_DETAIL, UNRESERVED_KEYWORD)
PG_KEYWORD("diagnostics", K_DIAGNOSTICS, UNRESERVED_KEYWORD) PG_KEYWORD("diagnostics", K_DIAGNOSTICS, UNRESERVED_KEYWORD)
PG_KEYWORD("do", K_DO, UNRESERVED_KEYWORD)
PG_KEYWORD("dump", K_DUMP, UNRESERVED_KEYWORD) PG_KEYWORD("dump", K_DUMP, UNRESERVED_KEYWORD)
PG_KEYWORD("elseif", K_ELSIF, UNRESERVED_KEYWORD) PG_KEYWORD("elseif", K_ELSIF, UNRESERVED_KEYWORD)
PG_KEYWORD("elsif", K_ELSIF, UNRESERVED_KEYWORD) PG_KEYWORD("elsif", K_ELSIF, UNRESERVED_KEYWORD)

View File

@ -517,6 +517,7 @@ typedef struct PLpgSQL_stmt_call
PLpgSQL_stmt_type cmd_type; PLpgSQL_stmt_type cmd_type;
int lineno; int lineno;
PLpgSQL_expr *expr; PLpgSQL_expr *expr;
bool is_call;
PLpgSQL_variable *target; PLpgSQL_variable *target;
} PLpgSQL_stmt_call; } PLpgSQL_stmt_call;
@ -979,6 +980,7 @@ typedef struct PLpgSQL_execstate
bool retisset; bool retisset;
bool readonly_func; bool readonly_func;
bool atomic;
char *exitlabel; /* the "target" label of the current EXIT or char *exitlabel; /* the "target" label of the current EXIT or
* CONTINUE stmt, if any */ * CONTINUE stmt, if any */
@ -1194,7 +1196,8 @@ extern void _PG_init(void);
*/ */
extern Datum plpgsql_exec_function(PLpgSQL_function *func, extern Datum plpgsql_exec_function(PLpgSQL_function *func,
FunctionCallInfo fcinfo, FunctionCallInfo fcinfo,
EState *simple_eval_estate); EState *simple_eval_estate,
bool atomic);
extern HeapTuple plpgsql_exec_trigger(PLpgSQL_function *func, extern HeapTuple plpgsql_exec_trigger(PLpgSQL_function *func,
TriggerData *trigdata); TriggerData *trigdata);
extern void plpgsql_exec_event_trigger(PLpgSQL_function *func, extern void plpgsql_exec_event_trigger(PLpgSQL_function *func,

View File

@ -1,12 +1,12 @@
CREATE TABLE test1 (a int, b text); CREATE TABLE test1 (a int, b text);
CREATE PROCEDURE transaction_test1() CREATE PROCEDURE transaction_test1(x int, y text)
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
FOR i IN 0..9 LOOP FOR i IN 0..x LOOP
INSERT INTO test1 (a) VALUES (i); INSERT INTO test1 (a, b) VALUES (i, y);
IF i % 2 = 0 THEN IF i % 2 = 0 THEN
COMMIT; COMMIT;
ELSE ELSE
@ -16,7 +16,7 @@ BEGIN
END END
$$; $$;
CALL transaction_test1(); CALL transaction_test1(9, 'foo');
SELECT * FROM test1; SELECT * FROM test1;
@ -43,7 +43,7 @@ SELECT * FROM test1;
-- transaction commands not allowed when called in transaction block -- transaction commands not allowed when called in transaction block
START TRANSACTION; START TRANSACTION;
CALL transaction_test1(); CALL transaction_test1(9, 'error');
COMMIT; COMMIT;
START TRANSACTION; START TRANSACTION;
@ -80,7 +80,7 @@ CREATE FUNCTION transaction_test3() RETURNS int
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
CALL transaction_test1(); CALL transaction_test1(9, 'error');
RETURN 1; RETURN 1;
END; END;
$$; $$;
@ -116,6 +116,46 @@ $$;
CALL transaction_test5(); CALL transaction_test5();
TRUNCATE test1;
-- nested procedure calls
CREATE PROCEDURE transaction_test6(c text)
LANGUAGE plpgsql
AS $$
BEGIN
CALL transaction_test1(9, c);
END;
$$;
CALL transaction_test6('bar');
SELECT * FROM test1;
TRUNCATE test1;
CREATE PROCEDURE transaction_test7()
LANGUAGE plpgsql
AS $$
BEGIN
DO 'BEGIN CALL transaction_test1(9, $x$baz$x$); END;';
END;
$$;
CALL transaction_test7();
SELECT * FROM test1;
CREATE PROCEDURE transaction_test8()
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE 'CALL transaction_test1(10, $x$baz$x$)';
END;
$$;
CALL transaction_test8();
-- commit inside cursor loop -- commit inside cursor loop
CREATE TABLE test2 (x int); CREATE TABLE test2 (x int);
INSERT INTO test2 VALUES (0), (1), (2), (3), (4); INSERT INTO test2 VALUES (0), (1), (2), (3), (4);