Add large object functions catering to SQL callers.

With these, one need no longer manipulate large object descriptors and
extract numeric constants from header files in order to read and write
large object contents from SQL.

Pavel Stehule, reviewed by Rushabh Lathia.
This commit is contained in:
Noah Misch 2013-10-27 22:42:46 -04:00
parent 9c339eb4f8
commit c50b7c09d8
9 changed files with 362 additions and 7 deletions

View File

@ -3420,7 +3420,8 @@ SELECT format('Testing %3$s, %2$s, %s', 'one', 'two', 'three');
<para>
See also the aggregate function <function>string_agg</function> in
<xref linkend="functions-aggregate">.
<xref linkend="functions-aggregate"> and the large object functions
in <xref linkend="lo-funcs">.
</para>
</sect1>

View File

@ -526,11 +526,79 @@ int lo_unlink(PGconn *conn, Oid lobjId);
<title>Server-side Functions</title>
<para>
There are server-side functions callable from SQL that correspond to
each of the client-side functions described above; indeed, for the
most part the client-side functions are simply interfaces to the
equivalent server-side functions. The ones that are actually useful
to call via SQL commands are
Server-side functions tailored for manipulating large objects from SQL are
listed in <xref linkend="lo-funcs-table">.
</para>
<table id="lo-funcs-table">
<title>SQL-oriented Large Object Functions</title>
<tgroup cols="5">
<thead>
<row>
<entry>Function</entry>
<entry>Return Type</entry>
<entry>Description</entry>
<entry>Example</entry>
<entry>Result</entry>
</row>
</thead>
<tbody>
<row>
<entry>
<indexterm>
<primary>lo_create</primary>
</indexterm>
<literal><function>lo_create(<parameter>loid</parameter> <type>oid</type>, <parameter>string</parameter> <type>bytea</type>)</function></literal>
</entry>
<entry><type>oid</type></entry>
<entry>
Create a large object and store data there, returning its OID.
Pass <literal>0</> to have the system choose an OID.
</entry>
<entry><literal>lo_create(0, E'\\xffffff00')</literal></entry>
<entry><literal>24528</literal></entry>
</row>
<row>
<entry>
<indexterm>
<primary>lo_put</primary>
</indexterm>
<literal><function>lo_put(<parameter>loid</parameter> <type>oid</type>, <parameter>offset</parameter> <type>bigint</type>, <parameter>str</parameter> <type>bytea</type>)</function></literal>
</entry>
<entry><type>void</type></entry>
<entry>
Write data at the given offset.
</entry>
<entry><literal>lo_put(24528, 1, E'\\xaa')</literal></entry>
<entry></entry>
</row>
<row>
<entry>
<indexterm>
<primary>lo_get</primary>
</indexterm>
<literal><function>lo_get(<parameter>loid</parameter> <type>oid</type> <optional>, <parameter>from</parameter> <type>bigint</type>, <parameter>for</parameter> <type>int</type></optional>)</function></literal>
</entry>
<entry><type>bytea</type></entry>
<entry>
Extract contents or a substring thereof.
</entry>
<entry><literal>lo_get(24528, 0, 3)</literal></entry>
<entry><literal>\xffaaff</literal></entry>
</row>
</tbody>
</tgroup>
</table>
<para>
There are additional server-side functions corresponding to each of the
client-side functions described earlier; indeed, for the most part the
client-side functions are simply interfaces to the equivalent server-side
functions. The ones just as convenient to call via SQL commands are
<function>lo_creat</function><indexterm><primary>lo_creat</></>,
<function>lo_create</function><indexterm><primary>lo_create</></>,
<function>lo_unlink</function><indexterm><primary>lo_unlink</></>,

View File

