From 4f44aa04b53f26d3abbf64beb0c1b3d10be324a3 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Tue, 24 Oct 2000 01:38:44 +0000 Subject: [PATCH] Major overhaul of large-object implementation, by Denis Perchine with kibitzing from Tom Lane. Large objects are now all stored in a single system relation "pg_largeobject" --- no more xinv or xinx files, no more relkind 'l'. This should offer substantial performance improvement for large numbers of LOs, since there won't be directory bloat anymore. It'll also fix problems like running out of locktable space when you access thousands of LOs in one transaction. Also clean up cruft in read/write routines. LOs with "holes" in them (never-written byte ranges) now work just like Unix files with holes do: a hole reads as zeroes but doesn't occupy storage space. INITDB forced! --- contrib/pg_dumplo/lo_export.c | 5 +- contrib/vacuumlo/vacuumlo.c | 7 +- doc/src/sgml/ref/psql-ref.sgml | 5 +- src/backend/catalog/Makefile | 7 +- src/backend/catalog/indexing.c | 4 +- src/backend/catalog/pg_largeobject.c | 184 +++ src/backend/libpq/be-fsstubs.c | 46 +- src/backend/storage/large_object/inv_api.c | 1497 +++++--------------- src/bin/pg_dump/pg_dump.c | 14 +- src/bin/pgtclsh/updateStats.tcl | 2 +- src/bin/psql/describe.c | 10 +- src/bin/psql/large_obj.c | 18 +- src/include/catalog/catname.h | 3 +- src/include/catalog/catversion.h | 4 +- src/include/catalog/indexing.h | 6 +- src/include/catalog/pg_class.h | 3 +- src/include/catalog/pg_largeobject.h | 63 + src/include/storage/large_object.h | 52 +- src/interfaces/odbc/info.c | 3 +- src/test/regress/expected/opr_sanity.out | 4 +- src/test/regress/expected/sanity_check.out | 3 +- 21 files changed, 735 insertions(+), 1205 deletions(-) create mode 100644 src/backend/catalog/pg_largeobject.c create mode 100644 src/include/catalog/pg_largeobject.h diff --git a/contrib/pg_dumplo/lo_export.c b/contrib/pg_dumplo/lo_export.c index e18c3ef651..248cf831f5 100644 --- a/contrib/pg_dumplo/lo_export.c +++ b/contrib/pg_dumplo/lo_export.c @@ -94,7 +94,7 @@ pglo_export(LODumpMaster *pgLO) * Query * ---------- */ - sprintf(Qbuff, "SELECT x.%s FROM %s x, pg_class c WHERE x.%s = c.oid and c.relkind = 'l'", + sprintf(Qbuff, "SELECT DISTINCT x.\"%s\" FROM \"%s\" x, pg_largeobject l WHERE x.\"%s\" = l.loid", ll->lo_attr, ll->lo_table, ll->lo_attr); /* puts(Qbuff); */ @@ -104,7 +104,8 @@ pglo_export(LODumpMaster *pgLO) if ((tuples = PQntuples(pgLO->res)) == 0) { if (!pgLO->quiet && pgLO->action == ACTION_EXPORT_ATTR) - printf("%s: no large objets in '%s'\n", progname, ll->lo_table); + printf("%s: no large objects in '%s'\n", + progname, ll->lo_table); continue; } else if (check_res(pgLO)) { diff --git a/contrib/vacuumlo/vacuumlo.c b/contrib/vacuumlo/vacuumlo.c index 3f2c592c09..6e46caf8dd 100644 --- a/contrib/vacuumlo/vacuumlo.c +++ b/contrib/vacuumlo/vacuumlo.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/contrib/vacuumlo/vacuumlo.c,v 1.5 2000/06/19 13:54:50 momjian Exp $ + * $Header: /cvsroot/pgsql/contrib/vacuumlo/vacuumlo.c,v 1.6 2000/10/24 01:38:20 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -59,10 +59,9 @@ vacuumlo(char *database, int verbose) * First we create and populate the lo temp table */ buf[0] = '\0'; - strcat(buf, "SELECT oid AS lo "); + strcat(buf, "SELECT DISTINCT loid AS lo "); strcat(buf, "INTO TEMP TABLE vacuum_l "); - strcat(buf, "FROM pg_class "); - strcat(buf, "WHERE relkind='l'"); + strcat(buf, "FROM pg_largeobject "); if (!(res = PQexec(conn, buf))) { fprintf(stderr, "Failed to create temp table.\n"); diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index c8daa1f7a4..446449d95e 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -1,5 +1,5 @@ @@ -706,7 +706,8 @@ lo_import 152801 Shows a list of all Postgres large - objects currently stored in the database along with their owners. + objects currently stored in the database, along with any + comments provided for them. diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 6a5beee94d..e17a37388c 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -2,7 +2,7 @@ # # Makefile for catalog # -# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.30 2000/10/22 05:27:10 momjian Exp $ +# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.31 2000/10/24 01:38:23 tgl Exp $ # #------------------------------------------------------------------------- @@ -11,7 +11,8 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = catalog.o heap.o index.o indexing.o aclchk.o \ - pg_aggregate.o pg_operator.o pg_proc.o pg_type.o + pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \ + pg_type.o BKIFILES = global.bki template1.bki global.description template1.description @@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_proc.h pg_type.h pg_attribute.h pg_class.h \ pg_inherits.h pg_index.h pg_statistic.h \ pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ - pg_language.h \ + pg_language.h pg_largeobject.h \ pg_aggregate.h pg_ipl.h pg_inheritproc.h \ pg_rewrite.h pg_listener.h pg_description.h indexing.h \ ) diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c index 342896a93b..1a96c3f5ea 100644 --- a/src/backend/catalog/indexing.c +++ b/src/backend/catalog/indexing.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.71 2000/10/22 05:27:10 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.72 2000/10/24 01:38:22 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -51,6 +51,8 @@ char *Name_pg_inherits_indices[Num_pg_inherits_indices] = {InheritsRelidSeqnoIndex}; char *Name_pg_language_indices[Num_pg_language_indices] = {LanguageOidIndex, LanguageNameIndex}; +char *Name_pg_largeobject_indices[Num_pg_largeobject_indices] = +{LargeObjectLOidPNIndex}; char *Name_pg_listener_indices[Num_pg_listener_indices] = {ListenerPidRelnameIndex}; char *Name_pg_opclass_indices[Num_pg_opclass_indices] = diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c new file mode 100644 index 0000000000..c471a9ae13 --- /dev/null +++ b/src/backend/catalog/pg_largeobject.c @@ -0,0 +1,184 @@ +/*------------------------------------------------------------------------- + * + * pg_largeobject.c + * routines to support manipulation of the pg_largeobject relation + * + * Portions Copyright (c) 1996-2000, PostgreSQL, Inc + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.5 2000/10/24 01:38:23 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "catalog/catname.h" +#include "catalog/indexing.h" +#include "catalog/pg_largeobject.h" +#include "miscadmin.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" + + +/* + * Create a large object having the given LO identifier. + * + * We do this by inserting an empty first page, so that the object will + * appear to exist with size 0. Note that the unique index will reject + * an attempt to create a duplicate page. + * + * Return value is OID assigned to the page tuple (any use in it?) + */ +Oid +LargeObjectCreate(Oid loid) +{ + Oid retval; + Relation pg_largeobject; + HeapTuple ntup; + Relation idescs[Num_pg_largeobject_indices]; + Datum values[Natts_pg_largeobject]; + char nulls[Natts_pg_largeobject]; + int i; + + pg_largeobject = heap_openr(LargeObjectRelationName, RowExclusiveLock); + + /* + * Form new tuple + */ + for (i = 0; i < Natts_pg_largeobject; i++) + { + values[i] = (Datum)NULL; + nulls[i] = ' '; + } + + i = 0; + values[i++] = ObjectIdGetDatum(loid); + values[i++] = Int32GetDatum(0); + values[i++] = DirectFunctionCall1(byteain, + CStringGetDatum("")); + + ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls); + + /* + * Insert it + */ + retval = heap_insert(pg_largeobject, ntup); + + /* + * Update indices + */ + if (!IsIgnoringSystemIndexes()) + { + CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs); + CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup); + CatalogCloseIndices(Num_pg_largeobject_indices, idescs); + } + + heap_close(pg_largeobject, RowExclusiveLock); + + heap_freetuple(ntup); + + return retval; +} + +void +LargeObjectDrop(Oid loid) +{ + bool found = false; + Relation pg_largeobject; + Relation pg_lo_idx; + ScanKeyData skey[1]; + IndexScanDesc sd; + RetrieveIndexResult indexRes; + HeapTupleData tuple; + Buffer buffer; + + ScanKeyEntryInitialize(&skey[0], + (bits16) 0x0, + (AttrNumber) 1, + (RegProcedure) F_OIDEQ, + ObjectIdGetDatum(loid)); + + pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock); + pg_lo_idx = index_openr(LargeObjectLOidPNIndex); + + sd = index_beginscan(pg_lo_idx, false, 1, skey); + + tuple.t_datamcxt = CurrentMemoryContext; + tuple.t_data = NULL; + + while ((indexRes = index_getnext(sd, ForwardScanDirection))) + { + tuple.t_self = indexRes->heap_iptr; + heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer); + pfree(indexRes); + if (tuple.t_data != NULL) + { + heap_delete(pg_largeobject, &tuple.t_self, NULL); + ReleaseBuffer(buffer); + found = true; + } + } + + index_endscan(sd); + + index_close(pg_lo_idx); + heap_close(pg_largeobject, RowShareLock); + + if (!found) + elog(ERROR, "LargeObjectDrop: large object %u not found", loid); +} + +bool +LargeObjectExists(Oid loid) +{ + bool retval = false; + Relation pg_largeobject; + Relation pg_lo_idx; + ScanKeyData skey[1]; + IndexScanDesc sd; + RetrieveIndexResult indexRes; + HeapTupleData tuple; + Buffer buffer; + + /* + * See if we can find any tuples belonging to the specified LO + */ + ScanKeyEntryInitialize(&skey[0], + (bits16) 0x0, + (AttrNumber) 1, + (RegProcedure) F_OIDEQ, + ObjectIdGetDatum(loid)); + + pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock); + pg_lo_idx = index_openr(LargeObjectLOidPNIndex); + + sd = index_beginscan(pg_lo_idx, false, 1, skey); + + tuple.t_datamcxt = CurrentMemoryContext; + tuple.t_data = NULL; + + while ((indexRes = index_getnext(sd, ForwardScanDirection))) + { + tuple.t_self = indexRes->heap_iptr; + heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer); + pfree(indexRes); + if (tuple.t_data != NULL) + { + retval = true; + ReleaseBuffer(buffer); + break; + } + } + + index_endscan(sd); + + index_close(pg_lo_idx); + heap_close(pg_largeobject, RowShareLock); + + return retval; +} diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c index bb5c7f6e55..7eff84e5d3 100644 --- a/src/backend/libpq/be-fsstubs.c +++ b/src/backend/libpq/be-fsstubs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.54 2000/10/22 05:27:12 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.55 2000/10/24 01:38:26 tgl Exp $ * * NOTES * This should be moved to a more appropriate place. It is here @@ -32,13 +32,13 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" + #include #include #include #include -#include "postgres.h" - #include "catalog/pg_shadow.h" #include "libpq/be-fsstubs.h" #include "libpq/libpq-fs.h" @@ -50,8 +50,7 @@ /*#define FSDB 1*/ #define MAX_LOBJ_FDS 256 -#define BUFSIZE 1024 -#define FNAME_BUFSIZE 8192 +#define BUFSIZE 8192 /* * LO "FD"s are indexes into this array. @@ -141,10 +140,10 @@ lo_close(PG_FUNCTION_ARGS) inv_close(cookies[fd]); - MemoryContextSwitchTo(currentContext); - deleteLOfd(fd); + MemoryContextSwitchTo(currentContext); + PG_RETURN_INT32(0); } @@ -267,7 +266,7 @@ lo_creat(PG_FUNCTION_ARGS) PG_RETURN_OID(InvalidOid); } - lobjId = RelationGetRelid(lobjDesc->heap_r); + lobjId = lobjDesc->id; inv_close(lobjDesc); @@ -310,8 +309,8 @@ lo_unlink(PG_FUNCTION_ARGS) * any LO-specific data structures at all. (Again, that's probably * more than this module ought to be assuming.) * - * XXX there ought to be some code to clean up any open LOs that - * reference the specified relation... as is, they remain "open". + * XXX there ought to be some code to clean up any open LO FDs that + * reference the specified LO... as is, they remain "open". */ PG_RETURN_INT32(inv_drop(lobjId)); } @@ -367,7 +366,7 @@ lo_import(PG_FUNCTION_ARGS) int nbytes, tmp; char buf[BUFSIZE]; - char fnamebuf[FNAME_BUFSIZE]; + char fnamebuf[MAXPGPATH]; LargeObjectDesc *lobj; Oid lobjOid; @@ -382,8 +381,8 @@ lo_import(PG_FUNCTION_ARGS) * open the file to be read in */ nbytes = VARSIZE(filename) - VARHDRSZ; - if (nbytes >= FNAME_BUFSIZE) - nbytes = FNAME_BUFSIZE-1; + if (nbytes >= MAXPGPATH) + nbytes = MAXPGPATH-1; memcpy(fnamebuf, VARDATA(filename), nbytes); fnamebuf[nbytes] = '\0'; fd = PathNameOpenFile(fnamebuf, O_RDONLY | PG_BINARY, 0666); @@ -398,12 +397,7 @@ lo_import(PG_FUNCTION_ARGS) if (lobj == NULL) elog(ERROR, "lo_import: can't create inv object for \"%s\"", fnamebuf); - - /* - * the oid for the large object is just the oid of the relation - * XInv??? which contains the data. - */ - lobjOid = RelationGetRelid(lobj->heap_r); + lobjOid = lobj->id; /* * read in from the Unix file and write to the inversion file @@ -411,7 +405,7 @@ lo_import(PG_FUNCTION_ARGS) while ((nbytes = FileRead(fd, buf, BUFSIZE)) > 0) { tmp = inv_write(lobj, buf, nbytes); - if (tmp < nbytes) + if (tmp != nbytes) elog(ERROR, "lo_import: error while reading \"%s\"", fnamebuf); } @@ -435,7 +429,7 @@ lo_export(PG_FUNCTION_ARGS) int nbytes, tmp; char buf[BUFSIZE]; - char fnamebuf[FNAME_BUFSIZE]; + char fnamebuf[MAXPGPATH]; LargeObjectDesc *lobj; mode_t oumask; @@ -461,8 +455,8 @@ lo_export(PG_FUNCTION_ARGS) * world-writable export files doesn't seem wise. */ nbytes = VARSIZE(filename) - VARHDRSZ; - if (nbytes >= FNAME_BUFSIZE) - nbytes = FNAME_BUFSIZE-1; + if (nbytes >= MAXPGPATH) + nbytes = MAXPGPATH-1; memcpy(fnamebuf, VARDATA(filename), nbytes); fnamebuf[nbytes] = '\0'; oumask = umask((mode_t) 0022); @@ -473,12 +467,12 @@ lo_export(PG_FUNCTION_ARGS) fnamebuf); /* - * read in from the Unix file and write to the inversion file + * read in from the inversion file and write to the Unix file */ while ((nbytes = inv_read(lobj, buf, BUFSIZE)) > 0) { tmp = FileWrite(fd, buf, nbytes); - if (tmp < nbytes) + if (tmp != nbytes) elog(ERROR, "lo_export: error while writing \"%s\"", fnamebuf); } @@ -513,7 +507,7 @@ lo_commit(bool isCommit) if (cookies[i] != NULL) { if (isCommit) - inv_cleanindex(cookies[i]); + inv_close(cookies[i]); cookies[i] = NULL; } } diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c index 5b7df0562a..607c4861dc 100644 --- a/src/backend/storage/large_object/inv_api.c +++ b/src/backend/storage/large_object/inv_api.c @@ -9,77 +9,51 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.78 2000/10/22 05:27:15 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.79 2000/10/24 01:38:29 tgl Exp $ * *------------------------------------------------------------------------- */ +#include "postgres.h" + +#include #include #include #include -#include "postgres.h" - #include "access/genam.h" #include "access/heapam.h" #include "access/nbtree.h" +#include "access/htup.h" #include "catalog/catalog.h" +#include "catalog/catname.h" #include "catalog/heap.h" #include "catalog/index.h" +#include "catalog/indexing.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_largeobject.h" #include "catalog/pg_type.h" #include "libpq/libpq-fs.h" #include "miscadmin.h" #include "storage/large_object.h" #include "storage/smgr.h" #include "utils/fmgroids.h" -#include "utils/relcache.h" +#include "utils/builtins.h" -/* - * Warning, Will Robinson... In order to pack data into an inversion - * file as densely as possible, we violate the class abstraction here. - * When we're appending a new tuple to the end of the table, we check - * the last page to see how much data we can put on it. If it's more - * than IMINBLK, we write enough to fill the page. This limits external - * fragmentation. In no case can we write more than IMAXBLK, since - * the 8K postgres page size less overhead leaves only this much space - * for data. - */ -/* - * In order to prevent buffer leak on transaction commit, large object - * scan index handling has been modified. Indexes are persistant inside - * a transaction but may be closed between two calls to this API (when - * transaction is committed while object is opened, or when no - * transaction is active). Scan indexes are thus now reinitialized using - * the object current offset. [PA] - * - * Some cleanup has been also done for non freed memory. - * - * For subsequent notes, [PA] is Pascal André - */ - -#define IFREESPC(p) (PageGetFreeSpace(p) - \ - MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \ - MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \ - sizeof(double)) -#define IMAXBLK 8092 -#define IMINBLK 512 - -/* non-export function prototypes */ -static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer, - Page page, char *dbuf, int nwrite); -static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer); -static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes); -static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes, - HeapTuple tuple, Buffer buffer); -static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple); -static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln); +static int32 +getbytealen(bytea *data) +{ + Assert(! VARATT_IS_EXTENDED(data)); + if (VARSIZE(data) < VARHDRSZ) + elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error."); + return (VARSIZE(data) - VARHDRSZ); +} /* * inv_create -- create a new large object. * * Arguments: - * flags -- was archive, smgr + * flags * * Returns: * large object descriptor, appropriately filled in. @@ -87,168 +61,80 @@ static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln); LargeObjectDesc * inv_create(int flags) { - LargeObjectDesc *retval; Oid file_oid; - Relation r; - Relation indr; - TupleDesc tupdesc; - IndexInfo *indexInfo; - Oid classObjectId[1]; - char objname[NAMEDATALEN]; - char indname[NAMEDATALEN]; + LargeObjectDesc *retval; /* - * add one here since the pg_class tuple created will have the next - * oid and we want to have the relation name to correspond to the - * tuple OID + * Allocate an OID to be the LO's identifier. */ - file_oid = newoid() + 1; + file_oid = newoid(); - /* come up with some table names */ - sprintf(objname, "xinv%u", file_oid); - sprintf(indname, "xinx%u", file_oid); - - if (RelnameFindRelid(objname) != InvalidOid) - elog(ERROR, - "internal error: %s already exists -- cannot create large obj", - objname); - if (RelnameFindRelid(indname) != InvalidOid) - elog(ERROR, - "internal error: %s already exists -- cannot create large obj", - indname); - - /* this is pretty painful... want a tuple descriptor */ - tupdesc = CreateTemplateTupleDesc(2); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, - "olastbye", - INT4OID, - -1, 0, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, - "odata", - BYTEAOID, - -1, 0, false); + /* Check for duplicate (shouldn't happen) */ + if (LargeObjectExists(file_oid)) + elog(ERROR, "inv_create: large object %u already exists. This is internal error.", file_oid); /* - * First create the table to hold the inversion large object. It will - * be located on whatever storage manager the user requested. + * Create the LO by writing an empty first page for it in pg_largeobject */ + (void) LargeObjectCreate(file_oid); - heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT, - false, false); - - /* make the relation visible in this transaction */ + /* + * Advance command counter so that new tuple will be seen by later + * large-object operations in this transaction. + */ CommandCounterIncrement(); - /*-------------------- - * We hold AccessShareLock on any large object we have open - * by inv_create or inv_open; it is released by inv_close. - * Note this will not conflict with ExclusiveLock or ShareLock - * that we acquire when actually reading/writing; it just prevents - * deletion of the large object while we have it open. - *-------------------- - */ - r = heap_openr(objname, AccessShareLock); - /* - * Now create a btree index on the relation's olastbyte attribute to - * make seeks go faster. + * Prepare LargeObjectDesc data structure for accessing LO */ - indexInfo = makeNode(IndexInfo); - indexInfo->ii_NumIndexAttrs = 1; - indexInfo->ii_NumKeyAttrs = 1; - indexInfo->ii_KeyAttrNumbers[0] = 1; - indexInfo->ii_Predicate = NULL; - indexInfo->ii_FuncOid = InvalidOid; - indexInfo->ii_Unique = false; - - classObjectId[0] = INT4_OPS_OID; - - index_create(objname, indname, indexInfo, - BTREE_AM_OID, classObjectId, - false, false, false); - - /* make the index visible in this transaction */ - CommandCounterIncrement(); - - indr = index_openr(indname); - - if (!RelationIsValid(indr)) - { - elog(ERROR, "cannot create index for large obj on %s under inversion", - DatumGetCString(DirectFunctionCall1(smgrout, - Int16GetDatum(DEFAULT_SMGR)))); - } - retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc)); - retval->heap_r = r; - retval->index_r = indr; - retval->iscan = (IndexScanDesc) NULL; - retval->hdesc = RelationGetDescr(r); - retval->idesc = RelationGetDescr(indr); - retval->offset = retval->lowbyte = retval->highbyte = 0; - ItemPointerSetInvalid(&(retval->htid)); - retval->flags = 0; + retval->id = file_oid; + retval->offset = 0; - if (flags & INV_WRITE) - { - LockRelation(r, ExclusiveLock); + if (flags & INV_WRITE) { retval->flags = IFS_WRLOCK | IFS_RDLOCK; - } - else if (flags & INV_READ) - { - LockRelation(r, ShareLock); + retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock); + } else if (flags & INV_READ) { retval->flags = IFS_RDLOCK; - } - retval->flags |= IFS_ATEOF; /* since we know the object is empty */ + retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock); + } else + elog(ERROR, "inv_create: invalid flags: %d", flags); + + retval->index_r = index_openr(LargeObjectLOidPNIndex); return retval; } +/* + * inv_open -- access an existing large object. + * + * Returns: + * large object descriptor, appropriately filled in. + */ LargeObjectDesc * inv_open(Oid lobjId, int flags) { LargeObjectDesc *retval; - Relation r; - char *indname; - Relation indrel; - - r = heap_open(lobjId, AccessShareLock); - - indname = pstrdup(RelationGetRelationName(r)); - - /* - * hack hack hack... we know that the fourth character of the - * relation name is a 'v', and that the fourth character of the index - * name is an 'x', and that they're otherwise identical. - */ - indname[3] = 'x'; - indrel = index_openr(indname); - - if (!RelationIsValid(indrel)) - return (LargeObjectDesc *) NULL; + if (! LargeObjectExists(lobjId)) + elog(ERROR, "inv_open: large object %u not found", lobjId); + retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc)); - retval->heap_r = r; - retval->index_r = indrel; - retval->iscan = (IndexScanDesc) NULL; - retval->hdesc = RelationGetDescr(r); - retval->idesc = RelationGetDescr(indrel); - retval->offset = retval->lowbyte = retval->highbyte = 0; - ItemPointerSetInvalid(&(retval->htid)); - retval->flags = 0; + retval->id = lobjId; + retval->offset = 0; - if (flags & INV_WRITE) - { - LockRelation(r, ExclusiveLock); + if (flags & INV_WRITE) { retval->flags = IFS_WRLOCK | IFS_RDLOCK; - } - else if (flags & INV_READ) - { - LockRelation(r, ShareLock); + retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock); + } else if (flags & INV_READ) { retval->flags = IFS_RDLOCK; - } + retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock); + } else + elog(ERROR, "inv_open: invalid flags: %d", flags); + + retval->index_r = index_openr(LargeObjectLOidPNIndex); return retval; } @@ -261,174 +147,129 @@ inv_close(LargeObjectDesc *obj_desc) { Assert(PointerIsValid(obj_desc)); - if (obj_desc->iscan != (IndexScanDesc) NULL) - { - index_endscan(obj_desc->iscan); - obj_desc->iscan = NULL; - } - + if (obj_desc->flags & IFS_WRLOCK) + heap_close(obj_desc->heap_r, RowExclusiveLock); + else if (obj_desc->flags & IFS_RDLOCK) + heap_close(obj_desc->heap_r, AccessShareLock); index_close(obj_desc->index_r); - heap_close(obj_desc->heap_r, AccessShareLock); pfree(obj_desc); } /* - * Destroys an existing large object, and frees its associated pointers. + * Destroys an existing large object (not to be confused with a descriptor!) * * returns -1 if failed */ int inv_drop(Oid lobjId) { - Relation r; - - r = RelationIdGetRelation(lobjId); - if (!RelationIsValid(r)) - return -1; - - if (r->rd_rel->relkind != RELKIND_LOBJECT) - { - /* drop relcache refcount from RelationIdGetRelation */ - RelationDecrementReferenceCount(r); - return -1; - } + LargeObjectDrop(lobjId); /* - * Since heap_drop_with_catalog will destroy the relcache entry, - * there's no need to drop the refcount in this path. + * Advance command counter so that tuple removal will be seen by later + * large-object operations in this transaction. */ - heap_drop_with_catalog(RelationGetRelationName(r), false); + CommandCounterIncrement(); + return 1; } /* - * inv_stat() -- do a stat on an inversion file. + * Determine size of a large object * - * For the time being, this is an insanely expensive operation. In - * order to find the size of the file, we seek to the last block in - * it and compute the size from that. We scan pg_class to determine - * the file's owner and create time. We don't maintain mod time or - * access time, yet. - * - * These fields aren't stored in a table anywhere because they're - * updated so frequently, and postgres only appends tuples at the - * end of relations. Once clustering works, we should fix this. + * NOTE: LOs can contain gaps, just like Unix files. We actually return + * the offset of the last byte + 1. */ -#ifdef NOT_USED - -struct pgstat -{ /* just the fields we need from stat - * structure */ - int st_ino; - int st_mode; - unsigned int st_size; - unsigned int st_sizehigh; /* high order bits */ -/* 2^64 == 1.8 x 10^20 bytes */ - int st_uid; - int st_atime_s; /* just the seconds */ - int st_mtime_s; /* since SysV and the new BSD both have */ - int st_ctime_s; /* usec fields.. */ -}; - -int -inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf) +static uint32 +inv_getsize(LargeObjectDesc *obj_desc) { + bool found = false; + uint32 lastbyte = 0; + uint32 thislastbyte; + ScanKeyData skey[1]; + IndexScanDesc sd; + RetrieveIndexResult indexRes; + HeapTupleData tuple; + Buffer buffer; + Form_pg_largeobject data; + bytea *datafield; + bool pfreeit; + Assert(PointerIsValid(obj_desc)); - Assert(stbuf != NULL); - /* need read lock for stat */ - if (!(obj_desc->flags & IFS_RDLOCK)) + ScanKeyEntryInitialize(&skey[0], + (bits16) 0x0, + (AttrNumber) 1, + (RegProcedure) F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + sd = index_beginscan(obj_desc->index_r, true, 1, skey); + + tuple.t_datamcxt = CurrentMemoryContext; + tuple.t_data = NULL; + + while ((indexRes = index_getnext(sd, ForwardScanDirection))) { - LockRelation(obj_desc->heap_r, ShareLock); - obj_desc->flags |= IFS_RDLOCK; + tuple.t_self = indexRes->heap_iptr; + heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer); + pfree(indexRes); + if (tuple.t_data == NULL) + continue; + found = true; + data = (Form_pg_largeobject) GETSTRUCT(&tuple); + datafield = &(data->data); + pfreeit = false; + if (VARATT_IS_EXTENDED(datafield)) + { + datafield = (bytea *) + heap_tuple_untoast_attr((varattrib *) datafield); + pfreeit = true; + } + thislastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield); + if (thislastbyte > lastbyte) + lastbyte = thislastbyte; + if (pfreeit) + pfree(datafield); + ReleaseBuffer(buffer); } + + index_endscan(sd); - stbuf->st_ino = RelationGetRelid(obj_desc->heap_r); -#if 1 - stbuf->st_mode = (S_IFREG | 0666); /* IFREG|rw-rw-rw- */ -#else - stbuf->st_mode = 100666; /* IFREG|rw-rw-rw- */ -#endif - stbuf->st_size = _inv_getsize(obj_desc->heap_r, - obj_desc->hdesc, - obj_desc->index_r); - - stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner; - - /* we have no good way of computing access times right now */ - stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0; - - return 0; + if (!found) + elog(ERROR, "inv_getsize: large object %u not found", obj_desc->id); + return lastbyte; } -#endif - int inv_seek(LargeObjectDesc *obj_desc, int offset, int whence) { - int oldOffset; - Datum d; - ScanKeyData skey; - Assert(PointerIsValid(obj_desc)); - if (whence == SEEK_CUR) + switch (whence) { - offset += obj_desc->offset; /* calculate absolute position */ + case SEEK_SET: + if (offset < 0) + elog(ERROR, "inv_seek: invalid offset: %d", offset); + obj_desc->offset = offset; + break; + case SEEK_CUR: + if ((obj_desc->offset + offset) < 0) + elog(ERROR, "inv_seek: invalid offset: %d", offset); + obj_desc->offset += offset; + break; + case SEEK_END: + { + uint32 size = inv_getsize(obj_desc); + if (offset < 0 || ((uint32) offset) > size) + elog(ERROR, "inv_seek: invalid offset"); + obj_desc->offset = size - offset; + } + break; + default: + elog(ERROR, "inv_seek: invalid whence: %d", whence); } - else if (whence == SEEK_END) - { - /* need read lock for getsize */ - if (!(obj_desc->flags & IFS_RDLOCK)) - { - LockRelation(obj_desc->heap_r, ShareLock); - obj_desc->flags |= IFS_RDLOCK; - } - offset += _inv_getsize(obj_desc->heap_r, - obj_desc->hdesc, - obj_desc->index_r); - } - /* now we can assume that the operation is SEEK_SET */ - - /* - * Whenever we do a seek, we turn off the EOF flag bit to force - * ourselves to check for real on the next read. - */ - - obj_desc->flags &= ~IFS_ATEOF; - oldOffset = obj_desc->offset; - obj_desc->offset = offset; - - /* try to avoid doing any work, if we can manage it */ - if (offset >= obj_desc->lowbyte - && offset <= obj_desc->highbyte - && oldOffset <= obj_desc->highbyte - && obj_desc->iscan != (IndexScanDesc) NULL) - return offset; - - /* - * To do a seek on an inversion file, we start an index scan that will - * bring us to the right place. Each tuple in an inversion file - * stores the offset of the last byte that appears on it, and we have - * an index on this. - */ - if (obj_desc->iscan != (IndexScanDesc) NULL) - { - d = Int32GetDatum(offset); - btmovescan(obj_desc->iscan, d); - } - else - { - ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE, - Int32GetDatum(offset)); - - obj_desc->iscan = index_beginscan(obj_desc->index_r, - (bool) 0, (uint16) 1, - &skey); - } - - return offset; + return obj_desc->offset; } int @@ -442,862 +283,306 @@ inv_tell(LargeObjectDesc *obj_desc) int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes) { - HeapTupleData tuple; - int nread; - int off; - int ncopy; - Datum d; - struct varlena *fsblock; - bool isNull; + int nread = 0; + int n; + int off; + int len; + int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE); + uint32 pageoff; + ScanKeyData skey[2]; + IndexScanDesc sd; + RetrieveIndexResult indexRes; + HeapTupleData tuple; + Buffer buffer; + Form_pg_largeobject data; + bytea *datafield; + bool pfreeit; Assert(PointerIsValid(obj_desc)); Assert(buf != NULL); - /* if we're already at EOF, we don't need to do any work here */ - if (obj_desc->flags & IFS_ATEOF) + if (nbytes <= 0) return 0; - /* make sure we obey two-phase locking */ - if (!(obj_desc->flags & IFS_RDLOCK)) + ScanKeyEntryInitialize(&skey[0], + (bits16) 0x0, + (AttrNumber) 1, + (RegProcedure) F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + ScanKeyEntryInitialize(&skey[1], + (bits16) 0x0, + (AttrNumber) 2, + (RegProcedure) F_INT4GE, + Int32GetDatum(pageno)); + + sd = index_beginscan(obj_desc->index_r, false, 2, skey); + + tuple.t_datamcxt = CurrentMemoryContext; + tuple.t_data = NULL; + + while ((indexRes = index_getnext(sd, ForwardScanDirection))) { - LockRelation(obj_desc->heap_r, ShareLock); - obj_desc->flags |= IFS_RDLOCK; - } - - nread = 0; - - /* fetch a block at a time */ - while (nread < nbytes) - { - Buffer buffer; - - /* fetch an inversion file system block */ - inv_fetchtup(obj_desc, &tuple, &buffer); + tuple.t_self = indexRes->heap_iptr; + heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer); + pfree(indexRes); if (tuple.t_data == NULL) - { - obj_desc->flags |= IFS_ATEOF; - break; - } - - /* copy the data from this block into the buffer */ - d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull); - fsblock = (struct varlena *) DatumGetPointer(d); - ReleaseBuffer(buffer); + continue; + + data = (Form_pg_largeobject) GETSTRUCT(&tuple); /* - * If block starts beyond current seek point, then we are looking - * at a "hole" (unwritten area) in the object. Return zeroes for - * the "hole". + * We assume the indexscan will deliver pages in order. However, + * there may be missing pages if the LO contains unwritten "holes". + * We want missing sections to read out as zeroes. */ - if (obj_desc->offset < obj_desc->lowbyte) + pageoff = ((uint32) data->pageno) * LOBLKSIZE; + if (pageoff > obj_desc->offset) { - int nzeroes = obj_desc->lowbyte - obj_desc->offset; - - if (nzeroes > (nbytes - nread)) - nzeroes = (nbytes - nread); - MemSet(buf, 0, nzeroes); - buf += nzeroes; - nread += nzeroes; - obj_desc->offset += nzeroes; - if (nread >= nbytes) - break; + n = pageoff - obj_desc->offset; + n = (n <= (nbytes - nread)) ? n : (nbytes - nread); + MemSet(buf + nread, 0, n); + nread += n; + obj_desc->offset += n; } - off = obj_desc->offset - obj_desc->lowbyte; - ncopy = obj_desc->highbyte - obj_desc->offset + 1; - if (ncopy > (nbytes - nread)) - ncopy = (nbytes - nread); - memmove(buf, &(fsblock->vl_dat[off]), ncopy); + if (nread < nbytes) + { + Assert(obj_desc->offset >= pageoff); + off = (int) (obj_desc->offset - pageoff); + Assert(off >= 0 && off < LOBLKSIZE); - /* move pointers past the amount we just read */ - buf += ncopy; - nread += ncopy; - obj_desc->offset += ncopy; + datafield = &(data->data); + pfreeit = false; + if (VARATT_IS_EXTENDED(datafield)) + { + datafield = (bytea *) + heap_tuple_untoast_attr((varattrib *) datafield); + pfreeit = true; + } + len = getbytealen(datafield); + if (len > off) + { + n = len - off; + n = (n <= (nbytes - nread)) ? n : (nbytes - nread); + memcpy(buf + nread, VARDATA(datafield) + off, n); + nread += n; + obj_desc->offset += n; + } + if (pfreeit) + pfree(datafield); + } + + ReleaseBuffer(buffer); + if (nread >= nbytes) + break; } + index_endscan(sd); + return nread; } int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) { - HeapTupleData tuple; - int nwritten; - int tuplen; + int nwritten = 0; + int n; + int off; + int len; + int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE); + ScanKeyData skey[2]; + IndexScanDesc sd; + RetrieveIndexResult indexRes; + HeapTupleData oldtuple; + Buffer buffer; + Form_pg_largeobject olddata; + bool neednextpage; + bytea *datafield; + bool pfreeit; + char workbuf[LOBLKSIZE + VARHDRSZ]; + char *workb = VARATT_DATA(workbuf); + HeapTuple newtup; + Datum values[Natts_pg_largeobject]; + char nulls[Natts_pg_largeobject]; + char replace[Natts_pg_largeobject]; + bool write_indices; + Relation idescs[Num_pg_largeobject_indices]; Assert(PointerIsValid(obj_desc)); Assert(buf != NULL); - /* - * Make sure we obey two-phase locking. A write lock entitles you to - * read the relation, as well. - */ + if (nbytes <= 0) + return 0; - if (!(obj_desc->flags & IFS_WRLOCK)) - { - LockRelation(obj_desc->heap_r, ExclusiveLock); - obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK); - } + write_indices = ! IsIgnoringSystemIndexes(); + if (write_indices) + CatalogOpenIndices(Num_pg_largeobject_indices, + Name_pg_largeobject_indices, + idescs); - nwritten = 0; + ScanKeyEntryInitialize(&skey[0], + (bits16) 0x0, + (AttrNumber) 1, + (RegProcedure) F_OIDEQ, + ObjectIdGetDatum(obj_desc->id)); + + ScanKeyEntryInitialize(&skey[1], + (bits16) 0x0, + (AttrNumber) 2, + (RegProcedure) F_INT4GE, + Int32GetDatum(pageno)); + + sd = index_beginscan(obj_desc->index_r, false, 2, skey); + + oldtuple.t_datamcxt = CurrentMemoryContext; + oldtuple.t_data = NULL; + olddata = NULL; + buffer = InvalidBuffer; + neednextpage = true; - /* write a block at a time */ while (nwritten < nbytes) { - Buffer buffer; - /* - * Fetch the current inversion file system block. We can skip - * the work if we already know we are at EOF. + * If possible, get next pre-existing page of the LO. We assume + * the indexscan will deliver these in order --- but there may be + * holes. */ - - if (obj_desc->flags & IFS_ATEOF) - tuple.t_data = NULL; - else - inv_fetchtup(obj_desc, &tuple, &buffer); - - /* either append or replace a block, as required */ - if (tuple.t_data == NULL) - tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten); - else + if (neednextpage) { - if (obj_desc->offset > obj_desc->highbyte) + while ((indexRes = index_getnext(sd, ForwardScanDirection))) { - tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten); - ReleaseBuffer(buffer); + oldtuple.t_self = indexRes->heap_iptr; + heap_fetch(obj_desc->heap_r, SnapshotNow, &oldtuple, &buffer); + pfree(indexRes); + if (oldtuple.t_data != NULL) + { + olddata = (Form_pg_largeobject) GETSTRUCT(&oldtuple); + Assert(olddata->pageno >= pageno); + break; + } } - else - tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer); - - /* - * inv_wrold() has already issued WriteBuffer() which has - * decremented local reference counter (LocalRefCount). So we - * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4 - */ + neednextpage = false; } - - /* move pointers past the amount we just wrote */ - buf += tuplen; - nwritten += tuplen; - obj_desc->offset += tuplen; - } - - /* that's it */ - return nwritten; -} - -/* - * inv_cleanindex - * Clean opened indexes for large objects, and clears current result. - * This is necessary on transaction commit in order to prevent buffer - * leak. - * This function must be called for each opened large object. - * [ PA, 7/17/98 ] - */ -void -inv_cleanindex(LargeObjectDesc *obj_desc) -{ - Assert(PointerIsValid(obj_desc)); - - if (obj_desc->iscan == (IndexScanDesc) NULL) - return; - - index_endscan(obj_desc->iscan); - obj_desc->iscan = (IndexScanDesc) NULL; - - ItemPointerSetInvalid(&(obj_desc->htid)); -} - -/* - * inv_fetchtup -- Fetch an inversion file system block. - * - * This routine finds the file system block containing the offset - * recorded in the obj_desc structure. Later, we need to think about - * the effects of non-functional updates (can you rewrite the same - * block twice in a single transaction?), but for now, we won't bother. - * - * Parameters: - * obj_desc -- the object descriptor. - * bufP -- pointer to a buffer in the buffer cache; caller - * must free this. - * - * Returns: - * A heap tuple containing the desired block, or NULL if no - * such tuple exists. - */ -static void -inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer) -{ - RetrieveIndexResult res; - Datum d; - int firstbyte, - lastbyte; - struct varlena *fsblock; - bool isNull; - - /* - * If we've exhausted the current block, we need to get the next one. - * When we support time travel and non-functional updates, we will - * need to loop over the blocks, rather than just have an 'if', in - * order to find the one we're really interested in. - */ - - if (obj_desc->offset > obj_desc->highbyte - || obj_desc->offset < obj_desc->lowbyte - || !ItemPointerIsValid(&(obj_desc->htid))) - { - ScanKeyData skey; - - ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE, - Int32GetDatum(obj_desc->offset)); - - /* initialize scan key if not done */ - if (obj_desc->iscan == (IndexScanDesc) NULL) + /* + * If we have a pre-existing page, see if it is the page we want + * to write, or a later one. + */ + if (olddata != NULL && olddata->pageno == pageno) { - /* - * As scan index may be prematurely closed (on commit), we - * must use object current offset (was 0) to reinitialize the - * entry [ PA ]. + * Update an existing page with fresh data. + * + * First, load old data into workbuf */ - obj_desc->iscan = index_beginscan(obj_desc->index_r, - (bool) 0, (uint16) 1, - &skey); - } - else - index_rescan(obj_desc->iscan, false, &skey); - - do - { - res = index_getnext(obj_desc->iscan, ForwardScanDirection); - - if (res == (RetrieveIndexResult) NULL) + datafield = &(olddata->data); + pfreeit = false; + if (VARATT_IS_EXTENDED(datafield)) { - ItemPointerSetInvalid(&(obj_desc->htid)); - tuple->t_datamcxt = NULL; - tuple->t_data = NULL; - return; + datafield = (bytea *) + heap_tuple_untoast_attr((varattrib *) datafield); + pfreeit = true; } - + len = getbytealen(datafield); + Assert(len <= LOBLKSIZE); + memcpy(workb, VARDATA(datafield), len); + if (pfreeit) + pfree(datafield); /* - * For time travel, we need to use the actual time qual here, - * rather that NowTimeQual. We currently have no way to pass - * a time qual in. - * - * This is now valid for snapshot !!! And should be fixed in some - * way... - vadim 07/28/98 - * + * Fill any hole + */ + off = (int) (obj_desc->offset % LOBLKSIZE); + if (off > len) + MemSet(workb + len, 0, off - len); + /* + * Insert appropriate portion of new data + */ + n = LOBLKSIZE - off; + n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten); + memcpy(workb + off, buf + nwritten, n); + nwritten += n; + obj_desc->offset += n; + off += n; + /* compute valid length of new page */ + len = (len >= off) ? len : off; + VARATT_SIZEP(workbuf) = len + VARHDRSZ; + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, ' ', sizeof(nulls)); + memset(replace, ' ', sizeof(replace)); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf); + replace[Anum_pg_largeobject_data - 1] = 'r'; + newtup = heap_modifytuple(&oldtuple, obj_desc->heap_r, + values, nulls, replace); + heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL); + if (write_indices) + CatalogIndexInsert(idescs, Num_pg_largeobject_indices, + obj_desc->heap_r, newtup); + heap_freetuple(newtup); + /* + * We're done with this old page. */ - tuple->t_self = res->heap_iptr; - heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer); - pfree(res); - } while (tuple->t_data == NULL); - - /* remember this tid -- we may need it for later reads/writes */ - ItemPointerCopy(&(tuple->t_self), &obj_desc->htid); - } - else - { - tuple->t_self = obj_desc->htid; - heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer); - if (tuple->t_data == NULL) - elog(ERROR, "inv_fetchtup: heap_fetch failed"); - } - - /* - * By here, we have the heap tuple we're interested in. We cache the - * upper and lower bounds for this block in the object descriptor and - * return the tuple. - */ - - d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull); - lastbyte = (int32) DatumGetInt32(d); - d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull); - fsblock = (struct varlena *) DatumGetPointer(d); - - /* - * order of + and - is important -- these are unsigned quantites near - * 0 - */ - firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len; - - obj_desc->lowbyte = firstbyte; - obj_desc->highbyte = lastbyte; - - return; -} - -/* - * inv_wrnew() -- append a new filesystem block tuple to the inversion - * file. - * - * In response to an inv_write, we append one or more file system - * blocks to the class containing the large object. We violate the - * class abstraction here in order to pack things as densely as we - * are able. We examine the last page in the relation, and write - * just enough to fill it, assuming that it has above a certain - * threshold of space available. If the space available is less than - * the threshold, we allocate a new page by writing a big tuple. - * - * By the time we get here, we know all the parameters passed in - * are valid, and that we hold the appropriate lock on the heap - * relation. - * - * Parameters: - * obj_desc: large object descriptor for which to append block. - * buf: buffer containing data to write. - * nbytes: amount to write - * - * Returns: - * number of bytes actually written to the new tuple. - */ -static int -inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes) -{ - Relation hr; - HeapTuple ntup; - Buffer buffer; - Page page; - int nblocks; - int nwritten; - - hr = obj_desc->heap_r; - - /* - * Get the last block in the relation. If there's no data in the - * relation at all, then we just get a new block. Otherwise, we check - * the last block to see whether it has room to accept some or all of - * the data that the user wants to write. If it doesn't, then we - * allocate a new block. - */ - - nblocks = RelationGetNumberOfBlocks(hr); - - if (nblocks > 0) - { - buffer = ReadBuffer(hr, nblocks - 1); - page = BufferGetPage(buffer); - } - else - { - buffer = ReadBuffer(hr, P_NEW); - page = BufferGetPage(buffer); - PageInit(page, BufferGetPageSize(buffer), 0); - } - - /* - * If the last page is too small to hold all the data, and it's too - * small to hold IMINBLK, then we allocate a new page. If it will - * hold at least IMINBLK, but less than all the data requested, then - * we write IMINBLK here. The caller is responsible for noticing that - * less than the requested number of bytes were written, and calling - * this routine again. - */ - - nwritten = IFREESPC(page); - if (nwritten < nbytes) - { - if (nwritten < IMINBLK) - { ReleaseBuffer(buffer); - buffer = ReadBuffer(hr, P_NEW); - page = BufferGetPage(buffer); - PageInit(page, BufferGetPageSize(buffer), 0); - if (nbytes > IMAXBLK) - nwritten = IMAXBLK; - else - nwritten = nbytes; - } - } - else - nwritten = nbytes; - - /* - * Insert a new file system block tuple, index it, and write it out. - */ - - ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten); - inv_indextup(obj_desc, ntup); - heap_freetuple(ntup); - - /* new tuple is inserted */ - WriteBuffer(buffer); - - return nwritten; -} - -static int -inv_wrold(LargeObjectDesc *obj_desc, - char *dbuf, - int nbytes, - HeapTuple tuple, - Buffer buffer) -{ - Relation hr; - HeapTuple ntup; - Buffer newbuf; - Page page; - Page newpage; - int tupbytes; - Datum d; - struct varlena *fsblock; - int nwritten, - nblocks, - freespc; - bool isNull; - int keep_offset; - RetrieveIndexResult res; - - /* - * Since we're using a no-overwrite storage manager, the way we - * overwrite blocks is to mark the old block invalid and append a new - * block. First mark the old block invalid. This violates the tuple - * abstraction. - */ - - TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax)); - tuple->t_data->t_cmax = GetCurrentCommandId(); - tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID); - - /* - * If we're overwriting the entire block, we're lucky. All we need to - * do is to insert a new block. - */ - - if (obj_desc->offset == obj_desc->lowbyte - && obj_desc->lowbyte + nbytes >= obj_desc->highbyte) - { - WriteBuffer(buffer); - return inv_wrnew(obj_desc, dbuf, nbytes); - } - - /* - * By here, we need to overwrite part of the data in the current - * tuple. In order to reduce the degree to which we fragment blocks, - * we guarantee that no block will be broken up due to an overwrite. - * This means that we need to allocate a tuple on a new page, if - * there's not room for the replacement on this one. - */ - - newbuf = buffer; - page = BufferGetPage(buffer); - newpage = BufferGetPage(newbuf); - hr = obj_desc->heap_r; - freespc = IFREESPC(page); - d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull); - fsblock = (struct varlena *) DatumGetPointer(d); - tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len); - - if (freespc < tupbytes) - { - - /* - * First see if there's enough space on the last page of the table - * to put this tuple. - */ - - nblocks = RelationGetNumberOfBlocks(hr); - - if (nblocks > 0) - { - newbuf = ReadBuffer(hr, nblocks - 1); - newpage = BufferGetPage(newbuf); + oldtuple.t_datamcxt = CurrentMemoryContext; + oldtuple.t_data = NULL; + olddata = NULL; + neednextpage = true; } else { - newbuf = ReadBuffer(hr, P_NEW); - newpage = BufferGetPage(newbuf); - PageInit(newpage, BufferGetPageSize(newbuf), 0); - } - - freespc = IFREESPC(newpage); - - /* - * If there's no room on the last page, allocate a new last page - * for the table, and put it there. - */ - - if (freespc < tupbytes) - { - ReleaseBuffer(newbuf); - newbuf = ReadBuffer(hr, P_NEW); - newpage = BufferGetPage(newbuf); - PageInit(newpage, BufferGetPageSize(newbuf), 0); + /* + * Write a brand new page. + * + * First, fill any hole + */ + off = (int) (obj_desc->offset % LOBLKSIZE); + if (off > 0) + MemSet(workb, 0, off); + /* + * Insert appropriate portion of new data + */ + n = LOBLKSIZE - off; + n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten); + memcpy(workb + off, buf + nwritten, n); + nwritten += n; + obj_desc->offset += n; + /* compute valid length of new page */ + len = off + n; + VARATT_SIZEP(workbuf) = len + VARHDRSZ; + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, ' ', sizeof(nulls)); + values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id); + values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf); + newtup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls); + heap_insert(obj_desc->heap_r, newtup); + if (write_indices) + CatalogIndexInsert(idescs, Num_pg_largeobject_indices, + obj_desc->heap_r, newtup); + heap_freetuple(newtup); } + pageno++; } - nwritten = nbytes; - if (nwritten > obj_desc->highbyte - obj_desc->offset + 1) - nwritten = obj_desc->highbyte - obj_desc->offset + 1; - memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte), - dbuf, nwritten); + if (olddata != NULL) + ReleaseBuffer(buffer); + + index_endscan(sd); + + if (write_indices) + CatalogCloseIndices(Num_pg_largeobject_indices, idescs); /* - * we are rewriting the entire old block, therefore we reset offset to - * the lowbyte of the original block before jumping into - * inv_newtuple() + * Advance command counter so that my tuple updates will be seen by later + * large-object operations in this transaction. */ - keep_offset = obj_desc->offset; - obj_desc->offset = obj_desc->lowbyte; - ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock), - tupbytes); - /* after we are done, we restore to the true offset */ - obj_desc->offset = keep_offset; + CommandCounterIncrement(); - /* - * By here, we have a page (newpage) that's guaranteed to have enough - * space on it to put the new tuple. Call inv_newtuple to do the - * work. Passing NULL as a buffer to inv_newtuple() keeps it from - * copying any data into the new tuple. When it returns, the tuple is - * ready to receive data from the old tuple and the user's data - * buffer. - */ -/* - ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes); - dptr = ((char *) ntup) + ntup->t_hoff - - (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) + - sizeof(int4) - + sizeof(fsblock->vl_len); - - if (obj_desc->offset > obj_desc->lowbyte) { - memmove(dptr, - &(fsblock->vl_dat[0]), - obj_desc->offset - obj_desc->lowbyte); - dptr += obj_desc->offset - obj_desc->lowbyte; - } - - - nwritten = nbytes; - if (nwritten > obj_desc->highbyte - obj_desc->offset + 1) - nwritten = obj_desc->highbyte - obj_desc->offset + 1; - - memmove(dptr, dbuf, nwritten); - dptr += nwritten; - - if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) { -*/ -/* - loc = (obj_desc->highbyte - obj_desc->offset) - + nwritten; - sz = obj_desc->highbyte - (obj_desc->lowbyte + loc); - - what's going on here?? - jolly -*/ -/* - sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten); - memmove(&(fsblock->vl_dat[0]), dptr, sz); - } -*/ - - - /* index the new tuple */ - inv_indextup(obj_desc, ntup); - heap_freetuple(ntup); - - /* - * move the scandesc forward so we don't reread the newly inserted - * tuple on the next index scan - */ - res = NULL; - if (obj_desc->iscan) - res = index_getnext(obj_desc->iscan, ForwardScanDirection); - - if (res) - pfree(res); - - /* - * Okay, by here, a tuple for the new block is correctly placed, - * indexed, and filled. Write the changed pages out. - */ - - WriteBuffer(buffer); - if (newbuf != buffer) - WriteBuffer(newbuf); - - /* Tuple id is no longer valid */ - ItemPointerSetInvalid(&(obj_desc->htid)); - - /* done */ return nwritten; } - -static HeapTuple -inv_newtuple(LargeObjectDesc *obj_desc, - Buffer buffer, - Page page, - char *dbuf, - int nwrite) -{ - HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData)); - PageHeader ph; - int tupsize; - int hoff; - Offset lower; - Offset upper; - ItemId itemId; - OffsetNumber off; - OffsetNumber limit; - char *attptr; - - /* compute tuple size -- no nulls */ - hoff = offsetof(HeapTupleHeaderData, t_bits); - hoff = MAXALIGN(hoff); - - /* add in olastbyte, varlena.vl_len, varlena.vl_dat */ - tupsize = hoff + (2 * sizeof(int32)) + nwrite; - tupsize = MAXALIGN(tupsize); - - /* - * Allocate the tuple on the page, violating the page abstraction. - * This code was swiped from PageAddItem(). - */ - - ph = (PageHeader) page; - limit = OffsetNumberNext(PageGetMaxOffsetNumber(page)); - - /* look for "recyclable" (unused & deallocated) ItemId */ - for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off)) - { - itemId = &ph->pd_linp[off - 1]; - if ((((*itemId).lp_flags & LP_USED) == 0) && - ((*itemId).lp_len == 0)) - break; - } - - if (off > limit) - lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page)); - else if (off == limit) - lower = ph->pd_lower + sizeof(ItemIdData); - else - lower = ph->pd_lower; - - upper = ph->pd_upper - tupsize; - - itemId = &ph->pd_linp[off - 1]; - (*itemId).lp_off = upper; - (*itemId).lp_len = tupsize; - (*itemId).lp_flags = LP_USED; - ph->pd_lower = lower; - ph->pd_upper = upper; - - ntup->t_datamcxt = NULL; - ntup->t_data = (HeapTupleHeader) ((char *) page + upper); - - /* - * Tuple is now allocated on the page. Next, fill in the tuple - * header. This block of code violates the tuple abstraction. - */ - - ntup->t_len = tupsize; - ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off); - ntup->t_data->t_oid = newoid(); - TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin)); - ntup->t_data->t_cmin = GetCurrentCommandId(); - StoreInvalidTransactionId(&(ntup->t_data->t_xmax)); - ntup->t_data->t_cmax = 0; - ntup->t_data->t_infomask = HEAP_XMAX_INVALID; - ntup->t_data->t_natts = 2; - ntup->t_data->t_hoff = hoff; - - /* if a NULL is passed in, avoid the calculations below */ - if (dbuf == NULL) - return ntup; - - /* - * Finally, copy the user's data buffer into the tuple. This violates - * the tuple and class abstractions. - */ - - attptr = ((char *) ntup->t_data) + hoff; - *((int32 *) attptr) = obj_desc->offset + nwrite - 1; - attptr += sizeof(int32); - - /* - * * mer fixed disk layout of varlenas to get rid of the need for - * this. * - * - * ((int32 *) attptr) = nwrite + sizeof(int32); * attptr += - * sizeof(int32); - */ - - *((int32 *) attptr) = nwrite + sizeof(int32); - attptr += sizeof(int32); - - /* - * If a data buffer was passed in, then copy the data from the buffer - * to the tuple. Some callers (eg, inv_wrold()) may not pass in a - * buffer, since they have to copy part of the old tuple data and part - * of the user's new data into the new tuple. - */ - - if (dbuf != (char *) NULL) - memmove(attptr, dbuf, nwrite); - - /* keep track of boundary of current tuple */ - obj_desc->lowbyte = obj_desc->offset; - obj_desc->highbyte = obj_desc->offset + nwrite - 1; - - /* new tuple is filled -- return it */ - return ntup; -} - -static void -inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple) -{ - InsertIndexResult res; - Datum v[1]; - char n[1]; - - n[0] = ' '; - v[0] = Int32GetDatum(obj_desc->highbyte); - res = index_insert(obj_desc->index_r, &v[0], &n[0], - &(tuple->t_self), obj_desc->heap_r); - - if (res) - pfree(res); -} - -#ifdef NOT_USED - -static void -DumpPage(Page page, int blkno) -{ - ItemId lp; - HeapTuple tup; - int flags, i, nline; - ItemPointerData pointerData; - - printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0, - ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper, - ((PageHeader)page)->pd_special); - - printf("\t:MaxOffsetNumber=%d\n", - (int16) PageGetMaxOffsetNumber(page)); - - nline = (int16) PageGetMaxOffsetNumber(page); - -{ - int i; - char *cp; - - i = PageGetSpecialSize(page); - cp = PageGetSpecialPointer(page); - - printf("\t:SpecialData="); - - while (i > 0) { - printf(" 0x%02x", *cp); - cp += 1; - i -= 1; - } - printf("\n"); -} - for (i = 0; i < nline; i++) { - lp = ((PageHeader)page)->pd_linp + i; - flags = (*lp).lp_flags; - ItemPointerSet(&pointerData, blkno, 1 + i); - printf("%s:off=%d:flags=0x%x:len=%d", - ItemPointerFormExternal(&pointerData), (*lp).lp_off, - flags, (*lp).lp_len); - - if (flags & LP_USED) { - HeapTupleData htdata; - - printf(":USED"); - - memmove((char *) &htdata, - (char *) &((char *)page)[(*lp).lp_off], - sizeof(htdata)); - - tup = &htdata; - - printf("\n\t:ctid=%s:oid=%d", - ItemPointerFormExternal(&tup->t_ctid), - tup->t_oid); - printf(":natts=%d:thoff=%d:", - tup->t_natts, - tup->t_hoff); - - printf("\n\t:cmin=%u:", - tup->t_cmin); - - printf("xmin=%u:", tup->t_xmin); - - printf("\n\t:cmax=%u:", - tup->t_cmax); - - printf("xmax=%u:\n", tup->t_xmax); - - } else - putchar('\n'); - } -} - -static char* -ItemPointerFormExternal(ItemPointer pointer) -{ - static char itemPointerString[32]; - - if (!ItemPointerIsValid(pointer)) { - memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->"); - } else { - sprintf(itemPointerString, "<%u,%u>", - ItemPointerGetBlockNumber(pointer), - ItemPointerGetOffsetNumber(pointer)); - } - - return itemPointerString; -} - -#endif - -static int -_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln) -{ - IndexScanDesc iscan; - RetrieveIndexResult res; - HeapTupleData tuple; - Datum d; - long size; - bool isNull; - Buffer buffer; - - /* scan backwards from end */ - iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL); - - do - { - res = index_getnext(iscan, BackwardScanDirection); - - /* - * If there are no more index tuples, then the relation is empty, - * so the file's size is zero. - */ - - if (res == (RetrieveIndexResult) NULL) - { - index_endscan(iscan); - return 0; - } - - /* - * For time travel, we need to use the actual time qual here, - * rather that NowTimeQual. We currently have no way to pass a - * time qual in. - */ - tuple.t_self = res->heap_iptr; - heap_fetch(hreln, SnapshotNow, &tuple, &buffer); - pfree(res); - } while (tuple.t_data == NULL); - - /* don't need the index scan anymore */ - index_endscan(iscan); - - /* get olastbyte attribute */ - d = heap_getattr(&tuple, 1, hdesc, &isNull); - size = DatumGetInt32(d) + 1; - ReleaseBuffer(buffer); - - return size; -} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index a3a914f8af..0d2c161280 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -22,7 +22,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.174 2000/10/22 23:16:55 pjw Exp $ + * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.175 2000/10/24 01:38:32 tgl Exp $ * * Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb * @@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal) fprintf(stderr, "%s saving BLOBs\n", g_comment_start); /* Cursor to get all BLOB tables */ - appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT); + appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT DISTINCT loid FROM pg_largeobject"); res = PQexec(g_conn, oidQry->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) @@ -1874,8 +1874,7 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs) * tables before the child tables when traversing the tblinfo* * * we ignore tables that are not type 'r' (ordinary relation) or 'S' - * (sequence) or 'v' (view) --- in particular, Large Object - * relations (type 'l') are ignored. + * (sequence) or 'v' (view). */ appendPQExpBuffer(query, @@ -1886,7 +1885,6 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs) "where relname !~ '^pg_' " "and relkind in ('%c', '%c', '%c') " "order by oid", - RELKIND_VIEW, RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW); res = PQexec(g_conn, query->data); @@ -2585,7 +2583,7 @@ getIndices(int *numIndices) * find all the user-defined indices. We do not handle partial * indices. * - * Notice we skip indices on inversion objects (relkind 'l') + * Notice we skip indices on system classes * * this is a 4-way join !! */ @@ -2597,8 +2595,8 @@ getIndices(int *numIndices) "from pg_index i, pg_class t1, pg_class t2, pg_am a " "WHERE t1.oid = i.indexrelid and t2.oid = i.indrelid " "and t1.relam = a.oid and i.indexrelid > '%u'::oid " - "and t2.relname !~ '^pg_' and t2.relkind != '%c' and not i.indisprimary", - g_last_builtin_oid, RELKIND_LOBJECT); + "and t2.relname !~ '^pg_' and not i.indisprimary", + g_last_builtin_oid); res = PQexec(g_conn, query->data); if (!res || diff --git a/src/bin/pgtclsh/updateStats.tcl b/src/bin/pgtclsh/updateStats.tcl index d97c8a7b67..9cb8384dc2 100644 --- a/src/bin/pgtclsh/updateStats.tcl +++ b/src/bin/pgtclsh/updateStats.tcl @@ -59,7 +59,7 @@ proc update_attnvals {conn rel} { proc updateStats { dbName } { # datnames is the list to be result set conn [pg_connect $dbName] - set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_' and relname !~ '^xinv'"] + set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_'"] set ntups [pg_result $res -numTuples] for {set i 0} {$i < $ntups} {incr i} { set rel [pg_result $res -getTuple $i] diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 26c54b366a..3db2eb95a6 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3,7 +3,7 @@ * * Copyright 2000 by PostgreSQL Global Development Group * - * $Header: /cvsroot/pgsql/src/bin/psql/describe.c,v 1.24 2000/09/07 04:55:27 ishii Exp $ + * $Header: /cvsroot/pgsql/src/bin/psql/describe.c,v 1.25 2000/10/24 01:38:38 tgl Exp $ */ #include "postgres.h" #include "describe.h" @@ -1020,10 +1020,6 @@ listTables(const char *infotype, const char *name, bool desc) strcat(buf, "'S'"); strcat(buf, ")\n"); - /* ignore large-obj indices */ - if (showIndices) - strcat(buf, " AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n"); - strcat(buf, showSystem ? " AND c.relname ~ '^pg_'\n" : " AND c.relname !~ '^pg_'\n"); if (name) { @@ -1050,10 +1046,6 @@ listTables(const char *infotype, const char *name, bool desc) strcat(buf, "'S'"); strcat(buf, ")\n"); - /* ignore large-obj indices */ - if (showIndices) - strcat(buf, " AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n"); - strcat(buf, showSystem ? " AND c.relname ~ '^pg_'\n" : " AND c.relname !~ '^pg_'\n"); if (name) { diff --git a/src/bin/psql/large_obj.c b/src/bin/psql/large_obj.c index 020b0173eb..5cfd18c328 100644 --- a/src/bin/psql/large_obj.c +++ b/src/bin/psql/large_obj.c @@ -3,7 +3,7 @@ * * Copyright 2000 by PostgreSQL Global Development Group * - * $Header: /cvsroot/pgsql/src/bin/psql/large_obj.c,v 1.10 2000/04/12 17:16:22 momjian Exp $ + * $Header: /cvsroot/pgsql/src/bin/psql/large_obj.c,v 1.11 2000/10/24 01:38:39 tgl Exp $ */ #include "postgres.h" #include "large_obj.h" @@ -193,7 +193,7 @@ do_lo_import(const char *filename_arg, const char *comment_arg) /* insert description if given */ if (comment_arg) { - sprintf(buf, "INSERT INTO pg_description VALUES (%d, '", loid); + sprintf(buf, "INSERT INTO pg_description VALUES (%u, '", loid); for (i = 0; i < strlen(comment_arg); i++) if (comment_arg[i] == '\'') strcat(buf, "\\'"); @@ -284,7 +284,7 @@ do_lo_unlink(const char *loid_arg) } /* remove the comment as well */ - sprintf(buf, "DELETE FROM pg_description WHERE objoid = %d", loid); + sprintf(buf, "DELETE FROM pg_description WHERE objoid = %u", loid); if (!(res = PSQLexec(buf))) { if (own_transaction) @@ -328,15 +328,9 @@ do_lo_list(void) printQueryOpt myopt = pset.popt; strcpy(buf, - "SELECT usename as \"Owner\", substring(relname from 5) as \"ID\",\n" - " obj_description(pg_class.oid) as \"Description\"\n" - "FROM pg_class, pg_user\n" - "WHERE usesysid = relowner AND relkind = 'l'\n" - "UNION\n" - "SELECT NULL as \"Owner\", substring(relname from 5) as \"ID\",\n" - " obj_description(pg_class.oid) as \"Description\"\n" - "FROM pg_class\n" - "WHERE not exists (select 1 from pg_user where usesysid = relowner) AND relkind = 'l'\n" + "SELECT DISTINCT loid as \"ID\",\n" + " obj_description(loid) as \"Description\"\n" + "FROM pg_largeobject\n" "ORDER BY \"ID\""); res = PSQLexec(buf); diff --git a/src/include/catalog/catname.h b/src/include/catalog/catname.h index b82977d806..54b964e215 100644 --- a/src/include/catalog/catname.h +++ b/src/include/catalog/catname.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: catname.h,v 1.16 2000/10/22 05:27:20 momjian Exp $ + * $Id: catname.h,v 1.17 2000/10/24 01:38:41 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -29,6 +29,7 @@ #define InheritsRelationName "pg_inherits" #define InheritancePrecidenceListRelationName "pg_ipl" #define LanguageRelationName "pg_language" +#define LargeObjectRelationName "pg_largeobject" #define ListenerRelationName "pg_listener" #define LogRelationName "pg_log" #define OperatorClassRelationName "pg_opclass" diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index c16c6ae83e..f6fd284f34 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: catversion.h,v 1.51 2000/10/22 17:55:49 pjw Exp $ + * $Id: catversion.h,v 1.52 2000/10/24 01:38:41 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 200010231 +#define CATALOG_VERSION_NO 200010232 #endif diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index 6cc98bdc32..7150a43d2d 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: indexing.h,v 1.44 2000/10/22 05:27:20 momjian Exp $ + * $Id: indexing.h,v 1.45 2000/10/24 01:38:41 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #define Num_pg_index_indices 2 #define Num_pg_inherits_indices 1 #define Num_pg_language_indices 2 +#define Num_pg_largeobject_indices 1 #define Num_pg_listener_indices 1 #define Num_pg_opclass_indices 2 #define Num_pg_operator_indices 2 @@ -62,6 +63,7 @@ #define InheritsRelidSeqnoIndex "pg_inherits_relid_seqno_index" #define LanguageNameIndex "pg_language_name_index" #define LanguageOidIndex "pg_language_oid_index" +#define LargeObjectLOidPNIndex "pg_largeobject_loid_pn_index" #define ListenerPidRelnameIndex "pg_listener_pid_relname_index" #define OpclassDeftypeIndex "pg_opclass_deftype_index" #define OpclassNameIndex "pg_opclass_name_index" @@ -92,6 +94,7 @@ extern char *Name_pg_group_indices[]; extern char *Name_pg_index_indices[]; extern char *Name_pg_inherits_indices[]; extern char *Name_pg_language_indices[]; +extern char *Name_pg_largeobject_indices[]; extern char *Name_pg_listener_indices[]; extern char *Name_pg_opclass_indices[]; extern char *Name_pg_operator_indices[]; @@ -191,6 +194,7 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops)); DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops)); DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops)); DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops)); /* This column needs to allow multiple zero entries, but is in the cache */ DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops)); diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h index a9592e7ddb..68db583fe3 100644 --- a/src/include/catalog/pg_class.h +++ b/src/include/catalog/pg_class.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: pg_class.h,v 1.43 2000/10/22 17:55:49 pjw Exp $ + * $Id: pg_class.h,v 1.44 2000/10/24 01:38:41 tgl Exp $ * * NOTES * the genbki.sh script reads this file and generates .bki @@ -174,7 +174,6 @@ DESCR(""); #define XactLockTableId 376 #define RELKIND_INDEX 'i' /* secondary index */ -#define RELKIND_LOBJECT 'l' /* large objects */ #define RELKIND_RELATION 'r' /* ordinary cataloged heap */ #define RELKIND_SPECIAL 's' /* special (non-heap) */ #define RELKIND_SEQUENCE 'S' /* SEQUENCE relation */ diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h new file mode 100644 index 0000000000..7777604e27 --- /dev/null +++ b/src/include/catalog/pg_largeobject.h @@ -0,0 +1,63 @@ +/*------------------------------------------------------------------------- + * + * pg_largeobject.h + * definition of the system "largeobject" relation (pg_largeobject) + * along with the relation's initial contents. + * + * + * Portions Copyright (c) 1996-2000, PostgreSQL, Inc + * Portions Copyright (c) 1994, Regents of the University of California + * + * $Id: pg_largeobject.h,v 1.5 2000/10/24 01:38:41 tgl Exp $ + * + * NOTES + * the genbki.sh script reads this file and generates .bki + * information from the DATA() statements. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_LARGEOBJECT_H +#define PG_LARGEOBJECT_H + +/* ---------------- + * postgres.h contains the system type definintions and the + * CATALOG(), BOOTSTRAP and DATA() sugar words so this file + * can be read by both genbki.sh and the C compiler. + * ---------------- + */ + +/* ---------------- + * pg_largeobject definition. cpp turns this into + * typedef struct FormData_pg_largeobject. Large object id + * is stored in loid; + * ---------------- + */ + +CATALOG(pg_largeobject) +{ + Oid loid; /* Identifier of large object */ + int4 pageno; /* Page number (starting from 0) */ + bytea data; /* Data for page (may be zero-length) */ +} FormData_pg_largeobject; + +/* ---------------- + * Form_pg_largeobject corresponds to a pointer to a tuple with + * the format of pg_largeobject relation. + * ---------------- + */ +typedef FormData_pg_largeobject *Form_pg_largeobject; + +/* ---------------- + * compiler constants for pg_largeobject + * ---------------- + */ +#define Natts_pg_largeobject 3 +#define Anum_pg_largeobject_loid 1 +#define Anum_pg_largeobject_pageno 2 +#define Anum_pg_largeobject_data 3 + +extern Oid LargeObjectCreate(Oid loid); +extern void LargeObjectDrop(Oid loid); +extern bool LargeObjectExists(Oid loid); + +#endif /* PG_LARGEOBJECT_H */ diff --git a/src/include/storage/large_object.h b/src/include/storage/large_object.h index c480f5b787..6bb0c4fcf2 100644 --- a/src/include/storage/large_object.h +++ b/src/include/storage/large_object.h @@ -8,39 +8,54 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: large_object.h,v 1.17 2000/10/22 05:27:23 momjian Exp $ + * $Id: large_object.h,v 1.18 2000/10/24 01:38:43 tgl Exp $ * *------------------------------------------------------------------------- */ #ifndef LARGE_OBJECT_H #define LARGE_OBJECT_H -#include +#include "utils/rel.h" -#include "access/relscan.h" -/* - * This structure will eventually have lots more stuff associated with it. +/*---------- + * Data about a currently-open large object. + * + * id is the logical OID of the large object + * offset is the current seek offset within the LO + * heap_r holds an open-relation reference to pg_largeobject + * index_r holds an open-relation reference to pg_largeobject_loid_pn_index + * + * NOTE: before 7.1, heap_r and index_r held references to the separate + * table and index of a specific large object. Now they all live in one rel. + *---------- */ -typedef struct LargeObjectDesc -{ - Relation heap_r; /* heap relation */ - Relation index_r; /* index relation on seqno attribute */ - IndexScanDesc iscan; /* index scan we're using */ - TupleDesc hdesc; /* heap relation tuple desc */ - TupleDesc idesc; /* index relation tuple desc */ - uint32 lowbyte; /* low byte on the current page */ - uint32 highbyte; /* high byte on the current page */ +typedef struct LargeObjectDesc { + Oid id; uint32 offset; /* current seek pointer */ - ItemPointerData htid; /* tid of current heap tuple */ + int flags; /* locking info, etc */ +/* flag bits: */ #define IFS_RDLOCK (1 << 0) #define IFS_WRLOCK (1 << 1) -#define IFS_ATEOF (1 << 2) - u_long flags; /* locking info, etc */ + Relation heap_r; + Relation index_r; } LargeObjectDesc; + +/* + * Each "page" (tuple) of a large object can hold this much data + * + * Calculation is max tuple size less tuple header, loid field (Oid), + * pageno field (int32), and varlena header of data (int32). Note we + * assume none of the fields will be NULL, hence no need for null bitmap. + */ +#define LOBLKSIZE (MaxTupleSize \ + - MAXALIGN(offsetof(HeapTupleHeaderData, t_bits)) \ + - sizeof(Oid) - sizeof(int32) * 2) + + /* * Function definitions... */ @@ -55,7 +70,4 @@ extern int inv_tell(LargeObjectDesc *obj_desc); extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes); extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes); -/* added for buffer leak prevention [ PA ] */ -extern void inv_cleanindex(LargeObjectDesc *obj_desc); - #endif /* LARGE_OBJECT_H */ diff --git a/src/interfaces/odbc/info.c b/src/interfaces/odbc/info.c index 9c99a120ad..9d4e75a9e0 100644 --- a/src/interfaces/odbc/info.c +++ b/src/interfaces/odbc/info.c @@ -1007,8 +1007,7 @@ mylog("%s: entering...stmt=%u\n", func, stmt); } - /* filter out large objects unconditionally (they are not system tables) and match users */ - strcat(tables_query, " and relname !~ '^xinv[0-9]+'"); + /* match users */ strcat(tables_query, " and usesysid = relowner"); strcat(tables_query, " order by relname"); diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out index f5d2427cfa..9fd96b2280 100644 --- a/src/test/regress/expected/opr_sanity.out +++ b/src/test/regress/expected/opr_sanity.out @@ -482,8 +482,8 @@ WHERE p1.aggtransfn = p2.oid AND (p2.pronargs = 1 AND p1.aggbasetype = 0))); oid | aggname | oid | proname -------+---------+-----+------------- - 16984 | max | 768 | int4larger - 16998 | min | 769 | int4smaller + 16996 | max | 768 | int4larger + 17010 | min | 769 | int4smaller (2 rows) -- Cross-check finalfn (if present) against its entry in pg_proc. diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 823d9e142d..f2412386d1 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -40,6 +40,7 @@ SELECT relname, relhasindex pg_index | t pg_inherits | t pg_language | t + pg_largeobject | t pg_listener | t pg_opclass | t pg_operator | t @@ -54,5 +55,5 @@ SELECT relname, relhasindex shighway | t tenk1 | t tenk2 | t -(44 rows) +(45 rows)