Fix EXPLAIN ANALYZE for async-capable nodes.

EXPLAIN ANALYZE for an async-capable ForeignScan node associated with
postgres_fdw is done just by using instrumentation for ExecProcNode()
called from the node's callbacks, causing the following problems:

1) If the remote table to scan is empty, the node is incorrectly
   considered as "never executed" by the command even if the node is
   executed, as ExecProcNode() isn't called from the node's callbacks at
   all in that case.
2) The command fails to collect timings for things other than
   ExecProcNode() done in the node, such as creating a cursor for the
   node's remote query.

To fix these problems, add instrumentation for async-capable nodes, and
modify postgres_fdw accordingly.

My oversight in commit 27e1f1456.

While at it, update a comment for the AsyncRequest struct in execnodes.h
and the documentation for the ForeignAsyncRequest API in fdwhandler.sgml
to match the code in ExecAsyncAppendResponse() in nodeAppend.c, and fix
typos in comments in nodeAppend.c.

Per report from Andrey Lepikhov, though I didn't use his patch.

Reviewed-by: Andrey Lepikhov
Discussion: https://postgr.es/m/2eb662bb-105d-fc20-7412-2f027cc3ca72%40postgrespro.ru
This commit is contained in:
Etsuro Fujita 2021-05-12 14:00:00 +09:00
parent e135743ef0
commit a363bc6da9
14 changed files with 136 additions and 28 deletions

View File

@ -314,7 +314,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
MemoryContextSwitchTo(oldcxt);
}
}

View File

@ -974,7 +974,7 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
MemoryContextSwitchTo(oldcxt);
}
}

View File

@ -10051,6 +10051,21 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
Filter: (t1_3.b === 505)
(14 rows)
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
QUERY PLAN
-------------------------------------------------------------------------
Limit (actual rows=1 loops=1)
-> Append (actual rows=1 loops=1)
-> Async Foreign Scan on async_p1 t1_1 (actual rows=0 loops=1)
Filter: (b === 505)
-> Async Foreign Scan on async_p2 t1_2 (actual rows=0 loops=1)
Filter: (b === 505)
-> Seq Scan on async_p3 t1_3 (actual rows=1 loops=1)
Filter: (b === 505)
Rows Removed by Filter: 101
(9 rows)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
a | b | c
------+-----+------
@ -10132,18 +10147,32 @@ SELECT * FROM join_tbl ORDER BY a1;
(3 rows)
DELETE FROM join_tbl;
RESET enable_mergejoin;
RESET enable_hashjoin;
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
DROP TABLE local_tbl;
DROP FOREIGN TABLE remote_tbl;
DROP FOREIGN TABLE insert_tbl;
DROP TABLE base_tbl3;
DROP TABLE base_tbl4;
RESET enable_mergejoin;
RESET enable_hashjoin;
-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
DELETE FROM async_p1;
DELETE FROM async_p2;
DELETE FROM async_p3;
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM async_pt;
QUERY PLAN
-------------------------------------------------------------------------
Append (actual rows=0 loops=1)
-> Async Foreign Scan on async_p1 async_pt_1 (actual rows=0 loops=1)
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=0 loops=1)
-> Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1)
(4 rows)
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
DROP TABLE join_tbl;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);

View File

@ -1542,7 +1542,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_values);
/* Set the async-capable flag */
fsstate->async_capable = node->ss.ps.plan->async_capable;
fsstate->async_capable = node->ss.ps.async_capable;
}
/*
@ -6867,7 +6867,7 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
}
/* Get a tuple from the ForeignScan node */
result = ExecProcNode((PlanState *) node);
result = areq->requestee->ExecProcNodeReal(areq->requestee);
if (!TupIsNull(result))
{
/* Mark the request as complete */
@ -6956,6 +6956,11 @@ process_pending_request(AsyncRequest *areq)
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
ExecAsyncResponse(areq);
/* Also, we do instrumentation ourselves, if required */
if (areq->requestee->instrument)
InstrUpdateTupleCount(areq->requestee->instrument,
TupIsNull(areq->result) ? 0.0 : 1.0);
MemoryContextSwitchTo(oldcontext);
}