@ -754,3 +754,154 @@ deleteLOfd(int fd)
{
cookies[fd] = NULL;
}
/*****************************************************************************
* Wrappers oriented toward SQL callers
*****************************************************************************/
/*
* Read [offset, offset+nbytes) within LO; when nbytes is -1, read to end.
*/
static bytea *
lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
{
LargeObjectDesc *loDesc;
int64 loSize;
int64 result_length;
int total_read PG_USED_FOR_ASSERTS_ONLY;
bytea *result = NULL;
/*
* We don't actually need to store into fscxt, but create it anyway to
* ensure that AtEOXact_LargeObject knows there is state to clean up
*/
CreateFSContext();
loDesc = inv_open(loOid, INV_READ, fscxt);
/* Permission check */
if (!lo_compat_privileges &&
pg_largeobject_aclcheck_snapshot(loDesc->id,
GetUserId(),
ACL_SELECT,
loDesc->snapshot) != ACLCHECK_OK)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for large object %u",
loDesc->id)));
/*
* Compute number of bytes we'll actually read, accommodating nbytes == -1
* and reads beyond the end of the LO.
*/
loSize = inv_seek(loDesc, 0, SEEK_END);
if (loSize > offset)
{
if (nbytes >= 0 && nbytes <= loSize - offset)
result_length = nbytes; /* request is wholly inside LO */
else
result_length = loSize - offset; /* adjust to end of LO */
}
else
result_length = 0; /* request is wholly outside LO */
/*
* A result_length calculated from loSize may not fit in a size_t. Check
* that the size will satisfy this and subsequently-enforced size limits.
*/
if (result_length > MaxAllocSize - VARHDRSZ)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("large object read request is too large")));
result = (bytea *) palloc(VARHDRSZ + result_length);
inv_seek(loDesc, offset, SEEK_SET);
total_read = inv_read(loDesc, VARDATA(result), result_length);
Assert(total_read == result_length);
SET_VARSIZE(result, result_length + VARHDRSZ);
inv_close(loDesc);
return result;
}
/*
* Read entire LO
*/
Datum
lo_get(PG_FUNCTION_ARGS)
{
Oid loOid = PG_GETARG_OID(0);
bytea *result;
result = lo_get_fragment_internal(loOid, 0, -1);
PG_RETURN_BYTEA_P(result);
}
/*
* Read range within LO
*/
Datum
lo_get_fragment(PG_FUNCTION_ARGS)
{
Oid loOid = PG_GETARG_OID(0);
int64 offset = PG_GETARG_INT64(1);
int32 nbytes = PG_GETARG_INT32(2);
bytea *result;
if (nbytes < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested length cannot be negative")));
result = lo_get_fragment_internal(loOid, offset, nbytes);
PG_RETURN_BYTEA_P(result);
}
/*
* Create LO with initial contents
*/
Datum
lo_create_bytea(PG_FUNCTION_ARGS)
{
Oid loOid = PG_GETARG_OID(0);
bytea *str = PG_GETARG_BYTEA_PP(1);
LargeObjectDesc *loDesc;
int written PG_USED_FOR_ASSERTS_ONLY;
CreateFSContext();
loOid = inv_create(loOid);
loDesc = inv_open(loOid, INV_WRITE, fscxt);
written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
Assert(written == VARSIZE_ANY_EXHDR(str));
inv_close(loDesc);
PG_RETURN_OID(loOid);
}
/*
* Update range within LO
*/
Datum
lo_put(PG_FUNCTION_ARGS)
{
Oid loOid = PG_GETARG_OID(0);
int64 offset = PG_GETARG_INT64(1);
bytea *str = PG_GETARG_BYTEA_PP(2);
LargeObjectDesc *loDesc;
int written PG_USED_FOR_ASSERTS_ONLY;
CreateFSContext();
loDesc = inv_open(loOid, INV_WRITE, fscxt);
inv_seek(loDesc, offset, SEEK_SET);
written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
Assert(written == VARSIZE_ANY_EXHDR(str));
inv_close(loDesc);
PG_RETURN_VOID();
}

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201310101
#define CATALOG_VERSION_NO 201310271
#endif

View File

@ -1055,6 +1055,15 @@ DESCR("truncate large object");
DATA(insert OID = 3172 ( lo_truncate64 PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 23 "23 20" _null_ _null_ _null_ _null_ lo_truncate64 _null_ _null_ _null_ ));
DESCR("truncate large object (64 bit)");
DATA(insert OID = 3457 ( lo_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 26 "26 17" _null_ _null_ _null_ _null_ lo_create_bytea _null_ _null_ _null_ ));
DESCR("create new large object with content");
DATA(insert OID = 3458 ( lo_get PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 17 "26" _null_ _null_ _null_ _null_ lo_get _null_ _null_ _null_ ));
DESCR("read entire large object");
DATA(insert OID = 3459 ( lo_get PGNSP PGUID 12 1 0 0 0 f f f f t f v 3 0 17 "26 20 23" _null_ _null_ _null_ _null_ lo_get_fragment _null_ _null_ _null_ ));
DESCR("read large object from offset for length");
DATA(insert OID = 3460 ( lo_put PGNSP PGUID 12 1 0 0 0 f f f f t f v 3 0 2278 "26 20 17" _null_ _null_ _null_ _null_ lo_put _null_ _null_ _null_ ));
DESCR("write data at offset");
DATA(insert OID = 959 ( on_pl PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "600 628" _null_ _null_ _null_ _null_ on_pl _null_ _null_ _null_ ));
DATA(insert OID = 960 ( on_sl PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "601 628" _null_ _null_ _null_ _null_ on_sl _null_ _null_ _null_ ));
DATA(insert OID = 961 ( close_pl PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 600 "600 628" _null_ _null_ _null_ _null_ close_pl _null_ _null_ _null_ ));

View File

@ -25,6 +25,7 @@ extern Datum lo_export(PG_FUNCTION_ARGS);
extern Datum lo_creat(PG_FUNCTION_ARGS);
extern Datum lo_create(PG_FUNCTION_ARGS);
extern Datum lo_create_bytea(PG_FUNCTION_ARGS);
extern Datum lo_open(PG_FUNCTION_ARGS);
extern Datum lo_close(PG_FUNCTION_ARGS);
@ -32,6 +33,10 @@ extern Datum lo_close(PG_FUNCTION_ARGS);
extern Datum loread(PG_FUNCTION_ARGS);
extern Datum lowrite(PG_FUNCTION_ARGS);
extern Datum lo_get(PG_FUNCTION_ARGS);
extern Datum lo_get_fragment(PG_FUNCTION_ARGS);
extern Datum lo_put(PG_FUNCTION_ARGS);
extern Datum lo_lseek(PG_FUNCTION_ARGS);
extern Datum lo_tell(PG_FUNCTION_ARGS);
extern Datum lo_lseek64(PG_FUNCTION_ARGS);

View File

@ -203,5 +203,26 @@ SELECT pageno, data FROM pg_largeobject WHERE loid = :newloid;
SELECT lo_unlink(loid) FROM lotest_stash_values;
\lo_unlink :newloid
\lo_import 'results/lotest.txt'
\set newloid_1 :LASTOID
SELECT lo_create(0, lo_get(:newloid_1)) AS newloid_2
\gset
SELECT md5(lo_get(:newloid_1)) = md5(lo_get(:newloid_2));
SELECT lo_get(:newloid_1, 0, 20);
SELECT lo_get(:newloid_1, 10, 20);
SELECT lo_put(:newloid_1, 5, decode('afafafaf', 'hex'));
SELECT lo_get(:newloid_1, 0, 20);
SELECT lo_put(:newloid_1, 4294967310, 'foo');
SELECT lo_get(:newloid_1);
SELECT lo_get(:newloid_1, 4294967294, 100);
\lo_unlink :newloid_1
\lo_unlink :newloid_2
TRUNCATE lotest_stash_values;
DROP ROLE regresslo;

View File

@ -391,5 +391,55 @@ SELECT lo_unlink(loid) FROM lotest_stash_values;
(1 row)
\lo_unlink :newloid
\lo_import 'results/lotest.txt'
\set newloid_1 :LASTOID
SELECT lo_create(0, lo_get(:newloid_1)) AS newloid_2
\gset
SELECT md5(lo_get(:newloid_1)) = md5(lo_get(:newloid_2));
?column?
----------
t
(1 row)
SELECT lo_get(:newloid_1, 0, 20);
lo_get
-------------------------------------------
8800\0110\0110\0110\0110\0110\0110\011800
(1 row)
SELECT lo_get(:newloid_1, 10, 20);
lo_get
-------------------------------------------
\0110\0110\0110\011800\011800\0113800\011
(1 row)
SELECT lo_put(:newloid_1, 5, decode('afafafaf', 'hex'));
lo_put
--------
(1 row)
SELECT lo_get(:newloid_1, 0, 20);
lo_get
-------------------------------------------------
8800\011\257\257\257\2570\0110\0110\0110\011800
(1 row)
SELECT lo_put(:newloid_1, 4294967310, 'foo');
lo_put
--------
(1 row)
SELECT lo_get(:newloid_1);
ERROR: large object read request is too large
SELECT lo_get(:newloid_1, 4294967294, 100);
lo_get
---------------------------------------------------------------------
\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000foo
(1 row)
\lo_unlink :newloid_1
\lo_unlink :newloid_2
TRUNCATE lotest_stash_values;
DROP ROLE regresslo;

View File

@ -391,5 +391,55 @@ SELECT lo_unlink(loid) FROM lotest_stash_values;
(1 row)
\lo_unlink :newloid
\lo_import 'results/lotest.txt'
\set newloid_1 :LASTOID
SELECT lo_create(0, lo_get(:newloid_1)) AS newloid_2
\gset
SELECT md5(lo_get(:newloid_1)) = md5(lo_get(:newloid_2));
?column?
----------
t
(1 row)
SELECT lo_get(:newloid_1, 0, 20);
lo_get
-------------------------------------------
8800\0110\0110\0110\0110\0110\0110\011800
(1 row)
SELECT lo_get(:newloid_1, 10, 20);
lo_get
-------------------------------------------
\0110\0110\0110\011800\011800\0113800\011
(1 row)
SELECT lo_put(:newloid_1, 5, decode('afafafaf', 'hex'));
lo_put
--------
(1 row)
SELECT lo_get(:newloid_1, 0, 20);
lo_get
-------------------------------------------------
8800\011\257\257\257\2570\0110\0110\0110\011800
(1 row)
SELECT lo_put(:newloid_1, 4294967310, 'foo');
lo_put
--------
(1 row)
SELECT lo_get(:newloid_1);
ERROR: large object read request is too large
SELECT lo_get(:newloid_1, 4294967294, 100);
lo_get
---------------------------------------------------------------------
\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000foo
(1 row)
\lo_unlink :newloid_1
\lo_unlink :newloid_2
TRUNCATE lotest_stash_values;
DROP ROLE regresslo;