Add new FDW API to test for parallel-safety.

This is basically a bug fix; the old code assumes that a ForeignScan
is always parallel-safe, but for postgres_fdw, for example, this is
definitely false.  It should be true for file_fdw, though, since a
worker can read a file from the filesystem just as well as any other
backend process.

Original patch by Thomas Munro.  Documentation, and changes to the
comments, by me.
This commit is contained in:
Robert Haas 2016-02-26 16:14:46 +05:30
parent 9117985b6b
commit 35746bc348
4 changed files with 60 additions and 0 deletions

View File

@ -131,6 +131,8 @@ static void fileEndForeignScan(ForeignScanState *node);
static bool fileAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
/*
* Helper functions
@ -170,6 +172,7 @@ file_fdw_handler(PG_FUNCTION_ARGS)
fdwroutine->ReScanForeignScan = fileReScanForeignScan;
fdwroutine->EndForeignScan = fileEndForeignScan;
fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
PG_RETURN_POINTER(fdwroutine);
}
@ -761,6 +764,18 @@ fileAnalyzeForeignTable(Relation relation,
return true;
}
/*
* fileIsForeignScanParallelSafe
* Reading a file in a parallel worker should work just the same as
* reading it in the leader, so mark scans safe.
*/
static bool
fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte)
{
return true;
}
/*
* check_selective_binary_conversion
*

View File

@ -988,6 +988,29 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
<para>
<programlisting>
Size
IsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
</programlisting>
Test whether a scan can be performed within a parallel worker. This
function will only be called when the planner believes that a parallel
plan might be possible, and should return true if it is safe for that scan
to run within a parallel worker. This will generally not be the case if
the remote data source has transaction semantics, unless the worker's
connection to the data can somehow be made to share the same transaction
context as the leader.
</para>
<para>
If this callback is not defined, it is assumed that the scan must take
place within the parallel leader. Note that returning true does not mean
that the scan itself can be done in parallel, only that the scan can be
performed within a parallel worker. Therefore, it can be useful to define
this method even when parallel execution is not supported.
</para>
<para>
<programlisting>
Size
EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
</programlisting>
Estimate the amount of dynamic shared memory that will be required

View File

@ -527,6 +527,23 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
return;
return;
}
/*
* Ask FDWs whether they can support performing a ForeignScan
* within a worker. Most often, the answer will be no. For
* example, if the nature of the FDW is such that it opens a TCP
* connection with a remote server, each parallel worker would end
* up with a separate connection, and these connections might not
* be appropriately coordinated between workers and the leader.
*/
if (rte->relkind == RELKIND_FOREIGN_TABLE)
{
Assert(rel->fdwroutine);
if (!rel->fdwroutine->IsForeignScanParallelSafe)
return;
if (!rel->fdwroutine->IsForeignScanParallelSafe(root, rel, rte))
return;
}
break;
case RTE_SUBQUERY:

View File

@ -131,6 +131,10 @@ typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
shm_toc *toc,
void *coordinate);
typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
RelOptInfo *rel,
RangeTblEntry *rte);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@ -188,6 +192,7 @@ typedef struct FdwRoutine
ImportForeignSchema_function ImportForeignSchema;
/* Support functions for parallelism under Gather node */
IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
EstimateDSMForeignScan_function EstimateDSMForeignScan;
InitializeDSMForeignScan_function InitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;