View File

@ -3195,6 +3195,8 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
-- Check with foreign modify
@ -3226,19 +3228,28 @@ INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND
SELECT * FROM join_tbl ORDER BY a1;
DELETE FROM join_tbl;
DROP TABLE local_tbl;
DROP FOREIGN TABLE remote_tbl;
DROP FOREIGN TABLE insert_tbl;
DROP TABLE base_tbl3;
DROP TABLE base_tbl4;
RESET enable_mergejoin;
RESET enable_hashjoin;
-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
DELETE FROM async_p1;
DELETE FROM async_p2;
DELETE FROM async_p3;
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
SELECT * FROM async_pt;
-- Clean up
DROP TABLE async_pt;
DROP TABLE base_tbl1;
DROP TABLE base_tbl2;
DROP TABLE result_tbl;
DROP TABLE local_tbl;
DROP FOREIGN TABLE remote_tbl;
DROP FOREIGN TABLE insert_tbl;
DROP TABLE base_tbl3;
DROP TABLE base_tbl4;
DROP TABLE join_tbl;
ALTER SERVER loopback OPTIONS (DROP async_capable);

View File

@ -1597,7 +1597,7 @@ ForeignAsyncRequest(AsyncRequest *areq);
<literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
for the <structname>ForeignScan</structname> node to get a callback from
the callback functions described below. If no more tuples are available,
set the slot to NULL, and the
set the slot to NULL or an empty slot, and the
<literal>areq-&gt;request_complete</literal> flag to
<literal>true</literal>. It's recommended to use
<function>ExecAsyncRequestDone</function> or

View File

@ -15,6 +15,7 @@
#include "postgres.h"
#include "executor/execAsync.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "executor/nodeForeignscan.h"
@ -24,6 +25,13 @@
void
ExecAsyncRequest(AsyncRequest *areq)
{
if (areq->requestee->chgParam != NULL) /* something changed? */
ExecReScan(areq->requestee); /* let ReScan handle this */
/* must provide our own instrumentation support */
if (areq->requestee->instrument)
InstrStartNode(areq->requestee->instrument);
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq)
}
ExecAsyncResponse(areq);
/* must provide our own instrumentation support */
if (areq->requestee->instrument)
InstrStopNode(areq->requestee->instrument,
TupIsNull(areq->result) ? 0.0 : 1.0);
}
/*
@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq)
void
ExecAsyncConfigureWait(AsyncRequest *areq)
{
/* must provide our own instrumentation support */
if (areq->requestee->instrument)
InstrStartNode(areq->requestee->instrument);
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(areq->requestee));
}
/* must provide our own instrumentation support */
if (areq->requestee->instrument)
InstrStopNode(areq->requestee->instrument, 0.0);
}
/*
@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
void
ExecAsyncNotify(AsyncRequest *areq)
{
/* must provide our own instrumentation support */
if (areq->requestee->instrument)
InstrStartNode(areq->requestee->instrument);
switch (nodeTag(areq->requestee))
{
case T_ForeignScanState:
@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq)
}
ExecAsyncResponse(areq);
/* must provide our own instrumentation support */
if (areq->requestee->instrument)
InstrStopNode(areq->requestee->instrument,
TupIsNull(areq->result) ? 0.0 : 1.0);
}
/*

View File

@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_TrigWhenExprs = (ExprState **)
palloc0(n * sizeof(ExprState *));
if (instrument_options)
resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options);
resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false);
}
else
{

View File

@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
/* Set up instrumentation for this node if requested */
if (estate->es_instrument)
result->instrument = InstrAlloc(1, estate->es_instrument);
result->instrument = InstrAlloc(1, estate->es_instrument,
result->async_capable);
return result;
}

View File

