Enable parallel SELECT for "INSERT INTO ... SELECT ...".

Parallel SELECT can't be utilized for INSERT in the following cases:
- INSERT statement uses the ON CONFLICT DO UPDATE clause
- Target table has a parallel-unsafe: trigger, index expression or
  predicate, column default expression or check constraint
- Target table has a parallel-unsafe domain constraint on any column
- Target table is a partitioned table with a parallel-unsafe partition key
  expression or support function

The planner is updated to perform additional parallel-safety checks for
the cases listed above, for determining whether it is safe to run INSERT
in parallel-mode with an underlying parallel SELECT. The planner will
consider using parallel SELECT for "INSERT INTO ... SELECT ...", provided
nothing unsafe is found from the additional parallel-safety checks, or
from the existing parallel-safety checks for SELECT.

While checking parallel-safety, we need to check it for all the partitions
on the table which can be costly especially when we decide not to use a
parallel plan. So, in a separate patch, we will introduce a GUC and or a
reloption to enable/disable parallelism for Insert statements.

Prior to entering parallel-mode for the execution of INSERT with parallel
SELECT, a TransactionId is acquired and assigned to the current
transaction state. This is necessary to prevent the INSERT from attempting
to assign the TransactionId whilst in parallel-mode, which is not allowed.
This approach has a disadvantage in that if the underlying SELECT does not
return any rows, then the TransactionId is not used, however that
shouldn't happen in practice in many cases.

Author: Greg Nancarrow, Amit Langote, Amit Kapila
Reviewed-by: Amit Langote, Hou Zhijie, Takayuki Tsunakawa, Antonin Houska, Bharath Rupireddy, Dilip Kumar, Vignesh C, Zhihong Yu, Amit Kapila
Tested-by: Tang, Haiying
Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
Discussion: https://postgr.es/m/CAJcOf-fAdj=nDKMsRhQzndm-O13NY4dL6xGcEvdX5Xvbbi0V7g@mail.gmail.com
This commit is contained in:
Amit Kapila 2021-03-10 07:38:58 +05:30
parent 0ba71107ef
commit 05c8482f7f
17 changed files with 1531 additions and 21 deletions

View File

@ -146,7 +146,9 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
a CTE, no parallel plans for that query will be generated. As an
exception, the commands <literal>CREATE TABLE ... AS</literal>, <literal>SELECT
INTO</literal>, and <literal>CREATE MATERIALIZED VIEW</literal> which create a new
table and populate it can use a parallel plan.
table and populate it can use a parallel plan. Another exception is the command
<literal>INSERT INTO ... SELECT ...</literal> which can use a parallel plan for
the underlying <literal>SELECT</literal> part of the query.
</para>
</listitem>

View File

@ -1014,6 +1014,32 @@ IsInParallelMode(void)
return CurrentTransactionState->parallelModeLevel != 0;
}
/*
* PrepareParallelModePlanExec
*
* Prepare for entering parallel mode plan execution, based on command-type.
*/
void
PrepareParallelModePlanExec(CmdType commandType)
{
if (IsModifySupportedInParallelMode(commandType))
{
Assert(!IsInParallelMode());
/*
* Prepare for entering parallel mode by assigning a TransactionId.
* Failure to do this now would result in heap_insert() subsequently
* attempting to assign a TransactionId whilst in parallel-mode, which
* is not allowed.
*
* This approach has a disadvantage in that if the underlying SELECT
* does not return any rows, then the TransactionId is not used,
* however that shouldn't happen in practice in many cases.
*/
(void) GetCurrentTransactionId();
}
}
/*
* CommandCounterIncrement
*/

View File

@ -1512,7 +1512,10 @@ ExecutePlan(EState *estate,
estate->es_use_parallel_mode = use_parallel_mode;
if (use_parallel_mode)
{
PrepareParallelModePlanExec(estate->es_plannedstmt->commandType);
EnterParallelMode();
}
/*
* Loop until we've processed the proper number of tuples from the plan.

View File

@ -96,6 +96,7 @@ _copyPlannedStmt(const PlannedStmt *from)
COPY_BITMAPSET_FIELD(rewindPlanIDs);
COPY_NODE_FIELD(rowMarks);
COPY_NODE_FIELD(relationOids);
COPY_NODE_FIELD(partitionOids);
COPY_NODE_FIELD(invalItems);
COPY_NODE_FIELD(paramExecTypes);
COPY_NODE_FIELD(utilityStmt);

View File

@ -314,6 +314,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
WRITE_BITMAPSET_FIELD(rewindPlanIDs);
WRITE_NODE_FIELD(rowMarks);
WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(partitionOids);
WRITE_NODE_FIELD(invalItems);
WRITE_NODE_FIELD(paramExecTypes);
WRITE_NODE_FIELD(utilityStmt);
@ -2221,6 +2222,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node)
WRITE_NODE_FIELD(resultRelations);
WRITE_NODE_FIELD(appendRelations);
WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(partitionOids);
WRITE_NODE_FIELD(invalItems);
WRITE_NODE_FIELD(paramExecTypes);
WRITE_UINT_FIELD(lastPHId);

View File

@ -1590,6 +1590,7 @@ _readPlannedStmt(void)
READ_BITMAPSET_FIELD(rewindPlanIDs);
READ_NODE_FIELD(rowMarks);
READ_NODE_FIELD(relationOids);
READ_NODE_FIELD(partitionOids);
READ_NODE_FIELD(invalItems);
READ_NODE_FIELD(paramExecTypes);
READ_NODE_FIELD(utilityStmt);

View File

@ -305,6 +305,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
glob->resultRelations = NIL;
glob->appendRelations = NIL;
glob->relationOids = NIL;
glob->partitionOids = NIL;
glob->invalItems = NIL;
glob->paramExecTypes = NIL;
glob->lastPHId = 0;
@ -316,16 +317,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
/*
* Assess whether it's feasible to use parallel mode for this query. We
* can't do this in a standalone backend, or if the command will try to
* modify any data, or if this is a cursor operation, or if GUCs are set
* to values that don't permit parallelism, or if parallel-unsafe
* functions are present in the query tree.
* modify any data (except for Insert), or if this is a cursor operation,
* or if GUCs are set to values that don't permit parallelism, or if
* parallel-unsafe functions are present in the query tree.
*
* (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
* MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
* backend writes into a completely new table. In the future, we can
* extend it to allow workers to write into the table. However, to allow
* parallel updates and deletes, we have to solve other problems,
* especially around combo CIDs.)
* (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT
* INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as
* of now, only the leader backend writes into a completely new table. In
* the future, we can extend it to allow workers to write into the table.
* However, to allow parallel updates and deletes, we have to solve other
* problems, especially around combo CIDs.)
*
* For now, we don't try to use parallel mode if we're running inside a
* parallel worker. We might eventually be able to relax this
@ -334,13 +335,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
*/
if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster &&
parse->commandType == CMD_SELECT &&
(parse->commandType == CMD_SELECT ||
is_parallel_allowed_for_modify(parse)) &&
!parse->hasModifyingCTE &&
max_parallel_workers_per_gather > 0 &&
!IsParallelWorker())
{
/* all the cheap tests pass, so scan the query tree */
glob->maxParallelHazard = max_parallel_hazard(parse);
glob->maxParallelHazard = max_parallel_hazard(parse, glob);
glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
}
else
@ -521,6 +523,19 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
result->rewindPlanIDs = glob->rewindPlanIDs;
result->rowMarks = glob->finalrowmarks;
result->relationOids = glob->relationOids;
/*
* Register the Oids of parallel-safety-checked partitions as plan
* dependencies. This is only really needed in the case of a parallel plan
* so that if parallel-unsafe properties are subsequently defined on the
* partitions, the cached parallel plan will be invalidated, and a
* non-parallel plan will be generated.
*
* We also use this list to acquire locks on partitions before executing
* cached plan. See AcquireExecutorLocks().
*/
if (glob->partitionOids != NIL && glob->parallelModeNeeded)
result->partitionOids = glob->partitionOids;
result->invalItems = glob->invalItems;
result->paramExecTypes = glob->paramExecTypes;
/* utilityStmt should be null, but we might as well copy it */

