From 27e1f14563cf982f1f4d71e21ef247866662a052 Mon Sep 17 00:00:00 2001 From: Etsuro Fujita Date: Wed, 31 Mar 2021 18:45:00 +0900 Subject: [PATCH] Add support for asynchronous execution. This implements asynchronous execution, which runs multiple parts of a non-parallel-aware Append concurrently rather than serially to improve performance when possible. Currently, the only node type that can be run concurrently is a ForeignScan that is an immediate child of such an Append. In the case where such ForeignScans access data on different remote servers, this runs those ForeignScans concurrently so that the remote operations overlap, which improves performance especially when those operations are time-consuming ones such as remote joins and remote aggregations. We may extend this to other node types such as joins or aggregates over ForeignScans in the future. This also adds support for asynchronous execution to postgres_fdw, which is enabled by the table-level/server-level option "async_capable". The default is false. Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself. This commit is mostly based on the patch proposed by Robert Haas, but also uses parts of the patch proposed by Kyotaro Horiguchi and of the patch proposed by Thomas Munro. Reviewed by Kyotaro Horiguchi, Konstantin Knizhnik, Andrey Lepikhov, Movead Li, Thomas Munro, Justin Pryzby, and others. 
Discussion: https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com Discussion: https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com Discussion: https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com --- contrib/postgres_fdw/connection.c | 26 +- .../postgres_fdw/expected/postgres_fdw.out | 509 +++++++++++++++++- contrib/postgres_fdw/option.c | 6 +- contrib/postgres_fdw/postgres_fdw.c | 374 +++++++++++-- contrib/postgres_fdw/postgres_fdw.h | 17 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 195 +++++++ doc/src/sgml/config.sgml | 14 + doc/src/sgml/fdwhandler.sgml | 90 ++++ doc/src/sgml/monitoring.sgml | 5 + doc/src/sgml/postgres-fdw.sgml | 28 + src/backend/commands/explain.c | 3 + src/backend/executor/Makefile | 1 + src/backend/executor/README | 40 ++ src/backend/executor/execAmi.c | 4 + src/backend/executor/execAsync.c | 124 +++++ src/backend/executor/nodeAppend.c | 461 +++++++++++++++- src/backend/executor/nodeForeignscan.c | 48 ++ src/backend/nodes/copyfuncs.c | 2 + src/backend/nodes/outfuncs.c | 2 + src/backend/nodes/readfuncs.c | 2 + src/backend/optimizer/path/costsize.c | 1 + src/backend/optimizer/plan/createplan.c | 41 ++ src/backend/postmaster/pgstat.c | 3 + src/backend/storage/ipc/latch.c | 9 + src/backend/utils/misc/guc.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/executor/execAsync.h | 25 + src/include/executor/nodeAppend.h | 2 + src/include/executor/nodeForeignscan.h | 4 + src/include/foreign/fdwapi.h | 14 + src/include/nodes/execnodes.h | 37 +- src/include/nodes/plannodes.h | 6 + src/include/optimizer/cost.h | 1 + src/include/pgstat.h | 3 +- src/include/storage/latch.h | 1 + src/test/regress/expected/explain.out | 7 + .../regress/expected/incremental_sort.out | 2 + src/test/regress/expected/insert_conflict.out | 4 +- src/test/regress/expected/sysviews.out | 3 +- 39 files changed, 2068 insertions(+), 57 deletions(-) create 
mode 100644 src/backend/executor/execAsync.c create mode 100644 src/include/executor/execAsync.h diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ee0b4acf0b..54ab8edfab 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -62,6 +62,7 @@ typedef struct ConnCacheEntry Oid serverid; /* foreign server OID used to get server name */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid); * will_prep_stmt must be true if caller intends to create any prepared * statements. Since those don't go away automatically at transaction end * (not even on error), we need this flag to cue manual cleanup. + * + * If state is not NULL, *state receives the per-connection state associated + * with the PGconn. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; bool retry = false; @@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt) */ PG_TRY(); { + /* Process a pending asynchronous request if any. */ + if (entry->state.pendingAreq) + process_pending_request(entry->state.pendingAreq); /* Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); } @@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. 
*/ + if (state) + *state = &entry->state; + return entry->conn; } @@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); @@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn) * Caller is responsible for the error handling on the result. */ PGresult * -pgfdw_exec_query(PGconn *conn, const char *query) +pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) { + /* First, process a pending asynchronous request, if any. */ + if (state && state->pendingAreq) + process_pending_request(state->pendingAreq); + /* * Submit a query. Since we don't use non-blocking mode, this also can * block. But its risk is relatively small, so we ignore that for now. @@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) { entry->have_prep_stmt = false; entry->have_error = false; + /* Also reset per-connection state */ + memset(&entry->state, 0, sizeof(entry->state)); } /* Disarm changing_xact_state if it all worked. */ @@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) * Cancel the currently-in-progress query (whose query text we do not have) * and ignore the result. Returns true if we successfully cancel the query * and discard any pending result, and false if not. + * + * XXX: if the query was one sent by fetch_more_data_begin(), we could get the + * query text from the pendingAreq saved in the per-connection state, then + * report the query using it. 
*/ static bool pgfdw_cancel_query(PGconn *conn) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index f2c91c4782..f61e59cd20 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8946,7 +8946,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -9437,3 +9437,510 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test; -- Clean up DROP TABLE batch_table, batch_cp_upd_test CASCADE; +-- =================================================================== +-- test asynchronous execution +-- =================================================================== +ALTER SERVER 
loopback OPTIONS (DROP extensions); +ALTER SERVER loopback OPTIONS (ADD async_capable 'true'); +ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true'); +CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a); +CREATE TABLE base_tbl1 (a int, b int, c text); +CREATE TABLE base_tbl2 (a int, b int, c text); +CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000) + SERVER loopback OPTIONS (table_name 'base_tbl1'); +CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000) + SERVER loopback2 OPTIONS (table_name 'base_tbl2'); +INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; +-- simple queries +CREATE TABLE result_tbl (a int, b int, c text); +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; + QUERY PLAN +---------------------------------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) +(8 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1000 | 0 | 0000 + 1100 | 100 | 0100 + 1200 | 200 | 0200 + 1300 | 300 | 0300 + 1400 | 400 | 0400 + 1500 | 500 | 0500 + 1600 | 600 | 0600 + 1700 | 700 | 0700 + 1800 | 800 | 0800 + 1900 | 900 | 0900 + 2000 | 0 | 0000 + 2100 | 100 | 0100 + 2200 | 200 | 0200 + 2300 | 300 | 0300 + 2400 | 400 | 0400 + 2500 | 500 | 0500 + 2600 | 600 | 0600 + 2700 | 700 | 0700 + 2800 | 800 
| 0800 + 2900 | 900 | 0900 +(20 rows) + +DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + QUERY PLAN +---------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 +(10 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 +(2 rows) + +DELETE FROM result_tbl; +-- Check case where multiple partitions use the same connection +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000) + SERVER loopback2 OPTIONS (table_name 'base_tbl3'); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + QUERY PLAN +---------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Async Foreign Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) + Remote SQL: 
SELECT a, b, c FROM public.base_tbl3 +(14 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 + 3505 | 505 | 0505 +(3 rows) + +DELETE FROM result_tbl; +DROP FOREIGN TABLE async_p3; +DROP TABLE base_tbl3; +-- Check case where the partitioned table has local/remote partitions +CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + QUERY PLAN +---------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) +(13 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 + 3505 | 505 | 0505 +(3 rows) + +DELETE FROM result_tbl; +-- partitionwise joins +SET enable_partitionwise_join TO true; +CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text); +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + QUERY PLAN 
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Insert on public.join_tbl + -> Append + -> Async Foreign Scan + Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c + Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1) + Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0)))) + -> Async Foreign Scan + Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c + Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2) + Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0)))) + -> Hash Join + Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c + Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b)) + -> Seq Scan on public.async_p3 t2_3 + Output: t2_3.a, t2_3.b, t2_3.c + -> Hash + Output: t1_3.a, t1_3.b, t1_3.c + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: ((t1_3.b % 100) = 0) +(20 rows) + +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +SELECT * FROM join_tbl ORDER BY a1; + a1 | b1 | c1 | a2 | b2 | c2 +------+-----+------+------+-----+------ + 1000 | 0 | 0000 | 1000 | 0 | 0000 + 1100 | 100 | 0100 | 1100 | 100 | 0100 + 1200 | 200 | 0200 | 1200 | 200 | 0200 + 1300 | 300 | 0300 | 1300 | 300 | 0300 + 1400 | 400 | 0400 | 1400 | 400 | 0400 + 1500 | 500 | 0500 | 1500 | 500 | 0500 + 1600 | 600 | 0600 | 1600 | 600 | 0600 + 1700 | 700 | 0700 | 1700 | 700 | 0700 + 1800 | 800 | 0800 | 1800 | 800 | 0800 + 1900 | 900 | 0900 | 1900 | 900 | 0900 + 2000 | 0 | 0000 | 2000 | 0 | 0000 + 2100 | 100 | 0100 | 2100 | 100 | 0100 + 2200 | 200 | 0200 | 2200 | 200 | 0200 + 2300 | 
300 | 0300 | 2300 | 300 | 0300 + 2400 | 400 | 0400 | 2400 | 400 | 0400 + 2500 | 500 | 0500 | 2500 | 500 | 0500 + 2600 | 600 | 0600 | 2600 | 600 | 0600 + 2700 | 700 | 0700 | 2700 | 700 | 0700 + 2800 | 800 | 0800 | 2800 | 800 | 0800 + 2900 | 900 | 0900 | 2900 | 900 | 0900 + 3000 | 0 | 0000 | 3000 | 0 | 0000 + 3100 | 100 | 0100 | 3100 | 100 | 0100 + 3200 | 200 | 0200 | 3200 | 200 | 0200 + 3300 | 300 | 0300 | 3300 | 300 | 0300 + 3400 | 400 | 0400 | 3400 | 400 | 0400 + 3500 | 500 | 0500 | 3500 | 500 | 0500 + 3600 | 600 | 0600 | 3600 | 600 | 0600 + 3700 | 700 | 0700 | 3700 | 700 | 0700 + 3800 | 800 | 0800 | 3800 | 800 | 0800 + 3900 | 900 | 0900 | 3900 | 900 | 0900 +(30 rows) + +DELETE FROM join_tbl; +RESET enable_partitionwise_join; +-- Test interaction of async execution with plan-time partition pruning +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 3000; + QUERY PLAN +----------------------------------------------------------------------------- + Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000)) +(7 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 2000; + QUERY PLAN +----------------------------------------------------------------------- + Foreign Scan on public.async_p1 async_pt + Output: async_pt.a, async_pt.b, async_pt.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000)) +(3 rows) + +-- Test interaction of async execution with run-time partition pruning +SET plan_cache_mode TO force_generic_plan; +PREPARE async_pt_query (int, int) AS + INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2; +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (3000, 505); + QUERY PLAN 
+------------------------------------------------------------------------------------------ + Insert on public.result_tbl + -> Append + Subplans Removed: 1 + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer)) +(11 rows) + +EXECUTE async_pt_query (3000, 505); +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 +(2 rows) + +DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (2000, 505); + QUERY PLAN +------------------------------------------------------------------------------------------ + Insert on public.result_tbl + -> Append + Subplans Removed: 2 + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) +(7 rows) + +EXECUTE async_pt_query (2000, 505); +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 +(1 row) + +DELETE FROM result_tbl; +RESET plan_cache_mode; +CREATE TABLE local_tbl(a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar'); +ANALYZE local_tbl; +CREATE INDEX base_tbl1_idx ON base_tbl1 (a); +CREATE INDEX base_tbl2_idx ON base_tbl2 (a); +CREATE INDEX async_p3_idx ON async_p3 (a); +ANALYZE base_tbl1; +ANALYZE base_tbl2; +ANALYZE async_p3; +ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true'); +ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true'); +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND 
local_tbl.c = 'bar'; + QUERY PLAN +------------------------------------------------------------------------------------------ + Nested Loop + Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c + -> Seq Scan on public.local_tbl + Output: local_tbl.a, local_tbl.b, local_tbl.c + Filter: (local_tbl.c = 'bar'::text) + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a)) + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (local_tbl.a = async_pt_3.a) +(15 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + QUERY PLAN +------------------------------------------------------------------------------- + Nested Loop (actual rows=1 loops=1) + -> Seq Scan on local_tbl (actual rows=1 loops=1) + Filter: (c = 'bar'::text) + Rows Removed by Filter: 1 + -> Append (actual rows=1 loops=1) + -> Async Foreign Scan on async_p1 async_pt_1 (never executed) + -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1) + -> Seq Scan on async_p3 async_pt_3 (never executed) + Filter: (local_tbl.a = a) +(9 rows) + +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + a | b | c | a | b | c +------+-----+-----+------+-----+------ + 2505 | 505 | bar | 2505 | 505 | 0505 +(1 row) + +ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate); +ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate); +DROP TABLE local_tbl; +DROP INDEX base_tbl1_idx; +DROP INDEX base_tbl2_idx; +DROP INDEX async_p3_idx; +-- Test that pending requests are 
processed properly +SET enable_mergejoin TO false; +SET enable_hashjoin TO false; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; + QUERY PLAN +---------------------------------------------------------------- + Nested Loop + Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c + Join Filter: (t1.a = t2.a) + -> Append + -> Async Foreign Scan on public.async_p1 t1_1 + Output: t1_1.a, t1_1.b, t1_1.c + Filter: (t1_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 t1_2 + Output: t1_2.a, t1_2.b, t1_2.c + Filter: (t1_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: (t1_3.b === 505) + -> Materialize + Output: t2.a, t2.b, t2.c + -> Foreign Scan on public.async_p2 t2 + Output: t2.a, t2.b, t2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 +(20 rows) + +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; + a | b | c | a | b | c +------+-----+------+------+-----+------ + 2505 | 505 | 0505 | 2505 | 505 | 0505 +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + QUERY PLAN +---------------------------------------------------------------- + Limit + Output: t1.a, t1.b, t1.c + -> Append + -> Async Foreign Scan on public.async_p1 t1_1 + Output: t1_1.a, t1_1.b, t1_1.c + Filter: (t1_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 t1_2 + Output: t1_2.a, t1_2.b, t1_2.c + Filter: (t1_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: (t1_3.b === 505) +(14 rows) + +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + a | b | c +------+-----+------ + 3505 | 505 | 0505 +(1 row) + +-- Check with foreign modify +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO 
local_tbl VALUES (1505, 505, 'foo'); +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl3'); +INSERT INTO remote_tbl VALUES (2505, 505, 'bar'); +CREATE TABLE base_tbl4 (a int, b int, c text); +CREATE FOREIGN TABLE insert_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl4'); +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); + QUERY PLAN +------------------------------------------------------------------------- + Insert on public.insert_tbl + Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3) + Batch Size: 1 + -> Append + -> Seq Scan on public.local_tbl + Output: local_tbl.a, local_tbl.b, local_tbl.c + -> Async Foreign Scan on public.remote_tbl + Output: remote_tbl.a, remote_tbl.b, remote_tbl.c + Remote SQL: SELECT a, b, c FROM public.base_tbl3 +(9 rows) + +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); +SELECT * FROM insert_tbl ORDER BY a; + a | b | c +------+-----+----- + 1505 | 505 | foo + 2505 | 505 | bar +(2 rows) + +-- Check with direct modify +EXPLAIN (VERBOSE, COSTS OFF) +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; + QUERY PLAN +---------------------------------------------------------------------------------------- + Insert on public.join_tbl + CTE t + -> Update on public.remote_tbl + Output: remote_tbl.a, remote_tbl.b, remote_tbl.c + -> Foreign Update on public.remote_tbl + Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c + -> Nested Loop Left Join + Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c + Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b)) + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, 
async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) + -> CTE Scan on t + Output: t.a, t.b, t.c +(23 rows) + +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; +SELECT * FROM join_tbl ORDER BY a1; + a1 | b1 | c1 | a2 | b2 | c2 +------+-----+------+------+-----+-------- + 1505 | 505 | 0505 | | | + 2505 | 505 | 0505 | 2505 | 505 | barbar + 3505 | 505 | 0505 | | | +(3 rows) + +DELETE FROM join_tbl; +RESET enable_mergejoin; +RESET enable_hashjoin; +-- Clean up +DROP TABLE async_pt; +DROP TABLE base_tbl1; +DROP TABLE base_tbl2; +DROP TABLE result_tbl; +DROP TABLE local_tbl; +DROP FOREIGN TABLE remote_tbl; +DROP FOREIGN TABLE insert_tbl; +DROP TABLE base_tbl3; +DROP TABLE base_tbl4; +DROP TABLE join_tbl; +ALTER SERVER loopback OPTIONS (DROP async_capable); +ALTER SERVER loopback2 OPTIONS (DROP async_capable); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 64698c4da3..530d7a66d4 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) * Validate option value, when we can do so without any context. 
*/ if (strcmp(def->defname, "use_remote_estimate") == 0 || - strcmp(def->defname, "updatable") == 0) + strcmp(def->defname, "updatable") == 0 || + strcmp(def->defname, "async_capable") == 0) { /* these accept only boolean values */ (void) defGetBoolean(def); @@ -217,6 +218,9 @@ InitPgFdwOptions(void) /* batch_size is available on both server and table */ {"batch_size", ForeignServerRelationId, false}, {"batch_size", ForeignTableRelationId, false}, + /* async_capable is available on both server and table */ + {"async_capable", ForeignServerRelationId, false}, + {"async_capable", ForeignTableRelationId, false}, {"password_required", UserMappingRelationId, false}, /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 20b25935ce..cc73a6902f 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -37,6 +38,7 @@ #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "postgres_fdw.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/float.h" #include "utils/guc.h" @@ -143,6 +145,7 @@ typedef struct PgFdwScanState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -159,6 +162,9 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for asynchronous execution */ + bool async_capable; /* engage asynchronous-capable logic? 
*/ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -176,6 +182,7 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState /* for remote query execution */ PGconn *conn; /* connection for the update */ + PgFdwConnState *conn_state; /* extra per-connection state */ int numParams; /* number of parameters passed to query */ FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ @@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncRequest(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); /* * Helper functions @@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, void *arg); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); -static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state); static PgFdwModifyState *create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, @@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, double *totaldeadrows); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); +static void 
produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); +static void fetch_more_data_begin(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + PG_RETURN_POINTER(routine); } @@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root, /* * Extract user-settable option values. Note that per-table settings of - * use_remote_estimate and fetch_size override per-server settings of - * them, respectively. + * use_remote_estimate, fetch_size and async_capable override per-server + * settings of them, respectively. */ fpinfo->use_remote_estimate = false; fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 100; + fpinfo->async_capable = false; apply_server_options(fpinfo); apply_table_options(fpinfo); @@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. 
*/ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + + /* Set the async-capable flag */ + fsstate->async_capable = node->ss.ps.plan->async_capable; } /* @@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node) TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. + * In sync mode, if this is the first call after Begin or ReScan, we need + * to create the cursor on the remote side. In async mode, we would have + * already created the cursor before we get here, even if this is the + * first call after Begin or ReScan. */ if (!fsstate->cursor_exists) create_cursor(node); @@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node) */ if (fsstate->next_tuple >= fsstate->num_tuples) { + /* In async mode, just clear tuple slot. */ + if (fsstate->async_capable) + return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); @@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->conn, fsstate->cursor_number, + fsstate->conn_state); /* Release remote connection */ ReleaseConnection(fsstate->conn); @@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, &dmstate->conn_state); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node) StringInfoData buf; PGresult *res; + /* First, process a pending asynchronous request, if any. */ + if (fsstate->conn_state->pendingAreq) + process_pending_request(fsstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. 
We do the * conversions in the short-lived per-tuple context, so as not to cause a @@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node) PG_TRY(); { PGconn *conn = fsstate->conn; - char sql[64]; int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (fsstate->async_capable) + { + Assert(fsstate->conn_state->pendingAreq); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = pgfdw_get_result(conn, fsstate->query); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + + /* Reset per-connection state */ + fsstate->conn_state->pendingAreq = NULL; + } + else + { + char sql[64]; + + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql, fsstate->conn_state); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. */ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state) { char sql[64]; PGresult *res; @@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, &fmstate->conn_state); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + /* First, process a pending asynchronous request, if any. */ + if (fmstate->conn_state->pendingAreq) + process_pending_request(fmstate->conn_state->pendingAreq); + /* * If the existing query was deparsed and prepared for a different number * of rows, rebuild it for the proper number. @@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) char *p_name; PGresult *res; + /* + * The caller would already have processed a pending asynchronous request + * if any, so no need to do it here. + */ + /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); @@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node) int numParams = dmstate->numParams; const char **values = dmstate->param_values; + /* First, process a pending asynchronous request, if any. 
*/ + if (dmstate->conn_state->pendingAreq) + process_pending_request(dmstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. */ @@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, int fetch_size; ListCell *lc; - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, */ /* Fetch some rows */ - res = pgfdw_exec_query(conn, fetch_sql); + res = pgfdw_exec_query(conn, fetch_sql, NULL); /* On error, report the original query, not the FETCH. 
*/ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, } /* Close the cursor, just to be tidy. */ - close_cursor(conn, cursor_number); + close_cursor(conn, cursor_number, NULL); } PG_CATCH(); { @@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo) ExtractExtensionList(defGetString(def), false); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fetch_size") == 
0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; + fpinfo->async_capable = fpinfo_o->async_capable; /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + + /* + * We'll prefer to consider this join async-capable if any table from + * either side of the join is considered async-capable. + */ + fpinfo->async_capable = fpinfo_o->async_capable || + fpinfo_i->async_capable; } } @@ -6489,6 +6571,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, add_path(final_rel, (Path *) final_path); } +/* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + RelOptInfo *rel = ((Path *) path)->parent; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + + return fpinfo->async_capable; +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + produce_tuple_asynchronously(areq, true); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. 
+ */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + /* This should not be called unless callback_pending */ + Assert(areq->callback_pending); + + /* The core code would have registered postmaster death event */ + Assert(GetNumRegisteredWaitEvents(set) >= 1); + + /* Begin an asynchronous data fetch if not already done */ + if (!pendingAreq) + fetch_more_data_begin(areq); + else if (pendingAreq->requestor != areq->requestor) + { + /* + * This is the case when the in-process request was made by another + * Append. Note that it might be useless to process the request, + * because the query might not need tuples from that Append anymore. + * Skip the given request if there are any configured events other + * than the postmaster death event; otherwise process the request, + * then begin a fetch to configure the event below, because otherwise + * we might end up with no configured events other than the postmaster + * death event. + */ + if (GetNumRegisteredWaitEvents(set) > 1) + return; + process_pending_request(pendingAreq); + fetch_more_data_begin(areq); + } + else if (pendingAreq->requestee != areq->requestee) + { + /* + * This is the case when the in-process request was made by the same + * parent but for a different child. Since we configure only the + * event for the request made for that child, skip the given request. + */ + return; + } + else + Assert(pendingAreq == areq); + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch some more tuples from a file descriptor that becomes ready, + * requesting next tuple. 
+ */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + /* On error, report the original query, not the FETCH. */ + if (!PQconsumeInput(fsstate->conn)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + fetch_more_data(node); + + produce_tuple_asynchronously(areq, true); +} + +/* + * Asynchronously produce next tuple from a foreign PostgreSQL table. + */ +static void +produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + TupleTableSlot *result; + + /* This should not be called if the request is currently in-process */ + Assert(areq != pendingAreq); + + /* Fetch some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } + return; + } + + /* Get a tuple from the ForeignScan node */ + result = ExecProcNode((PlanState *) node); + if (!TupIsNull(result)) + { + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + Assert(fsstate->next_tuple >= 
fsstate->num_tuples); + + /* Fetch some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } +} + +/* + * Begin an asynchronous data fetch. + * + * Note: fetch_more_data must be called to fetch the result. + */ +static void +fetch_more_data_begin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + + Assert(!fsstate->conn_state->pendingAreq); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (!PQsendQuery(fsstate->conn, sql)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + /* Remember that the request is in process */ + fsstate->conn_state->pendingAreq = areq; +} + +/* + * Process a pending asynchronous request.
+ */ +void +process_pending_request(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + EState *estate = node->ss.ps.state; + MemoryContext oldcontext; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + + fetch_more_data(node); + + /* We need to send a new query afterwards; don't fetch */ + produce_tuple_asynchronously(areq, false); + + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + + MemoryContextSwitchTo(oldcontext); +} + /* * Create a tuple from the specified row of the PGresult. * diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 1f67b4d9fd..88d94da6f6 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -16,6 +16,7 @@ #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" +#include "nodes/execnodes.h" #include "nodes/pathnodes.h" #include "utils/relcache.h" @@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo Cost fdw_startup_cost; Cost fdw_tuple_cost; List *shippable_extensions; /* OIDs of shippable extensions */ + bool async_capable; /* Cached catalog information. */ ForeignTable *table; @@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo int relation_index; } PgFdwRelationInfo; +/* + * Extra control information relating to a connection. 
+ */ +typedef struct PgFdwConnState +{ + AsyncRequest *pendingAreq; /* pending async request */ +} PgFdwConnState; + /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); +extern void process_pending_request(AsyncRequest *areq); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); -extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); +extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, + PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e9b30517a5..806a5bca28 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2928,3 +2928,198 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test; -- Clean up DROP TABLE batch_table, batch_cp_upd_test CASCADE; + +-- =================================================================== +-- test asynchronous execution +-- =================================================================== + +ALTER SERVER loopback OPTIONS (DROP extensions); +ALTER SERVER loopback OPTIONS (ADD async_capable 'true'); +ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true'); + +CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a); +CREATE TABLE base_tbl1 (a int, b int, c text); +CREATE TABLE base_tbl2 (a int, b int, c text); +CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000) + SERVER loopback OPTIONS (table_name 'base_tbl1'); +CREATE 
FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000) + SERVER loopback2 OPTIONS (table_name 'base_tbl2'); +INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; + +-- simple queries +CREATE TABLE result_tbl (a int, b int, c text); + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +-- Check case where multiple partitions use the same connection +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000) + SERVER loopback2 OPTIONS (table_name 'base_tbl3'); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +DROP FOREIGN TABLE async_p3; +DROP TABLE base_tbl3; + +-- Check case where the partitioned table has local/remote partitions +CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE 
FROM result_tbl; + +-- partitionwise joins +SET enable_partitionwise_join TO true; + +CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text); + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + +SELECT * FROM join_tbl ORDER BY a1; +DELETE FROM join_tbl; + +RESET enable_partitionwise_join; + +-- Test interaction of async execution with plan-time partition pruning +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 3000; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 2000; + +-- Test interaction of async execution with run-time partition pruning +SET plan_cache_mode TO force_generic_plan; + +PREPARE async_pt_query (int, int) AS + INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2; + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (3000, 505); +EXECUTE async_pt_query (3000, 505); + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (2000, 505); +EXECUTE async_pt_query (2000, 505); + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +RESET plan_cache_mode; + +CREATE TABLE local_tbl(a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar'); +ANALYZE local_tbl; + +CREATE INDEX base_tbl1_idx ON base_tbl1 (a); +CREATE INDEX base_tbl2_idx ON base_tbl2 (a); +CREATE INDEX async_p3_idx ON async_p3 (a); +ANALYZE base_tbl1; +ANALYZE base_tbl2; +ANALYZE async_p3; + +ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true'); +ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true'); + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * 
FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + +ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate); +ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate); + +DROP TABLE local_tbl; +DROP INDEX base_tbl1_idx; +DROP INDEX base_tbl2_idx; +DROP INDEX async_p3_idx; + +-- Test that pending requests are processed properly +SET enable_mergejoin TO false; +SET enable_hashjoin TO false; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + +-- Check with foreign modify +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); + +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl3'); +INSERT INTO remote_tbl VALUES (2505, 505, 'bar'); + +CREATE TABLE base_tbl4 (a int, b int, c text); +CREATE FOREIGN TABLE insert_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl4'); + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); + +SELECT * FROM insert_tbl ORDER BY a; + +-- Check with direct modify +EXPLAIN (VERBOSE, COSTS OFF) +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) 
WHERE async_pt.b === 505; + +SELECT * FROM join_tbl ORDER BY a1; +DELETE FROM join_tbl; + +RESET enable_mergejoin; +RESET enable_hashjoin; + +-- Clean up +DROP TABLE async_pt; +DROP TABLE base_tbl1; +DROP TABLE base_tbl2; +DROP TABLE result_tbl; +DROP TABLE local_tbl; +DROP FOREIGN TABLE remote_tbl; +DROP FOREIGN TABLE insert_tbl; +DROP TABLE base_tbl3; +DROP TABLE base_tbl4; +DROP TABLE join_tbl; + +ALTER SERVER loopback OPTIONS (DROP async_capable); +ALTER SERVER loopback2 OPTIONS (DROP async_capable); diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index ddc6d789d8..701cb65cc7 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4787,6 +4787,20 @@ ANY num_sync ( + enable_async_append (boolean) + + enable_async_append configuration parameter + + + + + Enables or disables the query planner's use of async-aware + append plan types. The default is on. + + + + enable_bitmapscan (boolean) diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 04bc052ee8..635c9ec559 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -1483,6 +1483,96 @@ ShutdownForeignScan(ForeignScanState *node); + + FDW Routines for Asynchronous Execution + + A ForeignScan node can, optionally, support + asynchronous execution as described in + src/backend/executor/README. The following + functions are all optional, but are all required if asynchronous + execution is to be supported. + + + + +bool +IsForeignPathAsyncCapable(ForeignPath *path); + + Test whether a given ForeignPath path can scan + the underlying foreign relation asynchronously. + This function will only be called at the end of query planning when the + given path is a direct child of an AppendPath + path and when the planner believes that asynchronous execution improves + performance, and should return true if the given path is able to scan the + foreign relation asynchronously. 
+ + + + If this function is not defined, it is assumed that the given path scans + the foreign relation using IterateForeignScan. + (This implies that the callback functions described below will never be + called, so they need not be provided either.) + + + + +void +ForeignAsyncRequest(AsyncRequest *areq); + + Produce one tuple asynchronously from the + ForeignScan node. areq is + the AsyncRequest struct describing the + ForeignScan node and the parent + Append node that requested the tuple from it. + This function should store the tuple into the slot specified by + areq->result, and set + areq->request_complete to true; + or if it needs to wait on an event external to the core server such as + network I/O, and cannot produce any tuple immediately, set the flag to + false, and set + areq->callback_pending to true + for the ForeignScan node to get a callback from + the callback functions described below. If no more tuples are available, + set the slot to NULL, and the + areq->request_complete flag to + true. It's recommended to use + ExecAsyncRequestDone or + ExecAsyncRequestPending to set the output parameters + in the areq. + + + + +void +ForeignAsyncConfigureWait(AsyncRequest *areq); + + Configure a file descriptor event for which the + ForeignScan node wishes to wait. + This function will only be called when the + ForeignScan node has the + areq->callback_pending flag set, and should add + the event to the as_eventset of the parent + Append node described by the + areq. See the comments for + ExecAsyncConfigureWait in + src/backend/executor/execAsync.c for additional + information. When the file descriptor event occurs, + ForeignAsyncNotify will be called. + + + + +void +ForeignAsyncNotify(AsyncRequest *areq); + + Process a relevant event that has occurred, then produce one tuple + asynchronously from the ForeignScan node. + This function should set the output parameters in the + areq in the same way as + ForeignAsyncRequest. 
+ + + FDW Routines for Reparameterization of Paths diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 43c07da20e..af540fb02f 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1564,6 +1564,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + AppendReady + Waiting for subplan nodes of an Append plan + node to be ready. + BackupWaitWalArchive Waiting for WAL files required for a backup to be successfully diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index 07aa25799d..a1b426c50b 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -371,6 +371,34 @@ OPTIONS (ADD password_required 'false'); + + Asynchronous Execution Options + + + postgres_fdw supports asynchronous execution, which + runs multiple parts of an Append node + concurrently rather than serially to improve performance. + This execution can be controlled using the following option: + + + + + + async_capable + + + This option controls whether postgres_fdw allows + foreign tables to be scanned concurrently for asynchronous execution. + It can be specified for a foreign table or a foreign server. + A table-level option overrides a server-level option. + The default is false. 
+ + + + + + + Updatability Options diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index afc45429ba..fe75cabdcc 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1394,6 +1394,8 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); + if (plan->async_capable) + appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); es->indent++; } @@ -1413,6 +1415,7 @@ ExplainNode(PlanState *planstate, List *ancestors, if (custom_name) ExplainPropertyText("Custom Plan Provider", custom_name, es); ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es); + ExplainPropertyBool("Async Capable", plan->async_capable, es); } switch (nodeTag(plan)) diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 74ac59faa1..680fd69151 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ + execAsync.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/README b/src/backend/executor/README index 18b2ac1865..3726048c4a 100644 --- a/src/backend/executor/README +++ b/src/backend/executor/README @@ -359,3 +359,43 @@ query returning the same set of scan tuples multiple times. Likewise, SRFs are disallowed in an UPDATE's targetlist. There, they would have the effect of the same row being updated multiple times, which is not very useful --- and updates after the first would have no effect anyway. + + +Asynchronous Execution +---------------------- + +In cases where a node is waiting on an event external to the database system, +such as a ForeignScan awaiting network I/O, it's desirable for the node to +indicate that it cannot return any tuple immediately but may be able to do so +at a later time. 
A process which discovers this type of situation can always +handle it simply by blocking, but this may waste time that could be spent +executing some other part of the plan tree where progress could be made +immediately. This is particularly likely to occur when the plan tree contains +an Append node. Asynchronous execution runs multiple parts of an Append node +concurrently rather than serially to improve performance. + +For asynchronous execution, an Append node must first request a tuple from an +async-capable child node using ExecAsyncRequest. Next, it must execute the +asynchronous event loop using ExecAppendAsyncEventWait. Eventually, when a +child node to which an asynchronous request has been made produces a tuple, +the Append node will receive it from the event loop via ExecAsyncResponse. In +the current implementation of asynchronous execution, the only node type that +requests tuples from an async-capable child node is an Append, while the only +node type that might be async-capable is a ForeignScan. + +Typically, the ExecAsyncResponse callback is the only one required for nodes +that wish to request tuples asynchronously. On the other hand, async-capable +nodes generally need to implement three methods: + +1. When an asynchronous request is made, the node's ExecAsyncRequest callback + will be invoked; it should use ExecAsyncRequestPending to indicate that the + request is pending for a callback described below. Alternatively, it can + instead use ExecAsyncRequestDone if a result is available immediately. + +2. When the event loop wishes to wait or poll for file descriptor events, the + node's ExecAsyncConfigureWait callback will be invoked to configure the + file descriptor event for which the node wishes to wait. + +3. When the file descriptor becomes ready, the node's ExecAsyncNotify callback + will be invoked; like #1, it should use ExecAsyncRequestPending for another + callback or ExecAsyncRequestDone to return a result immediately. 
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 4543ac79ed..58a8aa5ab7 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -531,6 +531,10 @@ ExecSupportsBackwardScan(Plan *node) { ListCell *l; + /* With async, tuples may be interleaved, so can't back up. */ + if (((Append *) node)->nasyncplans > 0) + return false; + foreach(l, ((Append *) node)->appendplans) { if (!ExecSupportsBackwardScan((Plan *) lfirst(l))) diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c new file mode 100644 index 0000000000..f1985e658c --- /dev/null +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,124 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/nodeAppend.h" +#include "executor/nodeForeignscan.h" + +/* + * Asynchronously request a tuple from a designated async-capable node. + */ +void +ExecAsyncRequest(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanRequest(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } + + ExecAsyncResponse(areq); +} + +/* + * Give the asynchronous node a chance to configure the file descriptor event + * for which it wishes to wait. 
We expect the node-type specific callback to + * make a single call of the following form: + * + * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); + */ +void +ExecAsyncConfigureWait(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanConfigureWait(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } +} + +/* + * Call the asynchronous node back when a relevant event has occurred. + */ +void +ExecAsyncNotify(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanNotify(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } + + ExecAsyncResponse(areq); +} + +/* + * Call the requestor back when an asynchronous node has produced a result. + */ +void +ExecAsyncResponse(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestor)) + { + case T_AppendState: + ExecAsyncAppendResponse(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestor)); + } +} + +/* + * A requestee node should call this function to deliver the tuple to its + * requestor node. The requestee node can call this from its ExecAsyncRequest + * or ExecAsyncNotify callback. + */ +void +ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result) +{ + areq->request_complete = true; + areq->result = result; +} + +/* + * A requestee node should call this function to indicate that it is pending + * for a callback. The requestee node can call this from its ExecAsyncRequest + * or ExecAsyncNotify callback. 
+ */ +void +ExecAsyncRequestPending(AsyncRequest *areq) +{ + areq->callback_pending = true; + areq->request_complete = false; + areq->result = NULL; +} diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 15e4115bd6..7da8ffe065 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -57,10 +57,13 @@ #include "postgres.h" +#include "executor/execAsync.h" #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" /* Shared state for parallel-aware Append. */ struct ParallelAppendState @@ -78,12 +81,18 @@ struct ParallelAppendState }; #define INVALID_SUBPLAN_INDEX -1 +#define EVENT_BUFFER_SIZE 16 static TupleTableSlot *ExecAppend(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); static void mark_invalid_subplans_as_finished(AppendState *node); +static void ExecAppendAsyncBegin(AppendState *node); +static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result); +static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result); +static void ExecAppendAsyncEventWait(AppendState *node); +static void classify_matching_subplans(AppendState *node); /* ---------------------------------------------------------------- * ExecInitAppend @@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags) AppendState *appendstate = makeNode(AppendState); PlanState **appendplanstates; Bitmapset *validsubplans; + Bitmapset *asyncplans; int nplans; + int nasyncplans; int firstvalid; int i, j; @@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* Let choose_next_subplan_* function handle setting the first subplan */ appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + 
appendstate->as_syncdone = false; + appendstate->as_begun = false; /* If run-time partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -191,12 +204,25 @@ ExecInitAppend(Append *node, EState *estate, int eflags) * While at it, find out the first valid partial plan. */ j = 0; + asyncplans = NULL; + nasyncplans = 0; firstvalid = nplans; i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + /* + * Record async subplans. When executing EvalPlanQual, we treat them + * as sync ones; don't do this when initializing an EvalPlanQual plan + * tree. + */ + if (initNode->async_capable && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + nasyncplans++; + } + /* * Record the lowest appendplans index which is a valid partial plan. */ @@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* Initialize async state */ + appendstate->as_asyncplans = asyncplans; + appendstate->as_nasyncplans = nasyncplans; + appendstate->as_asyncrequests = NULL; + appendstate->as_asyncresults = (TupleTableSlot **) + palloc0(nasyncplans * sizeof(TupleTableSlot *)); + appendstate->as_needrequest = NULL; + appendstate->as_eventset = NULL; + + if (nasyncplans > 0) + { + appendstate->as_asyncrequests = (AsyncRequest **) + palloc0(nplans * sizeof(AsyncRequest *)); + + i = -1; + while ((i = bms_next_member(asyncplans, i)) >= 0) + { + AsyncRequest *areq; + + areq = palloc(sizeof(AsyncRequest)); + areq->requestor = (PlanState *) appendstate; + areq->requestee = appendplanstates[i]; + areq->request_index = i; + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + + appendstate->as_asyncrequests[i] = areq; + } + } + /* * Miscellaneous initialization */ @@ -232,31 +289,59 @@ static TupleTableSlot * ExecAppend(PlanState *pstate) { 
AppendState *node = castNode(AppendState, pstate); + TupleTableSlot *result; - if (node->as_whichplan < 0) + /* + * If this is the first call after Init or ReScan, we need to do the + * initialization work. + */ + if (!node->as_begun) { + Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX); + Assert(!node->as_syncdone); + /* Nothing to do if there are no subplans */ if (node->as_nplans == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); + /* If there are any async subplans, begin executing them. */ + if (node->as_nasyncplans > 0) + ExecAppendAsyncBegin(node); + /* - * If no subplan has been chosen, we must choose one before + * If no sync subplan has been chosen, we must choose one before * proceeding. */ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && - !node->choose_next_subplan(node)) + if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); + + Assert(node->as_syncdone || + (node->as_whichplan >= 0 && + node->as_whichplan < node->as_nplans)); + + /* And we're initialized. */ + node->as_begun = true; } for (;;) { PlanState *subnode; - TupleTableSlot *result; CHECK_FOR_INTERRUPTS(); /* - * figure out which subplan we are currently processing + * try to get a tuple from an async subplan if any + */ + if (node->as_syncdone || !bms_is_empty(node->as_needrequest)) + { + if (ExecAppendAsyncGetNext(node, &result)) + return result; + Assert(!node->as_syncdone); + Assert(bms_is_empty(node->as_needrequest)); + } + + /* + * figure out which sync subplan we are currently processing */ Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); subnode = node->appendplans[node->as_whichplan]; @@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate) return result; } - /* choose new subplan; if none, we're done */ - if (!node->choose_next_subplan(node)) + /* + * wait or poll async events if any. 
We do this before checking for + * the end of iteration, because it might drain the remaining async + * subplans. + */ + if (node->as_nasyncremain > 0) + ExecAppendAsyncEventWait(node); + + /* choose new sync subplan; if no sync/async subplans, we're done */ + if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } } @@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node) void ExecReScanAppend(AppendState *node) { + int nasyncplans = node->as_nasyncplans; int i; /* @@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node) { bms_free(node->as_valid_subplans); node->as_valid_subplans = NULL; + if (nasyncplans > 0) + { + bms_free(node->as_valid_asyncplans); + node->as_valid_asyncplans = NULL; + } } for (i = 0; i < node->as_nplans; i++) @@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node) ExecReScan(subnode); } + /* Reset async state */ + if (nasyncplans > 0) + { + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + } + + bms_free(node->as_needrequest); + node->as_needrequest = NULL; + } + /* Let choose_next_subplan_* function handle setting the first subplan */ node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_syncdone = false; + node->as_begun = false; } /* ---------------------------------------------------------------- @@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) /* ---------------------------------------------------------------- * choose_next_subplan_locally * - * Choose next subplan for a non-parallel-aware Append, + * Choose next sync subplan for a non-parallel-aware Append, * returning false if there are no more. 
* ---------------------------------------------------------------- */ @@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node) /* We should never be called when there are no subplans */ Assert(node->as_nplans > 0); + /* Nothing to do if syncdone */ + if (node->as_syncdone) + return false; + /* * If first call then have the bms member function choose the first valid - * subplan by initializing whichplan to -1. If there happen to be no - * valid subplans then the bms member function will handle that by - * returning a negative number which will allow us to exit returning a + * sync subplan by initializing whichplan to -1. If there happen to be + * no valid sync subplans then the bms member function will handle that + * by returning a negative number which will allow us to exit returning a * false value. */ if (whichplan == INVALID_SUBPLAN_INDEX) { - if (node->as_valid_subplans == NULL) + if (node->as_nasyncplans > 0) + { + /* We'd have filled as_valid_subplans already */ + Assert(node->as_valid_subplans); + } + else if (node->as_valid_subplans == NULL) node->as_valid_subplans = ExecFindMatchingSubPlans(node->as_prune_state); @@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node) nextplan = bms_prev_member(node->as_valid_subplans, whichplan); if (nextplan < 0) + { + /* Set as_syncdone if in async mode */ + if (node->as_nasyncplans > 0) + node->as_syncdone = true; return false; + } node->as_whichplan = nextplan; @@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node) node->as_pstate->pa_finished[i] = true; } } + +/* ---------------------------------------------------------------- + * Asynchronous Append Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecAppendAsyncBegin + * + * Begin executing designated async-capable subplans. 
+ * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncBegin(AppendState *node) +{ + int i; + + /* Backward scan is not supported by async-aware Appends. */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + /* We should never be called when there are no async subplans. */ + Assert(node->as_nasyncplans > 0); + + /* If we've yet to determine the valid subplans then do so now. */ + if (node->as_valid_subplans == NULL) + node->as_valid_subplans = + ExecFindMatchingSubPlans(node->as_prune_state); + + classify_matching_subplans(node); + + /* Nothing to do if there are no valid async subplans. */ + if (node->as_nasyncremain == 0) + return; + + /* Make a request for each of the valid async subplans. */ + i = -1; + while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncGetNext + * + * Get the next tuple from any of the asynchronous subplans. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) +{ + *result = NULL; + + /* We should never be called when there are no valid async subplans. */ + Assert(node->as_nasyncremain > 0); + + /* Request a tuple asynchronously. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + while (node->as_nasyncremain > 0) + { + CHECK_FOR_INTERRUPTS(); + + /* Wait or poll async events. */ + ExecAppendAsyncEventWait(node); + + /* Request a tuple asynchronously. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + /* Break from loop if there's any sync subplan that isn't complete. 
*/ + if (!node->as_syncdone) + break; + } + + /* + * If all sync subplans are complete, we're totally done scanning the + * given node. Otherwise, we're done with the asynchronous stuff but + * must continue scanning the sync subplans. + */ + if (node->as_syncdone) + { + Assert(node->as_nasyncremain == 0); + *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncRequest + * + * Request a tuple asynchronously. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) +{ + Bitmapset *needrequest; + int i; + + /* Nothing to do if there are no async subplans needing a new request. */ + if (bms_is_empty(node->as_needrequest)) + return false; + + /* + * If there are any asynchronously-generated results that have not yet + * been returned, we have nothing to do; just return one of them. + */ + if (node->as_nasyncresults > 0) + { + --node->as_nasyncresults; + *result = node->as_asyncresults[node->as_nasyncresults]; + return true; + } + + /* Make a new request for each of the async subplans that need it. */ + needrequest = node->as_needrequest; + node->as_needrequest = NULL; + i = -1; + while ((i = bms_next_member(needrequest, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } + bms_free(needrequest); + + /* Return one of the asynchronously-generated results if any. */ + if (node->as_nasyncresults > 0) + { + --node->as_nasyncresults; + *result = node->as_asyncresults[node->as_nasyncresults]; + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncEventWait + * + * Wait or poll for file descriptor events and fire callbacks. 
+ * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncEventWait(AppendState *node) +{ + long timeout = node->as_syncdone ? -1 : 0; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred; + int i; + + /* We should never be called when there are no valid async subplans. */ + Assert(node->as_nasyncremain > 0); + + node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, + node->as_nasyncplans + 1); + AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* Give each waiting subplan a chance to add an event. */ + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + if (areq->callback_pending) + ExecAsyncConfigureWait(areq); + } + + /* Wait for at least one event to occur. */ + noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, + EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY); + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + if (noccurred == 0) + return; + + /* Deliver notifications. */ + for (i = 0; i < noccurred; i++) + { + WaitEvent *w = &occurred_event[i]; + + /* + * Each waiting subplan should have registered its wait event with + * user_data pointing back to its AsyncRequest. + */ + if ((w->events & WL_SOCKET_READABLE) != 0) + { + AsyncRequest *areq = (AsyncRequest *) w->user_data; + + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + Assert(areq->callback_pending); + areq->callback_pending = false; + + /* Do the actual work. */ + ExecAsyncNotify(areq); + } + } +} + +/* ---------------------------------------------------------------- + * ExecAsyncAppendResponse + * + * Receive a response from an asynchronous request we made. 
+ * ---------------------------------------------------------------- + */ +void +ExecAsyncAppendResponse(AsyncRequest *areq) +{ + AppendState *node = (AppendState *) areq->requestor; + TupleTableSlot *slot = areq->result; + + /* The result should be a TupleTableSlot or NULL. */ + Assert(slot == NULL || IsA(slot, TupleTableSlot)); + + /* Nothing to do if the request is pending. */ + if (!areq->request_complete) + { + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + return; + } + + /* If the result is NULL or an empty slot, there's nothing more to do. */ + if (TupIsNull(slot)) + { + /* The ending subplan wouldn't have been pending for a callback. */ + Assert(!areq->callback_pending); + --node->as_nasyncremain; + return; + } + + /* Save result so we can return it. */ + Assert(node->as_nasyncresults < node->as_nasyncplans); + node->as_asyncresults[node->as_nasyncresults++] = slot; + + /* + * Mark the subplan that returned a result as ready for a new request. We + * don't launch another one here immediately because it might complete. + */ + node->as_needrequest = bms_add_member(node->as_needrequest, + areq->request_index); +} + +/* ---------------------------------------------------------------- + * classify_matching_subplans + * + * Classify the node's as_valid_subplans into sync ones and + * async ones, adjust it to contain sync ones only, and save + * async ones in the node's as_valid_asyncplans. + * ---------------------------------------------------------------- + */ +static void +classify_matching_subplans(AppendState *node) +{ + Bitmapset *valid_asyncplans; + + Assert(node->as_valid_asyncplans == NULL); + + /* Nothing to do if there are no valid subplans. */ + if (bms_is_empty(node->as_valid_subplans)) + { + node->as_syncdone = true; + node->as_nasyncremain = 0; + return; + } + + /* Nothing to do if there are no valid async subplans. 
*/ + if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) + { + node->as_nasyncremain = 0; + return; + } + + /* Get valid async subplans. */ + valid_asyncplans = bms_copy(node->as_asyncplans); + valid_asyncplans = bms_int_members(valid_asyncplans, + node->as_valid_subplans); + + /* Adjust the valid subplans to contain sync subplans only. */ + node->as_valid_subplans = bms_del_members(node->as_valid_subplans, + valid_asyncplans); + node->as_syncdone = bms_is_empty(node->as_valid_subplans); + + /* Save valid async subplans. */ + node->as_valid_asyncplans = valid_asyncplans; + node->as_nasyncremain = bms_num_members(valid_asyncplans); +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 0969e53c3a..898890fb08 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanRequest + * + * Asynchronously request a tuple from a designated async-capable node + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanRequest(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncRequest != NULL); + fdwroutine->ForeignAsyncRequest(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanConfigureWait + * + * In async mode, configure for a wait + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); + 
fdwroutine->ForeignAsyncConfigureWait(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanNotify + * + * Callback invoked when a relevant event has occurred + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncNotify != NULL); + fdwroutine->ForeignAsyncNotify(areq); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 1d0bb6e2e7..d58b79d525 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(plan_width); COPY_SCALAR_FIELD(parallel_aware); COPY_SCALAR_FIELD(parallel_safe); + COPY_SCALAR_FIELD(async_capable); COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); @@ -241,6 +242,7 @@ _copyAppend(const Append *from) */ COPY_BITMAPSET_FIELD(apprelids); COPY_NODE_FIELD(appendplans); + COPY_SCALAR_FIELD(nasyncplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 301fa30490..ff127a19ad 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_INT_FIELD(plan_width); WRITE_BOOL_FIELD(parallel_aware); WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(async_capable); WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); @@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_BITMAPSET_FIELD(apprelids); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(nasyncplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); } diff --git a/src/backend/nodes/readfuncs.c 
b/src/backend/nodes/readfuncs.c index 377185f7c6..6a563e9903 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1615,6 +1615,7 @@ ReadCommonPlan(Plan *local_node) READ_INT_FIELD(plan_width); READ_BOOL_FIELD(parallel_aware); READ_BOOL_FIELD(parallel_safe); + READ_BOOL_FIELD(async_capable); READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); @@ -1711,6 +1712,7 @@ _readAppend(void) READ_BITMAPSET_FIELD(apprelids); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(nasyncplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index b92c948588..0c016a03dd 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false; bool enable_parallel_append = true; bool enable_parallel_hash = true; bool enable_partition_pruning = true; +bool enable_async_append = true; typedef struct { diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 906cab7053..78ef068fb7 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals); static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan, List *gating_quals); static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path); +static bool is_async_capable_path(Path *path); static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags); static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, @@ -1080,6 +1081,31 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path) return plan; } +/* + * is_async_capable_path + * Check whether a given Path node is async-capable. 
+ */ +static bool +is_async_capable_path(Path *path) +{ + switch (nodeTag(path)) + { + case T_ForeignPath: + { + FdwRoutine *fdwroutine = path->parent->fdwroutine; + + Assert(fdwroutine != NULL); + if (fdwroutine->IsForeignPathAsyncCapable != NULL && + fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) + return true; + } + break; + default: + break; + } + return false; +} + /* * create_append_plan * Create an Append plan for 'best_path' and (recursively) plans @@ -1097,6 +1123,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; ListCell *subpaths; + int nasyncplans = 0; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; int nodenumsortkeys = 0; @@ -1104,6 +1131,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) Oid *nodeSortOperators = NULL; Oid *nodeCollations = NULL; bool *nodeNullsFirst = NULL; + bool consider_async = false; /* * The subpaths list could be empty, if every child was proven empty by @@ -1167,6 +1195,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist)); } + /* If appropriate, consider async append */ + consider_async = (enable_async_append && pathkeys == NIL && + !best_path->path.parallel_safe && + list_length(best_path->subpaths) > 1); + /* Build the plan for each child */ foreach(subpaths, best_path->subpaths) { @@ -1234,6 +1267,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } subplans = lappend(subplans, subplan); + + /* Check to see if subplan can be executed asynchronously */ + if (consider_async && is_async_capable_path(subpath)) + { + subplan->async_capable = true; + ++nasyncplans; + } } /* @@ -1266,6 +1306,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } plan->appendplans = subplans; + plan->nasyncplans = nasyncplans; 
plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 60f45ccc4e..4b9bcd2b41 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3995,6 +3995,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) switch (w) { + case WAIT_EVENT_APPEND_READY: + event_name = "AppendReady"; + break; case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE: event_name = "BackupWaitWalArchive"; break; diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 43a5fded10..5f3318fa8f 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -2020,6 +2020,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, } #endif +/* + * Get the number of wait events registered in a given WaitEventSet. + */ +int +GetNumRegisteredWaitEvents(WaitEventSet *set) +{ + return set->nevents; +} + #if defined(WAIT_USE_POLL) /* diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0c5dc4d3e8..03daec9a08 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1128,6 +1128,16 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of async append plans."), + NULL, + GUC_EXPLAIN + }, + &enable_async_append, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, gettext_noop("Enables genetic query optimization."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index b234a6bfe6..791d39cf07 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -371,6 +371,7 @@ #enable_partitionwise_aggregate = off #enable_parallel_hash = on #enable_partition_pruning = on +#enable_async_append = on # - Planner Cost Constants - diff 
--git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h new file mode 100644 index 0000000000..724034f226 --- /dev/null +++ b/src/include/executor/execAsync.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * execAsync.h + * Support functions for asynchronous execution + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAsync.h + *------------------------------------------------------------------------- + */ + +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" + +extern void ExecAsyncRequest(AsyncRequest *areq); +extern void ExecAsyncConfigureWait(AsyncRequest *areq); +extern void ExecAsyncNotify(AsyncRequest *areq); +extern void ExecAsyncResponse(AsyncRequest *areq); +extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result); +extern void ExecAsyncRequestPending(AsyncRequest *areq); + +#endif /* EXECASYNC_H */ diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index cafd410a5d..fa54ac6ad2 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt); extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt); +extern void ExecAsyncAppendResponse(AsyncRequest *areq); + #endif /* NODEAPPEND_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 6ae7733e25..8ffc0ca5bf 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void 
ExecShutdownForeignScan(ForeignScanState *node); +extern void ExecAsyncForeignScanRequest(AsyncRequest *areq); +extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq); +extern void ExecAsyncForeignScanNotify(AsyncRequest *areq); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 248f78da45..7c89d081c7 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); + +typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -256,6 +264,12 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. */ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ForeignAsyncRequest_function ForeignAsyncRequest; + ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; + ForeignAsyncNotify_function ForeignAsyncNotify; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e31ad6204e..09ea7ef6a6 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -515,6 +515,22 @@ typedef struct ResultRelInfo struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer; } ResultRelInfo; +/* ---------------- + * AsyncRequest + * + * State for an asynchronous tuple request. 
+ * ---------------- + */ +typedef struct AsyncRequest +{ + struct PlanState *requestor; /* Node that wants a tuple */ + struct PlanState *requestee; /* Node from which a tuple is wanted */ + int request_index; /* Scratch space for requestor */ + bool callback_pending; /* Callback is needed */ + bool request_complete; /* Request complete, result valid */ + TupleTableSlot *result; /* Result (NULL if no more tuples) */ +} AsyncRequest; + /* ---------------- * EState information * @@ -1199,12 +1215,12 @@ typedef struct ModifyTableState * AppendState information * * nplans how many plans are in the array - * whichplan which plan is being executed (0 .. n-1), or a - * special negative value. See nodeAppend.c. + * whichplan which synchronous plan is being executed (0 .. n-1) + * or a special negative value. See nodeAppend.c. * prune_state details required to allow partitions to be * eliminated from the scan, or NULL if not possible. - * valid_subplans for runtime pruning, valid appendplans indexes to - * scan. + * valid_subplans for runtime pruning, valid synchronous appendplans + * indexes to scan. 
* ---------------- */ @@ -1220,12 +1236,25 @@ struct AppendState PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + bool as_begun; /* false means need to initialize */ + Bitmapset *as_asyncplans; /* asynchronous plans indexes */ + int as_nasyncplans; /* # of asynchronous plans */ + AsyncRequest **as_asyncrequests; /* array of AsyncRequests */ + TupleTableSlot **as_asyncresults; /* unreturned results of async plans */ + int as_nasyncresults; /* # of valid entries in as_asyncresults */ + bool as_syncdone; /* true if all synchronous plans done in + * asynchronous mode, else false */ + int as_nasyncremain; /* # of remaining asynchronous plans */ + Bitmapset *as_needrequest; /* asynchronous plans needing a new request */ + struct WaitEventSet *as_eventset; /* WaitEventSet used to configure + * file descriptor wait events */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ ParallelAppendState *as_pstate; /* parallel coordination info */ Size pstate_len; /* size of parallel coordination info */ struct PartitionPruneState *as_prune_state; Bitmapset *as_valid_subplans; + Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */ bool (*choose_next_subplan) (AppendState *); }; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 6e62104d0b..24ca616740 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -129,6 +129,11 @@ typedef struct Plan bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ + /* + * information needed for asynchronous execution + */ + bool async_capable; /* engage asynchronous-capable logic? */ + /* * Common structural data for all Plan types. 
*/ @@ -245,6 +250,7 @@ typedef struct Append Plan plan; Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ List *appendplans; + int nasyncplans; /* # of asynchronous plans */ /* * All 'appendplans' preceding this index are non-partial plans. All diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 1be93be098..a3fd93fe07 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate; extern PGDLLIMPORT bool enable_parallel_append; extern PGDLLIMPORT bool enable_parallel_hash; extern PGDLLIMPORT bool enable_partition_pruning; +extern PGDLLIMPORT bool enable_async_append; extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 87672e6f30..d699502cd9 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -966,7 +966,8 @@ typedef enum */ typedef enum { - WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC, + WAIT_EVENT_APPEND_READY = PG_WAIT_IPC, + WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WAIT_EVENT_BTREE_PAGE, diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 9e94fcaec2..44f9368c64 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -179,5 +179,6 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); extern void InitializeLatchWaitSet(void); +extern int GetNumRegisteredWaitEvents(WaitEventSet *set); #endif /* LATCH_H */ diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out index 791eba8511..b89b99fb02 100644 --- a/src/test/regress/expected/explain.out +++ b/src/test/regress/expected/explain.out @@ -87,6 +87,7 @@ select explain_filter('explain 
(analyze, buffers, format json) select * from int "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + + "Async Capable": false, + "Relation Name": "int8_tbl",+ "Alias": "i8", + "Startup Cost": N.N, + @@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8 <Plan> + <Node-Type>Seq Scan</Node-Type> + <Parallel-Aware>false</Parallel-Aware> + + <Async-Capable>false</Async-Capable> + <Relation-Name>int8_tbl</Relation-Name> + <Alias>i8</Alias> + <Startup-Cost>N.N</Startup-Cost> + @@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int - Plan: + Node Type: "Seq Scan" + Parallel Aware: false + + Async Capable: false + Relation Name: "int8_tbl"+ Alias: "i8" + Startup Cost: N.N + @@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8' "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + + "Async Capable": false, + "Relation Name": "int8_tbl",+ "Alias": "i8", + "Startup Cost": N.N, + @@ -346,6 +350,7 @@ select jsonb_pretty( "Actual Rows": 0, + "Actual Loops": 0, + "Startup Cost": 0.0, + + "Async Capable": false, + "Relation Name": "tenk1", + "Parallel Aware": true, + "Local Hit Blocks": 0, + @@ -391,6 +396,7 @@ select jsonb_pretty( "Actual Rows": 0, + "Actual Loops": 0, + "Startup Cost": 0.0, + + "Async Capable": false, + "Parallel Aware": false, + "Sort Space Used": 0, + "Local Hit Blocks": 0, + @@ -433,6 +439,7 @@ select jsonb_pretty( "Actual Rows": 0, + "Actual Loops": 0, + "Startup Cost": 0.0, + + "Async Capable": false, + "Parallel Aware": false, + "Workers Planned": 0, + "Local Hit Blocks": 0, + diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index 68ca321163..a417b566d9 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from "Node Type": "Incremental Sort", + "Actual Rows": 55, + "Actual Loops": 1, + + "Async Capable": false, + "Presorted Key": [ + "t.a" + ], + 
@@ -760,6 +761,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from "Node Type": "Incremental Sort", + "Actual Rows": 70, + "Actual Loops": 1, + + "Async Capable": false, + "Presorted Key": [ + "t.a" + ], + diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out index ff157ceb1c..499245068a 100644 --- a/src/test/regress/expected/insert_conflict.out +++ b/src/test/regress/expected/insert_conflict.out @@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + + "Async Capable": false, + "Relation Name": "insertconflicttest", + "Alias": "insertconflicttest", + "Conflict Resolution": "UPDATE", + @@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb { + "Node Type": "Result", + "Parent Relationship": "Member", + - "Parallel Aware": false + + "Parallel Aware": false, + + "Async Capable": false + } + ] + } + diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 6d048e309c..98dde452e6 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -95,6 +95,7 @@ select count(*) = 0 as ok from pg_stat_wal_receiver; select name, setting from pg_settings where name like 'enable%'; name | setting --------------------------------+--------- + enable_async_append | on enable_bitmapscan | on enable_gathermerge | on enable_hashagg | on @@ -113,7 +114,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(18 rows) +(19 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail