diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index b36a2ba405..4589da32bc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -6,7 +6,7 @@
  * Copyright (c) 1994, Regents of the University of California
  *
  *
- *	  $Id: nodeHash.c,v 1.34 1999/05/09 00:53:20 tgl Exp $
+ *	  $Id: nodeHash.c,v 1.35 1999/05/18 21:33:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -22,11 +22,6 @@
 #include <stdio.h>
 #include <math.h>
 #include <string.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <unistd.h>
 
 #include "postgres.h"
 #include "miscadmin.h"
@@ -34,17 +29,12 @@
 #include "executor/executor.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
-#include "storage/ipc.h"
 #include "utils/hsearch.h"
+#include "utils/portal.h"
 
-extern int  NBuffers;
+extern int  SortMem;
 
 static int  hashFunc(Datum key, int len, bool byVal);
-static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
-static void *absHashTableAlloc(int size, HashJoinTable hashtable);
-static void ExecHashOverflowInsert(HashJoinTable hashtable,
-					   HashBucket bucket,
-					   HeapTuple heapTuple);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -63,11 +53,7 @@ ExecHash(Hash *node)
 	HashJoinTable hashtable;
 	TupleTableSlot *slot;
 	ExprContext *econtext;
-
 	int			nbatch;
-	File	   *batches = NULL;
-	RelativeAddr *batchPos;
-	int		   *batchSizes;
 	int			i;
 
 	/* ----------------
@@ -79,27 +65,25 @@ ExecHash(Hash *node)
 	estate = node->plan.state;
 	outerNode = outerPlan(node);
 
-	hashtable = node->hashtable;
+	hashtable = hashstate->hashtable;
 	if (hashtable == NULL)
 		elog(ERROR, "ExecHash: hash table is NULL.");
 
 	nbatch = hashtable->nbatch;
 
 	if (nbatch > 0)
-	{							/* if needs hash partition */
-		/* --------------
-		 *	allocate space for the file descriptors of batch files
-		 *	then open the batch files in the current processes.
-		 * --------------
+	{
+		/* ----------------
+		 *	Open temp files for inner batches, if needed.
+		 *	Note that file buffers are palloc'd in regular executor context.
+		 * ----------------
 		 */
-		batches = (File *) palloc(nbatch * sizeof(File));
 		for (i = 0; i < nbatch; i++)
 		{
-			batches[i] = OpenTemporaryFile();
+			File		tfile = OpenTemporaryFile();
+
+			Assert(tfile >= 0);
+			hashtable->innerBatchFile[i] = BufFileCreate(tfile);
 		}
-		hashstate->hashBatches = batches;
-		batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
-		batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
 	}
 
 	/* ----------------
@@ -110,7 +94,7 @@ ExecHash(Hash *node)
 	econtext = hashstate->cstate.cs_ExprContext;
 
 	/* ----------------
-	 *	get tuple and insert into the hash table
+	 *	get all inner tuples and insert into the hash table (or temp files)
 	 * ----------------
 	 */
 	for (;;)
@@ -118,26 +102,11 @@ ExecHash(Hash *node)
 		slot = ExecProcNode(outerNode, (Plan *) node);
 		if (TupIsNull(slot))
 			break;
-
 		econtext->ecxt_innertuple = slot;
-		ExecHashTableInsert(hashtable, econtext, hashkey,
-							hashstate->hashBatches);
-
+		ExecHashTableInsert(hashtable, econtext, hashkey);
 		ExecClearTuple(slot);
 	}
 
-	/*
-	 * end of build phase, flush all the last pages of the batches.
-	 */
-	for (i = 0; i < nbatch; i++)
-	{
-		if (FileSeek(batches[i], 0L, SEEK_END) < 0)
-			perror("FileSeek");
-		if (FileWrite(batches[i], ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ) < 0)
-			perror("FileWrite");
-		NDirectFileWrite++;
-	}
-
 	/* ---------------------
 	 *	Return the slot so that we have the tuple descriptor
 	 *	when we need to save/restore them.  -Jeff 11 July 1991
@@ -173,10 +142,10 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
 	 */
 	hashstate = makeNode(HashState);
 	node->hashstate = hashstate;
-	hashstate->hashBatches = NULL;
+	hashstate->hashtable = NULL;
 
 	/* ----------------
-	 *	Miscellanious initialization
+	 *	Miscellaneous initialization
 	 *
 	 *		 +	assign node's base_id
 	 *		 +	assign debugging hooks and
@@ -186,7 +155,6 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
 	ExecAssignNodeBaseInfo(estate, &hashstate->cstate, parent);
 	ExecAssignExprContext(estate, &hashstate->cstate);
 
-#define HASH_NSLOTS 1
 	/* ----------------
 	 *	initialize our result slot
 	 * ----------------
@@ -214,6 +182,7 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
 int
 ExecCountSlotsHash(Hash *node)
 {
+#define HASH_NSLOTS 1
 	return ExecCountSlotsNode(outerPlan(node)) +
 	ExecCountSlotsNode(innerPlan(node)) +
 	HASH_NSLOTS;
@@ -230,16 +199,12 @@
 ExecEndHash(Hash *node)
 {
 	HashState  *hashstate;
 	Plan	   *outerPlan;
-	File	   *batches;
 
 	/* ----------------
 	 *	get info from the hash state
 	 * ----------------
 	 */
 	hashstate = node->hashstate;
-	batches = hashstate->hashBatches;
-	if (batches != NULL)
-		pfree(batches);
 
 	/* ----------------
 	 *	free projection info.  no need to free result type info
@@ -256,21 +221,6 @@ ExecEndHash(Hash *node)
 	ExecEndNode(outerPlan, (Plan *) node);
 }
 
-static RelativeAddr
-hashTableAlloc(int size, HashJoinTable hashtable)
-{
-	RelativeAddr p = hashtable->top;
-	hashtable->top += MAXALIGN(size);
-	return p;
-}
-
-static void *
-absHashTableAlloc(int size, HashJoinTable hashtable)
-{
-	RelativeAddr p = hashTableAlloc(size, hashtable);
-	return ABSADDR(p);
-}
-
 /* ----------------------------------------------------------------
  *		ExecHashTableCreate
@@ -285,22 +235,19 @@ HashJoinTable
 ExecHashTableCreate(Hash *node)
 {
 	Plan	   *outerNode;
-	int			HashTBSize;
-	int			nbatch;
 	int			ntuples;
 	int			tupsize;
-	int			pages;
-	int			sqrtpages;
-	IpcMemoryId shmid;
+	double		inner_rel_bytes;
+	double		hash_table_bytes;
+	int			nbatch;
 	HashJoinTable hashtable;
-	HashBucket	bucket;
 	int			nbuckets;
 	int			totalbuckets;
 	int			bucketsize;
 	int			i;
-	RelativeAddr *outerbatchPos;
-	RelativeAddr *innerbatchPos;
-	int		   *innerbatchSizes;
+	Portal		myPortal;
+	char		myPortalName[64];
+	MemoryContext oldcxt;
 
 	/* ----------------
 	 *	Get information about the size of the relation to be hashed
@@ -314,38 +261,48 @@ ExecHashTableCreate(Hash *node)
 	ntuples = outerNode->plan_size;
 	if (ntuples <= 0)			/* force a plausible size if no info */
 		ntuples = 1000;
-	tupsize = outerNode->plan_width + sizeof(HeapTupleData);
-	pages = (int) ceil((double) ntuples * tupsize * FUDGE_FAC / BLCKSZ);
+
+	/* estimate tupsize based on footprint of tuple in hashtable...
+	 * but what about palloc overhead?
+	 */
+	tupsize = MAXALIGN(outerNode->plan_width) +
+		MAXALIGN(sizeof(HashJoinTupleData));
+	inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;
 
 	/*
-	 * Max hashtable size is NBuffers pages, but not less than
+	 * Target hashtable size is SortMem kilobytes, but not less than
 	 * sqrt(estimated inner rel size), so as to avoid horrible performance.
-	 * XXX since the hashtable is not allocated in shared mem anymore,
-	 * it would probably be more appropriate to drive this from -S than -B.
 	 */
-	sqrtpages = (int) ceil(sqrt((double) pages));
-	HashTBSize = NBuffers;
-	if (sqrtpages > HashTBSize)
-		HashTBSize = sqrtpages;
+	hash_table_bytes = sqrt(inner_rel_bytes);
+	if (hash_table_bytes < (SortMem * 1024L))
+		hash_table_bytes = SortMem * 1024L;
 
 	/*
 	 * Count the number of hash buckets we want for the whole relation,
-	 * and the number we can actually fit in the allowed memory.
-	 * NOTE: FUDGE_FAC here determines the fraction of the hashtable space
-	 * saved for overflow records.  Need a better approach...
+	 * for an average bucket load of NTUP_PER_BUCKET (per virtual bucket!).
 	 */
-	totalbuckets = (int) ceil((double) ntuples / NTUP_PER_BUCKET);
-	bucketsize = MAXALIGN(NTUP_PER_BUCKET * tupsize + sizeof(*bucket));
-	nbuckets = (int) ((HashTBSize * BLCKSZ) / (bucketsize * FUDGE_FAC));
+	totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);
+
+	/*
+	 * Count the number of buckets we think will actually fit in the
+	 * target memory size, at a loading of NTUP_PER_BUCKET (physical buckets).
+	 * NOTE: FUDGE_FAC here determines the fraction of the hashtable space
+	 * reserved to allow for nonuniform distribution of hash values.
+	 * Perhaps this should be a different number from the other uses of
+	 * FUDGE_FAC, but since we have no real good way to pick either one...
+	 */
+	bucketsize = NTUP_PER_BUCKET * tupsize;
+	nbuckets = (int) (hash_table_bytes / (bucketsize * FUDGE_FAC));
+	if (nbuckets <= 0)
+		nbuckets = 1;
 
 	if (totalbuckets <= nbuckets)
 	{
 		/* We have enough space, so no batching.  In theory we could
-		 * even reduce HashTBSize, but as long as we don't have a way
-		 * to deal with overflow-space overrun, best to leave the
-		 * extra space available for overflow.
+		 * even reduce nbuckets, but since that could lead to poor
+		 * behavior if estimated ntuples is much less than reality,
+		 * it seems better to make more buckets instead of fewer.
 		 */
-		nbuckets = totalbuckets;
+		totalbuckets = nbuckets;
 		nbatch = 0;
 	}
 	else
@@ -356,7 +313,8 @@ ExecHashTableCreate(Hash *node)
 		 * of groups we will use for the part of the data that doesn't
 		 * fall into the first nbuckets hash buckets.
 		 */
-		nbatch = (int) ceil((double) (pages - HashTBSize) / HashTBSize);
+		nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
+							hash_table_bytes);
 		if (nbatch <= 0)
 			nbatch = 1;
 	}
@@ -374,89 +332,116 @@ ExecHashTableCreate(Hash *node)
 #endif
 
 	/* ----------------
-	 *	in non-parallel machines, we don't need to put the hash table
-	 *	in the shared memory.  We just palloc it.  The space needed
-	 *	is the hash area itself plus nbatch+1 I/O buffer pages.
-	 * ----------------
-	 */
-	hashtable = (HashJoinTable) palloc((HashTBSize + nbatch + 1) * BLCKSZ);
-	shmid = 0;
-
-	if (hashtable == NULL)
-		elog(ERROR, "not enough memory for hashjoin.");
-	/* ----------------
-	 *	initialize the hash table header
+	 *	Initialize the hash table control block.
+	 *	The hashtable control block is just palloc'd from executor memory.
 	 * ----------------
 	 */
+	hashtable = (HashJoinTable) palloc(sizeof(HashTableData));
 	hashtable->nbuckets = nbuckets;
 	hashtable->totalbuckets = totalbuckets;
-	hashtable->bucketsize = bucketsize;
-	hashtable->shmid = shmid;
-	hashtable->top = MAXALIGN(sizeof(HashTableData));
-	hashtable->bottom = HashTBSize * BLCKSZ;
-
-	/*
-	 * hashtable->readbuf has to be maxaligned!!!
-	 * Note there are nbatch additional pages available after readbuf;
-	 * these are used for buffering the outgoing batch data.
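
As an aside on the sizing logic introduced above, the following standalone sketch mirrors the patch's formulas with invented numbers. The values of FUDGE_FAC (2.0) and NTUP_PER_BUCKET (10), and the SortMem and planner estimates, are assumptions here, not taken from the patch:

    #include <math.h>
    #include <stdio.h>

    #define FUDGE_FAC       2.0   /* slop factor; value assumed */
    #define NTUP_PER_BUCKET 10    /* target tuples per bucket; value assumed */

    int main(void)
    {
        int     SortMem = 512;      /* -S setting in kilobytes (assumed) */
        int     ntuples = 100000;   /* planner estimate of inner rel rows */
        int     tupsize = 48;       /* per-tuple footprint incl. header, say */
        double  inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;

        /* Target size: SortMem KB, but at least sqrt(inner rel size). */
        double  hash_table_bytes = sqrt(inner_rel_bytes);
        if (hash_table_bytes < SortMem * 1024.0)
            hash_table_bytes = SortMem * 1024.0;

        /* Virtual buckets wanted vs. physical buckets that fit. */
        int totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);
        int bucketsize = NTUP_PER_BUCKET * tupsize;
        int nbuckets = (int) (hash_table_bytes / (bucketsize * FUDGE_FAC));
        int nbatch;

        if (totalbuckets <= nbuckets)
        {
            totalbuckets = nbuckets;    /* one pass, no temp files */
            nbatch = 0;
        }
        else
            nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
                                hash_table_bytes);

        printf("nbuckets=%d totalbuckets=%d nbatch=%d\n",
               nbuckets, totalbuckets, nbatch);
        return 0;
    }

With these inputs the sketch prints nbuckets=546, totalbuckets=20000, nbatch=18, i.e. roughly one memory-load per batch plus the first pass.
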
-	 */
-	hashtable->readbuf = hashtable->bottom;
-	hashtable->batch = hashtable->bottom + BLCKSZ;
+	hashtable->buckets = NULL;
 	hashtable->nbatch = nbatch;
 	hashtable->curbatch = 0;
-	hashtable->pcount = hashtable->nprocess = 0;
+	hashtable->innerBatchFile = NULL;
+	hashtable->outerBatchFile = NULL;
+	hashtable->innerBatchSize = NULL;
+	hashtable->outerBatchSize = NULL;
+
+	/* ----------------
+	 *	Create a named portal in which to keep the hashtable working storage.
+	 *	Each hashjoin must have its own portal, so be wary of name conflicts.
+	 * ----------------
+	 */
+	i = 0;
+	do {
+		i++;
+		sprintf(myPortalName, "<hashtable %d>", i);
+		myPortal = GetPortalByName(myPortalName);
+	} while (PortalIsValid(myPortal));
+	myPortal = CreatePortal(myPortalName);
+	Assert(PortalIsValid(myPortal));
+	hashtable->myPortal = (void*) myPortal; /* kluge for circular includes */
+	hashtable->hashCxt = (MemoryContext) PortalGetVariableMemory(myPortal);
+	hashtable->batchCxt = (MemoryContext) PortalGetHeapMemory(myPortal);
+
+	/* Allocate data that will live for the life of the hashjoin */
+
+	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+
 	if (nbatch > 0)
 	{
 		/* ---------------
-		 *	allocate and initialize the outer batches
+		 *	allocate and initialize the file arrays in hashCxt
 		 * ---------------
 		 */
-		outerbatchPos = (RelativeAddr *)
-			absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
+		hashtable->innerBatchFile = (BufFile **)
+			palloc(nbatch * sizeof(BufFile *));
+		hashtable->outerBatchFile = (BufFile **)
+			palloc(nbatch * sizeof(BufFile *));
+		hashtable->innerBatchSize = (long *)
+			palloc(nbatch * sizeof(long));
+		hashtable->outerBatchSize = (long *)
+			palloc(nbatch * sizeof(long));
 		for (i = 0; i < nbatch; i++)
 		{
-			outerbatchPos[i] = -1;
+			hashtable->innerBatchFile[i] = NULL;
+			hashtable->outerBatchFile[i] = NULL;
+			hashtable->innerBatchSize[i] = 0;
+			hashtable->outerBatchSize[i] = 0;
 		}
-		hashtable->outerbatchPos = RELADDR(outerbatchPos);
-		/* ---------------
-		 *	allocate and initialize the inner batches
-		 * ---------------
-		 */
-		innerbatchPos = (RelativeAddr *)
-			absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
-		innerbatchSizes = (int *)
-			absHashTableAlloc(nbatch * sizeof(int), hashtable);
-		for (i = 0; i < nbatch; i++)
-		{
-			innerbatchPos[i] = -1;
-			innerbatchSizes[i] = 0;
-		}
-		hashtable->innerbatchPos = RELADDR(innerbatchPos);
-		hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
-	}
-	else
-	{
-		hashtable->outerbatchPos = (RelativeAddr) NULL;
-		hashtable->innerbatchPos = (RelativeAddr) NULL;
-		hashtable->innerbatchSizes = (RelativeAddr) NULL;
+		/* The files will not be opened until later... */
 	}
 
-	hashtable->overflownext = hashtable->top + bucketsize * nbuckets;
-	Assert(hashtable->overflownext < hashtable->bottom);
-	/* ----------------
-	 *	initialize each hash bucket
-	 * ----------------
+	/* Prepare portal for the first-scan space allocations;
+	 * allocate the hashbucket array therein, and set each bucket "empty".
 	 */
-	bucket = (HashBucket) ABSADDR(hashtable->top);
+	MemoryContextSwitchTo(hashtable->batchCxt);
+	StartPortalAllocMode(DefaultAllocMode, 0);
+
+	hashtable->buckets = (HashJoinTuple *)
+		palloc(nbuckets * sizeof(HashJoinTuple));
+	if (hashtable->buckets == NULL)
+		elog(ERROR, "Insufficient memory for hash table.");
+
 	for (i = 0; i < nbuckets; i++)
 	{
-		bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
-		bucket->bottom = bucket->top;
-		bucket->firstotuple = bucket->lastotuple = -1;
-		bucket = (HashBucket) ((char *) bucket + bucketsize);
+		hashtable->buckets[i] = NULL;
 	}
+
+	MemoryContextSwitchTo(oldcxt);
+
 	return hashtable;
 }
 
+/* ----------------------------------------------------------------
+ *		ExecHashTableDestroy
+ *
+ *		destroy a hash table
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashTableDestroy(HashJoinTable hashtable)
+{
+	int			i;
+
+	/* Make sure all the temp files are closed */
+	for (i = 0; i < hashtable->nbatch; i++)
+	{
+		if (hashtable->innerBatchFile[i])
+			BufFileClose(hashtable->innerBatchFile[i]);
+		if (hashtable->outerBatchFile[i])
+			BufFileClose(hashtable->outerBatchFile[i]);
+	}
+
+	/* Destroy the portal to release all working memory */
+	/* cast here is a kluge for circular includes... */
+	PortalDestroy((Portal*) & hashtable->myPortal);
+
+	/* And drop the control block */
+	pfree(hashtable);
+}
+
 /* ----------------------------------------------------------------
  *		ExecHashTableInsert
  *
@@ -467,32 +452,11 @@ ExecHashTableCreate(Hash *node)
 void
 ExecHashTableInsert(HashJoinTable hashtable,
 					ExprContext *econtext,
-					Var *hashkey,
-					File *batches)
+					Var *hashkey)
 {
-	TupleTableSlot *slot;
-	HeapTuple	heapTuple;
-	HashBucket	bucket;
-	int			bucketno;
-	int			nbatch;
-	int			batchno;
-	char	   *buffer;
-	RelativeAddr *batchPos;
-	int		   *batchSizes;
-	char	   *pos;
-
-	nbatch = hashtable->nbatch;
-	batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
-	batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
-
-	slot = econtext->ecxt_innertuple;
-	heapTuple = slot->val;
-
-#ifdef HJDEBUG
-	printf("Inserting ");
-#endif
-
-	bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+	int			bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+	TupleTableSlot *slot = econtext->ecxt_innertuple;
+	HeapTuple	heapTuple = slot->val;
 
 	/* ----------------
 	 *	decide whether to put the tuple in the hash table or a tmp file
@@ -504,22 +468,24 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		 *	put the tuple in hash table
 		 * ---------------
 		 */
-		bucket = (HashBucket)
-			(ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
-		if (((char *) MAXALIGN(ABSADDR(bucket->bottom)) - (char *) bucket)
-			+ heapTuple->t_len + HEAPTUPLESIZE > hashtable->bucketsize)
-			ExecHashOverflowInsert(hashtable, bucket, heapTuple);
-		else
-		{
-			memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)),
-					heapTuple,
-					HEAPTUPLESIZE);
-			memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)) + HEAPTUPLESIZE,
-					heapTuple->t_data,
-					heapTuple->t_len);
-			bucket->bottom = ((RelativeAddr) MAXALIGN(bucket->bottom) +
-							  heapTuple->t_len + HEAPTUPLESIZE);
-		}
+		HashJoinTuple hashTuple;
+		int			hashTupleSize;
+
+		hashTupleSize = MAXALIGN(sizeof(*hashTuple)) + heapTuple->t_len;
+		hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
+													   hashTupleSize);
+		if (hashTuple == NULL)
+			elog(ERROR, "Insufficient memory for hash table.");
+		memcpy((char *) & hashTuple->htup,
+			   (char *) heapTuple,
+			   sizeof(hashTuple->htup));
+		hashTuple->htup.t_data = (HeapTupleHeader)
+			(((char *) hashTuple) + MAXALIGN(sizeof(*hashTuple)));
+		memcpy((char *) hashTuple->htup.t_data,
+			   (char *) heapTuple->t_data,
+			   heapTuple->t_len);
+		hashTuple->next = hashtable->buckets[bucketno];
+		hashtable->buckets[bucketno] = hashTuple;
 	}
 	else
 	{
@@ -527,31 +493,14 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		 *	put the tuple into a tmp file for other batches
 		 * -----------------
 		 */
-		batchno = (nbatch * (bucketno - hashtable->nbuckets)) /
+		int			batchno = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
 			(hashtable->totalbuckets - hashtable->nbuckets);
-		buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
-		batchSizes[batchno]++;
-		pos = (char *)
-			ExecHashJoinSaveTuple(heapTuple,
-								  buffer,
-								  batches[batchno],
-								  (char *) ABSADDR(batchPos[batchno]));
-		batchPos[batchno] = RELADDR(pos);
+		hashtable->innerBatchSize[batchno]++;
+		ExecHashJoinSaveTuple(heapTuple,
+							  hashtable->innerBatchFile[batchno]);
 	}
 }
 
 /* ----------------------------------------------------------------
  *		ExecHashGetBucket
  *
@@ -567,12 +516,12 @@ ExecHashGetBucket(HashJoinTable hashtable,
 	Datum		keyval;
 	bool		isNull;
 
-
 	/* ----------------
 	 *	Get the join attribute value of the tuple
-	 * ----------------
+	 *
 	 *	...It's quick hack - use ExecEvalExpr instead of ExecEvalVar:
 	 *	hashkey may be T_ArrayRef, not just T_Var.	- vadim 04/22/97
+	 * ----------------
 	 */
 	keyval = ExecEvalExpr((Node *) hashkey, econtext, &isNull, NULL);
 
@@ -603,62 +552,6 @@ ExecHashGetBucket(HashJoinTable hashtable,
 	return bucketno;
 }
 
-/* ----------------------------------------------------------------
- *		ExecHashOverflowInsert
- *
- *		insert into the overflow area of a hash bucket
- * ----------------------------------------------------------------
- */
-static void
-ExecHashOverflowInsert(HashJoinTable hashtable,
-					   HashBucket bucket,
-					   HeapTuple heapTuple)
-{
-	OverflowTuple otuple;
-	RelativeAddr newend;
-	OverflowTuple firstotuple;
-	OverflowTuple lastotuple;
-
-	firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
-	lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
-
-	/* ----------------
-	 *	see if we run out of overflow space
-	 * ----------------
-	 */
-	newend = (RelativeAddr) MAXALIGN(hashtable->overflownext + sizeof(*otuple)
-									 + heapTuple->t_len + HEAPTUPLESIZE);
-	if (newend > hashtable->bottom)
-		elog(ERROR,
-			 "hash table out of memory. Use -B parameter to increase buffers.");
-
-	/* ----------------
-	 *	establish the overflow chain
-	 * ----------------
-	 */
-	otuple = (OverflowTuple) ABSADDR(hashtable->overflownext);
-	hashtable->overflownext = newend;
-	if (firstotuple == NULL)
-		bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
-	else
-	{
-		lastotuple->next = RELADDR(otuple);
-		bucket->lastotuple = RELADDR(otuple);
-	}
-
-	/* ----------------
-	 *	copy the tuple into the overflow area
-	 * ----------------
-	 */
-	otuple->next = -1;
-	otuple->tuple = RELADDR(MAXALIGN(((char *) otuple + sizeof(*otuple))));
-	memmove(ABSADDR(otuple->tuple),
-			heapTuple,
-			HEAPTUPLESIZE);
-	memmove(ABSADDR(otuple->tuple) + HEAPTUPLESIZE,
-			heapTuple->t_data,
-			heapTuple->t_len);
-}
-
 /* ----------------------------------------------------------------
  *		ExecScanHashBucket
  *
@@ -667,95 +560,46 @@ ExecHashOverflowInsert(HashJoinTable hashtable,
 HeapTuple
 ExecScanHashBucket(HashJoinState *hjstate,
-				   HashBucket bucket,
-				   HeapTuple curtuple,
 				   List *hjclauses,
 				   ExprContext *econtext)
 {
-	HeapTuple	heapTuple;
-	bool		qualResult;
-	OverflowTuple otuple = NULL;
-	OverflowTuple curotuple;
-	TupleTableSlot *inntuple;
-	OverflowTuple firstotuple;
-	OverflowTuple lastotuple;
-	HashJoinTable hashtable;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
 
-	hashtable = hjstate->hj_HashTable;
-	firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
-	lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
-
-	/* ----------------
-	 *	search the hash bucket
-	 * ----------------
+	/* hj_CurTuple is NULL to start scanning a new bucket, or the address
+	 * of the last tuple returned from the current bucket.
 	 */
-	if (curtuple == NULL || curtuple < (HeapTuple) ABSADDR(bucket->bottom))
+	if (hashTuple == NULL)
 	{
-		if (curtuple == NULL)
-			heapTuple = (HeapTuple)
-				MAXALIGN(ABSADDR(bucket->top));
-		else
-			heapTuple = (HeapTuple)
-				MAXALIGN(((char *) curtuple + curtuple->t_len + HEAPTUPLESIZE));
-
-		while (heapTuple < (HeapTuple) ABSADDR(bucket->bottom))
-		{
-
-			heapTuple->t_data = (HeapTupleHeader)
-				((char *) heapTuple + HEAPTUPLESIZE);
-
-			inntuple = ExecStoreTuple(heapTuple,		/* tuple to store */
-									  hjstate->hj_HashTupleSlot,	/* slot */
-									  InvalidBuffer,	/* tuple has no buffer */
-									  false);	/* do not pfree this tuple */
-
-			econtext->ecxt_innertuple = inntuple;
-			qualResult = ExecQual((List *) hjclauses, econtext);
-
-			if (qualResult)
-				return heapTuple;
-
-			heapTuple = (HeapTuple)
-				MAXALIGN(((char *) heapTuple + heapTuple->t_len + HEAPTUPLESIZE));
-		}
-
-		if (firstotuple == NULL)
-			return NULL;
-		otuple = firstotuple;
+		hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
 	}
-
-	/* ----------------
-	 *	search the overflow area of the hash bucket
-	 * ----------------
-	 */
-	if (otuple == NULL)
+	else
 	{
-		curotuple = hjstate->hj_CurOTuple;
-		otuple = (OverflowTuple) ABSADDR(curotuple->next);
-	}
-
-	while (otuple != NULL)
-	{
-		heapTuple = (HeapTuple) ABSADDR(otuple->tuple);
-		heapTuple->t_data = (HeapTupleHeader)
-			((char *) heapTuple + HEAPTUPLESIZE);
+		hashTuple = hashTuple->next;
+	}
 
+	while (hashTuple != NULL)
+	{
+		HeapTuple	heapTuple = & hashTuple->htup;
+		TupleTableSlot *inntuple;
+		bool		qualResult;
+
+		/* insert hashtable's tuple into exec slot so ExecQual sees it */
 		inntuple = ExecStoreTuple(heapTuple,	/* tuple to store */
 								  hjstate->hj_HashTupleSlot,	/* slot */
-								  InvalidBuffer,	/* SP?? this tuple has
-													 * no buffer */
+								  InvalidBuffer,
 								  false);	/* do not pfree this tuple */
-
 		econtext->ecxt_innertuple = inntuple;
-		qualResult = ExecQual((List *) hjclauses, econtext);
+
+		qualResult = ExecQual(hjclauses, econtext);
 
 		if (qualResult)
 		{
-			hjstate->hj_CurOTuple = otuple;
+			hjstate->hj_CurTuple = hashTuple;
 			return heapTuple;
 		}
 
-		otuple = (OverflowTuple) ABSADDR(otuple->next);
+		hashTuple = hashTuple->next;
 	}
 
 	/* ----------------
@@ -819,60 +663,57 @@ hashFunc(Datum key, int len, bool byVal)
  *		reset hash table header for new batch
  *
  *		ntuples is the number of tuples in the inner relation's batch
+ *		(which we currently don't actually use...)
  * ----------------------------------------------------------------
  */
 void
-ExecHashTableReset(HashJoinTable hashtable, int ntuples)
+ExecHashTableReset(HashJoinTable hashtable, long ntuples)
 {
+	MemoryContext oldcxt;
+	int			nbuckets = hashtable->nbuckets;
 	int			i;
-	HashBucket	bucket;
 
 	/*
-	 * We can reset the number of hashbuckets since we are going to
-	 * recalculate the hash values of all the tuples in the new batch
-	 * anyway.  We might as well spread out the hash values as much as
-	 * we can within the available space.  Note we must set nbuckets
-	 * equal to totalbuckets since we will NOT generate any new output
-	 * batches after this point.
+	 * Release all the hash buckets and tuples acquired in the prior pass,
+	 * and reinitialize the portal for a new pass.
 	 */
-	hashtable->nbuckets = hashtable->totalbuckets =
-		(int) (hashtable->bottom / (hashtable->bucketsize * FUDGE_FAC));
+	oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
+	EndPortalAllocMode();
+	StartPortalAllocMode(DefaultAllocMode, 0);
 
 	/*
-	 * reinitialize the overflow area to empty, and reinit each hash bucket.
+	 * We still use the same number of physical buckets as in the first pass.
+	 * (It could be different; but we already decided how many buckets would
+	 * be appropriate for the allowed memory, so stick with that number.)
+	 * We MUST set totalbuckets to equal nbuckets, because from now on
+	 * no tuples will go out to temp files; there are no more virtual buckets,
+	 * only real buckets.  (This implies that tuples will go into different
+	 * bucket numbers than they did on the first pass, but that's OK.)
 	 */
-	hashtable->overflownext = hashtable->top + hashtable->bucketsize *
-		hashtable->nbuckets;
-	Assert(hashtable->overflownext < hashtable->bottom);
+	hashtable->totalbuckets = nbuckets;
 
-	bucket = (HashBucket) ABSADDR(hashtable->top);
-	for (i = 0; i < hashtable->nbuckets; i++)
+	/* Reallocate and reinitialize the hash bucket headers. */
+	hashtable->buckets = (HashJoinTuple *)
+		palloc(nbuckets * sizeof(HashJoinTuple));
+	if (hashtable->buckets == NULL)
+		elog(ERROR, "Insufficient memory for hash table.");
+
+	for (i = 0; i < nbuckets; i++)
 	{
-		bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
-		bucket->bottom = bucket->top;
-		bucket->firstotuple = bucket->lastotuple = -1;
-		bucket = (HashBucket) ((char *) bucket + hashtable->bucketsize);
+		hashtable->buckets[i] = NULL;
 	}
 
-	hashtable->pcount = hashtable->nprocess;
+	MemoryContextSwitchTo(oldcxt);
 }
 
 void
 ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
 {
-	HashState  *hashstate = node->hashstate;
-
-	if (hashstate->hashBatches != NULL)
-	{
-		pfree(hashstate->hashBatches);
-		hashstate->hashBatches = NULL;
-	}
-
 	/*
 	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
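
The per-batch storage discipline in ExecHashTableReset above is arena-style allocation: the bucket array and every tuple live in the batch context, so one context pop releases them all at once. Here is a toy sketch of that idea in plain C (a bump allocator standing in for the portal heap, not PostgreSQL's actual portal code):

    #include <stdio.h>
    #include <stdlib.h>

    /* Toy "batch context": one block with a bump pointer.  Resetting it
     * frees every per-batch allocation at once -- the effect the patch
     * gets by popping the portal's heap context between batches. */
    typedef struct { char *base; size_t size, used; } BatchCxt;

    static void *cxt_alloc(BatchCxt *cxt, size_t n)
    {
        if (cxt->used + n > cxt->size)
            return NULL;              /* real code would elog(ERROR) */
        void *p = cxt->base + cxt->used;
        cxt->used += n;
        return p;
    }

    static void cxt_reset(BatchCxt *cxt)
    {
        cxt->used = 0;                /* all buckets and tuples gone in O(1) */
    }

    int main(void)
    {
        BatchCxt cxt = {malloc(1 << 20), 1 << 20, 0};
        int      batch, i;

        for (batch = 0; batch < 3; batch++)
        {
            cxt_reset(&cxt);          /* analogous to ExecHashTableReset */
            for (i = 0; i < 1000; i++)
                cxt_alloc(&cxt, 64);  /* load this batch's tuples */
            printf("batch %d used %zu bytes\n", batch, cxt.used);
        }
        free(cxt.base);
        return 0;
    }

The design payoff is the same as in the patch: no per-tuple pfree bookkeeping between batches, and no leak if the join is abandoned early.
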
 	 */
 	if (((Plan *) node)->lefttree->chgParam == NULL)
 		ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
-
 }
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 10e4cfb44f..b3808fab36 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -7,15 +7,12 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.19 1999/05/09 00:53:21 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.20 1999/05/18 21:33:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include <sys/types.h>
 #include <stdio.h>
-#include <string.h>
-#include <math.h>
-#include <sys/file.h>
 
 #include "postgres.h"
 
@@ -25,19 +22,15 @@
 #include "executor/nodeHashjoin.h"
 #include "optimizer/clauses.h"	/* for get_leftop */
 
-static TupleTableSlot *
-			ExecHashJoinOuterGetTuple(Plan *node, Plan *parent, HashJoinState *hjstate);
-
-static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, char *buffer,
-					  File file, TupleTableSlot *tupleSlot, int *block, char **position);
-
-static int	ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable,
-					  int nbatch);
-
+static TupleTableSlot *ExecHashJoinOuterGetTuple(Plan *node, Plan *parent,
+						 HashJoinState *hjstate);
+static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
+						 BufFile *file,
+						 TupleTableSlot *tupleSlot);
+static int	ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable);
 static int	ExecHashJoinNewBatch(HashJoinState *hjstate);
 
-
 /* ----------------------------------------------------------------
  *		ExecHashJoin
  *
@@ -61,27 +54,14 @@ ExecHashJoin(HashJoin *node)
 	TupleTableSlot *inntuple;
 	Var		   *outerVar;
 	ExprContext *econtext;
 	HashJoinTable hashtable;
-	int			bucketno;
-	HashBucket	bucket;
 	HeapTuple	curtuple;
 	bool		qualResult;
 	TupleTableSlot *outerTupleSlot;
 	TupleTableSlot *innerTupleSlot;
-	int			nbatch;
-	int			curbatch;
-	File	   *outerbatches;
-	RelativeAddr *outerbatchPos;
 	Var		   *innerhashkey;
-	int			batch;
-	int			batchno;
-	char	   *buffer;
 	int			i;
 	bool		hashPhaseDone;
-	char	   *pos;
 
 	/* ----------------
 	 *	get information from HashJoin node
@@ -103,8 +83,6 @@ ExecHashJoin(HashJoin *node)
 	 * -----------------
 	 */
 	hashtable = hjstate->hj_HashTable;
-	bucket = hjstate->hj_CurBucket;
-	curtuple = hjstate->hj_CurTuple;
 
 	/* --------------------
 	 *	initialize expression context
@@ -121,13 +99,13 @@ ExecHashJoin(HashJoin *node)
 		if (!isDone)
 			return result;
 	}
+
 	/* ----------------
 	 *	if this is the first call, build the hash table for inner relation
 	 * ----------------
 	 */
 	if (!hashPhaseDone)
 	{							/* if the hash phase not completed */
-		hashtable = node->hashjointable;
 		if (hashtable == NULL)
 		{						/* if the hash table has not been created */
 			/* ----------------
@@ -143,45 +121,25 @@ ExecHashJoin(HashJoin *node)
 			 *	execute the Hash node, to build the hash table
 			 * ----------------
 			 */
-			hashNode->hashtable = hashtable;
+			hashNode->hashstate->hashtable = hashtable;
 			innerTupleSlot = ExecProcNode((Plan *) hashNode, (Plan *) node);
 		}
-		bucket = NULL;
-		curtuple = NULL;
-		curbatch = 0;
 		node->hashdone = true;
+
+		/* ----------------
+		 *	Open temp files for outer batches, if needed.
+		 *	Note that file buffers are palloc'd in regular executor context.
+		 * ----------------
+		 */
+		for (i = 0; i < hashtable->nbatch; i++)
+		{
+			File		tfile = OpenTemporaryFile();
+
+			Assert(tfile >= 0);
+			hashtable->outerBatchFile[i] = BufFileCreate(tfile);
+		}
 	}
 	else if (hashtable == NULL)
 		return NULL;
 
-	nbatch = hashtable->nbatch;
-	outerbatches = hjstate->hj_OuterBatches;
-	if (nbatch > 0 && outerbatches == NULL)
-	{							/* if needs hash partition */
-		/* -----------------
-		 *	allocate space for file descriptors of outer batch files
-		 *	then open the batch files in the current process
-		 * -----------------
-		 */
-		innerhashkey = hashNode->hashkey;
-		hjstate->hj_InnerHashKey = innerhashkey;
-		outerbatches = (File *) palloc(nbatch * sizeof(File));
-		for (i = 0; i < nbatch; i++)
-		{
-			outerbatches[i] = OpenTemporaryFile();
-		}
-		hjstate->hj_OuterBatches = outerbatches;
-
-		/* ------------------
-		 *	get the inner batch file descriptors from the
-		 *	hash node
-		 * ------------------
-		 */
-		hjstate->hj_InnerBatches = hashNode->hashstate->hashBatches;
-	}
-	outerbatchPos = (RelativeAddr *) ABSADDR(hashtable->outerbatchPos);
-	curbatch = hashtable->curbatch;
-
 	/* ----------------
 	 *	Now get an outer tuple and probe into the hash table for matches
 	 * ----------------
@@ -189,185 +147,106 @@ ExecHashJoin(HashJoin *node)
 	outerTupleSlot = hjstate->jstate.cs_OuterTupleSlot;
 	outerVar = get_leftop(clause);
 
-	bucketno = -1;				/* if bucketno remains -1, means use old
-								 * outer tuple */
-	if (TupIsNull(outerTupleSlot))
+	for (;;)
 	{
-
 		/*
 		 * if the current outer tuple is nil, get a new one
 		 */
-		outerTupleSlot = (TupleTableSlot *)
-			ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
-
-		while (curbatch <= nbatch && TupIsNull(outerTupleSlot))
+		if (TupIsNull(outerTupleSlot))
 		{
-
-			/*
-			 * if the current batch runs out, switch to new batch
-			 */
-			curbatch = ExecHashJoinNewBatch(hjstate);
-			if (curbatch > nbatch)
+			outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
													   (Plan *) node,
													   hjstate);
+			if (TupIsNull(outerTupleSlot))
 			{
-
 				/*
-				 * when the last batch runs out, clean up
+				 * when the last batch runs out, clean up and exit
 				 */
 				ExecHashTableDestroy(hashtable);
 				hjstate->hj_HashTable = NULL;
 				return NULL;
 			}
-			else
-				outerTupleSlot = (TupleTableSlot *)
-					ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate);
+
+			/*
+			 * now we have an outer tuple, find the corresponding bucket for
+			 * this tuple from the hash table
+			 */
+			econtext->ecxt_outertuple = outerTupleSlot;
+			hjstate->hj_CurBucketNo = ExecHashGetBucket(hashtable, econtext,
														outerVar);
+			hjstate->hj_CurTuple = NULL;
+
+			/* ----------------
+			 *	Now we've got an outer tuple and the corresponding hash bucket,
+			 *	but this tuple may not belong to the current batch.
+			 *	This need only be checked in the first pass.
+			 * ----------------
+			 */
+			if (hashtable->curbatch == 0)
+			{
+				int			batch = ExecHashJoinGetBatch(hjstate->hj_CurBucketNo,
														 hashtable);
+				if (batch > 0)
+				{
+					/*
+					 * Need to postpone this outer tuple to a later batch.
+					 * Save it in the corresponding outer-batch file.
+					 */
+					int			batchno = batch - 1;
+					hashtable->outerBatchSize[batchno]++;
+					ExecHashJoinSaveTuple(outerTupleSlot->val,
										  hashtable->outerBatchFile[batchno]);
+					ExecClearTuple(outerTupleSlot);
+					continue;	/* loop around for a new outer tuple */
+				}
+			}
 		}
 
 		/*
-		 * now we get an outer tuple, find the corresponding bucket for
-		 * this tuple from the hash table
+		 * OK, scan the selected hash bucket for matches
 		 */
-		econtext->ecxt_outertuple = outerTupleSlot;
-
-#ifdef HJDEBUG
-		printf("Probing ");
-#endif
-		bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
-		bucket = (HashBucket) (ABSADDR(hashtable->top)
-							   + bucketno * hashtable->bucketsize);
-	}
-
-	for (;;)
-	{
-		/* ----------------
-		 *	Now we've got an outer tuple and the corresponding hash bucket,
-		 *	but this tuple may not belong to the current batch.
-		 * ----------------
-		 */
-		if (curbatch == 0 && bucketno != -1)	/* if this is the first
												 * pass */
-			batch = ExecHashJoinGetBatch(bucketno, hashtable, nbatch);
-		else
-			batch = 0;
-		if (batch > 0)
+		for (;;)
 		{
-
+			curtuple = ExecScanHashBucket(hjstate,
										  hjclauses,
										  econtext);
+			if (curtuple == NULL)
+				break;			/* out of matches */
+
 			/*
-			 * if the current outer tuple does not belong to the current
-			 * batch, save to the tmp file for the corresponding batch.
+			 * we've got a match, but still need to test qpqual
 			 */
-			buffer = ABSADDR(hashtable->batch) + (batch - 1) * BLCKSZ;
-			batchno = batch - 1;
-			pos = ExecHashJoinSaveTuple(outerTupleSlot->val,
-										buffer,
-										outerbatches[batchno],
-										ABSADDR(outerbatchPos[batchno]));
-
-			outerbatchPos[batchno] = RELADDR(pos);
-		}
-		else if (bucket != NULL)
-		{
-			do
+			inntuple = ExecStoreTuple(curtuple,
									  hjstate->hj_HashTupleSlot,
									  InvalidBuffer,
									  false);	/* don't pfree this tuple */
+			econtext->ecxt_innertuple = inntuple;
+			qualResult = ExecQual(qual, econtext);
+
+			/* ----------------
+			 *	if we pass the qual, then save state for next call and
+			 *	have ExecProject form the projection, store it
+			 *	in the tuple table, and return the slot.
+			 * ----------------
+			 */
+			if (qualResult)
 			{
+				ProjectionInfo *projInfo;
+				TupleTableSlot *result;
+				bool		isDone;
 
-				/*
-				 * scan the hash bucket for matches
-				 */
-				curtuple = ExecScanHashBucket(hjstate,
											  bucket,
											  curtuple,
											  hjclauses,
											  econtext);
-
-				if (curtuple != NULL)
-				{
-
-					/*
-					 * we've got a match, but still need to test qpqual
-					 */
-					inntuple = ExecStoreTuple(curtuple,
											  hjstate->hj_HashTupleSlot,
											  InvalidBuffer,
											  false);	/* don't pfree this
														 * tuple */
-
-					econtext->ecxt_innertuple = inntuple;
-
-					/* ----------------
-					 *	test to see if we pass the qualification
-					 * ----------------
-					 */
-					qualResult = ExecQual((List *) qual, econtext);
-
-					/* ----------------
-					 *	if we pass the qual, then save state for next call and
-					 *	have ExecProject form the projection, store it
-					 *	in the tuple table, and return the slot.
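
The bucket-scan state machine that replaces the old do-loop here is easy to see in miniature. The sketch below (generic C, not executor code) keeps a resumable cursor in the spirit of hj_CurTuple, so successive calls return successive matches from one chain:

    #include <stdio.h>
    #include <stdlib.h>

    /* Minimal analogue of the probe loop: chained buckets plus a "current
     * tuple" cursor so multiple matches come back one at a time, as
     * ExecScanHashBucket does via hjstate->hj_CurTuple. */
    typedef struct Node { int key; struct Node *next; } Node;

    #define NBUCKETS 8
    static Node *buckets[NBUCKETS];

    static void insert(int key)
    {
        Node *n = malloc(sizeof(Node));
        n->key = key;
        n->next = buckets[key % NBUCKETS];   /* push onto bucket chain */
        buckets[key % NBUCKETS] = n;
    }

    /* Resume scanning from *cur; return next chain entry matching key. */
    static Node *scan_bucket(int key, Node **cur)
    {
        Node *n = (*cur == NULL) ? buckets[key % NBUCKETS] : (*cur)->next;

        for (; n != NULL; n = n->next)
            if (n->key == key)               /* the "hashclause" test */
                break;
        *cur = n;
        return n;
    }

    int main(void)
    {
        Node *cur = NULL;

        insert(1); insert(9); insert(9);     /* 1 and 9 collide mod 8 */
        while (scan_bucket(9, &cur))         /* returns the two 9s, skips 1 */
            printf("match at %p\n", (void *) cur);
        return 0;
    }

Note how, as in the patch, duplicates on the chain cost only a pointer walk; the old fixed-size bucket pages and overflow area disappear entirely.
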
- * ---------------- - */ - if (qualResult) - { - ProjectionInfo *projInfo; - TupleTableSlot *result; - bool isDone; - - hjstate->hj_CurBucket = bucket; - hjstate->hj_CurTuple = curtuple; - hashtable->curbatch = curbatch; - hjstate->jstate.cs_OuterTupleSlot = outerTupleSlot; - - projInfo = hjstate->jstate.cs_ProjInfo; - result = ExecProject(projInfo, &isDone); - hjstate->jstate.cs_TupFromTlist = !isDone; - return result; - } - } + hjstate->jstate.cs_OuterTupleSlot = outerTupleSlot; + projInfo = hjstate->jstate.cs_ProjInfo; + result = ExecProject(projInfo, &isDone); + hjstate->jstate.cs_TupFromTlist = !isDone; + return result; } - while (curtuple != NULL); } /* ---------------- * Now the current outer tuple has run out of matches, - * so we free it and get a new outer tuple. + * so we free it and loop around to get a new outer tuple. * ---------------- */ - outerTupleSlot = (TupleTableSlot *) - ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate); - - while (curbatch <= nbatch && TupIsNull(outerTupleSlot)) - { - - /* - * if the current batch runs out, switch to new batch - */ - curbatch = ExecHashJoinNewBatch(hjstate); - if (curbatch > nbatch) - { - - /* - * when the last batch runs out, clean up - */ - ExecHashTableDestroy(hashtable); - hjstate->hj_HashTable = NULL; - return NULL; - } - else - outerTupleSlot = (TupleTableSlot *) - ExecHashJoinOuterGetTuple(outerNode, (Plan *) node, hjstate); - } - - /* ---------------- - * Now get the corresponding hash bucket for the new - * outer tuple. - * ---------------- - */ - econtext->ecxt_outertuple = outerTupleSlot; -#ifdef HJDEBUG - printf("Probing "); -#endif - bucketno = ExecHashGetBucket(hashtable, econtext, outerVar); - bucket = (HashBucket) (ABSADDR(hashtable->top) - + bucketno * hashtable->bucketsize); - curtuple = NULL; + ExecClearTuple(outerTupleSlot); } } @@ -399,7 +278,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent) node->hashjoinstate = hjstate; /* ---------------- - * Miscellanious initialization + * Miscellaneous initialization * * + assign node's base_id * + assign debugging hooks and @@ -456,22 +335,16 @@ ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent) ExecAssignProjectionInfo((Plan *) node, &hjstate->jstate); /* ---------------- - * XXX comment me + * initialize hash-specific info * ---------------- */ node->hashdone = false; hjstate->hj_HashTable = (HashJoinTable) NULL; - hjstate->hj_HashTableShmId = (IpcMemoryId) 0; - hjstate->hj_CurBucket = (HashBucket) NULL; - hjstate->hj_CurTuple = (HeapTuple) NULL; - hjstate->hj_CurOTuple = (OverflowTuple) NULL; + hjstate->hj_CurBucketNo = 0; + hjstate->hj_CurTuple = (HashJoinTuple) NULL; hjstate->hj_InnerHashKey = (Var *) NULL; - hjstate->hj_OuterBatches = (File *) NULL; - hjstate->hj_InnerBatches = (File *) NULL; - hjstate->hj_OuterReadPos = (char *) NULL; - hjstate->hj_OuterReadBlk = (int) 0; hjstate->jstate.cs_OuterTupleSlot = (TupleTableSlot *) NULL; hjstate->jstate.cs_TupFromTlist = (bool) false; @@ -554,93 +427,69 @@ ExecEndHashJoin(HashJoin *node) static TupleTableSlot * ExecHashJoinOuterGetTuple(Plan *node, Plan *parent, HashJoinState *hjstate) { + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; TupleTableSlot *slot; - HashJoinTable hashtable; - int curbatch; - File *outerbatches; - char *outerreadPos; - int batchno; - char *outerreadBuf; - int outerreadBlk; - - hashtable = hjstate->hj_HashTable; - curbatch = hashtable->curbatch; if (curbatch == 0) { /* if it is the first pass */ slot = 
ExecProcNode(node, parent); - return slot; + if (! TupIsNull(slot)) + return slot; + /* + * We have just reached the end of the first pass. + * Try to switch to a saved batch. + */ + curbatch = ExecHashJoinNewBatch(hjstate); } /* - * otherwise, read from the tmp files + * Try to read from a temp file. + * Loop allows us to advance to new batch as needed. */ - outerbatches = hjstate->hj_OuterBatches; - outerreadPos = hjstate->hj_OuterReadPos; - outerreadBlk = hjstate->hj_OuterReadBlk; - outerreadBuf = ABSADDR(hashtable->readbuf); - batchno = curbatch - 1; + while (curbatch <= hashtable->nbatch) + { + slot = ExecHashJoinGetSavedTuple(hjstate, + hashtable->outerBatchFile[curbatch-1], + hjstate->hj_OuterTupleSlot); + if (! TupIsNull(slot)) + return slot; + curbatch = ExecHashJoinNewBatch(hjstate); + } - slot = ExecHashJoinGetSavedTuple(hjstate, - outerreadBuf, - outerbatches[batchno], - hjstate->hj_OuterTupleSlot, - &outerreadBlk, - &outerreadPos); - - hjstate->hj_OuterReadPos = outerreadPos; - hjstate->hj_OuterReadBlk = outerreadBlk; - - return slot; + /* Out of batches... */ + return NULL; } /* ---------------------------------------------------------------- * ExecHashJoinGetSavedTuple * - * read the next tuple from a tmp file using a certain buffer + * read the next tuple from a tmp file * ---------------------------------------------------------------- */ static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, - char *buffer, - File file, - TupleTableSlot *tupleSlot, - int *block, /* return parameter */ - char **position) /* return parameter */ + BufFile *file, + TupleTableSlot *tupleSlot) { - char *bufstart; - char *bufend; - int cc; - HeapTuple heapTuple; - HashJoinTable hashtable; + HeapTupleData htup; + size_t nread; + HeapTuple heapTuple; - hashtable = hjstate->hj_HashTable; - bufend = buffer + *(long *) buffer; - bufstart = (char *) (buffer + sizeof(long)); - if ((*position == NULL) || (*position >= bufend)) - { - if (*position == NULL) - (*block) = 0; - else - (*block)++; - FileSeek(file, *block * BLCKSZ, SEEK_SET); - cc = FileRead(file, buffer, BLCKSZ); - NDirectFileRead++; - if (cc < 0) - perror("FileRead"); - if (cc == 0) /* end of file */ - return NULL; - else - (*position) = bufstart; - } - heapTuple = (HeapTuple) (*position); + nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData)); + if (nread == 0) + return NULL; /* end of file */ + if (nread != sizeof(HeapTupleData)) + elog(ERROR, "Read from hashjoin temp file failed"); + heapTuple = palloc(HEAPTUPLESIZE + htup.t_len); + memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData)); heapTuple->t_data = (HeapTupleHeader) ((char *) heapTuple + HEAPTUPLESIZE); - (*position) = (char *) MAXALIGN(*position + - heapTuple->t_len + HEAPTUPLESIZE); - - return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, false); + nread = BufFileRead(file, (void *) heapTuple->t_data, htup.t_len); + if (nread != (size_t) htup.t_len) + elog(ERROR, "Read from hashjoin temp file failed"); + return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, true); } /* ---------------------------------------------------------------- @@ -652,116 +501,80 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate, static int ExecHashJoinNewBatch(HashJoinState *hjstate) { - File *innerBatches; - File *outerBatches; - int *innerBatchSizes; - Var *innerhashkey; - HashJoinTable hashtable; - int nbatch; - char *readPos; - int readBlk; - char *readBuf; + HashJoinTable hashtable = hjstate->hj_HashTable; + int nbatch = hashtable->nbatch; + int 
newbatch = hashtable->curbatch + 1; + long *innerBatchSize = hashtable->innerBatchSize; + long *outerBatchSize = hashtable->outerBatchSize; + BufFile *innerFile; TupleTableSlot *slot; ExprContext *econtext; - int i; - int cc; - int newbatch; + Var *innerhashkey; - hashtable = hjstate->hj_HashTable; - outerBatches = hjstate->hj_OuterBatches; - innerBatches = hjstate->hj_InnerBatches; - nbatch = hashtable->nbatch; - newbatch = hashtable->curbatch + 1; - - /* ------------------ - * this is the last process, so it will do the cleanup and - * batch-switching. - * ------------------ - */ - if (newbatch == 1) - { - - /* - * if it is end of the first pass, flush all the last pages for - * the batches. - */ - outerBatches = hjstate->hj_OuterBatches; - for (i = 0; i < nbatch; i++) - { - cc = FileSeek(outerBatches[i], 0L, SEEK_END); - if (cc < 0) - perror("FileSeek"); - cc = FileWrite(outerBatches[i], - ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ); - NDirectFileWrite++; - if (cc < 0) - perror("FileWrite"); - } - } if (newbatch > 1) { - /* - * remove the previous outer batch + * We no longer need the previous outer batch file; + * close it right away to free disk space. */ - FileUnlink(outerBatches[newbatch - 2]); + BufFileClose(hashtable->outerBatchFile[newbatch - 2]); + hashtable->outerBatchFile[newbatch - 2] = NULL; } - /* - * rebuild the hash table for the new inner batch - */ - innerBatchSizes = (int *) ABSADDR(hashtable->innerbatchSizes); /* -------------- - * skip over empty inner batches + * We can skip over any batches that are empty on either side. + * Release associated temp files right away. * -------------- */ - while (newbatch <= nbatch && innerBatchSizes[newbatch - 1] == 0) + while (newbatch <= nbatch && + (innerBatchSize[newbatch - 1] == 0L || + outerBatchSize[newbatch - 1] == 0L)) { - FileUnlink(outerBatches[newbatch - 1]); - FileUnlink(innerBatches[newbatch - 1]); + BufFileClose(hashtable->innerBatchFile[newbatch - 1]); + hashtable->innerBatchFile[newbatch - 1] = NULL; + BufFileClose(hashtable->outerBatchFile[newbatch - 1]); + hashtable->outerBatchFile[newbatch - 1] = NULL; newbatch++; } + if (newbatch > nbatch) - { - hashtable->pcount = hashtable->nprocess; + return newbatch; /* no more batches */ - return newbatch; - } - ExecHashTableReset(hashtable, innerBatchSizes[newbatch - 1]); + /* + * Rewind inner and outer batch files for this batch, + * so that we can start reading them. 
+ */ + if (BufFileSeek(hashtable->outerBatchFile[newbatch - 1], 0L, + SEEK_SET) != 0L) + elog(ERROR, "Failed to rewind hash temp file"); + innerFile = hashtable->innerBatchFile[newbatch - 1]; + + if (BufFileSeek(innerFile, 0L, SEEK_SET) != 0L) + elog(ERROR, "Failed to rewind hash temp file"); + + /* + * Reload the hash table with the new inner batch + */ + ExecHashTableReset(hashtable, innerBatchSize[newbatch - 1]); econtext = hjstate->jstate.cs_ExprContext; innerhashkey = hjstate->hj_InnerHashKey; - readPos = NULL; - readBlk = 0; - readBuf = ABSADDR(hashtable->readbuf); while ((slot = ExecHashJoinGetSavedTuple(hjstate, - readBuf, - innerBatches[newbatch - 1], - hjstate->hj_HashTupleSlot, - &readBlk, - &readPos)) + innerFile, + hjstate->hj_HashTupleSlot)) && !TupIsNull(slot)) { econtext->ecxt_innertuple = slot; - ExecHashTableInsert(hashtable, econtext, innerhashkey, NULL); - /* possible bug - glass */ + ExecHashTableInsert(hashtable, econtext, innerhashkey); } - - /* ----------------- - * only the last process comes to this branch - * now all the processes have finished the build phase - * ---------------- - */ - /* - * after we build the hash table, the inner batch is no longer needed + * after we build the hash table, the inner batch file is no longer needed */ - FileUnlink(innerBatches[newbatch - 1]); - hjstate->hj_OuterReadPos = NULL; - hashtable->pcount = hashtable->nprocess; + BufFileClose(innerFile); + hashtable->innerBatchFile[newbatch - 1] = NULL; hashtable->curbatch = newbatch; return newbatch; @@ -777,63 +590,41 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * ---------------------------------------------------------------- */ static int -ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable, int nbatch) +ExecHashJoinGetBatch(int bucketno, HashJoinTable hashtable) { int b; - if (bucketno < hashtable->nbuckets || nbatch == 0) + if (bucketno < hashtable->nbuckets || hashtable->nbatch == 0) return 0; - b = (float) (bucketno - hashtable->nbuckets) / - (float) (hashtable->totalbuckets - hashtable->nbuckets) * - nbatch; + b = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) / + (hashtable->totalbuckets - hashtable->nbuckets); return b + 1; } /* ---------------------------------------------------------------- * ExecHashJoinSaveTuple * - * save a tuple to a tmp file using a buffer. - * the first few bytes in a page is an offset to the end - * of the page. + * save a tuple to a tmp file. + * + * The data recorded in the file for each tuple is an image of its + * HeapTupleData (with meaningless t_data pointer) followed by the + * HeapTupleHeader and tuple data. 
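
That on-disk layout pairs exactly with the reads in ExecHashJoinGetSavedTuple. For illustration, here is the same record format sketched with stdio (a toy Tup struct stands in for HeapTupleData; all names are illustrative, not PostgreSQL API):

    #include <stdio.h>
    #include <stdlib.h>

    /* Toy stand-in for HeapTupleData: a fixed-size header whose t_data
     * pointer is meaningless on disk and is re-pointed after reading. */
    typedef struct { unsigned t_len; void *t_data; } Tup;

    static void save_tuple(FILE *f, const Tup *t)
    {
        fwrite(t, sizeof(Tup), 1, f);        /* header image first... */
        fwrite(t->t_data, t->t_len, 1, f);   /* ...then the payload bytes */
    }

    static Tup *load_tuple(FILE *f)
    {
        Tup  hdr;
        Tup *t;

        if (fread(&hdr, sizeof(Tup), 1, f) != 1)
            return NULL;                     /* clean end of file */
        /* one allocation holds header + data, like palloc(HEAPTUPLESIZE + t_len) */
        t = malloc(sizeof(Tup) + hdr.t_len);
        *t = hdr;
        t->t_data = (char *) t + sizeof(Tup);
        if (fread(t->t_data, hdr.t_len, 1, f) != 1)
        {
            free(t);                         /* truncated record; real code elogs */
            return NULL;
        }
        return t;
    }

    int main(void)
    {
        FILE *f = tmpfile();
        Tup   in = {5, "hello"};
        Tup  *out;

        save_tuple(f, &in);
        rewind(f);                           /* like BufFileSeek(..., SEEK_SET) */
        out = load_tuple(f);
        printf("%.*s\n", (int) out->t_len, (char *) out->t_data);
        free(out);
        fclose(f);
        return 0;
    }

Writing the header image verbatim wastes a few bytes per tuple (the stale pointer), but it keeps save and load symmetric and avoids the old page-and-offset bookkeeping entirely.
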
 * ----------------------------------------------------------------
 */
-char *
+void
 ExecHashJoinSaveTuple(HeapTuple heapTuple,
-					  char *buffer,
-					  File file,
-					  char *position)
+					  BufFile *file)
 {
-	long	   *pageend;
-	char	   *pagestart;
-	char	   *pagebound;
-	int			cc;
+	size_t		written;
 
-	pageend = (long *) buffer;
-	pagestart = (char *) (buffer + sizeof(long));
-	pagebound = buffer + BLCKSZ;
-	if (position == NULL)
-		position = pagestart;
-
-	if (position + heapTuple->t_len + HEAPTUPLESIZE >= pagebound)
-	{
-		cc = FileSeek(file, 0L, SEEK_END);
-		if (cc < 0)
-			perror("FileSeek");
-		cc = FileWrite(file, buffer, BLCKSZ);
-		NDirectFileWrite++;
-		if (cc < 0)
-			perror("FileWrite");
-		position = pagestart;
-		*pageend = 0;
-	}
-	memmove(position, heapTuple, HEAPTUPLESIZE);
-	memmove(position + HEAPTUPLESIZE, heapTuple->t_data, heapTuple->t_len);
-	position = (char *) MAXALIGN(position + heapTuple->t_len + HEAPTUPLESIZE);
-	*pageend = position - buffer;
-
-	return position;
+	written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
+	if (written != sizeof(HeapTupleData))
+		elog(ERROR, "Write to hashjoin temp file failed");
+	written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
+	if (written != (size_t) heapTuple->t_len)
+		elog(ERROR, "Write to hashjoin temp file failed");
 }
 
 void
@@ -855,14 +646,10 @@ ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent)
 		ExecHashTableDestroy(hjstate->hj_HashTable);
 		hjstate->hj_HashTable = NULL;
 	}
-	hjstate->hj_CurBucket = (HashBucket) NULL;
-	hjstate->hj_CurTuple = (HeapTuple) NULL;
-	hjstate->hj_CurOTuple = (OverflowTuple) NULL;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurTuple = (HashJoinTuple) NULL;
 	hjstate->hj_InnerHashKey = (Var *) NULL;
-	hjstate->hj_OuterBatches = (File *) NULL;
-	hjstate->hj_InnerBatches = (File *) NULL;
-	hjstate->hj_OuterReadPos = (char *) NULL;
-	hjstate->hj_OuterReadBlk = (int) 0;
 
 	hjstate->jstate.cs_OuterTupleSlot = (TupleTableSlot *) NULL;
 	hjstate->jstate.cs_TupFromTlist = (bool) false;
@@ -875,5 +662,4 @@ ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt, Plan *parent)
 	ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
 	if (((Plan *) node)->righttree->chgParam == NULL)
 		ExecReScan(((Plan *) node)->righttree, exprCtxt, (Plan *) node);
-
 }
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 62e2164df3..751b5efee1 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -1,85 +1,92 @@
 /*-------------------------------------------------------------------------
 *
 * hashjoin.h
- *	  internal structures for hash table and buckets
+ *	  internal structures for hash joins
 *
 *
 * Copyright (c) 1994, Regents of the University of California
 *
- * $Id: hashjoin.h,v 1.10 1999/05/09 00:53:18 tgl Exp $
+ * $Id: hashjoin.h,v 1.11 1999/05/18 21:33:04 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
 #ifndef HASHJOIN_H
 #define HASHJOIN_H
 
-#include <storage/ipc.h>
-
-/* -----------------
- *	have to use relative address as pointers in the hashtable
- *	because the hashtable may reallocate in different processes
- *
- *	XXX: this relative-address stuff is useless on all supported platforms
- *	and is a ever-dangerous source of bugs.  Really ought to rip it out.
- * -----------------
- */
-typedef int RelativeAddr;
-
-/* ------------------
- *	The relative addresses are always relative to the head of the
- *	hashtable, the following macros convert them to/from absolute address.
- *	NULL is represented as -1 (CAUTION: RELADDR() doesn't handle that!).
- *	CAUTION: ABSADDR evaluates its arg twice!!
- * ------------------
- */
-#define ABSADDR(X)	((X) < 0 ? (char*) NULL : (char*)hashtable + (X))
-#define RELADDR(X)	((RelativeAddr)((char*)(X) - (char*)hashtable))
-
-typedef char **charPP;
-typedef int *intP;
+#include "access/htup.h"
+#include "storage/fd.h"
+#include "utils/mcxt.h"
 
 /* ----------------------------------------------------------------
 *				hash-join hash table structures
+ *
+ * Each active hashjoin has a HashJoinTable control block which is
+ * palloc'd in the executor's context.  All other storage needed for
+ * the hashjoin is kept in a private "named portal", one for each hashjoin.
+ * This makes it easy and fast to release the storage when we don't need it
+ * anymore.
+ *
+ * The portal manager guarantees that portals will be discarded at end of
+ * transaction, so we have no problem with a memory leak if the join is
+ * aborted early by an error.  (Likewise, any temporary files we make will
+ * be cleaned up by the virtual file manager in event of an error.)
+ *
+ * Storage that should live through the entire join is allocated from the
+ * portal's "variable context", while storage that is only wanted for the
+ * current batch is allocated in the portal's "heap context".  By popping
+ * the portal's heap at the end of a batch, we free all the per-batch storage
+ * reliably and without tedium.
 * ----------------------------------------------------------------
 */
+
+typedef struct HashJoinTupleData
+{
+	struct HashJoinTupleData *next;		/* link to next tuple in same bucket */
+	HeapTupleData htup;			/* tuple header */
+} HashJoinTupleData;
+
+typedef HashJoinTupleData *HashJoinTuple;
+
 typedef struct HashTableData
 {
-	int			nbuckets;
-	int			totalbuckets;
-	int			bucketsize;
-	IpcMemoryId shmid;
-	RelativeAddr top;			/* char* */
-	RelativeAddr bottom;		/* char* */
-	RelativeAddr overflownext;	/* char* */
-	RelativeAddr batch;			/* char* */
-	RelativeAddr readbuf;		/* char* */
-	int			nbatch;
-	RelativeAddr outerbatchPos; /* RelativeAddr* */
-	RelativeAddr innerbatchPos; /* RelativeAddr* */
-	RelativeAddr innerbatchSizes;		/* int* */
-	int			curbatch;
-	int			nprocess;
-	int			pcount;
-} HashTableData;				/* real hash table follows here */
+	int			nbuckets;		/* buckets in use during this batch */
+	int			totalbuckets;	/* total number of (virtual) buckets */
+	HashJoinTuple *buckets;		/* buckets[i] is head of list of tuples */
+	/* buckets array is per-batch storage, as are all the tuples */
+
+	int			nbatch;			/* number of batches; 0 means 1-pass join */
+	int			curbatch;		/* current batch #, or 0 during 1st pass */
+
+	/* all these arrays are allocated for the life of the hash join,
+	 * but only if nbatch > 0:
+	 */
+	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
+	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
+	long	   *outerBatchSize; /* count of tuples in each outer batch file */
+	long	   *innerBatchSize; /* count of tuples in each inner batch file */
+
+	/* During 1st scan of inner relation, we get tuples from executor.
+	 * If nbatch > 0 then tuples that don't belong in first nbuckets logical
+	 * buckets get dumped into inner-batch temp files.
+	 * The same statements apply for the 1st scan of the outer relation,
+	 * except we write tuples to outer-batch temp files.
+	 * If nbatch > 0 then we do the following for each batch:
+	 *	1. Read tuples from inner batch file, load into hash buckets.
+	 *	2. Read tuples from outer batch file, match to hash buckets and output.
+	 */
+
+	/* Ugly kluge: myPortal ought to be declared as type Portal (ie, PortalD*)
+	 * but if we try to include utils/portal.h here, we end up with a
+	 * circular dependency of include files!  Until the various node.h files
+	 * are restructured in a cleaner way, we have to fake it.  The most
+	 * reliable fake seems to be to declare myPortal as void * and then
+	 * cast it to the right things in nodeHash.c.
+	 */
+	void	   *myPortal;		/* where to keep working storage */
+	MemoryContext hashCxt;		/* context for whole-hash-join storage */
+	MemoryContext batchCxt;		/* context for this-batch-only storage */
+} HashTableData;
 
 typedef HashTableData *HashJoinTable;
 
-typedef struct OverflowTupleData
-{
-	RelativeAddr tuple;			/* HeapTuple */
-	RelativeAddr next;			/* struct OverflowTupleData * */
-} OverflowTupleData;			/* real tuple follows here */
-
-typedef OverflowTupleData *OverflowTuple;
-
-typedef struct HashBucketData
-{
-	RelativeAddr top;			/* HeapTuple */
-	RelativeAddr bottom;		/* HeapTuple */
-	RelativeAddr firstotuple;	/* OverflowTuple */
-	RelativeAddr lastotuple;	/* OverflowTuple */
-} HashBucketData;				/* real bucket follows here */
-
-typedef HashBucketData *HashBucket;
-
 #endif	 /* HASHJOIN_H */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index c062e93a0a..2e19824257 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -6,7 +6,7 @@
 *
 * Copyright (c) 1994, Regents of the University of California
 *
- * $Id: nodeHash.h,v 1.11 1999/02/13 23:21:25 momjian Exp $
+ * $Id: nodeHash.h,v 1.12 1999/05/18 21:33:05 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@@ -18,7 +18,6 @@
 #include "nodes/execnodes.h"
 #include "nodes/pg_list.h"
 #include "nodes/plannodes.h"
-#include "storage/fd.h"
 #include "utils/syscache.h"
 
 extern TupleTableSlot *ExecHash(Hash *node);
@@ -26,15 +25,14 @@ extern bool ExecInitHash(Hash *node, EState *estate, Plan *parent);
 extern int	ExecCountSlotsHash(Hash *node);
 extern void ExecEndHash(Hash *node);
 extern HashJoinTable ExecHashTableCreate(Hash *node);
-extern void ExecHashTableInsert(HashJoinTable hashtable, ExprContext *econtext,
-					Var *hashkey, File *batches);
 extern void ExecHashTableDestroy(HashJoinTable hashtable);
+extern void ExecHashTableInsert(HashJoinTable hashtable, ExprContext *econtext,
+					Var *hashkey);
 extern int ExecHashGetBucket(HashJoinTable hashtable, ExprContext *econtext,
-				  Var *hashkey);
-extern HeapTuple ExecScanHashBucket(HashJoinState *hjstate, HashBucket bucket,
-					HeapTuple curtuple, List *hjclauses,
-					ExprContext *econtext);
-extern void ExecHashTableReset(HashJoinTable hashtable, int ntuples);
+				  Var *hashkey);
+extern HeapTuple ExecScanHashBucket(HashJoinState *hjstate, List *hjclauses,
+				   ExprContext *econtext);
+extern void ExecHashTableReset(HashJoinTable hashtable, long ntuples);
 extern void ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent);
 
 #endif	 /* NODEHASH_H */
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index cb917c73e9..7b6c8a6e84 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -6,7 +6,7 @@
 *
 * Copyright (c) 1994, Regents of the University of California
 *
- * $Id: nodeHashjoin.h,v 1.11 1999/02/13 23:21:26 momjian Exp $
+ * $Id: nodeHashjoin.h,v 1.12 1999/05/18 21:33:05 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@@ -21,9 +21,7 @@
 extern TupleTableSlot *ExecHashJoin(HashJoin *node);
 extern bool ExecInitHashJoin(HashJoin *node, EState *estate, Plan *parent);
 extern int	ExecCountSlotsHashJoin(HashJoin *node);
 extern void ExecEndHashJoin(HashJoin *node);
-extern char *ExecHashJoinSaveTuple(HeapTuple heapTuple, char *buffer,
-					  File file, char *position);
+extern void ExecHashJoinSaveTuple(HeapTuple heapTuple, BufFile *file);
 extern void ExecReScanHashJoin(HashJoin *node, ExprContext *exprCtxt,
 				   Plan *parent);
-
 #endif	 /* NODEHASHJOIN_H */
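
Finally, to make the virtual-bucket arithmetic concrete, here is a small worked example using the same formula as ExecHashTableInsert and ExecHashJoinGetBatch above (the bucket and batch counts are invented for illustration):

    #include <stdio.h>

    /* Same mapping as ExecHashTableInsert / ExecHashJoinGetBatch: virtual
     * buckets past the first nbuckets are spread linearly across batches. */
    int main(void)
    {
        int nbuckets = 100;        /* physical (in-memory) buckets */
        int totalbuckets = 1000;   /* virtual buckets */
        int nbatch = 9;
        int bucketno;

        for (bucketno = 0; bucketno < totalbuckets; bucketno += 150)
        {
            int batch = 0;         /* 0 means "stays in memory, first pass" */

            if (bucketno >= nbuckets)
                batch = (nbatch * (bucketno - nbuckets)) /
                        (totalbuckets - nbuckets) + 1;
            printf("bucket %3d -> batch %d\n", bucketno, batch);
        }
        return 0;
    }

Bucket 0 stays in memory (batch 0), bucket 150 maps to batch 1, and bucket 900 to batch 9; the insert path in nodeHash.c uses the same quotient zero-based to index the batch-file arrays, while the probe path uses the 1-based value so that 0 can mean "current pass".
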