diff --git a/doc/src/sgml/plperl.sgml b/doc/src/sgml/plperl.sgml index 100162dead..cff7a847de 100644 --- a/doc/src/sgml/plperl.sgml +++ b/doc/src/sgml/plperl.sgml @@ -661,6 +661,60 @@ SELECT release_hosts_query(); + + + + spi_commit() + + spi_commit + in PL/Perl + + + + spi_rollback() + + spi_rollback + in PL/Perl + + + + + Commit or roll back the current transaction. This can only be called + in a procedure or anonymous code block (DO command) + called from the top level. (Note that it is not possible to run the + SQL commands COMMIT or ROLLBACK + via spi_exec_query or similar. It has to be done + using these functions.) After a transaction is ended, a new + transaction is automatically started, so there is no separate function + for that. + + + + Here is an example: + +CREATE PROCEDURE transaction_test1() +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; + +CALL transaction_test1(); + + + + + Transactions cannot be ended when a cursor created by + spi_query is open. + + + diff --git a/doc/src/sgml/plpgsql.sgml b/doc/src/sgml/plpgsql.sgml index ddd054c6cc..90a3c00dfe 100644 --- a/doc/src/sgml/plpgsql.sgml +++ b/doc/src/sgml/plpgsql.sgml @@ -3449,6 +3449,48 @@ END LOOP label ; + + Transaction Management + + + In procedures invoked by the CALL command from the top + level as well as in anonymous code blocks (DO command) + called from the top level, it is possible to end transactions using the + commands COMMIT and ROLLBACK. A new + transaction is started automatically after a transaction is ended using + these commands, so there is no separate START + TRANSACTION command. (Note that BEGIN and + END have different meanings in PL/pgSQL.) + + + + Here is a simple example: + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; + +CALL transaction_test1(); + + + + + A transaction cannot be ended inside a loop over a query result, nor + inside a block with exception handlers. + + + Errors and Messages @@ -5432,14 +5474,13 @@ SELECT * FROM cs_parse_url('http://foobar.com/query.cgi?baz'); CREATE OR REPLACE PROCEDURE cs_create_job(v_job_id IN INTEGER) IS a_running_job_count INTEGER; - PRAGMA AUTONOMOUS_TRANSACTION; -- BEGIN - LOCK TABLE cs_jobs IN EXCLUSIVE MODE; -- + LOCK TABLE cs_jobs IN EXCLUSIVE MODE; SELECT count(*) INTO a_running_job_count FROM cs_jobs WHERE end_stamp IS NULL; IF a_running_job_count > 0 THEN - COMMIT; -- free lock + COMMIT; -- free lock raise_application_error(-20000, 'Unable to create a new job: a job is currently running.'); END IF; @@ -5459,45 +5500,11 @@ show errors - - Procedures like this can easily be converted into PostgreSQL - functions returning void. This procedure in - particular is interesting because it can teach us some things: - - - - - There is no PRAGMA statement in PostgreSQL. - - - - - - If you do a LOCK TABLE in PL/pgSQL, - the lock will not be released until the calling transaction is - finished. - - - - - - You cannot issue COMMIT in a - PL/pgSQL function. The function is - running within some outer transaction and so COMMIT - would imply terminating the function's execution. However, in - this particular case it is not necessary anyway, because the lock - obtained by the LOCK TABLE will be released when - we raise an error. - - - - - This is how we could port this procedure to PL/pgSQL: -CREATE OR REPLACE FUNCTION cs_create_job(v_job_id integer) RETURNS void AS $$ +CREATE OR REPLACE PROCEDURE cs_create_job(v_job_id integer) AS $$ DECLARE a_running_job_count integer; BEGIN @@ -5506,6 +5513,7 @@ BEGIN SELECT count(*) INTO a_running_job_count FROM cs_jobs WHERE end_stamp IS NULL; IF a_running_job_count > 0 THEN + COMMIT; -- free lock RAISE EXCEPTION 'Unable to create a new job: a job is currently running'; -- END IF; @@ -5518,6 +5526,7 @@ BEGIN WHEN unique_violation THEN -- -- don't worry if it already exists END; + COMMIT; END; $$ LANGUAGE plpgsql; @@ -5541,12 +5550,6 @@ $$ LANGUAGE plpgsql; - - The main functional difference between this procedure and the - Oracle equivalent is that the exclusive lock on the cs_jobs - table will be held until the calling transaction completes. Also, if - the caller later aborts (for example due to an error), the effects of - this procedure will be rolled back. diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml index 0dbeee1fa2..ba79beb743 100644 --- a/doc/src/sgml/plpython.sgml +++ b/doc/src/sgml/plpython.sgml @@ -1370,6 +1370,47 @@ $$ LANGUAGE plpythonu; + + Transaction Management + + + In a procedure called from the top level or an anonymous code block + (DO command) called from the top level it is possible to + control transactions. To commit the current transaction, call + plpy.commit(). To roll back the current transaction, + call plpy.rollback(). (Note that it is not possible to + run the SQL commands COMMIT or + ROLLBACK via plpy.execute or + similar. It has to be done using these functions.) After a transaction is + ended, a new transaction is automatically started, so there is no separate + function for that. + + + + Here is an example: + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +CALL transaction_test1(); + + + + + Transactions cannot be ended when a cursor created by + plpy.cursor is open or when an explicit subtransaction + is active. + + + Utility Functions diff --git a/doc/src/sgml/pltcl.sgml b/doc/src/sgml/pltcl.sgml index 8018783b0a..a834ab8862 100644 --- a/doc/src/sgml/pltcl.sgml +++ b/doc/src/sgml/pltcl.sgml @@ -1002,6 +1002,47 @@ $$ LANGUAGE pltcl; + + Transaction Management + + + In a procedure called from the top level or an anonymous code block + (DO command) called from the top level it is possible + to control transactions. To commit the current transaction, call the + commit command. To roll back the current transaction, + call the rollback command. (Note that it is not + possible to run the SQL commands COMMIT or + ROLLBACK via spi_exec or similar. + It has to be done using these functions.) After a transaction is ended, + a new transaction is automatically started, so there is no separate + command for that. + + + + Here is an example: + +CREATE PROCEDURE transaction_test1() +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +$$; + +CALL transaction_test1(); + + + + + Transactions cannot be ended when an explicit subtransaction is active. + + + PL/Tcl Configuration diff --git a/doc/src/sgml/ref/call.sgml b/doc/src/sgml/ref/call.sgml index 2741d8d15e..03da4518ee 100644 --- a/doc/src/sgml/ref/call.sgml +++ b/doc/src/sgml/ref/call.sgml @@ -70,6 +70,13 @@ CALL name ( [ and diff --git a/doc/src/sgml/ref/do.sgml b/doc/src/sgml/ref/do.sgml index 061218b135..b9a6f9a6fd 100644 --- a/doc/src/sgml/ref/do.sgml +++ b/doc/src/sgml/ref/do.sgml @@ -91,6 +91,13 @@ DO [ LANGUAGE lang_name ] + + + If DO is executed in a transaction block, then the + procedure code cannot execute transaction control statements. Transaction + control statements are only allowed if DO is executed in + its own transaction. + diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml index 350f0863e9..10448922b1 100644 --- a/doc/src/sgml/spi.sgml +++ b/doc/src/sgml/spi.sgml @@ -64,6 +64,7 @@ SPI_connect + SPI_connect_ext SPI_connect @@ -72,12 +73,17 @@ SPI_connect + SPI_connect_ext connect a procedure to the SPI manager int SPI_connect(void) + + + +int SPI_connect_ext(int options) @@ -90,6 +96,31 @@ int SPI_connect(void) function if you want to execute commands through SPI. Some utility SPI functions can be called from unconnected procedures. + + + SPI_connect_ext does the same but has an argument that + allows passing option flags. Currently, the following option values are + available: + + + SPI_OPT_NONATOMIC + + + Sets the SPI connection to be nonatomic, which + means that transaction control calls SPI_commit, + SPI_rollback, and + SPI_start_transaction are allowed. Otherwise, + calling these functions will result in an immediate error. + + + + + + + + SPI_connect() is equivalent to + SPI_connect_ext(0). + @@ -4325,6 +4356,152 @@ int SPI_freeplan(SPIPlanPtr plan) + + Transaction Management + + + It is not possible to run transaction control commands such + as COMMIT and ROLLBACK through SPI + functions such as SPI_execute. There are, however, + separate interface functions that allow transaction control through SPI. + + + + It is not generally safe and sensible to start and end transactions in + arbitrary user-defined SQL-callable functions without taking into account + the context in which they are called. For example, a transaction boundary + in the middle of a function that is part of a complex SQL expression that + is part of some SQL command will probably result in obscure internal errors + or crashes. The interface functions presented here are primarily intended + to be used by procedural language implementations to support transaction + management in procedures that are invoked by the CALL + command, taking the context of the CALL invocation into + account. SPI procedures implemented in C can implement the same logic, but + the details of that are beyond the scope of this documentation. + + + + + + SPI_commit + + + SPI_commit + 3 + + + + SPI_commit + commit the current transaction + + + + +void SPI_commit(void) + + + + + Description + + + SPI_commit commits the current transaction. It is + approximately equivalent to running the SQL + command COMMIT. After a transaction is committed, a new + transaction has to be started + using SPI_start_transaction before further database + actions can be executed. + + + + This function can only be executed if the SPI connection has been set as + nonatomic in the call to SPI_connect_ext. + + + + + + + + SPI_rollback + + + SPI_rollback + 3 + + + + SPI_rollback + abort the current transaction + + + + +void SPI_rollback(void) + + + + + Description + + + SPI_rollback rolls back the current transaction. It + is approximately equivalent to running the SQL + command ROLLBACK. After a transaction is rolled back, a + new transaction has to be started + using SPI_start_transaction before further database + actions can be executed. + + + + This function can only be executed if the SPI connection has been set as + nonatomic in the call to SPI_connect_ext. + + + + + + + + SPI_start_transaction + + + SPI_start_transaction + 3 + + + + SPI_start_transaction + start a new transaction + + + + +void SPI_start_transaction(void) + + + + + Description + + + SPI_start_transaction starts a new transaction. It + can only be called after SPI_commit + or SPI_rollback, as there is no transaction active at + that point. Normally, when an SPI procedure is called, there is already a + transaction active, so attempting to start another one before closing out + the current one will result in an error. + + + + This function can only be executed if the SPI connection has been set as + nonatomic in the call to SPI_connect_ext. + + + + + + Visibility of Data Changes diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index ea08c3237c..df87dfeb54 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -65,6 +65,7 @@ #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -2136,9 +2137,11 @@ IsThereFunctionInNamespace(const char *proname, int pronargs, /* * ExecuteDoStmt * Execute inline procedural-language code + * + * See at ExecuteCallStmt() about the atomic argument. */ void -ExecuteDoStmt(DoStmt *stmt) +ExecuteDoStmt(DoStmt *stmt, bool atomic) { InlineCodeBlock *codeblock = makeNode(InlineCodeBlock); ListCell *arg; @@ -2200,6 +2203,7 @@ ExecuteDoStmt(DoStmt *stmt) codeblock->langOid = HeapTupleGetOid(languageTuple); languageStruct = (Form_pg_language) GETSTRUCT(languageTuple); codeblock->langIsTrusted = languageStruct->lanpltrusted; + codeblock->atomic = atomic; if (languageStruct->lanpltrusted) { @@ -2236,9 +2240,28 @@ ExecuteDoStmt(DoStmt *stmt) /* * Execute CALL statement + * + * Inside a top-level CALL statement, transaction-terminating commands such as + * COMMIT or a PL-specific equivalent are allowed. The terminology in the SQL + * standard is that CALL establishes a non-atomic execution context. Most + * other commands establish an atomic execution context, in which transaction + * control actions are not allowed. If there are nested executions of CALL, + * we want to track the execution context recursively, so that the nested + * CALLs can also do transaction control. Note, however, that for example in + * CALL -> SELECT -> CALL, the second call cannot do transaction control, + * because the SELECT in between establishes an atomic execution context. + * + * So when ExecuteCallStmt() is called from the top level, we pass in atomic = + * false (recall that that means transactions = yes). We then create a + * CallContext node with content atomic = false, which is passed in the + * fcinfo->context field to the procedure invocation. The language + * implementation should then take appropriate measures to allow or prevent + * transaction commands based on that information, e.g., call + * SPI_connect_ext(SPI_OPT_NONATOMIC). The language should also pass on the + * atomic flag to any nested invocations to CALL. */ void -ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) +ExecuteCallStmt(ParseState *pstate, CallStmt *stmt, bool atomic) { List *targs; ListCell *lc; @@ -2249,6 +2272,8 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) AclResult aclresult; FmgrInfo flinfo; FunctionCallInfoData fcinfo; + CallContext *callcontext; + HeapTuple tp; targs = NIL; foreach(lc, stmt->funccall->args) @@ -2284,8 +2309,24 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt) FUNC_MAX_ARGS, FUNC_MAX_ARGS))); + callcontext = makeNode(CallContext); + callcontext->atomic = atomic; + + /* + * If proconfig is set we can't allow transaction commands because of the + * way the GUC stacking works: The transaction boundary would have to pop + * the proconfig setting off the stack. That restriction could be lifted + * by redesigning the GUC nesting mechanism a bit. + */ + tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(fexpr->funcid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for function %u", fexpr->funcid); + if (!heap_attisnull(tp, Anum_pg_proc_proconfig)) + callcontext->atomic = true; + ReleaseSysCache(tp); + fmgr_info(fexpr->funcid, &flinfo); - InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, NULL, NULL); + InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, (Node *) callcontext, NULL); i = 0; foreach (lc, fexpr->args) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 995f67d266..9fc4431b80 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -82,6 +82,12 @@ static bool _SPI_checktuples(void); int SPI_connect(void) +{ + return SPI_connect_ext(0); +} + +int +SPI_connect_ext(int options) { int newdepth; @@ -92,7 +98,7 @@ SPI_connect(void) elog(ERROR, "SPI stack corrupted"); newdepth = 16; _SPI_stack = (_SPI_connection *) - MemoryContextAlloc(TopTransactionContext, + MemoryContextAlloc(TopMemoryContext, newdepth * sizeof(_SPI_connection)); _SPI_stack_depth = newdepth; } @@ -124,19 +130,25 @@ SPI_connect(void) _SPI_current->execCxt = NULL; _SPI_current->connectSubid = GetCurrentSubTransactionId(); _SPI_current->queryEnv = NULL; + _SPI_current->atomic = (options & SPI_OPT_NONATOMIC ? false : true); + _SPI_current->internal_xact = false; /* * Create memory contexts for this procedure * - * XXX it would be better to use PortalContext as the parent context, but - * we may not be inside a portal (consider deferred-trigger execution). - * Perhaps CurTransactionContext would do? For now it doesn't matter - * because we clean up explicitly in AtEOSubXact_SPI(). + * In atomic contexts (the normal case), we use TopTransactionContext, + * otherwise PortalContext, so that it lives across transaction + * boundaries. + * + * XXX It could be better to use PortalContext as the parent context in + * all cases, but we may not be inside a portal (consider deferred-trigger + * execution). Perhaps CurTransactionContext could be an option? For now + * it doesn't matter because we clean up explicitly in AtEOSubXact_SPI(). */ - _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext, + _SPI_current->procCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : PortalContext, "SPI Proc", ALLOCSET_DEFAULT_SIZES); - _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext, + _SPI_current->execCxt = AllocSetContextCreate(_SPI_current->atomic ? TopTransactionContext : _SPI_current->procCxt, "SPI Exec", ALLOCSET_DEFAULT_SIZES); /* ... and switch to procedure's context */ @@ -181,12 +193,85 @@ SPI_finish(void) return SPI_OK_FINISH; } +void +SPI_start_transaction(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + StartTransactionCommand(); + MemoryContextSwitchTo(oldcontext); +} + +void +SPI_commit(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + if (_SPI_current->atomic) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("invalid transaction termination"))); + + /* + * This restriction is required by PLs implemented on top of SPI. They + * use subtransactions to establish exception blocks that are supposed to + * be rolled back together if there is an error. Terminating the + * top-level transaction in such a block violates that idea. A future PL + * implementation might have different ideas about this, in which case + * this restriction would have to be refined or the check possibly be + * moved out of SPI into the PLs. + */ + if (IsSubTransaction()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot commit while a subtransaction is active"))); + + _SPI_current->internal_xact = true; + + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + CommitTransactionCommand(); + MemoryContextSwitchTo(oldcontext); + + _SPI_current->internal_xact = false; +} + +void +SPI_rollback(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + if (_SPI_current->atomic) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("invalid transaction termination"))); + + /* see under SPI_commit() */ + if (IsSubTransaction()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot roll back while a subtransaction is active"))); + + _SPI_current->internal_xact = true; + + AbortCurrentTransaction(); + MemoryContextSwitchTo(oldcontext); + + _SPI_current->internal_xact = false; +} + /* * Clean up SPI state at transaction commit or abort. */ void AtEOXact_SPI(bool isCommit) { + /* + * Do nothing if the transaction end was initiated by SPI. + */ + if (_SPI_current && _SPI_current->internal_xact) + return; + /* * Note that memory contexts belonging to SPI stack entries will be freed * automatically, so we can ignore them here. We just need to restore our @@ -224,6 +309,9 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) if (connection->connectSubid != mySubid) break; /* couldn't be any underneath it either */ + if (connection->internal_xact) + break; + found = true; /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 26df660f35..3abe7d6155 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -530,7 +530,8 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; case T_DoStmt: - ExecuteDoStmt((DoStmt *) parsetree); + ExecuteDoStmt((DoStmt *) parsetree, + (context != PROCESS_UTILITY_TOPLEVEL || IsTransactionBlock())); break; case T_CreateTableSpaceStmt: @@ -659,7 +660,8 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; case T_CallStmt: - ExecuteCallStmt(pstate, castNode(CallStmt, parsetree)); + ExecuteCallStmt(pstate, castNode(CallStmt, parsetree), + (context != PROCESS_UTILITY_TOPLEVEL || IsTransactionBlock())); break; case T_ClusterStmt: diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c index 84c68ac189..f3f0add1d6 100644 --- a/src/backend/utils/mmgr/portalmem.c +++ b/src/backend/utils/mmgr/portalmem.c @@ -742,11 +742,8 @@ PreCommit_Portals(bool isPrepare) /* * Abort processing for portals. * - * At this point we reset "active" status and run the cleanup hook if - * present, but we can't release the portal's memory until the cleanup call. - * - * The reason we need to reset active is so that we can replace the unnamed - * portal, else we'll fail to execute ROLLBACK when it arrives. + * At this point we run the cleanup hook if present, but we can't release the + * portal's memory until the cleanup call. */ void AtAbort_Portals(void) @@ -760,17 +757,6 @@ AtAbort_Portals(void) { Portal portal = hentry->portal; - /* - * See similar code in AtSubAbort_Portals(). This would fire if code - * orchestrating multiple top-level transactions within a portal, such - * as VACUUM, caught errors and continued under the same portal with a - * fresh transaction. No part of core PostgreSQL functions that way. - * XXX Such code would wish the portal to remain ACTIVE, as in - * PreCommit_Portals(). - */ - if (portal->status == PORTAL_ACTIVE) - MarkPortalFailed(portal); - /* * Do nothing else to cursors held over from a previous transaction. */ @@ -810,9 +796,10 @@ AtAbort_Portals(void) * Although we can't delete the portal data structure proper, we can * release any memory in subsidiary contexts, such as executor state. * The cleanup hook was the last thing that might have needed data - * there. + * there. But leave active portals alone. */ - MemoryContextDeleteChildren(portal->portalContext); + if (portal->status != PORTAL_ACTIVE) + MemoryContextDeleteChildren(portal->portalContext); } } @@ -832,6 +819,13 @@ AtCleanup_Portals(void) { Portal portal = hentry->portal; + /* + * Do not touch active portals --- this can only happen in the case of + * a multi-transaction command. + */ + if (portal->status == PORTAL_ACTIVE) + continue; + /* Do nothing to cursors held over from a previous transaction */ if (portal->createSubid == InvalidSubTransactionId) { @@ -1161,3 +1155,22 @@ ThereAreNoReadyPortals(void) return true; } + +bool +ThereArePinnedPortals(void) +{ + HASH_SEQ_STATUS status; + PortalHashEnt *hentry; + + hash_seq_init(&status, PortalHashTable); + + while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) + { + Portal portal = hentry->portal; + + if (portal->portalPinned) + return true; + } + + return false; +} diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 41007162aa..7b824c95af 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -59,8 +59,8 @@ extern ObjectAddress CreateTransform(CreateTransformStmt *stmt); extern void DropTransformById(Oid transformOid); extern void IsThereFunctionInNamespace(const char *proname, int pronargs, oidvector *proargtypes, Oid nspOid); -extern void ExecuteDoStmt(DoStmt *stmt); -extern void ExecuteCallStmt(ParseState *pstate, CallStmt *stmt); +extern void ExecuteDoStmt(DoStmt *stmt, bool atomic); +extern void ExecuteCallStmt(ParseState *pstate, CallStmt *stmt, bool atomic); extern Oid get_cast_oid(Oid sourcetypeid, Oid targettypeid, bool missing_ok); extern Oid get_transform_oid(Oid type_id, Oid lang_id, bool missing_ok); extern void interpret_function_parameter_list(ParseState *pstate, diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index 43580c5158..e5bdaecc4e 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -65,6 +65,8 @@ typedef struct _SPI_plan *SPIPlanPtr; #define SPI_OK_REL_UNREGISTER 16 #define SPI_OK_TD_REGISTER 17 +#define SPI_OPT_NONATOMIC (1 << 0) + /* These used to be functions, now just no-ops for backwards compatibility */ #define SPI_push() ((void) 0) #define SPI_pop() ((void) 0) @@ -78,6 +80,7 @@ extern PGDLLIMPORT SPITupleTable *SPI_tuptable; extern PGDLLIMPORT int SPI_result; extern int SPI_connect(void); +extern int SPI_connect_ext(int options); extern int SPI_finish(void); extern int SPI_execute(const char *src, bool read_only, long tcount); extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls, @@ -156,6 +159,10 @@ extern int SPI_register_relation(EphemeralNamedRelation enr); extern int SPI_unregister_relation(const char *name); extern int SPI_register_trigger_data(TriggerData *tdata); +extern void SPI_start_transaction(void); +extern void SPI_commit(void); +extern void SPI_rollback(void); + extern void AtEOXact_SPI(bool isCommit); extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid); diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h index 64f8a450eb..263c8f1453 100644 --- a/src/include/executor/spi_priv.h +++ b/src/include/executor/spi_priv.h @@ -36,6 +36,10 @@ typedef struct MemoryContext savedcxt; /* context of SPI_connect's caller */ SubTransactionId connectSubid; /* ID of connecting subtransaction */ QueryEnvironment *queryEnv; /* query environment setup for SPI level */ + + /* transaction management support */ + bool atomic; /* atomic execution context, does not allow transactions */ + bool internal_xact; /* SPI-managed transaction boundary, skip cleanup */ } _SPI_connection; /* diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 2eb3d6d371..74b094a9c3 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -500,7 +500,8 @@ typedef enum NodeTag T_FdwRoutine, /* in foreign/fdwapi.h */ T_IndexAmRoutine, /* in access/amapi.h */ T_TsmRoutine, /* in access/tsmapi.h */ - T_ForeignKeyCacheInfo /* in utils/rel.h */ + T_ForeignKeyCacheInfo, /* in utils/rel.h */ + T_CallContext /* in nodes/parsenodes.h */ } NodeTag; /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 93122adae8..bbacbe144c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2789,6 +2789,7 @@ typedef struct InlineCodeBlock char *source_text; /* source text of anonymous code block */ Oid langOid; /* OID of selected language */ bool langIsTrusted; /* trusted property of the language */ + bool atomic; /* atomic execution context */ } InlineCodeBlock; /* ---------------------- @@ -2801,6 +2802,12 @@ typedef struct CallStmt FuncCall *funccall; } CallStmt; +typedef struct CallContext +{ + NodeTag type; + bool atomic; +} CallContext; + /* ---------------------- * Alter Object Rename Statement * ---------------------- diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index bc9d52e506..b903cb0fbe 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -231,5 +231,6 @@ extern PlannedStmt *PortalGetPrimaryStmt(Portal portal); extern void PortalCreateHoldStore(Portal portal); extern void PortalHashTableDeleteAll(void); extern bool ThereAreNoReadyPortals(void); +extern bool ThereArePinnedPortals(void); #endif /* PORTAL_H */ diff --git a/src/pl/plperl/GNUmakefile b/src/pl/plperl/GNUmakefile index b829027d05..933abb47c4 100644 --- a/src/pl/plperl/GNUmakefile +++ b/src/pl/plperl/GNUmakefile @@ -55,7 +55,7 @@ endif # win32 SHLIB_LINK = $(perl_embed_ldflags) REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=plperl --load-extension=plperlu -REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array plperl_call +REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array plperl_call plperl_transaction # if Perl can support two interpreters in one backend, # test plperl-and-plperlu cases ifneq ($(PERL),) diff --git a/src/pl/plperl/SPI.xs b/src/pl/plperl/SPI.xs index d9e6f579d4..b98c547e8b 100644 --- a/src/pl/plperl/SPI.xs +++ b/src/pl/plperl/SPI.xs @@ -152,6 +152,15 @@ spi_spi_cursor_close(sv) plperl_spi_cursor_close(cursor); pfree(cursor); +void +spi_spi_commit() + CODE: + plperl_spi_commit(); + +void +spi_spi_rollback() + CODE: + plperl_spi_rollback(); BOOT: items = 0; /* avoid 'unused variable' warning */ diff --git a/src/pl/plperl/expected/plperl_transaction.out b/src/pl/plperl/expected/plperl_transaction.out new file mode 100644 index 0000000000..bd7b7f8660 --- /dev/null +++ b/src/pl/plperl/expected/plperl_transaction.out @@ -0,0 +1,133 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plperl +$$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +return 1; +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination at line 5. +CONTEXT: PL/Perl function "transaction_test2" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query("CALL transaction_test1()"); +return 1; +$$; +SELECT transaction_test3(); +ERROR: invalid transaction termination at line 5. at line 2. +CONTEXT: PL/Perl function "transaction_test3" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query('DO LANGUAGE plperl $x$ spi_commit(); $x$'); +return 1; +$$; +SELECT transaction_test4(); +ERROR: invalid transaction termination at line 1. at line 2. +CONTEXT: PL/Perl function "transaction_test4" +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_commit(); +} +$$; +ERROR: cannot commit transaction while a cursor is open at line 6. +CONTEXT: PL/Perl anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_rollback(); +} +$$; +ERROR: cannot abort transaction while a cursor is open at line 6. +CONTEXT: PL/Perl anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c index 10feef11cf..77c41b2821 100644 --- a/src/pl/plperl/plperl.c +++ b/src/pl/plperl/plperl.c @@ -1929,7 +1929,7 @@ plperl_inline_handler(PG_FUNCTION_ARGS) current_call_data = &this_call_data; - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT) elog(ERROR, "could not connect to SPI manager"); select_perl_context(desc.lanpltrusted); @@ -2396,13 +2396,18 @@ plperl_call_perl_event_trigger_func(plperl_proc_desc *desc, static Datum plperl_func_handler(PG_FUNCTION_ARGS) { + bool nonatomic; plperl_proc_desc *prodesc; SV *perlret; Datum retval = 0; ReturnSetInfo *rsi; ErrorContextCallback pl_error_context; - if (SPI_connect() != SPI_OK_CONNECT) + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "could not connect to SPI manager"); prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false, false); @@ -3953,6 +3958,66 @@ plperl_spi_freeplan(char *query) SPI_freeplan(plan); } +void +plperl_spi_commit(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot commit transaction while a cursor is open"))); + + SPI_commit(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Punt the error to Perl */ + croak_cstr(edata->message); + } + PG_END_TRY(); +} + +void +plperl_spi_rollback(void) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction while a cursor is open"))); + + SPI_rollback(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Punt the error to Perl */ + croak_cstr(edata->message); + } + PG_END_TRY(); +} + /* * Implementation of plperl's elog() function * diff --git a/src/pl/plperl/plperl.h b/src/pl/plperl/plperl.h index 78366aac04..6fe7803088 100644 --- a/src/pl/plperl/plperl.h +++ b/src/pl/plperl/plperl.h @@ -125,6 +125,8 @@ HV *plperl_spi_exec_prepared(char *, HV *, int, SV **); SV *plperl_spi_query_prepared(char *, int, SV **); void plperl_spi_freeplan(char *); void plperl_spi_cursor_close(char *); +void plperl_spi_commit(void); +void plperl_spi_rollback(void); char *plperl_sv_to_literal(SV *, char *); void plperl_util_elog(int level, SV *msg); diff --git a/src/pl/plperl/sql/plperl_transaction.sql b/src/pl/plperl/sql/plperl_transaction.sql new file mode 100644 index 0000000000..5c14d4732e --- /dev/null +++ b/src/pl/plperl/sql/plperl_transaction.sql @@ -0,0 +1,120 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plperl +$$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +$$; + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plperl +AS $$ +foreach my $i (0..9) { + spi_exec_query("INSERT INTO test1 (a) VALUES ($i)"); + if ($i % 2 == 0) { + spi_commit(); + } else { + spi_rollback(); + } +} +return 1; +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query("CALL transaction_test1()"); +return 1; +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plperl +AS $$ +spi_exec_query('DO LANGUAGE plperl $x$ spi_commit(); $x$'); +return 1; +$$; + +SELECT transaction_test4(); + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_commit(); +} +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plperl $$ +my $sth = spi_query("SELECT * FROM test2 ORDER BY x"); +my $row; +while (defined($row = spi_fetchrow($sth))) { + spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")"); + spi_rollback(); +} +$$; + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plpgsql/src/Makefile b/src/pl/plpgsql/src/Makefile index 14a4d83584..91e1ada7ad 100644 --- a/src/pl/plpgsql/src/Makefile +++ b/src/pl/plpgsql/src/Makefile @@ -26,7 +26,7 @@ DATA = plpgsql.control plpgsql--1.0.sql plpgsql--unpackaged--1.0.sql REGRESS_OPTS = --dbname=$(PL_TESTDB) -REGRESS = plpgsql_call plpgsql_control +REGRESS = plpgsql_call plpgsql_control plpgsql_transaction all: all-lib diff --git a/src/pl/plpgsql/src/expected/plpgsql_transaction.out b/src/pl/plpgsql/src/expected/plpgsql_transaction.out new file mode 100644 index 0000000000..8ec22c646c --- /dev/null +++ b/src/pl/plpgsql/src/expected/plpgsql_transaction.out @@ -0,0 +1,241 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpgsql +$$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +-- transaction commands not allowed when called in transaction block +START TRANSACTION; +CALL transaction_test1(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT +COMMIT; +START TRANSACTION; +DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function inline_code_block line 1 at COMMIT +COMMIT; +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + RETURN 1; +END +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test2() line 6 at COMMIT +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + CALL transaction_test1(); + RETURN 1; +END; +$$; +SELECT transaction_test3(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test1() line 6 at COMMIT +SQL statement "CALL transaction_test1()" +PL/pgSQL function transaction_test3() line 3 at SQL statement +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE 'DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$'; + RETURN 1; +END; +$$; +SELECT transaction_test4(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function inline_code_block line 1 at COMMIT +SQL statement "DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$" +PL/pgSQL function transaction_test4() line 3 at EXECUTE +-- proconfig settings currently disallow transaction statements +CREATE PROCEDURE transaction_test5() +LANGUAGE plpgsql +SET work_mem = 555 +AS $$ +BEGIN + COMMIT; +END; +$$; +CALL transaction_test5(); +ERROR: invalid transaction termination +CONTEXT: PL/pgSQL function transaction_test5() line 3 at COMMIT +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + COMMIT; + END LOOP; +END; +$$; +ERROR: committing inside a cursor loop is not supported +CONTEXT: PL/pgSQL function inline_code_block line 7 at COMMIT +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; +ERROR: cannot abort transaction inside a cursor loop +CONTEXT: PL/pgSQL function inline_code_block line 7 at ROLLBACK +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- commit inside block with exception handler +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + COMMIT; + INSERT INTO test1 (a) VALUES (1/0); + COMMIT; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; +ERROR: cannot commit while a subtransaction is active +CONTEXT: PL/pgSQL function inline_code_block line 5 at COMMIT +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside block with exception handler +TRUNCATE test1; +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + ROLLBACK; + INSERT INTO test1 (a) VALUES (1/0); + ROLLBACK; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; +ERROR: cannot roll back while a subtransaction is active +CONTEXT: PL/pgSQL function inline_code_block line 5 at ROLLBACK +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- COMMIT failures +DO LANGUAGE plpgsql $$ +BEGIN + CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + INSERT INTO test3 (y) VALUES (2); + COMMIT; + INSERT INTO test3 (y) VALUES (3); -- won't get here +END; +$$; +ERROR: duplicate key value violates unique constraint "test3_y_key" +DETAIL: Key (y)=(1) already exists. +CONTEXT: PL/pgSQL function inline_code_block line 9 at COMMIT +SELECT * FROM test3; + y +--- + 1 +(1 row) + +DROP TABLE test1; +DROP TABLE test2; +DROP TABLE test3; diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index d096f242cd..4478c5332e 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -290,6 +290,10 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate *estate, PLpgSQL_stmt_dynexecute *stmt); static int exec_stmt_dynfors(PLpgSQL_execstate *estate, PLpgSQL_stmt_dynfors *stmt); +static int exec_stmt_commit(PLpgSQL_execstate *estate, + PLpgSQL_stmt_commit *stmt); +static int exec_stmt_rollback(PLpgSQL_execstate *estate, + PLpgSQL_stmt_rollback *stmt); static void plpgsql_estate_setup(PLpgSQL_execstate *estate, PLpgSQL_function *func, @@ -1731,6 +1735,14 @@ exec_stmt(PLpgSQL_execstate *estate, PLpgSQL_stmt *stmt) rc = exec_stmt_close(estate, (PLpgSQL_stmt_close *) stmt); break; + case PLPGSQL_STMT_COMMIT: + rc = exec_stmt_commit(estate, (PLpgSQL_stmt_commit *) stmt); + break; + + case PLPGSQL_STMT_ROLLBACK: + rc = exec_stmt_rollback(estate, (PLpgSQL_stmt_rollback *) stmt); + break; + default: estate->err_stmt = save_estmt; elog(ERROR, "unrecognized cmdtype: %d", stmt->cmd_type); @@ -4264,6 +4276,57 @@ exec_stmt_close(PLpgSQL_execstate *estate, PLpgSQL_stmt_close *stmt) return PLPGSQL_RC_OK; } +/* + * exec_stmt_commit + * + * Commit the transaction. + */ +static int +exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt) +{ + /* + * XXX This could be implemented by converting the pinned portals to + * holdable ones and organizing the cleanup separately. + */ + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("committing inside a cursor loop is not supported"))); + + SPI_commit(); + SPI_start_transaction(); + + estate->simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + + return PLPGSQL_RC_OK; +} + +/* + * exec_stmt_rollback + * + * Abort the transaction. + */ +static int +exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt) +{ + /* + * Unlike the COMMIT case above, this might not make sense at all, + * especially if the query driving the cursor loop has side effects. + */ + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction inside a cursor loop"))); + + SPI_rollback(); + SPI_start_transaction(); + + estate->simple_eval_estate = NULL; + plpgsql_create_econtext(estate); + + return PLPGSQL_RC_OK; +} /* ---------- * exec_assign_expr Put an expression's result into a variable. @@ -6767,8 +6830,7 @@ plpgsql_xact_cb(XactEvent event, void *arg) */ if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_PREPARE) { - /* Shouldn't be any econtext stack entries left at commit */ - Assert(simple_econtext_stack == NULL); + simple_econtext_stack = NULL; if (shared_simple_eval_estate) FreeExecutorState(shared_simple_eval_estate); diff --git a/src/pl/plpgsql/src/pl_funcs.c b/src/pl/plpgsql/src/pl_funcs.c index 80b8448b7f..f0e85fcfcd 100644 --- a/src/pl/plpgsql/src/pl_funcs.c +++ b/src/pl/plpgsql/src/pl_funcs.c @@ -284,6 +284,10 @@ plpgsql_stmt_typename(PLpgSQL_stmt *stmt) return "CLOSE"; case PLPGSQL_STMT_PERFORM: return "PERFORM"; + case PLPGSQL_STMT_COMMIT: + return "COMMIT"; + case PLPGSQL_STMT_ROLLBACK: + return "ROLLBACK"; } return "unknown"; @@ -363,6 +367,8 @@ static void free_open(PLpgSQL_stmt_open *stmt); static void free_fetch(PLpgSQL_stmt_fetch *stmt); static void free_close(PLpgSQL_stmt_close *stmt); static void free_perform(PLpgSQL_stmt_perform *stmt); +static void free_commit(PLpgSQL_stmt_commit *stmt); +static void free_rollback(PLpgSQL_stmt_rollback *stmt); static void free_expr(PLpgSQL_expr *expr); @@ -443,6 +449,12 @@ free_stmt(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_PERFORM: free_perform((PLpgSQL_stmt_perform *) stmt); break; + case PLPGSQL_STMT_COMMIT: + free_commit((PLpgSQL_stmt_commit *) stmt); + break; + case PLPGSQL_STMT_ROLLBACK: + free_rollback((PLpgSQL_stmt_rollback *) stmt); + break; default: elog(ERROR, "unrecognized cmd_type: %d", stmt->cmd_type); break; @@ -590,6 +602,16 @@ free_perform(PLpgSQL_stmt_perform *stmt) free_expr(stmt->expr); } +static void +free_commit(PLpgSQL_stmt_commit *stmt) +{ +} + +static void +free_rollback(PLpgSQL_stmt_rollback *stmt) +{ +} + static void free_exit(PLpgSQL_stmt_exit *stmt) { @@ -777,6 +799,8 @@ static void dump_fetch(PLpgSQL_stmt_fetch *stmt); static void dump_cursor_direction(PLpgSQL_stmt_fetch *stmt); static void dump_close(PLpgSQL_stmt_close *stmt); static void dump_perform(PLpgSQL_stmt_perform *stmt); +static void dump_commit(PLpgSQL_stmt_commit *stmt); +static void dump_rollback(PLpgSQL_stmt_rollback *stmt); static void dump_expr(PLpgSQL_expr *expr); @@ -867,6 +891,12 @@ dump_stmt(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_PERFORM: dump_perform((PLpgSQL_stmt_perform *) stmt); break; + case PLPGSQL_STMT_COMMIT: + dump_commit((PLpgSQL_stmt_commit *) stmt); + break; + case PLPGSQL_STMT_ROLLBACK: + dump_rollback((PLpgSQL_stmt_rollback *) stmt); + break; default: elog(ERROR, "unrecognized cmd_type: %d", stmt->cmd_type); break; @@ -1239,6 +1269,20 @@ dump_perform(PLpgSQL_stmt_perform *stmt) printf("\n"); } +static void +dump_commit(PLpgSQL_stmt_commit *stmt) +{ + dump_ind(); + printf("COMMIT\n"); +} + +static void +dump_rollback(PLpgSQL_stmt_rollback *stmt) +{ + dump_ind(); + printf("ROLLBACK\n"); +} + static void dump_exit(PLpgSQL_stmt_exit *stmt) { diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y index d9cab1ad7e..42f6a2e161 100644 --- a/src/pl/plpgsql/src/pl_gram.y +++ b/src/pl/plpgsql/src/pl_gram.y @@ -198,6 +198,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %type stmt_return stmt_raise stmt_assert stmt_execsql %type stmt_dynexecute stmt_for stmt_perform stmt_getdiag %type stmt_open stmt_fetch stmt_move stmt_close stmt_null +%type stmt_commit stmt_rollback %type stmt_case stmt_foreach_a %type proc_exceptions @@ -260,6 +261,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %token K_COLLATE %token K_COLUMN %token K_COLUMN_NAME +%token K_COMMIT %token K_CONSTANT %token K_CONSTRAINT %token K_CONSTRAINT_NAME @@ -325,6 +327,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %token K_RETURN %token K_RETURNED_SQLSTATE %token K_REVERSE +%token K_ROLLBACK %token K_ROW_COUNT %token K_ROWTYPE %token K_SCHEMA @@ -897,6 +900,10 @@ proc_stmt : pl_block ';' { $$ = $1; } | stmt_null { $$ = $1; } + | stmt_commit + { $$ = $1; } + | stmt_rollback + { $$ = $1; } ; stmt_perform : K_PERFORM expr_until_semi @@ -2151,6 +2158,31 @@ stmt_null : K_NULL ';' } ; +stmt_commit : K_COMMIT ';' + { + PLpgSQL_stmt_commit *new; + + new = palloc(sizeof(PLpgSQL_stmt_commit)); + new->cmd_type = PLPGSQL_STMT_COMMIT; + new->lineno = plpgsql_location_to_lineno(@1); + + $$ = (PLpgSQL_stmt *)new; + } + ; + +stmt_rollback : K_ROLLBACK ';' + { + PLpgSQL_stmt_rollback *new; + + new = palloc(sizeof(PLpgSQL_stmt_rollback)); + new->cmd_type = PLPGSQL_STMT_ROLLBACK; + new->lineno = plpgsql_location_to_lineno(@1); + + $$ = (PLpgSQL_stmt *)new; + } + ; + + cursor_variable : T_DATUM { /* @@ -2387,6 +2419,7 @@ unreserved_keyword : | K_COLLATE | K_COLUMN | K_COLUMN_NAME + | K_COMMIT | K_CONSTANT | K_CONSTRAINT | K_CONSTRAINT_NAME @@ -2438,6 +2471,7 @@ unreserved_keyword : | K_RETURN | K_RETURNED_SQLSTATE | K_REVERSE + | K_ROLLBACK | K_ROW_COUNT | K_ROWTYPE | K_SCHEMA diff --git a/src/pl/plpgsql/src/pl_handler.c b/src/pl/plpgsql/src/pl_handler.c index 4c2ba2f734..c49428d923 100644 --- a/src/pl/plpgsql/src/pl_handler.c +++ b/src/pl/plpgsql/src/pl_handler.c @@ -219,15 +219,20 @@ PG_FUNCTION_INFO_V1(plpgsql_call_handler); Datum plpgsql_call_handler(PG_FUNCTION_ARGS) { + bool nonatomic; PLpgSQL_function *func; PLpgSQL_execstate *save_cur_estate; Datum retval; int rc; + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* * Connect to SPI manager */ - if ((rc = SPI_connect()) != SPI_OK_CONNECT) + if ((rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); /* Find or compile the function */ @@ -301,7 +306,7 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS) /* * Connect to SPI manager */ - if ((rc = SPI_connect()) != SPI_OK_CONNECT) + if ((rc = SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC)) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); /* Compile the anonymous code block */ diff --git a/src/pl/plpgsql/src/pl_scanner.c b/src/pl/plpgsql/src/pl_scanner.c index ee9aef8bbc..12a3e6b818 100644 --- a/src/pl/plpgsql/src/pl_scanner.c +++ b/src/pl/plpgsql/src/pl_scanner.c @@ -106,6 +106,7 @@ static const ScanKeyword unreserved_keywords[] = { PG_KEYWORD("collate", K_COLLATE, UNRESERVED_KEYWORD) PG_KEYWORD("column", K_COLUMN, UNRESERVED_KEYWORD) PG_KEYWORD("column_name", K_COLUMN_NAME, UNRESERVED_KEYWORD) + PG_KEYWORD("commit", K_COMMIT, UNRESERVED_KEYWORD) PG_KEYWORD("constant", K_CONSTANT, UNRESERVED_KEYWORD) PG_KEYWORD("constraint", K_CONSTRAINT, UNRESERVED_KEYWORD) PG_KEYWORD("constraint_name", K_CONSTRAINT_NAME, UNRESERVED_KEYWORD) @@ -158,6 +159,7 @@ static const ScanKeyword unreserved_keywords[] = { PG_KEYWORD("return", K_RETURN, UNRESERVED_KEYWORD) PG_KEYWORD("returned_sqlstate", K_RETURNED_SQLSTATE, UNRESERVED_KEYWORD) PG_KEYWORD("reverse", K_REVERSE, UNRESERVED_KEYWORD) + PG_KEYWORD("rollback", K_ROLLBACK, UNRESERVED_KEYWORD) PG_KEYWORD("row_count", K_ROW_COUNT, UNRESERVED_KEYWORD) PG_KEYWORD("rowtype", K_ROWTYPE, UNRESERVED_KEYWORD) PG_KEYWORD("schema", K_SCHEMA, UNRESERVED_KEYWORD) diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h index c571afa34b..a9b9d91de7 100644 --- a/src/pl/plpgsql/src/plpgsql.h +++ b/src/pl/plpgsql/src/plpgsql.h @@ -105,7 +105,9 @@ typedef enum PLpgSQL_stmt_type PLPGSQL_STMT_OPEN, PLPGSQL_STMT_FETCH, PLPGSQL_STMT_CLOSE, - PLPGSQL_STMT_PERFORM + PLPGSQL_STMT_PERFORM, + PLPGSQL_STMT_COMMIT, + PLPGSQL_STMT_ROLLBACK } PLpgSQL_stmt_type; /* @@ -433,6 +435,24 @@ typedef struct PLpgSQL_stmt_perform PLpgSQL_expr *expr; } PLpgSQL_stmt_perform; +/* + * COMMIT statement + */ +typedef struct PLpgSQL_stmt_commit +{ + PLpgSQL_stmt_type cmd_type; + int lineno; +} PLpgSQL_stmt_commit; + +/* + * ROLLBACK statement + */ +typedef struct PLpgSQL_stmt_rollback +{ + PLpgSQL_stmt_type cmd_type; + int lineno; +} PLpgSQL_stmt_rollback; + /* * GET DIAGNOSTICS item */ diff --git a/src/pl/plpgsql/src/sql/plpgsql_transaction.sql b/src/pl/plpgsql/src/sql/plpgsql_transaction.sql new file mode 100644 index 0000000000..02ee735079 --- /dev/null +++ b/src/pl/plpgsql/src/sql/plpgsql_transaction.sql @@ -0,0 +1,215 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpgsql +$$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; +END +$$; + +SELECT * FROM test1; + + +-- transaction commands not allowed when called in transaction block +START TRANSACTION; +CALL transaction_test1(); +COMMIT; + +START TRANSACTION; +DO LANGUAGE plpgsql $$ BEGIN COMMIT; END $$; +COMMIT; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + FOR i IN 0..9 LOOP + INSERT INTO test1 (a) VALUES (i); + IF i % 2 = 0 THEN + COMMIT; + ELSE + ROLLBACK; + END IF; + END LOOP; + RETURN 1; +END +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + CALL transaction_test1(); + RETURN 1; +END; +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE 'DO LANGUAGE plpgsql $x$ BEGIN COMMIT; END $x$'; + RETURN 1; +END; +$$; + +SELECT transaction_test4(); + + +-- proconfig settings currently disallow transaction statements +CREATE PROCEDURE transaction_test5() +LANGUAGE plpgsql +SET work_mem = 555 +AS $$ +BEGIN + COMMIT; +END; +$$; + +CALL transaction_test5(); + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + COMMIT; + END LOOP; +END; +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +DECLARE + r RECORD; +BEGIN + FOR r IN SELECT * FROM test2 ORDER BY x LOOP + INSERT INTO test1 (a) VALUES (r.x); + ROLLBACK; + END LOOP; +END; +$$; + +SELECT * FROM test1; + + +-- commit inside block with exception handler +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + COMMIT; + INSERT INTO test1 (a) VALUES (1/0); + COMMIT; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; + +SELECT * FROM test1; + + +-- rollback inside block with exception handler +TRUNCATE test1; + +DO LANGUAGE plpgsql $$ +BEGIN + BEGIN + INSERT INTO test1 (a) VALUES (1); + ROLLBACK; + INSERT INTO test1 (a) VALUES (1/0); + ROLLBACK; + EXCEPTION + WHEN division_by_zero THEN + RAISE NOTICE 'caught division_by_zero'; + END; +END; +$$; + +SELECT * FROM test1; + + +-- COMMIT failures +DO LANGUAGE plpgsql $$ +BEGIN + CREATE TABLE test3 (y int UNIQUE DEFERRABLE INITIALLY DEFERRED); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + COMMIT; + INSERT INTO test3 (y) VALUES (1); + INSERT INTO test3 (y) VALUES (2); + COMMIT; + INSERT INTO test3 (y) VALUES (3); -- won't get here +END; +$$; + +SELECT * FROM test3; + + +DROP TABLE test1; +DROP TABLE test2; +DROP TABLE test3; diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index cc91afebde..d09910835d 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -90,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_transaction \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index 847e4cc412..39b994f446 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -48,6 +48,7 @@ select module_contents(); Error Fatal SPIError + commit cursor debug error @@ -60,10 +61,11 @@ select module_contents(); quote_ident quote_literal quote_nullable + rollback spiexceptions subtransaction warning -(18 rows) +(20 rows) CREATE FUNCTION elog_test_basic() RETURNS void AS $$ diff --git a/src/pl/plpython/expected/plpython_transaction.out b/src/pl/plpython/expected/plpython_transaction.out new file mode 100644 index 0000000000..1fadc69b63 --- /dev/null +++ b/src/pl/plpython/expected/plpython_transaction.out @@ -0,0 +1,135 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +return 1 +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +CONTEXT: PL/Python function "transaction_test2" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("CALL transaction_test1()") +return 1 +$$; +SELECT transaction_test3(); +ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination +CONTEXT: Traceback (most recent call last): + PL/Python function "transaction_test3", line 2, in + plpy.execute("CALL transaction_test1()") +PL/Python function "transaction_test3" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +return 1 +$$; +SELECT transaction_test4(); +ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination +CONTEXT: Traceback (most recent call last): + PL/Python function "transaction_test4", line 2, in + plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +PL/Python function "transaction_test4" +-- commit inside subtransaction (prohibited) +DO LANGUAGE plpythonu $$ +with plpy.subtransaction(): + plpy.commit() +$$; +WARNING: forcibly aborting a subtransaction that has not been exited +ERROR: cannot commit while a subtransaction is active +CONTEXT: PL/Python anonymous code block +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.commit() +$$; +ERROR: cannot commit transaction while a cursor is open +CONTEXT: PL/Python anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.rollback() +$$; +ERROR: cannot abort transaction while a cursor is open +CONTEXT: PL/Python anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c index 695de30583..5a197ce27a 100644 --- a/src/pl/plpython/plpy_main.c +++ b/src/pl/plpython/plpy_main.c @@ -60,7 +60,7 @@ static void plpython_error_callback(void *arg); static void plpython_inline_error_callback(void *arg); static void PLy_init_interp(void); -static PLyExecutionContext *PLy_push_execution_context(void); +static PLyExecutionContext *PLy_push_execution_context(bool atomic_context); static void PLy_pop_execution_context(void); /* static state for Python library conflict detection */ @@ -219,14 +219,19 @@ plpython2_validator(PG_FUNCTION_ARGS) Datum plpython_call_handler(PG_FUNCTION_ARGS) { + bool nonatomic; Datum retval; PLyExecutionContext *exec_ctx; ErrorContextCallback plerrcontext; PLy_initialize(); + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); /* @@ -235,7 +240,7 @@ plpython_call_handler(PG_FUNCTION_ARGS) * here and the PG_TRY. (plpython_error_callback expects the stack entry * to be there, so we have to make the context first.) */ - exec_ctx = PLy_push_execution_context(); + exec_ctx = PLy_push_execution_context(!nonatomic); /* * Setup error traceback support for ereport() @@ -303,7 +308,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS) PLy_initialize(); /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); MemSet(&fake_fcinfo, 0, sizeof(fake_fcinfo)); @@ -332,7 +337,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS) * need the stack entry, but for consistency with plpython_call_handler we * do it in this order.) */ - exec_ctx = PLy_push_execution_context(); + exec_ctx = PLy_push_execution_context(codeblock->atomic); /* * Setup error traceback support for ereport() @@ -430,12 +435,14 @@ PLy_get_scratch_context(PLyExecutionContext *context) } static PLyExecutionContext * -PLy_push_execution_context(void) +PLy_push_execution_context(bool atomic_context) { PLyExecutionContext *context; + /* Pick a memory context similar to what SPI uses. */ context = (PLyExecutionContext *) - MemoryContextAlloc(TopTransactionContext, sizeof(PLyExecutionContext)); + MemoryContextAlloc(atomic_context ? TopTransactionContext : PortalContext, + sizeof(PLyExecutionContext)); context->curr_proc = NULL; context->scratch_ctx = NULL; context->next = PLy_execution_contexts; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index 23f99e20ca..3d7dd13f0c 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -6,8 +6,10 @@ #include "postgres.h" +#include "access/xact.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" +#include "utils/snapmgr.h" #include "plpython.h" @@ -15,6 +17,7 @@ #include "plpy_cursorobject.h" #include "plpy_elog.h" +#include "plpy_main.h" #include "plpy_planobject.h" #include "plpy_resultobject.h" #include "plpy_spi.h" @@ -41,6 +44,8 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, PyObject *kw); static PyObject *PLy_quote_literal(PyObject *self, PyObject *args); static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args); static PyObject *PLy_quote_ident(PyObject *self, PyObject *args); +static PyObject *PLy_commit(PyObject *self, PyObject *args); +static PyObject *PLy_rollback(PyObject *self, PyObject *args); /* A list of all known exceptions, generated from backend/utils/errcodes.txt */ @@ -95,6 +100,12 @@ static PyMethodDef PLy_methods[] = { */ {"cursor", PLy_cursor, METH_VARARGS, NULL}, + /* + * transaction control + */ + {"commit", PLy_commit, METH_NOARGS, NULL}, + {"rollback", PLy_rollback, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} }; @@ -577,3 +588,41 @@ PLy_output(volatile int level, PyObject *self, PyObject *args, PyObject *kw) */ Py_RETURN_NONE; } + +static PyObject * +PLy_commit(PyObject *self, PyObject *args) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot commit transaction while a cursor is open"))); + + SPI_commit(); + SPI_start_transaction(); + + /* was cleared at transaction end, reset pointer */ + exec_ctx->scratch_ctx = NULL; + + Py_RETURN_NONE; +} + +static PyObject * +PLy_rollback(PyObject *self, PyObject *args) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction while a cursor is open"))); + + SPI_rollback(); + SPI_start_transaction(); + + /* was cleared at transaction end, reset pointer */ + exec_ctx->scratch_ctx = NULL; + + Py_RETURN_NONE; +} diff --git a/src/pl/plpython/sql/plpython_transaction.sql b/src/pl/plpython/sql/plpython_transaction.sql new file mode 100644 index 0000000000..36c7b2ef38 --- /dev/null +++ b/src/pl/plpython/sql/plpython_transaction.sql @@ -0,0 +1,115 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +return 1 +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("CALL transaction_test1()") +return 1 +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +return 1 +$$; + +SELECT transaction_test4(); + + +-- commit inside subtransaction (prohibited) +DO LANGUAGE plpythonu $$ +with plpy.subtransaction(): + plpy.commit() +$$; + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.commit() +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.rollback() +$$; + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/tcl/Makefile b/src/pl/tcl/Makefile index 6a92a9b6aa..ef61ee596e 100644 --- a/src/pl/tcl/Makefile +++ b/src/pl/tcl/Makefile @@ -28,7 +28,7 @@ DATA = pltcl.control pltcl--1.0.sql pltcl--unpackaged--1.0.sql \ pltclu.control pltclu--1.0.sql pltclu--unpackaged--1.0.sql REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=pltcl -REGRESS = pltcl_setup pltcl_queries pltcl_call pltcl_start_proc pltcl_subxact pltcl_unicode +REGRESS = pltcl_setup pltcl_queries pltcl_call pltcl_start_proc pltcl_subxact pltcl_unicode pltcl_transaction # Tcl on win32 ships with import libraries only for Microsoft Visual C++, # which are not compatible with mingw gcc. Therefore we need to build a diff --git a/src/pl/tcl/expected/pltcl_transaction.out b/src/pl/tcl/expected/pltcl_transaction.out new file mode 100644 index 0000000000..007204b99a --- /dev/null +++ b/src/pl/tcl/expected/pltcl_transaction.out @@ -0,0 +1,100 @@ +-- suppress CONTEXT so that function OIDs aren't in output +\set VERBOSITY terse +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +return 1 +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE pltcl +AS $$ +spi_exec "CALL transaction_test1()" +return 1 +$$; +SELECT transaction_test3(); +ERROR: invalid transaction termination +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +CREATE PROCEDURE transaction_test4a() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + commit +} +$$; +CALL transaction_test4a(); +ERROR: cannot commit while a subtransaction is active +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +CREATE PROCEDURE transaction_test4b() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + rollback +} +$$; +CALL transaction_test4b(); +ERROR: cannot roll back while a subtransaction is active +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/tcl/pltcl.c b/src/pl/tcl/pltcl.c index 8f5847c4ff..5df4dfdf55 100644 --- a/src/pl/tcl/pltcl.c +++ b/src/pl/tcl/pltcl.c @@ -312,6 +312,10 @@ static int pltcl_SPI_lastoid(ClientData cdata, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]); static int pltcl_subtransaction(ClientData cdata, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]); +static int pltcl_commit(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]); +static int pltcl_rollback(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]); static void pltcl_subtrans_begin(MemoryContext oldcontext, ResourceOwner oldowner); @@ -524,6 +528,10 @@ pltcl_init_interp(pltcl_interp_desc *interp_desc, Oid prolang, bool pltrusted) pltcl_SPI_lastoid, NULL, NULL); Tcl_CreateObjCommand(interp, "subtransaction", pltcl_subtransaction, NULL, NULL); + Tcl_CreateObjCommand(interp, "commit", + pltcl_commit, NULL, NULL); + Tcl_CreateObjCommand(interp, "rollback", + pltcl_rollback, NULL, NULL); /************************************************************ * Call the appropriate start_proc, if there is one. @@ -797,6 +805,7 @@ static Datum pltcl_func_handler(PG_FUNCTION_ARGS, pltcl_call_state *call_state, bool pltrusted) { + bool nonatomic; pltcl_proc_desc *prodesc; Tcl_Interp *volatile interp; Tcl_Obj *tcl_cmd; @@ -804,8 +813,12 @@ pltcl_func_handler(PG_FUNCTION_ARGS, pltcl_call_state *call_state, int tcl_rc; Datum retval; + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* Connect to SPI manager */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "could not connect to SPI manager"); /* Find or compile the function */ @@ -2936,6 +2949,86 @@ pltcl_subtransaction(ClientData cdata, Tcl_Interp *interp, } +/********************************************************************** + * pltcl_commit() + * + * Commit the transaction and start a new one. + **********************************************************************/ +static int +pltcl_commit(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + SPI_commit(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Pass the error data to Tcl */ + pltcl_construct_errorCode(interp, edata); + UTF_BEGIN; + Tcl_SetObjResult(interp, Tcl_NewStringObj(UTF_E2U(edata->message), -1)); + UTF_END; + FreeErrorData(edata); + + return TCL_ERROR; + } + PG_END_TRY(); + + return TCL_OK; +} + + +/********************************************************************** + * pltcl_rollback() + * + * Abort the transaction and start a new one. + **********************************************************************/ +static int +pltcl_rollback(ClientData cdata, Tcl_Interp *interp, + int objc, Tcl_Obj *const objv[]) +{ + MemoryContext oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + SPI_rollback(); + SPI_start_transaction(); + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Pass the error data to Tcl */ + pltcl_construct_errorCode(interp, edata); + UTF_BEGIN; + Tcl_SetObjResult(interp, Tcl_NewStringObj(UTF_E2U(edata->message), -1)); + UTF_END; + FreeErrorData(edata); + + return TCL_ERROR; + } + PG_END_TRY(); + + return TCL_OK; +} + + /********************************************************************** * pltcl_set_tuple_values() - Set variables for all attributes * of a given tuple diff --git a/src/pl/tcl/sql/pltcl_transaction.sql b/src/pl/tcl/sql/pltcl_transaction.sql new file mode 100644 index 0000000000..c752faf665 --- /dev/null +++ b/src/pl/tcl/sql/pltcl_transaction.sql @@ -0,0 +1,98 @@ +-- suppress CONTEXT so that function OIDs aren't in output +\set VERBOSITY terse + +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE pltcl +AS $$ +for {set i 0} {$i < 10} {incr i} { + spi_exec "INSERT INTO test1 (a) VALUES ($i)" + if {$i % 2 == 0} { + commit + } else { + rollback + } +} +return 1 +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE pltcl +AS $$ +spi_exec "CALL transaction_test1()" +return 1 +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +CREATE PROCEDURE transaction_test4a() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + commit +} +$$; + +CALL transaction_test4a(); + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +CREATE PROCEDURE transaction_test4b() +LANGUAGE pltcl +AS $$ +spi_exec -array row "SELECT * FROM test2 ORDER BY x" { + spi_exec "INSERT INTO test1 (a) VALUES ($row(x))" + rollback +} +$$; + +CALL transaction_test4b(); + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2;