From dafbfed9efbe3d166f25df7e564bad716e9f8bfc Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 5 Feb 2024 10:45:34 +0530 Subject: [PATCH] Enhance libpqrcv APIs to support slot synchronization. This patch provides support for regular (non-replication) connections in libpqrcv_connect(). This can be used to execute SQL statements on the primary server without starting a walsender. A new API libpqrcv_get_dbname_from_conninfo() is also added to extract the database name from the given connection-info. Note that this patch doesn't change any existing functionality but later patches implementing the slot synchronization will use this functionality to connect to the primary server to fetch required slot information. Author: Shveta Malik, Hou Zhijie, Ajin Cherian Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com --- src/backend/commands/subscriptioncmds.c | 8 +- .../libpqwalreceiver/libpqwalreceiver.c | 119 +++++++++++++----- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 2 +- src/backend/replication/walreceiver.c | 2 +- src/include/replication/walreceiver.h | 21 +++- 6 files changed, 114 insertions(+), 40 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b647a81fc8..a400ba0e40 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -759,7 +759,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = !superuser_arg(owner) && opts.passwordrequired; - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, stmt->subname, &err); if (!wrconn) ereport(ERROR, @@ -910,7 +910,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1537,7 +1537,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1788,7 +1788,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, subname, &err); if (wrconn == NULL) { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2439733b55..9270d7b855 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -48,7 +48,8 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, bool must_use_password, + bool replication, bool logical, + bool must_use_password, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password); @@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -121,7 +124,11 @@ _PG_init(void) } /* - * Establish the connection to the primary server for XLOG streaming + * Establish the connection to the primary server. + * + * This function can be used for both replication and regular connections. + * If it is a replication connection, it could be either logical or physical + * based on input argument 'logical'. * * If an error occurs, this function will normally return NULL and set *err * to a palloc'ed error message. However, if must_use_password is true and @@ -132,8 +139,8 @@ _PG_init(void) * case. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, - const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool replication, bool logical, + bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; PostgresPollingStatusType status; @@ -156,36 +163,46 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, */ keys[i] = "dbname"; vals[i] = conninfo; - keys[++i] = "replication"; - vals[i] = logical ? "database" : "true"; - if (!logical) + + /* We can not have logical without replication */ + Assert(replication || !logical); + + if (replication) { - /* - * The database name is ignored by the server in replication mode, but - * specify "replication" for .pgpass lookup. - */ - keys[++i] = "dbname"; - vals[i] = "replication"; + keys[++i] = "replication"; + vals[i] = logical ? "database" : "true"; + + if (logical) + { + /* Tell the publisher to translate to our encoding */ + keys[++i] = "client_encoding"; + vals[i] = GetDatabaseEncodingName(); + + /* + * Force assorted GUC parameters to settings that ensure that the + * publisher will output data values in a form that is unambiguous + * to the subscriber. (We don't want to modify the subscriber's + * GUC settings, since that might surprise user-defined code + * running in the subscriber, such as triggers.) This should + * match what pg_dump does. + */ + keys[++i] = "options"; + vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3"; + } + else + { + /* + * The database name is ignored by the server in replication mode, + * but specify "replication" for .pgpass lookup. + */ + keys[++i] = "dbname"; + vals[i] = "replication"; + } } + keys[++i] = "fallback_application_name"; vals[i] = appname; - if (logical) - { - /* Tell the publisher to translate to our encoding */ - keys[++i] = "client_encoding"; - vals[i] = GetDatabaseEncodingName(); - /* - * Force assorted GUC parameters to settings that ensure that the - * publisher will output data values in a form that is unambiguous to - * the subscriber. (We don't want to modify the subscriber's GUC - * settings, since that might surprise user-defined code running in - * the subscriber, such as triggers.) This should match what pg_dump - * does. - */ - keys[++i] = "options"; - vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3"; - } keys[++i] = NULL; vals[i] = NULL; @@ -471,6 +488,50 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + *opt->val) + { + if (dbname) + pfree(dbname); + + dbname = pstrdup(opt->val); + } + } + + PQconninfoFree(opts); + return dbname; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 5acab3f3e2..ee06629088 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, + walrcv_connect(MySubscription->conninfo, true, true, must_use_password, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 32ff4c0336..9dd2446fbf 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4519,7 +4519,7 @@ run_apply_worker() !MySubscription->ownersuperuser; LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, + true, must_use_password, MySubscription->name, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index e29a6196a3..b80447d15f 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,7 @@ WalReceiverMain(void) sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, false, + wrconn = walrcv_connect(conninfo, true, false, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f566a99ba1..b906bb5ce8 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -228,8 +228,10 @@ typedef struct WalRcvExecResult /* * walrcv_connect_fn * - * Establish connection to a cluster. 'logical' is true if the - * connection is logical, and false if the connection is physical. + * Establish connection to a cluster. 'replication' is true if the + * connection is a replication connection, and false if it is a + * regular connection. If it is a replication connection, it could + * be either logical or physical based on input argument 'logical'. * 'appname' is a name associated to the connection, to use for example * with fallback_application_name or application_name. Returns the * details about the connection established, as defined by @@ -237,6 +239,7 @@ typedef struct WalRcvExecResult * returned with 'err' including the error generated. */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, + bool replication, bool logical, bool must_use_password, const char *appname, @@ -279,6 +282,13 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the database name from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -403,6 +413,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -418,8 +429,8 @@ typedef struct WalReceiverFunctionsType extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) +#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) #define walrcv_check_conninfo(conninfo, must_use_password) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password) #define walrcv_get_conninfo(conn) \ @@ -428,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \