Add support for asynchronous execution.

This implements asynchronous execution, which runs multiple parts of a
non-parallel-aware Append concurrently rather than serially to improve
performance when possible.  Currently, the only node type that can be
run concurrently is a ForeignScan that is an immediate child of such an
Append.  When such ForeignScans access data on different remote
servers, they are run concurrently so that the remote operations
overlap, which improves performance especially when those operations
are time-consuming, such as remote joins and remote aggregations.

We may extend this to other node types such as joins or aggregates over
ForeignScans in the future.

This also adds support for asynchronous execution to postgres_fdw,
which is enabled by the table-level/server-level option "async_capable".
The default is false.

Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself.  This commit
is mostly based on the patch proposed by Robert Haas, but also uses
stuff from the patch proposed by Kyotaro Horiguchi and from the patch
proposed by Thomas Munro.  Reviewed by Kyotaro Horiguchi, Konstantin
Knizhnik, Andrey Lepikhov, Movead Li, Thomas Munro, Justin Pryzby, and
others.

Discussion: https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com
Discussion: https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com
This commit is contained in:
Etsuro Fujita 2021-03-31 18:45:00 +09:00
parent 66392d3965
commit 27e1f14563
39 changed files with 2068 additions and 57 deletions

View File

@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
Oid serverid; /* foreign server OID used to get server name */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid);
* will_prep_stmt must be true if caller intends to create any prepared
* statements. Since those don't go away automatically at transaction end
* (not even on error), we need this flag to cue manual cleanup.
*
* If state is not NULL, *state receives the per-connection state associated
* with the PGconn.
*/
PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt)
GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
bool retry = false;
@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
*/
PG_TRY();
{
/* Process a pending asynchronous request if any. */
if (entry->state.pendingAreq)
process_pending_request(entry->state.pendingAreq);
/* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry);
}
@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
/* If caller needs access to the per-connection state, return it. */
if (state)
*state = &entry->state;
return entry->conn;
}
@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn)
* Caller is responsible for the error handling on the result.
*/
PGresult *
pgfdw_exec_query(PGconn *conn, const char *query)
pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
{
/* First, process a pending asynchronous request, if any. */
if (state && state->pendingAreq)
process_pending_request(state->pendingAreq);
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
entry->have_prep_stmt = false;
entry->have_error = false;
/* Also reset per-connection state */
memset(&entry->state, 0, sizeof(entry->state));
}
/* Disarm changing_xact_state if it all worked. */
@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
* Cancel the currently-in-progress query (whose query text we do not have)
* and ignore the result. Returns true if we successfully cancel the query
* and discard any pending result, and false if not.
*
* XXX: if the query was one sent by fetch_more_data_begin(), we could get the
* query text from the pendingAreq saved in the per-connection state, then
* report the query using it.
*/
static bool
pgfdw_cancel_query(PGconn *conn)

View File

@ -8946,7 +8946,7 @@ DO $d$
END;
$d$;
ERROR: invalid option "password"
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
PL/pgSQL function inline_code_block line 3 at EXECUTE
-- If we add a password for our user mapping instead, we should get a different
@ -9437,3 +9437,510 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
-- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
-- ===================================================================
-- test asynchronous execution
-- ===================================================================
ALTER SERVER loopback OPTIONS (DROP extensions);
ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
CREATE TABLE base_tbl1 (a int, b int, c text);
CREATE TABLE base_tbl2 (a int, b int, c text);
CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
SERVER loopback OPTIONS (table_name 'base_tbl1');
CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
SERVER loopback2 OPTIONS (table_name 'base_tbl2');
INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
ANALYZE async_pt;
-- simple queries
CREATE TABLE result_tbl (a int, b int, c text);
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
QUERY PLAN
----------------------------------------------------------------------------------------
Insert on public.result_tbl
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0))
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0))
(8 rows)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
SELECT * FROM result_tbl ORDER BY a;
a | b | c
------+-----+------
1000 | 0 | 0000
1100 | 100 | 0100
1200 | 200 | 0200
1300 | 300 | 0300
1400 | 400 | 0400
1500 | 500 | 0500
1600 | 600 | 0600
1700 | 700 | 0700
1800 | 800 | 0800
1900 | 900 | 0900
2000 | 0 | 0000
2100 | 100 | 0100
2200 | 200 | 0200
2300 | 300 | 0300
2400 | 400 | 0400
2500 | 500 | 0500
2600 | 600 | 0600
2700 | 700 | 0700
2800 | 800 | 0800
2900 | 900 | 0900
(20 rows)
DELETE FROM result_tbl;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
QUERY PLAN
----------------------------------------------------------------
Insert on public.result_tbl
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Filter: (async_pt_1.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl1
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Filter: (async_pt_2.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl2
(10 rows)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
a | b | c
------+-----+------
1505 | 505 | 0505
2505 | 505 | 0505
(2 rows)
DELETE FROM result_tbl;
-- Check case where multiple partitions use the same connection
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
SERVER loopback2 OPTIONS (table_name 'base_tbl3');
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
ANALYZE async_pt;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
QUERY PLAN
----------------------------------------------------------------
Insert on public.result_tbl
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Filter: (async_pt_1.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl1
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Filter: (async_pt_2.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl2
-> Async Foreign Scan on public.async_p3 async_pt_3
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
Filter: (async_pt_3.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl3
(14 rows)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
a | b | c
------+-----+------
1505 | 505 | 0505
2505 | 505 | 0505
3505 | 505 | 0505
(3 rows)
DELETE FROM result_tbl;
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
-- Check case where the partitioned table has local/remote partitions
CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
ANALYZE async_pt;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
QUERY PLAN
----------------------------------------------------------------
Insert on public.result_tbl
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Filter: (async_pt_1.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl1
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Filter: (async_pt_2.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl2
-> Seq Scan on public.async_p3 async_pt_3
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
Filter: (async_pt_3.b === 505)
(13 rows)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
a | b | c
------+-----+------
1505 | 505 | 0505
2505 | 505 | 0505
3505 | 505 | 0505
(3 rows)
DELETE FROM result_tbl;
-- partitionwise joins
SET enable_partitionwise_join TO true;
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.join_tbl
-> Append
-> Async Foreign Scan
Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c
Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
-> Async Foreign Scan
Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c
Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
-> Hash Join
Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c
Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
-> Seq Scan on public.async_p3 t2_3
Output: t2_3.a, t2_3.b, t2_3.c
-> Hash
Output: t1_3.a, t1_3.b, t1_3.c
-> Seq Scan on public.async_p3 t1_3
Output: t1_3.a, t1_3.b, t1_3.c
Filter: ((t1_3.b % 100) = 0)
(20 rows)
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
SELECT * FROM join_tbl ORDER BY a1;
a1 | b1 | c1 | a2 | b2 | c2
------+-----+------+------+-----+------
1000 | 0 | 0000 | 1000 | 0 | 0000
1100 | 100 | 0100 | 1100 | 100 | 0100
1200 | 200 | 0200 | 1200 | 200 | 0200
1300 | 300 | 0300 | 1300 | 300 | 0300
1400 | 400 | 0400 | 1400 | 400 | 0400
1500 | 500 | 0500 | 1500 | 500 | 0500
1600 | 600 | 0600 | 1600 | 600 | 0600
1700 | 700 | 0700 | 1700 | 700 | 0700
1800 | 800 | 0800 | 1800 | 800 | 0800
1900 | 900 | 0900 | 1900 | 900 | 0900
2000 | 0 | 0000 | 2000 | 0 | 0000
2100 | 100 | 0100 | 2100 | 100 | 0100
2200 | 200 | 0200 | 2200 | 200 | 0200
2300 | 300 | 0300 | 2300 | 300 | 0300
2400 | 400 | 0400 | 2400 | 400 | 0400
2500 | 500 | 0500 | 2500 | 500 | 0500
2600 | 600 | 0600 | 2600 | 600 | 0600
2700 | 700 | 0700 | 2700 | 700 | 0700
2800 | 800 | 0800 | 2800 | 800 | 0800
2900 | 900 | 0900 | 2900 | 900 | 0900
3000 | 0 | 0000 | 3000 | 0 | 0000
3100 | 100 | 0100 | 3100 | 100 | 0100
3200 | 200 | 0200 | 3200 | 200 | 0200
3300 | 300 | 0300 | 3300 | 300 | 0300
3400 | 400 | 0400 | 3400 | 400 | 0400
3500 | 500 | 0500 | 3500 | 500 | 0500
3600 | 600 | 0600 | 3600 | 600 | 0600
3700 | 700 | 0700 | 3700 | 700 | 0700
3800 | 800 | 0800 | 3800 | 800 | 0800
3900 | 900 | 0900 | 3900 | 900 | 0900
(30 rows)
DELETE FROM join_tbl;
RESET enable_partitionwise_join;
-- Test interaction of async execution with plan-time partition pruning
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt WHERE a < 3000;
QUERY PLAN
-----------------------------------------------------------------------------
Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
(7 rows)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt WHERE a < 2000;
QUERY PLAN
-----------------------------------------------------------------------
Foreign Scan on public.async_p1 async_pt
Output: async_pt.a, async_pt.b, async_pt.c
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000))
(3 rows)
-- Test interaction of async execution with run-time partition pruning
SET plan_cache_mode TO force_generic_plan;
PREPARE async_pt_query (int, int) AS
INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
EXPLAIN (VERBOSE, COSTS OFF)
EXECUTE async_pt_query (3000, 505);
QUERY PLAN
------------------------------------------------------------------------------------------
Insert on public.result_tbl
-> Append
Subplans Removed: 1
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Filter: (async_pt_1.b === $2)
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Filter: (async_pt_2.b === $2)
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer))
(11 rows)
EXECUTE async_pt_query (3000, 505);
SELECT * FROM result_tbl ORDER BY a;
a | b | c
------+-----+------
1505 | 505 | 0505
2505 | 505 | 0505
(2 rows)
DELETE FROM result_tbl;
EXPLAIN (VERBOSE, COSTS OFF)
EXECUTE async_pt_query (2000, 505);
QUERY PLAN
------------------------------------------------------------------------------------------
Insert on public.result_tbl
-> Append
Subplans Removed: 2
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Filter: (async_pt_1.b === $2)
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
(7 rows)
EXECUTE async_pt_query (2000, 505);
SELECT * FROM result_tbl ORDER BY a;
a | b | c
------+-----+------
1505 | 505 | 0505
(1 row)
DELETE FROM result_tbl;
RESET plan_cache_mode;
CREATE TABLE local_tbl(a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
ANALYZE local_tbl;
CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
CREATE INDEX async_p3_idx ON async_p3 (a);
ANALYZE base_tbl1;
ANALYZE base_tbl2;
ANALYZE async_p3;
ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
QUERY PLAN
------------------------------------------------------------------------------------------
Nested Loop
Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c
-> Seq Scan on public.local_tbl
Output: local_tbl.a, local_tbl.b, local_tbl.c
Filter: (local_tbl.c = 'bar'::text)
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a))
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a))
-> Seq Scan on public.async_p3 async_pt_3
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
Filter: (local_tbl.a = async_pt_3.a)
(15 rows)
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
QUERY PLAN
-------------------------------------------------------------------------------
Nested Loop (actual rows=1 loops=1)
-> Seq Scan on local_tbl (actual rows=1 loops=1)
Filter: (c = 'bar'::text)
Rows Removed by Filter: 1
-> Append (actual rows=1 loops=1)
-> Async Foreign Scan on async_p1 async_pt_1 (never executed)
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1)
-> Seq Scan on async_p3 async_pt_3 (never executed)
Filter: (local_tbl.a = a)
(9 rows)
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
a | b | c | a | b | c
------+-----+-----+------+-----+------
2505 | 505 | bar | 2505 | 505 | 0505
(1 row)
ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
DROP TABLE local_tbl;
DROP INDEX base_tbl1_idx;
DROP INDEX base_tbl2_idx;
DROP INDEX async_p3_idx;
-- Test that pending requests are processed properly
SET enable_mergejoin TO false;
SET enable_hashjoin TO false;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
QUERY PLAN
----------------------------------------------------------------
Nested Loop
Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
Join Filter: (t1.a = t2.a)
-> Append
-> Async Foreign Scan on public.async_p1 t1_1
Output: t1_1.a, t1_1.b, t1_1.c
Filter: (t1_1.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl1
-> Async Foreign Scan on public.async_p2 t1_2
Output: t1_2.a, t1_2.b, t1_2.c
Filter: (t1_2.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl2
-> Seq Scan on public.async_p3 t1_3
Output: t1_3.a, t1_3.b, t1_3.c
Filter: (t1_3.b === 505)
-> Materialize
Output: t2.a, t2.b, t2.c
-> Foreign Scan on public.async_p2 t2
Output: t2.a, t2.b, t2.c
Remote SQL: SELECT a, b, c FROM public.base_tbl2
(20 rows)
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
a | b | c | a | b | c
------+-----+------+------+-----+------
2505 | 505 | 0505 | 2505 | 505 | 0505
(1 row)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
QUERY PLAN
----------------------------------------------------------------
Limit
Output: t1.a, t1.b, t1.c
-> Append
-> Async Foreign Scan on public.async_p1 t1_1
Output: t1_1.a, t1_1.b, t1_1.c
Filter: (t1_1.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl1
-> Async Foreign Scan on public.async_p2 t1_2
Output: t1_2.a, t1_2.b, t1_2.c
Filter: (t1_2.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl2
-> Seq Scan on public.async_p3 t1_3
Output: t1_3.a, t1_3.b, t1_3.c
Filter: (t1_3.b === 505)
(14 rows)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
a | b | c
------+-----+------
3505 | 505 | 0505
(1 row)
-- Check with foreign modify
CREATE TABLE local_tbl (a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl3');
INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
CREATE TABLE base_tbl4 (a int, b int, c text);
CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl4');
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
QUERY PLAN
-------------------------------------------------------------------------
Insert on public.insert_tbl
Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3)
Batch Size: 1
-> Append
-> Seq Scan on public.local_tbl
Output: local_tbl.a, local_tbl.b, local_tbl.c
-> Async Foreign Scan on public.remote_tbl
Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
Remote SQL: SELECT a, b, c FROM public.base_tbl3
(9 rows)
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
SELECT * FROM insert_tbl ORDER BY a;
a | b | c
------+-----+-----
1505 | 505 | foo
2505 | 505 | bar
(2 rows)
-- Check with direct modify
EXPLAIN (VERBOSE, COSTS OFF)
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
QUERY PLAN
----------------------------------------------------------------------------------------
Insert on public.join_tbl
CTE t
-> Update on public.remote_tbl
Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
-> Foreign Update on public.remote_tbl
Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c
-> Nested Loop Left Join
Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c
Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b))
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
Filter: (async_pt_1.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl1
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
Filter: (async_pt_2.b === 505)
Remote SQL: SELECT a, b, c FROM public.base_tbl2
-> Seq Scan on public.async_p3 async_pt_3
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
Filter: (async_pt_3.b === 505)
-> CTE Scan on t
Output: t.a, t.b, t.c
(23 rows)
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
SELECT * FROM join_tbl ORDER BY a1;
a1 | b1 | c1 | a2 | b2 | c2
------+-----+------+------+-----+--------
1505 | 505 | 0505 | | |
2505 | 505 | 0505 | 2505 | 505 | barbar
3505 | 505 | 0505 | | |
(3 rows)
DELETE FROM join_tbl;
RESET enable_mergejoin;
RESET enable_hashjoin;
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
DROP TABLE local_tbl;
DROP FOREIGN TABLE remote_tbl;
DROP FOREIGN TABLE insert_tbl;
DROP TABLE base_tbl3;
DROP TABLE base_tbl4;
DROP TABLE join_tbl;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);

View File

@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
* Validate option value, when we can do so without any context.
*/
if (strcmp(def->defname, "use_remote_estimate") == 0 ||
strcmp(def->defname, "updatable") == 0)
strcmp(def->defname, "updatable") == 0 ||
strcmp(def->defname, "async_capable") == 0)
{
/* these accept only boolean values */
(void) defGetBoolean(def);
@ -217,6 +218,9 @@ InitPgFdwOptions(void)
/* batch_size is available on both server and table */
{"batch_size", ForeignServerRelationId, false},
{"batch_size", ForeignTableRelationId, false},
/* async_capable is available on both server and table */
{"async_capable", ForeignServerRelationId, false},
{"async_capable", ForeignTableRelationId, false},
{"password_required", UserMappingRelationId, false},
/*

View File

@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@ -37,6 +38,7 @@
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
@ -143,6 +145,7 @@ typedef struct PgFdwScanState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
PgFdwConnState *conn_state; /* extra per-connection state */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@ -159,6 +162,9 @@ typedef struct PgFdwScanState
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
/* for asynchronous execution */
bool async_capable; /* engage asynchronous-capable logic? */
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
PgFdwConnState *conn_state; /* extra per-connection state */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
/* for remote query execution */
PGconn *conn; /* connection for the update */
PgFdwConnState *conn_state; /* extra per-connection state */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
static void postgresForeignAsyncRequest(AsyncRequest *areq);
static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
static void postgresForeignAsyncNotify(AsyncRequest *areq);
/*
* Helper functions
@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static void close_cursor(PGconn *conn, unsigned int cursor_number,
PgFdwConnState *conn_state);
static PgFdwModifyState *create_foreign_modify(EState *estate,
RangeTblEntry *rte,
ResultRelInfo *resultRelInfo,
@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
static void fetch_more_data_begin(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
/* Support functions for asynchronous execution */
routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
PG_RETURN_POINTER(routine);
}
@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root,
/*
* Extract user-settable option values. Note that per-table settings of
* use_remote_estimate and fetch_size override per-server settings of
* them, respectively.
* use_remote_estimate, fetch_size and async_capable override per-server
* settings of them, respectively.
*/
fpinfo->use_remote_estimate = false;
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
fpinfo->shippable_extensions = NIL;
fpinfo->fetch_size = 100;
fpinfo->async_capable = false;
apply_server_options(fpinfo);
apply_table_options(fpinfo);
@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
fsstate->conn = GetConnection(user, false);
fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
/* Set the async-capable flag */
fsstate->async_capable = node->ss.ps.plan->async_capable;
}
/*
@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node)
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
* If this is the first call after Begin or ReScan, we need to create the
* cursor on the remote side.
* In sync mode, if this is the first call after Begin or ReScan, we need
* to create the cursor on the remote side. In async mode, we would have
* already created the cursor before we get here, even if this is the
* first call after Begin or ReScan.
*/
if (!fsstate->cursor_exists)
create_cursor(node);
@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node)
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
/* In async mode, just clear tuple slot. */
if (fsstate->async_capable)
return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
fetch_more_data(node);
@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_exec_query(fsstate->conn, sql);
res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
close_cursor(fsstate->conn, fsstate->cursor_number);
close_cursor(fsstate->conn, fsstate->cursor_number,
fsstate->conn_state);
/* Release remote connection */
ReleaseConnection(fsstate->conn);
@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
dmstate->conn = GetConnection(user, false);
dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
conn = GetConnection(fpinfo->user, false);
conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
res = pgfdw_exec_query(conn, sql);
res = pgfdw_exec_query(conn, sql, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node)
StringInfoData buf;
PGresult *res;
/* First, process a pending asynchronous request, if any. */
if (fsstate->conn_state->pendingAreq)
process_pending_request(fsstate->conn_state->pendingAreq);
/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node)
PG_TRY();
{
PGconn *conn = fsstate->conn;
char sql[64];
int numrows;
int i;
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
if (fsstate->async_capable)
{
Assert(fsstate->conn_state->pendingAreq);
res = pgfdw_exec_query(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
/*
* The query was already sent by an earlier call to
* fetch_more_data_begin. So now we just fetch the result.
*/
res = pgfdw_get_result(conn, fsstate->query);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
/* Reset per-connection state */
fsstate->conn_state->pendingAreq = NULL;
}
else
{
char sql[64];
/* This is a regular synchronous fetch. */
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
}
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel)
* Utility routine to close a cursor.
*/
static void
close_cursor(PGconn *conn, unsigned int cursor_number)
close_cursor(PGconn *conn, unsigned int cursor_number,
PgFdwConnState *conn_state)
{
char sql[64];
PGresult *res;
@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_exec_query(conn, sql);
res = pgfdw_exec_query(conn, sql, conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
fmstate->conn = GetConnection(user, true);
fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate,
operation == CMD_UPDATE ||
operation == CMD_DELETE);
/* First, process a pending asynchronous request, if any. */
if (fmstate->conn_state->pendingAreq)
process_pending_request(fmstate->conn_state->pendingAreq);
/*
* If the existing query was deparsed and prepared for a different number
* of rows, rebuild it for the proper number.
@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
char *p_name;
PGresult *res;
/*
* The caller would already have processed a pending asynchronous request
* if any, so no need to do it here.
*/
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
GetPrepStmtNumber(fmstate->conn));
@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_exec_query(fmstate->conn, sql);
res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node)
int numParams = dmstate->numParams;
const char **values = dmstate->param_values;
/* First, process a pending asynchronous request, if any. */
if (dmstate->conn_state->pendingAreq)
process_pending_request(dmstate->conn_state->pendingAreq);
/*
* Construct array of query parameter values in text format.
*/
@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
conn = GetConnection(user, false);
conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
res = pgfdw_exec_query(conn, sql.data);
res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
conn = GetConnection(user, false);
conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
int fetch_size;
ListCell *lc;
res = pgfdw_exec_query(conn, sql.data);
res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
*/
/* Fetch some rows */
res = pgfdw_exec_query(conn, fetch_sql);
res = pgfdw_exec_query(conn, fetch_sql, NULL);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
}
/* Close the cursor, just to be tidy. */
close_cursor(conn, cursor_number);
close_cursor(conn, cursor_number, NULL);
}
PG_CATCH();
{
@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
conn = GetConnection(mapping, false);
conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
res = pgfdw_exec_query(conn, buf.data);
res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
res = pgfdw_exec_query(conn, buf.data);
res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
ExtractExtensionList(defGetString(def), false);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
}
}
@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
fpinfo->use_remote_estimate = defGetBoolean(def);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
}
}
@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
fpinfo->async_capable = fpinfo_o->async_capable;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
* relation sizes.
*/
fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
/*
* We'll prefer to consider this join async-capable if any table from
* either side of the join is considered async-capable.
*/
fpinfo->async_capable = fpinfo_o->async_capable ||
fpinfo_i->async_capable;
}
}
@ -6489,6 +6571,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
add_path(final_rel, (Path *) final_path);
}
/*
 * postgresIsForeignPathAsyncCapable
 *		Report whether the given ForeignPath can be scanned asynchronously.
 *
 * The answer is simply the async_capable flag kept in the PgFdwRelationInfo
 * attached to the path's parent relation.
 */
static bool
postgresIsForeignPathAsyncCapable(ForeignPath *path)
{
	PgFdwRelationInfo *fpinfo;

	/* The planner-time FDW state hangs off the parent RelOptInfo */
	fpinfo = (PgFdwRelationInfo *) ((Path *) path)->parent->fdw_private;

	return fpinfo->async_capable;
}
/*
 * postgresForeignAsyncRequest
 *		ForeignAsyncRequest callback: asynchronously produce the next tuple
 *		from a foreign PostgreSQL table for the given request.
 */
static void
postgresForeignAsyncRequest(AsyncRequest *areq)
{
	/* Allow a new fetch to be started if we have run out of tuples */
	const bool	fetch = true;

	produce_tuple_asynchronously(areq, fetch);
}
/*
 * postgresForeignAsyncConfigureWait
 *		ForeignAsyncConfigureWait callback: register the socket of this
 *		scan's connection in the parent Append's wait event set, starting a
 *		data fetch first when that is needed.
 *
 * Nothing is registered when the connection is busy with a request that
 * belongs to a different child of the same Append, or to another Append
 * while this Append already has some other event to wait for.
 */
static void
postgresForeignAsyncConfigureWait(AsyncRequest *areq)
{
	ForeignScanState *scan = (ForeignScanState *) areq->requestee;
	PgFdwScanState *fsstate = (PgFdwScanState *) scan->fdw_state;
	AsyncRequest *inflight = fsstate->conn_state->pendingAreq;
	AppendState *parent = (AppendState *) areq->requestor;
	WaitEventSet *eventset = parent->as_eventset;

	/* We must only get here for a request awaiting a callback */
	Assert(areq->callback_pending);

	/* The postmaster death event is expected to be registered already */
	Assert(GetNumRegisteredWaitEvents(eventset) >= 1);

	if (inflight == NULL)
	{
		/* The connection is idle, so start our fetch now */
		fetch_more_data_begin(areq);
	}
	else if (inflight->requestor != areq->requestor)
	{
		/*
		 * The connection is busy with a request issued by another Append.
		 * Processing that request might be wasted effort, since the query
		 * might not need tuples from that Append anymore.  So if this Append
		 * already has something besides postmaster death to wait for, skip
		 * the given request.  Otherwise we must not leave the set with the
		 * postmaster death event only: process the other request, then
		 * begin our own fetch so that the event can be configured below.
		 */
		if (GetNumRegisteredWaitEvents(eventset) > 1)
			return;

		process_pending_request(inflight);
		fetch_more_data_begin(areq);
	}
	else if (inflight->requestee != areq->requestee)
	{
		/*
		 * Same parent, but the in-process request was made for a different
		 * child.  Only the event for the request made for that child is
		 * configured, so skip the given request.
		 */
		return;
	}
	else
	{
		/* Same parent and same child: it has to be the given request */
		Assert(inflight == areq);
	}

	AddWaitEventToSet(eventset, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
					  NULL, areq);
}
/*
 * postgresForeignAsyncNotify
 *		ForeignAsyncNotify callback: the connection's file descriptor has
 *		become ready, so read the fetched tuples and then produce the next
 *		tuple for the request.
 */
static void
postgresForeignAsyncNotify(AsyncRequest *areq)
{
	ForeignScanState *scan = (ForeignScanState *) areq->requestee;
	PgFdwScanState *fsstate = (PgFdwScanState *) scan->fdw_state;
	PGconn	   *conn = fsstate->conn;

	/* The core code is expected to have reset the callback_pending flag */
	Assert(!areq->callback_pending);

	/* This must be the request currently in process on the connection */
	Assert(fsstate->conn_state->pendingAreq == areq);

	/* If reading fails, blame the original query rather than the FETCH */
	if (PQconsumeInput(conn) == 0)
		pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);

	/* Collect the result of the FETCH that was sent earlier */
	fetch_more_data(scan);

	/* Hand back a tuple, beginning another fetch if we have run out */
	produce_tuple_asynchronously(areq, true);
}
/*
 * Asynchronously produce next tuple from a foreign PostgreSQL table.
 *
 * The request is completed with a tuple if one is buffered, completed with
 * NULL if the remote side has reported EOF, and otherwise marked as pending
 * for a callback.  In the last case a new fetch is begun only when "fetch"
 * is true and the connection had no pending request on entry.
 */
