diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c index 445bb37191..e9092ba359 100644 --- a/contrib/auto_explain/auto_explain.c +++ b/contrib/auto_explain/auto_explain.c @@ -314,7 +314,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags) MemoryContext oldcxt; oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt); - queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); + queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); MemoryContextSwitchTo(oldcxt); } } diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index f42f07622e..77ca5abcdc 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -974,7 +974,7 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags) MemoryContext oldcxt; oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt); - queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); + queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); MemoryContextSwitchTo(oldcxt); } } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 6f533c745d..0b0c45f0d9 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10051,6 +10051,21 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; Filter: (t1_3.b === 505) (14 rows) +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + QUERY PLAN +------------------------------------------------------------------------- + Limit (actual rows=1 loops=1) + -> Append (actual rows=1 loops=1) + -> Async Foreign Scan on async_p1 t1_1 (actual rows=0 loops=1) + Filter: (b === 505) + -> Async Foreign Scan on async_p2 t1_2 (actual rows=0 loops=1) + Filter: (b === 505) + -> Seq Scan on async_p3 t1_3 (actual rows=1 loops=1) + Filter: (b === 505) + Rows Removed by Filter: 101 +(9 rows) + SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; a | b | c ------+-----+------ @@ -10132,18 +10147,32 @@ SELECT * FROM join_tbl ORDER BY a1; (3 rows) DELETE FROM join_tbl; -RESET enable_mergejoin; -RESET enable_hashjoin; --- Clean up -DROP TABLE async_pt; -DROP TABLE base_tbl1; -DROP TABLE base_tbl2; -DROP TABLE result_tbl; DROP TABLE local_tbl; DROP FOREIGN TABLE remote_tbl; DROP FOREIGN TABLE insert_tbl; DROP TABLE base_tbl3; DROP TABLE base_tbl4; +RESET enable_mergejoin; +RESET enable_hashjoin; +-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously +DELETE FROM async_p1; +DELETE FROM async_p2; +DELETE FROM async_p3; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM async_pt; + QUERY PLAN +------------------------------------------------------------------------- + Append (actual rows=0 loops=1) + -> Async Foreign Scan on async_p1 async_pt_1 (actual rows=0 loops=1) + -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=0 loops=1) + -> Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1) +(4 rows) + +-- Clean up +DROP TABLE async_pt; +DROP TABLE base_tbl1; +DROP TABLE base_tbl2; +DROP TABLE result_tbl; DROP TABLE join_tbl; ALTER SERVER loopback OPTIONS (DROP async_capable); ALTER SERVER loopback2 OPTIONS (DROP async_capable); diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 4ff58d9c27..ee93ee07cc 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -1542,7 +1542,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_values); /* Set the async-capable flag */ - fsstate->async_capable = node->ss.ps.plan->async_capable; + fsstate->async_capable = node->ss.ps.async_capable; } /* @@ -6867,7 +6867,7 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) } /* Get a tuple from the ForeignScan node */ - result = ExecProcNode((PlanState *) node); + result = areq->requestee->ExecProcNodeReal(areq->requestee); if (!TupIsNull(result)) { /* Mark the request as complete */ @@ -6956,6 +6956,11 @@ process_pending_request(AsyncRequest *areq) /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ ExecAsyncResponse(areq); + /* Also, we do instrumentation ourselves, if required */ + if (areq->requestee->instrument) + InstrUpdateTupleCount(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); + MemoryContextSwitchTo(oldcontext); } diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 000e2534fc..53adfe2abc 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3195,6 +3195,8 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; -- Check with foreign modify @@ -3226,19 +3228,28 @@ INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND SELECT * FROM join_tbl ORDER BY a1; DELETE FROM join_tbl; +DROP TABLE local_tbl; +DROP FOREIGN TABLE remote_tbl; +DROP FOREIGN TABLE insert_tbl; +DROP TABLE base_tbl3; +DROP TABLE base_tbl4; + RESET enable_mergejoin; RESET enable_hashjoin; +-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously +DELETE FROM async_p1; +DELETE FROM async_p2; +DELETE FROM async_p3; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM async_pt; + -- Clean up DROP TABLE async_pt; DROP TABLE base_tbl1; DROP TABLE base_tbl2; DROP TABLE result_tbl; -DROP TABLE local_tbl; -DROP FOREIGN TABLE remote_tbl; -DROP FOREIGN TABLE insert_tbl; -DROP TABLE base_tbl3; -DROP TABLE base_tbl4; DROP TABLE join_tbl; ALTER SERVER loopback OPTIONS (DROP async_capable); diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 8aa7edfe4a..d1194def82 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -1597,7 +1597,7 @@ ForeignAsyncRequest(AsyncRequest *areq); areq->callback_pending to true for the ForeignScan node to get a callback from the callback functions described below. If no more tuples are available, - set the slot to NULL, and the + set the slot to NULL or an empty slot, and the areq->request_complete flag to true. It's recommended to use ExecAsyncRequestDone or diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index f1985e658c..75108d36be 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "executor/execAsync.h" +#include "executor/executor.h" #include "executor/nodeAppend.h" #include "executor/nodeForeignscan.h" @@ -24,6 +25,13 @@ void ExecAsyncRequest(AsyncRequest *areq) { + if (areq->requestee->chgParam != NULL) /* something changed? */ + ExecReScan(areq->requestee); /* let ReScan handle this */ + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq) } ExecAsyncResponse(areq); + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); } /* @@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq) void ExecAsyncConfigureWait(AsyncRequest *areq) { + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq) elog(ERROR, "unrecognized node type: %d", (int) nodeTag(areq->requestee)); } + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, 0.0); } /* @@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq) void ExecAsyncNotify(AsyncRequest *areq) { + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq) } ExecAsyncResponse(areq); + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); } /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index df3d7f9a8b..58b4968735 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_TrigWhenExprs = (ExprState **) palloc0(n * sizeof(ExprState *)); if (instrument_options) - resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options); + resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false); } else { diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 9f8c7582e0..753f46863b 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags) /* Set up instrumentation for this node if requested */ if (estate->es_instrument) - result->instrument = InstrAlloc(1, estate->es_instrument); + result->instrument = InstrAlloc(1, estate->es_instrument, + result->async_capable); return result; } diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index 237e13361b..2b106d8473 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add); /* Allocate new instrumentation structure(s) */ Instrumentation * -InstrAlloc(int n, int instrument_options) +InstrAlloc(int n, int instrument_options, bool async_mode) { Instrumentation *instr; @@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options) instr[i].need_bufusage = need_buffers; instr[i].need_walusage = need_wal; instr[i].need_timer = need_timer; + instr[i].async_mode = async_mode; } } @@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr) void InstrStopNode(Instrumentation *instr, double nTuples) { + double save_tuplecount = instr->tuplecount; instr_time endtime; /* count the returned tuples */ @@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples) instr->running = true; instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); } + else + { + /* + * In async mode, if the plan node hadn't emitted any tuples before, + * this might be the first tuple + */ + if (instr->async_mode && save_tuplecount < 1.0) + instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); + } +} + +/* Update tuple count */ +void +InstrUpdateTupleCount(Instrumentation *instr, double nTuples) +{ + /* count the returned tuples */ + instr->tuplecount += nTuples; } /* Finish a run cycle for a plan node */ diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 3c1f12adaf..1558fafad1 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate) } /* - * wait or poll async events if any. We do this before checking for - * the end of iteration, because it might drain the remaining async - * subplans. + * wait or poll for async events if any. We do this before checking + * for the end of iteration, because it might drain the remaining + * async subplans. */ if (node->as_nasyncremain > 0) ExecAppendAsyncEventWait(node); @@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node) /* * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. + * first ExecProcNode or by first ExecAsyncRequest. */ if (subnode->chgParam == NULL) ExecReScan(subnode); @@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) { CHECK_FOR_INTERRUPTS(); - /* Wait or poll async events. */ + /* Wait or poll for async events. */ ExecAppendAsyncEventWait(node); /* Request a tuple asynchronously. */ @@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq) /* Nothing to do if the request is pending. */ if (!areq->request_complete) { - /* The request would have been pending for a callback */ + /* The request would have been pending for a callback. */ Assert(areq->callback_pending); return; } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 898890fb08..9dc38d47ea 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -209,6 +209,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) scanstate->fdw_recheck_quals = ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); + /* + * Determine whether to scan the foreign relation asynchronously or not; + * this has to be kept in sync with the code in ExecInitAppend(). + */ + scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable && + estate->es_epq_active == NULL); + /* * Initialize FDW-related state. */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index c25aa1b04c..fc87eed4fb 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -55,6 +55,7 @@ typedef struct Instrumentation bool need_timer; /* true if we need timer data */ bool need_bufusage; /* true if we need buffer usage data */ bool need_walusage; /* true if we need WAL usage data */ + bool async_mode; /* true if node is in async mode */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ instr_time starttime; /* start time of current iteration of node */ @@ -84,10 +85,12 @@ typedef struct WorkerInstrumentation extern PGDLLIMPORT BufferUsage pgBufferUsage; extern PGDLLIMPORT WalUsage pgWalUsage; -extern Instrumentation *InstrAlloc(int n, int instrument_options); +extern Instrumentation *InstrAlloc(int n, int instrument_options, + bool async_mode); extern void InstrInit(Instrumentation *instr, int instrument_options); extern void InstrStartNode(Instrumentation *instr); extern void InstrStopNode(Instrumentation *instr, double nTuples); +extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples); extern void InstrEndLoop(Instrumentation *instr); extern void InstrAggNode(Instrumentation *dst, Instrumentation *add); extern void InstrStartParallelQuery(void); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e7ae21c023..91a1c3a780 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -538,7 +538,8 @@ typedef struct AsyncRequest int request_index; /* Scratch space for requestor */ bool callback_pending; /* Callback is needed */ bool request_complete; /* Request complete, result valid */ - TupleTableSlot *result; /* Result (NULL if no more tuples) */ + TupleTableSlot *result; /* Result (NULL or an empty slot if no more + * tuples) */ } AsyncRequest; /* ---------------- @@ -1003,6 +1004,8 @@ typedef struct PlanState ExprContext *ps_ExprContext; /* node's expression-evaluation context */ ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */ + bool async_capable; /* true if node is async-capable */ + /* * Scanslot's descriptor if known. This is a bit of a hack, but otherwise * it's hard for expression compilation to optimize based on the