Dump public schema ownership and security labels.

As a side effect, this corrects dumps of public schema ACLs in databases
where the DBA dropped and recreated that schema.

Reviewed by Asif Rehman.

Discussion: https://postgr.es/m/20201229134924.GA1431748@rfd.leadboat.com
This commit is contained in:
Noah Misch 2021-06-28 18:34:55 -07:00
parent 14b2ffaf7f
commit a7a7be1f2f
7 changed files with 177 additions and 43 deletions

View File

@ -725,6 +725,7 @@ void
buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
PQExpBuffer init_acl_subquery, PQExpBuffer init_racl_subquery,
const char *acl_column, const char *acl_owner,
const char *initprivs_expr,
const char *obj_kind, bool binary_upgrade)
{
/*
@ -765,23 +766,25 @@ buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
"WITH ORDINALITY AS perm(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM "
"pg_catalog.unnest(coalesce(pip.initprivs,pg_catalog.acldefault(%s,%s))) "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"AS init(init_acl) WHERE acl = init_acl)) as foo)",
acl_column,
obj_kind,
acl_owner,
initprivs_expr,
obj_kind,
acl_owner);
printfPQExpBuffer(racl_subquery,
"(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
"(SELECT acl, row_n FROM "
"pg_catalog.unnest(coalesce(pip.initprivs,pg_catalog.acldefault(%s,%s))) "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"WITH ORDINALITY AS initp(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"AS permp(orig_acl) WHERE acl = orig_acl)) as foo)",
initprivs_expr,
obj_kind,
acl_owner,
acl_column,
@ -807,12 +810,13 @@ buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
printfPQExpBuffer(init_acl_subquery,
"CASE WHEN privtype = 'e' THEN "
"(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
"(SELECT acl, row_n FROM pg_catalog.unnest(pip.initprivs) "
"(SELECT acl, row_n FROM pg_catalog.unnest(%s) "
"WITH ORDINALITY AS initp(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM "
"pg_catalog.unnest(pg_catalog.acldefault(%s,%s)) "
"AS privm(orig_acl) WHERE acl = orig_acl)) as foo) END",
initprivs_expr,
obj_kind,
acl_owner);
@ -823,10 +827,11 @@ buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
"pg_catalog.unnest(pg_catalog.acldefault(%s,%s)) "
"WITH ORDINALITY AS privp(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM pg_catalog.unnest(pip.initprivs) "
"SELECT 1 FROM pg_catalog.unnest(%s) "
"AS initp(init_acl) WHERE acl = init_acl)) as foo) END",
obj_kind,
acl_owner);
acl_owner,
initprivs_expr);
}
else
{

View File

@ -54,6 +54,7 @@ extern void emitShSecLabels(PGconn *conn, PGresult *res,
extern void buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
PQExpBuffer init_acl_subquery, PQExpBuffer init_racl_subquery,
const char *acl_column, const char *acl_owner,
const char *initprivs_expr,
const char *obj_kind, bool binary_upgrade);
extern bool variable_is_guc_list_quote(const char *name);

View File

@ -3538,10 +3538,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
* Actually print the definition.
*
* Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
* versions put into CREATE SCHEMA. We have to do this when --no-owner
* mode is selected. This is ugly, but I see no other good way ...
* versions put into CREATE SCHEMA. Don't mutate the variant for schema
* "public" that is a comment. We have to do this when --no-owner mode is
* selected. This is ugly, but I see no other good way ...
*/
if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
if (ropt->noOwner &&
strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
{
ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
}
@ -3553,11 +3555,16 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
/*
* If we aren't using SET SESSION AUTH to determine ownership, we must
* instead issue an ALTER OWNER command. We assume that anything without
* a DROP command is not a separately ownable object. All the categories
* with DROP commands must appear in one list or the other.
* instead issue an ALTER OWNER command. Schema "public" is special; when
* a dump emits a comment in lieu of creating it, we use ALTER OWNER even
* when using SET SESSION for all other objects. We assume that anything
* without a DROP command is not a separately ownable object. All the
* categories with DROP commands must appear in one list or the other.
*/
if (!ropt->noOwner && !ropt->use_setsessauth &&
if (!ropt->noOwner &&
(!ropt->use_setsessauth ||
(strcmp(te->desc, "SCHEMA") == 0 &&
strncmp(te->defn, "--", 2) == 0)) &&
te->owner && strlen(te->owner) > 0 &&
te->dropStmt && strlen(te->dropStmt) > 0)
{

View File

@ -44,6 +44,7 @@
#include "catalog/pg_aggregate_d.h"
#include "catalog/pg_am_d.h"
#include "catalog/pg_attribute_d.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_cast_d.h"
#include "catalog/pg_class_d.h"
#include "catalog/pg_default_acl_d.h"
@ -1598,6 +1599,13 @@ checkExtensionMembership(DumpableObject *dobj, Archive *fout)
static void
selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
{
/*
* DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
* and (for --clean) a DROP SCHEMA statement. (In the absence of
* DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
*/
nsinfo->create = true;
/*
* If specific tables are being dumped, do not dump any complete
* namespaces. If specific namespaces are being dumped, dump just those
@ -1633,10 +1641,15 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
* no-mans-land between being a system object and a user object. We
* don't want to dump creation or comment commands for it, because
* that complicates matters for non-superuser use of pg_dump. But we
* should dump any ACL changes that have occurred for it, and of
* course we should dump contained objects.
* should dump any ownership changes, security labels, and ACL
* changes, and of course we should dump contained objects. pg_dump
* ties ownership to DUMP_COMPONENT_DEFINITION. Hence, unless the
* owner is the default, dump a "definition" bearing just a comment.
*/
nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
nsinfo->create = false;
nsinfo->dobj.dump = DUMP_COMPONENT_ALL & ~DUMP_COMPONENT_COMMENT;
if (nsinfo->nspowner == BOOTSTRAP_SUPERUSERID)
nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL;
}
else
@ -3428,8 +3441,8 @@ getBlobs(Archive *fout)
PQExpBuffer init_racl_subquery = createPQExpBuffer();
buildACLQueries(acl_subquery, racl_subquery, init_acl_subquery,
init_racl_subquery, "l.lomacl", "l.lomowner", "'L'",
dopt->binary_upgrade);
init_racl_subquery, "l.lomacl", "l.lomowner",
"pip.initprivs", "'L'", dopt->binary_upgrade);
appendPQExpBuffer(blobQry,
"SELECT l.oid, (%s l.lomowner) AS rolname, "
@ -4830,6 +4843,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
int i_tableoid;
int i_oid;
int i_nspname;
int i_nspowner;
int i_rolname;
int i_nspacl;
int i_rnspacl;
@ -4849,11 +4863,27 @@ getNamespaces(Archive *fout, int *numNamespaces)
PQExpBuffer init_acl_subquery = createPQExpBuffer();
PQExpBuffer init_racl_subquery = createPQExpBuffer();
/*
* Bypass pg_init_privs.initprivs for the public schema. Dropping and
* recreating the schema detaches it from its pg_init_privs row, but
* an empty destination database starts with this ACL nonetheless.
* Also, we support dump/reload of public schema ownership changes.
* ALTER SCHEMA OWNER filters nspacl through aclnewowner(), but
* initprivs continues to reflect the initial owner (the bootstrap
* superuser). Hence, synthesize the value that nspacl will have
* after the restore's ALTER SCHEMA OWNER.
*/
buildACLQueries(acl_subquery, racl_subquery, init_acl_subquery,
init_racl_subquery, "n.nspacl", "n.nspowner", "'n'",
dopt->binary_upgrade);
init_racl_subquery, "n.nspacl", "n.nspowner",
"CASE WHEN n.nspname = 'public' THEN array["
" format('%s=UC/%s', "
" n.nspowner::regrole, n.nspowner::regrole),"
" format('=UC/%s', n.nspowner::regrole)]::aclitem[] "
"ELSE pip.initprivs END",
"'n'", dopt->binary_upgrade);
appendPQExpBuffer(query, "SELECT n.tableoid, n.oid, n.nspname, "
"n.nspowner, "
"(%s nspowner) AS rolname, "
"%s as nspacl, "
"%s as rnspacl, "
@ -4878,7 +4908,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
destroyPQExpBuffer(init_racl_subquery);
}
else
appendPQExpBuffer(query, "SELECT tableoid, oid, nspname, "
appendPQExpBuffer(query, "SELECT tableoid, oid, nspname, nspowner, "
"(%s nspowner) AS rolname, "
"nspacl, NULL as rnspacl, "
"NULL AS initnspacl, NULL as initrnspacl "
@ -4894,6 +4924,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
i_tableoid = PQfnumber(res, "tableoid");
i_oid = PQfnumber(res, "oid");
i_nspname = PQfnumber(res, "nspname");
i_nspowner = PQfnumber(res, "nspowner");
i_rolname = PQfnumber(res, "rolname");
i_nspacl = PQfnumber(res, "nspacl");
i_rnspacl = PQfnumber(res, "rnspacl");
@ -4907,6 +4938,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
nsinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
AssignDumpId(&nsinfo[i].dobj);
nsinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_nspname));
nsinfo[i].nspowner = atooid(PQgetvalue(res, i, i_nspowner));
nsinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
nsinfo[i].nspacl = pg_strdup(PQgetvalue(res, i, i_nspacl));
nsinfo[i].rnspacl = pg_strdup(PQgetvalue(res, i, i_rnspacl));
@ -5098,8 +5130,8 @@ getTypes(Archive *fout, int *numTypes)
PQExpBuffer initracl_subquery = createPQExpBuffer();
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "t.typacl", "t.typowner", "'T'",
dopt->binary_upgrade);
initracl_subquery, "t.typacl", "t.typowner",
"pip.initprivs", "'T'", dopt->binary_upgrade);
appendPQExpBuffer(query, "SELECT t.tableoid, t.oid, t.typname, "
"t.typnamespace, "
@ -5800,8 +5832,8 @@ getAggregates(Archive *fout, int *numAggs)
const char *agg_check;
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "p.proacl", "p.proowner", "'f'",
dopt->binary_upgrade);
initracl_subquery, "p.proacl", "p.proowner",
"pip.initprivs", "'f'", dopt->binary_upgrade);
agg_check = (fout->remoteVersion >= 110000 ? "p.prokind = 'a'"
: "p.proisagg");
@ -6013,8 +6045,8 @@ getFuncs(Archive *fout, int *numFuncs)
const char *not_agg_check;
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "p.proacl", "p.proowner", "'f'",
dopt->binary_upgrade);
initracl_subquery, "p.proacl", "p.proowner",
"pip.initprivs", "'f'", dopt->binary_upgrade);
not_agg_check = (fout->remoteVersion >= 110000 ? "p.prokind <> 'a'"
: "NOT p.proisagg");
@ -6310,13 +6342,14 @@ getTables(Archive *fout, int *numTables)
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "c.relacl", "c.relowner",
"pip.initprivs",
"CASE WHEN c.relkind = " CppAsString2(RELKIND_SEQUENCE)
" THEN 's' ELSE 'r' END::\"char\"",
dopt->binary_upgrade);
buildACLQueries(attacl_subquery, attracl_subquery, attinitacl_subquery,
attinitracl_subquery, "at.attacl", "c.relowner", "'c'",
dopt->binary_upgrade);
attinitracl_subquery, "at.attacl", "c.relowner",
"pip.initprivs", "'c'", dopt->binary_upgrade);
appendPQExpBuffer(query,
"SELECT c.tableoid, c.oid, c.relname, "
@ -8241,8 +8274,8 @@ getProcLangs(Archive *fout, int *numProcLangs)
PQExpBuffer initracl_subquery = createPQExpBuffer();
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "l.lanacl", "l.lanowner", "'l'",
dopt->binary_upgrade);
initracl_subquery, "l.lanacl", "l.lanowner",
"pip.initprivs", "'l'", dopt->binary_upgrade);
/* pg_language has a laninline column */
appendPQExpBuffer(query, "SELECT l.tableoid, l.oid, "
@ -9432,8 +9465,8 @@ getForeignDataWrappers(Archive *fout, int *numForeignDataWrappers)
PQExpBuffer initracl_subquery = createPQExpBuffer();
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "f.fdwacl", "f.fdwowner", "'F'",
dopt->binary_upgrade);
initracl_subquery, "f.fdwacl", "f.fdwowner",
"pip.initprivs", "'F'", dopt->binary_upgrade);
appendPQExpBuffer(query, "SELECT f.tableoid, f.oid, f.fdwname, "
"(%s f.fdwowner) AS rolname, "
@ -9599,8 +9632,8 @@ getForeignServers(Archive *fout, int *numForeignServers)
PQExpBuffer initracl_subquery = createPQExpBuffer();
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "f.srvacl", "f.srvowner", "'S'",
dopt->binary_upgrade);
initracl_subquery, "f.srvacl", "f.srvowner",
"pip.initprivs", "'S'", dopt->binary_upgrade);
appendPQExpBuffer(query, "SELECT f.tableoid, f.oid, f.srvname, "
"(%s f.srvowner) AS rolname, "
@ -9746,6 +9779,7 @@ getDefaultACLs(Archive *fout, int *numDefaultACLs)
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "defaclacl", "defaclrole",
"pip.initprivs",
"CASE WHEN defaclobjtype = 'S' THEN 's' ELSE defaclobjtype END::\"char\"",
dopt->binary_upgrade);
@ -10363,9 +10397,19 @@ dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo)
qnspname = pg_strdup(fmtId(nspinfo->dobj.name));
appendPQExpBuffer(delq, "DROP SCHEMA %s;\n", qnspname);
appendPQExpBuffer(q, "CREATE SCHEMA %s;\n", qnspname);
if (nspinfo->create)
{
appendPQExpBuffer(delq, "DROP SCHEMA %s;\n", qnspname);
appendPQExpBuffer(q, "CREATE SCHEMA %s;\n", qnspname);
}
else
{
/* see selectDumpableNamespace() */
appendPQExpBufferStr(delq,
"-- *not* dropping schema, since initdb creates it\n");
appendPQExpBufferStr(q,
"-- *not* creating schema, since initdb creates it\n");
}
if (dopt->binary_upgrade)
binary_upgrade_extension_member(q, &nspinfo->dobj,
@ -15556,8 +15600,8 @@ dumpTable(Archive *fout, const TableInfo *tbinfo)
PQExpBuffer initracl_subquery = createPQExpBuffer();
buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
initracl_subquery, "at.attacl", "c.relowner", "'c'",
dopt->binary_upgrade);
initracl_subquery, "at.attacl", "c.relowner",
"pip.initprivs", "'c'", dopt->binary_upgrade);
appendPQExpBuffer(query,
"SELECT at.attname, "

View File

@ -142,6 +142,8 @@ typedef struct _dumpableObject
typedef struct _namespaceInfo
{
DumpableObject dobj;
bool create; /* CREATE SCHEMA, or just set owner? */
Oid nspowner;
char *rolname; /* name of owner, or empty string */
char *nspacl;
char *rnspacl;

View File

@ -128,6 +128,14 @@ my %pgdump_runs = (
'regress_pg_dump_test',
],
},
defaults_public_owner => {
database => 'regress_public_owner',
dump_cmd => [
'pg_dump', '--no-sync', '-f',
"$tempdir/defaults_public_owner.sql",
'regress_public_owner',
],
},
# Do not use --no-sync to give test coverage for data sync.
defaults_custom_format => {
@ -620,6 +628,24 @@ my %tests = (
unlike => { no_owner => 1, },
},
'ALTER SCHEMA public OWNER TO' => {
# see test "REVOKE CREATE ON SCHEMA public" for causative create_sql
regexp => qr/^ALTER SCHEMA public OWNER TO .+;/m,
like => {
%full_runs, section_pre_data => 1,
},
unlike => { no_owner => 1, },
},
'ALTER SCHEMA public OWNER TO (w/o ACL changes)' => {
database => 'regress_public_owner',
create_order => 100,
create_sql =>
'ALTER SCHEMA public OWNER TO "regress_quoted \"" role";',
regexp => qr/^(GRANT|REVOKE)/m,
unlike => { defaults_public_owner => 1 },
},
'ALTER SEQUENCE test_table_col1_seq' => {
regexp => qr/^
\QALTER SEQUENCE dump_test.test_table_col1_seq OWNED BY dump_test.test_table.col1;\E
@ -955,6 +981,13 @@ my %tests = (
like => {},
},
'COMMENT ON SCHEMA public' => {
regexp => qr/^COMMENT ON SCHEMA public IS .+;/m,
# this shouldn't ever get emitted
like => {},
},
'COMMENT ON TABLE dump_test.test_table' => {
create_order => 36,
create_sql => 'COMMENT ON TABLE dump_test.test_table
@ -1385,6 +1418,18 @@ my %tests = (
},
},
'CREATE ROLE regress_quoted...' => {
create_order => 1,
create_sql => 'CREATE ROLE "regress_quoted \"" role";',
regexp => qr/^\QCREATE ROLE "regress_quoted \"" role";\E/m,
like => {
pg_dumpall_dbprivs => 1,
pg_dumpall_exclude => 1,
pg_dumpall_globals => 1,
pg_dumpall_globals_clean => 1,
},
},
'CREATE ACCESS METHOD gist2' => {
create_order => 52,
create_sql =>
@ -3333,6 +3378,23 @@ my %tests = (
unlike => { no_privs => 1, },
},
# With the exception of the public schema, we don't dump ownership changes
# for objects originating at initdb. Hence, any GRANT or REVOKE affecting
# owner privileges for those objects should reference the bootstrap
# superuser, not the dump-time owner.
'REVOKE EXECUTE ON FUNCTION pg_stat_reset FROM regress_dump_test_role' =>
{
create_order => 15,
create_sql => '
ALTER FUNCTION pg_stat_reset OWNER TO regress_dump_test_role;
REVOKE EXECUTE ON FUNCTION pg_stat_reset
FROM regress_dump_test_role;',
regexp => qr/^[^-].*pg_stat_reset.* regress_dump_test_role/m,
# this shouldn't ever get emitted
like => {},
},
'REVOKE SELECT ON TABLE pg_proc FROM public' => {
create_order => 45,
create_sql => 'REVOKE SELECT ON TABLE pg_proc FROM public;',
@ -3344,9 +3406,13 @@ my %tests = (
'REVOKE CREATE ON SCHEMA public FROM public' => {
create_order => 16,
create_sql => 'REVOKE CREATE ON SCHEMA public FROM public;',
regexp => qr/^
\QREVOKE ALL ON SCHEMA public FROM PUBLIC;\E
create_sql => '
REVOKE CREATE ON SCHEMA public FROM public;
ALTER SCHEMA public OWNER TO "regress_quoted \"" role";
REVOKE ALL ON SCHEMA public FROM "regress_quoted \"" role";',
regexp => qr/^
\QREVOKE ALL ON SCHEMA public FROM "regress_quoted \"" role";\E
\n\QREVOKE ALL ON SCHEMA public FROM PUBLIC;\E
\n\QGRANT USAGE ON SCHEMA public TO PUBLIC;\E
/xm,
like => { %full_runs, section_pre_data => 1, },
@ -3451,8 +3517,9 @@ if ($collation_check_stderr !~ /ERROR: /)
# Determine whether build supports LZ4.
my $supports_lz4 = check_pg_config("#define HAVE_LIBLZ4 1");
# Create a second database for certain tests to work against
# Create additional databases for mutations of schema public
$node->psql('postgres', 'create database regress_pg_dump_test;');
$node->psql('postgres', 'create database regress_public_owner;');
# Start with number of command_fails_like()*2 tests below (each
# command_fails_like is actually 2 tests)

View File

@ -318,6 +318,14 @@ my %tests = (
like => { pg_dumpall_globals => 1, },
},
'CREATE SCHEMA public' => {
regexp => qr/^CREATE SCHEMA public;/m,
like => {
extension_schema => 1,
without_extension_explicit_schema => 1,
},
},
'CREATE SEQUENCE regress_pg_dump_table_col1_seq' => {
regexp => qr/^
\QCREATE SEQUENCE public.regress_pg_dump_table_col1_seq\E