Ensure that the contents of a holdable cursor don't depend on out-of-line

toasted values, since those could get dropped once the cursor's transaction
is over.  Per bug #4553 from Andrew Gierth.

Back-patch as far as 8.1.  The bug actually exists back to 7.4 when holdable
cursors were introduced, but this patch won't work before 8.1 without
significant adjustments.  Given the lack of field complaints, it doesn't seem
worth the work (and risk of introducing new bugs) to try to make a patch for
the older branches.
This commit is contained in:
Tom Lane 2008-12-01 17:06:21 +00:00
parent 3191ab5dcc
commit ec543db77b
4 changed files with 140 additions and 17 deletions

View File

@ -14,7 +14,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/portalcmds.c,v 1.76 2008/11/30 20:51:25 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/portalcmds.c,v 1.77 2008/12/01 17:06:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -351,11 +351,15 @@ PersistHoldablePortal(Portal portal)
*/
ExecutorRewind(queryDesc);
/* Change the destination to output to the tuplestore */
/*
* Change the destination to output to the tuplestore. Note we
* tell the tuplestore receiver to detoast all data passed through it.
*/
queryDesc->dest = CreateDestReceiver(DestTuplestore);
SetTuplestoreDestReceiverParams(queryDesc->dest,
portal->holdStore,
portal->holdContext);
portal->holdContext,
true);
/* Fetch the result set into the tuplestore */
ExecutorRun(queryDesc, ForwardScanDirection, 0L);

View File

@ -1,46 +1,97 @@
/*-------------------------------------------------------------------------
*
* tstoreReceiver.c
* an implementation of DestReceiver that stores the result tuples in
* a Tuplestore
* An implementation of DestReceiver that stores the result tuples in
* a Tuplestore.
*
* Optionally, we can force detoasting (but not decompression) of out-of-line
* toasted values. This is to support cursors WITH HOLD, which must retain
* data even if the underlying table is dropped.
*
*
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/tstoreReceiver.c,v 1.20 2008/11/30 20:51:25 tgl Exp $
* $PostgreSQL: pgsql/src/backend/executor/tstoreReceiver.c,v 1.21 2008/12/01 17:06:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/tuptoaster.h"
#include "executor/tstoreReceiver.h"
typedef struct
{
DestReceiver pub;
Tuplestorestate *tstore;
MemoryContext cxt;
/* parameters: */
Tuplestorestate *tstore; /* where to put the data */
MemoryContext cxt; /* context containing tstore */
bool detoast; /* were we told to detoast? */
/* workspace: */
Datum *outvalues; /* values array for result tuple */
Datum *tofree; /* temp values to be pfree'd */
} TStoreState;
static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
/*
* Prepare to receive tuples from executor.
*/
static void
tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
{
/* do nothing */
TStoreState *myState = (TStoreState *) self;
bool needtoast = false;
Form_pg_attribute *attrs = typeinfo->attrs;
int natts = typeinfo->natts;
int i;
/* Check if any columns require detoast work */
if (myState->detoast)
{
for (i = 0; i < natts; i++)
{
if (attrs[i]->attisdropped)
continue;
if (attrs[i]->attlen == -1)
{
needtoast = true;
break;
}
}
}
/* Set up appropriate callback */
if (needtoast)
{
myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
/* Create workspace */
myState->outvalues = (Datum *)
MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
myState->tofree = (Datum *)
MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
}
else
{
myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
myState->outvalues = NULL;
myState->tofree = NULL;
}
}
/*
* Receive a tuple from the executor and store it in the tuplestore.
* This is for the easy case where we don't have to detoast.
*/
static void
tstoreReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
{
TStoreState *myState = (TStoreState *) self;
MemoryContext oldcxt = MemoryContextSwitchTo(myState->cxt);
@ -50,13 +101,77 @@ tstoreReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
MemoryContextSwitchTo(oldcxt);
}
/*
* Receive a tuple from the executor and store it in the tuplestore.
* This is for the case where we have to detoast any toasted values.
*/
static void
tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
{
TStoreState *myState = (TStoreState *) self;
TupleDesc typeinfo = slot->tts_tupleDescriptor;
Form_pg_attribute *attrs = typeinfo->attrs;
int natts = typeinfo->natts;
int nfree;
int i;
MemoryContext oldcxt;
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
/*
* Fetch back any out-of-line datums. We build the new datums array in
* myState->outvalues[] (but we can re-use the slot's isnull array).
* Also, remember the fetched values to free afterwards.
*/
nfree = 0;
for (i = 0; i < natts; i++)
{
Datum val = slot->tts_values[i];
if (!attrs[i]->attisdropped &&
attrs[i]->attlen == -1 &&
!slot->tts_isnull[i])
{
if (VARATT_IS_EXTERNAL(DatumGetPointer(val)))
{
val = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *)
DatumGetPointer(val)));
myState->tofree[nfree++] = val;
}
}
myState->outvalues[i] = val;
}
/*
* Push the modified tuple into the tuplestore.
*/
oldcxt = MemoryContextSwitchTo(myState->cxt);
tuplestore_putvalues(myState->tstore, typeinfo,
myState->outvalues, slot->tts_isnull);
MemoryContextSwitchTo(oldcxt);
/* And release any temporary detoasted values */
for (i = 0; i < nfree; i++)
pfree(DatumGetPointer(myState->tofree[i]));
}
/*
* Clean up at end of an executor run
*/
static void
tstoreShutdownReceiver(DestReceiver *self)
{
/* do nothing */
TStoreState *myState = (TStoreState *) self;
/* Release workspace if any */
if (myState->outvalues)
pfree(myState->outvalues);
myState->outvalues = NULL;
if (myState->tofree)
pfree(myState->tofree);
myState->tofree = NULL;
}
/*
@ -76,7 +191,7 @@ CreateTuplestoreDestReceiver(void)
{
TStoreState *self = (TStoreState *) palloc0(sizeof(TStoreState));
self->pub.receiveSlot = tstoreReceiveSlot;
self->pub.receiveSlot = tstoreReceiveSlot_notoast; /* might change */
self->pub.rStartup = tstoreStartupReceiver;
self->pub.rShutdown = tstoreShutdownReceiver;
self->pub.rDestroy = tstoreDestroyReceiver;
@ -93,11 +208,13 @@ CreateTuplestoreDestReceiver(void)
void
SetTuplestoreDestReceiverParams(DestReceiver *self,
Tuplestorestate *tStore,
MemoryContext tContext)
MemoryContext tContext,
bool detoast)
{
TStoreState *myState = (TStoreState *) self;
Assert(myState->pub.mydest == DestTuplestore);
myState->tstore = tStore;
myState->cxt = tContext;
myState->detoast = detoast;
}

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.126 2008/11/30 20:51:25 tgl Exp $
* $PostgreSQL: pgsql/src/backend/tcop/pquery.c,v 1.127 2008/12/01 17:06:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -1036,7 +1036,8 @@ FillPortalStore(Portal portal, bool isTopLevel)
treceiver = CreateDestReceiver(DestTuplestore);
SetTuplestoreDestReceiverParams(treceiver,
portal->holdStore,
portal->holdContext);
portal->holdContext,
false);
completionTag[0] = '\0';

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/executor/tstoreReceiver.h,v 1.11 2008/11/30 20:51:25 tgl Exp $
* $PostgreSQL: pgsql/src/include/executor/tstoreReceiver.h,v 1.12 2008/12/01 17:06:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -23,6 +23,7 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
Tuplestorestate *tStore,
MemoryContext tContext);
MemoryContext tContext,
bool detoast);
#endif /* TSTORE_RECEIVER_H */