static void
produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
{
	ForeignScanState *scan = (ForeignScanState *) areq->requestee;
	PgFdwScanState *fsstate = (PgFdwScanState *) scan->fdw_state;
	AsyncRequest *inflight = fsstate->conn_state->pendingAreq;

	/* Must not be called while the given request is still in process */
	Assert(areq != inflight);

	/* Return a buffered tuple if we have one */
	if (fsstate->next_tuple < fsstate->num_tuples)
	{
		TupleTableSlot *slot = ExecProcNode((PlanState *) scan);

		if (!TupIsNull(slot))
		{
			/* Got one; the request is complete */
			ExecAsyncRequestDone(areq, slot);
			return;
		}

		/* An empty slot means the buffer has been exhausted */
		Assert(fsstate->next_tuple >= fsstate->num_tuples);
	}

	/* We have run out of tuples; there is nothing to fetch after EOF */
	if (fsstate->eof_reached)
	{
		/* Complete the request with a NULL result */
		ExecAsyncRequestDone(areq, NULL);
		return;
	}

	/* More data may come, so mark the request as pending for a callback */
	ExecAsyncRequestPending(areq);

	/* Begin another fetch if requested and if no pending request */
	if (fetch && !inflight)
		fetch_more_data_begin(areq);
}
/*
 * Begin an asynchronous data fetch.
 *
 * Sends a FETCH for the scan's cursor on the scan's connection without
 * waiting for the response, then records areq as the connection's pending
 * request.  The cursor is created first (synchronously) if it does not exist
 * yet.  The connection must not already have a pending request.
 *
 * An error is raised, reporting the original query, if the FETCH could not
 * be dispatched.
 *
 * Note: fetch_more_data must be called to fetch the result.
 */
static void
fetch_more_data_begin(AsyncRequest *areq)
{
	ForeignScanState *node = (ForeignScanState *) areq->requestee;
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
	char		sql[64];

	Assert(!fsstate->conn_state->pendingAreq);

	/* Create the cursor synchronously. */
	if (!fsstate->cursor_exists)
		create_cursor(node);

	/* We will send this query, but not wait for the response. */
	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
			 fsstate->fetch_size, fsstate->cursor_number);

	/*
	 * PQsendQuery returns 1 if the command was dispatched and 0 if not; it
	 * never returns a negative value.  So test for zero, or a failed send
	 * would go unnoticed and we would wait for a result that never comes.
	 */
	if (!PQsendQuery(fsstate->conn, sql))
		pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);

	/* Remember that the request is in process */
	fsstate->conn_state->pendingAreq = areq;
}
/*
 * Process a pending asynchronous request.
 *
 * Reads the result of the in-process fetch for areq, produces a tuple for
 * it without starting a new fetch, and delivers the response.  The work is
 * done with the query's es_query_cxt as the current memory context; the
 * caller's context is restored before returning.
 */
void
process_pending_request(AsyncRequest *areq)
{
	ForeignScanState *scan = (ForeignScanState *) areq->requestee;
	PgFdwScanState *fsstate = (PgFdwScanState *) scan->fdw_state;
	MemoryContext saved_cxt;

	/* This must be the request currently in process on the connection */
	Assert(fsstate->conn_state->pendingAreq == areq);

	/* ... and it must have been waiting for a callback */
	Assert(areq->callback_pending);

	saved_cxt = MemoryContextSwitchTo(scan->ss.ps.state->es_query_cxt);

	/* Here, unlike in AsyncNotify, we have to clear the flag ourselves */
	areq->callback_pending = false;

	/* Collect the result of the FETCH that was sent earlier */
	fetch_more_data(scan);

	/* Don't begin a fetch: a different query has to be sent afterwards */
	produce_tuple_asynchronously(areq, false);

	/* Likewise, it is up to us to call ExecAsyncResponse */
	ExecAsyncResponse(areq);

	MemoryContextSwitchTo(saved_cxt);
}
/*
* Create a tuple from the specified row of the PGresult.
*

View File

@ -16,6 +16,7 @@
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo
Cost fdw_startup_cost;
Cost fdw_tuple_cost;
List *shippable_extensions; /* OIDs of shippable extensions */
bool async_capable;
/* Cached catalog information. */
ForeignTable *table;
@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo
int relation_index;
} PgFdwRelationInfo;
/*
 * Extra control information relating to a connection.
 *
 * Callers obtain a pointer to this state through the third argument of
 * GetConnection (NULL may be passed when the state is not needed).
 *
 * pendingAreq is the asynchronous request whose FETCH has been sent on the
 * connection but whose result has not been read yet; it is set by
 * fetch_more_data_begin and reset to NULL by fetch_more_data once the result
 * has been obtained.  Code that is about to send another command on the
 * connection checks it and calls process_pending_request first.
 */
typedef struct PgFdwConnState
{
AsyncRequest *pendingAreq; /* pending async request, or NULL if none */
} PgFdwConnState;
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
extern void process_pending_request(AsyncRequest *areq);
/* in connection.c */
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql);

View File

