diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml index 9a8d092dec..1ca9247124 100644 --- a/doc/src/sgml/custom-scan.sgml +++ b/doc/src/sgml/custom-scan.sgml @@ -303,6 +303,43 @@ void (*RestrPosCustomScan) (CustomScanState *node); +Size (*EstimateDSMCustomScan) (CustomScanState *node, + ParallelContext *pcxt); + + Estimate the amount of dynamic shared memory that will be required + for parallel operation. This may be higher than the amount that will + actually be used, but it must not be lower. The return value is in bytes. + This callback is optional, and need only be supplied if this custom + scan provider supports parallel execution. + + + + +void (*InitializeDSMCustomScan) (CustomScanState *node, + ParallelContext *pcxt, + void *coordinate); + + Initialize the dynamic shared memory that will be required for parallel + operation; coordinate points to an amount of allocated space + equal to the return value of EstimateDSMCustomScan. + This callback is optional, and need only be supplied if this custom + scan provider supports parallel execution. + + + + +void (*InitializeWorkerCustomScan) (CustomScanState *node, + shm_toc *toc, + void *coordinate); + + Initialize a parallel worker's custom state based on the shared state + set up in the leader by InitializeDSMCustomScan. + This callback is optional, and needs only be supplied if this + custom path supports parallel execution. + + + + void (*ExplainCustomScan) (CustomScanState *node, List *ancestors, ExplainState *es); diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index dc2d890975..c6b60fa579 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -955,6 +955,53 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid); + + FDW Routines for Parallel Execution + + A ForeignScan node can, optionally, support parallel + execution. A parallel ForeignScan will be executed + in multiple processes and should return each row only once across + all cooperating processes. To do this, processes can coordinate through + fixed size chunks of dynamic shared memory. This shared memory is not + guaranteed to be mapped at the same address in every process, so pointers + may not be used. The following callbacks are all optional in general, + but required if parallel execution is to be supported. + + + + +Size +EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt); + + Estimate the amount of dynamic shared memory that will be required + for parallel operation. This may be higher than the amount that will + actually be used, but it must not be lower. The return value is in bytes. + + + + +void +InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, + void *coordinate); + + Initialize the dynamic shared memory that will be required for parallel + operation; coordinate points to an amount of allocated space + equal to the return value of EstimateDSMForeignScan. + + + + +void +InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, + void *coordinate); + + Initialize a parallel worker's custom state based on the shared state + set up in the leader by InitializeDSMForeignScan. + This callback is optional, and needs only be supplied if this + custom path supports parallel execution. + + + diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 29e450a571..95e8e41d2b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,8 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeCustom.h" +#include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -176,6 +178,14 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecSeqScanEstimate((SeqScanState *) planstate, e->pcxt); break; + case T_ForeignScanState: + ExecForeignScanEstimate((ForeignScanState *) planstate, + e->pcxt); + break; + case T_CustomScanState: + ExecCustomScanEstimate((CustomScanState *) planstate, + e->pcxt); + break; default: break; } @@ -220,6 +230,14 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecSeqScanInitializeDSM((SeqScanState *) planstate, d->pcxt); break; + case T_ForeignScanState: + ExecForeignScanInitializeDSM((ForeignScanState *) planstate, + d->pcxt); + break; + case T_CustomScanState: + ExecCustomScanInitializeDSM((CustomScanState *) planstate, + d->pcxt); + break; default: break; } @@ -642,6 +660,14 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) case T_SeqScanState: ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); break; + case T_ForeignScanState: + ExecForeignScanInitializeWorker((ForeignScanState *) planstate, + toc); + break; + case T_CustomScanState: + ExecCustomScanInitializeWorker((CustomScanState *) planstate, + toc); + break; default: break; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 640289e277..322abca282 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "access/parallel.h" #include "executor/executor.h" #include "executor/nodeCustom.h" #include "nodes/execnodes.h" @@ -159,3 +160,47 @@ ExecCustomRestrPos(CustomScanState *node) node->methods->CustomName))); node->methods->RestrPosCustomScan(node); } + +void +ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->EstimateDSMCustomScan) + { + node->pscan_len = methods->EstimateDSMCustomScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +void +ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeDSMCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + methods->InitializeDSMCustomScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +void +ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->InitializeWorkerCustomScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + methods->InitializeWorkerCustomScan(node, toc, coordinate); + } +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 64a07bcc77..388c922749 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -282,3 +282,65 @@ ExecReScanForeignScan(ForeignScanState *node) ExecScanReScan(&node->ss); } + +/* ---------------------------------------------------------------- + * ExecForeignScanEstimate + * + * Informs size of the parallel coordination information, if any + * ---------------------------------------------------------------- + */ +void +ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->EstimateDSMForeignScan) + { + node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialize the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeDSMForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +/* ---------------------------------------------------------------- + * ExecForeignScanInitializeDSM + * + * Initialization according to the parallel coordination information + * ---------------------------------------------------------------- + */ +void +ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->InitializeWorkerForeignScan) + { + int plan_node_id = node->ss.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(toc, plan_node_id); + fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); + } +} diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index e244942d79..410a3ad14d 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -12,6 +12,7 @@ #ifndef NODECUSTOM_H #define NODECUSTOM_H +#include "access/parallel.h" #include "nodes/execnodes.h" /* @@ -26,4 +27,14 @@ extern void ExecReScanCustomScan(CustomScanState *node); extern void ExecCustomMarkPos(CustomScanState *node); extern void ExecCustomRestrPos(CustomScanState *node); +/* + * Parallel execution support + */ +extern void ExecCustomScanEstimate(CustomScanState *node, + ParallelContext *pcxt); +extern void ExecCustomScanInitializeDSM(CustomScanState *node, + ParallelContext *pcxt); +extern void ExecCustomScanInitializeWorker(CustomScanState *node, + shm_toc *toc); + #endif /* NODECUSTOM_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index a92ce5c22a..c2553295fa 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -14,6 +14,7 @@ #ifndef NODEFOREIGNSCAN_H #define NODEFOREIGNSCAN_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags); @@ -21,4 +22,11 @@ extern TupleTableSlot *ExecForeignScan(ForeignScanState *node); extern void ExecEndForeignScan(ForeignScanState *node); extern void ExecReScanForeignScan(ForeignScanState *node); +extern void ExecForeignScanEstimate(ForeignScanState *node, + ParallelContext *pcxt); +extern void ExecForeignScanInitializeDSM(ForeignScanState *node, + ParallelContext *pcxt); +extern void ExecForeignScanInitializeWorker(ForeignScanState *node, + shm_toc *toc); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index db73233f65..e16fbf34ec 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -12,6 +12,7 @@ #ifndef FDWAPI_H #define FDWAPI_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/relation.h" @@ -122,6 +123,14 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt, Oid serverOid); +typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt); +typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt, + void *coordinate); +typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, + shm_toc *toc, + void *coordinate); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -177,6 +186,11 @@ typedef struct FdwRoutine /* Support functions for IMPORT FOREIGN SCHEMA */ ImportForeignSchema_function ImportForeignSchema; + + /* Support functions for parallelism under Gather node */ + EstimateDSMForeignScan_function EstimateDSMForeignScan; + InitializeDSMForeignScan_function InitializeDSMForeignScan; + InitializeWorkerForeignScan_function InitializeWorkerForeignScan; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 07cd20ac50..064a0509c4 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1585,6 +1585,7 @@ typedef struct ForeignScanState { ScanState ss; /* its first field is NodeTag */ List *fdw_recheck_quals; /* original quals not in ss.ps.qual */ + Size pscan_len; /* size of parallel coordination information */ /* use struct pointer to avoid including fdwapi.h here */ struct FdwRoutine *fdwroutine; void *fdw_state; /* foreign-data wrapper can keep state here */ @@ -1603,6 +1604,8 @@ typedef struct ForeignScanState * the BeginCustomScan method. * ---------------- */ +struct ParallelContext; /* avoid including parallel.h here */ +struct shm_toc; /* avoid including shm_toc.h here */ struct ExplainState; /* avoid including explain.h here */ struct CustomScanState; @@ -1619,7 +1622,15 @@ typedef struct CustomExecMethods void (*ReScanCustomScan) (struct CustomScanState *node); void (*MarkPosCustomScan) (struct CustomScanState *node); void (*RestrPosCustomScan) (struct CustomScanState *node); - + /* Optional: parallel execution support */ + Size (*EstimateDSMCustomScan) (struct CustomScanState *node, + struct ParallelContext *pcxt); + void (*InitializeDSMCustomScan) (struct CustomScanState *node, + struct ParallelContext *pcxt, + void *coordinate); + void (*InitializeWorkerCustomScan) (struct CustomScanState *node, + struct shm_toc *toc, + void *coordinate); /* Optional: print additional information in EXPLAIN */ void (*ExplainCustomScan) (struct CustomScanState *node, List *ancestors, @@ -1631,6 +1642,7 @@ typedef struct CustomScanState ScanState ss; uint32 flags; /* mask of CUSTOMPATH_* flags, see relation.h */ List *custom_ps; /* list of child PlanState nodes, if any */ + Size pscan_len; /* size of parallel coordination information */ const CustomExecMethods *methods; } CustomScanState;