Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* nodeGather.c
|
|
|
|
* Support routines for scanning a plan via multiple workers.
|
|
|
|
*
|
|
|
|
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
|
|
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
|
|
*
|
2015-10-22 16:37:24 +02:00
|
|
|
* A Gather executor launches parallel workers to run multiple copies of a
|
|
|
|
* plan. It can also run the plan itself, if the workers are not available
|
|
|
|
* or have not started up yet. It then merges all of the results it produces
|
|
|
|
* and the results from the workers into a single output stream. Therefore,
|
|
|
|
* it will normally be used with a plan where running multiple copies of the
|
2015-11-30 18:54:11 +01:00
|
|
|
* same plan does not produce duplicate output, such as parallel-aware
|
|
|
|
* SeqScan.
|
2015-10-22 16:37:24 +02:00
|
|
|
*
|
|
|
|
* Alternatively, a Gather node can be configured to use just one worker
|
|
|
|
* and the single-copy flag can be set. In this case, the Gather node will
|
|
|
|
* run the plan in one worker and will not execute the plan itself. In
|
|
|
|
* this case, it simply returns whatever tuples were returned by the worker.
|
|
|
|
* If a worker cannot be obtained, then it will run the plan itself and
|
|
|
|
* return the results. Therefore, a plan used with a single-copy Gather
|
|
|
|
* node need not be parallel-aware.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* src/backend/executor/nodeGather.c
|
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "postgres.h"
|
|
|
|
|
|
|
|
#include "access/relscan.h"
|
2015-10-16 17:56:02 +02:00
|
|
|
#include "access/xact.h"
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
#include "executor/execdebug.h"
|
|
|
|
#include "executor/execParallel.h"
|
|
|
|
#include "executor/nodeGather.h"
|
|
|
|
#include "executor/nodeSubplan.h"
|
|
|
|
#include "executor/tqueue.h"
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
#include "miscadmin.h"
|
2015-10-28 00:27:58 +01:00
|
|
|
#include "utils/memutils.h"
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
#include "utils/rel.h"
|
|
|
|
|
|
|
|
|
|
|
|
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
static HeapTuple gather_readnext(GatherState *gatherstate);
|
2015-10-30 10:43:00 +01:00
|
|
|
static void ExecShutdownGatherWorkers(GatherState *node);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecInitGather
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
GatherState *
|
|
|
|
ExecInitGather(Gather *node, EState *estate, int eflags)
|
|
|
|
{
|
|
|
|
GatherState *gatherstate;
|
2015-10-28 00:27:58 +01:00
|
|
|
Plan *outerNode;
|
|
|
|
bool hasoid;
|
|
|
|
TupleDesc tupDesc;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
/* Gather node doesn't have innerPlan node. */
|
|
|
|
Assert(innerPlan(node) == NULL);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* create state structure
|
|
|
|
*/
|
|
|
|
gatherstate = makeNode(GatherState);
|
|
|
|
gatherstate->ps.plan = (Plan *) node;
|
|
|
|
gatherstate->ps.state = estate;
|
|
|
|
gatherstate->need_to_scan_locally = !node->single_copy;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Miscellaneous initialization
|
|
|
|
*
|
|
|
|
* create expression context for node
|
|
|
|
*/
|
|
|
|
ExecAssignExprContext(estate, &gatherstate->ps);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* initialize child expressions
|
|
|
|
*/
|
|
|
|
gatherstate->ps.targetlist = (List *)
|
|
|
|
ExecInitExpr((Expr *) node->plan.targetlist,
|
|
|
|
(PlanState *) gatherstate);
|
|
|
|
gatherstate->ps.qual = (List *)
|
|
|
|
ExecInitExpr((Expr *) node->plan.qual,
|
|
|
|
(PlanState *) gatherstate);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* tuple table initialization
|
|
|
|
*/
|
2015-10-28 00:27:58 +01:00
|
|
|
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
ExecInitResultTupleSlot(estate, &gatherstate->ps);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* now initialize outer plan
|
|
|
|
*/
|
2015-10-28 00:27:58 +01:00
|
|
|
outerNode = outerPlan(node);
|
|
|
|
outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
gatherstate->ps.ps_TupFromTlist = false;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Initialize result tuple type and projection info.
|
|
|
|
*/
|
|
|
|
ExecAssignResultTypeFromTL(&gatherstate->ps);
|
|
|
|
ExecAssignProjectionInfo(&gatherstate->ps, NULL);
|
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
/*
|
|
|
|
* Initialize funnel slot to same tuple descriptor as outer plan.
|
|
|
|
*/
|
|
|
|
if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
|
|
|
|
hasoid = false;
|
|
|
|
tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
|
|
|
|
ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
|
|
|
|
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
return gatherstate;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecGather(node)
|
|
|
|
*
|
|
|
|
* Scans the relation via multiple workers and returns
|
|
|
|
* the next qualifying tuple.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
TupleTableSlot *
|
|
|
|
ExecGather(GatherState *node)
|
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
TupleTableSlot *fslot = node->funnel_slot;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
int i;
|
|
|
|
TupleTableSlot *slot;
|
2015-10-28 00:27:58 +01:00
|
|
|
TupleTableSlot *resultSlot;
|
|
|
|
ExprDoneCond isDone;
|
|
|
|
ExprContext *econtext;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Initialize the parallel context and workers on first execution. We do
|
|
|
|
* this on first execution rather than during node initialization, as it
|
|
|
|
* needs to allocate large dynamic segement, so it is better to do if it
|
|
|
|
* is really needed.
|
|
|
|
*/
|
2015-10-16 17:56:02 +02:00
|
|
|
if (!node->initialized)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
|
|
|
EState *estate = node->ps.state;
|
2015-10-16 17:56:02 +02:00
|
|
|
Gather *gather = (Gather *) node->ps.plan;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
/*
|
2015-10-16 17:56:02 +02:00
|
|
|
* Sometimes we might have to run without parallelism; but if
|
|
|
|
* parallel mode is active then we can try to fire up some workers.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*/
|
2015-10-16 17:56:02 +02:00
|
|
|
if (gather->num_workers > 0 && IsInParallelMode())
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
ParallelContext *pcxt;
|
2015-10-16 17:56:02 +02:00
|
|
|
bool got_any_worker = false;
|
|
|
|
|
|
|
|
/* Initialize the workers required to execute Gather node. */
|
2015-10-30 10:43:00 +01:00
|
|
|
if (!node->pei)
|
|
|
|
node->pei = ExecInitParallelPlan(node->ps.lefttree,
|
|
|
|
estate,
|
|
|
|
gather->num_workers);
|
2015-10-16 17:56:02 +02:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Register backend workers. We might not get as many as we
|
|
|
|
* requested, or indeed any at all.
|
|
|
|
*/
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
pcxt = node->pei->pcxt;
|
|
|
|
LaunchParallelWorkers(pcxt);
|
2015-10-16 17:56:02 +02:00
|
|
|
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
/* Set up tuple queue readers to read the results. */
|
|
|
|
if (pcxt->nworkers > 0)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
node->nreaders = 0;
|
|
|
|
node->reader =
|
|
|
|
palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
|
|
|
|
|
|
|
|
for (i = 0; i < pcxt->nworkers; ++i)
|
2015-10-16 17:56:02 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
if (pcxt->worker[i].bgwhandle == NULL)
|
|
|
|
continue;
|
|
|
|
|
2015-10-16 17:56:02 +02:00
|
|
|
shm_mq_set_handle(node->pei->tqueue[i],
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
pcxt->worker[i].bgwhandle);
|
|
|
|
node->reader[node->nreaders++] =
|
|
|
|
CreateTupleQueueReader(node->pei->tqueue[i],
|
|
|
|
fslot->tts_tupleDescriptor);
|
2015-10-16 17:56:02 +02:00
|
|
|
got_any_worker = true;
|
|
|
|
}
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
2015-10-16 17:56:02 +02:00
|
|
|
|
|
|
|
/* No workers? Then never mind. */
|
|
|
|
if (!got_any_worker)
|
2015-11-20 19:03:39 +01:00
|
|
|
ExecShutdownGatherWorkers(node);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
2015-10-16 17:56:02 +02:00
|
|
|
/* Run plan locally if no workers or not single-copy. */
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
node->need_to_scan_locally = (node->reader == NULL)
|
2015-10-16 17:56:02 +02:00
|
|
|
|| !gather->single_copy;
|
|
|
|
node->initialized = true;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
/*
|
|
|
|
* Check to see if we're still projecting out tuples from a previous scan
|
|
|
|
* tuple (because there is a function-returning-set in the projection
|
|
|
|
* expressions). If so, try to project another one.
|
|
|
|
*/
|
|
|
|
if (node->ps.ps_TupFromTlist)
|
|
|
|
{
|
|
|
|
resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
|
|
|
|
if (isDone == ExprMultipleResult)
|
|
|
|
return resultSlot;
|
|
|
|
/* Done with that source tuple... */
|
|
|
|
node->ps.ps_TupFromTlist = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Reset per-tuple memory context to free any expression evaluation
|
|
|
|
* storage allocated in the previous tuple cycle. Note we can't do this
|
|
|
|
* until we're done projecting.
|
|
|
|
*/
|
|
|
|
econtext = node->ps.ps_ExprContext;
|
|
|
|
ResetExprContext(econtext);
|
|
|
|
|
|
|
|
/* Get and return the next tuple, projecting if necessary. */
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* Get next tuple, either from one of our workers, or by running the
|
|
|
|
* plan ourselves.
|
|
|
|
*/
|
|
|
|
slot = gather_getnext(node);
|
|
|
|
if (TupIsNull(slot))
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* form the result tuple using ExecProject(), and return it --- unless
|
|
|
|
* the projection produces an empty set, in which case we must loop
|
|
|
|
* back around for another tuple
|
|
|
|
*/
|
|
|
|
econtext->ecxt_outertuple = slot;
|
|
|
|
resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
|
|
|
|
|
|
|
|
if (isDone != ExprEndResult)
|
|
|
|
{
|
|
|
|
node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
|
|
|
|
return resultSlot;
|
|
|
|
}
|
|
|
|
}
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
return slot;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecEndGather
|
|
|
|
*
|
|
|
|
* frees any storage allocated through C routines.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ExecEndGather(GatherState *node)
|
|
|
|
{
|
|
|
|
ExecShutdownGather(node);
|
|
|
|
ExecFreeExprContext(&node->ps);
|
|
|
|
ExecClearTuple(node->ps.ps_ResultTupleSlot);
|
|
|
|
ExecEndNode(outerPlanState(node));
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
* Read the next tuple. We might fetch a tuple from one of the tuple queues
|
|
|
|
* using gather_readnext, or if no tuple queue contains a tuple and the
|
|
|
|
* single_copy flag is not set, we might generate one locally instead.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*/
|
2015-10-03 16:59:42 +02:00
|
|
|
static TupleTableSlot *
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
gather_getnext(GatherState *gatherstate)
|
|
|
|
{
|
2015-10-28 00:27:58 +01:00
|
|
|
PlanState *outerPlan = outerPlanState(gatherstate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
TupleTableSlot *outerTupleSlot;
|
2015-10-28 00:27:58 +01:00
|
|
|
TupleTableSlot *fslot = gatherstate->funnel_slot;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
HeapTuple tup;
|
|
|
|
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
if (gatherstate->reader != NULL)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
tup = gather_readnext(gatherstate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
if (HeapTupleIsValid(tup))
|
|
|
|
{
|
|
|
|
ExecStoreTuple(tup, /* tuple to store */
|
2015-10-28 00:27:58 +01:00
|
|
|
fslot, /* slot in which to store the tuple */
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
InvalidBuffer, /* buffer associated with this
|
|
|
|
* tuple */
|
|
|
|
true); /* pfree this pointer if not from heap */
|
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
return fslot;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (gatherstate->need_to_scan_locally)
|
|
|
|
{
|
|
|
|
outerTupleSlot = ExecProcNode(outerPlan);
|
|
|
|
|
|
|
|
if (!TupIsNull(outerTupleSlot))
|
|
|
|
return outerTupleSlot;
|
|
|
|
|
|
|
|
gatherstate->need_to_scan_locally = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
return ExecClearTuple(fslot);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
/*
|
|
|
|
* Attempt to read a tuple from one of our parallel workers.
|
|
|
|
*/
|
|
|
|
static HeapTuple
|
|
|
|
gather_readnext(GatherState *gatherstate)
|
|
|
|
{
|
|
|
|
int waitpos = gatherstate->nextreader;
|
|
|
|
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
TupleQueueReader *reader;
|
|
|
|
HeapTuple tup;
|
|
|
|
bool readerdone;
|
|
|
|
|
|
|
|
/* Make sure we've read all messages from workers. */
|
|
|
|
HandleParallelMessages();
|
|
|
|
|
|
|
|
/* Attempt to read a tuple, but don't block if none is available. */
|
|
|
|
reader = gatherstate->reader[gatherstate->nextreader];
|
|
|
|
tup = TupleQueueReaderNext(reader, true, &readerdone);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* If this reader is done, remove it. If all readers are done,
|
|
|
|
* clean up remaining worker state.
|
|
|
|
*/
|
|
|
|
if (readerdone)
|
|
|
|
{
|
|
|
|
DestroyTupleQueueReader(reader);
|
|
|
|
--gatherstate->nreaders;
|
|
|
|
if (gatherstate->nreaders == 0)
|
|
|
|
{
|
2015-11-09 16:49:24 +01:00
|
|
|
ExecShutdownGatherWorkers(gatherstate);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
memmove(&gatherstate->reader[gatherstate->nextreader],
|
|
|
|
&gatherstate->reader[gatherstate->nextreader + 1],
|
|
|
|
sizeof(TupleQueueReader *)
|
|
|
|
* (gatherstate->nreaders - gatherstate->nextreader));
|
|
|
|
if (gatherstate->nextreader >= gatherstate->nreaders)
|
|
|
|
gatherstate->nextreader = 0;
|
|
|
|
if (gatherstate->nextreader < waitpos)
|
|
|
|
--waitpos;
|
|
|
|
}
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* If we got a tuple, return it. */
|
|
|
|
if (tup)
|
|
|
|
return tup;
|
|
|
|
|
2015-12-23 20:06:52 +01:00
|
|
|
/*
|
|
|
|
* Advance nextreader pointer in round-robin fashion. Note that we
|
|
|
|
* only reach this code if we weren't able to get a tuple from the
|
|
|
|
* current worker. We used to advance the nextreader pointer after
|
|
|
|
* every tuple, but it turns out to be much more efficient to keep
|
|
|
|
* reading from the same queue until that would require blocking.
|
|
|
|
*/
|
|
|
|
gatherstate->nextreader =
|
|
|
|
(gatherstate->nextreader + 1) % gatherstate->nreaders;
|
|
|
|
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
/* Have we visited every TupleQueueReader? */
|
|
|
|
if (gatherstate->nextreader == waitpos)
|
|
|
|
{
|
|
|
|
/*
|
|
|
|
* If (still) running plan locally, return NULL so caller can
|
|
|
|
* generate another tuple from the local copy of the plan.
|
|
|
|
*/
|
|
|
|
if (gatherstate->need_to_scan_locally)
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
/* Nothing to do except wait for developments. */
|
|
|
|
WaitLatch(MyLatch, WL_LATCH_SET, 0);
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
ResetLatch(MyLatch);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
/* ----------------------------------------------------------------
|
2015-10-30 10:43:00 +01:00
|
|
|
* ExecShutdownGatherWorkers
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*
|
2015-10-30 10:43:00 +01:00
|
|
|
* Destroy the parallel workers. Collect all the stats after
|
|
|
|
* workers are stopped, else some work done by workers won't be
|
|
|
|
* accounted.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
2015-11-11 00:24:18 +01:00
|
|
|
static void
|
2015-10-30 10:43:00 +01:00
|
|
|
ExecShutdownGatherWorkers(GatherState *node)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
/* Shut down tuple queue readers before shutting down workers. */
|
|
|
|
if (node->reader != NULL)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
int i;
|
|
|
|
|
|
|
|
for (i = 0; i < node->nreaders; ++i)
|
|
|
|
DestroyTupleQueueReader(node->reader[i]);
|
2015-11-20 19:03:39 +01:00
|
|
|
|
|
|
|
pfree(node->reader);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
node->reader = NULL;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
2015-10-16 17:56:02 +02:00
|
|
|
/* Now shut down the workers. */
|
|
|
|
if (node->pei != NULL)
|
|
|
|
ExecParallelFinish(node->pei);
|
2015-10-30 10:43:00 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecShutdownGather
|
|
|
|
*
|
|
|
|
* Destroy the setup for parallel workers including parallel context.
|
|
|
|
* Collect all the stats after workers are stopped, else some work
|
|
|
|
* done by workers won't be accounted.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ExecShutdownGather(GatherState *node)
|
|
|
|
{
|
|
|
|
ExecShutdownGatherWorkers(node);
|
|
|
|
|
|
|
|
/* Now destroy the parallel context. */
|
|
|
|
if (node->pei != NULL)
|
|
|
|
{
|
2015-10-16 17:56:02 +02:00
|
|
|
ExecParallelCleanup(node->pei);
|
|
|
|
node->pei = NULL;
|
|
|
|
}
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* Join Support
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecReScanGather
|
|
|
|
*
|
|
|
|
* Re-initialize the workers and rescans a relation via them.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ExecReScanGather(GatherState *node)
|
|
|
|
{
|
|
|
|
/*
|
2015-10-30 10:43:00 +01:00
|
|
|
* Re-initialize the parallel workers to perform rescan of relation.
|
|
|
|
* We want to gracefully shutdown all the workers so that they
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
* should be able to propagate any error or other information to master
|
2015-10-30 10:43:00 +01:00
|
|
|
* backend before dying. Parallel context will be reused for rescan.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*/
|
2015-10-30 10:43:00 +01:00
|
|
|
ExecShutdownGatherWorkers(node);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
2015-10-16 17:56:02 +02:00
|
|
|
node->initialized = false;
|
|
|
|
|
2015-10-30 10:43:00 +01:00
|
|
|
if (node->pei)
|
2015-11-18 18:35:25 +01:00
|
|
|
ExecParallelReinitialize(node->pei);
|
2015-10-30 10:43:00 +01:00
|
|
|
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
ExecReScan(node->ps.lefttree);
|
|
|
|
}
|