From 89e850e6fda9e4e441712012abe971fe938d595a Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 5 Dec 2011 19:52:15 +0200 Subject: [PATCH] plpython: Add SPI cursor support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a function plpy.cursor that is similar to plpy.execute but uses an SPI cursor to avoid fetching the entire result set into memory. Jan UrbaƄski, reviewed by Steve Singer --- doc/src/sgml/plpython.sgml | 81 +++ src/pl/plpython/expected/plpython_spi.out | 151 ++++ .../expected/plpython_subtransaction.out | 66 ++ .../expected/plpython_subtransaction_0.out | 70 ++ .../expected/plpython_subtransaction_5.out | 70 ++ src/pl/plpython/expected/plpython_test.out | 6 +- src/pl/plpython/plpython.c | 642 ++++++++++++++++++ src/pl/plpython/sql/plpython_spi.sql | 116 ++++ .../plpython/sql/plpython_subtransaction.sql | 52 ++ 9 files changed, 1251 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml index eda2bbf34c..618f8d055e 100644 --- a/doc/src/sgml/plpython.sgml +++ b/doc/src/sgml/plpython.sgml @@ -891,6 +891,15 @@ $$ LANGUAGE plpythonu; can be modified. + + Note that calling plpy.execute will cause the entire + result set to be read into memory. Only use that function when you are sure + that the result set will be relatively small. If you don't want to risk + excessive memory usage when fetching large results, + use plpy.cursor rather + than plpy.execute. + + For example: @@ -958,6 +967,78 @@ $$ LANGUAGE plpythonu; + + Accessing Data with Cursors + + + The plpy.cursor function accepts the same arguments + as plpy.execute (except for limit) + and returns a cursor object, which allows you to process large result sets + in smaller chunks. As with plpy.execute, either a query + string or a plan object along with a list of arguments can be used. The + cursor object provides a fetch method that accepts an + integer parameter and returns a result object. Each time you + call fetch, the returned object will contain the next + batch of rows, never larger than the parameter value. Once all rows are + exhausted, fetch starts returning an empty result + object. Cursor objects also provide an + iterator + interface, yielding one row at a time until all rows are exhausted. + Data fetched that way is not returned as result objects, but rather as + dictionaries, each dictionary corresponding to a single result row. + + + + Cursors are automatically disposed of. But if you want to explicitly + release all resources held by a cursor, use the close + method. Once closed, a cursor cannot be fetched from anymore. + + + + + Do not confuse objects created by plpy.cursor with + DB-API cursors as defined by + the Python Database + API specification. They don't have anything in common except for + the name. + + + + + An example of two ways of processing data from a large table is: + +CREATE FUNCTION count_odd_iterator() RETURNS integer AS $$ +odd = 0 +for row in plpy.cursor("select num from largetable"): + if row['num'] % 2: + odd += 1 +return odd +$$ LANGUAGE plpythonu; + +CREATE FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$ +odd = 0 +cursor = plpy.cursor("select num from largetable") +while True: + rows = cursor.fetch(batch_size) + if not rows: + break + for row in rows: + if row['num'] % 2: + odd += 1 +return odd +$$ LANGUAGE plpythonu; + +CREATE FUNCTION count_odd_prepared() RETURNS integer AS $$ +odd = 0 +plan = plpy.prepare("select num from largetable where num % $1 <> 0", ["integer"]) +rows = list(plpy.cursor(plan, [2])) + +return len(rows) +$$ LANGUAGE plpythonu; + + + + Trapping Errors diff --git a/src/pl/plpython/expected/plpython_spi.out b/src/pl/plpython/expected/plpython_spi.out index 7f4ae5ca99..3b4d7a3010 100644 --- a/src/pl/plpython/expected/plpython_spi.out +++ b/src/pl/plpython/expected/plpython_spi.out @@ -133,3 +133,154 @@ CONTEXT: PL/Python function "result_nrows_test" 2 (1 row) +-- cursor objects +CREATE FUNCTION simple_cursor_test() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +does = 0 +for row in res: + if row['lname'] == 'doe': + does += 1 +return does +$$ LANGUAGE plpythonu; +CREATE FUNCTION double_cursor_close() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +res.close() +res.close() +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_fetch() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +assert len(res.fetch(3)) == 3 +assert len(res.fetch(3)) == 1 +assert len(res.fetch(3)) == 0 +assert len(res.fetch(3)) == 0 +try: + # use next() or __next__(), the method name changed in + # http://www.python.org/dev/peps/pep-3114/ + try: + res.next() + except AttributeError: + res.__next__() +except StopIteration: + pass +else: + assert False, "StopIteration not raised" +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users order by fname") +assert len(res.fetch(2)) == 2 + +item = None +try: + item = res.next() +except AttributeError: + item = res.__next__() +assert item['fname'] == 'rick' + +assert len(res.fetch(2)) == 1 +$$ LANGUAGE plpythonu; +CREATE FUNCTION fetch_after_close() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +res.close() +try: + res.fetch(1) +except ValueError: + pass +else: + assert False, "ValueError not raised" +$$ LANGUAGE plpythonu; +CREATE FUNCTION next_after_close() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +res.close() +try: + try: + res.next() + except AttributeError: + res.__next__() +except ValueError: + pass +else: + assert False, "ValueError not raised" +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users where false") +assert len(res.fetch(1)) == 0 +try: + try: + res.next() + except AttributeError: + res.__next__() +except StopIteration: + pass +else: + assert False, "StopIteration not raised" +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$ +plan = plpy.prepare( + "select fname, lname from users where fname like $1 || '%' order by fname", + ["text"]) +for row in plpy.cursor(plan, ["w"]): + yield row['fname'] +for row in plpy.cursor(plan, ["j"]): + yield row['fname'] +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$ +plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'", + ["text"]) +c = plpy.cursor(plan, ["a", "b"]) +$$ LANGUAGE plpythonu; +SELECT simple_cursor_test(); + simple_cursor_test +-------------------- + 3 +(1 row) + +SELECT double_cursor_close(); + double_cursor_close +--------------------- + +(1 row) + +SELECT cursor_fetch(); + cursor_fetch +-------------- + +(1 row) + +SELECT cursor_mix_next_and_fetch(); + cursor_mix_next_and_fetch +--------------------------- + +(1 row) + +SELECT fetch_after_close(); + fetch_after_close +------------------- + +(1 row) + +SELECT next_after_close(); + next_after_close +------------------ + +(1 row) + +SELECT cursor_fetch_next_empty(); + cursor_fetch_next_empty +------------------------- + +(1 row) + +SELECT cursor_plan(); + cursor_plan +------------- + willem + jane + john +(3 rows) + +SELECT cursor_plan_wrong_args(); +ERROR: TypeError: Expected sequence of 1 argument, got 2: ['a', 'b'] +CONTEXT: Traceback (most recent call last): + PL/Python function "cursor_plan_wrong_args", line 4, in + c = plpy.cursor(plan, ["a", "b"]) +PL/Python function "cursor_plan_wrong_args" diff --git a/src/pl/plpython/expected/plpython_subtransaction.out b/src/pl/plpython/expected/plpython_subtransaction.out index 515b0bb734..c2c22f0ae4 100644 --- a/src/pl/plpython/expected/plpython_subtransaction.out +++ b/src/pl/plpython/expected/plpython_subtransaction.out @@ -409,3 +409,69 @@ SELECT * FROM subtransaction_tbl; (1 row) DROP TABLE subtransaction_tbl; +-- cursor/subtransactions interactions +CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$ +with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10) +fetched = cur.fetch(10); +return int(fetched[5]["i"]) +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10); + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(10) + return int(fetched[5]["i"]) +return 0 # not reached +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + plpy.execute('create temporary table tmp(i) ' + 'as select generate_series(1, 10)') + plan = plpy.prepare("select i from tmp") + cur = plpy.cursor(plan) + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(5) + return fetched[2]["i"] +return 0 # not reached +$$ LANGUAGE plpythonu; +CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor('select 1') + plpy.execute("select no_such_function()") +except plpy.SPIError: + cur.close() + return True +return False # not reached +$$ LANGUAGE plpythonu; +SELECT cursor_in_subxact(); + cursor_in_subxact +------------------- + 16 +(1 row) + +SELECT cursor_aborted_subxact(); +ERROR: ValueError: iterating a cursor in an aborted subtransaction +CONTEXT: Traceback (most recent call last): + PL/Python function "cursor_aborted_subxact", line 8, in + fetched = cur.fetch(10) +PL/Python function "cursor_aborted_subxact" +SELECT cursor_plan_aborted_subxact(); +ERROR: ValueError: iterating a cursor in an aborted subtransaction +CONTEXT: Traceback (most recent call last): + PL/Python function "cursor_plan_aborted_subxact", line 10, in + fetched = cur.fetch(5) +PL/Python function "cursor_plan_aborted_subxact" +SELECT cursor_close_aborted_subxact(); +ERROR: ValueError: closing a cursor in an aborted subtransaction +CONTEXT: Traceback (most recent call last): + PL/Python function "cursor_close_aborted_subxact", line 7, in + cur.close() +PL/Python function "cursor_close_aborted_subxact" diff --git a/src/pl/plpython/expected/plpython_subtransaction_0.out b/src/pl/plpython/expected/plpython_subtransaction_0.out index 4017c41edc..ece0134a94 100644 --- a/src/pl/plpython/expected/plpython_subtransaction_0.out +++ b/src/pl/plpython/expected/plpython_subtransaction_0.out @@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl; (0 rows) DROP TABLE subtransaction_tbl; +-- cursor/subtransactions interactions +CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$ +with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10) +fetched = cur.fetch(10); +return int(fetched[5]["i"]) +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_in_subxact" +DETAIL: SyntaxError: invalid syntax (line 3) +CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10); + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(10) + return int(fetched[5]["i"]) +return 0 # not reached +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_aborted_subxact" +DETAIL: SyntaxError: invalid syntax (line 4) +CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + plpy.execute('create temporary table tmp(i) ' + 'as select generate_series(1, 10)') + plan = plpy.prepare("select i from tmp") + cur = plpy.cursor(plan) + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(5) + return fetched[2]["i"] +return 0 # not reached +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_plan_aborted_subxact" +DETAIL: SyntaxError: invalid syntax (line 4) +CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor('select 1') + plpy.execute("select no_such_function()") +except plpy.SPIError: + cur.close() + return True +return False # not reached +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_close_aborted_subxact" +DETAIL: SyntaxError: invalid syntax (line 4) +SELECT cursor_in_subxact(); +ERROR: function cursor_in_subxact() does not exist +LINE 1: SELECT cursor_in_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT cursor_aborted_subxact(); +ERROR: function cursor_aborted_subxact() does not exist +LINE 1: SELECT cursor_aborted_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT cursor_plan_aborted_subxact(); +ERROR: function cursor_plan_aborted_subxact() does not exist +LINE 1: SELECT cursor_plan_aborted_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT cursor_close_aborted_subxact(); +ERROR: function cursor_close_aborted_subxact() does not exist +LINE 1: SELECT cursor_close_aborted_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. diff --git a/src/pl/plpython/expected/plpython_subtransaction_5.out b/src/pl/plpython/expected/plpython_subtransaction_5.out index 9216151b94..66de239499 100644 --- a/src/pl/plpython/expected/plpython_subtransaction_5.out +++ b/src/pl/plpython/expected/plpython_subtransaction_5.out @@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl; (0 rows) DROP TABLE subtransaction_tbl; +-- cursor/subtransactions interactions +CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$ +with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10) +fetched = cur.fetch(10); +return int(fetched[5]["i"]) +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_in_subxact" +DETAIL: SyntaxError: invalid syntax (, line 3) +CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10); + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(10) + return int(fetched[5]["i"]) +return 0 # not reached +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_aborted_subxact" +DETAIL: SyntaxError: invalid syntax (, line 4) +CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + plpy.execute('create temporary table tmp(i) ' + 'as select generate_series(1, 10)') + plan = plpy.prepare("select i from tmp") + cur = plpy.cursor(plan) + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(5) + return fetched[2]["i"] +return 0 # not reached +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_plan_aborted_subxact" +DETAIL: SyntaxError: invalid syntax (, line 4) +CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor('select 1') + plpy.execute("select no_such_function()") +except plpy.SPIError: + cur.close() + return True +return False # not reached +$$ LANGUAGE plpythonu; +ERROR: could not compile PL/Python function "cursor_close_aborted_subxact" +DETAIL: SyntaxError: invalid syntax (, line 4) +SELECT cursor_in_subxact(); +ERROR: function cursor_in_subxact() does not exist +LINE 1: SELECT cursor_in_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT cursor_aborted_subxact(); +ERROR: function cursor_aborted_subxact() does not exist +LINE 1: SELECT cursor_aborted_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT cursor_plan_aborted_subxact(); +ERROR: function cursor_plan_aborted_subxact() does not exist +LINE 1: SELECT cursor_plan_aborted_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT cursor_close_aborted_subxact(); +ERROR: function cursor_close_aborted_subxact() does not exist +LINE 1: SELECT cursor_close_aborted_subxact(); + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index f2dda66532..a884fc0e27 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -43,9 +43,9 @@ contents.sort() return ", ".join(contents) $$ LANGUAGE plpythonu; select module_contents(); - module_contents ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning + module_contents +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning (1 row) CREATE FUNCTION elog_test() RETURNS void diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c index afd5dfce83..29e0ac7c45 100644 --- a/src/pl/plpython/plpython.c +++ b/src/pl/plpython/plpython.c @@ -134,6 +134,11 @@ typedef int Py_ssize_t; PyObject_HEAD_INIT(type) size, #endif +/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */ +#if PY_MAJOR_VERSION >= 3 +#define Py_TPFLAGS_HAVE_ITER 0 +#endif + /* define our text domain for translations */ #undef TEXTDOMAIN #define TEXTDOMAIN PG_TEXTDOMAIN("plpython") @@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject bool exited; } PLySubtransactionObject; +typedef struct PLyCursorObject +{ + PyObject_HEAD + char *portalname; + PLyTypeInfo result; + bool closed; +} PLyCursorObject; + /* A list of all known exceptions, generated from backend/utils/errcodes.txt */ typedef struct ExceptionMap { @@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = { "PostgreSQL subtransaction context manager" }; +static char PLy_cursor_doc[] = { + "Wrapper around a PostgreSQL cursor" +}; + /* * the function definitions @@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *); static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *); static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *); +static PyObject *PLy_cursor(PyObject *self, PyObject *unused); +static PyObject *PLy_cursor_query(const char *query); +static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args); +static void PLy_cursor_dealloc(PyObject *arg); +static PyObject *PLy_cursor_iternext(PyObject *self); +static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args); +static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused); + static PyMethodDef PLy_plan_methods[] = { {"status", PLy_plan_status, METH_VARARGS, NULL}, @@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = { PLy_subtransaction_methods, /* tp_tpmethods */ }; +static PyMethodDef PLy_cursor_methods[] = { + {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL}, + {"close", PLy_cursor_close, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject PLy_CursorType = { + PyVarObject_HEAD_INIT(NULL, 0) + "PLyCursor", /* tp_name */ + sizeof(PLyCursorObject), /* tp_size */ + 0, /* tp_itemsize */ + + /* + * methods + */ + PLy_cursor_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */ + PLy_cursor_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + PyObject_SelfIter, /* tp_iter */ + PLy_cursor_iternext, /* tp_iternext */ + PLy_cursor_methods, /* tp_tpmethods */ +}; + static PyMethodDef PLy_methods[] = { /* * logging methods @@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = { */ {"subtransaction", PLy_subtransaction, METH_NOARGS, NULL}, + /* + * create a cursor + */ + {"cursor", PLy_cursor, METH_VARARGS, NULL}, + {NULL, NULL, 0, NULL} }; @@ -3833,6 +3904,575 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status) return (PyObject *) result; } +/* + * c = plpy.cursor("select * from largetable") + * c = plpy.cursor(plan, []) + */ +static PyObject * +PLy_cursor(PyObject *self, PyObject *args) +{ + char *query; + PyObject *plan; + PyObject *planargs = NULL; + + if (PyArg_ParseTuple(args, "s", &query)) + return PLy_cursor_query(query); + + PyErr_Clear(); + + if (PyArg_ParseTuple(args, "O|O", &plan, &planargs)) + return PLy_cursor_plan(plan, planargs); + + PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan"); + return NULL; +} + + +static PyObject * +PLy_cursor_query(const char *query) +{ + PLyCursorObject *cursor; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + + if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL) + return NULL; + cursor->portalname = NULL; + cursor->closed = false; + PLy_typeinfo_init(&cursor->result); + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + SPIPlanPtr plan; + Portal portal; + + pg_verifymbstr(query, strlen(query), false); + + plan = SPI_prepare(query, 0, NULL); + if (plan == NULL) + elog(ERROR, "SPI_prepare failed: %s", + SPI_result_code_string(SPI_result)); + + portal = SPI_cursor_open(NULL, plan, NULL, NULL, + PLy_curr_procedure->fn_readonly); + SPI_freeplan(plan); + + if (portal == NULL) + elog(ERROR, "SPI_cursor_open() failed:%s", + SPI_result_code_string(SPI_result)); + + cursor->portalname = PLy_strdup(portal->name); + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected. + */ + SPI_restore_connection(); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + Py_DECREF(cursor); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + Assert(cursor->portalname != NULL); + return (PyObject *) cursor; +} + +static PyObject * +PLy_cursor_plan(PyObject *ob, PyObject *args) +{ + PLyCursorObject *cursor; + volatile int nargs; + int i; + PLyPlanObject *plan; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + + if (args) + { + if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args)) + { + PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument"); + return NULL; + } + nargs = PySequence_Length(args); + } + else + nargs = 0; + + plan = (PLyPlanObject *) ob; + + if (nargs != plan->nargs) + { + char *sv; + PyObject *so = PyObject_Str(args); + + if (!so) + PLy_elog(ERROR, "could not execute plan"); + sv = PyString_AsString(so); + PLy_exception_set_plural(PyExc_TypeError, + "Expected sequence of %d argument, got %d: %s", + "Expected sequence of %d arguments, got %d: %s", + plan->nargs, + plan->nargs, nargs, sv); + Py_DECREF(so); + + return NULL; + } + + if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL) + return NULL; + cursor->portalname = NULL; + cursor->closed = false; + PLy_typeinfo_init(&cursor->result); + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + Portal portal; + char *volatile nulls; + volatile int j; + + if (nargs > 0) + nulls = palloc(nargs * sizeof(char)); + else + nulls = NULL; + + for (j = 0; j < nargs; j++) + { + PyObject *elem; + + elem = PySequence_GetItem(args, j); + if (elem != Py_None) + { + PG_TRY(); + { + plan->values[j] = + plan->args[j].out.d.func(&(plan->args[j].out.d), + -1, + elem); + } + PG_CATCH(); + { + Py_DECREF(elem); + PG_RE_THROW(); + } + PG_END_TRY(); + + Py_DECREF(elem); + nulls[j] = ' '; + } + else + { + Py_DECREF(elem); + plan->values[j] = + InputFunctionCall(&(plan->args[j].out.d.typfunc), + NULL, + plan->args[j].out.d.typioparam, + -1); + nulls[j] = 'n'; + } + } + + portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls, + PLy_curr_procedure->fn_readonly); + if (portal == NULL) + elog(ERROR, "SPI_cursor_open() failed:%s", + SPI_result_code_string(SPI_result)); + + cursor->portalname = PLy_strdup(portal->name); + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected. + */ + SPI_restore_connection(); + } + PG_CATCH(); + { + int k; + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* cleanup plan->values array */ + for (k = 0; k < nargs; k++) + { + if (!plan->args[k].out.d.typbyval && + (plan->values[k] != PointerGetDatum(NULL))) + { + pfree(DatumGetPointer(plan->values[k])); + plan->values[k] = PointerGetDatum(NULL); + } + } + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + Py_DECREF(cursor); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + for (i = 0; i < nargs; i++) + { + if (!plan->args[i].out.d.typbyval && + (plan->values[i] != PointerGetDatum(NULL))) + { + pfree(DatumGetPointer(plan->values[i])); + plan->values[i] = PointerGetDatum(NULL); + } + } + + Assert(cursor->portalname != NULL); + return (PyObject *) cursor; +} + +static void +PLy_cursor_dealloc(PyObject *arg) +{ + PLyCursorObject *cursor; + Portal portal; + + cursor = (PLyCursorObject *) arg; + + if (!cursor->closed) + { + portal = GetPortalByName(cursor->portalname); + + if (PortalIsValid(portal)) + SPI_cursor_close(portal); + } + + PLy_free(cursor->portalname); + cursor->portalname = NULL; + + PLy_typeinfo_dealloc(&cursor->result); + arg->ob_type->tp_free(arg); +} + +static PyObject * +PLy_cursor_iternext(PyObject *self) +{ + PLyCursorObject *cursor; + PyObject *ret; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + Portal portal; + + cursor = (PLyCursorObject *) self; + + if (cursor->closed) + { + PLy_exception_set(PyExc_ValueError, "iterating a closed cursor"); + return NULL; + } + + portal = GetPortalByName(cursor->portalname); + if (!PortalIsValid(portal)) + { + PLy_exception_set(PyExc_ValueError, + "iterating a cursor in an aborted subtransaction"); + return NULL; + } + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + SPI_cursor_fetch(portal, true, 1); + if (SPI_processed == 0) + { + PyErr_SetNone(PyExc_StopIteration); + ret = NULL; + } + else + { + if (cursor->result.is_rowtype != 1) + PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc); + + ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0], + SPI_tuptable->tupdesc); + } + + SPI_freetuptable(SPI_tuptable); + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected. + */ + SPI_restore_connection(); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + SPI_freetuptable(SPI_tuptable); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode, + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + return ret; +} + +static PyObject * +PLy_cursor_fetch(PyObject *self, PyObject *args) +{ + PLyCursorObject *cursor; + int count; + PLyResultObject *ret; + volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; + Portal portal; + + if (!PyArg_ParseTuple(args, "i", &count)) + return NULL; + + cursor = (PLyCursorObject *) self; + + if (cursor->closed) + { + PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor"); + return NULL; + } + + portal = GetPortalByName(cursor->portalname); + if (!PortalIsValid(portal)) + { + PLy_exception_set(PyExc_ValueError, + "iterating a cursor in an aborted subtransaction"); + return NULL; + } + + ret = (PLyResultObject *) PLy_result_new(); + if (ret == NULL) + return NULL; + + oldcontext = CurrentMemoryContext; + oldowner = CurrentResourceOwner; + + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); + + PG_TRY(); + { + SPI_cursor_fetch(portal, true, count); + + if (cursor->result.is_rowtype != 1) + PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc); + + Py_DECREF(ret->status); + ret->status = PyInt_FromLong(SPI_OK_FETCH); + + Py_DECREF(ret->nrows); + ret->nrows = PyInt_FromLong(SPI_processed); + + if (SPI_processed != 0) + { + int i; + + Py_DECREF(ret->rows); + ret->rows = PyList_New(SPI_processed); + + for (i = 0; i < SPI_processed; i++) + { + PyObject *row = PLyDict_FromTuple(&cursor->result, + SPI_tuptable->vals[i], + SPI_tuptable->tupdesc); + PyList_SetItem(ret->rows, i, row); + } + } + + SPI_freetuptable(SPI_tuptable); + + /* Commit the inner transaction, return to outer xact context */ + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* + * AtEOSubXact_SPI() should not have popped any SPI context, but just + * in case it did, make sure we remain connected. + */ + SPI_restore_connection(); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* Abort the inner transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + SPI_freetuptable(SPI_tuptable); + + /* + * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will + * have left us in a disconnected state. We need this hack to return + * to connected state. + */ + SPI_restore_connection(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode, + HASH_FIND, NULL); + /* We really should find it, but just in case have a fallback */ + Assert(entry != NULL); + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + return NULL; + } + PG_END_TRY(); + + return (PyObject *) ret; +} + +static PyObject * +PLy_cursor_close(PyObject *self, PyObject *unused) +{ + PLyCursorObject *cursor = (PLyCursorObject *) self; + + if (!cursor->closed) + { + Portal portal = GetPortalByName(cursor->portalname); + + if (!PortalIsValid(portal)) + { + PLy_exception_set(PyExc_ValueError, + "closing a cursor in an aborted subtransaction"); + return NULL; + } + + SPI_cursor_close(portal); + cursor->closed = true; + } + + Py_INCREF(Py_None); + return Py_None; +} + /* s = plpy.subtransaction() */ static PyObject * PLy_subtransaction(PyObject *self, PyObject *unused) @@ -4184,6 +4824,8 @@ PLy_init_plpy(void) elog(ERROR, "could not initialize PLy_ResultType"); if (PyType_Ready(&PLy_SubtransactionType) < 0) elog(ERROR, "could not initialize PLy_SubtransactionType"); + if (PyType_Ready(&PLy_CursorType) < 0) + elog(ERROR, "could not initialize PLy_CursorType"); #if PY_MAJOR_VERSION >= 3 PyModule_Create(&PLy_module); diff --git a/src/pl/plpython/sql/plpython_spi.sql b/src/pl/plpython/sql/plpython_spi.sql index 7f8f6a33d2..874b31e6df 100644 --- a/src/pl/plpython/sql/plpython_spi.sql +++ b/src/pl/plpython/sql/plpython_spi.sql @@ -105,3 +105,119 @@ else: $$ LANGUAGE plpythonu; SELECT result_nrows_test(); + + +-- cursor objects + +CREATE FUNCTION simple_cursor_test() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +does = 0 +for row in res: + if row['lname'] == 'doe': + does += 1 +return does +$$ LANGUAGE plpythonu; + +CREATE FUNCTION double_cursor_close() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +res.close() +res.close() +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_fetch() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +assert len(res.fetch(3)) == 3 +assert len(res.fetch(3)) == 1 +assert len(res.fetch(3)) == 0 +assert len(res.fetch(3)) == 0 +try: + # use next() or __next__(), the method name changed in + # http://www.python.org/dev/peps/pep-3114/ + try: + res.next() + except AttributeError: + res.__next__() +except StopIteration: + pass +else: + assert False, "StopIteration not raised" +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users order by fname") +assert len(res.fetch(2)) == 2 + +item = None +try: + item = res.next() +except AttributeError: + item = res.__next__() +assert item['fname'] == 'rick' + +assert len(res.fetch(2)) == 1 +$$ LANGUAGE plpythonu; + +CREATE FUNCTION fetch_after_close() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +res.close() +try: + res.fetch(1) +except ValueError: + pass +else: + assert False, "ValueError not raised" +$$ LANGUAGE plpythonu; + +CREATE FUNCTION next_after_close() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users") +res.close() +try: + try: + res.next() + except AttributeError: + res.__next__() +except ValueError: + pass +else: + assert False, "ValueError not raised" +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$ +res = plpy.cursor("select fname, lname from users where false") +assert len(res.fetch(1)) == 0 +try: + try: + res.next() + except AttributeError: + res.__next__() +except StopIteration: + pass +else: + assert False, "StopIteration not raised" +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$ +plan = plpy.prepare( + "select fname, lname from users where fname like $1 || '%' order by fname", + ["text"]) +for row in plpy.cursor(plan, ["w"]): + yield row['fname'] +for row in plpy.cursor(plan, ["j"]): + yield row['fname'] +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$ +plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'", + ["text"]) +c = plpy.cursor(plan, ["a", "b"]) +$$ LANGUAGE plpythonu; + +SELECT simple_cursor_test(); +SELECT double_cursor_close(); +SELECT cursor_fetch(); +SELECT cursor_mix_next_and_fetch(); +SELECT fetch_after_close(); +SELECT next_after_close(); +SELECT cursor_fetch_next_empty(); +SELECT cursor_plan(); +SELECT cursor_plan_wrong_args(); diff --git a/src/pl/plpython/sql/plpython_subtransaction.sql b/src/pl/plpython/sql/plpython_subtransaction.sql index a19cad5104..9ad6377c7c 100644 --- a/src/pl/plpython/sql/plpython_subtransaction.sql +++ b/src/pl/plpython/sql/plpython_subtransaction.sql @@ -242,3 +242,55 @@ SELECT pk_violation_inside_subtransaction(); SELECT * FROM subtransaction_tbl; DROP TABLE subtransaction_tbl; + +-- cursor/subtransactions interactions + +CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$ +with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10) +fetched = cur.fetch(10); +return int(fetched[5]["i"]) +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)") + cur.fetch(10); + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(10) + return int(fetched[5]["i"]) +return 0 # not reached +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$ +try: + with plpy.subtransaction(): + plpy.execute('create temporary table tmp(i) ' + 'as select generate_series(1, 10)') + plan = plpy.prepare("select i from tmp") + cur = plpy.cursor(plan) + plpy.execute("select no_such_function()") +except plpy.SPIError: + fetched = cur.fetch(5) + return fetched[2]["i"] +return 0 # not reached +$$ LANGUAGE plpythonu; + +CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$ +try: + with plpy.subtransaction(): + cur = plpy.cursor('select 1') + plpy.execute("select no_such_function()") +except plpy.SPIError: + cur.close() + return True +return False # not reached +$$ LANGUAGE plpythonu; + +SELECT cursor_in_subxact(); +SELECT cursor_aborted_subxact(); +SELECT cursor_plan_aborted_subxact(); +SELECT cursor_close_aborted_subxact();