Fix oversight in commit 1ec7fca859.

I failed to account for the possibility that when
ExecAppendAsyncEventWait() notifies multiple async-capable nodes using
postgres_fdw, a preceding node might invoke process_pending_request() to
process a pending asynchronous request made by a succeeding node.  In
that case the succeeding node should produce a tuple to return to the
parent Append node from tuples fetched by process_pending_request() when
notified.  Repair.

Per buildfarm via Michael Paquier.  Back-patch to v14, like the previous
commit.

Thanks to Tom Lane for testing.

Discussion: https://postgr.es/m/YQP0UPT8KmPiHTMs%40paquier.xyz
This commit is contained in:
Etsuro Fujita 2021-08-02 12:45:00 +09:00
parent eaf5321c35
commit a8ed9bd59d
2 changed files with 29 additions and 13 deletions

View File

@ -6896,12 +6896,26 @@ postgresForeignAsyncNotify(AsyncRequest *areq)
ForeignScanState *node = (ForeignScanState *) areq->requestee; ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
/* The request should be currently in-process */
Assert(fsstate->conn_state->pendingAreq == areq);
/* The core code would have initialized the callback_pending flag */ /* The core code would have initialized the callback_pending flag */
Assert(!areq->callback_pending); Assert(!areq->callback_pending);
/*
* If process_pending_request() has been invoked on the given request
* before we get here, we might have some tuples already; in which case
* produce the next tuple
*/
if (fsstate->next_tuple < fsstate->num_tuples)
{
produce_tuple_asynchronously(areq, true);
return;
}
/* We must have run out of tuples */
Assert(fsstate->next_tuple >= fsstate->num_tuples);
/* The request should be currently in-process */
Assert(fsstate->conn_state->pendingAreq == areq);
/* On error, report the original query, not the FETCH. */ /* On error, report the original query, not the FETCH. */
if (!PQconsumeInput(fsstate->conn)) if (!PQconsumeInput(fsstate->conn))
pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
@ -7027,7 +7041,7 @@ process_pending_request(AsyncRequest *areq)
/* /*
* If we didn't get any tuples, must be end of data; complete the request * If we didn't get any tuples, must be end of data; complete the request
* now. Otherwise, we postpone completing the request until we are called * now. Otherwise, we postpone completing the request until we are called
* from postgresForeignAsyncConfigureWait(). * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
*/ */
if (fsstate->next_tuple >= fsstate->num_tuples) if (fsstate->next_tuple >= fsstate->num_tuples)
{ {

View File

@ -1082,16 +1082,18 @@ ExecAppendAsyncEventWait(AppendState *node)
{ {
AsyncRequest *areq = (AsyncRequest *) w->user_data; AsyncRequest *areq = (AsyncRequest *) w->user_data;
/* if (areq->callback_pending)
* Mark it as no longer needing a callback. We must do this {
* before dispatching the callback in case the callback resets the /*
* flag. * Mark it as no longer needing a callback. We must do this
*/ * before dispatching the callback in case the callback resets
Assert(areq->callback_pending); * the flag.
areq->callback_pending = false; */
areq->callback_pending = false;
/* Do the actual work. */ /* Do the actual work. */
ExecAsyncNotify(areq); ExecAsyncNotify(areq);
}
} }
} }
} }