postgres_fdw: Fix handling of pending asynchronous requests.

A pending asynchronous request is handled by process_pending_request(),
which previously did two things: it processed the in-progress remote
query, and it then performed ExecForeignScan() to produce, from the result
of that query, a tuple to return to the local server asynchronously.  The
second step led to a server crash when executing a query, or to an
"InstrStartNode called twice in a row" or "InstrEndLoop called on running
node" failure when doing EXPLAIN ANALYZE of it, in cases where the plan
tree for it contained multiple async-capable nodes accessing the same
initplan/subplan, and that initplan/subplan itself contained multiple
async-capable nodes scanning the same foreign tables as the parent
async-capable nodes, as reported by Andrey Lepikhov.  The reason is that
the second step in process_pending_request(), when invoked while executing
the initplan/subplan for one of the parent async-capable nodes, caused
recursive execution of the initplan/subplan for another of the parent
async-capable nodes.

To fix, split process_pending_request() into the two steps and postpone
the second step until ForeignAsyncConfigureWait() is called for each of
the pending asynchronous requests.  Also, ExecAppendAsyncEventWait()
previously assumed that FDWs, when called via ForeignAsyncConfigureWait()
from that function, would register at least one wait event in the
WaitEventSet created there.  Now allow FDWs to register zero wait events
in the WaitEventSet, and modify ExecAppendAsyncEventWait() to just return
in that case.

Oversight in commit 27e1f1456.  Back-patch to v14 where that commit went
in.

Andrey Lepikhov and Etsuro Fujita

Discussion: https://postgr.es/m/fe5eaa19-1704-e4a4-76ee-3b9d37ade399@postgrespro.ru
This commit is contained in:
Etsuro Fujita 2021-07-30 17:00:00 +09:00
parent 16bd4becee
commit 1ec7fca859
4 changed files with 126 additions and 18 deletions

View File

@ -10300,6 +10300,59 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
2505 | 505 | 0505 | 2505 | 505 | 0505
(1 row)
CREATE TABLE local_tbl (a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
ANALYZE local_tbl;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
QUERY PLAN
----------------------------------------------------------------------------------------
Nested Loop Left Join
Output: t1.a, t1.b, t1.c, async_pt.a, async_pt.b, async_pt.c, ($0)
Join Filter: (t1.a = async_pt.a)
InitPlan 1 (returns $0)
-> Aggregate
Output: count(*)
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_4
Remote SQL: SELECT NULL FROM public.base_tbl1 WHERE ((a < 3000))
-> Async Foreign Scan on public.async_p2 async_pt_5
Remote SQL: SELECT NULL FROM public.base_tbl2 WHERE ((a < 3000))
-> Seq Scan on public.local_tbl t1
Output: t1.a, t1.b, t1.c
-> Append
-> Async Foreign Scan on public.async_p1 async_pt_1
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c, $0
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
-> Async Foreign Scan on public.async_p2 async_pt_2
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c, $0
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
(20 rows)
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
QUERY PLAN
-----------------------------------------------------------------------------------------
Nested Loop Left Join (actual rows=1 loops=1)
Join Filter: (t1.a = async_pt.a)
Rows Removed by Join Filter: 399
InitPlan 1 (returns $0)
-> Aggregate (actual rows=1 loops=1)
-> Append (actual rows=400 loops=1)
-> Async Foreign Scan on async_p1 async_pt_4 (actual rows=200 loops=1)
-> Async Foreign Scan on async_p2 async_pt_5 (actual rows=200 loops=1)
-> Seq Scan on local_tbl t1 (actual rows=1 loops=1)
-> Append (actual rows=400 loops=1)
-> Async Foreign Scan on async_p1 async_pt_1 (actual rows=200 loops=1)
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=200 loops=1)
(12 rows)
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
a | b | c | a | b | c | count
------+-----+-----+------+-----+------+-------
1505 | 505 | foo | 1505 | 505 | 0505 | 400
(1 row)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
QUERY PLAN
@ -10342,8 +10395,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
(1 row)
-- Check with foreign modify
CREATE TABLE local_tbl (a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl3');

View File

@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
static void fetch_more_data_begin(AsyncRequest *areq);
static void complete_pending_request(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
/*
* If process_pending_request() has been invoked on the given request
* before we get here, we might have some tuples already; in which case
* complete the request
*/
if (fsstate->next_tuple < fsstate->num_tuples)
{
complete_pending_request(areq);
if (areq->request_complete)
return;
Assert(areq->callback_pending);
}
/* We must have run out of tuples */
Assert(fsstate->next_tuple >= fsstate->num_tuples);
/* The core code would have registered postmaster death event */
Assert(GetNumRegisteredWaitEvents(set) >= 1);
@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* This is the case when the in-process request was made by another
* Append. Note that it might be useless to process the request,
* because the query might not need tuples from that Append anymore.
* Skip the given request if there are any configured events other
* than the postmaster death event; otherwise process the request,
* then begin a fetch to configure the event below, because otherwise
* we might end up with no configured events other than the postmaster
* death event.
* If there are any child subplans of the same parent that are ready
* for new requests, skip the given request. Likewise, if there are
* any configured events other than the postmaster death event, skip
* it. Otherwise, process the in-process request, then begin a fetch
* to configure the event below, because we might otherwise end up
* with no configured events other than the postmaster death event.
*/
if (!bms_is_empty(requestor->as_needrequest))
return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
process_pending_request(pendingAreq);
@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq)
{
ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state;
EState *estate = node->ss.ps.state;
MemoryContext oldcontext;
/* The request would have been pending for a callback */
Assert(areq->callback_pending);
/* The request should be currently in-process */
Assert(fsstate->conn_state->pendingAreq == areq);
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
fetch_more_data(node);
/*
* If we didn't get any tuples, must be end of data; complete the request
* now. Otherwise, we postpone completing the request until we are called
* from postgresForeignAsyncConfigureWait().
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
/* Unlike AsyncNotify, we unset callback_pending ourselves */
areq->callback_pending = false;
/* Mark the request as complete */
ExecAsyncRequestDone(areq, NULL);
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
ExecAsyncResponse(areq);
}
}
/*
* Complete a pending asynchronous request.
*/
static void
complete_pending_request(AsyncRequest *areq)
{
/* The request would have been pending for a callback */
Assert(areq->callback_pending);
/* Unlike AsyncNotify, we unset callback_pending ourselves */
areq->callback_pending = false;
fetch_more_data(node);
/* We need to send a new query afterwards; don't fetch */
/* We begin a fetch afterwards if necessary; don't fetch */
produce_tuple_asynchronously(areq, false);
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq)
if (areq->requestee->instrument)
InstrUpdateTupleCount(areq->requestee->instrument,
TupIsNull(areq->result) ? 0.0 : 1.0);
MemoryContextSwitchTo(oldcontext);
}
/*

View File

@ -3274,6 +3274,16 @@ EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
CREATE TABLE local_tbl (a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
ANALYZE local_tbl;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
@ -3281,9 +3291,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
-- Check with foreign modify
CREATE TABLE local_tbl (a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl3');

View File

@ -1043,6 +1043,17 @@ ExecAppendAsyncEventWait(AppendState *node)
ExecAsyncConfigureWait(areq);
}
/*
* No need for further processing if there are no configured events other
* than the postmaster death event.
*/
if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
{
FreeWaitEventSet(node->as_eventset);
node->as_eventset = NULL;
return;
}
/* We wait on at most EVENT_BUFFER_SIZE events. */
if (nevents > EVENT_BUFFER_SIZE)
nevents = EVENT_BUFFER_SIZE;