diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 5fd1c5553b..301e4acba3 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, * * The hashtable control block is just palloc'd from the executor's * per-query memory context. Everything else should be kept inside the - * subsidiary hashCxt or batchCxt. + * subsidiary hashCxt, batchCxt or spillCxt. */ hashtable = palloc_object(HashJoinTableData); hashtable->nbuckets = nbuckets; @@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, "HashBatchContext", ALLOCSET_DEFAULT_SIZES); + hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt, + "HashSpillContext", + ALLOCSET_DEFAULT_SIZES); + /* Allocate data that will live for the life of the hashjoin */ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); @@ -570,12 +574,19 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, if (nbatch > 1 && hashtable->parallel_state == NULL) { + MemoryContext oldctx; + /* * allocate and initialize the file arrays in hashCxt (not needed for * parallel case which uses shared tuplestores instead of raw files) */ + oldctx = MemoryContextSwitchTo(hashtable->spillCxt); + hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); + + MemoryContextSwitchTo(oldctx); + /* The files will not be opened until needed... */ /* ... 
but make sure we have temp tablespaces established for them */ PrepareTempTablespaces(); @@ -913,7 +924,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) int oldnbatch = hashtable->nbatch; int curbatch = hashtable->curbatch; int nbatch; - MemoryContext oldcxt; long ninmemory; long nfreed; HashMemoryChunk oldchunks; @@ -934,13 +944,16 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable, nbatch, hashtable->spaceUsed); #endif - oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); - if (hashtable->innerBatchFile == NULL) { + MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); + /* we had no file arrays before */ hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); + + MemoryContextSwitchTo(oldcxt); + /* time to establish the temp tablespaces, too */ PrepareTempTablespaces(); } @@ -951,8 +964,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch); } - MemoryContextSwitchTo(oldcxt); - hashtable->nbatch = nbatch; /* @@ -1024,7 +1035,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) Assert(batchno > curbatch); ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, - &hashtable->innerBatchFile[batchno]); + &hashtable->innerBatchFile[batchno], + hashtable); hashtable->spaceUsed -= hashTupleSize; nfreed++; @@ -1683,7 +1695,8 @@ ExecHashTableInsert(HashJoinTable hashtable, Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + &hashtable->innerBatchFile[batchno], + hashtable); } if (shouldFree) @@ -2664,7 +2677,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + &hashtable->innerBatchFile[batchno], + 
hashtable); pfree(hashTuple); hashtable->spaceUsed -= tupleSize; hashtable->spaceUsedSkew -= tupleSize; @@ -3093,8 +3107,11 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) pstate->nbatch = nbatch; batches = dsa_get_address(hashtable->area, pstate->batches); - /* Use hash join memory context. */ - oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + /* + * Use hash join spill memory context to allocate accessors, including + * buffers for the temporary files. + */ + oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); /* Allocate this backend's accessor array. */ hashtable->nbatch = nbatch; @@ -3196,8 +3213,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) */ Assert(DsaPointerIsValid(pstate->batches)); - /* Use hash join memory context. */ - oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + /* + * Use hash join spill memory context to allocate accessors, including + * buffers for the temporary files. + */ + oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); /* Allocate this backend's accessor array. */ hashtable->nbatch = pstate->nbatch; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 615d9980cf..e40436db38 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -495,7 +495,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(mintuple, hashvalue, - &hashtable->outerBatchFile[batchno]); + &hashtable->outerBatchFile[batchno], + hashtable); if (shouldFree) heap_free_minimal_tuple(mintuple); @@ -1317,21 +1318,39 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * The data recorded in the file for each tuple is its hash value, * then the tuple in MinimalTuple format. * - * Note: it is important always to call this in the regular executor - * context, not in a shorter-lived context; else the temp file buffers - * will get messed up. 
+ * fileptr points to a batch file in one of the hashtable arrays. + * + * The batch files (and their buffers) are allocated in the spill context + * created for the hashtable. */ void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, - BufFile **fileptr) + BufFile **fileptr, HashJoinTable hashtable) { BufFile *file = *fileptr; + /* + * The batch file is lazily created. If this is the first tuple + * written to this batch, the batch file is created and its buffer is + * allocated in the spillCxt context, NOT in the batchCxt. + * + * During the build phase, buffered files are created for inner + * batches. Each batch's buffered file is closed (and its buffer freed) + * after the batch is loaded into memory during the outer side scan. + * Therefore, it is necessary to allocate the batch file buffer in a + * memory context which outlives the batch itself. + * + * Also, we use spillCxt instead of hashCxt for a better accounting of + * the spilling memory consumption. + */ if (file == NULL) { - /* First write to this batch file, so open it. */ + MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt); + file = BufFileCreateTemp(false); *fileptr = file; + + MemoryContextSwitchTo(oldctx); } BufFileWrite(file, &hashvalue, sizeof(uint32)); diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c index 0831249159..236be65f22 100644 --- a/src/backend/utils/sort/sharedtuplestore.c +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -308,11 +308,15 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, { SharedTuplestoreParticipant *participant; char name[MAXPGPATH]; + MemoryContext oldcxt; /* Create one. Only this backend will write into it. 
*/ sts_filename(name, accessor, accessor->participant); + + oldcxt = MemoryContextSwitchTo(accessor->context); accessor->write_file = BufFileCreateFileSet(&accessor->fileset->fs, name); + MemoryContextSwitchTo(oldcxt); /* Set up the shared state for this backend's file. */ participant = &accessor->sts->participants[accessor->participant]; @@ -527,11 +531,15 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) if (accessor->read_file == NULL) { char name[MAXPGPATH]; + MemoryContext oldcxt; sts_filename(name, accessor, accessor->read_participant); + + oldcxt = MemoryContextSwitchTo(accessor->context); accessor->read_file = BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, false); + MemoryContextSwitchTo(oldcxt); } /* Seek and load the chunk header. */ diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 8ee59d2c71..857ca58f6f 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -23,12 +23,12 @@ /* ---------------------------------------------------------------- * hash-join hash table structures * - * Each active hashjoin has a HashJoinTable control block, which is - * palloc'd in the executor's per-query context. All other storage needed - * for the hashjoin is kept in private memory contexts, two for each hashjoin. - * This makes it easy and fast to release the storage when we don't need it - * anymore. (Exception: data associated with the temp files lives in the - * per-query context too, since we always call buffile.c in that context.) + * Each active hashjoin has a HashJoinTable structure, which is + * palloc'd in the executor's per-query context. 
Other storage needed for + * each hashjoin is kept in child contexts, three for each hashjoin: + * - HashTableContext (hashCxt): the parent hash table storage context + * - HashSpillContext (spillCxt): storage for temp files buffers + * - HashBatchContext (batchCxt): storage for a batch in serial hash join * * The hashtable contexts are made children of the per-query context, ensuring * that they will be discarded at end of statement even if the join is @@ -36,9 +36,20 @@ * be cleaned up by the virtual file manager in event of an error.) * * Storage that should live through the entire join is allocated from the - * "hashCxt", while storage that is only wanted for the current batch is - * allocated in the "batchCxt". By resetting the batchCxt at the end of - * each batch, we free all the per-batch storage reliably and without tedium. + * "hashCxt" (mainly the hashtable's metadata). Also, the "hashCxt" context is + * the parent of "spillCxt" and "batchCxt". It makes it easy and fast to + * release the storage when we don't need it anymore. + * + * Data associated with temp files is allocated in the "spillCxt" context + * which lives for the duration of the entire join as batch files' + * creation and usage may span batch execution. These files are + * explicitly destroyed by calling BufFileClose() when the code is done + * with them. The aim of this context is to help accounting for the + * memory allocated for temp files and their buffers. + * + * Finally, data used only during a single batch's execution is allocated + * in the "batchCxt". By resetting the batchCxt at the end of each batch, + * we free all the per-batch storage reliably and without tedium. * * During first scan of inner relation, we get its tuples from executor. 
* If nbatch > 1 then tuples that don't belong in first batch get saved @@ -350,6 +361,7 @@ typedef struct HashJoinTableData MemoryContext hashCxt; /* context for whole-hash-join storage */ MemoryContext batchCxt; /* context for this-batch-only storage */ + MemoryContext spillCxt; /* context for spilling to temp files */ /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index d367070883..ccb704ede1 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -29,6 +29,6 @@ extern void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt); extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, - BufFile **fileptr); + BufFile **fileptr, HashJoinTable hashtable); #endif /* NODEHASHJOIN_H */