postgres_fdw: Add support for parallel commit.

postgres_fdw commits remote (sub)transactions opened on remote server(s)
in a local (sub)transaction one by one when the local (sub)transaction
commits.  This patch allows it to commit the remote (sub)transactions in
parallel to improve performance.  This is enabled by the server option
"parallel_commit".  The default is false.

Etsuro Fujita, reviewed by Fujii Masao and David Zhang.

Discussion: http://postgr.es/m/CAPmGK17dAZCXvwnfpr1eTfknTGdt%3DhYTV9405Gt5SqPOX8K84w%40mail.gmail.com
This commit is contained in:
Etsuro Fujita 2022-02-24 14:30:00 +09:00
parent cfb4e209ec
commit 04e706d423
5 changed files with 376 additions and 19 deletions

View File

@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
bool parallel_commit; /* do we commit (sub)xacts in parallel? */
bool invalidated; /* true if reconnect is pending */
bool keep_connections; /* setting value of keep_connections
* server option */
@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
static void configure_remote_session(PGconn *conn);
static void do_sql_command_begin(PGconn *conn, const char *sql);
static void do_sql_command_end(PGconn *conn, const char *sql,
bool consume_input);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
@ -100,6 +104,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
@ -107,6 +112,9 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
* is changed will be closed and re-made later.
*
* By default, all the connections to any foreign servers are kept open.
*
* Also determine whether to commit (sub)transactions opened on the remote
* server in parallel at (sub)transaction end.
*/
entry->keep_connections = true;
entry->parallel_commit = false;
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "keep_connections") == 0)
entry->keep_connections = defGetBoolean(def);
else if (strcmp(def->defname, "parallel_commit") == 0)
entry->parallel_commit = defGetBoolean(def);
}
/* Now try to make the connection */
@ -622,10 +636,30 @@ configure_remote_session(PGconn *conn)
*/
void
do_sql_command(PGconn *conn, const char *sql)
{
do_sql_command_begin(conn, sql);
do_sql_command_end(conn, sql, false);
}
static void
do_sql_command_begin(PGconn *conn, const char *sql)
{
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
}
static void
do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
{
PGresult *res;
if (!PQsendQuery(conn, sql))
/*
* If requested, consume whatever data is available from the socket.
* (Note that if all data is available, this allows pgfdw_get_result to
* call PQgetResult without forcing the overhead of WaitLatchOrSocket,
* which would be large compared to the overhead of PQconsumeInput.)
*/
if (consume_input && !PQconsumeInput(conn))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
List *pending_entries = NIL;
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Commit all remote transactions during pre-commit */
entry->changing_xact_state = true;
if (entry->parallel_commit)
{
do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
pending_entries = lappend(pending_entries, entry);
continue;
}
do_sql_command(entry->conn, "COMMIT TRANSACTION");
entry->changing_xact_state = false;
@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
}
/* Reset state to show we're out of a transaction */
entry->xact_depth = 0;
pgfdw_reset_xact_state(entry, true);
}
/*
* If the connection isn't in a good idle state, it is marked as
* invalid or keep_connections option of its server is disabled, then
* discard it to recover. Next GetConnection will open a new
* connection.
*/
if (PQstatus(entry->conn) != CONNECTION_OK ||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
entry->changing_xact_state ||
entry->invalidated ||
!entry->keep_connections)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
disconnect_pg_server(entry);
}
/* If there are any pending connections, finish cleaning them up */
if (pending_entries)
{
Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
event == XACT_EVENT_PRE_COMMIT);
pgfdw_finish_pre_commit_cleanup(pending_entries);
}
/*
@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
/* Commit all remote subtransactions during pre-commit */
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
entry->changing_xact_state = true;
if (entry->parallel_commit)
{
do_sql_command_begin(entry->conn, sql);
pending_entries = lappend(pending_entries, entry);
continue;
}
do_sql_command(entry->conn, sql);
entry->changing_xact_state = false;
}
@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
/* OK, we're outta that level of subtransaction */
entry->xact_depth--;
pgfdw_reset_xact_state(entry, false);
}
/* If there are any pending connections, finish cleaning them up */
if (pending_entries)
{
Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
}
}
@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
server->servername)));
}
/*
* Reset state to show we're out of a (sub)transaction.
*/
static void
pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
{
if (toplevel)
{
/* Reset state to show we're out of a transaction */
entry->xact_depth = 0;
/*
* If the connection isn't in a good idle state, it is marked as
* invalid or keep_connections option of its server is disabled, then
* discard it to recover. Next GetConnection will open a new
* connection.
*/
if (PQstatus(entry->conn) != CONNECTION_OK ||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
entry->changing_xact_state ||
entry->invalidated ||
!entry->keep_connections)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
disconnect_pg_server(entry);
}
}
else
{
/* Reset state to show we're out of a subtransaction */
entry->xact_depth--;
}
}
/*
* Cancel the currently-in-progress query (whose query text we do not have)
* and ignore the result. Returns true if we successfully cancel the query
@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
entry->changing_xact_state = false;
}
/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
*/
static void
pgfdw_finish_pre_commit_cleanup(List *pending_entries)
{
ConnCacheEntry *entry;
List *pending_deallocs = NIL;
ListCell *lc;
Assert(pending_entries);
/*
* Get the result of the COMMIT command for each of the pending entries
*/
foreach(lc, pending_entries)
{
entry = (ConnCacheEntry *) lfirst(lc);
Assert(entry->changing_xact_state);
/*
* We might already have received the result on the socket, so pass
* consume_input=true to try to consume it first
*/
do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
entry->changing_xact_state = false;
/* Do a DEALLOCATE ALL in parallel if needed */
if (entry->have_prep_stmt && entry->have_error)
{
/* Ignore errors (see notes in pgfdw_xact_callback) */
if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
{
pending_deallocs = lappend(pending_deallocs, entry);
continue;
}
}
entry->have_prep_stmt = false;
entry->have_error = false;
pgfdw_reset_xact_state(entry, true);
}
/* No further work if no pending entries */
if (!pending_deallocs)
return;
/*
* Get the result of the DEALLOCATE command for each of the pending
* entries
*/
foreach(lc, pending_deallocs)
{
PGresult *res;
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
while ((res = PQgetResult(entry->conn)) != NULL)
{
PQclear(res);
/* Stop if the connection is lost (else we'll loop infinitely) */
if (PQstatus(entry->conn) == CONNECTION_BAD)
break;
}
entry->have_prep_stmt = false;
entry->have_error = false;
pgfdw_reset_xact_state(entry, true);
}
}
/*
* Finish pre-subcommit cleanup of connections on each of which we've sent a
* RELEASE command to the remote server.
*/
static void
pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
{
ConnCacheEntry *entry;
char sql[100];
ListCell *lc;
Assert(pending_entries);
/*
* Get the result of the RELEASE command for each of the pending entries
*/
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
foreach(lc, pending_entries)
{
entry = (ConnCacheEntry *) lfirst(lc);
Assert(entry->changing_xact_state);
/*
* We might already have received the result on the socket, so pass
* consume_input=true to try to consume it first
*/
do_sql_command_end(entry->conn, sql, true);
entry->changing_xact_state = false;
pgfdw_reset_xact_state(entry, false);
}
}
/*
* List active foreign server connections.
*

View File

@ -9509,7 +9509,7 @@ DO $d$
END;
$d$;
ERROR: invalid option "password"
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, parallel_commit, keep_connections
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
PL/pgSQL function inline_code_block line 3 at EXECUTE
-- If we add a password for our user mapping instead, we should get a different
@ -10933,3 +10933,79 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity
--Clean up
RESET postgres_fdw.application_name;
RESET debug_discard_caches;
-- ===================================================================
-- test parallel commit
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
CREATE TABLE ploc1 (f1 int, f2 text);
CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
SERVER loopback OPTIONS (table_name 'ploc1');
CREATE TABLE ploc2 (f1 int, f2 text);
CREATE FOREIGN TABLE prem2 (f1 int, f2 text)
SERVER loopback2 OPTIONS (table_name 'ploc2');
BEGIN;
INSERT INTO prem1 VALUES (101, 'foo');
INSERT INTO prem2 VALUES (201, 'bar');
COMMIT;
SELECT * FROM prem1;
f1 | f2
-----+-----
101 | foo
(1 row)
SELECT * FROM prem2;
f1 | f2
-----+-----
201 | bar
(1 row)
BEGIN;
SAVEPOINT s;
INSERT INTO prem1 VALUES (102, 'foofoo');
INSERT INTO prem2 VALUES (202, 'barbar');
RELEASE SAVEPOINT s;
COMMIT;
SELECT * FROM prem1;
f1 | f2
-----+--------
101 | foo
102 | foofoo
(2 rows)
SELECT * FROM prem2;
f1 | f2
-----+--------
201 | bar
202 | barbar
(2 rows)
-- This tests executing DEALLOCATE ALL against foreign servers in parallel
-- during pre-commit
BEGIN;
SAVEPOINT s;
INSERT INTO prem1 VALUES (103, 'baz');
INSERT INTO prem2 VALUES (203, 'qux');
ROLLBACK TO SAVEPOINT s;
RELEASE SAVEPOINT s;
INSERT INTO prem1 VALUES (104, 'bazbaz');
INSERT INTO prem2 VALUES (204, 'quxqux');
COMMIT;
SELECT * FROM prem1;
f1 | f2
-----+--------
101 | foo
102 | foofoo
104 | bazbaz
(3 rows)
SELECT * FROM prem2;
f1 | f2
-----+--------
201 | bar
202 | barbar
204 | quxqux
(3 rows)
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);

View File

@ -121,6 +121,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
strcmp(def->defname, "updatable") == 0 ||
strcmp(def->defname, "truncatable") == 0 ||
strcmp(def->defname, "async_capable") == 0 ||
strcmp(def->defname, "parallel_commit") == 0 ||
strcmp(def->defname, "keep_connections") == 0)
{
/* these accept only boolean values */
@ -249,6 +250,7 @@ InitPgFdwOptions(void)
/* async_capable is available on both server and table */
{"async_capable", ForeignServerRelationId, false},
{"async_capable", ForeignTableRelationId, false},
{"parallel_commit", ForeignServerRelationId, false},
{"keep_connections", ForeignServerRelationId, false},
{"password_required", UserMappingRelationId, false},