@ -2928,3 +2928,198 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
-- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
-- ===================================================================
-- test asynchronous execution
-- ===================================================================
ALTER SERVER loopback OPTIONS (DROP extensions);
ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
CREATE TABLE base_tbl1 (a int, b int, c text);
CREATE TABLE base_tbl2 (a int, b int, c text);
CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
SERVER loopback OPTIONS (table_name 'base_tbl1');
CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
SERVER loopback2 OPTIONS (table_name 'base_tbl2');
INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
ANALYZE async_pt;
-- simple queries
CREATE TABLE result_tbl (a int, b int, c text);
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
-- Check case where multiple partitions use the same connection
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
SERVER loopback2 OPTIONS (table_name 'base_tbl3');
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
ANALYZE async_pt;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
-- Check case where the partitioned table has local/remote partitions
CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
ANALYZE async_pt;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
-- partitionwise joins
SET enable_partitionwise_join TO true;
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
SELECT * FROM join_tbl ORDER BY a1;
DELETE FROM join_tbl;
RESET enable_partitionwise_join;
-- Test interaction of async execution with plan-time partition pruning
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt WHERE a < 3000;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt WHERE a < 2000;
-- Test interaction of async execution with run-time partition pruning
SET plan_cache_mode TO force_generic_plan;
PREPARE async_pt_query (int, int) AS
INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
EXPLAIN (VERBOSE, COSTS OFF)
EXECUTE async_pt_query (3000, 505);
EXECUTE async_pt_query (3000, 505);
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
EXPLAIN (VERBOSE, COSTS OFF)
EXECUTE async_pt_query (2000, 505);
EXECUTE async_pt_query (2000, 505);
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
RESET plan_cache_mode;
CREATE TABLE local_tbl(a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
ANALYZE local_tbl;
CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
CREATE INDEX async_p3_idx ON async_p3 (a);
ANALYZE base_tbl1;
ANALYZE base_tbl2;
ANALYZE async_p3;
ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
DROP TABLE local_tbl;
DROP INDEX base_tbl1_idx;
DROP INDEX base_tbl2_idx;
DROP INDEX async_p3_idx;
-- Test that pending requests are processed properly
SET enable_mergejoin TO false;
SET enable_hashjoin TO false;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
-- Check with foreign modify
CREATE TABLE local_tbl (a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl3');
INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
CREATE TABLE base_tbl4 (a int, b int, c text);
CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl4');
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
SELECT * FROM insert_tbl ORDER BY a;
-- Check with direct modify
EXPLAIN (VERBOSE, COSTS OFF)
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
SELECT * FROM join_tbl ORDER BY a1;
DELETE FROM join_tbl;
RESET enable_mergejoin;
RESET enable_hashjoin;
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
DROP TABLE local_tbl;
DROP FOREIGN TABLE remote_tbl;
DROP FOREIGN TABLE insert_tbl;
DROP TABLE base_tbl3;
DROP TABLE base_tbl4;
DROP TABLE join_tbl;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);

View File

@ -4787,6 +4787,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</para>
<variablelist>
<varlistentry id="guc-enable-async-append" xreflabel="enable_async_append">
<term><varname>enable_async_append</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>enable_async_append</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Enables or disables the query planner's use of async-aware
append plan types. The default is <literal>on</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
<term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
<indexterm>

View File

@ -1483,6 +1483,96 @@ ShutdownForeignScan(ForeignScanState *node);
</para>
</sect2>
<sect2 id="fdw-callbacks-async">
<title>FDW Routines for Asynchronous Execution</title>
<para>
A <structname>ForeignScan</structname> node can, optionally, support
asynchronous execution as described in
<filename>src/backend/executor/README</filename>. The following
functions are all optional, but are all required if asynchronous
execution is to be supported.
</para>
<para>
<programlisting>
bool
IsForeignPathAsyncCapable(ForeignPath *path);
</programlisting>
Test whether a given <structname>ForeignPath</structname> path can scan
the underlying foreign relation asynchronously.
This function will only be called at the end of query planning when the
given path is a direct child of an <structname>AppendPath</structname>
path and when the planner believes that asynchronous execution improves
performance, and should return true if the given path is able to scan the
foreign relation asynchronously.
</para>
<para>
If this function is not defined, it is assumed that the given path scans
the foreign relation using <function>IterateForeignScan</function>.
(This implies that the callback functions described below will never be
called, so they need not be provided either.)
</para>
<para>
<programlisting>
void
ForeignAsyncRequest(AsyncRequest *areq);
</programlisting>
Produce one tuple asynchronously from the
<structname>ForeignScan</structname> node. <literal>areq</literal> is
the <structname>AsyncRequest</structname> struct describing the
<structname>ForeignScan</structname> node and the parent
<structname>Append</structname> node that requested the tuple from it.
This function should store the tuple into the slot specified by
<literal>areq-&gt;result</literal>, and set
<literal>areq-&gt;request_complete</literal> to <literal>true</literal>;
or if it needs to wait on an event external to the core server such as
network I/O, and cannot produce any tuple immediately, set the flag to
<literal>false</literal>, and set
<literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
for the <structname>ForeignScan</structname> node to get a callback from
the callback functions described below. If no more tuples are available,
set the slot to NULL, and the
<literal>areq-&gt;request_complete</literal> flag to
<literal>true</literal>. It's recommended to use
<function>ExecAsyncRequestDone</function> or
<function>ExecAsyncRequestPending</function> to set the output parameters
in the <literal>areq</literal>.
</para>
<para>
<programlisting>
void
ForeignAsyncConfigureWait(AsyncRequest *areq);
</programlisting>
Configure a file descriptor event for which the
<structname>ForeignScan</structname> node wishes to wait.
This function will only be called when the
<structname>ForeignScan</structname> node has the
<literal>areq-&gt;callback_pending</literal> flag set, and should add
the event to the <structfield>as_eventset</structfield> of the parent
<structname>Append</structname> node described by the
<literal>areq</literal>. See the comments for
<function>ExecAsyncConfigureWait</function> in
<filename>src/backend/executor/execAsync.c</filename> for additional
information. When the file descriptor event occurs,
<function>ForeignAsyncNotify</function> will be called.
</para>
<para>
<programlisting>
void
ForeignAsyncNotify(AsyncRequest *areq);
</programlisting>
Process a relevant event that has occurred, then produce one tuple
asynchronously from the <structname>ForeignScan</structname> node.
This function should set the output parameters in the
<literal>areq</literal> in the same way as
<function>ForeignAsyncRequest</function>.
</para>
</sect2>
<sect2 id="fdw-callbacks-reparameterize-paths">
<title>FDW Routines for Reparameterization of Paths</title>

View File

@ -1564,6 +1564,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</thead>
<tbody>
<row>
<entry><literal>AppendReady</literal></entry>
<entry>Waiting for subplan nodes of an <literal>Append</literal> plan
node to be ready.</entry>
</row>
<row>
<entry><literal>BackupWaitWalArchive</literal></entry>
<entry>Waiting for WAL files required for a backup to be successfully

View File

@ -371,6 +371,34 @@ OPTIONS (ADD password_required 'false');
</sect3>
<sect3>
<title>Asynchronous Execution Options</title>
<para>
<filename>postgres_fdw</filename> supports asynchronous execution, which
runs multiple parts of an <structname>Append</structname> node
concurrently rather than serially to improve performance.
This execution can be controlled using the following option:
</para>
<variablelist>
<varlistentry>
<term><literal>async_capable</literal></term>
<listitem>
<para>
This option controls whether <filename>postgres_fdw</filename> allows
foreign tables to be scanned concurrently for asynchronous execution.
It can be specified for a foreign table or a foreign server.
A table-level option overrides a server-level option.
The default is <literal>false</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect3>
<sect3>
<title>Updatability Options</title>

View File

@ -1394,6 +1394,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
if (plan->async_capable)
appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@ -1413,6 +1415,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
if (custom_name)
ExplainPropertyText("Custom Plan Provider", custom_name, es);
ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
ExplainPropertyBool("Async Capable", plan->async_capable, es);
}
switch (nodeTag(plan))

View File

@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \

View File

@ -359,3 +359,43 @@ query returning the same set of scan tuples multiple times. Likewise,
SRFs are disallowed in an UPDATE's targetlist. There, they would have the
effect of the same row being updated multiple times, which is not very
useful --- and updates after the first would have no effect anyway.
Asynchronous Execution
----------------------
In cases where a node is waiting on an event external to the database system,
such as a ForeignScan awaiting network I/O, it's desirable for the node to
indicate that it cannot return any tuple immediately but may be able to do so
at a later time. A process which discovers this type of situation can always
handle it simply by blocking, but this may waste time that could be spent
executing some other part of the plan tree where progress could be made
immediately. This is particularly likely to occur when the plan tree contains
an Append node. Asynchronous execution runs multiple parts of an Append node
concurrently rather than serially to improve performance.
For asynchronous execution, an Append node must first request a tuple from an
async-capable child node using ExecAsyncRequest. Next, it must execute the
asynchronous event loop using ExecAppendAsyncEventWait. Eventually, when a
child node to which an asynchronous request has been made produces a tuple,
the Append node will receive it from the event loop via ExecAsyncResponse. In
the current implementation of asynchronous execution, the only node type that
requests tuples from an async-capable child node is an Append, while the only
node type that might be async-capable is a ForeignScan.
Typically, the ExecAsyncResponse callback is the only one required for nodes
that wish to request tuples asynchronously. On the other hand, async-capable
nodes generally need to implement three methods:
1. When an asynchronous request is made, the node's ExecAsyncRequest callback
will be invoked; it should use ExecAsyncRequestPending to indicate that the
request is pending for a callback described below. Alternatively, it can
instead use ExecAsyncRequestDone if a result is available immediately.
2. When the event loop wishes to wait or poll for file descriptor events, the
node's ExecAsyncConfigureWait callback will be invoked to configure the
file descriptor event for which the node wishes to wait.
3. When the file descriptor becomes ready, the node's ExecAsyncNotify callback
will be invoked; like #1, it should use ExecAsyncRequestPending for another
callback or ExecAsyncRequestDone to return a result immediately.

View File

@ -531,6 +531,10 @@ ExecSupportsBackwardScan(Plan *node)
{
ListCell *l;
/* With async, tuples may be interleaved, so can't back up. */
if (((Append *) node)->nasyncplans > 0)
return false;
foreach(l, ((Append *) node)->appendplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))

View File

@ -0,0 +1,124 @@
/*-------------------------------------------------------------------------
*
* execAsync.c
* Support routines for asynchronous execution
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/executor/execAsync.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "executor/execAsync.h"
#include "executor/nodeAppend.h"
#include "executor/nodeForeignscan.h"
/*
 * ExecAsyncRequest
 *		Asynchronously request a tuple from a designated async-capable node.
 *
 * The request is handed to the node-type-specific handler for
 * areq->requestee.  Afterwards the requestor is always called back through
 * ExecAsyncResponse so it can examine the state the handler left in areq.
 */
void
ExecAsyncRequest(AsyncRequest *areq)
{
	if (nodeTag(areq->requestee) == T_ForeignScanState)
		ExecAsyncForeignScanRequest(areq);
	else
	{
		/* Only async-capable node types may be asked; anything else is a caller bug. */
		elog(ERROR, "unrecognized node type: %d",
			 (int) nodeTag(areq->requestee));
	}

	ExecAsyncResponse(areq);
}
/*
 * ExecAsyncConfigureWait
 *		Let the asynchronous node set up the file descriptor event it wants
 *		to wait for.
 *
 * The node-type-specific callback is expected to make a single call of the
 * following form:
 *
 * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
 */
void
ExecAsyncConfigureWait(AsyncRequest *areq)
{
	if (nodeTag(areq->requestee) == T_ForeignScanState)
		ExecAsyncForeignScanConfigureWait(areq);
	else
	{
		/* Only async-capable node types may be asked; anything else is a caller bug. */
		elog(ERROR, "unrecognized node type: %d",
			 (int) nodeTag(areq->requestee));
	}
}
/*
 * ExecAsyncNotify
 *		Call the asynchronous node back once a relevant event has occurred.
 *
 * After the node-type-specific handler has run, the requestor is called back
 * through ExecAsyncResponse so it can examine the updated request state.
 */
