diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index ed25e7a743..78c05ee5ea 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10300,6 +10300,59 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; 2505 | 505 | 0505 | 2505 | 505 | 0505 (1 row) +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); +ANALYZE local_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + QUERY PLAN +---------------------------------------------------------------------------------------- + Nested Loop Left Join + Output: t1.a, t1.b, t1.c, async_pt.a, async_pt.b, async_pt.c, ($0) + Join Filter: (t1.a = async_pt.a) + InitPlan 1 (returns $0) + -> Aggregate + Output: count(*) + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_4 + Remote SQL: SELECT NULL FROM public.base_tbl1 WHERE ((a < 3000)) + -> Async Foreign Scan on public.async_p2 async_pt_5 + Remote SQL: SELECT NULL FROM public.base_tbl2 WHERE ((a < 3000)) + -> Seq Scan on public.local_tbl t1 + Output: t1.a, t1.b, t1.c + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c, $0 + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c, $0 + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000)) +(20 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + QUERY PLAN +----------------------------------------------------------------------------------------- + Nested Loop Left Join (actual rows=1 loops=1) + Join Filter: (t1.a = async_pt.a) + Rows Removed by Join Filter: 399 + InitPlan 1 (returns $0) + -> Aggregate (actual rows=1 loops=1) + -> Append (actual rows=400 loops=1) + -> Async Foreign Scan on async_p1 async_pt_4 (actual rows=200 loops=1) + -> Async Foreign Scan on async_p2 async_pt_5 (actual rows=200 loops=1) + -> Seq Scan on local_tbl t1 (actual rows=1 loops=1) + -> Append (actual rows=400 loops=1) + -> Async Foreign Scan on async_p1 async_pt_1 (actual rows=200 loops=1) + -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=200 loops=1) +(12 rows) + +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + a | b | c | a | b | c | count +------+-----+-----+------+-----+------+------- + 1505 | 505 | foo | 1505 | 505 | 0505 | 400 +(1 row) + EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; QUERY PLAN @@ -10342,8 +10395,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; (1 row) -- Check with foreign modify -CREATE TABLE local_tbl (a int, b int, c text); -INSERT INTO local_tbl VALUES (1505, 505, 'foo'); CREATE TABLE base_tbl3 (a int, b int, c text); CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) SERVER loopback OPTIONS (table_name 'base_tbl3'); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 51fac77f3d..8dfee2e02c 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); static void fetch_more_data_begin(AsyncRequest *areq); +static void complete_pending_request(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) /* This should not be called unless callback_pending */ Assert(areq->callback_pending); + /* + * If process_pending_request() has been invoked on the given request + * before we get here, we might have some tuples already; in which case + * complete the request + */ + if (fsstate->next_tuple < fsstate->num_tuples) + { + complete_pending_request(areq); + if (areq->request_complete) + return; + Assert(areq->callback_pending); + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + /* The core code would have registered postmaster death event */ Assert(GetNumRegisteredWaitEvents(set) >= 1); @@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) * This is the case when the in-process request was made by another * Append. Note that it might be useless to process the request, * because the query might not need tuples from that Append anymore. - * Skip the given request if there are any configured events other - * than the postmaster death event; otherwise process the request, - * then begin a fetch to configure the event below, because otherwise - * we might end up with no configured events other than the postmaster - * death event. + * If there are any child subplans of the same parent that are ready + * for new requests, skip the given request. Likewise, if there are + * any configured events other than the postmaster death event, skip + * it. Otherwise, process the in-process request, then begin a fetch + * to configure the event below, because we might otherwise end up + * with no configured events other than the postmaster death event. */ + if (!bms_is_empty(requestor->as_needrequest)) + return; if (GetNumRegisteredWaitEvents(set) > 1) return; process_pending_request(pendingAreq); @@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq) { ForeignScanState *node = (ForeignScanState *) areq->requestee; PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state; - EState *estate = node->ss.ps.state; - MemoryContext oldcontext; + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); /* The request should be currently in-process */ Assert(fsstate->conn_state->pendingAreq == areq); - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + fetch_more_data(node); + /* + * If we didn't get any tuples, must be end of data; complete the request + * now. Otherwise, we postpone completing the request until we are called + * from postgresForeignAsyncConfigureWait(). + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, NULL); + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + } +} + +/* + * Complete a pending asynchronous request. + */ +static void +complete_pending_request(AsyncRequest *areq) +{ /* The request would have been pending for a callback */ Assert(areq->callback_pending); /* Unlike AsyncNotify, we unset callback_pending ourselves */ areq->callback_pending = false; - fetch_more_data(node); - - /* We need to send a new query afterwards; don't fetch */ + /* We begin a fetch afterwards if necessary; don't fetch */ produce_tuple_asynchronously(areq, false); /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ @@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq) if (areq->requestee->instrument) InstrUpdateTupleCount(areq->requestee->instrument, TupIsNull(areq->result) ? 0.0 : 1.0); - - MemoryContextSwitchTo(oldcontext); } /* diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 02a6b15a13..75fff9bad0 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3274,6 +3274,16 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); +ANALYZE local_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) @@ -3281,9 +3291,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; -- Check with foreign modify -CREATE TABLE local_tbl (a int, b int, c text); -INSERT INTO local_tbl VALUES (1505, 505, 'foo'); - CREATE TABLE base_tbl3 (a int, b int, c text); CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) SERVER loopback OPTIONS (table_name 'base_tbl3'); diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 755c1392f0..a4eef19d7f 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -1043,6 +1043,17 @@ ExecAppendAsyncEventWait(AppendState *node) ExecAsyncConfigureWait(areq); } + /* + * No need for further processing if there are no configured events other + * than the postmaster death event. + */ + if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) + { + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + return; + } + /* We wait on at most EVENT_BUFFER_SIZE events. */ if (nevents > EVENT_BUFFER_SIZE) nevents = EVENT_BUFFER_SIZE;