@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add);
/* Allocate new instrumentation structure(s) */
Instrumentation *
InstrAlloc(int n, int instrument_options)
InstrAlloc(int n, int instrument_options, bool async_mode)
{
Instrumentation *instr;
@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options)
instr[i].need_bufusage = need_buffers;
instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
instr[i].async_mode = async_mode;
}
}
@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr)
void
InstrStopNode(Instrumentation *instr, double nTuples)
{
double save_tuplecount = instr->tuplecount;
instr_time endtime;
/* count the returned tuples */
@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples)
instr->running = true;
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
}
else
{
/*
* In async mode, if the plan node hadn't emitted any tuples before,
* this might be the first tuple
*/
if (instr->async_mode && save_tuplecount < 1.0)
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
}
}
/* Update tuple count */
void
InstrUpdateTupleCount(Instrumentation *instr, double nTuples)
{
/* count the returned tuples */
instr->tuplecount += nTuples;
}
/* Finish a run cycle for a plan node */

View File

@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate)
}
/*
* wait or poll async events if any. We do this before checking for
* the end of iteration, because it might drain the remaining async
* subplans.
* wait or poll for async events if any. We do this before checking
* for the end of iteration, because it might drain the remaining
* async subplans.
*/
if (node->as_nasyncremain > 0)
ExecAppendAsyncEventWait(node);
@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node)
/*
* If chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
* first ExecProcNode or by first ExecAsyncRequest.
*/
if (subnode->chgParam == NULL)
ExecReScan(subnode);
@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
CHECK_FOR_INTERRUPTS();
/* Wait or poll async events. */
/* Wait or poll for async events. */
ExecAppendAsyncEventWait(node);
/* Request a tuple asynchronously. */
@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
/* Nothing to do if the request is pending. */
if (!areq->request_complete)
{
/* The request would have been pending for a callback */
/* The request would have been pending for a callback. */
Assert(areq->callback_pending);
return;
}

View File

@ -209,6 +209,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
scanstate->fdw_recheck_quals =
ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
/*
* Determine whether to scan the foreign relation asynchronously or not;
* this has to be kept in sync with the code in ExecInitAppend().
*/
scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
estate->es_epq_active == NULL);
/*
* Initialize FDW-related state.
*/

View File

@ -55,6 +55,7 @@ typedef struct Instrumentation
bool need_timer; /* true if we need timer data */
bool need_bufusage; /* true if we need buffer usage data */
bool need_walusage; /* true if we need WAL usage data */
bool async_mode; /* true if node is in async mode */
/* Info about current plan cycle: */
bool running; /* true if we've completed first tuple */
instr_time starttime; /* start time of current iteration of node */
@ -84,10 +85,12 @@ typedef struct WorkerInstrumentation
extern PGDLLIMPORT BufferUsage pgBufferUsage;
extern PGDLLIMPORT WalUsage pgWalUsage;
extern Instrumentation *InstrAlloc(int n, int instrument_options);
extern Instrumentation *InstrAlloc(int n, int instrument_options,
bool async_mode);
extern void InstrInit(Instrumentation *instr, int instrument_options);
extern void InstrStartNode(Instrumentation *instr);
extern void InstrStopNode(Instrumentation *instr, double nTuples);
extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
extern void InstrEndLoop(Instrumentation *instr);
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
extern void InstrStartParallelQuery(void);

View File

@ -538,7 +538,8 @@ typedef struct AsyncRequest
int request_index; /* Scratch space for requestor */
bool callback_pending; /* Callback is needed */
bool request_complete; /* Request complete, result valid */
TupleTableSlot *result; /* Result (NULL if no more tuples) */
TupleTableSlot *result; /* Result (NULL or an empty slot if no more
* tuples) */
} AsyncRequest;
/* ----------------
@ -1003,6 +1004,8 @@ typedef struct PlanState
ExprContext *ps_ExprContext; /* node's expression-evaluation context */
ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */
bool async_capable; /* true if node is async-capable */
/*
* Scanslot's descriptor if known. This is a bit of a hack, but otherwise
* it's hard for expression compilation to optimize based on the