void
ExecAsyncNotify(AsyncRequest *areq)
{
	if (nodeTag(areq->requestee) == T_ForeignScanState)
		ExecAsyncForeignScanNotify(areq);
	else
	{
		/* Only async-capable node types may be notified; anything else is a caller bug. */
		elog(ERROR, "unrecognized node type: %d",
			 (int) nodeTag(areq->requestee));
	}

	ExecAsyncResponse(areq);
}
/*
 * ExecAsyncResponse
 *		Call the requestor back after an asynchronous node has handled a
 *		request or a notification.
 *
 * Dispatches on the node type of areq->requestor; Append is the only
 * requestor type handled here.
 */
void
ExecAsyncResponse(AsyncRequest *areq)
{
	if (nodeTag(areq->requestor) == T_AppendState)
		ExecAsyncAppendResponse(areq);
	else
	{
		/* Only async-aware requestors can receive responses; anything else is a caller bug. */
		elog(ERROR, "unrecognized node type: %d",
			 (int) nodeTag(areq->requestor));
	}
}
/*
 * ExecAsyncRequestDone
 *		Record the outcome of a finished asynchronous request.
 *
 * A requestee node calls this, from its ExecAsyncRequest or ExecAsyncNotify
 * callback, to hand "result" over to its requestor node.  The request is
 * flagged as complete and the result is stored in areq.
 */
void
ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
{
	areq->result = result;
	areq->request_complete = true;
}
/*
 * ExecAsyncRequestPending
 *		Flag a request as still waiting for a callback.
 *
 * A requestee node calls this, from its ExecAsyncRequest or ExecAsyncNotify
 * callback, when it cannot deliver a result yet.  The request is marked as
 * incomplete with no result, and as needing a callback.
 */
void
ExecAsyncRequestPending(AsyncRequest *areq)
{
	areq->result = NULL;
	areq->request_complete = false;
	areq->callback_pending = true;
}

View File

@ -57,10 +57,13 @@
#include "postgres.h"
#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@ -78,12 +81,18 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
Bitmapset *asyncplans;
int nplans;
int nasyncplans;
int firstvalid;
int i,
j;
@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
appendstate->as_syncdone = false;
appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@ -191,12 +204,25 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* While at it, find out the first valid partial plan.
*/
j = 0;
asyncplans = NULL;
nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
/*
* Record async subplans. When executing EvalPlanQual, we treat them
* as sync ones; don't do this when initializing an EvalPlanQual plan
* tree.
*/
if (initNode->async_capable && estate->es_epq_active == NULL)
{
asyncplans = bms_add_member(asyncplans, j);
nasyncplans++;
}
/*
* Record the lowest appendplans index which is a valid partial plan.
*/
@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
/* Initialize async state */
appendstate->as_asyncplans = asyncplans;
appendstate->as_nasyncplans = nasyncplans;
appendstate->as_asyncrequests = NULL;
appendstate->as_asyncresults = (TupleTableSlot **)
palloc0(nasyncplans * sizeof(TupleTableSlot *));
appendstate->as_needrequest = NULL;
appendstate->as_eventset = NULL;
if (nasyncplans > 0)
{
appendstate->as_asyncrequests = (AsyncRequest **)
palloc0(nplans * sizeof(AsyncRequest *));
i = -1;
while ((i = bms_next_member(asyncplans, i)) >= 0)
{
AsyncRequest *areq;
areq = palloc(sizeof(AsyncRequest));
areq->requestor = (PlanState *) appendstate;
areq->requestee = appendplanstates[i];
areq->request_index = i;
areq->callback_pending = false;
areq->request_complete = false;
areq->result = NULL;
appendstate->as_asyncrequests[i] = areq;
}
}
/*
* Miscellaneous initialization
*/
@ -232,31 +289,59 @@ static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
TupleTableSlot *result;
if (node->as_whichplan < 0)
/*
* If this is the first call after Init or ReScan, we need to do the
* initialization work.
*/
if (!node->as_begun)
{
Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
Assert(!node->as_syncdone);
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
/* If there are any async subplans, begin executing them. */
if (node->as_nasyncplans > 0)
ExecAppendAsyncBegin(node);
/*
* If no subplan has been chosen, we must choose one before
* If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
!node->choose_next_subplan(node))
if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
Assert(node->as_syncdone ||
(node->as_whichplan >= 0 &&
node->as_whichplan < node->as_nplans));
/* And we're initialized. */
node->as_begun = true;
}
for (;;)
{
PlanState *subnode;
TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
* figure out which subplan we are currently processing
* try to get a tuple from an async subplan if any
*/
if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
{
if (ExecAppendAsyncGetNext(node, &result))
return result;
Assert(!node->as_syncdone);
Assert(bms_is_empty(node->as_needrequest));
}
/*
* figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate)
return result;
}
/* choose new subplan; if none, we're done */
if (!node->choose_next_subplan(node))
/*
* wait or poll async events if any. We do this before checking for
* the end of iteration, because it might drain the remaining async
* subplans.
*/
if (node->as_nasyncremain > 0)
ExecAppendAsyncEventWait(node);
/* choose new sync subplan; if no sync/async subplans, we're done */
if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
int nasyncplans = node->as_nasyncplans;
int i;
/*
@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node)
{
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
if (nasyncplans > 0)
{
bms_free(node->as_valid_asyncplans);
node->as_valid_asyncplans = NULL;
}
}
for (i = 0; i < node->as_nplans; i++)
@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
/* Reset async state */
if (nasyncplans > 0)
{
i = -1;
while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
{
AsyncRequest *areq = node->as_asyncrequests[i];
areq->callback_pending = false;
areq->request_complete = false;
areq->result = NULL;
}
bms_free(node->as_needrequest);
node->as_needrequest = NULL;
}
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
node->as_syncdone = false;
node->as_begun = false;
}
/* ----------------------------------------------------------------
@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
* Choose next subplan for a non-parallel-aware Append,
* Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node)
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
/* Nothing to do if syncdone */
if (node->as_syncdone)
return false;
/*
* If first call then have the bms member function choose the first valid
* subplan by initializing whichplan to -1. If there happen to be no
* valid subplans then the bms member function will handle that by
* returning a negative number which will allow us to exit returning a
* sync subplan by initializing whichplan to -1. If there happen to be
* no valid sync subplans then the bms member function will handle that
* by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
if (node->as_valid_subplans == NULL)
if (node->as_nasyncplans > 0)
{
/* We'd have filled as_valid_subplans already */
Assert(node->as_valid_subplans);
}
else if (node->as_valid_subplans == NULL)
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state);
@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node)
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
{
/* Set as_syncdone if in async mode */
if (node->as_nasyncplans > 0)
node->as_syncdone = true;
return false;
}
node->as_whichplan = nextplan;
@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node)
node->as_pstate->pa_finished[i] = true;
}
}
/* ----------------------------------------------------------------
* Asynchronous Append Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
 *		ExecAppendAsyncBegin
 *
 *		Start execution of the designated async-capable subplans by
 *		issuing one asynchronous request to each valid one.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
	int			plan_idx;

	/* Async-aware Appends only ever scan forward. */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* Callers must not get here without any async subplans. */
	Assert(node->as_nasyncplans > 0);

	/* Work out the set of valid subplans if that hasn't happened yet. */
	if (node->as_valid_subplans == NULL)
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state);

	/* Split the valid subplans into sync and async ones. */
	classify_matching_subplans(node);

	/* Fire off a request to every valid async subplan, if there are any. */
	if (node->as_nasyncremain != 0)
	{
		for (plan_idx = bms_next_member(node->as_valid_asyncplans, -1);
			 plan_idx >= 0;
			 plan_idx = bms_next_member(node->as_valid_asyncplans, plan_idx))
		{
			AsyncRequest *request = node->as_asyncrequests[plan_idx];

			Assert(request->request_index == plan_idx);
			Assert(!request->callback_pending);

			/* Do the actual work. */
			ExecAsyncRequest(request);
		}
	}
}
/* ----------------------------------------------------------------
 *		ExecAppendAsyncGetNext
 *
 *		Fetch the next tuple from any of the asynchronous subplans.
 *
 *		Returns true with *result set when the caller should return
 *		*result: either a tuple, or the cleared result slot once all
 *		subplans are finished.  Returns false when nothing is available
 *		from the async subplans but sync subplans remain to be scanned.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
	bool		have_tuple;

	*result = NULL;

	/* Callers must not get here without any valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* First try: issue new requests and see whether one yields a tuple. */
	have_tuple = ExecAppendAsyncRequest(node, result);

	while (!have_tuple && node->as_nasyncremain > 0)
	{
		CHECK_FOR_INTERRUPTS();

		/* Wait or poll async events. */
		ExecAppendAsyncEventWait(node);

		/* Try again now that events may have been delivered. */
		have_tuple = ExecAppendAsyncRequest(node, result);

		/* Stop looping as soon as a sync subplan still has work to do. */
		if (!have_tuple && !node->as_syncdone)
			break;
	}

	if (have_tuple)
		return true;

	/*
	 * No async tuple is available.  If sync subplans remain, the caller must
	 * go on scanning them; otherwise the whole Append is exhausted.
	 */
	if (!node->as_syncdone)
		return false;

	Assert(node->as_nasyncremain == 0);
	*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
	return true;
}
/* ----------------------------------------------------------------
 *		ExecAppendAsyncRequest
 *
 *		Request a tuple asynchronously.
 *
 *		Returns true and sets *result when an asynchronously-generated
 *		tuple is available, false otherwise.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
	/* Without subplans awaiting a new request there is nothing to do. */
	if (bms_is_empty(node->as_needrequest))
		return false;

	/*
	 * Only issue new requests when no previously saved result is waiting to
	 * be returned; otherwise one of the saved results is handed out below.
	 */
	if (node->as_nasyncresults <= 0)
	{
		Bitmapset  *pending = node->as_needrequest;
		int			plan_idx;

		/* Detach the set first so responses can start building a new one. */
		node->as_needrequest = NULL;

		for (plan_idx = bms_next_member(pending, -1);
			 plan_idx >= 0;
			 plan_idx = bms_next_member(pending, plan_idx))
		{
			/* Do the actual work. */
			ExecAsyncRequest(node->as_asyncrequests[plan_idx]);
		}

		bms_free(pending);
	}

	/* Nothing saved, neither before nor by the requests just made. */
	if (node->as_nasyncresults <= 0)
		return false;

	/* Pop the most recently saved result. */
	node->as_nasyncresults--;
	*result = node->as_asyncresults[node->as_nasyncresults];
	return true;
}
/* ----------------------------------------------------------------
 *		ExecAppendAsyncEventWait
 *
 *		Wait or poll for file descriptor events and fire callbacks.
 *
 *		A temporary WaitEventSet is built on every call.  It holds the
 *		postmaster-death event plus whatever events the subplans with
 *		a pending callback choose to register.  The wait blocks
 *		(timeout -1) only once all sync subplans are done; otherwise it
 *		merely polls (timeout 0) so that sync subplans keep running.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
	long		timeout = node->as_syncdone ? -1 : 0;
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	int			noccurred;
	int			i;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
										   node->as_nasyncplans + 1);
	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
					  NULL, NULL);

	/* Give each waiting subplan a chance to add an event. */
	i = -1;
	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		if (areq->callback_pending)
			ExecAsyncConfigureWait(areq);
	}

	/*
	 * A ConfigureWait callback is not guaranteed to register an event.  If
	 * the set holds nothing but the postmaster-death event added above, do
	 * not wait: with timeout -1 the call below would block indefinitely
	 * because no other event could ever wake it up.
	 */
	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
	{
		FreeWaitEventSet(node->as_eventset);
		node->as_eventset = NULL;
		return;
	}

	/* Wait for at least one event to occur (or just poll if timeout is 0). */
	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
								 EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
	FreeWaitEventSet(node->as_eventset);
	node->as_eventset = NULL;
	if (noccurred == 0)
		return;

	/* Deliver notifications. */
	for (i = 0; i < noccurred; i++)
	{
		WaitEvent  *w = &occurred_event[i];

		/*
		 * Each waiting subplan should have registered its wait event with
		 * user_data pointing back to its AsyncRequest.
		 */
		if ((w->events & WL_SOCKET_READABLE) != 0)
		{
			AsyncRequest *areq = (AsyncRequest *) w->user_data;

			/*
			 * Mark it as no longer needing a callback.  We must do this
			 * before dispatching the callback in case the callback resets
			 * the flag.
			 */
			Assert(areq->callback_pending);
			areq->callback_pending = false;

			/* Do the actual work. */
			ExecAsyncNotify(areq);
		}
	}
}
/* ----------------------------------------------------------------
 *		ExecAsyncAppendResponse
 *
 *		Process the response to an asynchronous request made by this
 *		Append.  A still-pending request is left alone, an exhausted
 *		subplan is counted off, and a real tuple is saved for return.
 * ----------------------------------------------------------------
 */
