diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index eff7b04f11..2be14c5437 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -1602,6 +1602,7 @@ SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) FULL 20 | 0 | AAA020 (10 rows) +SET enable_resultcache TO off; -- right outer join + left outer join EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 RIGHT JOIN ft2 t2 ON (t1.c1 = t2.c1) LEFT JOIN ft4 t3 ON (t2.c1 = t3.c1) OFFSET 10 LIMIT 10; @@ -1628,6 +1629,7 @@ SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 RIGHT JOIN ft2 t2 ON (t1.c1 = t2.c1) LEFT 20 | 0 | AAA020 (10 rows) +RESET enable_resultcache; -- left outer join + right outer join EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) RIGHT JOIN ft4 t3 ON (t2.c1 = t3.c1) OFFSET 10 LIMIT 10; @@ -2139,22 +2141,25 @@ SELECT t1c1, avg(t1c1 + t2c1) FROM (SELECT t1.c1, t2.c1 FROM ft1 t1 JOIN ft2 t2 -- join with lateral reference EXPLAIN (VERBOSE, COSTS OFF) SELECT t1."C 1" FROM "S 1"."T 1" t1, LATERAL (SELECT DISTINCT t2.c1, t3.c1 FROM ft1 t2, ft2 t3 WHERE t2.c1 = t3.c1 AND t2.c2 = t1.c2) q ORDER BY t1."C 1" OFFSET 10 LIMIT 10; - QUERY PLAN --------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit Output: t1."C 1" -> Nested Loop Output: t1."C 1" -> Index Scan using t1_pkey on "S 1"."T 1" t1 Output: t1."C 1", t1.c2, t1.c3, t1.c4, t1.c5, t1.c6, t1.c7, t1.c8 - -> HashAggregate - Output: t2.c1, t3.c1 - Group Key: t2.c1, t3.c1 - -> Foreign Scan - Output: t2.c1, t3.c1 - Relations: (public.ft1 t2) INNER JOIN (public.ft2 t3) - Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")) AND ((r1.c2 = $1::integer)))) -(13 rows) + -> Result Cache + Cache Key: t1.c2 + -> Subquery Scan on q + -> HashAggregate + Output: t2.c1, t3.c1 + Group Key: t2.c1, t3.c1 + -> Foreign Scan + Output: t2.c1, t3.c1 + Relations: (public.ft1 t2) INNER JOIN (public.ft2 t3) + Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")) AND ((r1.c2 = $1::integer)))) +(16 rows) SELECT t1."C 1" FROM "S 1"."T 1" t1, LATERAL (SELECT DISTINCT t2.c1, t3.c1 FROM ft1 t2, ft2 t3 WHERE t2.c1 = t3.c1 AND t2.c2 = t1.c2) q ORDER BY t1."C 1" OFFSET 10 LIMIT 10; C 1 diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 806a5bca28..21a29cc062 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -502,10 +502,12 @@ SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 FULL JOIN ft2 t2 ON (t1.c1 = t2.c1) LEFT EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) FULL JOIN ft4 t3 ON (t2.c1 = t3.c1) OFFSET 10 LIMIT 10; SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) FULL JOIN ft4 t3 ON (t2.c1 = t3.c1) OFFSET 10 LIMIT 10; +SET enable_resultcache TO off; -- right outer join + left outer join EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 RIGHT JOIN ft2 t2 ON (t1.c1 = t2.c1) LEFT JOIN ft4 t3 ON (t2.c1 = 
t3.c1) OFFSET 10 LIMIT 10; SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 RIGHT JOIN ft2 t2 ON (t1.c1 = t2.c1) LEFT JOIN ft4 t3 ON (t2.c1 = t3.c1) OFFSET 10 LIMIT 10; +RESET enable_resultcache; -- left outer join + right outer join EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) RIGHT JOIN ft4 t3 ON (t2.c1 = t3.c1) OFFSET 10 LIMIT 10; diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index d1e2e8c4c3..9d87b5097a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1770,8 +1770,9 @@ include_dir 'conf.d' fact in mind when choosing the value. Sort operations are used for ORDER BY, DISTINCT, and merge joins. - Hash tables are used in hash joins, hash-based aggregation, and - hash-based processing of IN subqueries. + Hash tables are used in hash joins, hash-based aggregation, result + cache nodes and hash-based processing of IN + subqueries. Hash-based operations are generally more sensitive to memory @@ -4925,6 +4926,25 @@ ANY num_sync ( + enable_resultcache (boolean) + + enable_resultcache configuration parameter + + + + + Enables or disables the query planner's use of result cache plans for + caching results from parameterized scans inside nested-loop joins. + This plan type allows scans to the underlying plans to be skipped when + the results for the current parameters are already in the cache. Less + commonly looked up results may be evicted from the cache when more + space is required for new entries. The default is + on. + + + + enable_mergejoin (boolean) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 872aaa7aed..ede8cec947 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -108,6 +108,8 @@ static void show_sort_info(SortState *sortstate, ExplainState *es); static void show_incremental_sort_info(IncrementalSortState *incrsortstate, ExplainState *es); static void show_hash_info(HashState *hashstate, ExplainState *es); +static void show_resultcache_info(ResultCacheState *rcstate, List *ancestors, + ExplainState *es); static void show_hashagg_info(AggState *hashstate, ExplainState *es); static void show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es); @@ -1284,6 +1286,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Material: pname = sname = "Materialize"; break; + case T_ResultCache: + pname = sname = "Result Cache"; + break; case T_Sort: pname = sname = "Sort"; break; @@ -1996,6 +2001,10 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Hash: show_hash_info(castNode(HashState, planstate), es); break; + case T_ResultCache: + show_resultcache_info(castNode(ResultCacheState, planstate), + ancestors, es); + break; default: break; } @@ -3063,6 +3072,141 @@ show_hash_info(HashState *hashstate, ExplainState *es) } } +/* + * Show information on result cache hits/misses/evictions and memory usage. + */ +static void +show_resultcache_info(ResultCacheState *rcstate, List *ancestors, + ExplainState *es) +{ + Plan *plan = ((PlanState *) rcstate)->plan; + ListCell *lc; + List *context; + StringInfoData keystr; + char *seperator = ""; + bool useprefix; + int64 memPeakKb; + + initStringInfo(&keystr); + + /* + * It's hard to imagine having a result cache with fewer than 2 RTEs, but + * let's just keep the same useprefix logic as elsewhere in this file. 
+ */ + useprefix = list_length(es->rtable) > 1 || es->verbose; + + /* Set up deparsing context */ + context = set_deparse_context_plan(es->deparse_cxt, + plan, + ancestors); + + foreach(lc, ((ResultCache *) plan)->param_exprs) + { + Node *expr = (Node *) lfirst(lc); + + appendStringInfoString(&keystr, seperator); + + appendStringInfoString(&keystr, deparse_expression(expr, context, + useprefix, false)); + seperator = ", "; + } + + if (es->format != EXPLAIN_FORMAT_TEXT) + { + ExplainPropertyText("Cache Key", keystr.data, es); + } + else + { + ExplainIndentText(es); + appendStringInfo(es->str, "Cache Key: %s\n", keystr.data); + } + + pfree(keystr.data); + + if (!es->analyze) + return; + + /* + * mem_peak is only set when we freed memory, so we must use mem_used when + * mem_peak is 0. + */ + if (rcstate->stats.mem_peak > 0) + memPeakKb = (rcstate->stats.mem_peak + 1023) / 1024; + else + memPeakKb = (rcstate->mem_used + 1023) / 1024; + + if (rcstate->stats.cache_misses > 0) + { + if (es->format != EXPLAIN_FORMAT_TEXT) + { + ExplainPropertyInteger("Cache Hits", NULL, rcstate->stats.cache_hits, es); + ExplainPropertyInteger("Cache Misses", NULL, rcstate->stats.cache_misses, es); + ExplainPropertyInteger("Cache Evictions", NULL, rcstate->stats.cache_evictions, es); + ExplainPropertyInteger("Cache Overflows", NULL, rcstate->stats.cache_overflows, es); + ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es); + } + else + { + ExplainIndentText(es); + appendStringInfo(es->str, + "Hits: " UINT64_FORMAT " Misses: " UINT64_FORMAT " Evictions: " UINT64_FORMAT " Overflows: " UINT64_FORMAT " Memory Usage: " INT64_FORMAT "kB\n", + rcstate->stats.cache_hits, + rcstate->stats.cache_misses, + rcstate->stats.cache_evictions, + rcstate->stats.cache_overflows, + memPeakKb); + } + } + + if (rcstate->shared_info == NULL) + return; + + /* Show details from parallel workers */ + for (int n = 0; n < rcstate->shared_info->num_workers; n++) + { + ResultCacheInstrumentation *si; + + si = &rcstate->shared_info->sinstrument[n]; + + if (es->workers_state) + ExplainOpenWorker(n, es); + + /* + * Since the worker's ResultCacheState.mem_used field is unavailable + * to us, ExecEndResultCache will have set the + * ResultCacheInstrumentation.mem_peak field for us. No need to do + * the zero checks like we did for the serial case above. + */ + memPeakKb = (si->mem_peak + 1023) / 1024; + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + ExplainIndentText(es); + appendStringInfo(es->str, + "Hits: " UINT64_FORMAT " Misses: " UINT64_FORMAT " Evictions: " UINT64_FORMAT " Overflows: " UINT64_FORMAT " Memory Usage: " INT64_FORMAT "kB\n", + si->cache_hits, si->cache_misses, + si->cache_evictions, si->cache_overflows, + memPeakKb); + } + else + { + ExplainPropertyInteger("Cache Hits", NULL, + si->cache_hits, es); + ExplainPropertyInteger("Cache Misses", NULL, + si->cache_misses, es); + ExplainPropertyInteger("Cache Evictions", NULL, + si->cache_evictions, es); + ExplainPropertyInteger("Cache Overflows", NULL, + si->cache_overflows, es); + ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, + es); + } + + if (es->workers_state) + ExplainCloseWorker(n, es); + } +} + /* * Show information on hash aggregate memory usage and batches. 
*/ diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 680fd69151..f08b282a5e 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -61,6 +61,7 @@ OBJS = \ nodeProjectSet.o \ nodeRecursiveunion.o \ nodeResult.o \ + nodeResultCache.o \ nodeSamplescan.o \ nodeSeqscan.o \ nodeSetOp.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 58a8aa5ab7..b3726a54f3 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -44,6 +44,7 @@ #include "executor/nodeProjectSet.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" +#include "executor/nodeResultCache.h" #include "executor/nodeSamplescan.h" #include "executor/nodeSeqscan.h" #include "executor/nodeSetOp.h" @@ -254,6 +255,10 @@ ExecReScan(PlanState *node) ExecReScanMaterial((MaterialState *) node); break; + case T_ResultCacheState: + ExecReScanResultCache((ResultCacheState *) node); + break; + case T_SortState: ExecReScanSort((SortState *) node); break; diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index e33231f7be..23c0fb9379 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -3696,3 +3696,137 @@ ExecBuildGroupingEqual(TupleDesc ldesc, TupleDesc rdesc, return state; } + +/* + * Build equality expression that can be evaluated using ExecQual(), returning + * true if the expression context's inner/outer tuples are equal. Datums in + * the inner/outer slots are assumed to be in the same order and quantity as + * the 'eqfunctions' parameter. NULLs are treated as equal. + * + * desc: tuple descriptor of the to-be-compared tuples + * lops: the slot ops for the inner tuple slots + * rops: the slot ops for the outer tuple slots + * eqFunctions: array of function oids of the equality functions to use + * this must be the same length as the 'param_exprs' list. + * collations: collation Oids to use for equality comparison. Must be the + * same length as the 'param_exprs' list. 
+ * parent: parent executor node + */ +ExprState * +ExecBuildParamSetEqual(TupleDesc desc, + const TupleTableSlotOps *lops, + const TupleTableSlotOps *rops, + const Oid *eqfunctions, + const Oid *collations, + const List *param_exprs, + PlanState *parent) +{ + ExprState *state = makeNode(ExprState); + ExprEvalStep scratch = {0}; + int maxatt = list_length(param_exprs); + List *adjust_jumps = NIL; + ListCell *lc; + + state->expr = NULL; + state->flags = EEO_FLAG_IS_QUAL; + state->parent = parent; + + scratch.resvalue = &state->resvalue; + scratch.resnull = &state->resnull; + + /* push deform steps */ + scratch.opcode = EEOP_INNER_FETCHSOME; + scratch.d.fetch.last_var = maxatt; + scratch.d.fetch.fixed = false; + scratch.d.fetch.known_desc = desc; + scratch.d.fetch.kind = lops; + if (ExecComputeSlotInfo(state, &scratch)) + ExprEvalPushStep(state, &scratch); + + scratch.opcode = EEOP_OUTER_FETCHSOME; + scratch.d.fetch.last_var = maxatt; + scratch.d.fetch.fixed = false; + scratch.d.fetch.known_desc = desc; + scratch.d.fetch.kind = rops; + if (ExecComputeSlotInfo(state, &scratch)) + ExprEvalPushStep(state, &scratch); + + for (int attno = 0; attno < maxatt; attno++) + { + Form_pg_attribute att = TupleDescAttr(desc, attno); + Oid foid = eqfunctions[attno]; + Oid collid = collations[attno]; + FmgrInfo *finfo; + FunctionCallInfo fcinfo; + AclResult aclresult; + + /* Check permission to call function */ + aclresult = pg_proc_aclcheck(foid, GetUserId(), ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(foid)); + + InvokeFunctionExecuteHook(foid); + + /* Set up the primary fmgr lookup information */ + finfo = palloc0(sizeof(FmgrInfo)); + fcinfo = palloc0(SizeForFunctionCallInfo(2)); + fmgr_info(foid, finfo); + fmgr_info_set_expr(NULL, finfo); + InitFunctionCallInfoData(*fcinfo, finfo, 2, + collid, NULL, NULL); + + /* left arg */ + scratch.opcode = EEOP_INNER_VAR; + scratch.d.var.attnum = attno; + scratch.d.var.vartype = att->atttypid; + scratch.resvalue = &fcinfo->args[0].value; + scratch.resnull = &fcinfo->args[0].isnull; + ExprEvalPushStep(state, &scratch); + + /* right arg */ + scratch.opcode = EEOP_OUTER_VAR; + scratch.d.var.attnum = attno; + scratch.d.var.vartype = att->atttypid; + scratch.resvalue = &fcinfo->args[1].value; + scratch.resnull = &fcinfo->args[1].isnull; + ExprEvalPushStep(state, &scratch); + + /* evaluate distinctness */ + scratch.opcode = EEOP_NOT_DISTINCT; + scratch.d.func.finfo = finfo; + scratch.d.func.fcinfo_data = fcinfo; + scratch.d.func.fn_addr = finfo->fn_addr; + scratch.d.func.nargs = 2; + scratch.resvalue = &state->resvalue; + scratch.resnull = &state->resnull; + ExprEvalPushStep(state, &scratch); + + /* then emit EEOP_QUAL to detect if result is false (or null) */ + scratch.opcode = EEOP_QUAL; + scratch.d.qualexpr.jumpdone = -1; + scratch.resvalue = &state->resvalue; + scratch.resnull = &state->resnull; + ExprEvalPushStep(state, &scratch); + adjust_jumps = lappend_int(adjust_jumps, + state->steps_len - 1); + } + + /* adjust jump targets */ + foreach(lc, adjust_jumps) + { + ExprEvalStep *as = &state->steps[lfirst_int(lc)]; + + Assert(as->opcode == EEOP_QUAL); + Assert(as->d.qualexpr.jumpdone == -1); + as->d.qualexpr.jumpdone = state->steps_len; + } + + scratch.resvalue = NULL; + scratch.resnull = NULL; + scratch.opcode = EEOP_DONE; + ExprEvalPushStep(state, &scratch); + + ExecReadyExpr(state); + + return state; +} diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 
c95d5170e4..366d0b20b9 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -35,6 +35,7 @@ #include "executor/nodeIncrementalSort.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeIndexscan.h" +#include "executor/nodeResultCache.h" #include "executor/nodeSeqscan.h" #include "executor/nodeSort.h" #include "executor/nodeSubplan.h" @@ -292,6 +293,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecAggEstimate((AggState *) planstate, e->pcxt); break; + case T_ResultCacheState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecResultCacheEstimate((ResultCacheState *) planstate, e->pcxt); + break; default: break; } @@ -512,6 +517,10 @@ ExecParallelInitializeDSM(PlanState *planstate, /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecAggInitializeDSM((AggState *) planstate, d->pcxt); break; + case T_ResultCacheState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecResultCacheInitializeDSM((ResultCacheState *) planstate, d->pcxt); + break; default: break; } @@ -988,6 +997,7 @@ ExecParallelReInitializeDSM(PlanState *planstate, case T_HashState: case T_SortState: case T_IncrementalSortState: + case T_ResultCacheState: /* these nodes have DSM state, but no reinitialization is required */ break; @@ -1057,6 +1067,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, case T_AggState: ExecAggRetrieveInstrumentation((AggState *) planstate); break; + case T_ResultCacheState: + ExecResultCacheRetrieveInstrumentation((ResultCacheState *) planstate); + break; default: break; } @@ -1349,6 +1362,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecAggInitializeWorker((AggState *) planstate, pwcxt); break; + case T_ResultCacheState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecResultCacheInitializeWorker((ResultCacheState *) planstate, + pwcxt); + break; default: break; } diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 29766d8196..9f8c7582e0 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -102,6 +102,7 @@ #include "executor/nodeProjectSet.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" +#include "executor/nodeResultCache.h" #include "executor/nodeSamplescan.h" #include "executor/nodeSeqscan.h" #include "executor/nodeSetOp.h" @@ -325,6 +326,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_ResultCache: + result = (PlanState *) ExecInitResultCache((ResultCache *) node, + estate, eflags); + break; + case T_Group: result = (PlanState *) ExecInitGroup((Group *) node, estate, eflags); @@ -713,6 +719,10 @@ ExecEndNode(PlanState *node) ExecEndIncrementalSort((IncrementalSortState *) node); break; + case T_ResultCacheState: + ExecEndResultCache((ResultCacheState *) node); + break; + case T_GroupState: ExecEndGroup((GroupState *) node); break; diff --git a/src/backend/executor/nodeResultCache.c b/src/backend/executor/nodeResultCache.c new file mode 100644 index 0000000000..c8de818c8c --- /dev/null +++ b/src/backend/executor/nodeResultCache.c @@ -0,0 +1,1138 @@ +/*------------------------------------------------------------------------- + * + * nodeResultCache.c + * Routines to handle caching of results from parameterized nodes + * + * Portions Copyright (c) 
2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeResultCache.c + * + * ResultCache nodes are intended to sit above parameterized nodes in the plan + * tree in order to cache results from them. The intention here is that a + * repeat scan with a parameter value that has already been seen by the node + * can fetch tuples from the cache rather than having to re-scan the outer + * node all over again. The query planner may choose to make use of one of + * these when it thinks rescans for previously seen values are likely enough + * to warrant adding the additional node. + * + * The method of cache we use is a hash table. When the cache fills, we never + * spill tuples to disk, instead, we choose to evict the least recently used + * cache entry from the cache. We remember the least recently used entry by + * always pushing new entries and entries we look for onto the tail of a + * doubly linked list. This means that older items always bubble to the top + * of this LRU list. + * + * Sometimes our callers won't run their scans to completion. For example a + * semi-join only needs to run until it finds a matching tuple, and once it + * does, the join operator skips to the next outer tuple and does not execute + * the inner side again on that scan. Because of this, we must keep track of + * when a cache entry is complete, and by default, we know it is when we run + * out of tuples to read during the scan. However, there are cases where we + * can mark the cache entry as complete without exhausting the scan of all + * tuples. One case is unique joins, where the join operator knows that there + * will only be at most one match for any given outer tuple. In order to + * support such cases we allow the "singlerow" option to be set for the cache. + * This option marks the cache entry as complete after we read the first tuple + * from the subnode. + * + * It's possible when we're filling the cache for a given set of parameters + * that we're unable to free enough memory to store any more tuples. If this + * happens then we'll have already evicted all other cache entries. When + * caching another tuple would cause us to exceed our memory budget, we must + * free the entry that we're currently populating and move the state machine + * into RC_CACHE_BYPASS_MODE. This means that we'll not attempt to cache any + * further tuples for this particular scan. We don't have the memory for it. + * The state machine will be reset again on the next rescan. If the memory + * requirements to cache the next parameter's tuples are less demanding, then + * that may allow us to start putting useful entries back into the cache + * again. 
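To make the behaviour described above concrete, here is a minimal stand-alone sketch of the idea (invented types, a toy two-row "subplan", no NULL handling, no hash collision handling and no eviction; these are not the node's real data structures): a table keyed by the parameter value, where a hit on a complete entry replays the cached rows and a miss fills a new entry from the subplan before marking it complete.

    /*
     * Minimal stand-alone model of the caching behaviour described above.
     * Keys are plain ints and "tuples" are ints too; the real node keys on
     * a MinimalTuple of parameter values and caches MinimalTuples.
     */
    #include <stdio.h>
    #include <stdlib.h>
    #include <stdbool.h>

    #define NBUCKETS 64

    typedef struct CachedRow
    {
        int         value;
        struct CachedRow *next;
    } CachedRow;

    typedef struct Entry
    {
        bool        used;
        bool        complete;   /* subplan was read to exhaustion */
        int         param;      /* cache key */
        CachedRow  *rows;
    } Entry;

    static Entry cache[NBUCKETS];

    /* toy "subplan": returns param and param + 1, then runs out */
    static bool
    subplan_next(int param, int call, int *out)
    {
        if (call >= 2)
            return false;
        *out = param + call;
        return true;
    }

    static Entry *
    lookup(int param, bool *found)
    {
        Entry *e = &cache[(unsigned) param % NBUCKETS]; /* no collision handling */

        *found = e->used && e->param == param;
        if (!*found)
        {
            e->used = true;
            e->complete = false;
            e->param = param;
            e->rows = NULL;
        }
        return e;
    }

    /* One rescan for 'param': replay a complete cache entry, else (re)fill it. */
    static void
    scan(int param)
    {
        bool        found;
        Entry      *e = lookup(param, &found);

        if (found && e->complete)
        {
            for (CachedRow *r = e->rows; r != NULL; r = r->next)
                printf("hit  param=%d row=%d\n", param, r->value);
            return;
        }

        /*
         * Cache miss: read the subplan and cache each row as we return it.
         * (A found-but-incomplete entry is simply overwritten here; the real
         * node purges its tuples first.)
         */
        CachedRow **tail = &e->rows;
        int         call = 0;
        int         value;

        while (subplan_next(param, call++, &value))
        {
            CachedRow *r = malloc(sizeof(CachedRow));

            r->value = value;
            r->next = NULL;
            *tail = r;
            tail = &r->next;
            printf("miss param=%d row=%d\n", param, value);
        }
        e->complete = true;     /* safe to replay on the next rescan */
    }

    int
    main(void)
    {
        scan(1);
        scan(2);
        scan(1);                /* replayed from the cache */
        return 0;
    }

In the real node the cache key is a MinimalTuple built from the evaluated parameter expressions, the cached rows are MinimalTuples, and an entry that is found but not complete is purged and refilled rather than trusted.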
+ * + * + * INTERFACE ROUTINES + * ExecResultCache - lookup cache, exec subplan when not found + * ExecInitResultCache - initialize node and subnodes + * ExecEndResultCache - shutdown node and subnodes + * ExecReScanResultCache - rescan the result cache + * + * ExecResultCacheEstimate estimates DSM space needed for parallel plan + * ExecResultCacheInitializeDSM initialize DSM for parallel plan + * ExecResultCacheInitializeWorker attach to DSM info in parallel worker + * ExecResultCacheRetrieveInstrumentation get instrumentation from worker + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/parallel.h" +#include "common/hashfn.h" +#include "executor/executor.h" +#include "executor/nodeResultCache.h" +#include "lib/ilist.h" +#include "miscadmin.h" +#include "utils/lsyscache.h" + +/* States of the ExecResultCache state machine */ +#define RC_CACHE_LOOKUP 1 /* Attempt to perform a cache lookup */ +#define RC_CACHE_FETCH_NEXT_TUPLE 2 /* Get another tuple from the cache */ +#define RC_FILLING_CACHE 3 /* Read outer node to fill cache */ +#define RC_CACHE_BYPASS_MODE 4 /* Bypass mode. Just read from our + * subplan without caching anything */ +#define RC_END_OF_SCAN 5 /* Ready for rescan */ + + +/* Helper macros for memory accounting */ +#define EMPTY_ENTRY_MEMORY_BYTES(e) (sizeof(ResultCacheEntry) + \ + sizeof(ResultCacheKey) + \ + (e)->key->params->t_len); +#define CACHE_TUPLE_BYTES(t) (sizeof(ResultCacheTuple) + \ + (t)->mintuple->t_len) + + /* ResultCacheTuple Stores an individually cached tuple */ +typedef struct ResultCacheTuple +{ + MinimalTuple mintuple; /* Cached tuple */ + struct ResultCacheTuple *next; /* The next tuple with the same parameter + * values or NULL if it's the last one */ +} ResultCacheTuple; + +/* + * ResultCacheKey + * The hash table key for cached entries plus the LRU list link + */ +typedef struct ResultCacheKey +{ + MinimalTuple params; + dlist_node lru_node; /* Pointer to next/prev key in LRU list */ +} ResultCacheKey; + +/* + * ResultCacheEntry + * The data struct that the cache hash table stores + */ +typedef struct ResultCacheEntry +{ + ResultCacheKey *key; /* Hash key for hash table lookups */ + ResultCacheTuple *tuplehead; /* Pointer to the first tuple or NULL if + * no tuples are cached for this entry */ + uint32 hash; /* Hash value (cached) */ + char status; /* Hash status */ + bool complete; /* Did we read the outer plan to completion? */ +} ResultCacheEntry; + + +#define SH_PREFIX resultcache +#define SH_ELEMENT_TYPE ResultCacheEntry +#define SH_KEY_TYPE ResultCacheKey * +#define SH_SCOPE static inline +#define SH_DECLARE +#include "lib/simplehash.h" + +static uint32 ResultCacheHash_hash(struct resultcache_hash *tb, + const ResultCacheKey *key); +static int ResultCacheHash_equal(struct resultcache_hash *tb, + const ResultCacheKey *params1, + const ResultCacheKey *params2); + +#define SH_PREFIX resultcache +#define SH_ELEMENT_TYPE ResultCacheEntry +#define SH_KEY_TYPE ResultCacheKey * +#define SH_KEY key +#define SH_HASH_KEY(tb, key) ResultCacheHash_hash(tb, key) +#define SH_EQUAL(tb, a, b) (ResultCacheHash_equal(tb, a, b) == 0) +#define SH_SCOPE static inline +#define SH_STORE_HASH +#define SH_GET_HASH(tb, a) a->hash +#define SH_DEFINE +#include "lib/simplehash.h" + +/* + * ResultCacheHash_hash + * Hash function for simplehash hashtable. 
'key' is unused here as we + * require that all table lookups first populate the ResultCacheState's + * probeslot with the key values to be looked up. + */ +static uint32 +ResultCacheHash_hash(struct resultcache_hash *tb, const ResultCacheKey *key) +{ + ResultCacheState *rcstate = (ResultCacheState *) tb->private_data; + TupleTableSlot *pslot = rcstate->probeslot; + uint32 hashkey = 0; + int numkeys = rcstate->nkeys; + FmgrInfo *hashfunctions = rcstate->hashfunctions; + Oid *collations = rcstate->collations; + + for (int i = 0; i < numkeys; i++) + { + /* rotate hashkey left 1 bit at each step */ + hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); + + if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */ + { + uint32 hkey; + + hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], + collations[i], pslot->tts_values[i])); + hashkey ^= hkey; + } + } + + return murmurhash32(hashkey); +} + +/* + * ResultCacheHash_equal + * Equality function for confirming hash value matches during a hash + * table lookup. 'key2' is never used. Instead the ResultCacheState's + * probeslot is always populated with details of what's being looked up. + */ +static int +ResultCacheHash_equal(struct resultcache_hash *tb, const ResultCacheKey *key1, + const ResultCacheKey *key2) +{ + ResultCacheState *rcstate = (ResultCacheState *) tb->private_data; + ExprContext *econtext = rcstate->ss.ps.ps_ExprContext; + TupleTableSlot *tslot = rcstate->tableslot; + TupleTableSlot *pslot = rcstate->probeslot; + + /* probeslot should have already been prepared by prepare_probe_slot() */ + + ExecStoreMinimalTuple(key1->params, tslot, false); + + econtext->ecxt_innertuple = tslot; + econtext->ecxt_outertuple = pslot; + return !ExecQualAndReset(rcstate->cache_eq_expr, econtext); +} + +/* + * Initialize the hash table to empty. + */ +static void +build_hash_table(ResultCacheState *rcstate, uint32 size) +{ + /* Make a guess at a good size when we're not given a valid size. */ + if (size == 0) + size = 1024; + + /* resultcache_create will convert the size to a power of 2 */ + rcstate->hashtable = resultcache_create(rcstate->tableContext, size, + rcstate); +} + +/* + * prepare_probe_slot + * Populate rcstate's probeslot with the values from the tuple stored + * in 'key'. If 'key' is NULL, then perform the population by evaluating + * rcstate's param_exprs. + */ +static inline void +prepare_probe_slot(ResultCacheState *rcstate, ResultCacheKey *key) +{ + TupleTableSlot *pslot = rcstate->probeslot; + TupleTableSlot *tslot = rcstate->tableslot; + int numKeys = rcstate->nkeys; + + ExecClearTuple(pslot); + + if (key == NULL) + { + /* Set the probeslot's values based on the current parameter values */ + for (int i = 0; i < numKeys; i++) + pslot->tts_values[i] = ExecEvalExpr(rcstate->param_exprs[i], + rcstate->ss.ps.ps_ExprContext, + &pslot->tts_isnull[i]); + } + else + { + /* Process the key's MinimalTuple and store the values in probeslot */ + ExecStoreMinimalTuple(key->params, tslot, false); + slot_getallattrs(tslot); + memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys); + memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys); + } + + ExecStoreVirtualTuple(pslot); +} + +/* + * entry_purge_tuples + * Remove all tuples from the cache entry pointed to by 'entry'. This + * leaves an empty cache entry. Also, update the memory accounting to + * reflect the removal of the tuples. 
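ResultCacheHash_hash() above combines one 32-bit hash per cache key column by rotating the running value left one bit and XOR-ing in that column's hash (NULL columns contribute nothing), then mixes the result through murmurhash32(). A stand-alone sketch of that combination step, using an invented stand-in for the per-column datum hash and a finalizer equivalent to murmurhash32():

    #include <stdio.h>
    #include <stdint.h>
    #include <stdbool.h>

    /* 32-bit murmur3 finalizer, equivalent to murmurhash32() in common/hashfn.h */
    static uint32_t
    murmur32_finalize(uint32_t h)
    {
        h ^= h >> 16;
        h *= 0x85ebca6b;
        h ^= h >> 13;
        h *= 0xc2b2ae35;
        h ^= h >> 16;
        return h;
    }

    /*
     * Stand-in for the per-column hash; the real code calls the datatype's
     * hash support function via FunctionCall1Coll().
     */
    static uint32_t
    column_hash(uint32_t datum)
    {
        return datum * 2654435761u;
    }

    /*
     * Combine the hashes of 'nkeys' cache key columns the same way
     * ResultCacheHash_hash does: rotate left one bit, then XOR.
     */
    static uint32_t
    cache_key_hash(const uint32_t *datums, const bool *isnull, int nkeys)
    {
        uint32_t hashkey = 0;

        for (int i = 0; i < nkeys; i++)
        {
            /* rotate hashkey left 1 bit at each step */
            hashkey = (hashkey << 1) | ((hashkey & 0x80000000u) ? 1 : 0);

            if (!isnull[i])     /* NULLs hash as 0 */
                hashkey ^= column_hash(datums[i]);
        }

        return murmur32_finalize(hashkey);
    }

    int
    main(void)
    {
        uint32_t    datums[] = {42, 7};
        bool        isnull[] = {false, true};

        printf("%08x\n", cache_key_hash(datums, isnull, 2));
        return 0;
    }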
+ */ +static inline void +entry_purge_tuples(ResultCacheState *rcstate, ResultCacheEntry *entry) +{ + ResultCacheTuple *tuple = entry->tuplehead; + uint64 freed_mem = 0; + + while (tuple != NULL) + { + ResultCacheTuple *next = tuple->next; + + freed_mem += CACHE_TUPLE_BYTES(tuple); + + /* Free memory used for this tuple */ + pfree(tuple->mintuple); + pfree(tuple); + + tuple = next; + } + + entry->complete = false; + entry->tuplehead = NULL; + + /* Update the memory accounting */ + rcstate->mem_used -= freed_mem; + + Assert(rcstate->mem_used >= 0); +} + +/* + * remove_cache_entry + * Remove 'entry' from the cache and free memory used by it. + */ +static void +remove_cache_entry(ResultCacheState *rcstate, ResultCacheEntry *entry) +{ + ResultCacheKey *key = entry->key; + + dlist_delete(&entry->key->lru_node); + +#ifdef USE_ASSERT_CHECKING + + /* + * Validate the memory accounting code is correct in assert builds. XXX is + * this too expensive for USE_ASSERT_CHECKING? + */ + { + int i, + count; + uint64 mem = 0; + + count = 0; + for (i = 0; i < rcstate->hashtable->size; i++) + { + ResultCacheEntry *entry = &rcstate->hashtable->data[i]; + + if (entry->status == resultcache_SH_IN_USE) + { + ResultCacheTuple *tuple = entry->tuplehead; + + mem += EMPTY_ENTRY_MEMORY_BYTES(entry); + while (tuple != NULL) + { + mem += CACHE_TUPLE_BYTES(tuple); + tuple = tuple->next; + } + count++; + } + } + + Assert(count == rcstate->hashtable->members); + Assert(mem == rcstate->mem_used); + } +#endif + + /* Remove all of the tuples from this entry */ + entry_purge_tuples(rcstate, entry); + + /* + * Update memory accounting. entry_purge_tuples should have already + * subtracted the memory used for each cached tuple. Here we just update + * the amount used by the entry itself. + */ + rcstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry); + + Assert(rcstate->mem_used >= 0); + + /* Remove the entry from the cache */ + resultcache_delete_item(rcstate->hashtable, entry); + + pfree(key->params); + pfree(key); +} + +/* + * cache_reduce_memory + * Evict older and less recently used items from the cache in order to + * reduce the memory consumption back to something below the + * ResultCacheState's mem_limit. + * + * 'specialkey', if not NULL, causes the function to return false if the entry + * which the key belongs to is removed from the cache. + */ +static bool +cache_reduce_memory(ResultCacheState *rcstate, ResultCacheKey *specialkey) +{ + bool specialkey_intact = true; /* for now */ + dlist_mutable_iter iter; + uint64 evictions = 0; + + /* Update peak memory usage */ + if (rcstate->mem_used > rcstate->stats.mem_peak) + rcstate->stats.mem_peak = rcstate->mem_used; + + /* We expect only to be called when we've gone over budget on memory */ + Assert(rcstate->mem_used > rcstate->mem_limit); + + /* Start the eviction process starting at the head of the LRU list. */ + dlist_foreach_modify(iter, &rcstate->lru_list) + { + ResultCacheKey *key = dlist_container(ResultCacheKey, lru_node, + iter.cur); + ResultCacheEntry *entry; + + /* + * Populate the hash probe slot in preparation for looking up this LRU + * entry. + */ + prepare_probe_slot(rcstate, key); + + /* + * Ideally the LRU list pointers would be stored in the entry itself + * rather than in the key. Unfortunately, we can't do that as the + * simplehash.h code may resize the table and allocate new memory for + * entries which would result in those pointers pointing to the old + * buckets. 
However, it's fine to use the key to store this as that's + * only referenced by a pointer in the entry, which of course follows + * the entry whenever the hash table is resized. Since we only have a + * pointer to the key here, we must perform a hash table lookup to + * find the entry that the key belongs to. + */ + entry = resultcache_lookup(rcstate->hashtable, NULL); + + /* A good spot to check for corruption of the table and LRU list. */ + Assert(entry != NULL); + Assert(entry->key == key); + + /* + * If we're being called to free memory while the cache is being + * populated with new tuples, then we'd better take some care as we + * could end up freeing the entry which 'specialkey' belongs to. + * Generally callers will pass 'specialkey' as the key for the cache + * entry which is currently being populated, so we must set + * 'specialkey_intact' to false to inform the caller the specialkey + * entry has been removed. + */ + if (key == specialkey) + specialkey_intact = false; + + /* + * Finally remove the entry. This will remove from the LRU list too. + */ + remove_cache_entry(rcstate, entry); + + evictions++; + + /* Exit if we've freed enough memory */ + if (rcstate->mem_used <= rcstate->mem_limit) + break; + } + + rcstate->stats.cache_evictions += evictions; /* Update Stats */ + + return specialkey_intact; +} + +/* + * cache_lookup + * Perform a lookup to see if we've already cached results based on the + * scan's current parameters. If we find an existing entry we move it to + * the end of the LRU list, set *found to true then return it. If we + * don't find an entry then we create a new one and add it to the end of + * the LRU list. We also update cache memory accounting and remove older + * entries if we go over the memory budget. If we managed to free enough + * memory we return the new entry, else we return NULL. + * + * Callers can assume we'll never return NULL when *found is true. + */ +static ResultCacheEntry * +cache_lookup(ResultCacheState *rcstate, bool *found) +{ + ResultCacheKey *key; + ResultCacheEntry *entry; + MemoryContext oldcontext; + + /* prepare the probe slot with the current scan parameters */ + prepare_probe_slot(rcstate, NULL); + + /* + * Add the new entry to the cache. No need to pass a valid key since the + * hash function uses rcstate's probeslot, which we populated above. + */ + entry = resultcache_insert(rcstate->hashtable, NULL, found); + + if (*found) + { + /* + * Move existing entry to the tail of the LRU list to mark it as the + * most recently used item. + */ + dlist_move_tail(&rcstate->lru_list, &entry->key->lru_node); + + return entry; + } + + oldcontext = MemoryContextSwitchTo(rcstate->tableContext); + + /* Allocate a new key */ + entry->key = key = (ResultCacheKey *) palloc(sizeof(ResultCacheKey)); + key->params = ExecCopySlotMinimalTuple(rcstate->probeslot); + + /* Update the total cache memory utilization */ + rcstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry); + + /* Initialize this entry */ + entry->complete = false; + entry->tuplehead = NULL; + + /* + * Since this is the most recently used entry, push this entry onto the + * end of the LRU list. + */ + dlist_push_tail(&rcstate->lru_list, &entry->key->lru_node); + + rcstate->last_tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + /* + * If we've gone over our memory budget, then we'll free up some space in + * the cache. + */ + if (rcstate->mem_used > rcstate->mem_limit) + { + /* + * Try to free up some memory. 
It's highly unlikely that we'll fail + * to do so here since the entry we've just added is yet to contain + * any tuples and we're able to remove any other entry to reduce the + * memory consumption. + */ + if (unlikely(!cache_reduce_memory(rcstate, key))) + return NULL; + + /* + * The process of removing entries from the cache may have caused the + * code in simplehash.h to shuffle elements to earlier buckets in the + * hash table. If it has, we'll need to find the entry again by + * performing a lookup. Fortunately, we can detect if this has + * happened by seeing if the entry is still in use and that the key + * pointer matches our expected key. + */ + if (entry->status != resultcache_SH_IN_USE || entry->key != key) + { + /* + * We need to repopulate the probeslot as lookups performed during + * the cache evictions above will have stored some other key. + */ + prepare_probe_slot(rcstate, key); + + /* Re-find the newly added entry */ + entry = resultcache_lookup(rcstate->hashtable, NULL); + Assert(entry != NULL); + } + } + + return entry; +} + +/* + * cache_store_tuple + * Add the tuple stored in 'slot' to the rcstate's current cache entry. + * The cache entry must have already been made with cache_lookup(). + * rcstate's last_tuple field must point to the tail of rcstate->entry's + * list of tuples. + */ +static bool +cache_store_tuple(ResultCacheState *rcstate, TupleTableSlot *slot) +{ + ResultCacheTuple *tuple; + ResultCacheEntry *entry = rcstate->entry; + MemoryContext oldcontext; + + Assert(slot != NULL); + Assert(entry != NULL); + + oldcontext = MemoryContextSwitchTo(rcstate->tableContext); + + tuple = (ResultCacheTuple *) palloc(sizeof(ResultCacheTuple)); + tuple->mintuple = ExecCopySlotMinimalTuple(slot); + tuple->next = NULL; + + /* Account for the memory we just consumed */ + rcstate->mem_used += CACHE_TUPLE_BYTES(tuple); + + if (entry->tuplehead == NULL) + { + /* + * This is the first tuple for this entry, so just point the list head + * to it. + */ + entry->tuplehead = tuple; + } + else + { + /* push this tuple onto the tail of the list */ + rcstate->last_tuple->next = tuple; + } + + rcstate->last_tuple = tuple; + MemoryContextSwitchTo(oldcontext); + + /* + * If we've gone over our memory budget then free up some space in the + * cache. + */ + if (rcstate->mem_used > rcstate->mem_limit) + { + ResultCacheKey *key = entry->key; + + if (!cache_reduce_memory(rcstate, key)) + return false; + + /* + * The process of removing entries from the cache may have caused the + * code in simplehash.h to shuffle elements to earlier buckets in the + * hash table. If it has, we'll need to find the entry again by + * performing a lookup. Fortunately, we can detect if this has + * happened by seeing if the entry is still in use and that the key + * pointer matches our expected key. + */ + if (entry->status != resultcache_SH_IN_USE || entry->key != key) + { + /* + * We need to repopulate the probeslot as lookups performed during + * the cache evictions above will have stored some other key. 
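cache_lookup() and cache_store_tuple() both rely on the same discipline: anything looked up or newly added is moved to the tail of the LRU list, memory is charged as entries and tuples are added, and whenever the tracked total exceeds the limit, entries are freed from the head (the least recently used end) until the cache is back under budget. A stripped-down stand-alone version of that discipline follows, with invented entry sizes and without the hash table, tuple chains, or the 'specialkey' guard for the entry currently being filled:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct Entry
    {
        struct Entry *prev;
        struct Entry *next;
        char        key[16];
        size_t      bytes;      /* memory charged to this entry */
    } Entry;

    static Entry *lru_head;     /* least recently used */
    static Entry *lru_tail;     /* most recently used */
    static size_t mem_used;
    static const size_t mem_limit = 64;

    static void
    unlink_entry(Entry *e)
    {
        if (e->prev)
            e->prev->next = e->next;
        else
            lru_head = e->next;
        if (e->next)
            e->next->prev = e->prev;
        else
            lru_tail = e->prev;
    }

    static void
    push_tail(Entry *e)
    {
        e->prev = lru_tail;
        e->next = NULL;
        if (lru_tail)
            lru_tail->next = e;
        else
            lru_head = e;
        lru_tail = e;
    }

    /* A cache hit: mark the entry most recently used. */
    static void
    touch(Entry *e)
    {
        unlink_entry(e);
        push_tail(e);
    }

    /* Evict least recently used entries until we're back under budget. */
    static void
    reduce_memory(void)
    {
        while (mem_used > mem_limit && lru_head != NULL)
        {
            Entry *victim = lru_head;

            printf("evicting %s (%zu bytes)\n", victim->key, victim->bytes);
            unlink_entry(victim);
            mem_used -= victim->bytes;
            free(victim);
        }
    }

    static Entry *
    add_entry(const char *key, size_t bytes)
    {
        Entry *e = calloc(1, sizeof(Entry));

        snprintf(e->key, sizeof(e->key), "%s", key);
        e->bytes = bytes;
        push_tail(e);
        mem_used += bytes;
        reduce_memory();
        return e;
    }

    int
    main(void)
    {
        Entry *a = add_entry("a", 30);

        add_entry("b", 30);
        touch(a);               /* "a" is now most recently used */
        add_entry("c", 30);     /* over budget: evicts "b", not "a" */
        printf("mem_used=%zu\n", mem_used);
        return 0;
    }

In the real code the eviction loop additionally reports whether the 'specialkey' entry (the one currently being populated) survived, so the caller can switch into bypass mode when even the entry it is filling had to go.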
+ */ + prepare_probe_slot(rcstate, key); + + /* Re-find the entry */ + rcstate->entry = entry = resultcache_lookup(rcstate->hashtable, + NULL); + Assert(entry != NULL); + } + } + + return true; +} + +static TupleTableSlot * +ExecResultCache(PlanState *pstate) +{ + ResultCacheState *node = castNode(ResultCacheState, pstate); + PlanState *outerNode; + TupleTableSlot *slot; + + switch (node->rc_status) + { + case RC_CACHE_LOOKUP: + { + ResultCacheEntry *entry; + TupleTableSlot *outerslot; + bool found; + + Assert(node->entry == NULL); + + /* + * We're only ever in this state for the first call of the + * scan. Here we have a look to see if we've already seen the + * current parameters before and if we have already cached a + * complete set of records that the outer plan will return for + * these parameters. + * + * When we find a valid cache entry, we'll return the first + * tuple from it. If not found, we'll create a cache entry and + * then try to fetch a tuple from the outer scan. If we find + * one there, we'll try to cache it. + */ + + /* see if we've got anything cached for the current parameters */ + entry = cache_lookup(node, &found); + + if (found && entry->complete) + { + node->stats.cache_hits += 1; /* stats update */ + + /* + * Set last_tuple and entry so that the state + * RC_CACHE_FETCH_NEXT_TUPLE can easily find the next + * tuple for these parameters. + */ + node->last_tuple = entry->tuplehead; + node->entry = entry; + + /* Fetch the first cached tuple, if there is one */ + if (entry->tuplehead) + { + node->rc_status = RC_CACHE_FETCH_NEXT_TUPLE; + + slot = node->ss.ps.ps_ResultTupleSlot; + ExecStoreMinimalTuple(entry->tuplehead->mintuple, + slot, false); + + return slot; + } + + /* The cache entry is void of any tuples. */ + node->rc_status = RC_END_OF_SCAN; + return NULL; + } + + /* Handle cache miss */ + node->stats.cache_misses += 1; /* stats update */ + + if (found) + { + /* + * A cache entry was found, but the scan for that entry + * did not run to completion. We'll just remove all + * tuples and start again. It might be tempting to + * continue where we left off, but there's no guarantee + * the outer node will produce the tuples in the same + * order as it did last time. + */ + entry_purge_tuples(node, entry); + } + + /* Scan the outer node for a tuple to cache */ + outerNode = outerPlanState(node); + outerslot = ExecProcNode(outerNode); + if (TupIsNull(outerslot)) + { + /* + * cache_lookup may have returned NULL due to failure to + * free enough cache space, so ensure we don't do anything + * here that assumes it worked. There's no need to go into + * bypass mode here as we're setting rc_status to end of + * scan. + */ + if (likely(entry)) + entry->complete = true; + + node->rc_status = RC_END_OF_SCAN; + return NULL; + } + + node->entry = entry; + + /* + * If we failed to create the entry or failed to store the + * tuple in the entry, then go into bypass mode. + */ + if (unlikely(entry == NULL || + !cache_store_tuple(node, outerslot))) + { + node->stats.cache_overflows += 1; /* stats update */ + + node->rc_status = RC_CACHE_BYPASS_MODE; + + /* + * No need to clear out last_tuple as we'll stay in bypass + * mode until the end of the scan. + */ + } + else + { + /* + * If we only expect a single row from this scan then we + * can mark that we're not expecting more. This allows + * cache lookups to work even when the scan has not been + * executed to completion. 
+ */ + entry->complete = node->singlerow; + node->rc_status = RC_FILLING_CACHE; + } + + slot = node->ss.ps.ps_ResultTupleSlot; + ExecCopySlot(slot, outerslot); + return slot; + } + + case RC_CACHE_FETCH_NEXT_TUPLE: + { + /* We shouldn't be in this state if these are not set */ + Assert(node->entry != NULL); + Assert(node->last_tuple != NULL); + + /* Skip to the next tuple to output */ + node->last_tuple = node->last_tuple->next; + + /* No more tuples in the cache */ + if (node->last_tuple == NULL) + { + node->rc_status = RC_END_OF_SCAN; + return NULL; + } + + slot = node->ss.ps.ps_ResultTupleSlot; + ExecStoreMinimalTuple(node->last_tuple->mintuple, slot, + false); + + return slot; + } + + case RC_FILLING_CACHE: + { + TupleTableSlot *outerslot; + ResultCacheEntry *entry = node->entry; + + /* entry should already have been set by RC_CACHE_LOOKUP */ + Assert(entry != NULL); + + /* + * When in the RC_FILLING_CACHE state, we've just had a cache + * miss and are populating the cache with the current scan + * tuples. + */ + outerNode = outerPlanState(node); + outerslot = ExecProcNode(outerNode); + if (TupIsNull(outerslot)) + { + /* No more tuples. Mark it as complete */ + entry->complete = true; + node->rc_status = RC_END_OF_SCAN; + return NULL; + } + + /* + * Validate if the planner properly set the singlerow flag. It + * should only set that if each cache entry can, at most, + * return 1 row. XXX maybe this should be an Assert? + */ + if (unlikely(entry->complete)) + elog(ERROR, "cache entry already complete"); + + /* Record the tuple in the current cache entry */ + if (unlikely(!cache_store_tuple(node, outerslot))) + { + /* Couldn't store it? Handle overflow */ + node->stats.cache_overflows += 1; /* stats update */ + + node->rc_status = RC_CACHE_BYPASS_MODE; + + /* + * No need to clear out entry or last_tuple as we'll stay + * in bypass mode until the end of the scan. + */ + } + + slot = node->ss.ps.ps_ResultTupleSlot; + ExecCopySlot(slot, outerslot); + return slot; + } + + case RC_CACHE_BYPASS_MODE: + { + TupleTableSlot *outerslot; + + /* + * When in bypass mode we just continue to read tuples without + * caching. We need to wait until the next rescan before we + * can come out of this mode. + */ + outerNode = outerPlanState(node); + outerslot = ExecProcNode(outerNode); + if (TupIsNull(outerslot)) + { + node->rc_status = RC_END_OF_SCAN; + return NULL; + } + + slot = node->ss.ps.ps_ResultTupleSlot; + ExecCopySlot(slot, outerslot); + return slot; + } + + case RC_END_OF_SCAN: + + /* + * We've already returned NULL for this scan, but just in case + * something calls us again by mistake. + */ + return NULL; + + default: + elog(ERROR, "unrecognized resultcache state: %d", + (int) node->rc_status); + return NULL; + } /* switch */ +} + +ResultCacheState * +ExecInitResultCache(ResultCache *node, EState *estate, int eflags) +{ + ResultCacheState *rcstate = makeNode(ResultCacheState); + Plan *outerNode; + int i; + int nkeys; + Oid *eqfuncoids; + + /* check for unsupported flags */ + Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); + + rcstate->ss.ps.plan = (Plan *) node; + rcstate->ss.ps.state = estate; + rcstate->ss.ps.ExecProcNode = ExecResultCache; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &rcstate->ss.ps); + + outerNode = outerPlan(node); + outerPlanState(rcstate) = ExecInitNode(outerNode, estate, eflags); + + /* + * Initialize return slot and type. 
No need to initialize projection info + * because this node doesn't do projections. + */ + ExecInitResultTupleSlotTL(&rcstate->ss.ps, &TTSOpsMinimalTuple); + rcstate->ss.ps.ps_ProjInfo = NULL; + + /* + * Initialize scan slot and type. + */ + ExecCreateScanSlotFromOuterPlan(estate, &rcstate->ss, &TTSOpsMinimalTuple); + + /* + * Set the state machine to lookup the cache. We won't find anything + * until we cache something, but this saves a special case to create the + * first entry. + */ + rcstate->rc_status = RC_CACHE_LOOKUP; + + rcstate->nkeys = nkeys = node->numKeys; + rcstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs); + rcstate->tableslot = MakeSingleTupleTableSlot(rcstate->hashkeydesc, + &TTSOpsMinimalTuple); + rcstate->probeslot = MakeSingleTupleTableSlot(rcstate->hashkeydesc, + &TTSOpsVirtual); + + rcstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *)); + rcstate->collations = node->collations; /* Just point directly to the plan + * data */ + rcstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); + + eqfuncoids = palloc(nkeys * sizeof(Oid)); + + for (i = 0; i < nkeys; i++) + { + Oid hashop = node->hashOperators[i]; + Oid left_hashfn; + Oid right_hashfn; + Expr *param_expr = (Expr *) list_nth(node->param_exprs, i); + + if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn)) + elog(ERROR, "could not find hash function for hash operator %u", + hashop); + + fmgr_info(left_hashfn, &rcstate->hashfunctions[i]); + + rcstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) rcstate); + eqfuncoids[i] = get_opcode(hashop); + } + + rcstate->cache_eq_expr = ExecBuildParamSetEqual(rcstate->hashkeydesc, + &TTSOpsMinimalTuple, + &TTSOpsVirtual, + eqfuncoids, + node->collations, + node->param_exprs, + (PlanState *) rcstate); + + pfree(eqfuncoids); + rcstate->mem_used = 0; + + /* Limit the total memory consumed by the cache to this */ + rcstate->mem_limit = get_hash_mem() * 1024L; + + /* A memory context dedicated for the cache */ + rcstate->tableContext = AllocSetContextCreate(CurrentMemoryContext, + "ResultCacheHashTable", + ALLOCSET_DEFAULT_SIZES); + + dlist_init(&rcstate->lru_list); + rcstate->last_tuple = NULL; + rcstate->entry = NULL; + + /* + * Mark if we can assume the cache entry is completed after we get the + * first record for it. Some callers might not call us again after + * getting the first match. e.g. A join operator performing a unique join + * is able to skip to the next outer tuple after getting the first + * matching inner tuple. In this case, the cache entry is complete after + * getting the first tuple. This allows us to mark it as so. + */ + rcstate->singlerow = node->singlerow; + + /* Zero the statistics counters */ + memset(&rcstate->stats, 0, sizeof(ResultCacheInstrumentation)); + + /* Allocate and set up the actual cache */ + build_hash_table(rcstate, node->est_entries); + + return rcstate; +} + +void +ExecEndResultCache(ResultCacheState *node) +{ + /* + * When ending a parallel worker, copy the statistics gathered by the + * worker back into shared memory so that it can be picked up by the main + * process to report in EXPLAIN ANALYZE. 
+ */ + if (node->shared_info != NULL && IsParallelWorker()) + { + ResultCacheInstrumentation *si; + + /* Make mem_peak available for EXPLAIN */ + if (node->stats.mem_peak == 0) + node->stats.mem_peak = node->mem_used; + + Assert(ParallelWorkerNumber <= node->shared_info->num_workers); + si = &node->shared_info->sinstrument[ParallelWorkerNumber]; + memcpy(si, &node->stats, sizeof(ResultCacheInstrumentation)); + } + + /* Remove the cache context */ + MemoryContextDelete(node->tableContext); + + ExecClearTuple(node->ss.ss_ScanTupleSlot); + /* must drop pointer to cache result tuple */ + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + + /* + * free exprcontext + */ + ExecFreeExprContext(&node->ss.ps); + + /* + * shut down the subplan + */ + ExecEndNode(outerPlanState(node)); +} + +void +ExecReScanResultCache(ResultCacheState *node) +{ + PlanState *outerPlan = outerPlanState(node); + + /* Mark that we must lookup the cache for a new set of parameters */ + node->rc_status = RC_CACHE_LOOKUP; + + /* nullify pointers used for the last scan */ + node->entry = NULL; + node->last_tuple = NULL; + + /* + * if chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. + */ + if (outerPlan->chgParam == NULL) + ExecReScan(outerPlan); + +} + +/* + * ExecEstimateCacheEntryOverheadBytes + * For use in the query planner to help it estimate the amount of memory + * required to store a single entry in the cache. + */ +double +ExecEstimateCacheEntryOverheadBytes(double ntuples) +{ + return sizeof(ResultCacheEntry) + sizeof(ResultCacheKey) + + sizeof(ResultCacheTuple) * ntuples; +} + +/* ---------------------------------------------------------------- + * Parallel Query Support + * ---------------------------------------------------------------- + */ + + /* ---------------------------------------------------------------- + * ExecResultCacheEstimate + * + * Estimate space required to propagate result cache statistics. + * ---------------------------------------------------------------- + */ +void +ExecResultCacheEstimate(ResultCacheState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = mul_size(pcxt->nworkers, sizeof(ResultCacheInstrumentation)); + size = add_size(size, offsetof(SharedResultCacheInfo, sinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecResultCacheInitializeDSM + * + * Initialize DSM space for result cache statistics. + * ---------------------------------------------------------------- + */ +void +ExecResultCacheInitializeDSM(ResultCacheState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = offsetof(SharedResultCacheInfo, sinstrument) + + pcxt->nworkers * sizeof(ResultCacheInstrumentation); + node->shared_info = shm_toc_allocate(pcxt->toc, size); + /* ensure any unfilled slots will contain zeroes */ + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, + node->shared_info); +} + +/* ---------------------------------------------------------------- + * ExecResultCacheInitializeWorker + * + * Attach worker to DSM space for result cache statistics. 
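The shared-memory sizing used by ExecResultCacheEstimate() and ExecResultCacheInitializeDSM() is the usual flexible-array-member pattern: the struct ends with one instrumentation slot per worker, so the allocation size is the offset of that array plus nworkers elements, and zeroing the whole chunk makes unfilled worker slots read as zeroes. A small stand-alone sketch with invented struct names:

    #include <stdio.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <string.h>
    #include <stdint.h>

    /* stand-ins for ResultCacheInstrumentation / SharedResultCacheInfo */
    typedef struct WorkerStats
    {
        uint64_t    cache_hits;
        uint64_t    cache_misses;
    } WorkerStats;

    typedef struct SharedStats
    {
        int         num_workers;
        WorkerStats sinstrument[];  /* flexible array, one slot per worker */
    } SharedStats;

    int
    main(void)
    {
        int         nworkers = 4;

        /* same arithmetic as the DSM estimate/initialize functions above */
        size_t      size = offsetof(SharedStats, sinstrument) +
            nworkers * sizeof(WorkerStats);

        SharedStats *shared = malloc(size);

        /* zero it so unfilled worker slots read as zeroes */
        memset(shared, 0, size);
        shared->num_workers = nworkers;

        /* worker 2 reports some activity */
        shared->sinstrument[2].cache_hits = 10;
        shared->sinstrument[2].cache_misses = 3;

        for (int i = 0; i < shared->num_workers; i++)
            printf("worker %d: hits=%llu misses=%llu\n", i,
                   (unsigned long long) shared->sinstrument[i].cache_hits,
                   (unsigned long long) shared->sinstrument[i].cache_misses);

        free(shared);
        return 0;
    }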
+ * ---------------------------------------------------------------- + */ +void +ExecResultCacheInitializeWorker(ResultCacheState *node, ParallelWorkerContext *pwcxt) +{ + node->shared_info = + shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); +} + +/* ---------------------------------------------------------------- + * ExecResultCacheRetrieveInstrumentation + * + * Transfer result cache statistics from DSM to private memory. + * ---------------------------------------------------------------- + */ +void +ExecResultCacheRetrieveInstrumentation(ResultCacheState *node) +{ + Size size; + SharedResultCacheInfo *si; + + if (node->shared_info == NULL) + return; + + size = offsetof(SharedResultCacheInfo, sinstrument) + + node->shared_info->num_workers * sizeof(ResultCacheInstrumentation); + si = palloc(size); + memcpy(si, node->shared_info, size); + node->shared_info = si; +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 44c7fce20a..ad729d10a8 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -948,6 +948,33 @@ _copyMaterial(const Material *from) } +/* + * _copyResultCache + */ +static ResultCache * +_copyResultCache(const ResultCache *from) +{ + ResultCache *newnode = makeNode(ResultCache); + + /* + * copy node superclass fields + */ + CopyPlanFields((const Plan *) from, (Plan *) newnode); + + /* + * copy remainder of node + */ + COPY_SCALAR_FIELD(numKeys); + COPY_POINTER_FIELD(hashOperators, sizeof(Oid) * from->numKeys); + COPY_POINTER_FIELD(collations, sizeof(Oid) * from->numKeys); + COPY_NODE_FIELD(param_exprs); + COPY_SCALAR_FIELD(singlerow); + COPY_SCALAR_FIELD(est_entries); + + return newnode; +} + + /* * CopySortFields * @@ -2340,6 +2367,7 @@ _copyRestrictInfo(const RestrictInfo *from) COPY_SCALAR_FIELD(right_bucketsize); COPY_SCALAR_FIELD(left_mcvfreq); COPY_SCALAR_FIELD(right_mcvfreq); + COPY_SCALAR_FIELD(hasheqoperator); return newnode; } @@ -5024,6 +5052,9 @@ copyObjectImpl(const void *from) case T_Material: retval = _copyMaterial(from); break; + case T_ResultCache: + retval = _copyResultCache(from); + break; case T_Sort: retval = _copySort(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 785465d8c4..fa8f65fbc5 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -846,6 +846,21 @@ _outMaterial(StringInfo str, const Material *node) _outPlanInfo(str, (const Plan *) node); } +static void +_outResultCache(StringInfo str, const ResultCache *node) +{ + WRITE_NODE_TYPE("RESULTCACHE"); + + _outPlanInfo(str, (const Plan *) node); + + WRITE_INT_FIELD(numKeys); + WRITE_OID_ARRAY(hashOperators, node->numKeys); + WRITE_OID_ARRAY(collations, node->numKeys); + WRITE_NODE_FIELD(param_exprs); + WRITE_BOOL_FIELD(singlerow); + WRITE_UINT_FIELD(est_entries); +} + static void _outSortInfo(StringInfo str, const Sort *node) { @@ -1920,6 +1935,21 @@ _outMaterialPath(StringInfo str, const MaterialPath *node) WRITE_NODE_FIELD(subpath); } +static void +_outResultCachePath(StringInfo str, const ResultCachePath *node) +{ + WRITE_NODE_TYPE("RESULTCACHEPATH"); + + _outPathInfo(str, (const Path *) node); + + WRITE_NODE_FIELD(subpath); + WRITE_NODE_FIELD(hash_operators); + WRITE_NODE_FIELD(param_exprs); + WRITE_BOOL_FIELD(singlerow); + WRITE_FLOAT_FIELD(calls, "%.0f"); + WRITE_UINT_FIELD(est_entries); +} + static void _outUniquePath(StringInfo str, const UniquePath *node) { @@ -2521,6 +2551,7 @@ _outRestrictInfo(StringInfo str, const RestrictInfo *node) 
WRITE_NODE_FIELD(right_em); WRITE_BOOL_FIELD(outer_is_left); WRITE_OID_FIELD(hashjoinoperator); + WRITE_OID_FIELD(hasheqoperator); } static void @@ -3907,6 +3938,9 @@ outNode(StringInfo str, const void *obj) case T_Material: _outMaterial(str, obj); break; + case T_ResultCache: + _outResultCache(str, obj); + break; case T_Sort: _outSort(str, obj); break; @@ -4141,6 +4175,9 @@ outNode(StringInfo str, const void *obj) case T_MaterialPath: _outMaterialPath(str, obj); break; + case T_ResultCachePath: + _outResultCachePath(str, obj); + break; case T_UniquePath: _outUniquePath(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index a6e723a273..ecce23b747 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2211,6 +2211,26 @@ _readMaterial(void) READ_DONE(); } +/* + * _readResultCache + */ +static ResultCache * +_readResultCache(void) +{ + READ_LOCALS(ResultCache); + + ReadCommonPlan(&local_node->plan); + + READ_INT_FIELD(numKeys); + READ_OID_ARRAY(hashOperators, local_node->numKeys); + READ_OID_ARRAY(collations, local_node->numKeys); + READ_NODE_FIELD(param_exprs); + READ_BOOL_FIELD(singlerow); + READ_UINT_FIELD(est_entries); + + READ_DONE(); +} + /* * ReadCommonSort * Assign the basic stuff of all nodes that inherit from Sort @@ -2899,6 +2919,8 @@ parseNodeString(void) return_value = _readHashJoin(); else if (MATCH("MATERIAL", 8)) return_value = _readMaterial(); + else if (MATCH("RESULTCACHE", 11)) + return_value = _readResultCache(); else if (MATCH("SORT", 4)) return_value = _readSort(); else if (MATCH("INCREMENTALSORT", 15)) diff --git a/src/backend/optimizer/README b/src/backend/optimizer/README index 4a6c348162..4aefde8bb1 100644 --- a/src/backend/optimizer/README +++ b/src/backend/optimizer/README @@ -382,6 +382,7 @@ RelOptInfo - a relation or joined relations MergeAppendPath - merge multiple subpaths, preserving their common sort order GroupResultPath - childless Result plan node (used for degenerate grouping) MaterialPath - a Material plan node + ResultCachePath - a result cache plan node for caching tuples from sub-paths UniquePath - remove duplicate rows (either by hashing or sorting) GatherPath - collect the results of parallel workers GatherMergePath - collect parallel results, preserving their common sort order diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index f34399e3ec..edba5e49a8 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -4032,6 +4032,10 @@ print_path(PlannerInfo *root, Path *path, int indent) ptype = "Material"; subpath = ((MaterialPath *) path)->subpath; break; + case T_ResultCachePath: + ptype = "ResultCache"; + subpath = ((ResultCachePath *) path)->subpath; + break; case T_UniquePath: ptype = "Unique"; subpath = ((UniquePath *) path)->subpath; diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 0c016a03dd..05686d0194 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -79,6 +79,7 @@ #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeHash.h" +#include "executor/nodeResultCache.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -139,6 +140,7 @@ bool enable_incremental_sort = true; bool enable_hashagg = true; bool enable_nestloop = true; bool enable_material = true; +bool enable_resultcache = true; bool enable_mergejoin = true; bool enable_hashjoin = 
true; bool enable_gathermerge = true; @@ -2402,6 +2404,147 @@ cost_material(Path *path, path->total_cost = startup_cost + run_cost; } +/* + * cost_resultcache_rescan + * Determines the estimated cost of rescanning a ResultCache node. + * + * In order to estimate this, we must gain knowledge of how often we expect to + * be called and how many distinct sets of parameters we are likely to be + * called with. If we expect a good cache hit ratio, then we can set our + * costs to account for that hit ratio, plus a little bit of cost for the + * caching itself. Caching will not work out well if we expect to be called + * with too many distinct parameter values. The worst-case here is that we + * never see any parameter value twice, in which case we'd never get a cache + * hit and caching would be a complete waste of effort. + */ +static void +cost_resultcache_rescan(PlannerInfo *root, ResultCachePath *rcpath, + Cost *rescan_startup_cost, Cost *rescan_total_cost) +{ + EstimationInfo estinfo; + Cost input_startup_cost = rcpath->subpath->startup_cost; + Cost input_total_cost = rcpath->subpath->total_cost; + double tuples = rcpath->subpath->rows; + double calls = rcpath->calls; + int width = rcpath->subpath->pathtarget->width; + + double hash_mem_bytes; + double est_entry_bytes; + double est_cache_entries; + double ndistinct; + double evict_ratio; + double hit_ratio; + Cost startup_cost; + Cost total_cost; + + /* available cache space */ + hash_mem_bytes = get_hash_mem() * 1024L; + + /* + * Set the number of bytes each cache entry should consume in the cache. + * To provide us with better estimations on how many cache entries we can + * store at once, we make a call to the executor here to ask it what + * memory overheads there are for a single cache entry. + * + * XXX we also store the cache key, but that's not accounted for here. + */ + est_entry_bytes = relation_byte_size(tuples, width) + + ExecEstimateCacheEntryOverheadBytes(tuples); + + /* estimate on the upper limit of cache entries we can hold at once */ + est_cache_entries = floor(hash_mem_bytes / est_entry_bytes); + + /* estimate on the distinct number of parameter values */ + ndistinct = estimate_num_groups(root, rcpath->param_exprs, calls, NULL, + &estinfo); + + /* + * When the estimation fell back on using a default value, it's a bit too + * risky to assume that it's ok to use a Result Cache. The use of a + * default could cause us to use a Result Cache when it's really + * inappropriate to do so. If we see that this has been done, then we'll + * assume that every call will have unique parameters, which will almost + * certainly mean a ResultCachePath will never survive add_path(). + */ + if ((estinfo.flags & SELFLAG_USED_DEFAULT) != 0) + ndistinct = calls; + + /* + * Since we've already estimated the maximum number of entries we can + * store at once and know the estimated number of distinct values we'll be + * called with, we'll take this opportunity to set the path's est_entries. + * This will ultimately determine the hash table size that the executor + * will use. If we leave this at zero, the executor will just choose the + * size itself. Really this is not the right place to do this, but it's + * convenient since everything is already calculated. + */ + rcpath->est_entries = Min(Min(ndistinct, est_cache_entries), + PG_UINT32_MAX); + + /* + * When the number of distinct parameter values is above the amount we can + * store in the cache, then we'll have to evict some entries from the + * cache. This is not free. 
Here we estimate how often we'll incur the + * cost of that eviction. + */ + evict_ratio = 1.0 - Min(est_cache_entries, ndistinct) / ndistinct; + + /* + * In order to estimate how costly a single scan will be, we need to + * attempt to estimate what the cache hit ratio will be. To do that we + * must look at how many scans are estimated in total for this node and + * how many of those scans we expect to get a cache hit. + */ + hit_ratio = 1.0 / ndistinct * Min(est_cache_entries, ndistinct) - + (ndistinct / calls); + + /* Ensure we don't go negative */ + hit_ratio = Max(hit_ratio, 0.0); + + /* + * Set the total_cost accounting for the expected cache hit ratio. We + * also add on a cpu_operator_cost to account for a cache lookup. This + * will happen regardless of whether it's a cache hit or not. + */ + total_cost = input_total_cost * (1.0 - hit_ratio) + cpu_operator_cost; + + /* Now adjust the total cost to account for cache evictions */ + + /* Charge a cpu_tuple_cost for evicting the actual cache entry */ + total_cost += cpu_tuple_cost * evict_ratio; + + /* + * Charge a 10th of cpu_operator_cost to evict every tuple in that entry. + * The per-tuple eviction is really just a pfree, so charging a whole + * cpu_operator_cost seems a little excessive. + */ + total_cost += cpu_operator_cost / 10.0 * evict_ratio * tuples; + + /* + * Now adjust for storing things in the cache, since that's not free + * either. Everything must go in the cache. We don't proportion this + * over any ratio, just apply it once for the scan. We charge a + * cpu_tuple_cost for the creation of the cache entry and also a + * cpu_operator_cost for each tuple we expect to cache. + */ + total_cost += cpu_tuple_cost + cpu_operator_cost * tuples; + + /* + * Getting the first row must also be proportioned according to the + * expected cache hit ratio. + */ + startup_cost = input_startup_cost * (1.0 - hit_ratio); + + /* + * Additionally we charge a cpu_tuple_cost to account for cache lookups, + * which we'll do regardless of whether it was a cache hit or not.
+ */ + startup_cost += cpu_tuple_cost; + + *rescan_startup_cost = startup_cost; + *rescan_total_cost = total_cost; +} + /* * cost_agg * Determines and returns the cost of performing an Agg plan node, @@ -4142,6 +4285,11 @@ cost_rescan(PlannerInfo *root, Path *path, *rescan_total_cost = run_cost; } break; + case T_ResultCache: + /* All the hard work is done by cost_resultcache_rescan */ + cost_resultcache_rescan(root, (ResultCachePath *) path, + rescan_startup_cost, rescan_total_cost); + break; default: *rescan_startup_cost = path->startup_cost; *rescan_total_cost = path->total_cost; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index 57ce97fd53..e9b6968b1d 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -18,10 +18,13 @@ #include "executor/executor.h" #include "foreign/fdwapi.h" +#include "nodes/nodeFuncs.h" #include "optimizer/cost.h" +#include "optimizer/optimizer.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" +#include "utils/typcache.h" /* Hook for plugins to get control in add_paths_to_joinrel() */ set_join_pathlist_hook_type set_join_pathlist_hook = NULL; @@ -52,6 +55,9 @@ static void try_partial_mergejoin_path(PlannerInfo *root, static void sort_inner_and_outer(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra); +static inline bool clause_sides_match_join(RestrictInfo *rinfo, + RelOptInfo *outerrel, + RelOptInfo *innerrel); static void match_unsorted_outer(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra); @@ -163,6 +169,11 @@ add_paths_to_joinrel(PlannerInfo *root, { case JOIN_SEMI: case JOIN_ANTI: + + /* + * XXX it may be worth proving this to allow a ResultCache to be + * considered for Nested Loop Semi/Anti Joins. + */ extra.inner_unique = false; /* well, unproven */ break; case JOIN_UNIQUE_INNER: @@ -354,6 +365,180 @@ allow_star_schema_join(PlannerInfo *root, bms_nonempty_difference(inner_paramrels, outerrelids)); } +/* + * paraminfo_get_equal_hashops + * Determine if param_info and innerrel's lateral_vars can be hashed. + * Returns true the hashing is possible, otherwise return false. + * + * Additionally we also collect the outer exprs and the hash operators for + * each parameter to innerrel. These set in 'param_exprs' and 'operators' + * when we return true. + */ +static bool +paraminfo_get_equal_hashops(PlannerInfo *root, ParamPathInfo *param_info, + RelOptInfo *outerrel, RelOptInfo *innerrel, + List **param_exprs, List **operators) + +{ + ListCell *lc; + + *param_exprs = NIL; + *operators = NIL; + + if (param_info != NULL) + { + List *clauses = param_info->ppi_clauses; + + foreach(lc, clauses) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + OpExpr *opexpr; + Node *expr; + + /* can't use result cache without a valid hash equals operator */ + if (!OidIsValid(rinfo->hasheqoperator) || + !clause_sides_match_join(rinfo, outerrel, innerrel)) + { + list_free(*operators); + list_free(*param_exprs); + return false; + } + + /* + * We already checked that this is an OpExpr with 2 args when + * setting hasheqoperator. 
+ */ + opexpr = (OpExpr *) rinfo->clause; + if (rinfo->outer_is_left) + expr = (Node *) linitial(opexpr->args); + else + expr = (Node *) lsecond(opexpr->args); + + *operators = lappend_oid(*operators, rinfo->hasheqoperator); + *param_exprs = lappend(*param_exprs, expr); + } + } + + /* Now add any lateral vars to the cache key too */ + foreach(lc, innerrel->lateral_vars) + { + Node *expr = (Node *) lfirst(lc); + TypeCacheEntry *typentry; + + /* Reject if there are any volatile functions */ + if (contain_volatile_functions(expr)) + { + list_free(*operators); + list_free(*param_exprs); + return false; + } + + typentry = lookup_type_cache(exprType(expr), + TYPECACHE_HASH_PROC | TYPECACHE_EQ_OPR); + + /* can't use result cache without a valid hash equals operator */ + if (!OidIsValid(typentry->hash_proc) || !OidIsValid(typentry->eq_opr)) + { + list_free(*operators); + list_free(*param_exprs); + return false; + } + + *operators = lappend_oid(*operators, typentry->eq_opr); + *param_exprs = lappend(*param_exprs, expr); + } + + /* We're okay to use result cache */ + return true; +} + +/* + * get_resultcache_path + * If possible, make and return a Result Cache path atop of 'inner_path'. + * Otherwise return NULL. + */ +static Path * +get_resultcache_path(PlannerInfo *root, RelOptInfo *innerrel, + RelOptInfo *outerrel, Path *inner_path, + Path *outer_path, JoinType jointype, + JoinPathExtraData *extra) +{ + List *param_exprs; + List *hash_operators; + ListCell *lc; + + /* Obviously not if it's disabled */ + if (!enable_resultcache) + return NULL; + + /* + * We can safely not bother with all this unless we expect to perform more + * than one inner scan. The first scan is always going to be a cache + * miss. This would likely fail later anyway based on costs, so this is + * really just to save some wasted effort. + */ + if (outer_path->parent->rows < 2) + return NULL; + + /* + * We can only have a result cache when there's some kind of cache key, + * either parameterized path clauses or lateral Vars. No cache key sounds + * more like something a Materialize node might be more useful for. + */ + if ((inner_path->param_info == NULL || + inner_path->param_info->ppi_clauses == NIL) && + innerrel->lateral_vars == NIL) + return NULL; + + /* + * Currently we don't do this for SEMI and ANTI joins unless they're + * marked as inner_unique. This is because nested loop SEMI/ANTI joins + * don't scan the inner node to completion, which will mean result cache + * cannot mark the cache entry as complete. + * + * XXX Currently we don't attempt to mark SEMI/ANTI joins as inner_unique + * = true. Should we? See add_paths_to_joinrel() + */ + if (!extra->inner_unique && (jointype == JOIN_SEMI || + jointype == JOIN_ANTI)) + return NULL; + + /* + * We can't use a result cache if there are volatile functions in the + * inner rel's target list or restrict list. A cache hit could reduce the + * number of calls to these functions. 
+ */ + if (contain_volatile_functions((Node *) innerrel->reltarget)) + return NULL; + + foreach(lc, innerrel->baserestrictinfo) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + + if (contain_volatile_functions((Node *) rinfo)) + return NULL; + } + + /* Check if we have hash ops for each parameter to the path */ + if (paraminfo_get_equal_hashops(root, + inner_path->param_info, + outerrel, + innerrel, + ¶m_exprs, + &hash_operators)) + { + return (Path *) create_resultcache_path(root, + innerrel, + inner_path, + param_exprs, + hash_operators, + extra->inner_unique, + outer_path->parent->rows); + } + + return NULL; +} + /* * try_nestloop_path * Consider a nestloop join path; if it appears useful, push it into @@ -1471,6 +1656,7 @@ match_unsorted_outer(PlannerInfo *root, foreach(lc2, innerrel->cheapest_parameterized_paths) { Path *innerpath = (Path *) lfirst(lc2); + Path *rcpath; try_nestloop_path(root, joinrel, @@ -1479,6 +1665,22 @@ match_unsorted_outer(PlannerInfo *root, merge_pathkeys, jointype, extra); + + /* + * Try generating a result cache path and see if that makes + * the nested loop any cheaper. + */ + rcpath = get_resultcache_path(root, innerrel, outerrel, + innerpath, outerpath, jointype, + extra); + if (rcpath != NULL) + try_nestloop_path(root, + joinrel, + outerpath, + rcpath, + merge_pathkeys, + jointype, + extra); } /* Also consider materialized form of the cheapest inner path */ @@ -1633,6 +1835,7 @@ consider_parallel_nestloop(PlannerInfo *root, foreach(lc2, innerrel->cheapest_parameterized_paths) { Path *innerpath = (Path *) lfirst(lc2); + Path *rcpath; /* Can't join to an inner path that is not parallel-safe */ if (!innerpath->parallel_safe) @@ -1657,6 +1860,17 @@ consider_parallel_nestloop(PlannerInfo *root, try_partial_nestloop_path(root, joinrel, outerpath, innerpath, pathkeys, jointype, extra); + + /* + * Try generating a result cache path and see if that makes the + * nested loop any cheaper. 
+ */ + rcpath = get_resultcache_path(root, innerrel, outerrel, + innerpath, outerpath, jointype, + extra); + if (rcpath != NULL) + try_partial_nestloop_path(root, joinrel, outerpath, rcpath, + pathkeys, jointype, extra); } } } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index a56936e0e9..22f10fa339 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -91,6 +91,9 @@ static Result *create_group_result_plan(PlannerInfo *root, static ProjectSet *create_project_set_plan(PlannerInfo *root, ProjectSetPath *best_path); static Material *create_material_plan(PlannerInfo *root, MaterialPath *best_path, int flags); +static ResultCache *create_resultcache_plan(PlannerInfo *root, + ResultCachePath *best_path, + int flags); static Plan *create_unique_plan(PlannerInfo *root, UniquePath *best_path, int flags); static Gather *create_gather_plan(PlannerInfo *root, GatherPath *best_path); @@ -277,6 +280,11 @@ static Sort *make_sort_from_groupcols(List *groupcls, AttrNumber *grpColIdx, Plan *lefttree); static Material *make_material(Plan *lefttree); +static ResultCache *make_resultcache(Plan *lefttree, Oid *hashoperators, + Oid *collations, + List *param_exprs, + bool singlerow, + uint32 est_entries); static WindowAgg *make_windowagg(List *tlist, Index winref, int partNumCols, AttrNumber *partColIdx, Oid *partOperators, Oid *partCollations, int ordNumCols, AttrNumber *ordColIdx, Oid *ordOperators, Oid *ordCollations, @@ -453,6 +461,11 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags) (MaterialPath *) best_path, flags); break; + case T_ResultCache: + plan = (Plan *) create_resultcache_plan(root, + (ResultCachePath *) best_path, + flags); + break; case T_Unique: if (IsA(best_path, UpperUniquePath)) { @@ -1566,6 +1579,56 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path, int flags) return plan; } +/* + * create_resultcache_plan + * Create a ResultCache plan for 'best_path' and (recursively) plans + * for its subpaths. + * + * Returns a Plan node. 
+ */ +static ResultCache * +create_resultcache_plan(PlannerInfo *root, ResultCachePath *best_path, int flags) +{ + ResultCache *plan; + Plan *subplan; + Oid *operators; + Oid *collations; + List *param_exprs = NIL; + ListCell *lc; + ListCell *lc2; + int nkeys; + int i; + + subplan = create_plan_recurse(root, best_path->subpath, + flags | CP_SMALL_TLIST); + + param_exprs = (List *) replace_nestloop_params(root, (Node *) + best_path->param_exprs); + + nkeys = list_length(param_exprs); + Assert(nkeys > 0); + operators = palloc(nkeys * sizeof(Oid)); + collations = palloc(nkeys * sizeof(Oid)); + + i = 0; + forboth(lc, param_exprs, lc2, best_path->hash_operators) + { + Expr *param_expr = (Expr *) lfirst(lc); + Oid opno = lfirst_oid(lc2); + + operators[i] = opno; + collations[i] = exprCollation((Node *) param_expr); + i++; + } + + plan = make_resultcache(subplan, operators, collations, param_exprs, + best_path->singlerow, best_path->est_entries); + + copy_generic_path_info(&plan->plan, (Path *) best_path); + + return plan; +} + /* * create_unique_plan * Create a Unique plan for 'best_path' and (recursively) plans @@ -6452,6 +6515,28 @@ materialize_finished_plan(Plan *subplan) return matplan; } +static ResultCache * +make_resultcache(Plan *lefttree, Oid *hashoperators, Oid *collations, + List *param_exprs, bool singlerow, uint32 est_entries) +{ + ResultCache *node = makeNode(ResultCache); + Plan *plan = &node->plan; + + plan->targetlist = lefttree->targetlist; + plan->qual = NIL; + plan->lefttree = lefttree; + plan->righttree = NULL; + + node->numKeys = list_length(param_exprs); + node->hashOperators = hashoperators; + node->collations = collations; + node->param_exprs = param_exprs; + node->singlerow = singlerow; + node->est_entries = est_entries; + + return node; +} + Agg * make_agg(List *tlist, List *qual, AggStrategy aggstrategy, AggSplit aggsplit, @@ -7038,6 +7123,7 @@ is_projection_capable_path(Path *path) { case T_Hash: case T_Material: + case T_ResultCache: case T_Sort: case T_IncrementalSort: case T_Unique: @@ -7083,6 +7169,7 @@ is_projection_capable_plan(Plan *plan) { case T_Hash: case T_Material: + case T_ResultCache: case T_Sort: case T_Unique: case T_SetOp: diff --git a/src/backend/optimizer/plan/initsplan.c b/src/backend/optimizer/plan/initsplan.c index 20df2152ea..3ac853d9ef 100644 --- a/src/backend/optimizer/plan/initsplan.c +++ b/src/backend/optimizer/plan/initsplan.c @@ -33,6 +33,7 @@ #include "parser/analyze.h" #include "rewrite/rewriteManip.h" #include "utils/lsyscache.h" +#include "utils/typcache.h" /* These parameters are set by GUC */ int from_collapse_limit; @@ -77,6 +78,7 @@ static bool check_equivalence_delay(PlannerInfo *root, static bool check_redundant_nullability_qual(PlannerInfo *root, Node *clause); static void check_mergejoinable(RestrictInfo *restrictinfo); static void check_hashjoinable(RestrictInfo *restrictinfo); +static void check_resultcacheable(RestrictInfo *restrictinfo); /***************************************************************************** @@ -2208,6 +2210,13 @@ distribute_restrictinfo_to_rels(PlannerInfo *root, */ check_hashjoinable(restrictinfo); + /* + * Likewise, check if the clause is suitable to be used with a + * Result Cache node to cache inner tuples during a parameterized + * nested loop. + */ + check_resultcacheable(restrictinfo); + /* * Add clause to the join lists of all the relevant relations. 
*/ @@ -2450,6 +2459,7 @@ build_implied_join_equality(PlannerInfo *root, /* Set mergejoinability/hashjoinability flags */ check_mergejoinable(restrictinfo); check_hashjoinable(restrictinfo); + check_resultcacheable(restrictinfo); return restrictinfo; } @@ -2697,3 +2707,34 @@ check_hashjoinable(RestrictInfo *restrictinfo) !contain_volatile_functions((Node *) restrictinfo)) restrictinfo->hashjoinoperator = opno; } + +/* + * check_resultcacheable + * If the restrictinfo's clause is suitable to be used for a Result Cache + * node, set the hasheqoperator to the hash equality operator that will be + * needed during caching. + */ +static void +check_resultcacheable(RestrictInfo *restrictinfo) +{ + TypeCacheEntry *typentry; + Expr *clause = restrictinfo->clause; + Node *leftarg; + + if (restrictinfo->pseudoconstant) + return; + if (!is_opclause(clause)) + return; + if (list_length(((OpExpr *) clause)->args) != 2) + return; + + leftarg = linitial(((OpExpr *) clause)->args); + + typentry = lookup_type_cache(exprType(leftarg), TYPECACHE_HASH_PROC | + TYPECACHE_EQ_OPR); + + if (!OidIsValid(typentry->hash_proc) || !OidIsValid(typentry->eq_opr)) + return; + + restrictinfo->hasheqoperator = typentry->eq_opr; +} diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 4a25431bec..70c0fa07e6 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -752,6 +752,16 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) set_hash_references(root, plan, rtoffset); break; + case T_ResultCache: + { + ResultCache *rcplan = (ResultCache *) plan; + + rcplan->param_exprs = fix_scan_list(root, rcplan->param_exprs, + rtoffset, + NUM_EXEC_TLIST(plan)); + break; + } + case T_Material: case T_Sort: case T_IncrementalSort: diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 15b9453975..0881a208ac 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2745,6 +2745,11 @@ finalize_plan(PlannerInfo *root, Plan *plan, /* rescan_param does *not* get added to scan_params */ break; + case T_ResultCache: + finalize_primnode((Node *) ((ResultCache *) plan)->param_exprs, + &context); + break; + case T_ProjectSet: case T_Hash: case T_Material: diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 1c47a2fb49..b248b038e0 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1576,6 +1576,56 @@ create_material_path(RelOptInfo *rel, Path *subpath) return pathnode; } +/* + * create_resultcache_path + * Creates a path corresponding to a ResultCache plan, returning the + * pathnode. 
+ */ +ResultCachePath * +create_resultcache_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, + List *param_exprs, List *hash_operators, + bool singlerow, double calls) +{ + ResultCachePath *pathnode = makeNode(ResultCachePath); + + Assert(subpath->parent == rel); + + pathnode->path.pathtype = T_ResultCache; + pathnode->path.parent = rel; + pathnode->path.pathtarget = rel->reltarget; + pathnode->path.param_info = subpath->param_info; + pathnode->path.parallel_aware = false; + pathnode->path.parallel_safe = rel->consider_parallel && + subpath->parallel_safe; + pathnode->path.parallel_workers = subpath->parallel_workers; + pathnode->path.pathkeys = subpath->pathkeys; + + pathnode->subpath = subpath; + pathnode->hash_operators = hash_operators; + pathnode->param_exprs = param_exprs; + pathnode->singlerow = singlerow; + pathnode->calls = calls; + + /* + * For now we set est_entries to 0. cost_resultcache_rescan() does all + * the hard work to determine how many cache entries there are likely to + * be, so it seems best to leave it up to that function to fill this field + * in. If left at 0, the executor will make a guess at a good value. + */ + pathnode->est_entries = 0; + + /* + * Add a small additional charge for caching the first entry. All the + * harder calculations for rescans are performed in + * cost_resultcache_rescan(). + */ + pathnode->path.startup_cost = subpath->startup_cost + cpu_tuple_cost; + pathnode->path.total_cost = subpath->total_cost + cpu_tuple_cost; + pathnode->path.rows = subpath->rows; + + return pathnode; +} + /* * create_unique_path * Creates a path representing elimination of distinct rows from the @@ -3869,6 +3919,17 @@ reparameterize_path(PlannerInfo *root, Path *path, apath->path.parallel_aware, -1); } + case T_ResultCache: + { + ResultCachePath *rcpath = (ResultCachePath *) path; + + return (Path *) create_resultcache_path(root, rel, + rcpath->subpath, + rcpath->param_exprs, + rcpath->hash_operators, + rcpath->singlerow, + rcpath->calls); + } default: break; } @@ -4087,6 +4148,16 @@ do { \ } break; + case T_ResultCachePath: + { + ResultCachePath *rcpath; + + FLAT_COPY_PATH(rcpath, path, ResultCachePath); + REPARAMETERIZE_CHILD_PATH(rcpath->subpath); + new_path = (Path *) rcpath; + } + break; + case T_GatherPath: { GatherPath *gpath; diff --git a/src/backend/optimizer/util/restrictinfo.c b/src/backend/optimizer/util/restrictinfo.c index 59ff35926e..aa9fb3a9fa 100644 --- a/src/backend/optimizer/util/restrictinfo.c +++ b/src/backend/optimizer/util/restrictinfo.c @@ -217,6 +217,8 @@ make_restrictinfo_internal(PlannerInfo *root, restrictinfo->left_mcvfreq = -1; restrictinfo->right_mcvfreq = -1; + restrictinfo->hasheqoperator = InvalidOid; + return restrictinfo; } @@ -366,6 +368,7 @@ commute_restrictinfo(RestrictInfo *rinfo, Oid comm_op) result->right_bucketsize = rinfo->left_bucketsize; result->left_mcvfreq = rinfo->right_mcvfreq; result->right_mcvfreq = rinfo->left_mcvfreq; + result->hasheqoperator = InvalidOid; return result; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 130374789e..584daffc8a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1036,6 +1036,16 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_resultcache", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of result caching."), + NULL, + GUC_EXPLAIN + }, + &enable_resultcache, + true, + NULL, NULL, NULL + }, { {"enable_nestloop", PGC_USERSET, QUERY_TUNING_METHOD, 
gettext_noop("Enables the planner's use of nested-loop join plans."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 791d39cf07..30cfddac1f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -366,6 +366,7 @@ #enable_seqscan = on #enable_sort = on #enable_incremental_sort = on +#enable_resultcache = on #enable_tidscan = on #enable_partitionwise_join = off #enable_partitionwise_aggregate = off diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 34dd861eff..26dcc4485e 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -275,6 +275,13 @@ extern ExprState *ExecBuildGroupingEqual(TupleDesc ldesc, TupleDesc rdesc, const Oid *eqfunctions, const Oid *collations, PlanState *parent); +extern ExprState *ExecBuildParamSetEqual(TupleDesc desc, + const TupleTableSlotOps *lops, + const TupleTableSlotOps *rops, + const Oid *eqfunctions, + const Oid *collations, + const List *param_exprs, + PlanState *parent); extern ProjectionInfo *ExecBuildProjectionInfo(List *targetList, ExprContext *econtext, TupleTableSlot *slot, diff --git a/src/include/executor/nodeResultCache.h b/src/include/executor/nodeResultCache.h new file mode 100644 index 0000000000..df671d16f9 --- /dev/null +++ b/src/include/executor/nodeResultCache.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * nodeResultCache.h + * + * + * + * Portions Copyright (c) 2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeResultCache.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODERESULTCACHE_H +#define NODERESULTCACHE_H + +#include "nodes/execnodes.h" + +extern ResultCacheState *ExecInitResultCache(ResultCache *node, EState *estate, int eflags); +extern void ExecEndResultCache(ResultCacheState *node); +extern void ExecReScanResultCache(ResultCacheState *node); +extern double ExecEstimateCacheEntryOverheadBytes(double ntuples); +extern void ExecResultCacheEstimate(ResultCacheState *node, + ParallelContext *pcxt); +extern void ExecResultCacheInitializeDSM(ResultCacheState *node, + ParallelContext *pcxt); +extern void ExecResultCacheInitializeWorker(ResultCacheState *node, + ParallelWorkerContext *pwcxt); +extern void ExecResultCacheRetrieveInstrumentation(ResultCacheState *node); + +#endif /* NODERESULTCACHE_H */ diff --git a/src/include/lib/ilist.h b/src/include/lib/ilist.h index aa196428ed..ddbdb207af 100644 --- a/src/include/lib/ilist.h +++ b/src/include/lib/ilist.h @@ -394,6 +394,25 @@ dlist_move_head(dlist_head *head, dlist_node *node) dlist_check(head); } +/* + * Move element from its current position in the list to the tail position in + * the same list. + * + * Undefined behaviour if 'node' is not already part of the list. + */ +static inline void +dlist_move_tail(dlist_head *head, dlist_node *node) +{ + /* fast path if it's already at the tail */ + if (head->head.prev == node) + return; + + dlist_delete(node); + dlist_push_tail(head, node); + + dlist_check(head); +} + /* * Check whether 'node' has a following node. * Caution: unreliable if 'node' is not in the list. 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3b39369a49..52d1fa018b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -17,6 +17,7 @@ #include "access/tupconvert.h" #include "executor/instrument.h" #include "fmgr.h" +#include "lib/ilist.h" #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" @@ -2037,6 +2038,71 @@ typedef struct MaterialState Tuplestorestate *tuplestorestate; } MaterialState; +struct ResultCacheEntry; +struct ResultCacheTuple; +struct ResultCacheKey; + +typedef struct ResultCacheInstrumentation +{ + uint64 cache_hits; /* number of rescans where we've found the + * scan parameter values to be cached */ + uint64 cache_misses; /* number of rescans where we've not found the + * scan parameter values to be cached. */ + uint64 cache_evictions; /* number of cache entries removed due to + * the need to free memory */ + uint64 cache_overflows; /* number of times we've had to bypass the + * cache when filling it due to not being + * able to free enough space to store the + * current scan's tuples. */ + uint64 mem_peak; /* peak memory usage in bytes */ +} ResultCacheInstrumentation; + +/* ---------------- + * Shared memory container for per-worker resultcache information + * ---------------- + */ +typedef struct SharedResultCacheInfo +{ + int num_workers; + ResultCacheInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]; +} SharedResultCacheInfo; + +/* ---------------- + * ResultCacheState information + * + * resultcache nodes are used to cache recent and commonly seen results + * from a parameterized scan. + * ---------------- + */ +typedef struct ResultCacheState +{ + ScanState ss; /* its first field is NodeTag */ + int rc_status; /* value of ExecResultCache state machine */ + int nkeys; /* number of cache keys */ + struct resultcache_hash *hashtable; /* hash table for cache entries */ + TupleDesc hashkeydesc; /* tuple descriptor for cache keys */ + TupleTableSlot *tableslot; /* min tuple slot for existing cache entries */ + TupleTableSlot *probeslot; /* virtual slot used for hash lookups */ + ExprState *cache_eq_expr; /* Compare exec params to hash key */ + ExprState **param_exprs; /* exprs containing the parameters to this + * node */ + FmgrInfo *hashfunctions; /* lookup data for hash funcs nkeys in size */ + Oid *collations; /* collation for comparisons nkeys in size */ + uint64 mem_used; /* bytes of memory used by cache */ + uint64 mem_limit; /* memory limit in bytes for the cache */ + MemoryContext tableContext; /* memory context to store cache data */ + dlist_head lru_list; /* least recently used entry list */ + struct ResultCacheTuple *last_tuple; /* Used to point to the last tuple + * returned during a cache hit and + * the tuple we last stored when + * populating the cache. */ + struct ResultCacheEntry *entry; /* the entry that 'last_tuple' belongs to + * or NULL if 'last_tuple' is NULL. */ + bool singlerow; /* true if the cache entry is to be marked as + * complete after caching the first tuple. 
*/ + ResultCacheInstrumentation stats; /* execution statistics */ + SharedResultCacheInfo *shared_info; /* statistics for parallel workers */ +} ResultCacheState; /* ---------------- * When performing sorting by multiple keys, it's possible that the input diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 704f00fd30..2051abbbf9 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -74,6 +74,7 @@ typedef enum NodeTag T_MergeJoin, T_HashJoin, T_Material, + T_ResultCache, T_Sort, T_IncrementalSort, T_Group, @@ -132,6 +133,7 @@ typedef enum NodeTag T_MergeJoinState, T_HashJoinState, T_MaterialState, + T_ResultCacheState, T_SortState, T_IncrementalSortState, T_GroupState, @@ -242,6 +244,7 @@ typedef enum NodeTag T_MergeAppendPath, T_GroupResultPath, T_MaterialPath, + T_ResultCachePath, T_UniquePath, T_GatherPath, T_GatherMergePath, diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index e4e1c15986..a65bda7e3c 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -1494,6 +1494,25 @@ typedef struct MaterialPath Path *subpath; } MaterialPath; +/* + * ResultCachePath represents a ResultCache plan node, i.e., a cache that + * caches tuples from parameterized paths to save the underlying node from + * having to be rescanned for parameter values which are already cached. + */ +typedef struct ResultCachePath +{ + Path path; + Path *subpath; /* outerpath to cache tuples from */ + List *hash_operators; /* hash operators for each key */ + List *param_exprs; /* cache keys */ + bool singlerow; /* true if the cache entry is to be marked as + * complete after caching the first record. */ + double calls; /* expected number of rescans */ + uint32 est_entries; /* The maximum number of entries that the + * planner expects will fit in the cache, or 0 + * if unknown */ +} ResultCachePath; + /* * UniquePath represents elimination of distinct rows from the output of * its subpath. @@ -2091,6 +2110,9 @@ typedef struct RestrictInfo Selectivity right_bucketsize; /* avg bucketsize of right side */ Selectivity left_mcvfreq; /* left side's most common val's freq */ Selectivity right_mcvfreq; /* right side's most common val's freq */ + + /* hash equality operator used for result cache, else InvalidOid */ + Oid hasheqoperator; } RestrictInfo; /* diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 623dc450ee..1678bd66fe 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -779,6 +779,27 @@ typedef struct Material Plan plan; } Material; +/* ---------------- + * result cache node + * ---------------- + */ +typedef struct ResultCache +{ + Plan plan; + + int numKeys; /* size of the two arrays below */ + + Oid *hashOperators; /* hash operators for each key */ + Oid *collations; /* cache keys */ + List *param_exprs; /* exprs containing parameters */ + bool singlerow; /* true if the cache entry should be marked as + * complete after we store the first tuple in + * it. 
*/ + uint32 est_entries; /* The maximum number of entries that the + * planner expects will fit in the cache, or 0 + * if unknown */ +} ResultCache; + /* ---------------- * sort node * ---------------- diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index a3fd93fe07..0fe60d82e4 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -57,6 +57,7 @@ extern PGDLLIMPORT bool enable_incremental_sort; extern PGDLLIMPORT bool enable_hashagg; extern PGDLLIMPORT bool enable_nestloop; extern PGDLLIMPORT bool enable_material; +extern PGDLLIMPORT bool enable_resultcache; extern PGDLLIMPORT bool enable_mergejoin; extern PGDLLIMPORT bool enable_hashjoin; extern PGDLLIMPORT bool enable_gathermerge; diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index d539bc2783..53261ee91f 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -82,6 +82,13 @@ extern GroupResultPath *create_group_result_path(PlannerInfo *root, PathTarget *target, List *havingqual); extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath); +extern ResultCachePath *create_resultcache_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + List *param_exprs, + List *hash_operators, + bool singlerow, + double calls); extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, SpecialJoinInfo *sjinfo); extern GatherPath *create_gather_path(PlannerInfo *root, diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index 1ae0e5d939..ca06d41dd0 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -2584,6 +2584,7 @@ select v||'a', case when v||'a' = 'aa' then 1 else 0 end, count(*) -- Make sure that generation of HashAggregate for uniqification purposes -- does not lead to array overflow due to unexpected duplicate hash keys -- see CAFeeJoKKu0u+A_A9R9316djW-YW3-+Gtgvy3ju655qRHR3jtdA@mail.gmail.com +set enable_resultcache to off; explain (costs off) select 1 from tenk1 where (hundred, thousand) in (select twothousand, twothousand from onek); @@ -2599,6 +2600,7 @@ explain (costs off) -> Seq Scan on onek (8 rows) +reset enable_resultcache; -- -- Hash Aggregation Spill tests -- diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out index 04e802d421..86fd3907c5 100644 --- a/src/test/regress/expected/join.out +++ b/src/test/regress/expected/join.out @@ -2536,6 +2536,7 @@ reset enable_nestloop; -- set work_mem to '64kB'; set enable_mergejoin to off; +set enable_resultcache to off; explain (costs off) select count(*) from tenk1 a, tenk1 b where a.hundred = b.thousand and (b.fivethous % 10) < 10; @@ -2559,6 +2560,7 @@ select count(*) from tenk1 a, tenk1 b reset work_mem; reset enable_mergejoin; +reset enable_resultcache; -- -- regression test for 8.2 bug with improper re-ordering of left joins -- @@ -3663,8 +3665,8 @@ select * from tenk1 t1 left join (tenk1 t2 join tenk1 t3 on t2.thousand = t3.unique2) on t1.hundred = t2.hundred and t1.ten = t3.ten where t1.unique1 = 1; - QUERY PLAN --------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------- Nested Loop Left Join -> Index Scan using tenk1_unique1 on tenk1 t1 Index Cond: (unique1 = 1) @@ -3674,17 +3676,19 @@ where t1.unique1 = 1; Recheck Cond: (t1.hundred = hundred) -> Bitmap Index Scan on tenk1_hundred Index Cond: (hundred = t1.hundred) - -> Index 
Scan using tenk1_unique2 on tenk1 t3 - Index Cond: (unique2 = t2.thousand) -(11 rows) + -> Result Cache + Cache Key: t2.thousand + -> Index Scan using tenk1_unique2 on tenk1 t3 + Index Cond: (unique2 = t2.thousand) +(13 rows) explain (costs off) select * from tenk1 t1 left join (tenk1 t2 join tenk1 t3 on t2.thousand = t3.unique2) on t1.hundred = t2.hundred and t1.ten + t2.ten = t3.ten where t1.unique1 = 1; - QUERY PLAN --------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------- Nested Loop Left Join -> Index Scan using tenk1_unique1 on tenk1 t1 Index Cond: (unique1 = 1) @@ -3694,9 +3698,11 @@ where t1.unique1 = 1; Recheck Cond: (t1.hundred = hundred) -> Bitmap Index Scan on tenk1_hundred Index Cond: (hundred = t1.hundred) - -> Index Scan using tenk1_unique2 on tenk1 t3 - Index Cond: (unique2 = t2.thousand) -(11 rows) + -> Result Cache + Cache Key: t2.thousand + -> Index Scan using tenk1_unique2 on tenk1 t3 + Index Cond: (unique2 = t2.thousand) +(13 rows) explain (costs off) select count(*) from @@ -4210,8 +4216,8 @@ where t1.f1 = ss.f1; QUERY PLAN -------------------------------------------------- Nested Loop - Output: t1.f1, i8.q1, i8.q2, (i8.q1), t2.f1 - Join Filter: (t1.f1 = t2.f1) + Output: t1.f1, i8.q1, i8.q2, q1, f1 + Join Filter: (t1.f1 = f1) -> Nested Loop Left Join Output: t1.f1, i8.q1, i8.q2 -> Seq Scan on public.text_tbl t1 @@ -4221,11 +4227,14 @@ where t1.f1 = ss.f1; -> Seq Scan on public.int8_tbl i8 Output: i8.q1, i8.q2 Filter: (i8.q2 = 123) - -> Limit - Output: (i8.q1), t2.f1 - -> Seq Scan on public.text_tbl t2 - Output: i8.q1, t2.f1 -(16 rows) + -> Result Cache + Output: q1, f1 + Cache Key: i8.q1 + -> Limit + Output: (i8.q1), t2.f1 + -> Seq Scan on public.text_tbl t2 + Output: i8.q1, t2.f1 +(19 rows) select * from text_tbl t1 @@ -4246,13 +4255,13 @@ select * from lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1, lateral (select ss1.* from text_tbl t3 limit 1) as ss2 where t1.f1 = ss2.f1; - QUERY PLAN -------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------- Nested Loop - Output: t1.f1, i8.q1, i8.q2, (i8.q1), t2.f1, ((i8.q1)), (t2.f1) - Join Filter: (t1.f1 = (t2.f1)) + Output: t1.f1, i8.q1, i8.q2, q1, f1, q1, f1 + Join Filter: (t1.f1 = f1) -> Nested Loop - Output: t1.f1, i8.q1, i8.q2, (i8.q1), t2.f1 + Output: t1.f1, i8.q1, i8.q2, q1, f1 -> Nested Loop Left Join Output: t1.f1, i8.q1, i8.q2 -> Seq Scan on public.text_tbl t1 @@ -4262,15 +4271,21 @@ where t1.f1 = ss2.f1; -> Seq Scan on public.int8_tbl i8 Output: i8.q1, i8.q2 Filter: (i8.q2 = 123) + -> Result Cache + Output: q1, f1 + Cache Key: i8.q1 + -> Limit + Output: (i8.q1), t2.f1 + -> Seq Scan on public.text_tbl t2 + Output: i8.q1, t2.f1 + -> Result Cache + Output: q1, f1 + Cache Key: q1, f1 -> Limit - Output: (i8.q1), t2.f1 - -> Seq Scan on public.text_tbl t2 - Output: i8.q1, t2.f1 - -> Limit - Output: ((i8.q1)), (t2.f1) - -> Seq Scan on public.text_tbl t3 - Output: (i8.q1), t2.f1 -(22 rows) + Output: (q1), (f1) + -> Seq Scan on public.text_tbl t3 + Output: q1, f1 +(28 rows) select * from text_tbl t1 @@ -4316,14 +4331,17 @@ where tt1.f1 = ss1.c0; -> Seq Scan on public.text_tbl tt4 Output: tt4.f1 Filter: (tt4.f1 = 'foo'::text) - -> Subquery Scan on ss1 + -> Result Cache Output: ss1.c0 - Filter: (ss1.c0 = 'foo'::text) - -> Limit - Output: (tt4.f1) - -> Seq Scan on public.text_tbl tt5 - Output: tt4.f1 -(29 rows) + Cache Key: tt4.f1 + -> Subquery Scan on ss1 + 
Output: ss1.c0 + Filter: (ss1.c0 = 'foo'::text) + -> Limit + Output: (tt4.f1) + -> Seq Scan on public.text_tbl tt5 + Output: tt4.f1 +(32 rows) select 1 from text_tbl as tt1 @@ -4997,34 +5015,40 @@ select count(*) from tenk1 a, lateral generate_series(1,two) g; explain (costs off) select count(*) from tenk1 a, lateral generate_series(1,two) g; - QUERY PLAN ------------------------------------------------- + QUERY PLAN +------------------------------------------------------ Aggregate -> Nested Loop -> Seq Scan on tenk1 a - -> Function Scan on generate_series g -(4 rows) + -> Result Cache + Cache Key: a.two + -> Function Scan on generate_series g +(6 rows) explain (costs off) select count(*) from tenk1 a cross join lateral generate_series(1,two) g; - QUERY PLAN ------------------------------------------------- + QUERY PLAN +------------------------------------------------------ Aggregate -> Nested Loop -> Seq Scan on tenk1 a - -> Function Scan on generate_series g -(4 rows) + -> Result Cache + Cache Key: a.two + -> Function Scan on generate_series g +(6 rows) -- don't need the explicit LATERAL keyword for functions explain (costs off) select count(*) from tenk1 a, generate_series(1,two) g; - QUERY PLAN ------------------------------------------------- + QUERY PLAN +------------------------------------------------------ Aggregate -> Nested Loop -> Seq Scan on tenk1 a - -> Function Scan on generate_series g -(4 rows) + -> Result Cache + Cache Key: a.two + -> Function Scan on generate_series g +(6 rows) -- lateral with UNION ALL subselect explain (costs off) @@ -5079,14 +5103,15 @@ explain (costs off) QUERY PLAN ------------------------------------------------------------------ Aggregate - -> Hash Join - Hash Cond: ("*VALUES*".column1 = b.unique2) + -> Nested Loop -> Nested Loop -> Index Only Scan using tenk1_unique1 on tenk1 a -> Values Scan on "*VALUES*" - -> Hash + -> Result Cache + Cache Key: "*VALUES*".column1 -> Index Only Scan using tenk1_unique2 on tenk1 b -(8 rows) + Index Cond: (unique2 = "*VALUES*".column1) +(9 rows) select count(*) from tenk1 a, tenk1 b join lateral (values(a.unique1),(-1)) ss(x) on b.unique2 = ss.x; diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out index c4e827caec..1a7149bfd5 100644 --- a/src/test/regress/expected/partition_prune.out +++ b/src/test/regress/expected/partition_prune.out @@ -1958,6 +1958,9 @@ begin ln := regexp_replace(ln, 'Workers Launched: \d+', 'Workers Launched: N'); ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=N loops=N'); ln := regexp_replace(ln, 'Rows Removed by Filter: \d+', 'Rows Removed by Filter: N'); + ln := regexp_replace(ln, 'Hits: \d+', 'Hits: N'); + ln := regexp_replace(ln, 'Misses: \d+', 'Misses: N'); + ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N'); return next ln; end loop; end; @@ -2085,6 +2088,7 @@ create index ab_a3_b2_a_idx on ab_a3_b2 (a); create index ab_a3_b3_a_idx on ab_a3_b3 (a); set enable_hashjoin = 0; set enable_mergejoin = 0; +set enable_resultcache = 0; select explain_parallel_append('select avg(ab.a) from ab inner join lprt_a a on ab.a = a.a where a.a in(0, 0, 1)'); explain_parallel_append -------------------------------------------------------------------------------------------------------- @@ -2253,6 +2257,7 @@ select explain_parallel_append('select avg(ab.a) from ab inner join lprt_a a on reset enable_hashjoin; reset enable_mergejoin; +reset enable_resultcache; reset parallel_setup_cost; reset parallel_tuple_cost; 
reset min_parallel_table_scan_size; diff --git a/src/test/regress/expected/resultcache.out b/src/test/regress/expected/resultcache.out new file mode 100644 index 0000000000..65d9e25169 --- /dev/null +++ b/src/test/regress/expected/resultcache.out @@ -0,0 +1,158 @@ +-- Perform tests on the Result Cache node. +-- The cache hits/misses/evictions from the Result Cache node can vary between +-- machines. Let's just replace the number with an 'N'. In order to allow us +-- to perform validation when the measure was zero, we replace a zero value +-- with "Zero". All other numbers are replaced with 'N'. +create function explain_resultcache(query text, hide_hitmiss bool) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in + execute format('explain (analyze, costs off, summary off, timing off) %s', + query) + loop + if hide_hitmiss = true then + ln := regexp_replace(ln, 'Hits: 0', 'Hits: Zero'); + ln := regexp_replace(ln, 'Hits: \d+', 'Hits: N'); + ln := regexp_replace(ln, 'Misses: 0', 'Misses: Zero'); + ln := regexp_replace(ln, 'Misses: \d+', 'Misses: N'); + end if; + ln := regexp_replace(ln, 'Evictions: 0', 'Evictions: Zero'); + ln := regexp_replace(ln, 'Evictions: \d+', 'Evictions: N'); + ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N'); + ln := regexp_replace(ln, 'Heap Fetches: \d+', 'Heap Fetches: N'); + return next ln; + end loop; +end; +$$; +-- Ensure we get a result cache on the inner side of the nested loop +SET enable_hashjoin TO off; +SET enable_bitmapscan TO off; +SELECT explain_resultcache(' +SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 +INNER JOIN tenk1 t2 ON t1.unique1 = t2.twenty +WHERE t2.unique1 < 1000;', false); + explain_resultcache +-------------------------------------------------------------------------------------------- + Aggregate (actual rows=1 loops=1) + -> Nested Loop (actual rows=1000 loops=1) + -> Seq Scan on tenk1 t2 (actual rows=1000 loops=1) + Filter: (unique1 < 1000) + Rows Removed by Filter: 9000 + -> Result Cache (actual rows=1 loops=1000) + Cache Key: t2.twenty + Hits: 980 Misses: 20 Evictions: Zero Overflows: 0 Memory Usage: NkB + -> Index Only Scan using tenk1_unique1 on tenk1 t1 (actual rows=1 loops=20) + Index Cond: (unique1 = t2.twenty) + Heap Fetches: N +(11 rows) + +-- And check we get the expected results. +SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 +INNER JOIN tenk1 t2 ON t1.unique1 = t2.twenty +WHERE t2.unique1 < 1000; + count | avg +-------+-------------------- + 1000 | 9.5000000000000000 +(1 row) + +-- Try with LATERAL joins +SELECT explain_resultcache(' +SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000;', false); + explain_resultcache +-------------------------------------------------------------------------------------------- + Aggregate (actual rows=1 loops=1) + -> Nested Loop (actual rows=1000 loops=1) + -> Seq Scan on tenk1 t1 (actual rows=1000 loops=1) + Filter: (unique1 < 1000) + Rows Removed by Filter: 9000 + -> Result Cache (actual rows=1 loops=1000) + Cache Key: t1.twenty + Hits: 980 Misses: 20 Evictions: Zero Overflows: 0 Memory Usage: NkB + -> Index Only Scan using tenk1_unique1 on tenk1 t2 (actual rows=1 loops=20) + Index Cond: (unique1 = t1.twenty) + Heap Fetches: N +(11 rows) + +-- And check we get the expected results. 
+SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000; + count | avg +-------+-------------------- + 1000 | 9.5000000000000000 +(1 row) + +-- Reduce work_mem so that we see some cache evictions +SET work_mem TO '64kB'; +SET enable_mergejoin TO off; +-- Ensure we get some evictions. We're unable to validate the hits and misses +-- here as the number of entries that fit in the cache at once will vary +-- between different machines. +SELECT explain_resultcache(' +SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 +INNER JOIN tenk1 t2 ON t1.unique1 = t2.thousand +WHERE t2.unique1 < 1200;', true); + explain_resultcache +---------------------------------------------------------------------------------------------- + Aggregate (actual rows=1 loops=1) + -> Nested Loop (actual rows=1200 loops=1) + -> Seq Scan on tenk1 t2 (actual rows=1200 loops=1) + Filter: (unique1 < 1200) + Rows Removed by Filter: 8800 + -> Result Cache (actual rows=1 loops=1200) + Cache Key: t2.thousand + Hits: N Misses: N Evictions: N Overflows: 0 Memory Usage: NkB + -> Index Only Scan using tenk1_unique1 on tenk1 t1 (actual rows=1 loops=1028) + Index Cond: (unique1 = t2.thousand) + Heap Fetches: N +(11 rows) + +RESET enable_mergejoin; +RESET work_mem; +RESET enable_bitmapscan; +RESET enable_hashjoin; +-- Test parallel plans with Result Cache. +SET min_parallel_table_scan_size TO 0; +SET parallel_setup_cost TO 0; +SET parallel_tuple_cost TO 0; +SET max_parallel_workers_per_gather TO 2; +-- Ensure we get a parallel plan. +EXPLAIN (COSTS OFF) +SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000; + QUERY PLAN +------------------------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Nested Loop + -> Parallel Bitmap Heap Scan on tenk1 t1 + Recheck Cond: (unique1 < 1000) + -> Bitmap Index Scan on tenk1_unique1 + Index Cond: (unique1 < 1000) + -> Result Cache + Cache Key: t1.twenty + -> Index Only Scan using tenk1_unique1 on tenk1 t2 + Index Cond: (unique1 = t1.twenty) +(13 rows) + +-- And ensure the parallel plan gives us the correct results. 
+SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000; + count | avg +-------+-------------------- + 1000 | 9.5000000000000000 +(1 row) + +RESET max_parallel_workers_per_gather; +RESET parallel_tuple_cost; +RESET parallel_setup_cost; +RESET min_parallel_table_scan_size; diff --git a/src/test/regress/expected/subselect.out b/src/test/regress/expected/subselect.out index d5532d0ccc..c7986fb7fc 100644 --- a/src/test/regress/expected/subselect.out +++ b/src/test/regress/expected/subselect.out @@ -1091,19 +1091,21 @@ select sum(o.four), sum(ss.a) from select * from x ) ss where o.ten = 1; - QUERY PLAN ---------------------------------------------------- + QUERY PLAN +--------------------------------------------------------- Aggregate -> Nested Loop -> Seq Scan on onek o Filter: (ten = 1) - -> CTE Scan on x - CTE x - -> Recursive Union - -> Result - -> WorkTable Scan on x x_1 - Filter: (a < 10) -(10 rows) + -> Result Cache + Cache Key: o.four + -> CTE Scan on x + CTE x + -> Recursive Union + -> Result + -> WorkTable Scan on x x_1 + Filter: (a < 10) +(12 rows) select sum(o.four), sum(ss.a) from onek o cross join lateral ( diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 98dde452e6..0bb558d93c 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -111,10 +111,11 @@ select name, setting from pg_settings where name like 'enable%'; enable_partition_pruning | on enable_partitionwise_aggregate | off enable_partitionwise_join | off + enable_resultcache | on enable_seqscan | on enable_sort | on enable_tidscan | on -(19 rows) +(20 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. 
We can't test their contents in any great detail diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 312c11a4bd..2e89839089 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -119,7 +119,7 @@ test: plancache limit plpgsql copy2 temp domain rangefuncs prepare conversion tr # ---------- # Another group of parallel tests # ---------- -test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression +test: partition_join partition_prune reloptions hash_part indexing partition_aggregate partition_info tuplesort explain compression resultcache # event triggers cannot run concurrently with any test that runs DDL # oidjoins is read-only, though, and should run late for best coverage diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 5a80bfacd8..a46f3d0178 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -203,6 +203,7 @@ test: partition_info test: tuplesort test: explain test: compression +test: resultcache test: event_trigger test: oidjoins test: fast_default diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index eb53668299..eb80a2fe06 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -1098,9 +1098,11 @@ select v||'a', case when v||'a' = 'aa' then 1 else 0 end, count(*) -- Make sure that generation of HashAggregate for uniqification purposes -- does not lead to array overflow due to unexpected duplicate hash keys -- see CAFeeJoKKu0u+A_A9R9316djW-YW3-+Gtgvy3ju655qRHR3jtdA@mail.gmail.com +set enable_resultcache to off; explain (costs off) select 1 from tenk1 where (hundred, thousand) in (select twothousand, twothousand from onek); +reset enable_resultcache; -- -- Hash Aggregation Spill tests diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql index 8164383fb5..7f866c603b 100644 --- a/src/test/regress/sql/join.sql +++ b/src/test/regress/sql/join.sql @@ -550,6 +550,7 @@ reset enable_nestloop; set work_mem to '64kB'; set enable_mergejoin to off; +set enable_resultcache to off; explain (costs off) select count(*) from tenk1 a, tenk1 b @@ -559,6 +560,7 @@ select count(*) from tenk1 a, tenk1 b reset work_mem; reset enable_mergejoin; +reset enable_resultcache; -- -- regression test for 8.2 bug with improper re-ordering of left joins diff --git a/src/test/regress/sql/partition_prune.sql b/src/test/regress/sql/partition_prune.sql index 6ccb52ad1d..247264f93b 100644 --- a/src/test/regress/sql/partition_prune.sql +++ b/src/test/regress/sql/partition_prune.sql @@ -464,6 +464,9 @@ begin ln := regexp_replace(ln, 'Workers Launched: \d+', 'Workers Launched: N'); ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=N loops=N'); ln := regexp_replace(ln, 'Rows Removed by Filter: \d+', 'Rows Removed by Filter: N'); + ln := regexp_replace(ln, 'Hits: \d+', 'Hits: N'); + ln := regexp_replace(ln, 'Misses: \d+', 'Misses: N'); + ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N'); return next ln; end loop; end; @@ -515,6 +518,7 @@ create index ab_a3_b3_a_idx on ab_a3_b3 (a); set enable_hashjoin = 0; set enable_mergejoin = 0; +set enable_resultcache = 0; select explain_parallel_append('select avg(ab.a) from ab inner join lprt_a a on ab.a = a.a where a.a in(0, 0, 1)'); @@ -533,6 +537,7 @@ select explain_parallel_append('select avg(ab.a) from ab inner join lprt_a a on reset 
enable_hashjoin; reset enable_mergejoin; +reset enable_resultcache; reset parallel_setup_cost; reset parallel_tuple_cost; reset min_parallel_table_scan_size; diff --git a/src/test/regress/sql/resultcache.sql b/src/test/regress/sql/resultcache.sql new file mode 100644 index 0000000000..2be5b8f2d8 --- /dev/null +++ b/src/test/regress/sql/resultcache.sql @@ -0,0 +1,91 @@ +-- Perform tests on the Result Cache node. + +-- The cache hits/misses/evictions from the Result Cache node can vary between +-- machines. Let's just replace the number with an 'N'. In order to allow us +-- to perform validation when the measure was zero, we replace a zero value +-- with "Zero". All other numbers are replaced with 'N'. +create function explain_resultcache(query text, hide_hitmiss bool) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in + execute format('explain (analyze, costs off, summary off, timing off) %s', + query) + loop + if hide_hitmiss = true then + ln := regexp_replace(ln, 'Hits: 0', 'Hits: Zero'); + ln := regexp_replace(ln, 'Hits: \d+', 'Hits: N'); + ln := regexp_replace(ln, 'Misses: 0', 'Misses: Zero'); + ln := regexp_replace(ln, 'Misses: \d+', 'Misses: N'); + end if; + ln := regexp_replace(ln, 'Evictions: 0', 'Evictions: Zero'); + ln := regexp_replace(ln, 'Evictions: \d+', 'Evictions: N'); + ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N'); + ln := regexp_replace(ln, 'Heap Fetches: \d+', 'Heap Fetches: N'); + return next ln; + end loop; +end; +$$; + +-- Ensure we get a result cache on the inner side of the nested loop +SET enable_hashjoin TO off; +SET enable_bitmapscan TO off; +SELECT explain_resultcache(' +SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 +INNER JOIN tenk1 t2 ON t1.unique1 = t2.twenty +WHERE t2.unique1 < 1000;', false); + +-- And check we get the expected results. +SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 +INNER JOIN tenk1 t2 ON t1.unique1 = t2.twenty +WHERE t2.unique1 < 1000; + +-- Try with LATERAL joins +SELECT explain_resultcache(' +SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000;', false); + +-- And check we get the expected results. +SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000; + +-- Reduce work_mem so that we see some cache evictions +SET work_mem TO '64kB'; +SET enable_mergejoin TO off; +-- Ensure we get some evictions. We're unable to validate the hits and misses +-- here as the number of entries that fit in the cache at once will vary +-- between different machines. +SELECT explain_resultcache(' +SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 +INNER JOIN tenk1 t2 ON t1.unique1 = t2.thousand +WHERE t2.unique1 < 1200;', true); +RESET enable_mergejoin; +RESET work_mem; +RESET enable_bitmapscan; +RESET enable_hashjoin; + +-- Test parallel plans with Result Cache. +SET min_parallel_table_scan_size TO 0; +SET parallel_setup_cost TO 0; +SET parallel_tuple_cost TO 0; +SET max_parallel_workers_per_gather TO 2; + +-- Ensure we get a parallel plan. +EXPLAIN (COSTS OFF) +SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000; + +-- And ensure the parallel plan gives us the correct results. 
+SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, +LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 +WHERE t1.unique1 < 1000; + +RESET max_parallel_workers_per_gather; +RESET parallel_tuple_cost; +RESET parallel_setup_cost; +RESET min_parallel_table_scan_size;