Allow parallel custom and foreign scans.

This patch doesn't put the new infrastructure to use anywhere, and
indeed it's not clear how it could ever be used for something like
postgres_fdw which has to send an SQL query and wait for a reply,
but there might be FDWs or custom scan providers that are CPU-bound,
so let's give them a way to join club parallel.

KaiGai Kohei, reviewed by me.
This commit is contained in:
Robert Haas 2016-02-03 12:46:18 -05:00
parent 25e44518c1
commit 69d34408e5
9 changed files with 263 additions and 1 deletions

View File

@ -303,6 +303,43 @@ void (*RestrPosCustomScan) (CustomScanState *node);
<para>
<programlisting>
Size (*EstimateDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt);
</programlisting>
Estimate the amount of dynamic shared memory that will be required
for parallel operation. This may be higher than the amount that will
actually be used, but it must not be lower. The return value is in bytes.
This callback is optional, and need only be supplied if this custom
scan provider supports parallel execution.
</para>
<para>
<programlisting>
void (*InitializeDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt,
void *coordinate);
</programlisting>
Initialize the dynamic shared memory that will be required for parallel
operation; <literal>coordinate</> points to an amount of allocated space
equal to the return value of <function>EstimateDSMCustomScan</>.
This callback is optional, and need only be supplied if this custom
scan provider supports parallel execution.
</para>
<para>
<programlisting>
void (*InitializeWorkerCustomScan) (CustomScanState *node,
shm_toc *toc,
void *coordinate);
</programlisting>
Initialize a parallel worker's custom state based on the shared state
set up in the leader by <literal>InitializeDSMCustomScan</>.
This callback is optional, and needs only be supplied if this
custom path supports parallel execution.
</para>
<para>
<programlisting>
void (*ExplainCustomScan) (CustomScanState *node,
List *ancestors,
ExplainState *es);

View File

@ -955,6 +955,53 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
</sect2>
<sect2 id="fdw-callbacks-parallel">
<title>FDW Routines for Parallel Execution</title>
<para>
A <structname>ForeignScan</> node can, optionally, support parallel
execution. A parallel <structname>ForeignScan</> will be executed
in multiple processes and should return each row only once across
all cooperating processes. To do this, processes can coordinate through
fixed size chunks of dynamic shared memory. This shared memory is not
guaranteed to be mapped at the same address in every process, so pointers
may not be used. The following callbacks are all optional in general,
but required if parallel execution is to be supported.
</para>
<para>
<programlisting>
Size
EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
</programlisting>
Estimate the amount of dynamic shared memory that will be required
for parallel operation. This may be higher than the amount that will
actually be used, but it must not be lower. The return value is in bytes.
</para>
<para>
<programlisting>
void
InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
void *coordinate);
</programlisting>
Initialize the dynamic shared memory that will be required for parallel
operation; <literal>coordinate</> points to an amount of allocated space
equal to the return value of <function>EstimateDSMForeignScan</>.
</para>
<para>
<programlisting>
void
InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
void *coordinate);
</programlisting>
Initialize a parallel worker's custom state based on the shared state
set up in the leader by <literal>InitializeDSMForeignScan</>.
This callback is optional, and needs only be supplied if this
custom path supports parallel execution.
</para>
</sect2>
</sect1>
<sect1 id="fdw-helpers">

View File

@ -25,6 +25,8 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
case T_ForeignScanState:
ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt);
break;
case T_CustomScanState:
ExecCustomScanEstimate((CustomScanState *) planstate,
e->pcxt);
break;
default:
break;
}
@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
case T_ForeignScanState:
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
break;
case T_CustomScanState:
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
d->pcxt);
break;
default:
break;
}
@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
case T_ForeignScanState:
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
toc);
break;
case T_CustomScanState:
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
toc);
break;
default:
break;
}

View File

@ -10,6 +10,7 @@
*/
#include "postgres.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "nodes/execnodes.h"
@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node)
node->methods->CustomName)));
node->methods->RestrPosCustomScan(node);
}
void
ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
{
const CustomExecMethods *methods = node->methods;
if (methods->EstimateDSMCustomScan)
{
node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt);
shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
}
void
ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
{
const CustomExecMethods *methods = node->methods;
if (methods->InitializeDSMCustomScan)
{
int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate;
coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
methods->InitializeDSMCustomScan(node, pcxt, coordinate);
shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
}
}
void
ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
{
const CustomExecMethods *methods = node->methods;
if (methods->InitializeWorkerCustomScan)
{
int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate;
coordinate = shm_toc_lookup(toc, plan_node_id);
methods->InitializeWorkerCustomScan(node, toc, coordinate);
}
}

View File