View File

@ -19,13 +19,18 @@
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_language.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/functions.h"
#include "funcapi.h"
@ -43,6 +48,8 @@
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_func.h"
#include "parser/parsetree.h"
#include "partitioning/partdesc.h"
#include "rewrite/rewriteManip.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@ -51,6 +58,8 @@
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
@ -88,6 +97,9 @@ typedef struct
char max_hazard; /* worst proparallel hazard found so far */
char max_interesting; /* worst proparallel hazard of interest */
List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
RangeTblEntry *target_rte; /* query's target relation if any */
CmdType command_type; /* query's command type */
PlannerGlobal *planner_global; /* global info for planner invocation */
} max_parallel_hazard_context;
static bool contain_agg_clause_walker(Node *node, void *context);
@ -98,6 +110,20 @@ static bool contain_volatile_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
static bool max_parallel_hazard_walker(Node *node,
max_parallel_hazard_context *context);
static bool target_rel_max_parallel_hazard(max_parallel_hazard_context *context);
static bool target_rel_max_parallel_hazard_recurse(Relation relation,
CmdType command_type,
max_parallel_hazard_context *context);
static bool target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc,
max_parallel_hazard_context *context);
static bool target_rel_index_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool target_rel_domain_max_parallel_hazard(Oid typid,
max_parallel_hazard_context *context);
static bool target_rel_partitions_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool target_rel_chk_constr_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context);
static bool contain_nonstrict_functions_walker(Node *node, void *context);
static bool contain_exec_param_walker(Node *node, List *param_ids);
static bool contain_context_dependent_node(Node *clause);
@ -545,14 +571,19 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
* later, in the common case where everything is SAFE.
*/
char
max_parallel_hazard(Query *parse)
max_parallel_hazard(Query *parse, PlannerGlobal *glob)
{
max_parallel_hazard_context context;
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_UNSAFE;
context.safe_param_ids = NIL;
context.target_rte = parse->resultRelation > 0 ?
rt_fetch(parse->resultRelation, parse->rtable) : NULL;
context.command_type = parse->commandType;
context.planner_global = glob;
(void) max_parallel_hazard_walker((Node *) parse, &context);
return context.max_hazard;
}
@ -583,6 +614,9 @@ is_parallel_safe(PlannerInfo *root, Node *node)
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_RESTRICTED;
context.safe_param_ids = NIL;
context.command_type = node != NULL && IsA(node, Query) ?
castNode(Query, node)->commandType : CMD_UNKNOWN;
context.planner_global = root->glob;
/*
* The params that refer to the same or parent query level are considered
@ -655,14 +689,20 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
* opclass support functions are generally parallel-safe. XmlExpr is a
* bit more dubious but we can probably get away with it. We err on the
* side of caution by treating CoerceToDomain as parallel-restricted.
* (Note: in principle that's wrong because a domain constraint could
* contain a parallel-unsafe function; but useful constraints probably
* never would have such, and assuming they do would cripple use of
* parallel query in the presence of domain types.) SQLValueFunction
* should be safe in all cases. NextValueExpr is parallel-unsafe.
* However, for table modification statements, we check the parallel
* safety of domain constraints as that could contain a parallel-unsafe
* function, and executing that in parallel mode will lead to error.
* SQLValueFunction should be safe in all cases. NextValueExpr is
* parallel-unsafe.
*/
if (IsA(node, CoerceToDomain))
{
if (context->target_rte != NULL)
{
if (target_rel_domain_max_parallel_hazard(((CoerceToDomain *) node)->resulttype, context))
return true;
}
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
return true;
}
@ -687,6 +727,27 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
return true;
}
/*
* ModifyingCTE expressions are treated as parallel-unsafe.
*
* XXX Normally, if the Query has a modifying CTE, the hasModifyingCTE
* flag is set in the Query tree, and the query will be regarded as
* parallel-usafe. However, in some cases, a re-written query with a
* modifying CTE does not have that flag set, due to a bug in the query
* rewriter.
*/
else if (IsA(node, CommonTableExpr))
{
CommonTableExpr *cte = (CommonTableExpr *) node;
Query *ctequery = castNode(Query, cte->ctequery);
if (ctequery->commandType != CMD_SELECT)
{
context->max_hazard = PROPARALLEL_UNSAFE;
return true;
}
}
/*
* As a notational convenience for callers, look through RestrictInfo.
*/
@ -757,6 +818,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
}
return false; /* nothing to recurse to */
}
else if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rte = (RangeTblEntry *) node;
/* Nothing interesting to check for SELECTs */
if (context->target_rte == NULL)
return false;
if (rte == context->target_rte)
return target_rel_max_parallel_hazard(context);
return false;
}
/*
* When we're first invoked on a completely unplanned tree, we must
@ -777,7 +851,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
/* Recurse into subselects */
return query_tree_walker(query,
max_parallel_hazard_walker,
context, 0);
context,
context->target_rte != NULL ?
QTW_EXAMINE_RTES_BEFORE : 0);
}
/* Recurse to check arguments */
@ -786,6 +862,466 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
context);
}
/*
* target_rel_max_parallel_hazard
*
* Determines the maximum parallel-mode hazard level for modification
* of a specified relation.
*/
static bool
target_rel_max_parallel_hazard(max_parallel_hazard_context *context)
{
bool max_hazard_found;
Relation targetRel;
/*
* The target table is already locked by the caller (this is done in the
* parse/analyze phase), and remains locked until end-of-transaction.
*/
targetRel = table_open(context->target_rte->relid, NoLock);
max_hazard_found = target_rel_max_parallel_hazard_recurse(targetRel,
context->command_type,
context);
table_close(targetRel, NoLock);
return max_hazard_found;
}
static bool
target_rel_max_parallel_hazard_recurse(Relation rel,
CmdType command_type,
max_parallel_hazard_context *context)
{
/* Currently only CMD_INSERT is supported */
Assert(command_type == CMD_INSERT);
/*
* We can't support table modification in a parallel worker if it's a
* foreign table/partition (no FDW API for supporting parallel access) or
* a temporary table.
*/
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
RelationUsesLocalBuffers(rel))
{
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
return true;
}
/*
* If a partitioned table, check that each partition is safe for
* modification in parallel-mode.
*/
if (target_rel_partitions_max_parallel_hazard(rel, context))
return true;
/*
* If there are any index expressions or index predicate, check that they
* are parallel-mode safe.
*/
if (target_rel_index_max_parallel_hazard(rel, context))
return true;
/*
* If any triggers exist, check that they are parallel-safe.
*/
if (target_rel_trigger_max_parallel_hazard(rel->trigdesc, context))
return true;
/*
* Column default expressions are only applicable to INSERT and UPDATE.
* For columns in the target-list, these are already being checked for
* parallel-safety in the max_parallel_hazard() scan of the query tree in
* standard_planner(), so there's no need to do it here. Note that even
* though column defaults may be specified separately for each partition
* in a partitioned table, a partition's default value is not applied when
* inserting a tuple through a partitioned table.
*/
/*
* CHECK constraints are only applicable to INSERT and UPDATE. If any
* CHECK constraints exist, determine if they are parallel-safe.
*/
if (target_rel_chk_constr_max_parallel_hazard(rel, context))
return true;
return false;
}
/*
* target_rel_trigger_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for the specified trigger data.
*/
static bool
target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc,
max_parallel_hazard_context *context)
{
int i;
if (trigdesc == NULL)
return false;
for (i = 0; i < trigdesc->numtriggers; i++)
{
int trigtype;
Trigger *trigger = &trigdesc->triggers[i];
if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context))
return true;
/*
* If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
* the relation, and this would result in creation of new CommandIds
* on insert/update and this isn't supported in a parallel worker (but
* is safe in the parallel leader).
*/
trigtype = RI_FKey_trigger_type(trigger->tgfoid);
if (trigtype == RI_TRIGGER_FK)
{
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
return true;
}
}
return false;
}
/*
* target_rel_index_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for any existing index
* expressions or index predicate of a specified relation.
*/
static bool
target_rel_index_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
List *index_oid_list;
ListCell *lc;
bool found_max_hazard = false;
LOCKMODE lockmode = AccessShareLock;
index_oid_list = RelationGetIndexList(rel);
foreach(lc, index_oid_list)
{
Relation index_rel;
Form_pg_index indexStruct;
List *ii_Expressions;
List *ii_Predicate;
Oid index_oid = lfirst_oid(lc);
index_rel = index_open(index_oid, lockmode);
indexStruct = index_rel->rd_index;
ii_Expressions = RelationGetIndexExpressions(index_rel);
if (ii_Expressions != NIL)
{
int i;
ListCell *index_expr_item = list_head(ii_Expressions);
for (i = 0; i < indexStruct->indnatts; i++)
{
int keycol = indexStruct->indkey.values[i];
if (keycol == 0)
{
/* Found an index expression */
Node *index_expr;
Assert(index_expr_item != NULL);
if (index_expr_item == NULL) /* shouldn't happen */
{
elog(WARNING, "too few entries in indexprs list");
context->max_hazard = PROPARALLEL_UNSAFE;
found_max_hazard = true;
break;
}
index_expr = (Node *) lfirst(index_expr_item);
if (max_parallel_hazard_walker(index_expr, context))
{
found_max_hazard = true;
break;
}
index_expr_item = lnext(ii_Expressions, index_expr_item);
}
}
}
if (!found_max_hazard)
{
ii_Predicate = RelationGetIndexPredicate(index_rel);
if (ii_Predicate != NIL)
{
if (max_parallel_hazard_walker((Node *) ii_Predicate, context))
found_max_hazard = true;
}
}
/*
* XXX We don't need to retain lock on index as index expressions
* can't be changed later.
*/
index_close(index_rel, lockmode);
}
list_free(index_oid_list);
return found_max_hazard;
}
/*
* target_rel_domain_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for the specified DOMAIN type.
* Only any CHECK expressions are examined for parallel-safety.
*/
static bool
target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context)
{
Relation con_rel;
ScanKeyData key[1];
SysScanDesc scan;
HeapTuple tup;
bool found_max_hazard = false;
LOCKMODE lockmode = AccessShareLock;
con_rel = table_open(ConstraintRelationId, lockmode);
ScanKeyInit(&key[0],
Anum_pg_constraint_contypid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(typid));
scan = systable_beginscan(con_rel, ConstraintTypidIndexId, true,
NULL, 1, key);
while (HeapTupleIsValid((tup = systable_getnext(scan))))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
if (con->contype == CONSTRAINT_CHECK)
{
char *conbin;
Datum val;
bool isnull;
Expr *check_expr;
val = SysCacheGetAttr(CONSTROID, tup,
Anum_pg_constraint_conbin, &isnull);
Assert(!isnull);
if (isnull)
{
/*
* This shouldn't ever happen, but if it does, log a WARNING
* and return UNSAFE, rather than erroring out.
*/
elog(WARNING, "null conbin for constraint %u", con->oid);
context->max_hazard = PROPARALLEL_UNSAFE;
found_max_hazard = true;
break;
}
conbin = TextDatumGetCString(val);
check_expr = stringToNode(conbin);
pfree(conbin);
if (max_parallel_hazard_walker((Node *) check_expr, context))
{
found_max_hazard = true;
break;
}
}
}
systable_endscan(scan);
table_close(con_rel, lockmode);
return found_max_hazard;
}
/*
* target_rel_partitions_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for any partitions of a
* of a specified relation.
*/
static bool
target_rel_partitions_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
int i;
PartitionDesc pdesc;
PartitionKey pkey;
ListCell *partexprs_item;
int partnatts;
List *partexprs;
PlannerGlobal *glob;
if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
return false;
pkey = RelationGetPartitionKey(rel);
partnatts = get_partition_natts(pkey);
partexprs = get_partition_exprs(pkey);
partexprs_item = list_head(partexprs);
for (i = 0; i < partnatts; i++)
{
/* Check parallel-safety of partition key support functions */
if (OidIsValid(pkey->partsupfunc[i].fn_oid))
{
if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context))
return true;
}
/* Check parallel-safety of any expressions in the partition key */
if (get_partition_col_attnum(pkey, i) == 0)
{
Node *check_expr = (Node *) lfirst(partexprs_item);
if (max_parallel_hazard_walker(check_expr, context))
return true;
partexprs_item = lnext(partexprs, partexprs_item);
}
}
/* Recursively check each partition ... */
/* Create the PartitionDirectory infrastructure if we didn't already */
glob = context->planner_global;
if (glob->partition_directory == NULL)
glob->partition_directory =
CreatePartitionDirectory(CurrentMemoryContext);
pdesc = PartitionDirectoryLookup(glob->partition_directory, rel);
for (i = 0; i < pdesc->nparts; i++)
{
bool max_hazard_found;
Relation part_rel;
/*
* The partition needs to be locked, and remain locked until
* end-of-transaction to ensure its parallel-safety state is not
* hereafter altered.
*/
part_rel = table_open(pdesc->oids[i], context->target_rte->rellockmode);
max_hazard_found = target_rel_max_parallel_hazard_recurse(part_rel,
context->command_type,
context);
table_close(part_rel, NoLock);
/*
* Remember partitionOids to record the partition as a potential plan
* dependency.
*/
glob->partitionOids = lappend_oid(glob->partitionOids, pdesc->oids[i]);
if (max_hazard_found)
return true;
}
return false;
}
/*
* target_rel_chk_constr_max_parallel_hazard
*
* Finds the maximum parallel-mode hazard level for any CHECK expressions or
* CHECK constraints related to the specified relation.
*/
static bool
target_rel_chk_constr_max_parallel_hazard(Relation rel,
max_parallel_hazard_context *context)
{
TupleDesc tupdesc;
tupdesc = RelationGetDescr(rel);
/*
* Determine if there are any CHECK constraints which are not
* parallel-safe.
*/
if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
{
int i;
ConstrCheck *check = tupdesc->constr->check;
for (i = 0; i < tupdesc->constr->num_check; i++)
{
Expr *check_expr = stringToNode(check->ccbin);
if (max_parallel_hazard_walker((Node *) check_expr, context))
return true;
}
}
return false;
}
/*
* is_parallel_allowed_for_modify
*
* Check at a high-level if parallel mode is able to be used for the specified
* table-modification statement. Currently, we support only Inserts.
*
* It's not possible in the following cases:
*
* 1) INSERT...ON CONFLICT...DO UPDATE
* 2) INSERT without SELECT
*
* (Note: we don't do in-depth parallel-safety checks here, we do only the
* cheaper tests that can quickly exclude obvious cases for which
* parallelism isn't supported, to avoid having to do further parallel-safety
* checks for these)
*/
bool
is_parallel_allowed_for_modify(Query *parse)
{
bool hasSubQuery;
RangeTblEntry *rte;
ListCell *lc;
if (!IsModifySupportedInParallelMode(parse->commandType))
return false;
/*
* UPDATE is not currently supported in parallel-mode, so prohibit
* INSERT...ON CONFLICT...DO UPDATE...
*
* In order to support update, even if only in the leader, some further
* work would need to be done. A mechanism would be needed for sharing
* combo-cids between leader and workers during parallel-mode, since for
* example, the leader might generate a combo-cid and it needs to be
* propagated to the workers.
*/
if (parse->commandType == CMD_INSERT &&
parse->onConflict != NULL &&
parse->onConflict->action == ONCONFLICT_UPDATE)
return false;
/*
* If there is no underlying SELECT, a parallel insert operation is not
* desirable.
*/
hasSubQuery = false;
foreach(lc, parse->rtable)
{
rte = lfirst_node(RangeTblEntry, lc);
if (rte->rtekind == RTE_SUBQUERY)
{
hasSubQuery = true;
break;
}
}
return hasSubQuery;
}
/*****************************************************************************
* Check clauses for nonstrict functions

View File

@ -1735,6 +1735,23 @@ QueryListGetPrimaryStmt(List *stmts)
return NULL;
}
static void
AcquireExecutorLocksOnPartitions(List *partitionOids, int lockmode,
bool acquire)
{
ListCell *lc;
foreach(lc, partitionOids)
{
Oid partOid = lfirst_oid(lc);
if (acquire)
LockRelationOid(partOid, lockmode);
else
UnlockRelationOid(partOid, lockmode);
}
}
/*
* AcquireExecutorLocks: acquire locks needed for execution of a cached plan;
* or release them if acquire is false.
@ -1748,6 +1765,8 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
{
PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc1);
ListCell *lc2;
Index rti,
resultRelation = 0;
if (plannedstmt->commandType == CMD_UTILITY)
{
@ -1765,6 +1784,9 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
continue;
}
rti = 1;
if (plannedstmt->resultRelations)
resultRelation = linitial_int(plannedstmt->resultRelations);
foreach(lc2, plannedstmt->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2);
@ -1782,6 +1804,14 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
LockRelationOid(rte->relid, rte->rellockmode);
else
UnlockRelationOid(rte->relid, rte->rellockmode);
/* Lock partitions ahead of modifying them in parallel mode. */
if (rti == resultRelation &&
plannedstmt->partitionOids != NIL)
AcquireExecutorLocksOnPartitions(plannedstmt->partitionOids,
rte->rellockmode, acquire);
rti++;
}
}
}
@ -1990,7 +2020,8 @@ PlanCacheRelCallback(Datum arg, Oid relid)
if (plannedstmt->commandType == CMD_UTILITY)
continue; /* Ignore utility statements */
if ((relid == InvalidOid) ? plannedstmt->relationOids != NIL :
list_member_oid(plannedstmt->relationOids, relid))
(list_member_oid(plannedstmt->relationOids, relid) ||
list_member_oid(plannedstmt->partitionOids, relid)))
{
/* Invalidate the generic plan only */
plansource->gplan->is_valid = false;

View File

@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
extern void EnterParallelMode(void);
extern void ExitParallelMode(void);
extern bool IsInParallelMode(void);
extern void PrepareParallelModePlanExec(CmdType commandType);
/*
* IsModifySupportedInParallelMode
*
* Indicates whether execution of the specified table-modification command
* (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
* parallel-safety conditions.
*/
static inline bool
IsModifySupportedInParallelMode(CmdType commandType)
{
/* Currently only INSERT is supported */
return (commandType == CMD_INSERT);
}
#endif /* XACT_H */

View File

@ -120,6 +120,8 @@ typedef struct PlannerGlobal
List *relationOids; /* OIDs of relations the plan depends on */
List *partitionOids; /* OIDs of partitions the plan depends on */
List *invalItems; /* other dependencies, as PlanInvalItems */
List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */

View File

@ -79,6 +79,8 @@ typedef struct PlannedStmt
List *relationOids; /* OIDs of relations the plan depends on */
List *partitionOids; /* OIDs of partitions the plan depends on */
List *invalItems; /* other dependencies, as PlanInvalItems */
List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */

View File

@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause);
extern bool contain_subplans(Node *clause);
extern char max_parallel_hazard(Query *parse);
extern char max_parallel_hazard(Query *parse, PlannerGlobal *glob);
extern bool is_parallel_safe(PlannerInfo *root, Node *node);
extern bool contain_nonstrict_functions(Node *clause);
extern bool contain_exec_param(Node *clause, List *param_ids);
@ -52,5 +52,6 @@ extern void CommuteOpExpr(OpExpr *clause);
extern Query *inline_set_returning_function(PlannerInfo *root,
RangeTblEntry *rte);
extern bool is_parallel_allowed_for_modify(Query *parse);
#endif /* CLAUSES_H */

View File

@ -0,0 +1,536 @@
--
-- PARALLEL
--
--
-- START: setup some tables and data needed by the tests.
--
-- Setup - index expressions test
-- For testing purposes, we'll mark this function as parallel-unsafe
create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel unsafe;
create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel restricted;
create table names(index int, first_name text, last_name text);
create table names2(index int, first_name text, last_name text);
create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
create table names4(index int, first_name text, last_name text);
create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
insert into names values
(1, 'albert', 'einstein'),
(2, 'niels', 'bohr'),
(3, 'erwin', 'schrodinger'),
(4, 'leonhard', 'euler'),
(5, 'stephen', 'hawking'),
(6, 'isaac', 'newton'),
(7, 'alan', 'turing'),
(8, 'richard', 'feynman');
-- Setup - column default tests
create or replace function bdefault_unsafe ()
returns int language plpgsql parallel unsafe as $$
begin
RETURN 5;
end $$;
create or replace function cdefault_restricted ()
returns int language plpgsql parallel restricted as $$
begin
RETURN 10;
end $$;
create or replace function ddefault_safe ()
returns int language plpgsql parallel safe as $$
begin
RETURN 20;
end $$;
create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
create table test_data(a int);
insert into test_data select * from generate_series(1,10);
--
-- END: setup some tables and data needed by the tests.
--
-- Serializable isolation would disable parallel query, so explicitly use an
-- arbitrary other level.
begin isolation level repeatable read;
-- encourage use of parallel plans
set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
create table para_insert_p1 (
unique1 int4 PRIMARY KEY,
stringu1 name
);
create table para_insert_f1 (
unique1 int4 REFERENCES para_insert_p1(unique1),
stringu1 name
);
--
-- Test INSERT with underlying query.
-- (should create plan with parallel SELECT, Gather parent node)
--
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
QUERY PLAN
----------------------------------------
Insert on para_insert_p1
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on tenk1
(4 rows)
insert into para_insert_p1 select unique1, stringu1 from tenk1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
count | sum
-------+----------
10000 | 49995000
(1 row)
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
count
-------
1
(1 row)
--
-- Test INSERT with ordered underlying query.
-- (should create plan with parallel SELECT, GatherMerge parent node)
--
truncate para_insert_p1 cascade;
NOTICE: truncate cascades to table "para_insert_f1"
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
QUERY PLAN
----------------------------------------------
Insert on para_insert_p1
-> Gather Merge
Workers Planned: 4
-> Sort
Sort Key: tenk1.unique1
-> Parallel Seq Scan on tenk1
(6 rows)
insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
count | sum
-------+----------
10000 | 49995000
(1 row)
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
count
-------
1
(1 row)
--
-- Test INSERT with RETURNING clause.
-- (should create plan with parallel SELECT, Gather parent node)
--
create table test_data1(like test_data);
explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
QUERY PLAN
--------------------------------------------
Insert on test_data1
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on test_data
Filter: (a = 10)
(5 rows)
insert into test_data1 select * from test_data where a = 10 returning a as data;
data
------
10
(1 row)
--
-- Test INSERT into a table with a foreign key.
-- (Insert into a table with a foreign key is parallel-restricted,
-- as doing this in a parallel worker would create a new commandId
-- and within a worker this is not currently supported)
--
explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
QUERY PLAN
----------------------------------------
Insert on para_insert_f1
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on tenk1
(4 rows)
insert into para_insert_f1 select unique1, stringu1 from tenk1;
-- select some values to verify that the insert worked
select count(*), sum(unique1) from para_insert_f1;
count | sum
-------+----------
10000 | 49995000
(1 row)
--
-- Test INSERT with ON CONFLICT ... DO UPDATE ...
-- (should not create a parallel plan)
--
create table test_conflict_table(id serial primary key, somedata int);
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
QUERY PLAN
--------------------------------------------
Insert on test_conflict_table
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on test_data
(4 rows)
insert into test_conflict_table(id, somedata) select a, a from test_data;
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
QUERY PLAN
------------------------------------------------------
Insert on test_conflict_table
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: test_conflict_table_pkey
-> Seq Scan on test_data
(4 rows)
--
-- Test INSERT with parallel-unsafe index expression
-- (should not create a parallel plan)
--
explain (costs off) insert into names2 select * from names;
QUERY PLAN
-------------------------
Insert on names2
-> Seq Scan on names
(2 rows)
--
-- Test INSERT with parallel-restricted index expression
-- (should create a parallel plan)
--
explain (costs off) insert into names4 select * from names;
QUERY PLAN
----------------------------------------
Insert on names4
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
--
-- Test INSERT with underlying query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names5 (like names);
explain (costs off) insert into names5 select * from names returning *;
QUERY PLAN
----------------------------------------
Insert on names5
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
--
-- Test INSERT with underlying ordered query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names6 (like names);
explain (costs off) insert into names6 select * from names order by last_name returning *;
QUERY PLAN
----------------------------------------------
Insert on names6
-> Gather Merge
Workers Planned: 3
-> Sort
Sort Key: names.last_name
-> Parallel Seq Scan on names
(6 rows)
insert into names6 select * from names order by last_name returning *;
index | first_name | last_name
-------+------------+-------------
2 | niels | bohr
1 | albert | einstein
4 | leonhard | euler
8 | richard | feynman
5 | stephen | hawking
6 | isaac | newton
3 | erwin | schrodinger
7 | alan | turing
(8 rows)
--
-- Test INSERT with underlying ordered query - and RETURNING (with projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names7 (like names);
explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
QUERY PLAN
----------------------------------------------
Insert on names7
-> Gather Merge
Workers Planned: 3
-> Sort
Sort Key: names.last_name
-> Parallel Seq Scan on names
(6 rows)
insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
last_name_then_first_name
---------------------------
bohr, niels
einstein, albert
euler, leonhard
feynman, richard
hawking, stephen
newton, isaac
schrodinger, erwin
turing, alan
(8 rows)
--
-- Test INSERT into temporary table with underlying query.
-- (Insert into a temp table is parallel-restricted;
-- should create a parallel plan; parallel SELECT)
--
create temporary table temp_names (like names);
explain (costs off) insert into temp_names select * from names;
QUERY PLAN
----------------------------------------
Insert on temp_names
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
insert into temp_names select * from names;
--
-- Test INSERT with column defaults
--
--
--
-- Parallel unsafe column default, should not use a parallel plan
--
explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
QUERY PLAN
-----------------------------
Insert on testdef
-> Seq Scan on test_data
(2 rows)
--
-- Parallel restricted column default, should use parallel SELECT
--
explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
QUERY PLAN
--------------------------------------------
Insert on testdef
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on test_data
(4 rows)
insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
select * from testdef order by a;
a | b | c | d
----+----+----+----
1 | 2 | 10 | 8
2 | 4 | 10 | 16
3 | 6 | 10 | 24
4 | 8 | 10 | 32
5 | 10 | 10 | 40
6 | 12 | 10 | 48
7 | 14 | 10 | 56
8 | 16 | 10 | 64
9 | 18 | 10 | 72
10 | 20 | 10 | 80
(10 rows)
truncate testdef;
--
-- Parallel restricted and unsafe column defaults, should not use a parallel plan
--
explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
QUERY PLAN
-----------------------------
Insert on testdef
-> Seq Scan on test_data
(2 rows)
--
-- Test INSERT into partition with underlying query.
--
create table parttable1 (a int, b name) partition by range (a);
create table parttable1_1 partition of parttable1 for values from (0) to (5000);
create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
QUERY PLAN
----------------------------------------
Insert on parttable1
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on tenk1
(4 rows)
insert into parttable1 select unique1,stringu1 from tenk1;
select count(*) from parttable1_1;
count
-------
5000
(1 row)
select count(*) from parttable1_2;
count
-------
5000
(1 row)
--
-- Test INSERT into table with parallel-unsafe check constraint
-- (should not create a parallel plan)
--
create or replace function check_b_unsafe(b name) returns boolean as $$
begin
return (b <> 'XXXXXX');
end;
$$ language plpgsql parallel unsafe;
create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1;
QUERY PLAN
-------------------------
Insert on table_check_b
-> Seq Scan on tenk1
(2 rows)
--
-- Test INSERT into table with parallel-safe after stmt-level triggers
-- (should create a parallel SELECT plan; triggers should fire)
--
create table names_with_safe_trigger (like names);
create or replace function insert_after_trigger_safe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_safe';
return new;
end;
$$ language plpgsql parallel safe;
create trigger insert_after_trigger_safe after insert on names_with_safe_trigger
for each statement execute procedure insert_after_trigger_safe();
explain (costs off) insert into names_with_safe_trigger select * from names;
QUERY PLAN
----------------------------------------
Insert on names_with_safe_trigger
-> Gather
Workers Planned: 3
-> Parallel Seq Scan on names
(4 rows)
insert into names_with_safe_trigger select * from names;
NOTICE: hello from insert_after_trigger_safe
--
-- Test INSERT into table with parallel-unsafe after stmt-level triggers
-- (should not create a parallel plan; triggers should fire)
--
create table names_with_unsafe_trigger (like names);
create or replace function insert_after_trigger_unsafe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_unsafe';
return new;
end;
$$ language plpgsql parallel unsafe;
create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into names_with_unsafe_trigger select * from names;
QUERY PLAN
-------------------------------------
Insert on names_with_unsafe_trigger
-> Seq Scan on names
(2 rows)
insert into names_with_unsafe_trigger select * from names;
NOTICE: hello from insert_after_trigger_unsafe
--
-- Test INSERT into partition with parallel-unsafe trigger
-- (should not create a parallel plan)
--
create table part_unsafe_trigger (a int4, b name) partition by range (a);
create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1;
QUERY PLAN
-------------------------------
Insert on part_unsafe_trigger
-> Seq Scan on tenk1
(2 rows)
--
-- Test that parallel-safety-related changes to partitions are detected and
-- plan cache invalidation is working correctly.
--
create table rp (a int) partition by range (a);
create table rp1 partition of rp for values from (minvalue) to (0);
create table rp2 partition of rp for values from (0) to (maxvalue);
create table foo (a) as select unique1 from tenk1;
prepare q as insert into rp select * from foo where a%2 = 0;
-- should create a parallel plan
explain (costs off) execute q;
QUERY PLAN
--------------------------------------
Insert on rp
-> Gather
Workers Planned: 4
-> Parallel Seq Scan on foo
Filter: ((a % 2) = 0)
(5 rows)
create or replace function make_table_bar () returns trigger language
plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe;
create trigger ai_rp2 after insert on rp2 for each row execute
function make_table_bar();
-- should create a non-parallel plan
explain (costs off) execute q;
QUERY PLAN
-------------------------------
Insert on rp
-> Seq Scan on foo
Filter: ((a % 2) = 0)
(3 rows)
--
-- Test INSERT into table having a DOMAIN column with a CHECK constraint
--
create function sql_is_distinct_from_u(anyelement, anyelement)
returns boolean language sql parallel unsafe
as 'select $1 is distinct from $2 limit 1';
create domain inotnull_u int
check (sql_is_distinct_from_u(value, null));
create table dom_table_u (x inotnull_u, y int);
-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint
explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1;
QUERY PLAN
-------------------------
Insert on dom_table_u
-> Seq Scan on tenk1
(2 rows)
rollback;
--
-- Clean up anything not created in the transaction
--
drop table names;
drop index names2_fullname_idx;
drop table names2;
drop index names4_fullname_idx;
drop table names4;
drop table testdef;
drop table test_data;
drop function bdefault_unsafe;
drop function cdefault_restricted;
drop function ddefault_safe;
drop function fullname_parallel_unsafe;
drop function fullname_parallel_restricted;

View File

@ -90,6 +90,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
# run by itself so it can run parallel workers
test: select_parallel
test: write_parallel
test: insert_parallel
# no relation related tests can be put in this group
test: publication subscription

View File

@ -148,6 +148,7 @@ test: stats_ext
test: collate.linux.utf8
test: select_parallel
test: write_parallel
test: insert_parallel
test: publication
test: subscription
test: select_views

View File

@ -0,0 +1,335 @@
--
-- PARALLEL
--
--
-- START: setup some tables and data needed by the tests.
--
-- Setup - index expressions test
-- For testing purposes, we'll mark this function as parallel-unsafe
create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel unsafe;
create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
begin
return f || l;
end;
$$ language plpgsql immutable parallel restricted;
create table names(index int, first_name text, last_name text);
create table names2(index int, first_name text, last_name text);
create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
create table names4(index int, first_name text, last_name text);
create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
insert into names values
(1, 'albert', 'einstein'),
(2, 'niels', 'bohr'),
(3, 'erwin', 'schrodinger'),
(4, 'leonhard', 'euler'),
(5, 'stephen', 'hawking'),
(6, 'isaac', 'newton'),
(7, 'alan', 'turing'),
(8, 'richard', 'feynman');
-- Setup - column default tests
create or replace function bdefault_unsafe ()
returns int language plpgsql parallel unsafe as $$
begin
RETURN 5;
end $$;
create or replace function cdefault_restricted ()
returns int language plpgsql parallel restricted as $$
begin
RETURN 10;
end $$;
create or replace function ddefault_safe ()
returns int language plpgsql parallel safe as $$
begin
RETURN 20;
end $$;
create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
create table test_data(a int);
insert into test_data select * from generate_series(1,10);
--
-- END: setup some tables and data needed by the tests.
--
-- Serializable isolation would disable parallel query, so explicitly use an
-- arbitrary other level.
begin isolation level repeatable read;
-- encourage use of parallel plans
set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
create table para_insert_p1 (
unique1 int4 PRIMARY KEY,
stringu1 name
);
create table para_insert_f1 (
unique1 int4 REFERENCES para_insert_p1(unique1),
stringu1 name
);
--
-- Test INSERT with underlying query.
-- (should create plan with parallel SELECT, Gather parent node)
--
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
insert into para_insert_p1 select unique1, stringu1 from tenk1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
--
-- Test INSERT with ordered underlying query.
-- (should create plan with parallel SELECT, GatherMerge parent node)
--
truncate para_insert_p1 cascade;
explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
-- select some values to verify that the parallel insert worked
select count(*), sum(unique1) from para_insert_p1;
-- verify that the same transaction has been used by all parallel workers
select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
--
-- Test INSERT with RETURNING clause.
-- (should create plan with parallel SELECT, Gather parent node)
--
create table test_data1(like test_data);
explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
insert into test_data1 select * from test_data where a = 10 returning a as data;
--
-- Test INSERT into a table with a foreign key.
-- (Insert into a table with a foreign key is parallel-restricted,
-- as doing this in a parallel worker would create a new commandId
-- and within a worker this is not currently supported)
--
explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
insert into para_insert_f1 select unique1, stringu1 from tenk1;
-- select some values to verify that the insert worked
select count(*), sum(unique1) from para_insert_f1;
--
-- Test INSERT with ON CONFLICT ... DO UPDATE ...
-- (should not create a parallel plan)
--
create table test_conflict_table(id serial primary key, somedata int);
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
insert into test_conflict_table(id, somedata) select a, a from test_data;
explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
--
-- Test INSERT with parallel-unsafe index expression
-- (should not create a parallel plan)
--
explain (costs off) insert into names2 select * from names;
--
-- Test INSERT with parallel-restricted index expression
-- (should create a parallel plan)
--
explain (costs off) insert into names4 select * from names;
--
-- Test INSERT with underlying query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names5 (like names);
explain (costs off) insert into names5 select * from names returning *;
--
-- Test INSERT with underlying ordered query - and RETURNING (no projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names6 (like names);
explain (costs off) insert into names6 select * from names order by last_name returning *;
insert into names6 select * from names order by last_name returning *;
--
-- Test INSERT with underlying ordered query - and RETURNING (with projection)
-- (should create a parallel plan; parallel SELECT)
--
create table names7 (like names);
explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
--
-- Test INSERT into temporary table with underlying query.
-- (Insert into a temp table is parallel-restricted;
-- should create a parallel plan; parallel SELECT)
--
create temporary table temp_names (like names);
explain (costs off) insert into temp_names select * from names;
insert into temp_names select * from names;
--
-- Test INSERT with column defaults
--
--
--
-- Parallel unsafe column default, should not use a parallel plan
--
explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
--
-- Parallel restricted column default, should use parallel SELECT
--
explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
select * from testdef order by a;
truncate testdef;
--
-- Parallel restricted and unsafe column defaults, should not use a parallel plan
--
explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
--
-- Test INSERT into partition with underlying query.
--
create table parttable1 (a int, b name) partition by range (a);
create table parttable1_1 partition of parttable1 for values from (0) to (5000);
create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
insert into parttable1 select unique1,stringu1 from tenk1;
select count(*) from parttable1_1;
select count(*) from parttable1_2;
--
-- Test INSERT into table with parallel-unsafe check constraint
-- (should not create a parallel plan)
--
create or replace function check_b_unsafe(b name) returns boolean as $$
begin
return (b <> 'XXXXXX');
end;
$$ language plpgsql parallel unsafe;
create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1;
--
-- Test INSERT into table with parallel-safe after stmt-level triggers
-- (should create a parallel SELECT plan; triggers should fire)
--
create table names_with_safe_trigger (like names);
create or replace function insert_after_trigger_safe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_safe';
return new;
end;
$$ language plpgsql parallel safe;
create trigger insert_after_trigger_safe after insert on names_with_safe_trigger
for each statement execute procedure insert_after_trigger_safe();
explain (costs off) insert into names_with_safe_trigger select * from names;
insert into names_with_safe_trigger select * from names;
--
-- Test INSERT into table with parallel-unsafe after stmt-level triggers
-- (should not create a parallel plan; triggers should fire)
--
create table names_with_unsafe_trigger (like names);
create or replace function insert_after_trigger_unsafe() returns trigger as $$
begin
raise notice 'hello from insert_after_trigger_unsafe';
return new;
end;
$$ language plpgsql parallel unsafe;
create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into names_with_unsafe_trigger select * from names;
insert into names_with_unsafe_trigger select * from names;
--
-- Test INSERT into partition with parallel-unsafe trigger
-- (should not create a parallel plan)
--
create table part_unsafe_trigger (a int4, b name) partition by range (a);
create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1
for each statement execute procedure insert_after_trigger_unsafe();
explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1;
--
-- Test that parallel-safety-related changes to partitions are detected and
-- plan cache invalidation is working correctly.
--
create table rp (a int) partition by range (a);
create table rp1 partition of rp for values from (minvalue) to (0);
create table rp2 partition of rp for values from (0) to (maxvalue);
create table foo (a) as select unique1 from tenk1;
prepare q as insert into rp select * from foo where a%2 = 0;
-- should create a parallel plan
explain (costs off) execute q;
create or replace function make_table_bar () returns trigger language
plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe;
create trigger ai_rp2 after insert on rp2 for each row execute
function make_table_bar();
-- should create a non-parallel plan
explain (costs off) execute q;
--
-- Test INSERT into table having a DOMAIN column with a CHECK constraint
--
create function sql_is_distinct_from_u(anyelement, anyelement)
returns boolean language sql parallel unsafe
as 'select $1 is distinct from $2 limit 1';
create domain inotnull_u int
check (sql_is_distinct_from_u(value, null));
create table dom_table_u (x inotnull_u, y int);
-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint
explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1;
rollback;
--
-- Clean up anything not created in the transaction
--
drop table names;
drop index names2_fullname_idx;
drop table names2;
drop index names4_fullname_idx;
drop table names4;
drop table testdef;
drop table test_data;
drop function bdefault_unsafe;
drop function cdefault_restricted;
drop function ddefault_safe;
drop function fullname_parallel_unsafe;
drop function fullname_parallel_restricted;