diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1881236726..69844e5b29 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3497,6 +3497,20 @@ ANY num_sync ( + <varlistentry id="guc-enable-gathermerge" xreflabel="enable_gathermerge"> + <term><varname>enable_gathermerge</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>enable_gathermerge</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Enables or disables the query planner's use of gather + merge plan types. The default is <literal>on</literal>. + </para> + </listitem> + </varlistentry> + enable_hashagg (boolean) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 6fd82e9d52..c9b55ead3d 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -918,6 +918,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Gather: pname = sname = "Gather"; break; + case T_GatherMerge: + pname = sname = "Gather Merge"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1411,6 +1414,26 @@ ExplainNode(PlanState *planstate, List *ancestors, ExplainPropertyBool("Single Copy", gather->single_copy, es); } break; + case T_GatherMerge: + { + GatherMerge *gm = (GatherMerge *) plan; + + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Workers Planned", + gm->num_workers, es); + if (es->analyze) + { + int nworkers; + + nworkers = ((GatherMergeState *) planstate)->nworkers_launched; + ExplainPropertyInteger("Workers Launched", + nworkers, es); + } + } + break; case T_FunctionScan: if (es->verbose) { diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index a9893c2b22..d281906cd5 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -20,7 +20,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ nodeBitmapHeapscan.o nodeBitmapIndexscan.o \ nodeCustom.o nodeFunctionscan.o nodeGather.o \ nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ - nodeLimit.o nodeLockRows.o \ + nodeLimit.o nodeLockRows.o nodeGatherMerge.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \ nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 468f50e6a6..80c77addb8 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -89,6 +89,7 @@ #include "executor/nodeForeignscan.h" #include "executor/nodeFunctionscan.h" #include "executor/nodeGather.h" +#include "executor/nodeGatherMerge.h" #include "executor/nodeGroup.h" #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" @@ -326,6 +327,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_GatherMerge: + result = (PlanState *) ExecInitGatherMerge((GatherMerge *) node, + estate, eflags); + break; + case T_Hash: result = (PlanState *) ExecInitHash((Hash *) node, estate, eflags); @@ -535,6 +541,10 @@ ExecProcNode(PlanState *node) result = ExecGather((GatherState *) node); break; + case T_GatherMergeState: + result = ExecGatherMerge((GatherMergeState *) node); + break; + case T_HashState: result = ExecHash((HashState *) node); break; @@ -697,6 +707,10 @@ ExecEndNode(PlanState *node) ExecEndGather((GatherState *) node); break; + case T_GatherMergeState: + ExecEndGatherMerge((GatherMergeState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; @@ -842,6 +856,9 @@ ExecShutdownNode(PlanState
*node) case T_CustomScanState: ExecShutdownCustomScan((CustomScanState *) node); break; + case T_GatherMergeState: + ExecShutdownGatherMerge((GatherMergeState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c new file mode 100644 index 0000000000..62a6b1866d --- /dev/null +++ b/src/backend/executor/nodeGatherMerge.c @@ -0,0 +1,687 @@ +/*------------------------------------------------------------------------- + * + * nodeGatherMerge.c + * Scan a plan in multiple workers, and do an order-preserving merge. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/nodeGatherMerge.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/relscan.h" +#include "access/xact.h" +#include "executor/execdebug.h" +#include "executor/execParallel.h" +#include "executor/nodeGatherMerge.h" +#include "executor/nodeSubplan.h" +#include "executor/tqueue.h" +#include "lib/binaryheap.h" +#include "miscadmin.h" +#include "utils/memutils.h" +#include "utils/rel.h" + +/* + * Tuple array for each worker + */ +typedef struct GMReaderTupleBuffer +{ + HeapTuple *tuple; + int readCounter; + int nTuples; + bool done; +} GMReaderTupleBuffer; + +/* + * When we read tuples from workers, it's a good idea to read several at once + * for efficiency when possible: this minimizes context-switching overhead. + * But reading too many at a time wastes memory without improving performance. + */ +#define MAX_TUPLE_STORE 10 + +static int32 heap_compare_slots(Datum a, Datum b, void *arg); +static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); +static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, + bool nowait, bool *done); +static void gather_merge_init(GatherMergeState *gm_state); +static void ExecShutdownGatherMergeWorkers(GatherMergeState *node); +static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, + bool nowait); +static void form_tuple_array(GatherMergeState *gm_state, int reader); + +/* ---------------------------------------------------------------- + * ExecInitGatherMerge + * ---------------------------------------------------------------- + */ +GatherMergeState * +ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) +{ + GatherMergeState *gm_state; + Plan *outerNode; + bool hasoid; + TupleDesc tupDesc; + + /* Gather Merge node doesn't have an innerPlan node. */ + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + gm_state = makeNode(GatherMergeState); + gm_state->ps.plan = (Plan *) node; + gm_state->ps.state = estate; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &gm_state->ps); + + /* + * initialize child expressions + */ + gm_state->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) gm_state); + gm_state->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) gm_state); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &gm_state->ps); + + /* + * now initialize outer plan + */ + outerNode = outerPlan(node); + outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags); + + /* + * Initialize result tuple type and projection info.
+ */ + ExecAssignResultTypeFromTL(&gm_state->ps); + ExecAssignProjectionInfo(&gm_state->ps, NULL); + + gm_state->gm_initialized = false; + + /* + * initialize sort-key information + */ + if (node->numCols) + { + int i; + + gm_state->gm_nkeys = node->numCols; + gm_state->gm_sortkeys = + palloc0(sizeof(SortSupportData) * node->numCols); + + for (i = 0; i < node->numCols; i++) + { + SortSupport sortKey = gm_state->gm_sortkeys + i; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = node->collations[i]; + sortKey->ssup_nulls_first = node->nullsFirst[i]; + sortKey->ssup_attno = node->sortColIdx[i]; + + /* + * We don't perform abbreviated key conversion here, for the same + * reasons that it isn't used in MergeAppend + */ + sortKey->abbreviate = false; + + PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); + } + } + + /* + * store the tuple descriptor into gather merge state, so we can use it + * later while initializing the gather merge slots. + */ + if (!ExecContextForcesOids(&gm_state->ps, &hasoid)) + hasoid = false; + tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid); + gm_state->tupDesc = tupDesc; + + return gm_state; +} + +/* ---------------------------------------------------------------- + * ExecGatherMerge(node) + * + * Scans the relation via multiple workers and returns + * the next qualifying tuple. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecGatherMerge(GatherMergeState *node) +{ + TupleTableSlot *slot; + ExprContext *econtext; + int i; + + /* + * As with Gather, we don't launch workers until this node is actually + * executed. + */ + if (!node->initialized) + { + EState *estate = node->ps.state; + GatherMerge *gm = (GatherMerge *) node->ps.plan; + + /* + * Sometimes we might have to run without parallelism; but if parallel + * mode is active then we can try to fire up some workers. + */ + if (gm->num_workers > 0 && IsInParallelMode()) + { + ParallelContext *pcxt; + + /* Initialize data structures for workers. */ + if (!node->pei) + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + gm->num_workers); + + /* Try to launch workers. */ + pcxt = node->pei->pcxt; + LaunchParallelWorkers(pcxt); + node->nworkers_launched = pcxt->nworkers_launched; + + /* Set up tuple queue readers to read the results. */ + if (pcxt->nworkers_launched > 0) + { + node->nreaders = 0; + node->reader = palloc(pcxt->nworkers_launched * + sizeof(TupleQueueReader *)); + + Assert(gm->numCols); + + for (i = 0; i < pcxt->nworkers_launched; ++i) + { + shm_mq_set_handle(node->pei->tqueue[i], + pcxt->worker[i].bgwhandle); + node->reader[node->nreaders++] = + CreateTupleQueueReader(node->pei->tqueue[i], + node->tupDesc); + } + } + else + { + /* No workers? Then never mind. */ + ExecShutdownGatherMergeWorkers(node); + } + } + + /* always allow leader to participate */ + node->need_to_scan_locally = true; + node->initialized = true; + } + + /* + * Reset per-tuple memory context to free any expression evaluation + * storage allocated in the previous tuple cycle. + */ + econtext = node->ps.ps_ExprContext; + ResetExprContext(econtext); + + /* + * Get next tuple, either from one of our workers, or by running the + * plan ourselves. 
*/ + slot = gather_merge_getnext(node); + if (TupIsNull(slot)) + return NULL; + + /* + * form the result tuple using ExecProject(), and return it --- unless + * the projection produces an empty set, in which case we must loop + * back around for another tuple + */ + econtext->ecxt_outertuple = slot; + return ExecProject(node->ps.ps_ProjInfo); +} + +/* ---------------------------------------------------------------- + * ExecEndGatherMerge + * + * frees any storage allocated through C routines. + * ---------------------------------------------------------------- + */ +void +ExecEndGatherMerge(GatherMergeState *node) +{ + ExecEndNode(outerPlanState(node)); /* let children clean up first */ + ExecShutdownGatherMerge(node); + ExecFreeExprContext(&node->ps); + ExecClearTuple(node->ps.ps_ResultTupleSlot); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGatherMerge + * + * Destroy the setup for parallel workers including parallel context. + * Collect all the stats after workers are stopped, else some work + * done by workers won't be accounted for. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGatherMerge(GatherMergeState *node) +{ + ExecShutdownGatherMergeWorkers(node); + + /* Now destroy the parallel context. */ + if (node->pei != NULL) + { + ExecParallelCleanup(node->pei); + node->pei = NULL; + } +} + +/* ---------------------------------------------------------------- + * ExecShutdownGatherMergeWorkers + * + * Destroy the parallel workers. Collect all the stats after + * workers are stopped, else some work done by workers won't be + * accounted for. + * ---------------------------------------------------------------- + */ +static void +ExecShutdownGatherMergeWorkers(GatherMergeState *node) +{ + /* Shut down tuple queue readers before shutting down workers. */ + if (node->reader != NULL) + { + int i; + + for (i = 0; i < node->nreaders; ++i) + if (node->reader[i]) + DestroyTupleQueueReader(node->reader[i]); + + pfree(node->reader); + node->reader = NULL; + } + + /* Now shut down the workers. */ + if (node->pei != NULL) + ExecParallelFinish(node->pei); +} + +/* ---------------------------------------------------------------- + * ExecReScanGatherMerge + * + * Re-initializes the workers and rescans the relation via them. + * ---------------------------------------------------------------- + */ +void +ExecReScanGatherMerge(GatherMergeState *node) +{ + /* + * Re-initialize the parallel workers to perform a rescan of the relation. + * We want to gracefully shut down all the workers so that they can + * propagate any error or other information to the master backend before + * dying. The parallel context will be reused for the rescan. + */ + ExecShutdownGatherMergeWorkers(node); + + node->initialized = false; + + if (node->pei) + ExecParallelReinitialize(node->pei); + + ExecReScan(node->ps.lefttree); +} + +/* + * Initialize the Gather Merge tuple read. + * + * Pull at least a single tuple from each worker plus the leader, and set up + * the heap. + */ +static void +gather_merge_init(GatherMergeState *gm_state) +{ + int nreaders = gm_state->nreaders; + bool initialize = true; + int i; + + /* + * Allocate gm_slots: one slot for each worker, plus one for the leader. + * The last slot is always the leader's. The leader reads tuples by + * calling ExecProcNode(), which returns a TupleTableSlot that gets + * assigned directly to its gm_slot, so just initialize the leader's slot + * to NULL. For the worker slots, the code below calls + * ExecInitExtraTupleSlot() to do the initialization. + */ + gm_state->gm_slots = + palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *)); + gm_state->gm_slots[gm_state->nreaders] = NULL; + + /* Initialize the tuple slot and tuple array for each worker */ + gm_state->gm_tuple_buffers = + (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * + (gm_state->nreaders + 1)); + for (i = 0; i < gm_state->nreaders; i++) + { + /* Allocate the tuple array with MAX_TUPLE_STORE size */ + gm_state->gm_tuple_buffers[i].tuple = + (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); + + /* Initialize slot for worker */ + gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state); + ExecSetSlotDescriptor(gm_state->gm_slots[i], + gm_state->tupDesc); + } + + /* Allocate the resources for the merge */ + gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1, + heap_compare_slots, + gm_state); + + /* + * First, try to read a tuple from each worker (including the leader) in + * nowait mode, so that we start the read from each worker as well as the + * leader. After this, if all active workers are unable to produce a + * tuple, then re-read and this time use wait mode. For workers that were + * able to produce a tuple in the earlier loop and are still active, just + * try to fill the tuple array if more tuples are available. + */ +reread: + for (i = 0; i < nreaders + 1; i++) + { + if (!gm_state->gm_tuple_buffers[i].done && + (TupIsNull(gm_state->gm_slots[i]) || + gm_state->gm_slots[i]->tts_isempty)) + { + if (gather_merge_readnext(gm_state, i, initialize)) + { + binaryheap_add_unordered(gm_state->gm_heap, + Int32GetDatum(i)); + } + } + else + form_tuple_array(gm_state, i); + } + initialize = false; + + for (i = 0; i < nreaders; i++) + if (!gm_state->gm_tuple_buffers[i].done && + (TupIsNull(gm_state->gm_slots[i]) || + gm_state->gm_slots[i]->tts_isempty)) + goto reread; + + binaryheap_build(gm_state->gm_heap); + gm_state->gm_initialized = true; +} + +/* + * Clear out the tuple table slots for each gather merge input, + * and return a cleared slot. + */ +static TupleTableSlot * +gather_merge_clear_slots(GatherMergeState *gm_state) +{ + int i; + + for (i = 0; i < gm_state->nreaders; i++) + { + pfree(gm_state->gm_tuple_buffers[i].tuple); + gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]); + } + + /* Free tuple array as we don't need it any more */ + pfree(gm_state->gm_tuple_buffers); + /* Free the binaryheap, which was created for sort */ + binaryheap_free(gm_state->gm_heap); + + /* return any cleared slot */ + return gm_state->gm_slots[0]; +} + +/* + * Read the next tuple for gather merge. + * + * Fetch the next tuple in sort order from the heap. + */ +static TupleTableSlot * +gather_merge_getnext(GatherMergeState *gm_state) +{ + int i; + + /* + * First time through: pull the first tuple from each participant, and + * set up the heap. + */ + if (gm_state->gm_initialized == false) + gather_merge_init(gm_state); + else + { + /* + * Otherwise, pull the next tuple from whichever participant we + * returned from last time, and reinsert the index into the heap, + * because it might now compare differently against the existing + * elements of the heap.
*/ + i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + + if (gather_merge_readnext(gm_state, i, false)) + binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); + else + (void) binaryheap_remove_first(gm_state->gm_heap); + } + + if (binaryheap_empty(gm_state->gm_heap)) + { + /* All the queues are exhausted, and so is the heap */ + return gather_merge_clear_slots(gm_state); + } + else + { + i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + return gm_state->gm_slots[i]; + } +} + +/* + * Read tuples for the given reader in nowait mode, and form the tuple array. + */ +static void +form_tuple_array(GatherMergeState *gm_state, int reader) +{ + GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader]; + int i; + + /* The last slot is for the leader; we don't build a tuple array for it */ + if (reader == gm_state->nreaders) + return; + + /* + * We're here because we've already read all the tuples from the tuple + * array, so reset the counters to zero. + */ + if (tuple_buffer->nTuples == tuple_buffer->readCounter) + tuple_buffer->nTuples = tuple_buffer->readCounter = 0; + + /* Tuple array is already full? */ + if (tuple_buffer->nTuples == MAX_TUPLE_STORE) + return; + + for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++) + { + tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state, + reader, + false, + &tuple_buffer->done)); + if (!HeapTupleIsValid(tuple_buffer->tuple[i])) + break; + tuple_buffer->nTuples++; + } +} + +/* + * Store the next tuple for a given reader into the appropriate slot. + * + * Returns false if the reader is exhausted, and true otherwise. + */ +static bool +gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) +{ + GMReaderTupleBuffer *tuple_buffer; + HeapTuple tup = NULL; + + /* + * If we're being asked to generate a tuple from the leader, then we + * just call ExecProcNode as normal to produce one. + */ + if (gm_state->nreaders == reader) + { + if (gm_state->need_to_scan_locally) + { + PlanState *outerPlan = outerPlanState(gm_state); + TupleTableSlot *outerTupleSlot; + + outerTupleSlot = ExecProcNode(outerPlan); + + if (!TupIsNull(outerTupleSlot)) + { + gm_state->gm_slots[reader] = outerTupleSlot; + return true; + } + gm_state->gm_tuple_buffers[reader].done = true; + gm_state->need_to_scan_locally = false; + } + return false; + } + + /* Otherwise, check the state of the relevant tuple buffer. */ + tuple_buffer = &gm_state->gm_tuple_buffers[reader]; + + if (tuple_buffer->nTuples > tuple_buffer->readCounter) + { + /* Return any tuple previously read that is still buffered. */ + tup = tuple_buffer->tuple[tuple_buffer->readCounter++]; + } + else if (tuple_buffer->done) + { + /* Reader is known to be exhausted. */ + DestroyTupleQueueReader(gm_state->reader[reader]); + gm_state->reader[reader] = NULL; + return false; + } + else + { + /* Read and buffer next tuple. */ + tup = heap_copytuple(gm_readnext_tuple(gm_state, + reader, + nowait, + &tuple_buffer->done)); + + /* + * Attempt to read more tuples in nowait mode and store them in + * the tuple array.
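+ * Refilling the array here means that later calls can usually return a + * buffered tuple without another trip to the shared queue, which keeps + * context switches down (see MAX_TUPLE_STORE above).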
+ */ + if (HeapTupleIsValid(tup)) + form_tuple_array(gm_state, reader); + else + return false; + } + + Assert(HeapTupleIsValid(tup)); + + /* Build the TupleTableSlot for the given tuple */ + ExecStoreTuple(tup, /* tuple to store */ + gm_state->gm_slots[reader], /* slot in which to store the + * tuple */ + InvalidBuffer, /* buffer associated with this tuple */ + true); /* pfree this pointer if not from heap */ + + return true; +} + +/* + * Attempt to read a tuple from given reader. + */ +static HeapTuple +gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, + bool *done) +{ + TupleQueueReader *reader; + HeapTuple tup = NULL; + MemoryContext oldContext; + MemoryContext tupleContext; + + tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory; + + if (done != NULL) + *done = false; + + /* Check for async events, particularly messages from workers. */ + CHECK_FOR_INTERRUPTS(); + + /* Attempt to read a tuple. */ + reader = gm_state->reader[nreader]; + + /* Run TupleQueueReaders in per-tuple context */ + oldContext = MemoryContextSwitchTo(tupleContext); + tup = TupleQueueReaderNext(reader, nowait, done); + MemoryContextSwitchTo(oldContext); + + return tup; +} + +/* + * We have one slot for each item in the heap array. We use SlotNumber + * to store slot indexes. This doesn't actually provide any formal + * type-safety, but it makes the code more self-documenting. + */ +typedef int32 SlotNumber; + +/* + * Compare the tuples in the two given slots. + */ +static int32 +heap_compare_slots(Datum a, Datum b, void *arg) +{ + GatherMergeState *node = (GatherMergeState *) arg; + SlotNumber slot1 = DatumGetInt32(a); + SlotNumber slot2 = DatumGetInt32(b); + + TupleTableSlot *s1 = node->gm_slots[slot1]; + TupleTableSlot *s2 = node->gm_slots[slot2]; + int nkey; + + Assert(!TupIsNull(s1)); + Assert(!TupIsNull(s2)); + + for (nkey = 0; nkey < node->gm_nkeys; nkey++) + { + SortSupport sortKey = node->gm_sortkeys + nkey; + AttrNumber attno = sortKey->ssup_attno; + Datum datum1, + datum2; + bool isNull1, + isNull2; + int compare; + + datum1 = slot_getattr(s1, attno, &isNull1); + datum2 = slot_getattr(s2, attno, &isNull2); + + compare = ApplySortComparator(datum1, isNull1, + datum2, isNull2, + sortKey); + if (compare != 0) + return -compare; + } + return 0; +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index ac8e50ef1d..bfc2ac1716 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -360,6 +360,31 @@ _copyGather(const Gather *from) return newnode; } +/* + * _copyGatherMerge + */ +static GatherMerge * +_copyGatherMerge(const GatherMerge *from) +{ + GatherMerge *newnode = makeNode(GatherMerge); + + /* + * copy node superclass fields + */ + CopyPlanFields((const Plan *) from, (Plan *) newnode); + + /* + * copy remainder of node + */ + COPY_SCALAR_FIELD(num_workers); + COPY_SCALAR_FIELD(numCols); + COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber)); + COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); + COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid)); + COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool)); + + return newnode; +} /* * CopyScanFields @@ -4594,6 +4619,9 @@ copyObject(const void *from) case T_Gather: retval = _copyGather(from); break; + case T_GatherMerge: + retval = _copyGatherMerge(from); + break; case T_SeqScan: retval = _copySeqScan(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 825a7b283a..7418fbeded 100644 --- 
a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -457,6 +457,35 @@ _outGather(StringInfo str, const Gather *node) WRITE_BOOL_FIELD(invisible); } +static void +_outGatherMerge(StringInfo str, const GatherMerge *node) +{ + int i; + + WRITE_NODE_TYPE("GATHERMERGE"); + + _outPlanInfo(str, (const Plan *) node); + + WRITE_INT_FIELD(num_workers); + WRITE_INT_FIELD(numCols); + + appendStringInfoString(str, " :sortColIdx"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %d", node->sortColIdx[i]); + + appendStringInfoString(str, " :sortOperators"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %u", node->sortOperators[i]); + + appendStringInfoString(str, " :collations"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %u", node->collations[i]); + + appendStringInfoString(str, " :nullsFirst"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %s", booltostr(node->nullsFirst[i])); +} + static void _outScan(StringInfo str, const Scan *node) { @@ -2016,6 +2045,17 @@ _outLimitPath(StringInfo str, const LimitPath *node) WRITE_NODE_FIELD(limitCount); } +static void +_outGatherMergePath(StringInfo str, const GatherMergePath *node) +{ + WRITE_NODE_TYPE("GATHERMERGEPATH"); + + _outPathInfo(str, (const Path *) node); + + WRITE_NODE_FIELD(subpath); + WRITE_INT_FIELD(num_workers); +} + static void _outNestPath(StringInfo str, const NestPath *node) { @@ -3473,6 +3513,9 @@ outNode(StringInfo str, const void *obj) case T_Gather: _outGather(str, obj); break; + case T_GatherMerge: + _outGatherMerge(str, obj); + break; case T_Scan: _outScan(str, obj); break; @@ -3809,6 +3852,9 @@ outNode(StringInfo str, const void *obj) case T_LimitPath: _outLimitPath(str, obj); break; + case T_GatherMergePath: + _outGatherMergePath(str, obj); + break; case T_NestPath: _outNestPath(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 8f39d93a12..d3bbc02f24 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2137,6 +2137,26 @@ _readGather(void) READ_DONE(); } +/* + * _readGatherMerge + */ +static GatherMerge * +_readGatherMerge(void) +{ + READ_LOCALS(GatherMerge); + + ReadCommonPlan(&local_node->plan); + + READ_INT_FIELD(num_workers); + READ_INT_FIELD(numCols); + READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols); + READ_OID_ARRAY(sortOperators, local_node->numCols); + READ_OID_ARRAY(collations, local_node->numCols); + READ_BOOL_ARRAY(nullsFirst, local_node->numCols); + + READ_DONE(); +} + /* * _readHash */ @@ -2577,6 +2597,8 @@ parseNodeString(void) return_value = _readUnique(); else if (MATCH("GATHER", 6)) return_value = _readGather(); + else if (MATCH("GATHERMERGE", 11)) + return_value = _readGatherMerge(); else if (MATCH("HASH", 4)) return_value = _readHash(); else if (MATCH("SETOP", 5)) diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index fbb2cda9d7..b263359fde 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -2084,39 +2084,51 @@ set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) /* * generate_gather_paths - * Generate parallel access paths for a relation by pushing a Gather on - * top of a partial path. + * Generate parallel access paths for a relation by pushing a Gather or + * Gather Merge on top of a partial path. * * This must not be called until after we're done creating all partial paths * for the specified relation. 
(Otherwise, add_partial_path might delete a - * path that some GatherPath has a reference to.) + * path that some GatherPath or GatherMergePath has a reference to.) */ void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel) { Path *cheapest_partial_path; Path *simple_gather_path; + ListCell *lc; /* If there are no partial paths, there's nothing to do here. */ if (rel->partial_pathlist == NIL) return; /* - * The output of Gather is currently always unsorted, so there's only one - * partial path of interest: the cheapest one. That will be the one at - * the front of partial_pathlist because of the way add_partial_path - * works. - * - * Eventually, we should have a Gather Merge operation that can merge - * multiple tuple streams together while preserving their ordering. We - * could usefully generate such a path from each partial path that has - * non-NIL pathkeys. + * The output of Gather is always unsorted, so there's only one partial + * path of interest: the cheapest one. That will be the one at the front + * of partial_pathlist because of the way add_partial_path works. */ cheapest_partial_path = linitial(rel->partial_pathlist); simple_gather_path = (Path *) create_gather_path(root, rel, cheapest_partial_path, rel->reltarget, NULL, NULL); add_path(rel, simple_gather_path); + + /* + * For each useful ordering, we can consider an order-preserving Gather + * Merge. + */ + foreach (lc, rel->partial_pathlist) + { + Path *subpath = (Path *) lfirst(lc); + GatherMergePath *path; + + if (subpath->pathkeys == NIL) + continue; + + path = create_gather_merge_path(root, rel, subpath, rel->reltarget, + subpath->pathkeys, NULL, NULL); + add_path(rel, &path->path); + } } /* diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 627e3f1b95..e78f3a84c5 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -126,6 +126,7 @@ bool enable_nestloop = true; bool enable_material = true; bool enable_mergejoin = true; bool enable_hashjoin = true; +bool enable_gathermerge = true; typedef struct { @@ -372,6 +373,73 @@ cost_gather(GatherPath *path, PlannerInfo *root, path->path.total_cost = (startup_cost + run_cost); } +/* + * cost_gather_merge + * Determines and returns the cost of gather merge path. + * + * GatherMerge merges several pre-sorted input streams, using a heap that at + * any given instant holds the next tuple from each stream. If there are N + * streams, we need about N*log2(N) tuple comparisons to construct the heap at + * startup, and then for each output tuple, about log2(N) comparisons to + * replace the top heap entry with the next tuple from the same stream. + */ +void +cost_gather_merge(GatherMergePath *path, PlannerInfo *root, + RelOptInfo *rel, ParamPathInfo *param_info, + Cost input_startup_cost, Cost input_total_cost, + double *rows) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + Cost comparison_cost; + double N; + double logN; + + /* Mark the path with the correct row estimate */ + if (rows) + path->path.rows = *rows; + else if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = rel->rows; + + if (!enable_gathermerge) + startup_cost += disable_cost; + + /* + * Add one to the number of workers to account for the leader. This might + * be overgenerous since the leader will do less work than other workers + * in typical cases, but we'll go with it for now. 
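+ * For example, with num_workers = 4 we have N = 5 and log2(N) of about 2.32, + * so heap construction below is charged about 11.6 comparisons and each + * returned row about 2.32 comparisons, each costed at 2.0 * cpu_operator_cost.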
+ */ + Assert(path->num_workers > 0); + N = (double) path->num_workers + 1; + logN = LOG2(N); + + /* Assumed cost per tuple comparison */ + comparison_cost = 2.0 * cpu_operator_cost; + + /* Heap creation cost */ + startup_cost += comparison_cost * N * logN; + + /* Per-tuple heap maintenance cost */ + run_cost += path->path.rows * comparison_cost * logN; + + /* small cost for heap management, like cost_merge_append */ + run_cost += cpu_operator_cost * path->path.rows; + + /* + * Parallel setup and communication cost. Since Gather Merge, unlike + * Gather, requires us to block until a tuple is available from every + * worker, we bump the IPC cost up a little bit as compared with Gather. + * For lack of a better idea, charge an extra 5%. + */ + startup_cost += parallel_setup_cost; + run_cost += parallel_tuple_cost * path->path.rows * 1.05; + + path->path.startup_cost = startup_cost + input_startup_cost; + path->path.total_cost = (startup_cost + run_cost + input_total_cost); +} + /* * cost_index * Determines and returns the cost of scanning a relation using an index. diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 8f8663c1e1..e18c634a7b 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -277,6 +277,8 @@ static ModifyTable *make_modifytable(PlannerInfo *root, List *resultRelations, List *subplans, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, int epqParam); +static GatherMerge *create_gather_merge_plan(PlannerInfo *root, + GatherMergePath *best_path); /* @@ -475,6 +477,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags) (LimitPath *) best_path, flags); break; + case T_GatherMerge: + plan = (Plan *) create_gather_merge_plan(root, + (GatherMergePath *) best_path); + break; default: elog(ERROR, "unrecognized node type: %d", (int) best_path->pathtype); @@ -1451,6 +1457,86 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) return gather_plan; } +/* + * create_gather_merge_plan + * + * Create a Gather Merge plan for 'best_path' and (recursively) + * plans for its subpaths. + */ +static GatherMerge * +create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path) +{ + GatherMerge *gm_plan; + Plan *subplan; + List *pathkeys = best_path->path.pathkeys; + int numsortkeys; + AttrNumber *sortColIdx; + Oid *sortOperators; + Oid *collations; + bool *nullsFirst; + + /* As with Gather, it's best to project away columns in the workers. */ + subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST); + + /* See create_merge_append_plan for why there's no make_xxx function */ + gm_plan = makeNode(GatherMerge); + gm_plan->plan.targetlist = subplan->targetlist; + gm_plan->num_workers = best_path->num_workers; + copy_generic_path_info(&gm_plan->plan, &best_path->path); + + /* Gather Merge is pointless with no pathkeys; use Gather instead. 
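+ * (create_gather_merge_path makes the same assumption when the path is + * built; it asserts that pathkeys are supplied.)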
*/ + Assert(pathkeys != NIL); + + /* Compute sort column info, and adjust GatherMerge tlist as needed */ + (void) prepare_sort_from_pathkeys(&gm_plan->plan, pathkeys, + best_path->path.parent->relids, + NULL, + true, + &gm_plan->numCols, + &gm_plan->sortColIdx, + &gm_plan->sortOperators, + &gm_plan->collations, + &gm_plan->nullsFirst); + + + /* Compute sort column info, and adjust subplan's tlist as needed */ + subplan = prepare_sort_from_pathkeys(subplan, pathkeys, + best_path->subpath->parent->relids, + gm_plan->sortColIdx, + false, + &numsortkeys, + &sortColIdx, + &sortOperators, + &collations, + &nullsFirst); + + /* As for MergeAppend, check that we got the same sort key information. */ + Assert(numsortkeys == gm_plan->numCols); + if (memcmp(sortColIdx, gm_plan->sortColIdx, + numsortkeys * sizeof(AttrNumber)) != 0) + elog(ERROR, "GatherMerge child's targetlist doesn't match GatherMerge"); + Assert(memcmp(sortOperators, gm_plan->sortOperators, + numsortkeys * sizeof(Oid)) == 0); + Assert(memcmp(collations, gm_plan->collations, + numsortkeys * sizeof(Oid)) == 0); + Assert(memcmp(nullsFirst, gm_plan->nullsFirst, + numsortkeys * sizeof(bool)) == 0); + + /* Now, insert a Sort node if subplan isn't sufficiently ordered */ + if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys)) + subplan = (Plan *) make_sort(subplan, numsortkeys, + sortColIdx, sortOperators, + collations, nullsFirst); + + /* Now insert the subplan under GatherMerge. */ + gm_plan->plan.lefttree = subplan; + + /* use parallel mode for parallel plans. */ + root->glob->parallelModeNeeded = true; + + return gm_plan; +} + /* * create_projection_plan * diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1636a69dba..209f769632 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3663,8 +3663,7 @@ create_grouping_paths(PlannerInfo *root, /* * Now generate a complete GroupAgg Path atop of the cheapest partial - * path. We need only bother with the cheapest path here, as the - * output of Gather is never sorted. + * path. We can do this using either Gather or Gather Merge. */ if (grouped_rel->partial_pathlist) { @@ -3711,6 +3710,70 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, dNumGroups)); + + /* + * The point of using Gather Merge rather than Gather is that it + * can preserve the ordering of the input path, so there's no + * reason to try it unless (1) it's possible to produce more than + * one output row and (2) we want the output path to be ordered. + */ + if (parse->groupClause != NIL && root->group_pathkeys != NIL) + { + foreach(lc, grouped_rel->partial_pathlist) + { + Path *subpath = (Path *) lfirst(lc); + Path *gmpath; + double total_groups; + + /* + * It's useful to consider paths that are already properly + * ordered for Gather Merge, because those don't need a + * sort. It's also useful to consider the cheapest path, + * because sorting it in parallel and then doing Gather + * Merge may be better than doing an unordered Gather + * followed by a sort. But there's no point in + * considering non-cheapest paths that aren't already + * sorted correctly. 
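+ * ('path' here is the cheapest partial path, taken from the front of + * grouped_rel->partial_pathlist earlier in this function.)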
*/ + if (path != subpath && + !pathkeys_contained_in(root->group_pathkeys, + subpath->pathkeys)) + continue; + + total_groups = subpath->rows * subpath->parallel_workers; + + gmpath = (Path *) + create_gather_merge_path(root, + grouped_rel, + subpath, + NULL, + root->group_pathkeys, + NULL, + &total_groups); + + if (parse->hasAggs) + add_path(grouped_rel, (Path *) + create_agg_path(root, + grouped_rel, + gmpath, + target, + parse->groupClause ? AGG_SORTED : AGG_PLAIN, + AGGSPLIT_FINAL_DESERIAL, + parse->groupClause, + (List *) parse->havingQual, + &agg_final_costs, + dNumGroups)); + else + add_path(grouped_rel, (Path *) + create_group_path(root, + grouped_rel, + gmpath, + target, + parse->groupClause, + (List *) parse->havingQual, + dNumGroups)); + } + } } } @@ -3808,6 +3871,16 @@ create_grouping_paths(PlannerInfo *root, /* Now choose the best path(s) */ set_cheapest(grouped_rel); + /* + * We've been using the partial pathlist for the grouped relation to hold + * partially aggregated paths, but that's actually a little bit bogus + * because it's unsafe for later planning stages --- like ordered_rel --- + * to get the idea that they can use these partial paths as if they didn't + * need a FinalizeAggregate step. Zap the partial pathlist at this stage + * so we don't get confused. + */ + grouped_rel->partial_pathlist = NIL; + return grouped_rel; } @@ -4275,6 +4348,56 @@ create_ordered_paths(PlannerInfo *root, } } + /* + * generate_gather_paths() will have already generated a simple Gather + * path for the best parallel path, if any, and the loop above will have + * considered sorting it. Similarly, generate_gather_paths() will also + * have generated order-preserving Gather Merge plans which can be used + * without sorting if they happen to match the sort_pathkeys, and the loop + * above will have handled those as well. However, there's one more + * possibility: it may make sense to sort the cheapest partial path + * according to the required output order and then use Gather Merge. + */ + if (ordered_rel->consider_parallel && root->sort_pathkeys != NIL && + input_rel->partial_pathlist != NIL) + { + Path *cheapest_partial_path; + + cheapest_partial_path = linitial(input_rel->partial_pathlist); + + /* + * If the cheapest partial path doesn't need a sort, this is redundant + * with what's already been tried. + */ + if (!pathkeys_contained_in(root->sort_pathkeys, + cheapest_partial_path->pathkeys)) + { + Path *path; + double total_groups; + + path = (Path *) create_sort_path(root, + ordered_rel, + cheapest_partial_path, + root->sort_pathkeys, + limit_tuples); + + total_groups = cheapest_partial_path->rows * + cheapest_partial_path->parallel_workers; + path = (Path *) + create_gather_merge_path(root, ordered_rel, + path, + target, root->sort_pathkeys, NULL, + &total_groups); + + /* Add projection step if needed */ + if (path->pathtarget != target) + path = apply_projection_to_path(root, ordered_rel, + path, target); + + add_path(ordered_rel, path); + } + } + /* * If there is an FDW that's responsible for all baserels of the query, * let it consider adding ForeignPaths.
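As a cross-check on cost_gather_merge above, here is a small standalone sketch (illustrative only, not part of the patch) that plugs the planner's default cost constants into the same formulas. The row count and worker count are made-up inputs; the comparison cost of 2.0 * cpu_operator_cost and the 5% IPC surcharge mirror the code above.

#include <math.h>
#include <stdio.h>

/* Planner cost constants, at their postgresql.conf defaults. */
static const double cpu_operator_cost = 0.0025;
static const double parallel_setup_cost = 1000.0;
static const double parallel_tuple_cost = 0.1;

int
main(void)
{
    double  rows = 100000.0;        /* assumed Gather Merge output rows */
    int     num_workers = 4;        /* assumed planned workers */
    double  N = (double) num_workers + 1;   /* workers plus leader */
    double  logN = log2(N);
    double  comparison_cost = 2.0 * cpu_operator_cost;
    double  startup_cost = 0.0;
    double  run_cost = 0.0;

    /* Heap creation: about N * log2(N) comparisons. */
    startup_cost += comparison_cost * N * logN;

    /* Heap maintenance: about log2(N) comparisons per output row. */
    run_cost += rows * comparison_cost * logN;

    /* Small per-row charge for heap management. */
    run_cost += cpu_operator_cost * rows;

    /* Parallel setup, plus per-tuple IPC bumped 5% over plain Gather. */
    startup_cost += parallel_setup_cost;
    run_cost += parallel_tuple_cost * rows * 1.05;

    printf("startup = %.2f, run = %.2f\n", startup_cost, run_cost);
    return 0;
}

With these defaults the per-tuple IPC term dominates the comparison terms, which is consistent with the comment above about charging an extra 5% for IPC rather than inflating the comparison costs.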
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 3d2c12433d..5f3027e96f 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -616,6 +616,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) break; case T_Gather: + case T_GatherMerge: set_upper_references(root, plan, rtoffset); break; diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index da9a84be3d..6fa6540662 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2700,6 +2700,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, case T_Sort: case T_Unique: case T_Gather: + case T_GatherMerge: case T_SetOp: case T_Group: /* no node-type-specific fields need fixing */ diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 0d925c6fcb..8ce772d274 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1627,6 +1627,66 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, return pathnode; } +/* + * create_gather_merge_path + * + * Creates a path corresponding to a gather merge scan, returning + * the pathnode. + */ +GatherMergePath * +create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, + PathTarget *target, List *pathkeys, + Relids required_outer, double *rows) +{ + GatherMergePath *pathnode = makeNode(GatherMergePath); + Cost input_startup_cost = 0; + Cost input_total_cost = 0; + + Assert(subpath->parallel_safe); + Assert(pathkeys); + + pathnode->path.pathtype = T_GatherMerge; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + required_outer); + pathnode->path.parallel_aware = false; + + pathnode->subpath = subpath; + pathnode->num_workers = subpath->parallel_workers; + pathnode->path.pathkeys = pathkeys; + pathnode->path.pathtarget = target ? 
target : rel->reltarget; + pathnode->path.rows += subpath->rows; + + if (pathkeys_contained_in(pathkeys, subpath->pathkeys)) + { + /* Subpath is adequately ordered, we won't need to sort it */ + input_startup_cost += subpath->startup_cost; + input_total_cost += subpath->total_cost; + } + else + { + /* We'll need to insert a Sort node, so include cost for that */ + Path sort_path; /* dummy for result of cost_sort */ + + cost_sort(&sort_path, + root, + pathkeys, + subpath->total_cost, + subpath->rows, + subpath->pathtarget->width, + 0.0, + work_mem, + -1); + input_startup_cost += sort_path.startup_cost; + input_total_cost += sort_path.total_cost; + } + + cost_gather_merge(pathnode, root, rel, pathnode->path.param_info, + input_startup_cost, input_total_cost, rows); + + return pathnode; +} + /* * translate_sub_tlist - get subquery column numbers represented by tlist * diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f8b073d8a9..811ea5153b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -902,6 +902,15 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_gathermerge", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of gather merge plans."), + NULL + }, + &enable_gathermerge, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/include/executor/nodeGatherMerge.h b/src/include/executor/nodeGatherMerge.h new file mode 100644 index 0000000000..3c8b42b6e5 --- /dev/null +++ b/src/include/executor/nodeGatherMerge.h @@ -0,0 +1,27 @@ +/*------------------------------------------------------------------------- + * + * nodeGatherMerge.h + * prototypes for nodeGatherMerge.c + * + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeGatherMerge.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEGATHERMERGE_H +#define NODEGATHERMERGE_H + +#include "nodes/execnodes.h" + +extern GatherMergeState *ExecInitGatherMerge(GatherMerge * node, + EState *estate, + int eflags); +extern TupleTableSlot *ExecGatherMerge(GatherMergeState * node); +extern void ExecEndGatherMerge(GatherMergeState * node); +extern void ExecReScanGatherMerge(GatherMergeState * node); +extern void ExecShutdownGatherMerge(GatherMergeState * node); + +#endif /* NODEGATHERMERGE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6a0d590ef2..f856f6036f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2094,6 +2094,35 @@ typedef struct GatherState bool need_to_scan_locally; } GatherState; +/* ---------------- + * GatherMergeState information + * + * Gather merge nodes launch 1 or more parallel workers, run a + * subplan which produces sorted output in each worker, and then + * merge the results into a single sorted stream. + * ---------------- + */ +struct GMReaderTupleBuffer; + +typedef struct GatherMergeState +{ + PlanState ps; /* its first field is NodeTag */ + bool initialized; + struct ParallelExecutorInfo *pei; + int nreaders; + int nworkers_launched; + struct TupleQueueReader **reader; + TupleDesc tupDesc; + TupleTableSlot **gm_slots; + struct binaryheap *gm_heap; /* binary heap of slot indices */ + bool gm_initialized; /* gather merge initialized?
*/ + bool need_to_scan_locally; + int gm_nkeys; + SortSupport gm_sortkeys; /* array of length gm_nkeys */ + struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per + * reader */ +} GatherMergeState; + /* ---------------- * HashState information * ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 49fa944755..2bc7a5df11 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -77,6 +77,7 @@ typedef enum NodeTag T_WindowAgg, T_Unique, T_Gather, + T_GatherMerge, T_Hash, T_SetOp, T_LockRows, @@ -127,6 +128,7 @@ typedef enum NodeTag T_WindowAggState, T_UniqueState, T_GatherState, + T_GatherMergeState, T_HashState, T_SetOpState, T_LockRowsState, @@ -249,6 +251,7 @@ typedef enum NodeTag T_MaterialPath, T_UniquePath, T_GatherPath, + T_GatherMergePath, T_ProjectionPath, T_ProjectSetPath, T_SortPath, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 7fbb0c2c77..b880dc16cf 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -797,6 +797,22 @@ typedef struct Gather bool invisible; /* suppress EXPLAIN display (for testing)? */ } Gather; +/* ------------ + * gather merge node + * ------------ + */ +typedef struct GatherMerge +{ + Plan plan; + int num_workers; + /* remaining fields are just like the sort-key info in struct Sort */ + int numCols; /* number of sort-key columns */ + AttrNumber *sortColIdx; /* their indexes in the target list */ + Oid *sortOperators; /* OIDs of operators to sort them by */ + Oid *collations; /* OIDs of collations */ + bool *nullsFirst; /* NULLS FIRST/LAST directions */ +} GatherMerge; + /* ---------------- * hash build node * diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index f7ac6f600f..05d6f07aea 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1203,6 +1203,19 @@ typedef struct GatherPath bool single_copy; /* don't execute path more than once */ } GatherPath; +/* + * GatherMergePath runs several copies of a plan in parallel and collects + * the results, preserving their common sort order. For Gather Merge, the + * parallel leader always executes the plan as well. + */ +typedef struct GatherMergePath +{ + Path path; + Path *subpath; /* path for each worker */ + int num_workers; /* number of workers sought to help */ +} GatherMergePath; + + /* * All join-type paths share these fields.
*/ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 2b386835e3..d9a9b12a06 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -66,6 +66,7 @@ extern bool enable_nestloop; extern bool enable_material; extern bool enable_mergejoin; extern bool enable_hashjoin; +extern bool enable_gathermerge; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -205,5 +206,9 @@ extern Selectivity clause_selectivity(PlannerInfo *root, int varRelid, JoinType jointype, SpecialJoinInfo *sjinfo); +extern void cost_gather_merge(GatherMergePath *path, PlannerInfo *root, + RelOptInfo *rel, ParamPathInfo *param_info, + Cost input_startup_cost, Cost input_total_cost, + double *rows); #endif /* COST_H */ diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index f0fe830722..373c7221a8 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -78,6 +78,13 @@ extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel, extern GatherPath *create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, PathTarget *target, Relids required_outer, double *rows); +extern GatherMergePath *create_gather_merge_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + PathTarget *target, + List *pathkeys, + Relids required_outer, + double *rows); extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, List *pathkeys, Relids required_outer); diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 290b735b6b..038a62efd7 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -213,6 +213,33 @@ select count(*) from tenk1, tenk2 where tenk1.unique1 = tenk2.unique1; reset enable_hashjoin; reset enable_nestloop; +--test gather merge +set enable_hashagg to off; +explain (costs off) + select string4, count((unique2)) from tenk1 group by string4 order by string4; + QUERY PLAN +---------------------------------------------------- + Finalize GroupAggregate + Group Key: string4 + -> Gather Merge + Workers Planned: 4 + -> Partial GroupAggregate + Group Key: string4 + -> Sort + Sort Key: string4 + -> Parallel Seq Scan on tenk1 +(9 rows) + +select string4, count((unique2)) from tenk1 group by string4 order by string4; + string4 | count +---------+------- + AAAAxx | 2500 + HHHHxx | 2500 + OOOOxx | 2500 + VVVVxx | 2500 +(4 rows) + +reset enable_hashagg; set force_parallel_mode=1; explain (costs off) select stringu1::int2 from tenk1 where unique1 = 1; diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index d48abd7e09..568b783f5e 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -73,6 +73,7 @@ select name, setting from pg_settings where name like 'enable%'; name | setting ----------------------+--------- enable_bitmapscan | on + enable_gathermerge | on enable_hashagg | on enable_hashjoin | on enable_indexonlyscan | on @@ -83,7 +84,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(11 rows) +(12 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. 
We can't test their contents in any great detail diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 80412b990d..9311a775af 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -84,6 +84,17 @@ select count(*) from tenk1, tenk2 where tenk1.unique1 = tenk2.unique1; reset enable_hashjoin; reset enable_nestloop; + +--test gather merge +set enable_hashagg to off; + +explain (costs off) + select string4, count((unique2)) from tenk1 group by string4 order by string4; + +select string4, count((unique2)) from tenk1 group by string4 order by string4; + +reset enable_hashagg; + set force_parallel_mode=1; explain (costs off) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3155ec6d5b..296552e394 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -779,6 +779,9 @@ GV Gather GatherPath GatherState +GatherMerge +GatherMergePath +GatherMergeState Gene GenericCosts GenericExprState
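For readers who want the core merge technique in isolation, the following self-contained sketch (illustrative only, not part of the patch) mirrors what gather_merge_getnext does with its binary heap of participant indexes: emit the tuple from the stream at the top of the heap, then either sift the refilled stream back down (cf. binaryheap_replace_first) or drop the exhausted stream (cf. binaryheap_remove_first). Plain int arrays stand in for the per-worker tuple queues, and all names are invented for the example.

#include <stdio.h>

#define NSTREAMS 3

/* One pre-sorted input, standing in for a worker's tuple queue. */
typedef struct
{
    const int  *vals;
    int         pos;
    int         len;
} Stream;

static Stream streams[NSTREAMS];
static int  heap[NSTREAMS];     /* heap of stream indexes */
static int  heap_size;

/* Compare two streams by their current values (smallest wins). */
static int
stream_cmp(int a, int b)
{
    return streams[a].vals[streams[a].pos] - streams[b].vals[streams[b].pos];
}

/* Restore the min-heap property starting at position i. */
static void
sift_down(int i)
{
    for (;;)
    {
        int     l = 2 * i + 1,
                r = l + 1,
                best = i;

        if (l < heap_size && stream_cmp(heap[l], heap[best]) < 0)
            best = l;
        if (r < heap_size && stream_cmp(heap[r], heap[best]) < 0)
            best = r;
        if (best == i)
            return;
        int     tmp = heap[i];

        heap[i] = heap[best];
        heap[best] = tmp;
        i = best;
    }
}

int
main(void)
{
    static const int a[] = {1, 4, 7}, b[] = {2, 5, 8}, c[] = {3, 6, 9};

    streams[0] = (Stream) {a, 0, 3};
    streams[1] = (Stream) {b, 0, 3};
    streams[2] = (Stream) {c, 0, 3};

    /* Add every stream, then order the heap (cf. binaryheap_build). */
    for (int i = 0; i < NSTREAMS; i++)
        heap[heap_size++] = i;
    for (int i = heap_size / 2 - 1; i >= 0; i--)
        sift_down(i);

    /* Merge: emit the top entry, then refill or drop that stream. */
    while (heap_size > 0)
    {
        int     s = heap[0];

        printf("%d ", streams[s].vals[streams[s].pos]);
        if (++streams[s].pos < streams[s].len)
            sift_down(0);       /* cf. binaryheap_replace_first */
        else
        {
            heap[0] = heap[--heap_size];    /* cf. binaryheap_remove_first */
            sift_down(0);
        }
    }
    printf("\n");               /* prints: 1 2 3 4 5 6 7 8 9 */
    return 0;
}

Note one inversion relative to this sketch: lib/binaryheap is a max-heap, so the patch's heap_compare_slots returns the negated comparison (-compare) to make the smallest tuple surface at the heap's root.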