@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node)
ExecScanReScan(&node->ss);
}
/* ----------------------------------------------------------------
* ExecForeignScanEstimate
*
* Informs size of the parallel coordination information, if any
* ----------------------------------------------------------------
*/
void
ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
{
FdwRoutine *fdwroutine = node->fdwroutine;
if (fdwroutine->EstimateDSMForeignScan)
{
node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
}
/* ----------------------------------------------------------------
* ExecForeignScanInitializeDSM
*
* Initialize the parallel coordination information
* ----------------------------------------------------------------
*/
void
ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
{
FdwRoutine *fdwroutine = node->fdwroutine;
if (fdwroutine->InitializeDSMForeignScan)
{
int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate;
coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
}
}
/* ----------------------------------------------------------------
* ExecForeignScanInitializeDSM
*
* Initialization according to the parallel coordination information
* ----------------------------------------------------------------
*/
void
ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
{
FdwRoutine *fdwroutine = node->fdwroutine;
if (fdwroutine->InitializeWorkerForeignScan)
{
int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate;
coordinate = shm_toc_lookup(toc, plan_node_id);
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
}
}

View File

@ -12,6 +12,7 @@
#ifndef NODECUSTOM_H
#define NODECUSTOM_H
#include "access/parallel.h"
#include "nodes/execnodes.h"
/*
@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
extern void ExecCustomMarkPos(CustomScanState *node);
extern void ExecCustomRestrPos(CustomScanState *node);
/*
* Parallel execution support
*/
extern void ExecCustomScanEstimate(CustomScanState *node,
ParallelContext *pcxt);
extern void ExecCustomScanInitializeDSM(CustomScanState *node,
ParallelContext *pcxt);
extern void ExecCustomScanInitializeWorker(CustomScanState *node,
shm_toc *toc);
#endif /* NODECUSTOM_H */

View File

@ -14,6 +14,7 @@
#ifndef NODEFOREIGNSCAN_H
#define NODEFOREIGNSCAN_H
#include "access/parallel.h"
#include "nodes/execnodes.h"
extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node);
extern void ExecEndForeignScan(ForeignScanState *node);
extern void ExecReScanForeignScan(ForeignScanState *node);
extern void ExecForeignScanEstimate(ForeignScanState *node,
ParallelContext *pcxt);
extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt);
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
shm_toc *toc);
#endif /* NODEFOREIGNSCAN_H */

View File

@ -12,6 +12,7 @@
#ifndef FDWAPI_H
#define FDWAPI_H
#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
Oid serverOid);
typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt);
typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt,
void *coordinate);
typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
shm_toc *toc,
void *coordinate);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@ -177,6 +186,11 @@ typedef struct FdwRoutine
/* Support functions for IMPORT FOREIGN SCHEMA */
ImportForeignSchema_function ImportForeignSchema;
/* Support functions for parallelism under Gather node */
EstimateDSMForeignScan_function EstimateDSMForeignScan;
InitializeDSMForeignScan_function InitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
} FdwRoutine;

View File

@ -1585,6 +1585,7 @@ typedef struct ForeignScanState
{
ScanState ss; /* its first field is NodeTag */
List *fdw_recheck_quals; /* original quals not in ss.ps.qual */
Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
void *fdw_state; /* foreign-data wrapper can keep state here */
@ -1603,6 +1604,8 @@ typedef struct ForeignScanState
* the BeginCustomScan method.
* ----------------
*/
struct ParallelContext; /* avoid including parallel.h here */
struct shm_toc; /* avoid including shm_toc.h here */
struct ExplainState; /* avoid including explain.h here */
struct CustomScanState;
@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods
void (*ReScanCustomScan) (struct CustomScanState *node);
void (*MarkPosCustomScan) (struct CustomScanState *node);
void (*RestrPosCustomScan) (struct CustomScanState *node);
/* Optional: parallel execution support */
Size (*EstimateDSMCustomScan) (struct CustomScanState *node,
struct ParallelContext *pcxt);
void (*InitializeDSMCustomScan) (struct CustomScanState *node,
struct ParallelContext *pcxt,
void *coordinate);
void (*InitializeWorkerCustomScan) (struct CustomScanState *node,
struct shm_toc *toc,
void *coordinate);
/* Optional: print additional information in EXPLAIN */
void (*ExplainCustomScan) (struct CustomScanState *node,
List *ancestors,
@ -1631,6 +1642,7 @@ typedef struct CustomScanState
ScanState ss;
uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */
List *custom_ps; /* list of child PlanState nodes, if any */
Size pscan_len; /* size of parallel coordination information */
const CustomExecMethods *methods;
} CustomScanState;