diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 1381a4a4f0..afddf4d392 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/executor/execAmi.c,v 1.97 2008/08/05 21:28:29 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/execAmi.c,v 1.98 2008/10/01 19:51:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -239,10 +239,6 @@ ExecMarkPos(PlanState *node) ExecTidMarkPos((TidScanState *) node); break; - case T_FunctionScanState: - ExecFunctionMarkPos((FunctionScanState *) node); - break; - case T_ValuesScanState: ExecValuesMarkPos((ValuesScanState *) node); break; @@ -296,10 +292,6 @@ ExecRestrPos(PlanState *node) ExecTidRestrPos((TidScanState *) node); break; - case T_FunctionScanState: - ExecFunctionRestrPos((FunctionScanState *) node); - break; - case T_ValuesScanState: ExecValuesRestrPos((ValuesScanState *) node); break; @@ -332,7 +324,7 @@ ExecRestrPos(PlanState *node) * (However, since the only present use of mark/restore is in mergejoin, * there is no need to support mark/restore in any plan type that is not * capable of generating ordered output. So the seqscan, tidscan, - * functionscan, and valuesscan support is actually useless code at present.) + * and valuesscan support is actually useless code at present.) */ bool ExecSupportsMarkRestore(NodeTag plantype) @@ -342,7 +334,6 @@ ExecSupportsMarkRestore(NodeTag plantype) case T_SeqScan: case T_IndexScan: case T_TidScan: - case T_FunctionScan: case T_ValuesScan: case T_Material: case T_Sort: diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c index 6bbb5b139b..6113a2c906 100644 --- a/src/backend/executor/nodeFunctionscan.c +++ b/src/backend/executor/nodeFunctionscan.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/nodeFunctionscan.c,v 1.46 2008/02/29 02:49:39 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeFunctionscan.c,v 1.47 2008/10/01 19:51:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -131,6 +131,9 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags) TypeFuncClass functypclass; TupleDesc tupdesc = NULL; + /* check for unsupported flags */ + Assert(!(eflags & EXEC_FLAG_MARK)); + /* * FunctionScan should not have any children. */ @@ -273,42 +276,6 @@ ExecEndFunctionScan(FunctionScanState *node) node->tuplestorestate = NULL; } -/* ---------------------------------------------------------------- - * ExecFunctionMarkPos - * - * Calls tuplestore to save the current position in the stored file. - * ---------------------------------------------------------------- - */ -void -ExecFunctionMarkPos(FunctionScanState *node) -{ - /* - * if we haven't materialized yet, just return. - */ - if (!node->tuplestorestate) - return; - - tuplestore_markpos(node->tuplestorestate); -} - -/* ---------------------------------------------------------------- - * ExecFunctionRestrPos - * - * Calls tuplestore to restore the last saved file position. - * ---------------------------------------------------------------- - */ -void -ExecFunctionRestrPos(FunctionScanState *node) -{ - /* - * if we haven't materialized yet, just return. - */ - if (!node->tuplestorestate) - return; - - tuplestore_restorepos(node->tuplestorestate); -} - /* ---------------------------------------------------------------- * ExecFunctionReScan * diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c index 3c096356a3..494560d0f4 100644 --- a/src/backend/executor/nodeMaterial.c +++ b/src/backend/executor/nodeMaterial.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/nodeMaterial.c,v 1.62 2008/03/23 00:54:04 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeMaterial.c,v 1.63 2008/10/01 19:51:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -51,7 +51,7 @@ ExecMaterial(MaterialState *node) estate = node->ss.ps.state; dir = estate->es_direction; forward = ScanDirectionIsForward(dir); - tuplestorestate = (Tuplestorestate *) node->tuplestorestate; + tuplestorestate = node->tuplestorestate; /* * If first time through, and we need a tuplestore, initialize it. @@ -60,7 +60,19 @@ ExecMaterial(MaterialState *node) { tuplestorestate = tuplestore_begin_heap(true, false, work_mem); tuplestore_set_eflags(tuplestorestate, node->eflags); - node->tuplestorestate = (void *) tuplestorestate; + if (node->eflags & EXEC_FLAG_MARK) + { + /* + * Allocate a second read pointer to serve as the mark. + * We know it must have index 1, so needn't store that. + */ + int ptrn; + + ptrn = tuplestore_alloc_read_pointer(tuplestorestate, + node->eflags); + Assert(ptrn == 1); + } + node->tuplestorestate = tuplestorestate; } /* @@ -236,7 +248,7 @@ ExecEndMaterial(MaterialState *node) * Release tuplestore resources */ if (node->tuplestorestate != NULL) - tuplestore_end((Tuplestorestate *) node->tuplestorestate); + tuplestore_end(node->tuplestorestate); node->tuplestorestate = NULL; /* @@ -262,7 +274,10 @@ ExecMaterialMarkPos(MaterialState *node) if (!node->tuplestorestate) return; - tuplestore_markpos((Tuplestorestate *) node->tuplestorestate); + /* + * copy the active read pointer to the mark. + */ + tuplestore_copy_read_pointer(node->tuplestorestate, 0, 1); } /* ---------------------------------------------------------------- @@ -283,9 +298,9 @@ ExecMaterialRestrPos(MaterialState *node) return; /* - * restore the scan to the previously marked position + * copy the mark to the active read pointer. */ - tuplestore_restorepos((Tuplestorestate *) node->tuplestorestate); + tuplestore_copy_read_pointer(node->tuplestorestate, 1, 0); } /* ---------------------------------------------------------------- @@ -322,14 +337,14 @@ ExecMaterialReScan(MaterialState *node, ExprContext *exprCtxt) if (((PlanState *) node)->lefttree->chgParam != NULL || (node->eflags & EXEC_FLAG_REWIND) == 0) { - tuplestore_end((Tuplestorestate *) node->tuplestorestate); + tuplestore_end(node->tuplestorestate); node->tuplestorestate = NULL; if (((PlanState *) node)->lefttree->chgParam == NULL) ExecReScan(((PlanState *) node)->lefttree, exprCtxt); node->eof_underlying = false; } else - tuplestore_rescan((Tuplestorestate *) node->tuplestorestate); + tuplestore_rescan(node->tuplestorestate); } else { diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c index 6d28a0eba5..3b53ad28a5 100644 --- a/src/backend/utils/sort/tuplestore.c +++ b/src/backend/utils/sort/tuplestore.c @@ -11,6 +11,8 @@ * before it has all been written. This is particularly useful for cursors, * because it allows random access within the already-scanned portion of * a query without having to process the underlying scan to completion. + * Also, it is possible to support multiple independent read pointers. + * * A temporary file is used to handle the data if it exceeds the * space limit specified by the caller. * @@ -20,25 +22,31 @@ * maxKBytes, we dump all the tuples into a temp file and then read from that * when needed. * + * Upon creation, a tuplestore supports a single read pointer, numbered 0. + * Additional read pointers can be created using tuplestore_alloc_read_pointer. + * Mark/restore behavior is supported by copying read pointers. + * * When the caller requests backward-scan capability, we write the temp file * in a format that allows either forward or backward scan. Otherwise, only - * forward scan is allowed. Rewind and markpos/restorepos are normally allowed - * but can be turned off via tuplestore_set_eflags; turning off both backward - * scan and rewind enables truncation of the tuplestore at the mark point - * (if any) for minimal memory usage. + * forward scan is allowed. A request for backward scan must be made before + * putting any tuples into the tuplestore. Rewind is normally allowed but + * can be turned off via tuplestore_set_eflags; turning off both backward + * scan and rewind for all read pointers enables truncation of the tuplestore + * at the oldest read point for minimal memory usage. * - * Because we allow reading before writing is complete, there are two - * interesting positions in the temp file: the current read position and - * the current write position. At any given instant, the temp file's seek - * position corresponds to one of these, and the other one is remembered in - * the Tuplestore's state. + * Note: in TSS_WRITEFILE state, the temp file's seek position is the + * current write position, and the write-position variables in the tuplestore + * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's + * seek position is the active read pointer's position, and that read pointer + * isn't kept up to date. We update the appropriate variables using ftell() + * before switching to the other state or activating a different read pointer. * * * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.39 2008/05/12 00:00:53 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.40 2008/10/01 19:51:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -63,13 +71,35 @@ typedef enum TSS_READFILE /* Reading from temp file */ } TupStoreStatus; +/* + * State for a single read pointer. If we are in state INMEM then all the + * read pointers' "current" fields denote the read positions. In state + * WRITEFILE, the file/offset fields denote the read positions. In state + * READFILE, inactive read pointers have valid file/offset, but the active + * read pointer implicitly has position equal to the temp file's seek position. + * + * Special case: if eof_reached is true, then the pointer's read position is + * implicitly equal to the write position, and current/file/offset aren't + * maintained. This way we need not update all the read pointers each time + * we write. + */ +typedef struct +{ + int eflags; /* capability flags */ + bool eof_reached; /* read reached EOF */ + int current; /* next array index to read */ + int file; /* temp file# */ + off_t offset; /* byte offset in file */ +} TSReadPointer; + /* * Private state of a Tuplestore operation. */ struct Tuplestorestate { TupStoreStatus status; /* enumerated value as shown above */ - int eflags; /* capability flags */ + int eflags; /* capability flags (OR of pointers' flags) */ + bool backward; /* store extra length words in file? */ bool interXact; /* keep open through transactions? */ long availMem; /* remaining memory available, in bytes */ BufFile *myfile; /* underlying file, or NULL if none */ @@ -116,31 +146,20 @@ struct Tuplestorestate int memtupsize; /* allocated length of memtuples array */ /* - * These variables are used to keep track of the current position. + * These variables are used to keep track of the current positions. * - * In state WRITEFILE, the current file seek position is the write point, - * and the read position is remembered in readpos_xxx; in state READFILE, - * the current file seek position is the read point, and the write - * position is remembered in writepos_xxx. (The write position is the - * same as EOF, but since BufFileSeek doesn't currently implement - * SEEK_END, we have to remember it explicitly.) - * - * Special case: if we are in WRITEFILE state and eof_reached is true, - * then the read position is implicitly equal to the write position (and - * hence to the file seek position); this way we need not update the - * readpos_xxx variables on each write. + * In state WRITEFILE, the current file seek position is the write point; + * in state READFILE, the write position is remembered in writepos_xxx. + * (The write position is the same as EOF, but since BufFileSeek doesn't + * currently implement SEEK_END, we have to remember it explicitly.) */ - bool eof_reached; /* read reached EOF (always valid) */ - int current; /* next array index (valid if INMEM) */ - int readpos_file; /* file# (valid if WRITEFILE and not eof) */ - off_t readpos_offset; /* offset (valid if WRITEFILE and not eof) */ - int writepos_file; /* file# (valid if READFILE) */ - off_t writepos_offset; /* offset (valid if READFILE) */ + TSReadPointer *readptrs; /* array of read pointers */ + int activeptr; /* index of the active read pointer */ + int readptrcount; /* number of pointers currently valid */ + int readptrsize; /* allocated length of readptrs array */ - /* markpos_xxx holds marked position for mark and restore */ - int markpos_current; /* saved "current" */ - int markpos_file; /* saved "readpos_file" */ - off_t markpos_offset; /* saved "readpos_offset" */ + int writepos_file; /* file# (valid if READFILE state) */ + off_t writepos_offset; /* offset (valid if READFILE state) */ }; #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup)) @@ -160,11 +179,11 @@ struct Tuplestorestate * may or may not match the in-memory representation of the tuple --- * any conversion needed is the job of the writetup and readtup routines. * - * If state->eflags & EXEC_FLAG_BACKWARD, then the stored representation of + * If state->backward is true, then the stored representation of * the tuple must be followed by another "unsigned int" that is a copy of the * length --- so the total tape space used is actually sizeof(unsigned int) * more than the stored length value. This allows read-backwards. When - * EXEC_FLAG_BACKWARD is not set, the write/read routines may omit the extra + * state->backward is not set, the write/read routines may omit the extra * length word. * * writetup is expected to write both length words as well as the tuple @@ -184,6 +203,7 @@ struct Tuplestorestate * We count space allocated for tuples against the maxKBytes limit, * plus the space used by the variable-size array memtuples. * Fixed-size space (primarily the BufFile I/O buffer) is not counted. + * We don't worry about the size of the read pointer array, either. * * Note that we count actual space used (as shown by GetMemoryChunkSpace) * rather than the originally-requested size. This is important since @@ -200,7 +220,7 @@ static Tuplestorestate *tuplestore_begin_common(int eflags, int maxKBytes); static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple); static void dumptuples(Tuplestorestate *state); -static void tuplestore_trim(Tuplestorestate *state, int ntuples); +static void tuplestore_trim(Tuplestorestate *state); static unsigned int getlen(Tuplestorestate *state, bool eofOK); static void *copytup_heap(Tuplestorestate *state, void *tup); static void writetup_heap(Tuplestorestate *state, void *tup); @@ -231,8 +251,15 @@ tuplestore_begin_common(int eflags, bool interXact, int maxKBytes) USEMEM(state, GetMemoryChunkSpace(state->memtuples)); - state->eof_reached = false; - state->current = 0; + state->activeptr = 0; + state->readptrcount = 1; + state->readptrsize = 8; /* arbitrary */ + state->readptrs = (TSReadPointer *) + palloc(state->readptrsize * sizeof(TSReadPointer)); + + state->readptrs[0].eflags = eflags; + state->readptrs[0].eof_reached = false; + state->readptrs[0].current = 0; return state; } @@ -267,8 +294,8 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) * the pre-8.3 behavior of tuplestores. */ eflags = randomAccess ? - (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND | EXEC_FLAG_MARK) : - (EXEC_FLAG_REWIND | EXEC_FLAG_MARK); + (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : + (EXEC_FLAG_REWIND); state = tuplestore_begin_common(eflags, interXact, maxKBytes); @@ -282,27 +309,70 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) /* * tuplestore_set_eflags * - * Set capability flags at a finer grain than is allowed by - * tuplestore_begin_xxx. This must be called before inserting any data - * into the tuplestore. + * Set the capability flags for read pointer 0 at a finer grain than is + * allowed by tuplestore_begin_xxx. This must be called before inserting + * any data into the tuplestore. * * eflags is a bitmask following the meanings used for executor node * startup flags (see executor.h). tuplestore pays attention to these bits: * EXEC_FLAG_REWIND need rewind to start * EXEC_FLAG_BACKWARD need backward fetch - * EXEC_FLAG_MARK need mark/restore - * If tuplestore_set_eflags is not called, REWIND and MARK are allowed, - * and BACKWARD is set per "randomAccess" in the tuplestore_begin_xxx call. + * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD + * is set per "randomAccess" in the tuplestore_begin_xxx call. */ void tuplestore_set_eflags(Tuplestorestate *state, int eflags) { - Assert(state->status == TSS_INMEM); - Assert(state->memtupcount == 0); + int i; + if (state->status != TSS_INMEM || state->memtupcount != 0) + elog(ERROR, "too late to call tuplestore_set_eflags"); + + state->readptrs[0].eflags = eflags; + for (i = 1; i < state->readptrcount; i++) + eflags |= state->readptrs[i].eflags; state->eflags = eflags; } +/* + * tuplestore_alloc_read_pointer - allocate another read pointer. + * + * Returns the pointer's index. + * + * The new pointer initially copies the position of read pointer 0. + * It can have its own eflags, but if any data has been inserted into + * the tuplestore, these eflags must not represent an increase in + * requirements. + */ +int +tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags) +{ + /* Check for possible increase of requirements */ + if (state->status != TSS_INMEM || state->memtupcount != 0) + { + if ((state->eflags | eflags) != state->eflags) + elog(ERROR, "too late to require new tuplestore eflags"); + } + + /* Make room for another read pointer if needed */ + if (state->readptrcount >= state->readptrsize) + { + int newcnt = state->readptrsize * 2; + + state->readptrs = (TSReadPointer *) + repalloc(state->readptrs, newcnt * sizeof(TSReadPointer)); + state->readptrsize = newcnt; + } + + /* And set it up */ + state->readptrs[state->readptrcount] = state->readptrs[0]; + state->readptrs[state->readptrcount].eflags = eflags; + + state->eflags |= eflags; + + return state->readptrcount++; +} + /* * tuplestore_end * @@ -321,18 +391,71 @@ tuplestore_end(Tuplestorestate *state) pfree(state->memtuples[i]); pfree(state->memtuples); } + pfree(state->readptrs); pfree(state); } +/* + * tuplestore_select_read_pointer - make the specified read pointer active + */ +void +tuplestore_select_read_pointer(Tuplestorestate *state, int ptr) +{ + TSReadPointer *readptr = &state->readptrs[ptr]; + + Assert(ptr >= 0 && ptr < state->readptrcount); + + /* No work if already active */ + if (ptr == state->activeptr) + return; + + switch (state->status) + { + case TSS_INMEM: + case TSS_WRITEFILE: + /* no work */ + break; + case TSS_READFILE: + /* + * We have to make the temp file's seek position equal to the + * logical position of the read pointer. In eof_reached state, + * that's the EOF, which we have available from the saved + * write position. + */ + if (readptr->eof_reached) + { + if (BufFileSeek(state->myfile, + state->writepos_file, + state->writepos_offset, + SEEK_SET) != 0) + elog(ERROR, "tuplestore seek failed"); + } + else + { + if (BufFileSeek(state->myfile, + readptr->file, + readptr->offset, + SEEK_SET) != 0) + elog(ERROR, "tuplestore seek failed"); + } + break; + default: + elog(ERROR, "invalid tuplestore state"); + break; + } + + state->activeptr = ptr; +} + /* * tuplestore_ateof * - * Returns the current eof_reached state. + * Returns the active read pointer's eof_reached state. */ bool tuplestore_ateof(Tuplestorestate *state) { - return state->eof_reached; + return state->readptrs[state->activeptr].eof_reached; } /* @@ -340,8 +463,8 @@ tuplestore_ateof(Tuplestorestate *state) * * Note that the input tuple is always copied; the caller need not save it. * - * If the read status is currently "AT EOF" then it remains so (the read - * pointer advances along with the write pointer); otherwise the read + * Any read pointer that is currently "AT EOF" remains so (the read pointer + * implicitly advances along with the write pointer); otherwise the read * pointer is unchanged. This is for the convenience of nodeMaterial.c. * * tuplestore_puttupleslot() is a convenience routine to collect data from @@ -427,10 +550,6 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) /* Stash the tuple in the in-memory array */ state->memtuples[state->memtupcount++] = tuple; - /* If eof_reached, keep read position in sync */ - if (state->eof_reached) - state->current = state->memtupcount; - /* * Done if we still fit in available memory and have array slots. */ @@ -443,6 +562,12 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) */ PrepareTempTablespaces(); state->myfile = BufFileCreateTemp(state->interXact); + /* + * Freeze the decision about whether trailing length words + * will be used. We can't change this choice once data is on + * tape, even though callers might drop the requirement. + */ + state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0; state->status = TSS_WRITEFILE; dumptuples(state); break; @@ -454,13 +579,14 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) /* * Switch from reading to writing. */ - if (!state->eof_reached) + if (!state->readptrs[state->activeptr].eof_reached) BufFileTell(state->myfile, - &state->readpos_file, &state->readpos_offset); + &state->readptrs[state->activeptr].file, + &state->readptrs[state->activeptr].offset); if (BufFileSeek(state->myfile, state->writepos_file, state->writepos_offset, SEEK_SET) != 0) - elog(ERROR, "seek to EOF failed"); + elog(ERROR, "tuplestore seek to EOF failed"); state->status = TSS_WRITEFILE; WRITETUP(state, tuple); break; @@ -482,10 +608,11 @@ static void * tuplestore_gettuple(Tuplestorestate *state, bool forward, bool *should_free) { + TSReadPointer *readptr = &state->readptrs[state->activeptr]; unsigned int tuplen; void *tup; - Assert(forward || (state->eflags & EXEC_FLAG_BACKWARD)); + Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); switch (state->status) { @@ -493,35 +620,47 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward, *should_free = false; if (forward) { - if (state->current < state->memtupcount) - return state->memtuples[state->current++]; - state->eof_reached = true; + if (readptr->eof_reached) + return NULL; + if (readptr->current < state->memtupcount) + { + /* + * We have another tuple, so return it. Note: in + * principle we could try tuplestore_trim() here after + * advancing current, but this would cost cycles with + * little chance of success, so we don't bother. + */ + return state->memtuples[readptr->current++]; + } + readptr->eof_reached = true; return NULL; } else { - if (state->current <= 0) - return NULL; - /* * if all tuples are fetched already then we return last * tuple, else - tuple before last returned. */ - if (state->eof_reached) - state->eof_reached = false; + if (readptr->eof_reached) + { + readptr->current = state->memtupcount; + readptr->eof_reached = false; + } else { - state->current--; /* last returned tuple */ - if (state->current <= 0) + if (readptr->current <= 0) return NULL; + readptr->current--; /* last returned tuple */ } - return state->memtuples[state->current - 1]; + if (readptr->current <= 0) + return NULL; + return state->memtuples[readptr->current - 1]; } break; case TSS_WRITEFILE: /* Skip state change if we'll just return NULL */ - if (state->eof_reached && forward) + if (readptr->eof_reached && forward) return NULL; /* @@ -529,11 +668,11 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward, */ BufFileTell(state->myfile, &state->writepos_file, &state->writepos_offset); - if (!state->eof_reached) + if (!readptr->eof_reached) if (BufFileSeek(state->myfile, - state->readpos_file, state->readpos_offset, + readptr->file, readptr->offset, SEEK_SET) != 0) - elog(ERROR, "seek failed"); + elog(ERROR, "tuplestore seek failed"); state->status = TSS_READFILE; /* FALL THRU into READFILE case */ @@ -548,7 +687,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward, } else { - state->eof_reached = true; + readptr->eof_reached = true; return NULL; } } @@ -564,12 +703,16 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward, */ if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int), SEEK_CUR) != 0) + { + /* even a failed backwards fetch gets you out of eof state */ + readptr->eof_reached = false; return NULL; + } tuplen = getlen(state, false); - if (state->eof_reached) + if (readptr->eof_reached) { - state->eof_reached = false; + readptr->eof_reached = false; /* We will return the tuple returned before returning NULL */ } else @@ -670,9 +813,9 @@ tuplestore_advance(Tuplestorestate *state, bool forward) /* * dumptuples - remove tuples from memory and write to tape * - * As a side effect, we must set readpos and markpos to the value - * corresponding to "current"; otherwise, a dump would lose the current read - * position. + * As a side effect, we must convert each read pointer's position from + * "current" to file/offset format. But eof_reached pointers don't + * need to change state. */ static void dumptuples(Tuplestorestate *state) @@ -681,12 +824,15 @@ dumptuples(Tuplestorestate *state) for (i = 0;; i++) { - if (i == state->current) - BufFileTell(state->myfile, - &state->readpos_file, &state->readpos_offset); - if (i == state->markpos_current) - BufFileTell(state->myfile, - &state->markpos_file, &state->markpos_offset); + TSReadPointer *readptr = state->readptrs; + int j; + + for (j = 0; j < state->readptrcount; readptr++, j++) + { + if (i == readptr->current && !readptr->eof_reached) + BufFileTell(state->myfile, + &readptr->file, &readptr->offset); + } if (i >= state->memtupcount) break; WRITETUP(state, state->memtuples[i]); @@ -695,28 +841,30 @@ dumptuples(Tuplestorestate *state) } /* - * tuplestore_rescan - rewind and replay the scan + * tuplestore_rescan - rewind the active read pointer to start */ void tuplestore_rescan(Tuplestorestate *state) { - Assert(state->eflags & EXEC_FLAG_REWIND); + TSReadPointer *readptr = &state->readptrs[state->activeptr]; + + Assert(readptr->eflags & EXEC_FLAG_REWIND); switch (state->status) { case TSS_INMEM: - state->eof_reached = false; - state->current = 0; + readptr->eof_reached = false; + readptr->current = 0; break; case TSS_WRITEFILE: - state->eof_reached = false; - state->readpos_file = 0; - state->readpos_offset = 0L; + readptr->eof_reached = false; + readptr->file = 0; + readptr->offset = 0L; break; case TSS_READFILE: - state->eof_reached = false; + readptr->eof_reached = false; if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0) - elog(ERROR, "seek to start failed"); + elog(ERROR, "tuplestore seek to start failed"); break; default: elog(ERROR, "invalid tuplestore state"); @@ -725,52 +873,79 @@ tuplestore_rescan(Tuplestorestate *state) } /* - * tuplestore_markpos - saves current position in the tuple sequence + * tuplestore_copy_read_pointer - copy a read pointer's state to another */ void -tuplestore_markpos(Tuplestorestate *state) +tuplestore_copy_read_pointer(Tuplestorestate *state, + int srcptr, int destptr) { - Assert(state->eflags & EXEC_FLAG_MARK); + TSReadPointer *sptr = &state->readptrs[srcptr]; + TSReadPointer *dptr = &state->readptrs[destptr]; + + Assert(srcptr >= 0 && srcptr < state->readptrcount); + Assert(destptr >= 0 && destptr < state->readptrcount); + + /* Assigning to self is a no-op */ + if (srcptr == destptr) + return; + + if (dptr->eflags != sptr->eflags) + { + /* Possible change of overall eflags, so copy and then recompute */ + int eflags; + int i; + + *dptr = *sptr; + eflags = state->readptrs[0].eflags; + for (i = 1; i < state->readptrcount; i++) + eflags |= state->readptrs[i].eflags; + state->eflags = eflags; + } + else + *dptr = *sptr; switch (state->status) { case TSS_INMEM: - state->markpos_current = state->current; - + /* We might be able to truncate the tuplestore */ + tuplestore_trim(state); + break; + case TSS_WRITEFILE: + break; + case TSS_READFILE: /* - * We can truncate the tuplestore if neither backward scan nor - * rewind capability are required by the caller. There will never - * be a need to back up past the mark point. - * - * Note: you might think we could remove all the tuples before - * "current", since that one is the next to be returned. However, - * since tuplestore_gettuple returns a direct pointer to our - * internal copy of the tuple, it's likely that the caller has - * still got the tuple just before "current" referenced in a slot. - * Don't free it yet. + * This case is a bit tricky since the active read pointer's + * position corresponds to the seek point, not what is in its + * variables. Assigning to the active requires a seek, and + * assigning from the active requires a tell, except when + * eof_reached. */ - if (!(state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND))) - tuplestore_trim(state, 1); - break; - case TSS_WRITEFILE: - if (state->eof_reached) + if (destptr == state->activeptr) { - /* Need to record the implicit read position */ - BufFileTell(state->myfile, - &state->markpos_file, - &state->markpos_offset); + if (dptr->eof_reached) + { + if (BufFileSeek(state->myfile, + state->writepos_file, + state->writepos_offset, + SEEK_SET) != 0) + elog(ERROR, "tuplestore seek failed"); + } + else + { + if (BufFileSeek(state->myfile, + dptr->file, dptr->offset, + SEEK_SET) != 0) + elog(ERROR, "tuplestore seek failed"); + } } - else + else if (srcptr == state->activeptr) { - state->markpos_file = state->readpos_file; - state->markpos_offset = state->readpos_offset; + if (!dptr->eof_reached) + BufFileTell(state->myfile, + &dptr->file, + &dptr->offset); } break; - case TSS_READFILE: - BufFileTell(state->myfile, - &state->markpos_file, - &state->markpos_offset); - break; default: elog(ERROR, "invalid tuplestore state"); break; @@ -778,48 +953,22 @@ tuplestore_markpos(Tuplestorestate *state) } /* - * tuplestore_restorepos - restores current position in tuple sequence to - * last saved position - */ -void -tuplestore_restorepos(Tuplestorestate *state) -{ - Assert(state->eflags & EXEC_FLAG_MARK); - - switch (state->status) - { - case TSS_INMEM: - state->eof_reached = false; - state->current = state->markpos_current; - break; - case TSS_WRITEFILE: - state->eof_reached = false; - state->readpos_file = state->markpos_file; - state->readpos_offset = state->markpos_offset; - break; - case TSS_READFILE: - state->eof_reached = false; - if (BufFileSeek(state->myfile, - state->markpos_file, - state->markpos_offset, - SEEK_SET) != 0) - elog(ERROR, "tuplestore_restorepos failed"); - break; - default: - elog(ERROR, "invalid tuplestore state"); - break; - } -} - -/* - * tuplestore_trim - remove all but ntuples tuples before current + * tuplestore_trim - remove all no-longer-needed tuples */ static void -tuplestore_trim(Tuplestorestate *state, int ntuples) +tuplestore_trim(Tuplestorestate *state) { + int oldest; int nremove; int i; + /* + * We can truncate the tuplestore if neither backward scan nor + * rewind capability are required by any read pointer. + */ + if (state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)) + return; + /* * We don't bother trimming temp files since it usually would mean more * work than just letting them sit in kernel buffers until they age out. @@ -827,7 +976,23 @@ tuplestore_trim(Tuplestorestate *state, int ntuples) if (state->status != TSS_INMEM) return; - nremove = state->current - ntuples; + /* Find the oldest read pointer */ + oldest = state->memtupcount; + for (i = 0; i < state->readptrcount; i++) + { + if (!state->readptrs[i].eof_reached) + oldest = Min(oldest, state->readptrs[i].current); + } + + /* + * Note: you might think we could remove all the tuples before the oldest + * "current", since that one is the next to be returned. However, + * since tuplestore_gettuple returns a direct pointer to our + * internal copy of the tuple, it's likely that the caller has + * still got the tuple just before "current" referenced in a slot. + * So we keep one extra tuple before the oldest "current". + */ + nremove = oldest - 1; if (nremove <= 0) return; /* nothing to do */ Assert(nremove <= state->memtupcount); @@ -856,8 +1021,11 @@ tuplestore_trim(Tuplestorestate *state, int ntuples) (state->memtupcount - nremove) * sizeof(void *)); state->memtupcount -= nremove; - state->current -= nremove; - state->markpos_current -= nremove; + for (i = 0; i < state->readptrcount; i++) + { + if (!state->readptrs[i].eof_reached) + state->readptrs[i].current -= nremove; + } } @@ -910,7 +1078,7 @@ writetup_heap(Tuplestorestate *state, void *tup) if (BufFileWrite(state->myfile, (void *) tuple, tuplen) != (size_t) tuplen) elog(ERROR, "write failed"); - if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */ + if (state->backward) /* need trailing length word? */ if (BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen)) != sizeof(tuplen)) elog(ERROR, "write failed"); @@ -931,7 +1099,7 @@ readtup_heap(Tuplestorestate *state, unsigned int len) if (BufFileRead(state->myfile, (void *) ((char *) tuple + sizeof(int)), len - sizeof(int)) != (size_t) (len - sizeof(int))) elog(ERROR, "unexpected end of data"); - if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */ + if (state->backward) /* need trailing length word? */ if (BufFileRead(state->myfile, (void *) &tuplen, sizeof(tuplen)) != sizeof(tuplen)) elog(ERROR, "unexpected end of data"); diff --git a/src/include/executor/nodeFunctionscan.h b/src/include/executor/nodeFunctionscan.h index dd499a73c6..d83e9a4f86 100644 --- a/src/include/executor/nodeFunctionscan.h +++ b/src/include/executor/nodeFunctionscan.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/executor/nodeFunctionscan.h,v 1.11 2008/01/01 19:45:57 momjian Exp $ + * $PostgreSQL: pgsql/src/include/executor/nodeFunctionscan.h,v 1.12 2008/10/01 19:51:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -20,8 +20,6 @@ extern int ExecCountSlotsFunctionScan(FunctionScan *node); extern FunctionScanState *ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags); extern TupleTableSlot *ExecFunctionScan(FunctionScanState *node); extern void ExecEndFunctionScan(FunctionScanState *node); -extern void ExecFunctionMarkPos(FunctionScanState *node); -extern void ExecFunctionRestrPos(FunctionScanState *node); extern void ExecFunctionReScan(FunctionScanState *node, ExprContext *exprCtxt); #endif /* NODEFUNCTIONSCAN_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index ee8c4a2bef..eb187ce459 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.187 2008/08/22 00:16:04 tgl Exp $ + * $PostgreSQL: pgsql/src/include/nodes/execnodes.h,v 1.188 2008/10/01 19:51:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1324,7 +1324,7 @@ typedef struct MaterialState ScanState ss; /* its first field is NodeTag */ int eflags; /* capability flags to pass to tuplestore */ bool eof_underlying; /* reached end of underlying plan? */ - void *tuplestorestate; /* private state of tuplestore.c */ + Tuplestorestate *tuplestorestate; } MaterialState; /* ---------------- diff --git a/src/include/utils/tuplestore.h b/src/include/utils/tuplestore.h index 37f99fea3b..3fe32f682b 100644 --- a/src/include/utils/tuplestore.h +++ b/src/include/utils/tuplestore.h @@ -11,6 +11,8 @@ * before it has all been written. This is particularly useful for cursors, * because it allows random access within the already-scanned portion of * a query without having to process the underlying scan to completion. + * Also, it is possible to support multiple independent read pointers. + * * A temporary file is used to handle the data if it exceeds the * space limit specified by the caller. * @@ -22,7 +24,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/utils/tuplestore.h,v 1.23 2008/03/25 19:26:53 neilc Exp $ + * $PostgreSQL: pgsql/src/include/utils/tuplestore.h,v 1.24 2008/10/01 19:51:50 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -57,16 +59,21 @@ extern void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, /* tuplestore_donestoring() used to be required, but is no longer used */ #define tuplestore_donestoring(state) ((void) 0) +extern int tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags); + +extern void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr); + +extern void tuplestore_copy_read_pointer(Tuplestorestate *state, + int srcptr, int destptr); + extern bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, TupleTableSlot *slot); extern bool tuplestore_advance(Tuplestorestate *state, bool forward); -extern void tuplestore_end(Tuplestorestate *state); - extern bool tuplestore_ateof(Tuplestorestate *state); extern void tuplestore_rescan(Tuplestorestate *state); -extern void tuplestore_markpos(Tuplestorestate *state); -extern void tuplestore_restorepos(Tuplestorestate *state); + +extern void tuplestore_end(Tuplestorestate *state); #endif /* TUPLESTORE_H */