From 69d34408e5e7adcef8ef2f4e9c4f2919637e9a06 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 3 Feb 2016 12:46:18 -0500 Subject: [PATCH] Allow parallel custom and foreign scans. This patch doesn't put the new infrastructure to use anywhere, and indeed it's not clear how it could ever be used for something like postgres_fdw which has to send an SQL query and wait for a reply, but there might be FDWs or custom scan providers that are CPU-bound, so let's give them a way to join club parallel. KaiGai Kohei, reviewed by me. --- doc/src/sgml/custom-scan.sgml | 37 +++++++++++++++ doc/src/sgml/fdwhandler.sgml | 47 +++++++++++++++++++ src/backend/executor/execParallel.c | 26 +++++++++++ src/backend/executor/nodeCustom.c | 45 +++++++++++++++++++ src/backend/executor/nodeForeignscan.c | 62 ++++++++++++++++++++++++++ src/include/executor/nodeCustom.h | 11 +++++ src/include/executor/nodeForeignscan.h | 8 ++++ src/include/foreign/fdwapi.h | 14 ++++++ src/include/nodes/execnodes.h | 14 +++++- 9 files changed, 263 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml index 9a8d092dec..1ca9247124 100644 --- a/doc/src/sgml/custom-scan.sgml +++ b/doc/src/sgml/custom-scan.sgml @@ -303,6 +303,43 @@ void (*RestrPosCustomScan) (CustomScanState *node); +Size (*EstimateDSMCustomScan) (CustomScanState *node, + ParallelContext *pcxt); + + Estimate the amount of dynamic shared memory that will be required + for parallel operation. This may be higher than the amount that will + actually be used, but it must not be lower. The return value is in bytes. + This callback is optional, and need only be supplied if this custom + scan provider supports parallel execution. + + + + +void (*InitializeDSMCustomScan) (CustomScanState *node, + ParallelContext *pcxt, + void *coordinate); + + Initialize the dynamic shared memory that will be required for parallel + operation; coordinate points to an amount of allocated space + equal to the return value of EstimateDSMCustomScan. + This callback is optional, and need only be supplied if this custom + scan provider supports parallel execution. + + + + +void (*InitializeWorkerCustomScan) (CustomScanState *node, + shm_toc *toc, + void *coordinate); + + Initialize a parallel worker's custom state based on the shared state + set up in the leader by InitializeDSMCustomScan. + This callback is optional, and needs only be supplied if this + custom path supports parallel execution. + + + + void (*ExplainCustomScan) (CustomScanState *node, List *ancestors, ExplainState *es); diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index dc2d890975..c6b60fa579 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -955,6 +955,53 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid); + + FDW Routines for Parallel Execution + + A ForeignScan node can, optionally, support parallel + execution. A parallel ForeignScan will be executed + in multiple processes and should return each row only once across + all cooperating processes. To do this, processes can coordinate through + fixed size chunks of dynamic shared memory. This shared memory is not + guaranteed to be mapped at the same address in every process, so pointers + may not be used. The following callbacks are all optional in general, + but required if parallel execution is to be supported. + + + + +Size +EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt); + + Estimate the amount of dynamic shared memory that will be required + for parallel operation. This may be higher than the amount that will + actually be used, but it must not be lower. The return value is in bytes. + + + + +void +InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, + void *coordinate); + + Initialize the dynamic shared memory that will be required for parallel + operation; coordinate points to an amount of allocated space + equal to the return value of EstimateDSMForeignScan. + + + + +void +InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, + void *coordinate); + + Initialize a parallel worker's custom state based on the shared state + set up in the leader by InitializeDSMForeignScan. + This callback is optional, and needs only be supplied if this + custom path supports parallel execution. + + + diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 29e450a571..95e8e41d2b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,8 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeCustom.h" +#include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecSeqScanEstimate((SeqScanState *) planstate, e->pcxt); break; + case T_ForeignScanState: + ExecForeignScanEstimate((ForeignScanState *) planstate, + e->pcxt); + break; + case T_CustomScanState: + ExecCustomScanEstimate((CustomScanState *) planstate, + e->pcxt); + break; default: break; } @@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecSeqScanInitializeDSM((SeqScanState *) planstate, d->pcxt); break; + case T_ForeignScanState: + ExecForeignScanInitializeDSM((ForeignScanState *) planstate, + d->pcxt); + break; + case T_CustomScanState: + ExecCustomScanInitializeDSM((CustomScanState *) planstate, + d->pcxt); + break; default: break; } @@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) case T_SeqScanState: ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); break; + case T_ForeignScanState: + ExecForeignScanInitializeWorker((ForeignScanState *) planstate, + toc); + break; + case T_CustomScanState: + ExecCustomScanInitializeWorker((CustomScanState *) planstate, + toc); + break; default: break; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 640289e277..322abca282 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/nodeCustom.h" #include "nodes/execnodes.h" @@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node) node->methods->CustomName))); node->methods->RestrPosCustomScan(node); } + +void +ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->EstimateDSMCustomScan) + { + node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +void +ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeDSMCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + methods->InitializeDSMCustomScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +void +ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeWorkerCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + methods->InitializeWorkerCustomScan(node, toc, coordinate); + } +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 64a07bcc77..388c922749 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node) ExecScanReScan(&node->ss); } + +/* ---------------------------------------------------------------- + * ExecForeignScanEstimate + * + * Informs size of the parallel coordination information, if any + * ---------------------------------------------------------------- + */ +void +ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->EstimateDSMForeignScan) + { + node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialize the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeDSMForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialization according to the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeWorkerForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); + } +} diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index e244942d79..410a3ad14d 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -12,6 +12,7 @@ #ifndef NODECUSTOM_H #define NODECUSTOM_H +#include "access/parallel.h" #include "nodes/execnodes.h" /* @@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node); extern void ExecCustomMarkPos(CustomScanState *node); extern void ExecCustomRestrPos(CustomScanState *node); +/* + * Parallel execution support + */ +extern void ExecCustomScanEstimate(CustomScanState *node, + ParallelContext *pcxt); +extern void ExecCustomScanInitializeDSM(CustomScanState *node, + ParallelContext *pcxt); +extern void ExecCustomScanInitializeWorker(CustomScanState *node, + shm_toc *toc); + #endif /* NODECUSTOM_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index a92ce5c22a..c2553295fa 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -14,6 +14,7 @@ #ifndef NODEFOREIGNSCAN_H #define NODEFOREIGNSCAN_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags); @@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node); extern void ExecEndForeignScan(ForeignScanState *node); extern void ExecReScanForeignScan(ForeignScanState *node); +extern void ExecForeignScanEstimate(ForeignScanState *node, + ParallelContext *pcxt); +extern void ExecForeignScanInitializeDSM(ForeignScanState *node, + ParallelContext *pcxt); +extern void ExecForeignScanInitializeWorker(ForeignScanState *node, + shm_toc *toc); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index db73233f65..e16fbf34ec 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -12,6 +12,7 @@ #ifndef FDWAPI_H #define FDWAPI_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/relation.h" @@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt, Oid serverOid); +typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt); +typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt, + void *coordinate); +typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, + shm_toc *toc, + void *coordinate); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -177,6 +186,11 @@ typedef struct FdwRoutine /* Support functions for IMPORT FOREIGN SCHEMA */ ImportForeignSchema_function ImportForeignSchema; + + /* Support functions for parallelism under Gather node */ + EstimateDSMForeignScan_function EstimateDSMForeignScan; + InitializeDSMForeignScan_function InitializeDSMForeignScan; + InitializeWorkerForeignScan_function InitializeWorkerForeignScan; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 07cd20ac50..064a0509c4 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1585,6 +1585,7 @@ typedef struct ForeignScanState { ScanState ss; /* its first field is NodeTag */ List *fdw_recheck_quals; /* original quals not in ss.ps.qual */ + Size pscan_len; /* size of parallel coordination information */ /* use struct pointer to avoid including fdwapi.h here */ struct FdwRoutine *fdwroutine; void *fdw_state; /* foreign-data wrapper can keep state here */ @@ -1603,6 +1604,8 @@ typedef struct ForeignScanState * the BeginCustomScan method. * ---------------- */ +struct ParallelContext; /* avoid including parallel.h here */ +struct shm_toc; /* avoid including shm_toc.h here */ struct ExplainState; /* avoid including explain.h here */ struct CustomScanState; @@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods void (*ReScanCustomScan) (struct CustomScanState *node); void (*MarkPosCustomScan) (struct CustomScanState *node); void (*RestrPosCustomScan) (struct CustomScanState *node); - + /* Optional: parallel execution support */ + Size (*EstimateDSMCustomScan) (struct CustomScanState *node, + struct ParallelContext *pcxt); + void (*InitializeDSMCustomScan) (struct CustomScanState *node, + struct ParallelContext *pcxt, + void *coordinate); + void (*InitializeWorkerCustomScan) (struct CustomScanState *node, + struct shm_toc *toc, + void *coordinate); /* Optional: print additional information in EXPLAIN */ void (*ExplainCustomScan) (struct CustomScanState *node, List *ancestors, @@ -1631,6 +1642,7 @@ typedef struct CustomScanState ScanState ss; uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */ List *custom_ps; /* list of child PlanState nodes, if any */ + Size pscan_len; /* size of parallel coordination information */ const CustomExecMethods *methods; } CustomScanState;