Update hash and join routines to use fd.c's new temp-file
code, instead of not-very-bulletproof stuff they had before.
This commit is contained in:
parent
c1167a08ca
commit
71d5d95376
|
@ -5,7 +5,7 @@
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* IDENTIFICATION
|
* IDENTIFICATION
|
||||||
* $Id: nbtsort.c,v 1.37 1999/02/21 03:48:27 scrappy Exp $
|
* $Id: nbtsort.c,v 1.38 1999/05/09 00:53:19 tgl Exp $
|
||||||
*
|
*
|
||||||
* NOTES
|
* NOTES
|
||||||
*
|
*
|
||||||
|
@ -85,11 +85,9 @@ static void _bt_uppershutdown(Relation index, BTPageState *state);
|
||||||
|
|
||||||
#define MAXTAPES (7)
|
#define MAXTAPES (7)
|
||||||
#define TAPEBLCKSZ (BLCKSZ << 2)
|
#define TAPEBLCKSZ (BLCKSZ << 2)
|
||||||
#define TAPETEMP "pg_btsortXXXXXXX"
|
|
||||||
|
|
||||||
extern int NDirectFileRead;
|
extern int NDirectFileRead;
|
||||||
extern int NDirectFileWrite;
|
extern int NDirectFileWrite;
|
||||||
extern char *mktemp(char *template);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* this is what we use to shovel BTItems in and out of memory. it's
|
* this is what we use to shovel BTItems in and out of memory. it's
|
||||||
|
@ -107,7 +105,7 @@ extern char *mktemp(char *template);
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
int bttb_magic; /* magic number */
|
int bttb_magic; /* magic number */
|
||||||
int bttb_fd; /* file descriptor */
|
File bttb_fd; /* file descriptor */
|
||||||
int bttb_top; /* top of free space within bttb_data */
|
int bttb_top; /* top of free space within bttb_data */
|
||||||
short bttb_ntup; /* number of tuples in this block */
|
short bttb_ntup; /* number of tuples in this block */
|
||||||
short bttb_eor; /* End-Of-Run marker */
|
short bttb_eor; /* End-Of-Run marker */
|
||||||
|
@ -380,7 +378,7 @@ _bt_tapereset(BTTapeBlock *tape)
|
||||||
static void
|
static void
|
||||||
_bt_taperewind(BTTapeBlock *tape)
|
_bt_taperewind(BTTapeBlock *tape)
|
||||||
{
|
{
|
||||||
FileSeek(tape->bttb_fd, 0, SEEK_SET);
|
FileSeek(tape->bttb_fd, 0L, SEEK_SET);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -411,7 +409,7 @@ _bt_tapeclear(BTTapeBlock *tape)
|
||||||
* as well as opening a physical tape file.
|
* as well as opening a physical tape file.
|
||||||
*/
|
*/
|
||||||
static BTTapeBlock *
|
static BTTapeBlock *
|
||||||
_bt_tapecreate(char *fname)
|
_bt_tapecreate(void)
|
||||||
{
|
{
|
||||||
BTTapeBlock *tape = (BTTapeBlock *) palloc(sizeof(BTTapeBlock));
|
BTTapeBlock *tape = (BTTapeBlock *) palloc(sizeof(BTTapeBlock));
|
||||||
|
|
||||||
|
@ -420,11 +418,7 @@ _bt_tapecreate(char *fname)
|
||||||
|
|
||||||
tape->bttb_magic = BTTAPEMAGIC;
|
tape->bttb_magic = BTTAPEMAGIC;
|
||||||
|
|
||||||
#ifndef __CYGWIN32__
|
tape->bttb_fd = OpenTemporaryFile();
|
||||||
tape->bttb_fd = FileNameOpenFile(fname, O_RDWR | O_CREAT | O_TRUNC, 0600);
|
|
||||||
#else
|
|
||||||
tape->bttb_fd = FileNameOpenFile(fname, O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0600);
|
|
||||||
#endif
|
|
||||||
Assert(tape->bttb_fd >= 0);
|
Assert(tape->bttb_fd >= 0);
|
||||||
|
|
||||||
/* initialize the buffer */
|
/* initialize the buffer */
|
||||||
|
@ -467,7 +461,7 @@ _bt_tapewrite(BTTapeBlock *tape, int eor)
|
||||||
static int
|
static int
|
||||||
_bt_taperead(BTTapeBlock *tape)
|
_bt_taperead(BTTapeBlock *tape)
|
||||||
{
|
{
|
||||||
int fd;
|
File fd;
|
||||||
int nread;
|
int nread;
|
||||||
|
|
||||||
if (tape->bttb_eor)
|
if (tape->bttb_eor)
|
||||||
|
@ -550,9 +544,8 @@ _bt_spoolinit(Relation index, int ntapes, bool isunique)
|
||||||
{
|
{
|
||||||
BTSpool *btspool = (BTSpool *) palloc(sizeof(BTSpool));
|
BTSpool *btspool = (BTSpool *) palloc(sizeof(BTSpool));
|
||||||
int i;
|
int i;
|
||||||
char *fname = (char *) palloc(sizeof(TAPETEMP) + 1);
|
|
||||||
|
|
||||||
if (btspool == (BTSpool *) NULL || fname == (char *) NULL)
|
if (btspool == (BTSpool *) NULL)
|
||||||
elog(ERROR, "_bt_spoolinit: out of memory");
|
elog(ERROR, "_bt_spoolinit: out of memory");
|
||||||
MemSet((char *) btspool, 0, sizeof(BTSpool));
|
MemSet((char *) btspool, 0, sizeof(BTSpool));
|
||||||
btspool->bts_ntapes = ntapes;
|
btspool->bts_ntapes = ntapes;
|
||||||
|
@ -567,10 +560,9 @@ _bt_spoolinit(Relation index, int ntapes, bool isunique)
|
||||||
|
|
||||||
for (i = 0; i < ntapes; ++i)
|
for (i = 0; i < ntapes; ++i)
|
||||||
{
|
{
|
||||||
btspool->bts_itape[i] = _bt_tapecreate(mktemp(strcpy(fname, TAPETEMP)));
|
btspool->bts_itape[i] = _bt_tapecreate();
|
||||||
btspool->bts_otape[i] = _bt_tapecreate(mktemp(strcpy(fname, TAPETEMP)));
|
btspool->bts_otape[i] = _bt_tapecreate();
|
||||||
}
|
}
|
||||||
pfree((void *) fname);
|
|
||||||
|
|
||||||
_bt_isortcmpinit(index, btspool);
|
_bt_isortcmpinit(index, btspool);
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
* Copyright (c) 1994, Regents of the University of California
|
* Copyright (c) 1994, Regents of the University of California
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* $Id: nodeHash.c,v 1.33 1999/05/06 00:30:46 tgl Exp $
|
* $Id: nodeHash.c,v 1.34 1999/05/09 00:53:20 tgl Exp $
|
||||||
*
|
*
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
@ -39,9 +39,6 @@
|
||||||
|
|
||||||
extern int NBuffers;
|
extern int NBuffers;
|
||||||
|
|
||||||
#define HJ_TEMP_NAMELEN 16 /* max length for mk_hj_temp file names */
|
|
||||||
|
|
||||||
static void mk_hj_temp(char *tempname);
|
|
||||||
static int hashFunc(Datum key, int len, bool byVal);
|
static int hashFunc(Datum key, int len, bool byVal);
|
||||||
static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
|
static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
|
||||||
static void * absHashTableAlloc(int size, HashJoinTable hashtable);
|
static void * absHashTableAlloc(int size, HashJoinTable hashtable);
|
||||||
|
@ -72,7 +69,6 @@ ExecHash(Hash *node)
|
||||||
RelativeAddr *batchPos;
|
RelativeAddr *batchPos;
|
||||||
int *batchSizes;
|
int *batchSizes;
|
||||||
int i;
|
int i;
|
||||||
RelativeAddr *innerbatchNames;
|
|
||||||
|
|
||||||
/* ----------------
|
/* ----------------
|
||||||
* get state info from node
|
* get state info from node
|
||||||
|
@ -91,8 +87,6 @@ ExecHash(Hash *node)
|
||||||
|
|
||||||
if (nbatch > 0)
|
if (nbatch > 0)
|
||||||
{ /* if needs hash partition */
|
{ /* if needs hash partition */
|
||||||
innerbatchNames = (RelativeAddr *) ABSADDR(hashtable->innerbatchNames);
|
|
||||||
|
|
||||||
/* --------------
|
/* --------------
|
||||||
* allocate space for the file descriptors of batch files
|
* allocate space for the file descriptors of batch files
|
||||||
* then open the batch files in the current processes.
|
* then open the batch files in the current processes.
|
||||||
|
@ -101,13 +95,7 @@ ExecHash(Hash *node)
|
||||||
batches = (File *) palloc(nbatch * sizeof(File));
|
batches = (File *) palloc(nbatch * sizeof(File));
|
||||||
for (i = 0; i < nbatch; i++)
|
for (i = 0; i < nbatch; i++)
|
||||||
{
|
{
|
||||||
#ifndef __CYGWIN32__
|
batches[i] = OpenTemporaryFile();
|
||||||
batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
|
|
||||||
O_CREAT | O_RDWR, 0600);
|
|
||||||
#else
|
|
||||||
batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
|
|
||||||
O_CREAT | O_RDWR | O_BINARY, 0600);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
hashstate->hashBatches = batches;
|
hashstate->hashBatches = batches;
|
||||||
batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
|
batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
|
||||||
|
@ -291,7 +279,7 @@ absHashTableAlloc(int size, HashJoinTable hashtable)
|
||||||
* ----------------------------------------------------------------
|
* ----------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
#define NTUP_PER_BUCKET 10
|
#define NTUP_PER_BUCKET 10
|
||||||
#define FUDGE_FAC 1.5
|
#define FUDGE_FAC 2.0
|
||||||
|
|
||||||
HashJoinTable
|
HashJoinTable
|
||||||
ExecHashTableCreate(Hash *node)
|
ExecHashTableCreate(Hash *node)
|
||||||
|
@ -310,12 +298,9 @@ ExecHashTableCreate(Hash *node)
|
||||||
int totalbuckets;
|
int totalbuckets;
|
||||||
int bucketsize;
|
int bucketsize;
|
||||||
int i;
|
int i;
|
||||||
RelativeAddr *outerbatchNames;
|
|
||||||
RelativeAddr *outerbatchPos;
|
RelativeAddr *outerbatchPos;
|
||||||
RelativeAddr *innerbatchNames;
|
|
||||||
RelativeAddr *innerbatchPos;
|
RelativeAddr *innerbatchPos;
|
||||||
int *innerbatchSizes;
|
int *innerbatchSizes;
|
||||||
RelativeAddr tempname;
|
|
||||||
|
|
||||||
/* ----------------
|
/* ----------------
|
||||||
* Get information about the size of the relation to be hashed
|
* Get information about the size of the relation to be hashed
|
||||||
|
@ -425,46 +410,32 @@ ExecHashTableCreate(Hash *node)
|
||||||
* allocate and initialize the outer batches
|
* allocate and initialize the outer batches
|
||||||
* ---------------
|
* ---------------
|
||||||
*/
|
*/
|
||||||
outerbatchNames = (RelativeAddr *)
|
|
||||||
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
|
|
||||||
outerbatchPos = (RelativeAddr *)
|
outerbatchPos = (RelativeAddr *)
|
||||||
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
|
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
|
||||||
for (i = 0; i < nbatch; i++)
|
for (i = 0; i < nbatch; i++)
|
||||||
{
|
{
|
||||||
tempname = hashTableAlloc(HJ_TEMP_NAMELEN, hashtable);
|
|
||||||
mk_hj_temp(ABSADDR(tempname));
|
|
||||||
outerbatchNames[i] = tempname;
|
|
||||||
outerbatchPos[i] = -1;
|
outerbatchPos[i] = -1;
|
||||||
}
|
}
|
||||||
hashtable->outerbatchNames = RELADDR(outerbatchNames);
|
|
||||||
hashtable->outerbatchPos = RELADDR(outerbatchPos);
|
hashtable->outerbatchPos = RELADDR(outerbatchPos);
|
||||||
/* ---------------
|
/* ---------------
|
||||||
* allocate and initialize the inner batches
|
* allocate and initialize the inner batches
|
||||||
* ---------------
|
* ---------------
|
||||||
*/
|
*/
|
||||||
innerbatchNames = (RelativeAddr *)
|
|
||||||
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
|
|
||||||
innerbatchPos = (RelativeAddr *)
|
innerbatchPos = (RelativeAddr *)
|
||||||
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
|
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
|
||||||
innerbatchSizes = (int *)
|
innerbatchSizes = (int *)
|
||||||
absHashTableAlloc(nbatch * sizeof(int), hashtable);
|
absHashTableAlloc(nbatch * sizeof(int), hashtable);
|
||||||
for (i = 0; i < nbatch; i++)
|
for (i = 0; i < nbatch; i++)
|
||||||
{
|
{
|
||||||
tempname = hashTableAlloc(HJ_TEMP_NAMELEN, hashtable);
|
|
||||||
mk_hj_temp(ABSADDR(tempname));
|
|
||||||
innerbatchNames[i] = tempname;
|
|
||||||
innerbatchPos[i] = -1;
|
innerbatchPos[i] = -1;
|
||||||
innerbatchSizes[i] = 0;
|
innerbatchSizes[i] = 0;
|
||||||
}
|
}
|
||||||
hashtable->innerbatchNames = RELADDR(innerbatchNames);
|
|
||||||
hashtable->innerbatchPos = RELADDR(innerbatchPos);
|
hashtable->innerbatchPos = RELADDR(innerbatchPos);
|
||||||
hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
|
hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
hashtable->outerbatchNames = (RelativeAddr) NULL;
|
|
||||||
hashtable->outerbatchPos = (RelativeAddr) NULL;
|
hashtable->outerbatchPos = (RelativeAddr) NULL;
|
||||||
hashtable->innerbatchNames = (RelativeAddr) NULL;
|
|
||||||
hashtable->innerbatchPos = (RelativeAddr) NULL;
|
hashtable->innerbatchPos = (RelativeAddr) NULL;
|
||||||
hashtable->innerbatchSizes = (RelativeAddr) NULL;
|
hashtable->innerbatchSizes = (RelativeAddr) NULL;
|
||||||
}
|
}
|
||||||
|
@ -886,15 +857,6 @@ ExecHashTableReset(HashJoinTable hashtable, int ntuples)
|
||||||
hashtable->pcount = hashtable->nprocess;
|
hashtable->pcount = hashtable->nprocess;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
|
||||||
mk_hj_temp(char *tempname)
|
|
||||||
{
|
|
||||||
static int hjtmpcnt = 0;
|
|
||||||
|
|
||||||
snprintf(tempname, HJ_TEMP_NAMELEN, "HJ%d.%d", (int) MyProcPid, hjtmpcnt);
|
|
||||||
hjtmpcnt = (hjtmpcnt + 1) % 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
|
ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
|
||||||
{
|
{
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* IDENTIFICATION
|
* IDENTIFICATION
|
||||||
* $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.18 1999/05/06 00:30:47 tgl Exp $
|
* $Header: /cvsroot/pgsql/src/backend/executor/nodeHashjoin.c,v 1.19 1999/05/09 00:53:21 tgl Exp $
|
||||||
*
|
*
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
@ -74,7 +74,6 @@ ExecHashJoin(HashJoin *node)
|
||||||
int nbatch;
|
int nbatch;
|
||||||
int curbatch;
|
int curbatch;
|
||||||
File *outerbatches;
|
File *outerbatches;
|
||||||
RelativeAddr *outerbatchNames;
|
|
||||||
RelativeAddr *outerbatchPos;
|
RelativeAddr *outerbatchPos;
|
||||||
Var *innerhashkey;
|
Var *innerhashkey;
|
||||||
int batch;
|
int batch;
|
||||||
|
@ -166,21 +165,10 @@ ExecHashJoin(HashJoin *node)
|
||||||
*/
|
*/
|
||||||
innerhashkey = hashNode->hashkey;
|
innerhashkey = hashNode->hashkey;
|
||||||
hjstate->hj_InnerHashKey = innerhashkey;
|
hjstate->hj_InnerHashKey = innerhashkey;
|
||||||
outerbatchNames = (RelativeAddr *)
|
outerbatches = (File *) palloc(nbatch * sizeof(File));
|
||||||
ABSADDR(hashtable->outerbatchNames);
|
|
||||||
outerbatches = (File *)
|
|
||||||
palloc(nbatch * sizeof(File));
|
|
||||||
for (i = 0; i < nbatch; i++)
|
for (i = 0; i < nbatch; i++)
|
||||||
{
|
{
|
||||||
#ifndef __CYGWIN32__
|
outerbatches[i] = OpenTemporaryFile();
|
||||||
outerbatches[i] = FileNameOpenFile(
|
|
||||||
ABSADDR(outerbatchNames[i]),
|
|
||||||
O_CREAT | O_RDWR, 0600);
|
|
||||||
#else
|
|
||||||
outerbatches[i] = FileNameOpenFile(
|
|
||||||
ABSADDR(outerbatchNames[i]),
|
|
||||||
O_CREAT | O_RDWR | O_BINARY, 0600);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
hjstate->hj_OuterBatches = outerbatches;
|
hjstate->hj_OuterBatches = outerbatches;
|
||||||
|
|
||||||
|
@ -193,7 +181,6 @@ ExecHashJoin(HashJoin *node)
|
||||||
}
|
}
|
||||||
outerbatchPos = (RelativeAddr *) ABSADDR(hashtable->outerbatchPos);
|
outerbatchPos = (RelativeAddr *) ABSADDR(hashtable->outerbatchPos);
|
||||||
curbatch = hashtable->curbatch;
|
curbatch = hashtable->curbatch;
|
||||||
outerbatchNames = (RelativeAddr *) ABSADDR(hashtable->outerbatchNames);
|
|
||||||
|
|
||||||
/* ----------------
|
/* ----------------
|
||||||
* Now get an outer tuple and probe into the hash table for matches
|
* Now get an outer tuple and probe into the hash table for matches
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
*
|
*
|
||||||
* Copyright (c) 1994, Regents of the University of California
|
* Copyright (c) 1994, Regents of the University of California
|
||||||
*
|
*
|
||||||
* $Id: psort.c,v 1.50 1999/02/13 23:20:15 momjian Exp $
|
* $Id: psort.c,v 1.51 1999/05/09 00:53:22 tgl Exp $
|
||||||
*
|
*
|
||||||
* NOTES
|
* NOTES
|
||||||
* Sorts the first relation into the second relation.
|
* Sorts the first relation into the second relation.
|
||||||
|
@ -55,20 +55,17 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
static bool createfirstrun(Sort *node);
|
static bool createfirstrun(Sort *node);
|
||||||
static bool createrun(Sort *node, FILE *file);
|
static bool createrun(Sort *node, BufFile *file);
|
||||||
static void destroytape(FILE *file);
|
static void destroytape(BufFile *file);
|
||||||
static void dumptuples(FILE *file, Sort *node);
|
static void dumptuples(BufFile *file, Sort *node);
|
||||||
static FILE *gettape(void);
|
static BufFile *gettape(void);
|
||||||
static void initialrun(Sort *node);
|
static void initialrun(Sort *node);
|
||||||
static void inittapes(Sort *node);
|
static void inittapes(Sort *node);
|
||||||
static void merge(Sort *node, struct tape * dest);
|
static void merge(Sort *node, struct tape * dest);
|
||||||
static FILE *mergeruns(Sort *node);
|
static BufFile *mergeruns(Sort *node);
|
||||||
static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
|
static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#define TEMPDIR "./"
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* tlenzero used to delimit runs; both vars below must have
|
* tlenzero used to delimit runs; both vars below must have
|
||||||
* the same size as HeapTuple->t_len
|
* the same size as HeapTuple->t_len
|
||||||
|
@ -76,8 +73,9 @@ static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
|
||||||
static unsigned int tlenzero = 0;
|
static unsigned int tlenzero = 0;
|
||||||
static unsigned int tlendummy;
|
static unsigned int tlendummy;
|
||||||
|
|
||||||
|
/* these are used by _psort_cmp, and are set just before calling qsort() */
|
||||||
static TupleDesc PsortTupDesc;
|
static TupleDesc PsortTupDesc;
|
||||||
static ScanKey PsortKeys; /* used by _psort_cmp */
|
static ScanKey PsortKeys;
|
||||||
static int PsortNkeys;
|
static int PsortNkeys;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -100,7 +98,7 @@ static int PsortNkeys;
|
||||||
*
|
*
|
||||||
* struct leftist *Tuples; current tuples in memory
|
* struct leftist *Tuples; current tuples in memory
|
||||||
*
|
*
|
||||||
* FILE *psort_grab_file; this holds tuples grabbed
|
* BufFile *psort_grab_file; this holds tuples grabbed
|
||||||
* from merged sort runs
|
* from merged sort runs
|
||||||
* long psort_current; current file position
|
* long psort_current; current file position
|
||||||
* long psort_saved; file position saved for
|
* long psort_saved; file position saved for
|
||||||
|
@ -221,24 +219,27 @@ inittapes(Sort *node)
|
||||||
( \
|
( \
|
||||||
(TUP)->t_len += HEAPTUPLESIZE, \
|
(TUP)->t_len += HEAPTUPLESIZE, \
|
||||||
((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \
|
((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \
|
||||||
fwrite((char *)TUP, (TUP)->t_len, 1, FP), \
|
BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \
|
||||||
fwrite((char *)&((TUP)->t_len), sizeof (tlendummy), 1, FP), \
|
BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \
|
||||||
(TUP)->t_len -= HEAPTUPLESIZE \
|
(TUP)->t_len -= HEAPTUPLESIZE \
|
||||||
)
|
)
|
||||||
|
|
||||||
#define ENDRUN(FP) fwrite((char *)&tlenzero, sizeof (tlenzero), 1, FP)
|
#define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
|
||||||
#define GETLEN(LEN, FP) fread((char *)&(LEN), sizeof (tlenzero), 1, FP)
|
#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
|
||||||
#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
|
#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
|
||||||
|
#define FREE(x) pfree((char *) x)
|
||||||
#define GETTUP(NODE, TUP, LEN, FP) \
|
#define GETTUP(NODE, TUP, LEN, FP) \
|
||||||
( \
|
( \
|
||||||
IncrProcessed(), \
|
IncrProcessed(), \
|
||||||
((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof (tlenzero), \
|
((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \
|
||||||
fread((char *)(TUP) + sizeof (tlenzero), (LEN) - sizeof (tlenzero), 1, FP), \
|
BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \
|
||||||
(TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \
|
(TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \
|
||||||
fread((char *)&tlendummy, sizeof (tlendummy), 1, FP) \
|
BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \
|
||||||
)
|
)
|
||||||
|
|
||||||
#define SETTUPLEN(TUP, LEN) (TUP)->t_len = LEN - HEAPTUPLESIZE
|
#define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
|
||||||
|
|
||||||
|
#define rewind(FP) BufFileSeek(FP, 0L, SEEK_SET)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* USEMEM - record use of memory FREEMEM - record
|
* USEMEM - record use of memory FREEMEM - record
|
||||||
|
@ -256,7 +257,7 @@ inittapes(Sort *node)
|
||||||
* - (replacement selection(R2-R3)--Knuth, Vol.3, p.257)
|
* - (replacement selection(R2-R3)--Knuth, Vol.3, p.257)
|
||||||
* - (polyphase merge Alg.D(D2-D4)--Knuth, Vol.3, p.271)
|
* - (polyphase merge Alg.D(D2-D4)--Knuth, Vol.3, p.271)
|
||||||
*
|
*
|
||||||
* Explaination:
|
* Explanation:
|
||||||
* Tuples are distributed to the tapes as in Algorithm D.
|
* Tuples are distributed to the tapes as in Algorithm D.
|
||||||
* A "tuple" with t_size == 0 is used to mark the end of a run.
|
* A "tuple" with t_size == 0 is used to mark the end of a run.
|
||||||
*
|
*
|
||||||
|
@ -390,7 +391,7 @@ createfirstrun(Sort *node)
|
||||||
Assert(PS(node)->memtuples == NULL);
|
Assert(PS(node)->memtuples == NULL);
|
||||||
Assert(PS(node)->tupcount == 0);
|
Assert(PS(node)->tupcount == 0);
|
||||||
if (LACKMEM(node))
|
if (LACKMEM(node))
|
||||||
elog(FATAL, "psort: LACKMEM in createfirstrun");
|
elog(ERROR, "psort: LACKMEM in createfirstrun");
|
||||||
|
|
||||||
memtuples = palloc(t_free * sizeof(HeapTuple));
|
memtuples = palloc(t_free * sizeof(HeapTuple));
|
||||||
|
|
||||||
|
@ -478,7 +479,7 @@ createfirstrun(Sort *node)
|
||||||
* Tuples contains the tuples for the following run upon exit
|
* Tuples contains the tuples for the following run upon exit
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
createrun(Sort *node, FILE *file)
|
createrun(Sort *node, BufFile *file)
|
||||||
{
|
{
|
||||||
HeapTuple lasttuple;
|
HeapTuple lasttuple;
|
||||||
HeapTuple tup;
|
HeapTuple tup;
|
||||||
|
@ -593,7 +594,7 @@ createrun(Sort *node, FILE *file)
|
||||||
* Returns:
|
* Returns:
|
||||||
* file of tuples in order
|
* file of tuples in order
|
||||||
*/
|
*/
|
||||||
static FILE *
|
static BufFile *
|
||||||
mergeruns(Sort *node)
|
mergeruns(Sort *node)
|
||||||
{
|
{
|
||||||
struct tape *tp;
|
struct tape *tp;
|
||||||
|
@ -609,7 +610,6 @@ mergeruns(Sort *node)
|
||||||
{
|
{
|
||||||
tp = tp->tp_prev;
|
tp = tp->tp_prev;
|
||||||
rewind(tp->tp_file);
|
rewind(tp->tp_file);
|
||||||
/* resettape(tp->tp_file); -not sufficient */
|
|
||||||
merge(node, tp);
|
merge(node, tp);
|
||||||
rewind(tp->tp_file);
|
rewind(tp->tp_file);
|
||||||
}
|
}
|
||||||
|
@ -627,7 +627,7 @@ merge(Sort *node, struct tape * dest)
|
||||||
struct tape *lasttp; /* (TAPE[P]) */
|
struct tape *lasttp; /* (TAPE[P]) */
|
||||||
struct tape *tp;
|
struct tape *tp;
|
||||||
struct leftist *tuples;
|
struct leftist *tuples;
|
||||||
FILE *destfile;
|
BufFile *destfile;
|
||||||
int times; /* runs left to merge */
|
int times; /* runs left to merge */
|
||||||
int outdummy; /* complete dummy runs */
|
int outdummy; /* complete dummy runs */
|
||||||
short fromtape;
|
short fromtape;
|
||||||
|
@ -729,7 +729,7 @@ merge(Sort *node, struct tape * dest)
|
||||||
* dumptuples - stores all the tuples in tree into file
|
* dumptuples - stores all the tuples in tree into file
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
dumptuples(FILE *file, Sort *node)
|
dumptuples(BufFile *file, Sort *node)
|
||||||
{
|
{
|
||||||
struct leftist *tp;
|
struct leftist *tp;
|
||||||
struct leftist *newp;
|
struct leftist *newp;
|
||||||
|
@ -811,28 +811,30 @@ psort_grabtuple(Sort *node, bool *should_free)
|
||||||
* psort_current is pointing to the zero tuplen at the end of
|
* psort_current is pointing to the zero tuplen at the end of
|
||||||
* file
|
* file
|
||||||
*/
|
*/
|
||||||
fseek(PS(node)->psort_grab_file,
|
BufFileSeek(PS(node)->psort_grab_file,
|
||||||
PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
|
PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
|
||||||
GETLEN(tuplen, PS(node)->psort_grab_file);
|
GETLEN(tuplen, PS(node)->psort_grab_file);
|
||||||
if (PS(node)->psort_current < tuplen)
|
if (PS(node)->psort_current < tuplen)
|
||||||
elog(FATAL, "psort_grabtuple: too big last tuple len in backward scan");
|
elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
|
||||||
PS(node)->all_fetched = false;
|
PS(node)->all_fetched = false;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* move to position of end tlen of prev tuple */
|
/* move to position of end tlen of prev tuple */
|
||||||
PS(node)->psort_current -= sizeof(tlendummy);
|
PS(node)->psort_current -= sizeof(tlendummy);
|
||||||
fseek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET);
|
BufFileSeek(PS(node)->psort_grab_file,
|
||||||
|
PS(node)->psort_current, SEEK_SET);
|
||||||
GETLEN(tuplen, PS(node)->psort_grab_file); /* get tlen of prev
|
GETLEN(tuplen, PS(node)->psort_grab_file); /* get tlen of prev
|
||||||
* tuple */
|
* tuple */
|
||||||
if (tuplen == 0)
|
if (tuplen == 0)
|
||||||
elog(FATAL, "psort_grabtuple: tuplen is 0 in backward scan");
|
elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
|
||||||
if (PS(node)->psort_current <= tuplen + sizeof(tlendummy))
|
if (PS(node)->psort_current <= tuplen + sizeof(tlendummy))
|
||||||
{ /* prev tuple should be first one */
|
{ /* prev tuple should be first one */
|
||||||
if (PS(node)->psort_current != tuplen)
|
if (PS(node)->psort_current != tuplen)
|
||||||
elog(FATAL, "psort_grabtuple: first tuple expected in backward scan");
|
elog(ERROR, "psort_grabtuple: first tuple expected in backward scan");
|
||||||
PS(node)->psort_current = 0;
|
PS(node)->psort_current = 0;
|
||||||
fseek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET);
|
BufFileSeek(PS(node)->psort_grab_file,
|
||||||
|
PS(node)->psort_current, SEEK_SET);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -842,18 +844,18 @@ psort_grabtuple(Sort *node, bool *should_free)
|
||||||
*/
|
*/
|
||||||
PS(node)->psort_current -= tuplen;
|
PS(node)->psort_current -= tuplen;
|
||||||
/* move to position of end tlen of prev tuple */
|
/* move to position of end tlen of prev tuple */
|
||||||
fseek(PS(node)->psort_grab_file,
|
BufFileSeek(PS(node)->psort_grab_file,
|
||||||
PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
|
PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
|
||||||
GETLEN(tuplen, PS(node)->psort_grab_file);
|
GETLEN(tuplen, PS(node)->psort_grab_file);
|
||||||
if (PS(node)->psort_current < tuplen + sizeof(tlendummy))
|
if (PS(node)->psort_current < tuplen + sizeof(tlendummy))
|
||||||
elog(FATAL, "psort_grabtuple: too big tuple len in backward scan");
|
elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* move to prev (or last) tuple start position + sizeof(t_len)
|
* move to prev (or last) tuple start position + sizeof(t_len)
|
||||||
*/
|
*/
|
||||||
fseek(PS(node)->psort_grab_file,
|
BufFileSeek(PS(node)->psort_grab_file,
|
||||||
PS(node)->psort_current - tuplen, SEEK_SET);
|
PS(node)->psort_current - tuplen, SEEK_SET);
|
||||||
tup = ALLOCTUP(tuplen);
|
tup = ALLOCTUP(tuplen);
|
||||||
SETTUPLEN(tup, tuplen);
|
SETTUPLEN(tup, tuplen);
|
||||||
GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
|
GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
|
||||||
|
@ -915,7 +917,8 @@ psort_restorepos(Sort *node)
|
||||||
Assert(PS(node) != (Psortstate *) NULL);
|
Assert(PS(node) != (Psortstate *) NULL);
|
||||||
|
|
||||||
if (PS(node)->using_tape_files == true)
|
if (PS(node)->using_tape_files == true)
|
||||||
fseek(PS(node)->psort_grab_file, PS(node)->psort_saved, SEEK_SET);
|
BufFileSeek(PS(node)->psort_grab_file,
|
||||||
|
PS(node)->psort_saved, SEEK_SET);
|
||||||
PS(node)->psort_current = PS(node)->psort_saved;
|
PS(node)->psort_current = PS(node)->psort_saved;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -977,141 +980,34 @@ psort_rescan(Sort *node)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* gettape - handles access temporary files in polyphase merging
|
|
||||||
*
|
|
||||||
* Optimizations:
|
|
||||||
* If guarenteed that only one sort running/process,
|
|
||||||
* can simplify the file generation--and need not store the
|
|
||||||
* name for later unlink.
|
|
||||||
*/
|
|
||||||
|
|
||||||
struct tapelst
|
|
||||||
{
|
|
||||||
char *tl_name;
|
|
||||||
int tl_fd;
|
|
||||||
struct tapelst *tl_next;
|
|
||||||
};
|
|
||||||
|
|
||||||
static struct tapelst *Tapes = NULL;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* gettape - returns an open stream for writing/reading
|
* gettape - returns an open stream for writing/reading
|
||||||
*
|
*
|
||||||
* Returns:
|
* Returns:
|
||||||
* Open stream for writing/reading.
|
* Open stream for writing/reading.
|
||||||
* NULL if unable to open temporary file.
|
* NULL if unable to open temporary file.
|
||||||
|
*
|
||||||
|
* There used to be a lot of cruft here to try to ensure that we destroyed
|
||||||
|
* all the tape files; but it didn't really work. Now we rely on fd.c to
|
||||||
|
* clean up temp files if an error occurs.
|
||||||
*/
|
*/
|
||||||
static FILE *
|
static BufFile *
|
||||||
gettape()
|
gettape()
|
||||||
{
|
{
|
||||||
struct tapelst *tp;
|
File tfile;
|
||||||
FILE *file;
|
|
||||||
static int tapeinit = 0;
|
|
||||||
char *mktemp();
|
|
||||||
static unsigned int uniqueFileId = 0;
|
|
||||||
extern int errno;
|
|
||||||
char uniqueName[MAXPGPATH];
|
|
||||||
|
|
||||||
tp = (struct tapelst *) palloc((unsigned) sizeof(struct tapelst));
|
tfile = OpenTemporaryFile();
|
||||||
|
Assert(tfile >= 0);
|
||||||
snprintf(uniqueName, MAXPGPATH - 1, "%spg_psort.%d.%u",
|
return BufFileCreate(tfile);
|
||||||
TEMPDIR, (int) MyProcPid, uniqueFileId++);
|
|
||||||
|
|
||||||
tapeinit = 1;
|
|
||||||
|
|
||||||
tp->tl_name = palloc((unsigned) sizeof(uniqueName));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* now, copy template with final null into palloc'd space
|
|
||||||
*/
|
|
||||||
|
|
||||||
StrNCpy(tp->tl_name, uniqueName, MAXPGPATH);
|
|
||||||
|
|
||||||
#ifndef __CYGWIN32__
|
|
||||||
file = AllocateFile(tp->tl_name, "w+");
|
|
||||||
#else
|
|
||||||
file = AllocateFile(tp->tl_name, "w+b");
|
|
||||||
#endif
|
|
||||||
if (file == NULL)
|
|
||||||
elog(ERROR, "Open: %s in %s line %d, %s", tp->tl_name,
|
|
||||||
__FILE__, __LINE__, strerror(errno));
|
|
||||||
|
|
||||||
tp->tl_fd = fileno(file);
|
|
||||||
tp->tl_next = Tapes;
|
|
||||||
Tapes = tp;
|
|
||||||
return file;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* resettape - resets the tape to size 0
|
* destroytape - unlinks the tape
|
||||||
*/
|
|
||||||
#ifdef NOT_USED
|
|
||||||
static void
|
|
||||||
resettape(FILE *file)
|
|
||||||
{
|
|
||||||
struct tapelst *tp;
|
|
||||||
int fd;
|
|
||||||
|
|
||||||
Assert(PointerIsValid(file));
|
|
||||||
|
|
||||||
fd = fileno(file);
|
|
||||||
for (tp = Tapes; tp != NULL && tp->tl_fd != fd; tp = tp->tl_next)
|
|
||||||
;
|
|
||||||
if (tp == NULL)
|
|
||||||
elog(ERROR, "resettape: tape not found");
|
|
||||||
|
|
||||||
file = freopen(tp->tl_name, "w+", file);
|
|
||||||
if (file == NULL)
|
|
||||||
elog(FATAL, "could not freopen temporary file");
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/*
|
|
||||||
* distroytape - unlinks the tape
|
|
||||||
*
|
|
||||||
* Efficiency note:
|
|
||||||
* More efficient to destroy more recently allocated tapes first.
|
|
||||||
*
|
|
||||||
* Possible bugs:
|
|
||||||
* Exits instead of returning status, if given invalid tape.
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
destroytape(FILE *file)
|
destroytape(BufFile *file)
|
||||||
{
|
{
|
||||||
struct tapelst *tp,
|
BufFileClose(file);
|
||||||
*tq;
|
|
||||||
int fd;
|
|
||||||
|
|
||||||
if ((tp = Tapes) == NULL)
|
|
||||||
elog(FATAL, "destroytape: tape not found");
|
|
||||||
|
|
||||||
if ((fd = fileno(file)) == tp->tl_fd)
|
|
||||||
{
|
|
||||||
Tapes = tp->tl_next;
|
|
||||||
FreeFile(file);
|
|
||||||
unlink(tp->tl_name);
|
|
||||||
FREE(tp->tl_name);
|
|
||||||
FREE(tp);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
if (tp->tl_next == NULL)
|
|
||||||
elog(FATAL, "destroytape: tape not found");
|
|
||||||
if (tp->tl_next->tl_fd == fd)
|
|
||||||
{
|
|
||||||
FreeFile(file);
|
|
||||||
tq = tp->tl_next;
|
|
||||||
tp->tl_next = tq->tl_next;
|
|
||||||
unlink(tq->tl_name);
|
|
||||||
FREE((tq->tl_name));
|
|
||||||
FREE(tq);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
tp = tp->tl_next;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
*
|
*
|
||||||
* Copyright (c) 1994, Regents of the University of California
|
* Copyright (c) 1994, Regents of the University of California
|
||||||
*
|
*
|
||||||
* $Id: hashjoin.h,v 1.9 1999/05/06 00:30:45 tgl Exp $
|
* $Id: hashjoin.h,v 1.10 1999/05/09 00:53:18 tgl Exp $
|
||||||
*
|
*
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
@ -54,9 +54,7 @@ typedef struct HashTableData
|
||||||
RelativeAddr batch; /* char* */
|
RelativeAddr batch; /* char* */
|
||||||
RelativeAddr readbuf; /* char* */
|
RelativeAddr readbuf; /* char* */
|
||||||
int nbatch;
|
int nbatch;
|
||||||
RelativeAddr outerbatchNames; /* RelativeAddr* */
|
|
||||||
RelativeAddr outerbatchPos; /* RelativeAddr* */
|
RelativeAddr outerbatchPos; /* RelativeAddr* */
|
||||||
RelativeAddr innerbatchNames; /* RelativeAddr* */
|
|
||||||
RelativeAddr innerbatchPos; /* RelativeAddr* */
|
RelativeAddr innerbatchPos; /* RelativeAddr* */
|
||||||
RelativeAddr innerbatchSizes; /* int* */
|
RelativeAddr innerbatchSizes; /* int* */
|
||||||
int curbatch;
|
int curbatch;
|
||||||
|
|
|
@ -6,27 +6,25 @@
|
||||||
*
|
*
|
||||||
* Copyright (c) 1994, Regents of the University of California
|
* Copyright (c) 1994, Regents of the University of California
|
||||||
*
|
*
|
||||||
* $Id: psort.h,v 1.17 1999/02/13 23:22:29 momjian Exp $
|
* $Id: psort.h,v 1.18 1999/05/09 00:53:18 tgl Exp $
|
||||||
*
|
*
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
#ifndef PSORT_H
|
#ifndef PSORT_H
|
||||||
#define PSORT_H
|
#define PSORT_H
|
||||||
|
|
||||||
#include <stdio.h>
|
#include "storage/fd.h"
|
||||||
#include "access/relscan.h"
|
#include "access/relscan.h"
|
||||||
#include "utils/lselect.h"
|
#include "utils/lselect.h"
|
||||||
#include "nodes/plannodes.h"
|
#include "nodes/plannodes.h"
|
||||||
|
|
||||||
#define MAXTAPES 7 /* 7--See Fig. 70, p273 */
|
#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */
|
||||||
#define TAPEEXTLEN strlen("pg_psort.xxxxx.xxx") /* TEMPDIR/TAPEEXT */
|
|
||||||
#define FREE(x) pfree((char *) x)
|
|
||||||
|
|
||||||
struct tape
|
struct tape
|
||||||
{
|
{
|
||||||
int tp_dummy; /* (D) */
|
int tp_dummy; /* (D) */
|
||||||
int tp_fib; /* (A) */
|
int tp_fib; /* (A) */
|
||||||
FILE *tp_file; /* (TAPE) */
|
BufFile *tp_file; /* (TAPE) */
|
||||||
struct tape *tp_prev;
|
struct tape *tp_prev;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -58,7 +56,7 @@ typedef struct Psortstate
|
||||||
|
|
||||||
struct leftist *Tuples;
|
struct leftist *Tuples;
|
||||||
|
|
||||||
FILE *psort_grab_file;
|
BufFile *psort_grab_file;
|
||||||
long psort_current; /* could be file offset, or array index */
|
long psort_current; /* could be file offset, or array index */
|
||||||
long psort_saved; /* could be file offset, or array index */
|
long psort_saved; /* could be file offset, or array index */
|
||||||
bool using_tape_files;
|
bool using_tape_files;
|
||||||
|
@ -68,7 +66,6 @@ typedef struct Psortstate
|
||||||
} Psortstate;
|
} Psortstate;
|
||||||
|
|
||||||
#ifdef EBUG
|
#ifdef EBUG
|
||||||
#include <stdio.h>
|
|
||||||
#include "utils/elog.h"
|
#include "utils/elog.h"
|
||||||
#include "storage/buf.h"
|
#include "storage/buf.h"
|
||||||
#include "storage/bufmgr.h"
|
#include "storage/bufmgr.h"
|
||||||
|
|
Loading…
Reference in New Issue