Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* nodeGather.c
|
|
|
|
* Support routines for scanning a plan via multiple workers.
|
|
|
|
*
|
2018-01-03 05:30:12 +01:00
|
|
|
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
|
|
*
|
2015-10-22 16:37:24 +02:00
|
|
|
* A Gather executor launches parallel workers to run multiple copies of a
|
|
|
|
* plan. It can also run the plan itself, if the workers are not available
|
|
|
|
* or have not started up yet. It then merges all of the results it produces
|
|
|
|
* and the results from the workers into a single output stream. Therefore,
|
|
|
|
* it will normally be used with a plan where running multiple copies of the
|
2015-11-30 18:54:11 +01:00
|
|
|
* same plan does not produce duplicate output, such as parallel-aware
|
|
|
|
* SeqScan.
|
2015-10-22 16:37:24 +02:00
|
|
|
*
|
|
|
|
* Alternatively, a Gather node can be configured to use just one worker
|
|
|
|
* and the single-copy flag can be set. In this case, the Gather node will
|
|
|
|
* run the plan in one worker and will not execute the plan itself. In
|
|
|
|
* this case, it simply returns whatever tuples were returned by the worker.
|
|
|
|
* If a worker cannot be obtained, then it will run the plan itself and
|
|
|
|
* return the results. Therefore, a plan used with a single-copy Gather
|
|
|
|
* node need not be parallel-aware.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* src/backend/executor/nodeGather.c
|
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "postgres.h"
|
|
|
|
|
|
|
|
#include "access/relscan.h"
|
2015-10-16 17:56:02 +02:00
|
|
|
#include "access/xact.h"
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
#include "executor/execdebug.h"
|
|
|
|
#include "executor/execParallel.h"
|
|
|
|
#include "executor/nodeGather.h"
|
|
|
|
#include "executor/nodeSubplan.h"
|
|
|
|
#include "executor/tqueue.h"
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
#include "miscadmin.h"
|
2017-11-15 14:17:29 +01:00
|
|
|
#include "optimizer/planmain.h"
|
2016-10-04 16:50:13 +02:00
|
|
|
#include "pgstat.h"
|
2015-10-28 00:27:58 +01:00
|
|
|
#include "utils/memutils.h"
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
#include "utils/rel.h"
|
|
|
|
|
|
|
|
|
2017-07-17 09:33:49 +02:00
|
|
|
static TupleTableSlot *ExecGather(PlanState *pstate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
static HeapTuple gather_readnext(GatherState *gatherstate);
|
2015-10-30 10:43:00 +01:00
|
|
|
static void ExecShutdownGatherWorkers(GatherState *node);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecInitGather
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
GatherState *
|
|
|
|
ExecInitGather(Gather *node, EState *estate, int eflags)
|
|
|
|
{
|
|
|
|
GatherState *gatherstate;
|
2015-10-28 00:27:58 +01:00
|
|
|
Plan *outerNode;
|
|
|
|
TupleDesc tupDesc;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
/* Gather node doesn't have innerPlan node. */
|
|
|
|
Assert(innerPlan(node) == NULL);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* create state structure
|
|
|
|
*/
|
|
|
|
gatherstate = makeNode(GatherState);
|
|
|
|
gatherstate->ps.plan = (Plan *) node;
|
|
|
|
gatherstate->ps.state = estate;
|
2017-07-17 09:33:49 +02:00
|
|
|
gatherstate->ps.ExecProcNode = ExecGather;
|
Code review for nodeGatherMerge.c.
Comment the fields of GatherMergeState, and organize them a bit more
sensibly. Comment GMReaderTupleBuffer more usefully too. Improve
assorted other comments that were obsolete or just not very good English.
Get rid of the use of a GMReaderTupleBuffer for the leader process;
that was confusing, since only the "done" field was used, and that
in a way redundant with need_to_scan_locally.
In gather_merge_init, avoid calling load_tuple_array for
already-known-exhausted workers. I'm not sure if there's a live bug there,
but the case is unlikely to be well tested due to timing considerations.
Remove some useless code, such as duplicating the tts_isempty test done by
TupIsNull.
Remove useless initialization of ps.qual, replacing that with an assertion
that we have no qual to check. (If we did, the code would fail to check
it.)
Avoid applying heap_copytuple to a null tuple. While that fails to crash,
it's confusing and it makes the code less legible not more so IMO.
Propagate a couple of these changes into nodeGather.c, as well.
Back-patch to v10, partly because of the possibility that the
gather_merge_init change is fixing a live bug, but mostly to keep
the branches in sync to ease future bug fixes.
2017-08-30 23:21:08 +02:00
|
|
|
|
|
|
|
gatherstate->initialized = false;
|
2017-11-15 14:17:29 +01:00
|
|
|
gatherstate->need_to_scan_locally =
|
|
|
|
!node->single_copy && parallel_leader_participation;
|
2017-08-29 19:12:23 +02:00
|
|
|
gatherstate->tuples_needed = -1;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Miscellaneous initialization
|
|
|
|
*
|
|
|
|
* create expression context for node
|
|
|
|
*/
|
|
|
|
ExecAssignExprContext(estate, &gatherstate->ps);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* now initialize outer plan
|
|
|
|
*/
|
2015-10-28 00:27:58 +01:00
|
|
|
outerNode = outerPlan(node);
|
|
|
|
outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
|
2018-02-17 06:17:38 +01:00
|
|
|
tupDesc = ExecGetResultType(outerPlanState(gatherstate));
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Initialize result slot, type and projection.
|
|
|
|
*/
|
|
|
|
ExecInitResultTupleSlotTL(estate, &gatherstate->ps);
|
|
|
|
ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
/*
|
|
|
|
* Initialize funnel slot to same tuple descriptor as outer plan.
|
|
|
|
*/
|
2018-02-17 06:17:38 +01:00
|
|
|
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc);
|
2015-10-28 00:27:58 +01:00
|
|
|
|
2017-11-25 16:49:17 +01:00
|
|
|
/*
|
2018-02-17 06:17:38 +01:00
|
|
|
* Gather doesn't support checking a qual (it's always more efficient to
|
|
|
|
* do it in the child node).
|
2017-11-25 16:49:17 +01:00
|
|
|
*/
|
2018-02-17 06:17:38 +01:00
|
|
|
Assert(!node->plan.qual);
|
2017-11-25 16:49:17 +01:00
|
|
|
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
return gatherstate;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecGather(node)
|
|
|
|
*
|
|
|
|
* Scans the relation via multiple workers and returns
|
|
|
|
* the next qualifying tuple.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
2017-07-17 09:33:49 +02:00
|
|
|
static TupleTableSlot *
|
|
|
|
ExecGather(PlanState *pstate)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
2017-07-17 09:33:49 +02:00
|
|
|
GatherState *node = castNode(GatherState, pstate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
TupleTableSlot *slot;
|
2015-10-28 00:27:58 +01:00
|
|
|
ExprContext *econtext;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
2017-07-26 02:37:17 +02:00
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
/*
|
|
|
|
* Initialize the parallel context and workers on first execution. We do
|
|
|
|
* this on first execution rather than during node initialization, as it
|
2017-01-22 17:47:38 +01:00
|
|
|
* needs to allocate a large dynamic segment, so it is better to do it
|
|
|
|
* only if it is really needed.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*/
|
2015-10-16 17:56:02 +02:00
|
|
|
if (!node->initialized)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
|
|
|
EState *estate = node->ps.state;
|
2015-10-16 17:56:02 +02:00
|
|
|
Gather *gather = (Gather *) node->ps.plan;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
/*
|
2016-06-10 00:02:36 +02:00
|
|
|
* Sometimes we might have to run without parallelism; but if parallel
|
|
|
|
* mode is active then we can try to fire up some workers.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*/
|
2017-10-27 16:04:01 +02:00
|
|
|
if (gather->num_workers > 0 && estate->es_use_parallel_mode)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
ParallelContext *pcxt;
|
2015-10-16 17:56:02 +02:00
|
|
|
|
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 19:18:16 +02:00
|
|
|
/* Initialize, or re-initialize, shared state needed by workers. */
|
2015-10-30 10:43:00 +01:00
|
|
|
if (!node->pei)
|
|
|
|
node->pei = ExecInitParallelPlan(node->ps.lefttree,
|
|
|
|
estate,
|
2017-11-16 18:06:14 +01:00
|
|
|
gather->initParam,
|
2017-08-29 19:12:23 +02:00
|
|
|
gather->num_workers,
|
|
|
|
node->tuples_needed);
|
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 19:18:16 +02:00
|
|
|
else
|
|
|
|
ExecParallelReinitialize(node->ps.lefttree,
|
2017-11-16 18:06:14 +01:00
|
|
|
node->pei,
|
|
|
|
gather->initParam);
|
2015-10-16 17:56:02 +02:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Register backend workers. We might not get as many as we
|
|
|
|
* requested, or indeed any at all.
|
|
|
|
*/
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
pcxt = node->pei->pcxt;
|
|
|
|
LaunchParallelWorkers(pcxt);
|
Code review for nodeGatherMerge.c.
Comment the fields of GatherMergeState, and organize them a bit more
sensibly. Comment GMReaderTupleBuffer more usefully too. Improve
assorted other comments that were obsolete or just not very good English.
Get rid of the use of a GMReaderTupleBuffer for the leader process;
that was confusing, since only the "done" field was used, and that
in a way redundant with need_to_scan_locally.
In gather_merge_init, avoid calling load_tuple_array for
already-known-exhausted workers. I'm not sure if there's a live bug there,
but the case is unlikely to be well tested due to timing considerations.
Remove some useless code, such as duplicating the tts_isempty test done by
TupIsNull.
Remove useless initialization of ps.qual, replacing that with an assertion
that we have no qual to check. (If we did, the code would fail to check
it.)
Avoid applying heap_copytuple to a null tuple. While that fails to crash,
it's confusing and it makes the code less legible not more so IMO.
Propagate a couple of these changes into nodeGather.c, as well.
Back-patch to v10, partly because of the possibility that the
gather_merge_init change is fixing a live bug, but mostly to keep
the branches in sync to ease future bug fixes.
2017-08-30 23:21:08 +02:00
|
|
|
/* We save # workers launched for the benefit of EXPLAIN */
|
2016-04-15 17:49:41 +02:00
|
|
|
node->nworkers_launched = pcxt->nworkers_launched;
|
2015-10-16 17:56:02 +02:00
|
|
|
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
/* Set up tuple queue readers to read the results. */
|
2016-03-04 18:59:10 +01:00
|
|
|
if (pcxt->nworkers_launched > 0)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
2017-09-15 04:59:02 +02:00
|
|
|
ExecParallelCreateReaders(node->pei);
|
2017-09-01 23:38:54 +02:00
|
|
|
/* Make a working array showing the active readers */
|
|
|
|
node->nreaders = pcxt->nworkers_launched;
|
|
|
|
node->reader = (TupleQueueReader **)
|
|
|
|
palloc(node->nreaders * sizeof(TupleQueueReader *));
|
|
|
|
memcpy(node->reader, node->pei->reader,
|
|
|
|
node->nreaders * sizeof(TupleQueueReader *));
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
2016-03-04 18:59:10 +01:00
|
|
|
else
|
|
|
|
{
|
2016-06-10 00:02:36 +02:00
|
|
|
/* No workers? Then never mind. */
|
2017-09-01 23:38:54 +02:00
|
|
|
node->nreaders = 0;
|
|
|
|
node->reader = NULL;
|
2016-03-04 18:59:10 +01:00
|
|
|
}
|
2017-09-01 23:38:54 +02:00
|
|
|
node->nextreader = 0;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
2017-11-15 14:17:29 +01:00
|
|
|
/* Run plan locally if no workers or enabled and not single-copy. */
|
2017-09-01 23:38:54 +02:00
|
|
|
node->need_to_scan_locally = (node->nreaders == 0)
|
2017-11-15 14:17:29 +01:00
|
|
|
|| (!gather->single_copy && parallel_leader_participation);
|
2015-10-16 17:56:02 +02:00
|
|
|
node->initialized = true;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
/*
|
|
|
|
* Reset per-tuple memory context to free any expression evaluation
|
2017-12-04 16:33:09 +01:00
|
|
|
* storage allocated in the previous tuple cycle.
|
2015-10-28 00:27:58 +01:00
|
|
|
*/
|
|
|
|
econtext = node->ps.ps_ExprContext;
|
|
|
|
ResetExprContext(econtext);
|
|
|
|
|
2017-01-22 17:47:38 +01:00
|
|
|
/*
|
|
|
|
* Get next tuple, either from one of our workers, or by running the plan
|
|
|
|
* ourselves.
|
|
|
|
*/
|
|
|
|
slot = gather_getnext(node);
|
|
|
|
if (TupIsNull(slot))
|
|
|
|
return NULL;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
2017-11-25 16:49:17 +01:00
|
|
|
/* If no projection is required, we're done. */
|
|
|
|
if (node->ps.ps_ProjInfo == NULL)
|
|
|
|
return slot;
|
|
|
|
|
2017-01-22 17:47:38 +01:00
|
|
|
/*
|
|
|
|
* Form the result tuple using ExecProject(), and return it.
|
|
|
|
*/
|
|
|
|
econtext->ecxt_outertuple = slot;
|
|
|
|
return ExecProject(node->ps.ps_ProjInfo);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecEndGather
|
|
|
|
*
|
|
|
|
* frees any storage allocated through C routines.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ExecEndGather(GatherState *node)
|
|
|
|
{
|
2017-05-17 22:31:56 +02:00
|
|
|
ExecEndNode(outerPlanState(node)); /* let children clean up first */
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
ExecShutdownGather(node);
|
|
|
|
ExecFreeExprContext(&node->ps);
|
|
|
|
ExecClearTuple(node->ps.ps_ResultTupleSlot);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
* Read the next tuple. We might fetch a tuple from one of the tuple queues
|
|
|
|
* using gather_readnext, or if no tuple queue contains a tuple and the
|
|
|
|
* single_copy flag is not set, we might generate one locally instead.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*/
|
2015-10-03 16:59:42 +02:00
|
|
|
static TupleTableSlot *
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
gather_getnext(GatherState *gatherstate)
|
|
|
|
{
|
2015-10-28 00:27:58 +01:00
|
|
|
PlanState *outerPlan = outerPlanState(gatherstate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
TupleTableSlot *outerTupleSlot;
|
2015-10-28 00:27:58 +01:00
|
|
|
TupleTableSlot *fslot = gatherstate->funnel_slot;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
HeapTuple tup;
|
|
|
|
|
2017-09-01 23:38:54 +02:00
|
|
|
while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
2017-07-26 02:37:17 +02:00
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
|
2017-09-01 23:38:54 +02:00
|
|
|
if (gatherstate->nreaders > 0)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
tup = gather_readnext(gatherstate);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
if (HeapTupleIsValid(tup))
|
|
|
|
{
|
2018-09-26 01:27:48 +02:00
|
|
|
ExecStoreHeapTuple(tup, /* tuple to store */
|
|
|
|
fslot, /* slot to store the tuple */
|
|
|
|
true); /* pfree tuple when done with it */
|
2015-10-28 00:27:58 +01:00
|
|
|
return fslot;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (gatherstate->need_to_scan_locally)
|
|
|
|
{
|
2018-04-26 20:47:16 +02:00
|
|
|
EState *estate = gatherstate->ps.state;
|
2017-12-18 18:17:37 +01:00
|
|
|
|
|
|
|
/* Install our DSA area while executing the plan. */
|
|
|
|
estate->es_query_dsa =
|
|
|
|
gatherstate->pei ? gatherstate->pei->area : NULL;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
outerTupleSlot = ExecProcNode(outerPlan);
|
2017-12-18 18:17:37 +01:00
|
|
|
estate->es_query_dsa = NULL;
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
|
|
|
if (!TupIsNull(outerTupleSlot))
|
|
|
|
return outerTupleSlot;
|
|
|
|
|
|
|
|
gatherstate->need_to_scan_locally = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-28 00:27:58 +01:00
|
|
|
return ExecClearTuple(fslot);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
/*
|
|
|
|
* Attempt to read a tuple from one of our parallel workers.
|
|
|
|
*/
|
|
|
|
static HeapTuple
|
|
|
|
gather_readnext(GatherState *gatherstate)
|
|
|
|
{
|
2016-07-30 01:31:06 +02:00
|
|
|
int nvisited = 0;
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
|
|
|
|
for (;;)
|
|
|
|
{
|
|
|
|
TupleQueueReader *reader;
|
|
|
|
HeapTuple tup;
|
|
|
|
bool readerdone;
|
|
|
|
|
2016-08-01 21:13:53 +02:00
|
|
|
/* Check for async events, particularly messages from workers. */
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
|
2018-02-02 15:00:59 +01:00
|
|
|
/*
|
|
|
|
* Attempt to read a tuple, but don't block if none is available.
|
|
|
|
*
|
|
|
|
* Note that TupleQueueReaderNext will just return NULL for a worker
|
|
|
|
* which fails to initialize. We'll treat that worker as having
|
|
|
|
* produced no tuples; WaitForParallelWorkersToFinish will error out
|
|
|
|
* when we get there.
|
|
|
|
*/
|
2016-12-05 21:54:28 +01:00
|
|
|
Assert(gatherstate->nextreader < gatherstate->nreaders);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
reader = gatherstate->reader[gatherstate->nextreader];
|
|
|
|
tup = TupleQueueReaderNext(reader, true, &readerdone);
|
|
|
|
|
|
|
|
/*
|
2017-09-01 23:38:54 +02:00
|
|
|
* If this reader is done, remove it from our working array of active
|
|
|
|
* readers. If all readers are done, we're outta here.
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
*/
|
|
|
|
if (readerdone)
|
|
|
|
{
|
2016-07-30 01:31:06 +02:00
|
|
|
Assert(!tup);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
--gatherstate->nreaders;
|
|
|
|
if (gatherstate->nreaders == 0)
|
2018-08-03 07:32:02 +02:00
|
|
|
{
|
|
|
|
ExecShutdownGatherWorkers(gatherstate);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
return NULL;
|
2018-08-03 07:32:02 +02:00
|
|
|
}
|
2016-07-30 01:31:06 +02:00
|
|
|
memmove(&gatherstate->reader[gatherstate->nextreader],
|
|
|
|
&gatherstate->reader[gatherstate->nextreader + 1],
|
|
|
|
sizeof(TupleQueueReader *)
|
|
|
|
* (gatherstate->nreaders - gatherstate->nextreader));
|
|
|
|
if (gatherstate->nextreader >= gatherstate->nreaders)
|
|
|
|
gatherstate->nextreader = 0;
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* If we got a tuple, return it. */
|
|
|
|
if (tup)
|
|
|
|
return tup;
|
|
|
|
|
2015-12-23 20:06:52 +01:00
|
|
|
/*
|
|
|
|
* Advance nextreader pointer in round-robin fashion. Note that we
|
|
|
|
* only reach this code if we weren't able to get a tuple from the
|
|
|
|
* current worker. We used to advance the nextreader pointer after
|
|
|
|
* every tuple, but it turns out to be much more efficient to keep
|
|
|
|
* reading from the same queue until that would require blocking.
|
|
|
|
*/
|
2016-07-30 01:31:06 +02:00
|
|
|
gatherstate->nextreader++;
|
|
|
|
if (gatherstate->nextreader >= gatherstate->nreaders)
|
|
|
|
gatherstate->nextreader = 0;
|
2015-12-23 20:06:52 +01:00
|
|
|
|
2016-07-30 01:31:06 +02:00
|
|
|
/* Have we visited every (surviving) TupleQueueReader? */
|
|
|
|
nvisited++;
|
|
|
|
if (nvisited >= gatherstate->nreaders)
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
{
|
|
|
|
/*
|
|
|
|
* If (still) running plan locally, return NULL so caller can
|
|
|
|
* generate another tuple from the local copy of the plan.
|
|
|
|
*/
|
|
|
|
if (gatherstate->need_to_scan_locally)
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
/* Nothing to do except wait for developments. */
|
2016-10-04 16:50:13 +02:00
|
|
|
WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
ResetLatch(MyLatch);
|
2016-07-30 01:31:06 +02:00
|
|
|
nvisited = 0;
|
Modify tqueue infrastructure to support transient record types.
Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend. Transferring such tuples without modification between two
cooperating backends does not work. This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves. The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender. That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.
Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues. This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was. typmod
mapping needs to be performed separately for each queue, and it is
much simpler if the tqueue.c code handles that and leaves multiplexing
multiple queues to higher layers of the stack.
2015-11-06 22:58:45 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
/* ----------------------------------------------------------------
|
2015-10-30 10:43:00 +01:00
|
|
|
* ExecShutdownGatherWorkers
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
*
|
2017-09-01 23:38:54 +02:00
|
|
|
* Stop all the parallel workers.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
2015-11-11 00:24:18 +01:00
|
|
|
static void
|
2015-10-30 10:43:00 +01:00
|
|
|
ExecShutdownGatherWorkers(GatherState *node)
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
{
|
2015-10-16 17:56:02 +02:00
|
|
|
if (node->pei != NULL)
|
|
|
|
ExecParallelFinish(node->pei);
|
2017-09-01 23:38:54 +02:00
|
|
|
|
|
|
|
/* Flush local copy of reader array */
|
|
|
|
if (node->reader)
|
|
|
|
pfree(node->reader);
|
|
|
|
node->reader = NULL;
|
2015-10-30 10:43:00 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecShutdownGather
|
|
|
|
*
|
|
|
|
* Destroy the setup for parallel workers including parallel context.
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ExecShutdownGather(GatherState *node)
|
|
|
|
{
|
|
|
|
ExecShutdownGatherWorkers(node);
|
|
|
|
|
|
|
|
/* Now destroy the parallel context. */
|
|
|
|
if (node->pei != NULL)
|
|
|
|
{
|
2015-10-16 17:56:02 +02:00
|
|
|
ExecParallelCleanup(node->pei);
|
|
|
|
node->pei = NULL;
|
|
|
|
}
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* Join Support
|
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
/* ----------------------------------------------------------------
|
|
|
|
* ExecReScanGather
|
|
|
|
*
|
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 19:18:16 +02:00
|
|
|
* Prepare to re-scan the result of a Gather.
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
* ----------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ExecReScanGather(GatherState *node)
|
|
|
|
{
|
Force rescanning of parallel-aware scan nodes below a Gather[Merge].
The ExecReScan machinery contains various optimizations for postponing
or skipping rescans of plan subtrees; for example a HashAgg node may
conclude that it can re-use the table it built before, instead of
re-reading its input subtree. But that is wrong if the input contains
a parallel-aware table scan node, since the portion of the table scanned
by the leader process is likely to vary from one rescan to the next.
This explains the timing-dependent buildfarm failures we saw after
commit a2b70c89c.
The established mechanism for showing that a plan node's output is
potentially variable is to mark it as depending on some runtime Param.
Hence, to fix this, invent a dummy Param (one that has a PARAM_EXEC
parameter number, but carries no actual value) associated with each Gather
or GatherMerge node, mark parallel-aware nodes below that node as dependent
on that Param, and arrange for ExecReScanGather[Merge] to flag that Param
as changed whenever the Gather[Merge] node is rescanned.
This solution breaks an undocumented assumption made by the parallel
executor logic, namely that all rescans of nodes below a Gather[Merge]
will happen synchronously during the ReScan of the top node itself.
But that's fundamentally contrary to the design of the ExecReScan code,
and so was doomed to fail someday anyway (even if you want to argue
that the bug being fixed here wasn't a failure of that assumption).
A follow-on patch will address that issue. In the meantime, the worst
that's expected to happen is that given very bad timing luck, the leader
might have to do all the work during a rescan, because workers think
they have nothing to do, if they are able to start up before the eventual
ReScan of the leader's parallel-aware table scan node has reset the
shared scan state.
Although this problem exists in 9.6, there does not seem to be any way
for it to manifest there. Without GatherMerge, it seems that a plan tree
that has a rescan-short-circuiting node below Gather will always also
have one above it that will short-circuit in the same cases, preventing
the Gather from being rescanned. Hence we won't take the risk of
back-patching this change into 9.6. But v10 needs it.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 15:29:55 +02:00
|
|
|
Gather *gather = (Gather *) node->ps.plan;
|
|
|
|
PlanState *outerPlan = outerPlanState(node);
|
|
|
|
|
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 19:18:16 +02:00
|
|
|
/* Make sure any existing workers are gracefully shut down */
|
2015-10-30 10:43:00 +01:00
|
|
|
ExecShutdownGatherWorkers(node);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
|
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 19:18:16 +02:00
|
|
|
/* Mark node so that shared state will be rebuilt at next call */
|
2015-10-16 17:56:02 +02:00
|
|
|
node->initialized = false;
|
|
|
|
|
Force rescanning of parallel-aware scan nodes below a Gather[Merge].
The ExecReScan machinery contains various optimizations for postponing
or skipping rescans of plan subtrees; for example a HashAgg node may
conclude that it can re-use the table it built before, instead of
re-reading its input subtree. But that is wrong if the input contains
a parallel-aware table scan node, since the portion of the table scanned
by the leader process is likely to vary from one rescan to the next.
This explains the timing-dependent buildfarm failures we saw after
commit a2b70c89c.
The established mechanism for showing that a plan node's output is
potentially variable is to mark it as depending on some runtime Param.
Hence, to fix this, invent a dummy Param (one that has a PARAM_EXEC
parameter number, but carries no actual value) associated with each Gather
or GatherMerge node, mark parallel-aware nodes below that node as dependent
on that Param, and arrange for ExecReScanGather[Merge] to flag that Param
as changed whenever the Gather[Merge] node is rescanned.
This solution breaks an undocumented assumption made by the parallel
executor logic, namely that all rescans of nodes below a Gather[Merge]
will happen synchronously during the ReScan of the top node itself.
But that's fundamentally contrary to the design of the ExecReScan code,
and so was doomed to fail someday anyway (even if you want to argue
that the bug being fixed here wasn't a failure of that assumption).
A follow-on patch will address that issue. In the meantime, the worst
that's expected to happen is that given very bad timing luck, the leader
might have to do all the work during a rescan, because workers think
they have nothing to do, if they are able to start up before the eventual
ReScan of the leader's parallel-aware table scan node has reset the
shared scan state.
Although this problem exists in 9.6, there does not seem to be any way
for it to manifest there. Without GatherMerge, it seems that a plan tree
that has a rescan-short-circuiting node below Gather will always also
have one above it that will short-circuit in the same cases, preventing
the Gather from being rescanned. Hence we won't take the risk of
back-patching this change into 9.6. But v10 needs it.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 15:29:55 +02:00
|
|
|
/*
|
|
|
|
* Set child node's chgParam to tell it that the next scan might deliver a
|
|
|
|
* different set of rows within the leader process. (The overall rowset
|
|
|
|
* shouldn't change, but the leader process's subset might; hence nodes
|
|
|
|
* between here and the parallel table scan node mustn't optimize on the
|
|
|
|
* assumption of an unchanging rowset.)
|
|
|
|
*/
|
|
|
|
if (gather->rescan_param >= 0)
|
|
|
|
outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
|
|
|
|
gather->rescan_param);
|
|
|
|
|
|
|
|
/*
|
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 19:18:16 +02:00
|
|
|
* If chgParam of subnode is not null then plan will be re-scanned by
|
|
|
|
* first ExecProcNode. Note: because this does nothing if we have a
|
|
|
|
* rescan_param, it's currently guaranteed that parallel-aware child nodes
|
|
|
|
* will not see a ReScan call until after they get a ReInitializeDSM call.
|
|
|
|
* That ordering might not be something to rely on, though. A good rule
|
|
|
|
* of thumb is that ReInitializeDSM should reset only shared state, ReScan
|
|
|
|
* should reset only local state, and anything that depends on both of
|
|
|
|
* those steps being finished must wait until the first ExecProcNode call.
|
Force rescanning of parallel-aware scan nodes below a Gather[Merge].
The ExecReScan machinery contains various optimizations for postponing
or skipping rescans of plan subtrees; for example a HashAgg node may
conclude that it can re-use the table it built before, instead of
re-reading its input subtree. But that is wrong if the input contains
a parallel-aware table scan node, since the portion of the table scanned
by the leader process is likely to vary from one rescan to the next.
This explains the timing-dependent buildfarm failures we saw after
commit a2b70c89c.
The established mechanism for showing that a plan node's output is
potentially variable is to mark it as depending on some runtime Param.
Hence, to fix this, invent a dummy Param (one that has a PARAM_EXEC
parameter number, but carries no actual value) associated with each Gather
or GatherMerge node, mark parallel-aware nodes below that node as dependent
on that Param, and arrange for ExecReScanGather[Merge] to flag that Param
as changed whenever the Gather[Merge] node is rescanned.
This solution breaks an undocumented assumption made by the parallel
executor logic, namely that all rescans of nodes below a Gather[Merge]
will happen synchronously during the ReScan of the top node itself.
But that's fundamentally contrary to the design of the ExecReScan code,
and so was doomed to fail someday anyway (even if you want to argue
that the bug being fixed here wasn't a failure of that assumption).
A follow-on patch will address that issue. In the meantime, the worst
that's expected to happen is that given very bad timing luck, the leader
might have to do all the work during a rescan, because workers think
they have nothing to do, if they are able to start up before the eventual
ReScan of the leader's parallel-aware table scan node has reset the
shared scan state.
Although this problem exists in 9.6, there does not seem to be any way
for it to manifest there. Without GatherMerge, it seems that a plan tree
that has a rescan-short-circuiting node below Gather will always also
have one above it that will short-circuit in the same cases, preventing
the Gather from being rescanned. Hence we won't take the risk of
back-patching this change into 9.6. But v10 needs it.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
2017-08-30 15:29:55 +02:00
|
|
|
*/
|
|
|
|
if (outerPlan->chgParam == NULL)
|
|
|
|
ExecReScan(outerPlan);
|
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream. It can also run the plan itself, if the workers are
unavailable or haven't started up yet. It is intended to work with
the Partial Seq Scan node which will be added in future commits.
It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used. In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results. So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes. Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.
There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne. But we're getting
close.
Amit Kapila. Some designs suggestions were provided by me, and I also
reviewed the patch. Single-copy mode, documentation, and other minor
changes also by me.
2015-10-01 01:23:36 +02:00
|
|
|
}
|