diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index ef810a5834..9c1533e311 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -36,6 +36,7 @@ #include "executor/nodeGather.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" +#include "utils/memutils.h" #include "utils/rel.h" @@ -50,6 +51,9 @@ GatherState * ExecInitGather(Gather *node, EState *estate, int eflags) { GatherState *gatherstate; + Plan *outerNode; + bool hasoid; + TupleDesc tupDesc; /* Gather node doesn't have innerPlan node. */ Assert(innerPlan(node) == NULL); @@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags) /* * tuple table initialization */ + gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate); ExecInitResultTupleSlot(estate, &gatherstate->ps); /* * now initialize outer plan */ - outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags); - + outerNode = outerPlan(node); + outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags); gatherstate->ps.ps_TupFromTlist = false; @@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags) ExecAssignResultTypeFromTL(&gatherstate->ps); ExecAssignProjectionInfo(&gatherstate->ps, NULL); + /* + * Initialize funnel slot to same tuple descriptor as outer plan. + */ + if (!ExecContextForcesOids(&gatherstate->ps, &hasoid)) + hasoid = false; + tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid); + ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc); + return gatherstate; } @@ -113,6 +126,9 @@ ExecGather(GatherState *node) { int i; TupleTableSlot *slot; + TupleTableSlot *resultSlot; + ExprDoneCond isDone; + ExprContext *econtext; /* * Initialize the parallel context and workers on first execution. We do @@ -169,7 +185,53 @@ ExecGather(GatherState *node) node->initialized = true; } - slot = gather_getnext(node); + /* + * Check to see if we're still projecting out tuples from a previous scan + * tuple (because there is a function-returning-set in the projection + * expressions). If so, try to project another one. + */ + if (node->ps.ps_TupFromTlist) + { + resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone); + if (isDone == ExprMultipleResult) + return resultSlot; + /* Done with that source tuple... */ + node->ps.ps_TupFromTlist = false; + } + + /* + * Reset per-tuple memory context to free any expression evaluation + * storage allocated in the previous tuple cycle. Note we can't do this + * until we're done projecting. + */ + econtext = node->ps.ps_ExprContext; + ResetExprContext(econtext); + + /* Get and return the next tuple, projecting if necessary. */ + for (;;) + { + /* + * Get next tuple, either from one of our workers, or by running the + * plan ourselves. + */ + slot = gather_getnext(node); + if (TupIsNull(slot)) + return NULL; + + /* + * form the result tuple using ExecProject(), and return it --- unless + * the projection produces an empty set, in which case we must loop + * back around for another tuple + */ + econtext->ecxt_outertuple = slot; + resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone); + + if (isDone != ExprEndResult) + { + node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult); + return resultSlot; + } + } return slot; } @@ -201,18 +263,11 @@ ExecEndGather(GatherState *node) static TupleTableSlot * gather_getnext(GatherState *gatherstate) { - PlanState *outerPlan; + PlanState *outerPlan = outerPlanState(gatherstate); TupleTableSlot *outerTupleSlot; - TupleTableSlot *slot; + TupleTableSlot *fslot = gatherstate->funnel_slot; HeapTuple tup; - /* - * We can use projection info of Gather for the tuples received from - * worker backends as currently for all cases worker backends sends the - * projected tuple as required by Gather node. - */ - slot = gatherstate->ps.ps_ProjInfo->pi_slot; - while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally) { if (gatherstate->funnel != NULL) @@ -229,19 +284,17 @@ gather_getnext(GatherState *gatherstate) if (HeapTupleIsValid(tup)) { ExecStoreTuple(tup, /* tuple to store */ - slot, /* slot to store in */ + fslot, /* slot in which to store the tuple */ InvalidBuffer, /* buffer associated with this * tuple */ true); /* pfree this pointer if not from heap */ - return slot; + return fslot; } } if (gatherstate->need_to_scan_locally) { - outerPlan = outerPlanState(gatherstate); - outerTupleSlot = ExecProcNode(outerPlan); if (!TupIsNull(outerTupleSlot)) @@ -251,7 +304,7 @@ gather_getnext(GatherState *gatherstate) } } - return ExecClearTuple(slot); + return ExecClearTuple(fslot); } /* ---------------------------------------------------------------- diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 8c6c57101c..48d6e6fd78 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -602,12 +602,15 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) set_join_references(root, (Join *) plan, rtoffset); break; + case T_Gather: + set_upper_references(root, plan, rtoffset); + break; + case T_Hash: case T_Material: case T_Sort: case T_Unique: case T_SetOp: - case T_Gather: /* * These plan types don't actually bother to evaluate their diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4fcdcc4067..939bc0ed73 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1964,6 +1964,7 @@ typedef struct GatherState bool initialized; struct ParallelExecutorInfo *pei; struct TupleQueueFunnel *funnel; + TupleTableSlot *funnel_slot; bool need_to_scan_locally; } GatherState;