void
ExecAsyncAppendResponse(AsyncRequest *areq)
{
	AppendState *appendstate = (AppendState *) areq->requestor;
	TupleTableSlot *tuple = areq->result;

	/* The result should be a TupleTableSlot or NULL. */
	Assert(tuple == NULL || IsA(tuple, TupleTableSlot));

	if (areq->request_complete)
	{
		if (!TupIsNull(tuple))
		{
			/* Stash the tuple so it can be returned later. */
			Assert(appendstate->as_nasyncresults < appendstate->as_nasyncplans);
			appendstate->as_asyncresults[appendstate->as_nasyncresults++] = tuple;

			/*
			 * The subplan that produced a tuple is now eligible for another
			 * request.  It is not launched right here because it might
			 * complete immediately.
			 */
			appendstate->as_needrequest =
				bms_add_member(appendstate->as_needrequest,
							   areq->request_index);
		}
		else
		{
			/*
			 * NULL or an empty slot means the subplan is exhausted; such a
			 * subplan wouldn't have been pending for a callback.
			 */
			Assert(!areq->callback_pending);
			--appendstate->as_nasyncremain;
		}
	}
	else
	{
		/* Not complete yet, so it must be pending for a callback. */
		Assert(areq->callback_pending);
	}
}
/* ----------------------------------------------------------------
 *		classify_matching_subplans
 *
 *		Split the node's as_valid_subplans into sync and async
 *		members.  On return as_valid_subplans holds the sync ones
 *		only, as_valid_asyncplans holds the async ones, and
 *		as_syncdone / as_nasyncremain reflect the two sets.
 * ----------------------------------------------------------------
 */
static void
classify_matching_subplans(AppendState *node)
{
	Bitmapset  *async_members;

	Assert(node->as_valid_asyncplans == NULL);

	/* No valid subplans at all: neither sync nor async work remains. */
	if (bms_is_empty(node->as_valid_subplans))
	{
		node->as_nasyncremain = 0;
		node->as_syncdone = true;
		return;
	}

	/* Valid subplans exist, but none of them is async. */
	if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
	{
		node->as_nasyncremain = 0;
		return;
	}

	/* Valid async subplans = async subplans intersected with valid ones. */
	async_members = bms_int_members(bms_copy(node->as_asyncplans),
									node->as_valid_subplans);

	/* Strip them from the valid set, which leaves the sync subplans. */
	node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
											  async_members);
	node->as_syncdone = bms_is_empty(node->as_valid_subplans);

	/* Remember the valid async subplans and how many there are. */
	node->as_nasyncremain = bms_num_members(async_members);
	node->as_valid_asyncplans = async_members;
}

View File

@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
/* ----------------------------------------------------------------
 *		ExecAsyncForeignScanRequest
 *
 *		Asynchronously request a tuple from a designated async-capable
 *		ForeignScan by forwarding the request to the FDW's
 *		ForeignAsyncRequest callback.
 * ----------------------------------------------------------------
 */
void
ExecAsyncForeignScanRequest(AsyncRequest *areq)
{
	ForeignScanState *fsstate = (ForeignScanState *) areq->requestee;

	/* The FDW is expected to provide the callback. */
	Assert(fsstate->fdwroutine->ForeignAsyncRequest != NULL);
	fsstate->fdwroutine->ForeignAsyncRequest(areq);
}
/* ----------------------------------------------------------------
 *		ExecAsyncForeignScanConfigureWait
 *
 *		In async mode, configure for a wait by forwarding to the
 *		FDW's ForeignAsyncConfigureWait callback.
 * ----------------------------------------------------------------
 */
void
ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
{
	ForeignScanState *fsstate = (ForeignScanState *) areq->requestee;

	/* The FDW is expected to provide the callback. */
	Assert(fsstate->fdwroutine->ForeignAsyncConfigureWait != NULL);
	fsstate->fdwroutine->ForeignAsyncConfigureWait(areq);
}
/* ----------------------------------------------------------------
 *		ExecAsyncForeignScanNotify
 *
 *		Callback invoked when a relevant event has occurred; forwards
 *		to the FDW's ForeignAsyncNotify callback.
 * ----------------------------------------------------------------
 */
void
ExecAsyncForeignScanNotify(AsyncRequest *areq)
{
	ForeignScanState *fsstate = (ForeignScanState *) areq->requestee;

	/* The FDW is expected to provide the callback. */
	Assert(fsstate->fdwroutine->ForeignAsyncNotify != NULL);
	fsstate->fdwroutine->ForeignAsyncNotify(areq);
}

View File

@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
@ -241,6 +242,7 @@ _copyAppend(const Append *from)
*/
COPY_BITMAPSET_FIELD(apprelids);
COPY_NODE_FIELD(appendplans);
COPY_SCALAR_FIELD(nasyncplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);

View File

@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_BITMAPSET_FIELD(apprelids);
WRITE_NODE_FIELD(appendplans);
WRITE_INT_FIELD(nasyncplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
}

View File

@ -1615,6 +1615,7 @@ ReadCommonPlan(Plan *local_node)
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
@ -1711,6 +1712,7 @@ _readAppend(void)
READ_BITMAPSET_FIELD(apprelids);
READ_NODE_FIELD(appendplans);
READ_INT_FIELD(nasyncplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);

View File

