diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c index e448e388d4..3452944a42 100644 --- a/src/bin/pg_dump/dumputils.c +++ b/src/bin/pg_dump/dumputils.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.53 2010/01/02 16:57:59 momjian Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.54 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -486,7 +486,8 @@ parsePGArray(const char *atext, char ***itemarray, int *nitems) * name: the object name, in the form to use in the commands (already quoted) * subname: the sub-object name, if any (already quoted); NULL if none * type: the object type (as seen in GRANT command: must be one of - * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE) + * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE, + * FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT) * acls: the ACL string fetched from the database * owner: username of object owner (will be passed through fmtId); can be * NULL or empty string to indicate "no owner known" diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 4357d11971..f227ed1590 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -15,7 +15,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.178 2010/01/19 18:39:19 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.179 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -98,6 +98,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace); static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te); static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te); static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls); +static bool _tocEntryIsACL(TocEntry *te); static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id); @@ -329,9 +330,9 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) AH->currentTE = te; reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ ); - if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt) + /* We want anything that's selected and has a dropStmt */ + if (((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) && te->dropStmt) { - /* We want the schema */ ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag); /* Select owner and schema as necessary */ _becomeOwner(AH, te); @@ -381,7 +382,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) /* Work out what, if anything, we want from this entry */ reqs = _tocEntryRequired(te, ropt, true); - if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + /* Both schema and data objects might now have ownership/ACLs */ + if ((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) { ahlog(AH, 1, "setting owner and privileges for %s %s\n", te->desc, te->tag); @@ -905,6 +907,7 @@ EndRestoreBlobs(ArchiveHandle *AH) void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop) { + bool old_blob_style = (AH->version < K_VERS_1_12); Oid loOid; AH->blobCount++; @@ -914,24 +917,32 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop) ahlog(AH, 2, "restoring large object with OID %u\n", oid); - if (drop) + /* With an old archive we must do drop and create logic here */ + if (old_blob_style && drop) DropBlobIfExists(AH, oid); if (AH->connection) { - loOid = lo_create(AH->connection, oid); - if (loOid == 0 || loOid != oid) - die_horribly(AH, modulename, "could not create large object %u\n", - oid); - + if (old_blob_style) + { + loOid = lo_create(AH->connection, oid); + if (loOid == 0 || loOid != oid) + die_horribly(AH, modulename, "could not create large object %u\n", + oid); + } AH->loFd = lo_open(AH->connection, oid, INV_WRITE); if (AH->loFd == -1) - die_horribly(AH, modulename, "could not open large object\n"); + die_horribly(AH, modulename, "could not open large object %u\n", + oid); } else { - ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n", - oid, INV_WRITE); + if (old_blob_style) + ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n", + oid, INV_WRITE); + else + ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n", + oid, INV_WRITE); } AH->writingBlob = 1; @@ -1829,6 +1840,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->vmin = K_VERS_MINOR; AH->vrev = K_VERS_REV; + /* Make a convenient integer 00 */ + AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0; + /* initialize for backwards compatible string processing */ AH->public.encoding = 0; /* PG_SQL_ASCII */ AH->public.std_strings = false; @@ -2068,12 +2082,13 @@ ReadToc(ArchiveHandle *AH) else { /* - * rules for pre-8.4 archives wherein pg_dump hasn't classified - * the entries into sections + * Rules for pre-8.4 archives wherein pg_dump hasn't classified + * the entries into sections. This list need not cover entry + * types added later than 8.4. */ if (strcmp(te->desc, "COMMENT") == 0 || strcmp(te->desc, "ACL") == 0 || - strcmp(te->desc, "DEFAULT ACL") == 0) + strcmp(te->desc, "ACL LANGUAGE") == 0) te->section = SECTION_NONE; else if (strcmp(te->desc, "TABLE DATA") == 0 || strcmp(te->desc, "BLOBS") == 0 || @@ -2228,10 +2243,10 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls) return 0; /* If it's an ACL, maybe ignore it */ - if ((!include_acls || ropt->aclsSkip) && - (strcmp(te->desc, "ACL") == 0 || strcmp(te->desc, "DEFAULT ACL") == 0)) + if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te)) return 0; + /* Ignore DATABASE entry unless we should create it */ if (!ropt->create && strcmp(te->desc, "DATABASE") == 0) return 0; @@ -2286,9 +2301,18 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls) if (!te->hadDumper) { /* - * Special Case: If 'SEQUENCE SET' then it is considered a data entry + * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, + * then it is considered a data entry. We don't need to check for + * the BLOBS entry or old-style BLOB COMMENTS, because they will + * have hadDumper = true ... but we do need to check new-style + * BLOB comments. */ - if (strcmp(te->desc, "SEQUENCE SET") == 0) + if (strcmp(te->desc, "SEQUENCE SET") == 0 || + strcmp(te->desc, "BLOB") == 0 || + (strcmp(te->desc, "ACL") == 0 && + strncmp(te->tag, "LARGE OBJECT ", 13) == 0) || + (strcmp(te->desc, "COMMENT") == 0 && + strncmp(te->tag, "LARGE OBJECT ", 13) == 0)) res = res & REQ_DATA; else res = res & ~REQ_DATA; @@ -2320,6 +2344,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls) return res; } +/* + * Identify TOC entries that are ACLs. + */ +static bool +_tocEntryIsACL(TocEntry *te) +{ + /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */ + if (strcmp(te->desc, "ACL") == 0 || + strcmp(te->desc, "ACL LANGUAGE") == 0 || + strcmp(te->desc, "DEFAULT ACL") == 0) + return true; + return false; +} + /* * Issue SET commands for parameters that we want to have set the same way * at all times during execution of a restore script. @@ -2685,6 +2723,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH) return; } + /* BLOBs just have a name, but it's numeric so must not use fmtId */ + if (strcmp(type, "BLOB") == 0) + { + appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag); + return; + } + /* * These object types require additional decoration. Fortunately, the * information needed is exactly what's in the DROP command. @@ -2723,14 +2768,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat /* ACLs are dumped only during acl pass */ if (acl_pass) { - if (!(strcmp(te->desc, "ACL") == 0 || - strcmp(te->desc, "DEFAULT ACL") == 0)) + if (!_tocEntryIsACL(te)) return; } else { - if (strcmp(te->desc, "ACL") == 0 || - strcmp(te->desc, "DEFAULT ACL") == 0) + if (_tocEntryIsACL(te)) return; } @@ -2824,6 +2867,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat strlen(te->owner) > 0 && strlen(te->dropStmt) > 0) { if (strcmp(te->desc, "AGGREGATE") == 0 || + strcmp(te->desc, "BLOB") == 0 || strcmp(te->desc, "CONVERSION") == 0 || strcmp(te->desc, "DATABASE") == 0 || strcmp(te->desc, "DOMAIN") == 0 || @@ -2873,7 +2917,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION * commands, so we can no longer assume we know the current auth setting. */ - if (strncmp(te->desc, "ACL", 3) == 0) + if (acl_pass) { if (AH->currUser) free(AH->currUser); diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index b3eb8b7b10..dc2ccb5e95 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -17,7 +17,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.83 2009/12/14 00:39:11 itagaki Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.84 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -61,16 +61,16 @@ typedef struct _z_stream typedef z_stream *z_streamp; #endif +/* Current archive version number (the format we can output) */ #define K_VERS_MAJOR 1 -#define K_VERS_MINOR 11 +#define K_VERS_MINOR 12 #define K_VERS_REV 0 /* Data block types */ #define BLK_DATA 1 -#define BLK_BLOB 2 #define BLK_BLOBS 3 -/* Some important version numbers (checked in code) */ +/* Historical version numbers (checked in code) */ #define K_VERS_1_0 (( (1 * 256 + 0) * 256 + 0) * 256 + 0) #define K_VERS_1_2 (( (1 * 256 + 2) * 256 + 0) * 256 + 0) /* Allow No ZLIB */ #define K_VERS_1_3 (( (1 * 256 + 3) * 256 + 0) * 256 + 0) /* BLOBs */ @@ -87,8 +87,11 @@ typedef z_stream *z_streamp; #define K_VERS_1_10 (( (1 * 256 + 10) * 256 + 0) * 256 + 0) /* add tablespace */ #define K_VERS_1_11 (( (1 * 256 + 11) * 256 + 0) * 256 + 0) /* add toc section * indicator */ +#define K_VERS_1_12 (( (1 * 256 + 12) * 256 + 0) * 256 + 0) /* add separate BLOB + * entries */ -#define K_VERS_MAX (( (1 * 256 + 11) * 256 + 255) * 256 + 0) +/* Newest format we can read */ +#define K_VERS_MAX (( (1 * 256 + 12) * 256 + 255) * 256 + 0) /* Flags to indicate disposition of offsets stored in files */ diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c index 53711de119..1a9ddc9580 100644 --- a/src/bin/pg_dump/pg_backup_db.c +++ b/src/bin/pg_dump/pg_backup_db.c @@ -5,7 +5,7 @@ * Implements the basic DB functions used by the archiver. * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.87 2010/02/17 04:19:40 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.88 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -703,16 +703,26 @@ CommitTransaction(ArchiveHandle *AH) void DropBlobIfExists(ArchiveHandle *AH, Oid oid) { - /* Call lo_unlink only if exists to avoid not-found error. */ - if (PQserverVersion(AH->connection) >= 90000) + /* + * If we are not restoring to a direct database connection, we have to + * guess about how to detect whether the blob exists. Assume new-style. + */ + if (AH->connection == NULL || + PQserverVersion(AH->connection) >= 90000) { - ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) " - "FROM pg_catalog.pg_largeobject_metadata " - "WHERE oid = %u;\n", oid); + ahprintf(AH, + "SELECT pg_catalog.lo_unlink(oid) " + "FROM pg_catalog.pg_largeobject_metadata " + "WHERE oid = '%u';\n", + oid); } else { - ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n", + /* Restoring to pre-9.0 server, so do it the old way */ + ahprintf(AH, + "SELECT CASE WHEN EXISTS(" + "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'" + ") THEN pg_catalog.lo_unlink('%u') END;\n", oid, oid); } } diff --git a/src/bin/pg_dump/pg_backup_null.c b/src/bin/pg_dump/pg_backup_null.c index 127ff57219..35b9a18a4e 100644 --- a/src/bin/pg_dump/pg_backup_null.c +++ b/src/bin/pg_dump/pg_backup_null.c @@ -17,7 +17,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.23 2009/12/14 00:39:11 itagaki Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.24 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -147,14 +147,21 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te) static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) { + bool old_blob_style = (AH->version < K_VERS_1_12); + if (oid == 0) die_horribly(AH, NULL, "invalid OID for large object\n"); - if (AH->ropt->dropSchema) + /* With an old archive we must do drop and create logic here */ + if (old_blob_style && AH->ropt->dropSchema) DropBlobIfExists(AH, oid); - ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n", - oid, INV_WRITE); + if (old_blob_style) + ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n", + oid, INV_WRITE); + else + ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n", + oid, INV_WRITE); AH->WriteDataPtr = _WriteBlobData; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ace4ffe34b..84d12b118f 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -12,7 +12,7 @@ * by PostgreSQL * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.571 2010/02/17 04:19:40 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.572 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName); static char *getFormattedTypeName(Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); static const char *fmtQualifiedId(const char *schema, const char *id); -static bool hasBlobs(Archive *AH); +static void getBlobs(Archive *AH); +static void dumpBlob(Archive *AH, BlobInfo *binfo); static int dumpBlobs(Archive *AH, void *arg); -static int dumpBlobComments(Archive *AH, void *arg); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); @@ -701,25 +701,8 @@ main(int argc, char **argv) getTableDataFKConstraints(); } - if (outputBlobs && hasBlobs(g_fout)) - { - /* Add placeholders to allow correct sorting of blobs */ - DumpableObject *blobobj; - DumpableObject *blobcobj; - - blobobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobobj->objType = DO_BLOBS; - blobobj->catId = nilCatalogId; - AssignDumpId(blobobj); - blobobj->name = strdup("BLOBS"); - - blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobcobj->objType = DO_BLOB_COMMENTS; - blobcobj->catId = nilCatalogId; - AssignDumpId(blobcobj); - blobcobj->name = strdup("BLOB COMMENTS"); - addObjectDependency(blobcobj, blobobj->dumpId); - } + if (outputBlobs) + getBlobs(g_fout); /* * Collect dependency data to assist in ordering the objects. @@ -1808,7 +1791,7 @@ dumpDatabase(Archive *AH) appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid\n" "FROM pg_catalog.pg_class\n" - "WHERE oid = %d;\n", + "WHERE oid = %u;\n", LargeObjectRelationId); lo_res = PQexec(g_conn, loFrozenQry->data); @@ -1825,7 +1808,7 @@ dumpDatabase(Archive *AH) appendPQExpBuffer(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid.\n"); appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n" "SET relfrozenxid = '%u'\n" - "WHERE oid = %d;\n", + "WHERE oid = %u;\n", atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)), LargeObjectRelationId); ArchiveEntry(AH, nilCatalogId, createDumpId(), @@ -1938,40 +1921,135 @@ dumpStdStrings(Archive *AH) /* - * hasBlobs: - * Test whether database contains any large objects + * getBlobs: + * Collect schema-level data about large objects */ -static bool -hasBlobs(Archive *AH) +static void +getBlobs(Archive *AH) { - bool result; - const char *blobQry; - PGresult *res; + PQExpBuffer blobQry = createPQExpBuffer(); + BlobInfo *binfo; + DumpableObject *bdata; + PGresult *res; + int ntups; + int i; + + /* Verbose message */ + if (g_verbose) + write_msg(NULL, "reading large objects\n"); /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); - /* Check for BLOB OIDs */ + /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */ if (AH->remoteVersion >= 90000) - blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1"; + appendPQExpBuffer(blobQry, + "SELECT oid, (%s lomowner) AS rolname, lomacl" + " FROM pg_largeobject_metadata", + username_subquery); else if (AH->remoteVersion >= 70100) - blobQry = "SELECT loid FROM pg_largeobject LIMIT 1"; + appendPQExpBuffer(blobQry, + "SELECT DISTINCT loid, NULL::oid, NULL::oid" + " FROM pg_largeobject"); else - blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1"; + appendPQExpBuffer(blobQry, + "SELECT oid, NULL::oid, NULL::oid" + " FROM pg_class WHERE relkind = 'l'"); - res = PQexec(g_conn, blobQry); - check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK); + res = PQexec(g_conn, blobQry->data); + check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK); - result = PQntuples(res) > 0; + ntups = PQntuples(res); + if (ntups > 0) + { + /* + * Each large object has its own BLOB archive entry. + */ + binfo = (BlobInfo *) malloc(ntups * sizeof(BlobInfo)); + + for (i = 0; i < ntups; i++) + { + binfo[i].dobj.objType = DO_BLOB; + binfo[i].dobj.catId.tableoid = LargeObjectRelationId; + binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, 0)); + AssignDumpId(&binfo[i].dobj); + + binfo[i].dobj.name = strdup(PQgetvalue(res, i, 0)); + if (!PQgetisnull(res, i, 1)) + binfo[i].rolname = strdup(PQgetvalue(res, i, 1)); + else + binfo[i].rolname = ""; + if (!PQgetisnull(res, i, 2)) + binfo[i].blobacl = strdup(PQgetvalue(res, i, 2)); + else + binfo[i].blobacl = NULL; + } + + /* + * If we have any large objects, a "BLOBS" archive entry is needed. + * This is just a placeholder for sorting; it carries no data now. + */ + bdata = (DumpableObject *) malloc(sizeof(DumpableObject)); + bdata->objType = DO_BLOB_DATA; + bdata->catId = nilCatalogId; + AssignDumpId(bdata); + bdata->name = strdup("BLOBS"); + } PQclear(res); + destroyPQExpBuffer(blobQry); +} - return result; +/* + * dumpBlob + * + * dump the definition (metadata) of the given large object + */ +static void +dumpBlob(Archive *AH, BlobInfo *binfo) +{ + PQExpBuffer cquery = createPQExpBuffer(); + PQExpBuffer dquery = createPQExpBuffer(); + + appendPQExpBuffer(cquery, + "SELECT pg_catalog.lo_create('%s');\n", + binfo->dobj.name); + + appendPQExpBuffer(dquery, + "SELECT pg_catalog.lo_unlink('%s');\n", + binfo->dobj.name); + + ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId, + binfo->dobj.name, + NULL, NULL, + binfo->rolname, false, + "BLOB", SECTION_PRE_DATA, + cquery->data, dquery->data, NULL, + binfo->dobj.dependencies, binfo->dobj.nDeps, + NULL, NULL); + + /* set up tag for comment and/or ACL */ + resetPQExpBuffer(cquery); + appendPQExpBuffer(cquery, "LARGE OBJECT %s", binfo->dobj.name); + + /* Dump comment if any */ + dumpComment(AH, cquery->data, + NULL, binfo->rolname, + binfo->dobj.catId, 0, binfo->dobj.dumpId); + + /* Dump ACL if any */ + if (binfo->blobacl) + dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId, "LARGE OBJECT", + binfo->dobj.name, NULL, cquery->data, + NULL, binfo->rolname, binfo->blobacl); + + destroyPQExpBuffer(cquery); + destroyPQExpBuffer(dquery); } /* * dumpBlobs: - * dump all blobs + * dump the data contents of all large objects */ static int dumpBlobs(Archive *AH, void *arg) @@ -1980,6 +2058,7 @@ dumpBlobs(Archive *AH, void *arg) const char *blobFetchQry; PGresult *res; char buf[LOBBUFSIZE]; + int ntups; int i; int cnt; @@ -1989,7 +2068,10 @@ dumpBlobs(Archive *AH, void *arg) /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); - /* Cursor to get all BLOB OIDs */ + /* + * Currently, we re-fetch all BLOB OIDs using a cursor. Consider + * scanning the already-in-memory dumpable objects instead... + */ if (AH->remoteVersion >= 90000) blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70100) @@ -2012,7 +2094,8 @@ dumpBlobs(Archive *AH, void *arg) check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK); /* Process the tuples, if any */ - for (i = 0; i < PQntuples(res); i++) + ntups = PQntuples(res); + for (i = 0; i < ntups; i++) { Oid blobOid; int loFd; @@ -2022,8 +2105,8 @@ dumpBlobs(Archive *AH, void *arg) loFd = lo_open(g_conn, blobOid, INV_READ); if (loFd == -1) { - write_msg(NULL, "dumpBlobs(): could not open large object: %s", - PQerrorMessage(g_conn)); + write_msg(NULL, "dumpBlobs(): could not open large object %u: %s", + blobOid, PQerrorMessage(g_conn)); exit_nicely(); } @@ -2035,8 +2118,8 @@ dumpBlobs(Archive *AH, void *arg) cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE); if (cnt < 0) { - write_msg(NULL, "dumpBlobs(): error reading large object: %s", - PQerrorMessage(g_conn)); + write_msg(NULL, "dumpBlobs(): error reading large object %u: %s", + blobOid, PQerrorMessage(g_conn)); exit_nicely(); } @@ -2047,141 +2130,13 @@ dumpBlobs(Archive *AH, void *arg) EndBlob(AH, blobOid); } - } while (PQntuples(res) > 0); + } while (ntups > 0); PQclear(res); return 1; } -/* - * dumpBlobComments - * dump all blob properties. - * It has "BLOB COMMENTS" tag due to the historical reason, but note - * that it is the routine to dump all the properties of blobs. - * - * Since we don't provide any way to be selective about dumping blobs, - * there's no need to be selective about their comments either. We put - * all the comments into one big TOC entry. - */ -static int -dumpBlobComments(Archive *AH, void *arg) -{ - const char *blobQry; - const char *blobFetchQry; - PQExpBuffer cmdQry = createPQExpBuffer(); - PGresult *res; - int i; - - if (g_verbose) - write_msg(NULL, "saving large object properties\n"); - - /* Make sure we are in proper schema */ - selectSourceSchema("pg_catalog"); - - /* Cursor to get all BLOB comments */ - if (AH->remoteVersion >= 90000) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " - "obj_description(oid, 'pg_largeobject'), " - "pg_get_userbyid(lomowner), lomacl " - "FROM pg_largeobject_metadata"; - else if (AH->remoteVersion >= 70300) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " - "obj_description(loid, 'pg_largeobject'), NULL, NULL " - "FROM (SELECT DISTINCT loid FROM " - "pg_description d JOIN pg_largeobject l ON (objoid = loid) " - "WHERE classoid = 'pg_largeobject'::regclass) ss"; - else if (AH->remoteVersion >= 70200) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " - "obj_description(loid, 'pg_largeobject'), NULL, NULL " - "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; - else if (AH->remoteVersion >= 70100) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " - "obj_description(loid), NULL, NULL " - "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; - else - blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " - " ( " - " SELECT description " - " FROM pg_description pd " - " WHERE pd.objoid=pc.oid " - " ), NULL, NULL " - "FROM pg_class pc WHERE relkind = 'l'"; - - res = PQexec(g_conn, blobQry); - check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK); - - /* Command to fetch from cursor */ - blobFetchQry = "FETCH 100 IN blobcmt"; - - do - { - PQclear(res); - - /* Do a fetch */ - res = PQexec(g_conn, blobFetchQry); - check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK); - - /* Process the tuples, if any */ - for (i = 0; i < PQntuples(res); i++) - { - Oid blobOid = atooid(PQgetvalue(res, i, 0)); - char *lo_comment = PQgetvalue(res, i, 1); - char *lo_owner = PQgetvalue(res, i, 2); - char *lo_acl = PQgetvalue(res, i, 3); - char lo_name[32]; - - resetPQExpBuffer(cmdQry); - - /* comment on the blob */ - if (!PQgetisnull(res, i, 1)) - { - appendPQExpBuffer(cmdQry, - "COMMENT ON LARGE OBJECT %u IS ", blobOid); - appendStringLiteralAH(cmdQry, lo_comment, AH); - appendPQExpBuffer(cmdQry, ";\n"); - } - - /* dump blob ownership, if necessary */ - if (!PQgetisnull(res, i, 2)) - { - appendPQExpBuffer(cmdQry, - "ALTER LARGE OBJECT %u OWNER TO %s;\n", - blobOid, lo_owner); - } - - /* dump blob privileges, if necessary */ - if (!PQgetisnull(res, i, 3) && - !dataOnly && !aclsSkip) - { - snprintf(lo_name, sizeof(lo_name), "%u", blobOid); - if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT", - lo_acl, lo_owner, "", - AH->remoteVersion, cmdQry)) - { - write_msg(NULL, "could not parse ACL (%s) for " - "large object %u", lo_acl, blobOid); - exit_nicely(); - } - } - - if (cmdQry->len > 0) - { - appendPQExpBuffer(cmdQry, "\n"); - archputs(cmdQry->data, AH); - } - } - } while (PQntuples(res) > 0); - - PQclear(res); - - archputs("\n", AH); - - destroyPQExpBuffer(cmdQry); - - return 1; -} - static void binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer, Oid pg_type_oid) @@ -6140,9 +6095,17 @@ dumpComment(Archive *fout, const char *target, CommentItem *comments; int ncomments; - /* Comments are SCHEMA not data */ - if (dataOnly) - return; + /* Comments are schema not data ... except blob comments are data */ + if (strncmp(target, "LARGE OBJECT ", 13) != 0) + { + if (dataOnly) + return; + } + else + { + if (schemaOnly) + return; + } /* Search for comments associated with catalogId, using table */ ncomments = findComments(fout, catalogId.tableoid, catalogId.oid, @@ -6530,7 +6493,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_DEFAULT_ACL: dumpDefaultACL(fout, (DefaultACLInfo *) dobj); break; - case DO_BLOBS: + case DO_BLOB: + dumpBlob(fout, (BlobInfo *) dobj); + break; + case DO_BLOB_DATA: ArchiveEntry(fout, dobj->catId, dobj->dumpId, dobj->name, NULL, NULL, "", false, "BLOBS", SECTION_DATA, @@ -6538,14 +6504,6 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) dobj->dependencies, dobj->nDeps, dumpBlobs, NULL); break; - case DO_BLOB_COMMENTS: - ArchiveEntry(fout, dobj->catId, dobj->dumpId, - dobj->name, NULL, NULL, "", - false, "BLOB COMMENTS", SECTION_DATA, - "", "", NULL, - dobj->dependencies, dobj->nDeps, - dumpBlobComments, NULL); - break; } } @@ -10382,14 +10340,16 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo) * * 'objCatId' is the catalog ID of the underlying object. * 'objDumpId' is the dump ID of the underlying object. - * 'type' must be TABLE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE. + * 'type' must be one of + * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE, + * FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT. * 'name' is the formatted name of the object. Must be quoted etc. already. * 'subname' is the formatted name of the sub-object, if any. Must be quoted. * 'tag' is the tag for the archive entry (typ. unquoted name of object). * 'nspname' is the namespace the object is in (NULL if none). * 'owner' is the owner, NULL if there is no owner (for languages). * 'acls' is the string read out of the fooacl system catalog field; - * it will be parsed here. + * it will be parsed here. *---------- */ static void @@ -10401,7 +10361,11 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId, PQExpBuffer sql; /* Do nothing if ACL dump is not enabled */ - if (dataOnly || aclsSkip) + if (aclsSkip) + return; + + /* --data-only skips ACLs *except* BLOB ACLs */ + if (dataOnly && strcmp(type, "LARGE OBJECT") != 0) return; sql = createPQExpBuffer(); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index fa5972a55f..e71d03604b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.162 2010/01/28 23:21:12 petere Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.163 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -115,8 +115,8 @@ typedef enum DO_FDW, DO_FOREIGN_SERVER, DO_DEFAULT_ACL, - DO_BLOBS, - DO_BLOB_COMMENTS + DO_BLOB, + DO_BLOB_DATA } DumpableObjectType; typedef struct _dumpableObject @@ -443,6 +443,13 @@ typedef struct _defaultACLInfo char *defaclacl; } DefaultACLInfo; +typedef struct _blobInfo +{ + DumpableObject dobj; + char *rolname; + char *blobacl; +} BlobInfo; + /* global decls */ extern bool force_quotes; /* double-quotes for identifiers flag */ extern bool g_verbose; /* verbose flag */ diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 1551af3dbe..f3761217d1 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.28 2010/02/15 19:59:47 petere Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.29 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -46,7 +46,7 @@ static const int oldObjectTypePriority[] = 16, /* DO_FK_CONSTRAINT */ 2, /* DO_PROCLANG */ 2, /* DO_CAST */ - 9, /* DO_TABLE_DATA */ + 10, /* DO_TABLE_DATA */ 7, /* DO_DUMMY_TYPE */ 3, /* DO_TSPARSER */ 4, /* DO_TSDICT */ @@ -55,8 +55,8 @@ static const int oldObjectTypePriority[] = 3, /* DO_FDW */ 4, /* DO_FOREIGN_SERVER */ 17, /* DO_DEFAULT_ACL */ - 10, /* DO_BLOBS */ - 11 /* DO_BLOB_COMMENTS */ + 9, /* DO_BLOB */ + 11 /* DO_BLOB_DATA */ }; /* @@ -83,7 +83,7 @@ static const int newObjectTypePriority[] = 26, /* DO_FK_CONSTRAINT */ 2, /* DO_PROCLANG */ 8, /* DO_CAST */ - 19, /* DO_TABLE_DATA */ + 20, /* DO_TABLE_DATA */ 17, /* DO_DUMMY_TYPE */ 10, /* DO_TSPARSER */ 12, /* DO_TSDICT */ @@ -92,8 +92,8 @@ static const int newObjectTypePriority[] = 14, /* DO_FDW */ 15, /* DO_FOREIGN_SERVER */ 27, /* DO_DEFAULT_ACL */ - 20, /* DO_BLOBS */ - 21 /* DO_BLOB_COMMENTS */ + 19, /* DO_BLOB */ + 21 /* DO_BLOB_DATA */ }; @@ -1157,14 +1157,14 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "DEFAULT ACL %s (ID %d OID %u)", obj->name, obj->dumpId, obj->catId.oid); return; - case DO_BLOBS: + case DO_BLOB: snprintf(buf, bufsize, - "BLOBS (ID %d)", - obj->dumpId); + "BLOB (ID %d OID %u)", + obj->dumpId, obj->catId.oid); return; - case DO_BLOB_COMMENTS: + case DO_BLOB_DATA: snprintf(buf, bufsize, - "BLOB COMMENTS (ID %d)", + "BLOB DATA (ID %d)", obj->dumpId); return; }