Restart the apply worker if the privileges have been revoked.

Restart the apply worker if the subscription owner's superuser privileges
have been revoked. This is required so that the subscription connection
string gets revalidated and use the password option to connect to the
publisher for non-superusers, if required.

Author: Vignesh C
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/CALDaNm2Dxmhq08nr4P6G+24QvdBo_GAVyZ_Q1TcGYK+8NHs9xw@mail.gmail.com
This commit is contained in:
Amit Kapila 2023-10-17 08:30:05 +05:30
parent 2f04720307
commit 79243de13f
6 changed files with 60 additions and 11 deletions

View File

@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
Anum_pg_subscription_suborigin);
sub->origin = TextDatumGetCString(datum);
/* Is the subscription owner a superuser? */
sub->ownersuperuser = superuser_arg(sub->owner);
ReleaseSysCache(tup);
return sub;

View File

@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
load_file("libpqwalreceiver", false);
/* Try to connect to the publisher. */
must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
sub->name, &err);
if (!wrconn)
@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
walrcv_check_conninfo(stmt->conninfo,
sub->passwordrequired && !superuser_arg(sub->owner));
sub->passwordrequired && !sub->ownersuperuser);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(stmt->conninfo);

View File

@ -1275,13 +1275,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
&relstate_lsn);
CommitTransactionCommand();
/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
!superuser_arg(MySubscription->owner);
/* Note that the superuser_arg call can access the DB */
CommitTransactionCommand();
!MySubscription->ownersuperuser;
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = relstate;

View File

@ -3966,6 +3966,24 @@ maybe_reread_subscription(void)
apply_worker_exit();
}
/*
* Exit if the subscription owner's superuser privileges have been
* revoked.
*/
if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
{
if (am_parallel_apply_worker())
ereport(LOG,
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
MySubscription->name));
else
ereport(LOG,
errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
MySubscription->name));
apply_worker_exit();
}
/* Check for other changes that should never happen too. */
if (newsub->dbid != MySubscription->dbid)
{
@ -4492,13 +4510,11 @@ run_apply_worker()
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
!superuser_arg(MySubscription->owner);
/* Note that the superuser_arg call can access the DB */
CommitTransactionCommand();
!MySubscription->ownersuperuser;
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
must_use_password,
@ -4621,11 +4637,18 @@ InitializeLogRepWorker(void)
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
/* Keep us informed about subscription changes. */
/*
* Keep us informed about subscription or role changes. Note that the
* role's superuser privilege can be revoked.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
(Datum) 0);
CacheRegisterSyscacheCallback(AUTHOID,
subscription_change_cb,
(Datum) 0);
if (am_tablesync_worker())
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",

View File

@ -127,6 +127,7 @@ typedef struct Subscription
* skipped */
char *name; /* Name of the subscription */
Oid owner; /* Oid of the subscription owner */
bool ownersuperuser; /* Is the subscription owner a superuser? */
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */

View File

@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
CREATE ROLE regress_admin SUPERUSER LOGIN;
CREATE ROLE regress_alice NOSUPERUSER LOGIN;
GRANT CREATE ON DATABASE postgres TO regress_alice;
GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
SET SESSION AUTHORIZATION regress_alice;
CREATE SCHEMA alice;
GRANT USAGE ON SCHEMA alice TO regress_admin;
@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
expect_replication("alice.unpartitioned", 3, 17, 21,
"restoring SELECT permission permits replication to continue");
# The apply worker should get restarted after the superuser privileges are
# revoked for subscription owner alice.
grant_superuser("regress_alice");
$node_subscriber->safe_psql(
'postgres', qq(
SET SESSION AUTHORIZATION regress_alice;
CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
));
# Wait for initial sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher,
'regression_sub');
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
revoke_superuser("regress_alice");
# After the user becomes non-superuser the apply worker should be restarted.
$node_subscriber->wait_for_log(
qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner's superuser privileges have been revoked/,
$offset);
done_testing();