diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index c81abff48d..cec1329e25 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -146,7 +146,9 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; a CTE, no parallel plans for that query will be generated. As an exception, the commands CREATE TABLE ... AS, SELECT INTO, and CREATE MATERIALIZED VIEW which create a new - table and populate it can use a parallel plan. + table and populate it can use a parallel plan. Another exception is the command + INSERT INTO ... SELECT ... which can use a parallel plan for + the underlying SELECT part of the query. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c83aa16f2c..6395a9b240 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1014,6 +1014,32 @@ IsInParallelMode(void) return CurrentTransactionState->parallelModeLevel != 0; } +/* + * PrepareParallelModePlanExec + * + * Prepare for entering parallel mode plan execution, based on command-type. + */ +void +PrepareParallelModePlanExec(CmdType commandType) +{ + if (IsModifySupportedInParallelMode(commandType)) + { + Assert(!IsInParallelMode()); + + /* + * Prepare for entering parallel mode by assigning a TransactionId. + * Failure to do this now would result in heap_insert() subsequently + * attempting to assign a TransactionId whilst in parallel-mode, which + * is not allowed. + * + * This approach has a disadvantage in that if the underlying SELECT + * does not return any rows, then the TransactionId is not used, + * however that shouldn't happen in practice in many cases. + */ + (void) GetCurrentTransactionId(); + } +} + /* * CommandCounterIncrement */ diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index c74ce36ffb..0648dd82ba 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1512,7 +1512,10 @@ ExecutePlan(EState *estate, estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) + { + PrepareParallelModePlanExec(estate->es_plannedstmt->commandType); EnterParallelMode(); + } /* * Loop until we've processed the proper number of tuples from the plan. diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index aaba1ec2c4..da91cbd2b1 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -96,6 +96,7 @@ _copyPlannedStmt(const PlannedStmt *from) COPY_BITMAPSET_FIELD(rewindPlanIDs); COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(relationOids); + COPY_NODE_FIELD(partitionOids); COPY_NODE_FIELD(invalItems); COPY_NODE_FIELD(paramExecTypes); COPY_NODE_FIELD(utilityStmt); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 8fc432bfe1..6493a03ff8 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -314,6 +314,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node) WRITE_BITMAPSET_FIELD(rewindPlanIDs); WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(relationOids); + WRITE_NODE_FIELD(partitionOids); WRITE_NODE_FIELD(invalItems); WRITE_NODE_FIELD(paramExecTypes); WRITE_NODE_FIELD(utilityStmt); @@ -2221,6 +2222,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node) WRITE_NODE_FIELD(resultRelations); WRITE_NODE_FIELD(appendRelations); WRITE_NODE_FIELD(relationOids); + WRITE_NODE_FIELD(partitionOids); WRITE_NODE_FIELD(invalItems); WRITE_NODE_FIELD(paramExecTypes); WRITE_UINT_FIELD(lastPHId); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 718fb58e86..c5e136e9c3 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1590,6 +1590,7 @@ _readPlannedStmt(void) READ_BITMAPSET_FIELD(rewindPlanIDs); READ_NODE_FIELD(rowMarks); READ_NODE_FIELD(relationOids); + READ_NODE_FIELD(partitionOids); READ_NODE_FIELD(invalItems); READ_NODE_FIELD(paramExecTypes); READ_NODE_FIELD(utilityStmt); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 545b56bcaf..424d25cbd5 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -305,6 +305,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->resultRelations = NIL; glob->appendRelations = NIL; glob->relationOids = NIL; + glob->partitionOids = NIL; glob->invalItems = NIL; glob->paramExecTypes = NIL; glob->lastPHId = 0; @@ -316,16 +317,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, /* * Assess whether it's feasible to use parallel mode for this query. We * can't do this in a standalone backend, or if the command will try to - * modify any data, or if this is a cursor operation, or if GUCs are set - * to values that don't permit parallelism, or if parallel-unsafe - * functions are present in the query tree. + * modify any data (except for Insert), or if this is a cursor operation, + * or if GUCs are set to values that don't permit parallelism, or if + * parallel-unsafe functions are present in the query tree. * - * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE - * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader - * backend writes into a completely new table. In the future, we can - * extend it to allow workers to write into the table. However, to allow - * parallel updates and deletes, we have to solve other problems, - * especially around combo CIDs.) + * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT + * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as + * of now, only the leader backend writes into a completely new table. In + * the future, we can extend it to allow workers to write into the table. + * However, to allow parallel updates and deletes, we have to solve other + * problems, especially around combo CIDs.) * * For now, we don't try to use parallel mode if we're running inside a * parallel worker. We might eventually be able to relax this @@ -334,13 +335,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && - parse->commandType == CMD_SELECT && + (parse->commandType == CMD_SELECT || + is_parallel_allowed_for_modify(parse)) && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && !IsParallelWorker()) { /* all the cheap tests pass, so scan the query tree */ - glob->maxParallelHazard = max_parallel_hazard(parse); + glob->maxParallelHazard = max_parallel_hazard(parse, glob); glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE); } else @@ -521,6 +523,19 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->rewindPlanIDs = glob->rewindPlanIDs; result->rowMarks = glob->finalrowmarks; result->relationOids = glob->relationOids; + + /* + * Register the Oids of parallel-safety-checked partitions as plan + * dependencies. This is only really needed in the case of a parallel plan + * so that if parallel-unsafe properties are subsequently defined on the + * partitions, the cached parallel plan will be invalidated, and a + * non-parallel plan will be generated. + * + * We also use this list to acquire locks on partitions before executing + * cached plan. See AcquireExecutorLocks(). + */ + if (glob->partitionOids != NIL && glob->parallelModeNeeded) + result->partitionOids = glob->partitionOids; result->invalItems = glob->invalItems; result->paramExecTypes = glob->paramExecTypes; /* utilityStmt should be null, but we might as well copy it */ diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index f3786dd2b6..7ecdc783d5 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -19,13 +19,18 @@ #include "postgres.h" +#include "access/genam.h" #include "access/htup_details.h" +#include "access/table.h" +#include "access/xact.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_class.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -43,6 +48,8 @@ #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_func.h" +#include "parser/parsetree.h" +#include "partitioning/partdesc.h" #include "rewrite/rewriteManip.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -51,6 +58,8 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/partcache.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/typcache.h" @@ -88,6 +97,9 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + RangeTblEntry *target_rte; /* query's target relation if any */ + CmdType command_type; /* query's command type */ + PlannerGlobal *planner_global; /* global info for planner invocation */ } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -98,6 +110,20 @@ static bool contain_volatile_functions_walker(Node *node, void *context); static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context); static bool max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context); +static bool target_rel_max_parallel_hazard(max_parallel_hazard_context *context); +static bool target_rel_max_parallel_hazard_recurse(Relation relation, + CmdType command_type, + max_parallel_hazard_context *context); +static bool target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc, + max_parallel_hazard_context *context); +static bool target_rel_index_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static bool target_rel_domain_max_parallel_hazard(Oid typid, + max_parallel_hazard_context *context); +static bool target_rel_partitions_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static bool target_rel_chk_constr_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); static bool contain_nonstrict_functions_walker(Node *node, void *context); static bool contain_exec_param_walker(Node *node, List *param_ids); static bool contain_context_dependent_node(Node *clause); @@ -545,14 +571,19 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context) * later, in the common case where everything is SAFE. */ char -max_parallel_hazard(Query *parse) +max_parallel_hazard(Query *parse, PlannerGlobal *glob) { max_parallel_hazard_context context; context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; context.safe_param_ids = NIL; + context.target_rte = parse->resultRelation > 0 ? + rt_fetch(parse->resultRelation, parse->rtable) : NULL; + context.command_type = parse->commandType; + context.planner_global = glob; (void) max_parallel_hazard_walker((Node *) parse, &context); + return context.max_hazard; } @@ -583,6 +614,9 @@ is_parallel_safe(PlannerInfo *root, Node *node) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; + context.command_type = node != NULL && IsA(node, Query) ? + castNode(Query, node)->commandType : CMD_UNKNOWN; + context.planner_global = root->glob; /* * The params that refer to the same or parent query level are considered @@ -655,14 +689,20 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) * opclass support functions are generally parallel-safe. XmlExpr is a * bit more dubious but we can probably get away with it. We err on the * side of caution by treating CoerceToDomain as parallel-restricted. - * (Note: in principle that's wrong because a domain constraint could - * contain a parallel-unsafe function; but useful constraints probably - * never would have such, and assuming they do would cripple use of - * parallel query in the presence of domain types.) SQLValueFunction - * should be safe in all cases. NextValueExpr is parallel-unsafe. + * However, for table modification statements, we check the parallel + * safety of domain constraints as that could contain a parallel-unsafe + * function, and executing that in parallel mode will lead to error. + * SQLValueFunction should be safe in all cases. NextValueExpr is + * parallel-unsafe. */ if (IsA(node, CoerceToDomain)) { + if (context->target_rte != NULL) + { + if (target_rel_domain_max_parallel_hazard(((CoerceToDomain *) node)->resulttype, context)) + return true; + } + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) return true; } @@ -687,6 +727,27 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) return true; } + /* + * ModifyingCTE expressions are treated as parallel-unsafe. + * + * XXX Normally, if the Query has a modifying CTE, the hasModifyingCTE + * flag is set in the Query tree, and the query will be regarded as + * parallel-usafe. However, in some cases, a re-written query with a + * modifying CTE does not have that flag set, due to a bug in the query + * rewriter. + */ + else if (IsA(node, CommonTableExpr)) + { + CommonTableExpr *cte = (CommonTableExpr *) node; + Query *ctequery = castNode(Query, cte->ctequery); + + if (ctequery->commandType != CMD_SELECT) + { + context->max_hazard = PROPARALLEL_UNSAFE; + return true; + } + } + /* * As a notational convenience for callers, look through RestrictInfo. */ @@ -757,6 +818,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) } return false; /* nothing to recurse to */ } + else if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rte = (RangeTblEntry *) node; + + /* Nothing interesting to check for SELECTs */ + if (context->target_rte == NULL) + return false; + + if (rte == context->target_rte) + return target_rel_max_parallel_hazard(context); + + return false; + } /* * When we're first invoked on a completely unplanned tree, we must @@ -777,7 +851,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* Recurse into subselects */ return query_tree_walker(query, max_parallel_hazard_walker, - context, 0); + context, + context->target_rte != NULL ? + QTW_EXAMINE_RTES_BEFORE : 0); } /* Recurse to check arguments */ @@ -786,6 +862,466 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) context); } +/* + * target_rel_max_parallel_hazard + * + * Determines the maximum parallel-mode hazard level for modification + * of a specified relation. + */ +static bool +target_rel_max_parallel_hazard(max_parallel_hazard_context *context) +{ + bool max_hazard_found; + + Relation targetRel; + + /* + * The target table is already locked by the caller (this is done in the + * parse/analyze phase), and remains locked until end-of-transaction. + */ + targetRel = table_open(context->target_rte->relid, NoLock); + max_hazard_found = target_rel_max_parallel_hazard_recurse(targetRel, + context->command_type, + context); + table_close(targetRel, NoLock); + + return max_hazard_found; +} + +static bool +target_rel_max_parallel_hazard_recurse(Relation rel, + CmdType command_type, + max_parallel_hazard_context *context) +{ + /* Currently only CMD_INSERT is supported */ + Assert(command_type == CMD_INSERT); + + /* + * We can't support table modification in a parallel worker if it's a + * foreign table/partition (no FDW API for supporting parallel access) or + * a temporary table. + */ + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(rel)) + { + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) + return true; + } + + /* + * If a partitioned table, check that each partition is safe for + * modification in parallel-mode. + */ + if (target_rel_partitions_max_parallel_hazard(rel, context)) + return true; + + /* + * If there are any index expressions or index predicate, check that they + * are parallel-mode safe. + */ + if (target_rel_index_max_parallel_hazard(rel, context)) + return true; + + /* + * If any triggers exist, check that they are parallel-safe. + */ + if (target_rel_trigger_max_parallel_hazard(rel->trigdesc, context)) + return true; + + /* + * Column default expressions are only applicable to INSERT and UPDATE. + * For columns in the target-list, these are already being checked for + * parallel-safety in the max_parallel_hazard() scan of the query tree in + * standard_planner(), so there's no need to do it here. Note that even + * though column defaults may be specified separately for each partition + * in a partitioned table, a partition's default value is not applied when + * inserting a tuple through a partitioned table. + */ + + /* + * CHECK constraints are only applicable to INSERT and UPDATE. If any + * CHECK constraints exist, determine if they are parallel-safe. + */ + if (target_rel_chk_constr_max_parallel_hazard(rel, context)) + return true; + + return false; +} + +/* + * target_rel_trigger_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for the specified trigger data. + */ +static bool +target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc, + max_parallel_hazard_context *context) +{ + int i; + + if (trigdesc == NULL) + return false; + + for (i = 0; i < trigdesc->numtriggers; i++) + { + int trigtype; + Trigger *trigger = &trigdesc->triggers[i]; + + if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context)) + return true; + + /* + * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in + * the relation, and this would result in creation of new CommandIds + * on insert/update and this isn't supported in a parallel worker (but + * is safe in the parallel leader). + */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + if (trigtype == RI_TRIGGER_FK) + { + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) + return true; + } + } + + return false; +} + +/* + * target_rel_index_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for any existing index + * expressions or index predicate of a specified relation. + */ +static bool +target_rel_index_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + List *index_oid_list; + ListCell *lc; + bool found_max_hazard = false; + LOCKMODE lockmode = AccessShareLock; + + index_oid_list = RelationGetIndexList(rel); + foreach(lc, index_oid_list) + { + Relation index_rel; + Form_pg_index indexStruct; + List *ii_Expressions; + List *ii_Predicate; + Oid index_oid = lfirst_oid(lc); + + index_rel = index_open(index_oid, lockmode); + + indexStruct = index_rel->rd_index; + ii_Expressions = RelationGetIndexExpressions(index_rel); + + if (ii_Expressions != NIL) + { + int i; + ListCell *index_expr_item = list_head(ii_Expressions); + + for (i = 0; i < indexStruct->indnatts; i++) + { + int keycol = indexStruct->indkey.values[i]; + + if (keycol == 0) + { + /* Found an index expression */ + + Node *index_expr; + + Assert(index_expr_item != NULL); + if (index_expr_item == NULL) /* shouldn't happen */ + { + elog(WARNING, "too few entries in indexprs list"); + context->max_hazard = PROPARALLEL_UNSAFE; + found_max_hazard = true; + break; + } + + index_expr = (Node *) lfirst(index_expr_item); + + if (max_parallel_hazard_walker(index_expr, context)) + { + found_max_hazard = true; + break; + } + + index_expr_item = lnext(ii_Expressions, index_expr_item); + } + } + } + + if (!found_max_hazard) + { + ii_Predicate = RelationGetIndexPredicate(index_rel); + if (ii_Predicate != NIL) + { + if (max_parallel_hazard_walker((Node *) ii_Predicate, context)) + found_max_hazard = true; + } + } + + /* + * XXX We don't need to retain lock on index as index expressions + * can't be changed later. + */ + index_close(index_rel, lockmode); + } + list_free(index_oid_list); + + return found_max_hazard; +} + +/* + * target_rel_domain_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for the specified DOMAIN type. + * Only any CHECK expressions are examined for parallel-safety. + */ +static bool +target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context) +{ + Relation con_rel; + ScanKeyData key[1]; + SysScanDesc scan; + HeapTuple tup; + bool found_max_hazard = false; + + LOCKMODE lockmode = AccessShareLock; + + con_rel = table_open(ConstraintRelationId, lockmode); + + ScanKeyInit(&key[0], + Anum_pg_constraint_contypid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(typid)); + scan = systable_beginscan(con_rel, ConstraintTypidIndexId, true, + NULL, 1, key); + + while (HeapTupleIsValid((tup = systable_getnext(scan)))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup); + + if (con->contype == CONSTRAINT_CHECK) + { + char *conbin; + Datum val; + bool isnull; + Expr *check_expr; + + val = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_conbin, &isnull); + Assert(!isnull); + if (isnull) + { + /* + * This shouldn't ever happen, but if it does, log a WARNING + * and return UNSAFE, rather than erroring out. + */ + elog(WARNING, "null conbin for constraint %u", con->oid); + context->max_hazard = PROPARALLEL_UNSAFE; + found_max_hazard = true; + break; + } + conbin = TextDatumGetCString(val); + check_expr = stringToNode(conbin); + pfree(conbin); + if (max_parallel_hazard_walker((Node *) check_expr, context)) + { + found_max_hazard = true; + break; + } + } + } + + systable_endscan(scan); + table_close(con_rel, lockmode); + return found_max_hazard; +} + +/* + * target_rel_partitions_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for any partitions of a + * of a specified relation. + */ +static bool +target_rel_partitions_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + int i; + PartitionDesc pdesc; + PartitionKey pkey; + ListCell *partexprs_item; + int partnatts; + List *partexprs; + PlannerGlobal *glob; + + + if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + return false; + + pkey = RelationGetPartitionKey(rel); + + partnatts = get_partition_natts(pkey); + partexprs = get_partition_exprs(pkey); + + partexprs_item = list_head(partexprs); + for (i = 0; i < partnatts; i++) + { + /* Check parallel-safety of partition key support functions */ + if (OidIsValid(pkey->partsupfunc[i].fn_oid)) + { + if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context)) + return true; + } + + /* Check parallel-safety of any expressions in the partition key */ + if (get_partition_col_attnum(pkey, i) == 0) + { + Node *check_expr = (Node *) lfirst(partexprs_item); + + if (max_parallel_hazard_walker(check_expr, context)) + return true; + + partexprs_item = lnext(partexprs, partexprs_item); + } + } + + /* Recursively check each partition ... */ + + /* Create the PartitionDirectory infrastructure if we didn't already */ + glob = context->planner_global; + if (glob->partition_directory == NULL) + glob->partition_directory = + CreatePartitionDirectory(CurrentMemoryContext); + + pdesc = PartitionDirectoryLookup(glob->partition_directory, rel); + + for (i = 0; i < pdesc->nparts; i++) + { + bool max_hazard_found; + Relation part_rel; + + /* + * The partition needs to be locked, and remain locked until + * end-of-transaction to ensure its parallel-safety state is not + * hereafter altered. + */ + part_rel = table_open(pdesc->oids[i], context->target_rte->rellockmode); + max_hazard_found = target_rel_max_parallel_hazard_recurse(part_rel, + context->command_type, + context); + table_close(part_rel, NoLock); + + /* + * Remember partitionOids to record the partition as a potential plan + * dependency. + */ + glob->partitionOids = lappend_oid(glob->partitionOids, pdesc->oids[i]); + + if (max_hazard_found) + return true; + } + + return false; +} + +/* + * target_rel_chk_constr_max_parallel_hazard + * + * Finds the maximum parallel-mode hazard level for any CHECK expressions or + * CHECK constraints related to the specified relation. + */ +static bool +target_rel_chk_constr_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + TupleDesc tupdesc; + + tupdesc = RelationGetDescr(rel); + + /* + * Determine if there are any CHECK constraints which are not + * parallel-safe. + */ + if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0) + { + int i; + + ConstrCheck *check = tupdesc->constr->check; + + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check->ccbin); + + if (max_parallel_hazard_walker((Node *) check_expr, context)) + return true; + } + } + + return false; +} + +/* + * is_parallel_allowed_for_modify + * + * Check at a high-level if parallel mode is able to be used for the specified + * table-modification statement. Currently, we support only Inserts. + * + * It's not possible in the following cases: + * + * 1) INSERT...ON CONFLICT...DO UPDATE + * 2) INSERT without SELECT + * + * (Note: we don't do in-depth parallel-safety checks here, we do only the + * cheaper tests that can quickly exclude obvious cases for which + * parallelism isn't supported, to avoid having to do further parallel-safety + * checks for these) + */ +bool +is_parallel_allowed_for_modify(Query *parse) +{ + bool hasSubQuery; + RangeTblEntry *rte; + ListCell *lc; + + if (!IsModifySupportedInParallelMode(parse->commandType)) + return false; + + /* + * UPDATE is not currently supported in parallel-mode, so prohibit + * INSERT...ON CONFLICT...DO UPDATE... + * + * In order to support update, even if only in the leader, some further + * work would need to be done. A mechanism would be needed for sharing + * combo-cids between leader and workers during parallel-mode, since for + * example, the leader might generate a combo-cid and it needs to be + * propagated to the workers. + */ + if (parse->commandType == CMD_INSERT && + parse->onConflict != NULL && + parse->onConflict->action == ONCONFLICT_UPDATE) + return false; + + /* + * If there is no underlying SELECT, a parallel insert operation is not + * desirable. + */ + hasSubQuery = false; + foreach(lc, parse->rtable) + { + rte = lfirst_node(RangeTblEntry, lc); + if (rte->rtekind == RTE_SUBQUERY) + { + hasSubQuery = true; + break; + } + } + + return hasSubQuery; +} /***************************************************************************** * Check clauses for nonstrict functions diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 1a0950489d..c1f4128445 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -1735,6 +1735,23 @@ QueryListGetPrimaryStmt(List *stmts) return NULL; } +static void +AcquireExecutorLocksOnPartitions(List *partitionOids, int lockmode, + bool acquire) +{ + ListCell *lc; + + foreach(lc, partitionOids) + { + Oid partOid = lfirst_oid(lc); + + if (acquire) + LockRelationOid(partOid, lockmode); + else + UnlockRelationOid(partOid, lockmode); + } +} + /* * AcquireExecutorLocks: acquire locks needed for execution of a cached plan; * or release them if acquire is false. @@ -1748,6 +1765,8 @@ AcquireExecutorLocks(List *stmt_list, bool acquire) { PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc1); ListCell *lc2; + Index rti, + resultRelation = 0; if (plannedstmt->commandType == CMD_UTILITY) { @@ -1765,6 +1784,9 @@ AcquireExecutorLocks(List *stmt_list, bool acquire) continue; } + rti = 1; + if (plannedstmt->resultRelations) + resultRelation = linitial_int(plannedstmt->resultRelations); foreach(lc2, plannedstmt->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2); @@ -1782,6 +1804,14 @@ AcquireExecutorLocks(List *stmt_list, bool acquire) LockRelationOid(rte->relid, rte->rellockmode); else UnlockRelationOid(rte->relid, rte->rellockmode); + + /* Lock partitions ahead of modifying them in parallel mode. */ + if (rti == resultRelation && + plannedstmt->partitionOids != NIL) + AcquireExecutorLocksOnPartitions(plannedstmt->partitionOids, + rte->rellockmode, acquire); + + rti++; } } } @@ -1990,7 +2020,8 @@ PlanCacheRelCallback(Datum arg, Oid relid) if (plannedstmt->commandType == CMD_UTILITY) continue; /* Ignore utility statements */ if ((relid == InvalidOid) ? plannedstmt->relationOids != NIL : - list_member_oid(plannedstmt->relationOids, relid)) + (list_member_oid(plannedstmt->relationOids, relid) || + list_member_oid(plannedstmt->partitionOids, relid))) { /* Invalidate the generic plan only */ plansource->gplan->is_valid = false; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index f49a57b35e..34cfaf542c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); +extern void PrepareParallelModePlanExec(CmdType commandType); + +/* + * IsModifySupportedInParallelMode + * + * Indicates whether execution of the specified table-modification command + * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain + * parallel-safety conditions. + */ +static inline bool +IsModifySupportedInParallelMode(CmdType commandType) +{ + /* Currently only INSERT is supported */ + return (commandType == CMD_INSERT); +} #endif /* XACT_H */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index b8a6e0fc9f..86405a274e 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -120,6 +120,8 @@ typedef struct PlannerGlobal List *relationOids; /* OIDs of relations the plan depends on */ + List *partitionOids; /* OIDs of partitions the plan depends on */ + List *invalItems; /* other dependencies, as PlanInvalItems */ List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 6e62104d0b..95292d7573 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -79,6 +79,8 @@ typedef struct PlannedStmt List *relationOids; /* OIDs of relations the plan depends on */ + List *partitionOids; /* OIDs of partitions the plan depends on */ + List *invalItems; /* other dependencies, as PlanInvalItems */ List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */ diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 0673887a85..8d85b02514 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause); extern bool contain_subplans(Node *clause); -extern char max_parallel_hazard(Query *parse); +extern char max_parallel_hazard(Query *parse, PlannerGlobal *glob); extern bool is_parallel_safe(PlannerInfo *root, Node *node); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_exec_param(Node *clause, List *param_ids); @@ -52,5 +52,6 @@ extern void CommuteOpExpr(OpExpr *clause); extern Query *inline_set_returning_function(PlannerInfo *root, RangeTblEntry *rte); +extern bool is_parallel_allowed_for_modify(Query *parse); #endif /* CLAUSES_H */ diff --git a/src/test/regress/expected/insert_parallel.out b/src/test/regress/expected/insert_parallel.out new file mode 100644 index 0000000000..d5fae79031 --- /dev/null +++ b/src/test/regress/expected/insert_parallel.out @@ -0,0 +1,536 @@ +-- +-- PARALLEL +-- +-- +-- START: setup some tables and data needed by the tests. +-- +-- Setup - index expressions test +-- For testing purposes, we'll mark this function as parallel-unsafe +create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel unsafe; +create or replace function fullname_parallel_restricted(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel restricted; +create table names(index int, first_name text, last_name text); +create table names2(index int, first_name text, last_name text); +create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name)); +create table names4(index int, first_name text, last_name text); +create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name)); +insert into names values + (1, 'albert', 'einstein'), + (2, 'niels', 'bohr'), + (3, 'erwin', 'schrodinger'), + (4, 'leonhard', 'euler'), + (5, 'stephen', 'hawking'), + (6, 'isaac', 'newton'), + (7, 'alan', 'turing'), + (8, 'richard', 'feynman'); +-- Setup - column default tests +create or replace function bdefault_unsafe () +returns int language plpgsql parallel unsafe as $$ +begin + RETURN 5; +end $$; +create or replace function cdefault_restricted () +returns int language plpgsql parallel restricted as $$ +begin + RETURN 10; +end $$; +create or replace function ddefault_safe () +returns int language plpgsql parallel safe as $$ +begin + RETURN 20; +end $$; +create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe()); +create table test_data(a int); +insert into test_data select * from generate_series(1,10); +-- +-- END: setup some tables and data needed by the tests. +-- +-- Serializable isolation would disable parallel query, so explicitly use an +-- arbitrary other level. +begin isolation level repeatable read; +-- encourage use of parallel plans +set parallel_setup_cost=0; +set parallel_tuple_cost=0; +set min_parallel_table_scan_size=0; +set max_parallel_workers_per_gather=4; +create table para_insert_p1 ( + unique1 int4 PRIMARY KEY, + stringu1 name +); +create table para_insert_f1 ( + unique1 int4 REFERENCES para_insert_p1(unique1), + stringu1 name +); +-- +-- Test INSERT with underlying query. +-- (should create plan with parallel SELECT, Gather parent node) +-- +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1; + QUERY PLAN +---------------------------------------- + Insert on para_insert_p1 + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on tenk1 +(4 rows) + +insert into para_insert_p1 select unique1, stringu1 from tenk1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; + count | sum +-------+---------- + 10000 | 49995000 +(1 row) + +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + count +------- + 1 +(1 row) + +-- +-- Test INSERT with ordered underlying query. +-- (should create plan with parallel SELECT, GatherMerge parent node) +-- +truncate para_insert_p1 cascade; +NOTICE: truncate cascades to table "para_insert_f1" +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; + QUERY PLAN +---------------------------------------------- + Insert on para_insert_p1 + -> Gather Merge + Workers Planned: 4 + -> Sort + Sort Key: tenk1.unique1 + -> Parallel Seq Scan on tenk1 +(6 rows) + +insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; + count | sum +-------+---------- + 10000 | 49995000 +(1 row) + +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + count +------- + 1 +(1 row) + +-- +-- Test INSERT with RETURNING clause. +-- (should create plan with parallel SELECT, Gather parent node) +-- +create table test_data1(like test_data); +explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data; + QUERY PLAN +-------------------------------------------- + Insert on test_data1 + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on test_data + Filter: (a = 10) +(5 rows) + +insert into test_data1 select * from test_data where a = 10 returning a as data; + data +------ + 10 +(1 row) + +-- +-- Test INSERT into a table with a foreign key. +-- (Insert into a table with a foreign key is parallel-restricted, +-- as doing this in a parallel worker would create a new commandId +-- and within a worker this is not currently supported) +-- +explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1; + QUERY PLAN +---------------------------------------- + Insert on para_insert_f1 + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on tenk1 +(4 rows) + +insert into para_insert_f1 select unique1, stringu1 from tenk1; +-- select some values to verify that the insert worked +select count(*), sum(unique1) from para_insert_f1; + count | sum +-------+---------- + 10000 | 49995000 +(1 row) + +-- +-- Test INSERT with ON CONFLICT ... DO UPDATE ... +-- (should not create a parallel plan) +-- +create table test_conflict_table(id serial primary key, somedata int); +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data; + QUERY PLAN +-------------------------------------------- + Insert on test_conflict_table + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on test_data +(4 rows) + +insert into test_conflict_table(id, somedata) select a, a from test_data; +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1; + QUERY PLAN +------------------------------------------------------ + Insert on test_conflict_table + Conflict Resolution: UPDATE + Conflict Arbiter Indexes: test_conflict_table_pkey + -> Seq Scan on test_data +(4 rows) + +-- +-- Test INSERT with parallel-unsafe index expression +-- (should not create a parallel plan) +-- +explain (costs off) insert into names2 select * from names; + QUERY PLAN +------------------------- + Insert on names2 + -> Seq Scan on names +(2 rows) + +-- +-- Test INSERT with parallel-restricted index expression +-- (should create a parallel plan) +-- +explain (costs off) insert into names4 select * from names; + QUERY PLAN +---------------------------------------- + Insert on names4 + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on names +(4 rows) + +-- +-- Test INSERT with underlying query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names5 (like names); +explain (costs off) insert into names5 select * from names returning *; + QUERY PLAN +---------------------------------------- + Insert on names5 + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on names +(4 rows) + +-- +-- Test INSERT with underlying ordered query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names6 (like names); +explain (costs off) insert into names6 select * from names order by last_name returning *; + QUERY PLAN +---------------------------------------------- + Insert on names6 + -> Gather Merge + Workers Planned: 3 + -> Sort + Sort Key: names.last_name + -> Parallel Seq Scan on names +(6 rows) + +insert into names6 select * from names order by last_name returning *; + index | first_name | last_name +-------+------------+------------- + 2 | niels | bohr + 1 | albert | einstein + 4 | leonhard | euler + 8 | richard | feynman + 5 | stephen | hawking + 6 | isaac | newton + 3 | erwin | schrodinger + 7 | alan | turing +(8 rows) + +-- +-- Test INSERT with underlying ordered query - and RETURNING (with projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names7 (like names); +explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; + QUERY PLAN +---------------------------------------------- + Insert on names7 + -> Gather Merge + Workers Planned: 3 + -> Sort + Sort Key: names.last_name + -> Parallel Seq Scan on names +(6 rows) + +insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; + last_name_then_first_name +--------------------------- + bohr, niels + einstein, albert + euler, leonhard + feynman, richard + hawking, stephen + newton, isaac + schrodinger, erwin + turing, alan +(8 rows) + +-- +-- Test INSERT into temporary table with underlying query. +-- (Insert into a temp table is parallel-restricted; +-- should create a parallel plan; parallel SELECT) +-- +create temporary table temp_names (like names); +explain (costs off) insert into temp_names select * from names; + QUERY PLAN +---------------------------------------- + Insert on temp_names + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on names +(4 rows) + +insert into temp_names select * from names; +-- +-- Test INSERT with column defaults +-- +-- +-- +-- Parallel unsafe column default, should not use a parallel plan +-- +explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data; + QUERY PLAN +----------------------------- + Insert on testdef + -> Seq Scan on test_data +(2 rows) + +-- +-- Parallel restricted column default, should use parallel SELECT +-- +explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data; + QUERY PLAN +-------------------------------------------- + Insert on testdef + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on test_data +(4 rows) + +insert into testdef(a,b,d) select a,a*2,a*8 from test_data; +select * from testdef order by a; + a | b | c | d +----+----+----+---- + 1 | 2 | 10 | 8 + 2 | 4 | 10 | 16 + 3 | 6 | 10 | 24 + 4 | 8 | 10 | 32 + 5 | 10 | 10 | 40 + 6 | 12 | 10 | 48 + 7 | 14 | 10 | 56 + 8 | 16 | 10 | 64 + 9 | 18 | 10 | 72 + 10 | 20 | 10 | 80 +(10 rows) + +truncate testdef; +-- +-- Parallel restricted and unsafe column defaults, should not use a parallel plan +-- +explain (costs off) insert into testdef(a,d) select a,a*8 from test_data; + QUERY PLAN +----------------------------- + Insert on testdef + -> Seq Scan on test_data +(2 rows) + +-- +-- Test INSERT into partition with underlying query. +-- +create table parttable1 (a int, b name) partition by range (a); +create table parttable1_1 partition of parttable1 for values from (0) to (5000); +create table parttable1_2 partition of parttable1 for values from (5000) to (10000); +explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1; + QUERY PLAN +---------------------------------------- + Insert on parttable1 + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on tenk1 +(4 rows) + +insert into parttable1 select unique1,stringu1 from tenk1; +select count(*) from parttable1_1; + count +------- + 5000 +(1 row) + +select count(*) from parttable1_2; + count +------- + 5000 +(1 row) + +-- +-- Test INSERT into table with parallel-unsafe check constraint +-- (should not create a parallel plan) +-- +create or replace function check_b_unsafe(b name) returns boolean as $$ + begin + return (b <> 'XXXXXX'); + end; +$$ language plpgsql parallel unsafe; +create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name); +explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1; + QUERY PLAN +------------------------- + Insert on table_check_b + -> Seq Scan on tenk1 +(2 rows) + +-- +-- Test INSERT into table with parallel-safe after stmt-level triggers +-- (should create a parallel SELECT plan; triggers should fire) +-- +create table names_with_safe_trigger (like names); +create or replace function insert_after_trigger_safe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_safe'; + return new; + end; +$$ language plpgsql parallel safe; +create trigger insert_after_trigger_safe after insert on names_with_safe_trigger + for each statement execute procedure insert_after_trigger_safe(); +explain (costs off) insert into names_with_safe_trigger select * from names; + QUERY PLAN +---------------------------------------- + Insert on names_with_safe_trigger + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on names +(4 rows) + +insert into names_with_safe_trigger select * from names; +NOTICE: hello from insert_after_trigger_safe +-- +-- Test INSERT into table with parallel-unsafe after stmt-level triggers +-- (should not create a parallel plan; triggers should fire) +-- +create table names_with_unsafe_trigger (like names); +create or replace function insert_after_trigger_unsafe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_unsafe'; + return new; + end; +$$ language plpgsql parallel unsafe; +create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger + for each statement execute procedure insert_after_trigger_unsafe(); +explain (costs off) insert into names_with_unsafe_trigger select * from names; + QUERY PLAN +------------------------------------- + Insert on names_with_unsafe_trigger + -> Seq Scan on names +(2 rows) + +insert into names_with_unsafe_trigger select * from names; +NOTICE: hello from insert_after_trigger_unsafe +-- +-- Test INSERT into partition with parallel-unsafe trigger +-- (should not create a parallel plan) +-- +create table part_unsafe_trigger (a int4, b name) partition by range (a); +create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000); +create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000); +create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1 + for each statement execute procedure insert_after_trigger_unsafe(); +explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1; + QUERY PLAN +------------------------------- + Insert on part_unsafe_trigger + -> Seq Scan on tenk1 +(2 rows) + +-- +-- Test that parallel-safety-related changes to partitions are detected and +-- plan cache invalidation is working correctly. +-- +create table rp (a int) partition by range (a); +create table rp1 partition of rp for values from (minvalue) to (0); +create table rp2 partition of rp for values from (0) to (maxvalue); +create table foo (a) as select unique1 from tenk1; +prepare q as insert into rp select * from foo where a%2 = 0; +-- should create a parallel plan +explain (costs off) execute q; + QUERY PLAN +-------------------------------------- + Insert on rp + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on foo + Filter: ((a % 2) = 0) +(5 rows) + +create or replace function make_table_bar () returns trigger language +plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe; +create trigger ai_rp2 after insert on rp2 for each row execute +function make_table_bar(); +-- should create a non-parallel plan +explain (costs off) execute q; + QUERY PLAN +------------------------------- + Insert on rp + -> Seq Scan on foo + Filter: ((a % 2) = 0) +(3 rows) + +-- +-- Test INSERT into table having a DOMAIN column with a CHECK constraint +-- +create function sql_is_distinct_from_u(anyelement, anyelement) +returns boolean language sql parallel unsafe +as 'select $1 is distinct from $2 limit 1'; +create domain inotnull_u int + check (sql_is_distinct_from_u(value, null)); +create table dom_table_u (x inotnull_u, y int); +-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint +explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1; + QUERY PLAN +------------------------- + Insert on dom_table_u + -> Seq Scan on tenk1 +(2 rows) + +rollback; +-- +-- Clean up anything not created in the transaction +-- +drop table names; +drop index names2_fullname_idx; +drop table names2; +drop index names4_fullname_idx; +drop table names4; +drop table testdef; +drop table test_data; +drop function bdefault_unsafe; +drop function cdefault_restricted; +drop function ddefault_safe; +drop function fullname_parallel_unsafe; +drop function fullname_parallel_restricted; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index c77b0d7342..e280198b17 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -90,6 +90,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8 # run by itself so it can run parallel workers test: select_parallel test: write_parallel +test: insert_parallel # no relation related tests can be put in this group test: publication subscription diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 0264a97324..6a57e889a1 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -148,6 +148,7 @@ test: stats_ext test: collate.linux.utf8 test: select_parallel test: write_parallel +test: insert_parallel test: publication test: subscription test: select_views diff --git a/src/test/regress/sql/insert_parallel.sql b/src/test/regress/sql/insert_parallel.sql new file mode 100644 index 0000000000..70ad31a087 --- /dev/null +++ b/src/test/regress/sql/insert_parallel.sql @@ -0,0 +1,335 @@ +-- +-- PARALLEL +-- + +-- +-- START: setup some tables and data needed by the tests. +-- + +-- Setup - index expressions test + +-- For testing purposes, we'll mark this function as parallel-unsafe +create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel unsafe; + +create or replace function fullname_parallel_restricted(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel restricted; + +create table names(index int, first_name text, last_name text); +create table names2(index int, first_name text, last_name text); +create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name)); +create table names4(index int, first_name text, last_name text); +create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name)); + +insert into names values + (1, 'albert', 'einstein'), + (2, 'niels', 'bohr'), + (3, 'erwin', 'schrodinger'), + (4, 'leonhard', 'euler'), + (5, 'stephen', 'hawking'), + (6, 'isaac', 'newton'), + (7, 'alan', 'turing'), + (8, 'richard', 'feynman'); + +-- Setup - column default tests + +create or replace function bdefault_unsafe () +returns int language plpgsql parallel unsafe as $$ +begin + RETURN 5; +end $$; + +create or replace function cdefault_restricted () +returns int language plpgsql parallel restricted as $$ +begin + RETURN 10; +end $$; + +create or replace function ddefault_safe () +returns int language plpgsql parallel safe as $$ +begin + RETURN 20; +end $$; + +create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe()); + +create table test_data(a int); +insert into test_data select * from generate_series(1,10); + +-- +-- END: setup some tables and data needed by the tests. +-- + +-- Serializable isolation would disable parallel query, so explicitly use an +-- arbitrary other level. +begin isolation level repeatable read; + +-- encourage use of parallel plans +set parallel_setup_cost=0; +set parallel_tuple_cost=0; +set min_parallel_table_scan_size=0; +set max_parallel_workers_per_gather=4; + +create table para_insert_p1 ( + unique1 int4 PRIMARY KEY, + stringu1 name +); + +create table para_insert_f1 ( + unique1 int4 REFERENCES para_insert_p1(unique1), + stringu1 name +); + + +-- +-- Test INSERT with underlying query. +-- (should create plan with parallel SELECT, Gather parent node) +-- +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1; +insert into para_insert_p1 select unique1, stringu1 from tenk1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + +-- +-- Test INSERT with ordered underlying query. +-- (should create plan with parallel SELECT, GatherMerge parent node) +-- +truncate para_insert_p1 cascade; +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; +insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + +-- +-- Test INSERT with RETURNING clause. +-- (should create plan with parallel SELECT, Gather parent node) +-- +create table test_data1(like test_data); +explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data; +insert into test_data1 select * from test_data where a = 10 returning a as data; + +-- +-- Test INSERT into a table with a foreign key. +-- (Insert into a table with a foreign key is parallel-restricted, +-- as doing this in a parallel worker would create a new commandId +-- and within a worker this is not currently supported) +-- +explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1; +insert into para_insert_f1 select unique1, stringu1 from tenk1; +-- select some values to verify that the insert worked +select count(*), sum(unique1) from para_insert_f1; + +-- +-- Test INSERT with ON CONFLICT ... DO UPDATE ... +-- (should not create a parallel plan) +-- +create table test_conflict_table(id serial primary key, somedata int); +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data; +insert into test_conflict_table(id, somedata) select a, a from test_data; +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1; + + +-- +-- Test INSERT with parallel-unsafe index expression +-- (should not create a parallel plan) +-- +explain (costs off) insert into names2 select * from names; + +-- +-- Test INSERT with parallel-restricted index expression +-- (should create a parallel plan) +-- +explain (costs off) insert into names4 select * from names; + +-- +-- Test INSERT with underlying query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names5 (like names); +explain (costs off) insert into names5 select * from names returning *; + +-- +-- Test INSERT with underlying ordered query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names6 (like names); +explain (costs off) insert into names6 select * from names order by last_name returning *; +insert into names6 select * from names order by last_name returning *; + +-- +-- Test INSERT with underlying ordered query - and RETURNING (with projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names7 (like names); +explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; +insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; + + +-- +-- Test INSERT into temporary table with underlying query. +-- (Insert into a temp table is parallel-restricted; +-- should create a parallel plan; parallel SELECT) +-- +create temporary table temp_names (like names); +explain (costs off) insert into temp_names select * from names; +insert into temp_names select * from names; + +-- +-- Test INSERT with column defaults +-- +-- + +-- +-- Parallel unsafe column default, should not use a parallel plan +-- +explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data; + +-- +-- Parallel restricted column default, should use parallel SELECT +-- +explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data; +insert into testdef(a,b,d) select a,a*2,a*8 from test_data; +select * from testdef order by a; +truncate testdef; + +-- +-- Parallel restricted and unsafe column defaults, should not use a parallel plan +-- +explain (costs off) insert into testdef(a,d) select a,a*8 from test_data; + +-- +-- Test INSERT into partition with underlying query. +-- +create table parttable1 (a int, b name) partition by range (a); +create table parttable1_1 partition of parttable1 for values from (0) to (5000); +create table parttable1_2 partition of parttable1 for values from (5000) to (10000); + +explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1; +insert into parttable1 select unique1,stringu1 from tenk1; +select count(*) from parttable1_1; +select count(*) from parttable1_2; + +-- +-- Test INSERT into table with parallel-unsafe check constraint +-- (should not create a parallel plan) +-- +create or replace function check_b_unsafe(b name) returns boolean as $$ + begin + return (b <> 'XXXXXX'); + end; +$$ language plpgsql parallel unsafe; + +create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name); +explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1; + +-- +-- Test INSERT into table with parallel-safe after stmt-level triggers +-- (should create a parallel SELECT plan; triggers should fire) +-- +create table names_with_safe_trigger (like names); +create or replace function insert_after_trigger_safe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_safe'; + return new; + end; +$$ language plpgsql parallel safe; +create trigger insert_after_trigger_safe after insert on names_with_safe_trigger + for each statement execute procedure insert_after_trigger_safe(); +explain (costs off) insert into names_with_safe_trigger select * from names; +insert into names_with_safe_trigger select * from names; + +-- +-- Test INSERT into table with parallel-unsafe after stmt-level triggers +-- (should not create a parallel plan; triggers should fire) +-- +create table names_with_unsafe_trigger (like names); +create or replace function insert_after_trigger_unsafe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_unsafe'; + return new; + end; +$$ language plpgsql parallel unsafe; +create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger + for each statement execute procedure insert_after_trigger_unsafe(); +explain (costs off) insert into names_with_unsafe_trigger select * from names; +insert into names_with_unsafe_trigger select * from names; + +-- +-- Test INSERT into partition with parallel-unsafe trigger +-- (should not create a parallel plan) +-- + +create table part_unsafe_trigger (a int4, b name) partition by range (a); +create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000); +create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000); +create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1 + for each statement execute procedure insert_after_trigger_unsafe(); + +explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1; + +-- +-- Test that parallel-safety-related changes to partitions are detected and +-- plan cache invalidation is working correctly. +-- + +create table rp (a int) partition by range (a); +create table rp1 partition of rp for values from (minvalue) to (0); +create table rp2 partition of rp for values from (0) to (maxvalue); +create table foo (a) as select unique1 from tenk1; +prepare q as insert into rp select * from foo where a%2 = 0; +-- should create a parallel plan +explain (costs off) execute q; + +create or replace function make_table_bar () returns trigger language +plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe; +create trigger ai_rp2 after insert on rp2 for each row execute +function make_table_bar(); +-- should create a non-parallel plan +explain (costs off) execute q; + +-- +-- Test INSERT into table having a DOMAIN column with a CHECK constraint +-- +create function sql_is_distinct_from_u(anyelement, anyelement) +returns boolean language sql parallel unsafe +as 'select $1 is distinct from $2 limit 1'; + +create domain inotnull_u int + check (sql_is_distinct_from_u(value, null)); + +create table dom_table_u (x inotnull_u, y int); + + +-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint +explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1; + + +rollback; + +-- +-- Clean up anything not created in the transaction +-- + +drop table names; +drop index names2_fullname_idx; +drop table names2; +drop index names4_fullname_idx; +drop table names4; +drop table testdef; +drop table test_data; + +drop function bdefault_unsafe; +drop function cdefault_restricted; +drop function ddefault_safe; +drop function fullname_parallel_unsafe; +drop function fullname_parallel_restricted;