Respect permissions within logical replication.

Prevent logical replication workers from performing insert, update,
delete, truncate, or copy commands on tables unless the subscription
owner has permission to do so.

Prevent subscription owners from circumventing row-level security by
forbidding replication into tables with row-level security policies
which the subscription owner is subject to, without regard to whether
the policy would ordinarily allow the INSERT, UPDATE, DELETE or
TRUNCATE which is being replicated.  This seems sufficient for now, as
superusers, roles with bypassrls, and target table owners should still
be able to replicate despite RLS policies.  We can revisit the
question of applying row-level security policies on a per-row basis if
this restriction proves too severe in practice.

Author: Mark Dilger
Reviewed-by: Jeff Davis, Andrew Dunstan, Ronan Dunklau
Discussion: https://postgr.es/m/9DFC88D3-1300-4DE8-ACBC-4CEF84399A53%40enterprisedb.com
This commit is contained in:
Jeff Davis 2022-01-07 17:38:20 -08:00
parent d0d62262d3
commit a2ab9c06ea
6 changed files with 499 additions and 8 deletions

View File

@ -330,6 +330,19 @@
will simply be skipped.
</para>
<para>
Logical replication operations are performed with the privileges of the role
which owns the subscription. Permissions failures on target tables will
cause replication conflicts, as will enabled
<link linkend="ddl-rowsecurity">row-level security</link> on target tables
that the subscription owner is subject to, without regard to whether any
policy would ordinary reject the <command>INSERT</command>,
<command>UPDATE</command>, <command>DELETE</command> or
<command>TRUNCATE</command> which is being replicated. This restriction on
row-level security may be lifted in a future version of
<productname>PostgreSQL</productname>.
</para>
<para>
A conflict will produce an error and will stop the replication; it must be
resolved manually by the user. Details about the conflict can be found in
@ -337,7 +350,7 @@
</para>
<para>
The resolution can be done either by changing data on the subscriber so
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
transaction that conflicts with the existing data. The transaction can be
skipped by calling the <link linkend="pg-replication-origin-advance">
@ -530,9 +543,9 @@
<para>
A user able to modify the schema of subscriber-side tables can execute
arbitrary code as a superuser. Limit ownership
and <literal>TRIGGER</literal> privilege on such tables to roles that
superusers trust. Moreover, if untrusted users can create tables, use only
arbitrary code as the role which owns any subscription which modifies those tables. Limit ownership
and <literal>TRIGGER</literal> privilege on such tables to trusted roles.
Moreover, if untrusted users can create tables, use only
publications that list tables explicitly. That is to say, create a
subscription <literal>FOR ALL TABLES</literal> or
<literal>FOR ALL TABLES IN SCHEMA</literal> only when superusers trust
@ -576,13 +589,20 @@
<para>
The subscription apply process will run in the local database with the
privileges of a superuser.
privileges of the subscription owner.
</para>
<para>
Privileges are only checked once at the start of a replication connection.
They are not re-checked as each change record is read from the publisher,
nor are they re-checked for each change when applied.
On the publisher, privileges are only checked once at the start of a
replication connection and are not re-checked as each change record is read.
</para>
<para>
On the subscriber, the subscription owner's privileges are re-checked for
each transaction when applied. If a worker is in the process of applying a
transaction when the ownership of the subscription is changed by a
concurrent transaction, the application of the current transaction will
continue under the old owner's privileges.
</para>
</sect1>

View File

