Revert "Enable parallel SELECT for "INSERT INTO ... SELECT ..."."

To allow inserts in parallel-mode this feature has to ensure that all the
constraints, triggers, etc. are parallel-safe for the partition hierarchy
which is costly and we need to find a better way to do that. Additionally,
we could have used existing cached information in some cases like indexes,
domains, etc. to determine the parallel-safety.

List of commits reverted, in reverse chronological order:

ed62d3737c Doc: Update description for parallel insert reloption.
c8f78b6161 Add a new GUC and a reloption to enable inserts in parallel-mode.
c5be48f092 Improve FK trigger parallel-safety check added by 05c8482f7f.
e2cda3c20a Fix use of relcache TriggerDesc field introduced by commit 05c8482f7f.
e4e87a32cc Fix valgrind issue in commit 05c8482f7f.
05c8482f7f Enable parallel SELECT for "INSERT INTO ... SELECT ...".

Discussion: https://postgr.es/m/E1lMiB9-0001c3-SY@gemulon.postgresql.org
This commit is contained in:
Amit Kapila 2021-03-24 11:10:12 +05:30
parent 84007043fc
commit 26acb54a13
31 changed files with 35 additions and 1761 deletions

View File

@ -5072,29 +5072,6 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
<varlistentry id="guc-enable-parallel-insert" xreflabel="enable_parallel_insert">
<term><varname>enable_parallel_insert</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>enable_parallel_insert</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Enables or disables the query planner's use of parallel plans for
<command>INSERT</command> commands. The default is <literal>on</literal>.
When enabled, the planner performs additional parallel-safety checks
on the target table's attributes and indexes, in order to determine
if it's safe to use a parallel plan for <command>INSERT</command>. In
cases such as when the target table has a large number of partitions,
and particularly also when that table uses something parallel-unsafe
that prevents parallelism, the overhead of these checks may become
prohibitively high. To address this potential overhead in these cases,
this option can be used to disable the use of parallel plans for
<command>INSERT</command>.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
<sect2 id="runtime-config-query-constants">

View File

@ -155,9 +155,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
<listitem>
<para><command>SELECT INTO</command></para>
</listitem>
<listitem>
<para><command>INSERT INTO ... SELECT</command></para>
</listitem>
<listitem>
<para><command>CREATE MATERIALIZED VIEW</command></para>
</listitem>

View File

@ -738,8 +738,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
<para>
<literal>SHARE UPDATE EXCLUSIVE</literal> lock will be taken for
fillfactor, toast and autovacuum storage parameters, as well as the
planner parameters <varname>parallel_workers</varname> and
<varname>parallel_insert_enabled</varname>.
planner parameter <varname>parallel_workers</varname>.
</para>
</listitem>
</varlistentry>

View File

@ -1369,9 +1369,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
If a table parameter value is set and the
equivalent <literal>toast.</literal> parameter is not, the TOAST table
will use the table's parameter value.
These parameters, with the exception of
<literal>parallel_insert_enabled</literal>, are not supported on partitioned
tables, but may be specified for individual leaf partitions.
Specifying these parameters for partitioned tables is not supported,
but you may specify them for individual leaf partitions.
</para>
<variablelist>
@ -1441,32 +1440,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
</listitem>
</varlistentry>
<varlistentry id="reloption-parallel-insert-enabled" xreflabel="parallel_insert_enabled">
<term><literal>parallel_insert_enabled</literal> (<type>boolean</type>)
<indexterm>
<primary><varname>parallel_insert_enabled</varname> storage parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Enables or disables the query planner's use of parallel insert for
this table. When enabled (and provided that
<xref linkend="guc-enable-parallel-insert"/> is also <literal>true</literal>),
the planner performs additional parallel-safety checks on the table's
attributes and indexes, in order to determine if it's safe to use a
parallel plan for <command>INSERT</command>. The default is
<literal>true</literal>. In cases such as when the table has a large
number of partitions, and particularly also when that table uses a
parallel-unsafe feature that prevents parallelism, the overhead of these
checks may become prohibitively high. To address this potential overhead
in these cases, this option can be used to disable the use of parallel
insert for this table. Note that if the target table of the parallel
insert is partitioned, the <literal>parallel_insert_enabled</literal>
option values of the partitions are ignored.
</para>
</listitem>
</varlistentry>
<varlistentry id="reloption-autovacuum-enabled" xreflabel="autovacuum_enabled">
<term><literal>autovacuum_enabled</literal>, <literal>toast.autovacuum_enabled</literal> (<type>boolean</type>)
<indexterm>

View File

@ -168,15 +168,6 @@ static relopt_bool boolRelOpts[] =
},
true
},
{
{
"parallel_insert_enabled",
"Enables \"parallel insert\" feature for this table",
RELOPT_KIND_HEAP | RELOPT_KIND_PARTITIONED,
ShareUpdateExclusiveLock
},
true
},
/* list terminator */
{{NULL}}
};
@ -1868,9 +1859,7 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
{"vacuum_index_cleanup", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, vacuum_index_cleanup)},
{"vacuum_truncate", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, vacuum_truncate)},
{"parallel_insert_enabled", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, parallel_insert_enabled)}
offsetof(StdRdOptions, vacuum_truncate)}
};
return (bytea *) build_reloptions(reloptions, validate, kind,
@ -1972,15 +1961,13 @@ build_local_reloptions(local_relopts *relopts, Datum options, bool validate)
bytea *
partitioned_table_reloptions(Datum reloptions, bool validate)
{
static const relopt_parse_elt tab[] = {
{"parallel_insert_enabled", RELOPT_TYPE_BOOL,
offsetof(PartitionedTableRdOptions, parallel_insert_enabled)}
};
/*
* There are no options for partitioned tables yet, but this is able to do
* some validation.
*/
return (bytea *) build_reloptions(reloptions, validate,
RELOPT_KIND_PARTITIONED,
sizeof(PartitionedTableRdOptions),
tab, lengthof(tab));
0, NULL, 0);
}
/*

View File

@ -1014,32 +1014,6 @@ IsInParallelMode(void)
return CurrentTransactionState->parallelModeLevel != 0;
}
/*
* PrepareParallelModePlanExec
*
* Prepare for entering parallel mode plan execution, based on command-type.
*/
void
PrepareParallelModePlanExec(CmdType commandType)
{
if (IsModifySupportedInParallelMode(commandType))
{
Assert(!IsInParallelMode());
/*
* Prepare for entering parallel mode by assigning a TransactionId.
* Failure to do this now would result in heap_insert() subsequently
* attempting to assign a TransactionId whilst in parallel-mode, which
* is not allowed.
*
* This approach has a disadvantage in that if the underlying SELECT
* does not return any rows, then the TransactionId is not used,
* however that shouldn't happen in practice in many cases.
*/
(void) GetCurrentTransactionId();
}
}
/*
* CommandCounterIncrement
*/

View File