@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false;
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
bool enable_async_append = true;
typedef struct
{

View File

@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
static bool is_async_capable_path(Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@ -1080,6 +1081,31 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
return plan;
}
/*
 * is_async_capable_path
 *		Report whether the given Path node is async-capable.
 *
 * Only a ForeignPath can qualify, and only if its FDW provides an
 * IsForeignPathAsyncCapable callback that answers true for the path.
 */
static bool
is_async_capable_path(Path *path)
{
	FdwRoutine *routine;

	/* Any path type other than ForeignPath is never async-capable. */
	if (nodeTag(path) != T_ForeignPath)
		return false;

	routine = path->parent->fdwroutine;
	Assert(routine != NULL);

	/* An FDW without the callback does not support async execution. */
	if (routine->IsForeignPathAsyncCapable == NULL)
		return false;

	if (routine->IsForeignPathAsyncCapable((ForeignPath *) path))
		return true;

	return false;
}
/*
* create_append_plan
* Create an Append plan for 'best_path' and (recursively) plans
@ -1097,6 +1123,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
ListCell *subpaths;
int nasyncplans = 0;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
int nodenumsortkeys = 0;
@ -1104,6 +1131,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
bool consider_async = false;
/*
* The subpaths list could be empty, if every child was proven empty by
@ -1167,6 +1195,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
}
/* If appropriate, consider async append */
consider_async = (enable_async_append && pathkeys == NIL &&
!best_path->path.parallel_safe &&
list_length(best_path->subpaths) > 1);
/* Build the plan for each child */
foreach(subpaths, best_path->subpaths)
{
@ -1234,6 +1267,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
subplans = lappend(subplans, subplan);
/* Check to see if subplan can be executed asynchronously */
if (consider_async && is_async_capable_path(subpath))
{
subplan->async_capable = true;
++nasyncplans;
}
}
/*
@ -1266,6 +1306,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
plan->appendplans = subplans;
plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;

View File

@ -3995,6 +3995,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
case WAIT_EVENT_APPEND_READY:
event_name = "AppendReady";
break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;

View File

@ -2020,6 +2020,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
}
#endif
/*
 * GetNumRegisteredWaitEvents
 *		Return how many wait events are currently registered in "set".
 */
int
GetNumRegisteredWaitEvents(WaitEventSet *set)
{
	int			nregistered = set->nevents;

	return nregistered;
}
#if defined(WAIT_USE_POLL)
/*

View File

@ -1128,6 +1128,16 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
{
{"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of async append plans."),
NULL,
GUC_EXPLAIN
},
&enable_async_append,
true,
NULL, NULL, NULL
},
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),

View File

@ -371,6 +371,7 @@
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
#enable_async_append = on
# - Planner Cost Constants -

View File

@ -0,0 +1,25 @@
/*-------------------------------------------------------------------------
* execAsync.h
* Support functions for asynchronous execution
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/include/executor/execAsync.h
*-------------------------------------------------------------------------
*/
#ifndef EXECASYNC_H
#define EXECASYNC_H
#include "nodes/execnodes.h"
extern void ExecAsyncRequest(AsyncRequest *areq);
extern void ExecAsyncConfigureWait(AsyncRequest *areq);
extern void ExecAsyncNotify(AsyncRequest *areq);
extern void ExecAsyncResponse(AsyncRequest *areq);
extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
extern void ExecAsyncRequestPending(AsyncRequest *areq);
#endif /* EXECASYNC_H */

View File

@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
extern void ExecAsyncAppendResponse(AsyncRequest *areq);
#endif /* NODEAPPEND_H */

View File

@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
#endif /* NODEFOREIGNSCAN_H */

View File

@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@ -256,6 +264,12 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
/* Support functions for asynchronous execution */
IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
ForeignAsyncRequest_function ForeignAsyncRequest;
ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
ForeignAsyncNotify_function ForeignAsyncNotify;
} FdwRoutine;

View File

@ -515,6 +515,22 @@ typedef struct ResultRelInfo
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfo;
/* ----------------
 * AsyncRequest
 *
 * State for an asynchronous tuple request.
 *
 * One AsyncRequest ties together the node that wants a tuple (the
 * requestor) and the node it is asked from (the requestee), and carries
 * the status flags and the result of that request.
 * ----------------
 */
typedef struct AsyncRequest
{
struct PlanState *requestor; /* Node that wants a tuple */
struct PlanState *requestee; /* Node from which a tuple is wanted */
int request_index; /* Scratch space for requestor; its meaning
 * is up to the requestor */
bool callback_pending; /* Callback is needed */
bool request_complete; /* Request complete, result valid; 'result'
 * should only be read once this is set */
TupleTableSlot *result; /* Result (NULL if no more tuples) */
} AsyncRequest;
/* ----------------
* EState information
*
@ -1199,12 +1215,12 @@ typedef struct ModifyTableState
* AppendState information
*
* nplans how many plans are in the array
* whichplan which plan is being executed (0 .. n-1), or a
* special negative value. See nodeAppend.c.
* whichplan which synchronous plan is being executed (0 .. n-1)
* or a special negative value. See nodeAppend.c.
* prune_state details required to allow partitions to be
* eliminated from the scan, or NULL if not possible.
* valid_subplans for runtime pruning, valid appendplans indexes to
* scan.
* valid_subplans for runtime pruning, valid synchronous appendplans
* indexes to scan.
* ----------------
*/
@ -1220,12 +1236,25 @@ struct AppendState
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
bool as_begun; /* false means need to initialize */
Bitmapset *as_asyncplans; /* asynchronous plans indexes */
int as_nasyncplans; /* # of asynchronous plans */
AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
int as_nasyncresults; /* # of valid entries in as_asyncresults */
bool as_syncdone; /* true if all synchronous plans done in
* asynchronous mode, else false */
int as_nasyncremain; /* # of remaining asynchronous plans */
Bitmapset *as_needrequest; /* asynchronous plans needing a new request */
struct WaitEventSet *as_eventset; /* WaitEventSet used to configure
* file descriptor wait events */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */
bool (*choose_next_subplan) (AppendState *);
};

View File

@ -129,6 +129,11 @@ typedef struct Plan
bool parallel_aware; /* engage parallel-aware logic? */
bool parallel_safe; /* OK to use as part of parallel plan? */
/*
* information needed for asynchronous execution
*/
bool async_capable; /* engage asynchronous-capable logic? */
/*
* Common structural data for all Plan types.
*/
@ -245,6 +250,7 @@ typedef struct Append
Plan plan;
Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
List *appendplans;
int nasyncplans; /* # of asynchronous plans */
/*
* All 'appendplans' preceding this index are non-partial plans. All

View File

@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
extern PGDLLIMPORT bool enable_parallel_append;
extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
extern PGDLLIMPORT bool enable_async_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,

View File

@ -966,7 +966,8 @@ typedef enum
*/
typedef enum
{
WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
WAIT_EVENT_APPEND_READY = PG_WAIT_IPC,
WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,

View File

@ -179,5 +179,6 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
extern void InitializeLatchWaitSet(void);
/*
 * Takes a WaitEventSet and returns an int.  NOTE(review): presumably the
 * number of events currently registered in 'set'; the definition is not
 * visible here -- confirm in latch.c.
 */
extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
#endif /* LATCH_H */

View File

@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
"Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8
<Plan> +
<Node-Type>Seq Scan</Node-Type> +
<Parallel-Aware>false</Parallel-Aware> +
<Async-Capable>false</Async-Capable> +
<Relation-Name>int8_tbl</Relation-Name> +
<Alias>i8</Alias> +
<Startup-Cost>N.N</Startup-Cost> +
@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
Async Capable: false +
Relation Name: "int8_tbl"+
Alias: "i8" +
Startup Cost: N.N +
@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8'
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
"Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@ -346,6 +350,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
"Async Capable": false, +
"Relation Name": "tenk1", +
"Parallel Aware": true, +
"Local Hit Blocks": 0, +
@ -391,6 +396,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
"Async Capable": false, +
"Parallel Aware": false, +
"Sort Space Used": 0, +
"Local Hit Blocks": 0, +
@ -433,6 +439,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
"Async Capable": false, +
"Parallel Aware": false, +
"Workers Planned": 0, +
"Local Hit Blocks": 0, +

View File

@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 55, +
"Actual Loops": 1, +
"Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
@ -760,6 +761,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 70, +
"Actual Loops": 1, +
"Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +

View File

@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
"Node Type": "ModifyTable", +
"Operation": "Insert", +
"Parallel Aware": false, +
"Async Capable": false, +
"Relation Name": "insertconflicttest", +
"Alias": "insertconflicttest", +
"Conflict Resolution": "UPDATE", +
@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
{ +
"Node Type": "Result", +
"Parent Relationship": "Member", +
"Parallel Aware": false +
"Parallel Aware": false, +
"Async Capable": false +
} +
] +
} +

View File

@ -95,6 +95,7 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
enable_async_append | on
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
@ -113,7 +114,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
(18 rows)
(19 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail