READ COMMITTED isolevel is implemented and is default now.

This commit is contained in:
Vadim B. Mikheev 1999-01-29 09:23:17 +00:00
parent 3e2f87f3f3
commit e3a1ab764e
12 changed files with 499 additions and 128 deletions

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.39 1998/12/15 12:45:13 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.40 1999/01/29 09:22:51 vadim Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
@ -1373,6 +1373,7 @@ l3:
if (result != HeapTupleMayBeUpdated) if (result != HeapTupleMayBeUpdated)
{ {
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
tuple->t_self = tuple->t_data->t_ctid;
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return result; return result;
} }

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.32 1998/12/15 12:45:20 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.33 1999/01/29 09:22:52 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -134,6 +134,7 @@ l1:
* If this tuple is being updated by other transaction * If this tuple is being updated by other transaction
* then we have to wait for its commit/abort. * then we have to wait for its commit/abort.
*/ */
ReleaseBuffer(buffer);
if (TransactionIdIsValid(xwait)) if (TransactionIdIsValid(xwait))
{ {
if (nbuf != InvalidBuffer) if (nbuf != InvalidBuffer)

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.28 1998/12/18 09:10:18 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.29 1999/01/29 09:22:53 vadim Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
@ -194,7 +194,7 @@ TransactionStateData CurrentTransactionStateData = {
TransactionState CurrentTransactionState = TransactionState CurrentTransactionState =
&CurrentTransactionStateData; &CurrentTransactionStateData;
int DefaultXactIsoLevel = XACT_SERIALIZABLE; int DefaultXactIsoLevel = XACT_READ_COMMITTED;
int XactIsoLevel; int XactIsoLevel;
/* ---------------- /* ----------------

View File

@ -28,6 +28,7 @@
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "executor/executor.h"
#ifndef NO_SECURITY #ifndef NO_SECURITY
#include "miscadmin.h" #include "miscadmin.h"
@ -790,6 +791,8 @@ ExecARUpdateTriggers(EState *estate, ItemPointer tupleid, HeapTuple newtuple)
return; return;
} }
extern TupleTableSlot *EvalPlanQual(EState *estate, Index rti, ItemPointer tid);
static HeapTuple static HeapTuple
GetTupleForTrigger(EState *estate, ItemPointer tid, bool before) GetTupleForTrigger(EState *estate, ItemPointer tid, bool before)
{ {
@ -806,6 +809,7 @@ GetTupleForTrigger(EState *estate, ItemPointer tid, bool before)
* mark tuple for update * mark tuple for update
*/ */
tuple.t_self = *tid; tuple.t_self = *tid;
ltrmark:;
test = heap_mark4update(relation, &tuple, &buffer); test = heap_mark4update(relation, &tuple, &buffer);
switch (test) switch (test)
{ {
@ -820,8 +824,23 @@ GetTupleForTrigger(EState *estate, ItemPointer tid, bool before)
ReleaseBuffer(buffer); ReleaseBuffer(buffer);
if (XactIsoLevel == XACT_SERIALIZABLE) if (XactIsoLevel == XACT_SERIALIZABLE)
elog(ERROR, "Can't serialize access due to concurrent update"); elog(ERROR, "Can't serialize access due to concurrent update");
else else if (!(ItemPointerEquals(&(tuple.t_self), tid)))
elog(ERROR, "Isolation level %u is not supported", XactIsoLevel); {
TupleTableSlot *slot = EvalPlanQual(estate,
estate->es_result_relation_info->ri_RangeTableIndex,
&(tuple.t_self));
if (!(TupIsNull(slot)))
{
*tid = tuple.t_self;
goto ltrmark;
}
}
/*
* if tuple was deleted or PlanQual failed
* for updated tuple - we have not process
* this tuple!
*/
return(NULL); return(NULL);
default: default:

View File

@ -26,7 +26,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.65 1999/01/27 16:48:20 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.66 1999/01/29 09:22:57 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -64,10 +64,9 @@ static TupleDesc InitPlan(CmdType operation, Query *parseTree,
Plan *plan, EState *estate); Plan *plan, EState *estate);
static void EndPlan(Plan *plan, EState *estate); static void EndPlan(Plan *plan, EState *estate);
static TupleTableSlot *ExecutePlan(EState *estate, Plan *plan, static TupleTableSlot *ExecutePlan(EState *estate, Plan *plan,
Query *parseTree, CmdType operation, CmdType operation, int numberTuples, ScanDirection direction,
int numberTuples, ScanDirection direction, void (*printfunc) ());
DestReceiver *destfunc); static void ExecRetrieve(TupleTableSlot *slot,
static void ExecRetrieve(TupleTableSlot *slot,
DestReceiver *destfunc, DestReceiver *destfunc,
EState *estate); EState *estate);
static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid, static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid,
@ -75,7 +74,11 @@ static void ExecAppend(TupleTableSlot *slot, ItemPointer tupleid,
static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid, static void ExecDelete(TupleTableSlot *slot, ItemPointer tupleid,
EState *estate); EState *estate);
static void ExecReplace(TupleTableSlot *slot, ItemPointer tupleid, static void ExecReplace(TupleTableSlot *slot, ItemPointer tupleid,
EState *estate, Query *parseTree); EState *estate);
TupleTableSlot *EvalPlanQual(EState *estate, Index rti, ItemPointer tid);
static TupleTableSlot *EvalPlanQualNext(EState *estate);
/* end of local decls */ /* end of local decls */
@ -168,11 +171,10 @@ TupleTableSlot *
ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count) ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
{ {
CmdType operation; CmdType operation;
Query *parseTree;
Plan *plan; Plan *plan;
TupleTableSlot *result; TupleTableSlot *result;
CommandDest dest; CommandDest dest;
DestReceiver *destfunc; void (*destination) ();
/****************** /******************
* sanity checks * sanity checks
@ -186,42 +188,30 @@ ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
****************** ******************
*/ */
operation = queryDesc->operation; operation = queryDesc->operation;
parseTree = queryDesc->parsetree;
plan = queryDesc->plantree; plan = queryDesc->plantree;
dest = queryDesc->dest; dest = queryDesc->dest;
destfunc = DestToFunction(dest); destination = (void (*) ()) DestToFunction(dest);
estate->es_processed = 0; estate->es_processed = 0;
estate->es_lastoid = InvalidOid; estate->es_lastoid = InvalidOid;
/******************
* FIXME: the dest setup function ought to be handed the tuple desc
* for the tuples to be output, but I'm not quite sure how to get that
* info at this point. For now, passing NULL is OK because no existing
* dest setup function actually uses the pointer.
******************
*/
(*destfunc->setup) (destfunc, (TupleDesc) NULL);
switch (feature) switch (feature)
{ {
case EXEC_RUN: case EXEC_RUN:
result = ExecutePlan(estate, result = ExecutePlan(estate,
plan, plan,
parseTree,
operation, operation,
ALL_TUPLES, ALL_TUPLES,
ForwardScanDirection, ForwardScanDirection,
destfunc); destination);
break; break;
case EXEC_FOR: case EXEC_FOR:
result = ExecutePlan(estate, result = ExecutePlan(estate,
plan, plan,
parseTree,
operation, operation,
count, count,
ForwardScanDirection, ForwardScanDirection,
destfunc); destination);
break; break;
/****************** /******************
@ -231,11 +221,10 @@ ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
case EXEC_BACK: case EXEC_BACK:
result = ExecutePlan(estate, result = ExecutePlan(estate,
plan, plan,
parseTree,
operation, operation,
count, count,
BackwardScanDirection, BackwardScanDirection,
destfunc); destination);
break; break;
/****************** /******************
@ -246,11 +235,10 @@ ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
case EXEC_RETONE: case EXEC_RETONE:
result = ExecutePlan(estate, result = ExecutePlan(estate,
plan, plan,
parseTree,
operation, operation,
ONE_TUPLE, ONE_TUPLE,
ForwardScanDirection, ForwardScanDirection,
destfunc); destination);
break; break;
default: default:
result = NULL; result = NULL;
@ -258,8 +246,6 @@ ExecutorRun(QueryDesc *queryDesc, EState *estate, int feature, int count)
break; break;
} }
(*destfunc->cleanup) (destfunc);
return result; return result;
} }
@ -413,9 +399,18 @@ ExecCheckPerms(CmdType operation,
typedef struct execRowMark typedef struct execRowMark
{ {
Relation relation; Relation relation;
Index rti;
char resname[32]; char resname[32];
} execRowMark; } execRowMark;
typedef struct evalPlanQual
{
Plan *plan;
Index rti;
EState estate;
struct evalPlanQual *free;
} evalPlanQual;
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* InitPlan * InitPlan
* *
@ -426,13 +421,12 @@ typedef struct execRowMark
static TupleDesc static TupleDesc
InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate) InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
{ {
List *rangeTable; List *rangeTable;
int resultRelation; int resultRelation;
Relation intoRelationDesc; Relation intoRelationDesc;
TupleDesc tupType;
TupleDesc tupType; List *targetList;
List *targetList; int len;
int len;
/****************** /******************
* get information from query descriptor * get information from query descriptor
@ -537,6 +531,7 @@ InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
continue; continue;
erm = (execRowMark*) palloc(sizeof(execRowMark)); erm = (execRowMark*) palloc(sizeof(execRowMark));
erm->relation = relation; erm->relation = relation;
erm->rti = rm->rti;
sprintf(erm->resname, "ctid%u", rm->rti); sprintf(erm->resname, "ctid%u", rm->rti);
estate->es_rowMark = lappend(estate->es_rowMark, erm); estate->es_rowMark = lappend(estate->es_rowMark, erm);
} }
@ -669,6 +664,11 @@ InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
estate->es_into_relation_descriptor = intoRelationDesc; estate->es_into_relation_descriptor = intoRelationDesc;
estate->es_origPlan = plan;
estate->es_evalPlanQual = NULL;
estate->es_evTuple = NULL;
estate->es_useEvalPlan = false;
return tupType; return tupType;
} }
@ -753,11 +753,10 @@ EndPlan(Plan *plan, EState *estate)
static TupleTableSlot * static TupleTableSlot *
ExecutePlan(EState *estate, ExecutePlan(EState *estate,
Plan *plan, Plan *plan,
Query *parseTree,
CmdType operation, CmdType operation,
int numberTuples, int numberTuples,
ScanDirection direction, ScanDirection direction,
DestReceiver* destfunc) void (*printfunc) ())
{ {
JunkFilter *junkfilter; JunkFilter *junkfilter;
@ -794,7 +793,15 @@ ExecutePlan(EState *estate,
****************** ******************
*/ */
/* at the top level, the parent of a plan (2nd arg) is itself */ /* at the top level, the parent of a plan (2nd arg) is itself */
slot = ExecProcNode(plan, plan); lnext:;
if (estate->es_useEvalPlan)
{
slot = EvalPlanQualNext(estate);
if (TupIsNull(slot))
slot = ExecProcNode(plan, plan);
}
else
slot = ExecProcNode(plan, plan);
/****************** /******************
* if the tuple is null, then we assume * if the tuple is null, then we assume
@ -821,8 +828,6 @@ ExecutePlan(EState *estate,
if ((junkfilter = estate->es_junkFilter) != (JunkFilter *) NULL) if ((junkfilter = estate->es_junkFilter) != (JunkFilter *) NULL)
{ {
Datum datum; Datum datum;
/* NameData attrName; */
HeapTuple newTuple; HeapTuple newTuple;
bool isNull; bool isNull;
@ -853,8 +858,10 @@ ExecutePlan(EState *estate,
execRowMark *erm; execRowMark *erm;
Buffer buffer; Buffer buffer;
HeapTupleData tuple; HeapTupleData tuple;
TupleTableSlot *newSlot;
int test; int test;
lmark:;
foreach (l, estate->es_rowMark) foreach (l, estate->es_rowMark)
{ {
erm = lfirst(l); erm = lfirst(l);
@ -879,10 +886,27 @@ ExecutePlan(EState *estate,
case HeapTupleUpdated: case HeapTupleUpdated:
if (XactIsoLevel == XACT_SERIALIZABLE) if (XactIsoLevel == XACT_SERIALIZABLE)
{
elog(ERROR, "Can't serialize access due to concurrent update"); elog(ERROR, "Can't serialize access due to concurrent update");
else return(NULL);
elog(ERROR, "Isolation level %u is not supported", XactIsoLevel); }
return(NULL); else if (!(ItemPointerEquals(&(tuple.t_self),
(ItemPointer)DatumGetPointer(datum))))
{
newSlot = EvalPlanQual(estate, erm->rti, &(tuple.t_self));
if (!(TupIsNull(newSlot)))
{
slot = newSlot;
estate->es_useEvalPlan = true;
goto lmark;
}
}
/*
* if tuple was deleted or PlanQual failed
* for updated tuple - we have not return
* this tuple!
*/
goto lnext;
default: default:
elog(ERROR, "Unknown status %u from heap_mark4update", test); elog(ERROR, "Unknown status %u from heap_mark4update", test);
@ -917,7 +941,7 @@ ExecutePlan(EState *estate,
{ {
case CMD_SELECT: case CMD_SELECT:
ExecRetrieve(slot, /* slot containing tuple */ ExecRetrieve(slot, /* slot containing tuple */
destfunc, /* destination's tuple-receiver obj */ printfunc, /* print function */
estate); /* */ estate); /* */
result = slot; result = slot;
break; break;
@ -933,7 +957,7 @@ ExecutePlan(EState *estate,
break; break;
case CMD_UPDATE: case CMD_UPDATE:
ExecReplace(slot, tupleid, estate, parseTree); ExecReplace(slot, tupleid, estate);
result = NULL; result = NULL;
break; break;
@ -973,7 +997,7 @@ ExecutePlan(EState *estate,
*/ */
static void static void
ExecRetrieve(TupleTableSlot *slot, ExecRetrieve(TupleTableSlot *slot,
DestReceiver *destfunc, void (*printfunc) (),
EState *estate) EState *estate)
{ {
HeapTuple tuple; HeapTuple tuple;
@ -1000,7 +1024,7 @@ ExecRetrieve(TupleTableSlot *slot,
* send the tuple to the front end (or the screen) * send the tuple to the front end (or the screen)
****************** ******************
*/ */
(*destfunc->receiveTuple) (tuple, attrtype, destfunc); (*printfunc) (tuple, attrtype);
IncrRetrieved(); IncrRetrieved();
(estate->es_processed)++; (estate->es_processed)++;
} }
@ -1115,7 +1139,8 @@ ExecDelete(TupleTableSlot *slot,
{ {
RelationInfo *resultRelationInfo; RelationInfo *resultRelationInfo;
Relation resultRelationDesc; Relation resultRelationDesc;
ItemPointerData ctid; ItemPointerData ctid,
oldtid;
int result; int result;
/****************** /******************
@ -1140,6 +1165,7 @@ ExecDelete(TupleTableSlot *slot,
/* /*
* delete the tuple * delete the tuple
*/ */
ldelete:;
result = heap_delete(resultRelationDesc, tupleid, &ctid); result = heap_delete(resultRelationDesc, tupleid, &ctid);
switch (result) switch (result)
{ {
@ -1152,8 +1178,18 @@ ExecDelete(TupleTableSlot *slot,
case HeapTupleUpdated: case HeapTupleUpdated:
if (XactIsoLevel == XACT_SERIALIZABLE) if (XactIsoLevel == XACT_SERIALIZABLE)
elog(ERROR, "Can't serialize access due to concurrent update"); elog(ERROR, "Can't serialize access due to concurrent update");
else else if (!(ItemPointerEquals(tupleid, &ctid)))
elog(ERROR, "Isolation level %u is not supported", XactIsoLevel); {
TupleTableSlot *slot = EvalPlanQual(estate,
resultRelationInfo->ri_RangeTableIndex, &ctid);
if (!TupIsNull(slot))
{
tupleid = &oldtid;
*tupleid = ctid;
goto ldelete;
}
}
return; return;
default: default:
@ -1197,13 +1233,13 @@ ExecDelete(TupleTableSlot *slot,
static void static void
ExecReplace(TupleTableSlot *slot, ExecReplace(TupleTableSlot *slot,
ItemPointer tupleid, ItemPointer tupleid,
EState *estate, EState *estate)
Query *parseTree)
{ {
HeapTuple tuple; HeapTuple tuple;
RelationInfo *resultRelationInfo; RelationInfo *resultRelationInfo;
Relation resultRelationDesc; Relation resultRelationDesc;
ItemPointerData ctid; ItemPointerData ctid,
oldtid;
int result; int result;
int numIndices; int numIndices;
@ -1270,6 +1306,7 @@ ExecReplace(TupleTableSlot *slot,
/* /*
* replace the heap tuple * replace the heap tuple
*/ */
lreplace:;
result = heap_replace(resultRelationDesc, tupleid, tuple, &ctid); result = heap_replace(resultRelationDesc, tupleid, tuple, &ctid);
switch (result) switch (result)
{ {
@ -1282,8 +1319,18 @@ ExecReplace(TupleTableSlot *slot,
case HeapTupleUpdated: case HeapTupleUpdated:
if (XactIsoLevel == XACT_SERIALIZABLE) if (XactIsoLevel == XACT_SERIALIZABLE)
elog(ERROR, "Can't serialize access due to concurrent update"); elog(ERROR, "Can't serialize access due to concurrent update");
else else if (!(ItemPointerEquals(tupleid, &ctid)))
elog(ERROR, "Isolation level %u is not supported", XactIsoLevel); {
TupleTableSlot *slot = EvalPlanQual(estate,
resultRelationInfo->ri_RangeTableIndex, &ctid);
if (!TupIsNull(slot))
{
tupleid = &oldtid;
*tupleid = ctid;
goto lreplace;
}
}
return; return;
default: default:
@ -1480,3 +1527,256 @@ ExecConstraints(char *caller, Relation rel, HeapTuple tuple)
return; return;
} }
TupleTableSlot*
EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
{
evalPlanQual *epq = (evalPlanQual*) estate->es_evalPlanQual;
evalPlanQual *oldepq;
EState *epqstate = NULL;
Relation relation;
Buffer buffer;
HeapTupleData tuple;
bool endNode = true;
Assert(rti != 0);
if (epq != NULL && epq->rti == 0)
{
Assert(!(estate->es_useEvalPlan) &&
epq->estate.es_evalPlanQual == NULL);
epq->rti = rti;
endNode = false;
}
/*
* If this is request for another RTE - Ra, - then we have to check
* wasn't PlanQual requested for Ra already and if so then Ra' row
* was updated again and we have to re-start old execution for Ra
* and forget all what we done after Ra was suspended. Cool? -:))
*/
if (epq != NULL && epq->rti != rti &&
epq->estate.es_evTuple[rti - 1] != NULL)
{
do
{
/* pop previous PlanQual from the stack */
epqstate = &(epq->estate);
oldepq = (evalPlanQual*) epqstate->es_evalPlanQual;
Assert(oldepq->rti != 0);
/* stop execution */
ExecEndNode(epq->plan, epq->plan);
pfree(epqstate->es_evTuple[epq->rti - 1]);
epqstate->es_evTuple[epq->rti - 1] = NULL;
/* push current PQ to freePQ stack */
oldepq->free = epq;
epq = oldepq;
} while (epq->rti != rti);
estate->es_evalPlanQual = (Pointer) epq;
}
/*
* If we are requested for another RTE then we have to suspend
* execution of current PlanQual and start execution for new one.
*/
if (epq == NULL || epq->rti != rti)
{
/* try to reuse plan used previously */
evalPlanQual *newepq = (epq != NULL) ? epq->free : NULL;
if (newepq == NULL)
{
newepq = (evalPlanQual*) palloc(sizeof(evalPlanQual));
/* Init EState */
epqstate = &(newepq->estate);
memset(epqstate, 0, sizeof(EState));
epqstate->type = T_EState;
epqstate->es_direction = ForwardScanDirection;
epqstate->es_snapshot = estate->es_snapshot;
epqstate->es_range_table = estate->es_range_table;
epqstate->es_param_list_info = estate->es_param_list_info;
if (estate->es_origPlan->nParamExec > 0)
epqstate->es_param_exec_vals = (ParamExecData *)
palloc(estate->es_origPlan->nParamExec *
sizeof(ParamExecData));
epqstate->es_tupleTable =
ExecCreateTupleTable(estate->es_tupleTable->size);
epqstate->es_refcount = estate->es_refcount;
/* ... rest */
newepq->plan = copyObject(estate->es_origPlan);
newepq->free = NULL;
if (epq == NULL)
{
epqstate->es_evTuple = (HeapTuple*)
palloc(length(estate->es_range_table) * sizeof(HeapTuple));
memset(epqstate->es_evTuple, 0,
length(estate->es_range_table) * sizeof(HeapTuple));
epqstate->es_evTupleNull = (bool*)
palloc(length(estate->es_range_table) * sizeof(bool));
memset(epqstate->es_evTupleNull, false,
length(estate->es_range_table) * sizeof(bool));
}
else
{
epqstate->es_evTuple = epq->estate.es_evTuple;
epqstate->es_evTupleNull = epq->estate.es_evTupleNull;
}
}
else
{
epqstate = &(newepq->estate);
}
/* push current PQ to the stack */
epqstate->es_evalPlanQual = (Pointer) epq;
estate->es_evalPlanQual = (Pointer) epq = newepq;
epq->rti = rti;
endNode = false;
}
epqstate = &(epq->estate);
/*
* Ok - we're requested for the same RTE (-:)).
* I'm not sure about ability to use ExecReScan instead of
* ExecInitNode, so...
*/
if (endNode)
ExecEndNode(epq->plan, epq->plan);
/* free old RTE' tuple */
if (epqstate->es_evTuple[epq->rti - 1] != NULL)
{
pfree(epqstate->es_evTuple[epq->rti - 1]);
epqstate->es_evTuple[epq->rti - 1] = NULL;
}
/* ** fetch tid tuple ** */
if (estate->es_result_relation_info != NULL &&
estate->es_result_relation_info->ri_RangeTableIndex == rti)
relation = estate->es_result_relation_info->ri_RelationDesc;
else
{
List *l;
foreach (l, estate->es_rowMark)
{
if (((execRowMark*) lfirst(l))->rti == rti)
break;
}
relation = ((execRowMark*) lfirst(l))->relation;
}
tuple.t_self = *tid;
for ( ; ; )
{
heap_fetch(relation, SnapshotDirty, &tuple, &buffer);
if (tuple.t_data != NULL)
{
TransactionId xwait = SnapshotDirty->xmax;
if (TransactionIdIsValid(SnapshotDirty->xmin))
elog(ERROR, "EvalPlanQual: t_xmin is uncommitted ?!");
/*
* If tuple is being updated by other transaction then
* we have to wait for its commit/abort.
*/
if (TransactionIdIsValid(xwait))
{
ReleaseBuffer(buffer);
XactLockTableWait(xwait);
continue;
}
/*
* Nice! We got tuple - now copy it.
*/
epqstate->es_evTuple[epq->rti - 1] = heap_copytuple(&tuple);
epqstate->es_evTupleNull[epq->rti - 1] = false;
ReleaseBuffer(buffer);
break;
}
/*
* Ops! Invalid tuple. Have to check is it updated or deleted.
* Note that it's possible to get invalid SnapshotDirty->tid
* if tuple updated by this transaction. Have we to check this ?
*/
if (ItemPointerIsValid(&(SnapshotDirty->tid)) &&
!(ItemPointerEquals(&(tuple.t_self), &(SnapshotDirty->tid))))
{
tuple.t_self = SnapshotDirty->tid; /* updated ... */
continue;
}
/*
* Deleted or updated by this transaction. Do not
* (re-)start execution of this PQ. Continue previous PQ.
*/
oldepq = (evalPlanQual*) epqstate->es_evalPlanQual;
if (oldepq != NULL)
{
Assert(oldepq->rti != 0);
/* push current PQ to freePQ stack */
oldepq->free = epq;
epq = oldepq;
epqstate = &(epq->estate);
estate->es_evalPlanQual = (Pointer) epq;
}
else
{ /* this is the first (oldest) PQ
epq->rti = 0; * - mark as free and
estate->es_useEvalPlan = false; * continue Query execution
return (NULL); */
}
}
if (estate->es_origPlan->nParamExec > 0)
memset(epqstate->es_param_exec_vals, 0,
estate->es_origPlan->nParamExec * sizeof(ParamExecData));
ExecInitNode(epq->plan, epqstate, NULL);
/*
* For UPDATE/DELETE we have to return tid of actual row
* we're executing PQ for.
*/
*tid = tuple.t_self;
return (EvalPlanQualNext(estate));
}
static TupleTableSlot*
EvalPlanQualNext(EState *estate)
{
evalPlanQual *epq = (evalPlanQual*) estate->es_evalPlanQual;
EState *epqstate = &(epq->estate);
evalPlanQual *oldepq;
TupleTableSlot *slot;
Assert(epq->rti != 0);
lpqnext:;
slot = ExecProcNode(epq->plan, epq->plan);
/*
* No more tuples for this PQ. Continue previous one.
*/
if (TupIsNull(slot))
{
ExecEndNode(epq->plan, epq->plan);
pfree(epqstate->es_evTuple[epq->rti - 1]);
epqstate->es_evTuple[epq->rti - 1] = NULL;
/* pop old PQ from the stack */
oldepq = (evalPlanQual*) epqstate->es_evalPlanQual;
if (oldepq == (evalPlanQual*) NULL)
{ /* this is the first (oldest) */
epq->rti = 0; /* PQ - mark as free and */
estate->es_useEvalPlan = false; /* continue Query execution */
return (NULL);
}
Assert(oldepq->rti != 0);
/* push current PQ to freePQ stack */
oldepq->free = epq;
epq = oldepq;
epqstate = &(epq->estate);
estate->es_evalPlanQual = (Pointer) epq;
goto lpqnext;
}
return (slot);
}

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.29 1998/11/27 19:52:03 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.30 1999/01/29 09:22:58 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -109,6 +109,40 @@ IndexNext(IndexScan *node)
heapRelation = scanstate->css_currentRelation; heapRelation = scanstate->css_currentRelation;
numIndices = indexstate->iss_NumIndices; numIndices = indexstate->iss_NumIndices;
slot = scanstate->css_ScanTupleSlot; slot = scanstate->css_ScanTupleSlot;
/*
* Check if we are evaluating PlanQual for tuple of this relation.
* Additional checking is not good, but no other way for now.
* We could introduce new nodes for this case and handle
* IndexScan --> NewNode switching in Init/ReScan plan...
*/
if (estate->es_evTuple != NULL &&
estate->es_evTuple[node->scan.scanrelid - 1] != NULL)
{
int iptr;
slot->ttc_buffer = InvalidBuffer;
slot->ttc_shouldFree = false;
if (estate->es_evTupleNull[node->scan.scanrelid - 1])
{
slot->val = NULL; /* must not free tuple! */
return (slot);
}
slot->val = estate->es_evTuple[node->scan.scanrelid - 1];
for (iptr = 0; iptr < numIndices; iptr++)
{
scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
if (ExecQual(nth(iptr, node->indxqualorig),
scanstate->cstate.cs_ExprContext))
break;
}
if (iptr == numIndices) /* would not be returned by indices */
slot->val = NULL;
/* Flag for the next call that no more tuples */
estate->es_evTupleNull[node->scan.scanrelid - 1] = true;
return (slot);
}
tuple = &(indexstate->iss_htup); tuple = &(indexstate->iss_htup);
/* ---------------- /* ----------------
@ -262,6 +296,14 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
numScanKeys = indexstate->iss_NumScanKeys; numScanKeys = indexstate->iss_NumScanKeys;
indexstate->iss_IndexPtr = 0; indexstate->iss_IndexPtr = 0;
/* If this is re-scanning of PlanQual ... */
if (estate->es_evTuple != NULL &&
estate->es_evTuple[node->scan.scanrelid - 1] != NULL)
{
estate->es_evTupleNull[node->scan.scanrelid - 1] = false;
return;
}
/* it's possible in subselects */ /* it's possible in subselects */
if (exprCtxt == NULL) if (exprCtxt == NULL)
exprCtxt = node->scan.scanstate->cstate.cs_ExprContext; exprCtxt = node->scan.scanstate->cstate.cs_ExprContext;

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeSeqscan.c,v 1.15 1998/09/25 13:38:32 thomas Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/nodeSeqscan.c,v 1.16 1999/01/29 09:22:58 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -64,6 +64,34 @@ SeqNext(SeqScan *node)
scanstate = node->scanstate; scanstate = node->scanstate;
scandesc = scanstate->css_currentScanDesc; scandesc = scanstate->css_currentScanDesc;
direction = estate->es_direction; direction = estate->es_direction;
slot = scanstate->css_ScanTupleSlot;
/*
* Check if we are evaluating PlanQual for tuple of this relation.
* Additional checking is not good, but no other way for now.
* We could introduce new nodes for this case and handle
* SeqScan --> NewNode switching in Init/ReScan plan...
*/
if (estate->es_evTuple != NULL &&
estate->es_evTuple[node->scanrelid - 1] != NULL)
{
slot->ttc_buffer = InvalidBuffer;
slot->ttc_shouldFree = false;
if (estate->es_evTupleNull[node->scanrelid - 1])
{
slot->val = NULL; /* must not free tuple! */
return (slot);
}
slot->val = estate->es_evTuple[node->scanrelid - 1];
/*
* Note that unlike IndexScan, SeqScan never use keys
* in heap_beginscan (and this is very bad) - so, here
* we have not check are keys ok or not.
*/
/* Flag for the next call that no more tuples */
estate->es_evTupleNull[node->scanrelid - 1] = true;
return (slot);
}
/* ---------------- /* ----------------
* get the next tuple from the access methods * get the next tuple from the access methods
@ -79,7 +107,6 @@ SeqNext(SeqScan *node)
* be pfree()'d. * be pfree()'d.
* ---------------- * ----------------
*/ */
slot = scanstate->css_ScanTupleSlot;
slot = ExecStoreTuple(tuple,/* tuple to store */ slot = ExecStoreTuple(tuple,/* tuple to store */
slot, /* slot to store in */ slot, /* slot to store in */
@ -374,9 +401,15 @@ ExecSeqReScan(SeqScan *node, ExprContext *exprCtxt, Plan *parent)
outerPlan = outerPlan((Plan *) node); outerPlan = outerPlan((Plan *) node);
ExecReScan(outerPlan, exprCtxt, parent); ExecReScan(outerPlan, exprCtxt, parent);
} }
else else /* otherwise, we are scanning a relation */
{ {
/* otherwise, we are scanning a relation */ /* If this is re-scanning of PlanQual ... */
if (estate->es_evTuple != NULL &&
estate->es_evTuple[node->scanrelid - 1] != NULL)
{
estate->es_evTupleNull[node->scanrelid - 1] = false;
return;
}
rel = scanstate->css_currentRelation; rel = scanstate->css_currentRelation;
scan = scanstate->css_currentScanDesc; scan = scanstate->css_currentScanDesc;
direction = estate->es_direction; direction = estate->es_direction;

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v 1.55 1999/01/25 18:02:14 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v 1.56 1999/01/29 09:22:59 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -83,7 +83,6 @@ CopyPlanFields(Plan *from, Plan *newnode)
newnode->plan_size = from->plan_size; newnode->plan_size = from->plan_size;
newnode->plan_width = from->plan_width; newnode->plan_width = from->plan_width;
newnode->plan_tupperpage = from->plan_tupperpage; newnode->plan_tupperpage = from->plan_tupperpage;
newnode->state = from->state;
newnode->targetlist = copyObject(from->targetlist); newnode->targetlist = copyObject(from->targetlist);
newnode->qual = copyObject(from->qual); newnode->qual = copyObject(from->qual);
newnode->lefttree = copyObject(from->lefttree); newnode->lefttree = copyObject(from->lefttree);
@ -138,7 +137,6 @@ _copyResult(Result *from)
* ---------------- * ----------------
*/ */
Node_Copy(from, newnode, resconstantqual); Node_Copy(from, newnode, resconstantqual);
Node_Copy(from, newnode, resstate);
return newnode; return newnode;
} }
@ -166,7 +164,6 @@ _copyAppend(Append *from)
Node_Copy(from, newnode, unionrtables); Node_Copy(from, newnode, unionrtables);
newnode->inheritrelid = from->inheritrelid; newnode->inheritrelid = from->inheritrelid;
Node_Copy(from, newnode, inheritrtable); Node_Copy(from, newnode, inheritrtable);
Node_Copy(from, newnode, appendstate);
return newnode; return newnode;
} }
@ -183,7 +180,6 @@ static void
CopyScanFields(Scan *from, Scan *newnode) CopyScanFields(Scan *from, Scan *newnode)
{ {
newnode->scanrelid = from->scanrelid; newnode->scanrelid = from->scanrelid;
Node_Copy(from, newnode, scanstate);
return; return;
} }
@ -248,7 +244,6 @@ _copyIndexScan(IndexScan *from)
newnode->indxid = listCopy(from->indxid); newnode->indxid = listCopy(from->indxid);
Node_Copy(from, newnode, indxqual); Node_Copy(from, newnode, indxqual);
Node_Copy(from, newnode, indxqualorig); Node_Copy(from, newnode, indxqualorig);
Node_Copy(from, newnode, indxstate);
return newnode; return newnode;
} }
@ -304,12 +299,6 @@ _copyNestLoop(NestLoop *from)
CopyPlanFields((Plan *) from, (Plan *) newnode); CopyPlanFields((Plan *) from, (Plan *) newnode);
CopyJoinFields((Join *) from, (Join *) newnode); CopyJoinFields((Join *) from, (Join *) newnode);
/* ----------------
* copy remainder of node
* ----------------
*/
Node_Copy(from, newnode, nlstate);
return newnode; return newnode;
} }
@ -346,8 +335,6 @@ _copyMergeJoin(MergeJoin *from)
newnode->mergeleftorder[0] = from->mergeleftorder[0]; newnode->mergeleftorder[0] = from->mergeleftorder[0];
newnode->mergeleftorder[1] = 0; newnode->mergeleftorder[1] = 0;
Node_Copy(from, newnode, mergestate);
return newnode; return newnode;
} }
@ -375,12 +362,9 @@ _copyHashJoin(HashJoin *from)
newnode->hashjoinop = from->hashjoinop; newnode->hashjoinop = from->hashjoinop;
Node_Copy(from, newnode, hashjoinstate); /* both are unused !.. */
newnode->hashjointable = from->hashjointable;
newnode->hashjointablekey = from->hashjointablekey; newnode->hashjointablekey = from->hashjointablekey;
newnode->hashjointablesize = from->hashjointablesize; newnode->hashjointablesize = from->hashjointablesize;
newnode->hashdone = from->hashdone;
return newnode; return newnode;
} }
@ -437,12 +421,6 @@ _copyMaterial(Material *from)
CopyPlanFields((Plan *) from, (Plan *) newnode); CopyPlanFields((Plan *) from, (Plan *) newnode);
CopyTempFields((Temp *) from, (Temp *) newnode); CopyTempFields((Temp *) from, (Temp *) newnode);
/* ----------------
* copy remainder of node
* ----------------
*/
Node_Copy(from, newnode, matstate);
return newnode; return newnode;
} }
@ -463,14 +441,6 @@ _copySort(Sort *from)
CopyPlanFields((Plan *) from, (Plan *) newnode); CopyPlanFields((Plan *) from, (Plan *) newnode);
CopyTempFields((Temp *) from, (Temp *) newnode); CopyTempFields((Temp *) from, (Temp *) newnode);
/* ----------------
* copy remainder of node
* ----------------
*/
Node_Copy(from, newnode, sortstate);
Node_Copy(from, newnode, psortstate);
newnode->cleaned = from->cleaned;
return newnode; return newnode;
} }
@ -490,7 +460,6 @@ _copyGroup(Group *from)
newnode->numCols = from->numCols; newnode->numCols = from->numCols;
newnode->grpColIdx = palloc(from->numCols * sizeof(AttrNumber)); newnode->grpColIdx = palloc(from->numCols * sizeof(AttrNumber));
memcpy(newnode->grpColIdx, from->grpColIdx, from->numCols * sizeof(AttrNumber)); memcpy(newnode->grpColIdx, from->grpColIdx, from->numCols * sizeof(AttrNumber));
Node_Copy(from, newnode, grpstate);
return newnode; return newnode;
} }
@ -507,7 +476,6 @@ _copyAgg(Agg *from)
CopyPlanFields((Plan *) from, (Plan *) newnode); CopyPlanFields((Plan *) from, (Plan *) newnode);
newnode->aggs = get_agg_tlist_references(newnode); newnode->aggs = get_agg_tlist_references(newnode);
Node_Copy(from, newnode, aggstate);
return newnode; return newnode;
} }
@ -553,7 +521,6 @@ _copyUnique(Unique *from)
else else
newnode->uniqueAttr = NULL; newnode->uniqueAttr = NULL;
newnode->uniqueAttrNum = from->uniqueAttrNum; newnode->uniqueAttrNum = from->uniqueAttrNum;
Node_Copy(from, newnode, uniquestate);
return newnode; return newnode;
} }
@ -579,9 +546,8 @@ _copyHash(Hash *from)
* ---------------- * ----------------
*/ */
Node_Copy(from, newnode, hashkey); Node_Copy(from, newnode, hashkey);
Node_Copy(from, newnode, hashstate);
newnode->hashtable = from->hashtable; /* both are unused !.. */
newnode->hashtablekey = from->hashtablekey; newnode->hashtablekey = from->hashtablekey;
newnode->hashtablesize = from->hashtablesize; newnode->hashtablesize = from->hashtablesize;
@ -599,7 +565,6 @@ _copySubPlan(SubPlan *from)
newnode->setParam = listCopy(from->setParam); newnode->setParam = listCopy(from->setParam);
newnode->parParam = listCopy(from->parParam); newnode->parParam = listCopy(from->parParam);
Node_Copy(from, newnode, sublink); Node_Copy(from, newnode, sublink);
newnode->shutdown = from->shutdown;
return newnode; return newnode;
} }
@ -1901,7 +1866,7 @@ copyObject(void *from)
} }
break; break;
default: default:
elog(NOTICE, "copyObject: don't know how to copy %d", nodeTag(from)); elog(ERROR, "copyObject: don't know how to copy %d", nodeTag(from));
retval = from; retval = from;
break; break;
} }

View File

@ -239,7 +239,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/parser/Attic/gram.c,v 2.64 1999/01/26 23:32:04 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/parser/Attic/gram.c,v 2.65 1999/01/29 09:23:02 vadim Exp $
* *
* HISTORY * HISTORY
* AUTHOR DATE MAJOR EVENT * AUTHOR DATE MAJOR EVENT
@ -4791,7 +4791,7 @@ static const short yycheck[] = { 3,
-1, -1, -1, -1, -1, -1, -1, 214 -1, -1, -1, -1, -1, -1, -1, 214
}; };
/* -*-C-*- Note some compilers choke on comments on `#line' lines. */ /* -*-C-*- Note some compilers choke on comments on `#line' lines. */
#line 3 "/usr/local/bison/bison.simple" #line 3 "/usr/share/misc/bison.simple"
/* Skeleton output parser for bison, /* Skeleton output parser for bison,
Copyright (C) 1984, 1989, 1990 Free Software Foundation, Inc. Copyright (C) 1984, 1989, 1990 Free Software Foundation, Inc.
@ -4984,7 +4984,7 @@ __yy_memcpy (char *to, char *from, int count)
#endif #endif
#endif #endif
#line 196 "/usr/local/bison/bison.simple" #line 196 "/usr/share/misc/bison.simple"
/* The user can define YYPARSE_PARAM as the name of an argument to be passed /* The user can define YYPARSE_PARAM as the name of an argument to be passed
into yyparse. The argument should have type void *. into yyparse. The argument should have type void *.
@ -11088,7 +11088,7 @@ case 960:
break;} break;}
} }
/* the action file gets copied in in place of this dollarsign */ /* the action file gets copied in in place of this dollarsign */
#line 498 "/usr/local/bison/bison.simple" #line 498 "/usr/share/misc/bison.simple"
yyvsp -= yylen; yyvsp -= yylen;
yyssp -= yylen; yyssp -= yylen;

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/time/tqual.c,v 1.23 1998/12/18 09:10:39 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/utils/time/tqual.c,v 1.24 1999/01/29 09:23:12 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -368,6 +368,7 @@ bool
HeapTupleSatisfiesDirty(HeapTupleHeader tuple) HeapTupleSatisfiesDirty(HeapTupleHeader tuple)
{ {
SnapshotDirty->xmin = SnapshotDirty->xmax = InvalidTransactionId; SnapshotDirty->xmin = SnapshotDirty->xmax = InvalidTransactionId;
ItemPointerSetInvalid(&(SnapshotDirty->tid));
if (AMI_OVERRIDE) if (AMI_OVERRIDE)
return true; return true;
@ -413,6 +414,7 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple)
{ {
if (tuple->t_infomask & HEAP_MARKED_FOR_UPDATE) if (tuple->t_infomask & HEAP_MARKED_FOR_UPDATE)
return true; return true;
SnapshotDirty->tid = tuple->t_ctid;
return false; /* updated by other */ return false; /* updated by other */
} }
@ -437,6 +439,7 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple)
if (tuple->t_infomask & HEAP_MARKED_FOR_UPDATE) if (tuple->t_infomask & HEAP_MARKED_FOR_UPDATE)
return true; return true;
SnapshotDirty->tid = tuple->t_ctid;
return false; /* updated by other */ return false; /* updated by other */
} }

