/*-------------------------------------------------------------------------
*
* nodeHash.c
* Routines to hash relations for hashjoin
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.130 2010/07/12 17:01:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/*
* INTERFACE ROUTINES
* MultiExecHash - generate an in-memory hash table of the relation
* ExecInitHash - initialize node and subnodes
* ExecEndHash - shutdown node and subnodes
*/
#include "postgres.h"
#include <math.h>
#include <limits.h>
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
#include "executor/hashjoin.h"
#include "executor/instrument.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "parser/parse_expr.h"
#include "utils/dynahash.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue,
int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
*
* stub for pro forma compliance
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecHash(HashState *node)
{
elog(ERROR, "Hash node does not support ExecProcNode call convention");
return NULL;
}
/* ----------------------------------------------------------------
* MultiExecHash
*
* build hash table for hashjoin, doing partitioning if more
* than one batch is required.
* ----------------------------------------------------------------
*/
Node *
MultiExecHash(HashState *node)
{
PlanState *outerNode;
List *hashkeys;
HashJoinTable hashtable;
TupleTableSlot *slot;
ExprContext *econtext;
uint32 hashvalue;
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStartNode(node->ps.instrument);
/*
* get state info from node
*/
outerNode = outerPlanState(node);
hashtable = node->hashtable;
/*
* set expression context
*/
hashkeys = node->hashkeys;
econtext = node->ps.ps_ExprContext;
/*
* get all inner tuples and insert into the hash table (or temp files)
*/
for (;;)
{
slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
break;
/* We have to compute the hash value */
econtext->ecxt_innertuple = slot;
if (ExecHashGetHashValue(hashtable, econtext, hashkeys, false, false,
&hashvalue))
{
int bucketNumber;
bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
if (bucketNumber != INVALID_SKEW_BUCKET_NO)
{
/* It's a skew tuple, so put it into that hash table */
ExecHashSkewTableInsert(hashtable, slot, hashvalue,
bucketNumber);
}
else
{
/* Not subject to skew optimization, so insert normally */
ExecHashTableInsert(hashtable, slot, hashvalue);
}
hashtable->totalTuples += 1;
}
}
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStopNode(node->ps.instrument, hashtable->totalTuples);
/*
* We do not return the hash table directly because it's not a subtype of
* Node, and so would violate the MultiExecProcNode API. Instead, our
* parent Hashjoin node is expected to know how to fish it out of our node
* state. Ugly but not really worth cleaning up, since Hashjoin knows
* quite a bit more about Hash besides that.
*/
return NULL;
}
/* ----------------------------------------------------------------
* ExecInitHash
*
* Init routine for Hash node
* ----------------------------------------------------------------
*/
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
HashState *hashstate;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
/*
* create state structure
*/
hashstate = makeNode(HashState);
hashstate->ps.plan = (Plan *) node;
hashstate->ps.state = estate;
hashstate->hashtable = NULL;
hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &hashstate->ps);
/*
* initialize our result slot
*/
ExecInitResultTupleSlot(estate, &hashstate->ps);
/*
* initialize child expressions
*/
hashstate->ps.targetlist = (List *)
ExecInitExpr((Expr *) node->plan.targetlist,
(PlanState *) hashstate);
hashstate->ps.qual = (List *)
ExecInitExpr((Expr *) node->plan.qual,
(PlanState *) hashstate);
/*
* initialize child nodes
*/
outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
/*
* initialize tuple type. no need to initialize projection info because
* this node doesn't do projections
*/
ExecAssignResultTypeFromTL(&hashstate->ps);
hashstate->ps.ps_ProjInfo = NULL;
return hashstate;
}
/* ----------------------------------------------------------------
* ExecEndHash
*
* clean up routine for Hash node
* ----------------------------------------------------------------
*/
void
ExecEndHash(HashState *node)
{
PlanState *outerPlan;
/*
* free exprcontext
*/
ExecFreeExprContext(&node->ps);
/*
* shut down the subplan
*/
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
}
/* ----------------------------------------------------------------
* ExecHashTableCreate
*
* create an empty hashtable data structure for hashjoin.
* ----------------------------------------------------------------
*/
HashJoinTable
ExecHashTableCreate(Hash *node, List *hashOperators)
{
HashJoinTable hashtable;
Plan *outerNode;
int nbuckets;
int nbatch;
int num_skew_mcvs;
int log2_nbuckets;
int nkeys;
int i;
ListCell *ho;
MemoryContext oldcxt;
/*
* Get information about the size of the relation to be hashed (it's the
* "outer" subtree of this node, but the inner relation of the hashjoin).
* Compute the appropriate size of the hash table.
*/
outerNode = outerPlan(node);
ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
OidIsValid(node->skewTable),
&nbuckets, &nbatch, &num_skew_mcvs);
#ifdef HJDEBUG
printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
#endif
/* nbuckets must be a power of 2 */
log2_nbuckets = my_log2(nbuckets);
Assert(nbuckets == (1 << log2_nbuckets));
/*
* Initialize the hash table control block.
*
* The hashtable control block is just palloc'd from the executor's
* per-query memory context.
*/
hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
hashtable->nbuckets = nbuckets;
hashtable->log2_nbuckets = log2_nbuckets;
hashtable->buckets = NULL;
hashtable->skewEnabled = false;
hashtable->skewBucket = NULL;
hashtable->skewBucketLen = 0;
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = work_mem * 1024L;
hashtable->spaceUsedSkew = 0;
hashtable->spaceAllowedSkew =
hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
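/*
 * Illustrative numbers (SKEW_WORK_MEM_PERCENT is 2 in executor/hashjoin.h
 * at the time of writing): with work_mem = 4096 kB,
 *     spaceAllowed     = 4096 * 1024       = 4194304 bytes
 *     spaceAllowedSkew = 4194304 * 2 / 100 = 83886 bytes
 * i.e. the skew hashtable may hold only a small slice of the overall
 * budget before ExecHashSkewTableInsert starts evicting skew buckets.
 */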
/*
* Get info about the hash functions to be used for each hash key. Also
* remember whether the join operators are strict.
*/
nkeys = list_length(hashOperators);
hashtable->outer_hashfunctions =
(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
hashtable->inner_hashfunctions =
(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
i = 0;
foreach(ho, hashOperators)
{
Oid hashop = lfirst_oid(ho);
Oid left_hashfn;
Oid right_hashfn;
if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
elog(ERROR, "could not find hash function for hash operator %u",
hashop);
fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
hashtable->hashStrict[i] = op_strict(hashop);
i++;
}
/*
* Create temporary memory contexts in which to keep the hashtable working
* storage. See notes in executor/hashjoin.h.
*/
hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
"HashTableContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
"HashBatchContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* Allocate data that will live for the life of the hashjoin */
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
if (nbatch > 1)
{
/*
* allocate and initialize the file arrays in hashCxt
*/
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
/* The files will not be opened until needed... */
/* ... but make sure we have temp tablespaces established for them */
PrepareTempTablespaces();
}
/*
* Prepare context for the first-scan space allocations; allocate the
* hashbucket array therein, and set each bucket "empty".
*/
MemoryContextSwitchTo(hashtable->batchCxt);
hashtable->buckets = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));
/*
* Set up for skew optimization, if possible and there's a need for more
* than one batch. (In a one-batch join, there's no point in it.)
*/
if (nbatch > 1)
ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
MemoryContextSwitchTo(oldcxt);
return hashtable;
}
/*
* Compute appropriate size for hashtable given the estimated size of the
* relation to be hashed (number of rows and average row width).
*
* This is exported so that the planner's costsize.c can use it.
*/
/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET 10
void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs)
{
int tupsize;
double inner_rel_bytes;
long hash_table_bytes;
long skew_table_bytes;
long max_pointers;
int nbatch;
int nbuckets;
int i;
/* Force a plausible relation size if no info */
if (ntuples <= 0.0)
ntuples = 1000.0;
/*
* Estimate tupsize based on footprint of tuple in hashtable... note this
* does not allow for any palloc overhead. The manipulations of spaceUsed
* don't count palloc overhead either.
*/
tupsize = HJTUPLE_OVERHEAD +
MAXALIGN(sizeof(MinimalTupleData)) +
MAXALIGN(tupwidth);
inner_rel_bytes = ntuples * tupsize;
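/*
 * Illustrative arithmetic (hypothetical platform figures): if
 * HJTUPLE_OVERHEAD + MAXALIGN(sizeof(MinimalTupleData)) came to 40 bytes
 * and tupwidth were 24, then tupsize = 40 + MAXALIGN(24) = 64 bytes, and
 * ntuples = 100000 would give inner_rel_bytes = 6400000 bytes (roughly
 * 6.1 MB) to be weighed against the work_mem budget below.
 */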
/*
* Target in-memory hashtable size is work_mem kilobytes.
*/
hash_table_bytes = work_mem * 1024L;
/*
* If skew optimization is possible, estimate the number of skew buckets
* that will fit in the memory allowed, and decrement the assumed space
* available for the main hash table accordingly.
*
* We make the optimistic assumption that each skew bucket will contain
* one inner-relation tuple. If that turns out to be low, we will recover
* at runtime by reducing the number of skew buckets.
*
* hashtable->skewBucket will have up to 8 times as many HashSkewBucket
* pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
* will round up to the next power of 2 and then multiply by 4 to reduce
* collisions.
*/
if (useskew)
{
skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
/*----------
* Divisor is:
* size of a hash tuple +
* worst-case size of skewBucket[] per MCV +
* size of skewBucketNums[] entry +
* size of skew bucket struct itself
*----------
*/
*num_skew_mcvs = skew_table_bytes / (tupsize +
(8 * sizeof(HashSkewBucket *)) +
sizeof(int) +
SKEW_BUCKET_OVERHEAD);
if (*num_skew_mcvs > 0)
hash_table_bytes -= skew_table_bytes;
}
else
*num_skew_mcvs = 0;
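/*
 * Example with made-up, platform-dependent sizes: take hash_table_bytes
 * = 4194304 (work_mem = 4096 kB) and SKEW_WORK_MEM_PERCENT = 2, so
 * skew_table_bytes = 83886.  With tupsize = 64,
 * sizeof(HashSkewBucket *) = 8, sizeof(int) = 4 and
 * SKEW_BUCKET_OVERHEAD = 16, the divisor is 64 + 64 + 4 + 16 = 148,
 * giving *num_skew_mcvs = 83886 / 148 = 566 MCVs, and the main hash
 * table's budget drops to 4194304 - 83886 = 4110418 bytes.
 */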
/*
* Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
* memory is filled. Set nbatch to the smallest power of 2 that appears
* sufficient. The Min() steps limit the results so that the pointer
* arrays we'll try to allocate do not exceed work_mem.
*/
max_pointers = (work_mem * 1024L) / sizeof(void *);
/* also ensure we avoid integer overflow in nbatch and nbuckets */
max_pointers = Min(max_pointers, INT_MAX / 2);
if (inner_rel_bytes > hash_table_bytes)
{
/* We'll need multiple batches */
long lbuckets;
double dbatch;
int minbatch;
lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
lbuckets = Min(lbuckets, max_pointers);
nbuckets = (int) lbuckets;
dbatch = ceil(inner_rel_bytes / hash_table_bytes);
dbatch = Min(dbatch, max_pointers);
minbatch = (int) dbatch;
nbatch = 2;
while (nbatch < minbatch)
nbatch <<= 1;
}
else
{
/* We expect the hashtable to fit in memory */
double dbuckets;
dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
dbuckets = Min(dbuckets, max_pointers);
nbuckets = (int) dbuckets;
nbatch = 1;
}
/*
* Both nbuckets and nbatch must be powers of 2 to make
* ExecHashGetBucketAndBatch fast. We already fixed nbatch; now inflate
* nbuckets to the next larger power of 2. We also force nbuckets to not
* be real small, by starting the search at 2^10.
*/
i = 10;
while ((1 << i) < nbuckets)
i++;
nbuckets = (1 << i);
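/*
 * Two illustrative sizings (made-up inputs):
 *
 * In-memory case: ntuples = 100000 gives dbuckets = ceil(100000 / 10)
 * = 10000 and nbatch = 1, and the power-of-2 search above inflates
 * nbuckets to 16384.
 *
 * Multi-batch case: inner_rel_bytes = 64 MB against hash_table_bytes
 * = 4 MB gives minbatch = ceil(16) = 16, so nbatch doubles from 2 up
 * to 16, and nbuckets is derived from the per-batch memory instead.
 */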
*numbuckets = nbuckets;
*numbatches = nbatch;
}
/* ----------------------------------------------------------------
* ExecHashTableDestroy
*
* destroy a hash table
* ----------------------------------------------------------------
*/
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
int i;
/*
* Make sure all the temp files are closed. We skip batch 0, since it
* can't have any temp files (and the arrays might not even exist if
* nbatch is only 1).
*/
for (i = 1; i < hashtable->nbatch; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
/* Release working memory (batchCxt is a child, so it goes away too) */
MemoryContextDelete(hashtable->hashCxt);
/* And drop the control block */
pfree(hashtable);
}
/*
* ExecHashIncreaseNumBatches
* increase the original number of batches in order to reduce
* current memory consumption
*/
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
int oldnbatch = hashtable->nbatch;
int curbatch = hashtable->curbatch;
int nbatch;
int i;
MemoryContext oldcxt;
long ninmemory;
long nfreed;
/* do nothing if we've decided to shut off growth */
if (!hashtable->growEnabled)
return;
/* safety check to avoid overflow */
if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
return;
nbatch = oldnbatch * 2;
Assert(nbatch > 1);
#ifdef HJDEBUG
printf("Increasing nbatch to %d because space = %lu\n",
nbatch, (unsigned long) hashtable->spaceUsed);
#endif
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
if (hashtable->innerBatchFile == NULL)
{
/* we had no file arrays before */
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
/* time to establish the temp tablespaces, too */
PrepareTempTablespaces();
}
else
{
/* enlarge arrays and zero out added entries */
hashtable->innerBatchFile = (BufFile **)
repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
MemSet(hashtable->innerBatchFile + oldnbatch, 0,
(nbatch - oldnbatch) * sizeof(BufFile *));
MemSet(hashtable->outerBatchFile + oldnbatch, 0,
(nbatch - oldnbatch) * sizeof(BufFile *));
}
MemoryContextSwitchTo(oldcxt);
hashtable->nbatch = nbatch;
/*
* Scan through the existing hash table entries and dump out any that are
* no longer of the current batch.
*/
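/*
 * For instance (illustrative values), doubling nbatch from 4 to 8 makes
 * ExecHashGetBucketAndBatch read one more hash bit: a tuple that hashed
 * to batch b now belongs to either b or b + 4.  Tuples still assigned to
 * the current batch stay in their buckets; the rest are written out to
 * their (newly created) batch files below.
 */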
ninmemory = nfreed = 0;
for (i = 0; i < hashtable->nbuckets; i++)
{
HashJoinTuple prevtuple;
HashJoinTuple tuple;
prevtuple = NULL;
tuple = hashtable->buckets[i];
while (tuple != NULL)
{
/* save link in case we delete */
HashJoinTuple nexttuple = tuple->next;
int bucketno;
int batchno;
ninmemory++;
ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
&bucketno, &batchno);
Assert(bucketno == i);
if (batchno == curbatch)
{
/* keep tuple */
prevtuple = tuple;
}
else
{
/* dump it out */
Assert(batchno > curbatch);
ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
tuple->hashvalue,
&hashtable->innerBatchFile[batchno]);
/* and remove from hash table */
if (prevtuple)
prevtuple->next = nexttuple;
else
hashtable->buckets[i] = nexttuple;
/* prevtuple doesn't change */
hashtable->spaceUsed -=
HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
pfree(tuple);
nfreed++;
}
tuple = nexttuple;
}
}
#ifdef HJDEBUG
printf("Freed %ld of %ld tuples, space now %lu\n",
nfreed, ninmemory, (unsigned long) hashtable->spaceUsed);
#endif
/*
* If we dumped out either all or none of the tuples in the table, disable
* further expansion of nbatch. This situation implies that we have
* enough tuples of identical hashvalues to overflow spaceAllowed.
* Increasing nbatch will not fix it since there's no way to subdivide the
* group any more finely. We have to just gut it out and hope the server
* has enough RAM.
*/
if (nfreed == 0 || nfreed == ninmemory)
{
hashtable->growEnabled = false;
#ifdef HJDEBUG
printf("Disabling further increase of nbatch\n");
#endif
}
}
/*
* ExecHashTableInsert
* insert a tuple into the hash table; depending on the hash value,
* it may just go to a temp file for later batches
*
* Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
* tuple; the minimal case in particular is certain to happen while reloading
* tuples from batch files. We could save some cycles in the regular-tuple
* case by not forcing the slot contents into minimal form; not clear if it's
* worth the messiness required.
*/
void
ExecHashTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue)
{
MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
int bucketno;
int batchno;
ExecHashGetBucketAndBatch(hashtable, hashvalue,
&bucketno, &batchno);
/*
* decide whether to put the tuple in the hash table or a temp file
*/
if (batchno == hashtable->curbatch)
{
/*
* put the tuple in hash table
*/
HashJoinTuple hashTuple;
int hashTupleSize;
hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
hashTupleSize);
hashTuple->hashvalue = hashvalue;
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
hashtable->spaceUsed += hashTupleSize;
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
if (hashtable->spaceUsed > hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
else
{
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(tuple,
hashvalue,
&hashtable->innerBatchFile[batchno]);
}
}
/*
* ExecHashGetHashValue
* Compute the hash value for a tuple
*
* The tuple to be tested must be in either econtext->ecxt_outertuple or
* econtext->ecxt_innertuple. Vars in the hashkeys expressions reference
* either OUTER or INNER.
*
* A TRUE result means the tuple's hash value has been successfully computed
* and stored at *hashvalue. A FALSE result means the tuple cannot match
* because it contains a null attribute, and hence it should be discarded
* immediately. (If keep_nulls is true then FALSE is never returned.)
*/
bool
ExecHashGetHashValue(HashJoinTable hashtable,
ExprContext *econtext,
List *hashkeys,
bool outer_tuple,
bool keep_nulls,
uint32 *hashvalue)
{
uint32 hashkey = 0;
FmgrInfo *hashfunctions;
ListCell *hk;
int i = 0;
MemoryContext oldContext;
/*
* We reset the eval context each time to reclaim any memory leaked in the
* hashkey expressions.
*/
ResetExprContext(econtext);
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
if (outer_tuple)
hashfunctions = hashtable->outer_hashfunctions;
else
hashfunctions = hashtable->inner_hashfunctions;
foreach(hk, hashkeys)
{
ExprState *keyexpr = (ExprState *) lfirst(hk);
Datum keyval;
bool isNull;
/* rotate hashkey left 1 bit at each step */
hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
/*
* Get the join attribute value of the tuple
*/
keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);
/*
* If the attribute is NULL, and the join operator is strict, then
* this tuple cannot pass the join qual so we can reject it
* immediately (unless we're scanning the outside of an outer join, in
* which case we must not reject it). Otherwise we act like the
* hashcode of NULL is zero (this will support operators that act like
* IS NOT DISTINCT, though not any more-random behavior). We treat
* the hash support function as strict even if the operator is not.
*
* Note: currently, all hashjoinable operators must be strict since
* the hash index AM assumes that. However, it takes so little extra
* code here to allow non-strict that we may as well do it.
*/
if (isNull)
{
if (hashtable->hashStrict[i] && !keep_nulls)
{
MemoryContextSwitchTo(oldContext);
return false; /* cannot match */
}
/* else, leave hashkey unmodified, equivalent to hashcode 0 */
}
else
{
/* Compute the hash function */
uint32 hkey;
hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
hashkey ^= hkey;
}
i++;
}
MemoryContextSwitchTo(oldContext);
*hashvalue = hashkey;
return true;
}
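/*
 * Illustrative trace of the rotate-and-XOR combination above, using
 * made-up key hashes hkey1 = 0x0000000F and hkey2 = 0x000000F0:
 *     after key 1: hashkey = rot1(0x00000000) ^ 0x0000000F = 0x0000000F
 *     after key 2: hashkey = rot1(0x0000000F) ^ 0x000000F0 = 0x000000EE
 * where rot1() denotes the one-bit left rotation coded in the loop.
 * The rotation makes the result order-sensitive, so multi-column keys
 * (a, b) and (b, a) usually hash differently.
 */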
/*
* ExecHashGetBucketAndBatch
* Determine the bucket number and batch number for a hash value
*
* Note: on-the-fly increases of nbatch must not change the bucket number
* for a given hash code (since we don't move tuples to different hash
* chains), and must only cause the batch number to remain the same or
* increase. Our algorithm is
* bucketno = hashvalue MOD nbuckets
* batchno = (hashvalue DIV nbuckets) MOD nbatch
* where nbuckets and nbatch are both expected to be powers of 2, so we can
* do the computations by shifting and masking. (This assumes that all hash
* functions are good about randomizing all their output bits, else we are
* likely to have very skewed bucket or batch occupancy.)
*
* nbuckets doesn't change over the course of the join.
*
* nbatch is always a power of 2; we increase it only by doubling it. This
* effectively adds one more bit to the top of the batchno.
*/
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno)
{
uint32 nbuckets = (uint32) hashtable->nbuckets;
uint32 nbatch = (uint32) hashtable->nbatch;
if (nbatch > 1)
{
/* we can do MOD by masking, DIV by shifting */
*bucketno = hashvalue & (nbuckets - 1);
*batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
}
else
{
*bucketno = hashvalue & (nbuckets - 1);
*batchno = 0;
}
}
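/*
 * Worked example (illustrative values): with nbuckets = 1024
 * (log2_nbuckets = 10) and nbatch = 4, hashvalue = 0x2ACE yields
 *     bucketno = 0x2ACE & 0x3FF       = 718
 *     batchno  = (0x2ACE >> 10) & 0x3 = 2
 * If nbatch later doubles to 8, bucketno is unchanged and batchno is
 * re-read as (0x2ACE >> 10) & 0x7 = 2: the extra mask bit lets a batch
 * number stay the same or grow, never shrink, as promised above.
 */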
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
*
* The current outer tuple must be stored in econtext->ecxt_outertuple.
*/
HashJoinTuple
ExecScanHashBucket(HashJoinState *hjstate,
ExprContext *econtext)
{
List *hjclauses = hjstate->hashclauses;
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTuple hashTuple = hjstate->hj_CurTuple;
uint32 hashvalue = hjstate->hj_CurHashValue;
/*
* hj_CurTuple is the address of the tuple last returned from the current
* bucket, or NULL if it's time to start scanning a new bucket.
*
* If the tuple hashed to a skew bucket then scan the skew bucket;
* otherwise scan the standard hashtable bucket.
*/
if (hashTuple != NULL)
hashTuple = hashTuple->next;
else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
else
hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
while (hashTuple != NULL)
{
if (hashTuple->hashvalue == hashvalue)
{
TupleTableSlot *inntuple;
/* insert hashtable's tuple into exec slot so ExecQual sees it */
inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
hjstate->hj_HashTupleSlot,
false); /* do not pfree */
econtext->ecxt_innertuple = inntuple;
/* reset temp memory each time to avoid leaks from qual expr */
ResetExprContext(econtext);
if (ExecQual(hjclauses, econtext, false))
{
hjstate->hj_CurTuple = hashTuple;
return hashTuple;
}
}
hashTuple = hashTuple->next;
}
/*
* no match
*/
return NULL;
}
/*
* ExecHashTableReset
*
* reset hash table header for new batch
*/
void
ExecHashTableReset(HashJoinTable hashtable)
{
MemoryContext oldcxt;
int nbuckets = hashtable->nbuckets;
/*
* Release all the hash buckets and tuples acquired in the prior pass, and
* reinitialize the context for a new pass.
*/
MemoryContextReset(hashtable->batchCxt);
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
/* Reallocate and reinitialize the hash bucket headers. */
hashtable->buckets = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));
hashtable->spaceUsed = 0;
MemoryContextSwitchTo(oldcxt);
}
void
ExecReScanHash(HashState *node)
{
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (node->ps.lefttree->chgParam == NULL)
ExecReScan(node->ps.lefttree);
}
/*
* ExecHashBuildSkewHash
*
* Set up for skew optimization if we can identify the most common values
* (MCVs) of the outer relation's join key. We make a skew hash bucket
* for the hash value of each MCV, up to the number of slots allowed
* based on available memory.
*/
static void
ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
{
HeapTupleData *statsTuple;
Datum *values;
int nvalues;
float4 *numbers;
int nnumbers;
/* Do nothing if planner didn't identify the outer relation's join key */
if (!OidIsValid(node->skewTable))
return;
/* Also, do nothing if we don't have room for at least one skew bucket */
if (mcvsToUse <= 0)
return;
/*
* Try to find the MCV statistics for the outer relation's join key.
*/
statsTuple = SearchSysCache3(STATRELATTINH,
ObjectIdGetDatum(node->skewTable),
Int16GetDatum(node->skewColumn),
BoolGetDatum(node->skewInherit));
if (!HeapTupleIsValid(statsTuple))
return;
if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
STATISTIC_KIND_MCV, InvalidOid,
NULL,
&values, &nvalues,
&numbers, &nnumbers))
{
double frac;
int nbuckets;
FmgrInfo *hashfunctions;
int i;
if (mcvsToUse > nvalues)
mcvsToUse = nvalues;
/*
* Calculate the expected fraction of outer relation that will
* participate in the skew optimization. If this isn't at least
* SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
*/
frac = 0;
for (i = 0; i < mcvsToUse; i++)
frac += numbers[i];
if (frac < SKEW_MIN_OUTER_FRACTION)
{
free_attstatsslot(node->skewColType,
values, nvalues, numbers, nnumbers);
ReleaseSysCache(statsTuple);
return;
}
/*
* Okay, set up the skew hashtable.
*
* skewBucket[] is an open addressing hashtable with a power of 2 size
* that is greater than the number of MCV values. (This ensures there
* will be at least one null entry, so searches will always
* terminate.)
*
* Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
* MaxAllocSize/sizeof(void *)/8, but that is not currently possible
* since we limit pg_statistic entries to much less than that.
*/
nbuckets = 2;
while (nbuckets <= mcvsToUse)
nbuckets <<= 1;
/* use two more bits just to help avoid collisions */
nbuckets <<= 2;
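/*
 * Example (illustrative): mcvsToUse = 100 makes the loop stop at
 * nbuckets = 128 (the first power of 2 above 100), and the extra shift
 * widens that to 512 slots, so the open-addressing table runs at under
 * 20% occupancy even when every MCV gets a bucket.
 */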
hashtable->skewEnabled = true;
hashtable->skewBucketLen = nbuckets;
/*
* We allocate the bucket memory in the hashtable's batch context. It
* is only needed during the first batch, and this ensures it will be
* automatically removed once the first batch is done.
*/
hashtable->skewBucket = (HashSkewBucket **)
MemoryContextAllocZero(hashtable->batchCxt,
nbuckets * sizeof(HashSkewBucket *));
hashtable->skewBucketNums = (int *)
MemoryContextAllocZero(hashtable->batchCxt,
mcvsToUse * sizeof(int));
hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
/*
* Create a skew bucket for each MCV hash value.
*
* Note: it is very important that we create the buckets in order of
* decreasing MCV frequency. If we have to remove some buckets, they
* must be removed in reverse order of creation (see notes in
* ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
* be removed first.
*/
hashfunctions = hashtable->outer_hashfunctions;
for (i = 0; i < mcvsToUse; i++)
{
uint32 hashvalue;
int bucket;
hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
values[i]));
/*
* While we have not hit a hole in the hashtable and have not hit
* the desired bucket, we have collided with some previous hash
* value, so try the next bucket location. NB: this code must
* match ExecHashGetSkewBucket.
*/
bucket = hashvalue & (nbuckets - 1);
while (hashtable->skewBucket[bucket] != NULL &&
hashtable->skewBucket[bucket]->hashvalue != hashvalue)
bucket = (bucket + 1) & (nbuckets - 1);
/*
* If we found an existing bucket with the same hashvalue, leave
* it alone. It's okay for two MCVs to share a hashvalue.
*/
if (hashtable->skewBucket[bucket] != NULL)
continue;
/* Okay, create a new skew bucket for this hashvalue. */
hashtable->skewBucket[bucket] = (HashSkewBucket *)
MemoryContextAlloc(hashtable->batchCxt,
sizeof(HashSkewBucket));
hashtable->skewBucket[bucket]->hashvalue = hashvalue;
hashtable->skewBucket[bucket]->tuples = NULL;
hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
}
free_attstatsslot(node->skewColType,
values, nvalues, numbers, nnumbers);
}
ReleaseSysCache(statsTuple);
}
/*
* ExecHashGetSkewBucket
*
* Returns the index of the skew bucket for this hashvalue,
* or INVALID_SKEW_BUCKET_NO if the hashvalue is not
* associated with any active skew bucket.
*/
int
ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
{
int bucket;
/*
* Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
* particular, this happens after the initial batch is done).
*/
if (!hashtable->skewEnabled)
return INVALID_SKEW_BUCKET_NO;
/*
* Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
*/
bucket = hashvalue & (hashtable->skewBucketLen - 1);
/*
* While we have not hit a hole in the hashtable and have not hit the
* desired bucket, we have collided with some other hash value, so try the
* next bucket location.
*/
while (hashtable->skewBucket[bucket] != NULL &&
hashtable->skewBucket[bucket]->hashvalue != hashvalue)
bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
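/*
 * Probe example (made-up values): with skewBucketLen = 8, hash values
 * 0x0B and 0x13 both map to slot 3 (11 & 7 == 19 & 7 == 3).  Whichever
 * was inserted first holds slot 3, so the probe for the other advances
 * to slot 4; a search for a value that was never inserted ends at the
 * first NULL slot instead.
 */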
/*
* Found the desired bucket?
*/
if (hashtable->skewBucket[bucket] != NULL)
return bucket;
/*
* There must not be any hashtable entry for this hash value.
*/
return INVALID_SKEW_BUCKET_NO;
}
/*
* ExecHashSkewTableInsert
*
* Insert a tuple into the skew hashtable.
*
* This should generally match up with the current-batch case in
* ExecHashTableInsert.
*/
static void
ExecHashSkewTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue,
int bucketNumber)
{
MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
HashJoinTuple hashTuple;
int hashTupleSize;
/* Create the HashJoinTuple */
hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
hashTupleSize);
hashTuple->hashvalue = hashvalue;
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
/* Push it onto the front of the skew bucket's list */
hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
/* Check we are not over the total spaceAllowed, either */
if (hashtable->spaceUsed > hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
/*
* ExecHashRemoveNextSkewBucket
*
* Remove the least valuable skew bucket by pushing its tuples into
* the main hash table.
*/
static void
ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
{
int bucketToRemove;
HashSkewBucket *bucket;
uint32 hashvalue;
int bucketno;
int batchno;
HashJoinTuple hashTuple;
/* Locate the bucket to remove */
bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
bucket = hashtable->skewBucket[bucketToRemove];
/*
* Calculate which bucket and batch the tuples belong to in the main
* hashtable. They all have the same hash value, so it's the same for all
* of them. Also note that it's not possible for nbatch to increase while
* we are processing the tuples.
*/
hashvalue = bucket->hashvalue;
ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
/* Process all tuples in the bucket */
hashTuple = bucket->tuples;
while (hashTuple != NULL)
{
HashJoinTuple nextHashTuple = hashTuple->next;
MinimalTuple tuple;
Size tupleSize;
/*
* This code must agree with ExecHashTableInsert.  We do not call that
* function directly, however, because it expects a TupleTableSlot
* while we already have HashJoinTuples.
*/
tuple = HJTUPLE_MINTUPLE(hashTuple);
tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
/* Decide whether to put the tuple in the hash table or a temp file */
if (batchno == hashtable->curbatch)
{
/* Move the tuple to the main hash table */
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
/* We have reduced skew space, but overall space doesn't change */
hashtable->spaceUsedSkew -= tupleSize;
}
else
{
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(tuple, hashvalue,
&hashtable->innerBatchFile[batchno]);
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
hashtable->spaceUsedSkew -= tupleSize;
}
hashTuple = nextHashTuple;
}
/*
* Free the bucket struct itself and reset the hashtable entry to NULL.
*
* NOTE: this is not nearly as simple as it looks on the surface, because
* of the possibility of collisions in the hashtable. Suppose that hash
* values A and B collide at a particular hashtable entry, and that A was
* entered first so B gets shifted to a different table entry. If we were
* to remove A first then ExecHashGetSkewBucket would mistakenly start
* reporting that B is not in the hashtable, because it would hit the NULL
* before finding B. However, we always remove entries in the reverse
* order of creation, so this failure cannot happen.
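*
* Concrete instance (illustrative): with skewBucketLen = 8, hash values
* A = 3 and B = 11 both map to slot 3 (3 & 7 == 11 & 7 == 3).  A was
* created first and holds slot 3, so B probed on to slot 4.  Removing A
* first would leave a NULL at slot 3, and a later probe for B would stop
* at that hole without ever finding B in slot 4.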
*/
hashtable->skewBucket[bucketToRemove] = NULL;
hashtable->nSkewBuckets--;
pfree(bucket);
hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
/*
* If we have removed all skew buckets then give up on skew optimization.
* Release the arrays since they aren't useful any more.
*/
if (hashtable->nSkewBuckets == 0)
{
hashtable->skewEnabled = false;
pfree(hashtable->skewBucket);
pfree(hashtable->skewBucketNums);
hashtable->skewBucket = NULL;
hashtable->skewBucketNums = NULL;
hashtable->spaceUsed -= hashtable->spaceUsedSkew;
hashtable->spaceUsedSkew = 0;
}
}