From 1ec7fca8592178281cd5cdada0f27a340fb813fc Mon Sep 17 00:00:00 2001 From: Etsuro Fujita Date: Fri, 30 Jul 2021 17:00:00 +0900 Subject: [PATCH] postgres_fdw: Fix handling of pending asynchronous requests. A pending asynchronous request is handled by process_pending_request(), which previously not only processed an in-progress remote query but performed ExecForeignScan() to produce a tuple to return to the local server asynchronously from the result of the remote query. But that led to a server crash when executing a query or led to an "InstrStartNode called twice in a row" or "InstrEndLoop called on running node" failure when doing EXPLAIN ANALYZE of it, in cases where the plan tree for it contained multiple async-capable nodes accessing the same initplan/subplan that contained multiple async-capable nodes scanning the same foreign tables as for the parent async-capable nodes, as reported by Andrey Lepikhov. The reason is that the second step in process_pending_request() invoked when executing the initplan/subplan for one of the parent async-capable nodes caused recursive execution of the initplan/subplan for another of the parent async-capable nodes. To fix, split process_pending_request() into the two steps and postpone the second step until ForeignAsyncConfigureWait() is called for each of the pending asynchronous requests. Also, in ExecAppendAsyncEventWait() we assumed that FDWs would register at least one wait event in a WaitEventSet created there when they were called from ForeignAsyncConfigureWait() in that function, but allow FDWs to register zero wait events in the WaitEventSet; modify ExecAppendAsyncEventWait() to just return in that case. Oversight in commit 27e1f1456. Back-patch to v14 where that commit went in. Andrey Lepikhov and Etsuro Fujita Discussion: https://postgr.es/m/fe5eaa19-1704-e4a4-76ee-3b9d37ade399@postgrespro.ru --- .../postgres_fdw/expected/postgres_fdw.out | 55 +++++++++++++++- contrib/postgres_fdw/postgres_fdw.c | 65 +++++++++++++++---- contrib/postgres_fdw/sql/postgres_fdw.sql | 13 +++- src/backend/executor/nodeAppend.c | 11 ++++ 4 files changed, 126 insertions(+), 18 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index ed25e7a743..78c05ee5ea 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10300,6 +10300,59 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; 2505 | 505 | 0505 | 2505 | 505 | 0505 (1 row) +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); +ANALYZE local_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + QUERY PLAN +---------------------------------------------------------------------------------------- + Nested Loop Left Join + Output: t1.a, t1.b, t1.c, async_pt.a, async_pt.b, async_pt.c, ($0) + Join Filter: (t1.a = async_pt.a) + InitPlan 1 (returns $0) + -> Aggregate + Output: count(*) + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_4 + Remote SQL: SELECT NULL FROM public.base_tbl1 WHERE ((a < 3000)) + -> Async Foreign Scan on public.async_p2 async_pt_5 + Remote SQL: SELECT NULL FROM public.base_tbl2 WHERE ((a < 3000)) + -> Seq Scan on public.local_tbl t1 + Output: t1.a, t1.b, t1.c + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c, $0 + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c, $0 + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000)) +(20 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + QUERY PLAN +----------------------------------------------------------------------------------------- + Nested Loop Left Join (actual rows=1 loops=1) + Join Filter: (t1.a = async_pt.a) + Rows Removed by Join Filter: 399 + InitPlan 1 (returns $0) + -> Aggregate (actual rows=1 loops=1) + -> Append (actual rows=400 loops=1) + -> Async Foreign Scan on async_p1 async_pt_4 (actual rows=200 loops=1) + -> Async Foreign Scan on async_p2 async_pt_5 (actual rows=200 loops=1) + -> Seq Scan on local_tbl t1 (actual rows=1 loops=1) + -> Append (actual rows=400 loops=1) + -> Async Foreign Scan on async_p1 async_pt_1 (actual rows=200 loops=1) + -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=200 loops=1) +(12 rows) + +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + a | b | c | a | b | c | count +------+-----+-----+------+-----+------+------- + 1505 | 505 | foo | 1505 | 505 | 0505 | 400 +(1 row) + EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; QUERY PLAN @@ -10342,8 +10395,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; (1 row) -- Check with foreign modify -CREATE TABLE local_tbl (a int, b int, c text); -INSERT INTO local_tbl VALUES (1505, 505, 'foo'); CREATE TABLE base_tbl3 (a int, b int, c text); CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) SERVER loopback OPTIONS (table_name 'base_tbl3'); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 51fac77f3d..8dfee2e02c 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); static void fetch_more_data_begin(AsyncRequest *areq); +static void complete_pending_request(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) /* This should not be called unless callback_pending */ Assert(areq->callback_pending); + /* + * If process_pending_request() has been invoked on the given request + * before we get here, we might have some tuples already; in which case + * complete the request + */ + if (fsstate->next_tuple < fsstate->num_tuples) + { + complete_pending_request(areq); + if (areq->request_complete) + return; + Assert(areq->callback_pending); + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + /* The core code would have registered postmaster death event */ Assert(GetNumRegisteredWaitEvents(set) >= 1); @@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) * This is the case when the in-process request was made by another * Append. Note that it might be useless to process the request, * because the query might not need tuples from that Append anymore. - * Skip the given request if there are any configured events other - * than the postmaster death event; otherwise process the request, - * then begin a fetch to configure the event below, because otherwise - * we might end up with no configured events other than the postmaster - * death event. + * If there are any child subplans of the same parent that are ready + * for new requests, skip the given request. Likewise, if there are + * any configured events other than the postmaster death event, skip + * it. Otherwise, process the in-process request, then begin a fetch + * to configure the event below, because we might otherwise end up + * with no configured events other than the postmaster death event. */ + if (!bms_is_empty(requestor->as_needrequest)) + return; if (GetNumRegisteredWaitEvents(set) > 1) return; process_pending_request(pendingAreq); @@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq) { ForeignScanState *node = (ForeignScanState *) areq->requestee; PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state; - EState *estate = node->ss.ps.state; - MemoryContext oldcontext; + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); /* The request should be currently in-process */ Assert(fsstate->conn_state->pendingAreq == areq); - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + fetch_more_data(node); + /* + * If we didn't get any tuples, must be end of data; complete the request + * now. Otherwise, we postpone completing the request until we are called + * from postgresForeignAsyncConfigureWait(). + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, NULL); + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + } +} + +/* + * Complete a pending asynchronous request. + */ +static void +complete_pending_request(AsyncRequest *areq) +{ /* The request would have been pending for a callback */ Assert(areq->callback_pending); /* Unlike AsyncNotify, we unset callback_pending ourselves */ areq->callback_pending = false; - fetch_more_data(node); - - /* We need to send a new query afterwards; don't fetch */ + /* We begin a fetch afterwards if necessary; don't fetch */ produce_tuple_asynchronously(areq, false); /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ @@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq) if (areq->requestee->instrument) InstrUpdateTupleCount(areq->requestee->instrument, TupIsNull(areq->result) ? 0.0 : 1.0); - - MemoryContextSwitchTo(oldcontext); } /* diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 02a6b15a13..75fff9bad0 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3274,6 +3274,16 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); +ANALYZE local_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; +SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a; + EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) @@ -3281,9 +3291,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; -- Check with foreign modify -CREATE TABLE local_tbl (a int, b int, c text); -INSERT INTO local_tbl VALUES (1505, 505, 'foo'); - CREATE TABLE base_tbl3 (a int, b int, c text); CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) SERVER loopback OPTIONS (table_name 'base_tbl3'); diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 755c1392f0..a4eef19d7f 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -1043,6 +1043,17 @@ ExecAppendAsyncEventWait(AppendState *node) ExecAsyncConfigureWait(areq); } + /* + * No need for further processing if there are no configured events other + * than the postmaster death event. + */ + if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) + { + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + return; + } + /* We wait on at most EVENT_BUFFER_SIZE events. */ if (nevents > EVENT_BUFFER_SIZE) nevents = EVENT_BUFFER_SIZE;