View File

@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: execnodes.h,v 1.21 1999/01/25 12:01:19 vadim Exp $ * $Id: execnodes.h,v 1.22 1999/01/29 09:23:13 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -194,21 +194,27 @@ typedef struct JunkFilter
*/ */
typedef struct EState typedef struct EState
{ {
NodeTag type; NodeTag type;
ScanDirection es_direction; ScanDirection es_direction;
Snapshot es_snapshot; Snapshot es_snapshot;
List *es_range_table; List *es_range_table;
RelationInfo *es_result_relation_info; RelationInfo *es_result_relation_info;
Relation es_into_relation_descriptor; Relation es_into_relation_descriptor;
ParamListInfo es_param_list_info; ParamListInfo es_param_list_info;
ParamExecData *es_param_exec_vals; /* this is for subselects */ ParamExecData *es_param_exec_vals; /* this is for subselects */
int es_BaseId; int es_BaseId;
TupleTable es_tupleTable; TupleTable es_tupleTable;
JunkFilter *es_junkFilter; JunkFilter *es_junkFilter;
int *es_refcount; int *es_refcount;
uint32 es_processed; /* # of tuples processed */ uint32 es_processed; /* # of tuples processed */
Oid es_lastoid; /* last oid processed (by INSERT) */ Oid es_lastoid; /* last oid processed (by INSERT) */
List *es_rowMark; /* not good place, but there is no other */ List *es_rowMark; /* not good place, but there is no other */
/* Below is to re-evaluate plan qual in READ COMMITTED mode */
struct Plan *es_origPlan;
Pointer es_evalPlanQual;
bool *es_evTupleNull;
HeapTuple *es_evTuple;
bool es_useEvalPlan;
} EState; } EState;
/* ---------------- /* ----------------

View File

@ -7,7 +7,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: tqual.h,v 1.18 1998/12/18 09:09:55 vadim Exp $ * $Id: tqual.h,v 1.19 1999/01/29 09:23:17 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -22,6 +22,7 @@ typedef struct SnapshotData
TransactionId xmax; /* XID >= xmax are invisible to me */ TransactionId xmax; /* XID >= xmax are invisible to me */
uint32 xcnt; /* # of xact below */ uint32 xcnt; /* # of xact below */
TransactionId *xip; /* array of xacts in progress */ TransactionId *xip; /* array of xacts in progress */
ItemPointerData tid; /* required for Dirty snapshot -:( */
} SnapshotData; } SnapshotData;
typedef SnapshotData *Snapshot; typedef SnapshotData *Snapshot;