View File

@ -3515,3 +3515,49 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity
--Clean up
RESET postgres_fdw.application_name;
RESET debug_discard_caches;
-- ===================================================================
-- test parallel commit
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
CREATE TABLE ploc1 (f1 int, f2 text);
CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
SERVER loopback OPTIONS (table_name 'ploc1');
CREATE TABLE ploc2 (f1 int, f2 text);
CREATE FOREIGN TABLE prem2 (f1 int, f2 text)
SERVER loopback2 OPTIONS (table_name 'ploc2');
BEGIN;
INSERT INTO prem1 VALUES (101, 'foo');
INSERT INTO prem2 VALUES (201, 'bar');
COMMIT;
SELECT * FROM prem1;
SELECT * FROM prem2;
BEGIN;
SAVEPOINT s;
INSERT INTO prem1 VALUES (102, 'foofoo');
INSERT INTO prem2 VALUES (202, 'barbar');
RELEASE SAVEPOINT s;
COMMIT;
SELECT * FROM prem1;
SELECT * FROM prem2;
-- This tests executing DEALLOCATE ALL against foreign servers in parallel
-- during pre-commit
BEGIN;
SAVEPOINT s;
INSERT INTO prem1 VALUES (103, 'baz');
INSERT INTO prem2 VALUES (203, 'qux');
ROLLBACK TO SAVEPOINT s;
RELEASE SAVEPOINT s;
INSERT INTO prem1 VALUES (104, 'bazbaz');
INSERT INTO prem2 VALUES (204, 'quxqux');
COMMIT;
SELECT * FROM prem1;
SELECT * FROM prem2;
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);

View File

@ -456,6 +456,52 @@ OPTIONS (ADD password_required 'false');
</variablelist>
</sect3>
<sect3>
<title>Transaction Management Options</title>
<para>
When multiple remote (sub)transactions are involved in a local
(sub)transaction, by default <filename>postgres_fdw</filename> commits
those remote (sub)transactions one by one when the local (sub)transaction
commits.
Performance can be improved with the following option:
</para>
<variablelist>
<varlistentry>
<term><literal>parallel_commit</literal> (<type>boolean</type>)</term>
<listitem>
<para>
This option controls whether <filename>postgres_fdw</filename> commits
remote (sub)transactions opened on a foreign server in a local
(sub)transaction in parallel when the local (sub)transaction commits.
This option can only be specified for foreign servers, not per-table.
The default is <literal>false</literal>.
</para>
<para>
If multiple foreign servers with this option enabled are involved in
a local (sub)transaction, multiple remote (sub)transactions opened on
those foreign servers in the local (sub)transaction are committed in
parallel across those foreign servers when the local (sub)transaction
commits.
</para>
<para>
For a foreign server with this option enabled, if many remote
(sub)transactions are opened on the foreign server in a local
(sub)transaction, this option might increase the remote servers load
when the local (sub)transaction commits, so be careful when using this
option.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect3>
<sect3>
<title>Updatability Options</title>