@ -1512,10 +1512,7 @@ ExecutePlan(EState *estate,
estate->es_use_parallel_mode = use_parallel_mode;
if (use_parallel_mode)
{
PrepareParallelModePlanExec(estate->es_plannedstmt->commandType);
EnterParallelMode();
}
/*
* Loop until we've processed the proper number of tuples from the plan.

View File

@ -96,7 +96,6 @@ _copyPlannedStmt(const PlannedStmt *from)
COPY_BITMAPSET_FIELD(rewindPlanIDs);
COPY_NODE_FIELD(rowMarks);
COPY_NODE_FIELD(relationOids);
COPY_NODE_FIELD(partitionOids);
COPY_NODE_FIELD(invalItems);
COPY_NODE_FIELD(paramExecTypes);
COPY_NODE_FIELD(utilityStmt);

View File

@ -314,7 +314,6 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
WRITE_BITMAPSET_FIELD(rewindPlanIDs);
WRITE_NODE_FIELD(rowMarks);
WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(partitionOids);
WRITE_NODE_FIELD(invalItems);
WRITE_NODE_FIELD(paramExecTypes);
WRITE_NODE_FIELD(utilityStmt);
@ -2222,7 +2221,6 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node)
WRITE_NODE_FIELD(resultRelations);
WRITE_NODE_FIELD(appendRelations);
WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(partitionOids);
WRITE_NODE_FIELD(invalItems);
WRITE_NODE_FIELD(paramExecTypes);
WRITE_UINT_FIELD(lastPHId);

View File

@ -1591,7 +1591,6 @@ _readPlannedStmt(void)
READ_BITMAPSET_FIELD(rewindPlanIDs);
READ_NODE_FIELD(rowMarks);
READ_NODE_FIELD(relationOids);
READ_NODE_FIELD(partitionOids);
READ_NODE_FIELD(invalItems);
READ_NODE_FIELD(paramExecTypes);
READ_NODE_FIELD(utilityStmt);

View File

@ -129,8 +129,6 @@ Cost disable_cost = 1.0e10;
int max_parallel_workers_per_gather = 2;
bool enable_parallel_insert = true;
bool enable_seqscan = true;
bool enable_indexscan = true;
bool enable_indexonlyscan = true;

View File

@ -305,7 +305,6 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
glob->resultRelations = NIL;
glob->appendRelations = NIL;
glob->relationOids = NIL;
glob->partitionOids = NIL;
glob->invalItems = NIL;
glob->paramExecTypes = NIL;
glob->lastPHId = 0;
@ -317,16 +316,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
/*
* Assess whether it's feasible to use parallel mode for this query. We
* can't do this in a standalone backend, or if the command will try to
* modify any data (except for Insert), or if this is a cursor operation,
* or if GUCs are set to values that don't permit parallelism, or if
* parallel-unsafe functions are present in the query tree.
* modify any data, or if this is a cursor operation, or if GUCs are set
* to values that don't permit parallelism, or if parallel-unsafe
* functions are present in the query tree.
*
* (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT
* INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as
* of now, only the leader backend writes into a completely new table. In
* the future, we can extend it to allow workers to write into the table.
* However, to allow parallel updates and deletes, we have to solve other
* problems, especially around combo CIDs.)
* (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
* MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
* backend writes into a completely new table. In the future, we can
* extend it to allow workers to write into the table. However, to allow
* parallel updates and deletes, we have to solve other problems,
* especially around combo CIDs.)
*
* For now, we don't try to use parallel mode if we're running inside a
* parallel worker. We might eventually be able to relax this
@ -335,14 +334,13 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
*/
if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster &&
(parse->commandType == CMD_SELECT ||
is_parallel_allowed_for_modify(parse)) &&
parse->commandType == CMD_SELECT &&
!parse->hasModifyingCTE &&
max_parallel_workers_per_gather > 0 &&
!IsParallelWorker())
{
/* all the cheap tests pass, so scan the query tree */
glob->maxParallelHazard = max_parallel_hazard(parse, glob);
glob->maxParallelHazard = max_parallel_hazard(parse);
glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
}
else
@ -523,19 +521,6 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
result->rewindPlanIDs = glob->rewindPlanIDs;
result->rowMarks = glob->finalrowmarks;
result->relationOids = glob->relationOids;
/*
* Register the Oids of parallel-safety-checked partitions as plan
* dependencies. This is only really needed in the case of a parallel plan
* so that if parallel-unsafe properties are subsequently defined on the
* partitions, the cached parallel plan will be invalidated, and a
* non-parallel plan will be generated.
*
* We also use this list to acquire locks on partitions before executing
* cached plan. See AcquireExecutorLocks().
*/
if (glob->partitionOids != NIL && glob->parallelModeNeeded)
result->partitionOids = glob->partitionOids;
result->invalItems = glob->invalItems;
result->paramExecTypes = glob->paramExecTypes;
/* utilityStmt should be null, but we might as well copy it */

View File