@ -1481,6 +1481,8 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
InvokeObjectPostAlterHook(SubscriptionRelationId,
form->oid, 0);
ApplyLauncherWakeupAtCommit();
}
/*

View File

@ -111,9 +111,11 @@
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@ -924,6 +926,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
char relstate;
XLogRecPtr relstate_lsn;
Relation rel;
AclResult aclresult;
WalRcvExecResult *res;
char originname[NAMEDATALEN];
RepOriginId originid;
@ -1042,6 +1045,31 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
/*
* Check that our table sync worker has permission to insert into the
* target table.
*/
aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
ACL_INSERT);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult,
get_relkind_objtype(rel->rd_rel->relkind),
RelationGetRelationName(rel));
/*
* COPY FROM does not honor RLS policies. That is not a problem for
* subscriptions owned by roles with BYPASSRLS privilege (or superuser, who
* has it implicitly), but other roles should not be able to circumvent
* RLS. Disallow logical replication into RLS enabled relations for such
* roles.
*/
if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
GetUserNameFromId(GetUserId(), true),
RelationGetRelationName(rel))));
/*
* Start a transaction in the remote node in REPEATABLE READ mode. This
* ensures that both the replication slot we create (see below) and the

View File

@ -179,6 +179,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
@ -189,6 +190,7 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
@ -1530,6 +1532,38 @@ GetRelationIdentityOrPK(Relation rel)
return idxoid;
}
/*
* Check that we (the subscription owner) have sufficient privileges on the
* target relation to perform the given operation.
*/
static void
TargetPrivilegesCheck(Relation rel, AclMode mode)
{
Oid relid;
AclResult aclresult;
relid = RelationGetRelid(rel);
aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult,
get_relkind_objtype(rel->rd_rel->relkind),
get_rel_name(relid));
/*
* We lack the infrastructure to honor RLS policies. It might be possible
* to add such infrastructure here, but tablesync workers lack it, too, so
* we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
* but it seems dangerous to replicate a TRUNCATE and then refuse to
* replicate subsequent INSERTs, so we forbid all commands the same.
*/
if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
GetUserNameFromId(GetUserId(), true),
RelationGetRelationName(rel))));
}
/*
* Handle INSERT message.
*/
@ -1613,6 +1647,7 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
ExecOpenIndices(relinfo, false);
/* Do the insert. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
ExecSimpleRelationInsert(relinfo, estate, remoteslot);
/* Cleanup. */
@ -1796,6 +1831,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, remoteslot);
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
remoteslot);
}
@ -1917,6 +1953,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
EvalPlanQualSetSlot(&epqstate, localslot);
/* Do the actual delete. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
}
else
@ -2110,6 +2147,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
ExecOpenIndices(partrelinfo, false);
EvalPlanQualSetSlot(&epqstate, remoteslot_part);
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
localslot, remoteslot_part);
ExecCloseIndices(partrelinfo);
@ -2236,6 +2275,7 @@ apply_handle_truncate(StringInfo s)
}
remote_rels = lappend(remote_rels, rel);
TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
rels = lappend(rels, rel->localrel);
relids = lappend_oid(relids, rel->localreloid);
if (RelationIsLogicallyLogged(rel->localrel))
@ -2273,6 +2313,7 @@ apply_handle_truncate(StringInfo s)
continue;
}
TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
rels = lappend(rels, childrel);
part_rels = lappend(part_rels, childrel);
relids = lappend_oid(relids, childrelid);
@ -2915,6 +2956,7 @@ maybe_reread_subscription(void)
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,

View File

@ -2599,6 +2599,42 @@ sub wait_for_slot_catchup
=pod
=item $node->wait_for_log(regexp, offset)
Waits for the contents of the server log file, starting at the given offset, to
match the supplied regular expression. Checks the entire log if no offset is
given. Times out after 180 seconds.
If successful, returns the length of the entire log file, in bytes.
=cut
sub wait_for_log
{
my ($self, $regexp, $offset) = @_;
$offset = 0 unless defined $offset;
my $max_attempts = 180 * 10;
my $attempts = 0;
while ($attempts < $max_attempts)
{
my $log = PostgreSQL::Test::Utils::slurp_file($self->logfile, $offset);
return $offset+length($log) if ($log =~ m/$regexp/);
# Wait 0.1 second before retrying.
usleep(100_000);
$attempts++;
}
# The logs didn't match within 180 seconds. Give up.
croak "timed out waiting for match: $regexp";
}
=pod
=item $node->query_hash($dbname, $query, @columns)
Execute $query on $dbname, replacing any appearance of the string __COLUMNS__

View File

@ -0,0 +1,363 @@
# Copyright (c) 2021, PostgreSQL Global Development Group
# Test that logical replication respects permissions
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use Test::More tests => 100;
my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset);
$offset = 0;
sub publish_insert($$)
{
my ($tbl, $new_i) = @_;
$node_publisher->safe_psql('postgres', qq(
SET SESSION AUTHORIZATION regress_alice;
INSERT INTO $tbl (i) VALUES ($new_i);
));
}
sub publish_update($$$)
{
my ($tbl, $old_i, $new_i) = @_;
$node_publisher->safe_psql('postgres', qq(
SET SESSION AUTHORIZATION regress_alice;
UPDATE $tbl SET i = $new_i WHERE i = $old_i;
));
}
sub publish_delete($$)
{
my ($tbl, $old_i) = @_;
$node_publisher->safe_psql('postgres', qq(
SET SESSION AUTHORIZATION regress_alice;
DELETE FROM $tbl WHERE i = $old_i;
));
}
sub expect_replication($$$$$)
{
my ($tbl, $cnt, $min, $max, $testname) = @_;
$node_publisher->wait_for_catchup('admin_sub');
$result = $node_subscriber->safe_psql('postgres', qq(
SELECT COUNT(i), MIN(i), MAX(i) FROM $tbl));
is ($result, "$cnt|$min|$max", $testname);
}
sub expect_failure($$$$$$)
{
my ($tbl, $cnt, $min, $max, $re, $testname) = @_;
$offset = $node_subscriber->wait_for_log($re, $offset);
$result = $node_subscriber->safe_psql('postgres', qq(
SELECT COUNT(i), MIN(i), MAX(i) FROM $tbl));
is ($result, "$cnt|$min|$max", $testname);
}
sub revoke_superuser($)
{
my ($role) = @_;
$node_subscriber->safe_psql('postgres', qq(
ALTER ROLE $role NOSUPERUSER));
}
sub grant_superuser($)
{
my ($role) = @_;
$node_subscriber->safe_psql('postgres', qq(
ALTER ROLE $role SUPERUSER));
}
sub revoke_bypassrls($)
{
my ($role) = @_;
$node_subscriber->safe_psql('postgres', qq(
ALTER ROLE $role NOBYPASSRLS));
}
sub grant_bypassrls($)
{
my ($role) = @_;
$node_subscriber->safe_psql('postgres', qq(
ALTER ROLE $role BYPASSRLS));
}
# Create publisher and subscriber nodes with schemas owned and published by
# "regress_alice" but subscribed and replicated by different role
# "regress_admin". For partitioned tables, layout the partitions differently
# on the publisher than on the subscriber.
#
$node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_publisher->init(allows_streaming => 'logical');
$node_subscriber->init;
$node_publisher->start;
$node_subscriber->start;
$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my %range_a = (
publisher => 'FROM (0) TO (15)',
subscriber => 'FROM (0) TO (5)');
my %range_b = (
publisher => 'FROM (15) TO (30)',
subscriber => 'FROM (5) TO (30)');
my %list_a = (
publisher => 'IN (1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29)',
subscriber => 'IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)',
);
my %list_b = (
publisher => 'IN (2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30)',
subscriber => 'IN (17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)');
my %remainder_a = (
publisher => 0,
subscriber => 1);
my %remainder_b = (
publisher => 1,
subscriber => 0);
for my $node ($node_publisher, $node_subscriber)
{
my $range_a = $range_a{$node->name};
my $range_b = $range_b{$node->name};
my $list_a = $list_a{$node->name};
my $list_b = $list_b{$node->name};
my $remainder_a = $remainder_a{$node->name};
my $remainder_b = $remainder_b{$node->name};
$node->safe_psql('postgres', qq(
CREATE ROLE regress_admin SUPERUSER LOGIN;
CREATE ROLE regress_alice NOSUPERUSER LOGIN;
GRANT CREATE ON DATABASE postgres TO regress_alice;
SET SESSION AUTHORIZATION regress_alice;
CREATE SCHEMA alice;
GRANT USAGE ON SCHEMA alice TO regress_admin;
CREATE TABLE alice.unpartitioned (i INTEGER);
ALTER TABLE alice.unpartitioned REPLICA IDENTITY FULL;
GRANT SELECT ON TABLE alice.unpartitioned TO regress_admin;
CREATE TABLE alice.rangepart (i INTEGER) PARTITION BY RANGE (i);
ALTER TABLE alice.rangepart REPLICA IDENTITY FULL;
GRANT SELECT ON TABLE alice.rangepart TO regress_admin;
CREATE TABLE alice.rangepart_a PARTITION OF alice.rangepart
FOR VALUES $range_a;
ALTER TABLE alice.rangepart_a REPLICA IDENTITY FULL;
CREATE TABLE alice.rangepart_b PARTITION OF alice.rangepart
FOR VALUES $range_b;
ALTER TABLE alice.rangepart_b REPLICA IDENTITY FULL;
CREATE TABLE alice.listpart (i INTEGER) PARTITION BY LIST (i);
ALTER TABLE alice.listpart REPLICA IDENTITY FULL;
GRANT SELECT ON TABLE alice.listpart TO regress_admin;
CREATE TABLE alice.listpart_a PARTITION OF alice.listpart
FOR VALUES $list_a;
ALTER TABLE alice.listpart_a REPLICA IDENTITY FULL;
CREATE TABLE alice.listpart_b PARTITION OF alice.listpart
FOR VALUES $list_b;
ALTER TABLE alice.listpart_b REPLICA IDENTITY FULL;
CREATE TABLE alice.hashpart (i INTEGER) PARTITION BY HASH (i);
ALTER TABLE alice.hashpart REPLICA IDENTITY FULL;
GRANT SELECT ON TABLE alice.hashpart TO regress_admin;
CREATE TABLE alice.hashpart_a PARTITION OF alice.hashpart
FOR VALUES WITH (MODULUS 2, REMAINDER $remainder_a);
ALTER TABLE alice.hashpart_a REPLICA IDENTITY FULL;
CREATE TABLE alice.hashpart_b PARTITION OF alice.hashpart
FOR VALUES WITH (MODULUS 2, REMAINDER $remainder_b);
ALTER TABLE alice.hashpart_b REPLICA IDENTITY FULL;
));
}
$node_publisher->safe_psql('postgres', qq(
SET SESSION AUTHORIZATION regress_alice;
CREATE PUBLICATION alice
FOR TABLE alice.unpartitioned, alice.rangepart, alice.listpart, alice.hashpart
WITH (publish_via_partition_root = true);
));
$node_subscriber->safe_psql('postgres', qq(
SET SESSION AUTHORIZATION regress_admin;
CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
));
# Verify that "regress_admin" can replicate into the tables
#
my @tbl = (qw(unpartitioned rangepart listpart hashpart));
for my $tbl (@tbl)
{
publish_insert("alice.$tbl", 1);
publish_insert("alice.$tbl", 3);
publish_insert("alice.$tbl", 5);
expect_replication(
"alice.$tbl", 3, 1, 5,
"superuser admin replicates insert into $tbl");
publish_update("alice.$tbl", 1 => 7);
expect_replication(
"alice.$tbl", 3, 3, 7,
"superuser admin replicates update into $tbl");
publish_delete("alice.$tbl", 3);
expect_replication(
"alice.$tbl", 2, 5, 7,
"superuser admin replicates delete into $tbl");
}
# Repeatedly revoke and restore superuser privilege for "regress_admin", verifying
# that replication fails while superuser privilege is missing, but works again and
# catches up once superuser is restored.
#
for my $tbl (@tbl)
{
revoke_superuser("regress_admin");
publish_insert("alice.$tbl", 3);
expect_failure("alice.$tbl", 2, 5, 7,
qr/ERROR: permission denied for table $tbl/msi,
"non-superuser admin fails to replicate insert");
grant_superuser("regress_admin");
expect_replication("alice.$tbl", 3, 3, 7,
"admin with restored superuser privilege replicates insert");
revoke_superuser("regress_admin");
publish_update("alice.$tbl", 3 => 9);
expect_failure("alice.$tbl", 3, 3, 7,
qr/ERROR: permission denied for table $tbl/msi,
"non-superuser admin fails to replicate update");
grant_superuser("regress_admin");
expect_replication("alice.$tbl", 3, 5, 9,
"admin with restored superuser privilege replicates update");
revoke_superuser("regress_admin");
publish_delete("alice.$tbl", 5);
expect_failure("alice.$tbl", 3, 5, 9,
qr/ERROR: permission denied for table $tbl/msi,
"non-superuser admin fails to replicate delete");
grant_superuser("regress_admin");
expect_replication("alice.$tbl", 2, 7, 9,
"admin with restored superuser privilege replicates delete");
}
# Grant privileges on the target tables to "regress_admin" so that superuser
# privileges are not necessary for replication.
#
$node_subscriber->safe_psql('postgres', qq(
ALTER ROLE regress_admin NOSUPERUSER;
SET SESSION AUTHORIZATION regress_alice;
GRANT ALL PRIVILEGES ON
alice.unpartitioned,
alice.rangepart, alice.rangepart_a, alice.rangepart_b,
alice.listpart, alice.listpart_a, alice.listpart_b,
alice.hashpart, alice.hashpart_a, alice.hashpart_b
TO regress_admin;
));
for my $tbl (@tbl)
{
publish_insert("alice.$tbl", 11);
publish_update("alice.$tbl", 7 => 13);
publish_delete("alice.$tbl", 9);
expect_replication("alice.$tbl", 2, 11, 13,
"nosuperuser admin with all table privileges can replicate into $tbl");
}
# Enable RLS on the target tables and check that "regress_admin" can only
# replicate into them when superuser. Note that RLS must be enabled on the
# partitions, not the partitioned tables, since the partitions are the targets
# of the replication.
#
$node_subscriber->safe_psql('postgres', qq(
SET SESSION AUTHORIZATION regress_alice;
ALTER TABLE alice.unpartitioned ENABLE ROW LEVEL SECURITY;
ALTER TABLE alice.rangepart_a ENABLE ROW LEVEL SECURITY;
ALTER TABLE alice.rangepart_b ENABLE ROW LEVEL SECURITY;
ALTER TABLE alice.listpart_a ENABLE ROW LEVEL SECURITY;
ALTER TABLE alice.listpart_b ENABLE ROW LEVEL SECURITY;
ALTER TABLE alice.hashpart_a ENABLE ROW LEVEL SECURITY;
ALTER TABLE alice.hashpart_b ENABLE ROW LEVEL SECURITY;
));
for my $tbl (@tbl)
{
revoke_superuser("regress_admin");
publish_insert("alice.$tbl", 15);
expect_failure("alice.$tbl", 2, 11, 13,
qr/ERROR: "regress_admin" cannot replicate into relation with row-level security enabled: "$tbl\w*"/msi,
"non-superuser admin fails to replicate insert into rls enabled table");
grant_superuser("regress_admin");
expect_replication("alice.$tbl", 3, 11, 15,
"admin with restored superuser privilege replicates insert into rls enabled $tbl");
revoke_superuser("regress_admin");
publish_update("alice.$tbl", 11 => 17);
expect_failure("alice.$tbl", 3, 11, 15,
qr/ERROR: "regress_admin" cannot replicate into relation with row-level security enabled: "$tbl\w*"/msi,
"non-superuser admin fails to replicate update into rls enabled $tbl");
grant_superuser("regress_admin");
expect_replication("alice.$tbl", 3, 13, 17,
"admin with restored superuser privilege replicates update into rls enabled $tbl");
revoke_superuser("regress_admin");
publish_delete("alice.$tbl", 13);
expect_failure("alice.$tbl", 3, 13, 17,
qr/ERROR: "regress_admin" cannot replicate into relation with row-level security enabled: "$tbl\w*"/msi,
"non-superuser admin fails to replicate delete into rls enabled $tbl");
grant_superuser("regress_admin");
expect_replication("alice.$tbl", 2, 15, 17,
"admin with restored superuser privilege replicates delete into rls enabled $tbl");
}
# Revoke superuser from "regress_admin". Check that the admin can now only
# replicate into alice's table when admin has the bypassrls privilege.
#
for my $tbl (@tbl)
{
revoke_superuser("regress_admin");
revoke_bypassrls("regress_admin");
publish_insert("alice.$tbl", 19);
expect_failure("alice.$tbl", 2, 15, 17,
qr/ERROR: "regress_admin" cannot replicate into relation with row-level security enabled: "$tbl\w*"/msi,
"nobypassrls admin fails to replicate insert into rls enabled $tbl");
grant_bypassrls("regress_admin");
expect_replication("alice.$tbl", 3, 15, 19,
"admin with bypassrls privilege replicates insert into rls enabled $tbl");
revoke_bypassrls("regress_admin");
publish_update("alice.$tbl", 15 => 21);
expect_failure("alice.$tbl", 3, 15, 19,
qr/ERROR: "regress_admin" cannot replicate into relation with row-level security enabled: "$tbl\w*"/msi,
"nobypassrls admin fails to replicate update into rls enabled $tbl");
grant_bypassrls("regress_admin");
expect_replication("alice.$tbl", 3, 17, 21,
"admin with restored bypassrls privilege replicates update into rls enabled $tbl");
revoke_bypassrls("regress_admin");
publish_delete("alice.$tbl", 17);
expect_failure("alice.$tbl", 3, 17, 21,
qr/ERROR: "regress_admin" cannot replicate into relation with row-level security enabled: "$tbl\w*"/msi,
"nobypassrls admin fails to replicate delete into rls enabled $tbl");
grant_bypassrls("regress_admin");
expect_replication("alice.$tbl", 2, 19, 21,
"admin with restored bypassrls privilege replicates delete into rls enabled $tbl");
}
# Alter the subscription owner to "regress_alice". She has neither superuser
# nor bypassrls, but as the table owner should be able to replicate.
#
$node_subscriber->safe_psql('postgres', qq(
ALTER SUBSCRIPTION admin_sub DISABLE;
ALTER ROLE regress_alice SUPERUSER;
ALTER SUBSCRIPTION admin_sub OWNER TO regress_alice;
ALTER ROLE regress_alice NOSUPERUSER;
ALTER SUBSCRIPTION admin_sub ENABLE;
));
for my $tbl (@tbl)
{
publish_insert("alice.$tbl", 23);
expect_replication(
"alice.$tbl", 3, 19, 23,
"nosuperuser nobypassrls table owner can replicate insert into $tbl despite rls");
publish_update("alice.$tbl", 19 => 25);
expect_replication(
"alice.$tbl", 3, 21, 25,
"nosuperuser nobypassrls table owner can replicate update into $tbl despite rls");
publish_delete("alice.$tbl", 21);
expect_replication(
"alice.$tbl", 2, 23, 25,
"nosuperuser nobypassrls table owner can replicate delete into $tbl despite rls");
}