Rethink pg_dump's handling of object ACLs.

Throw away most of the existing logic for this, as it was very
inefficient thanks to expensive sub-selects executed to collect
ACL data that we very possibly would have no interest in dumping.
Reduce the ACL handling in the initial per-object-type queries
to be just collection of the catalog ACL fields, as it was
originally.  Fetch pg_init_privs data separately in a single
scan of that catalog, and do the merging calculations on the
client side.  Remove the separate code path used for pre-9.6
source servers; there is no good reason to treat them differently
from newer servers that happen to have empty pg_init_privs.

Discussion: https://postgr.es/m/2273648.1634764485@sss.pgh.pa.us
Discussion: https://postgr.es/m/7d7eb6128f40401d81b3b7a898b6b4de@W2012-02.nidsa.loc
This commit is contained in:
Tom Lane 2021-12-06 12:39:45 -05:00
parent 5209c0ba0b
commit 0c9d84427f
7 changed files with 784 additions and 1137 deletions

View File

@ -24,7 +24,7 @@ static bool parseAclItem(const char *item, const char *type,
const char *name, const char *subname, int remoteVersion,
PQExpBuffer grantee, PQExpBuffer grantor,
PQExpBuffer privs, PQExpBuffer privswgo);
static char *copyAclUserName(PQExpBuffer output, char *input);
static char *dequoteAclUserName(PQExpBuffer output, char *input);
static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
const char *subname);
@ -39,7 +39,8 @@ static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
* TABLE, SEQUENCE, FUNCTION, PROCEDURE, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
* FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT)
* acls: the ACL string fetched from the database
* racls: the ACL string of any initial-but-now-revoked privileges
* baseacls: the initial ACL string for this object; can be
* NULL or empty string to indicate "not available from server"
* owner: username of object owner (will be passed through fmtId); can be
* NULL or empty string to indicate "no owner known"
* prefix: string to prefix to each generated command; typically empty
@ -48,6 +49,12 @@ static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
* Returns true if okay, false if could not parse the acl string.
* The resulting commands (if any) are appended to the contents of 'sql'.
*
* baseacls is typically the result of acldefault() for the object's type
* and owner. However, if there is a pg_init_privs entry for the object,
* it should instead be the initprivs ACLs. When acls is itself a
* pg_init_privs entry, baseacls is what to dump that relative to; then
* it can be either an acldefault() value or an empty ACL "{}".
*
* Note: when processing a default ACL, prefix is "ALTER DEFAULT PRIVILEGES "
* or something similar, and name is an empty string.
*
@ -56,15 +63,19 @@ static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
*/
bool
buildACLCommands(const char *name, const char *subname, const char *nspname,
const char *type, const char *acls, const char *racls,
const char *type, const char *acls, const char *baseacls,
const char *owner, const char *prefix, int remoteVersion,
PQExpBuffer sql)
{
bool ok = true;
char **aclitems = NULL;
char **raclitems = NULL;
char **baseitems = NULL;
char **grantitems = NULL;
char **revokeitems = NULL;
int naclitems = 0;
int nraclitems = 0;
int nbaseitems = 0;
int ngrantitems = 0;
int nrevokeitems = 0;
int i;
PQExpBuffer grantee,
grantor,
@ -72,37 +83,88 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
privswgo;
PQExpBuffer firstsql,
secondsql;
bool found_owner_privs = false;
if (strlen(acls) == 0 && strlen(racls) == 0)
/*
* If the acl was NULL (initial default state), we need do nothing. Note
* that this is distinguishable from all-privileges-revoked, which will
* look like an empty array ("{}").
*/
if (acls == NULL || *acls == '\0')
return true; /* object has default permissions */
/* treat empty-string owner same as NULL */
if (owner && *owner == '\0')
owner = NULL;
if (strlen(acls) != 0)
/* Parse the acls array */
if (!parsePGArray(acls, &aclitems, &naclitems))
{
if (!parsePGArray(acls, &aclitems, &naclitems))
if (aclitems)
free(aclitems);
return false;
}
/* Parse the baseacls, if provided */
if (baseacls && *baseacls != '\0')
{
if (!parsePGArray(baseacls, &baseitems, &nbaseitems))
{
if (aclitems)
free(aclitems);
if (baseitems)
free(baseitems);
return false;
}
}
if (strlen(racls) != 0)
/*
* Compare the actual ACL with the base ACL, extracting the privileges
* that need to be granted (i.e., are in the actual ACL but not the base
* ACL) and the ones that need to be revoked (the reverse). We use plain
* string comparisons to check for matches. In principle that could be
* fooled by extraneous issues such as whitespace, but since all these
* strings are the work of aclitemout(), it should be OK in practice.
* Besides, a false mismatch will just cause the output to be a little
* more verbose than it really needed to be.
*
* (If we weren't given a base ACL, this stanza winds up with all the
* ACL's items in grantitems and nothing in revokeitems. It's not worth
* special-casing that.)
*/
grantitems = (char **) pg_malloc(naclitems * sizeof(char *));
for (i = 0; i < naclitems; i++)
{
if (!parsePGArray(racls, &raclitems, &nraclitems))
bool found = false;
for (int j = 0; j < nbaseitems; j++)
{
if (aclitems)
free(aclitems);
if (raclitems)
free(raclitems);
return false;
if (strcmp(aclitems[i], baseitems[j]) == 0)
{
found = true;
break;
}
}
if (!found)
grantitems[ngrantitems++] = aclitems[i];
}
revokeitems = (char **) pg_malloc(nbaseitems * sizeof(char *));
for (i = 0; i < nbaseitems; i++)
{
bool found = false;
for (int j = 0; j < naclitems; j++)
{
if (strcmp(baseitems[i], aclitems[j]) == 0)
{
found = true;
break;
}
}
if (!found)
revokeitems[nrevokeitems++] = baseitems[i];
}
/* Prepare working buffers */
grantee = createPQExpBuffer();
grantor = createPQExpBuffer();
privs = createPQExpBuffer();
@ -110,50 +172,21 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
/*
* At the end, these two will be pasted together to form the result.
*
* For older systems we use these to ensure that the owner privileges go
* before the other ones, as a GRANT could create the default entry for
* the object, which generally includes all rights for the owner. In more
* recent versions we normally handle this because the owner rights come
* first in the ACLs, but older versions might have them after the PUBLIC
* privileges.
*
* For 9.6 and later systems, much of this changes. With 9.6, we check
* the default privileges for the objects at dump time and create two sets
* of ACLs- "racls" which are the ACLs to REVOKE from the object (as the
* object may have initial privileges on it, along with any default ACLs
* which are not part of the current set of privileges), and regular
* "acls", which are the ACLs to GRANT to the object. We handle the
* REVOKEs first, followed by the GRANTs.
*/
firstsql = createPQExpBuffer();
secondsql = createPQExpBuffer();
/*
* For pre-9.6 systems, we always start with REVOKE ALL FROM PUBLIC, as we
* don't wish to make any assumptions about what the default ACLs are, and
* we do not collect them during the dump phase (and racls will always be
* the empty set, see above).
* If we weren't given baseacls information, we just revoke everything and
* then grant what's listed in the ACL. This avoids having to embed
* detailed knowledge about what the defaults are/were, and it's not very
* expensive since servers lacking acldefault() are now rare.
*
* For 9.6 and later, if any revoke ACLs have been provided, then include
* them in 'firstsql'.
*
* Revoke ACLs happen when an object starts out life with a set of
* privileges (eg: GRANT SELECT ON pg_class TO PUBLIC;) and the user has
* decided to revoke those rights. Since those objects come into being
* with those default privileges, we have to revoke them to match what the
* current state of affairs is. Note that we only started explicitly
* tracking such initial rights in 9.6, and prior to that all initial
* rights are actually handled by the simple 'REVOKE ALL .. FROM PUBLIC'
* case, for initdb-created objects. Prior to 9.6, we didn't handle
* extensions correctly, but we do now by tracking their initial
* privileges, in the same way we track initdb initial privileges, see
* pg_init_privs.
* Otherwise, we need only revoke what's listed in revokeitems.
*/
if (remoteVersion < 90600)
if (baseacls == NULL || *baseacls == '\0')
{
Assert(nraclitems == 0);
/* We assume the old defaults only involved the owner and PUBLIC */
appendPQExpBuffer(firstsql, "%sREVOKE ALL", prefix);
if (subname)
appendPQExpBuffer(firstsql, "(%s)", subname);
@ -161,13 +194,24 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
if (nspname && *nspname)
appendPQExpBuffer(firstsql, "%s.", fmtId(nspname));
appendPQExpBuffer(firstsql, "%s FROM PUBLIC;\n", name);
if (owner)
{
appendPQExpBuffer(firstsql, "%sREVOKE ALL", prefix);
if (subname)
appendPQExpBuffer(firstsql, "(%s)", subname);
appendPQExpBuffer(firstsql, " ON %s ", type);
if (nspname && *nspname)
appendPQExpBuffer(firstsql, "%s.", fmtId(nspname));
appendPQExpBuffer(firstsql, "%s FROM %s;\n", name, fmtId(owner));
}
}
else
{
/* Scan individual REVOKE ACL items */
for (i = 0; i < nraclitems; i++)
for (i = 0; i < nrevokeitems; i++)
{
if (!parseAclItem(raclitems[i], type, name, subname, remoteVersion,
if (!parseAclItem(revokeitems[i],
type, name, subname, remoteVersion,
grantee, grantor, privs, NULL))
{
ok = false;
@ -195,6 +239,10 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
}
/*
* At this point we have issued REVOKE statements for all initial and
* default privileges that are no longer present on the object, so we are
* almost ready to GRANT the privileges listed in grantitems[].
*
* We still need some hacking though to cover the case where new default
* public privileges are added in new versions: the REVOKE ALL will revoke
* them, leading to behavior different from what the old version had,
@ -208,146 +256,92 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
prefix, type, name);
}
/* Scan individual ACL items */
for (i = 0; i < naclitems; i++)
/*
* Scan individual ACL items to be granted.
*
* The order in which privileges appear in the ACL string (the order they
* have been GRANT'd in, which the backend maintains) must be preserved to
* ensure that GRANTs WITH GRANT OPTION and subsequent GRANTs based on
* those are dumped in the correct order. However, some old server
* versions will show grants to PUBLIC before the owner's own grants; for
* consistency's sake, force the owner's grants to be output first.
*/
for (i = 0; i < ngrantitems; i++)
{
if (!parseAclItem(aclitems[i], type, name, subname, remoteVersion,
grantee, grantor, privs, privswgo))
{
ok = false;
break;
}
if (grantor->len == 0 && owner)
printfPQExpBuffer(grantor, "%s", owner);
if (privs->len > 0 || privswgo->len > 0)
if (parseAclItem(grantitems[i], type, name, subname, remoteVersion,
grantee, grantor, privs, privswgo))
{
/*
* Prior to 9.6, we had to handle owner privileges in a special
* manner by first REVOKE'ing the rights and then GRANT'ing them
* after. With 9.6 and above, what we need to REVOKE and what we
* need to GRANT is figured out when we dump and stashed into
* "racls" and "acls", respectively. See above.
* If the grantor isn't the owner, we'll need to use SET SESSION
* AUTHORIZATION to become the grantor. Issue the SET/RESET only
* if there's something useful to do.
*/
if (remoteVersion < 90600 && owner
&& strcmp(grantee->data, owner) == 0
&& strcmp(grantor->data, owner) == 0)
if (privs->len > 0 || privswgo->len > 0)
{
found_owner_privs = true;
PQExpBuffer thissql;
/* Set owner as grantor if that's not explicit in the ACL */
if (grantor->len == 0 && owner)
printfPQExpBuffer(grantor, "%s", owner);
/* Make sure owner's own grants are output before others */
if (owner &&
strcmp(grantee->data, owner) == 0 &&
strcmp(grantor->data, owner) == 0)
thissql = firstsql;
else
thissql = secondsql;
/*
* For the owner, the default privilege level is ALL WITH
* GRANT OPTION.
*/
if (strcmp(privswgo->data, "ALL") != 0)
{
appendPQExpBuffer(firstsql, "%sREVOKE ALL", prefix);
if (subname)
appendPQExpBuffer(firstsql, "(%s)", subname);
appendPQExpBuffer(firstsql, " ON %s ", type);
if (nspname && *nspname)
appendPQExpBuffer(firstsql, "%s.", fmtId(nspname));
appendPQExpBuffer(firstsql, "%s FROM %s;\n",
name, fmtId(grantee->data));
if (privs->len > 0)
{
appendPQExpBuffer(firstsql,
"%sGRANT %s ON %s ",
prefix, privs->data, type);
if (nspname && *nspname)
appendPQExpBuffer(firstsql, "%s.", fmtId(nspname));
appendPQExpBuffer(firstsql,
"%s TO %s;\n",
name, fmtId(grantee->data));
}
if (privswgo->len > 0)
{
appendPQExpBuffer(firstsql,
"%sGRANT %s ON %s ",
prefix, privswgo->data, type);
if (nspname && *nspname)
appendPQExpBuffer(firstsql, "%s.", fmtId(nspname));
appendPQExpBuffer(firstsql,
"%s TO %s WITH GRANT OPTION;\n",
name, fmtId(grantee->data));
}
}
}
else
{
/*
* For systems prior to 9.6, we can assume we are starting
* from no privs at this point.
*
* For 9.6 and above, at this point we have issued REVOKE
* statements for all initial and default privileges which are
* no longer present on the object (as they were passed in as
* 'racls') and we can simply GRANT the rights which are in
* 'acls'.
*/
if (grantor->len > 0
&& (!owner || strcmp(owner, grantor->data) != 0))
appendPQExpBuffer(secondsql, "SET SESSION AUTHORIZATION %s;\n",
appendPQExpBuffer(thissql, "SET SESSION AUTHORIZATION %s;\n",
fmtId(grantor->data));
if (privs->len > 0)
{
appendPQExpBuffer(secondsql, "%sGRANT %s ON %s ",
appendPQExpBuffer(thissql, "%sGRANT %s ON %s ",
prefix, privs->data, type);
if (nspname && *nspname)
appendPQExpBuffer(secondsql, "%s.", fmtId(nspname));
appendPQExpBuffer(secondsql, "%s TO ", name);
appendPQExpBuffer(thissql, "%s.", fmtId(nspname));
appendPQExpBuffer(thissql, "%s TO ", name);
if (grantee->len == 0)
appendPQExpBufferStr(secondsql, "PUBLIC;\n");
appendPQExpBufferStr(thissql, "PUBLIC;\n");
else if (strncmp(grantee->data, "group ",
strlen("group ")) == 0)
appendPQExpBuffer(secondsql, "GROUP %s;\n",
appendPQExpBuffer(thissql, "GROUP %s;\n",
fmtId(grantee->data + strlen("group ")));
else
appendPQExpBuffer(secondsql, "%s;\n", fmtId(grantee->data));
appendPQExpBuffer(thissql, "%s;\n", fmtId(grantee->data));
}
if (privswgo->len > 0)
{
appendPQExpBuffer(secondsql, "%sGRANT %s ON %s ",
appendPQExpBuffer(thissql, "%sGRANT %s ON %s ",
prefix, privswgo->data, type);
if (nspname && *nspname)
appendPQExpBuffer(secondsql, "%s.", fmtId(nspname));
appendPQExpBuffer(secondsql, "%s TO ", name);
appendPQExpBuffer(thissql, "%s.", fmtId(nspname));
appendPQExpBuffer(thissql, "%s TO ", name);
if (grantee->len == 0)
appendPQExpBufferStr(secondsql, "PUBLIC");
appendPQExpBufferStr(thissql, "PUBLIC");
else if (strncmp(grantee->data, "group ",
strlen("group ")) == 0)
appendPQExpBuffer(secondsql, "GROUP %s",
appendPQExpBuffer(thissql, "GROUP %s",
fmtId(grantee->data + strlen("group ")));
else
appendPQExpBufferStr(secondsql, fmtId(grantee->data));
appendPQExpBufferStr(secondsql, " WITH GRANT OPTION;\n");
appendPQExpBufferStr(thissql, fmtId(grantee->data));
appendPQExpBufferStr(thissql, " WITH GRANT OPTION;\n");
}
if (grantor->len > 0
&& (!owner || strcmp(owner, grantor->data) != 0))
appendPQExpBufferStr(secondsql, "RESET SESSION AUTHORIZATION;\n");
appendPQExpBufferStr(thissql, "RESET SESSION AUTHORIZATION;\n");
}
}
}
/*
* For systems prior to 9.6, if we didn't find any owner privs, the owner
* must have revoked 'em all.
*
* For 9.6 and above, we handle this through the 'racls'. See above.
*/
if (remoteVersion < 90600 && !found_owner_privs && owner)
{
appendPQExpBuffer(firstsql, "%sREVOKE ALL", prefix);
if (subname)
appendPQExpBuffer(firstsql, "(%s)", subname);
appendPQExpBuffer(firstsql, " ON %s ", type);
if (nspname && *nspname)
appendPQExpBuffer(firstsql, "%s.", fmtId(nspname));
appendPQExpBuffer(firstsql, "%s FROM %s;\n",
name, fmtId(owner));
else
{
/* parseAclItem failed, give up */
ok = false;
break;
}
}
destroyPQExpBuffer(grantee);
@ -361,19 +355,23 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
if (aclitems)
free(aclitems);
if (raclitems)
free(raclitems);
if (baseitems)
free(baseitems);
if (grantitems)
free(grantitems);
if (revokeitems)
free(revokeitems);
return ok;
}
/*
* Build ALTER DEFAULT PRIVILEGES command(s) for single pg_default_acl entry.
* Build ALTER DEFAULT PRIVILEGES command(s) for a single pg_default_acl entry.
*
* type: the object type (TABLES, FUNCTIONS, etc)
* nspname: schema name, or NULL for global default privileges
* acls: the ACL string fetched from the database
* acldefault: the appropriate default ACL for the object type and owner
* owner: username of privileges owner (will be passed through fmtId)
* remoteVersion: version of database
*
@ -382,8 +380,7 @@ buildACLCommands(const char *name, const char *subname, const char *nspname,
*/
bool
buildDefaultACLCommands(const char *type, const char *nspname,
const char *acls, const char *racls,
const char *initacls, const char *initracls,
const char *acls, const char *acldefault,
const char *owner,
int remoteVersion,
PQExpBuffer sql)
@ -403,21 +400,12 @@ buildDefaultACLCommands(const char *type, const char *nspname,
if (nspname)
appendPQExpBuffer(prefix, "IN SCHEMA %s ", fmtId(nspname));
if (strlen(initacls) != 0 || strlen(initracls) != 0)
{
appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
if (!buildACLCommands("", NULL, NULL, type,
initacls, initracls, owner,
prefix->data, remoteVersion, sql))
{
destroyPQExpBuffer(prefix);
return false;
}
appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
}
/*
* There's no such thing as initprivs for a default ACL, so the base ACL
* is always just the object-type-specific default.
*/
if (!buildACLCommands("", NULL, NULL, type,
acls, racls, owner,
acls, acldefault, owner,
prefix->data, remoteVersion, sql))
{
destroyPQExpBuffer(prefix);
@ -467,7 +455,7 @@ parseAclItem(const char *item, const char *type,
buf = pg_strdup(item);
/* user or group name is string up to = */
eqpos = copyAclUserName(grantee, buf);
eqpos = dequoteAclUserName(grantee, buf);
if (*eqpos != '=')
{
pg_free(buf);
@ -479,7 +467,7 @@ parseAclItem(const char *item, const char *type,
if (slpos)
{
*slpos++ = '\0';
slpos = copyAclUserName(grantor, slpos);
slpos = dequoteAclUserName(grantor, slpos);
if (*slpos != '\0')
{
pg_free(buf);
@ -603,13 +591,46 @@ do { \
return true;
}
/*
* Transfer the role name at *input into the output buffer, adding
* quoting according to the same rules as putid() in backend's acl.c.
*/
void
quoteAclUserName(PQExpBuffer output, const char *input)
{
const char *src;
bool safe = true;
for (src = input; *src; src++)
{
/* This test had better match what putid() does */
if (!isalnum((unsigned char) *src) && *src != '_')
{
safe = false;
break;
}
}
if (!safe)
appendPQExpBufferChar(output, '"');
for (src = input; *src; src++)
{
/* A double quote character in a username is encoded as "" */
if (*src == '"')
appendPQExpBufferChar(output, '"');
appendPQExpBufferChar(output, *src);
}
if (!safe)
appendPQExpBufferChar(output, '"');
}
/*
* Transfer a user or group name starting at *input into the output buffer,
* dequoting if needed. Returns a pointer to just past the input name.
* The name is taken to end at an unquoted '=' or end of string.
* Note: unlike quoteAclUserName(), this first clears the output buffer.
*/
static char *
copyAclUserName(PQExpBuffer output, char *input)
dequoteAclUserName(PQExpBuffer output, char *input)
{
resetPQExpBuffer(output);
@ -708,137 +729,6 @@ emitShSecLabels(PGconn *conn, PGresult *res, PQExpBuffer buffer,
}
}
/*
* buildACLQueries
*
* Build the subqueries to extract out the correct set of ACLs to be
* GRANT'd and REVOKE'd for the specific kind of object, accounting for any
* initial privileges (from pg_init_privs) and based on if we are in binary
* upgrade mode or not.
*
* Also builds subqueries to extract out the set of ACLs to go from the object
* default privileges to the privileges in pg_init_privs, if we are in binary
* upgrade mode, so that those privileges can be set up and recorded in the new
* cluster before the regular privileges are added on top of those.
*/
void
buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
PQExpBuffer init_acl_subquery, PQExpBuffer init_racl_subquery,
const char *acl_column, const char *acl_owner,
const char *initprivs_expr,
const char *obj_kind, bool binary_upgrade)
{
/*
* To get the delta from what the permissions were at creation time
* (either initdb or CREATE EXTENSION) vs. what they are now, we have to
* look at two things:
*
* What privileges have been added, which we calculate by extracting all
* the current privileges (using the set of default privileges for the
* object type if current privileges are NULL) and then removing those
* which existed at creation time (again, using the set of default
* privileges for the object type if there were no creation time
* privileges).
*
* What privileges have been removed, which we calculate by extracting the
* privileges as they were at creation time (or the default privileges, as
* above), and then removing the current privileges (or the default
* privileges, if current privileges are NULL).
*
* As a good cross-check, both directions of these checks should result in
* the empty set if both the current ACL and the initial privs are NULL
* (meaning, in practice, that the default ACLs were there at init time
* and is what the current privileges are).
*
* We always perform this delta on all ACLs and expect that by the time
* these are run the initial privileges will be in place, even in a binary
* upgrade situation (see below).
*
* Finally, the order in which privileges are in the ACL string (the order
* they been GRANT'd in, which the backend maintains) must be preserved to
* ensure that GRANTs WITH GRANT OPTION and subsequent GRANTs based on
* those are dumped in the correct order.
*/
printfPQExpBuffer(acl_subquery,
"(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
"(SELECT acl, row_n FROM "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"WITH ORDINALITY AS perm(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"AS init(init_acl) WHERE acl = init_acl)) as foo)",
acl_column,
obj_kind,
acl_owner,
initprivs_expr,
obj_kind,
acl_owner);
printfPQExpBuffer(racl_subquery,
"(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
"(SELECT acl, row_n FROM "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"WITH ORDINALITY AS initp(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM "
"pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
"AS permp(orig_acl) WHERE acl = orig_acl)) as foo)",
initprivs_expr,
obj_kind,
acl_owner,
acl_column,
obj_kind,
acl_owner);
/*
* In binary upgrade mode we don't run the extension script but instead
* dump out the objects independently and then recreate them. To preserve
* the initial privileges which were set on extension objects, we need to
* grab the set of GRANT and REVOKE commands necessary to get from the
* default privileges of an object to the initial privileges as recorded
* in pg_init_privs.
*
* These will then be run ahead of the regular ACL commands, which were
* calculated using the queries above, inside of a block which sets a flag
* to indicate that the backend should record the results of these GRANT
* and REVOKE statements into pg_init_privs. This is how we preserve the
* contents of that catalog across binary upgrades.
*/
if (binary_upgrade)
{
printfPQExpBuffer(init_acl_subquery,
"CASE WHEN privtype = 'e' THEN "
"(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
"(SELECT acl, row_n FROM pg_catalog.unnest(%s) "
"WITH ORDINALITY AS initp(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM "
"pg_catalog.unnest(pg_catalog.acldefault(%s,%s)) "
"AS privm(orig_acl) WHERE acl = orig_acl)) as foo) END",
initprivs_expr,
obj_kind,
acl_owner);
printfPQExpBuffer(init_racl_subquery,
"CASE WHEN privtype = 'e' THEN "
"(SELECT pg_catalog.array_agg(acl) FROM "
"(SELECT acl, row_n FROM "
"pg_catalog.unnest(pg_catalog.acldefault(%s,%s)) "
"WITH ORDINALITY AS privp(acl,row_n) "
"WHERE NOT EXISTS ( "
"SELECT 1 FROM pg_catalog.unnest(%s) "
"AS initp(init_acl) WHERE acl = init_acl)) as foo) END",
obj_kind,
acl_owner,
initprivs_expr);
}
else
{
printfPQExpBuffer(init_acl_subquery, "NULL");
printfPQExpBuffer(init_racl_subquery, "NULL");
}
}
/*
* Detect whether the given GUC variable is of GUC_LIST_QUOTE type.

View File

@ -37,26 +37,22 @@
extern bool buildACLCommands(const char *name, const char *subname, const char *nspname,
const char *type, const char *acls, const char *racls,
const char *type, const char *acls, const char *baseacls,
const char *owner, const char *prefix, int remoteVersion,
PQExpBuffer sql);
extern bool buildDefaultACLCommands(const char *type, const char *nspname,
const char *acls, const char *racls,
const char *initacls, const char *initracls,
const char *acls, const char *acldefault,
const char *owner,
int remoteVersion,
PQExpBuffer sql);
extern void quoteAclUserName(PQExpBuffer output, const char *input);
extern void buildShSecLabelQuery(const char *catalog_name,
Oid objectId, PQExpBuffer sql);
extern void emitShSecLabels(PGconn *conn, PGresult *res,
PQExpBuffer buffer, const char *objtype, const char *objname);
extern void buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
PQExpBuffer init_acl_subquery, PQExpBuffer init_racl_subquery,
const char *acl_column, const char *acl_owner,
const char *initprivs_expr,
const char *obj_kind, bool binary_upgrade);
extern bool variable_is_guc_list_quote(const char *name);
extern bool SplitGUCList(char *rawstring, char separator,

File diff suppressed because it is too large Load Diff

View File

@ -146,16 +146,36 @@ typedef struct _dumpableObject
int allocDeps; /* allocated size of dependencies[] */
} DumpableObject;
/*
* Object types that have ACLs must store them in a DumpableAcl sub-struct,
* which must immediately follow the DumpableObject base struct.
*
* Note: when dumping from a pre-9.2 server, which lacks the acldefault()
* function, acldefault will be NULL or empty.
*/
typedef struct _dumpableAcl
{
char *acl; /* the object's actual ACL string */
char *acldefault; /* default ACL for the object's type & owner */
/* these fields come from the object's pg_init_privs entry, if any: */
char privtype; /* entry type, 'i' or 'e'; 0 if no entry */
char *initprivs; /* the object's initial ACL string, or NULL */
} DumpableAcl;
/* Generic struct that can be used to access any object type having an ACL */
typedef struct _dumpableObjectWithAcl
{
DumpableObject dobj;
DumpableAcl dacl;
} DumpableObjectWithAcl;
typedef struct _namespaceInfo
{
DumpableObject dobj;
DumpableAcl dacl;
bool create; /* CREATE SCHEMA, or just set owner? */
Oid nspowner;
char *rolname; /* name of owner, or empty string */
char *nspacl;
char *rnspacl;
char *initnspacl;
char *initrnspacl;
} NamespaceInfo;
typedef struct _extensionInfo
@ -171,6 +191,7 @@ typedef struct _extensionInfo
typedef struct _typeInfo
{
DumpableObject dobj;
DumpableAcl dacl;
/*
* Note: dobj.name is the raw pg_type.typname entry. ftypname is the
@ -179,10 +200,6 @@ typedef struct _typeInfo
*/
char *ftypname;
char *rolname; /* name of owner, or empty string */
char *typacl;
char *rtypacl;
char *inittypacl;
char *initrtypacl;
Oid typelem;
Oid typrelid;
char typrelkind; /* 'r', 'v', 'c', etc */
@ -207,15 +224,12 @@ typedef struct _shellTypeInfo
typedef struct _funcInfo
{
DumpableObject dobj;
DumpableAcl dacl;
char *rolname; /* name of owner, or empty string */
Oid lang;
int nargs;
Oid *argtypes;
Oid prorettype;
char *proacl;
char *rproacl;
char *initproacl;
char *initrproacl;
} FuncInfo;
/* AggInfo is a superset of FuncInfo */
@ -270,11 +284,8 @@ typedef struct _tableInfo
* These fields are collected for every table in the database.
*/
DumpableObject dobj;
DumpableAcl dacl;
char *rolname; /* name of owner, or empty string */
char *relacl;
char *rrelacl;
char *initrelacl;
char *initrrelacl;
char relkind;
char relpersistence; /* relation persistence */
bool relispopulated; /* relation is populated */
@ -286,6 +297,7 @@ typedef struct _tableInfo
bool hasindex; /* does it have any indexes? */
bool hasrules; /* does it have any rules? */
bool hastriggers; /* does it have any triggers? */
bool hascolumnACLs; /* do any columns have non-default ACLs? */
bool rowsec; /* is row security enabled? */
bool forcerowsec; /* is row security forced? */
bool hasoids; /* does it have OIDs? */
@ -478,14 +490,11 @@ typedef struct _constraintInfo
typedef struct _procLangInfo
{
DumpableObject dobj;
DumpableAcl dacl;
bool lanpltrusted;
Oid lanplcallfoid;
Oid laninline;
Oid lanvalidator;
char *lanacl;
char *rlanacl;
char *initlanacl;
char *initrlanacl;
char *lanowner; /* name of owner, or empty string */
} ProcLangInfo;
@ -550,49 +559,37 @@ typedef struct _cfgInfo
typedef struct _fdwInfo
{
DumpableObject dobj;
DumpableAcl dacl;
char *rolname;
char *fdwhandler;
char *fdwvalidator;
char *fdwoptions;
char *fdwacl;
char *rfdwacl;
char *initfdwacl;
char *initrfdwacl;
} FdwInfo;
typedef struct _foreignServerInfo
{
DumpableObject dobj;
DumpableAcl dacl;
char *rolname;
Oid srvfdw;
char *srvtype;
char *srvversion;
char *srvacl;
char *rsrvacl;
char *initsrvacl;
char *initrsrvacl;
char *srvoptions;
} ForeignServerInfo;
typedef struct _defaultACLInfo
{
DumpableObject dobj;
DumpableAcl dacl;
char *defaclrole;
char defaclobjtype;
char *defaclacl;
char *rdefaclacl;
char *initdefaclacl;
char *initrdefaclacl;
} DefaultACLInfo;
typedef struct _blobInfo
{
DumpableObject dobj;
DumpableAcl dacl;
char *rolname;
char *blobacl;
char *rblobacl;
char *initblobacl;
char *initrblobacl;
} BlobInfo;
/*

View File

@ -1166,55 +1166,12 @@ dumpTablespaces(PGconn *conn)
/*
* Get all tablespaces except built-in ones (which we assume are named
* pg_xxx)
*
* For the tablespace ACLs, as of 9.6, we extract both the positive (as
* spcacl) and negative (as rspcacl) ACLs, relative to the default ACL for
* tablespaces, which are then passed to buildACLCommands() below.
*
* See buildACLQueries() and buildACLCommands().
*
* The order in which privileges are in the ACL string (the order they
* have been GRANT'd in, which the backend maintains) must be preserved to
* ensure that GRANTs WITH GRANT OPTION and subsequent GRANTs based on
* those are dumped in the correct order.
*
* Note that we do not support initial privileges (pg_init_privs) on
* tablespaces, so this logic cannot make use of buildACLQueries().
*/
if (server_version >= 90600)
if (server_version >= 90200)
res = executeQuery(conn, "SELECT oid, spcname, "
"pg_catalog.pg_get_userbyid(spcowner) AS spcowner, "
"pg_catalog.pg_tablespace_location(oid), "
"(SELECT array_agg(acl ORDER BY row_n) FROM "
" (SELECT acl, row_n FROM "
" unnest(coalesce(spcacl,acldefault('t',spcowner))) "
" WITH ORDINALITY AS perm(acl,row_n) "
" WHERE NOT EXISTS ( "
" SELECT 1 "
" FROM unnest(acldefault('t',spcowner)) "
" AS init(init_acl) "
" WHERE acl = init_acl)) AS spcacls) "
" AS spcacl, "
"(SELECT array_agg(acl ORDER BY row_n) FROM "
" (SELECT acl, row_n FROM "
" unnest(acldefault('t',spcowner)) "
" WITH ORDINALITY AS initp(acl,row_n) "
" WHERE NOT EXISTS ( "
" SELECT 1 "
" FROM unnest(coalesce(spcacl,acldefault('t',spcowner))) "
" AS permp(orig_acl) "
" WHERE acl = orig_acl)) AS rspcacls) "
" AS rspcacl, "
"array_to_string(spcoptions, ', '),"
"pg_catalog.shobj_description(oid, 'pg_tablespace') "
"FROM pg_catalog.pg_tablespace "
"WHERE spcname !~ '^pg_' "
"ORDER BY 1");
else if (server_version >= 90200)
res = executeQuery(conn, "SELECT oid, spcname, "
"pg_catalog.pg_get_userbyid(spcowner) AS spcowner, "
"pg_catalog.pg_tablespace_location(oid), "
"spcacl, '' as rspcacl, "
"spcacl, acldefault('t', spcowner) AS acldefault, "
"array_to_string(spcoptions, ', '),"
"pg_catalog.shobj_description(oid, 'pg_tablespace') "
"FROM pg_catalog.pg_tablespace "
@ -1223,7 +1180,7 @@ dumpTablespaces(PGconn *conn)
else if (server_version >= 90000)
res = executeQuery(conn, "SELECT oid, spcname, "
"pg_catalog.pg_get_userbyid(spcowner) AS spcowner, "
"spclocation, spcacl, '' as rspcacl, "
"spclocation, spcacl, NULL AS acldefault, "
"array_to_string(spcoptions, ', '),"
"pg_catalog.shobj_description(oid, 'pg_tablespace') "
"FROM pg_catalog.pg_tablespace "
@ -1232,7 +1189,7 @@ dumpTablespaces(PGconn *conn)
else if (server_version >= 80200)
res = executeQuery(conn, "SELECT oid, spcname, "
"pg_catalog.pg_get_userbyid(spcowner) AS spcowner, "
"spclocation, spcacl, '' as rspcacl, null, "
"spclocation, spcacl, NULL AS acldefault, null, "
"pg_catalog.shobj_description(oid, 'pg_tablespace') "
"FROM pg_catalog.pg_tablespace "
"WHERE spcname !~ '^pg_' "
@ -1240,7 +1197,7 @@ dumpTablespaces(PGconn *conn)
else
res = executeQuery(conn, "SELECT oid, spcname, "
"pg_catalog.pg_get_userbyid(spcowner) AS spcowner, "
"spclocation, spcacl, '' as rspcacl, "
"spclocation, spcacl, NULL AS acldefault, "
"null, null "
"FROM pg_catalog.pg_tablespace "
"WHERE spcname !~ '^pg_' "
@ -1257,7 +1214,7 @@ dumpTablespaces(PGconn *conn)
char *spcowner = PQgetvalue(res, i, 2);
char *spclocation = PQgetvalue(res, i, 3);
char *spcacl = PQgetvalue(res, i, 4);
char *rspcacl = PQgetvalue(res, i, 5);
char *acldefault = PQgetvalue(res, i, 5);
char *spcoptions = PQgetvalue(res, i, 6);
char *spccomment = PQgetvalue(res, i, 7);
char *fspcname;
@ -1276,9 +1233,11 @@ dumpTablespaces(PGconn *conn)
appendPQExpBuffer(buf, "ALTER TABLESPACE %s SET (%s);\n",
fspcname, spcoptions);
/* tablespaces can't have initprivs */
if (!skip_acls &&
!buildACLCommands(fspcname, NULL, NULL, "TABLESPACE",
spcacl, rspcacl,
spcacl, acldefault,
spcowner, "", server_version, buf))
{
pg_log_error("could not parse ACL list (%s) for tablespace \"%s\"",

View File

@ -726,6 +726,69 @@ parsePGArray(const char *atext, char ***itemarray, int *nitems)
}
/*
* Append one element to the text representation of a 1-dimensional Postgres
* array.
*
* The caller must provide the initial '{' and closing '}' of the array.
* This function handles all else, including insertion of commas and
* quoting of values.
*
* We assume that typdelim is ','.
*/
void
appendPGArray(PQExpBuffer buffer, const char *value)
{
bool needquote;
const char *tmp;
if (buffer->data[buffer->len - 1] != '{')
appendPQExpBufferChar(buffer, ',');
/* Decide if we need quotes; this should match array_out()'s choices. */
if (value[0] == '\0')
needquote = true; /* force quotes for empty string */
else if (pg_strcasecmp(value, "NULL") == 0)
needquote = true; /* force quotes for literal NULL */
else
needquote = false;
if (!needquote)
{
for (tmp = value; *tmp; tmp++)
{
char ch = *tmp;
if (ch == '"' || ch == '\\' ||
ch == '{' || ch == '}' || ch == ',' ||
/* these match array_isspace(): */
ch == ' ' || ch == '\t' || ch == '\n' ||
ch == '\r' || ch == '\v' || ch == '\f')
{
needquote = true;
break;
}
}
}
if (needquote)
{
appendPQExpBufferChar(buffer, '"');
for (tmp = value; *tmp; tmp++)
{
char ch = *tmp;
if (ch == '"' || ch == '\\')
appendPQExpBufferChar(buffer, '\\');
appendPQExpBufferChar(buffer, ch);
}
appendPQExpBufferChar(buffer, '"');
}
else
appendPQExpBufferStr(buffer, value);
}
/*
* Format a reloptions array and append it to the given buffer.
*

View File

@ -46,6 +46,7 @@ extern void appendConnStrVal(PQExpBuffer buf, const char *str);
extern void appendPsqlMetaConnect(PQExpBuffer buf, const char *dbname);
extern bool parsePGArray(const char *atext, char ***itemarray, int *nitems);
extern void appendPGArray(PQExpBuffer buffer, const char *value);
extern bool appendReloptionsArray(PQExpBuffer buffer, const char *reloptions,
const char *prefix, int encoding, bool std_strings);