@ -19,18 +19,13 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_language.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/functions.h"
#include "funcapi.h"
@ -48,8 +43,6 @@
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_func.h"
#include "parser/parsetree.h"
#include "partitioning/partdesc.h"
#include "rewrite/rewriteManip.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@ -58,8 +51,6 @@
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
@ -97,9 +88,6 @@ typedef struct
char max_hazard; /* worst proparallel hazard found so far */
char max_interesting; /* worst proparallel hazard of interest */
List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
RangeTblEntry *target_rte; /* query's target relation if any */
CmdType command_type; /* query's command type */
PlannerGlobal *planner_global; /* global info for planner invocation */
} max_parallel_hazard_context;
static bool contain_agg_clause_walker(Node *node, void *context);
@ -110,20 +98,6 @@ static bool contain_volatile_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
static bool max_parallel_hazard_walker(Node *node,
max_parallel_hazard_context *context);
static bool target_rel_max_parallel_hazard(max_parallel_hazard_context *context);
static bool target_rel_max_parallel_hazard_recurse(Relation relation,
CmdType command_type,
max_parallel_hazard_context *context);
static bool target_rel_trigger_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool target_rel_index_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool target_rel_domain_max_parallel_hazard(Oid typid,
max_parallel_hazard_context *context);
static bool target_rel_partitions_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool target_rel_chk_constr_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool contain_nonstrict_functions_walker(Node *node, void *context);
static bool contain_exec_param_walker(Node *node, List *param_ids);
static bool contain_context_dependent_node(Node *clause);
@ -571,19 +545,14 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
* later, in the common case where everything is SAFE.
*/
char
max_parallel_hazard(Query *parse, PlannerGlobal *glob)
max_parallel_hazard(Query *parse)
{
max_parallel_hazard_context context;
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_UNSAFE;
context.safe_param_ids = NIL;
context.target_rte = parse->resultRelation > 0 ?
rt_fetch(parse->resultRelation, parse->rtable) : NULL;
context.command_type = parse->commandType;
context.planner_global = glob;
(void) max_parallel_hazard_walker((Node *) parse, &context);
return context.max_hazard;
}
@ -614,10 +583,6 @@ is_parallel_safe(PlannerInfo *root, Node *node)
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_RESTRICTED;
context.safe_param_ids = NIL;
/* We don't need to evaluate target relation's parallel-safety here. */
context.target_rte = NULL;
context.command_type = CMD_UNKNOWN;
context.planner_global = NULL;
/*
* The params that refer to the same or parent query level are considered
@ -690,20 +655,14 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
* opclass support functions are generally parallel-safe. XmlExpr is a
* bit more dubious but we can probably get away with it. We err on the
* side of caution by treating CoerceToDomain as parallel-restricted.
* However, for table modification statements, we check the parallel
* safety of domain constraints as that could contain a parallel-unsafe
* function, and executing that in parallel mode will lead to error.
* SQLValueFunction should be safe in all cases. NextValueExpr is
* parallel-unsafe.
* (Note: in principle that's wrong because a domain constraint could
* contain a parallel-unsafe function; but useful constraints probably
* never would have such, and assuming they do would cripple use of
* parallel query in the presence of domain types.) SQLValueFunction
* should be safe in all cases. NextValueExpr is parallel-unsafe.
*/
if (IsA(node, CoerceToDomain))
{
if (context->target_rte != NULL)
{
if (target_rel_domain_max_parallel_hazard(((CoerceToDomain *) node)->resulttype, context))
return true;
}
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
return true;
}
@ -728,27 +687,6 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
return true;
}
/*
* ModifyingCTE expressions are treated as parallel-unsafe.
*
* XXX Normally, if the Query has a modifying CTE, the hasModifyingCTE
* flag is set in the Query tree, and the query will be regarded as
* parallel-usafe. However, in some cases, a re-written query with a
* modifying CTE does not have that flag set, due to a bug in the query
* rewriter.
*/
else if (IsA(node, CommonTableExpr))
{
CommonTableExpr *cte = (CommonTableExpr *) node;
Query *ctequery = castNode(Query, cte->ctequery);
if (ctequery->commandType != CMD_SELECT)
{
context->max_hazard = PROPARALLEL_UNSAFE;
return true;
}
}
/*
* As a notational convenience for callers, look through RestrictInfo.
*/
@ -819,19 +757,6 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
}
return false; /* nothing to recurse to */
}
else if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rte = (RangeTblEntry *) node;
/* Nothing interesting to check for SELECTs */
if (context->target_rte == NULL)
return false;
if (rte == context->target_rte)
return target_rel_max_parallel_hazard(context);
return false;
}
/*
* When we're first invoked on a completely unplanned tree, we must
@ -852,9 +777,7 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
/* Recurse into subselects */
return query_tree_walker(query,
max_parallel_hazard_walker,
context,
context->target_rte != NULL ?
QTW_EXAMINE_RTES_BEFORE : 0);
context, 0);
}
/* Recurse to check arguments */
@ -863,486 +786,6 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
context);
}
/*
* target_rel_max_parallel_hazard
*
* Determines the maximum parallel-mode hazard level for modification
* of a specified relation.
*/
static bool
target_rel_max_parallel_hazard(max_parallel_hazard_context *context)
{
bool max_hazard_found;
Relation targetRel;
/*
* The target table is already locked by the caller (this is done in the
* parse/analyze phase), and remains locked until end-of-transaction.
*/
targetRel = table_open(context->target_rte->relid, NoLock);
max_hazard_found = target_rel_max_parallel_hazard_recurse(targetRel,
context->command_type,
context);
table_close(targetRel, NoLock);
return max_hazard_found;
}
static bool
target_rel_max_parallel_hazard_recurse(Relation rel,
CmdType command_type,
max_parallel_hazard_context *context)
{
/* Currently only CMD_INSERT is supported */
Assert(command_type == CMD_INSERT);
/*
* We can't support table modification in a parallel worker if it's a
* foreign table/partition (no FDW API for supporting parallel access) or
* a temporary table.
*/
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
RelationUsesLocalBuffers(rel))
{
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
return true;
}
/*
* If a partitioned table, check that each partition is safe for
* modification in parallel-mode.
*/
if (target_rel_partitions_max_parallel_hazard(rel, context))
return true;
/*
* If there are any index expressions or index predicate, check that they
* are parallel-mode safe.
*/
if (target_rel_index_max_parallel_hazard(rel, context))
return true;
/*
* If any triggers exist, check that they are parallel-safe.
*/
if (target_rel_trigger_max_parallel_hazard(rel, context))
return true;
/*
* Column default expressions are only applicable to INSERT and UPDATE.
* For columns in the target-list, these are already being checked for
* parallel-safety in the max_parallel_hazard() scan of the query tree in
* standard_planner(), so there's no need to do it here. Note that even
* though column defaults may be specified separately for each partition
* in a partitioned table, a partition's default value is not applied when
* inserting a tuple through a partitioned table.
*/
/*
* CHECK constraints are only applicable to INSERT and UPDATE. If any
* CHECK constraints exist, determine if they are parallel-safe.
*/
if (target_rel_chk_constr_max_parallel_hazard(rel, context))
return true;
return false;
}
/*
* target_rel_trigger_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for the specified relation's
* trigger data.
*/
static bool
target_rel_trigger_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
int i;
if (rel->trigdesc == NULL)
return false;
/*
* Care is needed here to avoid using the same relcache TriggerDesc field
* across other cache accesses, because relcache doesn't guarantee that it
* won't move.
*/
for (i = 0; i < rel->trigdesc->numtriggers; i++)
{
Oid tgfoid = rel->trigdesc->triggers[i].tgfoid;
if (max_parallel_hazard_test(func_parallel(tgfoid), context))
return true;
}
return false;
}
/*
* target_rel_index_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for any existing index
* expressions or index predicate of a specified relation.
*/
static bool
target_rel_index_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
List *index_oid_list;
ListCell *lc;
bool found_max_hazard = false;
LOCKMODE lockmode = AccessShareLock;
index_oid_list = RelationGetIndexList(rel);
foreach(lc, index_oid_list)
{
Relation index_rel;
Form_pg_index indexStruct;
List *ii_Expressions;
List *ii_Predicate;
Oid index_oid = lfirst_oid(lc);
index_rel = index_open(index_oid, lockmode);
indexStruct = index_rel->rd_index;
ii_Expressions = RelationGetIndexExpressions(index_rel);
if (ii_Expressions != NIL)
{
int i;
ListCell *index_expr_item = list_head(ii_Expressions);
for (i = 0; i < indexStruct->indnatts; i++)
{
int keycol = indexStruct->indkey.values[i];
if (keycol == 0)
{
/* Found an index expression */
Node *index_expr;
Assert(index_expr_item != NULL);
if (index_expr_item == NULL) /* shouldn't happen */
{
elog(WARNING, "too few entries in indexprs list");
context->max_hazard = PROPARALLEL_UNSAFE;
found_max_hazard = true;
break;
}
index_expr = (Node *) lfirst(index_expr_item);
if (max_parallel_hazard_walker(index_expr, context))
{
found_max_hazard = true;
break;
}
index_expr_item = lnext(ii_Expressions, index_expr_item);
}
}
}
if (!found_max_hazard)
{
ii_Predicate = RelationGetIndexPredicate(index_rel);
if (ii_Predicate != NIL)
{
if (max_parallel_hazard_walker((Node *) ii_Predicate, context))
found_max_hazard = true;
}
}
/*
* XXX We don't need to retain lock on index as index expressions
* can't be changed later.
*/
index_close(index_rel, lockmode);
}
list_free(index_oid_list);
return found_max_hazard;
}
/*
* target_rel_domain_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for the specified DOMAIN type.
* Only any CHECK expressions are examined for parallel-safety.
*/
static bool
target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context)
{
Relation con_rel;
ScanKeyData key[1];
SysScanDesc scan;
HeapTuple tup;
bool found_max_hazard = false;
LOCKMODE lockmode = AccessShareLock;
con_rel = table_open(ConstraintRelationId, lockmode);
ScanKeyInit(&key[0],
Anum_pg_constraint_contypid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(typid));
scan = systable_beginscan(con_rel, ConstraintTypidIndexId, true,
NULL, 1, key);
while (HeapTupleIsValid((tup = systable_getnext(scan))))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
if (con->contype == CONSTRAINT_CHECK)
{
char *conbin;
Datum val;
bool isnull;
Expr *check_expr;
val = SysCacheGetAttr(CONSTROID, tup,
Anum_pg_constraint_conbin, &isnull);
Assert(!isnull);
if (isnull)
{
/*
* This shouldn't ever happen, but if it does, log a WARNING
* and return UNSAFE, rather than erroring out.
*/
elog(WARNING, "null conbin for constraint %u", con->oid);
context->max_hazard = PROPARALLEL_UNSAFE;
found_max_hazard = true;
break;
}
conbin = TextDatumGetCString(val);
check_expr = stringToNode(conbin);
pfree(conbin);
if (max_parallel_hazard_walker((Node *) check_expr, context))
{
found_max_hazard = true;
break;
}
}
}
systable_endscan(scan);
table_close(con_rel, lockmode);
return found_max_hazard;
}
/*
* target_rel_partitions_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for any partitions of a
* of a specified relation.
*/
static bool
target_rel_partitions_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
int i;
PartitionDesc pdesc;
PartitionKey pkey;
ListCell *partexprs_item;
int partnatts;
List *partexprs;
PlannerGlobal *glob;
if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
return false;
pkey = RelationGetPartitionKey(rel);
partnatts = get_partition_natts(pkey);
partexprs = get_partition_exprs(pkey);
partexprs_item = list_head(partexprs);
for (i = 0; i < partnatts; i++)
{
/* Check parallel-safety of partition key support functions */
if (OidIsValid(pkey->partsupfunc[i].fn_oid))
{
if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context))
return true;
}
/* Check parallel-safety of any expressions in the partition key */
if (get_partition_col_attnum(pkey, i) == 0)
{
Node *check_expr = (Node *) lfirst(partexprs_item);
if (max_parallel_hazard_walker(check_expr, context))
return true;
partexprs_item = lnext(partexprs, partexprs_item);
}
}
/* Recursively check each partition ... */
/* Create the PartitionDirectory infrastructure if we didn't already */
glob = context->planner_global;
if (glob->partition_directory == NULL)
glob->partition_directory =
CreatePartitionDirectory(CurrentMemoryContext);
pdesc = PartitionDirectoryLookup(glob->partition_directory, rel);
for (i = 0; i < pdesc->nparts; i++)
{
bool max_hazard_found;
Relation part_rel;
/*
* The partition needs to be locked, and remain locked until
* end-of-transaction to ensure its parallel-safety state is not
* hereafter altered.
*/
part_rel = table_open(pdesc->oids[i], context->target_rte->rellockmode);
max_hazard_found = target_rel_max_parallel_hazard_recurse(part_rel,
context->command_type,
context);
table_close(part_rel, NoLock);
/*
* Remember partitionOids to record the partition as a potential plan
* dependency.
*/
glob->partitionOids = lappend_oid(glob->partitionOids, pdesc->oids[i]);
if (max_hazard_found)
return true;
}
return false;
}
/*
* target_rel_chk_constr_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for any CHECK expressions or
* CHECK constraints related to the specified relation.
*/
static bool
target_rel_chk_constr_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
TupleDesc tupdesc;
tupdesc = RelationGetDescr(rel);
/*
* Determine if there are any CHECK constraints which are not
* parallel-safe.
*/
if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
{
int i;
ConstrCheck *check = tupdesc->constr->check;
for (i = 0; i < tupdesc->constr->num_check; i++)
{
Expr *check_expr = stringToNode(check->ccbin);
if (max_parallel_hazard_walker((Node *) check_expr, context))
return true;
}
}
return false;
}
/*
* is_parallel_allowed_for_modify
*
* Check at a high-level if parallel mode is able to be used for the specified
* table-modification statement. Currently, we support only Inserts.
*
* It's not possible in the following cases:
*
* 1) enable_parallel_insert is off
* 2) INSERT...ON CONFLICT...DO UPDATE
* 3) INSERT without SELECT
* 4) the reloption parallel_insert_enabled is set to off
*
* (Note: we don't do in-depth parallel-safety checks here, we do only the
* cheaper tests that can quickly exclude obvious cases for which
* parallelism isn't supported, to avoid having to do further parallel-safety
* checks for these)
*/
bool
is_parallel_allowed_for_modify(Query *parse)
{
bool hasSubQuery;
bool parallel_enabled;
RangeTblEntry *rte;
ListCell *lc;
Relation rel;
if (!IsModifySupportedInParallelMode(parse->commandType))
return false;
if (!enable_parallel_insert)
return false;
/*
* UPDATE is not currently supported in parallel-mode, so prohibit
* INSERT...ON CONFLICT...DO UPDATE...
*
* In order to support update, even if only in the leader, some further
* work would need to be done. A mechanism would be needed for sharing
* combo-cids between leader and workers during parallel-mode, since for
* example, the leader might generate a combo-cid and it needs to be
* propagated to the workers.
*/
if (parse->commandType == CMD_INSERT &&
parse->onConflict != NULL &&
parse->onConflict->action == ONCONFLICT_UPDATE)
return false;
/*
* If there is no underlying SELECT, a parallel insert operation is not
* desirable.
*/
hasSubQuery = false;
foreach(lc, parse->rtable)
{
rte = lfirst_node(RangeTblEntry, lc);
if (rte->rtekind == RTE_SUBQUERY)
{
hasSubQuery = true;
break;
}
}
if (!hasSubQuery)
return false;
/*
* Check if parallel_insert_enabled is enabled for the target table, if
* not, skip the safety checks.
*
* (Note: if the target table is partitioned, the parallel_insert_enabled
* option setting of the partitions are ignored).
*/
rte = rt_fetch(parse->resultRelation, parse->rtable);
/*
* The target table is already locked by the caller (this is done in the
* parse/analyze phase), and remains locked until end-of-transaction.
*/
rel = table_open(rte->relid, NoLock);
parallel_enabled = RelationGetParallelInsert(rel, true);
table_close(rel, NoLock);
return parallel_enabled;
}
/*****************************************************************************
* Check clauses for nonstrict functions

View File

@ -1735,23 +1735,6 @@ QueryListGetPrimaryStmt(List *stmts)
return NULL;
}
static void
AcquireExecutorLocksOnPartitions(List *partitionOids, int lockmode,
bool acquire)
{
ListCell *lc;
foreach(lc, partitionOids)
{
Oid partOid = lfirst_oid(lc);
if (acquire)
LockRelationOid(partOid, lockmode);
else
UnlockRelationOid(partOid, lockmode);
}
}
/*
* AcquireExecutorLocks: acquire locks needed for execution of a cached plan;
* or release them if acquire is false.
@ -1765,8 +1748,6 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
{
PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc1);
ListCell *lc2;
Index rti,
resultRelation = 0;
if (plannedstmt->commandType == CMD_UTILITY)
{
@ -1784,9 +1765,6 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
continue;
}
rti = 1;
if (plannedstmt->resultRelations)
resultRelation = linitial_int(plannedstmt->resultRelations);
foreach(lc2, plannedstmt->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2);
@ -1804,14 +1782,6 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
LockRelationOid(rte->relid, rte->rellockmode);
else
UnlockRelationOid(rte->relid, rte->rellockmode);
/* Lock partitions ahead of modifying them in parallel mode. */
if (rti == resultRelation &&
plannedstmt->partitionOids != NIL)
AcquireExecutorLocksOnPartitions(plannedstmt->partitionOids,
rte->rellockmode, acquire);
rti++;
}
}
}
@ -2020,8 +1990,7 @@ PlanCacheRelCallback(Datum arg, Oid relid)
if (plannedstmt->commandType == CMD_UTILITY)
continue; /* Ignore utility statements */
if ((relid == InvalidOid) ? plannedstmt->relationOids != NIL :
(list_member_oid(plannedstmt->relationOids, relid) ||
list_member_oid(plannedstmt->partitionOids, relid)))
list_member_oid(plannedstmt->relationOids, relid))
{
/* Invalidate the generic plan only */
plansource->gplan->is_valid = false;

View File

@ -1131,16 +1131,6 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
{
{"enable_parallel_insert", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of parallel plans for INSERT commands."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_insert,
true,
NULL, NULL, NULL
},
{
/* Not for general use --- used by SET SESSION AUTHORIZATION */
{"is_superuser", PGC_INTERNAL, UNGROUPED,

View File

@ -371,7 +371,6 @@
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
#enable_parallel_insert = on
# - Planner Cost Constants -

View File

@ -1118,7 +1118,6 @@ static const char *const table_storage_parameters[] = {
"autovacuum_vacuum_threshold",
"fillfactor",
"log_autovacuum_min_duration",
"parallel_insert_enabled",
"parallel_workers",
"toast.autovacuum_enabled",
"toast.autovacuum_freeze_max_age",

View File

@ -466,20 +466,5 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
extern void EnterParallelMode(void);
extern void ExitParallelMode(void);
extern bool IsInParallelMode(void);
extern void PrepareParallelModePlanExec(CmdType commandType);
/*
* IsModifySupportedInParallelMode
*
* Indicates whether execution of the specified table-modification command
* (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
* parallel-safety conditions.
*/
static inline bool
IsModifySupportedInParallelMode(CmdType commandType)
{
/* Currently only INSERT is supported */
return (commandType == CMD_INSERT);
}
#endif /* XACT_H */

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202103232
#define CATALOG_VERSION_NO 202103241
#endif

View File

@ -3748,11 +3748,11 @@
# Generic referential integrity constraint triggers
{ oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r',
prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' },
proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger',
proargtypes => '', prosrc => 'RI_FKey_check_ins' },
{ oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r',
prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' },
proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger',
proargtypes => '', prosrc => 'RI_FKey_check_upd' },
{ oid => '1646', descr => 'referential integrity ON DELETE CASCADE',
proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger',
proargtypes => '', prosrc => 'RI_FKey_cascade_del' },

View File

@ -120,8 +120,6 @@ typedef struct PlannerGlobal
List *relationOids; /* OIDs of relations the plan depends on */
List *partitionOids; /* OIDs of partitions the plan depends on */
List *invalItems; /* other dependencies, as PlanInvalItems */
List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */

View File

@ -79,8 +79,6 @@ typedef struct PlannedStmt
List *relationOids; /* OIDs of relations the plan depends on */
List *partitionOids; /* OIDs of partitions the plan depends on */
List *invalItems; /* other dependencies, as PlanInvalItems */
List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */

View File

@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause);
extern bool contain_subplans(Node *clause);
extern char max_parallel_hazard(Query *parse, PlannerGlobal *glob);
extern char max_parallel_hazard(Query *parse);
extern bool is_parallel_safe(PlannerInfo *root, Node *node);
extern bool contain_nonstrict_functions(Node *clause);
extern bool contain_exec_param(Node *clause, List *param_ids);
@ -52,6 +52,5 @@ extern void CommuteOpExpr(OpExpr *clause);
extern Query *inline_set_returning_function(PlannerInfo *root,
RangeTblEntry *rte);
extern bool is_parallel_allowed_for_modify(Query *parse);
#endif /* CLAUSES_H */

View File

@ -47,7 +47,6 @@ typedef enum
/* parameter variables and flags (see also optimizer.h) */
extern PGDLLIMPORT Cost disable_cost;
extern PGDLLIMPORT int max_parallel_workers_per_gather;
extern PGDLLIMPORT bool enable_parallel_insert;
extern PGDLLIMPORT bool enable_seqscan;
extern PGDLLIMPORT bool enable_indexscan;
extern PGDLLIMPORT bool enable_indexonlyscan;

View File

@ -306,8 +306,6 @@ typedef struct StdRdOptions
int parallel_workers; /* max number of parallel workers */
bool vacuum_index_cleanup; /* enables index vacuuming and cleanup */
bool vacuum_truncate; /* enables vacuum to truncate a relation */
bool parallel_insert_enabled; /* enables planner's use of
* parallel insert */
} StdRdOptions;
#define HEAP_MIN_FILLFACTOR 10
@ -425,29 +423,6 @@ typedef struct ViewOptions
((ViewOptions *) (relation)->rd_options)->check_option == \
VIEW_OPTION_CHECK_OPTION_CASCADED)
/*
* PartitionedTableRdOptions
* Contents of rd_options for partitioned tables
*/
typedef struct PartitionedTableRdOptions
{
int32 vl_len_; /* varlena header (do not touch directly!) */
bool parallel_insert_enabled; /* enables planner's use of
* parallel insert */
} PartitionedTableRdOptions;
/*
* RelationGetParallelInsert
* Returns the relation's parallel_insert_enabled reloption setting.
* Note multiple eval of argument!
*/
#define RelationGetParallelInsert(relation, defaultpd) \
((relation)->rd_options ? \
(relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? \
((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_insert_enabled : \
((StdRdOptions *) (relation)->rd_options)->parallel_insert_enabled) : \
(defaultpd))
/*
* RelationIsValid
* True iff relation descriptor is valid.

View File

@ -1,586 +0,0 @@
--
-- PARALLEL
--
--
-- START: setup some tables and data needed by the tests.
--
-- Setup - index expressions test
-- For testing purposes, we'll mark this function as parallel-unsafe
create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel unsafe;
create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel restricted;
create table names(index int, first_name text, last_name text);
create table names2(index int, first_name text, last_name text);
create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
create table names4(index int, first_name text, last_name text);
create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
insert into names values
(1, 'albert', 'einstein'),
(2, 'niels', 'bohr'),
(3, 'erwin', 'schrodinger'),
(4, 'leonhard', 'euler'),
(5, 'stephen', 'hawking'),
(6, 'isaac', 'newton'),
(7, 'alan', 'turing'),
(8, 'richard', 'feynman');
-- Setup - column default tests
create or replace function bdefault_unsafe ()
returns int language plpgsql parallel unsafe as $$
begin
RETURN 5;
end $$;
create or replace function cdefault_restricted ()
returns int language plpgsql parallel restricted as $$
begin
RETURN 10;
end $$;
create or replace function ddefault_safe ()
returns int language plpgsql parallel safe as $$
begin
RETURN 20;
end $$;
create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
create table test_data(a int);
insert into test_data select * from generate_series(1,10);
--
-- END: setup some tables and data needed by the tests.
--
begin;
-- encourage use of parallel plans
set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
create table para_insert_p1 (
unique1 int4 PRIMARY KEY,
stringu1 name
) with (parallel_insert_enabled = off);
create table para_insert_f1 (
unique1 int4 REFERENCES para_insert_p1(unique1),
stringu1 name
);
--
-- Disable guc option enable_parallel_insert
--
set enable_parallel_insert = off;
-- Test INSERT with underlying query when enable_parallel_insert=off and reloption.parallel_insert_enabled=off.
-- (should create plan with serial INSERT + SELECT)
--
explain(costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
QUERY PLAN
--------------------------
Insert on para_insert_p1
-> Seq Scan on tenk1
(2 rows)
--
-- Reset guc option enable_parallel_insert
--
reset enable_parallel_insert;
--
-- Test INSERT with underlying query when enable_parallel_insert=on and reloption.parallel_insert_enabled=off.
-- (should create plan with serial INSERT + SELECT)
--
explain(costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
QUERY PLAN
--------------------------
Insert on para_insert_p1
-> Seq Scan on tenk1
(2 rows)
--
-- Enable reloption parallel_insert_enabled
--
alter table para_insert_p1 set (parallel_insert_enabled = on);
--
-- Test INSERT with underlying query.
-- (should create plan with parallel SELECT, Gather parent node)
--
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
QUERY PLAN
----------------------------------------
Insert on para_insert_p1
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on tenk1
(4 rows)
insert into para_insert_p1 select unique1, stringu1 from tenk1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
count | sum
-------+----------
10000 | 49995000
(1 row)
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
count
-------
1
(1 row)
--
-- Test INSERT with ordered underlying query.
-- (should create plan with parallel SELECT, GatherMerge parent node)
--
truncate para_insert_p1 cascade;
NOTICE: truncate cascades to table "para_insert_f1"
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
QUERY PLAN
----------------------------------------------
Insert on para_insert_p1
-> Gather Merge
Workers Planned: 4
-> Sort
Sort Key: tenk1.unique1
-> Parallel Seq Scan on tenk1
(6 rows)
insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
count | sum
-------+----------
10000 | 49995000
(1 row)
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
count
-------
1
(1 row)
--
-- Test INSERT with RETURNING clause.
-- (should create plan with parallel SELECT, Gather parent node)
--
create table test_data1(like test_data);
explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
QUERY PLAN
--------------------------------------------
Insert on test_data1
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on test_data
Filter: (a = 10)
(5 rows)
insert into test_data1 select * from test_data where a = 10 returning a as data;
data
------
10
(1 row)
--
-- Test INSERT into a table with a foreign key.
-- (Insert into a table with a foreign key is parallel-restricted,
-- as doing this in a parallel worker would create a new commandId
-- and within a worker this is not currently supported)
--
explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
QUERY PLAN
----------------------------------------
Insert on para_insert_f1
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on tenk1
(4 rows)
insert into para_insert_f1 select unique1, stringu1 from tenk1;
-- select some values to verify that the insert worked
select count(*), sum(unique1) from para_insert_f1;
count | sum
-------+----------
10000 | 49995000
(1 row)
--
-- Test INSERT with ON CONFLICT ... DO UPDATE ...
-- (should not create a parallel plan)
--
create table test_conflict_table(id serial primary key, somedata int);
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
QUERY PLAN
--------------------------------------------
Insert on test_conflict_table
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on test_data
(4 rows)
insert into test_conflict_table(id, somedata) select a, a from test_data;
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
QUERY PLAN
------------------------------------------------------
Insert on test_conflict_table
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: test_conflict_table_pkey
-> Seq Scan on test_data
(4 rows)
--
-- Test INSERT with parallel-unsafe index expression
-- (should not create a parallel plan)
--
explain (costs off) insert into names2 select * from names;
QUERY PLAN
-------------------------
Insert on names2
-> Seq Scan on names
(2 rows)
--
-- Test INSERT with parallel-restricted index expression
-- (should create a parallel plan)
--
explain (costs off) insert into names4 select * from names;
QUERY PLAN
----------------------------------------
Insert on names4
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
--
-- Test INSERT with underlying query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names5 (like names);
explain (costs off) insert into names5 select * from names returning *;
QUERY PLAN
----------------------------------------
Insert on names5
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
--
-- Test INSERT with underlying ordered query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names6 (like names);
explain (costs off) insert into names6 select * from names order by last_name returning *;
QUERY PLAN
----------------------------------------------
Insert on names6
-> Gather Merge
Workers Planned: 3
-> Sort
Sort Key: names.last_name
-> Parallel Seq Scan on names
(6 rows)
insert into names6 select * from names order by last_name returning *;
index | first_name | last_name
-------+------------+-------------
2 | niels | bohr
1 | albert | einstein
4 | leonhard | euler
8 | richard | feynman
5 | stephen | hawking
6 | isaac | newton
3 | erwin | schrodinger
7 | alan | turing
(8 rows)
--
-- Test INSERT with underlying ordered query - and RETURNING (with projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names7 (like names);
explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
QUERY PLAN
----------------------------------------------
Insert on names7
-> Gather Merge
Workers Planned: 3
-> Sort
Sort Key: names.last_name
-> Parallel Seq Scan on names
(6 rows)
insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
last_name_then_first_name
---------------------------
bohr, niels
einstein, albert
euler, leonhard
feynman, richard
hawking, stephen
newton, isaac
schrodinger, erwin
turing, alan
(8 rows)
--
-- Test INSERT into temporary table with underlying query.
-- (Insert into a temp table is parallel-restricted;
-- should create a parallel plan; parallel SELECT)
--
create temporary table temp_names (like names);
explain (costs off) insert into temp_names select * from names;
QUERY PLAN
----------------------------------------
Insert on temp_names
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
insert into temp_names select * from names;
--
-- Test INSERT with column defaults
--
--
--
-- Parallel unsafe column default, should not use a parallel plan
--
explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
QUERY PLAN
-----------------------------
Insert on testdef
-> Seq Scan on test_data
(2 rows)
--
-- Parallel restricted column default, should use parallel SELECT
--
explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
QUERY PLAN
--------------------------------------------
Insert on testdef
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on test_data
(4 rows)
insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
select * from testdef order by a;
a | b | c | d
----+----+----+----
1 | 2 | 10 | 8
2 | 4 | 10 | 16
3 | 6 | 10 | 24
4 | 8 | 10 | 32
5 | 10 | 10 | 40
6 | 12 | 10 | 48
7 | 14 | 10 | 56
8 | 16 | 10 | 64
9 | 18 | 10 | 72
10 | 20 | 10 | 80
(10 rows)
truncate testdef;
--
-- Parallel restricted and unsafe column defaults, should not use a parallel plan
--
explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
QUERY PLAN
-----------------------------
Insert on testdef
-> Seq Scan on test_data
(2 rows)
--
-- Test INSERT into partition with underlying query.
--
create table parttable1 (a int, b name) partition by range (a) with (parallel_insert_enabled=off);
create table parttable1_1 partition of parttable1 for values from (0) to (5000);
create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
--
-- Test INSERT into partition when reloption.parallel_insert_enabled=off
-- (should not create a parallel plan)
--
explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
QUERY PLAN
-------------------------
Insert on parttable1
-> Seq Scan on tenk1
(2 rows)
--
-- Enable reloption parallel_insert_enabled
--
alter table parttable1 set (parallel_insert_enabled = on);
--
-- Test INSERT into partition when reloption.parallel_insert_enabled=on
-- (should create a parallel plan)
--
explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
QUERY PLAN
----------------------------------------
Insert on parttable1
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on tenk1
(4 rows)
insert into parttable1 select unique1,stringu1 from tenk1;
select count(*) from parttable1_1;
count
-------
5000
(1 row)
select count(*) from parttable1_2;
count
-------
5000
(1 row)
--
-- Test INSERT into table with parallel-unsafe check constraint
-- (should not create a parallel plan)
--
create or replace function check_b_unsafe(b name) returns boolean as $$
begin
return (b <> 'XXXXXX');
end;
$$ language plpgsql parallel unsafe;
create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1;
QUERY PLAN
-------------------------
Insert on table_check_b
-> Seq Scan on tenk1
(2 rows)
--
-- Test INSERT into table with parallel-safe after stmt-level triggers
-- (should create a parallel SELECT plan; triggers should fire)
--
create table names_with_safe_trigger (like names);
create or replace function insert_after_trigger_safe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_safe';
return new;
end;
$$ language plpgsql parallel safe;
create trigger insert_after_trigger_safe after insert on names_with_safe_trigger
for each statement execute procedure insert_after_trigger_safe();
explain (costs off) insert into names_with_safe_trigger select * from names;
QUERY PLAN
----------------------------------------
Insert on names_with_safe_trigger
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
insert into names_with_safe_trigger select * from names;
NOTICE: hello from insert_after_trigger_safe
--
-- Test INSERT into table with parallel-unsafe after stmt-level triggers
-- (should not create a parallel plan; triggers should fire)
--
create table names_with_unsafe_trigger (like names);
create or replace function insert_after_trigger_unsafe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_unsafe';
return new;
end;
$$ language plpgsql parallel unsafe;
create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into names_with_unsafe_trigger select * from names;
QUERY PLAN
-------------------------------------
Insert on names_with_unsafe_trigger
-> Seq Scan on names
(2 rows)
insert into names_with_unsafe_trigger select * from names;
NOTICE: hello from insert_after_trigger_unsafe
--
-- Test INSERT into partition with parallel-unsafe trigger
-- (should not create a parallel plan)
--
create table part_unsafe_trigger (a int4, b name) partition by range (a);
create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1;
QUERY PLAN
-------------------------------
Insert on part_unsafe_trigger
-> Seq Scan on tenk1
(2 rows)
--
-- Test that parallel-safety-related changes to partitions are detected and
-- plan cache invalidation is working correctly.
--
create table rp (a int) partition by range (a);
create table rp1 partition of rp for values from (minvalue) to (0);
create table rp2 partition of rp for values from (0) to (maxvalue);
create table foo (a) as select unique1 from tenk1;
prepare q as insert into rp select * from foo where a%2 = 0;
-- should create a parallel plan
explain (costs off) execute q;
QUERY PLAN
--------------------------------------
Insert on rp
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on foo
Filter: ((a % 2) = 0)
(5 rows)
create or replace function make_table_bar () returns trigger language
plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe;
create trigger ai_rp2 after insert on rp2 for each row execute
function make_table_bar();
-- should create a non-parallel plan
explain (costs off) execute q;
QUERY PLAN
-------------------------------
Insert on rp
-> Seq Scan on foo
Filter: ((a % 2) = 0)
(3 rows)
--
-- Test INSERT into table having a DOMAIN column with a CHECK constraint
--
create function sql_is_distinct_from_u(anyelement, anyelement)
returns boolean language sql parallel unsafe
as 'select $1 is distinct from $2 limit 1';
create domain inotnull_u int
check (sql_is_distinct_from_u(value, null));
create table dom_table_u (x inotnull_u, y int);
-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint
explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1;
QUERY PLAN
-------------------------
Insert on dom_table_u
-> Seq Scan on tenk1
(2 rows)
rollback;
--
-- Clean up anything not created in the transaction
--
drop table names;
drop index names2_fullname_idx;
drop table names2;
drop index names4_fullname_idx;
drop table names4;
drop table testdef;
drop table test_data;
drop function bdefault_unsafe;
drop function cdefault_restricted;
drop function ddefault_safe;
drop function fullname_parallel_unsafe;
drop function fullname_parallel_restricted;

View File

@ -107,14 +107,13 @@ select name, setting from pg_settings where name like 'enable%';
enable_nestloop | on
enable_parallel_append | on
enable_parallel_hash | on
enable_parallel_insert | on
enable_partition_pruning | on
enable_partitionwise_aggregate | off
enable_partitionwise_join | off
enable_seqscan | on
enable_sort | on
enable_tidscan | on
(19 rows)
(18 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail

View File

@ -90,7 +90,6 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
# run by itself so it can run parallel workers
test: select_parallel
test: write_parallel
test: insert_parallel
# no relation related tests can be put in this group
test: publication subscription

View File

@ -148,7 +148,6 @@ test: stats_ext
test: collate.linux.utf8
test: select_parallel
test: write_parallel
test: insert_parallel
test: publication
test: subscription
test: select_views

View File

@ -1,373 +0,0 @@
--
-- PARALLEL
--
--
-- START: setup some tables and data needed by the tests.
--
-- Setup - index expressions test
-- For testing purposes, we'll mark this function as parallel-unsafe
create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel unsafe;
create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel restricted;
create table names(index int, first_name text, last_name text);
create table names2(index int, first_name text, last_name text);
create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
create table names4(index int, first_name text, last_name text);
create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
insert into names values
(1, 'albert', 'einstein'),
(2, 'niels', 'bohr'),
(3, 'erwin', 'schrodinger'),
(4, 'leonhard', 'euler'),
(5, 'stephen', 'hawking'),
(6, 'isaac', 'newton'),
(7, 'alan', 'turing'),
(8, 'richard', 'feynman');
-- Setup - column default tests
create or replace function bdefault_unsafe ()
returns int language plpgsql parallel unsafe as $$
begin
RETURN 5;
end $$;
create or replace function cdefault_restricted ()
returns int language plpgsql parallel restricted as $$
begin
RETURN 10;
end $$;
create or replace function ddefault_safe ()
returns int language plpgsql parallel safe as $$
begin
RETURN 20;
end $$;
create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
create table test_data(a int);
insert into test_data select * from generate_series(1,10);
--
-- END: setup some tables and data needed by the tests.
--
begin;
-- encourage use of parallel plans
set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
create table para_insert_p1 (
unique1 int4 PRIMARY KEY,
stringu1 name
) with (parallel_insert_enabled = off);
create table para_insert_f1 (
unique1 int4 REFERENCES para_insert_p1(unique1),
stringu1 name
);
--
-- Disable guc option enable_parallel_insert
--
set enable_parallel_insert = off;
-- Test INSERT with underlying query when enable_parallel_insert=off and reloption.parallel_insert_enabled=off.
-- (should create plan with serial INSERT + SELECT)
--
explain(costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
--
-- Reset guc option enable_parallel_insert
--
reset enable_parallel_insert;
--
-- Test INSERT with underlying query when enable_parallel_insert=on and reloption.parallel_insert_enabled=off.
-- (should create plan with serial INSERT + SELECT)
--
explain(costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
--
-- Enable reloption parallel_insert_enabled
--
alter table para_insert_p1 set (parallel_insert_enabled = on);
--
-- Test INSERT with underlying query.
-- (should create plan with parallel SELECT, Gather parent node)
--
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
insert into para_insert_p1 select unique1, stringu1 from tenk1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
--
-- Test INSERT with ordered underlying query.
-- (should create plan with parallel SELECT, GatherMerge parent node)
--
truncate para_insert_p1 cascade;
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
--
-- Test INSERT with RETURNING clause.
-- (should create plan with parallel SELECT, Gather parent node)
--
create table test_data1(like test_data);
explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
insert into test_data1 select * from test_data where a = 10 returning a as data;
--
-- Test INSERT into a table with a foreign key.
-- (Insert into a table with a foreign key is parallel-restricted,
-- as doing this in a parallel worker would create a new commandId
-- and within a worker this is not currently supported)
--
explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
insert into para_insert_f1 select unique1, stringu1 from tenk1;
-- select some values to verify that the insert worked
select count(*), sum(unique1) from para_insert_f1;
--
-- Test INSERT with ON CONFLICT ... DO UPDATE ...
-- (should not create a parallel plan)
--
create table test_conflict_table(id serial primary key, somedata int);
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
insert into test_conflict_table(id, somedata) select a, a from test_data;
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
--
-- Test INSERT with parallel-unsafe index expression
-- (should not create a parallel plan)
--
explain (costs off) insert into names2 select * from names;
--
-- Test INSERT with parallel-restricted index expression
-- (should create a parallel plan)
--
explain (costs off) insert into names4 select * from names;
--
-- Test INSERT with underlying query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names5 (like names);
explain (costs off) insert into names5 select * from names returning *;
--
-- Test INSERT with underlying ordered query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names6 (like names);
explain (costs off) insert into names6 select * from names order by last_name returning *;
insert into names6 select * from names order by last_name returning *;
--
-- Test INSERT with underlying ordered query - and RETURNING (with projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names7 (like names);
explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
--
-- Test INSERT into temporary table with underlying query.
-- (Insert into a temp table is parallel-restricted;
-- should create a parallel plan; parallel SELECT)
--
create temporary table temp_names (like names);
explain (costs off) insert into temp_names select * from names;
insert into temp_names select * from names;
--
-- Test INSERT with column defaults
--
--
--
-- Parallel unsafe column default, should not use a parallel plan
--
explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
--
-- Parallel restricted column default, should use parallel SELECT
--
explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
select * from testdef order by a;
truncate testdef;
--
-- Parallel restricted and unsafe column defaults, should not use a parallel plan
--
explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
--
-- Test INSERT into partition with underlying query.
--
create table parttable1 (a int, b name) partition by range (a) with (parallel_insert_enabled=off);
create table parttable1_1 partition of parttable1 for values from (0) to (5000);
create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
--
-- Test INSERT into partition when reloption.parallel_insert_enabled=off
-- (should not create a parallel plan)
--
explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
--
-- Enable reloption parallel_insert_enabled
--
alter table parttable1 set (parallel_insert_enabled = on);
--
-- Test INSERT into partition when reloption.parallel_insert_enabled=on
-- (should create a parallel plan)
--
explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
insert into parttable1 select unique1,stringu1 from tenk1;
select count(*) from parttable1_1;
select count(*) from parttable1_2;
--
-- Test INSERT into table with parallel-unsafe check constraint
-- (should not create a parallel plan)
--
create or replace function check_b_unsafe(b name) returns boolean as $$
begin
return (b <> 'XXXXXX');
end;
$$ language plpgsql parallel unsafe;
create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1;
--
-- Test INSERT into table with parallel-safe after stmt-level triggers
-- (should create a parallel SELECT plan; triggers should fire)
--
create table names_with_safe_trigger (like names);
create or replace function insert_after_trigger_safe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_safe';
return new;
end;
$$ language plpgsql parallel safe;
create trigger insert_after_trigger_safe after insert on names_with_safe_trigger
for each statement execute procedure insert_after_trigger_safe();
explain (costs off) insert into names_with_safe_trigger select * from names;
insert into names_with_safe_trigger select * from names;
--
-- Test INSERT into table with parallel-unsafe after stmt-level triggers
-- (should not create a parallel plan; triggers should fire)
--
create table names_with_unsafe_trigger (like names);
create or replace function insert_after_trigger_unsafe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_unsafe';
return new;
end;
$$ language plpgsql parallel unsafe;
create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into names_with_unsafe_trigger select * from names;
insert into names_with_unsafe_trigger select * from names;
--
-- Test INSERT into partition with parallel-unsafe trigger
-- (should not create a parallel plan)
--
create table part_unsafe_trigger (a int4, b name) partition by range (a);
create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1;
--
-- Test that parallel-safety-related changes to partitions are detected and
-- plan cache invalidation is working correctly.
--
create table rp (a int) partition by range (a);
create table rp1 partition of rp for values from (minvalue) to (0);
create table rp2 partition of rp for values from (0) to (maxvalue);
create table foo (a) as select unique1 from tenk1;
prepare q as insert into rp select * from foo where a%2 = 0;
-- should create a parallel plan
explain (costs off) execute q;
create or replace function make_table_bar () returns trigger language
plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe;
create trigger ai_rp2 after insert on rp2 for each row execute
function make_table_bar();
-- should create a non-parallel plan
explain (costs off) execute q;
--
-- Test INSERT into table having a DOMAIN column with a CHECK constraint
--
create function sql_is_distinct_from_u(anyelement, anyelement)
returns boolean language sql parallel unsafe
as 'select $1 is distinct from $2 limit 1';
create domain inotnull_u int
check (sql_is_distinct_from_u(value, null));
create table dom_table_u (x inotnull_u, y int);
-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint
explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1;
rollback;
--
-- Clean up anything not created in the transaction
--
drop table names;
drop index names2_fullname_idx;
drop table names2;
drop index names4_fullname_idx;
drop table names4;
drop table testdef;
drop table test_data;
drop function bdefault_unsafe;
drop function cdefault_restricted;
drop function ddefault_safe;
drop function fullname_parallel_unsafe;
drop function fullname_parallel_restricted;

View File

@ -1797,7 +1797,6 @@ PartitionSpec
PartitionTupleRouting
PartitionedRelPruneInfo
PartitionedRelPruningData
PartitionedTableRdOptions
PartitionwiseAggregateType
PasswordType
Path