diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 8f7062cd6e..447f69d044 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -107,6 +107,7 @@ static void show_tidbitmap_info(BitmapHeapScanState *planstate, static void show_instrumentation_count(const char *qlabel, int which, PlanState *planstate, ExplainState *es); static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es); +static void show_eval_params(Bitmapset *bms_params, ExplainState *es); static const char *explain_get_index_name(Oid indexId); static void show_buffer_usage(ExplainState *es, const BufferUsage *usage); static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir, @@ -1441,6 +1442,11 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, es); ExplainPropertyInteger("Workers Planned", gather->num_workers, es); + + /* Show params evaluated at gather node */ + if (gather->initParam) + show_eval_params(gather->initParam, es); + if (es->analyze) { int nworkers; @@ -1463,6 +1469,11 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, es); ExplainPropertyInteger("Workers Planned", gm->num_workers, es); + + /* Show params evaluated at gather-merge node */ + if (gm->initParam) + show_eval_params(gm->initParam, es); + if (es->analyze) { int nworkers; @@ -2487,6 +2498,29 @@ show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es) } } +/* + * Show initplan params evaluated at Gather or Gather Merge node. + */ +static void +show_eval_params(Bitmapset *bms_params, ExplainState *es) +{ + int paramid = -1; + List *params = NIL; + + Assert(bms_params); + + while ((paramid = bms_next_member(bms_params, paramid)) >= 0) + { + char param[32]; + + snprintf(param, sizeof(param), "$%d", paramid); + params = lappend(params, pstrdup(param)); + } + + if (params) + ExplainPropertyList("Params Evaluated", params, es); +} + /* * Fetch the name of an index in an EXPLAIN * diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index a0f537b706..6c4612dad4 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -1926,6 +1926,33 @@ ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext) *op->resnull = prm->isnull; } +/* + * ExecEvalParamExecParams + * + * Execute the subplan stored in PARAM_EXEC initplans params, if not executed + * till now. + */ +void +ExecEvalParamExecParams(Bitmapset *params, EState *estate) +{ + ParamExecData *prm; + int paramid; + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + prm = &(estate->es_param_exec_vals[paramid]); + + if (prm->execPlan != NULL) + { + /* Parameter not evaluated yet, so go do it */ + ExecSetParamPlan(prm->execPlan, GetPerTupleExprContext(estate)); + /* ExecSetParamPlan should have processed this param... */ + Assert(prm->execPlan == NULL); + } + } +} + /* * Evaluate a PARAM_EXTERN parameter. * diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index fd7e7cbf3d..c435550637 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "executor/execExpr.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeBitmapHeapscan.h" @@ -38,7 +39,9 @@ #include "optimizer/planner.h" #include "storage/spin.h" #include "tcop/tcopprot.h" +#include "utils/datum.h" #include "utils/dsa.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "pgstat.h" @@ -50,7 +53,7 @@ */ #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) -#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003) +#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003) #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) @@ -65,6 +68,7 @@ typedef struct FixedParallelExecutorState { int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ + dsa_pointer param_exec; } FixedParallelExecutorState; /* @@ -266,6 +270,133 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) return planstate_tree_walker(planstate, ExecParallelEstimate, e); } +/* + * Estimate the amount of space required to serialize the indicated parameters. + */ +static Size +EstimateParamExecSpace(EState *estate, Bitmapset *params) +{ + int paramid; + Size sz = sizeof(int); + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + Oid typeOid; + int16 typLen; + bool typByVal; + ParamExecData *prm; + + prm = &(estate->es_param_exec_vals[paramid]); + typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes, + paramid); + + sz = add_size(sz, sizeof(int)); /* space for paramid */ + + /* space for datum/isnull */ + if (OidIsValid(typeOid)) + get_typlenbyval(typeOid, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + sz = add_size(sz, + datumEstimateSpace(prm->value, prm->isnull, + typByVal, typLen)); + } + return sz; +} + +/* + * Serialize specified PARAM_EXEC parameters. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte paramid (location of param in execution time internal + * parameter array) and then the datum as serialized by datumSerialize(). + */ +static dsa_pointer +SerializeParamExecParams(EState *estate, Bitmapset *params) +{ + Size size; + int nparams; + int paramid; + ParamExecData *prm; + dsa_pointer handle; + char *start_address; + + /* Allocate enough space for the current parameter values. */ + size = EstimateParamExecSpace(estate, params); + handle = dsa_allocate(estate->es_query_dsa, size); + start_address = dsa_get_address(estate->es_query_dsa, handle); + + /* First write the number of parameters as a 4-byte integer. */ + nparams = bms_num_members(params); + memcpy(start_address, &nparams, sizeof(int)); + start_address += sizeof(int); + + /* Write details for each parameter in turn. */ + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + Oid typeOid; + int16 typLen; + bool typByVal; + + prm = &(estate->es_param_exec_vals[paramid]); + typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes, + paramid); + + /* Write paramid. */ + memcpy(start_address, ¶mid, sizeof(int)); + start_address += sizeof(int); + + /* Write datum/isnull */ + if (OidIsValid(typeOid)) + get_typlenbyval(typeOid, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + datumSerialize(prm->value, prm->isnull, typByVal, typLen, + &start_address); + } + + return handle; +} + +/* + * Restore specified PARAM_EXEC parameters. + */ +static void +RestoreParamExecParams(char *start_address, EState *estate) +{ + int nparams; + int i; + int paramid; + + memcpy(&nparams, start_address, sizeof(int)); + start_address += sizeof(int); + + for (i = 0; i < nparams; i++) + { + ParamExecData *prm; + + /* Read paramid */ + memcpy(¶mid, start_address, sizeof(int)); + start_address += sizeof(int); + prm = &(estate->es_param_exec_vals[paramid]); + + /* Read datum/isnull. */ + prm->value = datumRestore(&start_address, &prm->isnull); + prm->execPlan = NULL; + } +} + /* * Initialize the dynamic shared memory segment that will be used to control * parallel execution. @@ -395,7 +526,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) * execution and return results to the main backend. */ ParallelExecutorInfo * -ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, +ExecInitParallelPlan(PlanState *planstate, EState *estate, + Bitmapset *sendParams, int nworkers, int64 tuples_needed) { ParallelExecutorInfo *pei; @@ -405,17 +537,20 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; - char *param_space; + char *paramlistinfo_space; BufferUsage *bufusage_space; SharedExecutorInstrumentation *instrumentation = NULL; int pstmt_len; - int param_len; + int paramlistinfo_len; int instrumentation_len = 0; int instrument_offset = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; int query_len; + /* Force parameters we're going to pass to workers to be evaluated. */ + ExecEvalParamExecParams(sendParams, estate); + /* Allocate object for return value. */ pei = palloc0(sizeof(ParallelExecutorInfo)); pei->finished = false; @@ -450,8 +585,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, shm_toc_estimate_keys(&pcxt->estimator, 1); /* Estimate space for serialized ParamListInfo. */ - param_len = EstimateParamListSpace(estate->es_param_list_info); - shm_toc_estimate_chunk(&pcxt->estimator, param_len); + paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info); + shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); /* @@ -511,6 +646,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, /* Store fixed-size state. */ fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState)); fpes->tuples_needed = tuples_needed; + fpes->param_exec = InvalidDsaPointer; shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); /* Store query string */ @@ -524,9 +660,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); /* Store serialized ParamListInfo. */ - param_space = shm_toc_allocate(pcxt->toc, param_len); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space); - SerializeParamList(estate->es_param_list_info, ¶m_space); + paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); + SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, @@ -577,13 +713,25 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, pei->area = dsa_create_in_place(area_space, dsa_minsize, LWTRANCHE_PARALLEL_QUERY_DSA, pcxt->seg); - } - /* - * Make the area available to executor nodes running in the leader. See - * also ParallelQueryMain which makes it available to workers. - */ - estate->es_query_dsa = pei->area; + /* + * Make the area available to executor nodes running in the leader. + * See also ParallelQueryMain which makes it available to workers. + */ + estate->es_query_dsa = pei->area; + + /* + * Serialize parameters, if any, using DSA storage. We don't dare use + * the main parallel query DSM for this because we might relaunch + * workers after the values have changed (and thus the amount of + * storage required has changed). + */ + if (!bms_is_empty(sendParams)) + { + pei->param_exec = SerializeParamExecParams(estate, sendParams); + fpes->param_exec = pei->param_exec; + } + } /* * Give parallel-aware nodes a chance to initialize their shared data. @@ -640,16 +788,39 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei) */ void ExecParallelReinitialize(PlanState *planstate, - ParallelExecutorInfo *pei) + ParallelExecutorInfo *pei, + Bitmapset *sendParams) { + EState *estate = planstate->state; + FixedParallelExecutorState *fpes; + /* Old workers must already be shut down */ Assert(pei->finished); + /* Force parameters we're going to pass to workers to be evaluated. */ + ExecEvalParamExecParams(sendParams, estate); + ReinitializeParallelDSM(pei->pcxt); pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); pei->reader = NULL; pei->finished = false; + fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + + /* Free any serialized parameters from the last round. */ + if (DsaPointerIsValid(fpes->param_exec)) + { + dsa_free(estate->es_query_dsa, fpes->param_exec); + fpes->param_exec = InvalidDsaPointer; + } + + /* Serialize current parameter values if required. */ + if (!bms_is_empty(sendParams)) + { + pei->param_exec = SerializeParamExecParams(estate, sendParams); + fpes->param_exec = pei->param_exec; + } + /* Traverse plan tree and let each child node reset associated state. */ ExecParallelReInitializeDSM(planstate, pei->pcxt); } @@ -831,6 +1002,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei) void ExecParallelCleanup(ParallelExecutorInfo *pei) { + /* Free any serialized parameters. */ + if (DsaPointerIsValid(pei->param_exec)) + { + dsa_free(pei->area, pei->param_exec); + pei->param_exec = InvalidDsaPointer; + } if (pei->area != NULL) { dsa_detach(pei->area); @@ -882,7 +1059,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, pstmt = (PlannedStmt *) stringToNode(pstmtspace); /* Reconstruct ParamListInfo. */ - paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false); + paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false); paramLI = RestoreParamList(¶mspace); /* @@ -1046,6 +1223,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Special executor initialization steps for parallel workers */ queryDesc->planstate->state->es_query_dsa = area; + if (DsaPointerIsValid(fpes->param_exec)) + { + char *paramexec_space; + + paramexec_space = dsa_get_address(area, fpes->param_exec); + RestoreParamExecParams(paramexec_space, queryDesc->estate); + + } ExecParallelInitializeWorker(queryDesc->planstate, toc); /* Pass down any tuple bound */ diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 0298c65d06..07c62d2fea 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -160,11 +160,13 @@ ExecGather(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, + gather->initParam, gather->num_workers, node->tuples_needed); else ExecParallelReinitialize(node->ps.lefttree, - node->pei); + node->pei, + gather->initParam); /* * Register backend workers. We might not get as many as we diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 7206ab9197..7dd655c448 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -203,11 +203,13 @@ ExecGatherMerge(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, + gm->initParam, gm->num_workers, node->tuples_needed); else ExecParallelReinitialize(node->ps.lefttree, - node->pei); + node->pei, + gm->initParam); /* Try to launch workers. */ pcxt = node->pei->pcxt; diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 76e75459b4..d9ff8a7e51 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -364,6 +364,7 @@ _copyGather(const Gather *from) COPY_SCALAR_FIELD(rescan_param); COPY_SCALAR_FIELD(single_copy); COPY_SCALAR_FIELD(invisible); + COPY_BITMAPSET_FIELD(initParam); return newnode; } @@ -391,6 +392,7 @@ _copyGatherMerge(const GatherMerge *from) COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid)); COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool)); + COPY_BITMAPSET_FIELD(initParam); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index dc35df9e4f..c97ee24ade 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -487,6 +487,7 @@ _outGather(StringInfo str, const Gather *node) WRITE_INT_FIELD(rescan_param); WRITE_BOOL_FIELD(single_copy); WRITE_BOOL_FIELD(invisible); + WRITE_BITMAPSET_FIELD(initParam); } static void @@ -517,6 +518,8 @@ _outGatherMerge(StringInfo str, const GatherMerge *node) appendStringInfoString(str, " :nullsFirst"); for (i = 0; i < node->numCols; i++) appendStringInfo(str, " %s", booltostr(node->nullsFirst[i])); + + WRITE_BITMAPSET_FIELD(initParam); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 593658dd8a..7eb67fc040 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2172,6 +2172,7 @@ _readGather(void) READ_INT_FIELD(rescan_param); READ_BOOL_FIELD(single_copy); READ_BOOL_FIELD(invisible); + READ_BITMAPSET_FIELD(initParam); READ_DONE(); } @@ -2193,6 +2194,7 @@ _readGatherMerge(void) READ_OID_ARRAY(sortOperators, local_node->numCols); READ_OID_ARRAY(collations, local_node->numCols); READ_BOOL_ARRAY(nullsFirst, local_node->numCols); + READ_BITMAPSET_FIELD(initParam); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 9c74e39bd3..d4454779ee 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -6279,6 +6279,7 @@ make_gather(List *qptlist, node->rescan_param = rescan_param; node->single_copy = single_copy; node->invisible = false; + node->initParam = NULL; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 4c00a1453b..f6b8bbf5fa 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -377,6 +377,14 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { Gather *gather = makeNode(Gather); + /* + * If there are any initPlans attached to the formerly-top plan node, + * move them up to the Gather node; same as we do for Material node in + * materialize_finished_plan. + */ + gather->plan.initPlan = top_plan->initPlan; + top_plan->initPlan = NIL; + gather->plan.targetlist = top_plan->targetlist; gather->plan.qual = NIL; gather->plan.lefttree = top_plan; diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index fa9a3f0b47..28a7f7ec45 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -107,6 +107,7 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_param_references(PlannerInfo *root, Plan *plan); static Node *convert_combining_aggrefs(Node *node, void *context); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); @@ -632,7 +633,10 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_Gather: case T_GatherMerge: - set_upper_references(root, plan, rtoffset); + { + set_upper_references(root, plan, rtoffset); + set_param_references(root, plan); + } break; case T_Hash: @@ -1781,6 +1785,51 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) pfree(subplan_itlist); } +/* + * set_param_references + * Initialize the initParam list in Gather or Gather merge node such that + * it contains reference of all the params that needs to be evaluated + * before execution of the node. It contains the initplan params that are + * being passed to the plan nodes below it. + */ +static void +set_param_references(PlannerInfo *root, Plan *plan) +{ + Assert(IsA(plan, Gather) || IsA(plan, GatherMerge)); + + if (plan->lefttree->extParam) + { + PlannerInfo *proot; + Bitmapset *initSetParam = NULL; + ListCell *l; + + for (proot = root; proot != NULL; proot = proot->parent_root) + { + foreach(l, proot->init_plans) + { + SubPlan *initsubplan = (SubPlan *) lfirst(l); + ListCell *l2; + + foreach(l2, initsubplan->setParam) + { + initSetParam = bms_add_member(initSetParam, lfirst_int(l2)); + } + } + } + + /* + * Remember the list of all external initplan params that are used by + * the children of Gather or Gather merge node. + */ + if (IsA(plan, Gather)) + ((Gather *) plan)->initParam = + bms_intersect(plan->lefttree->extParam, initSetParam); + else + ((GatherMerge *) plan)->initParam = + bms_intersect(plan->lefttree->extParam, initSetParam); + } +} + /* * Recursively scan an expression tree and convert Aggrefs to the proper * intermediate form for combining aggregates. This means (1) replacing each diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 66e098f488..d14ef31eae 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -1087,6 +1087,8 @@ bool is_parallel_safe(PlannerInfo *root, Node *node) { max_parallel_hazard_context context; + PlannerInfo *proot; + ListCell *l; /* * Even if the original querytree contained nothing unsafe, we need to @@ -1101,6 +1103,25 @@ is_parallel_safe(PlannerInfo *root, Node *node) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; + + /* + * The params that refer to the same or parent query level are considered + * parallel-safe. The idea is that we compute such params at Gather or + * Gather Merge node and pass their value to workers. + */ + for (proot = root; proot != NULL; proot = proot->parent_root) + { + foreach(l, proot->init_plans) + { + SubPlan *initsubplan = (SubPlan *) lfirst(l); + ListCell *l2; + + foreach(l2, initsubplan->setParam) + context.safe_param_ids = lcons_int(lfirst_int(l2), + context.safe_param_ids); + } + } + return !max_parallel_hazard_walker(node, &context); } @@ -1225,7 +1246,8 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) * We can't pass Params to workers at the moment either, so they are also * parallel-restricted, unless they are PARAM_EXTERN Params or are * PARAM_EXEC Params listed in safe_param_ids, meaning they could be - * generated within the worker. + * either generated within the worker or can be computed in master and + * then their value can be passed to the worker. */ else if (IsA(node, Param)) { diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 78d2247816..5bbb63a9d8 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -609,6 +609,7 @@ extern ExprEvalOp ExecEvalStepOp(ExprState *state, ExprEvalStep *op); */ extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalParamExecParams(Bitmapset *params, EState *estate); extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index e1b3e7af1f..99a13f3b7d 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -28,6 +28,7 @@ typedef struct ParallelExecutorInfo BufferUsage *buffer_usage; /* points to bufusage area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ + dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */ bool finished; /* set true by ExecParallelFinish */ /* These two arrays have pcxt->nworkers_launched entries: */ shm_mq_handle **tqueue; /* tuple queues for worker output */ @@ -35,12 +36,13 @@ typedef struct ParallelExecutorInfo } ParallelExecutorInfo; extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, - EState *estate, int nworkers, int64 tuples_needed); + EState *estate, Bitmapset *sendParam, int nworkers, + int64 tuples_needed); extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(PlanState *planstate, - ParallelExecutorInfo *pei); + ParallelExecutorInfo *pei, Bitmapset *sendParam); extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index a127682b0e..9b38d44ba0 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -841,6 +841,8 @@ typedef struct Gather int rescan_param; /* ID of Param that signals a rescan, or -1 */ bool single_copy; /* don't execute plan more than once */ bool invisible; /* suppress EXPLAIN display (for testing)? */ + Bitmapset *initParam; /* param id's of initplans which are referred + * at gather or one of it's child node */ } Gather; /* ------------ @@ -858,6 +860,8 @@ typedef struct GatherMerge Oid *sortOperators; /* OIDs of operators to sort them by */ Oid *collations; /* OIDs of collations */ bool *nullsFirst; /* NULLS FIRST/LAST directions */ + Bitmapset *initParam; /* param id's of initplans which are referred + * at gather merge or one of it's child node */ } GatherMerge; /* ---------------- diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 06aeddd805..d1d5b228ce 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -201,6 +201,41 @@ explain (costs off) -> Seq Scan on tenk2 (4 rows) +alter table tenk2 reset (parallel_workers); +-- test parallel plan for a query containing initplan. +set enable_indexscan = off; +set enable_indexonlyscan = off; +set enable_bitmapscan = off; +alter table tenk2 set (parallel_workers = 2); +explain (costs off) + select count(*) from tenk1 + where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); + QUERY PLAN +------------------------------------------------------ + Aggregate + InitPlan 1 (returns $2) + -> Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Seq Scan on tenk2 + -> Gather + Workers Planned: 4 + Params Evaluated: $2 + -> Parallel Seq Scan on tenk1 + Filter: (unique1 = $2) +(12 rows) + +select count(*) from tenk1 + where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); + count +------- + 1 +(1 row) + +reset enable_indexscan; +reset enable_indexonlyscan; +reset enable_bitmapscan; alter table tenk2 reset (parallel_workers); -- test parallel index scans. set enable_seqscan to off; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index b701b35408..bb4e34adbe 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -74,6 +74,23 @@ explain (costs off) (select ten from tenk2); alter table tenk2 reset (parallel_workers); +-- test parallel plan for a query containing initplan. +set enable_indexscan = off; +set enable_indexonlyscan = off; +set enable_bitmapscan = off; +alter table tenk2 set (parallel_workers = 2); + +explain (costs off) + select count(*) from tenk1 + where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); +select count(*) from tenk1 + where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2); + +reset enable_indexscan; +reset enable_indexonlyscan; +reset enable_bitmapscan; +alter table tenk2 reset (parallel_workers); + -- test parallel index scans. set enable_seqscan to off; set enable_bitmapscan to off;