postgresql/contrib/intagg/int_aggregate.c
Tom Lane 234a02b2a8 Replace direct assignments to VARATT_SIZEP(x) with SET_VARSIZE(x, len).
Get rid of VARATT_SIZE and VARATT_DATA, which were simply redundant with
VARSIZE and VARDATA, and as a consequence almost no code was using the
longer names.  Rename the length fields of struct varlena and various
derived structures to catch anyplace that was accessing them directly;
and clean up various places so caught.  In itself this patch doesn't
change any behavior at all, but it is necessary infrastructure if we hope
to play any games with the representation of varlena headers.
Greg Stark and Tom Lane
2007-02-27 23:48:10 +00:00

281 lines
6.5 KiB
C

/*
* Integer array aggregator / enumerator
*
* Mark L. Woodward
* DMN Digital Music Network.
* www.dmn.com
*
* $PostgreSQL: pgsql/contrib/intagg/int_aggregate.c,v 1.26 2007/02/27 23:48:06 tgl Exp $
*
* Copyright (C) Digital Music Network
* December 20, 2001
*
* This file is the property of the Digital Music Network (DMN).
* It is being made available to users of the PostgreSQL system
* under the BSD license.
*/
#include "postgres.h"
#include <ctype.h>
#include <sys/types.h>
#include "access/heapam.h"
#include "access/tupmacs.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
PG_MODULE_MAGIC;
/*
* This is actually a postgres version of a one dimensional array.
* We cheat a little by using the lower-bound field as an indicator
* of the physically allocated size, while the dimensionality is the
* count of items accumulated so far.
*/
typedef struct
{
ArrayType a;
int items;
int lower;
int4 array[1];
} PGARRAY;
/* This is used to keep track of our position during enumeration */
typedef struct callContext
{
PGARRAY *p;
int num;
int flags;
} CTX;
#define TOASTED 1
#define START_NUM 8 /* initial size of arrays */
#define PGARRAY_SIZE(n) (sizeof(PGARRAY) + (((n)-1)*sizeof(int4)))
static PGARRAY *GetPGArray(PGARRAY * p, AggState *aggstate, bool fAdd);
static PGARRAY *ShrinkPGArray(PGARRAY * p);
Datum int_agg_state(PG_FUNCTION_ARGS);
Datum int_agg_final_array(PG_FUNCTION_ARGS);
Datum int_enum(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(int_agg_state);
PG_FUNCTION_INFO_V1(int_agg_final_array);
PG_FUNCTION_INFO_V1(int_enum);
/*
* Manage the allocation state of the array
*
* Note that the array needs to be in a reasonably long-lived context,
* ie the Agg node's aggcontext.
*/
static PGARRAY *
GetPGArray(PGARRAY * p, AggState *aggstate, bool fAdd)
{
if (!p)
{
/* New array */
int cb = PGARRAY_SIZE(START_NUM);
p = (PGARRAY *) MemoryContextAlloc(aggstate->aggcontext, cb);
SET_VARSIZE(p, cb);
p->a.ndim = 1;
p->a.dataoffset = 0; /* we don't support nulls, for now */
p->a.elemtype = INT4OID;
p->items = 0;
p->lower = START_NUM;
}
else if (fAdd)
{
/* Ensure array has space for another item */
if (p->items >= p->lower)
{
PGARRAY *pn;
int n = p->lower * 2;
int cbNew = PGARRAY_SIZE(n);
pn = (PGARRAY *) MemoryContextAlloc(aggstate->aggcontext, cbNew);
memcpy(pn, p, VARSIZE(p));
SET_VARSIZE(pn, cbNew);
pn->lower = n;
/* do not pfree(p), because nodeAgg.c will */
p = pn;
}
}
return p;
}
/*
* Shrinks the array to its actual size and moves it into the standard
* memory allocation context
*/
static PGARRAY *
ShrinkPGArray(PGARRAY * p)
{
PGARRAY *pnew;
/* get target size */
int cb = PGARRAY_SIZE(p->items);
/* use current transaction context */
pnew = palloc(cb);
memcpy(pnew, p, cb);
/* fix up the fields in the new array to match normal conventions */
SET_VARSIZE(pnew, cb);
pnew->lower = 1;
/* do not pfree(p), because nodeAgg.c will */
return pnew;
}
/* Called for each iteration during an aggregate function */
Datum
int_agg_state(PG_FUNCTION_ARGS)
{
PGARRAY *state;
PGARRAY *p;
/*
* As of PG 8.1 we can actually verify that we are being used as an
* aggregate function, and so it is safe to scribble on our left input.
*/
if (!(fcinfo->context && IsA(fcinfo->context, AggState)))
elog(ERROR, "int_agg_state may only be used as an aggregate");
if (PG_ARGISNULL(0))
state = NULL; /* first time through */
else
state = (PGARRAY *) PG_GETARG_POINTER(0);
p = GetPGArray(state, (AggState *) fcinfo->context, true);
if (!PG_ARGISNULL(1))
{
int4 value = PG_GETARG_INT32(1);
if (!p) /* internal error */
elog(ERROR, "no aggregate storage");
else if (p->items >= p->lower) /* internal error */
elog(ERROR, "aggregate storage too small");
else
p->array[p->items++] = value;
}
PG_RETURN_POINTER(p);
}
/*
* This is the final function used for the integer aggregator. It returns all
* the integers collected as a one dimensional integer array
*/
Datum
int_agg_final_array(PG_FUNCTION_ARGS)
{
PGARRAY *state;
PGARRAY *p;
PGARRAY *pnew;
/*
* As of PG 8.1 we can actually verify that we are being used as an
* aggregate function, and so it is safe to scribble on our left input.
*/
if (!(fcinfo->context && IsA(fcinfo->context, AggState)))
elog(ERROR, "int_agg_final_array may only be used as an aggregate");
if (PG_ARGISNULL(0))
state = NULL; /* zero items in aggregation */
else
state = (PGARRAY *) PG_GETARG_POINTER(0);
p = GetPGArray(state, (AggState *) fcinfo->context, false);
pnew = ShrinkPGArray(p);
PG_RETURN_POINTER(pnew);
}
/*
* This function accepts an array, and returns one item for each entry in the
* array
*/
Datum
int_enum(PG_FUNCTION_ARGS)
{
PGARRAY *p = (PGARRAY *) PG_GETARG_POINTER(0);
CTX *pc;
ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
if (!rsi || !IsA(rsi, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("int_enum called in context that cannot accept a set")));
if (!p)
{
elog(WARNING, "no data sent");
PG_RETURN_NULL();
}
if (!fcinfo->flinfo->fn_extra)
{
/* Allocate working state */
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(fcinfo->flinfo->fn_mcxt);
pc = (CTX *) palloc(sizeof(CTX));
/* Don't copy attribute if you don't need to */
if (VARATT_IS_EXTENDED(p))
{
/* Toasted!!! */
pc->p = (PGARRAY *) PG_DETOAST_DATUM_COPY(p);
pc->flags = TOASTED;
}
else
{
/* Untoasted */
pc->p = p;
pc->flags = 0;
}
/* Now that we have a detoasted array, verify dimensions */
/* We'll treat a zero-D array as empty, below */
if (pc->p->a.ndim > 1)
elog(ERROR, "int_enum only accepts 1-D arrays");
pc->num = 0;
fcinfo->flinfo->fn_extra = (void *) pc;
MemoryContextSwitchTo(oldcontext);
}
else
/* use existing working state */
pc = (CTX *) fcinfo->flinfo->fn_extra;
/* Are we done yet? */
if (pc->p->a.ndim < 1 || pc->num >= pc->p->items)
{
/* We are done */
if (pc->flags & TOASTED)
pfree(pc->p);
pfree(pc);
fcinfo->flinfo->fn_extra = NULL;
rsi->isDone = ExprEndResult;
}
else
{
/* nope, return the next value */
int val = pc->p->array[pc->num++];
rsi->isDone = ExprMultipleResult;
PG_RETURN_INT32(val);
}
PG_RETURN_NULL();
}